Production Traffic Replay Patterns with Shadowtraf

Replaying production traffic is conceptually simple: capture requests, store them, and send them again to a different target. In practice it is full of hard problems: authentication tokens expire, session state is tied to specific users, database operations are not idempotent, and request timing and ordering matter. This post covers common traffic replay patterns, with particular attention to Shadowtraf and the design decisions that make replay practical in real systems.

The Traffic Replay Problem Space

Production traffic replay breaks down into three distinct concerns:

Capture fidelity: How accurately do you record the traffic? Headers, body, timing, TLS details, connection characteristics — each adds fidelity but also complexity.

Replay isolation: How do you ensure that replayed requests do not contaminate production data, trigger real external API calls, or interact with each other in ways the original requests did not?

Behavior comparison: How do you compare the response from the replay target to what the original request would have returned, accounting for non-determinism and intentional changes?

Different tools handle these concerns differently. GoReplay focuses on capture and replay with some filtering. Diffy focuses on parallel comparison. Shadowtraf is a Python-based tool and framework that focuses on correctness of replay, particularly the isolation problem.

Shadowtraf Overview

Shadowtraf is a Python library for production traffic replay that provides:

  • A traffic capture format with session correlation
  • Replay with configurable workers and rate limiting
  • Session isolation per replay run
  • Pluggable auth token substitution
  • Request transformer pipeline
  • Response comparison with configurable matchers

Install it:

pip install shadowtraf

Or from source:

git clone https://github.com/shadowtraf/shadowtraf
cd shadowtraf
pip install -e .

Traffic Capture Patterns

Pattern 1: Nginx/HAProxy Log-Based Capture

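The simplest capture method reads from your existing access logs. The default combined log format records neither headers nor bodies, so the optional request_body group in the pattern below only matches if your log_format appends "$request_body":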

# shadowtraf/capture/nginx.py
import re
import json
from datetime import datetime

NGINX_LOG_PATTERN = re.compile(
    r'(?P<remote_addr>\S+) - (?P<remote_user>\S+) \[(?P<time_local>.+?)\] '
    r'"(?P<method>\S+) (?P<path>\S+) HTTP/(?P<http_version>\S+)" '
    r'(?P<status>\d+) (?P<body_bytes_sent>\d+) '
    r'"(?P<http_referer>[^"]*)" "(?P<http_user_agent>[^"]*)"'
    r'(?: "(?P<request_body>[^"]*)")?'
)

def parse_nginx_log(log_path, output_path):
    requests = []
    with open(log_path) as f:
        for line in f:
            m = NGINX_LOG_PATTERN.match(line.strip())
            if not m:
                continue
            requests.append({
                # Epoch seconds, the same unit the capture middleware records
                'timestamp': datetime.strptime(
                    m.group('time_local'), '%d/%b/%Y:%H:%M:%S %z'
                ).timestamp(),
                'method': m.group('method'),
                'path': m.group('path'),
                'body': m.group('request_body') or '',
                'user_agent': m.group('http_user_agent'),
            })
    
    with open(output_path, 'w') as f:
        for req in requests:
            f.write(json.dumps(req) + '\n')

Pattern 2: Application-Level Capture Middleware

For richer capture (full headers, request bodies, correlation IDs), add middleware to your application:

# middleware.py
import gzip
import json
import random
import time
import uuid

class TrafficCaptureMiddleware:
    """ASGI middleware that records a random sample of HTTP requests."""

    def __init__(self, app, capture_file, sample_rate=0.01):
        self.app = app
        # Append mode: each record is written as its own gzip member
        self.capture_file = open(capture_file, 'ab')
        self.sample_rate = sample_rate
    
    async def __call__(self, scope, receive, send):
        # Pass non-HTTP and unsampled traffic straight through
        if scope['type'] != 'http' or random.random() >= self.sample_rate:
            await self.app(scope, receive, send)
            return
        
        # Capture request body
        body_chunks = []
        async def receive_wrapper():
            message = await receive()
            if message['type'] == 'http.request':
                body_chunks.append(message.get('body', b''))
            return message
        
        # Capture response
        response_status = [None]
        response_headers = [None]
        response_body = [b'']
        
        async def send_wrapper(message):
            if message['type'] == 'http.response.start':
                response_status[0] = message['status']
                response_headers[0] = message['headers']
            elif message['type'] == 'http.response.body':
                response_body[0] += message.get('body', b'')
            await send(message)
        
        # perf_counter is monotonic, so clock adjustments cannot skew the latency
        start = time.perf_counter()
        await self.app(scope, receive_wrapper, send_wrapper)
        latency = time.perf_counter() - start
        
        record = {
            'id': str(uuid.uuid4()),
            'timestamp': time.time(),
            'method': scope['method'],
            'path': scope['path'],
            'query': scope['query_string'].decode(),
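            # ASGI header names arrive lowercased. The values include credentials
            # (authorization, cookie), so protect the capture file accordingly.
            'headers': {
                k.decode('latin-1'): v.decode('latin-1')
                for k, v in scope['headers']
            },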
            'body': b''.join(body_chunks).decode('utf-8', errors='replace'),
            'response_status': response_status[0],
            'response_latency_ms': latency * 1000,
        }
        
        line = (json.dumps(record) + '\n').encode()
        self.capture_file.write(gzip.compress(line))
        self.capture_file.flush()
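
Reading the capture back is straightforward because Python's gzip module reads through concatenated gzip members as one stream. The loader below is a minimal sketch under that assumption; the function name and the sort by timestamp are illustrative choices, not part of Shadowtraf's documented API.

```python
import gzip
import json

def load_capture(path):
    """Read JSON-lines records from a file of concatenated gzip members."""
    records = []
    # gzip.open treats back-to-back gzip members as one continuous stream
    with gzip.open(path, 'rt', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if line:
                records.append(json.loads(line))
    # Records are written as requests finish, so sort by capture timestamp
    records.sort(key=lambda r: r['timestamp'])
    return records
```

Sorting matters because concurrent requests reach the file in completion order, which is not necessarily the order you want to replay them in.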

Pattern 3: Network-Level Capture with tshark

For black-box capture without application changes:

# Capture HTTP traffic on port 8080, write to pcap
tshark -i eth0 -f "tcp port 8080" -w capture.pcap

# Convert pcap to HTTP requests
tshark -r capture.pcap \
  -Y "http.request" \
  -T fields \
  -e frame.time_epoch \
  -e ip.src \
  -e http.request.method \
  -e http.request.uri \
  -e http.request.line \
  > requests.tsv

Then parse and replay the TSV file.
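
As a rough sketch of the parsing step, the function below turns tab-separated lines into capture records. It assumes the column order matches the -e flags in the command above and leaves the header column unparsed, since splitting aggregated header lines reliably depends on your tshark output options. The function and field names are illustrative.

```python
def parse_tshark_tsv(lines):
    """Turn tshark field output (tab-separated) into capture records."""
    records = []
    for line in lines:
        columns = line.rstrip('\n').split('\t')
        if len(columns) < 4:
            continue  # skip truncated or empty lines
        ts, src, method, uri = columns[:4]
        try:
            timestamp = float(ts)
        except ValueError:
            continue  # skip rows without a numeric epoch timestamp
        # Split the URI into path and query to match the middleware's record shape
        path, _, query = uri.partition('?')
        records.append({
            'timestamp': timestamp,
            'source_ip': src,
            'method': method,
            'path': path,
            'query': query,
            # Kept raw: header parsing depends on tshark's aggregation settings
            'raw_header_lines': columns[4] if len(columns) > 4 else '',
        })
    return records
```

Call it with an open file handle, for example parse_tshark_tsv(open('requests.tsv')). Request bodies are not part of this output, so this route suits GET-heavy traffic best.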

Database Isolation Strategies

Replayed traffic must not contaminate your production or shared staging database. Four strategies:

Strategy 1: Separate shadow database (recommended)

The shadow service gets its own database, seeded from a production snapshot:

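# docker-compose.shadow.yml
services:
  shadow-app:
    image: my-app:candidate
    environment:
      DATABASE_URL: postgres://postgres:shadow@shadow-db:5432/shadow
    depends_on:
      - shadow-db

  shadow-db:
    image: postgres:15
    environment:
      # The official image refuses to start without a password
      POSTGRES_DB: shadow
      POSTGRES_PASSWORD: shadow
    volumes:
      - ./prod-snapshot.sql:/docker-entrypoint-initdb.d/seed.sql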

Create the snapshot:

pg_dump --no-owner --no-acl production_db > prod-snapshot.sql

Strategy 2: Transaction rollback per request

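For read-heavy workloads, wrap each replayed request in a transaction that always rolls back. This only works if the request handler runs its queries on the same connection as the wrapping transaction: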

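class ShadowDatabaseMiddleware:
    def __init__(self, app, db):
        self.app = app
        # `db` is assumed to expose an async transaction() context manager
        self.db = db

    async def __call__(self, scope, receive, send):
        async with self.db.transaction() as txn:
            try:
                await self.app(scope, receive, send)
            finally:
                # Discard every write made while handling this request
                await txn.rollback()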

Strategy 3: Write stubs

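Intercept write statements and turn them into no-ops. The prefix check below is deliberately simple: it misses writes hidden inside CTEs or stored procedures, and it returns a stand-in result that only carries a rowcount:

from types import SimpleNamespace

class ShadowWriteStub:
    def __init__(self, real_db):
        self.real_db = real_db

    async def execute(self, query, *args):
        if query.strip().upper().startswith(('INSERT', 'UPDATE', 'DELETE')):
            return SimpleNamespace(rowcount=1)  # fake driver result
        return await self.real_db.execute(query, *args)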

Strategy 4: Logical replication to shadow

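Replicate production into a dedicated shadow database and point the shadow service at it. The copy stays close to production, and because replication flows in one direction only, shadow writes never reach production. If the shadow service writes to the replicated tables, those local changes can conflict with incoming ones, so this strategy suits read-mostly replays best.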

Auth Token Replacement

Authentication is the most common reason traffic replay fails. Production tokens are user-specific, signed with production secrets, and usually short-lived.
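
To see how much of a capture is affected, you can classify the captured bearer tokens by expiry. The sketch below assumes the tokens are JWTs carrying the standard exp claim; the function name is illustrative, and the payload is decoded without signature verification, which is acceptable only for this kind of offline inspection.

```python
import base64
import json
import time

def jwt_expiry_status(token, now=None):
    """Classify a captured bearer token as 'expired', 'valid', or 'unknown'."""
    now = time.time() if now is None else now
    try:
        payload_b64 = token.split('.')[1]
        # JWTs strip base64 padding, so restore it before decoding
        padded = payload_b64 + '=' * (-len(payload_b64) % 4)
        claims = json.loads(base64.urlsafe_b64decode(padded))
        return 'expired' if claims['exp'] <= now else 'valid'
    except (IndexError, KeyError, TypeError, ValueError):
        # Opaque token, malformed payload, or no usable exp claim
        return 'unknown'
```

If most tokens come back as expired by the time you replay, token substitution is unavoidable and the patterns below apply.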

Pattern 1: Global token replacement

Replace all auth headers with a known shadow test token:

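from http.cookies import SimpleCookie

class AuthReplacer:
    def __init__(self, shadow_token):
        self.shadow_token = shadow_token
    
    def transform(self, request: dict) -> dict:
        # Copy so the captured record is not mutated
        headers = dict(request.get('headers', {}))
        for name in list(headers):
            # Match case-insensitively: ASGI captures use lowercase names
            if name.lower() == 'authorization':
                headers[name] = f'Bearer {self.shadow_token}'
            elif name.lower() == 'cookie':
                # Replace the session cookie and keep the others
                cookies = SimpleCookie(headers[name])
                cookies['session'] = 'shadow-session-token'
                # A Cookie request header is a "; "-separated list of name=value pairs
                headers[name] = '; '.join(
                    f'{key}={morsel.coded_value}' for key, morsel in cookies.items()
                )
        return {**request, 'headers': headers}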

Pattern 2: User-mapped tokens

Map production user IDs to shadow test users:

import base64
import json

class UserMappingAuthReplacer:
    def __init__(self, user_mapping: dict):
        # {prod_user_id: shadow_token}
        self.user_mapping = user_mapping
    
    def extract_user_id(self, token: str):
        # Decode the JWT payload without verifying the signature.
        # Acceptable here only because the result selects a test user.
        try:
            payload = token.split('.')[1]
            # Restore the base64 padding that JWTs strip
            padded = payload + '=' * (-len(payload) % 4)
            return json.loads(base64.urlsafe_b64decode(padded)).get('sub')
        except (IndexError, ValueError, AttributeError):
            return None  # opaque or malformed token
    
    def transform(self, request: dict) -> dict:
        # Copy so the captured record is not mutated
        headers = dict(request.get('headers', {}))
        for name in list(headers):
            auth = headers[name]
            # Match case-insensitively: ASGI captures use lowercase names
            if name.lower() == 'authorization' and auth.startswith('Bearer '):
                user_id = self.extract_user_id(auth[len('Bearer '):])
                shadow_token = self.user_mapping.get(user_id, 'default-shadow-token')
                headers[name] = f'Bearer {shadow_token}'
        return {**request, 'headers': headers}

Replay Ordering and Timing

Production requests have causal relationships: a POST creates a resource, subsequent GETs fetch it. Replaying out of order breaks these relationships.

Preserving order:

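import asyncio
import time

import aiohttp

class OrderedReplayer:
    def __init__(self, target_url, workers=4):
        self.target_url = target_url
        # Not used below: strict ordering means one request in flight at a time
        self.workers = workers
    
    async def replay(self, requests: list[dict]) -> list[dict]:
        """Replay requests in order, preserving the original gaps between them."""
        results = []
        if not requests:
            return results
        
        # Timestamps are expected as epoch seconds
        base_time = requests[0]['timestamp']
        start_time = time.monotonic()
        
        async with aiohttp.ClientSession() as session:
            for req in requests:
                # Calculate when this request should fire
                original_offset = req['timestamp'] - base_time
                current_offset = time.monotonic() - start_time
                delay = original_offset - current_offset
                
                if delay > 0:
                    await asyncio.sleep(delay)
                
                results.append(await self._send_request(session, req))
        return results
    
    async def _send_request(self, session, req):
        url = self.target_url + req['path']
        if req.get('query'):
            url += '?' + req['query']
        # Host and Content-Length describe the original request, not this one
        headers = {
            k: v for k, v in req.get('headers', {}).items()
            if k.lower() not in ('host', 'content-length')
        }
        start = time.perf_counter()
        async with session.request(
            method=req['method'],
            url=url,
            headers=headers,
            data=req.get('body') or None
        ) as response:
            body = await response.text()
            return {
                'request_id': req.get('id'),
                'status': response.status,
                'body': body,
                # Measured client-side rather than read from a response header
                'latency_ms': (time.perf_counter() - start) * 1000,
            }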
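
Strict global ordering is often more than causality requires: requests from different users rarely depend on each other. One way to recover throughput is to preserve order only within a session and run sessions concurrently. The sketch below assumes each record carries a session_id field (hypothetical; the records shown in this post do not include one) and takes the sending coroutine as a parameter so it stays independent of any HTTP client.

```python
import asyncio
from collections import defaultdict

def group_by_session(requests, key='session_id'):
    """Group records by session, each group sorted by timestamp."""
    sessions = defaultdict(list)
    for req in sorted(requests, key=lambda r: r['timestamp']):
        sessions[req.get(key)].append(req)
    return sessions

async def replay_sessions(requests, send, workers=4, key='session_id'):
    """Replay sessions concurrently, keeping each session's requests in order."""
    limit = asyncio.Semaphore(workers)  # at most `workers` sessions in flight

    async def run_session(session_requests):
        async with limit:
            # Awaiting one request before sending the next preserves causality
            return [await send(req) for req in session_requests]

    groups = group_by_session(requests, key)
    per_session = await asyncio.gather(
        *(run_session(group) for group in groups.values())
    )
    return [result for results in per_session for result in results]
```

This trades away cross-session timing fidelity, so it fits regression comparison better than load reproduction.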

Response Comparison

Comparing responses between old and new versions requires handling non-determinism:

import json

import deepdiff

class ResponseComparator:
    def __init__(self, excluded_paths=None, ignored_patterns=None):
        # Exact paths to exclude, e.g. ["root['timestamp']", "root['request_id']"]
        self.excluded_paths = set(excluded_paths or [])
        # Regular expressions matched against paths, passed to exclude_regex_paths
        self.ignored_patterns = ignored_patterns or []
    
    def compare(self, primary: dict, candidate: dict) -> dict:
        try:
            primary_body = json.loads(primary['body'])
            candidate_body = json.loads(candidate['body'])
            body_diff = deepdiff.DeepDiff(
                primary_body,
                candidate_body,
                exclude_paths=self.excluded_paths,
                exclude_regex_paths=self.ignored_patterns or None,
                ignore_order=True
            )
        except json.JSONDecodeError:
            # Non-JSON bodies fall back to exact string comparison
            body_diff = {} if primary['body'] == candidate['body'] else {
                'values_changed': {'root': {
                    'old_value': primary['body'],
                    'new_value': candidate['body']
                }}
            }
        
        status_match = primary['status'] == candidate['status']
        return {
            'status_match': status_match,
            'body_diff': body_diff,
            'is_regression': bool(body_diff) or not status_match,
        }
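
If you would rather not depend on a diff library, a common alternative is to mask known-volatile fields and then compare for plain equality. The following is a minimal standard-library sketch of that idea; the helper names and the default key list are illustrative assumptions, and unlike the comparator above it is sensitive to list order.

```python
import json

def mask_volatile(value, volatile_keys):
    """Recursively replace the values of volatile keys with a placeholder."""
    if isinstance(value, dict):
        return {
            k: '<masked>' if k in volatile_keys else mask_volatile(v, volatile_keys)
            for k, v in value.items()
        }
    if isinstance(value, list):
        return [mask_volatile(v, volatile_keys) for v in value]
    return value

def bodies_equivalent(primary_body, candidate_body,
                      volatile_keys=('timestamp', 'request_id')):
    """Compare two response bodies, ignoring volatile fields in JSON payloads."""
    try:
        primary = json.loads(primary_body)
        candidate = json.loads(candidate_body)
    except json.JSONDecodeError:
        # Non-JSON bodies fall back to exact string comparison
        return primary_body == candidate_body
    return mask_volatile(primary, volatile_keys) == mask_volatile(candidate, volatile_keys)
```

Masking by key name is coarser than path-based exclusion: a field called timestamp is ignored wherever it appears in the payload.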

The Full Replay Pipeline

Putting it together:

async def run_shadow_test(capture_file, primary_url, candidate_url):
    requests = load_capture(capture_file)
    
    transformer = Pipeline([
        AuthReplacer(shadow_token='test-token'),
        HostRewriter(old='api.production.com', new='api.staging.internal'),
    ])
    
    comparator = ResponseComparator(
        excluded_paths=["root['timestamp']", "root['correlation_id']"]
    )
    
    results = []
    async with aiohttp.ClientSession() as session:
        for req in requests:
            transformed = transformer.transform(req)
            primary_resp = await send(session, primary_url, transformed)
            candidate_resp = await send(session, candidate_url, transformed)
            
            comparison = comparator.compare(primary_resp, candidate_resp)
            if comparison['is_regression']:
                results.append({
                    'request': req,
                    'diff': comparison['body_diff'],
                })
    
    print(f"Regressions found: {len(results)} / {len(requests)}")
    return results
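
Pipeline and HostRewriter are used above without being defined. One plausible shape for them, assuming every transformer exposes a transform(request) method like the auth replacers earlier, is sketched here; these are illustrative stand-ins rather than Shadowtraf's actual classes.

```python
class Pipeline:
    """Apply a sequence of transformers to a request, in order."""

    def __init__(self, transformers):
        self.transformers = list(transformers)

    def transform(self, request: dict) -> dict:
        for transformer in self.transformers:
            request = transformer.transform(request)
        return request


class HostRewriter:
    """Rewrite the production hostname in host-bearing headers."""

    HOST_HEADERS = ('host', 'origin', 'referer')

    def __init__(self, old, new):
        self.old = old
        self.new = new

    def transform(self, request: dict) -> dict:
        # Build a new headers dict so the captured record is not mutated
        headers = {
            name: value.replace(self.old, self.new)
            if name.lower() in self.HOST_HEADERS else value
            for name, value in request.get('headers', {}).items()
        }
        return {**request, 'headers': headers}
```

Because each transformer returns a new record, the original capture stays untouched and can be replayed again with a different pipeline.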

Traffic replay, done carefully with the right isolation and auth strategies, gives you a regression test suite that closely reflects real-world usage. Synthetic test suites rarely match the diversity and realism of production traffic.
