Server-Sent Events (SSE) Testing: pytest, Playwright, and End-to-End Scenarios

Server-Sent Events are simple to use but tricky to test because streaming responses, reconnection logic, and event routing all behave differently from normal HTTP. This guide covers pytest with httpx, Playwright EventSource interception, and k6 load testing for SSE endpoints.

Server-Sent Events (SSE) provide a lightweight, one-directional stream from server to client over HTTP. They're ideal for live dashboards, notification feeds, and progress updates. Testing them requires a different approach than standard request/response APIs: you need to handle chunked streaming, verify event type routing, exercise reconnection behavior, and ensure Last-Event-ID is honored for reliable delivery. Here's how to do all of that.
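
On the wire, an SSE stream is plain text: each event is a block of field: value lines terminated by a blank line. A minimal parsing sketch (standalone, not tied to any client library) makes the frame format concrete before we get to the tests:

```python
# Minimal SSE frame parser: splits a raw stream into (id, event, data) tuples.
# Illustrative sketch only -- a real client also handles "retry:" fields,
# comment lines starting with ":", and data split across network chunks.
def parse_sse(raw: str):
    events = []
    for block in raw.split("\n\n"):
        frame = {"id": None, "event": "message", "data": []}
        for line in block.split("\n"):
            if line.startswith("id:"):
                frame["id"] = line[3:].strip()
            elif line.startswith("event:"):
                frame["event"] = line[6:].strip()
            elif line.startswith("data:"):
                frame["data"].append(line[5:].strip())
        if frame["data"]:  # frames without data are not dispatched
            events.append((frame["id"], frame["event"], "\n".join(frame["data"])))
    return events

frames = parse_sse("id: 1\nevent: update\ndata: hello\n\ndata: plain\n\n")
# frames[0] == ("1", "update", "hello"); frames[1] == (None, "message", "plain")
```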

pytest with httpx AsyncClient for Streaming

The standard approach for testing SSE in Python is httpx.AsyncClient with stream(). This gives you a real HTTP streaming connection without spinning up a separate process.

# test_sse_endpoint.py
import asyncio
import pytest
from httpx import AsyncClient, ASGITransport
from app.main import app  # your FastAPI/Starlette app

@pytest.mark.asyncio
async def test_sse_endpoint_emits_events():
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        events = []
        async with client.stream("GET", "/events") as response:
            assert response.status_code == 200
            assert "text/event-stream" in response.headers["content-type"]

            async for line in response.aiter_lines():
                if line.startswith("data:"):
                    events.append(line[5:].strip())
                if len(events) >= 3:
                    break  # collect 3 events then stop

        assert len(events) == 3
        assert all(event for event in events)  # non-empty

For a FastAPI SSE endpoint, the server side looks like this:

# app/main.py
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import json

app = FastAPI()

async def event_generator(topic: str):
    event_id = 0
    while True:
        event_id += 1
        data = json.dumps({"id": event_id, "topic": topic})
        yield f"id: {event_id}\nevent: update\ndata: {data}\n\n"
        await asyncio.sleep(1)

@app.get("/events")
async def stream_events(topic: str = "default"):
    return StreamingResponse(
        event_generator(topic),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
        }
    )

Testing Event Type Routing

SSE supports named event types via event: fields. Test that your client correctly routes different event types to the right handlers:

@pytest.mark.asyncio
async def test_event_type_routing():
    """Verify that 'alert' and 'update' events are emitted separately."""
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        collected = {"update": [], "alert": []}
        current_event = "message"  # default SSE event type

        async with client.stream("GET", "/events/mixed") as response:
            async for line in response.aiter_lines():
                if line.startswith("event:"):
                    current_event = line[6:].strip()
                elif line.startswith("data:"):
                    if current_event in collected:
                        collected[current_event].append(line[5:].strip())
                elif line == "":
                    # Per the SSE spec, the event type resets after each dispatch
                    current_event = "message"

                total = sum(len(v) for v in collected.values())
                if total >= 4:
                    break

        assert len(collected["update"]) >= 2
        assert len(collected["alert"]) >= 1

Reconnection and the retry Field

SSE clients automatically reconnect when the connection drops, and the server can control the reconnect interval with the retry: field. Test that your server emits the directive and that its value is sane:

@pytest.mark.asyncio
async def test_sse_sends_retry_directive():
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        found_retry = False
        retry_ms = None

        async with client.stream("GET", "/events") as response:
            async for line in response.aiter_lines():
                if line.startswith("retry:"):
                    found_retry = True
                    retry_ms = int(line[6:].strip())
                    break

        assert found_retry, "Server should emit a retry directive"
        assert 1000 <= retry_ms <= 30000, f"retry should be reasonable, got {retry_ms}ms"
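
The /events handler shown earlier never emits retry:. A common pattern, sketched below, is to send the directive once at stream start (the 5000 ms value is an arbitrary example):

```python
# Sketch: emit a retry directive as the first frame so reconnecting clients
# back off for 5 seconds before retrying. Standalone variant of event_generator.
import asyncio
import json

async def event_generator_with_retry(topic: str):
    yield "retry: 5000\n\n"  # sent once; sets the client's reconnect interval
    event_id = 0
    while True:
        event_id += 1
        data = json.dumps({"id": event_id, "topic": topic})
        yield f"id: {event_id}\nevent: update\ndata: {data}\n\n"
        await asyncio.sleep(1)
```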

To test reconnection behavior with Last-Event-ID, simulate a client that reconnects after receiving some events:

@pytest.mark.asyncio
async def test_last_event_id_resumes_from_correct_position():
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        # First connection: collect 3 events and record the last ID
        ids_received = []
        async with client.stream("GET", "/events/resumable") as response:
            async for line in response.aiter_lines():
                if line.startswith("id:"):
                    ids_received.append(line[3:].strip())
                if len(ids_received) >= 3:
                    break

        last_id = ids_received[-1]

        # Reconnect with Last-Event-ID header
        new_ids = []
        async with client.stream(
            "GET", "/events/resumable",
            headers={"Last-Event-ID": last_id}
        ) as response:
            async for line in response.aiter_lines():
                if line.startswith("id:"):
                    new_ids.append(line[3:].strip())
                if len(new_ids) >= 2:
                    break

        # The server should resume from after last_id, not from the beginning
        assert int(new_ids[0]) > int(last_id), (
            f"Expected server to resume after id={last_id}, got {new_ids[0]}"
        )

Background-Thread SSE Consumer Pattern

For testing SSE consumers that need to run alongside other code, run the consumer in a background thread that feeds events into a queue:

import threading
import queue
import time

def sse_consumer_thread(url: str, event_queue: queue.Queue, stop_event: threading.Event):
    """Runs in a background thread, feeding SSE events into a queue."""
    import httpx
    # read=None disables the read timeout, which would otherwise drop a quiet stream
    with httpx.stream("GET", url, timeout=httpx.Timeout(5.0, read=None)) as response:
        for line in response.iter_lines():
            if stop_event.is_set():
                break
            if line.startswith("data:"):
                event_queue.put(line[5:].strip())

def test_sse_consumer_integration():
    events = queue.Queue()
    stop = threading.Event()

    thread = threading.Thread(
        target=sse_consumer_thread,
        args=("http://localhost:8000/events", events, stop)
    )
    thread.start()

    received = []
    deadline = time.time() + 5
    while time.time() < deadline:
        try:
            received.append(events.get(timeout=1))
        except queue.Empty:
            break

    stop.set()
    thread.join(timeout=2)

    assert len(received) >= 2, f"Expected at least 2 events, got {len(received)}"

Playwright SSE Interception with EventSource

On the browser side, Playwright can intercept SSE connections and inject synthetic events for testing your frontend handlers:

// sse.spec.ts
import { test, expect } from '@playwright/test';

test('displays live updates from SSE feed', async ({ page }) => {
  // Intercept the SSE endpoint and inject controlled events
  await page.route('**/events', async route => {
    const events = [
      'id: 1\nevent: update\ndata: {"message": "First update"}\n\n',
      'id: 2\nevent: update\ndata: {"message": "Second update"}\n\n',
      'id: 3\nevent: alert\ndata: {"message": "Critical alert"}\n\n',
    ];

    await route.fulfill({
      status: 200,
      headers: {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
      },
      body: events.join(''),
    });
  });

  await page.goto('http://localhost:3000/dashboard');

  await expect(page.locator('[data-testid="update-feed"] li')).toHaveCount(2);
  await expect(page.locator('[data-testid="alert-banner"]')).toHaveText('Critical alert');
});

To observe real SSE traffic without intercepting it, listen to network responses. Note that response.text() resolves only after the stream ends, so this pattern works best against endpoints that close the stream:

test('SSE connection emits heartbeats over time', async ({ page }) => {
  const sseChunks: string[] = [];

  page.on('response', async response => {
    if (response.url().includes('/events')) {
      const body = await response.text().catch(() => '');
      sseChunks.push(body);
    }
  });

  await page.goto('http://localhost:3000/dashboard');
  await page.waitForTimeout(6000);

  const allData = sseChunks.join('');
  const eventCount = (allData.match(/^data:/mg) || []).length;
  expect(eventCount).toBeGreaterThanOrEqual(3);
});

Testing EventSource Error Handling in the Browser

Your frontend code should handle SSE errors and display appropriate UI. Note that a native EventSource does not automatically reconnect after a non-200 HTTP response, so recovery here depends on your app recreating the connection. Test the error path by simulating a failing server:

test('shows reconnecting state when SSE fails', async ({ page }) => {
  let requestCount = 0;

  await page.route('**/events', async route => {
    requestCount++;
    if (requestCount === 1) {
      // First request fails
      await route.fulfill({ status: 500, body: 'Internal Server Error' });
    } else {
      // Subsequent reconnects succeed
      await route.fulfill({
        status: 200,
        headers: { 'Content-Type': 'text/event-stream' },
        body: 'data: {"status": "recovered"}\n\n',
      });
    }
  });

  await page.goto('http://localhost:3000/dashboard');

  // Should briefly show reconnecting state
  await expect(page.locator('[data-testid="connection-badge"]')).toHaveText('Reconnecting...');

  // Then recover after reconnect
  await expect(page.locator('[data-testid="connection-badge"]')).toHaveText('Live', { timeout: 5000 });
});

Testing Connection Limits Under Load

SSE connections are long-lived. Verify your server handles many concurrent clients:

@pytest.mark.asyncio
async def test_multiple_concurrent_sse_clients():
    """Ensure the server handles N concurrent SSE connections."""
    transport = ASGITransport(app=app)

    async def collect_one_event(client: AsyncClient) -> str:
        async with client.stream("GET", "/events") as response:
            async for line in response.aiter_lines():
                if line.startswith("data:"):
                    return line[5:].strip()
        return ""

    async with AsyncClient(transport=transport, base_url="http://test") as client:
        results = await asyncio.gather(*[
            collect_one_event(client) for _ in range(10)
        ])

    assert all(r for r in results), "All 10 concurrent clients should receive data"
    assert len(results) == 10

Load Testing SSE with k6

For performance testing, k6 supports SSE through the xk6-sse extension (stock k6 has no SSE client, so you build a custom binary with xk6). Create a script that measures time-to-first-event and event throughput:

// k6-sse-load-test.js
// Requires a k6 binary built with the xk6-sse extension:
//   xk6 build --with github.com/phymbert/xk6-sse
import sse from 'k6/x/sse';
import { check } from 'k6';
import { Counter, Trend } from 'k6/metrics';

const eventsReceived = new Counter('sse_events_received');
const timeToFirstEvent = new Trend('sse_time_to_first_event_ms');

export const options = {
  vus: 50,
  duration: '30s',
};

export default function () {
  const start = Date.now();
  let received = 0;

  const response = sse.open('http://localhost:8000/events', {}, client => {
    client.on('event', event => {
      if (received === 0) {
        timeToFirstEvent.add(Date.now() - start);
      }
      received++;
      eventsReceived.add(1);
      check(event, { 'event has data': e => e.data.length > 0 });
      if (received >= 10) {
        client.close(); // bound each iteration to 10 events
      }
    });

    client.on('error', e => {
      console.error('SSE error:', e.error());
    });
  });

  check(response, { 'connection succeeded': r => r.status === 200 });
}

Build the extended binary with xk6 build --with github.com/phymbert/xk6-sse, then run ./k6 run k6-sse-load-test.js and verify that p95 time-to-first-event stays under your SLA (typically under 500ms locally and under 2s cross-region).

Complete SSE Test Checklist

A thorough SSE test suite covers:

- correct Content-Type: text/event-stream header
- Cache-Control: no-cache to prevent buffering
- id: field emission for resumability
- retry: field presence and value sanity
- Last-Event-ID honored on reconnect
- named event types routed correctly
- connection drops handled gracefully
- no data loss across reconnects
- proxy and load-balancer buffering disabled (via X-Accel-Buffering: no)
- concurrent connection count within limits

HelpMeTest can run your SSE scenarios continuously on a schedule, alerting your team immediately when reconnection behavior regresses or event delivery latency spikes in production.
