A/B Testing ML Models in Production: Shadow Mode and Canary Deployments

A/B Testing ML Models in Production: Shadow Mode and Canary Deployments

Offline evaluation tells you a model performs better on held-out data. Production A/B testing tells you it actually improves the business metric you care about. These are different questions, and sometimes they have different answers.

A new recommendation model might have higher offline AUC but lower click-through rate in production because offline evaluation didn't capture how users respond to the recommendations. Production A/B testing is the final validation gate.

The A/B Testing Workflow for ML

  1. Shadow mode — run both models, serve only the old one, compare outputs
  2. Canary deployment — serve new model to a small traffic slice, monitor key metrics
  3. Full A/B test — split traffic 50/50, measure statistical significance
  4. Promote or rollback — based on results

Shadow Mode Testing

Shadow mode is the safest way to validate a new model before it affects any users. The old model serves real traffic. The new model receives the same inputs and makes predictions, but its outputs are never returned to users; they are only logged. You then compare the logged outputs of both models offline.

Implementation

import hashlib
import logging
import threading
import time
from dataclasses import dataclass
from typing import Any, Optional

logger = logging.getLogger(__name__)

@dataclass
class PredictionResult:
    """Record of one model prediction: producing model, output, and timing.

    NOTE(review): nothing in the visible code constructs or reads this class;
    confirm where it is used before relying on the field semantics below.
    """
    # Name of the model that produced the prediction.
    model_name: str
    # The model's output; its type is model-specific, hence Any.
    prediction: Any
    # Latency in milliseconds (presumably of the predict call -- TODO confirm).
    latency_ms: float
    # Error description; defaults to None (presumably meaning success -- TODO confirm).
    error: Optional[str] = None

class ShadowModeRouter:
    """Serve the primary model while exercising a shadow model on the same input.

    The primary model's prediction is returned to the caller. The shadow
    model is run on a background thread; its output is only logged (when
    ``log_comparisons`` is true) and never returned.

    Both models must expose ``predict(features)``.

    Note: one daemon thread is started per call to :meth:`predict`, and the
    shadow model receives the same ``features`` object as the primary, so
    callers should not mutate ``features`` after calling :meth:`predict`.
    """

    def __init__(self, primary_model, shadow_model, log_comparisons: bool = True):
        """Store the two models and the comparison-logging switch.

        Args:
            primary_model: Model whose predictions are served.
            shadow_model: Candidate model evaluated in the background.
            log_comparisons: When true, log a ``shadow_comparison`` event for
                every successful shadow prediction.
        """
        self.primary = primary_model
        self.shadow = shadow_model
        self.log_comparisons = log_comparisons

    def predict(self, features):
        """Serve primary model output, run shadow model asynchronously.

        Args:
            features: Model input, passed unchanged to both models.

        Returns:
            The primary model's prediction. Exceptions from the primary model
            propagate to the caller; exceptions from the shadow model are
            logged as warnings and never reach the caller.
        """
        # Primary model: blocking, result is served
        primary_started = time.perf_counter()
        primary_result = self.primary.predict(features)
        primary_latency = (time.perf_counter() - primary_started) * 1000

        # Shadow model: non-blocking, result is logged and discarded
        def run_shadow():
            # Broad catch is deliberate: this is the boundary that guarantees a
            # shadow failure can never affect the served request.
            try:
                shadow_started = time.perf_counter()
                shadow_result = self.shadow.predict(features)
                shadow_latency = (time.perf_counter() - shadow_started) * 1000

                if self.log_comparisons:
                    self._log_comparison(
                        features, primary_result, primary_latency,
                        shadow_result, shadow_latency
                    )
            except Exception as e:
                logger.warning("Shadow model error: %s", e)

        threading.Thread(target=run_shadow, daemon=True).start()

        return primary_result

    @staticmethod
    def _predictions_agree(primary_pred, shadow_pred) -> bool:
        """Return True only if the two predictions compare equal as a whole.

        ``==`` on array-like predictions yields an element-wise result rather
        than a bool; such results are reduced with ``.all()``. Predictions
        that cannot be compared count as a disagreement.
        """
        try:
            outcome = primary_pred == shadow_pred
            if isinstance(outcome, bool):
                return outcome
            reduce_all = getattr(outcome, "all", None)
            if callable(reduce_all):
                return bool(reduce_all())
            return bool(outcome)
        except (TypeError, ValueError):
            return False

    @staticmethod
    def _features_digest(features) -> str:
        """Return a short digest of ``str(features)`` that is stable across processes.

        The built-in ``hash()`` of a string is salted per interpreter process,
        so it cannot be used to correlate log lines from different workers or
        restarts; SHA-256 can.
        """
        return hashlib.sha256(str(features).encode("utf-8")).hexdigest()[:16]

    def _log_comparison(self, features, primary_pred, primary_latency,
                        shadow_pred, shadow_latency):
        """Log one ``shadow_comparison`` event describing both predictions.

        Args:
            features: Model input; only a digest of it is logged.
            primary_pred: Prediction that was served.
            primary_latency: Primary model latency in milliseconds.
            shadow_pred: Prediction from the shadow model.
            shadow_latency: Shadow model latency in milliseconds.
        """
        logger.info({
            "event": "shadow_comparison",
            "primary_prediction": primary_pred,
            "shadow_prediction": shadow_pred,
            "agreed": self._predictions_agree(primary_pred, shadow_pred),
            "primary_latency_ms": primary_latency,
            "shadow_latency_ms": shadow_latency,
            "features_hash": self._features_digest(features)  # For debugging, not PII
        })

# Usage
from models import OldChurnModel, NewChurnModel

# The current production model is the primary (served); the candidate is the
# shadow. log_comparisons is left at its default (True), so comparisons are
# logged for every successful shadow prediction.
router = ShadowModeRouter(
    primary_model=OldChurnModel.load("models/v1/"),
    shadow_model=NewChurnModel.load("models/v2/")
)

# In your prediction endpoint
# (user_features is not defined in this snippet; it is assumed to come from
# the surrounding request handler.)
prediction = router.predict(user_features)  # Only serves primary result

Analyzing Shadow Mode Results

import pandas as pd
import numpy as np

def analyze_shadow_comparisons(log_file: str) -> dict:
    """Analyze disagreements between primary and shadow models.

    Reads a JSON-lines log, keeps only ``shadow_comparison`` events, prints a
    short report, and returns summary statistics.

    Args:
        log_file: Path to a JSON-lines file. It may contain other event types;
            those rows are ignored.

    Returns:
        A dict with ``agreement_rate`` (fraction of comparisons where the
        models agreed), ``disagreement_count`` and ``shadow_latency_p95``
        (milliseconds). When the file has no ``shadow_comparison`` events the
        rate and latency are NaN and the count is 0.
    """
    df = pd.read_json(log_file, lines=True)
    if "event" in df.columns:
        df = df[df["event"] == "shadow_comparison"]
    else:
        df = df.iloc[0:0]

    if df.empty:
        print("Total predictions: 0")
        print("No shadow_comparison events found.")
        return {
            "agreement_rate": float("nan"),
            "disagreement_count": 0,
            "shadow_latency_p95": float("nan"),
        }

    # When the log mixes event types, rows without an "agreed" field make the
    # column non-boolean (object/float) and `~` no longer means logical NOT.
    # After filtering, every remaining value is a real flag, so cast it back.
    agreed = df["agreed"].astype(bool)
    agreement_rate = float(agreed.mean())

    # Find systematic disagreements
    disagreements = df[~agreed].copy()

    primary_p95 = float(df["primary_latency_ms"].quantile(0.95))
    shadow_p95 = float(df["shadow_latency_ms"].quantile(0.95))

    print(f"Total predictions: {len(df)}")
    print(f"Agreement rate: {agreement_rate:.2%}")
    print(f"Disagreements: {len(disagreements)}")
    print(f"\nLatency comparison:")
    print(f"  Primary P95: {primary_p95:.1f}ms")
    print(f"  Shadow P95:  {shadow_p95:.1f}ms")

    # Shadow is higher risk if it predicts positive more aggressively.
    # Only meaningful for score-like (float) predictions, and only safe when
    # the shadow column is numeric too.
    if (pd.api.types.is_float_dtype(df["primary_prediction"])
            and pd.api.types.is_numeric_dtype(df["shadow_prediction"])):
        print(f"\nPrediction distributions:")
        print(f"  Primary mean: {df['primary_prediction'].mean():.4f}")
        print(f"  Shadow mean:  {df['shadow_prediction'].mean():.4f}")

    return {
        "agreement_rate": agreement_rate,
        "disagreement_count": len(disagreements),
        "shadow_latency_p95": shadow_p95
    }

Canary Deployment

After shadow testing confirms the new model is stable and makes reasonable predictions, shift a small percentage of live traffic to it.

Traffic Splitting

import hashlib
import os

class CanaryRouter:
    """Route a fixed fraction of users to a treatment model.

    Assignment is a pure function of ``user_id``: the MD5 digest of the id is
    mapped to one of ``_NUM_BUCKETS`` buckets and the lowest buckets go to
    treatment. The same user therefore always lands in the same group for a
    given ``treatment_pct``, and raising ``treatment_pct`` only adds users to
    treatment (nobody is moved back to control).
    """

    # 10,000 buckets gives 0.01% resolution, so sub-percent canaries
    # (e.g. treatment_pct=0.005) are routed accurately.
    _NUM_BUCKETS = 10_000

    def __init__(self, control_model, treatment_model, treatment_pct: float = 0.05):
        """
        Route `treatment_pct` fraction of traffic to the treatment model.
        Routing is deterministic per user_id (consistent experience).

        Raises:
            ValueError: if `treatment_pct` is not within [0, 1].
        """
        self.control = control_model
        self.treatment = treatment_model
        self.set_traffic_split(treatment_pct)

    def set_traffic_split(self, treatment_pct: float) -> None:
        """Change the fraction of users routed to the treatment model.

        Args:
            treatment_pct: Fraction in [0, 1]; 0.0 sends everyone to control,
                1.0 sends everyone to treatment.

        Raises:
            ValueError: if `treatment_pct` is not within [0, 1].
        """
        if not 0.0 <= treatment_pct <= 1.0:
            raise ValueError(
                f"treatment_pct must be a fraction between 0 and 1, got {treatment_pct!r}"
            )
        self.treatment_pct = treatment_pct

    def predict(self, features, user_id: str):
        """Return the prediction from the model assigned to `user_id`.

        The assignment and prediction are logged for later analysis.
        """
        if self._is_in_treatment(user_id):
            prediction = self.treatment.predict(features)
            group = "treatment"
        else:
            prediction = self.control.predict(features)
            group = "control"

        # Log assignment for analysis
        self._log_prediction(user_id, group, prediction)
        return prediction

    def _bucket(self, user_id: str) -> int:
        """Map `user_id` to a stable bucket in ``range(_NUM_BUCKETS)``.

        MD5 is used only as a fast, well-distributed, process-independent
        hash here; it is not relied on for security.
        """
        digest = hashlib.md5(user_id.encode()).hexdigest()
        return int(digest, 16) % self._NUM_BUCKETS

    def _is_in_treatment(self, user_id: str) -> bool:
        """Deterministic assignment: same user always gets same group."""
        # Round to an integer bucket count before comparing. Comparing against
        # the raw float product is off by one for values such as 0.07, because
        # 0.07 * 100 == 7.000000000000001 and would admit an extra bucket.
        treatment_buckets = round(self.treatment_pct * self._NUM_BUCKETS)
        return self._bucket(user_id) < treatment_buckets

    def _log_prediction(self, user_id, group, prediction):
        """Log the group assignment and prediction on the "canary" logger."""
        import logging
        logging.getLogger("canary").info({
            "user_id": user_id,
            "group": group,
            "prediction": prediction
        })

Monitoring Canary Health

During canary, monitor:

  1. Error rate — is the new model throwing more exceptions?
  2. Latency — is it slower?
  3. Business metrics — clicks, conversions, revenue per user
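  4. Prediction distribution — is the new model predicting differently at the aggregate level?

The health check below automates the first two signals (error rate and latency); business metrics and prediction distributions still need to be monitored separately.
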
def check_canary_health(metrics_client, experiment_id: str, min_samples: int = 1000,
                        max_error_ratio: float = 2.0,
                        max_latency_ratio: float = 1.5,
                        error_rate_floor: float = 0.0):
    """Check if canary deployment is healthy enough to continue or promote.

    Args:
        metrics_client: Object exposing ``get_group_metrics(experiment_id,
            group)``; the returned metrics must have ``sample_size``,
            ``error_rate`` and ``p99_latency_ms`` attributes.
        experiment_id: Identifier of the experiment to inspect.
        min_samples: Minimum sample size required in *each* group.
        max_error_ratio: Treatment is degraded when its error rate exceeds
            the control error rate times this factor.
        max_latency_ratio: Treatment is degraded when its P99 latency exceeds
            the control P99 latency times this factor.
        error_rate_floor: Treatment error rates at or below this absolute
            value never count as degraded. With the default of 0.0, a control
            error rate of exactly zero makes *any* treatment error trip the
            alert; set a small floor to tolerate isolated errors.

    Returns:
        ``"insufficient_data"``, ``"degraded"`` or ``"healthy"``.
    """
    control = metrics_client.get_group_metrics(experiment_id, "control")
    treatment = metrics_client.get_group_metrics(experiment_id, "treatment")

    if control.sample_size < min_samples or treatment.sample_size < min_samples:
        print(f"Insufficient samples (control={control.sample_size}, "
              f"treatment={treatment.sample_size}). Need {min_samples} each.")
        return "insufficient_data"

    # Check error rate
    error_limit = max(control.error_rate * max_error_ratio, error_rate_floor)
    if treatment.error_rate > error_limit:
        print(f"ALERT: Treatment error rate {treatment.error_rate:.3%} is "
              f"{max_error_ratio:g}x control {control.error_rate:.3%}")
        return "degraded"

    # Check latency
    if treatment.p99_latency_ms > control.p99_latency_ms * max_latency_ratio:
        print(f"ALERT: Treatment P99 {treatment.p99_latency_ms:.1f}ms is "
              f"{max_latency_ratio - 1:.0%} slower than control {control.p99_latency_ms:.1f}ms")
        return "degraded"

    return "healthy"

Full A/B Test and Statistical Analysis

Once the canary is stable, expand to a full 50/50 split and measure statistical significance:

from scipy import stats
import numpy as np

def _mean_difference(control_sample, treatment_sample, axis=-1):
    """Return mean(treatment) - mean(control) along *axis* (bootstrap statistic)."""
    return np.mean(treatment_sample, axis=axis) - np.mean(control_sample, axis=axis)


def analyze_ab_test(control_outcomes: np.ndarray,
                    treatment_outcomes: np.ndarray,
                    metric_name: str,
                    alpha: float = 0.05,
                    higher_is_better: bool = True,
                    bootstrap_resamples: int = 2000,
                    random_state=None) -> dict:
    """
    Analyze A/B test results for one metric.

    Significance is assessed with a two-sided Mann-Whitney U test, which is
    non-parametric and so tolerates skewed metrics such as revenue. The
    direction of the effect is taken from the difference in means, and a
    percentile-bootstrap confidence interval is reported for that difference.

    Args:
        control_outcomes: Per-user outcomes for the control group.
        treatment_outcomes: Per-user outcomes for the treatment group.
        metric_name: Label used in the printed report and the result.
        alpha: Significance level; the confidence interval uses ``1 - alpha``.
        higher_is_better: Set to False for metrics such as latency, where a
            decrease is the improvement.
        bootstrap_resamples: Number of bootstrap resamples for the confidence
            interval; 0 skips the interval.
        random_state: Seed or generator forwarded to ``scipy.stats.bootstrap``
            for reproducible intervals.

    Returns:
        A dict with the original keys (``metric``, ``control_mean``,
        ``treatment_mean``, ``relative_change``, ``p_value``,
        ``statistically_significant``, ``n_control``, ``n_treatment``,
        ``recommendation``) plus ``absolute_change``, ``diff_ci_low`` and
        ``diff_ci_high`` (``None`` when the interval was not computed).
        ``relative_change`` is relative to ``abs(control_mean)`` and is NaN
        when the control mean is zero.

    Raises:
        ValueError: if either group is empty.
    """
    n_control = len(control_outcomes)
    n_treatment = len(treatment_outcomes)
    if n_control == 0 or n_treatment == 0:
        raise ValueError(
            f"Both groups need at least one observation "
            f"(control={n_control}, treatment={n_treatment})"
        )

    control = np.asarray(control_outcomes, dtype=float)
    treatment = np.asarray(treatment_outcomes, dtype=float)

    # Use Mann-Whitney U test (non-parametric, handles non-normal distributions)
    _, p_value = stats.mannwhitneyu(control, treatment, alternative='two-sided')
    p_value = float(p_value)

    control_mean = float(np.mean(control))
    treatment_mean = float(np.mean(treatment))
    absolute_change = treatment_mean - control_mean
    # A zero baseline has no meaningful relative change; report NaN rather
    # than dividing by zero. abs() keeps the sign equal to the direction of
    # the change even for a negative baseline.
    if control_mean != 0:
        relative_change = absolute_change / abs(control_mean)
    else:
        relative_change = float("nan")

    # Confidence interval for the difference in means (percentile bootstrap).
    ci_low = ci_high = None
    if bootstrap_resamples > 0 and n_control > 1 and n_treatment > 1:
        interval = stats.bootstrap(
            (control, treatment),
            _mean_difference,
            n_resamples=bootstrap_resamples,
            batch=min(bootstrap_resamples, 256),  # bound peak memory on large groups
            vectorized=True,
            paired=False,
            confidence_level=1 - alpha,
            method="percentile",
            random_state=random_state,
        ).confidence_interval
        ci_low, ci_high = float(interval.low), float(interval.high)

    significant = bool(p_value < alpha)
    result = {
        "metric": metric_name,
        "control_mean": control_mean,
        "treatment_mean": treatment_mean,
        "relative_change": relative_change,
        "absolute_change": absolute_change,
        "diff_ci_low": ci_low,
        "diff_ci_high": ci_high,
        "p_value": p_value,
        "statistically_significant": significant,
        "n_control": n_control,
        "n_treatment": n_treatment,
        "recommendation": None
    }

    # Decide direction from the absolute difference so a zero baseline
    # (relative_change is NaN) cannot be misread as a regression.
    improved = absolute_change > 0 if higher_is_better else absolute_change < 0

    if significant:
        if improved:
            result["recommendation"] = "promote"
            print(f"✅ PROMOTE: {metric_name} improved by {abs(relative_change):.2%} (p={p_value:.4f})")
        else:
            result["recommendation"] = "rollback"
            print(f"🚫 ROLLBACK: {metric_name} degraded by {abs(relative_change):.2%} (p={p_value:.4f})")
    else:
        result["recommendation"] = "continue_or_neutral"
        print(f"⏳ NO SIGNIFICANT DIFFERENCE: {metric_name} "
              f"change={relative_change:.2%}, p={p_value:.4f}")

    return result

# Analyze multiple metrics
# load_experiment_data is not defined in this snippet; it is assumed to return
# a DataFrame-like object with one column per metric (the code below indexes
# it by metric name and reads `.values`).
control_users = load_experiment_data("control")
treatment_users = load_experiment_data("treatment")

# NOTE(review): each metric is tested separately at the default alpha, with no
# multiple-comparison correction, and `result` is overwritten on every
# iteration, so only the printed report survives the loop.
for metric in ["click_through_rate", "conversion_rate", "revenue_per_user"]:
    result = analyze_ab_test(
        control_users[metric].values,
        treatment_users[metric].values,
        metric_name=metric
    )

Sample Size Calculation

Don't start an A/B test without knowing how many samples you need:

from statsmodels.stats.power import TTestIndPower

def required_sample_size(baseline_mean: float,
                          minimum_detectable_effect: float,
                          baseline_std: float,
                          alpha: float = 0.05,
                          power: float = 0.80) -> int:
    """
    Calculate required sample size per group.

    Solves the two-sample t-test power equation for equal group sizes, using
    the standardized effect ``|minimum_detectable_effect| / baseline_std``.

    Args:
        baseline_mean: Baseline value of the metric. Not used in the
            calculation; kept so callers can record what the effect is
            relative to.
        minimum_detectable_effect: Smallest *absolute* change worth
            detecting, in the metric's own units. Only its magnitude matters.
        baseline_std: Standard deviation of the metric; must be positive.
        alpha: Significance level of the test.
        power: Desired probability of detecting the effect if it exists.

    Returns:
        Required number of samples in each group, rounded up.

    Raises:
        ValueError: if `baseline_std` is not positive or
            `minimum_detectable_effect` is zero.

    Example: detect a 5% improvement in conversion from 2% baseline. The
    absolute effect is 0.02 * 0.05 = 0.001, and a 0/1 outcome with rate p has
    standard deviation sqrt(p * (1 - p)), i.e. sqrt(0.02 * 0.98) = 0.14.
    """
    if baseline_std <= 0:
        raise ValueError(f"baseline_std must be positive, got {baseline_std!r}")
    if minimum_detectable_effect == 0:
        raise ValueError("minimum_detectable_effect must be non-zero")

    # The two-sided test is symmetric, so a decrease of a given size needs the
    # same sample as an increase of that size.
    effect_size = abs(minimum_detectable_effect) / baseline_std

    analysis = TTestIndPower()
    n = analysis.solve_power(
        effect_size=effect_size,
        alpha=alpha,
        power=power,
        ratio=1.0  # Equal group sizes
    )

    return int(np.ceil(n))

# Example: detect 5% lift in revenue per user
# Baseline: $15/user, std: $45
# The standardized effect is 0.75 / 45 ≈ 0.017, which is very small because
# revenue is noisy relative to the lift, so the required sample is large.
n_per_group = required_sample_size(
    baseline_mean=15.0,
    minimum_detectable_effect=0.75,  # $0.75 = 5% of $15
    baseline_std=45.0
)
print(f"Need {n_per_group} users per group ({2 * n_per_group} total)")
# → ~56,500 per group for 80% power (two-sided, alpha=0.05)

Automated Promotion and Rollback

class ExperimentController:
    """Poll an experiment until it can be promoted or rolled back.

    Combines the canary health check with the statistical analysis: a
    degraded canary is rolled back immediately; otherwise the experiment is
    re-analyzed every ``check_interval_hours`` until the result is
    statistically significant.

    Caveat: re-running a significance test on accumulating data and stopping
    at the first significant result inflates the false-positive rate above
    the nominal alpha. Size the experiment up front and prefer few checks.
    """

    def __init__(self, router, metrics_client, check_interval_hours=24):
        """Store collaborators and the polling interval.

        Args:
            router: Object exposing ``set_traffic_split(treatment_pct=...)``.
            metrics_client: Object exposing ``get_group_metrics`` (used by the
                health check) and ``get_outcomes(experiment_id, group)``.
            check_interval_hours: Hours to wait between checks.
        """
        self.router = router
        self.metrics = metrics_client
        self.check_interval = check_interval_hours

    def run_until_decision(self, experiment_id: str, min_samples: int = 10000,
                           max_checks=None):
        """Block until the experiment is promoted, rolled back, or times out.

        Args:
            experiment_id: Experiment to monitor.
            min_samples: Minimum per-group sample size before any analysis.
            max_checks: Maximum number of checks to perform. ``None`` (the
                default) polls indefinitely, as before. When the limit is
                reached without a decision the traffic split is left
                unchanged.

        Returns:
            ``"promoted"``, ``"rolled_back"``, or ``"inconclusive"`` when
            `max_checks` was exhausted.
        """
        import time

        checks_done = 0
        while max_checks is None or checks_done < max_checks:
            checks_done += 1
            health = check_canary_health(self.metrics, experiment_id, min_samples)

            if health == "degraded":
                self.rollback(experiment_id)
                return "rolled_back"

            if health == "insufficient_data":
                print(f"Waiting {self.check_interval}h for more data...")
            else:
                # Check if we have enough for a statistical decision
                result = analyze_ab_test(
                    self.metrics.get_outcomes(experiment_id, "control"),
                    self.metrics.get_outcomes(experiment_id, "treatment"),
                    metric_name="primary_metric"
                )

                if result["statistically_significant"]:
                    if result["recommendation"] == "promote":
                        self.promote(experiment_id)
                        return "promoted"
                    self.rollback(experiment_id)
                    return "rolled_back"

                print(f"No decision yet (p={result['p_value']:.4f}). Continuing...")

            # Do not sleep a full interval after the final permitted check.
            if max_checks is not None and checks_done >= max_checks:
                break
            time.sleep(self.check_interval * 3600)

        return "inconclusive"

    def promote(self, experiment_id):
        """Send all traffic to the treatment model."""
        print(f"Promoting treatment model for experiment {experiment_id}")
        self.router.set_traffic_split(treatment_pct=1.0)

    def rollback(self, experiment_id):
        """Send all traffic back to the control model."""
        print(f"Rolling back to control model for experiment {experiment_id}")
        self.router.set_traffic_split(treatment_pct=0.0)

Shadow mode, canary, and A/B testing form a progressive confidence ladder for ML deployments. Each stage reduces risk and provides evidence for the promotion decision. Skip steps only with explicit justification — the cost of a bad model in production is almost always higher than the cost of a few more days of controlled testing.

Read more