Great Expectations Data Quality: Writing and Running Expectation Suites
Great Expectations structures data quality as code: Expectations describe what data should look like, Validators apply them to DataFrames or tables, and Checkpoints bundle validation into CI-runnable steps. Data Docs turn validation results into human-readable HTML reports your team can share.
Key Takeaways
Write expectations from profiles, refine manually. Profiling auto-generates an expectation suite from your data, but auto-generated suites are usually too permissive. Treat profiling output as a first draft, then tighten column types, value ranges, and null rates by hand.
Run checkpoints in CI, not just locally. A validation that only runs on your laptop protects no one. Wire checkpoints to your CI pipeline so every data build fails when expectations are violated — the same way unit tests fail when code is broken.
Custom expectations belong in your repo, not in configuration. Generic expectations cover 80% of cases. The remaining 20% — referential integrity, cross-column consistency, business-rule constraints — require custom ColumnMapExpectation or BatchExpectation subclasses that live in version control alongside your pipeline code.
What Great Expectations Actually Does
Great Expectations (GX) sits at the intersection of unit testing and data documentation. It lets you declare what your data should look like — column types, value ranges, null rates, set membership, regex patterns — and then verify those declarations against real data in a repeatable, automated way.
The mental model: think of an ExpectationSuite as a test file, and a Checkpoint as a test runner. DataContext is the project configuration that ties everything together.
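To make the analogy concrete, here is a minimal plain-Python sketch of the same roles. It does not use the GX API: `Expectation`, `Suite`, and `run_checkpoint` are hypothetical stand-ins that only illustrate how a suite collects checks and a checkpoint turns the result into an exit code.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List

Row = Dict[str, object]  # one record, keyed by column name


@dataclass
class Expectation:
    """A named predicate over a batch of rows (stand-in, not the GX class)."""
    name: str
    check: Callable[[List[Row]], bool]


@dataclass
class Suite:
    """A named collection of expectations, analogous to a test file."""
    name: str
    expectations: List[Expectation] = field(default_factory=list)

    def validate(self, rows: List[Row]) -> dict:
        results = {e.name: e.check(rows) for e in self.expectations}
        return {"success": all(results.values()), "results": results}


def run_checkpoint(suite: Suite, rows: List[Row]) -> int:
    """Analogous to a test runner: validate, report, return an exit code."""
    outcome = suite.validate(rows)
    for name, ok in outcome["results"].items():
        print(f"{'PASS' if ok else 'FAIL'}: {name}")
    return 0 if outcome["success"] else 1


orders_suite = Suite(
    name="orders.critical",
    expectations=[
        Expectation("order_id is never null",
                    lambda rows: all(r["order_id"] is not None for r in rows)),
        Expectation("amount is positive",
                    lambda rows: all(r["amount"] > 0 for r in rows)),
    ],
)

good_rows = [{"order_id": "ORD-0000000001", "amount": 19.99}]
bad_rows = [{"order_id": None, "amount": 19.99}]

exit_code_good = run_checkpoint(orders_suite, good_rows)
exit_code_bad = run_checkpoint(orders_suite, bad_rows)
```

The real library adds persistence, multiple execution backends, and reporting on top of this shape, but the contract is the same: a non-zero exit code when any expectation fails.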
Installing and Initializing a DataContext
pip install "great-expectations<1.0"  # the examples in this article use the pre-1.0 API
great_expectations init
This creates a great_expectations/ directory with:
great_expectations/
├── great_expectations.yml      # project config
├── expectations/               # expectation suites (JSON)
├── checkpoints/                # checkpoint configs (YAML)
├── uncommitted/
│   ├── config_variables.yml    # secrets (gitignored)
│   └── data_docs/              # generated HTML reports
└── plugins/
    └── custom_data_docs/
You can also initialize programmatically, which is better for automated pipelines:
import great_expectations as gx
context = gx.get_context(
    context_root_dir="./great_expectations"
)
Or use an ephemeral in-memory context for testing:
context = gx.get_context(mode="ephemeral")
Connecting Data Sources
Pandas DataFrame
import pandas as pd
import great_expectations as gx
context = gx.get_context(mode="ephemeral")
df = pd.read_parquet("s3://my-bucket/orders/2026-05-17.parquet")
datasource = context.sources.add_pandas("orders_source")
asset = datasource.add_dataframe_asset("daily_orders")
batch_request = asset.build_batch_request(dataframe=df)
SQL / BigQuery
datasource = context.sources.add_sql(
name="warehouse",
connection_string="bigquery://my-project/my-dataset"
)
asset = datasource.add_table_asset(
name="orders",
table_name="orders"
)
batch_request = asset.build_batch_request()
Spark DataFrame
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("s3://my-bucket/events/")
datasource = context.sources.add_spark("events_source")
asset = datasource.add_dataframe_asset("events")
batch_request = asset.build_batch_request(dataframe=df)
Writing an Expectation Suite
An ExpectationSuite is a named collection of expectations. Start by creating one:
suite = context.add_expectation_suite("orders.critical")
Then attach a validator and add expectations:
validator = context.get_validator(
batch_request=batch_request,
expectation_suite_name="orders.critical"
)
# Schema expectations
validator.expect_column_to_exist("order_id")
validator.expect_column_to_exist("customer_id")
validator.expect_column_to_exist("amount")
validator.expect_column_to_exist("status")
validator.expect_column_to_exist("created_at")
# Null constraints
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_not_be_null("customer_id")
validator.expect_column_values_to_be_null("refund_reason",
mostly=0.95) # at most 5% have a refund reason
# Type constraints
validator.expect_column_values_to_be_of_type("order_id", "str")
validator.expect_column_values_to_be_of_type("amount", "float")
# Value range
validator.expect_column_values_to_be_between(
"amount", min_value=0.01, max_value=100_000.00,
mostly=0.999 # 0.1% outliers allowed
)
# Set membership
validator.expect_column_values_to_be_in_set(
"status",
{"pending", "processing", "shipped", "delivered", "cancelled", "refunded"}
)
# Uniqueness
validator.expect_column_values_to_be_unique("order_id")
# Row count (volume check)
validator.expect_table_row_count_to_be_between(
min_value=1_000,
max_value=10_000_000
)
# Regex format
validator.expect_column_values_to_match_regex(
"order_id",
r"^ORD-[0-9]{10}$"
)
# Save expectations to disk
validator.save_expectation_suite(discard_failed_expectations=False)
Writing Custom Expectations
The built-in library covers common cases. For business-rule validations, write custom ColumnMapExpectation subclasses.
Custom: Expect Referential Integrity
from great_expectations.expectations.expectation import ColumnMapExpectation
from great_expectations.execution_engine import (
PandasExecutionEngine,
SparkDFExecutionEngine,
)
from great_expectations.expectations.metrics import (
ColumnMapMetricProvider,
column_condition_partial,
)
class ColumnValuesInOtherColumnSet(ColumnMapMetricProvider):
    """Values in column A must exist in column B of the same DataFrame."""
    condition_metric_name = "column_values.in_other_column_set"
    condition_value_keys = ("other_column",)

    @column_condition_partial(engine=PandasExecutionEngine)
    def _pandas(cls, column, other_column, **kwargs):
        # pandas isin() needs list-like values; if other_column arrives as a
        # column-name string, resolve it to that column's values first
        return column.isin(other_column)


class ExpectColumnValuesToBeInOtherColumnSet(ColumnMapExpectation):
    """
    Expect each value in `column` to appear in `other_column`.
    Use this to verify FK-like relationships within a single DataFrame.
    """
    map_metric = "column_values.in_other_column_set"
    success_keys = ("other_column", "mostly")
    default_kwarg_values = {"mostly": 1.0}
    examples = [
        {
            "data": {
                "product_id": [1, 2, 3, 4],  # 4 has no match in catalog_id
                "catalog_id": [1, 2, 3, 99],
            },
            "tests": [
                {
                    "title": "product_ids_in_catalog",
                    "include_in_gallery": True,
                    "in": {
                        "column": "product_id",
                        "other_column": "catalog_id",
                        "mostly": 0.75,
                    },
                    "out": {"success": True},
                }
            ],
        }
    ]
Custom: Business-Rule BatchExpectation
from great_expectations.expectations.expectation import BatchExpectation
from great_expectations.execution_engine import PandasExecutionEngine
class ExpectRefundAmountLteOrderAmount(BatchExpectation):
    """
    For rows where status='refunded', refund_amount must be <= amount.
    """
    metric_dependencies = ("table.columns",)
    success_keys = ()

    def _validate(self, configuration, metrics, runtime_configuration=None,
                  execution_engine=None):
        # Access the full DataFrame from the execution engine
        df = execution_engine.batch_manager.active_batch.data.dataframe
        refunded = df[df["status"] == "refunded"]
        violations = refunded[refunded["refund_amount"] > refunded["amount"]]
        return {
            "success": len(violations) == 0,
            "result": {
                "violation_count": len(violations),
                "total_refunded": len(refunded),
            }
        }
Register custom expectations by placing them in great_expectations/plugins/expectations/ or importing them before running validation.
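Before wrapping a business rule in an expectation class, it can help to pin the rule down as a plain function and exercise it against known-bad rows. The sketch below restates the refund rule above without GX or pandas; `find_refund_violations` is a hypothetical helper, and the rows are assumed to be dicts with `status`, `amount`, and `refund_amount` keys.

```python
def find_refund_violations(rows):
    """Check that refunded rows never refund more than the order amount."""
    refunded = [r for r in rows if r["status"] == "refunded"]
    violations = [r for r in refunded if r["refund_amount"] > r["amount"]]
    return {
        "success": len(violations) == 0,
        "result": {
            "violation_count": len(violations),
            "total_refunded": len(refunded),
        },
    }


rows = [
    {"status": "refunded", "amount": 50.0, "refund_amount": 50.0},  # full refund: allowed
    {"status": "refunded", "amount": 20.0, "refund_amount": 25.0},  # over-refund: violation
    {"status": "shipped", "amount": 10.0, "refund_amount": 0.0},    # not refunded: ignored
]

outcome = find_refund_violations(rows)
print(outcome)
```

Keeping the rule testable on its own means a failure in the expectation class can be traced either to the rule or to the plumbing around it.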
Running Checkpoint Validations in CI
A Checkpoint ties together: which data to validate, which expectation suite to use, and what to do with results (update Data Docs, send Slack alert, fail the pipeline).
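The "what to do with results" part can be pictured as an ordered list of actions, some of which only fire on certain outcomes. The following is a simplified model of that idea, not GX's implementation: `run_actions` and the lambda actions are hypothetical, and only the `notify_on` values "all", "success", and "failure" are modelled.

```python
def run_actions(validation_success, action_list):
    """Run actions in order, skipping any whose notify_on does not match the outcome."""
    executed = []
    for action in action_list:
        notify_on = action.get("notify_on", "all")  # "all", "success", or "failure"
        wanted = (
            notify_on == "all"
            or (notify_on == "success" and validation_success)
            or (notify_on == "failure" and not validation_success)
        )
        if wanted:
            action["run"](validation_success)
            executed.append(action["name"])
    return executed


messages = []  # stands in for stored results, rebuilt docs, and sent alerts

action_list = [
    {"name": "store_validation_result",
     "run": lambda ok: messages.append(f"stored result (success={ok})")},
    {"name": "update_data_docs",
     "run": lambda ok: messages.append("rebuilt data docs")},
    {"name": "send_slack_notification_on_failure",
     "notify_on": "failure",
     "run": lambda ok: messages.append("posted failure alert")},
]

ran_on_success = run_actions(True, action_list)
ran_on_failure = run_actions(False, action_list)
```

Storing results and rebuilding docs happen on every run, while the alert is reserved for failures, which keeps the notification channel quiet when data is healthy.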
Define a Checkpoint
checkpoint_config = {
"name": "orders_daily_checkpoint",
"validations": [
{
"batch_request": {
"datasource_name": "orders_source",
"data_asset_name": "daily_orders",
},
"expectation_suite_name": "orders.critical",
}
],
"action_list": [
{
"name": "store_validation_result",
"action": {"class_name": "StoreValidationResultAction"},
},
{
"name": "update_data_docs",
"action": {"class_name": "UpdateDataDocsAction"},
},
{
"name": "send_slack_notification_on_failure",
"action": {
"class_name": "SlackNotificationAction",
"slack_webhook": "${SLACK_WEBHOOK_URL}",
"notify_on": "failure",
"renderer": {
"module_name": "great_expectations.render.renderer.slack_renderer",
"class_name": "SlackRenderer",
},
},
},
],
}
context.add_or_update_checkpoint(**checkpoint_config)
Run in a Python Script
result = context.run_checkpoint(
checkpoint_name="orders_daily_checkpoint",
batch_request=batch_request, # override with fresh data
)
if not result["success"]:
    failed = [
        vr for vr in result["run_results"].values()
        if not vr["validation_result"]["success"]
    ]
    for vr in failed:
        stats = vr["validation_result"]["statistics"]
        print(f"FAILED: {stats['unsuccessful_expectations']} expectations violated")
    raise SystemExit(1)
GitHub Actions CI Integration
# .github/workflows/data-quality.yaml
name: Data Quality Gate
on:
  schedule:
    - cron: "0 6 * * *"  # daily at 6am
  push:
    paths:
      - "pipelines/**"
      - "great_expectations/expectations/**"
jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - name: Install dependencies
        run: pip install great-expectations pandas pyarrow
      - name: Download latest data batch
        run: |
          aws s3 cp s3://my-bucket/orders/$(date +%Y-%m-%d).parquet ./data/
        env:
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
      - name: Run Great Expectations checkpoint
        run: python scripts/run_checkpoint.py
        env:
          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
          GX_CLOUD_ORGANIZATION_ID: ${{ secrets.GX_CLOUD_ORG_ID }}
# scripts/run_checkpoint.py
import pandas as pd
import great_expectations as gx
import sys
context = gx.get_context(context_root_dir="./great_expectations")
df = pd.read_parquet("./data/")
datasource = context.get_datasource("orders_source")
asset = datasource.get_asset("daily_orders")
batch_request = asset.build_batch_request(dataframe=df)
result = context.run_checkpoint(
checkpoint_name="orders_daily_checkpoint",
batch_request=batch_request,
)
sys.exit(0 if result["success"] else 1)
Integrating with Spark DataFrames
For large datasets, Spark is preferred. The GX Spark backend applies expectations as distributed DataFrame operations:
from pyspark.sql import SparkSession
import great_expectations as gx
spark = SparkSession.builder \
.appName("gx-validation") \
.getOrCreate()
context = gx.get_context(context_root_dir="./great_expectations")
# Read a large dataset
df = spark.read.parquet("s3://datalake/events/2026-05-17/")
datasource = context.sources.add_or_update_spark("events_spark")
asset = datasource.add_dataframe_asset("raw_events")
batch_request = asset.build_batch_request(dataframe=df)
validator = context.get_validator(
batch_request=batch_request,
expectation_suite_name="events.schema"
)
# These expectations execute as Spark jobs
validator.expect_column_values_to_not_be_null("event_id")
validator.expect_column_values_to_be_in_set(
"event_type",
{"click", "view", "purchase", "signup"}
)
validator.expect_column_values_to_be_between(
"timestamp",
min_value=1700000000, # Unix epoch bounds
max_value=2000000000
)
results = validator.validate()
print(f"Success: {results.success}")
print(f"Statistics: {results.statistics}")
Spark Performance Tips
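Large DataFrames can make validation slow because some expectations require full scans. Two ways to reduce the cost:
# Cache before running multiple expectations so Spark does not re-read the source each time
df.cache()
# mostly sets a tolerance for violations; it does not sample the data by itself.
# To cut scan cost for expensive checks, validate a sample instead, e.g. df.sample(fraction=0.01)
validator.expect_column_values_to_match_regex(
    "email",
    r"^[^@]+@[^@]+\.[^@]+$",
    mostly=0.999  # tolerate up to 0.1% malformed values
)
Generating and Sharing Data Docs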
Data Docs are GX's killer feature for non-technical stakeholders. After a checkpoint run, they're auto-generated:
context.build_data_docs()
context.open_data_docs()  # opens browser locally
For CI, publish them to S3:
- name: Upload Data Docs to S3
  run: |
    aws s3 sync great_expectations/uncommitted/data_docs/local_site/ \
      s3://my-data-docs-bucket/latest/ \
      --delete --acl public-read
Or serve them from your internal tool stack:
# Programmatically get the path
docs_site = context.get_docs_sites_urls()[0]
print(f"Data Docs: {docs_site['site_url']}")
Profiling New Data Sources
When onboarding a new dataset, let GX profile it first:
from great_expectations.profile.basic_dataset_profiler import BasicDatasetProfiler
validator = context.get_validator(batch_request=batch_request,
expectation_suite_name="new_suite")
suite, validation_result = BasicDatasetProfiler().profile(validator)
context.save_expectation_suite(suite)
The profiler creates expectations for every column it finds, based on observed statistics. Always review and tighten the output: a generated range such as min_value=0, max_value=999999 for a count column whose values have never exceeded 5000 is too permissive to catch real anomalies.
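One way to tighten a range like this is to derive the bounds from the observed values plus an explicit margin, so the headroom is a visible decision. A minimal sketch, assuming non-negative integer counts; `tightened_bounds` and the 20% headroom are hypothetical choices for illustration and are not part of GX.

```python
def tightened_bounds(values, headroom_pct=20):
    """Derive integer min/max bounds from observed values plus a margin.

    The margin is headroom_pct percent of the observed span, computed with
    integer arithmetic. The lower bound is clamped at 0 (counts assumed >= 0).
    """
    observed_min = min(values)
    observed_max = max(values)
    margin = (observed_max - observed_min) * headroom_pct // 100
    lower = max(0, observed_min - margin)
    upper = observed_max + margin
    return lower, upper


daily_counts = [1200, 3400, 4800, 2950, 5000]
lower, upper = tightened_bounds(daily_counts)
print(lower, upper)  # 440 5760
```

The resulting pair can be passed as min_value and max_value to expect_column_values_to_be_between, and the headroom revisited whenever legitimate growth trips the check.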
Testing the Tests: Validating Your Expectation Suites
Expectation suites need to be tested too — not just applied to production data. Write unit tests that verify your expectations catch known-bad data:
# tests/test_expectation_suites.py
import pytest
import pandas as pd
import great_expectations as gx
@pytest.fixture
def context():
    return gx.get_context(mode="ephemeral")

def test_null_order_id_fails(context):
    bad_df = pd.DataFrame({
        "order_id": [None, "ORD-0000000001"],
        "customer_id": ["C001", "C002"],
        "amount": [100.0, 200.0],
        "status": ["pending", "shipped"],
        "created_at": ["2026-05-17", "2026-05-17"],
    })
    datasource = context.sources.add_pandas("test")
    asset = datasource.add_dataframe_asset("orders")
    batch_request = asset.build_batch_request(dataframe=bad_df)
    suite = context.add_expectation_suite("orders.critical_test")
    validator = context.get_validator(
        batch_request=batch_request,
        expectation_suite_name="orders.critical_test"
    )
    validator.expect_column_values_to_not_be_null("order_id")
    result = validator.validate()
    assert not result.success, "Should fail when order_id is null"

def test_invalid_status_fails(context):
    bad_df = pd.DataFrame({
        "order_id": ["ORD-0000000001"],
        "status": ["INVALID_STATUS"],
    })
    # ... similar structure
    pass
Best Practices Summary
Version-control everything. Expectation suites, checkpoints, and custom expectations all live as files. Commit them. Review changes in pull requests like you review code changes.
Use mostly intentionally. mostly=1.0 means zero tolerance. mostly=0.99 means 1% violations are acceptable. Make this a conscious business decision, not a lazy default.
Separate suites by severity. A schema.critical suite that fails the pipeline, a quality.warning suite that sends a Slack alert, and a metrics.informational suite that just logs statistics give you graduated responses to data problems.
Run profiling on every new data source. Don't write expectations from memory. Profile first, review, tighten. The 20 minutes spent pruning a profiler-generated suite can catch many real bugs before they reach production.
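The arithmetic behind mostly is simple enough to sketch directly. The function below is an illustrative model, not GX code: `passes_with_mostly` is a hypothetical name, and it assumes null values are excluded before the passing fraction is compared with the threshold.

```python
def passes_with_mostly(values, predicate, mostly=1.0):
    """Return (success, observed_fraction) for a per-value check.

    Nulls (None) are dropped first; the check succeeds when the fraction of
    remaining values satisfying the predicate is at least `mostly`.
    """
    non_null = [v for v in values if v is not None]
    if not non_null:
        return True, 1.0  # assumption: nothing to evaluate counts as a pass
    passing = sum(1 for v in non_null if predicate(v))
    fraction = passing / len(non_null)
    return fraction >= mostly, fraction


def in_range(amount):
    return 0.01 <= amount <= 100_000.00


# 998 in-range amounts, 2 outliers, 1 null
amounts = [25.0] * 998 + [250_000.0, -5.0, None]

strict_ok, fraction = passes_with_mostly(amounts, in_range, mostly=0.999)
lenient_ok, _ = passes_with_mostly(amounts, in_range, mostly=0.99)
```

With two outliers in a thousand non-null values, the 0.999 threshold fails and the 0.99 threshold passes, which is why the number deserves a deliberate decision.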