Data Contracts Testing Guide: Contract-First Data Quality for Modern Pipelines

Data contracts are the data engineering equivalent of API contracts — a formal agreement between data producers and consumers about the structure, semantics, and quality of data. In data mesh architectures, where multiple teams own data products, contracts prevent the silent schema changes, missing fields, and format shifts that cause downstream pipeline failures. This guide covers how to implement and test data contracts effectively.

What Are Data Contracts?

A data contract defines:

  • Schema — field names, types, nullability
  • Semantics — what each field means, valid value ranges
  • SLOs — freshness, completeness, and latency expectations
  • Ownership — who produces and who consumes the data
  • Change policy — what changes require consumer notification

Unlike database schemas (which are enforced by the database), data contracts are validated at the pipeline boundary — when data moves from producer to consumer.
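Here is a minimal sketch that makes the five parts concrete by holding a contract as one typed Python object. DataContract, FieldSpec, requires_notification and the change-kind strings ("remove_field", "change_type") are hypothetical names for illustration, not part of any contract tool. The values come from the orders example used throughout this guide.

```python
from dataclasses import dataclass, field
from datetime import timedelta
from typing import Optional


@dataclass(frozen=True)
class FieldSpec:
    type: str                         # schema: logical type, e.g. "string", "decimal"
    required: bool = True             # schema: nullability
    description: str = ""             # semantics: what the field means
    minimum: Optional[float] = None   # semantics: valid value range
    maximum: Optional[float] = None


@dataclass
class DataContract:
    name: str
    fields: dict[str, FieldSpec]                        # schema + semantics per field
    max_staleness: timedelta                            # SLO: freshness
    min_completeness: float = 1.0                       # SLO: share of populated values
    producer: str = ""                                  # ownership: producing team
    consumers: list[str] = field(default_factory=list)  # ownership: consuming teams
    # Change policy (hypothetical vocabulary): change kinds that need consumer notification
    notify_on: frozenset[str] = frozenset({"remove_field", "change_type"})

    def requires_notification(self, change_kind: str) -> bool:
        return change_kind in self.notify_on


orders = DataContract(
    name="orders",
    fields={
        "order_id": FieldSpec("string", description="Unique order identifier"),
        "amount": FieldSpec("decimal", minimum=0, maximum=1_000_000),
    },
    max_staleness=timedelta(minutes=15),
    producer="sales-team",
    consumers=["analytics-team"],
)
```

Keeping the change policy next to the schema lets a CI step ask the contract itself whether a proposed change needs consumer sign-off.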

Data Contract Specification Formats

Several specification formats exist:

Data Contract Specification (the format used by datacontract-cli)

# data-contract.yaml
dataContractSpecification: 0.9.3
id: urn:datacontract:sales:orders-v1
info:
  title: Orders Data Contract
  version: 1.0.0
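  owner: sales-team

servers:
  snowflake:
    type: snowflake
    account: account        # your Snowflake account identifier
    database: DB
    schema: SCHEMA
  local:
    type: local
    path: /data/orders.parquet
    format: parquet
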
terms:
  usage: "Internal use only — analytics and reporting"
  limitations: "Not for ML training without approval"

models:
  orders:
    description: "Customer orders"
    type: table
    fields:
      order_id:
        type: string
        format: uuid
        required: true
        unique: true
        description: "Unique order identifier"
      customer_id:
        type: string
        required: true
        pattern: "^CUST-[0-9]{6}$"
      amount:
        type: decimal
        required: true
        minimum: 0
        maximum: 1000000
      status:
        type: string
        enum: [pending, confirmed, shipped, delivered, cancelled]
      created_at:
        type: timestamp
        required: true

quality:
  - rule: row_count_min
    description: "Must have at least 100 rows per hour"
    parameters:
      min_count: 100
      window: 1h
  - rule: freshness
    description: "Latest record must be within 15 minutes"
    parameters:
      max_age: 15m
  - rule: completeness
    description: "customer_id must be 100% populated"
    parameters:
      field: customer_id
      threshold: 1.0
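To show what a validator does with the field rules above, the sketch below hand-copies the orders fields into a Python dict (in practice you would load the YAML, for example with PyYAML) and checks row dicts against the required, unique, pattern, enum and range rules. validate_rows and ORDERS_FIELDS are hypothetical names. The sketch deliberately ignores the type and quality keys, which a real tool such as datacontract-cli also checks.

```python
import re
from decimal import Decimal

# Field rules mirrored from the orders model (hand-copied, not parsed from YAML)
ORDERS_FIELDS = {
    "order_id": {"required": True, "unique": True},
    "customer_id": {"required": True, "pattern": r"^CUST-[0-9]{6}$"},
    "amount": {"required": True, "minimum": 0, "maximum": 1_000_000},
    "status": {"enum": ["pending", "confirmed", "shipped", "delivered", "cancelled"]},
    "created_at": {"required": True},
}


def validate_rows(rows, fields=ORDERS_FIELDS):
    """Return a list of (row_index, field, message) violations."""
    violations = []
    # Track values already seen for every field marked unique
    seen = {name: set() for name, rule in fields.items() if rule.get("unique")}
    for i, row in enumerate(rows):
        for name, rule in fields.items():
            value = row.get(name)
            if value is None:
                if rule.get("required"):
                    violations.append((i, name, "missing required value"))
                continue  # optional and absent: nothing else to check
            if "pattern" in rule and not re.fullmatch(rule["pattern"], str(value)):
                violations.append((i, name, f"does not match {rule['pattern']}"))
            if "enum" in rule and value not in rule["enum"]:
                violations.append((i, name, f"not one of {rule['enum']}"))
            if "minimum" in rule and value < rule["minimum"]:
                violations.append((i, name, f"below minimum {rule['minimum']}"))
            if "maximum" in rule and value > rule["maximum"]:
                violations.append((i, name, f"above maximum {rule['maximum']}"))
            if name in seen:
                if value in seen[name]:
                    violations.append((i, name, "duplicate value"))
                seen[name].add(value)
    return violations


rows = [
    {"order_id": "a1", "customer_id": "CUST-000001", "amount": Decimal("19.99"),
     "status": "pending", "created_at": "2024-05-01T10:00:00Z"},
    # Breaks every rule: duplicate id, bad pattern, negative amount, unknown status, null timestamp
    {"order_id": "a1", "customer_id": "12345", "amount": Decimal("-5"),
     "status": "lost", "created_at": None},
]
violations = validate_rows(rows)
```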

Data Mesh Manager Format

dataProduct: orders
outputPort: orders_v1
schema:
  - name: order_id
    type: STRING
    isNullable: false
  - name: amount
    type: NUMERIC
    isNullable: false
    description: "Order total in USD cents"

Testing Data Contracts with the datacontract-cli

The datacontract-cli tool validates data against contracts. Connection details live in the contract's servers section, and the --server option selects one of those entries by its key:

# Install
pip install datacontract-cli

# Test the contract against the server declared under the "snowflake" key
# (credentials are read from environment variables, not from the contract)
datacontract test data-contract.yaml --server snowflake

# Test against a local Parquet file (a server entry of type local)
datacontract test data-contract.yaml --server local

# Lint the contract itself for formatting issues
datacontract lint data-contract.yaml

Integrating Contract Tests in CI/CD

# .github/workflows/data-contracts.yml
name: Data Contract Tests

on:
  push:
    paths:
      - 'contracts/**'
      - 'pipelines/**'

jobs:
  test-contracts:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Install datacontract-cli
        run: pip install datacontract-cli

      # Writes the service-account key to a file and exports GOOGLE_APPLICATION_CREDENTIALS,
      # which must point to a key file, not contain the key itself
      - name: Authenticate to Google Cloud
        uses: google-github-actions/auth@v2
        with:
          credentials_json: ${{ secrets.GCP_SA_KEY }}

      # --server selects a server key from each contract's servers section;
      # project, dataset, host, and topic are configured there
      - name: Test orders contract
        run: datacontract test contracts/orders-v1.yaml --server bigquery

      - name: Test events contract
        run: datacontract test contracts/events-v1.yaml --server kafka

Schema Validation Testing

Test schema enforcement at the pipeline boundary.

Python: Pandera Schema Validation

import pandas as pd
import pandera as pa
import pytest
from pandera import Column, DataFrameSchema, Check

# Define the contract schema (mirrors the orders model in data-contract.yaml)
orders_schema = DataFrameSchema({
    "order_id": Column(str, Check.str_matches(r'^[0-9a-f-]{36}$'), nullable=False),
    "customer_id": Column(str, Check.str_matches(r'^CUST-\d{6}$'), nullable=False),
    "amount": Column(float, [
        Check.greater_than_or_equal_to(0),
        Check.less_than_or_equal_to(1_000_000),
    ], nullable=False),
    "status": Column(str, Check.isin(['pending', 'confirmed', 'shipped', 'delivered', 'cancelled'])),
    "created_at": Column(pd.Timestamp, nullable=False),
})

# A well-formed order_id, so each negative test fails only on the field it targets
VALID_ORDER_ID = "123e4567-e89b-12d3-a456-426614174000"

def test_orders_data_matches_contract(sample_orders_df):
    """Validate the orders pipeline output against the contract.

    sample_orders_df is a pytest fixture (e.g. in conftest.py) supplying pipeline output.
    """
    validated = orders_schema.validate(sample_orders_df)
    assert len(validated) > 0

def test_contract_catches_invalid_customer_id():
    """Verify the contract rejects malformed customer IDs"""
    bad_df = pd.DataFrame({
        "order_id": [VALID_ORDER_ID],
        "customer_id": ["12345"],  # missing CUST- prefix
        "amount": [100.0],
        "status": ["pending"],
        "created_at": [pd.Timestamp.now()],
    })

    with pytest.raises(pa.errors.SchemaError, match="customer_id"):
        orders_schema.validate(bad_df)

def test_contract_catches_negative_amount():
    """Verify the contract rejects negative amounts"""
    bad_df = pd.DataFrame({
        "order_id": [VALID_ORDER_ID],
        "customer_id": ["CUST-000001"],
        "amount": [-50.0],  # negative: violates contract
        "status": ["pending"],
        "created_at": [pd.Timestamp.now()],
    })

    with pytest.raises(pa.errors.SchemaError, match="amount"):
        orders_schema.validate(bad_df)
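The first test above expects a sample_orders_df fixture that the section does not define. One possible conftest.py sketch follows; make_sample_orders is a hypothetical helper that builds synthetic rows intended to satisfy orders_schema. In a real suite the fixture would more likely read a sample of the actual pipeline output.

```python
import uuid

import pandas as pd
import pytest

STATUSES = ["pending", "confirmed", "shipped", "delivered", "cancelled"]


def make_sample_orders(n: int = 10) -> pd.DataFrame:
    """Build n synthetic orders that satisfy orders_schema (illustrative data only)."""
    return pd.DataFrame({
        # str(uuid4()) is 36 lowercase hex digits and hyphens, matching the order_id check
        "order_id": [str(uuid.uuid4()) for _ in range(n)],
        "customer_id": [f"CUST-{i:06d}" for i in range(n)],
        "amount": [float(10 * (i + 1)) for i in range(n)],
        "status": [STATUSES[i % len(STATUSES)] for i in range(n)],
        # Naive timestamps one minute apart, matching the pd.Timestamp dtype in orders_schema
        "created_at": pd.date_range(end=pd.Timestamp.now(), periods=n, freq="min"),
    })


@pytest.fixture
def sample_orders_df() -> pd.DataFrame:
    return make_sample_orders()
```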

Testing Breaking vs. Non-Breaking Changes

Data contracts must distinguish between breaking changes (remove a field, change a type) and non-breaking changes (add an optional field, widen a constraint).

# schemas/orders_v1.py
ORDERS_V1_SCHEMA = {
    "fields": [
        {"name": "order_id", "type": "string", "required": True},
        {"name": "amount", "type": "decimal", "required": True},
        {"name": "status", "type": "string", "required": True},
    ]
}

# schemas/orders_v2.py  
ORDERS_V2_SCHEMA = {
    "fields": [
        {"name": "order_id", "type": "string", "required": True},
        {"name": "amount", "type": "decimal", "required": True},
        {"name": "status", "type": "string", "required": True},
        {"name": "discount_code", "type": "string", "required": False},  # new optional field
    ]
}

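# check_schema_compatibility is a project-specific helper returning an object with
# backward_compatible, forward_compatible, and breaking_changes attributes
class TestSchemaCompatibility: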
    def test_adding_optional_field_is_non_breaking(self):
        """Adding optional fields should not break existing consumers"""
        is_compatible = check_schema_compatibility(ORDERS_V1_SCHEMA, ORDERS_V2_SCHEMA)
        assert is_compatible.backward_compatible is True
        assert is_compatible.forward_compatible is True

    def test_removing_field_is_breaking(self):
        """Removing existing fields breaks consumers that read them"""
        schema_missing_amount = {
            "fields": [
                {"name": "order_id", "type": "string", "required": True},
                {"name": "status", "type": "string", "required": True},
                # amount removed!
            ]
        }
        
        is_compatible = check_schema_compatibility(ORDERS_V1_SCHEMA, schema_missing_amount)
        assert is_compatible.backward_compatible is False
        assert "amount" in is_compatible.breaking_changes

    def test_changing_type_from_decimal_to_int_is_breaking(self):
        """Type narrowing breaks consumers that expect the original type"""
        schema_with_int_amount = {
            "fields": [
                {"name": "order_id", "type": "string", "required": True},
                {"name": "amount", "type": "integer", "required": True},  # was decimal
                {"name": "status", "type": "string", "required": True},
            ]
        }
        
        is_compatible = check_schema_compatibility(ORDERS_V1_SCHEMA, schema_with_int_amount)
        assert is_compatible.backward_compatible is False
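The tests above rely on a check_schema_compatibility helper. Below is a minimal sketch of one, assuming schemas shaped like ORDERS_V1_SCHEMA (a list of fields with name, type, and required). Here backward_compatible means consumers built against the old schema can still read data produced under the new one, and forward_compatible means the reverse. That matches the tests above but is the opposite of Confluent Schema Registry's naming, so check which convention your tooling uses. Any type change counts as breaking, even a widening such as integer to decimal; that is a deliberate simplification.

```python
from dataclasses import dataclass, field


@dataclass
class CompatibilityResult:
    backward_compatible: bool   # consumers of the OLD schema can read NEW data
    forward_compatible: bool    # consumers of the NEW schema can read OLD data
    breaking_changes: list[str] = field(default_factory=list)  # fields that break old consumers


def check_schema_compatibility(old: dict, new: dict) -> CompatibilityResult:
    old_fields = {f["name"]: f for f in old["fields"]}
    new_fields = {f["name"]: f for f in new["fields"]}
    breaking = []        # changes that break old consumers reading new data
    forward_issues = []  # changes that break new consumers reading old data

    for name, old_f in old_fields.items():
        new_f = new_fields.get(name)
        if new_f is None:
            breaking.append(name)          # field removed
        elif new_f["type"] != old_f["type"]:
            breaking.append(name)          # type change: treated as breaking both ways
            forward_issues.append(name)
        elif old_f.get("required", False) and not new_f.get("required", False):
            breaking.append(name)          # old consumers may now receive nulls
        elif new_f.get("required", False) and not old_f.get("required", False):
            forward_issues.append(name)    # old data may still contain nulls

    for name, new_f in new_fields.items():
        if name not in old_fields and new_f.get("required", False):
            forward_issues.append(name)    # old data never had this required field

    return CompatibilityResult(
        backward_compatible=not breaking,
        forward_compatible=not forward_issues,
        breaking_changes=breaking,
    )
```

Running this in CI against the contract in the main branch and the one in the pull request gives the breaking change detector a concrete pass/fail signal.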

Semantic Rule Testing

Schema validation catches structural violations. Semantic rules catch business logic violations.

import pandas as pd

# orders_df and customers_df are pytest fixtures supplying the data product's tables

def test_delivered_orders_have_positive_amount(orders_df):
    """Orders in 'delivered' status must have amount > 0"""
    delivered = orders_df[orders_df['status'] == 'delivered']
    invalid = delivered[delivered['amount'] <= 0]
    
    assert len(invalid) == 0, (
        f"Found {len(invalid)} delivered orders with non-positive amounts: "
        f"{invalid['order_id'].tolist()}"
    )

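def test_no_future_created_at(orders_df):
    """created_at must not be in the future"""
    # Assumes created_at is tz-aware UTC; comparing a tz-naive column with an aware timestamp raises TypeError
    now = pd.Timestamp.now(tz='UTC')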
    future_records = orders_df[orders_df['created_at'] > now]
    
    assert len(future_records) == 0, (
        f"Found {len(future_records)} records with future timestamps"
    )

def test_referential_integrity_customers(orders_df, customers_df):
    """All customer_ids in orders must exist in the customers table"""
    valid_customers = set(customers_df['customer_id'])
    orphaned = orders_df[~orders_df['customer_id'].isin(valid_customers)]
    
    assert len(orphaned) == 0, (
        f"Found {len(orphaned)} orders with non-existent customer_ids"
    )

Freshness and Completeness Testing

from datetime import datetime, timezone, timedelta

# These tests assume created_at is a tz-aware UTC column (datetime64[ns, UTC]);
# subtracting or comparing a tz-naive column against an aware timestamp raises TypeError.

def test_data_freshness(orders_df):
    """Latest order record must be within 15 minutes"""
    max_age = timedelta(minutes=15)
    latest = orders_df['created_at'].max()
    age = datetime.now(tz=timezone.utc) - latest

    assert age <= max_age, (
        f"Data is stale: latest record is {age} old, threshold is {max_age}"
    )

def test_completeness_customer_id(orders_df):
    """customer_id must be 100% populated"""
    total = len(orders_df)
    assert total > 0, "orders_df is empty; completeness is undefined"
    populated = orders_df['customer_id'].notna().sum()
    completeness = populated / total

    assert completeness == 1.0, (
        f"customer_id completeness is {completeness:.1%}, expected 100%"
    )

def test_row_count_minimum(orders_df):
    """Must have at least 100 orders per hour (the contract's row_count_min rule)"""
    # resample() also emits hours with zero orders, which groupby() would silently skip.
    # On live data the current, still-filling hour may fall short; exclude it if needed.
    hourly_counts = orders_df.set_index('created_at').resample('1h').size()

    below_threshold = hourly_counts[hourly_counts < 100]
    assert len(below_threshold) == 0, (
        f"Hours with fewer than 100 orders: {below_threshold.to_dict()}"
    )
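Hard-coded thresholds in tests can drift from the quality section of the contract. A minimal sketch of driving the checks from the contract instead follows. It assumes the quality rules are mirrored as Python data and that rows are dicts with a tz-aware created_at. parse_duration, run_quality_rules and the check_* functions are hypothetical names, and the row-count check looks only at the most recent window, as a scheduled monitor would.

```python
from datetime import datetime, timedelta, timezone

# The quality section of the orders contract, mirrored as Python data
QUALITY_RULES = [
    {"rule": "row_count_min", "parameters": {"min_count": 100, "window": "1h"}},
    {"rule": "freshness", "parameters": {"max_age": "15m"}},
    {"rule": "completeness", "parameters": {"field": "customer_id", "threshold": 1.0}},
]

_UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days"}


def parse_duration(text: str) -> timedelta:
    """Parse durations like '15m' or '1h' (hypothetical helper)."""
    return timedelta(**{_UNITS[text[-1]]: int(text[:-1])})


def check_row_count_min(rows, now, min_count, window):
    # Counts rows in the trailing window only
    cutoff = now - parse_duration(window)
    return sum(r["created_at"] > cutoff for r in rows) >= min_count


def check_freshness(rows, now, max_age):
    if not rows:
        return False
    return now - max(r["created_at"] for r in rows) <= parse_duration(max_age)


def check_completeness(rows, now, field, threshold):
    # `now` is unused; kept so every check shares one signature
    if not rows:
        return False
    populated = sum(r.get(field) is not None for r in rows)
    return populated / len(rows) >= threshold


CHECKS = {
    "row_count_min": check_row_count_min,
    "freshness": check_freshness,
    "completeness": check_completeness,
}


def run_quality_rules(rules, rows, now=None):
    """Return a list of (rule_name, passed) pairs, one per contract rule."""
    now = now or datetime.now(timezone.utc)
    return [(r["rule"], CHECKS[r["rule"]](rows, now, **r["parameters"])) for r in rules]
```

Returning a list rather than a dict keeps two rules of the same kind (for example, completeness on two different fields) from overwriting each other.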

Consumer-Driven Contract Tests

In data mesh, consumers define what they need. Implement consumer-driven contracts:

# consumer: analytics-team
# contract: what analytics needs from the orders data product

ANALYTICS_REQUIRED_FIELDS = {
    'order_id': 'string',
    'customer_id': 'string',
    'amount': 'decimal',
    'created_at': 'timestamp',
}

def test_orders_satisfies_analytics_contract(orders_df):
    """Verify the orders data product satisfies analytics team's contract"""
    for field, expected_type in ANALYTICS_REQUIRED_FIELDS.items():
        assert field in orders_df.columns, (
            f"Missing required field: {field}"
        )
        # Type compatibility check (is_type_compatible maps a pandas dtype to a contract type)
        assert is_type_compatible(orders_df[field].dtype, expected_type), (
            f"Field {field} has dtype {orders_df[field].dtype}, expected {expected_type}"
        )
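The consumer test calls an is_type_compatible helper it does not define. One possible sketch maps each contract type name to a predicate from pandas.api.types; the mapping itself is an assumption, not a standard. Note that pandas treats object-dtype columns as string-compatible, and Decimal values stored in an object column will not pass the decimal check.

```python
import pandas as pd

ptypes = pd.api.types

# Contract type name -> predicate over a pandas dtype (hypothetical mapping)
_TYPE_CHECKS = {
    "string": ptypes.is_string_dtype,
    "integer": ptypes.is_integer_dtype,
    # Booleans are numeric in pandas, so exclude them explicitly
    "decimal": lambda d: ptypes.is_numeric_dtype(d) and not ptypes.is_bool_dtype(d),
    "timestamp": ptypes.is_datetime64_any_dtype,  # naive or tz-aware
    "boolean": ptypes.is_bool_dtype,
}


def is_type_compatible(dtype, expected_type: str) -> bool:
    """Return True if a pandas dtype can hold values of the contract type."""
    try:
        check = _TYPE_CHECKS[expected_type]
    except KeyError:
        # Fail loudly on typos in the consumer contract instead of silently passing
        raise ValueError(f"Unknown contract type: {expected_type!r}") from None
    return bool(check(dtype))
```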

Production Monitoring with HelpMeTest

Data contracts should be monitored continuously in production. HelpMeTest's health checks can run your contract validation on a schedule:

Health check: orders-contract-validation
Interval: 15 minutes
Command: datacontract test contracts/orders-v1.yaml --server snowflake
Alert: Slack + email on contract violation

A contract violation at 2am means your morning dashboards will be broken. Catch it at 2am, not 8am.

Summary

Data contract testing requires:

  • A contract specification in ODCS or a similar format
  • Schema validation tests covering field types, required fields, and value constraints
  • Semantic rule tests for business logic validity
  • Breaking change detection tests that distinguish additive from destructive changes
  • Completeness and freshness tests for SLO validation
  • Consumer-driven contract tests for data mesh architectures
  • Continuous production monitoring

The most valuable test is the breaking change detector: run it in CI before every pipeline deploy.
