Testing scikit-learn Pipelines and Preprocessing
scikit-learn pipelines combine preprocessing steps and models into a single object. They're elegant for training and inference, but they introduce testing challenges: how do you test a transformer in isolation? How do you verify that the pipeline doesn't leak information? How do you catch regressions when a preprocessing step changes?
Why Test Pipelines
ML pipelines contain business logic. A custom transformer that imputes missing values with the median makes a choice that affects predictions. A feature encoding scheme determines how the model sees the world. These decisions deserve the same test discipline as application code.
Testing also catches silent failures:
- A transformer that returns a different shape than expected
- A pipeline that fits on test data (data leakage)
- Serialization that doesn't preserve transformer state
- Preprocessing changes that cause training/serving skew
Testing Individual Transformers
Input/Output Shape Tests
The most fundamental test: verify that your transformer produces the expected output shape and type.
import numpy as np
import pandas as pd
import pytest
from sklearn.base import BaseEstimator, TransformerMixin
class MedianImputer(BaseEstimator, TransformerMixin):
    """Fill missing values with per-column medians learned during ``fit``.

    Attributes:
        medians_: ``pandas.Series`` of column medians computed by ``fit``,
            indexed by the column labels of the data passed to ``fit``.
    """

    def fit(self, X, y=None):
        """Compute and store the median of every column of ``X``.

        Args:
            X: Two-dimensional data; anything ``pandas.DataFrame`` accepts.
            y: Ignored. Present for scikit-learn API compatibility.

        Returns:
            The imputer itself, so calls can be chained.
        """
        # DataFrame.median skips NaN by default, so only observed values count.
        self.medians_ = pd.DataFrame(X).median()
        return self

    def transform(self, X):
        """Replace NaN entries of ``X`` with the medians stored by ``fit``.

        Medians are matched to columns by label when every column of ``X``
        was seen during ``fit``. Otherwise (for example, fitted on a named
        DataFrame but transforming a plain array) they are matched by
        position, provided the number of columns agrees.

        Args:
            X: Two-dimensional data; anything ``pandas.DataFrame`` accepts.

        Returns:
            A NumPy array with the same shape as ``X``.

        Raises:
            ValueError: If the columns of ``X`` can be matched to the fitted
                medians neither by label nor by position.
        """
        # Work on a copy so the caller's data is never modified.
        df = pd.DataFrame(X).copy()
        medians = self.medians_

        if df.columns.isin(medians.index).all():
            # Same labels as at fit time: label alignment is unambiguous.
            fill_values = medians
        elif len(df.columns) == len(medians):
            # Labels differ but the width matches: fall back to column order
            # instead of silently leaving every NaN in place.
            fill_values = pd.Series(medians.to_numpy(), index=df.columns)
        else:
            raise ValueError(
                f"X has {len(df.columns)} columns, but MedianImputer was "
                f"fitted on {len(medians)} columns with different labels"
            )

        return df.fillna(fill_values).values
# Tests
@pytest.fixture
def sample_data():
    """Return a 4x3 float array with exactly one NaN in each column."""
    nan = np.nan
    rows = [
        [1.0, 2.0, nan],
        [nan, 5.0, 3.0],
        [3.0, nan, 4.0],
        [4.0, 8.0, 5.0],
    ]
    return np.array(rows)
def test_median_imputer_output_shape(sample_data):
    """Imputation must return an array with the same shape as its input."""
    result = MedianImputer().fit_transform(sample_data)

    shapes_match = result.shape == sample_data.shape
    assert shapes_match, \
        f"Output shape {result.shape} != input shape {sample_data.shape}"
def test_median_imputer_no_nulls_after_transform(sample_data):
    """No NaN may survive a fit_transform on the sample data."""
    imputed = MedianImputer().fit_transform(sample_data)

    has_missing = np.any(np.isnan(imputed))
    assert not has_missing, "Imputer left NaN values in output"
def test_median_imputer_correct_values(sample_data):
    """The learned median and the value written into a gap must both be 3.0."""
    # fit() returns the imputer itself, so construction and fitting chain.
    fitted = MedianImputer().fit(sample_data)

    # Column 0 holds 1, NaN, 3, 4 -> the median of the observed values is 3.
    assert fitted.medians_[0] == 3.0

    # Row 1 / column 0 was the NaN; it must now carry the column median.
    imputed = fitted.transform(sample_data)
    assert imputed[1, 0] == 3.0, "NaN not imputed with column median"

# Fit/Transform Separation
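Transformers must learn their statistics during fit and reuse them unchanged in transform; they must never recompute them from the batch passed to transform. Test this explicitly: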
def test_imputer_uses_only_fit_data():
    """Transform should use statistics learned at fit time, not the current batch."""
    # Training column 1, 2, 3, 4 -> median 2.5.
    train_data = np.array([[1.0], [2.0], [3.0], [4.0]])
    # Deliberately far from the training distribution: the observed values
    # 100 and 200 have a median of 150.
    test_data = np.array([[np.nan], [100.0], [200.0]])

    result = MedianImputer().fit(train_data).transform(test_data)

    # The gap must receive the training median (2.5), not the batch median (150.0).
    filled = result[0, 0]
    assert filled == 2.5, \
        f"Imputed {result[0, 0]}, expected training median 2.5"

# Sklearn API Compliance
scikit-learn has an API contract. Use check_estimator to verify your transformer meets it:
from sklearn.utils.estimator_checks import check_estimator
def test_median_imputer_sklearn_api():
    """Verify transformer meets sklearn API requirements."""
    # check_estimator runs dozens of checks including clonability,
    # serialization, consistent transform after fit, and more, and raises on
    # the first one that fails.
    #
    # It is called directly rather than with generate_only=True: that flag is
    # deprecated in scikit-learn 1.6 and scheduled for removal, whereas the
    # plain call works on every release.
    check_estimator(MedianImputer())

# Testing Full Pipelines
Smoke Test: Pipeline Runs End-to-End
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification
import pytest
@pytest.fixture
def sample_classification_data():
    """Return a reproducible ``(X, y)`` pair: 200 samples with 10 features."""
    features, labels = make_classification(
        n_samples=200,
        n_features=10,
        random_state=42,
    )
    return features, labels
def test_pipeline_fit_predict_smoke(sample_classification_data):
    """Full pipeline trains and predicts without errors."""
    X, y = sample_classification_data

    steps = [
        ('scaler', StandardScaler()),
        ('model', RandomForestClassifier(n_estimators=10, random_state=42)),
    ]
    fitted = Pipeline(steps).fit(X, y)

    first_ten = fitted.predict(X[:10])

    # One prediction per row, and only the two class labels may appear.
    assert len(first_ten) == 10
    assert set(first_ten) <= {0, 1}
def test_pipeline_predict_proba_shape(sample_classification_data):
    """predict_proba must return one normalized row of two class probabilities per sample."""
    X, y = sample_classification_data

    steps = [
        ('scaler', StandardScaler()),
        ('model', RandomForestClassifier(n_estimators=10, random_state=42)),
    ]
    fitted = Pipeline(steps).fit(X, y)

    probas = fitted.predict_proba(X[:10])

    assert probas.shape == (10, 2), f"Expected (10, 2), got {probas.shape}"

    row_totals = probas.sum(axis=1)
    assert np.allclose(row_totals, 1.0), "Probabilities don't sum to 1"

# Test No Data Leakage
This is the most important pipeline test:
from sklearn.model_selection import cross_val_score
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
def test_pipeline_no_leakage_vs_bare_transform():
    """Check that a scaler inside a Pipeline is fitted on training folds only.

    Two approaches are evaluated on the same cross-validation splits:

    * Leaky: scale the full dataset first, then cross-validate. The scaler
      has seen every test fold, which can bias scores optimistically,
      especially on high-dimensional data.
    * Correct: put the scaler in a Pipeline so it is re-fitted per fold.

    The scores themselves may or may not differ on a given dataset, so they
    are only printed. What is asserted is the property that matters: each
    fold's scaler statistics equal those of that fold's training rows and
    differ from the statistics of the full dataset.
    """
    from sklearn.datasets import make_classification
    from sklearn.model_selection import StratifiedKFold, cross_validate
    from sklearn.preprocessing import StandardScaler

    # High-dimensional data where leakage matters more
    X, y = make_classification(n_samples=100, n_features=100,
                               n_informative=5, random_state=42)

    # An explicit, unshuffled splitter makes both approaches and the
    # assertions below use exactly the same folds.
    cv = StratifiedKFold(n_splits=5)

    # WRONG approach: scale all data first, then CV (leakage)
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)  # Leaks test distribution
    leaky_scores = cross_val_score(LogisticRegression(), X_scaled, y, cv=cv)

    # RIGHT approach: scale inside pipeline (no leakage)
    pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('model', LogisticRegression())
    ])
    # return_estimator=True hands back the pipeline fitted on each split so
    # the per-fold scaler can be inspected.
    cv_results = cross_validate(pipeline, X, y, cv=cv, return_estimator=True)
    correct_scores = cv_results['test_score']

    for (train_idx, _), fitted in zip(cv.split(X, y), cv_results['estimator']):
        fold_scaler = fitted.named_steps['scaler']
        # The scaler must reflect the training rows of its own fold...
        np.testing.assert_allclose(
            fold_scaler.mean_, X[train_idx].mean(axis=0),
            rtol=1e-7, atol=1e-8,
            err_msg="Fold scaler was not fitted on the training fold alone")
        # ...and therefore not the full dataset, test rows included.
        assert not np.allclose(fold_scaler.mean_, scaler.mean_), \
            "Fold scaler matches full-data statistics (possible leakage)"

    # Reported for information only; the ordering of these two means is
    # dataset-dependent and deliberately not asserted.
    print(f"Leaky CV mean: {leaky_scores.mean():.4f}")
    print(f"Correct CV mean: {correct_scores.mean():.4f}")

# Test Determinism
ML pipelines should produce the same output for the same input. Non-deterministic behavior is a bug:
def test_pipeline_deterministic(sample_classification_data):
    """Same random_state should produce identical predictions."""
    X, y = sample_classification_data
    X_new = X[:20]

    def train_and_predict():
        # Build a brand-new pipeline on every call so nothing is shared
        # between the two runs except the fixed random_state.
        fresh = Pipeline([
            ('scaler', StandardScaler()),
            ('model', RandomForestClassifier(n_estimators=10, random_state=42))
        ])
        fresh.fit(X, y)
        return fresh.predict(X_new)

    first_run = train_and_predict()
    second_run = train_and_predict()

    np.testing.assert_array_equal(
        first_run, second_run,
        err_msg="Pipeline is not deterministic with same random_state")

# Test Serialization
Deployed models are serialized (pickle or joblib). Test that serialization round-trips correctly:
import joblib
import tempfile
import os
def test_pipeline_serialization(sample_classification_data, tmp_path):
    """Serialized and deserialized pipeline produces same predictions."""
    X, y = sample_classification_data
    probe = X[:20]

    pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('model', RandomForestClassifier(n_estimators=10, random_state=42))
    ])
    pipeline.fit(X, y)
    original_preds = pipeline.predict(probe)

    # Round-trip through a file inside pytest's per-test temporary directory.
    model_path = tmp_path / "model.joblib"
    joblib.dump(pipeline, model_path)
    restored = joblib.load(model_path)

    np.testing.assert_array_equal(
        original_preds, restored.predict(probe),
        err_msg="Serialization changed model predictions")

    # Sanity bound on the artifact size (in MiB).
    bytes_per_mb = 1024 * 1024
    file_size_mb = os.path.getsize(model_path) / bytes_per_mb
    assert file_size_mb < 100, f"Model file {file_size_mb:.1f}MB is suspiciously large"

# Testing ColumnTransformer Pipelines
Most real pipelines use ColumnTransformer for mixed numeric/categorical data:
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
@pytest.fixture
def mixed_data():
    """Return a four-row frame with numeric and categorical columns, some missing."""
    missing = np.nan
    columns = {
        'age': [25.0, missing, 35.0, 42.0],
        'income': [50000.0, 75000.0, missing, 90000.0],
        'country': ['US', 'UK', 'US', missing],
        'plan': ['basic', 'premium', 'basic', 'enterprise'],
    }
    return pd.DataFrame(columns)
def build_pipeline():
    """Build the preprocessing + random-forest pipeline used by the tests.

    Numeric columns ('age', 'income') are median-imputed and standardized.
    Categorical columns ('country', 'plan') have missing values replaced by
    the literal 'unknown' and are then one-hot encoded, with categories
    unseen at fit time ignored.

    Returns:
        An unfitted ``Pipeline`` with steps 'preprocessor' and 'model'.
    """
    numeric_branch = Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler()),
    ])
    categorical_branch = Pipeline([
        ('imputer', SimpleImputer(strategy='constant', fill_value='unknown')),
        ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False)),
    ])

    # Each branch is routed to its own subset of DataFrame columns.
    column_router = ColumnTransformer([
        ('numeric', numeric_branch, ['age', 'income']),
        ('categorical', categorical_branch, ['country', 'plan']),
    ])

    classifier = RandomForestClassifier(n_estimators=10, random_state=42)
    return Pipeline([
        ('preprocessor', column_router),
        ('model', classifier),
    ])
def test_column_transformer_handles_nulls(mixed_data):
    """Pipeline should handle null values in all feature types."""
    labels = np.array([0, 1, 0, 1])

    # Fitting on data with gaps in numeric and categorical columns must not raise.
    fitted = build_pipeline().fit(mixed_data, labels)
    predictions = fitted.predict(mixed_data)

    assert len(predictions) == 4
    assert not np.any(np.isnan(predictions))
def test_column_transformer_handles_unseen_categories(mixed_data):
    """Pipeline should handle categories not seen during training."""
    labels = np.array([0, 1, 0, 1])
    fitted = build_pipeline().fit(mixed_data, labels)

    # A single row whose country value never appears in the training frame.
    unseen_row = {
        'age': [30.0],
        'income': [60000.0],
        'country': ['JAPAN'],
        'plan': ['basic'],
    }
    test_data = pd.DataFrame(unseen_row)

    # Must not raise: the encoder is built with handle_unknown='ignore'.
    predictions = fitted.predict(test_data)
    assert len(predictions) == 1

# Regression Testing with Performance Thresholds
After any preprocessing change, verify model performance hasn't regressed:
from sklearn.model_selection import cross_val_score
import joblib
# Minimum acceptable mean macro-F1 from cross-validation; the regression test
# below fails when the pipeline scores under this value.
PERFORMANCE_THRESHOLD = 0.80 # F1 score floor
def test_pipeline_performance_regression():
    """Performance must not drop below established threshold.

    NOTE(review): load_production_training_data is not defined in this file;
    it presumably returns the (X, y) training set. Confirm where it comes from.
    """
    X, y = load_production_training_data()

    # 5-fold cross-validated macro-F1 of the current pipeline definition.
    fold_scores = cross_val_score(build_pipeline(), X, y, cv=5, scoring='f1_macro')
    mean_score = fold_scores.mean()

    meets_floor = mean_score >= PERFORMANCE_THRESHOLD
    assert meets_floor, \
        f"Model F1 {mean_score:.4f} below threshold {PERFORMANCE_THRESHOLD}. " \
        f"Preprocessing change may have introduced a regression."

# Unit testing scikit-learn pipelines is an investment that pays off when preprocessing logic grows complex, when multiple engineers work on the same pipeline, and when you need confidence that a refactoring didn't silently change model behavior.