Runbook Automation Testing and Incident Response Drills

When an incident happens at 3 AM, the quality of your response depends on two things: the quality of your runbooks and how many times your team has practiced them. Testing runbook automation and running incident drills converts theory into muscle memory.

Automated Runbooks vs. Reference Runbooks

Reference runbooks are documentation — step-by-step guides that humans follow.

Automated runbooks are executable — scripts or workflows that a human triggers and monitors.

The best runbook automation strategy: start with reference runbooks, identify the most common and repeatable steps, automate those steps, and keep the human in the loop for judgment calls.
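That split can be sketched in a few lines. This is a hypothetical shape (`RunbookStep`, `execute_step`, and the approval callback are illustrative names, not from any framework): automated steps run unattended, while judgment calls pause for explicit human approval.

```python
from dataclasses import dataclass
from typing import Callable, Optional

@dataclass
class RunbookStep:
    description: str
    action: Callable[[], str]          # the automated work for this step
    requires_approval: bool = False    # judgment call -> keep a human in the loop

def execute_step(step: RunbookStep,
                 approve: Optional[Callable[[str], bool]] = None) -> str:
    """Run one step, pausing for human approval on judgment calls."""
    if step.requires_approval:
        # Default to an interactive prompt; tests can inject a callback.
        ask = approve or (lambda msg: input(f"{msg} [y/N] ").strip().lower() == "y")
        if not ask(f"Approve: {step.description}?"):
            return "skipped"
    return step.action()
```

An operator triggers the workflow, reads each prompt, and approves or skips the destructive steps while the measurement steps run on their own.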

Testing Automated Runbooks

Unit Testing Runbook Functions

# runbooks/database.py
import subprocess
import boto3

def get_replica_lag_seconds(replica_endpoint: str) -> float:
    """Get current replication lag for a PostgreSQL replica."""
    result = subprocess.run(
        ["psql", f"postgresql://monitor:pass@{replica_endpoint}/postgres",
         "-c", "SELECT EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp())) AS lag",
         "-t", "--no-align"],
        capture_output=True, text=True, timeout=10
    )
    output = result.stdout.strip()
    # pg_last_xact_replay_timestamp() returns NULL before any WAL has been
    # replayed; psql prints NULL as an empty string, so treat it as zero lag.
    return float(output) if output else 0.0

def promote_replica(replica_identifier: str, region: str) -> str:
    """Promote an RDS read replica to primary."""
    rds = boto3.client('rds', region_name=region)
    rds.promote_read_replica(DBInstanceIdentifier=replica_identifier)
    
    waiter = rds.get_waiter('db_instance_available')
    waiter.wait(DBInstanceIdentifier=replica_identifier)
    
    instance = rds.describe_db_instances(DBInstanceIdentifier=replica_identifier)
    return instance['DBInstances'][0]['Endpoint']['Address']

# tests/test_runbooks/test_database.py
import pytest
from unittest.mock import patch, MagicMock
from runbooks.database import get_replica_lag_seconds, promote_replica

def test_get_replica_lag_parses_output():
    with patch('subprocess.run') as mock_run:
        mock_run.return_value = MagicMock(
            stdout="  4.5  \n",
            returncode=0
        )
        
        lag = get_replica_lag_seconds("replica.internal")
        assert lag == 4.5

def test_get_replica_lag_handles_null():
    """Null lag means no WAL has been replayed yet."""
    with patch('subprocess.run') as mock_run:
        mock_run.return_value = MagicMock(stdout="\n", returncode=0)  # Empty = NULL

        # Should return 0, not crash
        lag = get_replica_lag_seconds("replica.internal")
        assert lag == 0.0

def test_promote_replica_waits_for_availability():
    with patch('boto3.client') as mock_boto:
        mock_rds = MagicMock()
        mock_boto.return_value = mock_rds
        mock_rds.describe_db_instances.return_value = {
            'DBInstances': [{
                'Endpoint': {'Address': 'new-primary.rds.amazonaws.com'}
            }]
        }
        
        new_endpoint = promote_replica("prod-replica-1", "us-east-1")
        
        assert new_endpoint == "new-primary.rds.amazonaws.com"
        mock_rds.promote_read_replica.assert_called_once_with(
            DBInstanceIdentifier="prod-replica-1"
        )
        # Waiter should have been called
        mock_rds.get_waiter.assert_called_with('db_instance_available')

Integration Testing Against Staging

import pytest
import os

from runbooks.database import get_replica_lag_seconds
# send_slack_alert is assumed to live in a notifications helper module
from runbooks.notifications import send_slack_alert

@pytest.mark.integration
@pytest.mark.skipif(
    os.environ.get('TEST_ENV') != 'staging',
    reason="Integration tests only run in staging"
)
class TestDatabaseRunbooks:
    
    def test_replica_lag_is_measurable(self):
        """Verify lag check works against actual staging replica."""
        lag = get_replica_lag_seconds(os.environ['STAGING_REPLICA_ENDPOINT'])
        
        assert isinstance(lag, float)
        assert 0 <= lag <= 60, f"Lag {lag}s outside expected range (0-60s)"
    
    def test_runbook_complete_execution(self):
        """Run a complete non-destructive scenario: measure lag → report → alert."""
        lag = get_replica_lag_seconds(os.environ['STAGING_REPLICA_ENDPOINT'])
        
        if lag > 30:
            alert_message = f"High replication lag: {lag:.1f}s"
            result = send_slack_alert("#staging-ops", alert_message)
            assert result["ok"] is True
        else:
            print(f"Lag {lag:.1f}s — within normal bounds, no alert needed")

Incident Response Drills

Drills differ from tabletop exercises in one key way: you actually do the things, not just talk about them. Drills involve real systems (staging or production), real tools, and real time pressure.

Drill Types

Silent drill: The incident is injected without telling the on-call engineer. They receive a real alert and must respond normally. This is the most realistic format, and also the most disruptive.

Announced drill: The team knows a drill is happening. The scenario is revealed once the drill starts. Useful for training without the stress of a real 3 AM wake-up.

Walk-through drill: The team knows the exact scenario in advance. They execute the runbook step-by-step in real systems. Best for validating that runbooks are correct.
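One way to choose among the three formats is to gate on runbook maturity and on-call experience. A sketch of such a policy (purely illustrative; the inputs and thresholds are assumptions, not a standard):

```python
import random

def pick_drill_type(runbook_validated: bool, oncall_experienced: bool) -> str:
    """Illustrative policy for choosing a drill format."""
    if not runbook_validated:
        # Walk-through drills are best for validating runbook correctness,
        # so unvalidated runbooks get one first.
        return "walk_through"
    if not oncall_experienced:
        # Announced drills train without the stress of a surprise page.
        return "announced"
    # Experienced on-call + validated runbook: mix in realistic silent drills.
    return random.choice(("silent", "announced"))
```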

Measuring MTTR

Mean Time to Recovery is the most important incident response metric. Track it for every drill:

import time
from datetime import datetime
from dataclasses import dataclass
from typing import Optional

@dataclass
class IncidentTimeline:
    incident_id: str
    failure_injected_at: datetime
    first_alert_fired_at: Optional[datetime] = None
    oncall_acknowledged_at: Optional[datetime] = None
    diagnosis_complete_at: Optional[datetime] = None
    mitigation_started_at: Optional[datetime] = None
    service_restored_at: Optional[datetime] = None
    
    def mtta(self) -> Optional[float]:
        """Mean Time to Acknowledge (minutes)."""
        if self.first_alert_fired_at and self.oncall_acknowledged_at:
            return (self.oncall_acknowledged_at - self.first_alert_fired_at).total_seconds() / 60
        return None
    
    def mttd(self) -> Optional[float]:
        """Mean Time to Diagnose (minutes from alert)."""
        if self.first_alert_fired_at and self.diagnosis_complete_at:
            return (self.diagnosis_complete_at - self.first_alert_fired_at).total_seconds() / 60
        return None
    
    def mttr(self) -> Optional[float]:
        """Mean Time to Recovery (minutes from failure injection)."""
        if self.service_restored_at:
            return (self.service_restored_at - self.failure_injected_at).total_seconds() / 60
        return None

def run_drill_with_timing(scenario_name: str, inject_failure, restore_service) -> IncidentTimeline:
    """Run a drill and capture timing at each stage."""
    timeline = IncidentTimeline(
        incident_id=f"drill-{scenario_name}-{int(time.time())}",
        failure_injected_at=datetime.utcnow()
    )
    
    # Inject failure
    inject_failure()
    print(f"[{timeline.failure_injected_at.strftime('%H:%M:%S')}] Failure injected")
    
    # Wait for the alert to fire (wait_for_alert is a monitoring helper,
    # assumed to poll Alertmanager; abort the drill if nothing fires)
    if not wait_for_alert(timeout_minutes=5):
        raise RuntimeError("Alert did not fire within 5 minutes: aborting drill")
    timeline.first_alert_fired_at = datetime.utcnow()
    print(f"[{timeline.first_alert_fired_at.strftime('%H:%M:%S')}] Alert fired")
    
    # Simulate on-call acknowledgment
    input("Press Enter when on-call acknowledges the alert...")
    timeline.oncall_acknowledged_at = datetime.utcnow()
    
    # Simulate diagnosis
    input("Press Enter when diagnosis is complete...")
    timeline.diagnosis_complete_at = datetime.utcnow()
    
    # Restore service
    restore_service()
    timeline.service_restored_at = datetime.utcnow()
    print(f"[{timeline.service_restored_at.strftime('%H:%M:%S')}] Service restored")
    
    print(f"\n=== Drill Results: {scenario_name} ===")
    print(f"MTTA: {timeline.mtta():.1f} minutes")
    print(f"MTTD: {timeline.mttd():.1f} minutes")
    print(f"MTTR: {timeline.mttr():.1f} minutes")
    
    return timeline
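A single timeline covers one drill; the trend across drills matters more than any individual number. A small rollup sketch (it assumes you collect the mttr() value from each drill's timeline, with None marking drills that never restored service):

```python
from statistics import mean
from typing import List, Optional

def summarize_mttr(mttr_minutes: List[Optional[float]]) -> dict:
    """Average and worst-case MTTR across drills, skipping incomplete ones."""
    completed = [m for m in mttr_minutes if m is not None]
    if not completed:
        return {"count": 0, "avg_minutes": None, "worst_minutes": None}
    return {
        "count": len(completed),
        "avg_minutes": round(mean(completed), 1),
        "worst_minutes": max(completed),
    }
```

Feeding in quarterly results, e.g. summarize_mttr([12.5, 18.0, None, 9.5]), shows whether practice is actually shrinking recovery time.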

Scenario Library

# scenario_library.py
import subprocess

# block_db_traffic, unblock_db_traffic, stress_test_pod and
# kill_high_memory_pod are infrastructure helpers defined elsewhere.
DRILL_SCENARIOS = [
    {
        "name": "database_primary_failure",
        "description": "Primary database becomes unreachable",
        "inject": lambda: block_db_traffic(PRIMARY_DB_IP),
        "restore": lambda: unblock_db_traffic(PRIMARY_DB_IP),
        "expected_rto_minutes": 15,
        "expected_alerts": ["DatabasePrimaryDown", "ErrorRateHigh"],
        "runbook": "runbooks/database-failover.md"
    },
    {
        "name": "cache_failure",
        "description": "Redis cache unavailable",
        "inject": lambda: subprocess.run("kubectl scale deployment redis --replicas=0 -n production", shell=True),
        "restore": lambda: subprocess.run("kubectl scale deployment redis --replicas=3 -n production", shell=True),
        "expected_rto_minutes": 5,
        "expected_alerts": ["CacheDown"],
        "runbook": "runbooks/cache-recovery.md"
    },
    {
        "name": "high_memory_pod",
        "description": "Application pod consuming excess memory",
        "inject": lambda: stress_test_pod("api-service", memory_mb=7000),
        "restore": lambda: kill_high_memory_pod("api-service"),
        "expected_rto_minutes": 3,
        "expected_alerts": ["PodMemoryHigh"],
        "runbook": "runbooks/pod-memory.md"
    }
]
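A driver can then walk this library and compare measured recovery time against each scenario's expected RTO. A minimal sketch (the no-op demo scenario below is a stand-in so the code runs on its own; real runs would pull entries from DRILL_SCENARIOS and include the human response time in the middle):

```python
import time

def run_scenario(scenario: dict) -> dict:
    """Inject, restore, and check elapsed time against the expected RTO."""
    start = time.monotonic()
    scenario["inject"]()
    # ... in a real drill, the on-call engineer diagnoses and responds here ...
    scenario["restore"]()
    elapsed_minutes = (time.monotonic() - start) / 60
    return {
        "name": scenario["name"],
        "elapsed_minutes": elapsed_minutes,
        "within_rto": elapsed_minutes <= scenario["expected_rto_minutes"],
    }

# Stand-in scenario so the sketch is self-contained.
demo = {
    "name": "noop_drill",
    "inject": lambda: None,
    "restore": lambda: None,
    "expected_rto_minutes": 5,
}
result = run_scenario(demo)
```

Recording within_rto per scenario over time highlights which runbooks are slipping out of their recovery objectives.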

Alert Testing

Your runbooks are only as good as the alerts that trigger them. Test that alerts fire when they should and don't fire when they shouldn't.

import time
import requests

class AlertTester:
    def __init__(self, prometheus_url: str, alertmanager_url: str):
        # prometheus_url should point at a Pushgateway that the test
        # Prometheus scrapes; Prometheus itself does not accept pushes.
        self.prom = prometheus_url
        self.am = alertmanager_url
    
    def inject_metric(self, metric_name: str, value: float, labels: dict):
        """Push a metric value to the test Pushgateway."""
        label_str = ",".join(f'{k}="{v}"' for k, v in labels.items())
        data = f"# TYPE {metric_name} gauge\n{metric_name}{{{label_str}}} {value}\n"
        requests.post(f"{self.prom}/metrics/job/alert-test", data=data)
    
    def wait_for_alert(self, alert_name: str, timeout_seconds: int = 120) -> bool:
        """Poll Alertmanager until the alert appears or the timeout expires."""
        deadline = time.time() + timeout_seconds
        while time.time() < deadline:
            response = requests.get(f"{self.am}/api/v2/alerts")
            active_alerts = response.json()
            
            for alert in active_alerts:
                if alert.get("labels", {}).get("alertname") == alert_name:
                    return True
            
            time.sleep(5)
        return False

def test_high_error_rate_alert_fires():
    """Verify ErrorRateHigh alert fires when error rate exceeds 5%."""
    tester = AlertTester(
        prometheus_url="http://prometheus.test:9090",
        alertmanager_url="http://alertmanager.test:9093"
    )
    
    # Inject high error rate
    tester.inject_metric(
        "http_requests_total",
        value=500,
        labels={"status": "500", "service": "api-service"}
    )
    tester.inject_metric(
        "http_requests_total",
        value=9500,
        labels={"status": "200", "service": "api-service"}
    )
    # 500/10000 = 5% error rate — should trigger alert
    
    alert_fired = tester.wait_for_alert("ErrorRateHigh", timeout_seconds=120)
    assert alert_fired, "ErrorRateHigh alert did not fire within 2 minutes"

def test_no_alert_for_normal_error_rate():
    """Verify no false-positive alert for 0.5% error rate."""
    tester = AlertTester(...)
    
    tester.inject_metric("http_requests_total", 5, {"status": "500", "service": "api-service"})
    tester.inject_metric("http_requests_total", 995, {"status": "200", "service": "api-service"})
    
    time.sleep(90)  # Wait for any alert to potentially fire
    
    import requests
    alerts = requests.get(f"{tester.am}/api/v2/alerts").json()
    assert not any(a["labels"]["alertname"] == "ErrorRateHigh" for a in alerts), \
        "False positive: ErrorRateHigh fired at 0.5% error rate"

Building a Drill Culture

Drills only improve your team if they're done regularly and honestly. Anti-patterns to avoid:

Unrealistic scenarios — If every drill is "pod restart and kubernetes heals it," you're not testing anything meaningful.

Blame in post-drill reviews — If engineers fear looking incompetent, they'll avoid participating. Make drills psychologically safe.

No follow-through — If the action items from drills never get completed, the drills are wasted effort. Block time to close the tickets they generate.

Skipping post-drill review — The learning happens in the retrospective. A drill without a review is a fire drill, not a learning exercise.

The goal is a team that, when a real incident happens, says "we've practiced this" — not "what do we do now?"
