Testing Apache Airflow DAGs: Unit Tests, DAG Validation, and pytest-airflow
Apache Airflow DAGs are Python code. That means you can test them like any other Python code — validate structure, unit test operators, and run DAGs against test data. This guide covers the full testing spectrum for Airflow.
DAG Integrity Tests
The first thing to test is that your DAGs load without errors. A DAG file with a broken import or syntax error never shows up in the Airflow DAG list. The only sign is a "Broken DAG" error banner in the UI, which is easy to overlook.
# tests/test_dag_integrity.py
import pytest
from airflow.models import DagBag
def test_no_import_errors():
    """Every DAG file under dags/ is parsed without an import error."""
    bag = DagBag(dag_folder="dags/", include_examples=False)
    errors = bag.import_errors
    # import_errors maps file path -> error text; an empty mapping means a clean load.
    assert not errors, (
        f"DAG import errors:\n{errors}"
    )
def test_dags_present():
    """Expected DAGs are present in the DagBag."""
    dagbag = DagBag(dag_folder="dags/", include_examples=False)
    expected_dags = ["etl_orders", "sync_users", "daily_report"]
    # Collect every missing id so a single run reports all of them. Also show the
    # import errors, which are the usual reason a DAG is missing.
    missing = [dag_id for dag_id in expected_dags if dag_id not in dagbag.dags]
    assert not missing, (
        f"DAGs not found: {missing}\nImport errors: {dagbag.import_errors}"
    )
def test_dag_structure():
"""Verify DAG metadata is correctly set."""
dagbag = DagBag(dag_folder="dags/", include_examples=False)
dag = dagbag.get_dag("etl_orders")
assert dag.schedule_interval == "@daily"
assert dag.catchup is False
assert dag.max_active_runs == 1Task Dependency Tests
Verify that your task graph has the right structure:
def test_etl_task_order():
    """Tasks execute in the correct order: extract -> transform -> load."""
    dagbag = DagBag(dag_folder="dags/", include_examples=False)
    dag = dagbag.get_dag("etl_orders")
    # get_dag() returns None for a DAG that did not load.
    assert dag is not None, f"DAG 'etl_orders' not loaded: {dagbag.import_errors}"
    extract_task = dag.get_task("extract_orders")
    transform_task = dag.get_task("transform_orders")
    load_task = dag.get_task("load_to_warehouse")
    # Check downstream dependencies (the *_task_ids properties are sets of task ids).
    assert "transform_orders" in extract_task.downstream_task_ids
    assert "load_to_warehouse" in transform_task.downstream_task_ids
    # Check upstream dependencies
    assert "extract_orders" in transform_task.upstream_task_ids
    assert "transform_orders" in load_task.upstream_task_ids
def test_no_cycles():
"""DAG has no circular dependencies."""
dagbag = DagBag(dag_folder="dags/", include_examples=False)
dag = dagbag.get_dag("etl_orders")
# If the DAG has cycles, topological_sort() raises a CycleError
dag.topological_sort() # raises if there's a cycleUnit Testing Operators
Test the logic inside your operators without running Airflow:
# dags/operators/transform.py
def transform_orders(raw_data: list[dict]) -> list[dict]:
    """Transform raw order records.

    Drops cancelled orders and flattens each remaining record into
    ``order_id``, ``customer_id``, ``total`` (sum of price * quantity over
    ``items``) and an upper-cased ``status``.
    """
    return [
        {
            "order_id": record["id"],
            "customer_id": record["customer"]["id"],
            "total": sum(item["price"] * item["quantity"] for item in record["items"]),
            "status": record["status"].upper(),
        }
        for record in raw_data
        # The output status is normalized to upper case, so the filter must not
        # depend on input casing either: "CANCELLED"/"Cancelled" are dropped too.
        if record["status"].casefold() != "cancelled"
    ]
