Database Failover Testing: PostgreSQL, MySQL, and Redis Strategies

Database Failover Testing: PostgreSQL, MySQL, and Redis Strategies

Your database going down is not a matter of if — it's when. A disk fills up, a cloud provider has an AZ outage, a runaway query corrupts an index, a network partition isolates the primary. The question is whether your failover mechanism works correctly and whether your application handles the transition gracefully.

Database failover testing validates that your replication, automatic failover configuration, and application connection handling all work together to achieve your RTO and RPO. This guide covers practical testing strategies for PostgreSQL, MySQL, and Redis.

What Database Failover Testing Covers

A complete database failover test validates:

  1. Replication health — Is the replica up to date? What is the replication lag?
  2. Automatic failover trigger — Does the system detect the primary failure and promote the replica?
  3. Failover time — How long does the promotion take? Does it meet your RTO?
  4. Data consistency — What transactions were in-flight during failover? Are they committed or rolled back cleanly?
  5. Application behavior — Does the application reconnect automatically? Does it fail open or fail closed? Are connection pools drained and reset?
  6. Recovery point — What data was lost? Does it meet your RPO?

PostgreSQL Failover Testing

Setup: Streaming Replication + Patroni

Modern PostgreSQL HA typically uses Patroni (or similar) to manage automatic failover. Before testing, understand your setup:

# Check Patroni cluster status
patronictl -c /etc/patroni/patroni.yml list

<span class="hljs-comment"># Output:
<span class="hljs-comment"># + Cluster: postgres-cluster (7234567890123456789) +---------+----+-----------+
<span class="hljs-comment"># | Member           <span class="hljs-pipe">| Host          <span class="hljs-pipe">| Role    <span class="hljs-pipe">| State   <span class="hljs-pipe">| TL <span class="hljs-pipe">| Lag in MB |
<span class="hljs-comment"># +------------------+---------------+---------+---------+----+-----------+
<span class="hljs-comment"># | postgres-primary <span class="hljs-pipe">| 10.0.1.10:5432| Leader  <span class="hljs-pipe">| running <span class="hljs-pipe">|  3 <span class="hljs-pipe">|           |
<span class="hljs-comment"># | postgres-replica <span class="hljs-pipe">| 10.0.1.11:5432| Replica <span class="hljs-pipe">| running <span class="hljs-pipe">|  3 <span class="hljs-pipe">|         0 |
<span class="hljs-comment"># +------------------+---------------+---------+---------+----+-----------+

Measuring Replication Lag

Before triggering any failover, validate your replication baseline:

import psycopg2
from datetime import datetime

def check_replication_lag(primary_url: str, replica_url: str) -> dict:
    """Measure replication lag between primary and replica."""
    with psycopg2.connect(primary_url) as primary_conn, \
         psycopg2.connect(replica_url) as replica_conn:
        
        primary_cur = primary_conn.cursor()
        replica_cur = replica_conn.cursor()
        
        # Get primary LSN (Log Sequence Number)
        primary_cur.execute("SELECT pg_current_wal_lsn()")
        primary_lsn = primary_cur.fetchone()[0]
        
        # Get replica's received LSN
        replica_cur.execute("SELECT pg_last_wal_receive_lsn()")
        replica_received_lsn = replica_cur.fetchone()[0]
        
        # Get replica's applied LSN
        replica_cur.execute("SELECT pg_last_wal_replay_lsn()")
        replica_applied_lsn = replica_cur.fetchone()[0]
        
        # Get lag in bytes
        primary_cur.execute("""
            SELECT client_addr, state, sent_lsn, write_lsn, flush_lsn, replay_lsn,
                   (sent_lsn - replay_lsn) AS replication_lag_bytes
            FROM pg_stat_replication
        """)
        replication_stats = primary_cur.fetchall()
        
        # Get lag in time from replica
        replica_cur.execute("""
            SELECT now() - pg_last_xact_replay_timestamp() AS replication_delay
        """)
        time_lag = replica_cur.fetchone()[0]
        
        return {
            'primary_lsn': str(primary_lsn),
            'replica_received_lsn': str(replica_received_lsn),
            'replica_applied_lsn': str(replica_applied_lsn),
            'time_lag_seconds': time_lag.total_seconds() if time_lag else None,
            'replication_stats': replication_stats,
        }

