Async Testing in FastAPI with pytest-asyncio: Endpoints, WebSockets, and Background Tasks
FastAPI is async-native. Endpoint functions declared with async def run in an async context, and testing them requires an async test client and an async test runner. pytest-asyncio combined with httpx.AsyncClient gives you everything you need. This guide covers async endpoint testing, database integration, WebSocket testing, and background task verification.
Why Async Testing Is Different
When you use TestClient (synchronous), it runs your async application inside a thread with anyio.from_thread.run_sync. This works for most cases but has important limitations:
- No true concurrency testing — concurrent requests don't actually run concurrently
- Async database clients fail —
asyncpg,motor,aioredisneed an event loop that the sync client doesn't provide cleanly - WebSocket testing is awkward — WebSocket connections are inherently async
AsyncClient from httpx solves all three. Tests run inside pytest-asyncio's event loop, your async dependencies work correctly, and WebSocket testing is natural.
Setup
pip install fastapi httpx pytest-asyncio anyio sqlalchemy[asyncio] aiosqliteConfigure pytest-asyncio mode in pytest.ini:
[pytest]
asyncio_mode = autoOr mark individual tests explicitly:
@pytest.mark.asyncio
async def test_something():
...The asyncio_mode = auto setting makes every async def test_* function automatically async — less boilerplate.
Basic Async TestClient
# tests/test_async_basic.py
import pytest
import httpx
from httpx import AsyncClient
from app.main import app
@pytest.fixture
async def async_client():
async with AsyncClient(app=app, base_url="http://test") as client:
yield client
async def test_health_check(async_client):
response = await async_client.get("/health")
assert response.status_code == 200
assert response.json() == {"status": "ok"}
async def test_list_articles(async_client):
response = await async_client.get("/api/articles/")
assert response.status_code == 200
assert isinstance(response.json(), list)Async Database Integration
For async database testing with SQLAlchemy 2.0 + asyncpg (or aiosqlite for tests):
# app/database.py
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
pass
engine = create_async_engine(DATABASE_URL, echo=False)
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)
async def get_db() -> AsyncSession:
async with AsyncSessionLocal() as session:
yield session# tests/conftest.py
import pytest
import pytest_asyncio
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.pool import StaticPool
from httpx import AsyncClient
from app.main import app
from app.database import get_db, Base
# Use aiosqlite for in-memory async database in tests
TEST_DATABASE_URL = "sqlite+aiosqlite://"
test_engine = create_async_engine(
TEST_DATABASE_URL,
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
TestAsyncSessionLocal = async_sessionmaker(
test_engine,
expire_on_commit=False,
class_=AsyncSession
)
@pytest_asyncio.fixture(autouse=True)
async def setup_database():
"""Create tables before each test, drop after."""
async with test_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield
async with test_engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
@pytest_asyncio.fixture
async def db_session():
"""Provide an async test database session."""
async with TestAsyncSessionLocal() as session:
yield session
@pytest_asyncio.fixture
async def async_client(db_session):
"""AsyncClient with async database dependency overridden."""
async def override_get_db():
yield db_session
app.dependency_overrides[get_db] = override_get_db
async with AsyncClient(app=app, base_url="http://test") as client:
yield client
app.dependency_overrides.clear()Tests using the async database:
# tests/test_articles_async.py
from app.models import Article
async def test_create_article(async_client, db_session):
payload = {
"title": "Async Article",
"content": "Written asynchronously",
"status": "draft"
}
response = await async_client.post(
"/api/articles/",
json=payload,
headers={"Authorization": "Bearer test-token"}
)
assert response.status_code == 201
data = response.json()
assert data["title"] == "Async Article"
assert "id" in data
async def test_list_only_published(async_client, db_session):
# Insert test data directly via async session
db_session.add_all([
Article(title="Published One", status="published", content="a"),
Article(title="Published Two", status="published", content="b"),
Article(title="Draft", status="draft", content="c"),
])
await db_session.commit()
response = await async_client.get("/api/articles/")
assert response.status_code == 200
titles = [a["title"] for a in response.json()]
assert "Published One" in titles
assert "Published Two" in titles
assert "Draft" not in titles
async def test_concurrent_requests_handled(async_client, db_session):
"""Test that concurrent requests don't interfere with each other."""
import asyncio
async def create_article(title):
return await async_client.post("/api/articles/", json={
"title": title,
"content": "Concurrent content",
"status": "draft"
}, headers={"Authorization": "Bearer test-token"})
# Fire 10 concurrent requests
responses = await asyncio.gather(*[
create_article(f"Concurrent Article {i}")
for i in range(10)
])
for response in responses:
assert response.status_code == 201
# Verify all 10 were created
list_response = await async_client.get("/api/articles/")
assert len(list_response.json()) == 10Async Dependency Overrides
# app/dependencies.py
async def get_cache():
"""Returns async Redis client."""
return await aioredis.from_url(settings.REDIS_URL)
async def get_current_user(token: str = Depends(oauth2_scheme)):
payload = await verify_jwt_token(token)
user = await get_user_by_id(payload["sub"])
if not user:
raise HTTPException(status_code=401)
return user# tests/conftest.py
from unittest.mock import AsyncMock
from app.dependencies import get_cache, get_current_user
class MockUser:
id = 1
username = "asynctest"
email = "async@test.com"
is_active = True
@pytest_asyncio.fixture
async def mock_cache():
mock = AsyncMock()
mock.get.return_value = None
mock.set.return_value = True
mock.delete.return_value = 1
app.dependency_overrides[get_cache] = lambda: mock
yield mock
app.dependency_overrides.pop(get_cache, None)
@pytest_asyncio.fixture
async def authenticated_async_client(async_client):
app.dependency_overrides[get_current_user] = lambda: MockUser()
yield async_client
# Override cleared by async_client fixture cleanupasync def test_cached_response(authenticated_async_client, mock_cache):
# First request — cache miss, fetches from DB
mock_cache.get.return_value = None
response = await authenticated_async_client.get("/api/articles/1")
assert response.status_code == 200
mock_cache.set.assert_called_once()
# Second request — cache hit
mock_cache.get.return_value = b'{"id": 1, "title": "Cached"}'
response = await authenticated_async_client.get("/api/articles/1")
assert response.status_code == 200
# Should not hit DB on second request
assert mock_cache.get.call_count == 2WebSocket Testing
FastAPI's TestClient supports WebSocket testing synchronously. For async WebSocket testing:
# app/main.py
from fastapi import WebSocket, WebSocketDisconnect
@app.websocket("/ws/chat/{room_id}")
async def websocket_chat(websocket: WebSocket, room_id: str):
await websocket.accept()
try:
while True:
message = await websocket.receive_text()
# Broadcast or echo
await websocket.send_text(f"Room {room_id}: {message}")
except WebSocketDisconnect:
passTest WebSockets with the synchronous TestClient (easier):
from fastapi.testclient import TestClient
def test_websocket_echo():
with TestClient(app) as client:
with client.websocket_connect("/ws/chat/room1") as ws:
ws.send_text("Hello")
response = ws.receive_text()
assert response == "Room room1: Hello"
def test_websocket_multiple_messages():
with TestClient(app) as client:
with client.websocket_connect("/ws/chat/general") as ws:
for i in range(3):
ws.send_text(f"Message {i}")
response = ws.receive_text()
assert f"Message {i}" in response
def test_websocket_disconnect_handled():
"""Verify the server handles disconnection cleanly."""
with TestClient(app) as client:
with client.websocket_connect("/ws/chat/room2") as ws:
ws.send_text("First message")
ws.receive_text()
# Context manager exit triggers disconnect
# Test that subsequent connections still work
with client.websocket_connect("/ws/chat/room2") as ws:
ws.send_text("After disconnect")
response = ws.receive_text()
assert "After disconnect" in responseFor authenticated WebSockets:
# app/main.py
@app.websocket("/ws/notifications")
async def websocket_notifications(
websocket: WebSocket,
token: str = Query(...)
):
user = await verify_ws_token(token)
if not user:
await websocket.close(code=4001)
return
await websocket.accept()
# Send notifications for this user
...def test_websocket_requires_valid_token():
with TestClient(app) as client:
with pytest.raises(Exception):
with client.websocket_connect("/ws/notifications?token=invalid") as ws:
ws.receive_text()
def test_websocket_valid_token_accepted():
token = generate_test_token(user_id=1)
with TestClient(app) as client:
with client.websocket_connect(f"/ws/notifications?token={token}") as ws:
# Should be able to receive data
ws.send_text("ping")
response = ws.receive_text()
assert response is not NoneBackground Task Testing
# app/main.py
from fastapi import BackgroundTasks
async def process_article_async(article_id: int, db: AsyncSession):
"""Background task: generate summary, notify subscribers."""
article = await db.get(Article, article_id)
article.summary = await generate_ai_summary(article.content)
article.status = "processed"
await db.commit()
@app.post("/api/articles/{article_id}/process")
async def trigger_processing(
article_id: int,
background_tasks: BackgroundTasks,
db: AsyncSession = Depends(get_db)
):
background_tasks.add_task(process_article_async, article_id, db)
return {"status": "processing_started"}FastAPI's BackgroundTasks run after the response is sent, but before the test client closes. In TestClient, they complete synchronously within the request cycle:
from unittest.mock import patch, AsyncMock
def test_background_task_triggered(client):
"""Verify background task is queued and runs."""
with patch("app.main.process_article_async") as mock_process:
mock_process.return_value = None
response = client.post("/api/articles/1/process")
assert response.status_code == 200
assert response.json()["status"] == "processing_started"
# Background tasks run before TestClient response completes
mock_process.assert_called_once_with(1, ANY)
async def test_background_task_completes(async_client, db_session):
"""Test the actual background task function."""
from app.tasks import process_article_async
article = Article(
title="Needs Processing",
content="Long content to summarize",
status="published"
)
db_session.add(article)
await db_session.commit()
await db_session.refresh(article)
with patch("app.tasks.generate_ai_summary") as mock_summarize:
mock_summarize.return_value = "AI-generated summary"
await process_article_async(article.id, db_session)
await db_session.refresh(article)
assert article.summary == "AI-generated summary"
assert article.status == "processed"Testing Server-Sent Events (SSE)
# app/main.py
from fastapi.responses import StreamingResponse
import asyncio
async def event_generator(topic: str):
for i in range(5):
yield f"data: Event {i} for {topic}\n\n"
await asyncio.sleep(0.01)
@app.get("/api/events/{topic}")
async def subscribe_events(topic: str):
return StreamingResponse(
event_generator(topic),
media_type="text/event-stream"
)async def test_sse_stream(async_client):
response = await async_client.get(
"/api/events/news",
headers={"Accept": "text/event-stream"}
)
assert response.status_code == 200
assert "text/event-stream" in response.headers["content-type"]
content = response.text
assert "Event 0 for news" in content
assert "Event 4 for news" in contentTesting Lifespan with Async Dependencies
# app/main.py
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
app.state.db_pool = await asyncpg.create_pool(DATABASE_URL)
app.state.redis = await aioredis.from_url(REDIS_URL)
yield
# Shutdown
await app.state.db_pool.close()
await app.state.redis.close()
app = FastAPI(lifespan=lifespan)# tests/conftest.py
from unittest.mock import AsyncMock, patch
@pytest_asyncio.fixture
async def async_client():
mock_pool = AsyncMock()
mock_redis = AsyncMock()
with patch("app.main.asyncpg.create_pool", return_value=mock_pool), \
patch("app.main.aioredis.from_url", return_value=mock_redis):
async with AsyncClient(app=app, base_url="http://test") as client:
yield clientRunning Async Tests
# Run all tests including async
pytest
<span class="hljs-comment"># Verbose output
pytest -v
<span class="hljs-comment"># Run specific async test file
pytest tests/test_async_endpoints.py -v
<span class="hljs-comment"># Run with coverage
pytest --cov=app --cov-report=html
<span class="hljs-comment"># Run only WebSocket tests
pytest -k <span class="hljs-string">"websocket" -v
<span class="hljs-comment"># Run in parallel (async tests work with pytest-xdist)
pytest -n autoCommon Pitfalls
Event loop conflicts — don't create your own event loop in async tests. Let pytest-asyncio manage it.
# Wrong
async def test_something():
loop = asyncio.get_event_loop() # don't do this
result = loop.run_until_complete(some_coroutine())
# Right
async def test_something():
result = await some_coroutine()Fixture scope mismatches — async fixtures must use matching scopes. A session-scoped async fixture needs a session-scoped event loop.
# pytest.ini
[pytest]
asyncio_mode = auto
# For session-scoped async fixtures
@pytest_asyncio.fixture(scope="session")
async def session_resource():
resource = await create_expensive_resource()
yield resource
await resource.close()Not awaiting coroutines — any async def dependency must be await-ed or used with async with/async for.
Beyond Tests: Monitoring Async Applications in Production
Async FastAPI applications are often more complex in production than unit tests reveal. Connection pools drain under load, WebSocket connections accumulate, background tasks queue up. These runtime conditions don't appear in controlled test environments.
HelpMeTest monitors your live FastAPI application continuously — testing real endpoints, real WebSocket connections, and real API flows against your deployed infrastructure. When your async application starts failing under production load or after a dependency update, HelpMeTest catches it before your users do. Your pytest-asyncio suite proves correctness; HelpMeTest proves availability.
Summary
Async testing in FastAPI requires three things:
pytest-asynciowithasyncio_mode = auto— runs async test functions in an event loop automaticallyhttpx.AsyncClient— async HTTP client that works with FastAPI's ASGI interface- Async fixtures — using
@pytest_asyncio.fixturefor fixtures thatawaitoperations
Key patterns:
- Override async dependencies the same way as sync ones —
app.dependency_overrides[get_db] = override - Use
AsyncMock(notMagicMock) for async dependencies and coroutines - Test WebSockets with
TestClient.websocket_connect()— synchronous context manager, no async required - Background tasks run inline during
TestClientcalls — mock the task function to verify it's triggered, then test the task function separately with directawaitcalls - Use
aiosqlitefor in-memory async database testing — no PostgreSQL needed in CI