# tests/test_transform.py
from dags.operators.transform import transform_orders
def test_transform_filters_cancelled():
    """Cancelled orders are dropped and the remaining ones are fully mapped."""
    raw = [
        {"id": "1", "customer": {"id": "c1"}, "items": [{"price": 10, "quantity": 2}], "status": "completed"},
        {"id": "2", "customer": {"id": "c2"}, "items": [{"price": 5, "quantity": 1}], "status": "cancelled"},
        {"id": "3", "customer": {"id": "c3"}, "items": [{"price": 20, "quantity": 3}], "status": "pending"},
    ]
    result = transform_orders(raw)
    assert len(result) == 2  # cancelled filtered out
    assert result[0]["order_id"] == "1"
    assert result[0]["customer_id"] == "c1"
    assert result[0]["total"] == 20  # 10 * 2
    assert result[1]["order_id"] == "3"
    assert result[1]["customer_id"] == "c3"
    assert result[1]["total"] == 60  # 20 * 3
    assert result[1]["status"] == "PENDING"
def test_transform_empty_input():
assert transform_orders([]) == []Testing PythonOperator Tasks
For PythonOperator, extract the callable and test it directly:
# dags/etl_orders.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def extract_orders(**context):
    """Extract the orders for the run's logical date from the source system.

    Returns the records from ``orders_api.get_orders``. PythonOperator pushes
    the return value to XCom for downstream tasks.
    """
    # Airflow 2.2+ exposes the run date as "logical_date"; "execution_date" is the
    # deprecated alias (and what the unit test passes), so accept either.
    logical_date = context.get("logical_date") or context["execution_date"]
    # NOTE(review): `orders_api` must be a module-level client in this file. The unit
    # test patches dags.etl_orders.orders_api and expects get_orders(date="YYYY-MM-DD").
    orders = orders_api.get_orders(date=logical_date.strftime("%Y-%m-%d"))
    return orders
def load_to_warehouse(ti, **context):
    """Load transformed orders to warehouse.

    Pulls the return value of the ``transform_orders`` task from XCom and
    returns ``{"loaded": <number of records>}``.

    Raises:
        ValueError: if ``transform_orders`` produced no XCom value.
    """
    transformed = ti.xcom_pull(task_ids="transform_orders")
    if transformed is None:
        # xcom_pull returns None when the upstream task is missing or pushed nothing.
        # Fail with a clear message instead of a TypeError from len(None).
        raise ValueError(
            "No XCom value from task 'transform_orders' - did it run and return data?"
        )
    # ... write to warehouse
    return {"loaded": len(transformed)}
with DAG("etl_orders", start_date=datetime(2024, 1, 1)) as dag:
extract = PythonOperator(
task_id="extract_orders",
python_callable=extract_orders,
)
load = PythonOperator(
task_id="load_to_warehouse",
python_callable=load_to_warehouse,
)
extract >> loadTest the callables directly:
from unittest.mock import MagicMock, patch
from dags.etl_orders import extract_orders, load_to_warehouse
def test_extract_orders():
    """extract_orders asks the API for the run date and returns its records."""
    # datetime is not imported at the top of this test module, so import it here.
    from datetime import datetime

    # NOTE(review): patch() raises AttributeError unless dags.etl_orders defines
    # `orders_api` at module level.
    with patch("dags.etl_orders.orders_api") as mock_api:
        mock_api.get_orders.return_value = [
            {"id": "1", "amount": 99.99}
        ]
        context = {"execution_date": datetime(2024, 3, 15)}
        result = extract_orders(**context)
    mock_api.get_orders.assert_called_once_with(date="2024-03-15")
    assert len(result) == 1
def test_load_to_warehouse():
    """load_to_warehouse reads transform_orders' XCom and reports the row count."""
    fake_ti = MagicMock()
    transformed_rows = [{"order_id": "1", "total": 99.99}]
    fake_ti.xcom_pull.return_value = transformed_rows
    # NOTE(review): patch() raises AttributeError unless dags.etl_orders defines
    # `warehouse_client` at module level. Confirm the DAG module imports it.
    with patch("dags.etl_orders.warehouse_client") as warehouse:
        warehouse.insert_batch.return_value = {"inserted": 1}
        outcome = load_to_warehouse(ti=fake_ti)
    fake_ti.xcom_pull.assert_called_once_with(task_ids="transform_orders")
    assert outcome["loaded"] == 1