lag = check_replication_lag("postgres://primary:5432/db", "postgres://replica:5432/db")
print(f"Replication lag: {lag['time_lag_seconds']:.2f}s")

# Fail if lag exceeds RPO threshold
assert lag['time_lag_seconds'] < 5, f"Replication lag {lag['time_lag_seconds']}s exceeds 5s threshold"

Simulating Primary Failure

#!/bin/bash
<span class="hljs-comment"># postgresql_failover_test.sh

<span class="hljs-built_in">set -euo pipefail

PRIMARY_HOST=<span class="hljs-string">"10.0.1.10"
REPLICA_HOST=<span class="hljs-string">"10.0.1.11"
DB_PORT=5432
APP_ENDPOINT=<span class="hljs-string">"http://api.internal/health"
RTO_SECONDS=30

<span class="hljs-built_in">echo <span class="hljs-string">"=== PostgreSQL Failover Test ==="
<span class="hljs-built_in">echo <span class="hljs-string">"Start time: $(date -u +%Y-%m-%dT%H:%M:%SZ)"

<span class="hljs-comment"># 1. Baseline health check
<span class="hljs-built_in">echo <span class="hljs-string">"Checking baseline..."
INITIAL_LAG=$(psql <span class="hljs-string">"postgres://$REPLICA_HOST:<span class="hljs-variable">$DB_PORT/postgres" \
  -t -c <span class="hljs-string">"SELECT EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp()))::int")
<span class="hljs-built_in">echo <span class="hljs-string">"Initial replication lag: ${INITIAL_LAG}s"

<span class="hljs-comment"># 2. Start monitoring application health in background
FAILOVER_START_TIME=<span class="hljs-string">""
RECOVERY_TIME=<span class="hljs-string">""
REQUEST_LOG=$(<span class="hljs-built_in">mktemp)

<span class="hljs-function">monitor_app() {
  <span class="hljs-keyword">while <span class="hljs-literal">true; <span class="hljs-keyword">do
    TIMESTAMP=$(<span class="hljs-built_in">date +%s)
    HTTP_CODE=$(curl -s -o /dev/null -w <span class="hljs-string">"%{http_code}" --max-time 2 <span class="hljs-string">"$APP_ENDPOINT" <span class="hljs-pipe">|| <span class="hljs-built_in">echo <span class="hljs-string">"000")
    <span class="hljs-built_in">echo <span class="hljs-string">"$TIMESTAMP <span class="hljs-variable">$HTTP_CODE" >> <span class="hljs-string">"$REQUEST_LOG"
    <span class="hljs-built_in">sleep 0.5
  <span class="hljs-keyword">done
}
monitor_app &
MONITOR_PID=$!

<span class="hljs-built_in">sleep 5  <span class="hljs-comment"># Let baseline monitoring run

<span class="hljs-comment"># 3. Trigger primary failure (SIGKILL to simulate crash)
<span class="hljs-built_in">echo <span class="hljs-string">"Triggering primary failure at $(date -u +%Y-%m-%dT%H:%M:%SZ)"
FAILOVER_EPOCH=$(<span class="hljs-built_in">date +%s)
ssh <span class="hljs-string">"$PRIMARY_HOST" <span class="hljs-string">"sudo systemctl stop postgresql"

<span class="hljs-comment"># 4. Wait and check application recovery
MAX_WAIT=$((FAILOVER_EPOCH + RTO_SECONDS + <span class="hljs-number">30))
RECOVERED=<span class="hljs-literal">false

