Testing for High Availability: Multi-Region Failover Validation
High availability is a promise — that your system will remain operational when components fail. Multi-region failover is how you keep that promise when an entire cloud region becomes unavailable. Testing your HA architecture requires deliberately causing the failures your system claims to handle.
HA Architecture Patterns
Active-passive (warm standby): Traffic serves from region A. Region B is provisioned and synced but not serving traffic. On failover, DNS or load balancers redirect to region B.
Active-active: Traffic serves from both regions simultaneously. Either region can handle the full load. Failover means routing all traffic to the surviving region.
Active-passive (cold standby): Region B is not running during normal operations. On failover, it must be provisioned and started from backup. Cheapest, but slowest RTO.
Testing DNS Failover
DNS failover is often the first mechanism tested. Verify that DNS routes to the backup region when the primary is unreachable:
import dns.resolver
import time
import requests
class DNSFailoverTester:
    """Measure DNS-based failover from a primary to a secondary region.

    Simulates primary-region unreachability by dropping outbound packets
    with iptables (requires root), then polls DNS until the hostname
    resolves to the secondary region's IP.
    """

    def __init__(self, hostname: str, primary_region_ip: str, secondary_region_ip: str):
        self.hostname = hostname
        self.primary_ip = primary_region_ip
        self.secondary_ip = secondary_region_ip

    def get_resolved_ip(self) -> str:
        """Resolve hostname to its current A-record IP."""
        answers = dns.resolver.resolve(self.hostname, 'A')
        return str(answers[0])

    def simulate_primary_failure(self):
        """Block access to primary region (requires elevated permissions)."""
        import subprocess
        # List-form argv with shell=False avoids shell quoting/injection
        # issues (the original built a shell string with shell=True).
        subprocess.run(
            ["iptables", "-A", "OUTPUT", "-d", self.primary_ip, "-j", "DROP"],
            check=True
        )

    def restore_primary(self):
        """Remove the DROP rule. Deliberately no check=True: cleanup is
        best-effort and must never mask the test result."""
        import subprocess
        subprocess.run(
            ["iptables", "-D", "OUTPUT", "-d", self.primary_ip, "-j", "DROP"]
        )

    def measure_failover_time(self) -> dict:
        """Measure how long DNS failover takes after primary failure.

        Returns a dict with "success" (bool), "failover_seconds"
        (float or None), and either "new_endpoint" or a diagnostic "note".
        """
        # Baseline: the hostname must currently point at the primary.
        assert self.get_resolved_ip() == self.primary_ip, "Primary is not the current target"
        # Fail primary
        failure_time = time.time()
        self.simulate_primary_failure()
        # Poll until DNS resolves to secondary
        max_wait = 300  # 5 minutes
        while time.time() - failure_time < max_wait:
            try:
                current_ip = self.get_resolved_ip()
            except dns.exception.DNSException:
                # Transient resolution failure mid-failover (e.g. SERVFAIL
                # or timeout) should not abort the measurement; keep polling.
                # The original crashed out of the loop here.
                current_ip = None
            if current_ip == self.secondary_ip:
                failover_time = time.time() - failure_time
                return {
                    "success": True,
                    "failover_seconds": failover_time,
                    "new_endpoint": current_ip
                }
            time.sleep(5)
        return {
            "success": False,
            "failover_seconds": None,
            "note": f"DNS did not failover within {max_wait} seconds"
        }
