SSE vs WebSocket Testing: Test Real-Time Streaming APIs

SSE vs WebSocket Testing: Test Real-Time Streaming APIs

Choosing between Server-Sent Events and WebSockets is a common architectural decision. Both enable real-time data delivery, but they have fundamentally different characteristics that require different testing approaches. This guide compares them — and shows you how to test each effectively.

SSE vs WebSocket: The Key Differences

Feature SSE WebSocket
Direction Server → Client only Bidirectional
Protocol HTTP/1.1, HTTP/2 Custom (ws://)
Reconnection Automatic Manual
Firewalls Works through most May be blocked
Browser support All modern All modern
Binary data No (text only) Yes
Multiplexing (HTTP/2) Yes No
Max connections Many (HTTP/2) Many

Use SSE when: Server pushes data to clients (feeds, notifications, AI streaming, logs) Use WebSocket when: Bidirectional communication (chat, gaming, collaborative editing)

Testing SSE Performance

# tests/performance/test_sse_performance.py
import pytest
import asyncio
import time
import httpx
from src.sse_server import app

class TestSSEPerformance:
    @pytest.mark.asyncio
    async def test_sse_latency_under_load(self):
        """Measure SSE event delivery latency with 50 concurrent connections."""
        latencies = []
        
        async def measure_latency():
            send_time = time.time()
            async with httpx.AsyncClient(app=app, base_url="http://test") as client:
                async with client.stream("GET", "/events") as response:
                    async for line in response.aiter_lines():
                        if line.startswith("data:"):
                            receive_time = time.time()
                            latencies.append(receive_time - send_time)
                            break
        
        tasks = [measure_latency() for _ in range(50)]
        await asyncio.gather(*tasks)
        
        avg_latency = sum(latencies) / len(latencies)
        p95_latency = sorted(latencies)[int(len(latencies) * 0.95)]
        
        print(f"\nSSE Latency (50 clients):")
        print(f"  Average: {avg_latency*1000:.1f}ms")
        print(f"  P95: {p95_latency*1000:.1f}ms")
        
        assert avg_latency < 0.1  # Average under 100ms
        assert p95_latency < 0.5  # P95 under 500ms

    @pytest.mark.asyncio
    async def test_sse_throughput(self):
        """Measure events per second for a single SSE stream."""
        event_count = 0
        duration = 5.0
        
        async with httpx.AsyncClient(app=app, base_url="http://test") as client:
            async with client.stream("GET", "/high-frequency-events") as response:
                deadline = time.time() + duration
                async for line in response.aiter_lines():
                    if line.startswith("data:"):
                        event_count += 1
                    if time.time() >= deadline:
                        break
        
        events_per_second = event_count / duration
        print(f"\nSSE Throughput: {events_per_second:.0f} events/sec")
        assert events_per_second > 10  # At least 10 events/sec

Testing WebSocket Performance

# tests/performance/test_websocket_performance.py
import pytest
import asyncio
import time
import websockets
from src.ws_server import start_server

class TestWebSocketPerformance:
    @pytest.mark.asyncio
    async def test_ws_round_trip_latency(self):
        """Measure WebSocket round-trip latency (send + receive echo)."""
        latencies = []
        
        async with websockets.connect("ws://localhost:8765") as ws:
            for _ in range(100):
                send_time = time.time()
                await ws.send("ping")
                response = await ws.recv()
                receive_time = time.time()
                
                assert response == "pong"
                latencies.append(receive_time - send_time)
        
        avg = sum(latencies) / len(latencies)
        p99 = sorted(latencies)[99]
        
        print(f"\nWebSocket Round-Trip Latency (100 pings):")
        print(f"  Average: {avg*1000:.2f}ms")
        print(f"  P99: {p99*1000:.2f}ms")
        
        assert avg < 0.01  # Average under 10ms on localhost
        assert p99 < 0.05  # P99 under 50ms

    @pytest.mark.asyncio
    async def test_ws_concurrent_message_delivery(self):
        """Test message delivery to 100 concurrent WebSocket clients."""
        received = []
        
        async def client_task():
            async with websockets.connect("ws://localhost:8765") as ws:
                msg = await asyncio.wait_for(ws.recv(), timeout=5.0)
                received.append(msg)
        
        # Connect 100 clients simultaneously
        tasks = [asyncio.create_task(client_task()) for _ in range(100)]
        
        # Broadcast a message to all
        await asyncio.sleep(0.1)
        async with websockets.connect("ws://localhost:8765/admin") as admin:
            await admin.send('{"type": "broadcast", "msg": "hello"}')
        
        await asyncio.gather(*tasks)
        
        assert len(received) == 100
        assert all("hello" in r for r in received)

Comparative Testing: Same Feature, Both Protocols

When evaluating SSE vs WebSocket for your use case, write the same test for both:

# tests/test_protocol_comparison.py
import pytest
import asyncio
import time
import httpx
import websockets

class RealTimeProtocolTests:
    """Base class with tests that apply to both SSE and WebSocket."""
    
    async def receive_n_events(self, n: int) -> list:
        raise NotImplementedError
    
    async def send_event(self, event: dict):
        raise NotImplementedError
    
    @pytest.mark.asyncio
    async def test_events_delivered_in_order(self):
        events_task = asyncio.create_task(self.receive_n_events(5))
        
        await asyncio.sleep(0.1)
        for i in range(5):
            await self.send_event({"seq": i})
        
        events = await events_task
        seqs = [e["seq"] for e in events]
        assert seqs == sorted(seqs), f"Out of order: {seqs}"

    @pytest.mark.asyncio
    async def test_handles_rapid_events(self):
        """100 events in rapid succession — all should be received."""
        events_task = asyncio.create_task(self.receive_n_events(100))
        
        await asyncio.sleep(0.05)
        for i in range(100):
            await self.send_event({"seq": i})
        
        events = await asyncio.wait_for(events_task, timeout=10.0)
        assert len(events) == 100

class TestSSEProtocol(RealTimeProtocolTests):
    async def receive_n_events(self, n):
        events = []
        async with httpx.AsyncClient() as client:
            async with client.stream("GET", "http://localhost:8001/sse") as r:
                async for line in r.aiter_lines():
                    if line.startswith("data:"):
                        import json
                        events.append(json.loads(line[5:]))
                    if len(events) >= n:
                        break
        return events
    
    async def send_event(self, event):
        async with httpx.AsyncClient() as client:
            await client.post("http://localhost:8001/publish", json=event)

class TestWebSocketProtocol(RealTimeProtocolTests):
    async def receive_n_events(self, n):
        events = []
        async with websockets.connect("ws://localhost:8002/ws") as ws:
            for _ in range(n):
                import json
                msg = await asyncio.wait_for(ws.recv(), timeout=5.0)
                events.append(json.loads(msg))
        return events
    
    async def send_event(self, event):
        async with websockets.connect("ws://localhost:8002/admin") as ws:
            import json
            await ws.send(json.dumps({"type": "publish", "event": event}))

Testing Protocol Fallback

Production apps often implement fallback: try WebSocket, fall back to SSE, fall back to long polling:

// src/realTimeClient.js
class RealTimeClient {
  constructor(url, options = {}) {
    this.url = url;
    this.onEvent = options.onEvent || (() => {});
    this.onConnectionChange = options.onConnectionChange || (() => {});
    this.protocol = null;
    this.connection = null;
  }
  
  async connect() {
    try {
      await this._connectWebSocket();
      this.protocol = 'websocket';
    } catch (wsError) {
      try {
        await this._connectSSE();
        this.protocol = 'sse';
      } catch (sseError) {
        this._startLongPolling();
        this.protocol = 'long-polling';
      }
    }
    this.onConnectionChange(this.protocol);
  }
  
  _connectWebSocket() {
    return new Promise((resolve, reject) => {
      const ws = new WebSocket(this.url.replace('http', 'ws') + '/ws');
      ws.onopen = () => {
        this.connection = ws;
        ws.onmessage = (e) => this.onEvent(JSON.parse(e.data));
        resolve();
      };
      ws.onerror = reject;
      setTimeout(reject, 3000);
    });
  }
  
  _connectSSE() {
    return new Promise((resolve, reject) => {
      const es = new EventSource(`${this.url}/sse`);
      es.onopen = () => {
        this.connection = es;
        es.onmessage = (e) => this.onEvent(JSON.parse(e.data));
        resolve();
      };
      es.onerror = reject;
      setTimeout(reject, 3000);
    });
  }
  
  _startLongPolling() {
    const { PollClient } = require('./pollClient');
    this.connection = new PollClient(this.url, 'default', {
      onEvent: this.onEvent
    });
    this.connection.start();
  }
  
  disconnect() {
    if (this.connection) {
      if (this.connection.close) this.connection.close();
      if (this.connection.stop) this.connection.stop();
    }
  }
}
// tests/realTimeClient.test.js

describe('RealTimeClient Fallback', () => {
  let mockWS, mockES;
  
  beforeEach(() => {
    global.WebSocket = jest.fn();
    global.EventSource = jest.fn();
    mockWS = { onopen: null, onerror: null, onmessage: null, close: jest.fn() };
    mockES = { onopen: null, onerror: null, onmessage: null, close: jest.fn() };
    WebSocket.mockImplementation(() => mockWS);
    EventSource.mockImplementation(() => mockES);
  });

  test('uses WebSocket when available', async () => {
    const { RealTimeClient } = require('../src/realTimeClient');
    const onConnectionChange = jest.fn();
    const client = new RealTimeClient('http://server', { onConnectionChange });
    
    const connectPromise = client.connect();
    // Simulate WebSocket connection success
    mockWS.onopen();
    await connectPromise;
    
    expect(client.protocol).toBe('websocket');
    expect(onConnectionChange).toHaveBeenCalledWith('websocket');
  });

  test('falls back to SSE when WebSocket fails', async () => {
    const { RealTimeClient } = require('../src/realTimeClient');
    const onConnectionChange = jest.fn();
    const client = new RealTimeClient('http://server', { onConnectionChange });
    
    const connectPromise = client.connect();
    // WebSocket fails
    mockWS.onerror(new Error('WebSocket not supported'));
    // SSE succeeds
    setTimeout(() => mockES.onopen(), 10);
    await connectPromise;
    
    expect(client.protocol).toBe('sse');
    expect(onConnectionChange).toHaveBeenCalledWith('sse');
  });

  test('routes events through callback regardless of protocol', async () => {
    const onEvent = jest.fn();
    const { RealTimeClient } = require('../src/realTimeClient');
    const client = new RealTimeClient('http://server', { onEvent });
    
    const connectPromise = client.connect();
    mockWS.onopen();
    await connectPromise;
    
    // Simulate event via WebSocket
    mockWS.onmessage({ data: JSON.stringify({ type: 'update', value: 42 }) });
    
    expect(onEvent).toHaveBeenCalledWith({ type: 'update', value: 42 });
  });

  test('SSE events parsed correctly', async () => {
    const onEvent = jest.fn();
    const { RealTimeClient } = require('../src/realTimeClient');
    const client = new RealTimeClient('http://server', { onEvent });
    
    const connectPromise = client.connect();
    mockWS.onerror(new Error('WS failed'));
    setTimeout(() => mockES.onopen(), 10);
    await connectPromise;
    
    // Simulate SSE event
    mockES.onmessage({ data: JSON.stringify({ type: 'notification', msg: 'Hello' }) });
    
    expect(onEvent).toHaveBeenCalledWith({ type: 'notification', msg: 'Hello' });
  });
});

Testing Protocol-Specific Edge Cases

SSE-Specific: Multi-line Events

def test_sse_multiline_event_reassembled():
    """SSE multi-line data fields must be joined with newlines."""
    raw_event = "id: 1\nevent: chat\ndata: line one\ndata: line two\ndata: line three\n\n"
    event = parse_sse_event(raw_event)
    assert event.data == "line one\nline two\nline three"
    assert event.event_type == "chat"
    assert event.id == "1"

WebSocket-Specific: Binary Frame Support

@pytest.mark.asyncio
async def test_websocket_binary_frame():
    """WebSockets support binary frames — SSE does not."""
    async with websockets.connect("ws://localhost:8765") as ws:
        binary_data = b"\x00\x01\x02\x03\xff"
        await ws.send(binary_data)
        response = await ws.recv()
        assert isinstance(response, bytes)
        assert response == binary_data  # Echo test

SSE-Specific: Automatic Reconnection with Last-Event-ID

def test_sse_reconnect_sends_last_event_id(mock_requests):
    """After reconnect, EventSource sends Last-Event-ID header."""
    # First connection: receive event with id=42, then disconnect
    # Second connection should include Last-Event-ID: 42
    from src.sse_client import SSEClientWithReconnect
    client = SSEClientWithReconnect("http://api/sse")
    
    client.simulate_receive_event(id="42", data="first event")
    client.simulate_disconnect()
    client.simulate_reconnect()
    
    reconnect_request = mock_requests.last_request
    assert reconnect_request.headers.get("Last-Event-ID") == "42"

Choosing Between SSE and WebSocket

Based on your testing results:

Choose SSE if:

  • You only need server-to-client streaming (feeds, notifications, AI output)
  • HTTP/2 multiplexing matters (many concurrent streams per connection)
  • You need automatic reconnection without client code
  • Proxy/firewall compatibility is important

Choose WebSocket if:

  • Clients also send data frequently (chat, gaming, collaboration)
  • You need binary protocol support
  • Sub-millisecond round-trip latency is critical
  • You need full-duplex real-time communication

For either: HelpMeTest can validate that your real-time feature works correctly from the user's perspective, regardless of which protocol powers it under the hood.

Summary

  • Test the same behaviors for both — ordering, delivery, reconnection, error handling
  • SSE-specific tests: multi-line events, Last-Event-ID header, automatic reconnection
  • WebSocket-specific tests: binary frames, bidirectional message routing, protocol upgrade
  • Test fallback chains — WebSocket → SSE → long polling should degrade gracefully
  • Performance test both — SSE scales better with HTTP/2; WebSocket has lower round-trip latency
  • End-to-end tests validate user experience regardless of the underlying protocol

Read more