<span class="hljs-keyword">while [ <span class="hljs-string">"$(date +%s)" -lt <span class="hljs-string">"$MAX_WAIT" ]; <span class="hljs-keyword">do
  HTTP_CODE=$(curl -s -o /dev/null -w <span class="hljs-string">"%{http_code}" --max-time 2 <span class="hljs-string">"$APP_ENDPOINT" <span class="hljs-pipe">|| <span class="hljs-built_in">echo <span class="hljs-string">"000")
  <span class="hljs-keyword">if [ <span class="hljs-string">"$HTTP_CODE" = <span class="hljs-string">"200" ]; <span class="hljs-keyword">then
    RECOVERY_EPOCH=$(<span class="hljs-built_in">date +%s)
    ACTUAL_RTO=$((RECOVERY_EPOCH - FAILOVER_EPOCH))
    <span class="hljs-built_in">echo <span class="hljs-string">"Application recovered in ${ACTUAL_RTO}s"
    RECOVERED=<span class="hljs-literal">true
    <span class="hljs-built_in">break
  <span class="hljs-keyword">fi
  <span class="hljs-built_in">sleep 1
<span class="hljs-keyword">done

<span class="hljs-comment"># 5. Stop monitoring
<span class="hljs-built_in">kill <span class="hljs-string">"$MONITOR_PID" 2>/dev/null <span class="hljs-pipe">|| <span class="hljs-literal">true

<span class="hljs-comment"># 6. Verify new primary
<span class="hljs-built_in">echo <span class="hljs-string">"Checking Patroni cluster status..."
patronictl -c /etc/patroni/patroni.yml list

<span class="hljs-comment"># 7. Validate RTO
<span class="hljs-keyword">if [ <span class="hljs-string">"$RECOVERED" = <span class="hljs-literal">false ]; <span class="hljs-keyword">then
  <span class="hljs-built_in">echo <span class="hljs-string">"FAIL: Application did not recover within $((RTO_SECONDS + 30))s"
  <span class="hljs-built_in">exit 1
<span class="hljs-keyword">fi

<span class="hljs-keyword">if [ <span class="hljs-string">"$ACTUAL_RTO" -gt <span class="hljs-string">"$RTO_SECONDS" ]; <span class="hljs-keyword">then
  <span class="hljs-built_in">echo <span class="hljs-string">"WARN: RTO ${ACTUAL_RTO}s exceeded target <span class="hljs-variable">${RTO_SECONDS}s"
<span class="hljs-keyword">else
  <span class="hljs-built_in">echo <span class="hljs-string">"PASS: RTO ${ACTUAL_RTO}s within target <span class="hljs-variable">${RTO_SECONDS}s"
<span class="hljs-keyword">fi

<span class="hljs-comment"># 8. Analyze request log for error rate during failover
TOTAL_REQUESTS=$(<span class="hljs-built_in">wc -l < <span class="hljs-string">"$REQUEST_LOG")
FAILED_REQUESTS=$(grep -c <span class="hljs-string">" 000\| 50[0-9]" <span class="hljs-string">"$REQUEST_LOG" <span class="hljs-pipe">|| <span class="hljs-literal">true)
ERROR_RATE=$((FAILED_REQUESTS * <span class="hljs-number">100 / TOTAL_REQUESTS))
<span class="hljs-built_in">echo <span class="hljs-string">"Error rate during failover: ${FAILED_REQUESTS}/<span class="hljs-variable">${TOTAL_REQUESTS} (<span class="hljs-variable">${ERROR_RATE}%)"

<span class="hljs-built_in">rm -f <span class="hljs-string">"$REQUEST_LOG"

Testing Application Connection Handling

The database may failover cleanly while your application refuses to reconnect. Test both:

import psycopg2
import psycopg2.pool
import time
import threading
from contextlib import contextmanager

