Production Traffic Replay Patterns with Shadowtraf
Replaying production traffic is conceptually simple: capture requests, store them, and send them again to a different target. In practice, it is full of hard problems: authentication tokens expire, session state is tied to specific users, database operations are not idempotent, and request timing and ordering matter. This post covers common traffic replay patterns, with particular attention to Shadowtraf and the design decisions that make traffic replay practical in real systems.
The Traffic Replay Problem Space
Production traffic replay breaks down into three distinct concerns:
Capture fidelity: How accurately do you record the traffic? Headers, body, timing, TLS details, connection characteristics — each adds fidelity but also complexity.
Replay isolation: How do you ensure that replayed requests do not contaminate production data, trigger real external API calls, or interact with each other in ways the original requests did not?
Behavior comparison: How do you compare the response from the replay target to what the original request would have returned, accounting for non-determinism and intentional changes?
Different tools handle these concerns differently. GoReplay focuses on capture and replay with some filtering. Diffy focuses on parallel comparison. Shadowtraf is a Python-based tool and framework that focuses on correctness of replay, particularly the isolation problem.
Shadowtraf Overview
Shadowtraf is a Python library for production traffic replay that provides:
- A traffic capture format with session correlation
- Replay with configurable workers and rate limiting
- Session isolation per replay run
- Pluggable auth token substitution
- Request transformer pipeline
- Response comparison with configurable matchers
Install it:

    pip install shadowtraf

Or from source:

    git clone https://github.com/shadowtraf/shadowtraf
    cd shadowtraf
    pip install -e .

Traffic Capture Patterns
Pattern 1: Nginx/HAProxy Log-Based Capture
The simplest capture method reads from your existing access logs:

    # shadowtraf/capture/nginx.py
    import re
    import json
    from datetime import datetime

    # Matches the "combined" log format. The trailing request_body group only
    # matches if your log_format appends "$request_body" as a quoted field.
    NGINX_LOG_PATTERN = re.compile(
        r'(?P<remote_addr>\S+) - (?P<remote_user>\S+) \[(?P<time_local>.+?)\] '
        r'"(?P<method>\S+) (?P<path>\S+) HTTP/(?P<http_version>\S+)" '
        r'(?P<status>\d+) (?P<body_bytes_sent>\d+) '
        r'"(?P<http_referer>[^"]*)" "(?P<http_user_agent>[^"]*)"'
        r'(?: "(?P<request_body>[^"]*)")?'
    )

    def parse_nginx_log(log_path, output_path):
        requests = []
        with open(log_path) as f:
            for line in f:
                m = NGINX_LOG_PATTERN.match(line.strip())
                if not m:
                    continue  # skip lines that do not match the expected format
                requests.append({
                    # Epoch seconds, so replay code can subtract timestamps
                    'timestamp': datetime.strptime(
                        m.group('time_local'), '%d/%b/%Y:%H:%M:%S %z'
                    ).timestamp(),
                    'method': m.group('method'),
                    'path': m.group('path'),
                    'body': m.group('request_body') or '',
                    'user_agent': m.group('http_user_agent'),
                })
        # Write one JSON object per line
        with open(output_path, 'w') as f:
            for req in requests:
                f.write(json.dumps(req) + '\n')

Pattern 2: Application-Level Capture Middleware
For richer capture (full headers, request bodies, correlation IDs), add middleware to your application:

    # middleware.py
    import gzip
    import json
    import random
    import time
    import uuid

    class TrafficCaptureMiddleware:
        """ASGI middleware that records a sample of requests as gzipped JSON lines."""

        def __init__(self, app, capture_file, sample_rate=0.01):
            self.app = app
            self.capture_file = open(capture_file, 'ab')
            self.sample_rate = sample_rate

        async def __call__(self, scope, receive, send):
            # Pass through non-HTTP traffic and requests outside the sample
            if scope['type'] != 'http' or random.random() >= self.sample_rate:
                await self.app(scope, receive, send)
                return

            # Capture request body
            body_chunks = []

            async def receive_wrapper():
                message = await receive()
                if message['type'] == 'http.request':
                    body_chunks.append(message.get('body', b''))
                return message

            # Capture response
            response_status = [None]
            response_headers = [None]
            response_body = [b'']

            async def send_wrapper(message):
                if message['type'] == 'http.response.start':
                    response_status[0] = message['status']
                    response_headers[0] = message['headers']
                elif message['type'] == 'http.response.body':
                    response_body[0] += message.get('body', b'')
                await send(message)

            start = time.perf_counter()
            await self.app(scope, receive_wrapper, send_wrapper)
            latency = time.perf_counter() - start

            record = {
                'id': str(uuid.uuid4()),
                'timestamp': time.time(),
                'method': scope['method'],
                'path': scope['path'],
                'query': scope['query_string'].decode(),
                # ASGI header names are lowercased bytes; latin-1 never fails
                # to decode. Repeated headers collapse to the last value.
                'headers': {
                    k.decode('latin-1'): v.decode('latin-1')
                    for k, v in scope['headers']
                },
                'body': b''.join(body_chunks).decode('utf-8', errors='replace'),
                'response_status': response_status[0],
                'response_latency_ms': latency * 1000,
            }
            # Each record is its own gzip member; gzip.open() reads the
            # concatenated members back as one stream of JSON lines.
            line = (json.dumps(record) + '\n').encode()
            self.capture_file.write(gzip.compress(line))
            self.capture_file.flush()

Pattern 3: Network-Level Capture with tshark
For black-box capture without application changes:

    # Capture HTTP traffic on port 8080, write to pcap
    tshark -i eth0 -f "tcp port 8080" -w capture.pcap

    # Convert pcap to HTTP requests
    tshark -r capture.pcap \
        -Y "http.request" \
        -T fields \
        -e frame.time_epoch \
        -e ip.src \
        -e http.request.method \
        -e http.request.uri \
        -e http.request.line \
        > requests.tsv

Then parse and replay the TSV file.
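
Parsing that TSV could look like the sketch below. It assumes the column order of the tshark command above (epoch time, source IP, method, URI, raw request lines) and tab-separated output. The function name `parse_tshark_tsv` and the `raw_header_lines` field are illustrative, not part of Shadowtraf. The fifth column is kept as an opaque string because its exact escaping depends on tshark's output options.

```python
def parse_tshark_tsv(lines):
    """Turn rows from `tshark -T fields` into request dicts sorted by time.

    `lines` is any iterable of tab-separated text lines, e.g. an open file.
    """
    requests = []
    for line in lines:
        row = line.rstrip('\n').split('\t')
        if len(row) < 4:
            continue  # not a complete row
        epoch, src_ip, method, uri = row[:4]
        if not method or not uri:
            continue  # packet without a parsed request line
        try:
            timestamp = float(epoch)
        except ValueError:
            continue  # unparseable timestamp
        requests.append({
            'timestamp': timestamp,
            'source_ip': src_ip,
            'method': method,
            'path': uri,
            # Raw header text as emitted by tshark, left unparsed here
            'raw_header_lines': row[4] if len(row) > 4 else '',
            'headers': {},
            'body': '',  # this capture does not include request bodies
        })
    # Packets can be written out of order; replay expects time order
    requests.sort(key=lambda r: r['timestamp'])
    return requests
```

Because this capture carries no request bodies, it is mostly useful for replaying GET-heavy traffic.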
Database Isolation Strategies
Replayed traffic must not contaminate your production or shared staging database. Four strategies:
Strategy 1: Separate shadow database (recommended)
The shadow service gets its own database, seeded from a production snapshot:
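
    # docker-compose.shadow.yml
    services:
      shadow-app:
        image: my-app:candidate
        environment:
          # Placeholder credentials; they must match shadow-db below
          DATABASE_URL: postgres://postgres:shadow@shadow-db:5432/shadow
        depends_on:
          - shadow-db
      shadow-db:
        image: postgres:15
        environment:
          # The official postgres image will not start without a password
          POSTGRES_PASSWORD: shadow
          POSTGRES_DB: shadow
        volumes:
          - ./prod-snapshot.sql:/docker-entrypoint-initdb.d/seed.sql

Create the snapshot:

    pg_dump --no-owner --no-acl production_db > prod-snapshot.sql

Strategy 2: Transaction rollback per request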
For read-heavy workloads, wrap each replayed request in a transaction that rolls back:
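
    class ShadowDatabaseMiddleware:
        def __init__(self, app):
            self.app = app

        async def __call__(self, scope, receive, send):
            # `db` is your application's database handle; the wrapped app must
            # run its queries on this same transaction for the rollback to apply
            async with db.transaction() as txn:
                try:
                    await self.app(scope, receive, send)
                finally:
                    await txn.rollback()

Strategy 3: Write stubs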
Intercept all write operations and make them no-ops:
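
    class ShadowWriteStub:
        WRITE_PREFIXES = ('INSERT', 'UPDATE', 'DELETE')

        def __init__(self, real_db):
            self.real_db = real_db  # the real async database client

        async def execute(self, query, *args):
            # Prefix matching misses writes inside CTEs (WITH ...), MERGE, and DDL
            if query.strip().upper().startswith(self.WRITE_PREFIXES):
                return FakeResult(rowcount=1)  # FakeResult: a stand-in result type you define
            return await self.real_db.execute(query, *args)

Strategy 4: Logical replication to shadow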
Set up a read replica for the shadow to query from, keeping it up-to-date with production without allowing shadow writes to affect production.
Auth Token Replacement
Authentication is the most common reason traffic replay fails. Production tokens are user-specific, signed with production secrets, and usually short-lived.
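
To see how many captured tokens are already unusable at replay time, you can inspect their expiry claims. The sketch below assumes the tokens are JWTs carrying a standard `exp` claim; the helper names `jwt_expiry` and `is_expired` are illustrative. It decodes the payload without verifying the signature, which is acceptable for triage but never for authentication.

```python
import base64
import json
import time

def jwt_expiry(token):
    """Return the 'exp' claim (epoch seconds) of a JWT, or None if unreadable."""
    try:
        payload = token.split('.')[1]
        payload += '=' * (-len(payload) % 4)  # restore stripped base64 padding
        claims = json.loads(base64.urlsafe_b64decode(payload))
    except (IndexError, ValueError):
        return None  # not a JWT, bad base64, or bad JSON
    exp = claims.get('exp') if isinstance(claims, dict) else None
    return exp if isinstance(exp, (int, float)) else None

def is_expired(token, now=None):
    """True if the token has expired or its expiry cannot be read."""
    exp = jwt_expiry(token)
    if exp is None:
        return True  # treat unreadable tokens as unusable for replay
    return exp <= (time.time() if now is None else now)
```

In most captures nearly every token will already be expired by the time you replay, which is why the replacement patterns that follow substitute tokens rather than reuse them.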
Pattern 1: Global token replacement
Replace all auth headers with a known shadow test token:
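
    from http.cookies import SimpleCookie

    class AuthReplacer:
        def __init__(self, shadow_token):
            self.shadow_token = shadow_token

        def transform(self, request: dict) -> dict:
            # Copy so the captured request is not mutated, and match header
            # names case-insensitively (ASGI captures store them lowercased)
            headers = dict(request.get('headers', {}))
            for name in list(headers):
                if name.lower() == 'authorization':
                    headers[name] = f'Bearer {self.shadow_token}'
                elif name.lower() == 'cookie':
                    # Replace the session cookie, keep the others
                    cookies = SimpleCookie(headers[name])
                    cookies['session'] = 'shadow-session-token'
                    headers[name] = '; '.join(
                        f'{key}={morsel.coded_value}'
                        for key, morsel in cookies.items()
                    )
            return {**request, 'headers': headers}

Pattern 2: User-mapped tokens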
Map production user IDs to shadow test users:

    import base64
    import json

    class UserMappingAuthReplacer:
        def __init__(self, user_mapping: dict):
            # {prod_user_id: shadow_token}
            self.user_mapping = user_mapping

        def extract_user_id(self, token: str):
            # Decode JWT without verification to get user ID
            try:
                payload = token.split('.')[1]
                padded = payload + '=' * (-len(payload) % 4)
                decoded = json.loads(base64.urlsafe_b64decode(padded))
                return decoded.get('sub')
            except (IndexError, ValueError, AttributeError):
                return None  # not a decodable JWT; fall back to the default token

        def transform(self, request: dict) -> dict:
            # Copy so the captured request is not mutated
            headers = dict(request.get('headers', {}))
            for name in list(headers):
                # Header names may be lowercased in the capture
                if name.lower() != 'authorization':
                    continue
                auth = headers[name]
                if auth.startswith('Bearer '):
                    user_id = self.extract_user_id(auth[7:])
                    shadow_token = self.user_mapping.get(user_id, 'default-shadow-token')
                    headers[name] = f'Bearer {shadow_token}'
            return {**request, 'headers': headers}

Replay Ordering and Timing
Production requests have causal relationships: a POST creates a resource, subsequent GETs fetch it. Replaying out of order breaks these relationships.
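
Causal dependencies usually live within a single user session, so one option is to group the capture by session and preserve order only inside each group. The sketch below assumes each captured record carries a session identifier. The `session_id` field and the `partition_by_session` helper are hypothetical and are not produced by the capture code shown earlier.

```python
from collections import defaultdict

def partition_by_session(requests, session_key='session_id'):
    """Group captured requests by session, each group sorted by timestamp.

    Records without a session key land in a shared '__no_session__' group.
    """
    sessions = defaultdict(list)
    for req in requests:
        sessions[req.get(session_key, '__no_session__')].append(req)
    for group in sessions.values():
        # Order within a session is what keeps POST-then-GET sequences valid
        group.sort(key=lambda r: r['timestamp'])
    return dict(sessions)
```

Each group can then be replayed in order on its own, while separate sessions run concurrently.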
Preserving order:

    import asyncio
    import time

    import aiohttp

    class OrderedReplayer:
        def __init__(self, target_url, workers=4):
            self.target_url = target_url
            self.workers = workers  # unused here; this replayer sends sequentially

        async def replay(self, requests: list[dict]):
            """Replay requests in order, with original timing preserved.

            Expects numeric 'timestamp' values (epoch seconds).
            """
            if not requests:
                return []
            base_time = requests[0]['timestamp']
            start_time = time.time()
            results = []
            async with aiohttp.ClientSession() as session:
                for req in requests:
                    # Calculate when this request should fire
                    original_offset = req['timestamp'] - base_time
                    current_offset = time.time() - start_time
                    delay = original_offset - current_offset
                    if delay > 0:
                        await asyncio.sleep(delay)
                    results.append(await self._send_request(session, req))
            return results

        async def _send_request(self, session, req):
            url = self.target_url + req['path']
            started = time.perf_counter()
            async with session.request(
                method=req['method'],
                url=url,
                headers=req.get('headers', {}),
                data=req.get('body', '')
            ) as response:
                body = await response.text()
                return {
                    'request_id': req.get('id'),
                    'status': response.status,
                    'body': body,
                    # Measured client-side; the target may not report its own timing
                    'latency_ms': (time.perf_counter() - started) * 1000,
                }

Response Comparison
Comparing responses between old and new versions requires handling non-determinism:

    import json

    import deepdiff

    class ResponseComparator:
        def __init__(self, excluded_paths=None, ignored_patterns=None):
            # Paths to exclude, in DeepDiff path syntax:
            # ["root['timestamp']", "root['request_id']"]
            self.excluded_paths = set(excluded_paths or [])
            # Regex patterns matched against DeepDiff paths
            self.ignored_patterns = ignored_patterns or []

        def compare(self, primary: dict, candidate: dict) -> dict:
            try:
                primary_body = json.loads(primary['body'])
                candidate_body = json.loads(candidate['body'])
                body_diff = deepdiff.DeepDiff(
                    primary_body,
                    candidate_body,
                    exclude_paths=self.excluded_paths,
                    exclude_regex_paths=self.ignored_patterns or None,
                    ignore_order=True,
                )
            except json.JSONDecodeError:
                # Non-JSON bodies: fall back to exact string comparison
                body_diff = {} if primary['body'] == candidate['body'] else {
                    'values_changed': {'root': {
                        'old_value': primary['body'],
                        'new_value': candidate['body']
                    }}
                }
            return {
                'status_match': primary['status'] == candidate['status'],
                'body_diff': body_diff,
                'is_regression': bool(body_diff) or (primary['status'] != candidate['status']),
            }

The Full Replay Pipeline
Putting it together (load_capture, Pipeline, HostRewriter, and send are helpers not shown here):

    import aiohttp

    async def run_shadow_test(capture_file, primary_url, candidate_url):
        requests = load_capture(capture_file)
        transformer = Pipeline([
            AuthReplacer(shadow_token='test-token'),
            HostRewriter(old='api.production.com', new='api.staging.internal'),
        ])
        comparator = ResponseComparator(
            excluded_paths=["root['timestamp']", "root['correlation_id']"]
        )
        regressions = []
        async with aiohttp.ClientSession() as session:
            for req in requests:
                transformed = transformer.transform(req)
                # Send the same transformed request to both versions
                primary_resp = await send(session, primary_url, transformed)
                candidate_resp = await send(session, candidate_url, transformed)
                comparison = comparator.compare(primary_resp, candidate_resp)
                if comparison['is_regression']:
                    regressions.append({
                        'request': req,
                        'diff': comparison['body_diff'],
                    })
        print(f"Regressions found: {len(regressions)} / {len(requests)}")
        return regressions

Traffic replay, done carefully with the right isolation and auth strategies, gives you a regression test suite that closely reflects real-world usage. Synthetic test suites rarely match the diversity and realism of production traffic.