Data Pipeline Quality Assurance: Testing Frameworks, Patterns, and Best Practices
Data pipeline QA requires a layered approach: unit tests for transformation logic, contract tests for schema agreements between systems, integration tests for end-to-end correctness, and SLA-based operational tests for latency and freshness. Idempotency and chaos testing are the two most commonly skipped layers — and the two most likely to catch production failures before they happen.
Key Takeaways
The data testing pyramid is inverted from software testing. In software, unit tests are cheapest. In data pipelines, integration tests often run cheapest because they test real data in real systems, while "unit" tests of transformations require standing up expensive DataFrame runtimes. Prioritize accordingly.
Idempotency is a first-class contract, not an assumption. Every pipeline step should produce the same output when run twice with the same input. Test this explicitly by running steps twice and diffing outputs — most idempotency bugs are invisible in single-run tests.
SLA tests belong in CI, not just in monitoring dashboards. Freshness and latency requirements are business contracts. Encoding them as failing tests that block deployment ensures they're enforced during development, not discovered after an incident.
The Data Testing Pyramid
Software engineers use a testing pyramid: many unit tests, fewer integration tests, fewer E2E tests. Data pipelines need a different model because the failure modes are different.
┌─────────────────────┐
│   E2E / Business    │ ← "Did revenue match last month ±5%?"
│     Validation      │
├─────────────────────┤
│  SLA / Operational  │ ← Latency, freshness, volume anomalies
│        Tests        │
├─────────────────────┤
│     Integration     │ ← Source-to-sink correctness, joins
│        Tests        │
├─────────────────────┤
│   Contract Tests    │ ← Schema agreements between producers/consumers
├─────────────────────┤
│     Unit Tests      │ ← Transformation logic on static DataFrames
└─────────────────────┘

The key insight: in data pipelines, bugs most commonly enter at schema boundaries (contract layer) and at operational boundaries (SLA layer), not in transformation logic. Invest proportionally.
Layer 1: Unit Testing Transformation Logic
Unit tests validate that your transformation functions produce correct output for known input. Keep them fast by operating on small, static DataFrames.
# transformations/normalize.py
from pyspark.sql import DataFrame
from pyspark.sql import functions as F


def normalize_phone_numbers(df: DataFrame, column: str) -> DataFrame:
    """Strip non-digit characters from phone numbers and add country code."""
    digits = F.regexp_replace(F.col(column), r"\D", "")
    return df.withColumn(
        column,
        # Only 10-digit numbers are accepted; anything else becomes null
        F.when(digits.rlike(r"^\d{10}$"), F.concat(F.lit("+1"), digits))
        .otherwise(None)
    )


def compute_ltv_segment(df: DataFrame) -> DataFrame:
    """Segment customers by lifetime value."""
    return df.withColumn(
        "ltv_segment",
        F.when(F.col("total_spend") >= 10_000, "platinum")
        .when(F.col("total_spend") >= 1_000, "gold")
        .when(F.col("total_spend") >= 100, "silver")
        .otherwise("bronze")
    )

# tests/unit/test_normalize.py
import pytest
from pyspark.sql import Row
from transformations.normalize import normalize_phone_numbers, compute_ltv_segment

# `spark` is assumed to be a SparkSession fixture defined in conftest.py


@pytest.mark.parametrize("input_phone,expected", [
    ("555-123-4567", "+15551234567"),
    ("(555) 123-4567", "+15551234567"),
    ("5551234567", "+15551234567"),
    ("invalid", None),
    ("123", None),  # too short
    (None, None),
])
def test_normalize_phone_formats(spark, input_phone, expected):
    # Explicit schema: Spark cannot infer a column type from a lone None value
    df = spark.createDataFrame([(input_phone,)], "phone string")
    result = normalize_phone_numbers(df, "phone").collect()[0].phone
    assert result == expected


def test_ltv_segments(spark):
    data = [
        Row(customer_id="c1", total_spend=15_000.0),
        Row(customer_id="c2", total_spend=5_000.0),
        Row(customer_id="c3", total_spend=500.0),
        Row(customer_id="c4", total_spend=50.0),
    ]
    df = spark.createDataFrame(data)
    result = {r.customer_id: r.ltv_segment
              for r in compute_ltv_segment(df).collect()}
    assert result["c1"] == "platinum"
    assert result["c2"] == "gold"
    assert result["c3"] == "silver"
    assert result["c4"] == "bronze"


def test_ltv_segment_boundary_values(spark):
    """Verify boundary conditions at exactly 100, 1000, 10000."""
    boundaries = [
        Row(customer_id="a", total_spend=99.99),
        Row(customer_id="b", total_spend=100.0),
        Row(customer_id="c", total_spend=999.99),
        Row(customer_id="d", total_spend=1_000.0),
        Row(customer_id="e", total_spend=9_999.99),
        Row(customer_id="f", total_spend=10_000.0),
    ]
    df = spark.createDataFrame(boundaries)
    result = {r.customer_id: r.ltv_segment
              for r in compute_ltv_segment(df).collect()}
    assert result["a"] == "bronze"
    assert result["b"] == "silver"    # exactly 100 → silver
    assert result["c"] == "silver"
    assert result["d"] == "gold"      # exactly 1000 → gold
    assert result["e"] == "gold"
    assert result["f"] == "platinum"  # exactly 10000 → platinum

Layer 2: Contract Testing for Schema Agreements
Contract tests define the schema agreement between a data producer (upstream pipeline) and a data consumer (downstream model or application). When the producer changes a column type, the consumer's contract test fails before anything reaches production.
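The failure described above (a producer changing a column type) can be illustrated without Spark. The following is a minimal sketch, assuming schemas are represented as plain name-to-type dictionaries; `schema_violations`, `consumer_expects`, and the two producer versions are hypothetical names used only for illustration.

```python
from typing import Dict, List


def schema_violations(actual: Dict[str, str], expected: Dict[str, str]) -> List[str]:
    """Compare a producer's column types against what the consumer expects."""
    violations = []
    for name, dtype in expected.items():
        if name not in actual:
            violations.append(f"missing column '{name}'")
        elif actual[name] != dtype:
            violations.append(
                f"column '{name}' is {actual[name]}, expected {dtype}")
    return violations


# Hypothetical consumer expectation and two producer versions.
consumer_expects = {"order_id": "string", "amount": "double"}
producer_v1 = {"order_id": "string", "amount": "double", "note": "string"}
producer_v2 = {"order_id": "string", "amount": "string"}  # type changed upstream

ok = schema_violations(producer_v1, consumer_expects)      # extra columns are tolerated
broken = schema_violations(producer_v2, consumer_expects)  # type change is reported
```

In this sketch the consumer only checks the columns it depends on, so a producer can add columns freely while type changes and removals are reported.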
Defining Contracts
# contracts/orders_contract.py
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class ColumnContract:
    name: str
    dtype: str
    nullable: bool = True
    min_value: Optional[float] = None
    max_value: Optional[float] = None
    allowed_values: Optional[List] = None
    regex: Optional[str] = None


@dataclass
class TableContract:
    name: str
    columns: List[ColumnContract]
    min_row_count: int = 0
    primary_key: Optional[List[str]] = None


ORDERS_CONTRACT = TableContract(
    name="orders",
    primary_key=["order_id"],
    min_row_count=1,
    columns=[
        ColumnContract("order_id", "string", nullable=False,
                       regex=r"^ORD-\d{10}$"),
        ColumnContract("customer_id", "string", nullable=False),
        ColumnContract("status", "string", nullable=False,
                       allowed_values=["pending", "processing", "shipped",
                                       "delivered", "cancelled", "refunded"]),
        ColumnContract("amount", "double", nullable=False,
                       min_value=0.01, max_value=100_000.0),
        ColumnContract("created_at", "timestamp", nullable=False),
        ColumnContract("refund_amount", "double", nullable=True,
                       min_value=0.0),
    ]
)

Enforcing Contracts in Tests
# tests/contract/test_contract_enforcement.py
from typing import List

import pytest
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from contracts.orders_contract import ORDERS_CONTRACT, TableContract


def validate_contract(df: DataFrame, contract: TableContract) -> List[str]:
    """Returns a list of violation messages. Empty list = contract satisfied."""
    violations = []

    # Row count
    count = df.count()
    if count < contract.min_row_count:
        violations.append(
            f"Row count {count} < minimum {contract.min_row_count}")

    # Primary key uniqueness
    if contract.primary_key:
        pk_cols = contract.primary_key
        dup_count = df.groupBy(*pk_cols).count().filter("count > 1").count()
        if dup_count > 0:
            violations.append(
                f"Primary key {pk_cols} has {dup_count} duplicate combinations")

    # Column-level validations
    actual_fields = {f.name: f.dataType.simpleString()
                     for f in df.schema.fields}
    for col in contract.columns:
        if col.name not in actual_fields:
            violations.append(f"Column '{col.name}' missing from schema")
            continue

        # Type check against the contract's declared dtype
        if actual_fields[col.name] != col.dtype:
            violations.append(
                f"Column '{col.name}' has type {actual_fields[col.name]}, "
                f"expected {col.dtype}")

        # Null check
        if not col.nullable:
            null_count = df.filter(F.col(col.name).isNull()).count()
            if null_count > 0:
                violations.append(
                    f"Column '{col.name}' has {null_count} null values (not nullable)")

        # Range checks
        if col.min_value is not None:
            below = df.filter(F.col(col.name) < col.min_value).count()
            if below > 0:
                violations.append(
                    f"Column '{col.name}' has {below} values below min {col.min_value}")
        if col.max_value is not None:
            above = df.filter(F.col(col.name) > col.max_value).count()
            if above > 0:
                violations.append(
                    f"Column '{col.name}' has {above} values above max {col.max_value}")

        # Allowed values
        if col.allowed_values is not None:
            invalid = df.filter(
                ~F.col(col.name).isin(col.allowed_values)).count()
            if invalid > 0:
                violations.append(
                    f"Column '{col.name}' has {invalid} values not in allowed set")

        # Regex (nulls are handled by the null check above)
        if col.regex is not None:
            invalid = df.filter(
                ~F.col(col.name).rlike(col.regex) &
                F.col(col.name).isNotNull()
            ).count()
            if invalid > 0:
                violations.append(
                    f"Column '{col.name}' has {invalid} values not matching '{col.regex}'")

    return violations


def test_orders_contract(spark):
    # Simulate loading today's orders from the warehouse
    df = spark.read.parquet("s3://warehouse/orders/2026-05-17/")
    violations = validate_contract(df, ORDERS_CONTRACT)
    assert violations == [], "Contract violations:\n" + "\n".join(violations)

Layer 3: Idempotency Testing
An idempotent pipeline produces identical output when run once or ten times with the same input. Idempotency failures cause duplicate records, inflated aggregates, and accounting errors. Test it explicitly.
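To make the failure mode concrete, here is a minimal sketch in plain Python that contrasts an append-style load with a keyed upsert. The function names `append_load` and `upsert_load` and the in-memory list standing in for a table are assumptions for illustration, not part of any pipeline shown here.

```python
from typing import Dict, List

Record = Dict[str, object]


def append_load(target: List[Record], batch: List[Record]) -> List[Record]:
    """Non-idempotent: every replay appends the batch again."""
    return target + batch


def upsert_load(target: List[Record], batch: List[Record], key: str) -> List[Record]:
    """Idempotent: rows are keyed, so a replay overwrites instead of duplicating."""
    merged = {row[key]: row for row in target}
    merged.update({row[key]: row for row in batch})
    return [merged[k] for k in sorted(merged)]


batch = [
    {"order_id": "ORD-0000000001", "amount": 100.0},
    {"order_id": "ORD-0000000002", "amount": 200.0},
]

# Replaying the append doubles the rows (and any aggregate built on them).
appended_once = append_load([], batch)
appended_twice = append_load(appended_once, batch)

# Replaying the upsert leaves the target unchanged.
upserted_once = upsert_load([], batch, "order_id")
upserted_twice = upsert_load(upserted_once, batch, "order_id")
```

A single-run test passes for both loaders; only the second run exposes the difference, which is why the replay has to be part of the test.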
Pattern: Run Twice, Diff the Output
# tests/idempotency/test_idempotency.py
import pytest
import hashlib
from pyspark.sql import DataFrame


def dataframe_hash(df: DataFrame) -> str:
    """Compute a deterministic hash of a DataFrame's content."""
    # Sort to make hash order-independent
    sorted_df = df.orderBy(df.columns)
    rows = sorted_df.collect()
    content = str(rows).encode("utf-8")
    return hashlib.sha256(content).hexdigest()


def run_pipeline_step(spark, input_path: str, output_path: str) -> DataFrame:
    """Simulate a single pipeline step: read, transform, write, return."""
    from transformations.normalize import normalize_phone_numbers, compute_ltv_segment
    df = spark.read.parquet(input_path)
    result = compute_ltv_segment(normalize_phone_numbers(df, "phone"))
    result.write.mode("overwrite").parquet(output_path)
    return spark.read.parquet(output_path)


def test_pipeline_step_is_idempotent(spark, tmp_path):
    """
    Run the same pipeline step twice against the same input.
    The row content of both outputs must be identical.
    """
    input_path = "tests/fixtures/customers.parquet"
    output_path_1 = str(tmp_path / "run1")
    output_path_2 = str(tmp_path / "run2")
    result_1 = run_pipeline_step(spark, input_path, output_path_1)
    result_2 = run_pipeline_step(spark, input_path, output_path_2)
    hash_1 = dataframe_hash(result_1)
    hash_2 = dataframe_hash(result_2)
    assert hash_1 == hash_2, (
        f"Pipeline is not idempotent!\n"
        f"Run 1 hash: {hash_1}\n"
        f"Run 2 hash: {hash_2}"
    )


def test_incremental_load_idempotency(spark, tmp_path):
    """
    Simulate an incremental load being replayed.
    Replaying the same batch should not change the output.
    """
    # Initial load
    batch_1 = spark.createDataFrame([
        {"order_id": "ORD-0000000001", "amount": 100.0, "status": "pending"},
        {"order_id": "ORD-0000000002", "amount": 200.0, "status": "shipped"},
    ])
    output_path = str(tmp_path / "orders_incremental")
    batch_1.write.mode("overwrite").parquet(output_path)
    count_after_first_load = spark.read.parquet(output_path).count()

    # Replay the same batch (simulates a retry or replay)
    batch_1.write.mode("overwrite").parquet(output_path)
    count_after_replay = spark.read.parquet(output_path).count()
    assert count_after_first_load == count_after_replay, (
        f"Replay changed row count: {count_after_first_load} → {count_after_replay}"
    )

Layer 4: Schema Evolution Testing
Data schemas change. Column names are renamed, types are widened, new columns are added. Schema evolution testing ensures your pipeline handles these changes without silently corrupting data.
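Before looking at Spark-level tests, the three kinds of change named above can be classified with a small schema diff. This is a minimal sketch, assuming schemas are name-to-type dictionaries; `classify_schema_change`, `is_breaking`, and the `SAFE_WIDENINGS` set are hypothetical, and the set of safe widenings is an assumption of this sketch rather than a rule taken from any engine.

```python
from typing import Dict, List

# Type changes treated as lossless in this sketch (an assumption, adjust per engine).
SAFE_WIDENINGS = {("int", "bigint"), ("float", "double"), ("int", "double")}


def classify_schema_change(old: Dict[str, str], new: Dict[str, str]) -> Dict[str, List[str]]:
    """Group column-level differences between two schema versions."""
    report: Dict[str, List[str]] = {
        "added": sorted(new.keys() - old.keys()),
        "removed": sorted(old.keys() - new.keys()),
        "widened": [],
        "incompatible": [],
    }
    for name in sorted(old.keys() & new.keys()):
        if old[name] == new[name]:
            continue
        kind = "widened" if (old[name], new[name]) in SAFE_WIDENINGS else "incompatible"
        report[kind].append(name)
    return report


def is_breaking(report: Dict[str, List[str]]) -> bool:
    """Removed columns and non-widening type changes break existing consumers."""
    return bool(report["removed"] or report["incompatible"])


old_schema = {"order_id": "string", "amount": "double", "quantity": "int"}
new_schema = {"order_id": "string", "order_amount": "double",
              "quantity": "bigint", "currency": "string"}

report = classify_schema_change(old_schema, new_schema)
breaking = is_breaking(report)
```

Note that a rename appears as one removal plus one addition: the diff cannot tell the two apart, so the removal is what marks the change as breaking.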
# tests/schema/test_schema_evolution.py
import pytest
from pyspark.sql import Row
from pyspark.sql import functions as F
from pyspark.sql.types import (
    DoubleType, IntegerType, LongType, StringType, StructField, StructType,
)


def test_new_nullable_column_backward_compatible(spark):
    """Adding a nullable column must not break existing consumers."""
    old_schema = StructType([
        StructField("order_id", StringType(), False),
        StructField("amount", DoubleType(), False),
    ])
    new_schema = StructType([
        StructField("order_id", StringType(), False),
        StructField("amount", DoubleType(), False),
        StructField("currency", StringType(), True),  # new nullable column
    ])
    old_data = spark.createDataFrame(
        [Row(order_id="ORD-001", amount=100.0)], old_schema
    )
    # Consumer reading with the new schema: columns absent from old data are
    # filled with typed nulls (two-field rows cannot be loaded into a
    # three-field schema directly)
    result = old_data
    for f in new_schema.fields:
        if f.name not in old_data.columns:
            result = result.withColumn(f.name, F.lit(None).cast(f.dataType))
    assert result.count() == 1
    row = result.collect()[0]
    assert row.order_id == "ORD-001"
    assert row.currency is None  # graceful null for old records


def test_type_widening_compatible(spark):
    """Widening INT to LONG is backward compatible; narrowing is not."""
    old_schema = StructType([
        StructField("order_id", StringType()),
        StructField("quantity", IntegerType()),
    ])
    new_schema = StructType([
        StructField("order_id", StringType()),
        StructField("quantity", LongType()),  # widened
    ])
    old_data = spark.createDataFrame(
        [Row(order_id="ORD-001", quantity=5)], old_schema
    )
    # Cast to the new schema's type; should work without data loss
    result = old_data.withColumn(
        "quantity", old_data["quantity"].cast(new_schema["quantity"].dataType)
    )
    assert result.collect()[0].quantity == 5


def assert_no_removed_columns(old_schema, new_schema):
    """Reject schema changes that remove (or rename) existing columns."""
    old_columns = {f.name for f in old_schema.fields}
    new_columns = {f.name for f in new_schema.fields}
    removed = old_columns - new_columns
    added = new_columns - old_columns
    # A rename shows up as one column removed plus one column added
    assert len(removed) == 0, (
        f"Breaking schema change: columns removed: {removed}. "
        f"New columns added: {added}. "
        f"If renaming, ensure downstream consumers are updated first."
    )


def test_column_rename_breaks_downstream(spark):
    """Renaming a column must be detected and rejected."""
    old_schema = StructType([
        StructField("order_id", StringType()),
        StructField("amount", DoubleType()),
    ])
    new_schema = StructType([
        StructField("order_id", StringType()),
        StructField("order_amount", DoubleType()),  # renamed from 'amount'
    ])
    with pytest.raises(AssertionError, match="Breaking schema change"):
        assert_no_removed_columns(old_schema, new_schema)

Layer 5: SLA-Based Tests (Latency and Freshness)
Data freshness and latency are business contracts. Encode them as tests.
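The core of a freshness test is a comparison between the newest record's timestamp and the current time. A minimal sketch of that comparison, assuming timezone-aware UTC timestamps; `staleness`, `meets_freshness_sla`, and the sample timestamps are hypothetical, and the `now` parameter is injectable so the check itself can be tested deterministically.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def staleness(latest_event: datetime, now: Optional[datetime] = None) -> timedelta:
    """Age of the newest record; `now` defaults to the current UTC time."""
    if now is None:
        now = datetime.now(tz=timezone.utc)
    return now - latest_event


def meets_freshness_sla(latest_event: datetime, max_age: timedelta,
                        now: Optional[datetime] = None) -> bool:
    """True when the newest record is no older than the SLA allows."""
    return staleness(latest_event, now) <= max_age


# Hypothetical timestamps for illustration.
fixed_now = datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc)
fresh = datetime(2026, 5, 17, 10, 30, tzinfo=timezone.utc)  # 1h30m old
stale = datetime(2026, 5, 17, 9, 0, tzinfo=timezone.utc)    # 3h old

fresh_ok = meets_freshness_sla(fresh, timedelta(hours=2), now=fixed_now)
stale_ok = meets_freshness_sla(stale, timedelta(hours=2), now=fixed_now)
```

Keeping the clock injectable separates two concerns: the SLA logic can be unit tested with fixed times, while the real test only supplies the latest timestamp read from storage.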
# tests/sla/test_freshness.py
import pytest
from datetime import datetime, timezone, timedelta
import pyarrow.dataset as ds
import pyarrow.compute as pc


def get_latest_partition_timestamp(path: str) -> datetime:
    """Return the max event_time found in a Parquet dataset."""
    dataset = ds.dataset(path, format="parquet", partitioning="hive")
    table = dataset.to_table(columns=["event_time"])
    max_ts = pc.max(table["event_time"]).as_py()
    # Treat naive timestamps as UTC; convert tz-aware ones instead of relabeling
    if max_ts.tzinfo is None:
        return max_ts.replace(tzinfo=timezone.utc)
    return max_ts.astimezone(timezone.utc)


def test_events_table_freshness():
    """
    The events table must contain data from the last 2 hours.
    SLA: data is never more than 2 hours stale.
    """
    path = "s3://warehouse/events/"
    sla_hours = 2
    latest = get_latest_partition_timestamp(path)
    age = datetime.now(tz=timezone.utc) - latest
    assert age <= timedelta(hours=sla_hours), (
        f"Events table is stale! Latest data is {age} old. "
        f"SLA requires data within {sla_hours} hours."
    )


def test_daily_summary_row_count_volume():
    """
    Daily summary must have at least 80% of yesterday's row count.
    Catches truncations, failed incremental loads, partition drops.
    """
    import boto3
    import json
    s3 = boto3.client("s3")
    today = datetime.now(tz=timezone.utc).date()
    yesterday = today - timedelta(days=1)

    def get_row_count(date):
        response = s3.get_object(
            Bucket="warehouse-metadata",
            Key=f"daily_summary/row_counts/{date}.json"
        )
        return json.loads(response["Body"].read())["row_count"]

    today_count = get_row_count(today)
    yesterday_count = get_row_count(yesterday)
    ratio = today_count / yesterday_count if yesterday_count > 0 else 0
    assert ratio >= 0.80, (
        f"Daily summary volume dropped too much: "
        f"today={today_count}, yesterday={yesterday_count}, "
        f"ratio={ratio:.2%} (SLA: >= 80%)"
    )


def test_pipeline_latency_sla():
    """
    The p95 time from event ingestion to warehouse availability must be <= 15 minutes.
    Measured by comparing event_time to the pipeline's processing timestamp.
    """
    import pyarrow.parquet as pq
    path = "s3://warehouse/events/latest/"
    table = pq.read_table(path, columns=["event_time", "processed_at"])
    event_times = table["event_time"].to_pylist()
    processed_ats = table["processed_at"].to_pylist()
    # Latency in minutes for every row that has both timestamps
    latencies = [
        (p - e).total_seconds() / 60
        for e, p in zip(event_times, processed_ats)
        if e and p
    ]
    if not latencies:
        pytest.fail("No latency data available; pipeline may not be running")
    p95_latency = sorted(latencies)[int(len(latencies) * 0.95)]
    assert p95_latency <= 15, (
        f"p95 pipeline latency is {p95_latency:.1f} minutes. "
        f"SLA requires <= 15 minutes."
    )

Layer 6: Data Lineage Verification
Lineage tests verify that the data in a downstream model actually traces back to expected upstream sources. This catches cases where a join silently drops rows or a filter is too aggressive.
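The underlying check is an anti-join: take the keys that should have flowed downstream and subtract the keys that actually arrived. A minimal sketch with Python sets, assuming orders are plain dictionaries; `missing_from_downstream`, `BILLABLE`, and the sample data are hypothetical and only illustrate the set logic.

```python
from typing import Dict, Iterable, List, Set

# Statuses assumed to be billable in this sketch.
BILLABLE = {"shipped", "delivered"}


def missing_from_downstream(orders: Iterable[Dict[str, str]],
                            downstream_ids: Set[str]) -> List[str]:
    """Anti-join: billable order IDs that never reached the downstream model."""
    billable = {o["order_id"] for o in orders if o["status"] in BILLABLE}
    return sorted(billable - downstream_ids)


orders = [
    {"order_id": "ORD-001", "status": "shipped"},
    {"order_id": "ORD-002", "status": "pending"},    # not billable, ignored
    {"order_id": "ORD-003", "status": "delivered"},
]
revenue_ids = {"ORD-001"}  # ORD-003 was dropped somewhere upstream

missing = missing_from_downstream(orders, revenue_ids)
```

An empty result means every expected key arrived; any returned ID gives a concrete starting point for tracing which join or filter dropped it.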
# tests/lineage/test_lineage.py
import pytest
from pyspark.sql import SparkSession


def test_revenue_model_covers_all_orders(spark):
    """
    Every shipped or delivered order must appear in the revenue model.
    Tests that no orders are silently dropped during aggregation.
    """
    orders = spark.read.parquet("s3://warehouse/orders/")
    revenue = spark.read.parquet("s3://warehouse/revenue_daily/")

    # Orders that should be in revenue
    billable_orders = orders.filter(
        orders.status.isin(["shipped", "delivered"])
    ).select("order_id")

    # Orders in revenue
    revenue_order_ids = revenue.select("order_id").distinct()

    # Anti-join: billable orders NOT in revenue
    missing = billable_orders.join(
        revenue_order_ids, on="order_id", how="left_anti"
    )
    missing_count = missing.count()
    assert missing_count == 0, (
        f"{missing_count} billable orders are missing from the revenue model. "
        f"Sample: {missing.limit(5).collect()}"
    )


def test_customer_dimension_covers_all_orders(spark):
    """Every order must have a matching customer in the customer dimension."""
    orders = spark.read.parquet("s3://warehouse/orders/")
    customers = spark.read.parquet("s3://warehouse/dim_customers/")
    orphaned = orders.join(
        customers.select("customer_id"),
        on="customer_id",
        how="left_anti"
    )
    orphan_count = orphaned.count()
    assert orphan_count == 0, (
        f"{orphan_count} orders have no matching customer. "
        f"Sample order_ids: {[r.order_id for r in orphaned.limit(5).collect()]}"
    )

Layer 7: Chaos Testing Data Pipelines
Chaos testing injects failures to verify that your pipeline recovers correctly and produces correct output after failure. The most common failure modes to test:
Duplicate Input Records
import pytest


def test_pipeline_handles_duplicate_records(spark, tmp_path):
    """Pipeline output must deduplicate input even when source sends duplicates."""
    # Simulate a source that sends each record twice
    input_data = spark.createDataFrame([
        {"order_id": "ORD-001", "amount": 100.0},
        {"order_id": "ORD-001", "amount": 100.0},  # duplicate
        {"order_id": "ORD-002", "amount": 200.0},
        {"order_id": "ORD-002", "amount": 200.0},  # duplicate
    ])
    # Your pipeline's dedup step
    deduplicated = input_data.dropDuplicates(["order_id"])
    output_path = str(tmp_path / "output")
    deduplicated.write.parquet(output_path)
    result = spark.read.parquet(output_path)
    assert result.count() == 2, "Duplicates should be removed"
    assert result.select("order_id").distinct().count() == 2


def test_pipeline_handles_null_join_keys(spark):
    """Joins must handle null keys without silently dropping records."""
    orders = spark.createDataFrame([
        {"order_id": "ORD-001", "customer_id": None},  # null FK
        {"order_id": "ORD-002", "customer_id": "C001"},
    ])
    customers = spark.createDataFrame([
        {"customer_id": "C001", "country": "US"},
    ])
    result = orders.join(customers, on="customer_id", how="left")
    assert result.count() == 2, "Null join key must not drop the order"
    null_country = result.filter("order_id = 'ORD-001'").collect()[0].country
    assert null_country is None, "Order with null customer_id should have null country"


def test_pipeline_handles_schema_mismatch(spark):
    """Pipeline must fail fast and clearly when schema is incompatible."""
    # Source sends amount as string instead of double
    bad_data = spark.createDataFrame([
        {"order_id": "ORD-001", "amount": "not-a-number"},
    ])
    with pytest.raises(Exception):
        casted = bad_data.select(
            "order_id",
            bad_data["amount"].cast("double").alias("amount")
        )
        # With ANSI mode on, the invalid cast raises when the count runs.
        # Otherwise the cast yields null, so the check raises explicitly.
        if casted.filter("amount is null").count() > 0:
            raise ValueError("amount contains values that cannot be cast to double")

Simulating Upstream Outages
def test_pipeline_checkpoint_recovery(spark, tmp_path):
    """
    After a simulated failure, the pipeline should resume from checkpoint
    and produce no duplicate records.
    """
    checkpoint_dir = str(tmp_path / "checkpoint")
    output_dir = str(tmp_path / "output")

    # First run: process 5 records and fail after
    batch1 = spark.createDataFrame([
        {"id": i, "value": float(i)} for i in range(5)
    ])
    batch1.write.mode("overwrite").parquet(output_dir)

    # Simulate recovery: process next 5 records
    batch2 = spark.createDataFrame([
        {"id": i, "value": float(i)} for i in range(5, 10)
    ])
    batch2.write.mode("append").parquet(output_dir)

    result = spark.read.parquet(output_dir)
    ids = sorted([r.id for r in result.collect()])
    assert ids == list(range(10)), f"After recovery, all IDs must be present: {ids}"
    assert len(set(ids)) == 10, "No duplicate IDs after recovery"

Putting It All Together: A QA Pipeline
Run all layers in sequence with clear reporting:
# run_qa.py
import sys
import logging
from datetime import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def run_qa_suite(spark, date: str) -> bool:
    results = {}
    # The run_*_tests functions are assumed to be defined elsewhere in your
    # project, each raising AssertionError when its layer fails
    checks = [
        ("contract", lambda: run_contract_tests(spark, date)),
        ("freshness", lambda: run_freshness_tests(date)),
        ("volume", lambda: run_volume_tests(spark, date)),
        ("idempotency", lambda: run_idempotency_tests(spark, date)),
        ("lineage", lambda: run_lineage_tests(spark, date)),
    ]
    for name, fn in checks:
        try:
            fn()
            results[name] = "PASS"
            logger.info(f"[PASS] {name}")
        except AssertionError as e:
            results[name] = f"FAIL: {e}"
            logger.error(f"[FAIL] {name}: {e}")
        except Exception as e:
            results[name] = f"ERROR: {e}"
            logger.error(f"[ERROR] {name}: {e}")
    passed = sum(1 for v in results.values() if v == "PASS")
    total = len(results)
    logger.info(f"\nQA Summary: {passed}/{total} checks passed")
    return passed == total


if __name__ == "__main__":
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName("qa-pipeline").getOrCreate()
    date = sys.argv[1] if len(sys.argv) > 1 else datetime.now().strftime("%Y-%m-%d")
    success = run_qa_suite(spark, date)
    sys.exit(0 if success else 1)

Add this to your CI as a post-deploy validation gate:
- name: Run Data QA Suite
  # With no date input, run_qa.py receives no argument and defaults to today's date
  run: python run_qa.py ${{ github.event.inputs.date }}
  env:
    SPARK_MASTER: local[4]
    AWS_DEFAULT_REGION: us-east-1

Data quality is an engineering discipline, not an afterthought. The pipeline that catches its own bugs in CI is the pipeline that earns trust from the business teams who depend on it.