def test_connection_pool_recovery(
    primary_url: str,
    trigger_failover: callable,
    max_recovery_seconds: int = 30
) -> dict:
    """
    Test that a connection pool recovers after primary failure.
    Simulates real application behavior.
    """
    pool = psycopg2.pool.ThreadedConnectionPool(
        minconn=2,
        maxconn=10,
        dsn=primary_url
    )
    
    results = {
        'requests_before': 0,
        'requests_during': 0,
        'requests_after': 0,
        'errors_during': 0,
        'recovery_time': None,
        'first_successful_after': None,
    }
    
    start = time.time()
    failover_time = None
    
    def execute_query(phase: str):
        try:
            conn = pool.getconn()
            try:
                cur = conn.cursor()
                cur.execute("SELECT COUNT(*) FROM orders WHERE created_at > NOW() - INTERVAL '1 hour'")
                result = cur.fetchone()[0]
                conn.commit()
                results[f'requests_{phase}'] += 1
                return True
            except psycopg2.Error as e:
                conn.rollback()
                raise
            finally:
                pool.putconn(conn)
        except (psycopg2.Error, Exception):
            if phase == 'during':
                results['errors_during'] += 1
            return False
    
    # Pre-failover baseline
    for _ in range(20):
        execute_query('before')
        time.sleep(0.1)
    
    # Trigger failover
    print("Triggering failover...")
    failover_time = time.time()
    trigger_thread = threading.Thread(target=trigger_failover)
    trigger_thread.start()
    
    # Execute queries during failover window
    deadline = failover_time + 60
    while time.time() < deadline:
        success = execute_query('during')
        if success and results['recovery_time'] is None and failover_time:
            elapsed = time.time() - failover_time
            if elapsed > 2:  # Ignore brief hiccups
                results['recovery_time'] = elapsed
        time.sleep(0.5)
    
    # Post-failover validation
    for _ in range(20):
        execute_query('after')
        time.sleep(0.1)
    
    trigger_thread.join()
    pool.closeall()
    
    return results

MySQL Failover Testing (with Group Replication)

MySQL Group Replication provides automatic primary election. Testing is similar but uses different tooling:

#!/bin/bash
<span class="hljs-comment"># mysql_failover_test.sh

MYSQL_USER=<span class="hljs-string">"monitor"
MYSQL_PASS=<span class="hljs-string">"password"
PRIMARY=<span class="hljs-string">"mysql-primary.internal"

<span class="hljs-built_in">echo <span class="hljs-string">"=== MySQL Group Replication Status ==="
mysql -h <span class="hljs-string">"$PRIMARY" -u <span class="hljs-string">"$MYSQL_USER" -p<span class="hljs-string">"$MYSQL_PASS" -e <span class="hljs-string">"
SELECT 
  MEMBER_HOST, MEMBER_PORT, MEMBER_STATE, MEMBER_ROLE,
  TRANSACTIONS_IN_QUEUE
FROM performance_schema.replication_group_members
JOIN performance_schema.replication_group_member_stats USING(MEMBER_ID);
"

<span class="hljs-comment"># Kill primary
<span class="hljs-built_in">echo <span class="hljs-string">"Triggering primary failure..."
ssh <span class="hljs-string">"$PRIMARY" <span class="hljs-string">"sudo systemctl stop mysql"

<span class="hljs-comment"># Wait for election
<span class="hljs-built_in">sleep 10

