A/B Testing ML Models in Production: Shadow Mode and Canary Deployments
Offline evaluation tells you a model performs better on held-out data. Production A/B testing tells you it actually improves the business metric you care about. These are different questions, and sometimes they have different answers.
A new recommendation model might have higher offline AUC but lower click-through rate in production because offline evaluation didn't capture how users respond to the recommendations. Production A/B testing is the final validation gate.
The A/B Testing Workflow for ML
- Shadow mode — run both models, serve only the old one, compare outputs
- Canary deployment — serve new model to a small traffic slice, monitor key metrics
- Full A/B test — split traffic 50/50, measure statistical significance
- Promote or rollback — based on results
Shadow Mode Testing
Shadow mode is the safest way to validate a new model before it affects any users. The old model serves real traffic. The new model receives the same inputs and makes predictions, but its outputs are never returned to users; they are only logged. You then compare the logged outputs of both models.
Implementation
import logging
from dataclasses import dataclass
from typing import Any, Optional
import threading
logger = logging.getLogger(__name__)
@dataclass
class PredictionResult:
    """Record describing the outcome of one model invocation.

    Groups the prediction with the name of the model that produced it, the
    measured latency, and an optional error description.
    """

    # Name of the model that produced this result.
    model_name: str
    # Raw model output; its type is not constrained here.
    prediction: Any
    # Latency of the call, in milliseconds (per the field name).
    latency_ms: float
    # Optional error text; defaults to None.
    error: Optional[str] = None
class ShadowModeRouter:
    """Serve predictions from a primary model while exercising a shadow model.

    The primary model's prediction is computed synchronously and returned to
    the caller. The shadow model is called with the same features on a
    background thread; its prediction is only logged, never returned.

    Args:
        primary_model: Object exposing ``predict(features)``; its output is served.
        shadow_model: Object exposing ``predict(features)``; its output is logged.
        log_comparisons: When true, log one comparison record per request.
    """

    def __init__(self, primary_model, shadow_model, log_comparisons: bool = True):
        self.primary = primary_model
        self.shadow = shadow_model
        self.log_comparisons = log_comparisons

    def predict(self, features):
        """Serve primary model output, run shadow model asynchronously.

        Exceptions raised by the primary model propagate to the caller.
        Exceptions raised on the shadow path are logged and swallowed so they
        cannot affect the served response.

        Returns:
            Whatever ``primary_model.predict(features)`` returns.
        """
        import time

        # Primary model: blocking, result is served.
        primary_start = time.perf_counter()
        primary_result = self.primary.predict(features)
        primary_latency = (time.perf_counter() - primary_start) * 1000

        # Shadow model: non-blocking, result is logged and discarded.
        def run_shadow():
            try:
                shadow_start = time.perf_counter()
                shadow_result = self.shadow.predict(features)
                shadow_latency = (time.perf_counter() - shadow_start) * 1000
                if self.log_comparisons:
                    self._log_comparison(
                        features, primary_result, primary_latency,
                        shadow_result, shadow_latency
                    )
            except Exception as e:
                # Best-effort by design: a broken shadow model must never
                # surface to users. Keep the traceback for diagnosis.
                logger.warning("Shadow model error: %s", e, exc_info=True)

        # Daemon thread: it does not block interpreter shutdown, so a
        # comparison still in flight at exit may be lost.
        thread = threading.Thread(target=run_shadow, daemon=True)
        thread.start()
        return primary_result

    @staticmethod
    def _features_hash(features) -> str:
        """Return a SHA-256 hex digest of ``str(features)``.

        The built-in ``hash()`` of a string is salted per process, so it
        yields different values across workers and restarts; a cryptographic
        digest is stable and lets log lines for the same input be correlated.
        """
        import hashlib

        return hashlib.sha256(str(features).encode("utf-8")).hexdigest()

    @staticmethod
    def _comparison_record(features, primary_pred, primary_latency,
                           shadow_pred, shadow_latency) -> dict:
        """Build the structured record logged for one primary/shadow pair."""
        # NOTE(review): ``==`` yields an element-wise result for array-like
        # predictions; this assumes scalar predictions - confirm with callers.
        agreed = primary_pred == shadow_pred
        return {
            "event": "shadow_comparison",
            "primary_prediction": primary_pred,
            "shadow_prediction": shadow_pred,
            "agreed": agreed,
            "primary_latency_ms": primary_latency,
            "shadow_latency_ms": shadow_latency,
            # For debugging, not PII: only a digest of the features is logged.
            "features_hash": ShadowModeRouter._features_hash(features),
        }

    def _log_comparison(self, features, primary_pred, primary_latency,
                        shadow_pred, shadow_latency):
        """Log a ``shadow_comparison`` record at INFO level."""
        logger.info(self._comparison_record(
            features, primary_pred, primary_latency,
            shadow_pred, shadow_latency
        ))
