Async Python Database Testing: asyncpg, pytest, Connection Pooling, and Testcontainers

Async Python Database Testing: asyncpg, pytest, Connection Pooling, and Testcontainers

asyncpg is the fastest PostgreSQL driver for Python—but testing async database code correctly requires more setup than the synchronous equivalent. This guide covers the full testing stack: spinning up real PostgreSQL via Testcontainers, managing async connection pools in tests, and isolating tests without sacrificing speed.

Why asyncpg Testing Is Different

asyncpg bypasses the DB-API 2.0 interface entirely. It doesn't work with SQLAlchemy's ORM layer by default (you need asyncpg + sqlalchemy[asyncio] for that). Testing raw asyncpg code means working with:

  • asyncpg.Connection objects (not sessions)
  • asyncpg.Pool for connection pooling
  • Binary protocol responses (not dict-like rows by default)

The patterns for testing SQLAlchemy async sessions don't directly apply here.

Stack Requirements

pip install asyncpg pytest pytest-asyncio testcontainers[postgres]

Add to pyproject.toml:

[tool.pytest.ini_options]
asyncio_mode = "auto"

Real PostgreSQL with Testcontainers

Never mock asyncpg. The binary protocol, row factories, and connection pool behavior are too complex to mock correctly—mocks will pass tests that fail in production. Use a real PostgreSQL instance via Testcontainers:

# conftest.py
import asyncio
import pytest
import pytest_asyncio
import asyncpg
from testcontainers.postgres import PostgresContainer

@pytest.fixture(scope="session")
def postgres_container():
    with PostgresContainer("postgres:16-alpine") as pg:
        yield pg

@pytest.fixture(scope="session")
def db_url(postgres_container):
    return postgres_container.get_connection_url().replace("postgresql+psycopg2://", "")

The scope="session" means one container starts for the entire test run—much faster than starting a fresh container per test.

Creating the Schema

Apply your schema once per session using asyncpg directly:

