Data Quality Testing for ML Pipelines: Great Expectations, Soda, and Deequ

ML models fail silently when their input data changes. A column that was never null suddenly has 30% nulls. A categorical feature gains new values the model was never trained on. A numeric distribution shifts because an upstream system changed its calculation.
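
To make two of these failure modes concrete, here is a minimal sketch in plain Python that compares a new batch against a baseline and reports a jump in null rate or previously unseen categories. It uses none of the frameworks covered in this article; the function names, the sample rows, and the `max_null_increase` tolerance are illustrative assumptions.

```python
def null_rate(rows, column):
    """Fraction of rows where `column` is missing or None."""
    if not rows:
        return 0.0
    return sum(1 for r in rows if r.get(column) is None) / len(rows)


def compare_batches(baseline, current, column, categorical=False, max_null_increase=0.05):
    """Return a list of human-readable problems found in `current` relative to `baseline`."""
    problems = []

    # Failure mode 1: the null rate rose by more than the tolerance
    increase = null_rate(current, column) - null_rate(baseline, column)
    if increase > max_null_increase:
        problems.append(f"{column}: null rate rose by {increase:.0%}")

    # Failure mode 2: a categorical column gained values absent from the baseline
    if categorical:
        seen = {r[column] for r in baseline if r.get(column) is not None}
        new = {r[column] for r in current if r.get(column) is not None} - seen
        if new:
            problems.append(f"{column}: unseen categories {sorted(new)}")

    return problems


# Hypothetical sample data: the new batch has a null and an unknown payment method
baseline = [{"payment_method": "paypal"}, {"payment_method": "credit_card"}]
current = [
    {"payment_method": "paypal"},
    {"payment_method": "crypto"},
    {"payment_method": None},
]
problems = compare_batches(baseline, current, "payment_method", categorical=True)
for problem in problems:
    print(problem)
```

The frameworks below automate this kind of comparison and add reporting, persistence, and scale.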

Data quality testing catches these problems before they corrupt model predictions. Great Expectations, Soda Core, and AWS Deequ are three widely used open-source frameworks for automating this validation in ML pipelines.

Why Data Quality Testing for ML

Traditional software testing validates code behavior. ML data quality testing validates the statistical properties of data — the distribution of values, null rates, referential integrity, and consistency over time.

The consequences of skipping this:

  • Silent model degradation (predictions become wrong but there's no error)
  • Training/serving skew (model trains on clean data, serves on dirty data)
  • Unexpected schema changes break feature engineering code

Great Expectations

Great Expectations (GE) is one of the most widely adopted frameworks. You define "expectations" about your data: assertions that can be run against a DataFrame or a database table.

Setup and Basic Usage

pip install great_expectations
great_expectations init

# The Python examples below use the fluent data source API (context.sources) from the 0.x releases
import great_expectations as gx
import pandas as pd

context = gx.get_context()

# Load your data
df = pd.read_parquet("data/training_features.parquet")

# Create a Data Source
datasource = context.sources.add_pandas("my_datasource")
data_asset = datasource.add_dataframe_asset("training_features")
batch_request = data_asset.build_batch_request(dataframe=df)

# Create an Expectation Suite (add_or_update keeps reruns from failing if it already exists)
suite = context.add_or_update_expectation_suite("training_data_suite")

validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="training_data_suite"
)

Writing Expectations

# Schema validation
validator.expect_column_to_exist("user_id")
validator.expect_column_to_exist("purchase_amount")
validator.expect_column_to_exist("event_timestamp")

# Type checks
validator.expect_column_values_to_be_in_type_list("user_id", ["int64", "int32"])
validator.expect_column_values_to_be_in_type_list("purchase_amount", ["float64", "float32"])

# Null checks
validator.expect_column_values_to_not_be_null("user_id")
validator.expect_column_values_to_not_be_null("event_timestamp")
# Allow some nulls in optional fields
validator.expect_column_values_to_not_be_null("promo_code", mostly=0.7)  # passes if at least 70% of values are non-null

# Value range validation
validator.expect_column_values_to_be_between(
    "purchase_amount", min_value=0.0, max_value=100000.0
)
validator.expect_column_values_to_be_between("age", min_value=13, max_value=120)

# Categorical value validation
validator.expect_column_values_to_be_in_set(
    "payment_method", {"credit_card", "debit_card", "paypal", "apple_pay"}
)

# Statistical distribution checks
validator.expect_column_mean_to_be_between("purchase_amount", min_value=20.0, max_value=200.0)
validator.expect_column_stdev_to_be_between("purchase_amount", min_value=10.0, max_value=500.0)

# Uniqueness
validator.expect_column_values_to_be_unique("transaction_id")

# Row count
validator.expect_table_row_count_to_be_between(min_value=10000, max_value=10000000)

# Save expectations
validator.save_expectation_suite()
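
The `mostly` argument used above turns a strict assertion into a tolerance: the expectation passes when at least that fraction of values satisfies it. The following plain-Python sketch illustrates the idea. It is not Great Expectations' own implementation, and the function name and the shape of the returned dictionary are illustrative assumptions.

```python
def expect_values_to_not_be_null(values, mostly=1.0):
    """Pass when at least `mostly` (a fraction in [0, 1]) of the values are non-null."""
    total = len(values)
    non_null = sum(1 for v in values if v is not None)
    observed = non_null / total if total else 1.0
    return {
        "success": observed >= mostly,
        "observed_fraction": observed,
        "unexpected_count": total - non_null,
    }


# Hypothetical column: 7 of 10 promo codes are present
promo_codes = ["SAVE10", None, "WELCOME", "SAVE10", None, "VIP", "SAVE5", "VIP", "SAVE10", None]

strict = expect_values_to_not_be_null(promo_codes)                 # default mostly=1.0
tolerant = expect_values_to_not_be_null(promo_codes, mostly=0.7)   # allows up to 30% nulls

print("strict passes:", strict["success"])
print("tolerant passes:", tolerant["success"])
```

Choosing `mostly` is a judgment about how much imperfection the downstream model can absorb, so it is worth deriving from observed data rather than picking a round number.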

Running Validation in a Pipeline

from great_expectations.checkpoint import Checkpoint

checkpoint = context.add_or_update_checkpoint(
    name="training_data_checkpoint",
    validations=[
        {
            "batch_request": batch_request,
            "expectation_suite_name": "training_data_suite",
        }
    ],
)

results = checkpoint.run()

if not results.success:
    # Each validation result holds the per-expectation results for one batch,
    # so flatten them before filtering for failures
    failed_expectations = [
        result
        for validation_result in results.list_validation_results()
        for result in validation_result.results
        if not result.success
    ]
    for failure in failed_expectations:
        print(f"FAILED: {failure.expectation_config.expectation_type} "
              f"on column {failure.expectation_config.kwargs.get('column')}")

    raise ValueError(f"Data quality check failed: {len(failed_expectations)} expectations violated")

Soda Core

Soda uses a YAML-based DSL (SodaCL) for data quality checks, which makes it easier for non-engineers to write and review checks.

Setup

pip install soda-core soda-core-pandas-dask

Writing Checks

# checks_training_data.yml
checks for training_features:
  - schema:
      name: Required columns present
      fail:
        when required column missing: [user_id, purchase_amount, event_timestamp]
  
  - missing_count(user_id) = 0:
      name: user_id has no nulls
  
  - missing_percent(promo_code) < 50%:
      name: promo_code at most 50% null
  
  - invalid_count(payment_method) = 0:
      valid values: [credit_card, debit_card, paypal, apple_pay]
  
  - min(purchase_amount) >= 0:
      name: No negative purchase amounts
  
  - max(purchase_amount) < 100000:
      name: No unreasonably large purchases
  
  - avg(purchase_amount) between 20 and 200:
      name: Average purchase amount in expected range
  
  - duplicate_count(transaction_id) = 0:
      name: Transaction IDs are unique
  
  - row_count between 10000 and 10000000:
      name: Dataset has reasonable row count
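
Each check above pairs a metric with a threshold. As a rough illustration of what a few of those metrics measure, here is a plain-Python sketch. It is not how Soda computes them, the helper names merely mirror the metric names for readability, and treating missing values as not invalid is an assumption made for this example.

```python
def missing_percent(values):
    """Percentage (0-100) of values that are None."""
    return 100.0 * sum(v is None for v in values) / len(values) if values else 0.0


def invalid_count(values, valid_values):
    """Number of non-missing values outside the allowed set (assumption: None is not invalid)."""
    allowed = set(valid_values)
    return sum(1 for v in values if v is not None and v not in allowed)


def avg(values):
    """Mean of the non-missing values."""
    present = [v for v in values if v is not None]
    return sum(present) / len(present)


# Hypothetical column samples
promo_code = ["SAVE10", None, None, "VIP"]
payment_method = ["paypal", "credit_card", "cheque", None]
purchase_amount = [25.0, 40.0, 130.0, 65.0]
valid_methods = ["credit_card", "debit_card", "paypal", "apple_pay"]

# Evaluate each metric against the same thresholds as the YAML checks
check_results = {
    "missing_percent(promo_code) < 50%": missing_percent(promo_code) < 50,
    "invalid_count(payment_method) = 0": invalid_count(payment_method, valid_methods) == 0,
    "avg(purchase_amount) between 20 and 200": 20 <= avg(purchase_amount) <= 200,
}
for check, passed in check_results.items():
    print("PASS" if passed else "FAIL", check)
```

Note that the first check fails at exactly 50% missing, because the threshold is a strict `<`.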

Running Soda Checks

from soda.scan import Scan

scan = Scan()
scan.set_data_source_name("training_data")
scan.add_configuration_yaml_str("""
  data_source training_data:
    type: pandas
""")
scan.add_sodacl_yaml_file("checks_training_data.yml")
scan.add_pandas_dataframe(
    data_source_name="training_data",
    dataset_name="training_features",
    pandas_df=df
)

exit_code = scan.execute()

if exit_code != 0:
    print(scan.get_logs_text())
    raise ValueError("Soda data quality checks failed")

Soda in CI

# .github/workflows/ml-pipeline.yml
- name: Validate training data
  run: |
    soda scan \
      -d training_data \
      -c soda_config.yml \
      checks_training_data.yml

AWS Deequ

Deequ is Amazon's data quality library for Spark-based pipelines. If your data lives in S3 and you process it with Spark, Deequ integrates naturally. The examples below use PyDeequ, its Python wrapper.

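import pydeequ
from pyspark.sql import SparkSession
from pydeequ.analyzers import *
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite, VerificationResult

# PyDeequ needs the Deequ JAR on the Spark classpath
spark = (
    SparkSession.builder.master("local").appName("DQ")
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate()
)
df = spark.read.parquet("s3://ml-data-bucket/training/features/")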

check = (
    Check(spark, CheckLevel.Error, "Training Data Checks")
    .isComplete("user_id")
    .isComplete("purchase_amount")
    .isNonNegative("purchase_amount")
    .isContainedIn("payment_method", ["credit_card", "debit_card", "paypal", "apple_pay"])
    .hasUniqueness(["transaction_id"], lambda x: x == 1.0)
    .hasSize(lambda x: x >= 10000)
    .satisfies("purchase_amount < 100000", "Max purchase under 100k")
    .hasMin("purchase_amount", lambda x: x >= 0)
    .hasMean("purchase_amount", lambda x: 20 <= x <= 200)
)

result = VerificationSuite(spark).onData(df).addCheck(check).run()

if result.status == "Error":
    result_df = VerificationResult.checkResultsAsDataFrame(spark, result)
    result_df.filter("constraint_status == 'Failure'").show(truncate=False)
    raise ValueError("Deequ data quality checks failed")
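
In the check above, each lambda receives a metric computed over the whole DataFrame and returns whether that value is acceptable. The sketch below imitates that pattern in plain Python for two metrics, taking completeness as the fraction of non-null values and uniqueness as the fraction of values that occur exactly once. It is an illustration under those assumptions, not Deequ code, and the helper names are hypothetical.

```python
from collections import Counter


def completeness(values):
    """Fraction of values that are not None."""
    return sum(v is not None for v in values) / len(values)


def uniqueness(values):
    """Fraction of rows whose value occurs exactly once in the column."""
    counts = Counter(values)
    return sum(1 for c in counts.values() if c == 1) / len(values)


def run_constraints(values, constraints):
    """Apply each (name, metric_fn, assertion) triple and record pass/fail by name."""
    return {name: bool(assertion(metric(values))) for name, metric, assertion in constraints}


# Hypothetical column with one duplicated transaction ID
transaction_ids = ["t1", "t2", "t3", "t3"]

statuses = run_constraints(
    transaction_ids,
    [
        ("isComplete", completeness, lambda x: x == 1.0),
        ("hasUniqueness", uniqueness, lambda x: x == 1.0),
    ],
)
print(statuses)
```

Because the assertion is an arbitrary function of the metric, the same mechanism covers both strict rules (`x == 1.0`) and tolerances (`x >= 0.99`).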

Deequ Data Profiling

Before writing checks, use Deequ's profiling to understand your data:

from pydeequ.analyzers import (
    AnalysisRunner, AnalyzerContext, ApproxCountDistinct,
    Completeness, Maximum, Mean, Minimum,
)

analysis_result = AnalysisRunner(spark) \
    .onData(df) \
    .addAnalyzer(Completeness("user_id")) \
    .addAnalyzer(Completeness("purchase_amount")) \
    .addAnalyzer(Mean("purchase_amount")) \
    .addAnalyzer(Maximum("purchase_amount")) \
    .addAnalyzer(Minimum("purchase_amount")) \
    .addAnalyzer(ApproxCountDistinct("payment_method")) \
    .run()

metrics_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysis_result)
metrics_df.show()

Profile your data first, then write checks based on the observed distributions.
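
One way to turn a profile into checks is to derive the bounds from the observed statistics rather than guessing them. The following plain-Python sketch proposes a range for a future mean check as the observed mean plus or minus a relative tolerance. The function name, the default `tolerance` of 25%, and the sample values are illustrative assumptions, and bounds produced this way still deserve review by someone who knows the data.

```python
from statistics import mean


def suggest_mean_bounds(values, tolerance=0.25):
    """Suggest (low, high) bounds for a mean check: observed mean +/- a relative tolerance."""
    present = [v for v in values if v is not None]
    observed = mean(present)
    margin = abs(observed) * tolerance
    return observed - margin, observed + margin


# Hypothetical profiled column (None represents a missing value)
observed_amounts = [20.0, 40.0, 60.0, 80.0, None]
low, high = suggest_mean_bounds(observed_amounts)
print(f"suggested mean range: {low} to {high}")

# The bounds could then feed an assertion like the ones passed to hasMean
mean_is_acceptable = lambda x: low <= x <= high
print(mean_is_acceptable(55.0))
```

Recomputing the bounds from a trusted reference window, instead of hard-coding them once, keeps the checks aligned with slow, legitimate changes in the data.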

Choosing a Framework

Criteria         | Great Expectations        | Soda Core                  | AWS Deequ
-----------------|---------------------------|----------------------------|--------------------
Data processing  | Pandas, Spark, SQL        | Pandas, SQL, Spark         | Spark only
Check definition | Python API                | YAML + Python              | Python (Spark DSL)
Scale            | Medium-large              | Medium                     | Very large
CI integration   | Python SDK                | CLI + Python               | Python SDK
Data catalog     | Built-in docs             | Soda Cloud (paid)          | Via AWS Glue
Learning curve   | Medium                    | Low                        | Medium
Best for         | Complex checks, data docs | YAML-first orgs, dbt users | AWS/Spark shops

Integration Pattern: Gate Your Training Pipeline

# training_pipeline.py
def run_training_pipeline():
    # Step 1: Load raw data
    raw_df = load_training_data()
    
    # Step 2: Validate raw data quality — GATE
    validate_data_quality(raw_df, suite="raw_data_suite")
    
    # Step 3: Feature engineering
    features_df = engineer_features(raw_df)
    
    # Step 4: Validate features — GATE
    validate_data_quality(features_df, suite="features_suite")
    
    # Step 5: Train model
    model = train_model(features_df)
    
    # Step 6: Check training features against serving features for skew - GATE
    serving_df = load_serving_features()
    validate_training_serving_skew(features_df, serving_df)
    
    return model

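def validate_training_serving_skew(train_df, serve_df, threshold=0.1):
    """Fail if feature distributions differ significantly between training and serving."""
    from scipy.stats import ks_2samp

    # Compare every numeric column that is present in both DataFrames
    numeric_feature_columns = [
        col for col in train_df.select_dtypes(include="number").columns
        if col in serve_df.columns
    ]
    for col in numeric_feature_columns:
        # Only the KS statistic is used; the p-value is ignored
        stat, _ = ks_2samp(train_df[col].dropna(), serve_df[col].dropna())
        if stat > threshold:
            raise ValueError(
                f"Training/serving skew detected in '{col}': "
                f"KS statistic={stat:.3f} (threshold={threshold})"
            )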
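
The KS statistic compared against `threshold` above is the largest vertical gap between the two samples' empirical cumulative distribution functions, so it always lies between 0 and 1. The plain-Python sketch below computes it directly to show what a value such as 0.1 means. `scipy.stats.ks_2samp` remains the better choice in practice, and the function name here is hypothetical.

```python
from bisect import bisect_right


def ks_statistic(sample_a, sample_b):
    """Largest absolute difference between the two empirical CDFs."""
    a, b = sorted(sample_a), sorted(sample_b)
    largest_gap = 0.0
    # The gap between two step functions can only change at observed values
    for x in a + b:
        cdf_a = bisect_right(a, x) / len(a)  # fraction of a that is <= x
        cdf_b = bisect_right(b, x) / len(b)  # fraction of b that is <= x
        largest_gap = max(largest_gap, abs(cdf_a - cdf_b))
    return largest_gap


# Hypothetical feature values: identical samples versus a shifted copy
train = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
same = ks_statistic(train, train)
shifted = ks_statistic(train, [x + 5 for x in train])

print(f"identical samples: {same:.2f}")
print(f"shifted samples:   {shifted:.2f}")
```

Read this way, a threshold of 0.1 tolerates at most a 10 percentage point gap between the training and serving distributions at any value of the feature.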

Data quality testing is a forcing function for understanding your data. Writing expectations forces you to articulate what "good" data looks like — which is exactly the kind of knowledge that disappears when team members leave or upstream systems change.