pytest-airflow
pytest-airflow provides fixtures for testing DAGs in a real Airflow environment:
pip install pytest-airflow
import pytest
def test_dag_runs(dag):
    """Check that pytest-airflow's ``dag`` fixture supplies the etl_orders DAG.

    This only inspects the DAG object; it does not execute a DAG run.
    """
    # NOTE(review): this guide says the dag fixture provides a real DAG loaded from
    # the dag_folder. Confirm fixture names and behavior in the pytest-airflow docs.
    assert dag.dag_id == "etl_orders"
def test_task_execute(dag, task):
"""Test a specific task executes without error."""
assert task.task_id in dag.task_idsTesting with Airflow's Test Utilities
Airflow provides dag.test() for running DAGs in test mode (Airflow 2.5+):
from airflow.models import DagBag
from datetime import datetime
def test_etl_dag_runs_successfully():
dagbag = DagBag(dag_folder="dags/", include_examples=False)
dag = dagbag.get_dag("etl_orders")
dag.test(
run_conf={"date": "2024-03-15"},
execution_date=datetime(2024, 3, 15),
)
# dag.test() raises on failureMocking Variables and Connections
DAGs often rely on Airflow Variables and Connections. Set them up for tests:
import pytest
from airflow.models import Variable, Connection
from airflow import settings
@pytest.fixture(autouse=True, scope="session")
def airflow_db():
    """Ensure the Airflow metadata database schema exists before the tests run.

    initdb() creates/migrates tables but does not wipe existing rows, so this
    fixture does not give each test a clean database. Per-test data is removed
    by the fixtures that create it (setup_variables / setup_connections).
    """
    from airflow.utils.db import initdb

    # Once per session is enough. Re-running initdb() before every test only
    # repeats the same schema setup.
    # NOTE(review): inside this module this definition shadows any conftest
    # fixture named `airflow_db`.
    initdb()
    yield
@pytest.fixture
def setup_variables():
    """Store the Airflow Variables the DAGs read; remove them after the test."""
    test_variables = {
        "warehouse_url": "postgresql://test:test@localhost/testdb",
        "orders_api_url": "http://localhost:8080",
    }
    for key, value in test_variables.items():
        Variable.set(key, value)
    yield
    # Teardown: delete in the same order the Variables were created.
    for key in test_variables:
        Variable.delete(key)
@pytest.fixture
def setup_connections():
    """Create the 'warehouse' Airflow Connection for a test and delete it afterwards."""
    session = settings.Session()
    conn = Connection(
        conn_id="warehouse",
        conn_type="postgres",
        host="localhost",
        schema="testdb",
        login="test",
        password="test",
        port=5432,
    )
    try:
        session.add(conn)
        session.commit()
        yield
        session.delete(conn)
        session.commit()
    finally:
        # Release the session even if setup or teardown raised; the original
        # version never closed it.
        session.close()
