Testing scikit-learn Pipelines and Preprocessing

scikit-learn pipelines combine preprocessing steps and models into a single object. They're elegant for training and inference, but they introduce testing challenges: how do you test a transformer in isolation? How do you verify that the pipeline doesn't leak information? How do you catch regressions when a preprocessing step changes?

Why Test Pipelines

ML pipelines contain business logic. A custom transformer that imputes missing values with the median makes a choice that affects predictions. A feature encoding scheme determines how the model sees the world. These decisions deserve the same test discipline as application code.

Testing also catches silent failures:

  • A transformer that returns a different shape than expected
  • A pipeline that fits on test data (data leakage)
  • Serialization that doesn't preserve transformer state
  • Preprocessing changes that cause training/serving skew

Testing Individual Transformers

Input/Output Shape Tests

The most fundamental test: verify that your transformer produces the expected output shape and type.

import numpy as np
import pandas as pd
import pytest
from sklearn.base import BaseEstimator, TransformerMixin

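# Mixins go to the left of BaseEstimator, as scikit-learn recommends
class MedianImputer(TransformerMixin, BaseEstimator):
    """Fill NaNs with per-column medians learned during fit."""

    def fit(self, X, y=None):
        # One median per column; pandas skips NaN when computing it
        self.medians_ = pd.DataFrame(X).median()
        return self

    def transform(self, X):
        # fillna returns a new frame, so the caller's input is left untouched
        return pd.DataFrame(X).fillna(self.medians_).to_numpy()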

# Tests
@pytest.fixture
def sample_data():
    return np.array([
        [1.0, 2.0, np.nan],
        [np.nan, 5.0, 3.0],
        [3.0, np.nan, 4.0],
        [4.0, 8.0, 5.0]
    ])

def test_median_imputer_output_shape(sample_data):
    imputer = MedianImputer()
    result = imputer.fit_transform(sample_data)
    
    assert result.shape == sample_data.shape, \
        f"Output shape {result.shape} != input shape {sample_data.shape}"

def test_median_imputer_no_nulls_after_transform(sample_data):
    imputer = MedianImputer()
    result = imputer.fit_transform(sample_data)
    
    assert not np.isnan(result).any(), "Imputer left NaN values in output"

def test_median_imputer_correct_values(sample_data):
    imputer = MedianImputer()
    imputer.fit(sample_data)
    
    # Column 0: values are 1, NaN, 3, 4 → median = 3
    assert imputer.medians_[0] == 3.0
    
    # Transform and check imputed values
    result = imputer.transform(sample_data)
    assert result[1, 0] == 3.0, "NaN not imputed with column median"
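
Two further properties are worth pinning down for a transformer like this: it should not modify the caller's array, and it should accept a DataFrame as well as a NumPy array. The sketch below repeats the MedianImputer so that it runs on its own; the test names are illustrative, not part of any library.

```python
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin


class MedianImputer(TransformerMixin, BaseEstimator):
    """Same imputer as above, repeated so this block is self-contained."""

    def fit(self, X, y=None):
        self.medians_ = pd.DataFrame(X).median()
        return self

    def transform(self, X):
        return pd.DataFrame(X).fillna(self.medians_).to_numpy()


def test_median_imputer_does_not_mutate_input():
    X = np.array([[1.0, np.nan], [3.0, 4.0], [np.nan, 6.0]])
    X_before = X.copy()

    MedianImputer().fit_transform(X)

    # assert_array_equal treats NaNs in matching positions as equal
    np.testing.assert_array_equal(X, X_before)


def test_median_imputer_accepts_dataframe():
    df = pd.DataFrame({"a": [1.0, np.nan, 3.0], "b": [np.nan, 2.0, 4.0]})

    result = MedianImputer().fit_transform(df)

    assert isinstance(result, np.ndarray)
    assert result.shape == (3, 2)
    assert not np.isnan(result).any()
```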

Fit/Transform Separation

A transformer must learn its statistics in fit and only apply them in transform; it must never recompute them from the batch being transformed. Test this explicitly:

def test_imputer_uses_only_fit_data():
    """Transform should use statistics learned at fit time, not the current batch."""
    train_data = np.array([[1.0], [2.0], [3.0], [4.0]])  # Median = 2.5
    test_data = np.array([[np.nan], [100.0], [200.0]])    # Very different distribution
    
    imputer = MedianImputer()
    imputer.fit(train_data)
    
    result = imputer.transform(test_data)
    
    # NaN should be imputed with the training median (2.5), NOT the test batch median (150.0)
    assert result[0, 0] == 2.5, \
        f"Imputed {result[0, 0]}, expected training median 2.5"
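
To see what this test guards against, here is a minimal sketch of the bug it is meant to catch. BatchMedianImputer is a hypothetical, deliberately broken class written for illustration: it recomputes the median inside transform, so the imputed value follows the incoming batch instead of the training data.

```python
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin


class BatchMedianImputer(TransformerMixin, BaseEstimator):
    """Hypothetical buggy imputer: recomputes medians inside transform."""

    def fit(self, X, y=None):
        self.medians_ = pd.DataFrame(X).median()
        return self

    def transform(self, X):
        df = pd.DataFrame(X)
        # BUG: uses the current batch's medians and ignores self.medians_
        return df.fillna(df.median()).to_numpy()


train_data = np.array([[1.0], [2.0], [3.0], [4.0]])  # training median is 2.5
test_data = np.array([[np.nan], [100.0], [200.0]])   # batch median is 150.0

buggy = BatchMedianImputer().fit(train_data)
imputed = buggy.transform(test_data)[0, 0]

# The separation test expects 2.5, so it would fail for this class
assert imputed == 150.0
assert imputed != 2.5
```

A single-row batch makes the problem worse: a batch that contains only NaN has no median at all, so the buggy version would leave the value missing.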

Sklearn API Compliance

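scikit-learn estimators follow an API contract. The parametrize_with_checks decorator runs the library's own compliance suite as individual pytest cases (older code called check_estimator with generate_only=True, which newer releases deprecate):

from sklearn.utils.estimator_checks import parametrize_with_checks

# Generates one test per check: clonability, pickling, consistent
# transform after fit, input validation, and more. A minimal transformer
# such as the MedianImputer above may fail some of them (it does no input
# validation, for example); each failure shows where it departs from the contract.
@parametrize_with_checks([MedianImputer()])
def test_median_imputer_sklearn_api(estimator, check):
    """Verify transformer meets sklearn API requirements."""
    check(estimator)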
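
The full compliance suite can be noisy for a small custom transformer, so it may help to spot-check a couple of contract points by hand first. The sketch below covers two of them using clone and check_is_fitted, both real scikit-learn utilities: a clone must come back unfitted, and an unfitted estimator must be detectable as such. The test name is illustrative, and the MedianImputer is repeated so the block runs on its own.

```python
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin, clone
from sklearn.exceptions import NotFittedError
from sklearn.utils.validation import check_is_fitted


class MedianImputer(TransformerMixin, BaseEstimator):
    """Same imputer as above, repeated so this block is self-contained."""

    def fit(self, X, y=None):
        self.medians_ = pd.DataFrame(X).median()
        return self

    def transform(self, X):
        return pd.DataFrame(X).fillna(self.medians_).to_numpy()


def test_clone_returns_unfitted_copy():
    fitted = MedianImputer().fit(np.array([[1.0], [3.0]]))
    check_is_fitted(fitted)  # passes because medians_ exists

    fresh = clone(fitted)

    assert fresh is not fitted
    assert not hasattr(fresh, "medians_")
    try:
        check_is_fitted(fresh)
    except NotFittedError:
        pass  # expected: the clone carries no learned state
    else:
        raise AssertionError("clone should not carry fitted state")
```

This works because the learned attribute ends in an underscore and is set only in fit, which is the convention check_is_fitted relies on.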

Testing Full Pipelines

Smoke Test: Pipeline Runs End-to-End

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification
import pytest

@pytest.fixture
def sample_classification_data():
    X, y = make_classification(n_samples=200, n_features=10, random_state=42)
    return X, y

def test_pipeline_fit_predict_smoke(sample_classification_data):
    """Full pipeline trains and predicts without errors."""
    X, y = sample_classification_data
    
    pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('model', RandomForestClassifier(n_estimators=10, random_state=42))
    ])
    
    pipeline.fit(X, y)
    predictions = pipeline.predict(X[:10])
    
    assert len(predictions) == 10
    assert set(predictions).issubset({0, 1})

def test_pipeline_predict_proba_shape(sample_classification_data):
    X, y = sample_classification_data
    
    pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('model', RandomForestClassifier(n_estimators=10, random_state=42))
    ])
    pipeline.fit(X, y)
    
    probas = pipeline.predict_proba(X[:10])
    
    assert probas.shape == (10, 2), f"Expected (10, 2), got {probas.shape}"
    assert np.allclose(probas.sum(axis=1), 1.0), "Probabilities don't sum to 1"
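
Smoke tests only look at the final predictions. To test the preprocessing of a fitted pipeline in isolation, one option is to slice off the model and inspect what the remaining steps produce. The sketch below assumes the same scaler-plus-forest pipeline as above; the test name is illustrative.

```python
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler


def test_preprocessing_steps_standardize_features():
    X, y = make_classification(n_samples=200, n_features=10, random_state=42)
    pipeline = Pipeline([
        ("scaler", StandardScaler()),
        ("model", RandomForestClassifier(n_estimators=10, random_state=42)),
    ])
    pipeline.fit(X, y)

    # pipeline[:-1] is a sub-pipeline holding every step except the final model
    X_scaled = pipeline[:-1].transform(X)

    assert X_scaled.shape == X.shape
    # On the training data, each column should have mean 0 and unit variance
    assert np.allclose(X_scaled.mean(axis=0), 0.0, atol=1e-8)
    assert np.allclose(X_scaled.std(axis=0), 1.0)
```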

Test No Data Leakage

Leakage checks are among the most important pipeline tests. Leakage happens when statistics from held-out data influence preprocessing. Fitting a scaler on the full dataset before cross-validation is the classic case; putting the scaler inside the pipeline makes each fold refit it on that fold's training rows only. The test below runs both workflows side by side:

from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

def test_pipeline_no_leakage_vs_bare_transform():
    """
    Contrast scaling before CV (leaky) with scaling inside the pipeline.

    For StandardScaler the gap is often small and can go either way, so
    this test does not assert which mean is higher. It checks that both
    workflows run and reports the scores for comparison.
    """
    # High-dimensional data where leakage matters more
    X, y = make_classification(n_samples=100, n_features=100, 
                                n_informative=5, random_state=42)
    
    # WRONG approach: scale all data first, then CV (leakage)
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)  # Scaler sees the held-out folds
    leaky_scores = cross_val_score(LogisticRegression(), X_scaled, y, cv=5)
    
    # RIGHT approach: scale inside pipeline (no leakage)
    pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('model', LogisticRegression())
    ])
    correct_scores = cross_val_score(pipeline, X, y, cv=5)
    
    # Both workflows must complete all five folds
    assert len(leaky_scores) == len(correct_scores) == 5
    
    # The correct approach may or may not score lower on a given dataset,
    # but only the pipeline approach gives an unbiased estimate
    print(f"Leaky CV mean: {leaky_scores.mean():.4f}")
    print(f"Correct CV mean: {correct_scores.mean():.4f}")
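
Comparing scores only hints at leakage. A more direct check is to inspect what the preprocessing step learned. The sketch below fits a pipeline on a training split and asserts that the scaler's means come from those rows alone. It assumes a StandardScaler step named "scaler"; the test name and split sizes are illustrative.

```python
import numpy as np
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler


def test_scaler_statistics_come_from_training_split_only():
    X, y = make_classification(n_samples=100, n_features=20, random_state=42)
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.25, random_state=0
    )

    pipeline = Pipeline([
        ("scaler", StandardScaler()),
        ("model", LogisticRegression(max_iter=1000)),
    ])
    pipeline.fit(X_train, y_train)

    scaler = pipeline.named_steps["scaler"]
    # The learned means match the training rows...
    assert np.allclose(scaler.mean_, X_train.mean(axis=0))
    # ...and differ from the full-dataset means, which include the test rows
    assert not np.allclose(scaler.mean_, X.mean(axis=0))
```

The same pattern applies to any fitted attribute (imputer statistics, encoder categories): compare it with a value computed from the training rows only.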

Test Determinism

Given the same training data and a fixed random_state, a pipeline should produce the same output for the same input. Non-deterministic behavior under those conditions is a bug:

def test_pipeline_deterministic(sample_classification_data):
    """Same random_state should produce identical predictions."""
    X, y = sample_classification_data
    X_new = X[:20]
    
    # Train and predict twice with same random state
    pipeline1 = Pipeline([
        ('scaler', StandardScaler()),
        ('model', RandomForestClassifier(n_estimators=10, random_state=42))
    ])
    pipeline1.fit(X, y)
    pred1 = pipeline1.predict(X_new)
    
    pipeline2 = Pipeline([
        ('scaler', StandardScaler()),
        ('model', RandomForestClassifier(n_estimators=10, random_state=42))
    ])
    pipeline2.fit(X, y)
    pred2 = pipeline2.predict(X_new)
    
    np.testing.assert_array_equal(pred1, pred2, 
        err_msg="Pipeline is not deterministic with same random_state")
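
A related property is that a row's prediction should not depend on which other rows share its batch. The sketch below, an illustrative test and not something prescribed by scikit-learn, checks that predicting rows one at a time or in shuffled order gives the same answers as one batch call. A failure would point to a step that computes statistics at predict time.

```python
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler


def test_predictions_do_not_depend_on_batch():
    X, y = make_classification(n_samples=200, n_features=10, random_state=42)
    X_new = X[:20]

    pipeline = Pipeline([
        ("scaler", StandardScaler()),
        ("model", RandomForestClassifier(n_estimators=10, random_state=42)),
    ])
    pipeline.fit(X, y)

    batch_preds = pipeline.predict(X_new)

    # Predict each row as its own batch of size one
    single_preds = np.array(
        [pipeline.predict(row.reshape(1, -1))[0] for row in X_new]
    )
    np.testing.assert_array_equal(batch_preds, single_preds)

    # Shuffling the rows should shuffle the predictions in the same way
    order = np.random.default_rng(0).permutation(len(X_new))
    np.testing.assert_array_equal(pipeline.predict(X_new[order]), batch_preds[order])
```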

Test Serialization

Deployed models are serialized (pickle or joblib). Test that serialization round-trips correctly:

import joblib
import os

def test_pipeline_serialization(sample_classification_data, tmp_path):
    """Serialized and deserialized pipeline produces same predictions."""
    X, y = sample_classification_data
    
    pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('model', RandomForestClassifier(n_estimators=10, random_state=42))
    ])
    pipeline.fit(X, y)
    original_preds = pipeline.predict(X[:20])
    
    # Save
    model_path = tmp_path / "model.joblib"
    joblib.dump(pipeline, model_path)
    
    # Load
    loaded_pipeline = joblib.load(model_path)
    loaded_preds = loaded_pipeline.predict(X[:20])
    
    np.testing.assert_array_equal(original_preds, loaded_preds,
        err_msg="Serialization changed model predictions")
    
    # Verify file size is reasonable
    file_size_mb = os.path.getsize(model_path) / (1024 * 1024)
    assert file_size_mb < 100, f"Model file {file_size_mb:.1f}MB is suspiciously large"
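
Equal predictions on a handful of rows can hide lost preprocessing state, so it can be worth comparing the fitted attributes themselves. The sketch below round-trips a preprocessing-only pipeline through pickle in memory and compares what each step learned. It uses built-in transformers; the test name and sample values are illustrative.

```python
import pickle

import numpy as np
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler


def test_fitted_state_survives_round_trip():
    X = np.array([[1.0, 10.0], [np.nan, 20.0], [3.0, np.nan], [5.0, 40.0]])
    preprocessing = Pipeline([
        ("imputer", SimpleImputer(strategy="median")),
        ("scaler", StandardScaler()),
    ]).fit(X)

    # Serialize to bytes and back without touching the filesystem
    restored = pickle.loads(pickle.dumps(preprocessing))

    # Learned statistics must be identical after the round trip
    np.testing.assert_array_equal(
        restored.named_steps["imputer"].statistics_,
        preprocessing.named_steps["imputer"].statistics_,
    )
    np.testing.assert_array_equal(
        restored.named_steps["scaler"].mean_,
        preprocessing.named_steps["scaler"].mean_,
    )
    # And so must the transformed output
    np.testing.assert_array_equal(
        restored.transform(X), preprocessing.transform(X)
    )
```

For a custom transformer, the same test also confirms that every attribute set in fit is picklable.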

Testing ColumnTransformer Pipelines

Pipelines for mixed numeric/categorical data typically use ColumnTransformer:

from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline

@pytest.fixture
def mixed_data():
    return pd.DataFrame({
        'age': [25.0, np.nan, 35.0, 42.0],
        'income': [50000.0, 75000.0, np.nan, 90000.0],
        'country': ['US', 'UK', 'US', np.nan],
        'plan': ['basic', 'premium', 'basic', 'enterprise']
    })

def build_pipeline():
    numeric_features = ['age', 'income']
    categorical_features = ['country', 'plan']
    
    numeric_transformer = Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())
    ])
    
    categorical_transformer = Pipeline([
        ('imputer', SimpleImputer(strategy='constant', fill_value='unknown')),
        # sparse_output requires scikit-learn >= 1.2 (older releases call it `sparse`)
        ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
    ])
    
    preprocessor = ColumnTransformer([
        ('numeric', numeric_transformer, numeric_features),
        ('categorical', categorical_transformer, categorical_features)
    ])
    
    return Pipeline([
        ('preprocessor', preprocessor),
        ('model', RandomForestClassifier(n_estimators=10, random_state=42))
    ])

def test_column_transformer_handles_nulls(mixed_data):
    """Pipeline should handle null values in all feature types."""
    y = np.array([0, 1, 0, 1])
    
    pipeline = build_pipeline()
    
    # Should not raise
    pipeline.fit(mixed_data, y)
    predictions = pipeline.predict(mixed_data)
    
    assert len(predictions) == 4
    assert not np.isnan(predictions).any()

def test_column_transformer_handles_unseen_categories(mixed_data):
    """Pipeline should handle categories not seen during training."""
    y = np.array([0, 1, 0, 1])
    
    pipeline = build_pipeline()
    pipeline.fit(mixed_data, y)
    
    # Test data with a new country not seen in training
    test_data = pd.DataFrame({
        'age': [30.0],
        'income': [60000.0],
        'country': ['JAPAN'],  # Not in training data
        'plan': ['basic']
    })
    
    # Should not raise (handle_unknown='ignore' in OneHotEncoder)
    predictions = pipeline.predict(test_data)
    assert len(predictions) == 1

Regression Testing with Performance Thresholds

After any preprocessing change, verify model performance hasn't regressed:

from sklearn.model_selection import cross_val_score

PERFORMANCE_THRESHOLD = 0.80  # Macro-averaged F1 floor

def test_pipeline_performance_regression():
    """Performance must not drop below established threshold."""
    # load_production_training_data is a placeholder for your own data loader
    X, y = load_production_training_data()
    
    pipeline = build_pipeline()
    
    scores = cross_val_score(pipeline, X, y, cv=5, scoring='f1_macro')
    mean_score = scores.mean()
    
    assert mean_score >= PERFORMANCE_THRESHOLD, \
        f"Model F1 {mean_score:.4f} below threshold {PERFORMANCE_THRESHOLD}. " \
        f"Preprocessing change may have introduced a regression."
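
A metric threshold needs real data and can take a while to run. A cheaper complement is a golden-value test: transform a tiny fixed input and compare it with numbers worked out by hand, so any change in preprocessing behavior fails at once. The sketch below assumes a median-impute-then-scale step like the numeric branch above; the input values and test name are illustrative.

```python
import numpy as np
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler


def test_preprocessing_matches_golden_values():
    X = np.array([[1.0], [np.nan], [3.0]])
    preprocessing = Pipeline([
        ("imputer", SimpleImputer(strategy="median")),
        ("scaler", StandardScaler()),
    ])

    result = preprocessing.fit_transform(X)

    # Median imputation gives [1, 2, 3]. The mean is 2 and the population
    # standard deviation is sqrt(2/3), so the scaled values are
    # -sqrt(1.5), 0 and +sqrt(1.5).
    expected = np.array([[-1.224744871391589], [0.0], [1.224744871391589]])
    np.testing.assert_allclose(result, expected, rtol=1e-9, atol=1e-12)
```

If someone switches the imputation strategy to the mean, or swaps the scaler, the expected values no longer match and the change has to be made deliberately.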

Unit testing scikit-learn pipelines is an investment that pays off when preprocessing logic grows complex, when multiple engineers work on the same pipeline, and when you need confidence that a refactoring didn't silently change model behavior.

By HelpMeTest