Testing LLM Streaming Responses: SSE, Chunks, and Real-Time AI Output
LLM streaming delivers tokens as they're generated, using Server-Sent Events (SSE). This improves perceived performance but introduces new failure modes: partial content on disconnect, chunk reassembly bugs, race conditions in streaming UI, and client-side buffering issues. Testing streaming requires a dedicated approach beyond standard API testing.
Key Takeaways
Streaming has failure modes that batch requests don't. Truncated responses on disconnect, malformed SSE events in the middle of a stream, partial JSON in the final chunk, and UI rendering bugs that only appear when content arrives incrementally.
Mock streaming with real SSE format, not just mocked return values. A mock that returns the complete response instantly doesn't test your streaming parser at all. Use test SSE servers or streaming response fixtures.
Test what happens when the stream stops unexpectedly. Connection drops mid-stream are common. Your app needs to handle this gracefully — not silently display a truncated response to users.
Frontend streaming bugs are separate from backend streaming bugs. The server may stream correctly but your React/Vue component may flicker, lose focus, or render in wrong order. Test both layers.
Why Streaming Tests Are Different
Most LLM testing guides focus on batch requests: send a prompt, get a response, assert on it. Streaming changes the contract entirely.
With streaming:
- Content arrives in chunks (tokens or token groups)
- Your UI updates incrementally as chunks arrive
- The stream can stop at any point — cleanly or not
- You need to reassemble chunks into a coherent response
- Error events arrive as SSE events, not HTTP error codes
All of these create failure modes that only appear in streaming scenarios.
Understanding SSE Format
OpenAI streaming uses Server-Sent Events. Each chunk looks like:
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","choices":[{"delta":{"content":"Hello"},"index":0}]}
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","choices":[{"delta":{"content":" world"},"index":0}]}
data: [DONE]
Your streaming parser must handle:
- Multiple data: lines per SSE event
- Empty lines (SSE heartbeat)
- The [DONE] sentinel
- Error events:
data: {"error": {"message": "...", "type": "..."}}
Testing Streaming with OpenAI SDK
import pytest
from openai import OpenAI
from typing import Generator
client = OpenAI()
def stream_completion(prompt: str) -> Generator[str, None, None]:
    """Stream a chat completion from OpenAI, yielding each text fragment.

    The request is made with ``chat.completions.create(stream=True)``, which
    yields raw chunk objects exposing ``choices[0].delta``. The
    ``chat.completions.stream()`` helper yields typed events (for example
    ``content.delta``) rather than chunks, so it cannot be consumed with the
    ``chunk.choices[0].delta`` loop used here.

    Args:
        prompt: Text sent as the single user message of the conversation.

    Yields:
        Each non-empty ``delta.content`` string, in arrival order.
    """
    with client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    ) as stream:
        for chunk in stream:
            # A chunk may carry no choices at all (e.g. a usage-only chunk);
            # indexing it would raise IndexError.
            if not chunk.choices:
                continue
            delta = chunk.choices[0].delta
            # Skip deltas whose content is None or "" (role/finish chunks).
            if delta.content:
                yield delta.content
def collect_stream(generator: Generator) -> tuple[list[str], str]:
    """Drain *generator* and return its pieces plus their concatenation.

    Args:
        generator: Iterable of text fragments; it is consumed completely.

    Returns:
        A ``(pieces, text)`` tuple where ``pieces`` is the list of fragments
        in arrival order and ``text`` is those fragments joined with no
        separator.
    """
    pieces = [*generator]
    return pieces, "".join(pieces)
@pytest.mark.llm
def test_stream_produces_multiple_chunks():
    """A streamed reply must arrive as several pieces, not a single batch.

    Calls ``stream_completion`` and checks both the number of fragments and
    that their concatenation is non-empty.
    """
    pieces, assembled = collect_stream(stream_completion("Count from 1 to 10"))
    piece_count = len(pieces)
    assert piece_count > 1, "Expected multiple chunks, got a single batch"
    assert assembled, "Final assembled response should not be empty"
@pytest.mark.llm
def test_assembled_response_is_coherent():
    """Joined fragments should read as a complete answer to the prompt.

    Only the assembled text is inspected; the individual fragments are
    discarded.
    """
    prompt = "What is 2 + 2? Answer in one sentence."
    _, answer = collect_stream(stream_completion(prompt))
    assert "4" in answer, "Response should contain the answer"
    assert len(answer) > 5, "Response should be a complete sentence"
