Testing Apache Airflow DAGs: Unit Tests, DAG Validation, and pytest-airflow

Apache Airflow DAGs are Python code. That means you can test them like any other Python code — validate structure, unit test operators, and run DAGs against test data. This guide covers the full testing spectrum for Airflow.

DAG Integrity Tests

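The first thing to test is that your DAGs load without errors. A DAG file with a broken import or a syntax error never reaches the DAG list in the Airflow UI. Airflow records an import error instead, which is easy to miss until someone notices the DAG is gone.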

# tests/test_dag_integrity.py
import pytest
from airflow.models import DagBag

def test_no_import_errors():
    """All DAGs load without import errors."""
    dagbag = DagBag(dag_folder="dags/", include_examples=False)
    assert len(dagbag.import_errors) == 0, (
        f"DAG import errors:\n{dagbag.import_errors}"
    )

def test_dags_present():
    """Expected DAGs are present."""
    dagbag = DagBag(dag_folder="dags/", include_examples=False)
    expected_dags = ["etl_orders", "sync_users", "daily_report"]

    for dag_id in expected_dags:
        assert dag_id in dagbag.dags, f"DAG '{dag_id}' not found"

def test_dag_structure():
    """Verify DAG metadata is correctly set."""
    dagbag = DagBag(dag_folder="dags/", include_examples=False)
    dag = dagbag.get_dag("etl_orders")

    assert dag.schedule_interval == "@daily"
    assert dag.catchup is False
    assert dag.max_active_runs == 1
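
Before paying the cost of a full DagBag load, a quick syntax pass over the DAG folder can catch the most basic breakage. The sketch below is a pre-check under stated assumptions, not a replacement for DagBag: the helper name find_syntax_errors is made up for this example, and because it only compiles each file with the standard library, it finds syntax errors but not failed imports.

```python
import tempfile
from pathlib import Path


def find_syntax_errors(dag_folder: str) -> dict[str, str]:
    """Compile every .py file under dag_folder and collect syntax errors.

    Files are compiled, never executed, so no Airflow install is needed.
    """
    errors: dict[str, str] = {}
    for path in sorted(Path(dag_folder).rglob("*.py")):
        source = path.read_text(encoding="utf-8")
        try:
            compile(source, str(path), "exec")
        except SyntaxError as exc:
            errors[path.name] = f"line {exc.lineno}: {exc.msg}"
    return errors


# Demonstration on a throwaway folder with one valid and one broken file.
with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "good_dag.py").write_text("x = 1\n", encoding="utf-8")
    Path(tmp, "broken_dag.py").write_text("def oops(:\n", encoding="utf-8")
    demo_errors = find_syntax_errors(tmp)
```

In a test suite, asserting that find_syntax_errors("dags/") returns an empty dict gives a fast first gate ahead of the DagBag tests.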

Task Dependency Tests

Verify that your task graph has the right structure:

def test_etl_task_order():
    """Tasks execute in the correct order."""
    dagbag = DagBag(dag_folder="dags/", include_examples=False)
    dag = dagbag.get_dag("etl_orders")

    extract_task = dag.get_task("extract_orders")
    transform_task = dag.get_task("transform_orders")
    load_task = dag.get_task("load_to_warehouse")

    # Check downstream dependencies
    assert "transform_orders" in [t.task_id for t in extract_task.downstream_list]
    assert "load_to_warehouse" in [t.task_id for t in transform_task.downstream_list]

    # Check upstream dependencies
    assert "extract_orders" in [t.task_id for t in transform_task.upstream_list]
    assert "transform_orders" in [t.task_id for t in load_task.upstream_list]

def test_no_cycles():
    """DAG has no circular dependencies."""
    dagbag = DagBag(dag_folder="dags/", include_examples=False)
    dag = dagbag.get_dag("etl_orders")

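    # DagBag already rejects cyclic DAGs at load time and records them as
    # import errors. check_cycle() makes the check explicit and raises
    # AirflowDagCycleException if the DAG contains a cycle.
    from airflow.utils.dag_cycle_tester import check_cycle
    check_cycle(dag)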
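
The idea behind a cycle check can be illustrated without Airflow. The sketch below uses the standard library's graphlib module, whose TopologicalSorter raises graphlib.CycleError on a cyclic graph. The task names mirror the etl_orders example, and the plain dict of upstream dependencies is a stand-in for a real DAG, not an Airflow structure.

```python
from graphlib import CycleError, TopologicalSorter

# Map each task to the set of tasks it depends on (its upstream tasks).
upstream = {
    "extract_orders": set(),
    "transform_orders": {"extract_orders"},
    "load_to_warehouse": {"transform_orders"},
}

# static_order() yields tasks so that every task comes after its upstreams.
order = list(TopologicalSorter(upstream).static_order())


def has_cycle(graph: dict[str, set[str]]) -> bool:
    """Return True if the dependency graph contains a cycle."""
    try:
        TopologicalSorter(graph).prepare()
    except CycleError:
        return True
    return False


# Making the first task depend on the last one closes the loop.
cyclic = {**upstream, "extract_orders": {"load_to_warehouse"}}
```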

Unit Testing Operators

Test the logic inside your operators without running Airflow:

# dags/operators/transform.py
def transform_orders(raw_data: list[dict]) -> list[dict]:
    """Transform raw order records."""
    return [
        {
            "order_id": record["id"],
            "customer_id": record["customer"]["id"],
            "total": sum(item["price"] * item["quantity"] for item in record["items"]),
            "status": record["status"].upper(),
        }
        for record in raw_data
        if record["status"] != "cancelled"
    ]

# tests/test_transform.py
from dags.operators.transform import transform_orders

def test_transform_filters_cancelled():
    raw = [
        {"id": "1", "customer": {"id": "c1"}, "items": [{"price": 10, "quantity": 2}], "status": "completed"},
        {"id": "2", "customer": {"id": "c2"}, "items": [{"price": 5, "quantity": 1}], "status": "cancelled"},
        {"id": "3", "customer": {"id": "c3"}, "items": [{"price": 20, "quantity": 3}], "status": "pending"},
    ]

    result = transform_orders(raw)

    assert len(result) == 2  # cancelled filtered out
    assert result[0]["order_id"] == "1"
    assert result[0]["total"] == 20
    assert result[1]["order_id"] == "3"
    assert result[1]["status"] == "PENDING"

def test_transform_empty_input():
    assert transform_orders([]) == []
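
Two further cases worth pinning down are multi-item totals and malformed records. The sketch below repeats transform_orders so that it runs on its own. Letting a missing key raise KeyError simply reflects how the function above is written; it is not a recommendation for how malformed input should be handled.

```python
def transform_orders(raw_data: list[dict]) -> list[dict]:
    """Copy of the function under test, repeated so the example is self-contained."""
    return [
        {
            "order_id": record["id"],
            "customer_id": record["customer"]["id"],
            "total": sum(item["price"] * item["quantity"] for item in record["items"]),
            "status": record["status"].upper(),
        }
        for record in raw_data
        if record["status"] != "cancelled"
    ]


def test_total_sums_all_line_items():
    raw = [{
        "id": "7",
        "customer": {"id": "c7"},
        "items": [{"price": 10, "quantity": 2}, {"price": 3, "quantity": 5}],
        "status": "completed",
    }]
    assert transform_orders(raw)[0]["total"] == 35  # 10*2 + 3*5


def test_missing_customer_raises_key_error():
    raw = [{"id": "8", "items": [], "status": "completed"}]
    try:
        transform_orders(raw)
    except KeyError as exc:
        assert exc.args[0] == "customer"
    else:
        raise AssertionError("expected KeyError for a record without 'customer'")
```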

Testing PythonOperator Tasks

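For PythonOperator, keep the callable as a module-level function so that tests can import and call it directly:

# dags/etl_orders.py
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# Hypothetical project modules: the tests below patch orders_api and
# warehouse_client under these names, so substitute your own clients.
from dags.clients import orders_api, warehouse_client
from dags.operators.transform import transform_orders

def extract_orders(**context):
    """Extract orders from source system."""
    execution_date = context["execution_date"]
    # Fetch orders for this date (call shape taken from the test below)
    orders = orders_api.get_orders(date=execution_date.strftime("%Y-%m-%d"))
    return orders

def transform(ti, **context):
    """Transform the extracted orders."""
    return transform_orders(ti.xcom_pull(task_ids="extract_orders"))

def load_to_warehouse(ti, **context):
    """Load transformed orders to warehouse."""
    transformed = ti.xcom_pull(task_ids="transform_orders")
    warehouse_client.insert_batch(transformed)  # write to warehouse
    return {"loaded": len(transformed)}

with DAG(
    "etl_orders",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
    max_active_runs=1,
) as dag:
    extract = PythonOperator(
        task_id="extract_orders",
        python_callable=extract_orders,
    )
    transform_task = PythonOperator(
        task_id="transform_orders",
        python_callable=transform,
    )
    load = PythonOperator(
        task_id="load_to_warehouse",
        python_callable=load_to_warehouse,
    )
    extract >> transform_task >> load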

Test the callables directly:

from datetime import datetime
from unittest.mock import MagicMock, patch

from dags.etl_orders import extract_orders, load_to_warehouse

def test_extract_orders():
    with patch("dags.etl_orders.orders_api") as mock_api:
        mock_api.get_orders.return_value = [
            {"id": "1", "amount": 99.99}
        ]

        context = {"execution_date": datetime(2024, 3, 15)}
        result = extract_orders(**context)

    mock_api.get_orders.assert_called_once_with(date="2024-03-15")
    assert len(result) == 1

def test_load_to_warehouse():
    mock_ti = MagicMock()
    mock_ti.xcom_pull.return_value = [{"order_id": "1", "total": 99.99}]

    with patch("dags.etl_orders.warehouse_client") as mock_warehouse:
        mock_warehouse.insert_batch.return_value = {"inserted": 1}

        result = load_to_warehouse(ti=mock_ti)

    mock_ti.xcom_pull.assert_called_once_with(task_ids="transform_orders")
    assert result["loaded"] == 1
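
When a callable makes several XCom calls, a small hand-written stub can be easier to read than a MagicMock. The sketch below is one possible shape: FakeTaskInstance is a hypothetical test helper, not an Airflow class, and it implements only the xcom_push and xcom_pull calls used here. The summarize_load callable is likewise invented for illustration.

```python
class FakeTaskInstance:
    """Minimal in-memory stand-in for the `ti` object passed to callables."""

    def __init__(self, xcoms=None):
        # Keys are (task_id, key) pairs, mirroring how XComs are addressed.
        self._xcoms = dict(xcoms or {})
        self.pulled = []  # records which task_ids were requested

    def xcom_push(self, key, value):
        self._xcoms[("current_task", key)] = value

    def xcom_pull(self, task_ids, key="return_value"):
        self.pulled.append(task_ids)
        return self._xcoms.get((task_ids, key))


def summarize_load(ti, **context):
    """Hypothetical callable: count rows from upstream and publish the count."""
    rows = ti.xcom_pull(task_ids="transform_orders") or []
    ti.xcom_push(key="row_count", value=len(rows))
    return {"loaded": len(rows)}


fake_ti = FakeTaskInstance({("transform_orders", "return_value"): [{"order_id": "1"}]})
result = summarize_load(ti=fake_ti)
```

Unlike a MagicMock, the stub fails loudly if the callable uses a method it does not define, which keeps the test honest about what the callable actually depends on.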

pytest-airflow

pytest-airflow is a pytest plugin for running tests in a real Airflow environment. The example below assumes it exposes dag and task fixtures. Fixture names and behavior differ between plugin versions, so confirm them against the documentation of the version you install:

pip install pytest-airflow

import pytest

def test_dag_loaded(dag):
    """The dag fixture provides the expected DAG."""
    assert dag.dag_id == "etl_orders"

def test_task_belongs_to_dag(dag, task):
    """The task fixture provides a task that is registered on the DAG."""
    assert task.task_id in dag.task_ids

Testing with Airflow's Test Utilities

Airflow provides dag.test() for running DAGs in test mode (Airflow 2.5+):

from airflow.models import DagBag
from datetime import datetime

def test_etl_dag_runs_successfully():
    dagbag = DagBag(dag_folder="dags/", include_examples=False)
    dag = dagbag.get_dag("etl_orders")

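    dagrun = dag.test(
        run_conf={"date": "2024-03-15"},
        execution_date=datetime(2024, 3, 15),
    )
    # Recent 2.x releases return the DagRun. Assert on its state rather than
    # relying on an exception, because a failed task may only be logged.
    assert dagrun.state == "success"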

Mocking Variables and Connections

DAGs often rely on Airflow Variables and Connections. Set them up for tests:

import pytest
from airflow import settings
from airflow.models import Connection, DagBag, Variable

@pytest.fixture(scope="session", autouse=True)
def airflow_db():
    """Initialize the Airflow metadata DB once per test session."""
    from airflow.utils.db import initdb
    initdb()
    yield
    # No teardown here: the fixtures below remove the rows they create

@pytest.fixture
def setup_variables():
    """Create Airflow Variables needed by DAGs."""
    Variable.set("warehouse_url", "postgresql://test:test@localhost/testdb")
    Variable.set("orders_api_url", "http://localhost:8080")
    yield
    Variable.delete("warehouse_url")
    Variable.delete("orders_api_url")

@pytest.fixture
def setup_connections():
    """Create Airflow Connections needed by DAGs."""
    session = settings.Session()
    conn = Connection(
        conn_id="warehouse",
        conn_type="postgres",
        host="localhost",
        schema="testdb",
        login="test",
        password="test",
        port=5432,
    )
    session.add(conn)
    session.commit()
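    yield
    session.delete(conn)
    session.commit()
    session.close()

def test_dag_with_variables(setup_variables, setup_connections):
    dagbag = DagBag(dag_folder="dags/", include_examples=False)
    dag = dagbag.get_dag("etl_orders")
    # Variables and Connections are now available to the DAG and its tasks
    assert dag is not None
    assert Variable.get("warehouse_url").startswith("postgresql://")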
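
Airflow also resolves Variables from environment variables named AIRFLOW_VAR_<KEY> and Connections from AIRFLOW_CONN_<CONN_ID> (given as a URI), which avoids touching the metadata database at all. The sketch below shows one way to scope such values to a test. The helper name airflow_env is made up for this example, and the block only manipulates os.environ, so the Airflow lookups themselves appear as comments.

```python
import os
from contextlib import contextmanager
from unittest.mock import patch


@contextmanager
def airflow_env(variables=None, connections=None):
    """Temporarily expose Variables and Connections through the environment."""
    env = {}
    for key, value in (variables or {}).items():
        env[f"AIRFLOW_VAR_{key.upper()}"] = value
    for conn_id, uri in (connections or {}).items():
        env[f"AIRFLOW_CONN_{conn_id.upper()}"] = uri
    # patch.dict restores the previous environment when the block exits.
    with patch.dict(os.environ, env):
        yield env


with airflow_env(
    variables={"orders_api_url": "http://localhost:8080"},
    connections={"warehouse": "postgres://test:test@localhost:5432/testdb"},
) as applied:
    # Inside this block, Variable.get("orders_api_url") and
    # BaseHook.get_connection("warehouse") would resolve from the environment.
    seen_inside = os.environ["AIRFLOW_VAR_ORDERS_API_URL"]
```

Wrapping the context manager in a pytest fixture gives the same setup and teardown guarantees as the database-backed fixtures above, with no rows to clean up.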

Conftest for Airflow Tests

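# tests/conftest.py
import os
from pathlib import Path

import pytest

# Set minimal Airflow config before anything imports airflow.
# setdefault() lets CI override these values through real environment variables.
_DB_PATH = Path(__file__).resolve().parent / "airflow_test.db"
os.environ.setdefault("AIRFLOW__CORE__UNIT_TEST_MODE", "True")
os.environ.setdefault("AIRFLOW__CORE__LOAD_EXAMPLES", "False")
# Airflow rejects relative SQLite paths, so build an absolute one
os.environ.setdefault("AIRFLOW__DATABASE__SQL_ALCHEMY_CONN", f"sqlite:///{_DB_PATH}")
os.environ.setdefault("AIRFLOW__CORE__EXECUTOR", "SequentialExecutor")

@pytest.fixture(scope="session")
def airflow_db():
    from airflow.utils.db import resetdb
    resetdb()  # drops existing tables, then re-initializes the schema
    yield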

Testing SLA Misses and Alerts

from datetime import timedelta

def test_sla_defined():
    """Critical DAGs have an SLA on every task."""
    dagbag = DagBag(dag_folder="dags/", include_examples=False)
    dag = dagbag.get_dag("daily_report")

    # SLAs are set per task (often through default_args), not on the DAG object.
    # Every task in the report must finish within 2 hours.
    for task in dag.tasks:
        assert task.sla == timedelta(hours=2), f"{task.task_id} SLA must be 2 hours"

def test_alert_email_configured():
    """DAG has alert email for failures."""
    dagbag = DagBag(dag_folder="dags/", include_examples=False)
    dag = dagbag.get_dag("etl_orders")

    assert "alerts@example.com" in dag.default_args.get("email", [])
    assert dag.default_args.get("email_on_failure") is True
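
Because default_args is a plain dictionary, the alerting rules can also be expressed as a reusable check and applied to every DAG in the DagBag. The sketch below assumes a house policy (a required recipient and email_on_failure set to True) and a hypothetical helper name, alerting_problems; adapt both to your own conventions.

```python
def alerting_problems(default_args: dict, required_email: str) -> list[str]:
    """Return human-readable policy violations for one DAG's default_args."""
    problems = []
    emails = default_args.get("email", [])
    # Airflow accepts either a single address or a list of addresses.
    if isinstance(emails, str):
        emails = [emails]
    if required_email not in emails:
        problems.append(f"missing alert recipient {required_email}")
    if default_args.get("email_on_failure") is not True:
        problems.append("email_on_failure is not enabled")
    return problems


good = {"email": ["alerts@example.com"], "email_on_failure": True}
bad = {"email": "team@example.com"}
```

In a real test you would loop over dagbag.dags.values() and assert that alerting_problems(dag.default_args, "alerts@example.com") is empty for each DAG.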

CI Pipeline for Airflow DAGs

# .github/workflows/dags.yml
name: Test DAGs

on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest

    services:
      postgres:
        image: postgres:15
        env:
          POSTGRES_USER: airflow
          POSTGRES_PASSWORD: airflow
          POSTGRES_DB: airflow
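        # Publish the port so steps running on the runner can reach localhost:5432
        ports:
          - 5432:5432
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5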

    env:
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql://airflow:airflow@localhost/airflow
      AIRFLOW__CORE__LOAD_EXAMPLES: "false"
      AIRFLOW__CORE__EXECUTOR: SequentialExecutor

    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"

      - name: Install Airflow
        run: |
          pip install "apache-airflow[postgres]==2.9.0" --constraint \
            "https://raw.githubusercontent.com/apache/airflow/constraints-2.9.0/constraints-3.12.txt"
          pip install -r requirements-test.txt

      - name: Initialize Airflow DB
        run: airflow db migrate  # "airflow db init" is deprecated since 2.7

      - name: Run tests
        run: pytest tests/ -v --tb=short

End-to-End DAG Testing

Unit tests verify that individual operators produce correct output. End-to-end tests verify that the full pipeline — from trigger to final data state — produces the right result. HelpMeTest can verify the business-level outcomes of Airflow runs:

Scenario: daily ETL completes successfully
  Given the ETL DAG ran at midnight
  When all tasks complete
  Then the warehouse contains today's order totals
  And the daily report is available in the dashboard
  And no alerts were sent

This catches failures in the data that unit tests — which mock the warehouse client — will never see.
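
Whatever tool drives the scenario, the step about the warehouse containing today's order totals boils down to a query against the real target. The sketch below uses an in-memory SQLite database purely as a stand-in warehouse. The order_totals table, its columns, and the daily_total helper are assumptions made for this example.

```python
import sqlite3
from datetime import date


def daily_total(conn: sqlite3.Connection, day: date) -> float:
    """Sum the order totals loaded for a given day (0.0 if none were loaded)."""
    row = conn.execute(
        "SELECT COALESCE(SUM(total), 0.0) FROM order_totals WHERE order_date = ?",
        (day.isoformat(),),
    ).fetchone()
    return float(row[0])


# Stand-in warehouse seeded with what a successful run would have written.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE order_totals (order_id TEXT, order_date TEXT, total REAL)")
conn.executemany(
    "INSERT INTO order_totals VALUES (?, ?, ?)",
    [("1", "2024-03-15", 20.0), ("3", "2024-03-15", 60.0), ("9", "2024-03-14", 5.0)],
)

loaded = daily_total(conn, date(2024, 3, 15))
```

An end-to-end check would compare this figure with the total computed independently from the source system for the same day.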

Key Takeaways

  • Always test DAG integrity first: DagBag loads your DAGs and surfaces import errors before a broken DAG quietly drops out of the Airflow UI
  • Extract operator callables and test them as regular Python functions — no Airflow needed for pure logic tests
  • Use os.environ in conftest.py to configure a lightweight test database (SQLite works for most tests)
  • Mock Airflow Variables and Connections in fixtures rather than reading from real Airflow installs
  • Run dag.test() (Airflow 2.5+) for integration-level DAG runs in a single process, without a scheduler or Celery workers

By HelpMeTest