def test_dns_failover_within_sla():
tester = DNSFailoverTester(
hostname="api.yourservice.com",
primary_region_ip="1.2.3.4",
secondary_region_ip="5.6.7.8"
)
try:
result = tester.measure_failover_time()
assert result["success"], f"DNS failover did not complete: {result['note']}"
assert result["failover_seconds"] <= 120, \
f"DNS failover took {result['failover_seconds']:.0f}s, exceeds 2-minute SLA"
print(f"✅ DNS failover completed in {result['failover_seconds']:.0f}s")
finally:
tester.restore_primary()Testing Database Replication Lag
Before failing over, you need to know how much data you might lose (RPO). Measure replication lag under load:
import asyncpg
import asyncio
import time
async def measure_replication_lag(primary_dsn: str, replica_dsn: str) -> float:
    """
    Measure how far behind the replica is from the primary.

    Queries PostgreSQL's pg_last_xact_replay_timestamp() on the replica.
    Returns lag in seconds (0.0 when no lag is reported).
    """
    # NOTE(review): the primary connection is opened but never queried here —
    # presumably a reachability check; confirm whether it can be dropped.
    primary = await asyncpg.connect(primary_dsn)
    try:
        replica = await asyncpg.connect(replica_dsn)
        try:
            # Check using PostgreSQL's built-in replication lag view
            lag_result = await replica.fetchrow(
                """
                SELECT EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp()))
                AS lag_seconds
                """
            )
        finally:
            # Close even if fetchrow raises (the original leaked both
            # connections on any query error).
            await replica.close()
    finally:
        await primary.close()
    # pg_last_xact_replay_timestamp() is NULL before any WAL is replayed,
    # which yields a NULL/None lag — treat as zero.
    return lag_result["lag_seconds"] or 0.0
async def test_replication_lag_under_write_load():
"""Replication lag should stay under 5 seconds even during heavy writes."""
PRIMARY = "postgresql://user:pass@primary.internal/app"
REPLICA = "postgresql://user:pass@replica.internal/app"
# Generate write load
primary = await asyncpg.connect(PRIMARY)
async def write_load():
for i in range(1000):
await primary.execute(
"INSERT INTO load_test_events (data, created_at) VALUES ($1, NOW())",
f"load-test-{i}"
)
await asyncio.sleep(0.01)
async def monitor_lag():
lags = []
for _ in range(20):
lag = await measure_replication_lag(PRIMARY, REPLICA)
lags.append(lag)
await asyncio.sleep(0.5)
return lags
write_task = asyncio.create_task(write_load())
lag_task = asyncio.create_task(monitor_lag())
await asyncio.gather(write_task, lag_task)
lags = lag_task.result()
max_lag = max(lags)
assert max_lag <= 5.0, \
f"Replication lag reached {max_lag:.1f}s under load, exceeds 5s RPO target"
await primary.close()Multi-Region Health Checks
import concurrent.futures
import requests
import time
# Health-check endpoint per deployed region; keys are region identifiers.
REGIONS = {
    "us-east-1": "https://us-east-1.api.yourservice.com/health",
    "eu-west-1": "https://eu-west-1.api.yourservice.com/health",
    "ap-southeast-1": "https://ap-southeast-1.api.yourservice.com/health"
}
def check_regional_health(region: str, url: str) -> dict:
    """Probe one region's health endpoint and summarize the outcome.

    Never raises: timeouts and other request failures come back as
    dicts with status "timeout" / "error" instead of exceptions.
    """
    try:
        start = time.perf_counter()
        response = requests.get(url, timeout=10)
    except requests.exceptions.Timeout:
        return {"region": region, "status": "timeout", "latency_ms": 10000}
    except Exception as e:
        return {"region": region, "status": "error", "error": str(e)}
    latency_ms = (time.perf_counter() - start) * 1000
    is_ok = response.status_code == 200
    return {
        "region": region,
        "status": "healthy" if is_ok else "degraded",
        "status_code": response.status_code,
        "latency_ms": latency_ms
    }
def test_all_regions_healthy():
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
futures = {
executor.submit(check_regional_health, region, url): region
for region, url in REGIONS.items()
}
results = [f.result() for f in concurrent.futures.as_completed(futures)]
unhealthy = [r for r in results if r["status"] != "healthy"]
assert len(unhealthy) == 0, \
f"Unhealthy regions: {[r['region'] for r in unhealthy]}\n" \
f"Details: {unhealthy}"
# All regions should respond within 2 seconds
slow_regions = [r for r in results if r.get("latency_ms", 0) > 2000]
assert len(slow_regions) == 0, \
f"Slow regions (>2s): {[(r['region'], r['latency_ms']) for r in slow_regions]}"Chaos: Simulating a Full Region Outage
import subprocess
import contextlib
@contextlib.contextmanager
def simulate_region_outage(region_cidr: str):
    """
    Block all traffic to/from a simulated region CIDR.

    In real environments, use cloud provider network controls.
    This example uses iptables for local testing (requires root).
    """
    # List-form argv with shell=False avoids shell quoting/injection issues
    # (the original interpolated the CIDR into a shell string).
    subprocess.run(
        ["iptables", "-A", "OUTPUT", "-d", region_cidr, "-j", "DROP"],
        check=True
    )
    try:
        subprocess.run(
            ["iptables", "-A", "INPUT", "-s", region_cidr, "-j", "DROP"],
            check=True
        )
    except Exception:
        # Roll back the OUTPUT rule if the INPUT rule fails — the original
        # left the first rule dangling on this path.
        subprocess.run(["iptables", "-D", "OUTPUT", "-d", region_cidr, "-j", "DROP"])
        raise
    try:
        yield
    finally:
        # Best-effort cleanup: no check=True so teardown never masks the
        # outcome of the body.
        subprocess.run(["iptables", "-D", "OUTPUT", "-d", region_cidr, "-j", "DROP"])
        subprocess.run(["iptables", "-D", "INPUT", "-s", region_cidr, "-j", "DROP"])