@pytest.mark.llm
def test_stream_terminates_cleanly():
    """Stream should have a definite end, not hang.

    The stream is consumed on a worker thread so the wait can be bounded by
    a 30-second ``join`` timeout instead of blocking the test forever.
    """
    import threading

    result = {"done": False, "error": None}

    def run_stream():
        try:
            collect_stream(stream_completion("Say hello"))
            result["done"] = True
        except Exception as e:
            # An exception raised on the worker thread never reaches pytest;
            # record its message so the main thread can report it.
            result["error"] = str(e)

    # daemon=True: if the stream really hangs, the stuck thread must not keep
    # the interpreter (and the rest of the test run) alive after this test
    # has already reported its failure.
    thread = threading.Thread(target=run_stream, daemon=True)
    thread.start()
    thread.join(timeout=30)  # 30 second max
    assert not thread.is_alive(), "Stream should complete within 30 seconds"
    assert result["done"], f"Stream failed: {result['error']}"
    assert result["error"] is None


# Mocking Streaming for Unit Tests
Testing streaming logic without real LLM calls requires mocking the SSE stream:
from unittest.mock import MagicMock, patch
import asyncio
def make_mock_stream_chunks(content: str, chunk_size: int = 3):
    """Build a list of mock streaming chunks that together spell *content*.

    Args:
        content: Text to slice into consecutive pieces.
        chunk_size: Maximum number of characters per content chunk.

    Returns:
        A list of ``MagicMock`` chunks. Each content chunk has
        ``choices[0].delta.content`` set to one slice and
        ``choices[0].finish_reason`` set to ``None``. One extra chunk is
        always appended last, with content ``None`` and finish_reason
        ``"stop"``.
    """

    def _build(text, finish_reason):
        # One chunk with exactly one choice carrying the given delta text.
        choice = MagicMock()
        choice.delta.content = text
        choice.finish_reason = finish_reason
        chunk = MagicMock()
        chunk.choices = [choice]
        return chunk

    slices = [
        content[start:start + chunk_size]
        for start in range(0, len(content), chunk_size)
    ]
    sequence = [_build(piece, None) for piece in slices]
    # Final chunk with finish_reason
    sequence.append(_build(None, "stop"))
    return sequence
class MockStream:
    """Stand-in for a streaming response: iterable and usable in ``with``.

    The chunks are wrapped in a single iterator at construction, so the
    stream can only be consumed once; a second iteration yields nothing.
    """

    def __init__(self, chunks):
        # One shared iterator: every __iter__ call returns this same object.
        self._chunks = iter(chunks)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        # Nothing to release; returning None lets exceptions propagate.
        return None

    def __iter__(self):
        return self._chunks
@patch("openai.OpenAI")
def test_chunk_assembler_joins_correctly(mock_openai):
    """Test that your chunk assembly logic joins tokens correctly.

    ``stream_completion`` reads the module-level ``client`` that was created
    at import time, so patching ``openai.OpenAI`` alone never reaches it.
    The mock client is therefore swapped into the function's own globals
    while the stream is consumed.
    """
    mock_client = mock_openai.return_value
    test_content = "Hello, this is a streaming test response."
    fake_stream = MockStream(make_mock_stream_chunks(test_content, chunk_size=5))
    # Wire the fake into both streaming entry points so the test does not
    # depend on which of them stream_completion calls.
    completions = mock_client.chat.completions
    completions.stream.return_value = fake_stream
    completions.create.return_value = fake_stream

    # The generator looks up ``client`` lazily, so it must be fully consumed
    # while the patch is still active.
    with patch.dict(stream_completion.__globals__, {"client": mock_client}):
        chunks, full = collect_stream(stream_completion("test prompt"))

    assert full == test_content, f"Expected '{test_content}', got '{full}'"
    assert len(chunks) > 1, "Should have received multiple chunks"
@patch("openai.OpenAI")
def test_empty_chunks_are_ignored(mock_openai):
    """Empty delta content should not appear in assembled response.

    ``stream_completion`` uses the module-level ``client`` created at import
    time, which ``@patch("openai.OpenAI")`` does not replace; the mock client
    is injected into the function's globals for the duration of the call.
    """
    mock_client = mock_openai.return_value
    # Mix of content and None chunks
    chunks = [
        MagicMock(choices=[MagicMock(delta=MagicMock(content="Hello"), finish_reason=None)]),
        MagicMock(choices=[MagicMock(delta=MagicMock(content=None), finish_reason=None)]),
        MagicMock(choices=[MagicMock(delta=MagicMock(content=" world"), finish_reason=None)]),
        MagicMock(choices=[MagicMock(delta=MagicMock(content=None), finish_reason="stop")]),
    ]
    fake_stream = MockStream(chunks)
    # Wire the fake into both streaming entry points so the test does not
    # depend on which of them stream_completion calls.
    completions = mock_client.chat.completions
    completions.stream.return_value = fake_stream
    completions.create.return_value = fake_stream

    # The generator resolves ``client`` when it runs, so consume it inside
    # the patch.
    with patch.dict(stream_completion.__globals__, {"client": mock_client}):
        _, full = collect_stream(stream_completion("test"))

    assert full == "Hello world"
    assert "None" not in full


