Testing LLM Streaming Responses: SSE, Chunks, and Real-Time AI Output

LLM streaming delivers tokens as they're generated, typically over Server-Sent Events (SSE). This improves perceived performance but introduces new failure modes: partial content on disconnect, chunk reassembly bugs, race conditions in streaming UI, and client-side buffering issues. Testing streaming requires a dedicated approach beyond standard API testing.

Key Takeaways

Streaming has failure modes that batch requests don't: truncated responses on disconnect, malformed SSE events in the middle of a stream, partial JSON in the final chunk, and UI rendering bugs that only appear when content arrives incrementally.

Mock streaming with real SSE format, not just mocked return values. A mock that returns the complete response instantly doesn't test your streaming parser at all. Use test SSE servers or streaming response fixtures.

Test what happens when the stream stops unexpectedly. Connection drops mid-stream are common. Your app needs to handle this gracefully — not silently display a truncated response to users.

Frontend streaming bugs are separate from backend streaming bugs. The server may stream correctly while your React/Vue component flickers, loses focus, or renders content in the wrong order. Test both layers.

Why Streaming Tests Are Different

Most LLM testing guides focus on batch requests: send a prompt, get a response, assert on it. Streaming changes the contract entirely.

With streaming:

  • Content arrives in chunks (tokens or token groups)
  • Your UI updates incrementally as chunks arrive
  • The stream can stop at any point — cleanly or not
  • You need to reassemble chunks into a coherent response
  • Errors that occur after the stream has started arrive as SSE events, not HTTP error codes, because the 200 status has already been sent

All of these create failure modes that only appear in streaming scenarios.

Understanding SSE Format

OpenAI streaming uses Server-Sent Events. Each chunk looks like:

data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","choices":[{"delta":{"content":"Hello"},"index":0}]}

data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","choices":[{"delta":{"content":" world"},"index":0}]}

data: [DONE]

Your streaming parser must handle:

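  • Multiple data: lines per SSE event (their values are joined with a newline)
  • Blank lines, which terminate an event, and comment lines starting with ":" (often sent as heartbeats)
  • [DONE] sentinel
  • Error events: data: {"error": {"message": "...", "type": "..."}}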
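
The parser requirements above can be sketched as a small line-based reader. This is a minimal illustration, not the SDK's own parser: `iter_sse_data`, `iter_content`, and `StreamError` are hypothetical names, and the payload shape is assumed to match the chunk format shown earlier.

```python
import json
from typing import Iterable, Iterator


class StreamError(Exception):
    """Raised when the stream carries an error payload (hypothetical name)."""


def iter_sse_data(lines: Iterable[str]) -> Iterator[str]:
    """Yield the data payload of each SSE event from an iterable of text lines."""
    data_lines: list[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line terminates the current event.
            if data_lines:
                yield "\n".join(data_lines)
                data_lines = []
            continue
        if line.startswith(":"):
            # Comment line, commonly used as a keep-alive heartbeat.
            continue
        field, _, value = line.partition(":")
        if field == "data":
            # A single leading space after the colon is not part of the value.
            data_lines.append(value[1:] if value.startswith(" ") else value)
    # An event that was never terminated by a blank line is dropped here.


def iter_content(lines: Iterable[str]) -> Iterator[str]:
    """Yield text deltas from OpenAI-style chunk events, stopping at [DONE]."""
    for payload in iter_sse_data(lines):
        if payload == "[DONE]":
            return
        event = json.loads(payload)
        if "error" in event:
            raise StreamError(event["error"].get("message", "unknown error"))
        for choice in event.get("choices", []):
            content = choice.get("delta", {}).get("content")
            if content:
                yield content
```

Because both functions take any iterable of lines, the same code can be driven by a list of strings in a unit test or by a line iterator from a real HTTP response.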

Testing Streaming with OpenAI SDK

import pytest
from openai import OpenAI
from typing import Generator

client = OpenAI()

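def stream_completion(prompt: str) -> Generator[str, None, None]:
    """Stream tokens from OpenAI, yielding each text chunk"""
    # create(..., stream=True) yields raw chunk objects with choices[0].delta;
    # the returned stream is also a context manager that closes the connection.
    with client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    ) as stream:
        for chunk in stream:
            if not chunk.choices:  # some chunks (e.g. usage-only) carry no choices
                continue
            delta = chunk.choices[0].delta
            if delta.content:
                yield delta.content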

def collect_stream(generator: Generator) -> tuple[list[str], str]:
    """Collect all chunks and the assembled final response"""
    chunks = list(generator)
    full_response = "".join(chunks)
    return chunks, full_response

@pytest.mark.llm
def test_stream_produces_multiple_chunks():
    """Streaming should deliver content in multiple pieces, not one batch"""
    chunks, full = collect_stream(stream_completion("Count from 1 to 10"))
    
    assert len(chunks) > 1, "Expected multiple chunks, got a single batch"
    assert full, "Final assembled response should not be empty"

@pytest.mark.llm
def test_assembled_response_is_coherent():
    """Chunks assembled together should form readable text"""
    chunks, full = collect_stream(
        stream_completion("What is 2 + 2? Answer in one sentence.")
    )
    
    assert "4" in full, "Response should contain the answer"
    assert len(full) > 5, "Response should be a complete sentence"

@pytest.mark.llm
def test_stream_terminates_cleanly():
    """Stream should have a definite end, not hang"""
    import threading
    
    result = {"done": False, "error": None}
    
    def run_stream():
        try:
            chunks, _ = collect_stream(stream_completion("Say hello"))
            result["done"] = True
        except Exception as e:
            result["error"] = str(e)
    
    # daemon=True so a hung stream cannot keep the test process alive
    thread = threading.Thread(target=run_stream, daemon=True)
    thread.start()
    thread.join(timeout=30)  # 30 second max
    
    assert not thread.is_alive(), "Stream should complete within 30 seconds"
    assert result["done"], f"Stream failed: {result['error']}"
    assert result["error"] is None

Mocking Streaming for Unit Tests

Testing streaming logic without real LLM calls requires mocking the SSE stream:

from unittest.mock import MagicMock, patch
import asyncio

def make_mock_stream_chunks(content: str, chunk_size: int = 3):
    """Split content into mock SSE chunks"""
    chunks = []
    for i in range(0, len(content), chunk_size):
        chunk_text = content[i:i + chunk_size]
        mock_chunk = MagicMock()
        mock_chunk.choices = [MagicMock()]
        mock_chunk.choices[0].delta.content = chunk_text
        mock_chunk.choices[0].finish_reason = None
        chunks.append(mock_chunk)
    
    # Final chunk with finish_reason
    final_chunk = MagicMock()
    final_chunk.choices = [MagicMock()]
    final_chunk.choices[0].delta.content = None
    final_chunk.choices[0].finish_reason = "stop"
    chunks.append(final_chunk)
    
    return chunks

class MockStream:
    def __init__(self, chunks):
        self._chunks = iter(chunks)
    
    def __iter__(self):
        return self._chunks
    
    def __enter__(self):
        return self
    
    def __exit__(self, *args):
        pass

@patch(f"{__name__}.client")
def test_chunk_assembler_joins_correctly(mock_client):
    """Test that your chunk assembly logic joins tokens correctly"""
    # Patch the module-level `client` that stream_completion() uses. Patching
    # openai.OpenAI would not affect the instance created at import time.
    # Adjust the target if `client` lives in another module.
    test_content = "Hello, this is a streaming test response."
    
    # Assumes stream_completion() calls client.chat.completions.create(..., stream=True)
    mock_client.chat.completions.create.return_value = MockStream(
        make_mock_stream_chunks(test_content, chunk_size=5)
    )
    
    chunks, full = collect_stream(stream_completion("test prompt"))
    
    assert full == test_content, f"Expected '{test_content}', got '{full}'"
    assert len(chunks) > 1, "Should have received multiple chunks"

@patch(f"{__name__}.client")
def test_empty_chunks_are_ignored(mock_client):
    """Empty delta content should not appear in assembled response"""
    # Patches the module-level `client`; see the note in the previous test
    
    # Mix of content and None chunks
    chunks = [
        MagicMock(choices=[MagicMock(delta=MagicMock(content="Hello"), finish_reason=None)]),
        MagicMock(choices=[MagicMock(delta=MagicMock(content=None), finish_reason=None)]),
        MagicMock(choices=[MagicMock(delta=MagicMock(content=" world"), finish_reason=None)]),
        MagicMock(choices=[MagicMock(delta=MagicMock(content=None), finish_reason="stop")]),
    ]
    # Assumes stream_completion() calls client.chat.completions.create(..., stream=True)
    mock_client.chat.completions.create.return_value = MockStream(chunks)
    
    _, full = collect_stream(stream_completion("test"))
    
    assert full == "Hello world"
    assert "None" not in full
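
The mocks above exercise chunk assembly at the SDK-object level. To cover the byte-level reassembly bugs mentioned earlier, one option is a fixture that builds a raw SSE body and replays it at arbitrary split points. The sketch below assumes a simple newline-buffering reader; `sse_body`, `split_bytes`, and `iter_lines` are hypothetical helpers, not part of any library.

```python
import json
from typing import Iterable, Iterator


def sse_body(deltas: list[str]) -> bytes:
    """Build a raw SSE response body in the chunk format shown earlier."""
    events = [
        "data: " + json.dumps(
            {"choices": [{"delta": {"content": text}, "index": 0}]},
            ensure_ascii=False,  # keep multi-byte characters in the body
        )
        for text in deltas
    ]
    events.append("data: [DONE]")
    return "".join(event + "\n\n" for event in events).encode("utf-8")


def split_bytes(body: bytes, size: int) -> list[bytes]:
    """Cut the body into fixed-size pieces, ignoring line and character boundaries."""
    return [body[i:i + size] for i in range(0, len(body), size)]


def iter_lines(pieces: Iterable[bytes]) -> Iterator[str]:
    """Reassemble complete lines from arbitrarily split byte pieces."""
    buffer = b""
    for piece in pieces:
        buffer += piece
        while b"\n" in buffer:
            line, buffer = buffer.split(b"\n", 1)
            # Decode only whole lines so a split multi-byte character
            # is never decoded on its own.
            yield line.decode("utf-8")
    if buffer:
        yield buffer.decode("utf-8")  # trailing data without a final newline
```

Replaying the same body at every split size, from one byte up to the whole body, should always produce identical lines, which makes for a compact parametrized test of any reader you substitute for `iter_lines`.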

Testing Stream Interruption Handling

import time

class PartialStream:
    """Simulates a stream that disconnects after N chunks"""
    def __init__(self, chunks: list, fail_after: int):
        self._chunks = chunks
        self._fail_after = fail_after
        self._count = 0
    
    def __iter__(self):
        for chunk in self._chunks:
            self._count += 1
            if self._count > self._fail_after:
                raise ConnectionError("Stream interrupted")
            yield chunk
    
    def __enter__(self):
        return self
    
    def __exit__(self, *args):
        pass

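import httpx
import openai

# The mock below raises the builtin ConnectionError. Real mid-stream drops
# typically surface as httpx.TransportError or openai.APIConnectionError,
# so all three are treated as a disconnect here.
STREAM_ERRORS = (ConnectionError, httpx.TransportError, openai.APIConnectionError)

def stream_with_recovery(prompt: str) -> str:
    """Stream with graceful degradation on disconnect"""
    buffer = []
    try:
        for chunk in stream_completion(prompt):
            buffer.append(chunk)
    except STREAM_ERRORS:
        # Return what we have so far, clearly marked as incomplete
        if buffer:
            return "".join(buffer) + " [response truncated]"
        raise
    return "".join(buffer)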

@patch(f"{__name__}.client")
def test_partial_stream_returns_what_was_received(mock_client):
    """On connection drop, return content received so far"""
    # Patches the module-level `client` used by stream_completion()
    full_chunks = make_mock_stream_chunks("This is a complete response.", chunk_size=4)
    
    # Simulate disconnect after 3 chunks
    # Assumes stream_completion() calls client.chat.completions.create(..., stream=True)
    mock_client.chat.completions.create.return_value = PartialStream(
        full_chunks, fail_after=3
    )
    
    result = stream_with_recovery("test prompt")
    
    # Should return partial content with truncation indicator
    assert len(result) > 0, "Should return partial content, not empty string"
    assert "[response truncated]" in result, "Should indicate truncation"
@patch(f"{__name__}.client")
def test_zero_chunk_stream_raises_properly(mock_client):
    """If stream disconnects before any content, should raise"""
    # Assumes stream_completion() calls client.chat.completions.create(..., stream=True)
    mock_client.chat.completions.create.return_value = PartialStream(
        make_mock_stream_chunks("anything"), fail_after=0
    )
    
    with pytest.raises(ConnectionError):
        stream_with_recovery("test prompt")
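
A stream can also end early without raising anything. One way to catch that case, sketched here under the assumption that chunks follow the `choices[0].delta` and `finish_reason` shape used in the mocks above, is to record the last `finish_reason` and treat anything other than "stop" as incomplete. `StreamResult`, `assemble`, and `fake_chunk` are hypothetical names.

```python
from dataclasses import dataclass
from types import SimpleNamespace
from typing import Iterable, Optional


@dataclass
class StreamResult:
    text: str
    finish_reason: Optional[str]

    @property
    def complete(self) -> bool:
        # Only an explicit "stop" counts as a clean finish in this sketch.
        return self.finish_reason == "stop"


def assemble(chunks: Iterable) -> StreamResult:
    """Join content deltas and remember the last finish_reason seen."""
    parts = []
    finish_reason = None
    for chunk in chunks:
        if not chunk.choices:
            continue
        choice = chunk.choices[0]
        if choice.delta.content:
            parts.append(choice.delta.content)
        if choice.finish_reason is not None:
            finish_reason = choice.finish_reason
    return StreamResult("".join(parts), finish_reason)


def fake_chunk(content=None, finish_reason=None):
    """Build a minimal chunk-shaped object for tests."""
    choice = SimpleNamespace(
        delta=SimpleNamespace(content=content),
        finish_reason=finish_reason,
    )
    return SimpleNamespace(choices=[choice])
```

A stream that simply stops leaves `finish_reason` as `None`, and a response cut off by the token limit reports "length", so both can be flagged to the user instead of being shown as if they were complete.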

Testing Async Streaming

Many production apps use async streaming, so test the async path as well:

import asyncio
import pytest
from openai import AsyncOpenAI

async_client = AsyncOpenAI()

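async def async_stream_completion(prompt: str) -> list[str]:
    """Collect all text chunks from an async OpenAI stream"""
    chunks = []
    # With the async client, create(..., stream=True) must be awaited first;
    # the returned stream is an async context manager and async iterator.
    stream = await async_client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    async with stream:
        async for chunk in stream:
            if not chunk.choices:  # some chunks carry no choices
                continue
            delta = chunk.choices[0].delta
            if delta.content:
                chunks.append(delta.content)
    return chunks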

@pytest.mark.asyncio
@pytest.mark.llm
async def test_async_stream_produces_chunks():
    chunks = await async_stream_completion("Say hello in 3 words")
    assert len(chunks) > 0
    full = "".join(chunks)
    assert len(full) > 3

@pytest.mark.asyncio
@pytest.mark.llm  # calls the real API, since cancellation needs a live stream
async def test_async_stream_cancellation():
    """Test that cancelling an async stream cleans up properly"""
    task = asyncio.create_task(
        async_stream_completion("Write a very long essay about the history of computing")
    )
    
    # Cancel after 0.5 seconds
    await asyncio.sleep(0.5)
    task.cancel()
    
    with pytest.raises(asyncio.CancelledError):
        await task
    
    # Verify no resource leaks (connection properly closed)
    # In practice, check that your async context manager __aexit__ was called
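
The cleanup check mentioned in the comment can be exercised without a network call. The sketch below uses a hypothetical `FakeAsyncStream` that records whether `__aexit__` ran, then cancels the consuming task mid-stream. It illustrates the pattern only; in practice a real SDK stream would be wrapped or substituted in the same way.

```python
import asyncio


class FakeAsyncStream:
    """Hypothetical stand-in for an SDK stream; records whether it was closed."""

    def __init__(self, chunks, delay=0.05):
        self._chunks = chunks
        self._delay = delay
        self.closed = False

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Runs on normal exit and on cancellation alike.
        self.closed = True
        return False  # never swallow the exception

    def __aiter__(self):
        return self._generate()

    async def _generate(self):
        for chunk in self._chunks:
            await asyncio.sleep(self._delay)  # simulate network latency
            yield chunk


async def consume(stream, sink):
    """Append every chunk to sink, using the stream as a context manager."""
    async with stream:
        async for chunk in stream:
            sink.append(chunk)


async def cancel_mid_stream():
    """Cancel a consumer partway through and report what happened."""
    stream = FakeAsyncStream([f"tok{i}" for i in range(100)])
    received = []
    task = asyncio.create_task(consume(stream, received))
    await asyncio.sleep(0.2)  # let a few chunks arrive
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return stream.closed, received
```

A test can then assert that `closed` is true and that only part of the content arrived, which is the property the comment above asks you to verify.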

Testing Streaming HTTP Endpoints

If you expose streaming as an HTTP endpoint (FastAPI, Flask, etc.):

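# FastAPI streaming endpoint
import json
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import httpx

app = FastAPI()

@app.get("/stream")
async def stream_response(prompt: str):
    async def generate():
        # async_stream_completion() returns a list, so read the SDK stream
        # directly and forward each delta as soon as it arrives
        stream = await async_client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            stream=True,
        )
        async with stream:
            async for chunk in stream:
                if chunk.choices and chunk.choices[0].delta.content:
                    # JSON-encode so newlines in the text cannot break SSE framing
                    yield f"data: {json.dumps(chunk.choices[0].delta.content)}\n\n"
        yield "data: [DONE]\n\n"
    
    # StreamingResponse does not add these headers on its own
    headers = {"Cache-Control": "no-cache", "Connection": "keep-alive"}
    return StreamingResponse(generate(), media_type="text/event-stream", headers=headers)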

# Tests for the HTTP streaming endpoint
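@pytest.mark.asyncio
@pytest.mark.llm  # the endpoint calls the real API unless async_client is mocked
async def test_http_streaming_endpoint_delivers_sse():
    """Test your HTTP endpoint delivers proper SSE format"""
    # httpx 0.28 removed the app= shortcut; pass an ASGITransport instead
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(transport=transport, base_url="http://test") as client: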
        async with client.stream("GET", "/stream?prompt=hello") as response:
            assert response.status_code == 200
            assert "text/event-stream" in response.headers["content-type"]
            
            events = []
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    events.append(line[6:])
            
            assert len(events) > 1, "Should receive multiple SSE events"
            assert events[-1] == "[DONE]", "Last event should be [DONE]"
            
            # Check no empty events
            non_done_events = [e for e in events if e != "[DONE]"]
            assert all(len(e) > 0 for e in non_done_events)

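@pytest.mark.asyncio
@pytest.mark.llm  # the endpoint calls the real API unless async_client is mocked
async def test_http_stream_correct_headers():
    """Verify the headers SSE clients and proxies expect are present"""
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(transport=transport, base_url="http://test") as client:
        async with client.stream("GET", "/stream?prompt=test") as response:
            # StreamingResponse does not set these by default; the endpoint must add them
            assert response.headers.get("cache-control") == "no-cache"
            assert response.headers.get("connection") == "keep-alive"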

Testing Streaming UI with Playwright

For frontend testing, verify the streaming UI updates incrementally:

# Using HelpMeTest or Playwright for E2E streaming UI tests
from playwright.sync_api import sync_playwright

def test_streaming_ui_updates_incrementally():
    with sync_playwright() as p:
        browser = p.chromium.launch()
        page = browser.new_page()
        page.goto("https://your-ai-app.com/chat")
        
        # Send a message
        page.fill("[data-testid='chat-input']", "Tell me a story")
        page.click("[data-testid='send-button']")
        
        # Verify streaming indicator appears
        page.wait_for_selector("[data-testid='streaming-indicator']")
        
        # Capture content at two points in time while the stream is in progress
        page.wait_for_timeout(1000)
        content_at_1s = page.inner_text("[data-testid='assistant-message']")
        
        page.wait_for_timeout(2000)
        content_at_3s = page.inner_text("[data-testid='assistant-message']")
        
        # Content should grow over time (streaming in progress)
        assert len(content_at_3s) > len(content_at_1s), (
            "Content should grow as stream arrives"
        )
        
        # Wait for completion
        page.wait_for_selector("[data-testid='streaming-indicator']", state="hidden")
        final_content = page.inner_text("[data-testid='assistant-message']")
        
        assert len(final_content) > 50, "Final response should be substantive"
        browser.close()

def test_streaming_does_not_cause_scroll_jump():
    """Verify UI stays at bottom during streaming, doesn't jump"""
    with sync_playwright() as p:
        browser = p.chromium.launch()
        page = browser.new_page()
        page.goto("https://your-ai-app.com/chat")
        
        page.fill("[data-testid='chat-input']", "Write a detailed explanation of quantum computing")
        page.click("[data-testid='send-button']")
        
        page.wait_for_selector("[data-testid='streaming-indicator']")
        
        # Measure distance from the bottom during streaming. scrollTop alone
        # never reaches scrollHeight, so subtract the visible height as well.
        distances = []
        for _ in range(10):
            gap = page.evaluate(
                """() => {
                    const el = document.querySelector('.chat-container');
                    return el.scrollHeight - el.scrollTop - el.clientHeight;
                }"""
            )
            distances.append(gap)
            page.wait_for_timeout(300)
        
        # Last few samples should be near the bottom (auto-scroll behavior).
        # The 100px tolerance is an assumption; tune it to your layout.
        for gap in distances[-3:]:
            assert gap < 100, (
                f"Chat should stay near bottom during streaming, was {gap}px away"
            )
        
        browser.close()

Key Takeaways

Streaming introduces failure modes invisible in batch testing: truncated responses, chunk assembly bugs, connection drop handling, and UI rendering race conditions. Mock streaming with realistic chunk sequences, not instant responses. Test interruption handling explicitly, because your users will hit network drops. For streaming UI, use browser automation to verify content grows incrementally and the UI stays stable throughout. Run real streaming tests in CI on a schedule (not every build) to catch regressions without blocking deploys.
