Database Failover Testing: PostgreSQL, MySQL, and Redis Strategies
Your database going down is not a matter of if — it's when. A disk fills up, a cloud provider has an AZ outage, a runaway query corrupts an index, a network partition isolates the primary. The question is whether your failover mechanism works correctly and whether your application handles the transition gracefully.
Database failover testing validates that your replication, automatic failover configuration, and application connection handling all work together to meet your recovery time objective (RTO) and recovery point objective (RPO). This guide covers practical testing strategies for PostgreSQL, MySQL, and Redis.
What Database Failover Testing Covers
A complete database failover test validates:
- Replication health — Is the replica up to date? What is the replication lag?
- Automatic failover trigger — Does the system detect the primary failure and promote the replica?
- Failover time — How long does the promotion take? Does it meet your RTO?
- Data consistency — What transactions were in-flight during failover? Are they committed or rolled back cleanly?
- Application behavior — Does the application reconnect automatically? Does it fail open or fail closed? Are connection pools drained and reset?
- Recovery point — What data was lost? Does it meet your RPO?
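Two of the checks above, failover time and recovery point, reduce to numbers you can compute from test data. The following is a minimal sketch rather than a prescribed method. It assumes the test harness records time-ordered health probes as (timestamp, ok) pairs and keeps the IDs of writes the old primary acknowledged; the names `outage_window`, `count_lost_writes`, and `FailoverMeasurement`, as well as the sample data, are illustrative.

```python
from dataclasses import dataclass
from typing import Iterable, Optional, Sequence, Tuple


@dataclass
class FailoverMeasurement:
    outage_seconds: Optional[float]  # None if no probe failed
    lost_writes: int                 # acknowledged writes missing after failover


def outage_window(samples: Sequence[Tuple[float, bool]]) -> Optional[float]:
    """Seconds from the first failed probe to the first success after it.

    Returns None if no probe failed; raises if the service never recovered.
    """
    first_failure = None
    for ts, ok in samples:
        if not ok and first_failure is None:
            first_failure = ts
        elif ok and first_failure is not None:
            return ts - first_failure
    if first_failure is None:
        return None
    raise RuntimeError("service did not recover within the sampled window")


def count_lost_writes(acknowledged: Iterable[int], surviving: Iterable[int]) -> int:
    """Number of acknowledged write IDs that the new primary does not have."""
    return len(set(acknowledged) - set(surviving))


def measure(samples, acknowledged, surviving) -> FailoverMeasurement:
    return FailoverMeasurement(
        outage_seconds=outage_window(samples),
        lost_writes=count_lost_writes(acknowledged, surviving),
    )


# Illustrative data: first failed probe at t=10.0, first success again at t=18.5.
probes = [(9.5, True), (10.0, False), (10.5, False), (18.0, False), (18.5, True)]
m = measure(probes, acknowledged=range(1, 101), surviving=range(1, 99))
print(m)  # FailoverMeasurement(outage_seconds=8.5, lost_writes=2)
```

Comparing `outage_seconds` with the RTO and `lost_writes` (or the age of the oldest lost write) with the RPO turns the checklist into pass/fail criteria.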
PostgreSQL Failover Testing
Setup: Streaming Replication + Patroni
Modern PostgreSQL HA typically uses Patroni (or similar) to manage automatic failover. Before testing, understand your setup:
# Check Patroni cluster status
patronictl -c /etc/patroni/patroni.yml list
# Output:
# + Cluster: postgres-cluster (7234567890123456789) +---------+----+-----------+
# | Member           | Host          | Role    | State   | TL | Lag in MB |
# +------------------+---------------+---------+---------+----+-----------+
# | postgres-primary | 10.0.1.10:5432| Leader  | running |  3 |           |
# | postgres-replica | 10.0.1.11:5432| Replica | running |  3 |         0 |
# +------------------+---------------+---------+---------+----+-----------+
Measuring Replication Lag
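Lag in bytes is the distance between two WAL positions (LSNs). As a rough sketch of that arithmetic, assuming LSNs arrive in the textual form PostgreSQL prints (two hexadecimal numbers separated by a slash, the high and low 32 bits of the position), the helper names `lsn_to_int` and `lag_bytes` below are illustrative and not part of any library:

```python
def lsn_to_int(lsn: str) -> int:
    """Convert a textual pg_lsn such as '16/B374D848' to a byte position."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def lag_bytes(primary_lsn: str, replica_lsn: str) -> int:
    """Bytes of WAL the replica still has to catch up on (never negative)."""
    return max(0, lsn_to_int(primary_lsn) - lsn_to_int(replica_lsn))


print(lag_bytes("0/3000148", "0/3000060"))  # 232
```

Doing the subtraction client-side is handy when the two LSNs come from different connections; inside a single query the server can compute the difference itself.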
Before triggering any failover, validate your replication baseline:
from contextlib import closing

import psycopg2


def check_replication_lag(primary_url: str, replica_url: str) -> dict:
    """Measure replication lag between primary and replica."""
    # closing() is used because psycopg2's own connection context manager
    # only ends the transaction; it does not close the connection.
    with closing(psycopg2.connect(primary_url)) as primary_conn, \
         closing(psycopg2.connect(replica_url)) as replica_conn:
        primary_cur = primary_conn.cursor()
        replica_cur = replica_conn.cursor()

        # Get primary LSN (Log Sequence Number)
        primary_cur.execute("SELECT pg_current_wal_lsn()")
        primary_lsn = primary_cur.fetchone()[0]

        # Get replica's received LSN
        replica_cur.execute("SELECT pg_last_wal_receive_lsn()")
        replica_received_lsn = replica_cur.fetchone()[0]

        # Get replica's applied LSN
        replica_cur.execute("SELECT pg_last_wal_replay_lsn()")
        replica_applied_lsn = replica_cur.fetchone()[0]

        # Get per-replica lag in bytes, as seen from the primary
        primary_cur.execute("""
            SELECT client_addr, state, sent_lsn, write_lsn, flush_lsn, replay_lsn,
                   (sent_lsn - replay_lsn) AS replication_lag_bytes
            FROM pg_stat_replication
        """)
        replication_stats = primary_cur.fetchall()

        # Get lag in time from replica. This value keeps growing while the
        # primary is idle, so read it together with the LSN positions.
        replica_cur.execute("""
            SELECT now() - pg_last_xact_replay_timestamp() AS replication_delay
        """)
        time_lag = replica_cur.fetchone()[0]

        return {
            'primary_lsn': str(primary_lsn),
            'replica_received_lsn': str(replica_received_lsn),
            'replica_applied_lsn': str(replica_applied_lsn),
            'time_lag_seconds': time_lag.total_seconds() if time_lag else None,
            'replication_stats': replication_stats,
        }


lag = check_replication_lag("postgres://primary:5432/db", "postgres://replica:5432/db")
seconds = lag['time_lag_seconds']
# time_lag_seconds is None when the replica has not replayed any transaction yet
assert seconds is not None, "Replica reports no replayed transactions"
print(f"Replication lag: {seconds:.2f}s")
# Fail if lag exceeds RPO threshold
assert seconds < 5, f"Replication lag {seconds}s exceeds 5s threshold"
Simulating Primary Failure
#!/bin/bash
# postgresql_failover_test.sh
set -euo pipefail

PRIMARY_HOST="10.0.1.10"
REPLICA_HOST="10.0.1.11"
DB_PORT=5432
APP_ENDPOINT="http://api.internal/health"
RTO_SECONDS=30

echo "=== PostgreSQL Failover Test ==="
echo "Start time: $(date -u +%Y-%m-%dT%H:%M:%SZ)"

# 1. Baseline health check
echo "Checking baseline..."
INITIAL_LAG=$(psql "postgres://$REPLICA_HOST:$DB_PORT/postgres" \
  -At -c "SELECT EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp()))::int")
echo "Initial replication lag: ${INITIAL_LAG}s"

# 2. Start monitoring application health in background
REQUEST_LOG=$(mktemp)
trap 'rm -f "$REQUEST_LOG"' EXIT

# curl already prints 000 for %{http_code} when the request fails,
# so the fallback only has to swallow the non-zero exit status.
probe() {
  curl -s -o /dev/null -w "%{http_code}" --max-time 2 "$APP_ENDPOINT" || true
}

monitor_app() {
  while true; do
    echo "$(date +%s) $(probe)" >> "$REQUEST_LOG"
    sleep 0.5
  done
}
monitor_app &
MONITOR_PID=$!
sleep 5  # Let baseline monitoring run

# 3. Trigger primary failure.
# Note: systemctl stop is a clean shutdown. To simulate a crash,
# send SIGKILL to the postmaster instead.
echo "Triggering primary failure at $(date -u +%Y-%m-%dT%H:%M:%SZ)"
FAILOVER_EPOCH=$(date +%s)
ssh "$PRIMARY_HOST" "sudo systemctl stop postgresql"

# 4. Wait and check application recovery.
# The first HTTP 200 counts as recovery, so the health endpoint
# must actually query the database for this number to mean anything.
MAX_WAIT=$((FAILOVER_EPOCH + RTO_SECONDS + 30))
RECOVERED=false
while [ "$(date +%s)" -lt "$MAX_WAIT" ]; do
  HTTP_CODE=$(probe)
  if [ "$HTTP_CODE" = "200" ]; then
    RECOVERY_EPOCH=$(date +%s)
    ACTUAL_RTO=$((RECOVERY_EPOCH - FAILOVER_EPOCH))
    echo "Application recovered in ${ACTUAL_RTO}s"
    RECOVERED=true
    break
  fi
  sleep 1
done

# 5. Stop monitoring
kill "$MONITOR_PID" 2>/dev/null || true

# 6. Verify new primary
echo "Checking Patroni cluster status..."
patronictl -c /etc/patroni/patroni.yml list

# 7. Validate RTO
if [ "$RECOVERED" = false ]; then
  echo "FAIL: Application did not recover within $((RTO_SECONDS + 30))s"
  exit 1
fi
if [ "$ACTUAL_RTO" -gt "$RTO_SECONDS" ]; then
  echo "WARN: RTO ${ACTUAL_RTO}s exceeded target ${RTO_SECONDS}s"
else
  echo "PASS: RTO ${ACTUAL_RTO}s within target ${RTO_SECONDS}s"
fi

# 8. Analyze request log for error rate during failover
TOTAL_REQUESTS=$(wc -l < "$REQUEST_LOG")
FAILED_REQUESTS=$(grep -cE ' (000|50[0-9])$' "$REQUEST_LOG" || true)
ERROR_RATE=$((FAILED_REQUESTS * 100 / TOTAL_REQUESTS))
echo "Error rate during failover: ${FAILED_REQUESTS}/${TOTAL_REQUESTS} (${ERROR_RATE}%)"
Testing Application Connection Handling
The database may fail over cleanly while your application refuses to reconnect. Test both:
import threading
import time
from typing import Callable

import psycopg2
import psycopg2.pool


def test_connection_pool_recovery(
    primary_url: str,
    trigger_failover: Callable[[], None],
    max_recovery_seconds: int = 30,
) -> dict:
    """
    Test that a connection pool recovers after primary failure.
    Simulates real application behavior.
    """
    pool = psycopg2.pool.ThreadedConnectionPool(
        minconn=2,
        maxconn=10,
        dsn=primary_url,
    )
    results = {
        'requests_before': 0,
        'requests_during': 0,
        'requests_after': 0,
        'errors_during': 0,
        'recovery_time': None,
        'rto_met': False,
    }

    def execute_query(phase: str) -> bool:
        conn = None
        broken = False
        try:
            conn = pool.getconn()
            cur = conn.cursor()
            cur.execute("SELECT COUNT(*) FROM orders WHERE created_at > NOW() - INTERVAL '1 hour'")
            cur.fetchone()
            conn.commit()
            results[f'requests_{phase}'] += 1
            return True
        except Exception:
            # Covers psycopg2.Error as well as pool errors raised while reconnecting
            broken = True
            if phase == 'during':
                results['errors_during'] += 1
            return False
        finally:
            if conn is not None:
                # Discard failed connections so the pool opens fresh ones
                pool.putconn(conn, close=broken)

    # Pre-failover baseline
    for _ in range(20):
        execute_query('before')
        time.sleep(0.1)

    # Trigger failover
    print("Triggering failover...")
    failover_time = time.time()
    trigger_thread = threading.Thread(target=trigger_failover)
    trigger_thread.start()

    # Execute queries during failover window
    deadline = failover_time + 60
    while time.time() < deadline:
        success = execute_query('during')
        if success and results['recovery_time'] is None:
            elapsed = time.time() - failover_time
            # Ignore successes in the first 2s, when the old primary may still be answering
            if elapsed > 2:
                results['recovery_time'] = elapsed
        time.sleep(0.5)

    # Post-failover validation
    for _ in range(20):
        execute_query('after')
        time.sleep(0.1)

    trigger_thread.join()
    pool.closeall()

    recovery = results['recovery_time']
    results['rto_met'] = recovery is not None and recovery <= max_recovery_seconds
    return results
MySQL Failover Testing (with Group Replication)
MySQL Group Replication provides automatic primary election. Testing is similar but uses different tooling:
#!/bin/bash
# mysql_failover_test.sh
set -euo pipefail

MYSQL_USER="monitor"
MYSQL_PASS="password"
PRIMARY="mysql-primary.internal"
SURVIVOR="mysql-replica1.internal"  # a member expected to stay up

echo "=== MySQL Group Replication Status ==="
mysql -h "$PRIMARY" -u "$MYSQL_USER" -p"$MYSQL_PASS" -e "
  SELECT
    MEMBER_HOST, MEMBER_PORT, MEMBER_STATE, MEMBER_ROLE,
    COUNT_TRANSACTIONS_IN_QUEUE
  FROM performance_schema.replication_group_members
  JOIN performance_schema.replication_group_member_stats USING (MEMBER_ID);
"

# Stop the primary (a clean shutdown, not a crash)
echo "Triggering primary failure..."
ssh "$PRIMARY" "sudo systemctl stop mysql"

# Wait for election
sleep 10

# Find new primary (-N drops the column header, -s gives plain output)
NEW_PRIMARY=$(mysql -h "$SURVIVOR" -u "$MYSQL_USER" -p"$MYSQL_PASS" -N -s -e "
  SELECT MEMBER_HOST
  FROM performance_schema.replication_group_members
  WHERE MEMBER_ROLE = 'PRIMARY' AND MEMBER_STATE = 'ONLINE';")
if [ -z "$NEW_PRIMARY" ]; then
  echo "FAIL: no primary elected"
  exit 1
fi
echo "New primary elected: $NEW_PRIMARY"

# Validate writes work on new primary
TEST_VALUE="failover_test_$(date +%s)"
mysql -h "$NEW_PRIMARY" -u "$MYSQL_USER" -p"$MYSQL_PASS" -e "
  INSERT INTO test_db.failover_markers (value, created_at) VALUES ('$TEST_VALUE', NOW());"
echo "Write to new primary successful"
Redis Sentinel Failover Testing
Redis Sentinel monitors Redis instances and coordinates automatic failover:
import threading
import time
from typing import Callable

import redis
import redis.sentinel


def test_redis_sentinel_failover(
    sentinel_hosts: list[tuple[str, int]],
    service_name: str,
    trigger_failover: Callable[[], None],
    rto_seconds: int = 10,
) -> dict:
    """Test Redis Sentinel automatic failover."""
    sentinel = redis.sentinel.Sentinel(
        sentinel_hosts,
        socket_timeout=0.5,
        retry_on_timeout=True,
    )

    # Get initial master info
    initial_master = sentinel.discover_master(service_name)
    print(f"Initial master: {initial_master}")

    errors = []
    recovery_time = None
    requests_completed = 0
    failover_start = None
    stop = threading.Event()

    def continuous_write():
        nonlocal recovery_time, requests_completed, failover_start
        counter = 0
        while not stop.is_set():
            try:
                master = sentinel.master_for(service_name, socket_timeout=0.2)
                master.set(f"test:failover:{counter}", counter, ex=60)
                requests_completed += 1
                if failover_start and recovery_time is None:
                    elapsed = time.time() - failover_start
                    if elapsed > 1:  # Stabilization buffer
                        recovery_time = elapsed
                counter += 1
                time.sleep(0.1)
            except (redis.ConnectionError, redis.TimeoutError) as e:
                # The first failed write marks the start of the outage
                if failover_start is None:
                    failover_start = time.time()
                errors.append({'time': time.time(), 'error': str(e)})
                time.sleep(0.2)

    write_thread = threading.Thread(target=continuous_write, daemon=True)
    write_thread.start()
    time.sleep(5)  # Baseline

    # Trigger master failure
    print("Triggering Redis master failure...")
    trigger_failover()

    # Wait for recovery, then stop the writer so the counters are stable
    time.sleep(rto_seconds + 10)
    stop.set()
    write_thread.join(timeout=2)

    # Discover new master
    new_master = sentinel.discover_master(service_name)
    print(f"New master after failover: {new_master}")

    # Verify the most recent acknowledged write is readable on the new master
    value = None
    if requests_completed > 0:
        master = sentinel.master_for(service_name)
        value = master.get(f"test:failover:{requests_completed - 1}")

    return {
        'initial_master': initial_master,
        'new_master': new_master,
        'master_changed': initial_master != new_master,
        'failover_errors': len(errors),
        'requests_completed': requests_completed,
        'recovery_time_seconds': recovery_time,
        'rto_met': recovery_time <= rto_seconds if recovery_time else False,
        'data_integrity': value is not None,
    }
Redis Cluster Failover
For Redis Cluster (sharded setup):
#!/bin/bash
# Test Redis cluster node failure handling
set -euo pipefail

SEED_HOST="redis-1.internal"
SEED_PORT=6379
MAX_WAIT=45

echo "=== Redis Cluster Info ==="
NODES=$(redis-cli -h "$SEED_HOST" -p "$SEED_PORT" cluster nodes)
echo "$NODES"

# Identify a healthy master other than the seed, so the seed stays reachable for status checks
MASTER_NODE=$(awk '$3 ~ /master/ && $3 !~ /myself|fail/ {print; exit}' <<< "$NODES")
MASTER_ID=$(awk '{print $1}' <<< "$MASTER_NODE")
MASTER_ADDR=$(awk '{print $2}' <<< "$MASTER_NODE" | cut -d@ -f1)
MASTER_HOST=${MASTER_ADDR%:*}
MASTER_PORT=${MASTER_ADDR##*:}
echo "Target master: $MASTER_HOST:$MASTER_PORT"

# Trigger failure by blocking the node for 60 seconds.
# DEBUG may be disabled by default on newer Redis versions (see enable-debug-command).
START_TIME=$(date +%s)
redis-cli -h "$MASTER_HOST" -p "$MASTER_PORT" debug sleep 60 > /dev/null &

# Wait for failover: the old master is flagged "fail" and the cluster state is ok again
RECOVERED=false
while [ $(( $(date +%s) - START_TIME )) -lt "$MAX_WAIT" ]; do
  FLAGS=$(redis-cli -h "$SEED_HOST" -p "$SEED_PORT" cluster nodes | awk -v id="$MASTER_ID" '$1 == id {print $3}' || true)
  STATE=$(redis-cli -h "$SEED_HOST" -p "$SEED_PORT" cluster info | grep cluster_state | tr -d '\r' || true)
  if [[ ",$FLAGS," == *",fail,"* && "$STATE" == "cluster_state:ok" ]]; then
    RECOVERED=true
    break
  fi
  sleep 1
done

if [ "$RECOVERED" = true ]; then
  echo "Cluster recovered in $(( $(date +%s) - START_TIME ))s"
else
  echo "FAIL: cluster did not recover within ${MAX_WAIT}s"
fi
redis-cli -h "$SEED_HOST" -p "$SEED_PORT" cluster nodes
Automated Failover Testing in CI
Schedule regular failover tests in a staging environment:
# .github/workflows/db-failover-test.yml
name: Database Failover Tests
on:
  schedule:
    - cron: '0 3 * * 3'  # Weekly on Wednesday 3am
  workflow_dispatch:
jobs:
  postgres-failover:
    runs-on: ubuntu-latest
    environment: staging
    steps:
      - uses: actions/checkout@v4
      - name: Check replication lag
        run: python tests/dr/check_pg_replication.py
        env:
          PRIMARY_URL: ${{ secrets.STAGING_PG_PRIMARY }}
          REPLICA_URL: ${{ secrets.STAGING_PG_REPLICA }}
      - name: Run failover test
        run: python tests/dr/postgres_failover_test.py
        timeout-minutes: 10
        env:
          PRIMARY_URL: ${{ secrets.STAGING_PG_PRIMARY }}
          APP_URL: ${{ secrets.STAGING_APP_URL }}
          RTO_SECONDS: 30
      - name: Report results
        if: always()
        run: python tests/dr/notify_slack.py --results results.json
Common Failover Test Failures
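Application holds stale connections: After failover, the application keeps trying to write to the old primary. Fix: configure the connection pool to validate connections before use (for example, health_check_interval in redis-py, or an equivalent pre-use check in your PostgreSQL or MySQL pool) and add retry logic with exponential backoff.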
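As an illustration of the retry half of that fix, here is a minimal sketch of exponential backoff with jitter. It is library-agnostic by design: `retry_with_backoff`, its parameters, and the `flaky_query` stand-in are hypothetical names, and the default `retry_on` tuple is an assumption you would replace with your driver's connection error types.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    operation: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError,),
    max_attempts: int = 6,
    base_delay: float = 0.2,
    max_delay: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `operation` until it succeeds or `max_attempts` is exhausted."""
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except retry_on:
            if attempt == max_attempts:
                raise
            # The ceiling doubles each attempt, capped at max_delay; the actual
            # delay is drawn uniformly below it so clients do not retry in lockstep.
            ceiling = min(max_delay, base_delay * 2 ** (attempt - 1))
            sleep(random.uniform(0, ceiling))
    raise AssertionError("unreachable")


# Stand-in for a query that fails while the failover is in progress.
attempts = {"n": 0}

def flaky_query() -> str:
    attempts["n"] += 1
    if attempts["n"] < 4:
        raise ConnectionError("primary unavailable")
    return "ok"

delays = []  # record delays instead of sleeping, to keep the demo instant
result = retry_with_backoff(flaky_query, sleep=delays.append)
print(result, attempts["n"], len(delays))  # ok 4 3
```

Capping the total retry time near your RTO keeps callers from hanging long after the failover should have finished.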
DNS caching prevents reconnection: The application, its runtime, or the OS resolver cached the primary's IP address. Fix: set short TTLs on database DNS records (30-60 seconds) and configure your connection library to re-resolve DNS on reconnect.
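The application-side half of this fix amounts to resolving the hostname at connect time instead of once at startup. A minimal sketch under that assumption follows; `connect_fresh`, `resolve_ipv4`, the `db.internal` hostname, and the fake resolver and connector are all illustrative, and a real driver may already do this when given a hostname rather than an IP.

```python
import socket
from typing import Callable, List, TypeVar

Conn = TypeVar("Conn")


def resolve_ipv4(hostname: str, port: int) -> List[str]:
    """Ask the system resolver for the host's current IPv4 addresses."""
    infos = socket.getaddrinfo(hostname, port, socket.AF_INET, socket.SOCK_STREAM)
    return [info[4][0] for info in infos]


def connect_fresh(
    hostname: str,
    port: int,
    open_connection: Callable[[str, int], Conn],
    resolve: Callable[[str, int], List[str]] = resolve_ipv4,
) -> Conn:
    """Resolve `hostname` now and connect to the first address that accepts."""
    last_error = None
    for address in resolve(hostname, port):
        try:
            return open_connection(address, port)
        except OSError as exc:  # ConnectionError is a subclass of OSError
            last_error = exc
    raise ConnectionError(f"no reachable address for {hostname}:{port}") from last_error


# Illustrative stand-ins: the DNS record moves from the old primary to the new one.
records = {"db.internal": ["10.0.1.10"]}
alive = {"10.0.1.11"}

def fake_resolve(hostname: str, port: int) -> List[str]:
    return records[hostname]

def fake_open(address: str, port: int) -> str:
    if address not in alive:
        raise ConnectionRefusedError(address)
    return f"connected to {address}:{port}"

records["db.internal"] = ["10.0.1.11"]  # failover updates the record
print(connect_fresh("db.internal", 5432, fake_open, fake_resolve))
# connected to 10.0.1.11:5432
```

Injecting the resolver also makes this testable: a failover test can swap the record and assert that the next connection lands on the new primary.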
Replication lag causes read-after-write failures: The application reads from a replica immediately after writing to the new primary and gets stale data. Fix: route reads to the primary for a brief window after writes, or use synchronous replication for critical writes (in PostgreSQL, synchronous_commit = remote_apply with synchronous_standby_names set).
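The first option, pinning reads to the primary for a short window after a write, can be sketched as follows. This is one possible shape, not a standard API: `ReadRouter`, the session IDs, and the 2-second window are assumptions, and the window should be longer than the replication lag you expect right after failover.

```python
import time
from typing import Callable, Dict


class ReadRouter:
    """Decide per session whether a read may go to a replica."""

    def __init__(self, pin_seconds: float = 2.0,
                 clock: Callable[[], float] = time.monotonic):
        self._pin_seconds = pin_seconds
        self._clock = clock
        self._last_write: Dict[str, float] = {}

    def record_write(self, session_id: str) -> None:
        """Remember when this session last wrote."""
        self._last_write[session_id] = self._clock()

    def target_for_read(self, session_id: str) -> str:
        """Return 'primary' inside the pin window after a write, else 'replica'."""
        last = self._last_write.get(session_id)
        if last is not None and self._clock() - last < self._pin_seconds:
            return "primary"
        return "replica"


# Demo with a fake clock so the result does not depend on real time.
now = [100.0]
router = ReadRouter(pin_seconds=2.0, clock=lambda: now[0])
router.record_write("session-1")
now[0] = 100.5
first = router.target_for_read("session-1")   # inside the window
other = router.target_for_read("session-2")   # this session never wrote
now[0] = 103.0
later = router.target_for_read("session-1")   # window has elapsed
print(first, other, later)  # primary replica replica
```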
Split-brain during network partition: Both the old primary and the new primary accept writes simultaneously. Fix: ensure your failover system uses fencing. Patroni relies on a leader key with a TTL in its DCS (distributed configuration store); VM-based setups can use STONITH.
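To make the leader-key idea concrete, here is a deliberately simplified, in-memory sketch of a lease with a TTL. It is not Patroni's implementation: `LeaderLease` and the node names are hypothetical, and a single shared clock stands in for the DCS, which glosses over the clock-skew and network-delay issues a real system has to handle.

```python
import time
from typing import Callable, Optional


class LeaderLease:
    """In-memory stand-in for a leader key with a TTL in a DCS such as etcd."""

    def __init__(self, ttl_seconds: float,
                 clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._holder: Optional[str] = None
        self._expires_at = 0.0

    def acquire_or_renew(self, node: str) -> bool:
        """Take the lease if it is free or expired, or extend it if `node` holds it."""
        now = self._clock()
        if self._holder == node or self._holder is None or now >= self._expires_at:
            self._holder = node
            self._expires_at = now + self._ttl
            return True
        return False

    def may_accept_writes(self, node: str) -> bool:
        """A node should serve writes only while it holds an unexpired lease."""
        return self._holder == node and self._clock() < self._expires_at


# Demo with a fake clock: pg-a leads, gets partitioned, and pg-b takes over.
now = [0.0]
lease = LeaderLease(ttl_seconds=30, clock=lambda: now[0])

lease.acquire_or_renew("pg-a")                 # pg-a becomes leader at t=0
now[0] = 20.0                                  # partition: pg-a can no longer renew
took_early = lease.acquire_or_renew("pg-b")    # lease still valid, pg-b must wait
now[0] = 31.0
took_late = lease.acquire_or_renew("pg-b")     # lease expired, pg-b takes over
print(took_early, took_late,
      lease.may_accept_writes("pg-a"), lease.may_accept_writes("pg-b"))
# False True False True
```

A split-brain test checks exactly this property from the outside: once the partitioned primary's lease has expired, writes sent to it must be rejected.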
Summary
Database failover testing requires validating three things:
- The database layer: Replication is current, failover triggers correctly, new primary is writable
- The application layer: Connection pools recover, retry logic works, reads/writes route correctly
- The metrics: Actual RTO and RPO are measured and meet requirements
Test each layer in isolation first, then test end-to-end. Automate what you can — replication lag checks should run continuously, failover tests should run weekly in staging, and full DR tests should run at least quarterly.
The database failover you haven't tested is the one that will take 3 hours to recover instead of 30 seconds.