# Testing Stream Interruption Handling
import time
class PartialStream:
    """Simulates a stream that disconnects after N chunks.

    Iteration yields the first ``fail_after`` chunks and then raises
    ``ConnectionError`` when a further chunk is requested. The running count
    lives on the instance, so it is not reset between iterations.
    """

    def __init__(self, chunks: list, fail_after: int):
        self._chunks, self._fail_after, self._count = chunks, fail_after, 0

    def __enter__(self):
        return self

    def __exit__(self, *args):
        # Returning None means exceptions raised in the ``with`` body propagate.
        return None

    def __iter__(self):
        for item in self._chunks:
            self._count += 1
            if self._count <= self._fail_after:
                yield item
            else:
                raise ConnectionError("Stream interrupted")
def stream_with_recovery(prompt: str) -> str:
    """Collect a streamed completion, degrading gracefully on disconnect.

    Args:
        prompt: Passed through to ``stream_completion``.

    Returns:
        The full joined text when the stream finishes normally. If a
        ``ConnectionError`` interrupts the stream after at least one piece
        arrived, the joined partial text followed by
        ``" [response truncated]"``.

    Raises:
        ConnectionError: If the stream fails before any piece was received.
    """
    pieces = []
    try:
        for piece in stream_completion(prompt):
            pieces.append(piece)
    except ConnectionError:
        # Nothing usable arrived: let the caller see the failure.
        if not pieces:
            raise
        # Return what we have so far
        return "".join(pieces) + " [response truncated]"
    else:
        return "".join(pieces)
@patch("openai.OpenAI")
def test_partial_stream_returns_what_was_received(mock_openai):
    """On connection drop, return content received so far.

    ``stream_completion`` reads the module-level ``client`` created at import
    time, which ``@patch("openai.OpenAI")`` does not replace; the mock client
    is injected into the function's globals while the stream is consumed.
    """
    mock_client = mock_openai.return_value
    full_chunks = make_mock_stream_chunks("This is a complete response.", chunk_size=4)
    # Simulate disconnect after 3 chunks
    dropped_stream = PartialStream(full_chunks, fail_after=3)
    # Wire the fake into both streaming entry points so the test does not
    # depend on which of them stream_completion calls.
    completions = mock_client.chat.completions
    completions.stream.return_value = dropped_stream
    completions.create.return_value = dropped_stream

    with patch.dict(stream_completion.__globals__, {"client": mock_client}):
        result = stream_with_recovery("test prompt")

    # Should return partial content with truncation indicator
    assert len(result) > 0, "Should return partial content, not empty string"
    assert "[response truncated]" in result, "Should indicate truncation"
@patch("openai.OpenAI")
def test_zero_chunk_stream_raises_properly(mock_openai):
    """If stream disconnects before any content, should raise.

    ``stream_completion`` reads the module-level ``client`` created at import
    time, which ``@patch("openai.OpenAI")`` does not replace; the mock client
    is injected into the function's globals while the stream is consumed.
    """
    mock_client = mock_openai.return_value
    # fail_after=0 makes the very first chunk request raise ConnectionError.
    dead_stream = PartialStream(make_mock_stream_chunks("anything"), fail_after=0)
    # Wire the fake into both streaming entry points so the test does not
    # depend on which of them stream_completion calls.
    completions = mock_client.chat.completions
    completions.stream.return_value = dead_stream
    completions.create.return_value = dead_stream

    with patch.dict(stream_completion.__globals__, {"client": mock_client}):
        with pytest.raises(ConnectionError):
            stream_with_recovery("test prompt")