def test_dag_with_variables(setup_variables, setup_connections):
dagbag = DagBag(dag_folder="dags/", include_examples=False)
dag = dagbag.get_dag("etl_orders")
# Now Variables and Connections are availableConftest for Airflow Tests
# tests/conftest.py
import os
import pytest
# Set minimal Airflow config for tests.
# NOTE: these are plain assignments, so they replace any value already in the
# environment (e.g. an AIRFLOW__DATABASE__SQL_ALCHEMY_CONN exported by CI).
# The SQLite path is relative to the directory pytest is started from.
os.environ.update(
    {
        "AIRFLOW__CORE__UNIT_TEST_MODE": "True",
        "AIRFLOW__CORE__LOAD_EXAMPLES": "False",
        "AIRFLOW__DATABASE__SQL_ALCHEMY_CONN": "sqlite:///tests/airflow_test.db",
        "AIRFLOW__CORE__EXECUTOR": "SequentialExecutor",
    }
)
@pytest.fixture(scope="session")
def airflow_db():
from airflow.utils.db import initdb, resetdb
resetdb() # clean slate
initdb()
yieldTesting SLA Misses and Alerts
def test_sla_defined():
    """Critical DAGs have SLA defined."""
    from datetime import timedelta

    dagbag = DagBag(dag_folder="dags/", include_examples=False)
    dag = dagbag.get_dag("daily_report")
    assert dag is not None, f"DAG 'daily_report' not loaded: {dagbag.import_errors}"
    # In Airflow 2.x an SLA is a task-level setting (BaseOperator.sla, usually supplied
    # through default_args["sla"]); a DAG object has no `sla` attribute.
    # NOTE(review): assumes the SLA is applied to every task. Narrow this check if
    # only the final task carries it.
    # The report must finish within 2 hours.
    missing = [task.task_id for task in dag.tasks if task.sla != timedelta(hours=2)]
    assert not missing, f"daily_report SLA must be 2 hours; tasks without it: {missing}"
def test_alert_email_configured():
"""DAG has alert email for failures."""
dagbag = DagBag(dag_folder="dags/", include_examples=False)
dag = dagbag.get_dag("etl_orders")
assert "alerts@example.com" in dag.default_args.get("email", [])
assert dag.default_args.get("email_on_failure") is TrueCI Pipeline for Airflow DAGs
# .github/workflows/dags.yml
name: Test DAGs
on: [push, pull_request]
jobs:
  test:
    runs-on: ubuntu-latest
    services:
      postgres:
        image: postgres:15
        env:
          POSTGRES_USER: airflow
          POSTGRES_PASSWORD: airflow
          POSTGRES_DB: airflow
        # The job runs directly on the runner, so the service port must be mapped
        # to the host for localhost:5432 to reach it.
        ports:
          - 5432:5432
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
    env:
      # NOTE(review): tests/conftest.py assigns AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
      # unconditionally, which overrides this value inside pytest.
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql://airflow:airflow@localhost:5432/airflow
      AIRFLOW__CORE__LOAD_EXAMPLES: "false"
      AIRFLOW__CORE__EXECUTOR: SequentialExecutor
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - name: Install Airflow
        run: |
          # The postgres extra installs the driver needed for the postgresql:// URL above.
          pip install "apache-airflow[postgres]==2.9.0" --constraint \
            "https://raw.githubusercontent.com/apache/airflow/constraints-2.9.0/constraints-3.12.txt"
          pip install -r requirements-test.txt
      - name: Initialize Airflow DB
        # `airflow db init` is deprecated since Airflow 2.7 in favor of `db migrate`.
        run: airflow db migrate
      - name: Run tests
        run: pytest tests/ -v --tb=short
End-to-End DAG Testing
Unit tests verify that individual operators produce correct output. End-to-end tests verify that the full pipeline — from trigger to final data state — produces the right result. HelpMeTest can verify the business-level outcomes of Airflow runs:
Scenario: daily ETL completes successfully
Given the ETL DAG ran at midnight
When all tasks complete
Then the warehouse contains today's order totals
And the daily report is available in the dashboard
And no alerts were sent

This catches failures in the data that unit tests — which mock the warehouse client — will never see.
Key Takeaways
- Always test DAG integrity first: `DagBag` loads your DAGs and surfaces import errors, which otherwise only appear as an easy-to-miss "Broken DAG" banner in the Airflow UI.
- Extract operator callables and test them as regular Python functions — no Airflow needed for pure logic tests.
- Use `os.environ` in `conftest.py` to configure a lightweight test database (SQLite works for most tests).
- Create test Airflow Variables and Connections in fixtures rather than reading them from a real Airflow installation.
- Run `dag.test()` (Airflow 2.5+) for integration-level DAG runs in a single process, without a scheduler or Celery workers.