Testing Alembic Migrations: Ensuring Safe Database Upgrades in Python

Alembic is the standard database migration tool for SQLAlchemy applications, but its revision system is easy to break silently. This post covers a complete pytest-based testing strategy: upgrade and downgrade testing with real databases, Testcontainers for isolated environments, SQLAlchemy fixture patterns, and idempotency verification to catch the errors that only show up under real conditions.

Key Takeaways

Test both upgrade() and downgrade() for every revision. Most teams test the forward path only. A broken downgrade means you can't roll back a failed deployment — often the worst time to discover that.

Use Testcontainers to test against the exact database version running in production. SQLite in-memory is tempting for speed but hides dialect issues, missing PostgreSQL features, and constraint behavior differences that will bite you in production.

Idempotency testing catches a common data migration mistake. Running a data migration twice on the same dataset should produce the same result — without duplicating rows or raising constraint errors. Surprisingly few teams test for this.

Alembic's Revision System and Its Failure Modes

Alembic maintains a linked list of revision files, each with a down_revision that points to its parent. When you run alembic upgrade head, Alembic walks this chain from the current database revision to the latest, applying each upgrade() function in order. alembic downgrade -1 walks one step back.
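
In miniature, the chain walk looks like this (a toy sketch with hypothetical revision IDs, not Alembic's actual internals):

```python
# Toy model of Alembic's revision chain: each revision points to its
# parent via down_revision; None marks the base. (Hypothetical IDs.)
revisions = {
    "c3d4": "b2c3",  # head, parent is b2c3
    "b2c3": "a1b2",
    "a1b2": None,    # base
}

def path_to_base(rev):
    """Collect revisions from `rev` back to base, parent by parent."""
    chain = []
    while rev is not None:
        chain.append(rev)
        rev = revisions[rev]
    return chain

print(path_to_base("c3d4"))  # ['c3d4', 'b2c3', 'a1b2']
```

Upgrading applies this path in reverse order; downgrading applies each revision's downgrade() along it.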

This architecture is clean, but it creates specific failure modes that tests must cover:

Branch divergence: Two developers create revisions on different branches. When merged, Alembic has two heads and refuses to migrate. You detect this by checking alembic heads — any output with more than one line is a problem.

Missing downgrade functions: Alembic generates a stub downgrade() function with pass. If nobody writes the actual downgrade SQL, rolling back a failed deployment does nothing.

Data-dependent upgrade failures: An upgrade() that adds a NOT NULL column without a default will fail on any table with existing rows. This works fine on an empty test database and fails in production.

Revision chain corruption: A revision file that references the wrong down_revision breaks the chain. Depending on where the bad pointer lands, Alembic either fails with a "Can't locate revision" error or produces a second head, leaving the revisions on the orphaned branch unapplied.

A proper test suite catches all of these.
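
The data-dependent failure is worth a concrete illustration. The sketch below uses SQLite for portability: SQLite refuses the unsafe ALTER outright, while PostgreSQL accepts it on an empty table and fails only once rows exist, which is exactly why it slips through empty test databases.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT)")
conn.execute("INSERT INTO users (id, email) VALUES (1, 'a@example.com')")

# NOT NULL with no default: SQLite rejects this outright; PostgreSQL
# rejects it as soon as the table contains rows
try:
    conn.execute("ALTER TABLE users ADD COLUMN region TEXT NOT NULL")
    failed = False
except sqlite3.OperationalError:
    failed = True

# NOT NULL with a default: existing rows are backfilled automatically
conn.execute(
    "ALTER TABLE users ADD COLUMN region TEXT NOT NULL DEFAULT 'unknown'")
value = conn.execute("SELECT region FROM users WHERE id = 1").fetchone()[0]

print(failed, value)  # True unknown
```

Seeding test databases with representative rows before running migrations is what surfaces this class of bug.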

Project Setup and Alembic Configuration

A typical project structure:

myapp/
  alembic/
    env.py
    script.py.mako
    versions/
      001_create_users.py
      002_add_email_index.py
      003_create_orders.py
  alembic.ini
  models.py
  tests/
    conftest.py
    test_migrations.py
    test_data_migrations.py

The alembic.ini:

[alembic]
script_location = alembic
sqlalchemy.url = postgresql+psycopg2://testuser:testpass@localhost:5432/testdb
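
Hard-coding the URL in alembic.ini is fine locally, but CI and Testcontainers need to point migrations at their own database. One common pattern (a sketch; the SQLALCHEMY_URL variable name is an assumption, matching the CI workflow later in this post) is to let env.py prefer an environment variable:

```python
# alembic/env.py excerpt (sketch): prefer SQLALCHEMY_URL when set
import os

def resolve_db_url(default_url):
    """Return the SQLALCHEMY_URL environment variable if set, else the default."""
    return os.environ.get("SQLALCHEMY_URL", default_url)

# Inside env.py, after `config = context.config`:
# config.set_main_option(
#     "sqlalchemy.url",
#     resolve_db_url(config.get_main_option("sqlalchemy.url")))
```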

A migration revision file:

# alembic/versions/001_create_users.py
"""create users table

Revision ID: a1b2c3d4e5f6
Revises:
Create Date: 2024-05-15 12:00:00.000000

"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa

revision: str = 'a1b2c3d4e5f6'
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    op.create_table(
        'users',
        sa.Column('id', sa.BigInteger(), nullable=False),
        sa.Column('email', sa.String(255), nullable=False),
        sa.Column('subscription_tier', sa.String(20), nullable=False,
                  server_default='free'),
        sa.Column('created_at', sa.DateTime(timezone=True), nullable=False,
                  server_default=sa.text('CURRENT_TIMESTAMP')),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('email', name='uq_users_email'),
    )
    op.create_index('idx_users_email', 'users', ['email'], unique=True)


def downgrade() -> None:
    op.drop_index('idx_users_email', table_name='users')
    op.drop_table('users')

pytest Fixtures for Migration Testing

The foundation of Alembic testing is a reliable pytest fixture that creates a clean database per test:

# tests/conftest.py
import os

import pytest
from alembic import command
from alembic.config import Config
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker


def get_alembic_config(db_url: str) -> Config:
    alembic_cfg = Config("alembic.ini")
    alembic_cfg.set_main_option("sqlalchemy.url", db_url)
    return alembic_cfg


@pytest.fixture(scope="session")
def db_url():
    """Read SQLALCHEMY_URL if set (e.g. in CI); fall back to a local default.

    Swap this fixture for the Testcontainers version below when Docker
    is available.
    """
    return os.environ.get(
        "SQLALCHEMY_URL",
        "postgresql+psycopg2://testuser:testpass@localhost:5432/testdb",
    )


@pytest.fixture(scope="function")
def migrated_db(db_url):
    """Apply all migrations, yield engine, then drop all tables."""
    engine = create_engine(db_url)

    alembic_cfg = get_alembic_config(db_url)
    command.upgrade(alembic_cfg, "head")

    yield engine

    # Clean up: downgrade to base after each test
    command.downgrade(alembic_cfg, "base")
    engine.dispose()


@pytest.fixture(scope="function")
def db_session(migrated_db):
    """SQLAlchemy session bound to the migrated database."""
    Session = sessionmaker(bind=migrated_db)
    session = Session()
    yield session
    session.close()

Testing upgrade() and downgrade() for Every Revision

The most critical test validates the full revision chain in both directions:

# tests/test_migrations.py
import pytest
from alembic import command
from alembic.config import Config
from alembic.script import ScriptDirectory
from sqlalchemy import create_engine, inspect, text
from tests.conftest import get_alembic_config


def get_all_revisions(alembic_cfg: Config) -> list[str]:
    """Return all revision IDs in upgrade order."""
    script = ScriptDirectory.from_config(alembic_cfg)
    revisions = []
    for rev in script.walk_revisions():
        revisions.append(rev.revision)
    return list(reversed(revisions))


def test_upgrade_all_revisions(db_url):
    """All migrations apply from base to head without errors."""
    alembic_cfg = get_alembic_config(db_url)
    engine = create_engine(db_url)

    command.downgrade(alembic_cfg, "base")
    command.upgrade(alembic_cfg, "head")

    # Verify alembic_version table exists and has a version
    with engine.connect() as conn:
        result = conn.execute(text("SELECT version_num FROM alembic_version"))
        version = result.scalar()
        assert version is not None, "No version recorded after upgrade"

    engine.dispose()


def test_downgrade_all_revisions(db_url):
    """All migrations can be reversed from head to base."""
    alembic_cfg = get_alembic_config(db_url)
    engine = create_engine(db_url)

    # Start from head
    command.upgrade(alembic_cfg, "head")

    # Walk back step by step
    revisions = get_all_revisions(alembic_cfg)
    for _ in revisions:
        command.downgrade(alembic_cfg, "-1")

    # Verify database is at base (no version recorded)
    with engine.connect() as conn:
        result = conn.execute(
            text("SELECT count(*) FROM alembic_version"))
        count = result.scalar()
        assert count == 0, f"Expected 0 versions at base, got {count}"

    engine.dispose()


def test_each_revision_upgrades_and_downgrades(db_url):
    """Test upgrade+downgrade for each individual revision."""
    alembic_cfg = get_alembic_config(db_url)
    revisions = get_all_revisions(alembic_cfg)

    command.downgrade(alembic_cfg, "base")

    for revision in revisions:
        # Upgrade to this revision
        command.upgrade(alembic_cfg, revision)

        # Downgrade one step
        command.downgrade(alembic_cfg, "-1")

        # Upgrade back to continue the chain
        command.upgrade(alembic_cfg, revision)

    # Final state should be at head
    engine = create_engine(db_url)
    with engine.connect() as conn:
        result = conn.execute(text("SELECT version_num FROM alembic_version"))
        current = result.scalar()
        assert current == revisions[-1], \
            f"Expected head revision {revisions[-1]}, got {current}"
    engine.dispose()

Checking for Revision Chain Integrity

These tests catch common structural problems in the revision chain:

def test_no_multiple_heads(db_url):
    """Merged branches sometimes create multiple heads — detect before deploy."""
    alembic_cfg = get_alembic_config(db_url)
    script = ScriptDirectory.from_config(alembic_cfg)
    heads = script.get_heads()

    assert len(heads) == 1, (
        f"Multiple migration heads detected: {heads}. "
        "Run 'alembic merge heads' to resolve."
    )


def test_all_revisions_have_downgrade(db_url):
    """Every revision must have a real downgrade function."""
    alembic_cfg = get_alembic_config(db_url)
    script = ScriptDirectory.from_config(alembic_cfg)

    # stdlib inspect, renamed to avoid clashing with sqlalchemy.inspect
    import inspect as pyinspect

    missing_downgrade = []
    for rev in script.walk_revisions():
        # A stub downgrade body contains only 'pass'
        source = pyinspect.getsource(rev.module.downgrade)
        lines = [l.strip() for l in source.split('\n')
                 if l.strip() and not l.strip().startswith('#')
                 and not l.strip().startswith('def ')]
        if lines == ['pass']:
            missing_downgrade.append(rev.revision)

    assert not missing_downgrade, (
        f"Revisions with stub downgrade (pass only): {missing_downgrade}"
    )


def test_revision_chain_is_linear(db_url):
    """Verify each revision has exactly one parent (no unexpected merges)."""
    alembic_cfg = get_alembic_config(db_url)
    script = ScriptDirectory.from_config(alembic_cfg)

    for rev in script.walk_revisions():
        if rev.down_revision is not None:
            assert not isinstance(rev.down_revision, tuple), (
                f"Revision {rev.revision} has multiple parents: "
                f"{rev.down_revision}. Was this merge intentional?"
            )

Testcontainers for Python

Testing against the real database engine is non-negotiable for catching dialect issues. The testcontainers Python library makes this straightforward:

pip install "testcontainers[postgres]"

The session-scoped container fixture:

# tests/conftest.py (Testcontainers version)
import pytest
from testcontainers.postgres import PostgresContainer
from alembic import command
from alembic.config import Config
from sqlalchemy import create_engine


@pytest.fixture(scope="session")
def postgres_container():
    """Start a PostgreSQL container for the entire test session."""
    with PostgresContainer("postgres:16-alpine") as postgres:
        yield postgres


@pytest.fixture(scope="session")
def db_url(postgres_container):
    """Database URL from the running container."""
    return postgres_container.get_connection_url()


@pytest.fixture(scope="function")
def migrated_db(db_url):
    """Apply all migrations, yield engine, clean up after test."""
    engine = create_engine(db_url)
    alembic_cfg = Config("alembic.ini")
    alembic_cfg.set_main_option("sqlalchemy.url", db_url)

    command.downgrade(alembic_cfg, "base")
    command.upgrade(alembic_cfg, "head")

    yield engine

    command.downgrade(alembic_cfg, "base")
    engine.dispose()

With the scope="session" container, PostgreSQL starts once for the entire test run and each test gets a clean schema by running downgrade/upgrade. The container startup cost (typically 3–8 seconds) is paid once.

For MySQL:

from testcontainers.mysql import MySqlContainer

@pytest.fixture(scope="session")
def mysql_container():
    with MySqlContainer("mysql:8.0") as mysql:
        yield mysql

SQLAlchemy Test Fixtures for Data Migration Tests

When testing data migrations, you need to insert rows in the state they'd be in before the migration runs, then verify the migration transformed them correctly.

# alembic/versions/004_backfill_display_names.py
"""backfill display_names

Revision ID: d4e5f6a7b8c9
Revises: c3d4e5f6a7b8
"""
from alembic import op
import sqlalchemy as sa

revision = 'd4e5f6a7b8c9'
down_revision = 'c3d4e5f6a7b8'


def upgrade() -> None:
    # Add the new column
    op.add_column('users',
        sa.Column('display_name', sa.String(255), nullable=True))

    # Backfill from first_name + last_name
    op.execute("""
        UPDATE users
        SET display_name = TRIM(COALESCE(first_name, '') || ' ' || COALESCE(last_name, ''))
        WHERE display_name IS NULL
    """)


def downgrade() -> None:
    op.drop_column('users', 'display_name')

The corresponding test:

# tests/test_data_migrations.py
import pytest
from alembic import command
from sqlalchemy import create_engine, text
from tests.conftest import get_alembic_config


def test_display_name_backfill(db_url):
    """Backfill migration sets display_name from first_name + last_name."""
    alembic_cfg = get_alembic_config(db_url)
    engine = create_engine(db_url)

    # Apply migrations up to the revision BEFORE the backfill
    command.downgrade(alembic_cfg, "base")
    command.upgrade(alembic_cfg, "c3d4e5f6a7b8")  # one before d4e5f6a7b8c9

    # Insert test data in the pre-migration schema state
    with engine.connect() as conn:
        conn.execute(text("""
            INSERT INTO users (id, email, first_name, last_name, created_at)
            VALUES
              (1, 'alice@example.com', 'Alice', 'Smith', NOW()),
              (2, 'bob@example.com', 'Bob', NULL, NOW()),
              (3, 'anon@example.com', NULL, NULL, NOW())
        """))
        conn.commit()

    # Apply the backfill migration
    command.upgrade(alembic_cfg, "d4e5f6a7b8c9")

    # Verify results
    with engine.connect() as conn:
        rows = conn.execute(
            text("SELECT id, display_name FROM users ORDER BY id")
        ).fetchall()

    assert rows[0].display_name == "Alice Smith"
    assert rows[1].display_name == "Bob"
    assert rows[2].display_name == ""  # both NULL → empty string

    engine.dispose()

Idempotency Testing

A well-written data migration should be safe to run twice — it should produce the same result without duplicating data or raising constraint errors. This is especially important when migrations run as part of rolling deployments where a migration might be applied to some replicas before others.

def test_migration_is_idempotent(db_url):
    """Running the migration twice produces the same result as running it once."""
    alembic_cfg = get_alembic_config(db_url)
    engine = create_engine(db_url)

    # Apply all migrations
    command.upgrade(alembic_cfg, "head")

    # Insert some data
    with engine.connect() as conn:
        conn.execute(text("""
            INSERT INTO users (id, email, first_name, last_name, created_at)
            VALUES (1, 'alice@example.com', 'Alice', 'Smith', NOW())
        """))
        conn.commit()

    # Run the backfill migration logic directly (simulate re-run)
    with engine.connect() as conn:
        conn.execute(text("""
            UPDATE users
            SET display_name = TRIM(
                COALESCE(first_name, '') || ' ' || COALESCE(last_name, ''))
            WHERE display_name IS NULL OR display_name = ''
        """))
        conn.commit()

    # Run it again
    with engine.connect() as conn:
        conn.execute(text("""
            UPDATE users
            SET display_name = TRIM(
                COALESCE(first_name, '') || ' ' || COALESCE(last_name, ''))
            WHERE display_name IS NULL OR display_name = ''
        """))
        conn.commit()

    # Verify no duplicates, same results
    with engine.connect() as conn:
        count = conn.execute(
            text("SELECT COUNT(*) FROM users")).scalar()
        display_name = conn.execute(
            text("SELECT display_name FROM users WHERE id = 1")).scalar()

    assert count == 1, f"Expected 1 user, got {count} — possible duplicate"
    assert display_name == "Alice Smith"

    engine.dispose()

For migrations that use INSERT ... ON CONFLICT DO NOTHING or INSERT ... ON CONFLICT DO UPDATE, idempotency is baked in. Test that the conflict handling works as expected:

def test_upsert_migration_handles_conflicts(db_url):
    """Migration using ON CONFLICT does not fail when run on pre-populated table."""
    # ... setup and apply migration
    # Insert a row that would conflict
    with engine.connect() as conn:
        conn.execute(text("""
            INSERT INTO user_preferences (user_id, key, value)
            VALUES (1, 'theme', 'dark')
            ON CONFLICT (user_id, key) DO NOTHING
        """))
        conn.commit()

    # Run the seed migration logic that inserts the same row
    # Should not raise UniqueViolation
    with engine.connect() as conn:
        conn.execute(text("""
            INSERT INTO user_preferences (user_id, key, value)
            SELECT id, 'theme', 'light'
            FROM users
            ON CONFLICT (user_id, key) DO NOTHING
        """))
        conn.commit()

    # Verify original value was preserved (not overwritten)
    with engine.connect() as conn:
        value = conn.execute(text("""
            SELECT value FROM user_preferences
            WHERE user_id = 1 AND key = 'theme'
        """)).scalar()
    assert value == "dark"

Schema Comparison After Migration

After applying all migrations, compare the resulting schema against your SQLAlchemy model definitions to catch drift:

from sqlalchemy import create_engine, inspect
from myapp.models import Base  # your SQLAlchemy declarative base


def test_schema_matches_sqlalchemy_models(db_url):
    """Database schema after migrations matches SQLAlchemy model definitions."""
    engine = create_engine(db_url)
    inspector = inspect(engine)

    db_tables = set(inspector.get_table_names())
    model_tables = {t.name for t in Base.metadata.sorted_tables}

    # Every model should have a corresponding table
    missing_tables = model_tables - db_tables
    assert not missing_tables, (
        f"Tables in models but not in database: {missing_tables}. "
        "Did you forget a migration?"
    )

    # Check columns for each table
    for table in Base.metadata.sorted_tables:
        db_columns = {
            c["name"]: c for c in inspector.get_columns(table.name)
        }
        model_columns = {c.name: c for c in table.columns}

        missing_columns = set(model_columns.keys()) - set(db_columns.keys())
        assert not missing_columns, (
            f"Table {table.name}: columns in model but missing in DB: "
            f"{missing_columns}"
        )

    engine.dispose()

This test catches the common mistake of adding a column to a SQLAlchemy model without writing the corresponding Alembic migration.

CI Integration

A complete GitHub Actions workflow for Alembic migration testing:

name: Alembic Migration Tests

on:
  push:
    paths:
      - 'alembic/versions/**'
      - 'tests/test_migrations*'
  pull_request:

jobs:
  migration-tests:
    runs-on: ubuntu-latest

    services:
      postgres:
        image: postgres:16-alpine
        env:
          POSTGRES_DB: testdb
          POSTGRES_USER: testuser
          POSTGRES_PASSWORD: testpass
        ports:
          - 5432:5432
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.12'
          cache: 'pip'

      - name: Install dependencies
        run: pip install -r requirements-test.txt

      - name: Check for multiple heads
        run: |
          HEAD_COUNT=$(alembic heads | wc -l)
          if [ "$HEAD_COUNT" -gt 1 ]; then
            echo "ERROR: Multiple migration heads detected"
            alembic heads
            exit 1
          fi

      - name: Run migration tests
        run: pytest tests/test_migrations.py tests/test_data_migrations.py -v
        env:
          SQLALCHEMY_URL: postgresql+psycopg2://testuser:testpass@localhost:5432/testdb

      - name: Verify schema matches models
        run: pytest tests/test_schema_integrity.py -v
        env:
          SQLALCHEMY_URL: postgresql+psycopg2://testuser:testpass@localhost:5432/testdb

Running Tests Locally Without Docker

For quick local iteration without Docker, SQLite is still useful for the structural tests, with one caveat: an in-memory database won't work here, because Alembic opens its own connection and would see a separate, empty database. Use a temporary file instead:

@pytest.fixture(scope="function")
def sqlite_migrated_db(tmp_path):
    """Fast file-based SQLite for structural tests only."""
    url = f"sqlite:///{tmp_path / 'test.db'}"
    engine = create_engine(url)
    alembic_cfg = Config("alembic.ini")
    alembic_cfg.set_main_option("sqlalchemy.url", url)

    # Note: SQLite has limited ALTER TABLE support.
    # Only use this for chain structure tests, not dialect tests.
    command.upgrade(alembic_cfg, "head")
    yield engine
    engine.dispose()

Use SQLite for fast chain structure and revision integrity tests, and Testcontainers PostgreSQL for data migration and schema validation tests.

Summary: The Complete Test Pyramid

Test Type            | Speed   | Tool                    | What It Catches
---------------------|---------|-------------------------|----------------------------------------
Chain integrity      | < 1s    | alembic CLI             | Multiple heads, missing downgrade stubs
Structural tests     | 1–5s    | SQLite + pytest         | Revision chain order, model/schema sync
Integration tests    | 30–60s  | Testcontainers + pytest | Dialect issues, constraint behavior
Data migration tests | 30–60s  | Testcontainers + pytest | Backfill correctness, idempotency
Full up/down cycle   | 60–120s | Testcontainers + pytest | End-to-end rollback safety

Run chain integrity and structural tests in pre-commit hooks. Run everything in CI on every migration change. Never ship a migration to staging without a green full up/down cycle test.
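
A local pre-commit hook for the fastest check might look like this (a sketch; the hook id and file pattern are illustrative, not a published hook):

```yaml
# .pre-commit-config.yaml (sketch)
repos:
  - repo: local
    hooks:
      - id: alembic-single-head
        name: Check for multiple Alembic heads
        entry: bash -c 'test "$(alembic heads | wc -l)" -le 1'
        language: system
        files: ^alembic/versions/
        pass_filenames: false
```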


HelpMeTest can run your Alembic migration tests automatically on every pull request, catching upgrade and rollback failures before they reach production.