def test_service_survives_primary_region_outage():
service_url = "https://api.yourservice.com"
primary_region_cidr = "10.0.0.0/8" # Simulated primary region
# Measure baseline
baseline = requests.get(f"{service_url}/health", timeout=5)
assert baseline.status_code == 200
with simulate_region_outage(primary_region_cidr):
# During outage — service should still respond via secondary
recovered = False
start = time.time()
while time.time() - start < 120: # Wait up to 2 minutes for failover
try:
response = requests.get(f"{service_url}/health", timeout=5)
if response.status_code == 200:
failover_time = time.time() - start
recovered = True
break
except Exception:
pass
time.sleep(5)
assert recovered, \
f"Service did not recover within 2 minutes of primary region outage"
print(f"✅ Failover completed in {failover_time:.0f}s")
# Service should still function during secondary operation
for _ in range(10):
response = requests.get(f"{service_url}/api/status", timeout=5)
assert response.status_code in (200, 204), \
f"Service returned {response.status_code} during failover operation"
time.sleep(2)
# After failover ends — service should detect primary recovery
time.sleep(30) # Wait for possible failback
response = requests.get(f"{service_url}/health", timeout=5)
assert response.status_code == 200, "Service not healthy after failback"Data Consistency Across Regions
For active-active deployments, verify that writes in one region eventually appear in all regions:
import uuid
import time
def test_cross_region_write_replication():
us_client = ServiceClient("https://us-east-1.api.yourservice.com")
eu_client = ServiceClient("https://eu-west-1.api.yourservice.com")
# Write in US
unique_value = str(uuid.uuid4())
us_client.create_record(key="cross-region-test", value=unique_value)
# Read from EU — should see it within 30 seconds
timeout = 30
start = time.time()
while time.time() - start < timeout:
eu_record = eu_client.get_record("cross-region-test")
if eu_record and eu_record["value"] == unique_value:
replication_time = time.time() - start
print(f"Replication completed in {replication_time:.1f}s")
assert replication_time <= 15, \
f"Replication took {replication_time:.1f}s, exceeds 15s target"
return
time.sleep(0.5)
pytest.fail(f"Record not replicated to EU within {timeout}s")HA Testing Checklist
## Infrastructure
- [ ] DNS failover time measured (target: < 2 minutes)
- [ ] Health check endpoints respond in all regions
- [ ] Database replication lag under 5s under load
- [ ] Cross-region write replication tested
## Failover Scenarios
- [ ] Primary region pod kill — auto-recovery tested
- [ ] Primary region network partition — DNS failover tested
- [ ] Database primary failure — replica promotion tested
- [ ] Load balancer failure — backup LB promotion tested
## Data Integrity
- [ ] No data loss during planned failover
- [ ] No duplicate writes during failover window
- [ ] Read consistency after failback
## Recovery
- [ ] RTO measured against SLO (target: < 15 min)
- [ ] RPO measured against SLO (target: < 60 seconds)
- [ ] Manual failback procedure tested
- [ ] Runbook last tested within 90 days

The goal of HA testing is to make your failover procedure so well-practiced that when a real region outage happens, it's boring. The adrenaline should come from understanding what caused the outage, not from figuring out how to execute the failover.