Datafold Data Diff for CI/CD: Catching Data Regressions Before They Ship

Datafold is a data reliability platform that specializes in data diffing — comparing two versions of a dataset to detect changes introduced by a pipeline update. When you change a dbt model, a SQL transformation, or an ETL job, Datafold shows you exactly which rows changed, which columns were affected, and by how much. This guide covers integrating Datafold's data diff into your CI/CD pipeline to catch data regressions before they reach production.

Why Data Diffing Matters

The traditional code review process catches syntax errors and logic bugs, but it can't tell you whether a SQL change silently alters 15% of your revenue calculations. Data diff fills that gap:

  • Before: Change goes to production → downstream dashboards break → data team investigates → 4 hours lost
  • After: Change creates a PR → Datafold diffs production vs. proposed → PR comment shows "amount column changed for 1,247 rows" → engineer reviews before merging
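Conceptually, a data diff is just a keyed comparison of two row sets: which keys appeared, which disappeared, and which rows changed in place. A toy pure-Python sketch of the idea (illustrative only — the real data-diff tool avoids full scans, as described below):

```python
# Conceptual sketch of a keyed data diff. Tables are modeled as
# {primary_key: row_dict}; this is NOT how data-diff is implemented.
def diff_rows(old: dict, new: dict) -> dict:
    """Compare two tables keyed by primary key."""
    added = [k for k in new if k not in old]
    removed = [k for k in old if k not in new]
    changed = {
        k: {col for col in old[k] if old[k][col] != new[k].get(col)}
        for k in old.keys() & new.keys()
        if old[k] != new[k]
    }
    return {"added": added, "removed": removed, "changed": changed}

old = {1: {"amount": 100, "status": "paid"}, 2: {"amount": 50, "status": "open"}}
new = {1: {"amount": 110, "status": "paid"}, 3: {"amount": 75, "status": "open"}}
print(diff_rows(old, new))
# {'added': [3], 'removed': [2], 'changed': {1: {'amount'}}}
```

This is exactly the shape of answer the PR comment gives you: rows added, rows removed, and which columns changed for the rest.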

data-diff: The Open Source Tool

Datafold's data-diff is available as an open-source CLI tool:

pip install 'data-diff[bigquery]'  # or [snowflake], [postgres], etc.

# Compare two tables
data-diff \
  "bigquery://project/dataset/orders_production" \
  "bigquery://project/dataset/orders_staging" \
  -k order_id \
  -c amount,status,customer_id \
  --stats

Output:

Diff statistics:
  Total rows: 10,000 (production), 10,000 (staging)
  Rows exclusive to production: 0
  Rows exclusive to staging: 0
  Rows with differences: 47

  Column differences:
    amount: 47 rows differ (0.47%)
    status: 0 rows differ
    customer_id: 0 rows differ
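Notably, data-diff produces these statistics without pulling both tables over the network: it compares per-segment checksums and recurses only into segments whose checksums disagree. A toy sketch of that divide-and-conquer idea, assuming integer keys (our own illustration, not the tool's code):

```python
import hashlib

def checksum(rows):
    """Checksum of a list of (key, value) rows, stable under row order."""
    h = hashlib.md5()
    for row in sorted(rows):
        h.update(repr(row).encode())
    return h.hexdigest()

def find_diffs(a, b, lo, hi, min_size=4):
    """Recursively narrow down differing rows by comparing segment checksums."""
    seg_a = [r for r in a if lo <= r[0] < hi]
    seg_b = [r for r in b if lo <= r[0] < hi]
    if checksum(seg_a) == checksum(seg_b):
        return []                       # identical segment: skip it entirely
    if hi - lo <= min_size:             # small segment: compare row by row
        return sorted(set(seg_a) ^ set(seg_b))
    mid = (lo + hi) // 2
    return find_diffs(a, b, lo, mid, min_size) + find_diffs(a, b, mid, hi, min_size)

a = [(i, 100) for i in range(16)]
b = [(i, 100 if i != 9 else 150) for i in range(16)]
print(find_diffs(a, b, 0, 16))
# [(9, 100), (9, 150)]
```

Identical segments cost one checksum query each, which is why diffing two mostly-identical billion-row tables is feasible.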

dbt Integration: Data Diff in Pull Requests

Datafold's primary use case is comparing dbt model outputs between production and a CI branch.

Setup

  1. Connect Datafold to your data warehouse
  2. Connect Datafold to your dbt project (dbt Cloud or dbt Core with Slim CI)
  3. Add meta.datafold annotations to your dbt models:
# models/orders.yaml
version: 2

models:
  - name: orders
    meta:
      datafold:
        datadiff:
          pk:
            - order_id
    columns:
      - name: order_id
        description: "Unique order identifier"
      - name: customer_id
      - name: amount
        description: "Order total in USD"
      - name: status

CI/CD Pipeline

# .github/workflows/dbt-ci.yml
name: dbt CI with Datafold

on:
  pull_request:
    paths:
      - 'models/**'
      - 'macros/**'

jobs:
  dbt-test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Install dbt
        run: pip install dbt-bigquery

      - name: Run dbt in CI environment
        run: |
          dbt run \
            --select state:modified+ \
            --defer \
            --state ./prod-manifest
        env:
          DBT_PROFILES_DIR: .

      - name: Run dbt tests
        run: |
          dbt test \
            --select state:modified+ \
            --defer \
            --state ./prod-manifest

      - name: Submit diff to Datafold
        run: |
          pip install datafold-sdk
          datafold dbt upload \
            --ci-config-id ${{ secrets.DATAFOLD_CI_CONFIG_ID }} \
            --run-type pull_request \
            --target-folder ./target
        env:
          DATAFOLD_API_KEY: ${{ secrets.DATAFOLD_API_KEY }}

Datafold automatically posts a PR comment showing affected rows, column-level changes, and a diff summary.
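One detail the workflow glosses over: `--state ./prod-manifest` expects the production run's manifest.json to already be on disk. A hedged sketch of a step that could fetch it, assuming you archive dbt artifacts to a GCS bucket (bucket and path are placeholders):

```yaml
# Hypothetical step — add before "Run dbt in CI environment".
# gs://my-dbt-artifacts is a placeholder for wherever your production
# dbt runs archive their manifest.json.
- name: Fetch production manifest
  run: |
    mkdir -p ./prod-manifest
    gsutil cp gs://my-dbt-artifacts/prod/manifest.json ./prod-manifest/
```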

Writing Data Diff Tests Programmatically

For custom diff logic, use data-diff's Python API:

import pytest

from data_diff import connect_to_table, diff_tables

def test_transformation_produces_equivalent_results():
    """Verify refactored SQL produces identical output to original"""
    
    table1 = connect_to_table(
        "snowflake://account/warehouse/DB/SCHEMA",
        "ORDERS_V1",
        "ORDER_ID"
    )
    
    table2 = connect_to_table(
        "snowflake://account/warehouse/DB/SCHEMA",
        "ORDERS_V2",
        "ORDER_ID"
    )
    
    # diff_tables yields one (sign, row) tuple per side of each difference,
    # so a changed row contributes a '-' entry and a '+' entry
    diff_results = list(diff_tables(table1, table2))
    
    assert len(diff_results) == 0, (
        f"Found {len(diff_results)} differing entries (counting -/+ pairs). "
        f"First differences: {diff_results[:5]}"
    )

def test_revenue_calculation_unchanged_after_refactor():
    """Verify revenue numbers are identical before and after SQL refactor"""
    
    old_table = connect_to_table(
        "bigquery://project/dataset",
        "revenue_metrics_old",
        "date"
    )
    
    new_table = connect_to_table(
        "bigquery://project/dataset",
        "revenue_metrics_new",
        "date"
    )
    
    diffs = list(diff_tables(
        old_table, new_table,
        extra_columns=("total_revenue", "order_count", "avg_order_value")
    ))
    
    if diffs:
        # each diff is a (sign, row) tuple; the first field of the row is the key
        affected_dates = sorted({d[1][0] for d in diffs})
        pytest.fail(
            f"Revenue calculation changed for {len(affected_dates)} dates: "
            f"{affected_dates[:10]}..."
        )
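The `(sign, row)` tuples that diff_tables yields can be bucketed into adds, deletes, and updates by grouping the signs seen per key. A small helper sketch (the helper is ours, not part of the data-diff API; it assumes the first field of each row is the primary key):

```python
from collections import defaultdict

def summarize_diff(diffs):
    """Bucket (sign, row) pairs into added/deleted/updated counts.

    A row changed in place appears as a '-'/'+' pair under the same key;
    a key seen with only one sign exists in only one of the two tables.
    """
    by_key = defaultdict(set)
    for sign, row in diffs:
        by_key[row[0]].add(sign)
    return {
        "updated": sum(1 for signs in by_key.values() if signs == {"-", "+"}),
        "deleted": sum(1 for signs in by_key.values() if signs == {"-"}),
        "added": sum(1 for signs in by_key.values() if signs == {"+"}),
    }

diffs = [("-", ("2024-01-01", "100")), ("+", ("2024-01-01", "150")),
         ("+", ("2024-01-02", "75"))]
print(summarize_diff(diffs))
# {'updated': 1, 'deleted': 0, 'added': 1}
```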

Column-Level Diff Analysis

For business-critical columns, go deeper than row-level comparison:

from data_diff import connect_to_table, diff_tables
import pandas as pd

def analyze_column_diff(db_info, old_table_name, new_table_name, primary_key, column):
    """Analyze the distribution of changes in a specific column"""
    
    old_table = connect_to_table(db_info, old_table_name, key_columns=(primary_key,))
    new_table = connect_to_table(db_info, new_table_name, key_columns=(primary_key,))
    
    diffs = list(diff_tables(old_table, new_table, extra_columns=(column,)))
    
    if not diffs:
        return {"status": "identical", "changed_rows": 0}
    
    # each diff is (sign, row); the row is a tuple of (key, extra columns...),
    # so the requested column sits at index 1, and values may come back as strings
    old_series = pd.to_numeric(pd.Series([d[1][1] for d in diffs if d[0] == "-"]))
    new_series = pd.to_numeric(pd.Series([d[1][1] for d in diffs if d[0] == "+"]))
    
    return {
        "status": "different",
        "changed_rows": len(diffs) // 2,  # each changed row appears as a -/+ pair
        "old_mean": old_series.mean(),
        "new_mean": new_series.mean(),
        "percent_change": (new_series.mean() - old_series.mean()) / old_series.mean() * 100,
        "old_null_count": old_series.isna().sum(),
        "new_null_count": new_series.isna().sum(),
    }

def test_refactor_doesnt_change_revenue_distribution():
    analysis = analyze_column_diff(
        db_info="bigquery://project/dataset",
        old_table_name="orders_old",
        new_table_name="orders_new",
        primary_key="order_id",
        column="amount"
    )
    
    if analysis["status"] == "different":
        percent_change = abs(analysis["percent_change"])
        assert percent_change < 0.01, (
            f"Revenue changed by {percent_change:.2f}% after refactor. "
            f"Old mean: {analysis['old_mean']:.2f}, New mean: {analysis['new_mean']:.2f}"
        )

Testing Rollback Safety

Before merging a breaking change, verify rollback produces identical results:

def test_rollback_produces_identical_results():
    """Verify we can roll back to v1 if v2 has issues"""
    
    # Compare production (running v1) against rollback target (also v1 logic)
    prod_table = connect_to_table(PROD_CONNECTION, "orders", "order_id")
    rollback_table = connect_to_table(STAGING_CONNECTION, "orders_rollback", "order_id")
    
    diffs = list(diff_tables(prod_table, rollback_table))
    
    assert len(diffs) == 0, (
        "Rollback target doesn't match production — rollback would introduce changes. "
        f"Diff count: {len(diffs)}"
    )

Integrating with HelpMeTest

While Datafold tests the data pipeline, HelpMeTest tests what users see. Combine them:

  1. Datafold: runs in CI, verifies SQL changes don't alter data values
  2. HelpMeTest: runs post-deploy, verifies dashboard and UI values match expectations

Health check: revenue-dashboard-values
Interval: 30 minutes
Test:
  Navigate to /analytics/revenue
  Verify today's revenue is greater than 0
  Verify row count matches expected range (1000-50000)
  Verify no "data unavailable" or error messages
Alert: Slack + email on failure

This covers the full pipeline: data quality at the SQL layer (Datafold) and user-visible correctness at the application layer (HelpMeTest).

Alerting on Diff Thresholds

For automated pipelines, block execution if diff exceeds safe thresholds:

def gate_on_diff_threshold(old_table, new_table, max_changed_percent=5.0):
    """Block pipeline if too many rows differ from previous run"""
    # get_row_count is your own helper (e.g. a SELECT COUNT(*) against new_table)
    total_rows = get_row_count(new_table)
    diffs = list(diff_tables(old_table, new_table))
    
    changed_rows = len(diffs) // 2  # each changed row appears as a -/+ pair
    changed_percent = (changed_rows / total_rows) * 100
    
    if changed_percent > max_changed_percent:
        raise Exception(
            f"Pipeline blocked: {changed_percent:.1f}% of rows changed "
            f"(threshold: {max_changed_percent}%). "
            f"Changed rows: {changed_rows:,} / {total_rows:,}"
        )
    
    print(f"Diff check passed: {changed_percent:.1f}% of rows changed ({changed_rows:,} rows)")
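The threshold arithmetic is easy to get subtly wrong (the -/+ pair halving, division by zero on empty tables), so it is worth factoring into a pure function you can unit test without a warehouse connection. A hypothetical refactor of the gate's core check:

```python
def exceeds_threshold(changed_rows: int, total_rows: int,
                      max_changed_percent: float) -> bool:
    """Pure threshold check, factored out so it can be unit tested."""
    if total_rows == 0:
        return changed_rows > 0  # any change against an empty table is suspicious
    return (changed_rows / total_rows) * 100 > max_changed_percent

assert exceeds_threshold(600, 10_000, 5.0) is True    # 6% > 5%
assert exceeds_threshold(400, 10_000, 5.0) is False   # 4% <= 5%
```

gate_on_diff_threshold would then call this with `len(diffs) // 2` and the row count, keeping the warehouse queries at the edges.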

Summary

Datafold data diff catches data regressions that code review misses — silent SQL changes that alter calculated values, unexpected row deletions, or distribution shifts. Integrate it into CI/CD by comparing dbt model outputs between production and CI environments, with automated PR comments showing column-level diffs. Add programmatic diff tests for critical revenue and business metric columns, gate on diff thresholds for automated pipelines, and combine with application-layer monitoring (HelpMeTest) for end-to-end data reliability coverage.
