Data Pipeline Quality Assurance: Testing Frameworks, Patterns, and Best Practices

Data pipeline QA requires a layered approach: unit tests for transformation logic, contract tests for schema agreements between systems, integration tests for end-to-end correctness, and SLA-based operational tests for latency and freshness. Idempotency and chaos testing are the two most commonly skipped layers — and the two most likely to catch production failures before they happen.

Key Takeaways

The data testing pyramid is inverted from software testing. In software, unit tests are cheapest. In data pipelines, integration tests often run cheapest because they test real data in real systems, while "unit" tests of transformations require standing up expensive DataFrame runtimes. Prioritize accordingly.

Idempotency is a first-class contract, not an assumption. Every pipeline step should produce the same output when run twice with the same input. Test this explicitly by running steps twice and diffing outputs — most idempotency bugs are invisible in single-run tests.

SLA tests belong in CI, not just in monitoring dashboards. Freshness and latency requirements are business contracts. Encoding them as failing tests that block deployment ensures they're enforced during development, not discovered after an incident.

The Data Testing Pyramid

Software engineers use a testing pyramid: many unit tests, fewer integration tests, fewer E2E tests. Data pipelines need a different model because the failure modes are different.

          ┌─────────────────────┐
          │   E2E / Business    │  ← "Did revenue match last month ±5%?"
          │   Validation        │
          ├─────────────────────┤
          │   SLA / Operational │  ← Latency, freshness, volume anomalies
          │   Tests             │
          ├─────────────────────┤
          │   Integration       │  ← Source-to-sink correctness, joins
          │   Tests             │
          ├─────────────────────┤
          │   Contract Tests    │  ← Schema agreements between producers/consumers
          ├─────────────────────┤
          │   Unit Tests        │  ← Transformation logic on static DataFrames
          └─────────────────────┘

The key insight: in data pipelines, bugs most commonly enter at schema boundaries (contract layer) and at operational boundaries (SLA layer) — not in transformation logic. Invest proportionally.

Layer 1: Unit Testing Transformation Logic

Unit tests validate that your transformation functions produce correct output for known input. Keep them fast by operating on small, static DataFrames.

# transformations/normalize.py
from pyspark.sql import DataFrame
from pyspark.sql import functions as F


def normalize_phone_numbers(df: DataFrame, column: str) -> DataFrame:
    """Strip non-digit characters from phone numbers and add country code."""
    return df.withColumn(
        column,
        F.when(
            F.regexp_replace(F.col(column), r"\D", "").rlike(r"^\d{10}$"),
            F.concat(F.lit("+1"), F.regexp_replace(F.col(column), r"\D", ""))
        ).otherwise(None)
    )


def compute_ltv_segment(df: DataFrame) -> DataFrame:
    """Segment customers by lifetime value."""
    return df.withColumn(
        "ltv_segment",
        F.when(F.col("total_spend") >= 10_000, "platinum")
         .when(F.col("total_spend") >= 1_000, "gold")
         .when(F.col("total_spend") >= 100, "silver")
         .otherwise("bronze")
    )

# tests/unit/test_normalize.py
# Assumes a `spark` fixture (a SparkSession) is provided by conftest.py.
import pytest
from pyspark.sql import Row
from transformations.normalize import normalize_phone_numbers, compute_ltv_segment


@pytest.mark.parametrize("input_phone,expected", [
    ("555-123-4567", "+15551234567"),
    ("(555) 123-4567", "+15551234567"),
    ("5551234567", "+15551234567"),
    ("invalid", None),
    ("123", None),  # too short
    (None, None),
])
def test_normalize_phone_formats(spark, input_phone, expected):
    # Explicit schema: Spark cannot infer a column type from a lone None value
    df = spark.createDataFrame([(input_phone,)], schema="phone string")
    result = normalize_phone_numbers(df, "phone").collect()[0].phone
    assert result == expected


def test_ltv_segments(spark):
    data = [
        Row(customer_id="c1", total_spend=15_000.0),
        Row(customer_id="c2", total_spend=5_000.0),
        Row(customer_id="c3", total_spend=500.0),
        Row(customer_id="c4", total_spend=50.0),
    ]
    df = spark.createDataFrame(data)
    result = {r.customer_id: r.ltv_segment
              for r in compute_ltv_segment(df).collect()}

    assert result["c1"] == "platinum"
    assert result["c2"] == "gold"
    assert result["c3"] == "silver"
    assert result["c4"] == "bronze"


def test_ltv_segment_boundary_values(spark):
    """Verify boundary conditions at exactly 100, 1000, 10000."""
    boundaries = [
        Row(customer_id="a", total_spend=99.99),
        Row(customer_id="b", total_spend=100.0),
        Row(customer_id="c", total_spend=999.99),
        Row(customer_id="d", total_spend=1_000.0),
        Row(customer_id="e", total_spend=9_999.99),
        Row(customer_id="f", total_spend=10_000.0),
    ]
    df = spark.createDataFrame(boundaries)
    result = {r.customer_id: r.ltv_segment
              for r in compute_ltv_segment(df).collect()}

    assert result["a"] == "bronze"
    assert result["b"] == "silver"   # exactly 100 → silver
    assert result["c"] == "silver"
    assert result["d"] == "gold"     # exactly 1000 → gold
    assert result["e"] == "gold"
    assert result["f"] == "platinum" # exactly 10000 → platinum
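
One way to keep the expected values in tests like these honest is a plain-Python reference implementation of the same rule, used as an oracle for the Spark version. The sketch below is an assumption and not part of the original module: `normalize_phone_reference` is a hypothetical helper that mirrors the logic of `normalize_phone_numbers` using only the standard `re` module.

```python
import re
from typing import Optional


def normalize_phone_reference(raw: Optional[str]) -> Optional[str]:
    """Plain-Python mirror of normalize_phone_numbers (hypothetical helper)."""
    if raw is None:
        return None
    # re.ASCII restricts \d and \D to the characters 0-9
    digits = re.sub(r"\D", "", raw, flags=re.ASCII)
    if re.fullmatch(r"\d{10}", digits, flags=re.ASCII):
        return "+1" + digits
    return None


# Expected values derived from the reference rule instead of typed by hand
CASES = ["555-123-4567", "(555) 123-4567", "5551234567", "invalid", "123", None]
EXPECTED = {case: normalize_phone_reference(case) for case in CASES}
```

Writing the rule out this way also makes an edge case visible: a number that already carries a country code, such as `1-555-123-4567`, has eleven digits and is therefore mapped to `None`. Whether that is the intended behavior is a product decision worth pinning down with its own test case.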

Layer 2: Contract Testing for Schema Agreements

Contract tests define the schema agreement between a data producer (upstream pipeline) and a data consumer (downstream model or application). When the producer changes a column type, the consumer's contract test fails before anything reaches production.

Defining Contracts

# contracts/orders_contract.py
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class ColumnContract:
    name: str
    dtype: str
    nullable: bool = True
    min_value: Optional[float] = None
    max_value: Optional[float] = None
    allowed_values: Optional[List] = None
    regex: Optional[str] = None


@dataclass
class TableContract:
    name: str
    columns: List[ColumnContract]
    min_row_count: int = 0
    primary_key: Optional[List[str]] = None


ORDERS_CONTRACT = TableContract(
    name="orders",
    primary_key=["order_id"],
    min_row_count=1,
    columns=[
        ColumnContract("order_id", "string", nullable=False,
                       regex=r"^ORD-\d{10}$"),
        ColumnContract("customer_id", "string", nullable=False),
        ColumnContract("status", "string", nullable=False,
                       allowed_values=["pending", "processing", "shipped",
                                       "delivered", "cancelled", "refunded"]),
        ColumnContract("amount", "double", nullable=False,
                       min_value=0.01, max_value=100_000.0),
        ColumnContract("created_at", "timestamp", nullable=False),
        ColumnContract("refund_amount", "double", nullable=True,
                       min_value=0.0),
    ]
)

Enforcing Contracts in Tests

# tests/contract/test_contract_enforcement.py
from typing import List

import pytest
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from contracts.orders_contract import ORDERS_CONTRACT, TableContract


def validate_contract(df: DataFrame, contract: TableContract) -> List[str]:
    """Returns a list of violation messages. Empty list = contract satisfied."""
    violations = []

    # Row count
    count = df.count()
    if count < contract.min_row_count:
        violations.append(
            f"Row count {count} < minimum {contract.min_row_count}")

    # Primary key uniqueness
    if contract.primary_key:
        pk_cols = contract.primary_key
        dup_count = df.groupBy(*pk_cols).count().filter("count > 1").count()
        if dup_count > 0:
            violations.append(
                f"Primary key {pk_cols} has {dup_count} duplicate combinations")

    # Column-level validations
    actual_fields = {f.name: f.dataType.simpleString()
                     for f in df.schema.fields}

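    for col in contract.columns:
        if col.name not in actual_fields:
            violations.append(f"Column '{col.name}' missing from schema")
            continue

        # Type check: the contract's dtype uses Spark's simpleString names
        if actual_fields[col.name] != col.dtype:
            violations.append(
                f"Column '{col.name}' has type {actual_fields[col.name]}, "
                f"expected {col.dtype}")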

        # Null check
        if not col.nullable:
            null_count = df.filter(F.col(col.name).isNull()).count()
            if null_count > 0:
                violations.append(
                    f"Column '{col.name}' has {null_count} null values (not nullable)")

        # Range checks
        if col.min_value is not None:
            below = df.filter(F.col(col.name) < col.min_value).count()
            if below > 0:
                violations.append(
                    f"Column '{col.name}' has {below} values below min {col.min_value}")

        if col.max_value is not None:
            above = df.filter(F.col(col.name) > col.max_value).count()
            if above > 0:
                violations.append(
                    f"Column '{col.name}' has {above} values above max {col.max_value}")

        # Allowed values
        if col.allowed_values is not None:
            invalid = df.filter(
                ~F.col(col.name).isin(col.allowed_values)).count()
            if invalid > 0:
                violations.append(
                    f"Column '{col.name}' has {invalid} values not in allowed set")

        # Regex
        if col.regex is not None:
            invalid = df.filter(
                ~F.col(col.name).rlike(col.regex) &
                F.col(col.name).isNotNull()
            ).count()
            if invalid > 0:
                violations.append(
                    f"Column '{col.name}' has {invalid} values not matching '{col.regex}'")

    return violations


def test_orders_contract(spark):
    # Simulate loading today's orders from the warehouse
    df = spark.read.parquet("s3://warehouse/orders/2026-05-17/")
    violations = validate_contract(df, ORDERS_CONTRACT)
    assert violations == [], f"Contract violations:\n" + "\n".join(violations)

Layer 3: Idempotency Testing

An idempotent pipeline produces identical output when run once or ten times with the same input. Idempotency failures cause duplicate records, inflated aggregates, and accounting errors. Test it explicitly.

Pattern: Run Twice, Diff the Output

# tests/idempotency/test_idempotency.py
import pytest
import hashlib
from pyspark.sql import DataFrame


def dataframe_hash(df: DataFrame) -> str:
    """Compute a deterministic hash of a DataFrame's content."""
    # Sort to make hash order-independent
    sorted_df = df.orderBy(df.columns)
    rows = sorted_df.collect()
    content = str(rows).encode("utf-8")
    return hashlib.sha256(content).hexdigest()


def run_pipeline_step(spark, input_path: str, output_path: str) -> DataFrame:
    """Simulate a single pipeline step: read, transform, write, return."""
    from transformations.normalize import normalize_phone_numbers, compute_ltv_segment

    df = spark.read.parquet(input_path)
    result = compute_ltv_segment(normalize_phone_numbers(df, "phone"))
    result.write.mode("overwrite").parquet(output_path)
    return spark.read.parquet(output_path)


def test_pipeline_step_is_idempotent(spark, tmp_path):
    """
    Run the same pipeline step twice against the same input.
    Output rows must be identical (compared via a hash of the sorted rows).
    """
    input_path = "tests/fixtures/customers.parquet"
    output_path_1 = str(tmp_path / "run1")
    output_path_2 = str(tmp_path / "run2")

    result_1 = run_pipeline_step(spark, input_path, output_path_1)
    result_2 = run_pipeline_step(spark, input_path, output_path_2)

    hash_1 = dataframe_hash(result_1)
    hash_2 = dataframe_hash(result_2)

    assert hash_1 == hash_2, (
        f"Pipeline is not idempotent!\n"
        f"Run 1 hash: {hash_1}\n"
        f"Run 2 hash: {hash_2}"
    )


def test_incremental_load_idempotency(spark, tmp_path):
    """
    Simulate an incremental load being replayed.
    Replaying the same batch should not change the output.
    """
    # Initial load
    batch_1 = spark.createDataFrame([
        {"order_id": "ORD-0000000001", "amount": 100.0, "status": "pending"},
        {"order_id": "ORD-0000000002", "amount": 200.0, "status": "shipped"},
    ])
    output_path = str(tmp_path / "orders_incremental")
    batch_1.write.mode("overwrite").parquet(output_path)

    count_after_first_load = spark.read.parquet(output_path).count()

    # Replay the same batch (simulates a retry or replay)
    batch_1.write.mode("overwrite").parquet(output_path)

    count_after_replay = spark.read.parquet(output_path).count()
    assert count_after_first_load == count_after_replay, (
        f"Replay changed row count: {count_after_first_load} → {count_after_replay}"
    )
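
The replay test above uses `overwrite`, which replaces the whole output and so passes regardless of how the batch is built. Pipelines that append or merge need a keyed write for a replay to be harmless. A minimal sketch of the difference in plain Python, assuming `order_id` is the merge key; `append_batch` and `upsert_batch` are hypothetical names used only for illustration:

```python
from typing import Dict, List

Record = Dict[str, object]


def append_batch(target: List[Record], batch: List[Record]) -> List[Record]:
    """Naive append: replaying a batch duplicates its rows."""
    return target + batch


def upsert_batch(target: List[Record], batch: List[Record], key: str) -> List[Record]:
    """Keyed merge: the last write for each key wins, so a replay changes nothing."""
    merged = {row[key]: row for row in target}
    merged.update({row[key]: row for row in batch})
    return list(merged.values())


batch = [
    {"order_id": "ORD-0000000001", "amount": 100.0},
    {"order_id": "ORD-0000000002", "amount": 200.0},
]

# Append is not idempotent: the second run doubles the rows
appended_once = append_batch([], batch)
appended_twice = append_batch(appended_once, batch)

# Upsert is idempotent: the second run leaves the output unchanged
upserted_once = upsert_batch([], batch, key="order_id")
upserted_twice = upsert_batch(upserted_once, batch, key="order_id")
```

The same run-twice-and-compare assertion applies to both functions; only the keyed merge passes it.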

Layer 4: Schema Evolution Testing

Data schemas change. Column names are renamed, types are widened, new columns are added. Schema evolution testing ensures your pipeline handles these changes without silently corrupting data.

# tests/schema/test_schema_evolution.py
import pytest
from pyspark.sql.types import (
    DoubleType, IntegerType, LongType, StringType, StructField, StructType,
)
from pyspark.sql import Row


def test_new_nullable_column_backward_compatible(spark):
    """Adding a nullable column must not break existing consumers."""
    old_schema = StructType([
        StructField("order_id", StringType(), False),
        StructField("amount", DoubleType(), False),
    ])

    new_schema = StructType([
        StructField("order_id", StringType(), False),
        StructField("amount", DoubleType(), False),
        StructField("currency", StringType(), True),  # new nullable column
    ])

    old_data = spark.createDataFrame(
        [Row(order_id="ORD-001", amount=100.0)], old_schema
    )

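    # Consumer reading with the new schema: old records get null in the new column.
    # (Rebuilding from old_data.rdd with new_schema fails validation, because
    # each old row has 2 fields while the new schema has 3.)
    result = old_data.selectExpr("*", "CAST(NULL AS STRING) AS currency")
    assert result.columns == new_schema.fieldNames()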

    assert result.count() == 1
    row = result.collect()[0]
    assert row.order_id == "ORD-001"
    assert row.currency is None  # graceful null for old records


def test_type_widening_compatible(spark):
    """Widening INT to LONG is backward compatible; narrowing is not."""
    old_schema = StructType([
        StructField("order_id", StringType()),
        StructField("quantity", IntegerType()),
    ])

    new_schema = StructType([
        StructField("order_id", StringType()),
        StructField("quantity", LongType()),  # widened
    ])

    old_data = spark.createDataFrame(
        [Row(order_id="ORD-001", quantity=5)], old_schema
    )

    # Cast to new schema — should work without data loss
    result = old_data.withColumn(
        "quantity", old_data["quantity"].cast(LongType())
    )
    assert result.collect()[0].quantity == 5


def test_column_rename_breaks_downstream(spark):
    """Renaming a column must be detected and rejected."""
    old_schema = StructType([
        StructField("order_id", StringType()),
        StructField("amount", DoubleType()),
    ])

    new_schema = StructType([
        StructField("order_id", StringType()),
        StructField("order_amount", DoubleType()),  # renamed from 'amount'
    ])

    old_columns = {f.name for f in old_schema.fields}
    new_columns = {f.name for f in new_schema.fields}

    removed = old_columns - new_columns
    added = new_columns - old_columns

    # The rename surfaces as one removed column plus one added column.
    # Treat any removal as breaking until downstream consumers are updated.
    assert removed == {"amount"}, f"Expected 'amount' to be flagged, got {removed}"
    assert added == {"order_amount"}, f"Expected 'order_amount' to be added, got {added}"
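
The three cases above (nullable addition, type widening, rename) can be folded into one compatibility check that a CI job runs against the old and new schema. The following is a sketch under stated assumptions: schemas are represented as plain dictionaries, `classify_schema_change` is a hypothetical helper, and the `SAFE_WIDENINGS` set is illustrative and should be adjusted to what your storage format and engine actually support.

```python
from typing import Dict, List, Tuple

# Type changes treated as safe widenings in this sketch (an assumption)
SAFE_WIDENINGS = {("int", "bigint"), ("float", "double")}

# column name -> (type name, nullable)
Schema = Dict[str, Tuple[str, bool]]


def classify_schema_change(old: Schema, new: Schema) -> List[str]:
    """Return the breaking changes between two schemas; empty means compatible."""
    breaking = []
    for name, (old_type, _) in old.items():
        if name not in new:
            breaking.append(f"removed column '{name}'")
            continue
        new_type, _ = new[name]
        if new_type != old_type and (old_type, new_type) not in SAFE_WIDENINGS:
            breaking.append(f"column '{name}' changed {old_type} -> {new_type}")
    for name, (_, nullable) in new.items():
        if name not in old and not nullable:
            breaking.append(f"added non-nullable column '{name}'")
    return breaking


old = {"order_id": ("string", False), "amount": ("double", False), "quantity": ("int", True)}

added_nullable = dict(old, currency=("string", True))
widened = dict(old, quantity=("bigint", True))
renamed = {"order_id": ("string", False), "order_amount": ("double", False),
           "quantity": ("int", True)}

compatible_add = classify_schema_change(old, added_nullable)
compatible_widen = classify_schema_change(old, widened)
breaking_rename = classify_schema_change(old, renamed)
```

A rename is reported as a removal plus a non-nullable addition, which is how it looks to a consumer that still reads the old column name. The sketch does not cover nullability changes on existing columns.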

Layer 5: SLA-Based Tests (Latency and Freshness)

Data freshness and latency are business contracts. Encode them as tests.

# tests/sla/test_freshness.py
import pytest
from datetime import datetime, timezone, timedelta
import pyarrow.dataset as ds
import pyarrow.compute as pc


def get_latest_partition_timestamp(path: str) -> datetime:
    """Return the max event_time across a Hive-partitioned Parquet dataset."""
    dataset = ds.dataset(path, format="parquet", partitioning="hive")
    table = dataset.to_table(columns=["event_time"])
    max_ts = pc.max(table["event_time"]).as_py()
    # Treat naive timestamps as UTC; convert aware ones instead of relabeling them
    if max_ts.tzinfo is None:
        return max_ts.replace(tzinfo=timezone.utc)
    return max_ts.astimezone(timezone.utc)


def test_events_table_freshness():
    """
    The events table must contain data from the last 2 hours.
    SLA: data is never more than 2 hours stale.
    """
    path = "s3://warehouse/events/"
    sla_hours = 2

    latest = get_latest_partition_timestamp(path)
    age = datetime.now(tz=timezone.utc) - latest

    assert age <= timedelta(hours=sla_hours), (
        f"Events table is stale! Latest data is {age} old. "
        f"SLA requires data within {sla_hours} hours."
    )


def test_daily_summary_row_count_volume():
    """
    Daily summary must have at least 80% of yesterday's row count.
    Catches truncations, failed incremental loads, partition drops.
    """
    import boto3
    import json

    s3 = boto3.client("s3")

    today = datetime.now(tz=timezone.utc).date()
    yesterday = today - timedelta(days=1)

    def get_row_count(date):
        response = s3.get_object(
            Bucket="warehouse-metadata",
            Key=f"daily_summary/row_counts/{date}.json"
        )
        return json.loads(response["Body"].read())["row_count"]

    today_count = get_row_count(today)
    yesterday_count = get_row_count(yesterday)

    ratio = today_count / yesterday_count if yesterday_count > 0 else 0
    assert ratio >= 0.80, (
        f"Daily summary volume dropped too much: "
        f"today={today_count}, yesterday={yesterday_count}, "
        f"ratio={ratio:.2%} (SLA: >= 80%)"
    )


def test_pipeline_latency_sla():
    """
    The p95 time from event ingestion to warehouse availability must be <= 15 minutes.
    Measured by comparing event_time to the pipeline's processing timestamp.
    """
    import pyarrow.parquet as pq

    path = "s3://warehouse/events/latest/"
    table = pq.read_table(path, columns=["event_time", "processed_at"])

    event_times = table["event_time"].to_pylist()
    processed_ats = table["processed_at"].to_pylist()

    latencies = [
        (p - e).total_seconds() / 60
        for e, p in zip(event_times, processed_ats)
        if e and p
    ]

    if not latencies:
        pytest.fail("No latency data available — pipeline may not be running")

    p95_latency = sorted(latencies)[int(len(latencies) * 0.95)]
    assert p95_latency <= 15, (
        f"p95 pipeline latency is {p95_latency:.1f} minutes. "
        f"SLA requires <= 15 minutes."
    )
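
SLA checks are easier to trust when the arithmetic lives in pure functions that can be tested without S3 or a wall clock. The sketch below is an assumption, not the original suite: `staleness` and `nearest_rank_percentile` are hypothetical helpers, and the current time is passed in so the check is deterministic. Note that the index expression `int(len(latencies) * 0.95)` used above can land one position above the nearest-rank definition (for 20 samples it selects the maximum), so it is worth fixing one percentile definition and testing it.

```python
import math
from datetime import datetime, timedelta, timezone
from typing import List


def staleness(latest: datetime, now: datetime) -> timedelta:
    """Age of the newest record; naive timestamps are assumed to be UTC."""
    if latest.tzinfo is None:
        latest = latest.replace(tzinfo=timezone.utc)
    return now - latest


def nearest_rank_percentile(values: List[float], pct: float) -> float:
    """Smallest value that has at least pct percent of the data at or below it."""
    if not values:
        raise ValueError("no values")
    if not 0 < pct <= 100:
        raise ValueError("pct must be in (0, 100]")
    ordered = sorted(values)
    rank = math.ceil(pct * len(ordered) / 100)  # 1-based rank
    return ordered[rank - 1]


# Fixed "now" so the example does not depend on when it runs
now = datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc)
fresh = staleness(datetime(2026, 5, 17, 10, 30), now) <= timedelta(hours=2)
stale = staleness(datetime(2026, 5, 17, 9, 0), now) <= timedelta(hours=2)

# Latencies of 1..20 minutes: the nearest-rank p95 is the 19th value
p95 = nearest_rank_percentile([float(m) for m in range(1, 21)], 95)
```

The S3-backed tests can then shrink to loading the timestamps and delegating to these helpers.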

Layer 6: Data Lineage Verification

Lineage tests verify that the data in a downstream model actually traces back to expected upstream sources. This catches cases where a join silently drops rows or a filter is too aggressive.

# tests/lineage/test_lineage.py
import pytest
from pyspark.sql import SparkSession


def test_revenue_model_covers_all_orders(spark):
    """
    Every shipped or delivered order must appear in the revenue model.
    Tests that no orders are silently dropped during aggregation.
    """
    orders = spark.read.parquet("s3://warehouse/orders/")
    revenue = spark.read.parquet("s3://warehouse/revenue_daily/")

    # Orders that should be in revenue
    billable_orders = orders.filter(
        orders.status.isin(["shipped", "delivered"])
    ).select("order_id")

    # Orders in revenue
    revenue_order_ids = revenue.select("order_id").distinct()

    # Anti-join: billable orders NOT in revenue
    missing = billable_orders.join(
        revenue_order_ids, on="order_id", how="left_anti"
    )

    missing_count = missing.count()
    assert missing_count == 0, (
        f"{missing_count} billable orders are missing from the revenue model. "
        f"Sample: {missing.limit(5).collect()}"
    )


def test_customer_dimension_covers_all_orders(spark):
    """Every order must have a matching customer in the customer dimension."""
    orders = spark.read.parquet("s3://warehouse/orders/")
    customers = spark.read.parquet("s3://warehouse/dim_customers/")

    orphaned = orders.join(
        customers.select("customer_id"),
        on="customer_id",
        how="left_anti"
    )

    orphan_count = orphaned.count()
    assert orphan_count == 0, (
        f"{orphan_count} orders have no matching customer. "
        f"Sample order_ids: {[r.order_id for r in orphaned.limit(5).collect()]}"
    )
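
Key coverage alone does not prove that amounts survived aggregation, so a lineage check is often paired with a reconciliation of totals. A minimal sketch in plain Python follows. The data is made up for illustration, and the assumption that the revenue model exposes a per-order amount is not something the original states; `missing_keys` is a hypothetical helper that mirrors the anti-join used above.

```python
import math
from typing import Iterable, Set

BILLABLE = {"shipped", "delivered"}


def missing_keys(source_ids: Iterable[str], target_ids: Iterable[str]) -> Set[str]:
    """Anti-join on keys: ids present in the source but absent from the target."""
    return set(source_ids) - set(target_ids)


# Illustrative rows: (order_id, status, amount)
orders = [
    ("ORD-1", "shipped", 100.0),
    ("ORD-2", "delivered", 250.5),
    ("ORD-3", "pending", 40.0),   # not billable, must not appear in revenue
    ("ORD-4", "shipped", 75.25),
]
# Illustrative revenue model in which ORD-4 was dropped by a bad join
revenue = {"ORD-1": 100.0, "ORD-2": 250.5}

billable = {oid: amount for oid, status, amount in orders if status in BILLABLE}
missing = missing_keys(billable, revenue)

# fsum avoids accumulating floating-point error over many rows
source_total = math.fsum(billable.values())
target_total = math.fsum(revenue.values())
reconciled = math.isclose(source_total, target_total, rel_tol=1e-9)
```

Reporting the missing keys together with the gap in totals tells the on-call engineer both which rows were lost and how much money they represent.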

Layer 7: Chaos Testing Data Pipelines

Chaos testing injects failures to verify that your pipeline recovers correctly and produces correct output after failure. The most common failure modes to test:

Duplicate Input Records

def test_pipeline_handles_duplicate_records(spark, tmp_path):
    """Pipeline output must deduplicate input even when source sends duplicates."""
    # Simulate a source that sends each record twice
    input_data = spark.createDataFrame([
        {"order_id": "ORD-001", "amount": 100.0},
        {"order_id": "ORD-001", "amount": 100.0},  # duplicate
        {"order_id": "ORD-002", "amount": 200.0},
        {"order_id": "ORD-002", "amount": 200.0},  # duplicate
    ])

    # Your pipeline's dedup step
    deduplicated = input_data.dropDuplicates(["order_id"])

    output_path = str(tmp_path / "output")
    deduplicated.write.parquet(output_path)
    result = spark.read.parquet(output_path)

    assert result.count() == 2, "Duplicates should be removed"
    assert result.select("order_id").distinct().count() == 2


def test_pipeline_handles_null_join_keys(spark):
    """Joins must handle null keys without silently dropping records."""
    orders = spark.createDataFrame([
        {"order_id": "ORD-001", "customer_id": None},  # null FK
        {"order_id": "ORD-002", "customer_id": "C001"},
    ])
    customers = spark.createDataFrame([
        {"customer_id": "C001", "country": "US"},
    ])

    result = orders.join(customers, on="customer_id", how="left")
    assert result.count() == 2, "Null join key must not drop the order"
    null_country = result.filter("order_id = 'ORD-001'").collect()[0].country
    assert null_country is None, "Order with null customer_id should have null country"


def test_pipeline_handles_schema_mismatch(spark):
    """Pipeline must fail fast and clearly when schema is incompatible."""
    expected_type = "double"

    # Source sends amount as string instead of double
    bad_data = spark.createDataFrame([
        {"order_id": "ORD-001", "amount": "not-a-number"},
    ])

    # Detect the mismatch from the schema before casting anything
    actual_type = dict(bad_data.dtypes)["amount"]
    assert actual_type != expected_type, "Fixture should carry the wrong type"

    # With ANSI mode off, an invalid cast yields null; with it on, Spark raises.
    # Either outcome is acceptable as long as the bad value does not pass through.
    try:
        uncastable = bad_data.filter(
            bad_data["amount"].isNotNull()
            & bad_data["amount"].cast("double").isNull()
        ).count()
    except Exception:
        uncastable = None  # ANSI mode: the invalid cast raised
    assert uncastable in (None, 1), "Invalid amounts must surface as nulls or errors"

Simulating Upstream Outages

def test_pipeline_checkpoint_recovery(spark, tmp_path):
    """
    After a simulated failure, the pipeline should resume from checkpoint
    and produce no duplicate records.
    """
    checkpoint_dir = str(tmp_path / "checkpoint")
    output_dir = str(tmp_path / "output")

    # First run: process 5 records and fail after
    batch1 = spark.createDataFrame([
        {"id": i, "value": float(i)} for i in range(5)
    ])
    batch1.write.mode("overwrite").parquet(output_dir)

    # Simulate recovery: process next 5 records
    batch2 = spark.createDataFrame([
        {"id": i, "value": float(i)} for i in range(5, 10)
    ])
    batch2.write.mode("append").parquet(output_dir)

    result = spark.read.parquet(output_dir)
    ids = sorted([r.id for r in result.collect()])

    assert ids == list(range(10)), f"After recovery, all IDs must be present: {ids}"
    assert len(set(ids)) == 10, "No duplicate IDs after recovery"
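
The recovery test above never actually interrupts a run, so it cannot catch a resume that reprocesses records. The sketch below injects a failure partway through and then resumes from a checkpoint. It is an illustration in plain Python under stated assumptions: `process` and `InjectedFailure` are hypothetical names, and a JSON file stands in for whatever checkpoint store the real pipeline uses.

```python
import json
import tempfile
from pathlib import Path
from typing import List


class InjectedFailure(RuntimeError):
    """Raised on purpose to simulate a crash mid-run."""


def process(records: List[int], out_file: Path, ckpt_file: Path,
            fail_after: int = -1) -> None:
    """Append records to out_file, committing the offset after each record."""
    start = json.loads(ckpt_file.read_text())["offset"] if ckpt_file.exists() else 0
    for offset in range(start, len(records)):
        if offset == fail_after:
            raise InjectedFailure(f"crash before record {offset}")
        with out_file.open("a") as out:
            out.write(f"{records[offset]}\n")
        ckpt_file.write_text(json.dumps({"offset": offset + 1}))


with tempfile.TemporaryDirectory() as tmp:
    out_file = Path(tmp) / "output.txt"
    ckpt_file = Path(tmp) / "checkpoint.json"
    records = list(range(10))

    try:
        process(records, out_file, ckpt_file, fail_after=5)
    except InjectedFailure:
        pass  # first run dies after writing records 0..4

    process(records, out_file, ckpt_file)  # recovery run resumes at offset 5
    recovered = [int(line) for line in out_file.read_text().splitlines()]
```

In this sketch the write and the checkpoint commit are two separate steps, so a crash between them would replay one record. Moving the injected failure to that point is a useful follow-up test, and it is the case that a keyed or transactional write is meant to absorb.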

Putting It All Together: A QA Pipeline

Run all layers in sequence with clear reporting:

# run_qa.py
import sys
import logging
from datetime import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def run_qa_suite(spark, date: str) -> bool:
    results = {}

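    # Each run_*_tests function is assumed to wrap the matching test module
    # above and to raise AssertionError on failure.
    checks = [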
        ("contract", lambda: run_contract_tests(spark, date)),
        ("freshness", lambda: run_freshness_tests(date)),
        ("volume", lambda: run_volume_tests(spark, date)),
        ("idempotency", lambda: run_idempotency_tests(spark, date)),
        ("lineage", lambda: run_lineage_tests(spark, date)),
    ]

    for name, fn in checks:
        try:
            fn()
            results[name] = "PASS"
            logger.info(f"[PASS] {name}")
        except AssertionError as e:
            results[name] = f"FAIL: {e}"
            logger.error(f"[FAIL] {name}: {e}")
        except Exception as e:
            results[name] = f"ERROR: {e}"
            logger.error(f"[ERROR] {name}: {e}")

    passed = sum(1 for v in results.values() if v == "PASS")
    total = len(results)
    logger.info(f"\nQA Summary: {passed}/{total} checks passed")

    return passed == total


if __name__ == "__main__":
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName("qa-pipeline").getOrCreate()
    date = sys.argv[1] if len(sys.argv) > 1 else datetime.now().strftime("%Y-%m-%d")
    success = run_qa_suite(spark, date)
    sys.exit(0 if success else 1)

Add this to your CI as a post-deploy validation gate:

- name: Run Data QA Suite
  # With no date input, run_qa.py falls back to the current date
  run: python run_qa.py ${{ github.event.inputs.date }}
  env:
    SPARK_MASTER: local[4]
    AWS_DEFAULT_REGION: us-east-1
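
The runner takes its date as a free-form command-line string, so a typo or a placeholder value would flow straight into partition paths. A minimal sketch of normalizing the argument before it reaches the checks, assuming dates are expected as `YYYY-MM-DD`; `resolve_run_date` is a hypothetical helper, not part of the original script:

```python
from datetime import date, datetime, timezone
from typing import Optional


def resolve_run_date(arg: Optional[str], today: Optional[date] = None) -> str:
    """Return a YYYY-MM-DD run date, defaulting to today's UTC date."""
    if today is None:
        today = datetime.now(timezone.utc).date()
    if arg is None or arg.strip() in ("", "today"):
        return today.isoformat()
    # strptime raises ValueError for anything that is not a real calendar date
    return datetime.strptime(arg.strip(), "%Y-%m-%d").date().isoformat()


explicit = resolve_run_date("2026-05-16")
defaulted = resolve_run_date("today", today=date(2026, 5, 17))
```

Failing on a malformed date at startup is cheaper than discovering it as five separate "path not found" errors inside the checks.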

Data quality is an engineering discipline, not an afterthought. The pipeline that catches its own bugs in CI is the pipeline that earns trust from the business teams who depend on it.