# Usage: the current production model is the primary, the candidate is the shadow.
from models import OldChurnModel, NewChurnModel

router = ShadowModeRouter(
    OldChurnModel.load("models/v1/"),
    NewChurnModel.load("models/v2/"),
)

# In your prediction endpoint: only the primary model's result is returned.
prediction = router.predict(user_features)
# Analyzing Shadow Mode Results
import pandas as pd
import numpy as np
def analyze_shadow_comparisons(log_file: str) -> dict:
    """Analyze disagreements between primary and shadow models.

    Reads a JSON-lines log, keeps the rows whose ``event`` is
    ``"shadow_comparison"``, prints a summary, and returns the key figures.

    Args:
        log_file: Path to a JSON-lines file of log records.

    Returns:
        A dict with ``agreement_rate`` (fraction of rows where both models
        agreed; NaN when there are no comparison rows), ``disagreement_count``
        and ``shadow_latency_p95`` (milliseconds).
    """
    df = pd.read_json(log_file, lines=True)
    df = df[df["event"] == "shadow_comparison"]

    # When the log also contains other event types, those rows have no
    # "agreed" value, so pandas does not load the column as plain booleans
    # and unary ``~`` fails on it. Cast after filtering to the comparison rows.
    agreed = df["agreed"].astype(bool)
    agreement_rate = agreed.mean()

    # Find systematic disagreements
    disagreements = df[~agreed].copy()

    primary_p95 = df["primary_latency_ms"].quantile(0.95)
    shadow_p95 = df["shadow_latency_ms"].quantile(0.95)

    print(f"Total predictions: {len(df)}")
    print(f"Agreement rate: {agreement_rate:.2%}")
    print(f"Disagreements: {len(disagreements)}")
    print("\nLatency comparison:")
    print(f" Primary P95: {primary_p95:.1f}ms")
    print(f" Shadow P95: {shadow_p95:.1f}ms")

    # Shadow is higher risk if it predicts positive more aggressively.
    # Means are only reported for floating-point predictions (e.g. scores).
    if pd.api.types.is_float_dtype(df["primary_prediction"]):
        print("\nPrediction distributions:")
        print(f" Primary mean: {df['primary_prediction'].mean():.4f}")
        print(f" Shadow mean: {df['shadow_prediction'].mean():.4f}")

    return {
        "agreement_rate": agreement_rate,
        "disagreement_count": len(disagreements),
        "shadow_latency_p95": shadow_p95,
    }
