Great Expectations for ML Pipeline Data Validation
Great Expectations (GX) is a widely used open-source library for data quality validation in ML pipelines. It lets you define data expectations as code, validate datasets before training, and generate HTML data quality reports. This guide covers setup, writing expectations, running validation, checkpoints, and pipeline integration.
Why Data Validation Matters for ML
The most common reason ML models fail silently isn't model bugs — it's data bugs:
- A column that was never null suddenly has 40% null values
- A numeric feature develops negative values that physically can't exist (e.g., negative age)
- Training and serving data have different categorical encodings
- A new data source is connected that uses a different date format
Without data validation, these issues flow into training runs and corrupt models. Great Expectations adds a validation layer at every stage of your ML pipeline.
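To make these failure modes concrete, here is a minimal sketch in plain Python, independent of GX, that checks for three of them: an unexpected null rate, out-of-range values, and a changed date format. The function names, the sample rows, and the `%Y-%m-%d` format are assumptions made for illustration.

```python
from datetime import datetime


def null_fraction(rows, column):
    """Share of rows where `column` is missing or None."""
    if not rows:
        return 0.0
    return sum(1 for r in rows if r.get(column) is None) / len(rows)


def out_of_range(rows, column, min_value, max_value):
    """Non-null values of `column` that fall outside [min_value, max_value]."""
    return [
        r[column]
        for r in rows
        if r.get(column) is not None and not (min_value <= r[column] <= max_value)
    ]


def bad_dates(rows, column, fmt="%Y-%m-%d"):
    """Non-null values of `column` that do not parse with the expected format."""
    bad = []
    for r in rows:
        value = r.get(column)
        if value is None:
            continue
        try:
            datetime.strptime(value, fmt)
        except ValueError:
            bad.append(value)
    return bad


# Hypothetical sample batch, invented for illustration.
rows = [
    {"age": 34, "signup": "2024-03-01"},
    {"age": -2, "signup": "03/01/2024"},
    {"age": None, "signup": "2024-03-02"},
]

print(null_fraction(rows, "age"))
print(out_of_range(rows, "age", 0, 120))
print(bad_dates(rows, "signup"))
```

Hand-written checks like these work for one dataset but are hard to share, document, and report on, which is the gap an expectation suite is meant to fill.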
Installation and Setup
The examples in this guide use the pre-1.0 GX API (context.sources, get_validator, and the great_expectations command-line tool). GX 1.0 changed these interfaces, so to run the code as written, add a version constraint such as <1.0 to the install command below.
pip install great_expectations
Initialize a new GX project:
great_expectations init
This creates:
great_expectations/
    great_expectations.yml    # Main config
    expectations/             # Your expectation suites
    checkpoints/              # Checkpoint configs
    uncommitted/
        data_docs/            # Generated HTML reports
        validations/          # Validation results
Core Concepts
- Expectation: a single assertion about your data (for example, expect_column_values_to_not_be_null)
- Expectation Suite: a collection of expectations for a dataset
- Data Source: a connection to your data (Pandas, Spark, SQL, etc.)
- Batch: a specific slice of data to validate
- Checkpoint: ties suites to data sources for automated validation
- Data Docs: auto-generated HTML reports of validation results
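How these concepts fit together can be sketched with a toy model in plain Python. This is not how GX is implemented; the class names mirror the concepts above, and `run_toy_checkpoint` and the sample batch are hypothetical names invented for illustration.

```python
from dataclasses import dataclass, field
from typing import Any, Callable

Row = dict[str, Any]


@dataclass
class Expectation:
    """A single named assertion over a batch of rows."""
    name: str
    check: Callable[[list[Row]], bool]


@dataclass
class ExpectationSuite:
    """A named collection of expectations for one dataset."""
    name: str
    expectations: list[Expectation] = field(default_factory=list)


def run_toy_checkpoint(suite: ExpectationSuite, batch: list[Row]) -> dict:
    """Evaluate every expectation in the suite against one batch."""
    outcomes = {e.name: e.check(batch) for e in suite.expectations}
    return {"success": all(outcomes.values()), "outcomes": outcomes}


suite = ExpectationSuite(
    name="customer_features.basic",
    expectations=[
        Expectation(
            "customer_id_not_null",
            lambda rows: all(r.get("customer_id") is not None for r in rows),
        ),
        Expectation(
            "age_between_18_and_120",
            lambda rows: all(18 <= r["age"] <= 120 for r in rows),
        ),
    ],
)

# Hypothetical batch: the second row violates the age expectation.
batch = [{"customer_id": 1, "age": 42}, {"customer_id": 2, "age": 17}]
report = run_toy_checkpoint(suite, batch)
print(report)
```

The point of the model is the separation of concerns: the suite is defined once and versioned, while batches change on every run.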
Writing Expectations
Create an Expectation Suite
import great_expectations as gx
context = gx.get_context()
# Create a new suite
suite = context.add_expectation_suite(expectation_suite_name="customer_features.basic")
Connecting to Data
import pandas as pd
# Load your data
df = pd.read_parquet('data/customer_features.parquet')
# Create a Data Source (Pandas in-memory)
datasource = context.sources.add_pandas("my_datasource")
data_asset = datasource.add_dataframe_asset(name="customer_features")
batch_request = data_asset.build_batch_request(dataframe=df)
# Get a Validator
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="customer_features.basic"
)
Adding Expectations
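One of the expectations in this section matches email addresses against a regular expression. Before a pattern goes into a suite, it can be sanity-checked on its own with Python's `re` module. The sketch below uses the same pattern as the guide; the sample addresses are invented for illustration.

```python
import re

# Same pattern as the email expectation in this guide.
EMAIL_PATTERN = re.compile(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")

# Hypothetical sample values: two that should pass, three that should not.
samples = [
    "ana@example.com",
    "first.last+tag@mail.example.org",
    "no-at-sign.example.com",
    "user@localhost",
    "user@example.c",
]

matches = {s: bool(EMAIL_PATTERN.match(s)) for s in samples}
for value, ok in matches.items():
    print(f"{value!r}: {'match' if ok else 'no match'}")
```

In Python's `re`, `$` also matches just before a trailing newline, so a value like "ana@example.com\n" passes this pattern. If that matters for your data, `\Z` in place of `$` is stricter.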
# Completeness
validator.expect_column_values_to_not_be_null("customer_id")
validator.expect_column_values_to_not_be_null("income")
# Value ranges
validator.expect_column_values_to_be_between(
    "age", min_value=18, max_value=120
)
validator.expect_column_values_to_be_between(
    "credit_score", min_value=300, max_value=850
)
validator.expect_column_values_to_be_between(
    "income", min_value=0, max_value=10_000_000
)
# Uniqueness
validator.expect_column_values_to_be_unique("customer_id")
# Value sets (categorical features)
validator.expect_column_values_to_be_in_set(
    "employment_status",
    value_set=["employed", "self_employed", "unemployed", "retired"]
)
# String formats
validator.expect_column_values_to_match_regex(
    "email",
    regex=r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
)
# Row count
validator.expect_table_row_count_to_be_between(
    min_value=10_000,
    max_value=1_000_000
)
# Column existence
validator.expect_column_to_exist("customer_id")
validator.expect_column_to_exist("income")
validator.expect_column_to_exist("credit_score")
# Save the suite
validator.save_expectation_suite()
Running Validation
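Whichever way validation is run, the outcome has the same basic shape: one result per expectation plus an overall success flag that the pipeline can act on. The sketch below mimics that shape with plain dictionaries. It is an illustration only, not GX's result object, and `summarize`, `require_valid`, and the dictionary keys are hypothetical names.

```python
def summarize(results):
    """Aggregate per-expectation outcomes into an overall report."""
    evaluated = len(results)
    successful = sum(1 for r in results if r["success"])
    return {
        "success": successful == evaluated,
        "evaluated": evaluated,
        "successful": successful,
        "failed": [r["expectation"] for r in results if not r["success"]],
    }


def require_valid(report):
    """Raise if any expectation failed, so the caller cannot go on to training."""
    if not report["success"]:
        raise ValueError("Data validation failed: " + ", ".join(report["failed"]))


# Hypothetical outcomes, invented for illustration.
results = [
    {"expectation": "customer_id not null", "success": True},
    {"expectation": "age between 18 and 120", "success": False},
    {"expectation": "customer_id unique", "success": True},
]

report = summarize(results)
print(report)
```

Raising an exception rather than returning a flag is a deliberate choice for pipelines: a failed gate then stops the run by default instead of relying on every caller to check the return value.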
Direct Validation in Python
def validate_training_data(data_path: str) -> bool:
    """
    Validate data quality before training.
    Returns True if all expectations pass.
    """
    context = gx.get_context()
    df = pd.read_parquet(data_path)
    datasource = context.sources.get("my_datasource")
    data_asset = datasource.get_asset("customer_features")
    batch_request = data_asset.build_batch_request(dataframe=df)
    results = context.run_checkpoint(
        checkpoint_name="training_data_checkpoint",
        batch_request=batch_request
    )
    return results.success
pytest Integration
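The tests in this section each rebuild a validator with the same four calls. A small helper can factor that repetition out. The sketch below uses only the calls that already appear in this guide; `build_validator` and its parameter names are hypothetical, and the default datasource and asset names are the ones used in the earlier examples.

```python
def build_validator(
    context,
    df,
    suite_name,
    datasource_name="my_datasource",
    asset_name="customer_features",
):
    """Build a validator for an in-memory DataFrame.

    `context` is assumed to be the object returned by gx.get_context(),
    with the datasource and data asset already registered.
    """
    datasource = context.sources.get(datasource_name)
    data_asset = datasource.get_asset(asset_name)
    batch_request = data_asset.build_batch_request(dataframe=df)
    return context.get_validator(
        batch_request=batch_request,
        expectation_suite_name=suite_name,
    )
```

A test body then shrinks to loading the DataFrame, calling `build_validator(gx_context, df, "customer_features.basic")`, and asserting on the result.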
import pytest
import great_expectations as gx
import pandas as pd


@pytest.fixture(scope="session")
def gx_context():
    return gx.get_context()


def test_training_data_passes_expectations(gx_context):
    """Training data must pass all quality expectations before model training."""
    df = pd.read_parquet('data/training_data.parquet')
    datasource = gx_context.sources.get("my_datasource")
    data_asset = datasource.get_asset("customer_features")
    batch_request = data_asset.build_batch_request(dataframe=df)
    validator = gx_context.get_validator(
        batch_request=batch_request,
        expectation_suite_name="customer_features.basic"
    )
    results = validator.validate()
    # Collect failed expectations for clear error messages
    failures = [
        f"{r['expectation_config']['expectation_type']} on column "
        f"'{r['expectation_config']['kwargs'].get('column', 'table')}'"
        for r in results.results
        if not r['success']
    ]
    assert results.success, "Data quality failures:\n" + "\n".join(f" - {f}" for f in failures)


def test_no_duplicate_customer_ids(gx_context):
    """Customer IDs must be unique; duplicates corrupt training labels."""
    df = pd.read_parquet('data/training_data.parquet')
    datasource = gx_context.sources.get("my_datasource")
    data_asset = datasource.get_asset("customer_features")
    batch_request = data_asset.build_batch_request(dataframe=df)
    # Assumes a scratch suite named "temp" already exists in the project
    validator = gx_context.get_validator(batch_request=batch_request, expectation_suite_name="temp")
    result = validator.expect_column_values_to_be_unique("customer_id")
    assert result.success, \
        f"Duplicate customer IDs found: {result.result.get('partial_unexpected_list', [])[:10]}"


def test_feature_distributions_match_expected_ranges(gx_context):
    """Critical feature statistics must be within historical ranges."""
    df = pd.read_parquet('data/training_data.parquet')
    datasource = gx_context.sources.get("my_datasource")
    data_asset = datasource.get_asset("customer_features")
    batch_request = data_asset.build_batch_request(dataframe=df)
    # Assumes a scratch suite named "temp" already exists in the project
    validator = gx_context.get_validator(batch_request=batch_request, expectation_suite_name="temp")
    # Mean income should be within a reasonable range (based on historical data)
    result = validator.expect_column_mean_to_be_between(
        "income", min_value=45_000, max_value=120_000
    )
    assert result.success, \
        f"Income mean {result.result.get('observed_value')} out of expected range [45000, 120000]"
Checkpoints for Automated Pipeline Validation
Checkpoints bundle an expectation suite with a batch request and a list of follow-up actions, so the same validation can be reused across pipeline runs:
# great_expectations/checkpoints/training_data_checkpoint.yml
name: training_data_checkpoint
config_version: 1.0
class_name: SimpleCheckpoint
validations:
  - batch_request:
      datasource_name: my_datasource
      data_asset_name: customer_features
    expectation_suite_name: customer_features.basic
action_list:
  - name: store_validation_result
    action:
      class_name: StoreValidationResultAction
  - name: update_data_docs
    action:
      class_name: UpdateDataDocsAction
  - name: slack_notification
    action:
      class_name: SlackNotificationAction
      slack_webhook: "https://hooks.slack.com/services/your-webhook"
      notify_on: failure
Run from command line:
great_expectations checkpoint run training_data_checkpoint
Integration with Airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime


def validate_data(**context):
    import great_expectations as gx
    import pandas as pd

    gx_context = gx.get_context()
    # Load the day's data
    run_date = context['ds']
    df = pd.read_parquet(f's3://bucket/features/{run_date}.parquet')
    # Build the batch request through the data asset, as in the earlier examples
    datasource = gx_context.sources.get("my_datasource")
    data_asset = datasource.get_asset("customer_features")
    batch_request = data_asset.build_batch_request(dataframe=df)
    results = gx_context.run_checkpoint(
        checkpoint_name="daily_training_data",
        batch_request=batch_request
    )
    if not results.success:
        raise ValueError(f"Data validation failed for {run_date}. Check Data Docs.")


with DAG('ml_training_pipeline', start_date=datetime(2026, 1, 1), schedule_interval='@daily') as dag:
    validate_task = PythonOperator(
        task_id='validate_training_data',
        python_callable=validate_data
    )
    train_task = BashOperator(
        task_id='train_model',
        bash_command='python training/train.py --date={{ ds }}'
    )
    evaluate_task = BashOperator(
        task_id='evaluate_model',
        bash_command='python evaluation/evaluate.py --date={{ ds }}'
    )
    # Validation gates training
    validate_task >> train_task >> evaluate_task
Training vs. Serving Data Comparison
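Training-serving skew often shows up as a categorical value that is encoded differently at inference time than it was in training. A minimal sketch in plain Python, independent of GX, of what such a comparison looks for. The function name `categorical_skew` and the serving values are hypothetical; the training values are the employment_status set from the suite in this guide.

```python
def categorical_skew(training_values, serving_values):
    """Compare the distinct categories seen in training and in serving."""
    train, serve = set(training_values), set(serving_values)
    return {
        # Values the model never saw during training
        "unseen_in_training": sorted(serve - train),
        # Training categories absent from this serving sample
        "missing_from_serving": sorted(train - serve),
    }


training_status = ["employed", "self_employed", "unemployed", "retired"]
# Hypothetical serving sample with a differently encoded category.
serving_status = ["employed", "Self-Employed", "retired"]

skew = categorical_skew(training_status, serving_status)
print(skew)
```

Values in `unseen_in_training` are the ones a value-set expectation would flag. Categories in `missing_from_serving` are not necessarily a problem in a small serving sample, but a persistent gap can indicate an upstream encoding change.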
Detect training-serving skew by validating serving data against training expectations:
def validate_serving_data(serving_df: pd.DataFrame) -> dict:
    """
    Validate real-time inference data against training data expectations.
    Call this before making predictions to catch data pipeline issues.
    """
    context = gx.get_context()
    datasource = context.sources.get("my_datasource")
    data_asset = datasource.get_asset("customer_features")
    batch_request = data_asset.build_batch_request(dataframe=serving_df)
    # Use the same suite as training
    validator = context.get_validator(
        batch_request=batch_request,
        expectation_suite_name="customer_features.basic"
    )
    results = validator.validate()
    failed_expectations = [
        {
            "expectation": r['expectation_config']['expectation_type'],
            "column": r['expectation_config']['kwargs'].get('column'),
            "details": r.get('result', {})
        }
        for r in results.results
        if not r['success']
    ]
    return {
        "valid": results.success,
        "failures": failed_expectations,
        "statistics": results.statistics
    }
Data Docs
GX auto-generates HTML data quality reports. Host them for team visibility:
# Build and open Data Docs locally
great_expectations docs build
# In CI, upload to S3 or GCS
aws s3 sync great_expectations/uncommitted/data_docs/local_site/ \
    s3://your-bucket/data-docs/ --acl public-read
CI Integration
name: ML Data Validation
on:
  push:
    paths: ['data/**', 'pipelines/**']
  schedule:
    - cron: '0 5 * * *'
jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: pip install great_expectations pandas pyarrow pytest
      - name: Download latest data
        run: python scripts/fetch_data.py --date today
      - name: Run Great Expectations checkpoint
        run: great_expectations checkpoint run training_data_checkpoint
      - name: Run pytest data quality tests
        run: pytest tests/data_quality/ -v
      - name: Upload Data Docs
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: data-docs
          path: great_expectations/uncommitted/data_docs/
Summary
Great Expectations brings software testing discipline to data:
- Expectation suites — codify data contracts as version-controlled assertions
- Checkpoints — automate validation in pipelines with notifications on failure
- Data Docs — visual reports make data quality visible to non-technical stakeholders
- Training/serving parity — validate inference data against training expectations to catch skew
The key insight: ML model bugs often originate in data, not code. Testing the data before it reaches training and inference is more effective than debugging model behavior after the fact.