Data Pipeline Testing: Unit and Integration Tests for Airflow, Prefect, and Dagster
Data pipeline orchestrators (Airflow, Prefect, Dagster) each have different testing models, but the principle is the same: unit-test your business logic in isolation, integration-test the orchestration layer with real or containerized dependencies. This guide covers testing patterns for all three orchestrators plus shared practices that apply to any pipeline.
Key Takeaways
Test business logic independently of the orchestrator. Extract transformation logic into plain Python functions, then test those functions with pytest. Don't test business logic through the orchestration framework.
Use pytest-airflow or the DagBag API for DAG structure tests. Verify that DAGs load without errors, tasks have the right upstream/downstream relationships, and default args are correctly set.
Prefect tasks wrap plain Python functions. Call the underlying function through the task's .fn attribute and test it with pytest; no Prefect runtime is needed. Run whole flows inside prefect_test_harness() so they execute against a temporary local database.
Dagster's testing utilities make asset testing straightforward. materialize_to_memory() executes assets in process without persisting anything, and the result object exposes each asset's value through output_for_node() for assertions.
Mock external dependencies at the boundary. Database calls, API requests, and file I/O should be mocked in unit tests. Save real connections for integration tests that run against containerized services.
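As a minimal sketch of the last takeaway, the example below mocks a single boundary object rather than the internals of the transformation. `OrderSource`, `fetch_pending`, and `summarize_pending_orders` are hypothetical names introduced for illustration; they are not part of any orchestrator's API.

```python
from unittest.mock import Mock


class OrderSource:
    """Hypothetical boundary object; the real one would query a database."""

    def fetch_pending(self, limit: int) -> list[dict]:
        raise RuntimeError("no database available in unit tests")


def summarize_pending_orders(source: OrderSource, limit: int = 100) -> dict:
    """Business logic: depends only on what the boundary returns."""
    rows = source.fetch_pending(limit)
    totals = [row["quantity"] * row["price"] for row in rows]
    return {"count": len(rows), "revenue": sum(totals)}


def make_fake_source(rows: list[dict]) -> Mock:
    # spec= makes the mock reject attributes the real class does not have,
    # so a renamed method fails the test instead of passing silently.
    fake = Mock(spec=OrderSource)
    fake.fetch_pending.return_value = rows
    return fake


fake_source = make_fake_source([
    {"quantity": 2, "price": 10.0},
    {"quantity": 1, "price": 5.0},
])
summary = summarize_pending_orders(fake_source, limit=50)

# Assert on the result and on how the boundary was called.
assert summary == {"count": 2, "revenue": 25.0}
fake_source.fetch_pending.assert_called_once_with(50)
```

Passing the boundary in as an argument is a design choice made here for brevity; patching a module-level name works as well, as long as the patch targets the module where the name is looked up.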
The Testing Problem with Data Pipelines
Data pipeline code is typically harder to test than application code because:
- Pipelines depend heavily on external systems (databases, APIs, cloud storage)
- Business logic is often mixed with orchestration configuration
- Running an end-to-end pipeline test is slow and requires infrastructure
- Pipeline bugs often produce wrong data rather than exceptions
The solution is the same as in application testing: separate concerns, test at multiple levels, and use test doubles for external dependencies.
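Because wrong data rarely raises on its own, it can help to encode expectations about the output as explicit checks. The sketch below is one way to do that with plain Python rows; `find_order_violations` and the specific rules are illustrative assumptions, not a prescribed schema.

```python
import math


def find_order_violations(rows: list[dict]) -> list[str]:
    """Return human-readable violations instead of raising on the first one."""
    violations = []
    for index, row in enumerate(rows):
        total = row.get("total")
        discount = row.get("discount_rate")
        if total is None or (isinstance(total, float) and math.isnan(total)):
            violations.append(f"row {index}: total is missing")
        elif total < 0:
            violations.append(f"row {index}: total is negative ({total})")
        if discount is None or not 0.0 <= discount <= 1.0:
            violations.append(f"row {index}: discount_rate out of range ({discount})")
    return violations


good_rows = [{"total": 180.0, "discount_rate": 0.1}]
bad_rows = [
    {"total": -20.0, "discount_rate": 0.1},          # sign error upstream
    {"total": float("nan"), "discount_rate": 1.5},   # missing total, bad discount
]

assert find_order_violations(good_rows) == []
assert len(find_order_violations(bad_rows)) == 3
```

Collecting all violations rather than stopping at the first tends to give more useful failure messages when a whole batch is affected.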
Shared Testing Principles
Regardless of which orchestrator you use, apply these patterns:
Extract Business Logic from Orchestration Code
Bad (business logic inside the orchestrator):
# Bad: logic inside the Airflow task callable
import pandas as pd
from airflow.providers.postgres.hooks.postgres import PostgresHook

def process_orders(**context):
    # DataFrame.to_sql needs a SQLAlchemy engine, not a raw DBAPI connection
    engine = PostgresHook(postgres_conn_id="postgres_conn").get_sqlalchemy_engine()
    df = pd.read_sql("SELECT * FROM raw_orders WHERE status = 'pending'", engine)
    df["total"] = df["quantity"] * df["price"] * (1 - df["discount_rate"])
    df["category"] = df["total"].apply(lambda x: "large" if x > 1000 else "small")
    df.to_sql("processed_orders", engine, if_exists="replace")

Good (business logic in a testable function):
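# Good: pure function with no external dependencies
import pandas as pd
from airflow.providers.postgres.hooks.postgres import PostgresHook

def calculate_order_totals(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()  # never mutate the caller's DataFrame
    df["total"] = df["quantity"] * df["price"] * (1 - df["discount_rate"])
    df["category"] = df["total"].apply(lambda x: "large" if x > 1000 else "small")
    return df

# Thin orchestration wrapper: I/O only, no business rules
def process_orders(**context):
    # DataFrame.to_sql needs a SQLAlchemy engine, not a raw DBAPI connection
    engine = PostgresHook(postgres_conn_id="postgres_conn").get_sqlalchemy_engine()
    df = pd.read_sql("SELECT * FROM raw_orders WHERE status = 'pending'", engine)
    result = calculate_order_totals(df)  # the testable part
    result.to_sql("processed_orders", engine, if_exists="replace")

Now calculate_order_totals can be tested with plain pytest; no database or running Airflow instance is needed. If the function lives in a module that does not import Airflow, the tests do not even need Airflow installed.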
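One benefit of this split is that edge cases become cheap to enumerate. The sketch below restates the per-row rule as scalar functions (an illustrative refactoring; `order_total` and `categorize_total` are hypothetical names) and checks the threshold boundary, where the `x > 1000` comparison means an order of exactly 1000 is still "small".

```python
def order_total(quantity: int, price: float, discount_rate: float) -> float:
    """Same formula as the DataFrame version, for a single row."""
    return quantity * price * (1 - discount_rate)


def categorize_total(total: float, threshold: float = 1000.0) -> str:
    """Strictly greater than the threshold counts as large."""
    return "large" if total > threshold else "small"


# Boundary cases: just below, exactly at, and just above the threshold.
cases = [
    (999.99, "small"),
    (1000.0, "small"),
    (1000.01, "large"),
]
for total, expected in cases:
    assert categorize_total(total) == expected

# 10 units at 200.0 with no discount is 2000.0, which is large.
assert categorize_total(order_total(10, 200.0, 0.0)) == "large"
```

In a pytest suite, the `cases` list maps naturally onto `pytest.mark.parametrize`, so each boundary shows up as its own test.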
Testing Apache Airflow
DAG Structure Tests
Test that your DAGs load without import errors and have the expected structure:
# tests/test_dag_structure.py
import pytest
from airflow.models import DagBag

@pytest.fixture(scope="session")
def dag_bag():
    # Parsing the DAG folder is slow, so do it once per test session
    return DagBag(dag_folder="dags/", include_examples=False)

def test_no_import_errors(dag_bag):
    assert len(dag_bag.import_errors) == 0, (
        f"DAG import errors: {dag_bag.import_errors}"
    )

def test_orders_dag_exists(dag_bag):
    assert "orders_pipeline" in dag_bag.dags

def test_orders_dag_task_count(dag_bag):
    dag = dag_bag.get_dag("orders_pipeline")
    assert len(dag.tasks) == 5  # adjust to your expected count

def test_orders_dag_schedule(dag_bag):
    dag = dag_bag.get_dag("orders_pipeline")
    # schedule_interval is the Airflow 2.x attribute; adjust for your version
    assert str(dag.schedule_interval) == "0 6 * * *"

def test_task_dependencies(dag_bag):
    dag = dag_bag.get_dag("orders_pipeline")
    validate = dag.get_task("validate_raw_orders")
    transform = dag.get_task("transform_orders")
    load = dag.get_task("load_to_warehouse")
    assert transform.task_id in [t.task_id for t in validate.downstream_list]
    assert load.task_id in [t.task_id for t in transform.downstream_list]

Task Unit Tests
Test the business logic functions called by your tasks:
# tests/test_order_processing.py
import pandas as pd
import pytest
from dags.orders_pipeline import calculate_order_totals

def test_calculate_order_totals_basic():
    input_df = pd.DataFrame({
        "quantity": [2, 5],
        "price": [100.0, 50.0],
        "discount_rate": [0.1, 0.0]
    })
    result = calculate_order_totals(input_df)
    # approx avoids brittle exact comparisons on floating-point results
    assert result["total"].tolist() == pytest.approx([180.0, 250.0])
    assert result["category"].tolist() == ["small", "small"]

def test_calculate_order_totals_large_order():
    input_df = pd.DataFrame({
        "quantity": [10],
        "price": [200.0],
        "discount_rate": [0.0]
    })
    result = calculate_order_totals(input_df)
    assert result["category"].iloc[0] == "large"

def test_calculate_order_totals_preserves_input():
    # Verify the function doesn't mutate the input
    input_df = pd.DataFrame({"quantity": [1], "price": [100.0], "discount_rate": [0.0]})
    original_cols = list(input_df.columns)
    calculate_order_totals(input_df)
    assert list(input_df.columns) == original_cols

Task Integration Tests with Mocks
When testing tasks that use Airflow hooks, mock the hook:
# tests/test_orders_task.py
from unittest.mock import patch
import pandas as pd
from dags.orders_pipeline import process_orders

@patch("dags.orders_pipeline.PostgresHook")
def test_process_orders_reads_from_correct_table(mock_hook):
    # mock_hook is a MagicMock, so whichever connection accessor the task
    # calls on the hook returns another mock automatically
    sample_data = pd.DataFrame({
        "quantity": [1], "price": [100.0], "discount_rate": [0.0]
    })
    # Keep references to the mocks: pd.read_sql is restored when the block exits
    with patch("pandas.read_sql", return_value=sample_data) as mock_read_sql, \
            patch.object(pd.DataFrame, "to_sql") as mock_to_sql:
        process_orders()
    # Verify the query includes the status filter
    query = mock_read_sql.call_args[0][0]
    assert "status = 'pending'" in query
    # Verify the result is written to the expected table
    assert mock_to_sql.call_args[0][0] == "processed_orders"

Testing Prefect
Prefect flows and tasks are Python functions with decorators. They're the easiest to test because you can call tasks and flows directly.
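To see why a decorated function can still be tested as plain Python, consider the toy decorator below. It is not Prefect's implementation; `toy_task` and `ToyTask` are invented for illustration, and only the idea of keeping the original function on an `fn` attribute mirrors what the tests in this section rely on.

```python
from typing import Any, Callable


class ToyTask:
    """Adds 'runtime' behaviour around a function but keeps the original on .fn."""

    def __init__(self, fn: Callable[..., Any]):
        self.fn = fn                      # undecorated function, safe to call in tests
        self.run_log: list[str] = []

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        # Stand-in for orchestration work (state tracking, retries, logging).
        self.run_log.append(f"running {self.fn.__name__}")
        return self.fn(*args, **kwargs)


def toy_task(fn: Callable[..., Any]) -> ToyTask:
    return ToyTask(fn)


@toy_task
def add_total(row: dict) -> dict:
    return {**row, "total": row["quantity"] * row["price"]}


row = {"quantity": 3, "price": 50.0}

# Calling .fn bypasses the wrapper entirely: no "runtime" side effects.
assert add_total.fn(row)["total"] == 150.0
assert add_total.run_log == []

# Calling the task itself goes through the wrapper.
assert add_total(row)["total"] == 150.0
assert add_total.run_log == ["running add_total"]
```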
Task Tests
# flows/orders_flow.py
from prefect import flow, task
import pandas as pd

@task
def extract_orders(connection_string: str) -> pd.DataFrame:
    # Database extraction logic
    ...

@task
def transform_orders(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df["total"] = df["quantity"] * df["price"]
    df["is_large"] = df["total"] > 1000
    return df

@flow
def orders_pipeline(connection_string: str):
    raw = extract_orders(connection_string)
    processed = transform_orders(raw)
    return processed

Test the task function directly; no Prefect runtime is needed:
# tests/test_orders_flow.py
import pandas as pd
from flows.orders_flow import transform_orders

def test_transform_orders_calculates_total():
    df = pd.DataFrame({
        "quantity": [3, 2],
        "price": [50.0, 600.0]
    })
    # .fn is the undecorated function, so no Prefect machinery is involved
    result = transform_orders.fn(df)
    assert result["total"].tolist() == [150.0, 1200.0]

def test_transform_orders_flags_large_orders():
    df = pd.DataFrame({"quantity": [1, 2], "price": [500.0, 600.0]})
    result = transform_orders.fn(df)
    assert result["is_large"].tolist() == [False, True]

Flow Tests
Test the full flow with a mock data source:
from unittest.mock import patch
import pandas as pd
from flows.orders_flow import orders_pipeline

def test_orders_pipeline_end_to_end():
    sample_data = pd.DataFrame({
        "quantity": [1, 2],
        "price": [100.0, 600.0]
    })
    # Patch the name the flow looks up, so only the extraction step is replaced
    with patch("flows.orders_flow.extract_orders") as mock_extract:
        mock_extract.return_value = sample_data
        # Prefect 2.x: call the flow directly in tests; transform_orders still
        # runs as a real task
        result = orders_pipeline("fake://connection")
    assert len(result) == 2
    assert result["total"].tolist() == [100.0, 1200.0]

Testing Prefect State Handling
Verify that your flow handles task failures correctly:
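from unittest.mock import patch
import pandas as pd
from prefect.testing.utilities import prefect_test_harness
from flows.orders_flow import orders_pipeline

def test_flow_handles_empty_dataset():
    # Empty but with the expected columns; a DataFrame with no columns would
    # make transform_orders raise KeyError and fail the flow
    empty = pd.DataFrame({"quantity": [], "price": []}, dtype="float64")
    with prefect_test_harness():
        with patch("flows.orders_flow.extract_orders") as mock_extract:
            mock_extract.return_value = empty
            state = orders_pipeline("fake://connection", return_state=True)
    # Depending on your error handling, verify completed or failed state
    assert state.is_completed()

Testing Dagster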
Dagster has the richest built-in testing utilities of the three orchestrators.
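The asset model is easier to test once the dependency rule is clear: an asset's parameter names refer to upstream assets. The toy runner below illustrates that rule in plain Python. It is a deliberately simplified stand-in, not how Dagster is implemented, and `materialize_in_memory` is a hypothetical helper; the point is that swapping an upstream asset for a stub with the same name lets the downstream asset be tested without a database.

```python
import inspect
from typing import Any, Callable


def materialize_in_memory(assets: list[Callable[..., Any]]) -> dict[str, Any]:
    """Run each function once, feeding it the results named by its parameters."""
    by_name = {fn.__name__: fn for fn in assets}
    results: dict[str, Any] = {}

    def resolve(name: str, stack: tuple[str, ...] = ()) -> Any:
        if name in results:
            return results[name]
        if name in stack:
            raise ValueError(f"dependency cycle at {name!r}")
        if name not in by_name:
            raise KeyError(f"no asset named {name!r}")
        fn = by_name[name]
        params = inspect.signature(fn).parameters
        upstream = [resolve(param, stack + (name,)) for param in params]
        results[name] = fn(*upstream)
        return results[name]

    for name in by_name:
        resolve(name)
    return results


def raw_orders() -> list[dict]:
    # Stub standing in for the database-backed asset of the same name.
    return [{"quantity": 2, "price": 100.0}, {"quantity": 5, "price": 50.0}]


def processed_orders(raw_orders: list[dict]) -> list[dict]:
    return [{**row, "total": row["quantity"] * row["price"]} for row in raw_orders]


# Order in the list does not matter; dependencies are resolved by name.
outputs = materialize_in_memory([processed_orders, raw_orders])
assert [row["total"] for row in outputs["processed_orders"]] == [200.0, 250.0]
```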
Asset Tests
Use materialize_to_memory() to run assets and assert on their outputs:
# pipelines/orders_assets.py
from dagster import asset
import pandas as pd

# DatabaseResource is assumed to be your own resource class (defined or
# imported elsewhere) that exposes execute(query) and returns a DataFrame
@asset
def raw_orders(database: DatabaseResource) -> pd.DataFrame:
    return database.execute("SELECT * FROM raw_orders WHERE status = 'pending'")

@asset
def processed_orders(raw_orders: pd.DataFrame) -> pd.DataFrame:
    df = raw_orders.copy()
    df["total"] = df["quantity"] * df["price"]
    df["category"] = df["total"].apply(lambda x: "large" if x > 1000 else "small")
    return df

Test assets in isolation with materialize_to_memory:
# tests/test_orders_assets.py
import pandas as pd
from dagster import asset, materialize_to_memory
from pipelines.orders_assets import processed_orders

SAMPLE_RAW = pd.DataFrame({
    "quantity": [2, 5],
    "price": [100.0, 50.0]
})

def test_processed_orders_materializes_with_stubbed_upstream():
    # Stand-in for the real raw_orders asset, so no database resource is needed
    @asset(name="raw_orders")
    def raw_orders_stub() -> pd.DataFrame:
        return SAMPLE_RAW

    result = materialize_to_memory([raw_orders_stub, processed_orders])
    assert result.success
    output = result.output_for_node("processed_orders")
    assert output["total"].tolist() == [200.0, 250.0]

def test_processed_orders_direct_call():
    # For pure logic, invoke the asset like a regular function
    output = processed_orders(SAMPLE_RAW)
    assert output["total"].tolist() == [200.0, 250.0]

Op Tests
For graph-based Dagster jobs, test ops individually:
import pandas as pd
from dagster import build_op_context
from pipelines.orders_job import process_orders_op

def test_process_orders_op():
    # A context is only needed when the op declares a context parameter
    context = build_op_context()
    input_data = pd.DataFrame({
        "quantity": [1], "price": [500.0]
    })
    result = process_orders_op(context, raw_orders=input_data)
    assert result["total"].iloc[0] == 500.0

Resource Mocking
Dagster's resource system makes dependency injection for tests clean:
import pandas as pd
from dagster import ConfigurableResource
from pipelines.orders_assets import raw_orders

class MockDatabaseResource(ConfigurableResource):
    """Test double exposing the same execute() method as the real resource."""
    def execute(self, query: str) -> pd.DataFrame:
        return pd.DataFrame({"quantity": [1], "price": [100.0]})

def test_raw_orders_with_mock_database():
    # Assets that declare resources as parameters accept them as keyword
    # arguments when invoked directly
    result = raw_orders(database=MockDatabaseResource())
    assert len(result) > 0
    assert "quantity" in result.columns

Integration Testing with Docker Compose
For integration tests that require real databases or message queues, use Docker Compose:
# docker-compose.test.yml
services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: testdb
      POSTGRES_USER: test
      POSTGRES_PASSWORD: test
    ports:
      - "5433:5432"
    healthcheck:
      # -d avoids log noise from probing a database named after the user
      test: ["CMD", "pg_isready", "-U", "test", "-d", "testdb"]
      interval: 5s
      timeout: 5s
      retries: 5

# conftest.py
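import os
import pandas as pd
import pytest
from sqlalchemy import create_engine, text

# Same variable the CI job exports; defaults to the Docker Compose service
TEST_DATABASE_URL = os.environ.get(
    "TEST_DATABASE_URL", "postgresql://test:test@localhost:5433/testdb"
)

@pytest.fixture(scope="session")
def test_engine():
    # Requires a PostgreSQL driver such as psycopg2 to be installed
    engine = create_engine(TEST_DATABASE_URL)
    yield engine
    engine.dispose()

@pytest.fixture
def seed_raw_orders(test_engine):
    """Seed test data before each test, clean up after."""
    data = pd.DataFrame({
        "order_id": [1, 2, 3],
        "quantity": [1, 2, 5],
        "price": [100.0, 50.0, 200.0],
        "status": ["pending", "pending", "shipped"]
    })
    data.to_sql("raw_orders", test_engine, if_exists="replace", index=False)
    yield
    # Engine.execute() was removed in SQLAlchemy 2.0; use a transaction instead
    with test_engine.begin() as conn:
        conn.execute(text("DROP TABLE IF EXISTS raw_orders"))

CI Configuration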
# .github/workflows/pipeline-tests.yml
name: Pipeline Tests
on: [push, pull_request]
jobs:
  unit-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - run: pip install -r requirements-test.txt
      - run: pytest tests/unit/ -v
  integration-tests:
    runs-on: ubuntu-latest
    services:
      postgres:
        image: postgres:15
        env:
          POSTGRES_DB: testdb
          POSTGRES_USER: test
          POSTGRES_PASSWORD: test
        ports:
          - 5433:5432
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
    steps:
      - uses: actions/checkout@v4
      # Pin the same Python version as the unit-test job
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - run: pip install -r requirements-test.txt
      - run: pytest tests/integration/ -v
        env:
          TEST_DATABASE_URL: postgresql://test:test@localhost:5433/testdb

Summary
The key principle for data pipeline testing is the same as for application testing: separate business logic from infrastructure. Extract transformation functions, test them with pure pytest, and only bring in the orchestrator for structure tests and integration tests.
- Airflow: test DAG structure with DagBag, business logic with pure pytest, hooks with mocks
- Prefect: call task functions directly with .fn(), test flows with mocked dependencies
- Dagster: use materialize_to_memory() for asset tests, build_op_context() for op tests
All three benefit from integration test suites that run against real databases in CI, using Docker Compose or CI service containers to provision dependencies.
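Whichever way the database is provisioned, integration tests tend to be flaky if they start before it accepts connections. Health checks cover this in Compose and in CI service containers; for local runs, a small readiness probe is one option. The helper below is a sketch using only the standard library. `parse_host_port` and `wait_for_port` are hypothetical names, and an open TCP port is only a rough proxy for a database that is ready to serve queries.

```python
import os
import socket
import time
from urllib.parse import urlsplit

DEFAULT_URL = "postgresql://test:test@localhost:5433/testdb"


def parse_host_port(url: str, default_port: int = 5432) -> tuple[str, int]:
    """Extract the host and port from a database URL."""
    parts = urlsplit(url)
    return parts.hostname or "localhost", parts.port or default_port


def wait_for_port(host: str, port: int, timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Return True once a TCP connection succeeds, False if the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


# Resolve the address the same way the test fixtures would.
host, port = parse_host_port(os.environ.get("TEST_DATABASE_URL", DEFAULT_URL))
```

A session-scoped fixture could call `wait_for_port(host, port)` before creating the engine and skip or fail the integration suite when it returns False.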