Great Expectations Data Quality: Writing and Running Expectation Suites

Great Expectations structures data quality as code: Expectations describe what data should look like, Validators apply them to DataFrames or tables, and Checkpoints bundle validation into CI-runnable steps. Data Docs turn validation results into human-readable HTML reports your team can share.

Key Takeaways

Write expectations from profiles, refine manually. Profiling auto-generates an expectation suite from your data, but auto-generated suites tend to be too permissive. Treat profiling output as a first draft, then tighten column types, value ranges, and null rates by hand.

Run checkpoints in CI, not just locally. A validation that only runs on your laptop protects no one. Wire checkpoints to your CI pipeline so every data build fails when expectations are violated — the same way unit tests fail when code is broken.

Custom expectations belong in your repo, not in configuration. Built-in expectations cover most cases. The rest (referential integrity, cross-column consistency, business-rule constraints) require custom Expectation subclasses such as ColumnMapExpectation or BatchExpectation, which live in version control alongside your pipeline code.

What Great Expectations Actually Does

Great Expectations (GX) sits at the intersection of unit testing and data documentation. It lets you declare what your data should look like — column types, value ranges, null rates, set membership, regex patterns — and then verify those declarations against real data in a repeatable, automated way.

The mental model: think of an ExpectationSuite as a test file, and a Checkpoint as a test runner. DataContext is the project configuration that ties everything together.
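To make the analogy concrete, here is a plain-Python sketch of the same roles. It does not use the GX API: `Expectation`, `run_suite`, and the sample rows are hypothetical names invented for illustration only.

```python
from typing import Callable, NamedTuple


class Expectation(NamedTuple):
    """One declaration about the data (hypothetical, not a GX class)."""
    name: str
    check: Callable[[list], bool]  # returns True when the batch passes


def run_suite(suite, rows):
    """Play the role of a Checkpoint: run every expectation, collect results."""
    results = {exp.name: exp.check(rows) for exp in suite}
    return {"success": all(results.values()), "results": results}


# The suite plays the role of a test file: a named list of declarations.
orders_suite = [
    Expectation(
        "order_id is never null",
        lambda rows: all(r["order_id"] is not None for r in rows),
    ),
    Expectation(
        "amount is positive",
        lambda rows: all(r["amount"] > 0 for r in rows),
    ),
]

good_rows = [{"order_id": "ORD-0000000001", "amount": 19.99}]
bad_rows = [{"order_id": None, "amount": 19.99}]

good_report = run_suite(orders_suite, good_rows)
bad_report = run_suite(orders_suite, bad_rows)
print(good_report["success"], bad_report["success"])
```

The real library adds what this sketch leaves out: persistence of suites, multiple execution engines, and rendered reports.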

Installing and Initializing a DataContext

pip install great-expectations
great_expectations init

This creates a great_expectations/ directory with:

great_expectations/
├── great_expectations.yml          # project config
├── expectations/                   # expectation suites (JSON)
├── checkpoints/                    # checkpoint configs (YAML)
├── uncommitted/
│   ├── config_variables.yml        # secrets (gitignored)
│   └── data_docs/                  # generated HTML reports
└── plugins/
    └── custom_data_docs/

You can also initialize programmatically, which is better for automated pipelines:

import great_expectations as gx

context = gx.get_context(
    context_root_dir="./great_expectations"
)

Or use an ephemeral in-memory context for testing:

context = gx.get_context(mode="ephemeral")

Connecting Data Sources

Pandas DataFrame

import pandas as pd
import great_expectations as gx

context = gx.get_context(mode="ephemeral")

df = pd.read_parquet("s3://my-bucket/orders/2026-05-17.parquet")

datasource = context.sources.add_pandas("orders_source")
asset = datasource.add_dataframe_asset("daily_orders")

batch_request = asset.build_batch_request(dataframe=df)

SQL / BigQuery

datasource = context.sources.add_sql(
    name="warehouse",
    connection_string="bigquery://my-project/my-dataset"
)
asset = datasource.add_table_asset(
    name="orders",
    table_name="orders"
)
batch_request = asset.build_batch_request()

Spark DataFrame

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("s3://my-bucket/events/")

datasource = context.sources.add_spark("events_source")
asset = datasource.add_dataframe_asset("events")
batch_request = asset.build_batch_request(dataframe=df)

Writing an Expectation Suite

An ExpectationSuite is a named collection of expectations. Start by creating one:

suite = context.add_expectation_suite("orders.critical")

Then attach a validator and add expectations:

validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="orders.critical"
)

# Schema expectations
validator.expect_column_to_exist("order_id")
validator.expect_column_to_exist("customer_id")
validator.expect_column_to_exist("amount")
validator.expect_column_to_exist("status")
validator.expect_column_to_exist("created_at")

# Null constraints
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_not_be_null("customer_id")
validator.expect_column_values_to_be_null("refund_reason",
    mostly=0.95)  # at most 5% have a refund reason

# Type constraints
validator.expect_column_values_to_be_of_type("order_id", "str")
validator.expect_column_values_to_be_of_type("amount", "float")

# Value range
validator.expect_column_values_to_be_between(
    "amount", min_value=0.01, max_value=100_000.00,
    mostly=0.999  # 0.1% outliers allowed
)

# Set membership
validator.expect_column_values_to_be_in_set(
    "status",
    {"pending", "processing", "shipped", "delivered", "cancelled", "refunded"}
)

# Uniqueness
validator.expect_column_values_to_be_unique("order_id")

# Row count (volume check)
validator.expect_table_row_count_to_be_between(
    min_value=1_000,
    max_value=10_000_000
)

# Regex format
validator.expect_column_values_to_match_regex(
    "order_id",
    r"^ORD-[0-9]{10}$"
)

# Save expectations to disk
validator.save_expectation_suite(discard_failed_expectations=False)
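Format patterns are easy to get subtly wrong, so it can help to try the regex against a few sample IDs with Python's `re` module before committing it to a suite. The sample IDs below are made up for illustration.

```python
import re

# Same pattern as the expectation above: "ORD-" followed by exactly 10 digits.
ORDER_ID_PATTERN = re.compile(r"^ORD-[0-9]{10}$")

samples = [
    "ORD-0000000001",   # well-formed
    "ORD-123",          # too few digits
    "ord-0000000001",   # wrong case
    "ORD-00000000012",  # 11 digits
]

matches = {s: bool(ORDER_ID_PATTERN.match(s)) for s in samples}
for sample, ok in matches.items():
    print(f"{sample!r}: {'match' if ok else 'no match'}")
```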

Writing Custom Expectations

The built-in library covers common cases. For business-rule validations, write custom Expectation subclasses. Single-column rules use ColumnMapExpectation; rules that compare two columns, like the one below, use ColumnPairMapExpectation.

Custom: Expect Referential Integrity

from great_expectations.execution_engine import PandasExecutionEngine
from great_expectations.expectations.expectation import ColumnPairMapExpectation
from great_expectations.expectations.metrics.map_metric_provider import (
    ColumnPairMapMetricProvider,
    column_pair_condition_partial,
)

# A column-pair metric receives both columns as Series. A single-column
# map metric would only receive the *name* of the other column as a string,
# so column.isin(other_column) would fail there.
class ColumnPairValuesAInColumnB(ColumnPairMapMetricProvider):
    """Values in column_A must exist in column_B of the same DataFrame."""
    condition_metric_name = "column_pair_values.a_in_column_b"
    condition_value_keys = ()

    @column_pair_condition_partial(engine=PandasExecutionEngine)
    def _pandas(cls, column_A, column_B, **kwargs):
        return column_A.isin(column_B)


class ExpectColumnPairValuesAToBeInColumnB(ColumnPairMapExpectation):
    """
    Expect each value in `column_A` to appear somewhere in `column_B`.
    Use this to verify FK-like relationships within a single DataFrame.
    """
    map_metric = "column_pair_values.a_in_column_b"
    success_keys = ("column_A", "column_B", "ignore_row_if", "mostly")

    default_kwarg_values = {
        "mostly": 1.0,
        "ignore_row_if": "both_values_are_missing",
    }

    examples = [
        {
            "data": {
                "product_id": [1, 2, 3, 4],  # 4 is missing from the catalog
                "catalog_id": [1, 2, 3, 99],
            },
            "tests": [
                {
                    "title": "product_ids_in_catalog",
                    "include_in_gallery": True,
                    "in": {
                        "column_A": "product_id",
                        "column_B": "catalog_id",
                        "mostly": 0.75,
                    },
                    "out": {"success": True},
                }
            ],
        }
    ]
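To see why the example test above expects success with mostly=0.75, the same check can be worked through in plain Python. This is only the arithmetic, not the GX metric implementation; the variable names are chosen for illustration.

```python
# Example data copied from the `examples` entry above.
product_id = [1, 2, 3, 4]
catalog_id = [1, 2, 3, 99]
mostly = 0.75

catalog = set(catalog_id)

# Values of product_id that have no counterpart in catalog_id.
missing = [v for v in product_id if v not in catalog]

# Fraction of product_id values that were found in the catalog.
fraction_ok = 1 - len(missing) / len(product_id)
success = fraction_ok >= mostly

print(f"missing={missing} fraction_ok={fraction_ok} success={success}")
```

Three of the four product IDs are found, which sits exactly on the 0.75 threshold. With the default mostly=1.0 the same data would fail.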

Custom: Business-Rule BatchExpectation

from great_expectations.expectations.expectation import BatchExpectation
from great_expectations.execution_engine import PandasExecutionEngine

class ExpectRefundAmountLteOrderAmount(BatchExpectation):
    """
    For rows where status='refunded', refund_amount must be <= amount.
    """
    metric_dependencies = ("table.columns",)
    success_keys = ()

    def _validate(self, configuration, metrics, runtime_configuration=None,
                  execution_engine=None):
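        # Access the full DataFrame from the execution engine.
        # This bypasses the metric system and, as written, only works
        # when the batch is a pandas DataFrame.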
        df = execution_engine.batch_manager.active_batch.data.dataframe
        refunded = df[df["status"] == "refunded"]
        violations = refunded[refunded["refund_amount"] > refunded["amount"]]

        return {
            "success": len(violations) == 0,
            "result": {
                "violation_count": len(violations),
                "total_refunded": len(refunded),
            }
        }

Register custom expectations by placing them in great_expectations/plugins/expectations/ or importing them before running validation.

Running Checkpoint Validations in CI

A Checkpoint ties together: which data to validate, which expectation suite to use, and what to do with results (update Data Docs, send Slack alert, fail the pipeline).

Define a Checkpoint

checkpoint_config = {
    "name": "orders_daily_checkpoint",
    "validations": [
        {
            "batch_request": {
                "datasource_name": "orders_source",
                "data_asset_name": "daily_orders",
            },
            "expectation_suite_name": "orders.critical",
        }
    ],
    "action_list": [
        {
            "name": "store_validation_result",
            "action": {"class_name": "StoreValidationResultAction"},
        },
        {
            "name": "update_data_docs",
            "action": {"class_name": "UpdateDataDocsAction"},
        },
        {
            "name": "send_slack_notification_on_failure",
            "action": {
                "class_name": "SlackNotificationAction",
                "slack_webhook": "${SLACK_WEBHOOK_URL}",
                "notify_on": "failure",
                "renderer": {
                    "module_name": "great_expectations.render.renderer.slack_renderer",
                    "class_name": "SlackRenderer",
                },
            },
        },
    ],
}

context.add_or_update_checkpoint(**checkpoint_config)

Run in a Python Script

result = context.run_checkpoint(
    checkpoint_name="orders_daily_checkpoint",
    batch_request=batch_request,  # override with fresh data
)

if not result["success"]:
    failed = [
        vr for vr in result["run_results"].values()
        if not vr["validation_result"]["success"]
    ]
    for vr in failed:
        stats = vr["validation_result"]["statistics"]
        print(f"FAILED: {stats['unsuccessful_expectations']} expectations violated")
    raise SystemExit(1)
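In CI logs, a one-line summary per failed validation is often easier to scan than a raw result dump. The sketch below assumes the result can be read like the nested dictionary used in the script above. `summarize_failures` is a hypothetical helper, and `sample_result` is hand-written for illustration rather than real GX output (real run identifiers are objects, not plain strings).

```python
def summarize_failures(result):
    """Return one summary line per failed validation in a result-shaped dict."""
    lines = []
    for run_id, run in result["run_results"].items():
        validation = run["validation_result"]
        if validation["success"]:
            continue
        stats = validation["statistics"]
        lines.append(
            f"{run_id}: {stats['unsuccessful_expectations']} of "
            f"{stats['evaluated_expectations']} expectations failed"
        )
    return lines


# Hand-written stand-in for a checkpoint result (illustrative values only).
sample_result = {
    "success": False,
    "run_results": {
        "orders.critical": {
            "validation_result": {
                "success": False,
                "statistics": {
                    "evaluated_expectations": 12,
                    "unsuccessful_expectations": 2,
                },
            }
        },
        "orders.warning": {
            "validation_result": {
                "success": True,
                "statistics": {
                    "evaluated_expectations": 5,
                    "unsuccessful_expectations": 0,
                },
            }
        },
    },
}

summary = summarize_failures(sample_result)
for line in summary:
    print("FAILED", line)
```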

GitHub Actions CI Integration

# .github/workflows/data-quality.yaml
name: Data Quality Gate

on:
  schedule:
    - cron: "0 6 * * *"   # daily at 06:00 UTC (GitHub Actions schedules run in UTC)
  push:
    paths:
      - "pipelines/**"
      - "great_expectations/expectations/**"

jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"

      - name: Install dependencies
        run: pip install great-expectations pandas pyarrow

      - name: Download latest data batch
        run: |
          aws s3 cp s3://my-bucket/orders/$(date +%Y-%m-%d).parquet ./data/
        env:
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}

      - name: Run Great Expectations checkpoint
        run: python scripts/run_checkpoint.py
        env:
          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
          GX_CLOUD_ORGANIZATION_ID: ${{ secrets.GX_CLOUD_ORG_ID }}

# scripts/run_checkpoint.py
import pandas as pd
import great_expectations as gx
import sys

context = gx.get_context(context_root_dir="./great_expectations")

df = pd.read_parquet("./data/")
datasource = context.get_datasource("orders_source")
asset = datasource.get_asset("daily_orders")
batch_request = asset.build_batch_request(dataframe=df)

result = context.run_checkpoint(
    checkpoint_name="orders_daily_checkpoint",
    batch_request=batch_request,
)

sys.exit(0 if result["success"] else 1)

Integrating with Spark DataFrames

For large datasets, Spark is preferred. The GX Spark backend applies expectations as distributed DataFrame operations:

from pyspark.sql import SparkSession
import great_expectations as gx

spark = SparkSession.builder \
    .appName("gx-validation") \
    .getOrCreate()

context = gx.get_context(context_root_dir="./great_expectations")

# Read a large dataset
df = spark.read.parquet("s3://datalake/events/2026-05-17/")

datasource = context.sources.add_or_update_spark("events_spark")
asset = datasource.add_dataframe_asset("raw_events")
batch_request = asset.build_batch_request(dataframe=df)

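# get_validator looks the suite up by name, so make sure it exists first
context.add_or_update_expectation_suite("events.schema")

validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="events.schema"
)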

# These expectations execute as Spark jobs
validator.expect_column_values_to_not_be_null("event_id")
validator.expect_column_values_to_be_in_set(
    "event_type",
    {"click", "view", "purchase", "signup"}
)
validator.expect_column_values_to_be_between(
    "timestamp",
    min_value=1700000000,  # Unix epoch bounds
    max_value=2000000000
)

results = validator.validate()
print(f"Success: {results.success}")
print(f"Statistics: {results.statistics}")

Spark Performance Tips

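Large DataFrames can make validation slow because some expectations require full scans. Optimize by:

# `mostly` is a tolerance threshold, not a sampling switch:
# this regex is still evaluated against every row
validator.expect_column_values_to_match_regex(
    "email",
    r"^[^@]+@[^@]+\.[^@]+$",
    mostly=0.999  # pass if at least 99.9% of values match
)

# To validate a sample of a huge table, build the batch request from
# a sampled DataFrame instead, e.g. df.sample(fraction=0.01, seed=42)

# Cache before running multiple validations so Spark reuses the data
df.cache()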
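A note on `mostly`, which appears in several expectations above: it is a pass threshold on the fraction of evaluated values that satisfy the condition. The plain-Python sketch below shows only that arithmetic. `passes_mostly` is a hypothetical helper, not a GX function, and the amounts are invented.

```python
def passes_mostly(values, predicate, mostly=1.0):
    """True when at least `mostly` of the values satisfy the predicate."""
    if not values:
        return True  # nothing to violate
    ok = sum(1 for v in values if predicate(v))
    return ok / len(values) >= mostly


def is_positive(amount):
    return amount > 0


# 995 valid rows and 5 violations: 99.5% of values pass.
amounts = [10.0] * 995 + [-1.0] * 5

strict = passes_mostly(amounts, is_positive, mostly=1.0)
tolerant = passes_mostly(amounts, is_positive, mostly=0.99)
print(f"strict={strict} tolerant={tolerant}")
```

Every value is still inspected in both calls. Only the verdict changes, which is why `mostly` does not reduce the cost of a scan.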

Generating and Sharing Data Docs

Data Docs are GX's killer feature for non-technical stakeholders. When a checkpoint's action_list includes UpdateDataDocsAction, they are rebuilt after each run. You can also build and open them on demand:

context.build_data_docs()
context.open_data_docs()  # opens browser locally

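For CI, publish them to S3. Note that --acl public-read makes the reports readable by anyone with the URL, and Data Docs can include sample values from your data, so drop that flag if the bucket should stay private: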

- name: Upload Data Docs to S3
  run: |
    aws s3 sync great_expectations/uncommitted/data_docs/local_site/ \
      s3://my-data-docs-bucket/latest/ \
      --delete --acl public-read

Or serve them from your internal tool stack:

# Programmatically get the path
docs_site = context.get_docs_sites_urls()[0]
print(f"Data Docs: {docs_site['site_url']}")

Profiling New Data Sources

When onboarding a new dataset, let GX profile it first:

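from great_expectations.profile.user_configurable_profiler import (
    UserConfigurableProfiler,
)

# get_validator looks the suite up by name, so create it first
context.add_or_update_expectation_suite("new_suite")
validator = context.get_validator(batch_request=batch_request,
                                  expectation_suite_name="new_suite")

# UserConfigurableProfiler accepts a Validator; the older
# BasicDatasetProfiler targets the legacy Dataset API
profiler = UserConfigurableProfiler(profile_dataset=validator)
suite = profiler.build_suite()
context.add_or_update_expectation_suite(expectation_suite=suite)

The profiler creates expectations for every column it finds, based on observed statistics. Always review and tighten the output: generated bounds can be far looser than your domain allows, for example min_value=0, max_value=999999 on a count column that has never exceeded 5000. A range that permissive won't catch real anomalies.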
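One way to tighten a loose range is to derive bounds from the values you have actually observed, plus an explicit amount of headroom. The sketch below is plain Python. `suggest_bounds`, the 20% headroom, and the sample counts are all assumptions made for illustration, not part of GX.

```python
def suggest_bounds(values, headroom=0.2, floor=0):
    """Return (min_value, max_value) padded by `headroom` of the observed range.

    `floor` keeps the lower bound from going below a hard minimum,
    which matters for counts that can never be negative.
    """
    lo, hi = min(values), max(values)
    pad = (hi - lo) * headroom
    return max(floor, lo - pad), hi + pad


# Hypothetical history of a count column that has never exceeded 5000.
observed_counts = [120, 950, 4800, 5000, 310]

min_value, max_value = suggest_bounds(observed_counts)
print(f"min_value={min_value} max_value={max_value}")
```

The resulting bounds could then be passed to expect_column_values_to_be_between in place of the generated ones. How much headroom is appropriate is a judgment call about the dataset, not something the sketch can decide.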

Testing the Tests: Validating Your Expectation Suites

Expectation suites need to be tested too — not just applied to production data. Write unit tests that verify your expectations catch known-bad data:

# tests/test_expectation_suites.py
import pytest
import pandas as pd
import great_expectations as gx

@pytest.fixture
def context():
    return gx.get_context(mode="ephemeral")

def test_null_order_id_fails(context):
    bad_df = pd.DataFrame({
        "order_id": [None, "ORD-0000000001"],
        "customer_id": ["C001", "C002"],
        "amount": [100.0, 200.0],
        "status": ["pending", "shipped"],
        "created_at": ["2026-05-17", "2026-05-17"],
    })
    datasource = context.sources.add_pandas("test")
    asset = datasource.add_dataframe_asset("orders")
    batch_request = asset.build_batch_request(dataframe=bad_df)

    suite = context.add_expectation_suite("orders.critical_test")
    validator = context.get_validator(
        batch_request=batch_request,
        expectation_suite_name="orders.critical_test"
    )
    validator.expect_column_values_to_not_be_null("order_id")

    result = validator.validate()
    assert not result.success, "Should fail when order_id is null"

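def test_invalid_status_fails(context):
    bad_df = pd.DataFrame({
        "order_id": ["ORD-0000000001"],
        "status": ["INVALID_STATUS"],
    })
    datasource = context.sources.add_pandas("test")
    asset = datasource.add_dataframe_asset("orders")
    batch_request = asset.build_batch_request(dataframe=bad_df)

    context.add_expectation_suite("orders.status_test")
    validator = context.get_validator(
        batch_request=batch_request,
        expectation_suite_name="orders.status_test"
    )
    validator.expect_column_values_to_be_in_set(
        "status",
        ["pending", "processing", "shipped", "delivered", "cancelled", "refunded"]
    )

    result = validator.validate()
    assert not result.success, "Should fail when status is outside the allowed set"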

Best Practices Summary

Version-control everything. Expectation suites, checkpoints, and custom expectations all live as files. Commit them. Review changes in pull requests like you review code changes.

Use mostly intentionally. mostly=1.0 means zero tolerance. mostly=0.99 means 1% violations are acceptable. Make this a conscious business decision, not a lazy default.

Separate suites by severity. A schema.critical suite that fails the pipeline, a quality.warning suite that sends a Slack alert, and a metrics.informational suite that just logs statistics give you graduated responses to data problems.

Run profiling on every new data source. Don't write expectations from memory. Profile first, review, tighten. The 20 minutes spent pruning a profiler-generated suite is usually repaid the first time it stops a bad batch before production.
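The graduated response described under "Separate suites by severity" can be sketched as a small routing table. This is plain Python under stated assumptions: the severity is read from the suffix of the suite name (as in schema.critical), and `SEVERITY_ACTIONS`, `plan_actions`, and the action names are hypothetical, not GX features.

```python
# Hypothetical mapping from suite-name suffix to the response it triggers.
SEVERITY_ACTIONS = {
    "critical": "fail_pipeline",
    "warning": "notify_slack",
    "informational": "log_only",
}


def plan_actions(suite_results):
    """Map each failed suite to an action and compute the process exit code.

    `suite_results` maps suite name -> success flag.
    """
    actions = []
    for suite_name, success in suite_results.items():
        if success:
            continue
        severity = suite_name.rsplit(".", 1)[-1]
        # Unknown severities fall back to the mildest response.
        actions.append((suite_name, SEVERITY_ACTIONS.get(severity, "log_only")))
    exit_code = 1 if any(a == "fail_pipeline" for _, a in actions) else 0
    return actions, exit_code


actions, exit_code = plan_actions({
    "schema.critical": True,
    "quality.warning": False,
    "metrics.informational": False,
})
print(actions, exit_code)
```

Only a failing critical suite produces a non-zero exit code, so warnings and informational checks never block a build.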