# Canary Deployment
After shadow testing confirms the new model is stable and makes reasonable predictions, shift a small percentage of live traffic to it.
Traffic Splitting
import hashlib
import os
class CanaryRouter:
    """Route a fixed fraction of users to a treatment model.

    Assignment is a pure function of ``user_id`` (via its MD5 digest), so a
    given user always lands in the same group for a given ``treatment_pct``.

    Args:
        control_model: Object exposing ``predict(features)`` for the control group.
        treatment_model: Object exposing ``predict(features)`` for the treatment group.
        treatment_pct: Fraction of users, in ``[0.0, 1.0]``, sent to treatment.

    Raises:
        ValueError: If ``treatment_pct`` is outside ``[0.0, 1.0]``.
    """

    # Resolution of the traffic split: 10,000 buckets = steps of 0.01%.
    _BUCKETS = 10_000

    def __init__(self, control_model, treatment_model, treatment_pct: float = 0.05):
        """
        Route `treatment_pct` fraction of traffic to the treatment model.
        Routing is deterministic per user_id (consistent experience).
        """
        self.control = control_model
        self.treatment = treatment_model
        self.set_traffic_split(treatment_pct)

    def set_traffic_split(self, treatment_pct: float) -> None:
        """Change the fraction of users routed to the treatment model.

        Raises:
            ValueError: If ``treatment_pct`` is outside ``[0.0, 1.0]`` (or NaN).
        """
        if not 0.0 <= treatment_pct <= 1.0:
            raise ValueError(
                f"treatment_pct must be between 0.0 and 1.0, got {treatment_pct!r}"
            )
        self.treatment_pct = treatment_pct

    def predict(self, features, user_id: str):
        """Predict with the model of the user's group and log the assignment."""
        if self._is_in_treatment(user_id):
            prediction = self.treatment.predict(features)
            group = "treatment"
        else:
            prediction = self.control.predict(features)
            group = "control"
        # Log assignment for analysis
        self._log_prediction(user_id, group, prediction)
        return prediction

    def _is_in_treatment(self, user_id: str) -> bool:
        """Deterministic assignment: same user always gets same group."""
        hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
        # The coarse bucket (hash % 100) is the most significant part, so for
        # whole-percent splits a user is in treatment exactly when
        # hash % 100 < percent; the fine bucket adds sub-percent resolution.
        coarse = hash_value % 100
        fine = (hash_value // 100) % 100
        bucket = coarse * 100 + fine
        # Integer threshold avoids float artefacts such as
        # 0.07 * 100 == 7.000000000000001, which would admit an extra bucket.
        threshold = round(self.treatment_pct * self._BUCKETS)
        return bucket < threshold

    def _log_prediction(self, user_id, group, prediction):
        """Log the user's group assignment and prediction to the "canary" logger."""
        import logging
        logging.getLogger("canary").info({
            "user_id": user_id,
            "group": group,
            "prediction": prediction
        })
# Monitoring Canary Health
During canary, monitor:
- Error rate — is the new model throwing more exceptions?
- Latency — is it slower?
- Business metrics — clicks, conversions, revenue per user
- Prediction distribution — is the new model predicting differently at the aggregate level?
def check_canary_health(metrics_client, experiment_id: str, min_samples: int = 1000,
                        max_error_ratio: float = 2.0,
                        max_latency_ratio: float = 1.5):
    """Check if canary deployment is healthy enough to continue or promote.

    Args:
        metrics_client: Object exposing ``get_group_metrics(experiment_id, group)``
            whose result has ``sample_size``, ``error_rate`` and
            ``p99_latency_ms`` attributes.
        experiment_id: Identifier passed through to ``metrics_client``.
        min_samples: Minimum sample size required in each group.
        max_error_ratio: Treatment is degraded when its error rate exceeds
            the control error rate times this factor.
        max_latency_ratio: Treatment is degraded when its P99 latency exceeds
            the control P99 latency times this factor.

    Returns:
        ``"insufficient_data"``, ``"degraded"`` or ``"healthy"``.
    """
    control = metrics_client.get_group_metrics(experiment_id, "control")
    treatment = metrics_client.get_group_metrics(experiment_id, "treatment")

    if control.sample_size < min_samples or treatment.sample_size < min_samples:
        print(f"Insufficient samples (control={control.sample_size}, "
              f"treatment={treatment.sample_size}). Need {min_samples} each.")
        return "insufficient_data"

    # Check error rate.
    # NOTE: the comparison is purely relative, so with a control error rate
    # of zero any treatment error at all is reported as degraded.
    if treatment.error_rate > control.error_rate * max_error_ratio:
        print(f"ALERT: Treatment error rate {treatment.error_rate:.3%} is "
              f"{max_error_ratio:g}x control {control.error_rate:.3%}")
        return "degraded"

    # Check latency
    if treatment.p99_latency_ms > control.p99_latency_ms * max_latency_ratio:
        print(f"ALERT: Treatment P99 {treatment.p99_latency_ms:.1f}ms is "
              f"{max_latency_ratio - 1:.0%} slower than control {control.p99_latency_ms:.1f}ms")
        return "degraded"

    return "healthy"
# Full A/B Test and Statistical Analysis
Once the canary is stable, expand to a full 50/50 split and measure statistical significance:
from scipy import stats
import numpy as np
def analyze_ab_test(control_outcomes: np.ndarray,
                    treatment_outcomes: np.ndarray,
                    metric_name: str,
                    alpha: float = 0.05) -> dict:
    """
    Analyze A/B test results with a two-sided Mann-Whitney U test.

    Mann-Whitney U is non-parametric, so it does not assume normally
    distributed outcomes. Significance comes from that test; the direction
    of the recommendation comes from the sign of the difference in means
    (treatment minus control), which assumes higher values are better.

    Args:
        control_outcomes: Per-unit outcomes for the control group.
        treatment_outcomes: Per-unit outcomes for the treatment group.
        metric_name: Label used in the result and the printed summary.
        alpha: Significance level.

    Returns:
        A dict with ``metric``, ``control_mean``, ``treatment_mean``,
        ``absolute_change``, ``relative_change``, ``p_value``,
        ``statistically_significant``, ``n_control``, ``n_treatment`` and
        ``recommendation`` (``"promote"``, ``"rollback"`` or
        ``"continue_or_neutral"``).

    Raises:
        ValueError: If either group is empty.
    """
    control = np.asarray(control_outcomes, dtype=float)
    treatment = np.asarray(treatment_outcomes, dtype=float)
    n_control = len(control)
    n_treatment = len(treatment)
    if n_control == 0 or n_treatment == 0:
        raise ValueError(
            f"Both groups need at least one outcome "
            f"(control={n_control}, treatment={n_treatment})"
        )

    # Use Mann-Whitney U test (non-parametric, handles non-normal distributions)
    _, p_value = stats.mannwhitneyu(control, treatment, alternative='two-sided')
    p_value = float(p_value)

    control_mean = float(np.mean(control))
    treatment_mean = float(np.mean(treatment))
    absolute_change = treatment_mean - control_mean

    # Divide by the magnitude of the baseline so the sign of the relative
    # change always matches the sign of the absolute change, even when the
    # baseline is negative. A zero baseline has no finite relative change.
    if control_mean != 0:
        relative_change = absolute_change / abs(control_mean)
    elif absolute_change == 0:
        relative_change = 0.0
    else:
        relative_change = float("inf") if absolute_change > 0 else float("-inf")

    significant = p_value < alpha
    result = {
        "metric": metric_name,
        "control_mean": control_mean,
        "treatment_mean": treatment_mean,
        "absolute_change": absolute_change,
        "relative_change": relative_change,
        "p_value": p_value,
        "statistically_significant": significant,
        "n_control": n_control,
        "n_treatment": n_treatment,
        "recommendation": None
    }

    if significant:
        if absolute_change > 0:
            result["recommendation"] = "promote"
            print(f"✅ PROMOTE: {metric_name} improved by {relative_change:.2%} (p={p_value:.4f})")
        else:
            result["recommendation"] = "rollback"
            print(f"🚫 ROLLBACK: {metric_name} degraded by {abs(relative_change):.2%} (p={p_value:.4f})")
    else:
        result["recommendation"] = "continue_or_neutral"
        print(f"⏳ NO SIGNIFICANT DIFFERENCE: {metric_name} "
              f"change={relative_change:.2%}, p={p_value:.4f}")
    return result
# Run the same analysis once per metric of interest.
control_users = load_experiment_data("control")
treatment_users = load_experiment_data("treatment")

for metric in ("click_through_rate", "conversion_rate", "revenue_per_user"):
    control_values = control_users[metric].values
    treatment_values = treatment_users[metric].values
    result = analyze_ab_test(control_values, treatment_values, metric_name=metric)
# Sample Size Calculation
Don't start an A/B test without knowing how many samples you need:
from statsmodels.stats.power import TTestIndPower
def required_sample_size(baseline_mean: float,
                         minimum_detectable_effect: float,
                         baseline_std: float,
                         alpha: float = 0.05,
                         power: float = 0.80) -> int:
    """
    Calculate required sample size per group for a two-sample t-test.

    Args:
        baseline_mean: Mean of the metric in the control group. Not used in
            the calculation; kept so existing callers keep working.
        minimum_detectable_effect: Smallest difference in means worth
            detecting, in the metric's own units (absolute, not a
            percentage). Only its magnitude matters.
        baseline_std: Standard deviation of the metric; must be positive.
        alpha: Significance level, strictly between 0 and 1.
        power: Desired statistical power, strictly between 0 and 1.

    Returns:
        Sample size per group, rounded up, assuming equal group sizes.

    Raises:
        ValueError: If ``baseline_std`` is not positive, the effect is zero,
            or ``alpha``/``power`` are outside the open interval (0, 1).

    Example: detect a $0.75 lift (5% of a $15 baseline) with std $45
    """
    if baseline_std <= 0:
        raise ValueError(f"baseline_std must be positive, got {baseline_std!r}")
    if minimum_detectable_effect == 0:
        raise ValueError("minimum_detectable_effect must be non-zero")
    if not 0 < alpha < 1:
        raise ValueError(f"alpha must be between 0 and 1, got {alpha!r}")
    if not 0 < power < 1:
        raise ValueError(f"power must be between 0 and 1, got {power!r}")

    # Standardized effect size: difference in means in units of std.
    effect_size = abs(minimum_detectable_effect) / baseline_std
    analysis = TTestIndPower()
    n = analysis.solve_power(
        effect_size=effect_size,
        alpha=alpha,
        power=power,
        ratio=1.0  # Equal group sizes
    )
    return int(np.ceil(n))
# Example: detect 5% lift in revenue per user
# Baseline: $15/user, std: $45
n_per_group = required_sample_size(
    baseline_mean=15.0,
    minimum_detectable_effect=0.75,  # $0.75 = 5% of $15
    baseline_std=45.0
)
print(f"Need {n_per_group} users per group ({2 * n_per_group} total)")
# → roughly 56,500 per group for 80% power (effect size d = 0.75 / 45 ≈ 0.0167)
# Automated Promotion and Rollback
class ExperimentController:
    """Poll an experiment until it can be promoted or rolled back.

    Args:
        router: Object exposing ``set_traffic_split(treatment_pct=...)``.
        metrics_client: Object passed to ``check_canary_health`` and queried
            via ``get_outcomes(experiment_id, group)``.
        check_interval_hours: Hours to wait between consecutive checks.
    """

    def __init__(self, router, metrics_client, check_interval_hours=24):
        self.router = router
        self.metrics = metrics_client
        self.check_interval = check_interval_hours

    def run_until_decision(self, experiment_id: str, min_samples: int = 10000,
                           max_checks: Optional[int] = None):
        """Check the experiment repeatedly until a decision is reached.

        Each check first verifies canary health, then (given enough samples)
        runs the significance test on the primary metric.

        Args:
            experiment_id: Experiment to evaluate.
            min_samples: Minimum samples per group before testing significance.
            max_checks: Upper bound on the number of checks. ``None`` (the
                default) keeps polling until a decision is reached.

        Returns:
            ``"promoted"``, ``"rolled_back"``, or ``"inconclusive"`` when
            ``max_checks`` is exhausted without a decision.
        """
        import time

        # NOTE(review): the significance test is repeated at every interval
        # with a fixed alpha; confirm whether a sequential-testing correction
        # is needed for the intended false-positive rate.
        checks_done = 0
        while max_checks is None or checks_done < max_checks:
            checks_done += 1
            health = check_canary_health(self.metrics, experiment_id, min_samples)
            if health == "degraded":
                self.rollback(experiment_id)
                return "rolled_back"

            if health == "insufficient_data":
                status = f"Waiting {self.check_interval}h for more data..."
            else:
                # Check if we have enough for a statistical decision
                result = analyze_ab_test(
                    self.metrics.get_outcomes(experiment_id, "control"),
                    self.metrics.get_outcomes(experiment_id, "treatment"),
                    metric_name="primary_metric"
                )
                if result["statistically_significant"]:
                    if result["recommendation"] == "promote":
                        self.promote(experiment_id)
                        return "promoted"
                    else:
                        self.rollback(experiment_id)
                        return "rolled_back"
                status = f"No decision yet (p={result['p_value']:.4f}). Continuing..."

            # Do not sleep after the last permitted check.
            if max_checks is not None and checks_done >= max_checks:
                break
            print(status)
            time.sleep(self.check_interval * 3600)

        print(f"No decision for experiment {experiment_id} after {checks_done} checks")
        return "inconclusive"

    def promote(self, experiment_id):
        """Send all traffic to the treatment model."""
        print(f"Promoting treatment model for experiment {experiment_id}")
        self.router.set_traffic_split(treatment_pct=1.0)

    def rollback(self, experiment_id):
        """Send all traffic back to the control model."""
        print(f"Rolling back to control model for experiment {experiment_id}")
        self.router.set_traffic_split(treatment_pct=0.0)
# Shadow mode, canary, and A/B testing form a progressive confidence ladder for
# ML deployments. Each stage reduces risk and provides evidence for the
# promotion decision. Skip steps only with explicit justification — the cost of
# a bad model in production is almost always higher than the cost of a few more
# days of controlled testing.