<span class="hljs-comment"># Find new primary
NEW_PRIMARY=$(mysql -h mysql-replica1.internal -u <span class="hljs-string">"$MYSQL_USER" -p<span class="hljs-string">"$MYSQL_PASS" -e <span class="hljs-string">"
SELECT MEMBER_HOST 
FROM performance_schema.replication_group_members 
WHERE MEMBER_ROLE = 'PRIMARY';" -s)

<span class="hljs-built_in">echo <span class="hljs-string">"New primary elected: $NEW_PRIMARY"

<span class="hljs-comment"># Validate writes work on new primary
TEST_VALUE=<span class="hljs-string">"failover_test_$(date +%s)"
mysql -h <span class="hljs-string">"$NEW_PRIMARY" -u <span class="hljs-string">"$MYSQL_USER" -p<span class="hljs-string">"$MYSQL_PASS" -e <span class="hljs-string">"
INSERT INTO test_db.failover_markers (value, created_at) VALUES ('$TEST_VALUE', NOW());"

<span class="hljs-built_in">echo <span class="hljs-string">"Write to new primary successful"

Redis Sentinel Failover Testing

Redis Sentinel monitors Redis instances and coordinates automatic failover:

import redis
import redis.sentinel
import time
import threading

def test_redis_sentinel_failover(
    sentinel_hosts: list[tuple[str, int]],
    service_name: str,
    trigger_failover: callable,
    rto_seconds: int = 10
) -> dict:
    """Test Redis Sentinel automatic failover."""
    
    sentinel = redis.sentinel.Sentinel(
        sentinel_hosts,
        socket_timeout=0.5,
        retry_on_timeout=True
    )
    
    # Get initial master info
    initial_master = sentinel.discover_master(service_name)
    print(f"Initial master: {initial_master}")
    
    errors = []
    recovery_time = None
    requests_completed = 0
    failover_start = None
    
    def continuous_write():
        nonlocal recovery_time, requests_completed, failover_start
        counter = 0
        
        while True:
            try:
                master = sentinel.master_for(service_name, socket_timeout=0.2)
                master.set(f"test:failover:{counter}", counter, ex=60)
                requests_completed += 1
                
                if failover_start and recovery_time is None:
                    elapsed = time.time() - failover_start
                    if elapsed > 1:  # Stabilization buffer
                        recovery_time = elapsed
                
                counter += 1
                time.sleep(0.1)
            except (redis.ConnectionError, redis.TimeoutError) as e:
                if failover_start is None:
                    failover_start = time.time()
                errors.append({'time': time.time(), 'error': str(e)})
                time.sleep(0.2)
    
    write_thread = threading.Thread(target=continuous_write, daemon=True)
    write_thread.start()
    
    time.sleep(5)  # Baseline
    
    # Trigger master failure
    print("Triggering Redis master failure...")
    trigger_failover()
    
    # Wait for recovery
    time.sleep(rto_seconds + 10)
    
    # Discover new master
    new_master = sentinel.discover_master(service_name)
    print(f"New master after failover: {new_master}")
    
    # Verify data integrity
    master = sentinel.master_for(service_name)
    latest_key = f"test:failover:{requests_completed - 1}"
    value = master.get(latest_key)
    
    return {
        'initial_master': initial_master,
        'new_master': new_master,
        'master_changed': initial_master != new_master,
        'failover_errors': len(errors),
        'requests_completed': requests_completed,
        'recovery_time_seconds': recovery_time,
        'rto_met': recovery_time <= rto_seconds if recovery_time else False,
        'data_integrity': value is not None,
    }

Redis Cluster Failover

For Redis Cluster (sharded setup):

#!/bin/bash
<span class="hljs-comment"># Test Redis cluster node failure handling

CLUSTER_NODE=<span class="hljs-string">"redis-1.internal:6379"

<span class="hljs-built_in">echo <span class="hljs-string">"=== Redis Cluster Info ==="
redis-cli -h redis-1.internal -p 6379 cluster nodes

<span class="hljs-comment"># Identify a master slot
MASTER_NODE=$(redis-cli -h redis-1.internal -p 6379 cluster nodes <span class="hljs-pipe">| grep master <span class="hljs-pipe">| <span class="hljs-built_in">head -1)
MASTER_HOST=$(<span class="hljs-built_in">echo <span class="hljs-string">"$MASTER_NODE" <span class="hljs-pipe">| awk <span class="hljs-string">'{print $2}' <span class="hljs-pipe">| <span class="hljs-built_in">cut -d: -f1 <span class="hljs-pipe">| <span class="hljs-built_in">cut -d@ -f1)
MASTER_PORT=$(<span class="hljs-built_in">echo <span class="hljs-string">"$MASTER_NODE" <span class="hljs-pipe">| awk <span class="hljs-string">'{print $2}' <span class="hljs-pipe">| <span class="hljs-built_in">cut -d: -f2 <span class="hljs-pipe">| <span class="hljs-built_in">cut -d@ -f1)

<span class="hljs-built_in">echo <span class="hljs-string">"Target master: $MASTER_HOST:<span class="hljs-variable">$MASTER_PORT"

<span class="hljs-comment"># Trigger failure
START_TIME=$(<span class="hljs-built_in">date +%s)
redis-cli -h <span class="hljs-string">"$MASTER_HOST" -p <span class="hljs-string">"$MASTER_PORT" debug <span class="hljs-built_in">sleep 60 &

<span class="hljs-comment"># Wait for failover
<span class="hljs-built_in">sleep 5

<span class="hljs-comment"># Check cluster state
redis-cli -h redis-1.internal -p 6379 cluster info <span class="hljs-pipe">| grep cluster_state

END_TIME=$(<span class="hljs-built_in">date +%s)
ELAPSED=$((END_TIME - START_TIME))
<span class="hljs-built_in">echo <span class="hljs-string">"Cluster recovered in ${ELAPSED}s"

redis-cli -h redis-1.internal -p 6379 cluster nodes

Automated Failover Testing in CI

Schedule regular failover tests in a staging environment:

# .github/workflows/db-failover-test.yml
name: Database Failover Tests

on:
  schedule:
    - cron: '0 3 * * 3'  # Weekly on Wednesday 3am
  workflow_dispatch:

jobs:
  postgres-failover:
    runs-on: ubuntu-latest
    environment: staging
    steps:
      - uses: actions/checkout@v4
      
      - name: Check replication lag
        run: python tests/dr/check_pg_replication.py
        env:
          PRIMARY_URL: ${{ secrets.STAGING_PG_PRIMARY }}
          REPLICA_URL: ${{ secrets.STAGING_PG_REPLICA }}
      
      - name: Run failover test
        run: python tests/dr/postgres_failover_test.py
        timeout-minutes: 10
        env:
          PRIMARY_URL: ${{ secrets.STAGING_PG_PRIMARY }}
          APP_URL: ${{ secrets.STAGING_APP_URL }}
          RTO_SECONDS: 30
      
      - name: Report results
        if: always()
        run: python tests/dr/notify_slack.py --results results.json

Common Failover Test Failures

Application holds stale connections: After failover, application still tries to write to old primary. Fix: configure connection pool with health_check_interval and retry logic with exponential backoff.

DNS caching prevents reconnection: Application cached the primary's IP. Fix: set short TTLs on database DNS records (30-60 seconds) and configure your connection library to re-resolve DNS.

Replication lag causes read-after-write failures: Application reads from replica immediately after writing to new primary, gets stale data. Fix: route reads to primary for a brief window after writes, or use synchronous_commit for critical writes.

Split-brain during network partition: Both old primary and new primary accept writes simultaneously. Fix: ensure your failover system uses fencing (Patroni uses DCS leadership with TTL, or STONITH for VM-based setups).

Summary

Database failover testing requires validating three things:

  1. The database layer: Replication is current, failover triggers correctly, new primary is writable
  2. The application layer: Connection pools recover, retry logic works, reads/writes route correctly
  3. The metrics: Actual RTO and RPO are measured and meet requirements

Test each layer in isolation first, then test end-to-end. Automate what you can — replication lag checks should run continuously, failover tests should run weekly in staging, and full DR tests should run at least quarterly.

The database failover you haven't tested is the one that will take 3 hours to recover instead of 30 seconds.

Read more