Data Pipeline Testing: Unit and Integration Tests for Airflow, Prefect, and Dagster

Data pipeline orchestrators (Airflow, Prefect, Dagster) each have a different testing model, but the principle is the same: unit-test your business logic in isolation, and integration-test the orchestration layer against real or containerized dependencies. This guide covers testing patterns for all three orchestrators, plus shared practices that apply to any pipeline.

Key Takeaways

Test business logic independently of the orchestrator. Extract transformation logic into plain Python functions, then test those functions with pytest. Don't test business logic through the orchestration framework.

Use pytest-airflow or the DagBag API for DAG structure tests. Verify that DAGs load without errors, tasks have the right upstream/downstream relationships, and default args are correctly set.

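Prefect tasks wrap plain Python functions. Call the underlying function through the task's .fn attribute and test it with pytest, with no Prefect runtime needed. Test flows by calling them directly, wrapped in prefect_test_harness() when you want an isolated temporary backend.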

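Dagster's testing utilities make asset testing straightforward. Assets can be invoked directly like functions, and materialize_to_memory() executes a selection of assets in-process with an in-memory IO manager, returning a result object from which you can read each materialized value for assertion.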

Mock external dependencies at the boundary. Database calls, API requests, and file I/O should be mocked in unit tests. Save real connections for integration tests that run against containerized services.

The Testing Problem with Data Pipelines

Data pipeline code is typically harder to test than application code because:

  1. Pipelines depend heavily on external systems (databases, APIs, cloud storage)
  2. Business logic is often mixed with orchestration configuration
  3. Running an end-to-end pipeline test is slow and requires infrastructure
  4. Pipeline bugs often produce wrong data rather than exceptions

The solution is the same as in application testing: separate concerns, test at multiple levels, and use test doubles for external dependencies.
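Point 4 deserves its own tests: a pipeline can finish green while loading garbage, so make data expectations explicit and assert on them. A minimal sketch, assuming orders arrive with quantity and price columns; find_order_problems is a hypothetical helper, not part of any library:

```python
import pandas as pd


def find_order_problems(df: pd.DataFrame) -> list[str]:
    """Return data-quality problems found in an orders frame; empty list means it passed.

    Hypothetical checks for this sketch: required columns present,
    no nulls, and no negative quantities or prices.
    """
    required = {"quantity", "price"}
    missing = required - set(df.columns)
    if missing:
        # Without the columns the remaining checks cannot run
        return [f"missing columns: {sorted(missing)}"]

    problems = []
    if df["quantity"].isna().any() or df["price"].isna().any():
        problems.append("null quantity or price")
    if (df["quantity"] < 0).any():
        problems.append("negative quantity")
    if (df["price"] < 0).any():
        problems.append("negative price")
    return problems
```

In a unit test, assert find_order_problems(result) == []; in a pipeline, the same check can run as a guard step that raises before anything is loaded.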

Shared Testing Principles

Regardless of which orchestrator you use, apply these patterns:

Extract Business Logic from Orchestration Code

Bad — business logic inside the orchestrator:

# Bad: logic inside Airflow operator
def process_orders(**context):
    engine = PostgresHook(postgres_conn_id="postgres_conn").get_sqlalchemy_engine()
    df = pd.read_sql("SELECT * FROM raw_orders WHERE status = 'pending'", engine)
    df["total"] = df["quantity"] * df["price"] * (1 - df["discount_rate"])
    df["category"] = df["total"].apply(lambda x: "large" if x > 1000 else "small")
    df.to_sql("processed_orders", engine, if_exists="replace", index=False)

Good — business logic in a testable function:

import pandas as pd
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Good: pure function with no external dependencies
def calculate_order_totals(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()  # never mutate the caller's DataFrame
    df["total"] = df["quantity"] * df["price"] * (1 - df["discount_rate"])
    df["category"] = df["total"].apply(lambda x: "large" if x > 1000 else "small")
    return df

# Thin orchestration wrapper: only I/O lives here
def process_orders(**context):
    # DataFrame.to_sql needs a SQLAlchemy connectable, not a raw DB-API connection
    engine = PostgresHook(postgres_conn_id="postgres_conn").get_sqlalchemy_engine()
    df = pd.read_sql("SELECT * FROM raw_orders WHERE status = 'pending'", engine)
    result = calculate_order_totals(df)  # ← testable
    result.to_sql("processed_orders", engine, if_exists="replace", index=False)

Now calculate_order_totals can be tested with pure pytest—no database, no Airflow runtime needed.
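The wrapper still hard-codes its I/O, so testing it requires patching. One option, if you want the wrapper testable as well, is to inject reading and writing as parameters. A minimal sketch: run_order_step is a hypothetical name, and calculate_order_totals is copied from above so the snippet runs on its own:

```python
from typing import Callable

import pandas as pd


def calculate_order_totals(df: pd.DataFrame) -> pd.DataFrame:
    # Same pure transformation as in the "Good" example
    df = df.copy()
    df["total"] = df["quantity"] * df["price"] * (1 - df["discount_rate"])
    df["category"] = df["total"].apply(lambda x: "large" if x > 1000 else "small")
    return df


def run_order_step(
    read: Callable[[], pd.DataFrame],
    write: Callable[[pd.DataFrame], None],
) -> int:
    """Hypothetical wrapper: I/O is injected, so tests need no database or patching."""
    result = calculate_order_totals(read())
    write(result)
    return len(result)


def test_run_order_step_with_fakes():
    source = pd.DataFrame({"quantity": [2], "price": [100.0], "discount_rate": [0.1]})
    written: list[pd.DataFrame] = []  # in-memory sink standing in for the warehouse
    assert run_order_step(read=lambda: source, write=written.append) == 1
    assert written[0]["total"].tolist() == [180.0]
```

In the Airflow task, the same function would receive callables that wrap the PostgresHook read and write; the trade-off is one extra layer of indirection in exchange for tests that never touch unittest.mock.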

Testing Apache Airflow

DAG Structure Tests

Test that your DAGs load without import errors and have the expected structure:

# tests/test_dag_structure.py
import pytest
from airflow.models import DagBag

@pytest.fixture(scope="module")  # parse the DAG folder once per module, not per test
def dag_bag():
    return DagBag(dag_folder="dags/", include_examples=False)

def test_no_import_errors(dag_bag):
    assert len(dag_bag.import_errors) == 0, (
        f"DAG import errors: {dag_bag.import_errors}"
    )

def test_orders_dag_exists(dag_bag):
    assert "orders_pipeline" in dag_bag.dags

def test_orders_dag_task_count(dag_bag):
    dag = dag_bag.get_dag("orders_pipeline")
    assert len(dag.tasks) == 5  # adjust to your expected count

def test_orders_dag_schedule(dag_bag):
    dag = dag_bag.get_dag("orders_pipeline")
    assert str(dag.schedule_interval) == "0 6 * * *"  # Airflow 2.x attribute; removed in Airflow 3

def test_task_dependencies(dag_bag):
    dag = dag_bag.get_dag("orders_pipeline")
    # Expected chain: validate_raw_orders >> transform_orders >> load_to_warehouse
    assert "transform_orders" in dag.get_task("validate_raw_orders").downstream_task_ids
    assert "load_to_warehouse" in dag.get_task("transform_orders").downstream_task_ids
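The takeaways above also mention checking default args. A minimal sketch of a policy check, assuming a hypothetical team rule (an owner is set, at least one retry, retry_delay of at least a minute). default_args_problems is illustrative and works on a plain dict, so it can be unit-tested without Airflow installed:

```python
from datetime import timedelta


def default_args_problems(dag_id: str, default_args: dict) -> list[str]:
    """Check one DAG's default_args against a hypothetical team policy.

    Policy assumed for this sketch: owner is set, retries >= 1,
    and retry_delay, if present, is at least one minute.
    """
    problems = []
    if not default_args.get("owner"):
        problems.append(f"{dag_id}: no owner")
    if default_args.get("retries", 0) < 1:
        problems.append(f"{dag_id}: retries must be >= 1")
    retry_delay = default_args.get("retry_delay")
    if retry_delay is not None and retry_delay < timedelta(minutes=1):
        problems.append(f"{dag_id}: retry_delay is under one minute")
    return problems
```

To apply it, loop over dag_bag.dags.items() in the DAG structure test module, pass each dag.default_args, and assert that the collected list is empty; the failure message then names every offending DAG at once.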

Task Unit Tests

Test the business logic functions called by your tasks:

# tests/test_order_processing.py
import pandas as pd
import pytest
from dags.orders_pipeline import calculate_order_totals

def test_calculate_order_totals_basic():
    input_df = pd.DataFrame({
        "quantity": [2, 5],
        "price": [100.0, 50.0],
        "discount_rate": [0.1, 0.0]
    })
    result = calculate_order_totals(input_df)
    
    assert result["total"].tolist() == [180.0, 250.0]
    assert result["category"].tolist() == ["small", "small"]

def test_calculate_order_totals_large_order():
    input_df = pd.DataFrame({
        "quantity": [10],
        "price": [200.0],
        "discount_rate": [0.0]
    })
    result = calculate_order_totals(input_df)
    assert result["category"].iloc[0] == "large"

def test_calculate_order_totals_preserves_input():
    # Verify the function doesn't mutate the input
    input_df = pd.DataFrame({"quantity": [1], "price": [100.0], "discount_rate": [0.0]})
    original_cols = list(input_df.columns)
    calculate_order_totals(input_df)
    assert list(input_df.columns) == original_cols
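The category threshold uses a strict x > 1000, so an order totalling exactly 1000 is "small". Boundaries like this are where a pipeline silently writes wrong data, so it is worth pinning them down. A sketch that copies calculate_order_totals so it runs on its own (in the real suite you would import it, and pytest.mark.parametrize is the idiomatic way to give each case its own test ID):

```python
import pandas as pd


# Copy of calculate_order_totals from the guide so this snippet is self-contained
def calculate_order_totals(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df["total"] = df["quantity"] * df["price"] * (1 - df["discount_rate"])
    df["category"] = df["total"].apply(lambda x: "large" if x > 1000 else "small")
    return df


# (quantity, price, discount_rate, expected_category)
BOUNDARY_CASES = [
    (10, 100.0, 0.0, "small"),   # total == 1000.0; threshold is strict
    (10, 100.01, 0.0, "large"),  # just above the threshold
    (20, 100.0, 0.5, "small"),   # discount brings 2000 down to exactly 1000
    (0, 5000.0, 0.0, "small"),   # zero quantity
]


def test_category_boundaries():
    for quantity, price, discount, expected in BOUNDARY_CASES:
        df = pd.DataFrame(
            {"quantity": [quantity], "price": [price], "discount_rate": [discount]}
        )
        result = calculate_order_totals(df)
        # Include the inputs in the failure message to identify the case
        assert result["category"].iloc[0] == expected, (quantity, price, discount)
```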

Task Integration Tests with Mocks

When testing the task callable itself, mock the hook and the pandas I/O calls so that no database is touched:

# tests/test_orders_task.py
from unittest.mock import patch
import pandas as pd
from dags.orders_pipeline import process_orders

@patch("dags.orders_pipeline.PostgresHook")  # no real Airflow connection is looked up
def test_process_orders_reads_from_correct_table(mock_hook):
    sample_data = pd.DataFrame({
        "quantity": [1], "price": [100.0], "discount_rate": [0.0]
    })

    # Patch both pandas I/O calls and keep the mocks for assertions;
    # pd.read_sql is restored once the with-block exits
    with patch("pandas.read_sql", return_value=sample_data) as mock_read_sql, \
         patch.object(pd.DataFrame, "to_sql") as mock_to_sql:
        process_orders()

    # Verify the query includes the status filter
    query = mock_read_sql.call_args[0][0]
    assert "status = 'pending'" in query
    # ...and that the result is written to the expected table
    assert mock_to_sql.call_args[0][0] == "processed_orders"

Testing Prefect

Prefect flows and tasks are ordinary Python functions wrapped in decorators, which makes them the easiest of the three to test: call a task's underlying function through .fn, and call flows directly.

Task Tests

# flows/orders_flow.py
from prefect import flow, task
import pandas as pd

@task
def extract_orders(connection_string: str) -> pd.DataFrame:
    # Database extraction logic
    ...

@task
def transform_orders(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df["total"] = df["quantity"] * df["price"]
    df["is_large"] = df["total"] > 1000
    return df

@flow
def orders_pipeline(connection_string: str):
    raw = extract_orders(connection_string)
    processed = transform_orders(raw)
    return processed

Test the task function directly—no Prefect runtime needed:

# tests/test_orders_flow.py
import pandas as pd
import pytest
from flows.orders_flow import transform_orders

def test_transform_orders_calculates_total():
    df = pd.DataFrame({
        "quantity": [3, 2],
        "price": [50.0, 600.0]
    })
    result = transform_orders.fn(df)  # .fn() calls the underlying function
    
    assert result["total"].tolist() == [150.0, 1200.0]

def test_transform_orders_flags_large_orders():
    df = pd.DataFrame({"quantity": [1, 2], "price": [500.0, 600.0]})
    result = transform_orders.fn(df)
    
    assert result["is_large"].tolist() == [False, True]

Flow Tests

Test the full flow with a mock data source:

from unittest.mock import patch
import pandas as pd
from flows.orders_flow import orders_pipeline

def test_orders_pipeline_end_to_end():
    sample_data = pd.DataFrame({
        "quantity": [1, 2],
        "price": [100.0, 600.0]
    })
    
    with patch("flows.orders_flow.extract_orders") as mock_extract:
        mock_extract.return_value = sample_data
        
        # Prefect 2.x: call the flow directly in tests
        result = orders_pipeline("fake://connection")
    
    assert len(result) == 2
    assert result["total"].tolist() == [100.0, 1200.0]

Testing Prefect State Handling

Verify that your flow handles task failures correctly:

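import pandas as pd
from unittest.mock import patch
from prefect.testing.utilities import prefect_test_harness
from flows.orders_flow import orders_pipeline

def test_flow_handles_empty_and_malformed_data():
    with prefect_test_harness():  # temporary, isolated Prefect backend
        # Empty but well-formed input: the flow should still complete
        empty = pd.DataFrame(columns=["quantity", "price"])
        with patch("flows.orders_flow.extract_orders", return_value=empty):
            state = orders_pipeline("fake://connection", return_state=True)
        assert state.is_completed()

        # No columns at all: transform_orders raises KeyError, so the run fails
        with patch("flows.orders_flow.extract_orders", return_value=pd.DataFrame()):
            state = orders_pipeline("fake://connection", return_state=True)
        assert state.is_failed()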

Testing Dagster

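Dagster has the most extensive built-in testing support of the three orchestrators: assets and ops can be invoked directly, materialize_to_memory() runs assets in-process, and build_op_context() supplies a context for op tests.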

Asset Tests

Use materialize_to_memory() to run assets and assert on their outputs:

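# pipelines/orders_assets.py
import pandas as pd
from dagster import ConfigurableResource, asset
from sqlalchemy import create_engine

class DatabaseResource(ConfigurableResource):
    # Minimal illustrative resource; the connection_string field is an assumption
    connection_string: str

    def execute(self, query: str) -> pd.DataFrame:
        return pd.read_sql(query, create_engine(self.connection_string))

@asset
def raw_orders(database: DatabaseResource) -> pd.DataFrame:
    return database.execute("SELECT * FROM raw_orders WHERE status = 'pending'")

@asset
def processed_orders(raw_orders: pd.DataFrame) -> pd.DataFrame:
    df = raw_orders.copy()
    df["total"] = df["quantity"] * df["price"]
    df["category"] = df["total"].apply(lambda x: "large" if x > 1000 else "small")
    return df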

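Test pure transformation assets by invoking them directly, and use materialize_to_memory when you want Dagster to run several assets together with stand-in resources:

# tests/test_orders_assets.py
import pandas as pd
from dagster import materialize_to_memory
from pipelines.orders_assets import processed_orders, raw_orders

def test_processed_orders_calculates_totals():
    sample_raw = pd.DataFrame({
        "quantity": [2, 5],
        "price": [100.0, 50.0]
    })
    # Assets can be called like functions; upstream values are passed as arguments
    output = processed_orders(sample_raw)
    assert output["total"].tolist() == [200.0, 250.0]
    assert output["category"].tolist() == ["small", "small"]

class FakeDatabase:
    """Stands in for DatabaseResource: returns canned rows instead of querying."""
    def execute(self, query: str) -> pd.DataFrame:
        return pd.DataFrame({"quantity": [20], "price": [100.0]})

def test_orders_assets_materialize_in_memory():
    # Run upstream and downstream together; outputs stay in memory
    result = materialize_to_memory(
        [raw_orders, processed_orders],
        resources={"database": FakeDatabase()},
    )
    assert result.success
    output = result.output_for_node("processed_orders")
    assert output["category"].tolist() == ["large"]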

Op Tests

For graph-based Dagster jobs, test ops individually:

import pandas as pd
from dagster import build_op_context
from pipelines.orders_job import process_orders_op

def test_process_orders_op():
    # Pass a context only if the op declares a context parameter
    context = build_op_context()

    input_data = pd.DataFrame({
        "quantity": [1], "price": [500.0]
    })

    result = process_orders_op(context, raw_orders=input_data)
    assert result["total"].iloc[0] == 500.0

Resource Mocking

Dagster's resource system makes dependency injection for tests clean:

import pandas as pd
from pipelines.orders_assets import raw_orders

class MockDatabaseResource:
    """Test double for DatabaseResource: returns canned rows and runs no SQL."""
    def execute(self, query: str) -> pd.DataFrame:
        return pd.DataFrame({"quantity": [1], "price": [100.0]})

def test_raw_orders_with_mock_database():
    # Resources are passed as keyword arguments when an asset is invoked directly
    result = raw_orders(database=MockDatabaseResource())

    assert len(result) > 0
    assert "quantity" in result.columns

Integration Testing with Docker Compose

For integration tests that require real databases or message queues, use Docker Compose:

# docker-compose.test.yml
services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: testdb
      POSTGRES_USER: test
      POSTGRES_PASSWORD: test
    ports:
      - "5433:5432"
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "test"]
      interval: 5s
      timeout: 5s
      retries: 5

# conftest.py
import os

import pandas as pd
import pytest
from sqlalchemy import create_engine, text

# Same variable the CI job exports; falls back to the docker-compose port mapping
DATABASE_URL = os.environ.get(
    "TEST_DATABASE_URL", "postgresql://test:test@localhost:5433/testdb"
)

@pytest.fixture(scope="session")
def test_engine():
    engine = create_engine(DATABASE_URL)  # needs a Postgres driver such as psycopg2
    yield engine
    engine.dispose()

@pytest.fixture
def seed_raw_orders(test_engine):
    """Seed test data before each test, clean up after."""
    data = pd.DataFrame({
        "order_id": [1, 2, 3],
        "quantity": [1, 2, 5],
        "price": [100.0, 50.0, 200.0],
        "status": ["pending", "pending", "shipped"]
    })
    data.to_sql("raw_orders", test_engine, if_exists="replace", index=False)
    yield
    # Engine.execute() was removed in SQLAlchemy 2.0; run the DDL in a transaction
    with test_engine.begin() as conn:
        conn.execute(text("DROP TABLE IF EXISTS raw_orders"))
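An integration test built on these fixtures asserts on what the SQL actually returns. The sketch below reuses the same seed rows but, so that it runs without Docker, swaps in an in-memory SQLite database through Python's sqlite3 module. SQLite's dialect differs from Postgres, so treat this as an illustration of the test's shape, not a substitute for the container; load_pending_totals is a hypothetical query step.

```python
import sqlite3

import pandas as pd


def load_pending_totals(con) -> pd.DataFrame:
    """Hypothetical query step under test: pending orders with a computed total."""
    return pd.read_sql(
        "SELECT order_id, quantity * price AS total FROM raw_orders "
        "WHERE status = 'pending' ORDER BY order_id",
        con,
    )


def test_load_pending_totals():
    # In-memory SQLite stands in for the Postgres container in this sketch
    con = sqlite3.connect(":memory:")
    seed = pd.DataFrame({
        "order_id": [1, 2, 3],
        "quantity": [1, 2, 5],
        "price": [100.0, 50.0, 200.0],
        "status": ["pending", "pending", "shipped"],
    })
    seed.to_sql("raw_orders", con, index=False)
    try:
        result = load_pending_totals(con)
    finally:
        con.close()
    # The shipped order (id 3) must be filtered out by the WHERE clause
    assert result["order_id"].tolist() == [1, 2]
    assert result["total"].tolist() == [100.0, 100.0]
```

In the real suite, the test would instead take test_engine and seed_raw_orders as fixture arguments and pass test_engine to the query step.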

CI Configuration

# .github/workflows/pipeline-tests.yml
name: Pipeline Tests
on: [push, pull_request]

jobs:
  unit-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - run: pip install -r requirements-test.txt
      - run: pytest tests/unit/ -v

  integration-tests:
    runs-on: ubuntu-latest
    services:
      postgres:
        image: postgres:15
        env:
          POSTGRES_DB: testdb
          POSTGRES_USER: test
          POSTGRES_PASSWORD: test
        ports:
          - 5433:5432
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - run: pip install -r requirements-test.txt
      - run: pytest tests/integration/ -v
        env:
          TEST_DATABASE_URL: postgresql://test:test@localhost:5433/testdb

Summary

The key principle for data pipeline testing is the same as for application testing: separate business logic from infrastructure. Extract transformation functions, test them with pure pytest, and only bring in the orchestrator for structure tests and integration tests.

  • Airflow: test DAG structure with DagBag, business logic with pure pytest, hooks with mocks
  • Prefect: call task functions directly with .fn(), test flows with mocked dependencies
  • Dagster: invoke assets directly or use materialize_to_memory() for asset tests, build_op_context() for op tests

All three benefit from integration test suites that run against real databases in CI, using Docker Compose or CI service containers to provision dependencies.

By HelpMeTest