Testing for High Availability: Multi-Region Failover Validation

High availability is a promise: that your system will remain operational when components fail. Multi-region failover is how you keep that promise when an entire cloud region becomes unavailable. Testing your HA architecture requires deliberately causing the failures your system claims to handle.

HA Architecture Patterns

Active-active: Traffic serves from both regions simultaneously. Either region can handle the full load. Failover means routing all traffic to the surviving region.

Active-passive (warm standby): Traffic serves from region A. Region B is provisioned and synced but not serving traffic. On failover, DNS or load balancers redirect to region B.

Active-passive (cold standby): Region B is not running during normal operations. On failover, it must be provisioned and started from backup. Cheapest, but slowest RTO.
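The tradeoffs between these patterns can be encoded as data so a test can assert the chosen pattern actually fits the RTO budget. A minimal sketch; the RTO and cost figures below are rough illustrative assumptions, not vendor guarantees:

```python
# Illustrative tradeoffs for the three patterns. The typical_rto_s and
# relative_cost figures are assumptions for this sketch, not real numbers.
HA_PATTERNS = {
    "active-active":       {"typical_rto_s": 60,   "relative_cost": 2.0},
    "active-passive-warm": {"typical_rto_s": 300,  "relative_cost": 1.5},
    "active-passive-cold": {"typical_rto_s": 3600, "relative_cost": 1.1},
}

def patterns_meeting_rto(rto_budget_s: float) -> list[str]:
    """Return the patterns whose typical RTO fits the budget, cheapest first."""
    candidates = [
        name for name, p in HA_PATTERNS.items()
        if p["typical_rto_s"] <= rto_budget_s
    ]
    return sorted(candidates, key=lambda n: HA_PATTERNS[n]["relative_cost"])
```

With a 15-minute RTO budget, this sketch admits active-active and warm standby but rules out cold standby; with a tight 2-minute budget, only active-active survives.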

Testing DNS Failover

DNS failover is often the first mechanism tested. Verify that DNS routes to the backup region when the primary is unreachable:

import dns.resolver
import subprocess
import time

class DNSFailoverTester:
    def __init__(self, hostname: str, primary_region_ip: str, secondary_region_ip: str):
        self.hostname = hostname
        self.primary_ip = primary_region_ip
        self.secondary_ip = secondary_region_ip
    
    def get_resolved_ip(self) -> str:
        """Resolve hostname to IP. Note: upstream resolvers cache answers
        for the record's TTL, so keep failover records on a short TTL."""
        answers = dns.resolver.resolve(self.hostname, 'A')
        return str(answers[0])
    
    def simulate_primary_failure(self):
        """Block access to the primary region (requires root privileges)."""
        subprocess.run(
            f"iptables -A OUTPUT -d {self.primary_ip} -j DROP",
            shell=True, check=True
        )
    
    def restore_primary(self):
        """Remove the block; no check=True so cleanup is best-effort."""
        subprocess.run(
            f"iptables -D OUTPUT -d {self.primary_ip} -j DROP",
            shell=True
        )
    
    def measure_failover_time(self) -> dict:
        """Measure how long DNS failover takes after primary failure."""
        # Baseline
        assert self.get_resolved_ip() == self.primary_ip, "Primary is not the current target"
        
        # Fail primary
        failure_time = time.time()
        self.simulate_primary_failure()
        
        # Poll until DNS resolves to secondary
        max_wait = 300  # 5 minutes
        while time.time() - failure_time < max_wait:
            current_ip = self.get_resolved_ip()
            if current_ip == self.secondary_ip:
                failover_time = time.time() - failure_time
                return {
                    "success": True,
                    "failover_seconds": failover_time,
                    "new_endpoint": current_ip
                }
            time.sleep(5)
        
        return {
            "success": False,
            "failover_seconds": None,
            "note": f"DNS did not failover within {max_wait} seconds"
        }

def test_dns_failover_within_sla():
    tester = DNSFailoverTester(
        hostname="api.yourservice.com",
        primary_region_ip="1.2.3.4",
        secondary_region_ip="5.6.7.8"
    )
    
    try:
        result = tester.measure_failover_time()
        
        assert result["success"], f"DNS failover did not complete: {result['note']}"
        assert result["failover_seconds"] <= 120, \
            f"DNS failover took {result['failover_seconds']:.0f}s, exceeds 2-minute SLA"
        
        print(f"✅ DNS failover completed in {result['failover_seconds']:.0f}s")
    finally:
        tester.restore_primary()
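Before running the live test, you can sanity-check whether a 2-minute SLA is even reachable from your DNS settings. As a back-of-envelope model (an assumption for this sketch, not a guarantee from any provider): health-check-based DNS failover needs detection time (probe interval times the failure threshold) plus up to one full record TTL during which resolvers may still serve the cached old answer:

```python
def worst_case_dns_failover_s(record_ttl_s: int,
                              check_interval_s: int,
                              failure_threshold: int) -> int:
    """Rough upper bound on DNS failover time: the health checker needs
    `failure_threshold` consecutive failed probes to mark the primary down,
    and resolvers may serve the stale cached answer for one full TTL after
    the record flips."""
    detection_s = check_interval_s * failure_threshold
    return detection_s + record_ttl_s

# A 60s TTL with 30s probes and a threshold of 3 cannot meet a 2-minute SLA:
# worst_case_dns_failover_s(60, 30, 3) -> 150
```

If the bound already exceeds the SLA, tune the TTL or probe settings before bothering with the live drill.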

Testing Database Replication Lag

Before failing over, you need to know how much data you might lose (RPO). Measure replication lag under load:

import asyncpg
import asyncio
import time

async def measure_replication_lag(replica_dsn: str) -> float:
    """
    Measure how far behind the replica is, using PostgreSQL's
    pg_last_xact_replay_timestamp() on the replica itself.
    Returns lag in seconds (0.0 if nothing has been replayed yet).
    """
    replica = await asyncpg.connect(replica_dsn)
    try:
        lag_result = await replica.fetchrow(
            """
            SELECT EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp()))
            AS lag_seconds
            """
        )
        # EXTRACT returns numeric (Decimal in asyncpg); normalize to float
        return float(lag_result["lag_seconds"] or 0.0)
    finally:
        await replica.close()

async def test_replication_lag_under_write_load():
    """Replication lag should stay under 5 seconds even during heavy writes."""
    PRIMARY = "postgresql://user:pass@primary.internal/app"
    REPLICA = "postgresql://user:pass@replica.internal/app"
    
    primary = await asyncpg.connect(PRIMARY)
    try:
        async def write_load():
            for i in range(1000):
                await primary.execute(
                    "INSERT INTO load_test_events (data, created_at) VALUES ($1, NOW())",
                    f"load-test-{i}"
                )
                await asyncio.sleep(0.01)
        
        async def monitor_lag():
            lags = []
            for _ in range(20):
                lags.append(await measure_replication_lag(REPLICA))
                await asyncio.sleep(0.5)
            return lags
        
        # Run the write load and the lag monitor concurrently
        _, lags = await asyncio.gather(write_load(), monitor_lag())
    finally:
        await primary.close()
    
    max_lag = max(lags)
    assert max_lag <= 5.0, \
        f"Replication lag reached {max_lag:.1f}s under load, exceeds 5s RPO target"

Multi-Region Health Checks

Probe every region's health endpoint in parallel, so a single slow or dead region doesn't delay the rest:

import concurrent.futures
import requests
import time

REGIONS = {
    "us-east-1": "https://us-east-1.api.yourservice.com/health",
    "eu-west-1": "https://eu-west-1.api.yourservice.com/health",
    "ap-southeast-1": "https://ap-southeast-1.api.yourservice.com/health"
}

def check_regional_health(region: str, url: str) -> dict:
    try:
        start = time.perf_counter()
        response = requests.get(url, timeout=10)
        latency_ms = (time.perf_counter() - start) * 1000
        
        return {
            "region": region,
            "status": "healthy" if response.status_code == 200 else "degraded",
            "status_code": response.status_code,
            "latency_ms": latency_ms
        }
    except requests.exceptions.Timeout:
        return {"region": region, "status": "timeout", "latency_ms": 10000}
    except Exception as e:
        return {"region": region, "status": "error", "error": str(e)}

def test_all_regions_healthy():
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        futures = {
            executor.submit(check_regional_health, region, url): region
            for region, url in REGIONS.items()
        }
        results = [f.result() for f in concurrent.futures.as_completed(futures)]
    
    unhealthy = [r for r in results if r["status"] != "healthy"]
    
    assert len(unhealthy) == 0, \
        f"Unhealthy regions: {[r['region'] for r in unhealthy]}\n" \
        f"Details: {unhealthy}"
    
    # All regions should respond within 2 seconds
    slow_regions = [r for r in results if r.get("latency_ms", 0) > 2000]
    assert len(slow_regions) == 0, \
        f"Slow regions (>2s): {[(r['region'], r['latency_ms']) for r in slow_regions]}"

Chaos: Simulating a Full Region Outage

To validate end-to-end failover, block all traffic to the primary region's address range and confirm the service recovers via the secondary:

import subprocess
import contextlib
import requests
import time

@contextlib.contextmanager
def simulate_region_outage(region_cidr: str):
    """
    Block all traffic to/from a simulated region CIDR.
    
    In real environments, use cloud provider network controls.
    This example uses iptables for local testing.
    """
    subprocess.run(
        f"iptables -A OUTPUT -d {region_cidr} -j DROP",
        shell=True, check=True
    )
    subprocess.run(
        f"iptables -A INPUT -s {region_cidr} -j DROP",
        shell=True, check=True
    )
    try:
        yield
    finally:
        subprocess.run(f"iptables -D OUTPUT -d {region_cidr} -j DROP", shell=True)
        subprocess.run(f"iptables -D INPUT -s {region_cidr} -j DROP", shell=True)

def test_service_survives_primary_region_outage():
    service_url = "https://api.yourservice.com"
    primary_region_cidr = "10.0.0.0/8"  # Simulated primary region
    
    # Measure baseline
    baseline = requests.get(f"{service_url}/health", timeout=5)
    assert baseline.status_code == 200
    
    with simulate_region_outage(primary_region_cidr):
        # During outage — service should still respond via secondary
        recovered = False
        start = time.time()
        
        while time.time() - start < 120:  # Wait up to 2 minutes for failover
            try:
                response = requests.get(f"{service_url}/health", timeout=5)
                if response.status_code == 200:
                    failover_time = time.time() - start
                    recovered = True
                    break
            except Exception:
                pass
            time.sleep(5)
        
        assert recovered, \
            f"Service did not recover within 2 minutes of primary region outage"
        
        print(f"✅ Failover completed in {failover_time:.0f}s")
        
        # Service should still function during secondary operation
        for _ in range(10):
            response = requests.get(f"{service_url}/api/status", timeout=5)
            assert response.status_code in (200, 204), \
                f"Service returned {response.status_code} during failover operation"
            time.sleep(2)
    
    # After failover ends — service should detect primary recovery
    time.sleep(30)  # Wait for possible failback
    response = requests.get(f"{service_url}/health", timeout=5)
    assert response.status_code == 200, "Service not healthy after failback"

Data Consistency Across Regions

For active-active deployments, verify that writes in one region eventually appear in all regions:

import uuid
import time

import pytest

def test_cross_region_write_replication():
    # ServiceClient is a thin HTTP wrapper around your service's API
    us_client = ServiceClient("https://us-east-1.api.yourservice.com")
    eu_client = ServiceClient("https://eu-west-1.api.yourservice.com")
    
    # Write in US
    unique_value = str(uuid.uuid4())
    us_client.create_record(key="cross-region-test", value=unique_value)
    
    # Read from EU — should see it within 30 seconds
    timeout = 30
    start = time.time()
    
    while time.time() - start < timeout:
        eu_record = eu_client.get_record("cross-region-test")
        if eu_record and eu_record["value"] == unique_value:
            replication_time = time.time() - start
            print(f"Replication completed in {replication_time:.1f}s")
            assert replication_time <= 15, \
                f"Replication took {replication_time:.1f}s, exceeds 15s target"
            return
        time.sleep(0.5)
    
    pytest.fail(f"Record not replicated to EU within {timeout}s")

HA Testing Checklist

## Infrastructure
- [ ] DNS failover time measured (target: < 2 minutes)
- [ ] Health check endpoints respond in all regions
- [ ] Database replication lag under 5s under load
- [ ] Cross-region write replication tested

## Failover Scenarios
- [ ] Primary region pod kill — auto-recovery tested
- [ ] Primary region network partition — DNS failover tested
- [ ] Database primary failure — replica promotion tested
- [ ] Load balancer failure — backup LB promotion tested

## Data Integrity
- [ ] No data loss during planned failover
- [ ] No duplicate writes during failover window
- [ ] Read consistency after failback

## Recovery
- [ ] RTO measured against SLO (target: < 15 min)
- [ ] RPO measured against SLO (target: < 60 seconds)
- [ ] Manual failback procedure tested
- [ ] Runbook last tested within 90 days
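The RTO and RPO items above can be computed mechanically from a failover drill's timeline. A minimal sketch, assuming you record three timestamps during each drill: when the primary failed, the last write that made it to the secondary, and when the secondary began serving:

```python
from dataclasses import dataclass

@dataclass
class FailoverDrill:
    outage_start: float           # epoch seconds when the primary failed
    last_replicated_write: float  # epoch seconds of the last write present on the secondary
    service_restored: float       # epoch seconds when the secondary began serving

    @property
    def rto_seconds(self) -> float:
        """Time from failure to restored service."""
        return self.service_restored - self.outage_start

    @property
    def rpo_seconds(self) -> float:
        """Data-loss window: writes after this point were lost."""
        return self.outage_start - self.last_replicated_write

def test_drill_meets_slo():
    drill = FailoverDrill(
        outage_start=1_700_000_000.0,
        last_replicated_write=1_699_999_975.0,  # 25s of unreplicated writes
        service_restored=1_700_000_480.0,       # serving again after 8 minutes
    )
    assert drill.rto_seconds <= 15 * 60, "RTO exceeds 15-minute SLO"
    assert drill.rpo_seconds <= 60, "RPO exceeds 60-second SLO"
```

Recording these three timestamps in every drill (and every real incident) gives you a trend line for both metrics, not just a single pass/fail.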

The goal of HA testing is to make your failover procedure so well-practiced that when a real region outage happens, it's boring. The adrenaline should come from understanding what caused the outage, not from figuring out how to execute the failover.