# Testing Async Streaming
Most production apps use async streaming. Test the async path:
import asyncio
import pytest
from openai import AsyncOpenAI
async_client = AsyncOpenAI()
async def async_stream_completion(prompt: str) -> list[str]:
    """Stream a chat completion asynchronously and return all text fragments.

    The request is made with ``chat.completions.create(stream=True)``, which
    yields raw chunk objects exposing ``choices[0].delta``. The
    ``chat.completions.stream()`` helper yields typed events rather than
    chunks, so it cannot be consumed with the ``chunk.choices[0].delta`` loop
    used here.

    Args:
        prompt: Text sent as the single user message of the conversation.

    Returns:
        Every non-empty ``delta.content`` string, in arrival order. The list
        is returned only after the stream has finished.
    """
    chunks = []
    stream = await async_client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    # The context manager closes the stream even if iteration is cancelled
    # or raises part-way through.
    async with stream:
        async for chunk in stream:
            # A chunk may carry no choices at all (e.g. a usage-only chunk);
            # indexing it would raise IndexError.
            if not chunk.choices:
                continue
            delta = chunk.choices[0].delta
            if delta.content:
                chunks.append(delta.content)
    return chunks
@pytest.mark.asyncio
@pytest.mark.llm
async def test_async_stream_produces_chunks():
    """The async path should return at least one fragment of non-trivial text."""
    pieces = await async_stream_completion("Say hello in 3 words")
    assert len(pieces) > 0
    assembled = "".join(pieces)
    assert len(assembled) > 3
@pytest.mark.asyncio
@pytest.mark.llm
async def test_async_stream_cancellation():
    """Test that cancelling an async stream cleans up properly.

    Marked ``llm`` like the other tests that call ``async_stream_completion``
    without a mock, so it is selected and deselected together with them.
    """
    task = asyncio.create_task(
        async_stream_completion("Write a very long essay about the history of computing")
    )
    # Cancel after 0.5 seconds
    await asyncio.sleep(0.5)
    # cancel() returns False when the task has already finished; surface that
    # directly instead of as a confusing "did not raise" failure below.
    assert task.cancel(), "Stream finished before it could be cancelled"
    with pytest.raises(asyncio.CancelledError):
        await task
    assert task.cancelled()
    # Verify no resource leaks (connection properly closed)
    # In practice, check that your async context manager __aexit__ was called


# Testing Streaming HTTP Endpoints
If you expose streaming as an HTTP endpoint (FastAPI, Flask, etc.):
# FastAPI streaming endpoint
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import httpx
app = FastAPI()
@app.get("/stream")
async def stream_response(prompt: str):
    """Serve the completion for *prompt* as a Server-Sent Events response.

    Each text chunk becomes one SSE event, followed by a final
    ``data: [DONE]`` event.

    Args:
        prompt: Query parameter forwarded to ``async_stream_completion``.

    Returns:
        A ``StreamingResponse`` with media type ``text/event-stream``.
    """

    async def generate():
        # async_stream_completion is a coroutine that returns the finished
        # list of chunks, not an async iterator: it must be awaited, and
        # "async for" over it raises TypeError. Events therefore start
        # flowing only once the upstream completion has finished.
        for chunk in await async_stream_completion(prompt):
            # A raw newline inside a chunk would terminate the SSE event
            # early. Multi-line payloads are sent as one "data:" line per
            # line of text; newline-free chunks are framed exactly as before.
            lines = chunk.replace("\r\n", "\n").replace("\r", "\n").split("\n")
            yield "".join(f"data: {line}\n" for line in lines) + "\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        # The header test for this endpoint asserts both values.
        headers={"Cache-Control": "no-cache", "Connection": "keep-alive"},
    )
# Tests for the HTTP streaming endpoint
@pytest.mark.asyncio
@pytest.mark.llm
async def test_http_streaming_endpoint_delivers_sse():
    """Test your HTTP endpoint delivers proper SSE format.

    Marked ``llm`` because the ``/stream`` endpoint calls
    ``async_stream_completion``, which is not mocked here.
    """
    # AsyncClient(app=...) is no longer accepted by current httpx releases;
    # the ASGI app is mounted through an explicit transport instead.
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(transport=transport, base_url="http://test") as client:
        async with client.stream("GET", "/stream?prompt=hello") as response:
            assert response.status_code == 200
            assert "text/event-stream" in response.headers["content-type"]
            events = []
            async for line in response.aiter_lines():
                # len("data: ") == 6: keep only the payload of each data line.
                if line.startswith("data: "):
                    events.append(line[6:])
            assert len(events) > 1, "Should receive multiple SSE events"
            assert events[-1] == "[DONE]", "Last event should be [DONE]"
            # Check no empty events
            non_done_events = [e for e in events if e != "[DONE]"]
            assert all(len(e) > 0 for e in non_done_events)
