Long Polling Testing Guide: Test HTTP Polling and Comet Patterns

Long Polling Testing Guide: Test HTTP Polling and Comet Patterns

Long polling is a technique where the client sends an HTTP request that the server holds open until data is available — then responds and the client immediately sends another request. It's older than WebSockets or SSE, but still widely used for compatibility, simplicity, and firewall traversal. Testing long polling correctly means testing timing, timeouts, and the polling loop itself.

How Long Polling Works

Client                    Server
  |-- GET /poll ------------>|
  |                          | (waits for data or timeout)
  |<-- 200 {data} -----------|  ← data arrived
  |-- GET /poll ------------>|  ← immediately re-polls
  |                          |
  |                    (30s timeout)
  |<-- 200 {status:"timeout"}|  ← no data, client re-polls
  |-- GET /poll ------------>|

The client must:

  1. Send a request with a timeout parameter
  2. Handle a real response (data available)
  3. Handle an empty/timeout response (no data, re-poll)
  4. Handle errors (network failure, server down)
  5. Stop polling when instructed

Server-Side Long Polling Implementation

# src/long_poll_server.py
import asyncio
from fastapi import FastAPI, Query
from fastapi.responses import JSONResponse

app = FastAPI()

# Per-channel event queues held in process memory, keyed by channel name
# (use Redis in production)
event_queues: dict[str, asyncio.Queue] = {}

def get_queue(channel: str) -> asyncio.Queue:
    """Return the queue for *channel*, creating and registering it on first use."""
    try:
        return event_queues[channel]
    except KeyError:
        # First time this channel is seen: register an empty queue for it.
        created = event_queues[channel] = asyncio.Queue()
        return created

@app.get("/poll/{channel}")
async def long_poll(
    channel: str,
    timeout: int = Query(default=30, ge=1, le=60),
    last_id: int = Query(default=0)
):
    """Hold the request open until *channel* has an event or *timeout* seconds pass.

    Responds with ``{"status": "data", ...}`` carrying the next queued event,
    or ``{"status": "timeout", ...}`` when nothing arrived in time.
    ``timeout`` is validated to the range 1-60 by the ``Query`` constraints.
    ``last_id`` is accepted but is not read anywhere in this handler.
    """
    next_event = get_queue(channel).get()

    try:
        # Block until an event is available, giving up after `timeout` seconds
        event = await asyncio.wait_for(next_event, timeout=timeout)
    except asyncio.TimeoutError:
        payload = {
            "status": "timeout",
            "message": "No new events, please re-poll"
        }
        return JSONResponse(payload)

    # The "timestamp" field carries the event's "id" value (0 when absent)
    payload = {
        "status": "data",
        "event": event,
        "timestamp": event.get("id", 0)
    }
    return JSONResponse(payload)

@app.post("/publish/{channel}")
async def publish_event(channel: str, event: dict):
    """Append *event* to the queue of *channel* and acknowledge the publish."""
    await get_queue(channel).put(event)
    acknowledgement = {"status": "published"}
    return acknowledgement

Testing the Long Poll Endpoint

# tests/test_long_poll_server.py
import pytest
import asyncio
import httpx
from fastapi.testclient import TestClient
from src.long_poll_server import app, event_queues

client = TestClient(app)

@pytest.fixture(autouse=True)
def clear_queues():
    """Empty the shared channel-queue registry before and after every test."""
    reset = event_queues.clear
    reset()  # start each test with no channels
    yield
    reset()  # drop whatever the test left behind

class TestLongPollEndpoint:
    """Synchronous TestClient checks for the /poll and /publish endpoints."""

    def _publish(self, channel, payload):
        """POST *payload* to *channel*, failing immediately if the publish is rejected.

        Checking the publish here keeps a broken /publish endpoint from
        showing up later as a confusing poll failure.
        """
        response = client.post(f"/publish/{channel}", json=payload)
        assert response.status_code == 200
        return response

    def test_returns_timeout_when_no_events(self):
        """With no events queued, should return timeout response."""
        response = client.get("/poll/test-channel?timeout=1")
        assert response.status_code == 200
        data = response.json()
        assert data["status"] == "timeout"

    def test_returns_data_when_event_available(self):
        """Pre-queue an event, then poll — should return immediately."""
        # Publish an event first
        self._publish("test-channel", {"type": "update", "value": 42})

        # Poll — should return immediately with the event
        response = client.get("/poll/test-channel?timeout=5")
        assert response.status_code == 200
        data = response.json()
        assert data["status"] == "data"
        assert data["event"]["type"] == "update"
        assert data["event"]["value"] == 42

    def test_different_channels_isolated(self):
        """Events in channel A should not appear in channel B polls."""
        self._publish("channel-a", {"msg": "for A"})

        response = client.get("/poll/channel-b?timeout=1")
        assert response.status_code == 200
        data = response.json()
        assert data["status"] == "timeout"  # Channel B got nothing

    def test_timeout_parameter_respected(self):
        """A 1s timeout should hold the request for about 1s — not 0s, not the 30s default."""
        import time

        # monotonic() cannot jump the way the wall clock can (NTP, DST)
        start = time.monotonic()
        response = client.get("/poll/empty-channel?timeout=1")
        elapsed = time.monotonic() - start

        assert response.json()["status"] == "timeout"
        # Lower bound: the server really waited; upper bound: it did not
        # fall back to the 30s default.
        assert 0.8 <= elapsed < 2.5

    def test_multiple_events_delivered_sequentially(self):
        """Multiple events should be consumed one per poll, in publish order."""
        for i in range(3):
            self._publish("seq-channel", {"seq": i})

        events = []
        for _ in range(3):
            response = client.get("/poll/seq-channel?timeout=1")
            data = response.json()
            assert data["status"] == "data"
            events.append(data["event"]["seq"])

        assert events == [0, 1, 2]

@pytest.mark.asyncio
async def test_publish_while_polling():
    """Event published during an active poll should be delivered."""
    # The `AsyncClient(app=...)` shortcut was deprecated in httpx 0.27 and
    # removed in 0.28; an explicit ASGITransport works on old and new versions.
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(transport=transport, base_url="http://test") as ac:
        async def do_poll():
            return await ac.get("/poll/live-channel?timeout=5")

        async def publish_after_delay():
            # Give the poll time to start waiting before the event is published
            await asyncio.sleep(0.5)
            return await ac.post("/publish/live-channel", json={"msg": "live event"})

        response, published = await asyncio.gather(do_poll(), publish_after_delay())

    assert published.status_code == 200
    assert response.status_code == 200
    data = response.json()
    assert data["status"] == "data"
    assert data["event"]["msg"] == "live event"

Testing the Client-Side Polling Loop

// src/pollClient.js
/**
 * Long-polling client: repeatedly GETs `/poll/<channel>` and forwards each
 * received event to a callback until stopped or until retries are exhausted.
 */
class PollClient {
  /**
   * @param {string} baseUrl  Server origin, without a trailing slash.
   * @param {string} channel  Channel name; URL-encoded when the request is built.
   * @param {object} [options]
   * @param {number} [options.timeout=30]      Server hold time sent as `timeout`.
   * @param {Function} [options.onEvent]       Called with each received event.
   * @param {Function} [options.onError]       Called once when retries are exhausted.
   * @param {number} [options.retryDelay=1000] Base backoff in ms (multiplied by the retry count).
   * @param {number} [options.maxRetries=5]    Consecutive failures before giving up.
   */
  constructor(baseUrl, channel, options = {}) {
    this.baseUrl = baseUrl;
    this.channel = channel;
    this.timeout = options.timeout || 30;
    this.onEvent = options.onEvent || (() => {});
    this.onError = options.onError || (() => {});
    this.running = false;
    this.retryDelay = options.retryDelay || 1000;
    this.maxRetries = options.maxRetries || 5;
    this.retryCount = 0;
    this.lastId = 0;
  }

  /** Begin polling. Calling it again while already running is a no-op. */
  start() {
    // A second start() would launch a parallel loop competing for the same
    // events, so only the first call starts one.
    if (this.running) return;
    this.running = true;
    this._poll();
  }

  /** Stop the loop; a request already in flight is allowed to finish. */
  stop() {
    this.running = false;
  }

  /**
   * Run one poll cycle and schedule the next while still running.
   * Failures back off linearly (retryDelay * retryCount); after `maxRetries`
   * consecutive failures the loop stops and `onError` is called.
   */
  async _poll() {
    if (!this.running) return;

    try {
      // Encode the channel so names containing '/', '?', '#' or spaces
      // cannot change the request path or query string.
      const channel = encodeURIComponent(this.channel);
      const res = await fetch(
        `${this.baseUrl}/poll/${channel}?timeout=${this.timeout}&last_id=${this.lastId}`
      );

      if (!res.ok) throw new Error(`HTTP ${res.status}`);

      const data = await res.json();
      this.retryCount = 0;  // Reset on success

      if (data.status === 'data') {
        this.lastId = data.timestamp || this.lastId;
        this.onEvent(data.event);
      }
      // On 'timeout', just re-poll immediately
    } catch (err) {
      this.retryCount++;
      if (this.retryCount >= this.maxRetries) {
        this.onError(new Error(`Max retries (${this.maxRetries}) exceeded: ${err.message}`));
        this.running = false;
        return;
      }
      await this._sleep(this.retryDelay * this.retryCount);
    }

    if (this.running) {
      // Schedule via a timer rather than recursing so the stack unwinds
      setTimeout(() => this._poll(), 0);
    }
  }

  /** Resolve after `ms` milliseconds. */
  _sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

module.exports = { PollClient };
// tests/pollClient.test.js
const { PollClient } = require('../src/pollClient');

global.fetch = jest.fn();

describe('PollClient', () => {
  beforeEach(() => {
    // resetAllMocks (unlike clearAllMocks) also discards queued mock*Once
    // responses, so one test's unused response cannot leak into the next.
    jest.resetAllMocks();
    jest.useFakeTimers();
  });

  afterEach(() => {
    jest.useRealTimers();
  });

  // Queue one successful fetch response whose JSON body is `data`.
  function mockFetchResponse(data) {
    fetch.mockResolvedValueOnce({
      ok: true,
      json: async () => data
    });
  }

  // Queue one rejected fetch (network-level failure).
  function mockFetchError(message = 'Network error') {
    fetch.mockRejectedValueOnce(new Error(message));
  }

  // A single poll spans several microtask turns (await fetch, await
  // res.json(), ...). Yield repeatedly instead of counting exact ticks,
  // which is brittle and under-counts.
  async function flushMicrotasks(turns = 10) {
    for (let i = 0; i < turns; i++) {
      await Promise.resolve();
    }
  }

  test('calls onEvent when data received', async () => {
    const onEvent = jest.fn();
    mockFetchResponse({ status: 'data', event: { type: 'update', value: 42 } });

    const client = new PollClient('http://api', 'test', { onEvent, timeout: 30 });
    client.start();

    await flushMicrotasks(); // first poll resolves and is handled
    client.stop();

    expect(onEvent).toHaveBeenCalledTimes(1);
    expect(onEvent).toHaveBeenCalledWith({ type: 'update', value: 42 });
  });

  test('re-polls after timeout response', async () => {
    mockFetchResponse({ status: 'timeout' });
    mockFetchResponse({ status: 'timeout' });
    mockFetchResponse({ status: 'timeout' });

    const client = new PollClient('http://api', 'test', { onEvent: jest.fn() });
    client.start(); // poll 1 is issued synchronously

    // Two more polls: let the current one finish, then fire its re-poll timer.
    for (let i = 0; i < 2; i++) {
      await flushMicrotasks();
      jest.runOnlyPendingTimers();
    }
    client.stop();

    expect(fetch).toHaveBeenCalledTimes(3);
  });

  test('stops polling when stop() called', async () => {
    mockFetchResponse({ status: 'timeout' });

    const client = new PollClient('http://api', 'test', { onEvent: jest.fn() });
    client.start();
    await flushMicrotasks(); // poll 1 done, re-poll timer scheduled

    client.stop();
    jest.runOnlyPendingTimers(); // timer fires, but the loop must not fetch
    await flushMicrotasks();

    expect(fetch).toHaveBeenCalledTimes(1); // Only the initial poll
  });

  test('retries on network error', async () => {
    mockFetchError('Connection refused');
    mockFetchResponse({ status: 'timeout' }); // Succeeds on retry

    const client = new PollClient('http://api', 'test', {
      onEvent: jest.fn(),
      retryDelay: 100,
      maxRetries: 3
    });
    client.start();

    await flushMicrotasks();     // fetch 1 rejects, client enters backoff sleep
    jest.runOnlyPendingTimers(); // backoff sleep elapses
    await flushMicrotasks();     // loop schedules the re-poll
    jest.runOnlyPendingTimers(); // re-poll fires, issuing fetch 2
    client.stop();

    expect(fetch).toHaveBeenCalledTimes(2);
  });

  test('calls onError after max retries exceeded', async () => {
    const onError = jest.fn();

    for (let i = 0; i < 5; i++) {
      mockFetchError('Server down');
    }

    const client = new PollClient('http://api', 'test', {
      onEvent: jest.fn(),
      onError,
      maxRetries: 5,
      retryDelay: 10
    });
    client.start();

    // Each failed attempt needs two rounds (backoff sleep, then re-poll
    // timer); extra rounds are harmless once the client has stopped.
    for (let i = 0; i < 12; i++) {
      await flushMicrotasks();
      jest.runOnlyPendingTimers();
    }
    await flushMicrotasks();

    expect(onError).toHaveBeenCalledTimes(1);
    expect(onError.mock.calls[0][0].message).toContain('Max retries');
    expect(fetch).toHaveBeenCalledTimes(5);
    expect(client.running).toBe(false);
  });

  test('sends last_id parameter for deduplication', async () => {
    mockFetchResponse({ status: 'data', event: { id: 100, msg: 'first' }, timestamp: 100 });
    mockFetchResponse({ status: 'timeout' });

    const client = new PollClient('http://api', 'test', { onEvent: jest.fn() });
    client.start();

    await flushMicrotasks();     // first response handled, lastId updated
    jest.runOnlyPendingTimers(); // second poll issued
    client.stop();

    const secondCallUrl = fetch.mock.calls[1][0];
    expect(secondCallUrl).toContain('last_id=100');
  });

  test('includes correct timeout in poll URL', () => {
    mockFetchResponse({ status: 'timeout' });

    const client = new PollClient('http://api', 'test', {
      onEvent: jest.fn(),
      timeout: 45
    });
    client.start(); // the first fetch is issued synchronously
    client.stop();

    expect(fetch.mock.calls[0][0]).toContain('timeout=45');
  });
});

Testing Timeout Edge Cases

# tests/test_long_poll_timeouts.py
import pytest
import asyncio
import httpx
import time
from src.long_poll_server import app

@pytest.mark.asyncio
async def test_minimum_timeout_honored():
    """timeout=1 should return in roughly 1 second."""
    # The `AsyncClient(app=...)` shortcut was deprecated in httpx 0.27 and
    # removed in 0.28; an explicit ASGITransport works on old and new versions.
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(transport=transport, base_url="http://test") as ac:
        # monotonic() is immune to wall-clock adjustments during the wait
        start = time.monotonic()
        response = await ac.get("/poll/empty?timeout=1", timeout=10.0)
        elapsed = time.monotonic() - start

    assert response.json()["status"] == "timeout"
    assert 0.8 <= elapsed <= 2.0  # Allow some variance

@pytest.mark.asyncio
async def test_max_timeout_capped():
    """A timeout above the server maximum must be rejected, not honored.

    The endpoint declares ``timeout`` with ``le=60``, so FastAPI's request
    validation refuses larger values with 422 before the handler runs.
    The value is not silently clamped to 60.
    """
    # The `AsyncClient(app=...)` shortcut was deprecated in httpx 0.27 and
    # removed in 0.28; an explicit ASGITransport works on old and new versions.
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(transport=transport, base_url="http://test") as ac:
        # Request 300s timeout — exceeds the 60s maximum
        response = await ac.get("/poll/empty?timeout=300", timeout=10.0)

    # Validation fails immediately, so no long wait can occur
    assert response.status_code == 422

@pytest.mark.asyncio
async def test_concurrent_polls_on_same_channel():
    """Multiple clients polling same channel — each gets its own event."""
    # The `AsyncClient(app=...)` shortcut was deprecated in httpx 0.27 and
    # removed in 0.28; an explicit ASGITransport works on old and new versions.
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(transport=transport, base_url="http://test") as ac:
        # Start 3 concurrent polls
        poll_tasks = [
            asyncio.create_task(ac.get("/poll/shared?timeout=5"))
            for _ in range(3)
        ]

        # Publish 3 events once the polls have had time to start waiting
        await asyncio.sleep(0.1)
        for i in range(3):
            await ac.post("/publish/shared", json={"i": i})

        responses = await asyncio.gather(*poll_tasks)

    payloads = [r.json() for r in responses]

    # Three waiting pollers and three events: every poller must get data...
    assert [p["status"] for p in payloads] == ["data"] * 3
    # ...and no event may be delivered twice or dropped (order across
    # pollers is not asserted).
    assert {p["event"]["i"] for p in payloads} == {0, 1, 2}

Integration Test: Full Polling Loop

# tests/test_polling_integration.py
import pytest
import asyncio
import httpx
from src.long_poll_server import app

@pytest.mark.asyncio
async def test_full_polling_loop_receives_all_events():
    """Simulate a complete polling loop that receives 5 events."""
    received = []

    # The `AsyncClient(app=...)` shortcut was deprecated in httpx 0.27 and
    # removed in 0.28; an explicit ASGITransport works on old and new versions.
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(transport=transport, base_url="http://test") as ac:
        async def poll_loop():
            # Re-poll until all 5 events have arrived, echoing back the
            # last "timestamp" value the server reported.
            last_id = 0
            while len(received) < 5:
                response = await ac.get(f"/poll/events?timeout=5&last_id={last_id}")
                data = response.json()
                if data["status"] == "data":
                    received.append(data["event"])
                    last_id = data.get("timestamp", last_id)

        async def publish_events():
            # Start after the poller is waiting, then space the events out
            await asyncio.sleep(0.2)
            for i in range(5):
                await ac.post("/publish/events", json={"seq": i})
                await asyncio.sleep(0.05)

        await asyncio.gather(
            # Bound the loop so a lost event fails the test instead of hanging it
            asyncio.wait_for(poll_loop(), timeout=15),
            publish_events()
        )

    assert len(received) == 5
    assert [e["seq"] for e in received] == list(range(5))

End-to-End Testing

For applications using long polling in their UI, end-to-end tests validate that updates actually appear for users:

HelpMeTest can test long-polling-based real-time UIs:

*** Test Cases ***
# Checks that an event created through the API becomes visible in the notifications page.
Polling-Based Notifications Appear In UI
    Go To    https://your-app.com/notifications
    # Baseline before any event is triggered
    Element Should Contain    .notification-count    0
    # Trigger server-side event via API
    POST    /api/events    {"type": "alert", "message": "New alert!"}
    # Allow up to 15s for the alert text to show up in the list
    Wait Until Element Contains    .notification-list    New alert!    timeout=15s

# Checks that the job status page moves from "processing" to "completed" without a manual reload step.
Live Status Updates Via Polling
    Go To    https://your-app.com/job-status/job-123
    Wait Until Element Contains    .job-status    processing    timeout=5s
    # Job completes on the server
    Wait Until Element Contains    .job-status    completed    timeout=60s

Summary

  • Test timeout behavior — the server should return empty responses, not hang indefinitely
  • Test channel isolation — events in one channel must not appear in another
  • Test sequential delivery — events should be consumed in order across re-polls
  • Test the polling loop — client must re-poll after every response (data or timeout)
  • Test retry logic — network failures should trigger backoff, not a fast retry loop
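  • Test last_id tracking — client should send the ID of the last event it received on every re-poll (the example server accepts last_id but does not yet use it to filter events)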
  • Test concurrent clients — queue semantics under load matter for reliability
  • Use end-to-end tests to verify that UI actually updates when the server publishes events

Read more