Data Quality Testing for ML Pipelines: Great Expectations, Soda, and Deequ
ML models fail silently when their input data changes. A column that was never null suddenly has 30% nulls. A categorical feature gains new values the model was never trained on. A numeric distribution shifts because an upstream system changed its calculation.
Data quality testing catches these problems before they corrupt model predictions. Great Expectations, Soda Core, and AWS Deequ are the three most-used open-source frameworks for automating this validation in ML pipelines.
Why Data Quality Testing for ML
Traditional software testing validates code behavior. ML data quality testing validates the statistical properties of data — the distribution of values, null rates, referential integrity, and consistency over time.
The consequences of skipping this:
- Silent model degradation (predictions become wrong but there's no error)
- Training/serving skew (model trains on clean data, serves on dirty data)
- Unexpected schema changes break feature engineering code
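To make these failure modes concrete, here is a minimal pandas sketch of the kind of comparison a data quality framework automates. The function name `summarize_drift`, the sample columns, and the 5-point null-rate tolerance are illustrative assumptions and are not part of any library discussed in this article.

```python
from __future__ import annotations

import pandas as pd


def summarize_drift(reference: pd.DataFrame, current: pd.DataFrame,
                    categorical_cols: list[str],
                    null_rate_tolerance: float = 0.05) -> list[str]:
    """Return human-readable problems found when comparing current to reference."""
    problems = []

    # Schema change: columns the reference had that the new batch lacks.
    for col in sorted(set(reference.columns) - set(current.columns)):
        problems.append(f"missing column: {col}")

    shared_cols = [c for c in reference.columns if c in current.columns]
    for col in shared_cols:
        # Null-rate spike: compare the fraction of nulls per column.
        ref_rate = reference[col].isna().mean()
        cur_rate = current[col].isna().mean()
        if cur_rate - ref_rate > null_rate_tolerance:
            problems.append(
                f"null rate in {col} rose from {ref_rate:.0%} to {cur_rate:.0%}"
            )

    for col in categorical_cols:
        if col not in shared_cols:
            continue
        # Unseen categories: values present now that never appeared before.
        unseen = set(current[col].dropna()) - set(reference[col].dropna())
        if unseen:
            problems.append(f"unseen values in {col}: {sorted(unseen)}")

    return problems


# Hypothetical data: the new batch has nulls in user_id and a new payment method.
reference = pd.DataFrame({
    "user_id": [1, 2, 3, 4],
    "payment_method": ["credit_card", "paypal", "debit_card", "paypal"],
})
current = pd.DataFrame({
    "user_id": [5, None, None, 8],
    "payment_method": ["credit_card", "crypto", "paypal", "paypal"],
})
problems = summarize_drift(reference, current, categorical_cols=["payment_method"])
for problem in problems:
    print(problem)
```

None of these conditions raises an exception on its own in a typical training script, which is why they need explicit checks.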
Great Expectations
Great Expectations (GE) is the most widely adopted framework. You define "expectations" about your data, which are assertions that can be run against any DataFrame or database table.
Setup and Basic Usage
pip install great_expectations
great_expectations init

import great_expectations as gx
import pandas as pd
context = gx.get_context()
# Load your data
df = pd.read_parquet("data/training_features.parquet")
# Create a Data Source
datasource = context.sources.add_pandas("my_datasource")
data_asset = datasource.add_dataframe_asset("training_features")
batch_request = data_asset.build_batch_request(dataframe=df)
# Create an Expectation Suite
suite = context.add_expectation_suite("training_data_suite")
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="training_data_suite",
)

Writing Expectations
# Schema validation
validator.expect_column_to_exist("user_id")
validator.expect_column_to_exist("purchase_amount")
validator.expect_column_to_exist("event_timestamp")
# Type checks
validator.expect_column_values_to_be_in_type_list("user_id", ["int64", "int32"])
validator.expect_column_values_to_be_in_type_list("purchase_amount", ["float64", "float32"])
# Null checks
validator.expect_column_values_to_not_be_null("user_id")
validator.expect_column_values_to_not_be_null("event_timestamp")
# Allow some nulls in optional fields
validator.expect_column_values_to_not_be_null("promo_code", mostly=0.7)  # passes if at least 70% of rows are non-null
# Value range validation
validator.expect_column_values_to_be_between(
    "purchase_amount", min_value=0.0, max_value=100000.0
)
validator.expect_column_values_to_be_between("age", min_value=13, max_value=120)
# Categorical value validation
validator.expect_column_values_to_be_in_set(
    "payment_method", {"credit_card", "debit_card", "paypal", "apple_pay"}
)
# Statistical distribution checks
validator.expect_column_mean_to_be_between("purchase_amount", min_value=20.0, max_value=200.0)
validator.expect_column_stdev_to_be_between("purchase_amount", min_value=10.0, max_value=500.0)
# Uniqueness
validator.expect_column_values_to_be_unique("transaction_id")
# Row count
validator.expect_table_row_count_to_be_between(min_value=10000, max_value=10000000)
# Save expectations
validator.save_expectation_suite()

Running Validation in a Pipeline
checkpoint = context.add_or_update_checkpoint(
    name="training_data_checkpoint",
    validations=[
        {
            "batch_request": batch_request,
            "expectation_suite_name": "training_data_suite",
        }
    ],
)
results = checkpoint.run()
if not results.success:
    # Each validation result covers one suite; its .results list holds
    # the outcome of every individual expectation in that suite.
    failed_expectations = [
        expectation_result
        for validation_result in results.list_validation_results()
        for expectation_result in validation_result.results
        if not expectation_result.success
    ]
    for failure in failed_expectations:
        print(f"FAILED: {failure.expectation_config.expectation_type} "
              f"on column {failure.expectation_config.kwargs.get('column')}")
    raise ValueError(f"Data quality check failed: {len(failed_expectations)} expectations violated")

Soda Core
Soda uses a YAML-based DSL for data quality checks. It's easier to onboard non-engineers to write checks.
Setup
pip install soda-core soda-core-pandas-dask

Writing Checks
# checks_training_data.yml
checks for training_features:
  - schema:
      name: Required columns present
      fail:
        when required column missing: [user_id, purchase_amount, event_timestamp]
  - missing_count(user_id) = 0:
      name: user_id has no nulls
  - missing_percent(promo_code) < 50%:
      name: promo_code less than 50% null
  - invalid_count(payment_method) = 0:
      valid values: [credit_card, debit_card, paypal, apple_pay]
  - min(purchase_amount) >= 0:
      name: No negative purchase amounts
  - max(purchase_amount) < 100000:
      name: No unreasonably large purchases
  - avg(purchase_amount) between 20 and 200:
      name: Average purchase amount in expected range
  - duplicate_count(transaction_id) = 0:
      name: Transaction IDs are unique
  - row_count between 10000 and 10000000:
      name: Dataset has reasonable row count

Running Soda Checks
from soda.scan import Scan

scan = Scan()
scan.set_data_source_name("training_data")
scan.add_configuration_yaml_str("""
data_source training_data:
  type: pandas
""")
scan.add_sodacl_yaml_file("checks_training_data.yml")
scan.add_pandas_dataframe(
    data_source_name="training_data",
    dataset_name="training_features",
    pandas_df=df,
)
exit_code = scan.execute()
if exit_code != 0:
    print(scan.get_logs_text())
    raise ValueError("Soda data quality checks failed")

Soda in CI
# .github/workflows/ml-pipeline.yml
- name: Validate training data
  run: |
    soda scan \
      -d training_data \
      -c soda_config.yml \
      checks_training_data.yml

AWS Deequ
Deequ is Amazon's open-source data quality library for Spark-based pipelines. If your data lives in S3 and you process it with Spark, Deequ integrates naturally. The examples below use PyDeequ, its Python wrapper.
import pydeequ
from pyspark.sql import SparkSession
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite, VerificationResult

# PyDeequ calls into the Deequ JAR, so it must be on the Spark classpath
spark = (
    SparkSession.builder.master("local").appName("DQ")
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate()
)
df = spark.read.parquet("s3://ml-data-bucket/training/features/")
check = (
    Check(spark, CheckLevel.Error, "Training Data Checks")
    .isComplete("user_id")
    .isComplete("purchase_amount")
    .isNonNegative("purchase_amount")
    .isContainedIn("payment_method", ["credit_card", "debit_card", "paypal", "apple_pay"])
    .hasUniqueness(["transaction_id"], lambda x: x == 1.0)
    .hasSize(lambda x: x >= 10000)
    .satisfies("purchase_amount < 100000", "Max purchase under 100k")
    .hasMin("purchase_amount", lambda x: x >= 0)
    .hasMean("purchase_amount", lambda x: 20 <= x <= 200)
)
result = VerificationSuite(spark).onData(df).addCheck(check).run()
if result.status == "Error":
    # Show only the constraints that failed before stopping the pipeline
    result_df = VerificationResult.checkResultsAsDataFrame(spark, result)
    result_df.filter("constraint_status == 'Failure'").show(truncate=False)
    raise ValueError("Deequ data quality checks failed")

Deequ Data Profiling
Before writing checks, use Deequ's profiling to understand your data:
from pydeequ.analyzers import (
    AnalysisRunner, AnalyzerContext, ApproxCountDistinct,
    Completeness, Maximum, Mean, Minimum,
)

analysis_result = AnalysisRunner(spark) \
    .onData(df) \
    .addAnalyzer(Completeness("user_id")) \
    .addAnalyzer(Completeness("purchase_amount")) \
    .addAnalyzer(Mean("purchase_amount")) \
    .addAnalyzer(Maximum("purchase_amount")) \
    .addAnalyzer(Minimum("purchase_amount")) \
    .addAnalyzer(ApproxCountDistinct("payment_method")) \
    .run()
metrics_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysis_result)
metrics_df.show()

Profile your data first, then write checks based on the observed distributions.
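One way to turn a profile into checks is to derive bounds from a trusted reference sample and widen them by a safety margin. The sketch below does this with pandas rather than Deequ so that it runs without a Spark session. The helper name `suggest_bounds`, the 20% margin, and the sample values are assumptions for illustration, and any suggested numbers would still need human review before they become hard gates.

```python
import pandas as pd


def suggest_bounds(series: pd.Series, margin: float = 0.2) -> dict:
    """Propose check thresholds for a numeric column from observed values."""
    clean = series.dropna()
    mean = float(clean.mean())
    # Use the absolute value so the bounds stay ordered for negative means.
    spread = abs(mean) * margin
    return {
        # Observed completeness, usable as a floor for a completeness check.
        "completeness": len(clean) / len(series),
        # Let the mean move by +/- margin (relative) before a check fails.
        "mean_min": mean - spread,
        "mean_max": mean + spread,
        # Hard value limits taken from the observed extremes.
        "value_min": float(clean.min()),
        "value_max": float(clean.max()),
    }


# Hypothetical reference sample with one missing value.
sample = pd.Series([10.0, 20.0, 30.0, 40.0, None], name="purchase_amount")
bounds = suggest_bounds(sample)
print(bounds)
```

The resulting values could then be plugged into assertions such as `hasMean` in Deequ or `expect_column_mean_to_be_between` in Great Expectations.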
Choosing a Framework
| Criteria | Great Expectations | Soda Core | AWS Deequ |
|---|---|---|---|
| Data processing | Pandas, Spark, SQL | Pandas, SQL, Spark | Spark only |
| Check definition | Python API | YAML + Python | Python (Spark DSL) |
| Scale | Medium-large | Medium | Very large |
| CI integration | Python SDK | CLI + Python | Python SDK |
| Data catalog | Built-in docs | Soda Cloud (paid) | Via AWS Glue |
| Learning curve | Medium | Low | Medium |
| Best for | Complex checks, data docs | YAML-first orgs, dbt users | AWS/Spark shops |
Integration Pattern: Gate Your Training Pipeline
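The pipeline in this section calls a `validate_data_quality` helper that the article does not define. A minimal sketch of one possible shape follows, using plain pandas checks so that it stays framework-agnostic. The `SUITES` registry, the individual checks, and the `DataQualityError` exception are all hypothetical; in practice the function body would more likely delegate to a Great Expectations checkpoint, a Soda scan, or a Deequ verification run.

```python
import pandas as pd


class DataQualityError(ValueError):
    """Raised when a gate fails, so the pipeline stops before training."""


# Hypothetical registry: suite name -> list of (description, check) pairs.
# Each check takes a DataFrame and returns True when the data is acceptable.
SUITES = {
    "raw_data_suite": [
        ("user_id has no nulls",
         lambda df: df["user_id"].notna().all()),
        ("purchase_amount is non-negative",
         lambda df: (df["purchase_amount"].dropna() >= 0).all()),
    ],
}


def validate_data_quality(df: pd.DataFrame, suite: str) -> None:
    """Run every check in the named suite and raise if any of them fails."""
    failures = [name for name, check in SUITES[suite] if not check(df)]
    if failures:
        raise DataQualityError(f"{suite} failed: {failures}")


good = pd.DataFrame({"user_id": [1, 2], "purchase_amount": [9.99, 20.0]})
validate_data_quality(good, suite="raw_data_suite")  # returns without raising

bad = pd.DataFrame({"user_id": [1, None], "purchase_amount": [9.99, -5.0]})
try:
    validate_data_quality(bad, suite="raw_data_suite")
except DataQualityError as exc:
    print(exc)
```

Raising an exception, rather than returning a boolean, means a caller cannot accidentally ignore a failed gate.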
# training_pipeline.py
def run_training_pipeline():
    # Step 1: Load raw data
    raw_df = load_training_data()
    # Step 2: Validate raw data quality (GATE)
    validate_data_quality(raw_df, suite="raw_data_suite")
    # Step 3: Feature engineering
    features_df = engineer_features(raw_df)
    # Step 4: Validate features (GATE)
    validate_data_quality(features_df, suite="features_suite")
    # Step 5: Train model
    model = train_model(features_df)
    # Step 6: Compare training features against serving features (GATE)
    serving_df = load_serving_features()
    validate_training_serving_skew(features_df, serving_df)
    return model


def validate_training_serving_skew(train_df, serve_df, threshold=0.1):
    """Fail if feature distributions differ significantly between training and serving."""
    from scipy.stats import ks_2samp
    # Compare every numeric training column; assumes serve_df has the same columns
    numeric_feature_columns = train_df.select_dtypes(include="number").columns
    for col in numeric_feature_columns:
        # The KS statistic is the largest gap between the two empirical CDFs
        stat, _ = ks_2samp(train_df[col].dropna(), serve_df[col].dropna())
        if stat > threshold:
            raise ValueError(
                f"Training/serving skew detected in '{col}': "
                f"KS statistic={stat:.3f} (threshold={threshold})"
            )

Data quality testing is a forcing function for understanding your data. Writing expectations forces you to articulate what "good" data looks like, which is exactly the kind of knowledge that disappears when team members leave or upstream systems change.