Runbook Automation Testing and Incident Response Drills
When an incident happens at 3 AM, the quality of your response depends on two things: the quality of your runbooks and how many times your team has practiced them. Testing runbook automation and running incident drills converts theory into muscle memory.
Automated Runbooks vs. Reference Runbooks
Reference runbooks are documentation — step-by-step guides that humans follow.
Automated runbooks are executable — scripts or workflows that a human triggers and monitors.
The best runbook automation strategy: start with reference runbooks, identify the most common and repeatable steps, automate those steps, and keep the human in the loop for judgment calls.
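The human-in-the-loop split can be as simple as a script that automates the evidence gathering but pauses before the destructive step. A minimal sketch, assuming the get_replica_lag_seconds and promote_replica helpers shown in the next section; the 30-second threshold, the module name runbooks/failover.py, and assisted_failover itself are illustrative, not existing tooling:
# runbooks/failover.py (hypothetical module): checks are automated,
# the destructive promotion still requires an explicit human decision.
from runbooks.database import get_replica_lag_seconds, promote_replica

def assisted_failover(replica_endpoint: str, replica_identifier: str, region: str) -> None:
    lag = get_replica_lag_seconds(replica_endpoint)
    print(f"Replica lag: {lag:.1f}s")
    if lag > 30:  # illustrative threshold
        print("Warning: replica is far behind; promoting now may lose recent writes.")
    answer = input(f"Promote {replica_identifier} to primary in {region}? [y/N] ")
    if answer.strip().lower() == "y":
        endpoint = promote_replica(replica_identifier, region)
        print(f"New primary endpoint: {endpoint}")
    else:
        print("Aborted, no changes made.")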
Testing Automated Runbooks
Unit Testing Runbook Functions
# runbooks/database.py
import subprocess
import boto3
def get_replica_lag_seconds(replica_endpoint: str) -> float:
"""Get current replication lag for a PostgreSQL replica."""
result = subprocess.run(
["psql", f"postgresql://monitor:pass@{replica_endpoint}/postgres",
"-c", "SELECT EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp())) AS lag",
"-t", "--no-align"],
capture_output=True, text=True, timeout=10
)
    output = result.stdout.strip()
    # psql prints NULL (no WAL replayed yet) as an empty string; treat that as zero lag
    return float(output) if output else 0.0
def promote_replica(replica_identifier: str, region: str) -> str:
"""Promote an RDS read replica to primary."""
rds = boto3.client('rds', region_name=region)
rds.promote_read_replica(DBInstanceIdentifier=replica_identifier)
waiter = rds.get_waiter('db_instance_available')
waiter.wait(DBInstanceIdentifier=replica_identifier)
instance = rds.describe_db_instances(DBInstanceIdentifier=replica_identifier)
    return instance['DBInstances'][0]['Endpoint']['Address']
# tests/test_runbooks/test_database.py
import pytest
from unittest.mock import patch, MagicMock
from runbooks.database import get_replica_lag_seconds, promote_replica
def test_get_replica_lag_parses_output():
with patch('subprocess.run') as mock_run:
mock_run.return_value = MagicMock(
stdout=" 4.5 \n",
returncode=0
)
lag = get_replica_lag_seconds("replica.internal")
assert lag == 4.5
def test_get_replica_lag_handles_null():
    """Null lag means no WAL has been received yet."""
    with patch('subprocess.run') as mock_run:
        mock_run.return_value = MagicMock(stdout="\n", returncode=0)  # Empty = NULL
        # Should return 0, not crash
        assert get_replica_lag_seconds("replica.internal") == 0.0
def test_promote_replica_waits_for_availability():
with patch('boto3.client') as mock_boto:
mock_rds = MagicMock()
mock_boto.return_value = mock_rds
mock_rds.describe_db_instances.return_value = {
'DBInstances': [{
'Endpoint': {'Address': 'new-primary.rds.amazonaws.com'}
}]
}
new_endpoint = promote_replica("prod-replica-1", "us-east-1")
assert new_endpoint == "new-primary.rds.amazonaws.com"
mock_rds.promote_read_replica.assert_called_once_with(
DBInstanceIdentifier="prod-replica-1"
)
# Waiter should have been called
        mock_rds.get_waiter.assert_called_with('db_instance_available')
Integration Testing Against Staging
import pytest
import os
from runbooks.database import get_replica_lag_seconds
# send_slack_alert is assumed to come from the team's notification helpers
@pytest.mark.integration
@pytest.mark.skipif(
os.environ.get('TEST_ENV') != 'staging',
reason="Integration tests only run in staging"
)
class TestDatabaseRunbooks:
def test_replica_lag_is_measurable(self):
"""Verify lag check works against actual staging replica."""
lag = get_replica_lag_seconds(os.environ['STAGING_REPLICA_ENDPOINT'])
assert isinstance(lag, float)
assert 0 <= lag <= 60, f"Lag {lag}s outside expected range (0-60s)"
def test_runbook_complete_execution(self):
"""Run a complete non-destructive scenario: measure lag → report → alert."""
lag = get_replica_lag_seconds(os.environ['STAGING_REPLICA_ENDPOINT'])
if lag > 30:
alert_message = f"High replication lag: {lag:.1f}s"
result = send_slack_alert("#staging-ops", alert_message)
assert result["ok"] is True
else:
            print(f"Lag {lag:.1f}s — within normal bounds, no alert needed")
Incident Response Drills
Drills differ from tabletop exercises in one key way: you actually do the things, not just talk about them. Drills involve real systems (staging or production), real tools, and real time pressure.
Drill Types
Silent drill: The incident is injected without telling the on-call engineer. They receive a real alert and must respond normally. This is the most realistic format, and also the most disruptive.
Announced drill: The team knows a drill is happening. The scenario is revealed once the drill starts. Useful for training without the stress of a real 3 AM wake-up.
Walk-through drill: The team knows the exact scenario in advance. They execute the runbook step-by-step in real systems. Best for validating that runbooks are correct.
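If drills are scheduled programmatically, recording the drill type alongside each run makes it possible to compare response times across formats later. A small sketch under that assumption; DrillType and ScheduledDrill are illustrative names, not part of any existing tooling:
from dataclasses import dataclass
from datetime import date
from enum import Enum

class DrillType(Enum):
    SILENT = "silent"              # on-call is not warned in advance
    ANNOUNCED = "announced"        # team knows a drill is coming, scenario revealed at start
    WALK_THROUGH = "walk_through"  # scenario known in advance, runbook validated step by step

@dataclass
class ScheduledDrill:
    scenario: str        # key into the scenario library, e.g. "database_primary_failure"
    drill_type: DrillType
    scheduled_for: date
    environment: str = "staging"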
Measuring MTTR
Mean Time to Recovery is the most important incident response metric. Track it for every drill:
import time
from datetime import datetime
from dataclasses import dataclass
from typing import Optional
@dataclass
class IncidentTimeline:
incident_id: str
failure_injected_at: datetime
first_alert_fired_at: Optional[datetime] = None
oncall_acknowledged_at: Optional[datetime] = None
diagnosis_complete_at: Optional[datetime] = None
mitigation_started_at: Optional[datetime] = None
service_restored_at: Optional[datetime] = None
def mtta(self) -> Optional[float]:
"""Mean Time to Acknowledge (minutes)."""
if self.first_alert_fired_at and self.oncall_acknowledged_at:
return (self.oncall_acknowledged_at - self.first_alert_fired_at).total_seconds() / 60
return None
def mttd(self) -> Optional[float]:
"""Mean Time to Diagnose (minutes from alert)."""
if self.first_alert_fired_at and self.diagnosis_complete_at:
return (self.diagnosis_complete_at - self.first_alert_fired_at).total_seconds() / 60
return None
def mttr(self) -> Optional[float]:
"""Mean Time to Recovery (minutes from failure injection)."""
if self.service_restored_at:
return (self.service_restored_at - self.failure_injected_at).total_seconds() / 60
return None
def run_drill_with_timing(scenario_name: str, inject_failure, restore_service) -> IncidentTimeline:
"""Run a drill and capture timing at each stage."""
timeline = IncidentTimeline(
incident_id=f"drill-{scenario_name}-{int(time.time())}",
failure_injected_at=datetime.utcnow()
)
# Inject failure
inject_failure()
print(f"[{timeline.failure_injected_at.strftime('%H:%M:%S')}] Failure injected")
    # Wait for the alert to fire (wait_for_alert is an assumed helper that blocks until it does)
    wait_for_alert(timeout_minutes=5)
timeline.first_alert_fired_at = datetime.utcnow()
print(f"[{timeline.first_alert_fired_at.strftime('%H:%M:%S')}] Alert fired")
# Simulate on-call acknowledgment
input("Press Enter when on-call acknowledges the alert...")
timeline.oncall_acknowledged_at = datetime.utcnow()
# Simulate diagnosis
input("Press Enter when diagnosis is complete...")
timeline.diagnosis_complete_at = datetime.utcnow()
# Restore service
restore_service()
timeline.service_restored_at = datetime.utcnow()
print(f"[{timeline.service_restored_at.strftime('%H:%M:%S')}] Service restored")
print(f"\n=== Drill Results: {scenario_name} ===")
print(f"MTTA: {timeline.mtta():.1f} minutes")
print(f"MTTD: {timeline.mttd():.1f} minutes")
print(f"MTTR: {timeline.mttr():.1f} minutes")
    return timeline
Scenario Library
# scenario_library.py
import subprocess
# block_db_traffic, stress_test_pod, kill_high_memory_pod, etc. are assumed to be
# existing chaos/ops helpers available to the drill tooling
DRILL_SCENARIOS = [
{
"name": "database_primary_failure",
"description": "Primary database becomes unreachable",
"inject": lambda: block_db_traffic(PRIMARY_DB_IP),
"restore": lambda: unblock_db_traffic(PRIMARY_DB_IP),
"expected_rto_minutes": 15,
"expected_alerts": ["DatabasePrimaryDown", "ErrorRateHigh"],
"runbook": "runbooks/database-failover.md"
},
{
"name": "cache_failure",
"description": "Redis cache unavailable",
"inject": lambda: subprocess.run("kubectl scale deployment redis --replicas=0 -n production", shell=True),
"restore": lambda: subprocess.run("kubectl scale deployment redis --replicas=3 -n production", shell=True),
"expected_rto_minutes": 5,
"expected_alerts": ["CacheDown"],
"runbook": "runbooks/cache-recovery.md"
},
{
"name": "high_memory_pod",
"description": "Application pod consuming excess memory",
"inject": lambda: stress_test_pod("api-service", memory_mb=7000),
"restore": lambda: kill_high_memory_pod("api-service"),
"expected_rto_minutes": 3,
"expected_alerts": ["PodMemoryHigh"],
"runbook": "runbooks/pod-memory.md"
}
]
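Each scenario's inject and restore callables plug directly into the timing harness from the MTTR section. A minimal sketch of a drill runner under that assumption; run_scenario is an illustrative helper, not existing tooling:
# drill_runner.py (hypothetical): assumes DRILL_SCENARIOS and the timing harness
# (run_drill_with_timing, IncidentTimeline) from the sections above are importable.
def run_scenario(name: str):
    """Look up a scenario by name and run it through the timing harness."""
    scenario = next(s for s in DRILL_SCENARIOS if s["name"] == name)
    print(f"Runbook under test: {scenario['runbook']}")
    timeline = run_drill_with_timing(
        scenario_name=scenario["name"],
        inject_failure=scenario["inject"],
        restore_service=scenario["restore"],
    )
    mttr = timeline.mttr()
    if mttr is not None and mttr > scenario["expected_rto_minutes"]:
        print(f"MTTR {mttr:.1f}m exceeded expected RTO of {scenario['expected_rto_minutes']}m")
    return timeline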
Alert Testing
Your runbooks are only as good as the alerts that trigger them. Test that alerts fire when they should and don't fire when they shouldn't.
import time
import requests

class AlertTester:
def __init__(self, prometheus_url: str, alertmanager_url: str):
self.prom = prometheus_url
self.am = alertmanager_url
def inject_metric(self, metric_name: str, value: float, labels: dict):
"""Push a metric value to the test prometheus pushgateway."""
label_str = ",".join(f'{k}="{v}"' for k, v in labels.items())
data = f"# TYPE {metric_name} gauge\n{metric_name}{{{label_str}}} {value}\n"
requests.post(f"{self.prom}/metrics/job/alert-test", data=data)
def wait_for_alert(self, alert_name: str, timeout_seconds: int = 120) -> bool:
"""Wait for an alert to appear in Alertmanager."""
deadline = time.time() + timeout_seconds
while time.time() < deadline:
response = requests.get(f"{self.am}/api/v2/alerts")
active_alerts = response.json()
for alert in active_alerts:
if alert.get("labels", {}).get("alertname") == alert_name:
return True
time.sleep(5)
return False
def test_high_error_rate_alert_fires():
"""Verify ErrorRateHigh alert fires when error rate exceeds 5%."""
tester = AlertTester(
prometheus_url="http://prometheus.test:9090",
alertmanager_url="http://alertmanager.test:9093"
)
# Inject high error rate
tester.inject_metric(
"http_requests_total",
value=500,
labels={"status": "500", "service": "api-service"}
)
tester.inject_metric(
"http_requests_total",
value=9500,
labels={"status": "200", "service": "api-service"}
)
# 500/10000 = 5% error rate — should trigger alert
alert_fired = tester.wait_for_alert("ErrorRateHigh", timeout_seconds=120)
assert alert_fired, "ErrorRateHigh alert did not fire within 2 minutes"
def test_no_alert_for_normal_error_rate():
"""Verify no false-positive alert for 0.5% error rate."""
tester = AlertTester(...)
tester.inject_metric("http_requests_total", 5, {"status": "500", "service": "api-service"})
tester.inject_metric("http_requests_total", 995, {"status": "200", "service": "api-service"})
time.sleep(90) # Wait for any alert to potentially fire
alerts = requests.get(f"{tester.am}/api/v2/alerts").json()
assert not any(a["labels"]["alertname"] == "ErrorRateHigh" for a in alerts), \
"False positive: ErrorRateHigh fired at 0.5% error rate"Building a Drill Culture
Drills only improve your team if they're done regularly and honestly. Anti-patterns to avoid:
Unrealistic scenarios — If every drill is "pod restart and kubernetes heals it," you're not testing anything meaningful.
Blame in post-drill reviews — If engineers fear looking incompetent, they'll avoid participating. Make drills psychologically safe.
No follow-through — If the action items from drills never get completed, the drills are wasted effort. Block time to close the tickets they generate.
Skipping post-drill review — The learning happens in the retrospective. A drill without a review is a fire drill, not a learning exercise.
The goal is a team that, when a real incident happens, says "we've practiced this" — not "what do we do now?"