Async Testing in FastAPI with pytest-asyncio: Endpoints, WebSockets, and Background Tasks

FastAPI is async-native. Endpoint functions declared with async def run in an async context, and testing them requires an async test client and an async test runner. pytest-asyncio combined with httpx.AsyncClient gives you everything you need. This guide covers async endpoint testing, database integration, WebSocket testing, and background task verification.

Why Async Testing Is Different

When you use the synchronous TestClient, it runs your async application in a background thread through an anyio blocking portal (anyio.from_thread.start_blocking_portal). This works for most cases but has important limitations:

  • No true concurrency testing: concurrent requests don't actually run concurrently
  • Async database clients fail: asyncpg, motor, and aioredis need an event loop that the sync client doesn't provide cleanly
  • WebSocket testing is awkward: WebSocket connections are inherently async

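AsyncClient from httpx solves the first two. Tests run inside pytest-asyncio's event loop, so requests can overlap and your async dependencies work correctly. httpx does not implement the WebSocket protocol itself, so WebSocket tests still go through TestClient (see the WebSocket Testing section) or a separate async WebSocket client.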
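
To make the concurrency point concrete, here is a minimal sketch in plain asyncio. The `fake_request` coroutine is a hypothetical stand-in for an awaited client call, not part of httpx or FastAPI. Awaiting requests one by one takes roughly the sum of their delays, while starting them together with asyncio.gather takes roughly the longest single delay.

```python
import asyncio
import time


async def fake_request(delay: float) -> str:
    """Hypothetical stand-in for `await async_client.get(...)`."""
    await asyncio.sleep(delay)  # simulates waiting on network or database I/O
    return "ok"


async def run_sequential(n: int, delay: float) -> float:
    """Await each request in turn and return the elapsed seconds."""
    start = time.perf_counter()
    for _ in range(n):
        await fake_request(delay)
    return time.perf_counter() - start


async def run_concurrent(n: int, delay: float) -> float:
    """Start all requests together with gather and return the elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(fake_request(delay) for _ in range(n)))
    return time.perf_counter() - start
```

Calling `asyncio.run(run_sequential(3, 0.05))` and `asyncio.run(run_concurrent(3, 0.05))` side by side shows the difference; only a test that runs on the event loop can observe the second behavior.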

Setup

pip install fastapi httpx pytest pytest-asyncio anyio "sqlalchemy[asyncio]" aiosqlite

Configure pytest-asyncio mode in pytest.ini:

[pytest]
asyncio_mode = auto

Or mark individual tests explicitly:

@pytest.mark.asyncio
async def test_something():
    ...

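The asyncio_mode = auto setting makes pytest-asyncio run every async def test_* function as an asyncio test and handle async fixtures declared with plain @pytest.fixture, so neither needs an explicit marker. That means less boilerplate.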

Basic Async TestClient

# tests/test_async_basic.py
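import pytest
from httpx import ASGITransport, AsyncClient
from app.main import app

@pytest.fixture
async def async_client():
    # Recent httpx releases removed the app= shortcut; route requests
    # to the ASGI app through ASGITransport instead
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        yield client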

async def test_health_check(async_client):
    response = await async_client.get("/health")
    assert response.status_code == 200
    assert response.json() == {"status": "ok"}

async def test_list_articles(async_client):
    response = await async_client.get("/api/articles/")
    assert response.status_code == 200
    assert isinstance(response.json(), list)

Async Database Integration

For async database testing with SQLAlchemy 2.0 + asyncpg (or aiosqlite for tests):

# app/database.py
from collections.abc import AsyncIterator

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase

class Base(DeclarativeBase):
    pass

# DATABASE_URL is assumed to be defined in your application settings
engine = create_async_engine(DATABASE_URL, echo=False)
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)

async def get_db() -> AsyncIterator[AsyncSession]:
    # Yield one session per request; the context manager closes it afterwards
    async with AsyncSessionLocal() as session:
        yield session

# tests/conftest.py
import pytest
import pytest_asyncio
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.pool import StaticPool
from httpx import ASGITransport, AsyncClient

from app.main import app
from app.database import get_db, Base

# Use aiosqlite for in-memory async database in tests
TEST_DATABASE_URL = "sqlite+aiosqlite://"

# StaticPool keeps a single connection so every session sees the same in-memory database
test_engine = create_async_engine(
    TEST_DATABASE_URL,
    connect_args={"check_same_thread": False},
    poolclass=StaticPool,
)
TestAsyncSessionLocal = async_sessionmaker(
    test_engine,
    expire_on_commit=False,
    class_=AsyncSession
)

@pytest_asyncio.fixture(autouse=True)
async def setup_database():
    """Create tables before each test, drop after."""
    async with test_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield
    async with test_engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)

@pytest_asyncio.fixture
async def db_session():
    """Provide an async test database session."""
    async with TestAsyncSessionLocal() as session:
        yield session

@pytest_asyncio.fixture
async def async_client(db_session):
    """AsyncClient with async database dependency overridden."""
    async def override_get_db():
        yield db_session

    app.dependency_overrides[get_db] = override_get_db

    # Recent httpx releases removed the app= shortcut; use ASGITransport instead
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        yield client

    app.dependency_overrides.clear()

Tests using the async database:

# tests/test_articles_async.py
from app.models import Article

async def test_create_article(async_client, db_session):
    payload = {
        "title": "Async Article",
        "content": "Written asynchronously",
        "status": "draft"
    }
    response = await async_client.post(
        "/api/articles/",
        json=payload,
        headers={"Authorization": "Bearer test-token"}
    )
    assert response.status_code == 201
    data = response.json()
    assert data["title"] == "Async Article"
    assert "id" in data

async def test_list_only_published(async_client, db_session):
    # Insert test data directly via async session
    db_session.add_all([
        Article(title="Published One", status="published", content="a"),
        Article(title="Published Two", status="published", content="b"),
        Article(title="Draft", status="draft", content="c"),
    ])
    await db_session.commit()

    response = await async_client.get("/api/articles/")
    assert response.status_code == 200
    titles = [a["title"] for a in response.json()]
    assert "Published One" in titles
    assert "Published Two" in titles
    assert "Draft" not in titles

async def test_concurrent_requests_handled(async_client, db_session):
    """Test that concurrent requests don't interfere with each other."""
    import asyncio

    # Caveat: the async_client fixture hands the same AsyncSession to every
    # request, and AsyncSession is not safe for concurrent use. If this test
    # fails with session-state errors, override get_db so that each request
    # opens its own session.
    async def create_article(title):
        return await async_client.post("/api/articles/", json={
            "title": title,
            "content": "Concurrent content",
            # Published, because the list endpoint checked below hides drafts
            "status": "published"
        }, headers={"Authorization": "Bearer test-token"})

    # Fire 10 concurrent requests
    responses = await asyncio.gather(*[
        create_article(f"Concurrent Article {i}")
        for i in range(10)
    ])

    for response in responses:
        assert response.status_code == 201

    # Verify all 10 were created
    list_response = await async_client.get("/api/articles/")
    assert len(list_response.json()) == 10

Async Dependency Overrides

# app/dependencies.py
async def get_cache():
    """Returns async Redis client."""
    return await aioredis.from_url(settings.REDIS_URL)

async def get_current_user(token: str = Depends(oauth2_scheme)):
    payload = await verify_jwt_token(token)
    user = await get_user_by_id(payload["sub"])
    if not user:
        raise HTTPException(status_code=401)
    return user

# tests/conftest.py
from unittest.mock import AsyncMock
from app.dependencies import get_cache, get_current_user

class MockUser:
    id = 1
    username = "asynctest"
    email = "async@test.com"
    is_active = True

@pytest_asyncio.fixture
async def mock_cache():
    mock = AsyncMock()
    mock.get.return_value = None
    mock.set.return_value = True
    mock.delete.return_value = 1
    app.dependency_overrides[get_cache] = lambda: mock
    yield mock
    app.dependency_overrides.pop(get_cache, None)

@pytest_asyncio.fixture
async def authenticated_async_client(async_client):
    app.dependency_overrides[get_current_user] = lambda: MockUser()
    yield async_client
    # Override cleared by async_client fixture cleanup

async def test_cached_response(authenticated_async_client, mock_cache):
    # First request — cache miss, fetches from DB
    mock_cache.get.return_value = None
    response = await authenticated_async_client.get("/api/articles/1")
    assert response.status_code == 200
    mock_cache.set.assert_called_once()

    # Second request — cache hit
    mock_cache.get.return_value = b'{"id": 1, "title": "Cached"}'
    response = await authenticated_async_client.get("/api/articles/1")
    assert response.status_code == 200
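    # The cache was consulted on both requests
    assert mock_cache.get.call_count == 2
    # The hit wrote nothing new, which indicates the DB path was skipped
    assert mock_cache.set.call_count == 1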
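
The same cache-miss and cache-hit assertions can be tried without a running app. The sketch below assumes a hypothetical cache-aside helper, `get_article`, and a hypothetical `load_from_db` loader; neither comes from the application above. It shows how AsyncMock records awaits, so a test can assert that the loader was skipped on a hit.

```python
import asyncio
import json
from unittest.mock import AsyncMock


async def get_article(article_id: int, cache, load_from_db) -> dict:
    """Hypothetical cache-aside lookup: try the cache, fall back to the loader."""
    cached = await cache.get(f"article:{article_id}")
    if cached is not None:
        return json.loads(cached)
    article = await load_from_db(article_id)
    await cache.set(f"article:{article_id}", json.dumps(article))
    return article


async def demo() -> tuple[dict, dict]:
    cache = AsyncMock()
    load_from_db = AsyncMock(return_value={"id": 1, "title": "From DB"})

    # Cache miss: the loader runs and the result is written back.
    cache.get.return_value = None
    first = await get_article(1, cache, load_from_db)
    cache.set.assert_awaited_once()

    # Cache hit: the loader must not be awaited a second time.
    cache.get.return_value = b'{"id": 1, "title": "Cached"}'
    second = await get_article(1, cache, load_from_db)
    load_from_db.assert_awaited_once_with(1)
    assert cache.get.await_count == 2
    return first, second
```

Asserting on the loader directly is stricter than counting cache reads, because it fails as soon as the database path runs on a hit.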

WebSocket Testing

FastAPI's TestClient supports WebSocket testing synchronously, which is the simplest way to exercise an endpoint such as this one:

# app/main.py
from fastapi import WebSocket, WebSocketDisconnect

@app.websocket("/ws/chat/{room_id}")
async def websocket_chat(websocket: WebSocket, room_id: str):
    await websocket.accept()
    try:
        while True:
            message = await websocket.receive_text()
            # Broadcast or echo
            await websocket.send_text(f"Room {room_id}: {message}")
    except WebSocketDisconnect:
        pass

Test WebSockets with the synchronous TestClient (easier):

from fastapi.testclient import TestClient

def test_websocket_echo():
    with TestClient(app) as client:
        with client.websocket_connect("/ws/chat/room1") as ws:
            ws.send_text("Hello")
            response = ws.receive_text()
            assert response == "Room room1: Hello"

def test_websocket_multiple_messages():
    with TestClient(app) as client:
        with client.websocket_connect("/ws/chat/general") as ws:
            for i in range(3):
                ws.send_text(f"Message {i}")
                response = ws.receive_text()
                assert f"Message {i}" in response

def test_websocket_disconnect_handled():
    """Verify the server handles disconnection cleanly."""
    with TestClient(app) as client:
        with client.websocket_connect("/ws/chat/room2") as ws:
            ws.send_text("First message")
            ws.receive_text()
        # Context manager exit triggers disconnect
        # Test that subsequent connections still work
        with client.websocket_connect("/ws/chat/room2") as ws:
            ws.send_text("After disconnect")
            response = ws.receive_text()
            assert "After disconnect" in response

For authenticated WebSockets:

# app/main.py
from fastapi import Query
@app.websocket("/ws/notifications")
async def websocket_notifications(
    websocket: WebSocket,
    token: str = Query(...)
):
    user = await verify_ws_token(token)
    if not user:
        await websocket.close(code=4001)
        return
    await websocket.accept()
    # Send notifications for this user
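    ...

import pytest
from starlette.websockets import WebSocketDisconnect

def test_websocket_requires_valid_token():
    with TestClient(app) as client:
        # A close() before accept() surfaces in TestClient as WebSocketDisconnect
        with pytest.raises(WebSocketDisconnect):
            with client.websocket_connect("/ws/notifications?token=invalid") as ws:
                ws.receive_text()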

def test_websocket_valid_token_accepted():
    token = generate_test_token(user_id=1)
    with TestClient(app) as client:
        with client.websocket_connect(f"/ws/notifications?token={token}") as ws:
            # Should be able to receive data
            ws.send_text("ping")
            response = ws.receive_text()
            assert response is not None

Background Task Testing

# app/main.py
from fastapi import BackgroundTasks

async def process_article_async(article_id: int, db: AsyncSession):
    """Background task: generate summary, notify subscribers."""
    article = await db.get(Article, article_id)
    article.summary = await generate_ai_summary(article.content)
    article.status = "processed"
    await db.commit()

@app.post("/api/articles/{article_id}/process")
async def trigger_processing(
    article_id: int,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db)
):
    background_tasks.add_task(process_article_async, article_id, db)
    return {"status": "processing_started"}

FastAPI's BackgroundTasks run after the response has been sent. A TestClient call does not return until the ASGI application has finished handling the request, so the background tasks have already completed by the time the test sees the response:

from unittest.mock import ANY, AsyncMock, patch

# `client` is assumed to be a fixture that yields a synchronous TestClient(app)
def test_background_task_triggered(client):
    """Verify background task is queued and runs."""
    with patch("app.main.process_article_async") as mock_process:
        mock_process.return_value = None
        response = client.post("/api/articles/1/process")
        assert response.status_code == 200
        assert response.json()["status"] == "processing_started"
        # Background tasks run before TestClient response completes
        mock_process.assert_called_once_with(1, ANY)

async def test_background_task_completes(async_client, db_session):
    """Test the actual background task function."""
    from app.main import process_article_async  # defined in app/main.py above

    article = Article(
        title="Needs Processing",
        content="Long content to summarize",
        status="published"
    )
    db_session.add(article)
    await db_session.commit()
    await db_session.refresh(article)

    # generate_ai_summary is awaited by the task, so replace it with an AsyncMock
    with patch("app.main.generate_ai_summary", new_callable=AsyncMock) as mock_summarize:
        mock_summarize.return_value = "AI-generated summary"
        await process_article_async(article.id, db_session)

    await db_session.refresh(article)
    assert article.summary == "AI-generated summary"
    assert article.status == "processed"

Testing Server-Sent Events (SSE)

# app/main.py
from fastapi.responses import StreamingResponse
import asyncio

async def event_generator(topic: str):
    for i in range(5):
        yield f"data: Event {i} for {topic}\n\n"
        await asyncio.sleep(0.01)

@app.get("/api/events/{topic}")
async def subscribe_events(topic: str):
    return StreamingResponse(
        event_generator(topic),
        media_type="text/event-stream"
    )

async def test_sse_stream(async_client):
    response = await async_client.get(
        "/api/events/news",
        headers={"Accept": "text/event-stream"}
    )
    assert response.status_code == 200
    assert "text/event-stream" in response.headers["content-type"]
    content = response.text
    assert "Event 0 for news" in content
    assert "Event 4 for news" in content
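
Substring checks work for a quick test, but asserting on the parsed events is stricter. The helper below, `parse_sse_data`, is a hypothetical and deliberately simplified parser: it handles only `data:` fields and blank-line separators, not `event:`, `id:`, `retry:`, or comment lines.

```python
def parse_sse_data(body: str) -> list[str]:
    """Return the data payload of each event in a text/event-stream body."""
    events = []
    for block in body.split("\n\n"):  # a blank line ends each event
        data_lines = []
        for line in block.splitlines():
            if not line.startswith("data:"):
                continue
            value = line[len("data:"):]
            # A single leading space after the colon is not part of the payload
            if value.startswith(" "):
                value = value[1:]
            data_lines.append(value)
        if data_lines:
            # Multiple data: lines in one event are joined with newlines
            events.append("\n".join(data_lines))
    return events
```

With this in place, the test above could compare `parse_sse_data(response.text)` against the five expected strings, which also catches missing, duplicated, or reordered events.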

Testing Lifespan with Async Dependencies

# app/main.py
from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    app.state.db_pool = await asyncpg.create_pool(DATABASE_URL)
    app.state.redis = await aioredis.from_url(REDIS_URL)
    yield
    # Shutdown
    await app.state.db_pool.close()
    await app.state.redis.close()

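app = FastAPI(lifespan=lifespan)

# tests/conftest.py
from unittest.mock import AsyncMock, patch

import pytest_asyncio
from httpx import ASGITransport, AsyncClient

from app.main import app, lifespan

@pytest_asyncio.fixture
async def async_client():
    mock_pool = AsyncMock()
    mock_redis = AsyncMock()

    # Both factories are awaited in lifespan(), so the patches must be AsyncMock objects
    with patch("app.main.asyncpg.create_pool", new=AsyncMock(return_value=mock_pool)), \
         patch("app.main.aioredis.from_url", new=AsyncMock(return_value=mock_redis)):
        # httpx does not send ASGI lifespan events, so enter the lifespan explicitly
        async with lifespan(app):
            transport = ASGITransport(app=app)
            async with AsyncClient(transport=transport, base_url="http://test") as client:
                yield client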
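
The startup and shutdown ordering can also be checked in isolation. The sketch below is a standalone approximation, not the application code: `create_pool` is a hypothetical factory standing in for asyncpg.create_pool, the app is a plain SimpleNamespace, and the factory is passed in as a parameter instead of being patched.

```python
import asyncio
from contextlib import asynccontextmanager
from types import SimpleNamespace
from unittest.mock import AsyncMock


async def create_pool(url: str):
    """Hypothetical async factory standing in for asyncpg.create_pool."""
    raise RuntimeError("no real database in tests")


@asynccontextmanager
async def lifespan(app, pool_factory=create_pool):
    app.state.db_pool = await pool_factory("postgresql://example")  # startup
    yield
    await app.state.db_pool.close()  # shutdown


async def demo() -> list[str]:
    events = []
    mock_pool = AsyncMock()
    # The factory is awaited, so it must be an AsyncMock; awaiting the result
    # of a plain MagicMock call raises TypeError.
    factory = AsyncMock(return_value=mock_pool)
    app = SimpleNamespace(state=SimpleNamespace())

    async with lifespan(app, pool_factory=factory):
        events.append("running")
        assert app.state.db_pool is mock_pool
        mock_pool.close.assert_not_awaited()  # shutdown has not run yet

    mock_pool.close.assert_awaited_once()  # shutdown ran on exit
    events.append("closed")
    return events
```

Entering the context manager directly is what makes the shutdown assertion possible; a client that never triggers the lifespan would leave `app.state` unset.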

Running Async Tests

# Run all tests including async
pytest

# Verbose output
pytest -v

# Run specific async test file
pytest tests/test_async_endpoints.py -v

# Run with coverage
pytest --cov=app --cov-report=html

# Run only WebSocket tests
pytest -k "websocket" -v

# Run in parallel (async tests work with pytest-xdist)
pytest -n auto

Common Pitfalls

Event loop conflicts — don't create your own event loop in async tests. Let pytest-asyncio manage it.

# Wrong
async def test_something():
    loop = asyncio.get_event_loop()  # don't do this
    result = loop.run_until_complete(some_coroutine())

# Right
async def test_something():
    result = await some_coroutine()
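
The failure mode is easy to reproduce outside pytest. In this minimal sketch, `nested_loop_error` is a hypothetical helper that tries to drive a coroutine with run_until_complete while a loop is already running, and returns the RuntimeError message that asyncio raises.

```python
import asyncio


async def some_coroutine() -> int:
    return 42


async def nested_loop_error() -> str:
    """Call run_until_complete from inside a running loop and report the error."""
    loop = asyncio.get_running_loop()
    coro = some_coroutine()
    try:
        loop.run_until_complete(coro)  # the loop is already running this function
    except RuntimeError as exc:
        return str(exc)
    finally:
        coro.close()  # avoids a "coroutine was never awaited" warning
    return "no error"


async def awaited_result() -> int:
    """The correct form: await the coroutine on the loop that is already running."""
    return await some_coroutine()
```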

Fixture scope mismatches — async fixtures must use matching scopes. A session-scoped async fixture needs a session-scoped event loop.

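# pytest.ini
[pytest]
asyncio_mode = auto

# tests/conftest.py
# For session-scoped async fixtures, request a session-scoped loop as well
# (the loop_scope argument is available in pytest-asyncio 0.24 and later)
@pytest_asyncio.fixture(scope="session", loop_scope="session")
async def session_resource():
    resource = await create_expensive_resource()
    yield resource
    await resource.close()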

Not awaiting coroutines — any async def dependency must be await-ed or used with async with/async for.
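
A short sketch of what goes wrong, using a hypothetical `get_current_user` dependency: calling an async def function without await produces a coroutine object, not the value the function returns.

```python
import asyncio
import inspect


async def get_current_user() -> dict:
    """Hypothetical async dependency."""
    return {"username": "asynctest"}


async def demo() -> tuple[bool, str]:
    not_awaited = get_current_user()  # a coroutine object, not a dict
    is_coroutine = inspect.iscoroutine(not_awaited)
    not_awaited.close()  # discard it cleanly instead of leaving it un-awaited

    user = await get_current_user()  # the actual return value
    return is_coroutine, user["username"]
```

In a test, the un-awaited form typically shows up as an assertion comparing a coroutine object to a value, or as a "coroutine was never awaited" RuntimeWarning.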

Beyond Tests: Monitoring Async Applications in Production

Async FastAPI applications are often more complex in production than unit tests reveal. Connection pools drain under load, WebSocket connections accumulate, background tasks queue up. These runtime conditions don't appear in controlled test environments.

HelpMeTest monitors your live FastAPI application continuously — testing real endpoints, real WebSocket connections, and real API flows against your deployed infrastructure. When your async application starts failing under production load or after a dependency update, HelpMeTest catches it before your users do. Your pytest-asyncio suite proves correctness; HelpMeTest proves availability.

Summary

Async testing in FastAPI requires three things:

  1. pytest-asyncio with asyncio_mode = auto — runs async test functions in an event loop automatically
  2. httpx.AsyncClient — async HTTP client that works with FastAPI's ASGI interface
  3. Async fixtures — using @pytest_asyncio.fixture for fixtures that await operations

Key patterns:

  • Override async dependencies the same way as sync ones — app.dependency_overrides[get_db] = override
  • Use AsyncMock (not MagicMock) for async dependencies and coroutines
  • Test WebSockets with TestClient.websocket_connect() — synchronous context manager, no async required
  • Background tasks run inline during TestClient calls — mock the task function to verify it's triggered, then test the task function separately with direct await calls
  • Use aiosqlite for in-memory async database testing — no PostgreSQL needed in CI
