Great Expectations for ML Pipeline Data Validation

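Great Expectations (GX) is a widely used open-source library for data quality validation, and a common choice for ML pipelines. It lets you define expectations about your data as code, validate datasets before training, and generate HTML data quality reports. This guide covers setup, writing expectations, checkpoints, and pipeline integration. The examples use the pre-1.0 API (roughly GX 0.17 and 0.18), including the fluent context.sources interface and the great_expectations CLI. GX 1.x removed the CLI and renamed or replaced several of these calls, so pin a pre-1.0 version to follow along.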


Why Data Validation Matters for ML

Many silent ML failures trace back to data bugs rather than model bugs:

  • A column that was never null suddenly has 40% null values
  • A numeric feature develops negative values that physically can't exist (e.g., negative age)
  • Training and serving data have different categorical encodings
  • A new data source is connected that uses a different date format

Without data validation, these issues flow into training runs and silently degrade the resulting models. Great Expectations lets you add a validation step at each stage of your ML pipeline: before training, before serving, and in CI.
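To make these failure modes concrete, here is a library-free sketch of the kind of checks GX lets you declare. It is plain Python over a list of dicts, not the GX API. The column names follow this guide's customer_features example, except signup_date, which is hypothetical.

```python
from datetime import datetime

ALLOWED_STATUS = {"employed", "self_employed", "unemployed", "retired"}


def check_batch(rows, max_null_frac=0.0, date_format="%Y-%m-%d"):
    """Return human-readable problems found in a batch of records (list of dicts)."""
    problems = []
    n = len(rows)

    # Completeness: a column that was never null starts arriving with nulls
    nulls = sum(1 for r in rows if r.get("income") is None)
    if n and nulls / n > max_null_frac:
        problems.append(f"income null fraction {nulls / n:.0%} exceeds {max_null_frac:.0%}")

    # Physically impossible values, e.g. negative age
    bad_ages = [r["age"] for r in rows if r.get("age") is not None and r["age"] < 0]
    if bad_ages:
        problems.append(f"negative ages: {bad_ages}")

    # Categorical encodings that training never saw
    seen = {r["employment_status"] for r in rows if r.get("employment_status") is not None}
    unknown = sorted(seen - ALLOWED_STATUS)
    if unknown:
        problems.append(f"unknown employment_status values: {unknown}")

    # Dates from a source that uses a different format (signup_date is hypothetical)
    bad_dates = []
    for r in rows:
        try:
            datetime.strptime(r["signup_date"], date_format)
        except (KeyError, TypeError, ValueError):
            bad_dates.append(r.get("signup_date"))
    if bad_dates:
        problems.append(f"unparseable signup_date values: {bad_dates}")

    return problems
```

GX replaces hand-written functions like this with declarative, reusable expectations whose results are stored and rendered for you.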


Installation and Setup

pip install "great_expectations<1.0"

Initialize a new GX project:

great_expectations init

This creates:

great_expectations/
  great_expectations.yml       # Main config
  expectations/                # Your expectation suites
  checkpoints/                 # Checkpoint configs
  uncommitted/
    data_docs/                 # Generated HTML reports
    validations/               # Validation results

Core Concepts

  • Expectation — a single assertion about your data (expect_column_values_to_not_be_null)
  • Expectation Suite — a collection of expectations for a dataset
  • Data Source — connection to your data (Pandas, Spark, SQL, etc.)
  • Batch — a specific slice of data to validate
  • Checkpoint — ties suites to data sources for automated validation
  • Data Docs — auto-generated HTML reports of validation results
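To see how these pieces relate, here is a toy model in plain Python. The class names echo GX's vocabulary, but they are hypothetical stand-ins, not the library's classes. An expectation is a named check, a suite is a list of them, and running a suite against a batch yields one overall result. That is roughly what a checkpoint does before it runs its actions.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Mapping, Sequence

Batch = Sequence[Mapping[str, Any]]  # toy stand-in for a GX Batch: a list of records


@dataclass(frozen=True)
class Expectation:
    """One named assertion about a batch."""
    name: str
    check: Callable[[Batch], bool]


@dataclass
class ExpectationSuite:
    """A named collection of expectations for one dataset."""
    name: str
    expectations: List[Expectation] = field(default_factory=list)


@dataclass
class ValidationResult:
    success: bool
    results: Dict[str, bool]


def validate_batch(suite: ExpectationSuite, batch: Batch) -> ValidationResult:
    """Run every expectation in the suite against one batch (a checkpoint minus its actions)."""
    results = {e.name: e.check(batch) for e in suite.expectations}
    return ValidationResult(success=all(results.values()), results=results)


suite = ExpectationSuite("customer_features.basic", [
    Expectation("customer_id not null", lambda b: all(r.get("customer_id") is not None for r in b)),
    Expectation("age between 18 and 120", lambda b: all(18 <= r["age"] <= 120 for r in b)),
])
result = validate_batch(suite, [{"customer_id": 1, "age": 30}, {"customer_id": 2, "age": 150}])
print(result.success, result.results)
```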

Writing Expectations

Create an Expectation Suite

import great_expectations as gx

context = gx.get_context()

# Create the suite (add_or_update_ keeps the script safe to re-run)
suite = context.add_or_update_expectation_suite(expectation_suite_name="customer_features.basic")

Connecting to Data

import pandas as pd

# Load your data
df = pd.read_parquet('data/customer_features.parquet')

# Create a Data Source (Pandas in-memory)
datasource = context.sources.add_pandas("my_datasource")
data_asset = datasource.add_dataframe_asset(name="customer_features")
batch_request = data_asset.build_batch_request(dataframe=df)

# Get a Validator
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="customer_features.basic"
)

Adding Expectations

# Completeness
validator.expect_column_values_to_not_be_null("customer_id")
validator.expect_column_values_to_not_be_null("income")

# Value ranges
validator.expect_column_values_to_be_between(
    "age", min_value=18, max_value=120
)
validator.expect_column_values_to_be_between(
    "credit_score", min_value=300, max_value=850
)
validator.expect_column_values_to_be_between(
    "income", min_value=0, max_value=10_000_000
)

# Uniqueness
validator.expect_column_values_to_be_unique("customer_id")

# Value sets (categorical features)
validator.expect_column_values_to_be_in_set(
    "employment_status",
    value_set=["employed", "self_employed", "unemployed", "retired"]
)

# String formats
validator.expect_column_values_to_match_regex(
    "email",
    regex=r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
)

# Row count
validator.expect_table_row_count_to_be_between(
    min_value=10_000,
    max_value=1_000_000
)

# Column existence
validator.expect_column_to_exist("customer_id")
validator.expect_column_to_exist("income")
validator.expect_column_to_exist("credit_score")

# Save the suite
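# Save the suite; by default, expectations that failed on this sample are discarded
validator.save_expectation_suite(discard_failed_expectations=False)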
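Column-map expectations such as expect_column_values_to_be_between also accept a mostly argument. For example, validator.expect_column_values_to_be_between("age", min_value=18, max_value=120, mostly=0.99) passes when at least 99% of values meet the condition. The sketch below approximates that pass/fail logic in plain Python so the threshold is easy to reason about. It is not GX's implementation, and the result keys are made up for the sketch. GX's column-map expectations ignore null values, so the sketch also leaves them out of the denominator.

```python
def values_between(values, min_value, max_value, mostly=1.0):
    """Approximate pass/fail for a column-map expectation such as
    expect_column_values_to_be_between, including the mostly threshold."""
    # Nulls are left out of the denominator; null checks are a separate expectation
    non_null = [v for v in values if v is not None]
    if not non_null:
        # No data to judge: this sketch treats it as a pass
        return {"success": True, "unexpected_fraction": 0.0, "unexpected_values": []}
    unexpected = [v for v in non_null if not (min_value <= v <= max_value)]
    unexpected_fraction = len(unexpected) / len(non_null)
    return {
        "success": 1.0 - unexpected_fraction >= mostly,
        "unexpected_fraction": unexpected_fraction,
        "unexpected_values": unexpected,
    }


ages = [25, 40, None, 130, 60, 19, 33, 70, 45, 50]
print(values_between(ages, 18, 120)["success"])              # → False (1 bad value out of 9)
print(values_between(ages, 18, 120, mostly=0.8)["success"])  # → True (8/9 ≥ 0.8)
```

A mostly threshold slightly below 1.0 keeps a handful of legitimate outliers from blocking a training run, while still catching a broken upstream source.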

Running Validation

Direct Validation in Python

import great_expectations as gx
import pandas as pd


def validate_training_data(data_path: str) -> bool:
    """
    Validate data quality before training.
    Returns True if all expectations pass.
    """
    context = gx.get_context()

    df = pd.read_parquet(data_path)

    # Reuse the datasource and dataframe asset registered earlier
    datasource = context.get_datasource("my_datasource")
    data_asset = datasource.get_asset("customer_features")
    batch_request = data_asset.build_batch_request(dataframe=df)

    # Runs the suite plus the checkpoint's actions (store results, update Data Docs, notify)
    results = context.run_checkpoint(
        checkpoint_name="training_data_checkpoint",
        batch_request=batch_request
    )

    return results.success
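validate_training_data returns a boolean, but shell scripts and CI runners react to exit codes. A small wrapper, such as the hypothetical gate function below, converts one into the other. It also treats an exception raised during validation as a failure.

```python
import sys
from typing import Callable


def gate(validate: Callable[[str], bool], data_path: str) -> int:
    """Turn a True/False validation result into a process exit code (0 = pass, 1 = fail)."""
    try:
        passed = validate(data_path)
    except Exception as exc:  # a crash in validation should also block training
        print(f"Validation errored for {data_path}: {exc}", file=sys.stderr)
        return 1
    if passed:
        print(f"Validation passed for {data_path}")
        return 0
    print(f"Validation failed for {data_path}; see Data Docs", file=sys.stderr)
    return 1
```

In a script, call sys.exit(gate(validate_training_data, "data/customer_features.parquet")) so the surrounding pipeline stops before training on bad data.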

pytest Integration

import great_expectations as gx
import pandas as pd
import pytest

TRAINING_DATA = "data/training_data.parquet"


@pytest.fixture(scope="session")
def gx_context():
    return gx.get_context()


@pytest.fixture
def validator(gx_context):
    """A fresh Validator over the training data, bound to the saved suite."""
    df = pd.read_parquet(TRAINING_DATA)

    datasource = gx_context.get_datasource("my_datasource")
    data_asset = datasource.get_asset("customer_features")
    batch_request = data_asset.build_batch_request(dataframe=df)

    return gx_context.get_validator(
        batch_request=batch_request,
        expectation_suite_name="customer_features.basic"
    )


def test_training_data_passes_expectations(validator):
    """Training data must pass all quality expectations before model training."""
    results = validator.validate()

    # Collect failed expectations for clear error messages
    failures = [
        f"{r.expectation_config.expectation_type} on column "
        f"'{r.expectation_config.kwargs.get('column', 'table')}'"
        for r in results.results
        if not r.success
    ]

    assert results.success, "Data quality failures:\n" + "\n".join(f"  - {f}" for f in failures)


def test_no_duplicate_customer_ids(validator):
    """Customer IDs must be unique; duplicates corrupt training labels."""
    # Ad-hoc expectation: evaluated immediately; the saved suite is untouched because it is never saved here
    result = validator.expect_column_values_to_be_unique("customer_id")

    assert result.success, \
        f"Duplicate customer IDs found: {result.result.get('partial_unexpected_list', [])[:10]}"


def test_feature_distributions_match_expected_ranges(validator):
    """Critical feature statistics must be within historical ranges."""
    # Mean income should be within a reasonable range (based on historical data)
    result = validator.expect_column_mean_to_be_between(
        "income", min_value=45_000, max_value=120_000
    )
    assert result.success, \
        f"Income mean {result.result.get('observed_value')} out of expected range [45000, 120000]"
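The income range in the last test is hard-coded. One way to derive such bounds from history, offered here as an assumption rather than a GX feature, is to collect the income means of past training batches. The bounds are then their average plus or minus k standard deviations. The function name and the optional floor parameter are hypothetical.

```python
import statistics
from typing import Optional, Sequence, Tuple


def mean_bounds(historical_means: Sequence[float], k: float = 3.0,
                floor: Optional[float] = None) -> Tuple[float, float]:
    """Suggest (min_value, max_value) for expect_column_mean_to_be_between:
    the average of past batch means plus or minus k sample standard deviations."""
    if len(historical_means) < 2:
        raise ValueError("need at least two historical batch means")
    center = statistics.fmean(historical_means)
    spread = statistics.stdev(historical_means)
    low, high = center - k * spread, center + k * spread
    if floor is not None:
        low = max(low, floor)  # e.g. a mean income can never be negative
    return low, high


print(mean_bounds([70_000, 72_000, 74_000]))  # → (66000.0, 78000.0)
```

Choose k to balance false alarms against missed shifts. With only a few historical batches, the standard deviation is a noisy estimate, so widen the margin accordingly.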

Checkpoints for Automated Pipeline Validation

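Checkpoints bundle an expectation suite, a batch request, and a list of actions (store the results, rebuild Data Docs, send notifications) so pipelines can run validation with a single call: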

# great_expectations/checkpoints/training_data_checkpoint.yml
name: training_data_checkpoint
config_version: 1.0
class_name: Checkpoint
validations:
  - batch_request:
      datasource_name: my_datasource
      data_asset_name: customer_features
    expectation_suite_name: customer_features.basic
    action_list:
      - name: store_validation_result
        action:
          class_name: StoreValidationResultAction
      - name: update_data_docs
        action:
          class_name: UpdateDataDocsAction
      - name: slack_notification
        action:
          class_name: SlackNotificationAction
          # Resolved from an environment variable or uncommitted/config_variables.yml; keep webhooks out of git
          slack_webhook: ${SLACK_WEBHOOK}
          notify_on: failure
          renderer:
            module_name: great_expectations.render.renderer.slack_renderer
            class_name: SlackRenderer
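The notify_on setting on the Slack action accepts all, success, or failure. The sketch below spells out the resulting send/skip decision. It restates those options in plain Python and does not reproduce GX's code; the function name is hypothetical.

```python
def should_notify(notify_on: str, validation_success: bool) -> bool:
    """Decide whether a notification action fires for a given validation outcome."""
    if notify_on == "all":
        return True
    if notify_on == "success":
        return validation_success
    if notify_on == "failure":
        return not validation_success
    raise ValueError(f"unknown notify_on value: {notify_on!r}")
```

For a training gate, failure keeps the channel quiet on routine green runs while still surfacing every broken batch.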

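Run it from the command line. This only works for data assets GX can load on its own, such as file or SQL assets; a dataframe asset has no data until you pass a dataframe from Python, as the other examples in this guide do: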

great_expectations checkpoint run training_data_checkpoint

Integration with Airflow

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


def validate_data(**context):
    import great_expectations as gx
    import pandas as pd

    gx_context = gx.get_context()

    # Load the day's data (reading s3:// paths with pandas requires s3fs)
    run_date = context['ds']
    df = pd.read_parquet(f's3://bucket/features/{run_date}.parquet')

    # Wrap the dataframe in a batch request for the registered asset
    data_asset = gx_context.get_datasource("my_datasource").get_asset("customer_features")
    batch_request = data_asset.build_batch_request(dataframe=df)

    results = gx_context.run_checkpoint(
        checkpoint_name="training_data_checkpoint",
        batch_request=batch_request
    )

    # Raising fails the task, so downstream training never runs on bad data
    if not results.success:
        raise ValueError(f"Data validation failed for {run_date}. Check Data Docs.")


with DAG(
    'ml_training_pipeline',
    start_date=datetime(2026, 1, 1),
    schedule='@daily',  # Airflow 2.4+; replaces the deprecated schedule_interval
    catchup=False,
) as dag:

    validate_task = PythonOperator(
        task_id='validate_training_data',
        python_callable=validate_data
    )

    train_task = BashOperator(
        task_id='train_model',
        bash_command='python training/train.py --date={{ ds }}'
    )

    evaluate_task = BashOperator(
        task_id='evaluate_model',
        bash_command='python evaluation/evaluate.py --date={{ ds }}'
    )

    # Validation gates training
    validate_task >> train_task >> evaluate_task

Training vs. Serving Data Comparison

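Detect training-serving skew by validating serving data against the training expectations. Batch-level expectations in the training suite, such as the 10,000-row minimum and the customer_id uniqueness check, will usually fail on small serving batches. Consider a separate serving suite that omits them: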

import great_expectations as gx
import pandas as pd


def validate_serving_data(serving_df: pd.DataFrame) -> dict:
    """
    Validate real-time inference data against training data expectations.
    Call this before making predictions to catch data pipeline issues.
    """
    # get_context() reads project config from disk; in a long-running service, create it once and reuse it
    context = gx.get_context()

    datasource = context.get_datasource("my_datasource")
    data_asset = datasource.get_asset("customer_features")
    batch_request = data_asset.build_batch_request(dataframe=serving_df)

    # Same suite as training; swap in a serving variant without batch-level checks if needed
    validator = context.get_validator(
        batch_request=batch_request,
        expectation_suite_name="customer_features.basic"
    )

    results = validator.validate()

    failed_expectations = [
        {
            "expectation": r.expectation_config.expectation_type,
            "column": r.expectation_config.kwargs.get("column"),
            "details": r.result,
        }
        for r in results.results
        if not r.success
    ]

    return {
        "valid": results.success,
        "failures": failed_expectations,
        "statistics": results.statistics
    }
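Batch-level expectations in the training suite (row count, uniqueness, column mean) tend to fail on small serving batches. One way to build a serving variant is to copy the training suite and drop those entries. This sketch assumes the pre-1.0 JSON layout, with a top-level expectation_suite_name and an expectations list whose entries carry expectation_type and kwargs. The set of batch-level expectation types is an assumption to adjust for your own suite.

```python
import copy

# Expectations that describe a whole training batch rather than individual rows.
# This set is an assumption for this guide's suite; adjust it for yours.
BATCH_LEVEL_EXPECTATIONS = {
    "expect_table_row_count_to_be_between",
    "expect_column_values_to_be_unique",
    "expect_column_mean_to_be_between",
}


def serving_suite_from_training(training_suite: dict, new_name: str) -> dict:
    """Copy a serialized training suite, keeping only row-level expectations."""
    serving = copy.deepcopy(training_suite)  # never mutate the training suite
    serving["expectation_suite_name"] = new_name
    serving["expectations"] = [
        e for e in serving.get("expectations", [])
        if e["expectation_type"] not in BATCH_LEVEL_EXPECTATIONS
    ]
    return serving
```

In a pre-1.0 project, a suite named customer_features.basic is typically stored under great_expectations/expectations/customer_features/basic.json. You could load that file with json.load, transform it, and write the result alongside it as the serving suite. Then pass that suite's name to get_validator in validate_serving_data.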

Data Docs

GX auto-generates HTML data quality reports. Host them for team visibility:

# Build and open Data Docs locally
great_expectations docs build

# In CI, upload to S3 or GCS (Data Docs can include sample values from your data, so keep the bucket access-controlled)
aws s3 sync great_expectations/uncommitted/data_docs/local_site/ \
  s3://your-bucket/data-docs/

CI Integration

name: ML Data Validation
on:
  push:
    paths: ['data/**', 'pipelines/**']
  schedule:
    - cron: '0 5 * * *'

jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      
      - name: Install dependencies
        run: pip install "great_expectations<1.0" pandas pyarrow pytest
      
      - name: Download latest data
        run: python scripts/fetch_data.py --date today
      
      - name: Run Great Expectations checkpoint
        run: great_expectations checkpoint run training_data_checkpoint
      
      - name: Run pytest data quality tests
        run: pytest tests/data_quality/ -v
      
      - name: Upload Data Docs
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: data-docs
          path: great_expectations/uncommitted/data_docs/

Summary

Great Expectations brings software testing discipline to data:

  • Expectation suites — codify data contracts as version-controlled assertions
  • Checkpoints — automate validation in pipelines with notifications on failure
  • Data Docs — visual reports make data quality visible to non-technical stakeholders
  • Training/serving parity — validate inference data against training expectations to catch skew

The key insight: ML model bugs often originate in data, not code. Testing the data before it reaches training and inference is more effective than debugging model behavior after the fact.
