Data Contracts Testing Guide: Contract-First Data Quality for Modern Pipelines
Data contracts are the data engineering equivalent of API contracts — a formal agreement between data producers and consumers about the structure, semantics, and quality of data. In data mesh architectures, where multiple teams own data products, contracts prevent the silent schema changes, missing fields, and format shifts that cause downstream pipeline failures. This guide covers how to implement and test data contracts effectively.
What Are Data Contracts?
A data contract defines:
- Schema — field names, types, nullability
- Semantics — what each field means, valid value ranges
- SLOs — freshness, completeness, and latency expectations
- Ownership — who produces and who consumes the data
- Change policy — what changes require consumer notification
Unlike database schemas (which are enforced by the database), data contracts are validated at the pipeline boundary — when data moves from producer to consumer.
Data Contract Specification Formats
Several specification formats exist:
OpenDataContract Standard (ODCS)
# data-contract.yaml
dataContractSpecification: 0.9.3
id: urn:datacontract:sales:orders-v1
info:
title: Orders Data Contract
version: 1.0.0
owner: sales-team
terms:
usage: "Internal use only — analytics and reporting"
limitations: "Not for ML training without approval"
models:
orders:
description: "Customer orders"
type: table
fields:
order_id:
type: uuid
required: true
unique: true
description: "Unique order identifier"
customer_id:
type: string
required: true
pattern: "^CUST-[0-9]{6}$"
amount:
type: decimal
required: true
minimum: 0
maximum: 1000000
status:
type: string
enum: [pending, confirmed, shipped, delivered, cancelled]
created_at:
type: timestamp
required: true
quality:
- rule: row_count_min
description: "Must have at least 100 rows per hour"
parameters:
min_count: 100
window: 1h
- rule: freshness
description: "Latest record must be within 15 minutes"
parameters:
max_age: 15m
- rule: completeness
description: "customer_id must be 100% populated"
parameters:
field: customer_id
threshold: 1.0DataMesh Manager Format
dataProduct: orders
outputPort: orders_v1
schema:
- name: order_id
type: STRING
isNullable: false
- name: amount
type: NUMERIC
isNullable: false
description: "Order total in USD cents"Testing Data Contracts with the datacontract-cli
The datacontract-cli tool validates data against contracts:
# Install
pip install datacontract-cli
<span class="hljs-comment"># Test a contract against a Snowflake table
datacontract <span class="hljs-built_in">test data-contract.yaml \
--server snowflake \
--server-url <span class="hljs-string">"snowflake://account.snowflakecomputing.com/warehouse/DB/SCHEMA"
<span class="hljs-comment"># Test against a local Parquet file
datacontract <span class="hljs-built_in">test data-contract.yaml \
--server <span class="hljs-built_in">local \
--server-url <span class="hljs-string">"file:///data/orders.parquet"
<span class="hljs-comment"># Lint the contract itself for formatting issues
datacontract lint data-contract.yamlIntegrating Contract Tests in CI/CD
# .github/workflows/data-contracts.yml
name: Data Contract Tests
on:
push:
paths:
- 'contracts/**'
- 'pipelines/**'
jobs:
test-contracts:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Install datacontract-cli
run: pip install datacontract-cli
- name: Test orders contract
run: |
datacontract test contracts/orders-v1.yaml \
--server bigquery \
--server-project-id ${{ secrets.GCP_PROJECT }}
env:
GOOGLE_APPLICATION_CREDENTIALS: ${{ secrets.GCP_SA_KEY }}
- name: Test events contract
run: |
datacontract test contracts/events-v1.yaml \
--server kafka \
--server-host ${{ secrets.KAFKA_HOST }}Schema Validation Testing
Test schema enforcement at the pipeline boundary.
Python: Pandera Schema Validation
import pandas as pd
import pandera as pa
from pandera import Column, DataFrameSchema, Check
# Define the contract schema
orders_schema = DataFrameSchema({
"order_id": Column(str, Check.str_matches(r'^[0-9a-f-]{36}$'), nullable=False),
"customer_id": Column(str, Check.str_matches(r'^CUST-\d{6}$'), nullable=False),
"amount": Column(float, [
Check.greater_than_or_equal_to(0),
Check.less_than_or_equal_to(1_000_000),
], nullable=False),
"status": Column(str, Check.isin(['pending', 'confirmed', 'shipped', 'delivered', 'cancelled'])),
"created_at": Column(pd.Timestamp, nullable=False),
})
def test_orders_data_matches_contract(sample_orders_df):
"""Validate the orders pipeline output against the contract"""
validated = orders_schema.validate(sample_orders_df)
assert len(validated) > 0
def test_contract_catches_invalid_customer_id():
"""Verify the contract rejects malformed customer IDs"""
bad_df = pd.DataFrame({
"order_id": ["abc-123"],
"customer_id": ["12345"], # missing CUST- prefix
"amount": [100.0],
"status": ["pending"],
"created_at": [pd.Timestamp.now()],
})
with pytest.raises(pa.errors.SchemaError):
orders_schema.validate(bad_df)
def test_contract_catches_negative_amount():
"""Verify the contract rejects negative amounts"""
bad_df = pd.DataFrame({
"order_id": ["abc-123"],
"customer_id": ["CUST-000001"],
"amount": [-50.0], # negative — violates contract
"status": ["pending"],
"created_at": [pd.Timestamp.now()],
})
with pytest.raises(pa.errors.SchemaError, match="amount"):
orders_schema.validate(bad_df)Testing Breaking vs. Non-Breaking Changes
Data contracts must distinguish between breaking changes (remove a field, change a type) and non-breaking changes (add an optional field, widen a constraint).
# schemas/orders_v1.py
ORDERS_V1_SCHEMA = {
"fields": [
{"name": "order_id", "type": "string", "required": True},
{"name": "amount", "type": "decimal", "required": True},
{"name": "status", "type": "string", "required": True},
]
}
# schemas/orders_v2.py
ORDERS_V2_SCHEMA = {
"fields": [
{"name": "order_id", "type": "string", "required": True},
{"name": "amount", "type": "decimal", "required": True},
{"name": "status", "type": "string", "required": True},
{"name": "discount_code", "type": "string", "required": False}, # new optional field
]
}
class TestSchemaCompatibility:
def test_adding_optional_field_is_non_breaking(self):
"""Adding optional fields should not break existing consumers"""
is_compatible = check_schema_compatibility(ORDERS_V1_SCHEMA, ORDERS_V2_SCHEMA)
assert is_compatible.backward_compatible is True
assert is_compatible.forward_compatible is True
def test_removing_field_is_breaking(self):
"""Removing existing fields breaks consumers that read them"""
schema_missing_amount = {
"fields": [
{"name": "order_id", "type": "string", "required": True},
{"name": "status", "type": "string", "required": True},
# amount removed!
]
}
is_compatible = check_schema_compatibility(ORDERS_V1_SCHEMA, schema_missing_amount)
assert is_compatible.backward_compatible is False
assert "amount" in is_compatible.breaking_changes
def test_changing_type_from_string_to_int_is_breaking(self):
"""Type narrowing breaks consumers that expect the original type"""
schema_with_int_amount = {
"fields": [
{"name": "order_id", "type": "string", "required": True},
{"name": "amount", "type": "integer", "required": True}, # was decimal
{"name": "status", "type": "string", "required": True},
]
}
is_compatible = check_schema_compatibility(ORDERS_V1_SCHEMA, schema_with_int_amount)
assert is_compatible.backward_compatible is FalseSemantic Rule Testing
Schema validation catches structural violations. Semantic rules catch business logic violations.
def test_amount_always_positive_for_completed_orders(orders_df):
"""Orders in 'delivered' status must have amount > 0"""
delivered = orders_df[orders_df['status'] == 'delivered']
invalid = delivered[delivered['amount'] <= 0]
assert len(invalid) == 0, (
f"Found {len(invalid)} delivered orders with non-positive amounts: "
f"{invalid['order_id'].tolist()}"
)
def test_no_future_created_at(orders_df):
"""created_at must not be in the future"""
now = pd.Timestamp.now(tz='UTC')
future_records = orders_df[orders_df['created_at'] > now]
assert len(future_records) == 0, (
f"Found {len(future_records)} records with future timestamps"
)
def test_referential_integrity_customers(orders_df, customers_df):
"""All customer_ids in orders must exist in the customers table"""
valid_customers = set(customers_df['customer_id'])
orphaned = orders_df[~orders_df['customer_id'].isin(valid_customers)]
assert len(orphaned) == 0, (
f"Found {len(orphaned)} orders with non-existent customer_ids"
)Freshness and Completeness Testing
from datetime import datetime, timezone, timedelta
def test_data_freshness(orders_df):
"""Latest order record must be within 15 minutes"""
max_age = timedelta(minutes=15)
latest = orders_df['created_at'].max()
age = datetime.now(tz=timezone.utc) - latest
assert age <= max_age, (
f"Data is stale: latest record is {age} old, threshold is {max_age}"
)
def test_completeness_customer_id(orders_df):
"""customer_id must be 100% populated"""
total = len(orders_df)
populated = orders_df['customer_id'].notna().sum()
completeness = populated / total
assert completeness == 1.0, (
f"customer_id completeness is {completeness:.1%}, expected 100%"
)
def test_row_count_minimum(orders_df):
"""Must have at least 100 orders per day"""
daily_counts = orders_df.groupby(
orders_df['created_at'].dt.date
).size()
below_threshold = daily_counts[daily_counts < 100]
assert len(below_threshold) == 0, (
f"Days with fewer than 100 orders: {below_threshold.to_dict()}"
)Consumer-Driven Contract Tests
In data mesh, consumers define what they need. Implement consumer-driven contracts:
# consumer: analytics-team
# contract: what analytics needs from the orders data product
ANALYTICS_REQUIRED_FIELDS = {
'order_id': 'string',
'customer_id': 'string',
'amount': 'decimal',
'created_at': 'timestamp',
}
def test_orders_satisfies_analytics_contract(orders_df):
"""Verify the orders data product satisfies analytics team's contract"""
for field, expected_type in ANALYTICS_REQUIRED_FIELDS.items():
assert field in orders_df.columns, (
f"Missing required field: {field}"
)
# Type compatibility check
assert is_type_compatible(orders_df[field].dtype, expected_type), (
f"Field {field} has incompatible type"
)Production Monitoring with HelpMeTest
Data contracts should be monitored continuously in production. HelpMeTest's health checks can run your contract validation on a schedule:
Health check: orders-contract-validation
Interval: 15 minutes
Command: datacontract test contracts/orders-v1.yaml --server snowflake
Alert: Slack + email on contract violationA contract violation at 2am means your morning dashboards will be broken. Catch it at 2am, not 8am.
Summary
Data contract testing requires: contract specification in ODCS or similar format, schema validation tests covering field types, required fields, and value constraints, semantic rule tests for business logic validity, breaking change detection tests that distinguish additive from destructive changes, completeness and freshness tests for SLO validation, consumer-driven contract tests for data mesh architectures, and continuous production monitoring. The most valuable test is the breaking change detector — run it in CI before every pipeline deploy.