Apache Airflow DAG Testing: pytest, DagBag Assertions, and Task Isolation
Apache Airflow DAGs fail in production in ways that no amount of static linting will catch. A DAG that imports correctly can still fail to load in the scheduler. A task that works in isolation can deadlock in the context of a specific execution date. An operator that runs fine locally can fail in production due to connection pool exhaustion.
The solution is a structured test suite that exercises DAGs before they hit your scheduler. This guide covers DAG loading validation, task isolation testing, operator mocking, and CI integration.
The Three Layers of Airflow DAG Testing
Layer 1 — DAG loading validation. Does the DAG parse without errors? Can the scheduler load it? This catches import errors, missing connections, and configuration mistakes before deployment.
Layer 2 — DAG structure tests. Does the DAG have the expected tasks, dependencies, and schedule? This is regression protection — catching when a refactor accidentally changes the pipeline structure.
Layer 3 — Task execution tests. Does each task do the right thing when executed? This requires mocking external connections and running tasks in isolation.
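To make Layer 1 concrete, the sketch below imports every Python file in a folder and records whatever exception the import raises. It is a simplified stand-in for the idea behind `DagBag.import_errors`, not Airflow's actual implementation; the helper name `collect_import_errors` and the two demo files are hypothetical.

```python
import importlib.util
import tempfile
from pathlib import Path


def collect_import_errors(folder: str) -> dict[str, str]:
    """Import every .py file in `folder` and record any exception raised.

    Hypothetical helper that only illustrates the idea of an
    "import errors" mapping; Airflow's DagBag does considerably more.
    """
    errors: dict[str, str] = {}
    for path in sorted(Path(folder).glob("*.py")):
        spec = importlib.util.spec_from_file_location(path.stem, path)
        module = importlib.util.module_from_spec(spec)
        try:
            spec.loader.exec_module(module)  # runs the file's top-level code
        except Exception as exc:  # SyntaxError, ImportError, NameError, ...
            errors[path.name] = f"{type(exc).__name__}: {exc}"
    return errors


def demo() -> dict[str, str]:
    """Write one good and one broken file to a temp folder and scan it."""
    with tempfile.TemporaryDirectory() as folder:
        Path(folder, "good_dag.py").write_text("schedule = '@daily'\n")
        Path(folder, "broken_dag.py").write_text(
            "import module_that_does_not_exist_xyz\n"
        )
        return collect_import_errors(folder)
```

Calling `demo()` returns a mapping with a single entry for `broken_dag.py`. A loading test has the same shape: it asserts that this mapping is empty.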
Setting Up the Test Environment
pip install apache-airflow pytest pytest-airflow apache-airflow-providers-postgres

Configure test settings to avoid connecting to real infrastructure:
# tests/conftest.py
import os
import pytest
from unittest.mock import patch

# Override Airflow home to use test configs
os.environ["AIRFLOW_HOME"] = "/tmp/airflow-test"
os.environ["AIRFLOW__CORE__UNIT_TEST_MODE"] = "True"
os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"] = "sqlite:////tmp/airflow-test/airflow.db"

from airflow.models import DagBag


@pytest.fixture(scope="session")
def dagbag():
    """Load DAGBag from the project's DAG directory."""
    return DagBag(dag_folder="dags/", include_examples=False)

Layer 1: DAG Loading Validation
Every DAG must load without errors. This is your baseline:
# tests/test_dag_loading.py
import pytest
from airflow.models import DagBag


def test_all_dags_load_without_errors(dagbag):
    """All DAGs in the dags/ directory must load without import errors."""
    assert len(dagbag.import_errors) == 0, (
        "DAG import errors detected:\n" +
        "\n".join(
            f"  {dag_file}: {error}"
            for dag_file, error in dagbag.import_errors.items()
        )
    )


def test_expected_dags_are_present(dagbag):
    """Verify that required DAGs are present (catches accidental deletions)."""
    expected_dag_ids = [
        "daily_etl_pipeline",
        "hourly_data_sync",
        "weekly_report_generation",
        "monthly_billing_rollup",
    ]
    for dag_id in expected_dag_ids:
        assert dag_id in dagbag.dags, (
            f"Required DAG '{dag_id}' not found. "
            f"Available DAGs: {list(dagbag.dags.keys())}"
        )


def test_no_dag_has_schedule_interval_none_unexpectedly(dagbag):
    """
    DAGs with schedule=None are never scheduled automatically; they run
    only when triggered manually.
    Verify that only known 'manual trigger' DAGs have this setting.
    """
    allowed_unscheduled = {"manual_backfill", "incident_response"}
    for dag_id, dag in dagbag.dags.items():
        if dag.schedule_interval is None and dag_id not in allowed_unscheduled:
            pytest.fail(
                f"DAG '{dag_id}' has schedule_interval=None but is not in the "
                f"allowed list. Add it to allowed_unscheduled if intentional."
            )

Layer 2: DAG Structure Tests
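Structure tests are assertions over a task graph. Checking `downstream_list` only sees direct children, so an ordering such as "extract runs before load" needs a reachability check once intermediate tasks are inserted. The sketch below shows that check on a plain dictionary, independent of Airflow. `PIPELINE`, `reachable_from`, and `runs_before` are hypothetical names; in a real test the dictionary would presumably be built from the DAG's tasks, for example from each task's `downstream_task_ids`.

```python
from collections import deque


def reachable_from(graph: dict[str, set[str]], start: str) -> set[str]:
    """Return every task reachable from `start` by following downstream edges."""
    seen: set[str] = set()
    queue = deque(graph.get(start, set()))
    while queue:
        task_id = queue.popleft()
        if task_id in seen:
            continue
        seen.add(task_id)
        queue.extend(graph.get(task_id, set()))
    return seen


# Hypothetical snapshot of direct downstream edges, keyed by task_id.
PIPELINE = {
    "extract_source_data": {"validate_schema"},
    "validate_schema": {"transform_records"},
    "transform_records": {"load_to_warehouse"},
    "load_to_warehouse": {"send_completion_alert"},
    "send_completion_alert": set(),
}


def runs_before(graph: dict[str, set[str]], first: str, second: str) -> bool:
    """True if `second` is a direct or transitive downstream of `first`."""
    return second in reachable_from(graph, first)
```

With this helper, `runs_before(PIPELINE, "extract_source_data", "load_to_warehouse")` stays true even if a refactor adds a task between validation and transformation, whereas a direct-edge assertion would start failing.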
Catch structural regressions:
# tests/test_dag_structure.py
import pytest
from datetime import datetime


class TestDailyETLPipeline:
    @pytest.fixture(autouse=True)
    def dag(self, dagbag):
        dag = dagbag.get_dag("daily_etl_pipeline")
        assert dag is not None, "daily_etl_pipeline DAG not found"
        return dag

    def test_dag_has_expected_tasks(self, dag):
        """DAG must contain all required tasks."""
        expected_task_ids = {
            "extract_source_data",
            "validate_schema",
            "transform_records",
            "load_to_warehouse",
            "send_completion_alert",
        }
        actual_task_ids = set(dag.task_ids)
        missing = expected_task_ids - actual_task_ids
        extra = actual_task_ids - expected_task_ids
        assert not missing, f"Missing tasks: {missing}"
        # Extra tasks are allowed; just log them
        if extra:
            print(f"Extra tasks not in the expected set: {sorted(extra)}")

    def test_task_dependencies_are_correct(self, dag):
        """Validate the critical path: extract → validate → transform → load."""
        def get_downstream(task_id: str) -> set:
            return {t.task_id for t in dag.get_task(task_id).downstream_list}

        assert "validate_schema" in get_downstream("extract_source_data")
        assert "transform_records" in get_downstream("validate_schema")
        assert "load_to_warehouse" in get_downstream("transform_records")
        assert "send_completion_alert" in get_downstream("load_to_warehouse")

    def test_dag_runs_daily(self, dag):
        """DAG must run daily; changing this is a breaking change."""
        assert dag.schedule_interval in ["@daily", "0 0 * * *"], (
            f"Expected daily schedule, got: {dag.schedule_interval}"
        )

    def test_dag_does_not_catch_up(self, dag):
        """Catchup must be disabled to prevent retroactive backfill on deployment."""
        assert dag.catchup is False, (
            "catchup=True will trigger historical runs on deployment. "
            "Set catchup=False unless backfill is explicitly desired."
        )

    def test_all_tasks_have_retries_configured(self, dag):
        """Every task should retry at least once on failure."""
        for task in dag.tasks:
            assert task.retries >= 1, (
                f"Task '{task.task_id}' has retries={task.retries}. "
                "All tasks should retry at least once."
            )

    def test_task_timeout_is_set(self, dag):
        """Tasks without timeouts can hang indefinitely."""
        for task in dag.tasks:
            assert task.execution_timeout is not None, (
                f"Task '{task.task_id}' has no execution_timeout. "
                "Long-running tasks without timeouts can block slots for hours."
            )

Layer 3: Task Execution Tests
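The execution tests in this section stack two `@patch` decorators, and the order in which the mocks arrive as arguments is easy to get wrong: decorators are applied bottom-up, so the decorator closest to the function supplies the first mock argument. The sketch below shows this with two standard-library functions; `describe_process` is a hypothetical name used only for illustration.

```python
import os
from unittest.mock import patch


@patch("os.getcwd")  # outermost decorator -> last mock argument
@patch("os.getpid")  # innermost decorator -> first mock argument
def describe_process(mock_getpid, mock_getcwd):
    """Return the (patched) process id and working directory."""
    mock_getpid.return_value = 4242
    mock_getcwd.return_value = "/fake/dir"
    return os.getpid(), os.getcwd()
```

This is why a test decorated with a `get_records` patch on top and a `get_connection` patch underneath receives `mock_get_conn` first and `mock_get_records` second. Swapping the parameter names silently configures the wrong mock, and the resulting failure can be confusing to trace.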
Test what each task actually does by running it with mocked connections:
# tests/test_task_execution.py
import pytest
from unittest.mock import patch, MagicMock, call
from datetime import datetime
from airflow.models import TaskInstance, DagRun
from airflow.utils.state import DagRunState


class TestExtractSourceDataTask:
    @pytest.fixture
    def task_instance(self, dagbag):
        dag = dagbag.get_dag("daily_etl_pipeline")
        task = dag.get_task("extract_source_data")
        # Create a minimal DagRun for testing.
        # Note: depending on the Airflow version, the DagRun may need to be
        # persisted to the test metadata DB (for example via
        # dag.create_dagrun) before ti.run() works.
        dag_run = DagRun(
            dag_id=dag.dag_id,
            run_id="test-run-001",
            execution_date=datetime(2026, 5, 19),
            state=DagRunState.RUNNING
        )
        ti = TaskInstance(task=task, run_id="test-run-001")
        ti.dag_run = dag_run
        return ti

    @patch("dags.etl.source_db.get_records")
    @patch("airflow.hooks.base.BaseHook.get_connection")
    def test_extract_pulls_records_for_execution_date(
        self, mock_get_conn, mock_get_records, task_instance
    ):
        """Extract task should pull only records for the DAG's execution date."""
        mock_get_conn.return_value = MagicMock(
            host="test-db", port=5432, login="user", password="pass"
        )
        mock_get_records.return_value = [
            {"id": 1, "value": "record-1", "created_at": "2026-05-19"},
            {"id": 2, "value": "record-2", "created_at": "2026-05-19"},
        ]
        task_instance.run(ignore_all_deps=True, ignore_task_deps=True, test_mode=True)
        # Verify it queried by execution date
        mock_get_records.assert_called_once()
        call_args = mock_get_records.call_args
        assert "2026-05-19" in str(call_args)
        # Verify record count is stored in XCom
        record_count = task_instance.xcom_pull(key="record_count")
        assert record_count == 2

    @patch("dags.etl.source_db.get_records")
    @patch("airflow.hooks.base.BaseHook.get_connection")
    def test_extract_handles_empty_result(
        self, mock_get_conn, mock_get_records, task_instance
    ):
        """Extract task should handle zero records gracefully (no failure)."""
        mock_get_conn.return_value = MagicMock()
        mock_get_records.return_value = []
        # Should not raise; an empty extraction is valid
        task_instance.run(ignore_all_deps=True, ignore_task_deps=True, test_mode=True)
        record_count = task_instance.xcom_pull(key="record_count")
        assert record_count == 0


class TestValidateSchemaTask:
    @patch("dags.etl.validators.validate_records")
    def test_validate_fails_on_schema_violation(self, mock_validate, dagbag):
        """Schema validation failure should mark task as failed."""
        from airflow.exceptions import AirflowException
        mock_validate.side_effect = ValueError("Missing required field: customer_id")
        dag = dagbag.get_dag("daily_etl_pipeline")
        task = dag.get_task("validate_schema")
        ti = TaskInstance(task=task, run_id="test-run-002")
        with pytest.raises((AirflowException, ValueError)):
            ti.run(ignore_all_deps=True, ignore_task_deps=True, test_mode=True)

Testing Operators
For custom operators, test the execute method directly:
# tests/test_custom_operator.py
import pytest
from datetime import datetime  # needed for the execution_date in mock_context
from unittest.mock import MagicMock, patch
from dags.operators.warehouse_load_operator import WarehouseLoadOperator


class TestWarehouseLoadOperator:
    def test_execute_uploads_records_to_warehouse(self):
        """Operator should upload all records from XCom to the warehouse."""
        operator = WarehouseLoadOperator(
            task_id="test_load",
            warehouse_conn_id="warehouse_default",
            table="test_events",
            source_task_id="transform_records"
        )
        mock_context = {
            "ti": MagicMock(),
            "execution_date": datetime(2026, 5, 19)
        }
        # Simulate XCom data from upstream task
        mock_context["ti"].xcom_pull.return_value = [
            {"event_id": "e1", "type": "purchase", "amount": 99.99},
            {"event_id": "e2", "type": "purchase", "amount": 49.99},
        ]
        with patch("dags.operators.warehouse_load_operator.WarehouseHook") as MockHook:
            mock_hook = MockHook.return_value
            mock_hook.insert_rows.return_value = 2
            result = operator.execute(mock_context)
        # Verify all records were uploaded
        MockHook.assert_called_once_with("warehouse_default")
        mock_hook.insert_rows.assert_called_once()
        insert_args = mock_hook.insert_rows.call_args
        rows = insert_args.kwargs.get("rows") or insert_args.args[0]
        assert len(rows) == 2

    def test_execute_raises_on_partial_upload_failure(self):
        """Partial upload failure should fail the task (not silently succeed)."""
        operator = WarehouseLoadOperator(
            task_id="test_load",
            warehouse_conn_id="warehouse_default",
            table="test_events",
            source_task_id="transform_records"
        )
        mock_context = {"ti": MagicMock(), "execution_date": datetime(2026, 5, 19)}
        mock_context["ti"].xcom_pull.return_value = [{"event_id": "e1"}]
        with patch("dags.operators.warehouse_load_operator.WarehouseHook") as MockHook:
            MockHook.return_value.insert_rows.side_effect = Exception("Connection reset")
            with pytest.raises(Exception, match="Connection reset"):
                operator.execute(mock_context)

Mocking Airflow Connections in Tests
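Besides rows in the metadata database, Airflow also resolves connections from environment variables named `AIRFLOW_CONN_<CONN_ID>` whose value is a connection URI. The sketch below builds such variables for the two test connections used in this guide and applies them with `patch.dict`, so nothing is written to a database. The helper names `connection_env` and `with_test_connections` are hypothetical, and the URI layout shown is the simple `type://login:password@host:port/schema` form.

```python
import os
from unittest.mock import patch
from urllib.parse import quote


def connection_env(conn_id, conn_type, host, port, schema, login="", password=""):
    """Return {env_var_name: uri} for one connection (hypothetical helper)."""
    credentials = ""
    if login:
        credentials = quote(login, safe="")
        if password:
            credentials += ":" + quote(password, safe="")
        credentials += "@"
    uri = f"{conn_type}://{credentials}{host}:{port}/{quote(schema, safe='')}"
    return {f"AIRFLOW_CONN_{conn_id.upper()}": uri}


# Same connection IDs and schemas as the database-backed fixture in this guide.
TEST_CONNECTIONS = {
    **connection_env("warehouse_default", "postgres", "localhost", 5432, "test_warehouse"),
    **connection_env("source_db", "postgres", "localhost", 5432, "test_source"),
}


def with_test_connections():
    """Context manager that exposes the test connections via os.environ."""
    return patch.dict(os.environ, TEST_CONNECTIONS)
```

Wrapping a test body in `with with_test_connections():` keeps the variables scoped to that block, and `patch.dict` restores the previous environment on exit. The fixture in this section takes the other route and stores the connections in the test metadata database.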
# tests/conftest.py (additions)
from airflow.models import Connection
from airflow import settings


@pytest.fixture(scope="session", autouse=True)
def setup_test_connections():
    """Create test connections so operators don't try to reach real infrastructure."""
    session = settings.Session()
    test_connections = [
        Connection(
            conn_id="warehouse_default",
            conn_type="postgres",
            host="localhost",
            port=5432,
            schema="test_warehouse"
        ),
        Connection(
            conn_id="source_db",
            conn_type="postgres",
            host="localhost",
            port=5432,
            schema="test_source"
        ),
    ]
    for conn in test_connections:
        session.merge(conn)
    session.commit()
    session.close()
    yield
    # Cleanup handled by SQLite test DB reset

CI Pipeline
# .github/workflows/airflow-tests.yml
name: Airflow DAG Tests
on: [push, pull_request]
jobs:
  dag-tests:
    runs-on: ubuntu-latest
    env:
      AIRFLOW_HOME: /tmp/airflow-test
      AIRFLOW__CORE__UNIT_TEST_MODE: "True"
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: "sqlite:////tmp/airflow-test/airflow.db"
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - name: Install Airflow and test dependencies
        # `airflow db init` is deprecated since Airflow 2.7; use `db migrate`
        run: |
          pip install "apache-airflow==2.9.0" pytest pytest-cov
          airflow db migrate
      - name: Run DAG loading tests
        run: pytest tests/test_dag_loading.py -v
      - name: Run DAG structure tests
        run: pytest tests/test_dag_structure.py -v
      - name: Run task execution tests
        run: pytest tests/test_task_execution.py tests/test_custom_operator.py -v
      - name: Coverage report
        run: pytest tests/ --cov=dags/ --cov-report=xml --cov-fail-under=70

Monitoring DAG Health in Production
Test coverage tells you that DAGs work as expected locally. But Airflow DAGs fail in production for reasons that tests don't catch: scheduler restarts, connection pool exhaustion, disk space on worker nodes, and network timeouts to external systems.
HelpMeTest health checks let you set up monitoring that verifies your critical DAGs completed successfully — checking the Airflow API or database for run status — and alerts you when a scheduled run fails or doesn't start.
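As a rough illustration of what such a check evaluates, the sketch below classifies the latest run of a DAG from a payload shaped like the response of the Airflow 2.x stable REST API's DAG-run listing (`/api/v1/dags/{dag_id}/dagRuns`). It is a minimal sketch and not HelpMeTest's implementation: the function names, the status strings, and the freshness threshold are assumptions, and fetching the payload (authentication, HTTP client) is left out.

```python
from datetime import datetime, timedelta, timezone


def latest_run_url(base_url: str, dag_id: str) -> str:
    """Build the URL that lists only the most recent run of `dag_id`."""
    return (
        f"{base_url.rstrip('/')}/api/v1/dags/{dag_id}/dagRuns"
        "?order_by=-execution_date&limit=1"
    )


def check_latest_run(payload: dict, now: datetime, max_age: timedelta) -> str:
    """Classify the latest run as ok, failed, missing, or stale (hypothetical)."""
    runs = payload.get("dag_runs", [])
    if not runs:
        return "no runs found"
    run = runs[0]
    if run.get("state") == "failed":
        return "latest run failed"
    started = run.get("start_date")
    if started is None:
        return "latest run has not started"
    # start_date is assumed to be an ISO 8601 string with a UTC offset.
    if now - datetime.fromisoformat(started) > max_age:
        return "latest run is stale"
    return "ok"
```

For a daily DAG, a `max_age` slightly above 24 hours (for example `timedelta(hours=26)`) flags the case where the scheduler never started the run, which a state check alone would miss.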
Conclusion
Airflow DAG testing protects you at three levels: loading (catches import errors before they crash the scheduler), structure (catches dependency graph regressions), and execution (catches business logic bugs in task code). The loading tests are mandatory and cheap to run, since they only parse the DAG files. The structure and execution tests add safety for changes to established pipelines. Run all three in CI and treat any DAG loading test failure as a merge blocker.