Server-Sent Events (SSE) Testing: pytest, Playwright, and End-to-End Scenarios
Server-Sent Events are simple to use but tricky to test because streaming responses, reconnection logic, and event routing all behave differently from normal HTTP. This guide covers pytest with httpx, Playwright EventSource interception, and k6 load testing for SSE endpoints.
Server-Sent Events (SSE) provide a lightweight, one-directional stream from server to client over HTTP. They're ideal for live dashboards, notification feeds, and progress updates. Testing them requires a different approach than standard request/response APIs: you need to handle chunked streaming, verify event type routing, exercise reconnection behavior, and ensure Last-Event-ID is honored for reliable delivery. Here's how to do all of that.
pytest with httpx AsyncClient for Streaming
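The standard approach for testing SSE in Python is httpx.AsyncClient with stream(). Paired with ASGITransport, it calls your app in-process, so you don't need to spin up a separate server. Be aware that ASGITransport buffers the response until the application has finished sending it: an endpoint that streams forever must be given a way to terminate under test (or be exercised against a real server process), otherwise the test will hang.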
# test_sse_endpoint.py
import asyncio
import pytest
from httpx import AsyncClient, ASGITransport
from app.main import app # your FastAPI/Starlette app
@pytest.mark.asyncio
async def test_sse_endpoint_emits_events():
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
events = []
async with client.stream("GET", "/events") as response:
assert response.status_code == 200
assert "text/event-stream" in response.headers["content-type"]
async for line in response.aiter_lines():
if line.startswith("data:"):
events.append(line[5:].strip())
if len(events) >= 3:
break # collect 3 events then stop
assert len(events) == 3
assert all(event for event in events) # non-emptyFor a FastAPI SSE endpoint, the server side looks like this:
# app/main.py
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import json
app = FastAPI()
async def event_generator(topic: str, interval: float = 1.0, retry_ms: int = 3000):
    """Yield an endless stream of SSE-formatted ``update`` events for *topic*.

    The first chunk is a ``retry:`` directive so that clients (and the retry
    test that reads ``/events``) see the reconnect delay before any event.
    Every following chunk is one complete event carrying ``id``, ``event``
    and ``data`` fields and terminated by a blank line.

    Args:
        topic: Value echoed in the ``topic`` key of each JSON payload.
        interval: Seconds to wait between events. Defaults to the original
            one-second cadence; tests can pass ``0`` to avoid waiting.
        retry_ms: Reconnect delay, in milliseconds, advertised to clients.

    Yields:
        str: A ``retry:`` chunk first, then one SSE event per iteration with
        ids counting up from 1.
    """
    # A retry-only chunk carries no data, so clients dispatch no event for it.
    yield f"retry: {retry_ms}\n\n"
    event_id = 0
    while True:
        event_id += 1
        data = json.dumps({"id": event_id, "topic": topic})
        yield f"id: {event_id}\nevent: update\ndata: {data}\n\n"
        await asyncio.sleep(interval)
@app.get("/events")
async def stream_events(topic: str = "default"):
return StreamingResponse(
event_generator(topic),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no",
}
)Testing Event Type Routing
SSE supports named event types via event: fields. Test that your client correctly routes different event types to the right handlers:
@pytest.mark.asyncio
async def test_event_type_routing():
"""Verify that 'alert' and 'update' events are emitted separately."""
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
collected = {"update": [], "alert": []}
current_event = "message" # default SSE event type
async with client.stream("GET", "/events/mixed") as response:
async for line in response.aiter_lines():
if line.startswith("event:"):
current_event = line[6:].strip()
elif line.startswith("data:"):
if current_event in collected:
collected[current_event].append(line[5:].strip())
total = sum(len(v) for v in collected.values())
if total >= 4:
break
assert len(collected["update"]) >= 2
assert len(collected["alert"]) >= 1Reconnection and retry Field Testing
SSE clients automatically reconnect when the connection drops. The server can control the reconnect interval with the retry: field. Test that your server sends it and that clients respect it:
@pytest.mark.asyncio
async def test_sse_sends_retry_directive():
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
found_retry = False
retry_ms = None
async with client.stream("GET", "/events") as response:
async for line in response.aiter_lines():
if line.startswith("retry:"):
found_retry = True
retry_ms = int(line[6:].strip())
break
assert found_retry, "Server should emit a retry directive"
assert 1000 <= retry_ms <= 30000, f"retry should be reasonable, got {retry_ms}ms"To test reconnection behavior with Last-Event-ID, simulate a client that reconnects after receiving some events:
@pytest.mark.asyncio
async def test_last_event_id_resumes_from_correct_position():
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
# First connection: collect 3 events and record the last ID
ids_received = []
async with client.stream("GET", "/events/resumable") as response:
async for line in response.aiter_lines():
if line.startswith("id:"):
ids_received.append(line[3:].strip())
if len(ids_received) >= 3:
break
last_id = ids_received[-1]
# Reconnect with Last-Event-ID header
new_ids = []
async with client.stream(
"GET", "/events/resumable",
headers={"Last-Event-ID": last_id}
) as response:
async for line in response.aiter_lines():
if line.startswith("id:"):
new_ids.append(line[3:].strip())
if len(new_ids) >= 2:
break
# The server should resume from after last_id, not from the beginning
assert int(new_ids[0]) > int(last_id), (
f"Expected server to resume after id={last_id}, got {new_ids[0]}"
)Multiprocessing SSE Consumer Pattern
For testing SSE consumers that need to run alongside other code, use a separate thread with a queue:
import threading
import queue
import time
def sse_consumer_thread(url: str, event_queue: queue.Queue, stop_event: threading.Event):
    """Stream *url* and push each ``data:`` payload onto *event_queue*.

    Intended as a thread target. *stop_event* is checked each time a line
    arrives, so the loop exits on the first line received after it is set.
    """
    import httpx

    prefix = "data:"
    with httpx.stream("GET", url) as response:
        for raw_line in response.iter_lines():
            if stop_event.is_set():
                break
            if not raw_line.startswith(prefix):
                continue
            event_queue.put(raw_line[len(prefix):].strip())
def test_sse_consumer_integration():
events = queue.Queue()
stop = threading.Event()
thread = threading.Thread(
target=sse_consumer_thread,
args=("http://localhost:8000/events", events, stop)
)
thread.start()
received = []
deadline = time.time() + 5
while time.time() < deadline:
try:
received.append(events.get(timeout=1))
except queue.Empty:
break
stop.set()
thread.join(timeout=2)
assert len(received) >= 2, f"Expected at least 2 events, got {len(received)}"Playwright SSE Interception with EventSource
On the browser side, Playwright can intercept SSE connections and inject synthetic events for testing your frontend handlers:
// sse.spec.ts
import { test, expect } from '@playwright/test';
test('displays live updates from SSE feed', async ({ page }) => {
  // Intercept the SSE endpoint and inject controlled events.
  // '**/events' matches the endpoint on any origin; a bare '/events' pattern
  // is only resolved to a full URL when a baseURL is configured.
  await page.route('**/events', async route => {
    const events = [
      'id: 1\nevent: update\ndata: {"message": "First update"}\n\n',
      'id: 2\nevent: update\ndata: {"message": "Second update"}\n\n',
      'id: 3\nevent: alert\ndata: {"message": "Critical alert"}\n\n',
    ];
    // All three events are delivered together as one complete response body.
    await route.fulfill({
      status: 200,
      headers: {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
      },
      body: events.join(''),
    });
  });
  await page.goto('http://localhost:3000/dashboard');
  // Two 'update' events feed the list; the single 'alert' fills the banner.
  await expect(page.locator('[data-testid="update-feed"] li')).toHaveCount(2);
  await expect(page.locator('[data-testid="alert-banner"]')).toHaveText('Critical alert');
});

To observe real SSE traffic without intercepting it, listen to the network events:
test('SSE connection emits heartbeats over time', async ({ page, browserName }) => {
  // response.text() only resolves once a response body has finished, which an
  // open SSE stream never does. Read individual messages through the Chrome
  // DevTools Protocol instead; this is only available on Chromium.
  test.skip(browserName !== 'chromium', 'Live SSE messages are read via the Chrome DevTools Protocol');

  const sseChunks: string[] = [];
  const sseRequestIds = new Set<string>();
  const cdp = await page.context().newCDPSession(page);
  await cdp.send('Network.enable');
  // Remember which requests target the SSE endpoint so unrelated streams are ignored.
  cdp.on('Network.requestWillBeSent', ({ requestId, request }) => {
    if (request.url.includes('/events')) {
      sseRequestIds.add(requestId);
    }
  });
  cdp.on('Network.eventSourceMessageReceived', ({ requestId, data }) => {
    if (sseRequestIds.has(requestId)) {
      sseChunks.push(data);
    }
  });

  await page.goto('http://localhost:3000/dashboard');
  // Poll instead of sleeping a fixed 6 s: passes as soon as three messages arrive.
  await expect.poll(() => sseChunks.length, { timeout: 6000 }).toBeGreaterThanOrEqual(3);
});

Testing EventSource Error Handling in the Browser
Your frontend code should handle SSE errors and display appropriate UI. Test the error path by simulating a failing server:
test('shows reconnecting state when SSE fails', async ({ page }) => {
  let requestCount = 0;
  // '**/events' matches the endpoint on any origin; a bare '/events' pattern
  // is only resolved to a full URL when a baseURL is configured.
  await page.route('**/events', async route => {
    requestCount++;
    if (requestCount === 1) {
      // First request fails
      // NOTE(review): a native EventSource treats a non-200 response as fatal
      // and does not retry on its own; this scenario presumably relies on the
      // page re-creating the connection after an error — confirm.
      await route.fulfill({ status: 500, body: 'Internal Server Error' });
    } else {
      // Subsequent reconnects succeed
      await route.fulfill({
        status: 200,
        headers: { 'Content-Type': 'text/event-stream' },
        body: 'data: {"status": "recovered"}\n\n',
      });
    }
  });
  await page.goto('http://localhost:3000/dashboard');
  // Should briefly show reconnecting state
  await expect(page.locator('[data-testid="connection-badge"]')).toHaveText('Reconnecting...');
  // Then recover after reconnect
  await expect(page.locator('[data-testid="connection-badge"]')).toHaveText('Live', { timeout: 5000 });
});

Testing Connection Limits Under Load
SSE connections are long-lived. Verify your server handles many concurrent clients:
@pytest.mark.asyncio
async def test_multiple_concurrent_sse_clients():
"""Ensure the server handles N concurrent SSE connections."""
transport = ASGITransport(app=app)
async def collect_one_event(client: AsyncClient) -> str:
async with client.stream("GET", "/events") as response:
async for line in response.aiter_lines():
if line.startswith("data:"):
return line[5:].strip()
return ""
async with AsyncClient(transport=transport, base_url="http://test") as client:
results = await asyncio.gather(*[
collect_one_event(client) for _ in range(10)
])
assert all(r for r in results), "All 10 concurrent clients should receive data"
assert len(results) == 10Load Testing SSE with k6
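For performance testing, k6 can drive SSE endpoints, but its core modules do not ship an EventSource client; SSE support comes from an extension (for example xk6-sse) built into a custom k6 binary. With an EventSource-style client available, create a k6 script that measures time-to-first-event and event throughput: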
// k6-sse-load-test.js
// Load test: each virtual user opens the SSE endpoint, records the time until
// its first message, counts every message, and closes after a 10 s sleep.
// NOTE(review): 'k6/experimental/streams' is k6's Streams API module; it is
// not known to export EventSource — confirm which module or extension
// provides it before relying on this script.
import { EventSource } from 'k6/experimental/streams';
import { check, sleep } from 'k6';
import { Counter, Trend } from 'k6/metrics';
// Custom metrics: total messages received, and ms from iteration start to first message.
const eventsReceived = new Counter('sse_events_received');
const timeToFirstEvent = new Trend('sse_time_to_first_event_ms');
// 50 virtual users for 30 seconds.
export const options = {
vus: 50,
duration: '30s',
};
// One iteration per virtual user: connect, observe messages, then close.
export default function () {
const start = Date.now();
let firstEventReceived = false;
const es = new EventSource('http://localhost:8000/events');
es.onmessage = event => {
// Record latency once, on the first message of this iteration only.
if (!firstEventReceived) {
timeToFirstEvent.add(Date.now() - start);
firstEventReceived = true;
}
eventsReceived.add(1);
check(event, { 'event has data': e => e.data.length > 0 });
};
es.onerror = err => {
console.error('SSE error:', err);
};
// NOTE(review): whether message callbacks can fire while sleep(10) blocks the
// iteration depends on the client implementation — confirm.
sleep(10);
es.close();
}Run with k6 run k6-sse-load-test.js and verify that p95 time-to-first-event stays under your SLA (typically under 500ms for local and under 2s for cross-region).
Complete SSE Test Checklist
A thorough SSE test suite covers: correct Content-Type: text/event-stream header, Cache-Control: no-cache so the stream is not cached, id: field emission for resumability, retry: field presence and value sanity, Last-Event-ID honored on reconnect, named event types routed correctly, connection drops handled gracefully, no data loss across reconnects, proxy and load-balancer buffering disabled (via X-Accel-Buffering: no), and concurrent connection count within limits.
HelpMeTest can run your SSE scenarios continuously on a schedule, alerting your team immediately when reconnection behavior regresses or event delivery latency spikes in production.