@pytest.mark.asyncio
@pytest.mark.llm
async def test_http_stream_correct_headers():
    """Verify SSE-required headers are present.

    Marked ``llm`` because the ``/stream`` endpoint calls
    ``async_stream_completion``, which is not mocked here.
    """
    # AsyncClient(app=...) is no longer accepted by current httpx releases;
    # the ASGI app is mounted through an explicit transport instead.
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(transport=transport, base_url="http://test") as client:
        async with client.stream("GET", "/stream?prompt=test") as response:
            assert response.headers.get("cache-control") == "no-cache"
            assert response.headers.get("connection") == "keep-alive"


# Testing Streaming UI with Playwright
For frontend testing, verify the streaming UI updates incrementally:
# Using HelpMeTest or Playwright for E2E streaming UI tests
from playwright.sync_api import sync_playwright
def test_streaming_ui_updates_incrementally():
    """Verify the assistant message grows while the response is streaming.

    The message text is sampled 1 s and 3 s after the streaming indicator
    appears and must be longer at the second sample; the final text is
    checked once the indicator is hidden again.
    """
    with sync_playwright() as p:
        browser = p.chromium.launch()
        try:
            page = browser.new_page()
            page.goto("https://your-ai-app.com/chat")
            # Send a message
            page.fill("[data-testid='chat-input']", "Tell me a story")
            page.click("[data-testid='send-button']")
            # Verify streaming indicator appears
            page.wait_for_selector("[data-testid='streaming-indicator']")
            # Capture content at different points in time. wait_for_timeout
            # is used instead of time.sleep, which is discouraged with the
            # sync Playwright API.
            page.wait_for_timeout(1000)
            content_at_1s = page.inner_text("[data-testid='assistant-message']")
            page.wait_for_timeout(2000)
            content_at_3s = page.inner_text("[data-testid='assistant-message']")
            # Content should grow over time (streaming in progress)
            assert len(content_at_3s) > len(content_at_1s), (
                "Content should grow as stream arrives"
            )
            # Wait for completion
            page.wait_for_selector("[data-testid='streaming-indicator']", state="hidden")
            final_content = page.inner_text("[data-testid='assistant-message']")
            assert len(final_content) > 50, "Final response should be substantive"
        finally:
            # Close the browser even when an assertion above fails.
            browser.close()
def test_streaming_does_not_cause_scroll_jump():
    """Verify UI stays at bottom during streaming, doesn't jump.

    ``scrollTop`` is the offset of the top of the visible area and can never
    exceed ``scrollHeight - clientHeight``, so it is the bottom edge of the
    viewport (``scrollTop + clientHeight``) that is compared with the content
    height. All three values are read in one ``evaluate`` call so each sample
    is self-consistent while the content is still growing.
    """
    measure_scroll = """() => {
        const el = document.querySelector('.chat-container');
        return {top: el.scrollTop, height: el.scrollHeight, client: el.clientHeight};
    }"""
    with sync_playwright() as p:
        browser = p.chromium.launch()
        try:
            page = browser.new_page()
            page.goto("https://your-ai-app.com/chat")
            page.fill(
                "[data-testid='chat-input']",
                "Write a detailed explanation of quantum computing",
            )
            page.click("[data-testid='send-button']")
            page.wait_for_selector("[data-testid='streaming-indicator']")
            # Measure scroll positions during streaming
            samples = []
            for _ in range(10):
                samples.append(page.evaluate(measure_scroll))
                # wait_for_timeout is used instead of time.sleep, which is
                # discouraged with the sync Playwright API.
                page.wait_for_timeout(300)
            # Last few positions should be near bottom (auto-scroll behavior)
            for sample in samples[-3:]:
                bottom_edge = sample["top"] + sample["client"]
                assert bottom_edge > sample["height"] * 0.8, (
                    "Chat should stay near bottom during streaming, "
                    f"scroll at {bottom_edge}/{sample['height']}"
                )
        finally:
            # Close the browser even when an assertion above fails.
            browser.close()


# Key Takeaways
Streaming introduces failure modes invisible in batch testing: truncated responses, chunk assembly bugs, connection drop handling, and UI rendering race conditions. Mock streaming with realistic chunk sequences, not instant responses. Test interruption handling explicitly — your users will hit network drops. For streaming UI, use browser automation to verify content grows incrementally and the UI stays stable throughout. Run real streaming tests in CI on schedule (not every build) to catch regressions without blocking deploys.