Async Python Database Testing: asyncpg, pytest, Connection Pooling, and Testcontainers
asyncpg is the fastest PostgreSQL driver for Python—but testing async database code correctly requires more setup than the synchronous equivalent. This guide covers the full testing stack: spinning up real PostgreSQL via Testcontainers, managing async connection pools in tests, and isolating tests without sacrificing speed.
Why asyncpg Testing Is Different
asyncpg bypasses the DB-API 2.0 interface entirely. It doesn't work with SQLAlchemy's ORM layer by default (you need asyncpg + sqlalchemy[asyncio] for that). Testing raw asyncpg code means working with:
asyncpg.Connectionobjects (not sessions)asyncpg.Poolfor connection pooling- Binary protocol responses (not dict-like rows by default)
The patterns for testing SQLAlchemy async sessions don't directly apply here.
Stack Requirements
pip install asyncpg pytest pytest-asyncio testcontainers[postgres]Add to pyproject.toml:
[tool.pytest.ini_options]
asyncio_mode = "auto"Real PostgreSQL with Testcontainers
Never mock asyncpg. The binary protocol, row factories, and connection pool behavior are too complex to mock correctly—mocks will pass tests that fail in production. Use a real PostgreSQL instance via Testcontainers:
# conftest.py
import asyncio
import pytest
import pytest_asyncio
import asyncpg
from testcontainers.postgres import PostgresContainer
@pytest.fixture(scope="session")
def postgres_container():
with PostgresContainer("postgres:16-alpine") as pg:
yield pg
@pytest.fixture(scope="session")
def db_url(postgres_container):
return postgres_container.get_connection_url().replace("postgresql+psycopg2://", "")The scope="session" means one container starts for the entire test run—much faster than starting a fresh container per test.
Creating the Schema
Apply your schema once per session using asyncpg directly:
@pytest_asyncio.fixture(scope="session")
async def db_pool(db_url):
pool = await asyncpg.create_pool(db_url, min_size=2, max_size=10)
async with pool.acquire() as conn:
await conn.execute("""
CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY,
email TEXT UNIQUE NOT NULL,
name TEXT NOT NULL,
created_at TIMESTAMPTZ DEFAULT now()
);
CREATE TABLE IF NOT EXISTS orders (
id SERIAL PRIMARY KEY,
user_id INTEGER REFERENCES users(id) ON DELETE CASCADE,
total NUMERIC(10,2) NOT NULL,
created_at TIMESTAMPTZ DEFAULT now()
);
""")
yield pool
await pool.close()For real projects, run your migrations in the fixture instead:
async with pool.acquire() as conn:
# Run Alembic migrations programmatically
from alembic.config import Config
from alembic import command
alembic_cfg = Config("alembic.ini")
command.upgrade(alembic_cfg, "head")Test Isolation with Savepoints
The transactional rollback pattern from SQLAlchemy translates to asyncpg savepoints:
@pytest_asyncio.fixture
async def db_conn(db_pool):
async with db_pool.acquire() as conn:
transaction = conn.transaction()
await transaction.start()
yield conn
await transaction.rollback()Each test gets a connection inside a transaction. When the fixture teardown runs, the transaction rolls back—leaving the database clean for the next test. Tests can even call COMMIT—the outer transaction rollback undoes all nested commits.
Important caveat: PostgreSQL doesn't support nested transactions by default. If your application code calls BEGIN inside the test, you'll get an error. Work around this with savepoints:
@pytest_asyncio.fixture
async def db_conn(db_pool):
async with db_pool.acquire() as conn:
await conn.execute("BEGIN")
await conn.execute("SAVEPOINT test_start")
yield conn
await conn.execute("ROLLBACK TO SAVEPOINT test_start")
await conn.execute("ROLLBACK")Writing Async Tests with asyncpg
asyncpg returns Record objects, not dicts. Access fields by name:
async def test_create_user(db_conn):
await db_conn.execute(
"INSERT INTO users (email, name) VALUES ($1, $2)",
"test@example.com", "Test User"
)
row = await db_conn.fetchrow(
"SELECT * FROM users WHERE email = $1",
"test@example.com"
)
assert row["name"] == "Test User"
assert row["email"] == "test@example.com"
assert row["id"] is not None
async def test_fetch_multiple_rows(db_conn):
emails = [f"user{i}@example.com" for i in range(5)]
for email in emails:
await db_conn.execute("INSERT INTO users (email, name) VALUES ($1, $2)", email, "User")
rows = await db_conn.fetch("SELECT email FROM users ORDER BY email")
assert len(rows) == 5
assert rows[0]["email"] == "user0@example.com"Testing Connection Pool Behavior
Connection pool behavior is often untested—and pool exhaustion causes production incidents. Test it explicitly:
async def test_pool_handles_concurrent_requests(db_pool):
"""Verify pool serves concurrent requests without deadlock."""
async def fetch_one(pool):
async with pool.acquire() as conn:
return await conn.fetchval("SELECT 1")
results = await asyncio.gather(*[fetch_one(db_pool) for _ in range(20)])
assert all(r == 1 for r in results)
async def test_pool_releases_connections_after_error(db_pool):
"""Verify connections return to pool even after query errors."""
initial_free = db_pool.get_idle_size()
with pytest.raises(asyncpg.UndefinedTableError):
async with db_pool.acquire() as conn:
await conn.fetch("SELECT * FROM nonexistent_table")
# Connection should be back in the pool
assert db_pool.get_idle_size() >= initial_freeTesting Transactions and Rollbacks
Test that your application correctly handles transaction boundaries:
async def test_transaction_rollback_on_error(db_conn):
# Insert a user
await db_conn.execute("INSERT INTO users (email, name) VALUES ($1, $2)", "tx@test.com", "TX User")
user = await db_conn.fetchrow("SELECT id FROM users WHERE email = $1", "tx@test.com")
user_id = user["id"]
# Attempt order with invalid constraint
try:
async with db_conn.transaction():
await db_conn.execute(
"INSERT INTO orders (user_id, total) VALUES ($1, $2)",
user_id, 100.00
)
# Force a constraint violation
await db_conn.execute(
"INSERT INTO orders (user_id, total) VALUES ($1, $2)",
99999, 50.00 # non-existent user_id
)
except asyncpg.ForeignKeyViolationError:
pass
# Verify rollback: no orders exist for this user
count = await db_conn.fetchval(
"SELECT COUNT(*) FROM orders WHERE user_id = $1",
user_id
)
assert count == 0Custom Row Type Factories
asyncpg lets you register custom type codecs. Test that your type registrations work:
async def test_custom_json_codec(db_pool):
import json
async with db_pool.acquire() as conn:
await conn.set_type_codec(
"jsonb",
encoder=json.dumps,
decoder=json.loads,
schema="pg_catalog"
)
await conn.execute("""
CREATE TEMP TABLE json_test (data JSONB)
""")
await conn.execute("INSERT INTO json_test VALUES ($1::jsonb)", '{"key": "value"}')
row = await conn.fetchrow("SELECT data FROM json_test")
assert row["data"]["key"] == "value"
assert isinstance(row["data"], dict)Parameterized Tests Across Multiple Queries
Use pytest.mark.parametrize for testing the same logic across multiple SQL patterns:
@pytest.mark.parametrize("email,name,expected_name", [
("alice@test.com", "Alice", "Alice"),
("BOB@TEST.COM", "Bob", "Bob"),
("charlie+tag@test.com", "Charlie", "Charlie"),
])
async def test_user_creation_variants(db_conn, email, name, expected_name):
await db_conn.execute(
"INSERT INTO users (email, name) VALUES ($1, $2)",
email, name
)
row = await db_conn.fetchrow("SELECT name FROM users WHERE email = $1", email)
assert row["name"] == expected_namePerformance: Measuring Query Time
Slow queries cause test flakiness. Assert query duration in performance-sensitive paths:
import time
async def test_user_lookup_by_email_is_fast(db_conn):
await db_conn.execute("INSERT INTO users (email, name) VALUES ($1, $2)", "perf@test.com", "Perf")
start = time.perf_counter()
for _ in range(100):
await db_conn.fetchrow("SELECT * FROM users WHERE email = $1", "perf@test.com")
elapsed = time.perf_counter() - start
# 100 indexed lookups should complete in under 500ms
assert elapsed < 0.5, f"Query too slow: {elapsed:.3f}s for 100 lookups"This catches missing indexes before they become production incidents.
Integrating Monitoring with HelpMeTest
Async database tests verify correctness at the unit level. HelpMeTest adds the monitoring layer: scheduled end-to-end tests that exercise your database-backed APIs from the outside, 24/7. When a connection pool exhausts in production or a migration breaks a query, HelpMeTest catches it before your users do.
Summary
Async Python database testing with asyncpg requires:
- Real PostgreSQL via Testcontainers—never mock the driver
- Session-scoped pool with schema creation once per test run
- Transaction rollback fixtures for test isolation without schema recreation
- Explicit pool behavior tests for concurrency and error recovery
- Savepoints when your application code uses nested transactions
- Type codec tests for custom JSONB, UUID, or domain type registrations
The setup investment pays off in a test suite that actually catches real bugs—not one that passes because mocks return what you told them to return.