@pytest_asyncio.fixture(scope="session")
async def db_pool(db_url):
    pool = await asyncpg.create_pool(db_url, min_size=2, max_size=10)

    async with pool.acquire() as conn:
        await conn.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id SERIAL PRIMARY KEY,
                email TEXT UNIQUE NOT NULL,
                name TEXT NOT NULL,
                created_at TIMESTAMPTZ DEFAULT now()
            );

            CREATE TABLE IF NOT EXISTS orders (
                id SERIAL PRIMARY KEY,
                user_id INTEGER REFERENCES users(id) ON DELETE CASCADE,
                total NUMERIC(10,2) NOT NULL,
                created_at TIMESTAMPTZ DEFAULT now()
            );
        """)

    yield pool
    await pool.close()

For real projects, run your migrations in the fixture instead:

async with pool.acquire() as conn:
    # Run Alembic migrations programmatically
    from alembic.config import Config
    from alembic import command
    alembic_cfg = Config("alembic.ini")
    command.upgrade(alembic_cfg, "head")

Test Isolation with Savepoints

The transactional rollback pattern from SQLAlchemy translates to asyncpg savepoints:

@pytest_asyncio.fixture
async def db_conn(db_pool):
    async with db_pool.acquire() as conn:
        transaction = conn.transaction()
        await transaction.start()
        yield conn
        await transaction.rollback()

Each test gets a connection inside a transaction. When the fixture teardown runs, the transaction rolls back—leaving the database clean for the next test. Tests can even call COMMIT—the outer transaction rollback undoes all nested commits.

Important caveat: PostgreSQL doesn't support nested transactions by default. If your application code calls BEGIN inside the test, you'll get an error. Work around this with savepoints:

@pytest_asyncio.fixture
async def db_conn(db_pool):
    async with db_pool.acquire() as conn:
        await conn.execute("BEGIN")
        await conn.execute("SAVEPOINT test_start")
        yield conn
        await conn.execute("ROLLBACK TO SAVEPOINT test_start")
        await conn.execute("ROLLBACK")

Writing Async Tests with asyncpg

asyncpg returns Record objects, not dicts. Access fields by name:

async def test_create_user(db_conn):
    await db_conn.execute(
        "INSERT INTO users (email, name) VALUES ($1, $2)",
        "test@example.com", "Test User"
    )

    row = await db_conn.fetchrow(
        "SELECT * FROM users WHERE email = $1",
        "test@example.com"
    )

    assert row["name"] == "Test User"
    assert row["email"] == "test@example.com"
    assert row["id"] is not None

async def test_fetch_multiple_rows(db_conn):
    emails = [f"user{i}@example.com" for i in range(5)]
    for email in emails:
        await db_conn.execute("INSERT INTO users (email, name) VALUES ($1, $2)", email, "User")

    rows = await db_conn.fetch("SELECT email FROM users ORDER BY email")
    assert len(rows) == 5
    assert rows[0]["email"] == "user0@example.com"

Testing Connection Pool Behavior

Connection pool behavior is often untested—and pool exhaustion causes production incidents. Test it explicitly:

async def test_pool_handles_concurrent_requests(db_pool):
    """Verify pool serves concurrent requests without deadlock."""
    async def fetch_one(pool):
        async with pool.acquire() as conn:
            return await conn.fetchval("SELECT 1")

    results = await asyncio.gather(*[fetch_one(db_pool) for _ in range(20)])
    assert all(r == 1 for r in results)

async def test_pool_releases_connections_after_error(db_pool):
    """Verify connections return to pool even after query errors."""
    initial_free = db_pool.get_idle_size()

    with pytest.raises(asyncpg.UndefinedTableError):
        async with db_pool.acquire() as conn:
            await conn.fetch("SELECT * FROM nonexistent_table")

    # Connection should be back in the pool
    assert db_pool.get_idle_size() >= initial_free

Testing Transactions and Rollbacks

Test that your application correctly handles transaction boundaries:

async def test_transaction_rollback_on_error(db_conn):
    # Insert a user
    await db_conn.execute("INSERT INTO users (email, name) VALUES ($1, $2)", "tx@test.com", "TX User")
    user = await db_conn.fetchrow("SELECT id FROM users WHERE email = $1", "tx@test.com")
    user_id = user["id"]

    # Attempt order with invalid constraint
    try:
        async with db_conn.transaction():
            await db_conn.execute(
                "INSERT INTO orders (user_id, total) VALUES ($1, $2)",
                user_id, 100.00
            )
            # Force a constraint violation
            await db_conn.execute(
                "INSERT INTO orders (user_id, total) VALUES ($1, $2)",
                99999, 50.00  # non-existent user_id
            )
    except asyncpg.ForeignKeyViolationError:
        pass

    # Verify rollback: no orders exist for this user
    count = await db_conn.fetchval(
        "SELECT COUNT(*) FROM orders WHERE user_id = $1",
        user_id
    )
    assert count == 0

Custom Row Type Factories

asyncpg lets you register custom type codecs. Test that your type registrations work:

async def test_custom_json_codec(db_pool):
    import json

    async with db_pool.acquire() as conn:
        await conn.set_type_codec(
            "jsonb",
            encoder=json.dumps,
            decoder=json.loads,
            schema="pg_catalog"
        )

        await conn.execute("""
            CREATE TEMP TABLE json_test (data JSONB)
        """)
        await conn.execute("INSERT INTO json_test VALUES ($1::jsonb)", '{"key": "value"}')

        row = await conn.fetchrow("SELECT data FROM json_test")
        assert row["data"]["key"] == "value"
        assert isinstance(row["data"], dict)

Parameterized Tests Across Multiple Queries

Use pytest.mark.parametrize for testing the same logic across multiple SQL patterns:

@pytest.mark.parametrize("email,name,expected_name", [
    ("alice@test.com", "Alice", "Alice"),
    ("BOB@TEST.COM", "Bob", "Bob"),
    ("charlie+tag@test.com", "Charlie", "Charlie"),
])
async def test_user_creation_variants(db_conn, email, name, expected_name):
    await db_conn.execute(
        "INSERT INTO users (email, name) VALUES ($1, $2)",
        email, name
    )
    row = await db_conn.fetchrow("SELECT name FROM users WHERE email = $1", email)
    assert row["name"] == expected_name

Performance: Measuring Query Time

Slow queries cause test flakiness. Assert query duration in performance-sensitive paths:

import time

async def test_user_lookup_by_email_is_fast(db_conn):
    await db_conn.execute("INSERT INTO users (email, name) VALUES ($1, $2)", "perf@test.com", "Perf")
    
    start = time.perf_counter()
    for _ in range(100):
        await db_conn.fetchrow("SELECT * FROM users WHERE email = $1", "perf@test.com")
    elapsed = time.perf_counter() - start

    # 100 indexed lookups should complete in under 500ms
    assert elapsed < 0.5, f"Query too slow: {elapsed:.3f}s for 100 lookups"

This catches missing indexes before they become production incidents.

Integrating Monitoring with HelpMeTest

Async database tests verify correctness at the unit level. HelpMeTest adds the monitoring layer: scheduled end-to-end tests that exercise your database-backed APIs from the outside, 24/7. When a connection pool exhausts in production or a migration breaks a query, HelpMeTest catches it before your users do.

Summary

Async Python database testing with asyncpg requires:

  1. Real PostgreSQL via Testcontainers—never mock the driver
  2. Session-scoped pool with schema creation once per test run
  3. Transaction rollback fixtures for test isolation without schema recreation
  4. Explicit pool behavior tests for concurrency and error recovery
  5. Savepoints when your application code uses nested transactions
  6. Type codec tests for custom JSONB, UUID, or domain type registrations

The setup investment pays off in a test suite that actually catches real bugs—not one that passes because mocks return what you told them to return.

Read more