ETL Testing Guide: Data Completeness, Transformation Accuracy, and Load Verification

ETL testing verifies three things: all expected data was extracted (completeness), transformations produced correct output (accuracy), and the data was loaded without corruption or duplication (integrity). Each phase requires different testing techniques: reconciliation counts and checksums for extraction, sample-based assertions for transformation, and row count plus constraint checks for loading.

Key Takeaways

Completeness testing catches silent data loss. Compare row counts and key metrics between source and destination. A pipeline that loads 95% of records without erroring looks successful but isn't.

Transformation testing should use known input/output pairs. The only way to verify transformation logic is to run it against data where you know the expected output. Generate test fixtures, not just random data.

Load testing verifies no duplication or corruption. Check primary key uniqueness, referential integrity, and that no rows were silently dropped by constraint violations.

Test incrementally, not just full loads. Incremental pipelines have more failure modes: missed records, duplicates from re-processing, boundary condition bugs at the watermark.

Reconciliation is the most important test. Checking that the source count equals the destination count is simple and catches a wide range of bugs. Always implement it.

What ETL Testing Covers

ETL (Extract, Transform, Load) pipelines move data between systems. Each phase can introduce data quality problems:

  • Extract: records skipped due to query filters, API pagination bugs, or network timeouts
  • Transform: incorrect calculations, missed edge cases, wrong joins that produce fan-outs or dropped rows
  • Load: constraint violations silently dropping rows, duplicate inserts from retry logic, partial loads from transaction rollbacks

ETL testing catches these problems at each phase rather than discovering them when someone reports wrong numbers in a dashboard.

Phase 1: Extraction Testing

Row Count Reconciliation

The most basic and most important extraction test: source count equals extracted count.

# tests/test_extraction.py
import pytest
import psycopg2
import pandas as pd

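def get_source_count(source_conn, table_name, date_filter=None):
    # table_name is interpolated directly, so pass only trusted, hard-coded names
    query = f"SELECT COUNT(*) FROM {table_name}"
    params = None
    if date_filter:
        # Bind the date as a query parameter instead of formatting it into the SQL
        query += " WHERE created_at >= %s"
        params = (date_filter,)
    cursor = source_conn.cursor()
    cursor.execute(query, params)
    return cursor.fetchone()[0]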

def get_extracted_count(extracted_df):
    return len(extracted_df)

def test_extraction_completeness(source_conn, extracted_orders_df):
    source_count = get_source_count(source_conn, "orders")
    extracted_count = get_extracted_count(extracted_orders_df)
    
    assert source_count == extracted_count, (
        f"Extraction incomplete: source has {source_count} rows, "
        f"extracted {extracted_count} rows"
    )
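
When the counts disagree, the next question is which records are missing. A minimal sketch, assuming both sides can be reduced to a list of primary keys (the `diff_keys` helper and the sample keys are illustrative, not part of the pipeline above):

```python
def diff_keys(source_keys, extracted_keys):
    """Return (missing, unexpected) key sets between source and extract."""
    source_set = set(source_keys)
    extracted_set = set(extracted_keys)
    missing = source_set - extracted_set      # in the source, never extracted
    unexpected = extracted_set - source_set   # extracted, but not in the source
    return missing, unexpected


# Illustrative data: order 3 was skipped, order 7 should not be there.
missing, unexpected = diff_keys([1, 2, 3, 4], [1, 2, 4, 7])
print(sorted(missing), sorted(unexpected))
```

Reporting the missing keys in the assertion message usually shortens debugging, because a skipped page or a filtered date range tends to show up as a contiguous block of IDs.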

Checksum Reconciliation

Row counts can still match when individual records are corrupted. Compare aggregate metrics on key columns as a lightweight checksum:

-- Source checksum
SELECT 
    COUNT(*) as row_count,
    SUM(order_amount) as total_amount,
    MAX(updated_at) as latest_record,
    COUNT(DISTINCT customer_id) as unique_customers
FROM orders
WHERE created_at >= '2025-01-01';

def test_extraction_checksum(source_conn, extracted_df):
    # Source metrics
    cursor = source_conn.cursor()
    cursor.execute("""
        SELECT 
            COUNT(*) as row_count,
            SUM(order_amount) as total_amount,
            COUNT(DISTINCT customer_id) as unique_customers
        FROM orders
        WHERE created_at >= '2025-01-01'
    """)
    source = cursor.fetchone()
    
    # Extracted metrics
    extracted_count = len(extracted_df)
    extracted_amount = extracted_df["order_amount"].sum()
    extracted_customers = extracted_df["customer_id"].nunique()
    
    assert extracted_count == source[0], "Row count mismatch"
    # SUM() on a NUMERIC column comes back as Decimal; cast before mixing with float
    assert abs(float(extracted_amount) - float(source[1])) < 0.01, "Amount checksum mismatch"
    assert extracted_customers == source[2], "Unique customer count mismatch"
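
Aggregate metrics show that something differs, but not which rows. One possible complement is a per-row fingerprint. The sketch below assumes rows are available as dictionaries on both sides; `row_fingerprint` and `find_changed_rows` are hypothetical helper names, and the sample rows are made up for illustration:

```python
import hashlib


def row_fingerprint(row, columns):
    """Hash the selected columns of one row into a stable hex digest."""
    # repr() keeps None distinct from the string "None"; the unit separator
    # stops ("ab", "c") from colliding with ("a", "bc").
    payload = "\x1f".join(repr(row[col]) for col in columns)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def find_changed_rows(source_rows, extracted_rows, key, columns):
    """Return keys present on both sides whose fingerprints differ."""
    source_hashes = {r[key]: row_fingerprint(r, columns) for r in source_rows}
    changed = []
    for r in extracted_rows:
        expected = source_hashes.get(r[key])
        if expected is not None and expected != row_fingerprint(r, columns):
            changed.append(r[key])
    return sorted(changed)


source = [
    {"order_id": 1, "order_amount": 100.0, "customer_id": 101},
    {"order_id": 2, "order_amount": 200.0, "customer_id": 102},
]
extracted = [
    {"order_id": 1, "order_amount": 100.0, "customer_id": 101},
    {"order_id": 2, "order_amount": 20.0, "customer_id": 102},  # corrupted amount
]
changed = find_changed_rows(source, extracted, "order_id", ["order_amount", "customer_id"])
print(changed)
```

Fingerprints only compare cleanly when both sides use the same types, so normalize values first (for example, convert `Decimal` and `float` amounts to one representation) before hashing.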

Incremental Extraction Testing

Incremental pipelines extract only records changed since the last run. They have additional failure modes:

Watermark boundary bug: Records exactly at the watermark timestamp may be included or excluded based on whether the filter uses > or >=. Test this explicitly:

from datetime import datetime

def test_incremental_boundary_inclusive(source_conn, extractor):
    """Records at the exact watermark should be included."""
    watermark = datetime(2025, 6, 15, 12, 0, 0)
    
    # Insert a record with the exact watermark timestamp.
    # psycopg2 connections have no execute(); go through a cursor.
    cursor = source_conn.cursor()
    cursor.execute(
        "INSERT INTO orders (order_id, updated_at, order_amount) VALUES (9999, %s, 100.00)",
        (watermark,)
    )
    
    extracted = extractor.extract(since=watermark)
    
    assert 9999 in extracted["order_id"].values, (
        "Record at exact watermark boundary was not extracted"
    )

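def test_no_records_missed_across_boundary(extractor):
    """Consecutive incremental runs should cover every record exactly once."""
    # Simulate two consecutive runs
    run1_end = datetime(2025, 6, 15, 12, 0, 0)
    run2_start = run1_end  # Next run starts exactly where last one ended
    
    run1_records = set(extractor.extract(since=None, until=run1_end)["order_id"])
    run2_records = set(extractor.extract(since=run2_start, until=None)["order_id"])
    all_records = set(extractor.extract(since=None, until=None)["order_id"])
    
    # No record may fall between the two runs
    missed = all_records - (run1_records | run2_records)
    assert not missed, f"Records missed across boundary: {sorted(missed)}"
    
    # With a half-open window (since <= updated_at < until), no record is
    # extracted twice. If your pipeline deliberately re-reads the boundary and
    # deduplicates at load time, relax this assertion.
    overlap = run1_records & run2_records
    assert not overlap, f"Records extracted by both runs: {sorted(overlap)}"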
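
The boundary behavior is easier to reason about with the filter written out. A minimal sketch, assuming a half-open window (`since <= updated_at < until`); the `in_window` and `extract` functions here are stand-ins for whatever your extractor does, not its real API:

```python
from datetime import datetime


def in_window(ts, since=None, until=None):
    """Half-open window: since <= ts < until (None means unbounded)."""
    if since is not None and ts < since:
        return False
    if until is not None and ts >= until:
        return False
    return True


def extract(records, since=None, until=None):
    """Return the order_ids whose updated_at falls inside the window."""
    return [r["order_id"] for r in records if in_window(r["updated_at"], since, until)]


boundary = datetime(2025, 6, 15, 12, 0, 0)
records = [
    {"order_id": 1, "updated_at": datetime(2025, 6, 15, 11, 59, 59)},
    {"order_id": 2, "updated_at": boundary},  # exactly on the watermark
    {"order_id": 3, "updated_at": datetime(2025, 6, 15, 12, 0, 1)},
]

run1 = extract(records, until=boundary)  # everything before the watermark
run2 = extract(records, since=boundary)  # the watermark and everything after
print(run1, run2)
```

Because one side of the window is inclusive and the other exclusive, the record on the watermark lands in exactly one run. Using `>` on both sides would drop it; using `>=` and `<=` would extract it twice.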

Phase 2: Transformation Testing

Unit Tests for Transformation Logic

Extract transformation functions and test them with known input/output pairs:

# src/transformations/orders.py
def normalize_currency(amount: float, currency: str, exchange_rates: dict) -> float:
    """Convert amount to USD using provided exchange rates."""
    if currency == "USD":
        return amount
    if currency not in exchange_rates:
        raise ValueError(f"Unknown currency: {currency}")
    # Rates are USD per one unit of the foreign currency, so multiply
    return amount * exchange_rates[currency]

def classify_order_value(total_usd: float) -> str:
    if total_usd < 50:
        return "micro"
    elif total_usd < 500:
        return "standard"
    elif total_usd < 5000:
        return "large"
    else:
        return "enterprise"

# tests/test_transformations.py
import pytest
from src.transformations.orders import normalize_currency, classify_order_value

# USD per one unit of each currency (illustrative values)
EXCHANGE_RATES = {"EUR": 1.10, "GBP": 1.27, "JPY": 0.0067}

class TestNormalizeCurrency:
    def test_usd_passthrough(self):
        assert normalize_currency(100.0, "USD", EXCHANGE_RATES) == 100.0
    
    def test_eur_conversion(self):
        # 100 EUR at 1.10 USD per EUR is 110 USD
        result = normalize_currency(100.0, "EUR", EXCHANGE_RATES)
        assert abs(result - 110.0) < 0.01
    
    def test_unknown_currency_raises(self):
        with pytest.raises(ValueError, match="Unknown currency: AUD"):
            normalize_currency(100.0, "AUD", EXCHANGE_RATES)
    
    def test_zero_amount(self):
        assert normalize_currency(0.0, "EUR", EXCHANGE_RATES) == 0.0

class TestClassifyOrderValue:
    @pytest.mark.parametrize("amount,expected", [
        (0.0, "micro"),
        (49.99, "micro"),
        (50.0, "standard"),
        (499.99, "standard"),
        (500.0, "large"),
        (4999.99, "large"),
        (5000.0, "enterprise"),
        (100000.0, "enterprise"),
    ])
    def test_classification_boundaries(self, amount, expected):
        assert classify_order_value(amount) == expected

End-to-End Transformation Tests

For the full transformation pipeline, use golden file testing: run the transformation against a fixed input and compare the result with a stored expected output:

import pandas as pd
import json
from pathlib import Path
from src.transformations.pipeline import run_transformations

FIXTURE_DIR = Path("tests/fixtures")

def test_full_transformation_golden_file():
    input_df = pd.read_csv(FIXTURE_DIR / "raw_orders_sample.csv")
    result = run_transformations(input_df)
    
    # Load expected output (golden file)
    expected = pd.read_csv(FIXTURE_DIR / "expected_transformed_orders.csv")
    
    pd.testing.assert_frame_equal(
        result.reset_index(drop=True),
        expected.reset_index(drop=True),
        check_exact=False,
        rtol=1e-4
    )

Generate the golden file once from a known-good run, commit it to version control, and update it only when the transformation logic intentionally changes.

Join Accuracy Testing

Joins are the most common source of transformation bugs—they can produce fan-outs (duplicate rows) or silently drop records:

def test_customer_join_no_row_duplication():
    orders = pd.DataFrame({
        "order_id": [1, 2, 3],
        "customer_id": [101, 101, 102],
        "amount": [100.0, 200.0, 150.0],
    })
    customers = pd.DataFrame({
        "customer_id": [101, 102],
        "customer_name": ["Alice", "Bob"],
        "tier": ["gold", "silver"],
    })
    
    result = join_orders_with_customers(orders, customers)
    
    # No fan-out: row count should match orders
    assert len(result) == len(orders), (
        f"Join produced {len(result)} rows from {len(orders)} orders—possible fan-out"
    )
    # Verify specific values
    alice_orders = result[result["customer_name"] == "Alice"]
    assert len(alice_orders) == 2  # Customer 101 has 2 orders

def test_customer_join_preserves_unmatched_orders():
    orders = pd.DataFrame({
        "order_id": [1, 2],
        "customer_id": [101, 999],  # 999 doesn't exist in customers
        "amount": [100.0, 200.0],
    })
    customers = pd.DataFrame({
        "customer_id": [101],
        "customer_name": ["Alice"],
    })
    
    result = join_orders_with_customers(orders, customers)
    
    # Left join: all orders should be present
    assert len(result) == 2
    # Unmatched order has null customer_name
    unmatched = result[result["order_id"] == 2]
    assert pd.isna(unmatched["customer_name"].iloc[0])
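
Fan-outs usually come from duplicate keys on the lookup side of the join, so it can help to check for them before joining. A minimal sketch in plain Python, assuming the lookup table is available as a list of dictionaries; `duplicate_join_keys` is a hypothetical helper and the sample data is made up:

```python
from collections import Counter


def duplicate_join_keys(rows, key):
    """Return the key values that appear more than once in rows."""
    counts = Counter(r[key] for r in rows)
    return sorted(k for k, n in counts.items() if n > 1)


customers = [
    {"customer_id": 101, "customer_name": "Alice"},
    {"customer_id": 102, "customer_name": "Bob"},
    {"customer_id": 102, "customer_name": "Bob (stale)"},  # duplicate key
]

duplicates = duplicate_join_keys(customers, "customer_id")
print(duplicates)
```

Every order for customer 102 would appear twice after a join against this table. If the join is done with pandas, `DataFrame.merge` also accepts a `validate` argument (for example `validate="many_to_one"`) that raises an error when the right-hand keys are not unique.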

Phase 3: Load Verification Testing

Primary Key Integrity

After loading, verify no duplicates were introduced:

-- Check for duplicates in loaded table
SELECT 
    order_id,
    COUNT(*) as occurrences
FROM warehouse_orders
GROUP BY order_id
HAVING COUNT(*) > 1
ORDER BY occurrences DESC;

def test_no_duplicate_primary_keys(dest_conn):
    cursor = dest_conn.cursor()
    cursor.execute("""
        SELECT COUNT(*) FROM (
            SELECT order_id
            FROM warehouse_orders
            GROUP BY order_id
            HAVING COUNT(*) > 1
        ) duplicates
    """)
    duplicate_count = cursor.fetchone()[0]
    
    assert duplicate_count == 0, (
        f"Found {duplicate_count} duplicate order_ids after load"
    )

Referential Integrity

Verify foreign keys are intact after load:

-- Find orders without a matching customer
SELECT COUNT(*) as orphaned_orders
FROM warehouse_orders o
LEFT JOIN warehouse_customers c ON o.customer_id = c.customer_id
WHERE c.customer_id IS NULL;

def test_no_orphaned_foreign_keys(dest_conn):
    cursor = dest_conn.cursor()
    cursor.execute("""
        SELECT COUNT(*) 
        FROM warehouse_orders o
        LEFT JOIN warehouse_customers c ON o.customer_id = c.customer_id
        WHERE c.customer_id IS NULL
    """)
    orphan_count = cursor.fetchone()[0]
    
    assert orphan_count == 0, (
        f"Found {orphan_count} orders with no matching customer record"
    )

Null Rate Verification

Required fields should not have unexpected null rates after loading:

def test_critical_columns_not_null(dest_conn):
    cursor = dest_conn.cursor()
    cursor.execute("""
        SELECT
            SUM(CASE WHEN order_id IS NULL THEN 1 ELSE 0 END) as null_order_ids,
            SUM(CASE WHEN customer_id IS NULL THEN 1 ELSE 0 END) as null_customer_ids,
            SUM(CASE WHEN order_total IS NULL THEN 1 ELSE 0 END) as null_totals,
            COUNT(*) as total_rows
        FROM warehouse_orders
        WHERE created_at >= CURRENT_DATE - INTERVAL '1 day'
    """)
    row = cursor.fetchone()
    null_order_ids, null_customer_ids, null_totals, total = row
    
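    # With zero rows the SUMs are NULL and the rate below would divide by zero
    assert total > 0, "No rows loaded in the last day; cannot compute null rates"
    assert null_order_ids == 0, "Null order_ids found in loaded data"
    assert null_customer_ids == 0, "Null customer_ids found in loaded data"
    assert null_totals / total < 0.001, f"High null rate in order_total: {null_totals}/{total}"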
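
The same idea generalizes to a per-column threshold table. A minimal sketch, assuming the loaded rows have been fetched as dictionaries; `null_rates`, `columns_over_threshold`, and the threshold values are illustrative choices, not recommendations:

```python
def null_rates(rows, columns):
    """Return {column: fraction of rows where the column is None}."""
    total = len(rows)
    if total == 0:
        return {col: 0.0 for col in columns}
    return {
        col: sum(1 for r in rows if r.get(col) is None) / total
        for col in columns
    }


def columns_over_threshold(rows, thresholds):
    """Return {column: rate} for columns whose null rate exceeds its limit."""
    rates = null_rates(rows, list(thresholds))
    return {col: rate for col, rate in rates.items() if rate > thresholds[col]}


rows = [
    {"order_id": 1, "order_total": 10.0},
    {"order_id": 2, "order_total": None},
    {"order_id": 3, "order_total": 30.0},
    {"order_id": 4, "order_total": 40.0},
]
# Required keys tolerate no nulls; order_total tolerates a small rate.
violations = columns_over_threshold(rows, {"order_id": 0.0, "order_total": 0.001})
print(violations)
```

Keeping the thresholds in one dictionary makes it cheap to add a column to the check, and the returned rates can go straight into the assertion message.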

End-to-End Reconciliation

The final check: source total metrics equal destination total metrics:

def test_end_to_end_reconciliation(source_conn, dest_conn):
    # Source metrics
    source_cursor = source_conn.cursor()
    source_cursor.execute("""
        SELECT 
            COUNT(*) as row_count,
            SUM(order_amount) as total_amount
        FROM source_orders
        WHERE DATE(created_at) = CURRENT_DATE - 1
    """)
    source_count, source_amount = source_cursor.fetchone()
    
    # Destination metrics
    dest_cursor = dest_conn.cursor()
    dest_cursor.execute("""
        SELECT 
            COUNT(*) as row_count,
            SUM(order_total) as total_amount
        FROM warehouse_orders
        WHERE DATE(source_created_at) = CURRENT_DATE - 1
    """)
    dest_count, dest_amount = dest_cursor.fetchone()
    
    assert source_count == dest_count, (
        f"Row count mismatch: source={source_count}, destination={dest_count}"
    )
    assert abs(source_amount - dest_amount) < 0.01, (
        f"Amount mismatch: source={source_amount}, destination={dest_amount}"
    )
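
Two details tend to trip up this comparison in practice: `SUM()` over zero rows returns NULL, and one side may return `Decimal` while the other returns `float`. A minimal sketch of a comparison helper that handles both, assuming the metrics have already been fetched into dictionaries (`reconcile` is a hypothetical name):

```python
from decimal import Decimal


def reconcile(source_metrics, dest_metrics, tolerance=Decimal("0.01")):
    """Return {metric: (source_value, dest_value)} for metrics that disagree."""
    mismatches = {}
    for name, source_value in source_metrics.items():
        dest_value = dest_metrics.get(name)
        if source_value is None or dest_value is None:
            # SUM() over zero rows is NULL; only NULL on both sides matches.
            if source_value is not dest_value:
                mismatches[name] = (source_value, dest_value)
            continue
        # Go through str() so floats and Decimals subtract without TypeError.
        difference = abs(Decimal(str(source_value)) - Decimal(str(dest_value)))
        if difference > tolerance:
            mismatches[name] = (source_value, dest_value)
    return mismatches


source = {"row_count": 3, "total_amount": Decimal("450.00")}
dest = {"row_count": 3, "total_amount": 449.50}
mismatches = reconcile(source, dest)
print(mismatches)
```

Returning all mismatches at once, rather than failing on the first assertion, shows whether a run lost rows, lost value, or both.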

Testing Idempotency

ETL pipelines should be idempotent: running the same pipeline twice with the same input should produce the same output without duplication.

def test_pipeline_idempotency(pipeline, dest_conn):
    # Run the pipeline once
    pipeline.run(date="2025-06-15")
    
    cursor = dest_conn.cursor()
    cursor.execute("SELECT COUNT(*) FROM warehouse_orders WHERE source_date = '2025-06-15'")
    count_after_first_run = cursor.fetchone()[0]
    
    # Run it again with the same date
    pipeline.run(date="2025-06-15")
    
    cursor.execute("SELECT COUNT(*) FROM warehouse_orders WHERE source_date = '2025-06-15'")
    count_after_second_run = cursor.fetchone()[0]
    
    assert count_after_first_run == count_after_second_run, (
        f"Pipeline is not idempotent: {count_after_first_run} rows after first run, "
        f"{count_after_second_run} rows after second run"
    )
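
One common way to make the load step pass this test is to upsert on the primary key instead of doing a plain insert. The sketch below uses an in-memory SQLite database purely for illustration (the `load_orders` function and the two-column table are assumptions); PostgreSQL supports a similar `INSERT ... ON CONFLICT ... DO UPDATE` form:

```python
import sqlite3


def load_orders(conn, rows):
    """Insert rows, overwriting any existing row with the same order_id."""
    conn.executemany(
        """
        INSERT INTO warehouse_orders (order_id, order_total)
        VALUES (?, ?)
        ON CONFLICT(order_id) DO UPDATE SET order_total = excluded.order_total
        """,
        rows,
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE warehouse_orders (order_id INTEGER PRIMARY KEY, order_total REAL)"
)

batch = [(1, 100.0), (2, 200.0)]
load_orders(conn, batch)
load_orders(conn, batch)  # simulated retry of the same batch

row_count = conn.execute("SELECT COUNT(*) FROM warehouse_orders").fetchone()[0]
print(row_count)
```

The retry leaves the row count unchanged because the second run updates the existing rows in place. The `ON CONFLICT` clause needs SQLite 3.24 or newer.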

Scheduling ETL Tests in CI

# .github/workflows/etl-tests.yml
name: ETL Test Suite
on:
  schedule:
    - cron: '30 5 * * *'  # 05:30 UTC daily; schedule after the nightly ETL completes
  push:
    paths:
      - 'src/transformations/**'
      - 'tests/**'

jobs:
  unit-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: pip install -r requirements-test.txt
      - run: pytest tests/unit/ -v --tb=short

  reconciliation-tests:
    runs-on: ubuntu-latest
    needs: unit-tests
    steps:
      - uses: actions/checkout@v4
      - run: pip install -r requirements-test.txt
      - run: pytest tests/reconciliation/ -v -m "reconciliation"
        env:
          SOURCE_DB_URL: ${{ secrets.SOURCE_DB_URL }}
          DEST_DB_URL: ${{ secrets.DEST_DB_URL }}

Summary

ETL testing is organized around the three pipeline phases:

Phase          | Key Tests                                                         | Tools
Extraction     | Row count reconciliation, checksum matching, incremental boundary | pytest, SQL queries
Transformation | Unit tests on functions, join accuracy, golden file comparison    | pytest, pandas
Load           | Duplicate detection, referential integrity, null rates            | SQL assertions

All phases benefit from end-to-end reconciliation: source total equals destination total. This single check catches a wide range of ETL bugs, including skipped records, join fan-outs, and duplicate loads.

Run unit tests on every code change, run reconciliation tests on a schedule after your ETL pipelines complete, and alert immediately when checks fail. The goal is to catch data quality issues in the pipeline, not after the data reaches dashboards and reports.

By HelpMeTest