Long Polling Testing Guide: Test HTTP Polling and Comet Patterns
Long polling is a technique where the client sends an HTTP request that the server holds open until data is available — then responds and the client immediately sends another request. It's older than WebSockets or SSE, but still widely used for compatibility, simplicity, and firewall traversal. Testing long polling correctly means testing timing, timeouts, and the polling loop itself.
How Long Polling Works
Client                        Server
  |-- GET /poll ------------>|
  |                          |  (waits for data or timeout)
  |<-- 200 {data} -----------|  ← data arrived
  |-- GET /poll ------------>|  ← immediately re-polls
  |                          |
  |                          |  (30s timeout)
  |<-- 200 {status:"timeout"}|  ← no data, client re-polls
  |-- GET /poll ------------>|

The client must:
- Send a request with a timeout parameter
- Handle a real response (data available)
- Handle an empty/timeout response (no data, re-poll)
- Handle errors (network failure, server down)
- Stop polling when instructed
Server-Side Long Polling Implementation
# src/long_poll_server.py
import asyncio
from fastapi import FastAPI, Query
from fastapi.responses import JSONResponse
app = FastAPI()
# In-memory event queue per channel (use Redis in production)
event_queues: dict[str, asyncio.Queue] = {}


def get_queue(channel: str) -> asyncio.Queue:
    """Return the event queue for ``channel``, creating it on first use.

    Queues live in the module-level ``event_queues`` registry, so repeated
    calls with the same channel name return the same queue object.

    Args:
        channel: Name of the channel whose queue is wanted.

    Returns:
        The ``asyncio.Queue`` registered for ``channel``.
    """
    try:
        return event_queues[channel]
    except KeyError:
        # First request for this channel: register an empty queue for it.
        queue = event_queues[channel] = asyncio.Queue()
        return queue
@app.get("/poll/{channel}")
async def long_poll(
    channel: str,
    timeout: int = Query(default=30, ge=1, le=60),
    last_id: int = Query(default=0),
):
    """Hold the request open until ``channel`` has an event or ``timeout`` elapses.

    Args:
        channel: Channel name taken from the URL path.
        timeout: Seconds to wait for an event; validated to the range 1-60.
        last_id: Accepted as a query parameter but not used by this handler.

    Returns:
        A JSON response with ``status`` set to ``"data"`` (plus the event and
        a ``timestamp`` field carrying the event's ``"id"``, or 0 if absent),
        or ``status`` set to ``"timeout"`` when nothing arrived in time.
    """
    pending_event = get_queue(channel).get()
    try:
        # Wait for an event or timeout
        event = await asyncio.wait_for(pending_event, timeout=timeout)
    except asyncio.TimeoutError:
        timeout_body = {
            "status": "timeout",
            "message": "No new events, please re-poll",
        }
        return JSONResponse(timeout_body)

    data_body = {
        "status": "data",
        "event": event,
        "timestamp": event.get("id", 0),
    }
    return JSONResponse(data_body)
@app.post("/publish/{channel}")
async def publish_event(channel: str, event: dict):
queue = get_queue(channel)
await queue.put(event)
return {"status": "published"}Testing the Long Poll Endpoint
# tests/test_long_poll_server.py
import pytest
import asyncio
import httpx
from fastapi.testclient import TestClient
from src.long_poll_server import app, event_queues
client = TestClient(app)
@pytest.fixture(autouse=True)
def clear_queues():
    """Empty the shared ``event_queues`` registry before and after each test."""
    event_queues.clear()  # start every test with no channels registered
    yield
    event_queues.clear()  # drop whatever the test left behind
class TestLongPollEndpoint:
    """Synchronous ``TestClient`` checks for the poll and publish endpoints."""

    def test_returns_timeout_when_no_events(self):
        """With no events queued, should return timeout response."""
        response = client.get("/poll/test-channel?timeout=1")
        assert response.status_code == 200
        data = response.json()
        assert data["status"] == "timeout"

    def test_returns_data_when_event_available(self):
        """Pre-queue an event, then poll — should return immediately."""
        # Publish an event first
        client.post("/publish/test-channel", json={"type": "update", "value": 42})
        # Poll — should return immediately with the event
        response = client.get("/poll/test-channel?timeout=5")
        assert response.status_code == 200
        data = response.json()
        assert data["status"] == "data"
        assert data["event"]["type"] == "update"
        assert data["event"]["value"] == 42

    def test_different_channels_isolated(self):
        """Events in channel A should not appear in channel B polls."""
        client.post("/publish/channel-a", json={"msg": "for A"})
        response = client.get("/poll/channel-b?timeout=1")
        data = response.json()
        assert data["status"] == "timeout"  # Channel B got nothing

    def test_timeout_parameter_respected(self):
        """Short timeout should return quickly."""
        import time

        # perf_counter is monotonic; time.time() can jump if the wall clock
        # is adjusted while the request is being held open.
        start = time.perf_counter()
        response = client.get("/poll/empty-channel?timeout=1")
        elapsed = time.perf_counter() - start
        assert response.json()["status"] == "timeout"
        assert elapsed < 2.5  # Should return close to the 1s timeout

    def test_multiple_events_delivered_sequentially(self):
        """Multiple events should be consumed one per poll."""
        for i in range(3):
            client.post("/publish/seq-channel", json={"seq": i})
        events = []
        for _ in range(3):
            response = client.get("/poll/seq-channel?timeout=1")
            data = response.json()
            assert data["status"] == "data"
            events.append(data["event"]["seq"])
        assert events == [0, 1, 2]
@pytest.mark.asyncio
async def test_publish_while_polling():
"""Event published during an active poll should be delivered."""
async with httpx.AsyncClient(app=app, base_url="http://test") as ac:
async def do_poll():
return await ac.get("/poll/live-channel?timeout=5")
async def publish_after_delay():
await asyncio.sleep(0.5)
await ac.post("/publish/live-channel", json={"msg": "live event"})
poll_task = asyncio.create_task(do_poll())
publish_task = asyncio.create_task(publish_after_delay())
response, _ = await asyncio.gather(poll_task, publish_task)
assert response.status_code == 200
data = response.json()
assert data["status"] == "data"
assert data["event"]["msg"] == "live event"Testing the Client-Side Polling Loop
// src/pollClient.js
/**
 * Long-polling client: repeatedly GETs `${baseUrl}/poll/${channel}` and hands
 * each received event to `onEvent`.
 *
 * Options:
 *   timeout    - seconds sent as the `timeout` query parameter (default 30)
 *   onEvent    - called with `data.event` for every `status: 'data'` response
 *   onError    - called once with an Error when retries are exhausted
 *   retryDelay - base backoff in ms; the wait is retryDelay * failure count (default 1000)
 *   maxRetries - consecutive failures allowed before giving up (default 5)
 */
class PollClient {
  constructor(baseUrl, channel, options = {}) {
    this.baseUrl = baseUrl;
    this.channel = channel;
    this.timeout = options.timeout || 30;
    this.onEvent = options.onEvent || (() => {});
    this.onError = options.onError || (() => {});
    this.running = false;
    this.retryDelay = options.retryDelay || 1000;
    this.maxRetries = options.maxRetries || 5;
    this.retryCount = 0;
    this.lastId = 0;
  }

  /** Begin polling. Calling start() on a client that is already running is a no-op. */
  start() {
    // Without this guard a second start() would launch a parallel polling
    // loop and every event would be fetched by two competing requests.
    if (this.running) return;
    this.running = true;
    this._poll();
  }

  /** Stop scheduling further polls; a request already in flight is not aborted. */
  stop() {
    this.running = false;
  }

  /** Issue one poll, dispatch its result, then schedule the next poll. */
  async _poll() {
    if (!this.running) return;
    try {
      // Encode the channel so names containing '/', '?', '#' or spaces stay
      // inside the path segment instead of altering the URL.
      const channelPath = encodeURIComponent(this.channel);
      const res = await fetch(
        `${this.baseUrl}/poll/${channelPath}?timeout=${this.timeout}&last_id=${this.lastId}`
      );
      if (!res.ok) throw new Error(`HTTP ${res.status}`);
      const data = await res.json();
      this.retryCount = 0; // Reset on success
      if (data.status === 'data') {
        this.lastId = data.timestamp || this.lastId;
        this.onEvent(data.event);
      }
      // On 'timeout', just re-poll immediately
    } catch (err) {
      this.retryCount++;
      if (this.retryCount >= this.maxRetries) {
        this.onError(new Error(`Max retries (${this.maxRetries}) exceeded: ${err.message}`));
        this.running = false;
        return;
      }
      // Linear backoff: wait longer after each consecutive failure.
      await this._sleep(this.retryDelay * this.retryCount);
    }
    if (this.running) {
      // Schedule via a timer rather than recursing so the call stack unwinds.
      setTimeout(() => this._poll(), 0);
    }
  }

  /** Resolve after `ms` milliseconds. */
  _sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

module.exports = { PollClient };

// tests/pollClient.test.js
const { PollClient } = require('../src/pollClient');
global.fetch = jest.fn();
describe('PollClient', () => {
beforeEach(() => {
jest.clearAllMocks();
jest.useFakeTimers();
});
afterEach(() => {
jest.useRealTimers();
});
function mockFetchResponse(data) {
fetch.mockResolvedValueOnce({
ok: true,
json: async () => data
});
}
function mockFetchError(message = 'Network error') {
fetch.mockRejectedValueOnce(new Error(message));
}
test('calls onEvent when data received', async () => {
const onEvent = jest.fn();
mockFetchResponse({ status: 'data', event: { type: 'update', value: 42 } });
mockFetchResponse({ status: 'timeout' }); // Stop after first event
const client = new PollClient('http://api', 'test', { onEvent, timeout: 30 });
client.start();
await Promise.resolve(); // Let microtasks run
jest.runAllTimers();
await Promise.resolve();
expect(onEvent).toHaveBeenCalledWith({ type: 'update', value: 42 });
});
test('re-polls after timeout response', async () => {
mockFetchResponse({ status: 'timeout' });
mockFetchResponse({ status: 'timeout' });
mockFetchResponse({ status: 'timeout' });
const client = new PollClient('http://api', 'test', { onEvent: jest.fn() });
client.start();
// Let 3 polls complete
for (let i = 0; i < 3; i++) {
await Promise.resolve();
jest.runAllTimers();
}
await Promise.resolve();
expect(fetch).toHaveBeenCalledTimes(3);
});
test('stops polling when stop() called', async () => {
mockFetchResponse({ status: 'timeout' });
const client = new PollClient('http://api', 'test', { onEvent: jest.fn() });
client.start();
await Promise.resolve();
client.stop();
jest.runAllTimers();
await Promise.resolve();
expect(fetch).toHaveBeenCalledTimes(1); // Only the initial poll
});
test('retries on network error', async () => {
mockFetchError('Connection refused');
mockFetchResponse({ status: 'timeout' }); // Succeeds on retry
const client = new PollClient('http://api', 'test', {
onEvent: jest.fn(),
retryDelay: 100,
maxRetries: 3
});
client.start();
await Promise.resolve();
jest.runAllTimers();
await Promise.resolve();
expect(fetch).toHaveBeenCalledTimes(2);
});
test('calls onError after max retries exceeded', async () => {
const onError = jest.fn();
for (let i = 0; i <= 5; i++) {
mockFetchError('Server down');
}
const client = new PollClient('http://api', 'test', {
onEvent: jest.fn(),
onError,
maxRetries: 5,
retryDelay: 10
});
client.start();
// Run through all retries
for (let i = 0; i < 10; i++) {
await Promise.resolve();
jest.runAllTimers();
}
await Promise.resolve();
expect(onError).toHaveBeenCalledTimes(1);
expect(onError.mock.calls[0][0].message).toContain('Max retries');
});
test('sends last_id parameter for deduplication', async () => {
mockFetchResponse({ status: 'data', event: { id: 100, msg: 'first' }, timestamp: 100 });
mockFetchResponse({ status: 'timeout' });
const client = new PollClient('http://api', 'test', { onEvent: jest.fn() });
client.start();
await Promise.resolve();
jest.runAllTimers();
await Promise.resolve();
jest.runAllTimers();
await Promise.resolve();
const secondCallUrl = fetch.mock.calls[1][0];
expect(secondCallUrl).toContain('last_id=100');
});
test('includes correct timeout in poll URL', () => {
mockFetchResponse({ status: 'timeout' });
const client = new PollClient('http://api', 'test', {
onEvent: jest.fn(),
timeout: 45
});
client.start();
expect(fetch.mock.calls[0][0]).toContain('timeout=45');
});
});Testing Timeout Edge Cases
# tests/test_long_poll_timeouts.py
import pytest
import asyncio
import httpx
import time
from src.long_poll_server import app
@pytest.mark.asyncio
async def test_minimum_timeout_honored():
    """timeout=1 should return in roughly 1 second."""
    # The ``app=`` shortcut on AsyncClient was removed in httpx 0.28; use an
    # explicit ASGI transport instead.
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(transport=transport, base_url="http://test") as ac:
        # perf_counter is monotonic, so the measurement cannot be skewed by
        # wall-clock adjustments.
        start = time.perf_counter()
        response = await ac.get("/poll/empty?timeout=1", timeout=10.0)
        elapsed = time.perf_counter() - start
    assert response.json()["status"] == "timeout"
    assert 0.8 <= elapsed <= 2.0  # Allow some variance
@pytest.mark.asyncio
async def test_max_timeout_capped():
    """A timeout above the allowed maximum is rejected, not silently capped."""
    # The ``app=`` shortcut on AsyncClient was removed in httpx 0.28; use an
    # explicit ASGI transport instead.
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(transport=transport, base_url="http://test") as ac:
        # The endpoint declares ``timeout`` with ``le=60``, so 300 fails
        # request validation before the handler runs.
        response = await ac.get("/poll/empty?timeout=300", timeout=10.0)
    # FastAPI answers a failed query-parameter validation with 422.
    assert response.status_code == 422
@pytest.mark.asyncio
async def test_concurrent_polls_on_same_channel():
"""Multiple clients polling same channel — each gets its own event."""
async with httpx.AsyncClient(app=app, base_url="http://test") as ac:
# Start 3 concurrent polls
poll_tasks = [
asyncio.create_task(ac.get("/poll/shared?timeout=5"))
for _ in range(3)
]
# Publish 3 events
await asyncio.sleep(0.1)
for i in range(3):
await ac.post("/publish/shared", json={"i": i})
responses = await asyncio.gather(*poll_tasks)
# All should have received data (queue distributes one event per consumer)
data_responses = [r for r in responses if r.json()["status"] == "data"]
assert len(data_responses) >= 1 # At least one got dataIntegration Test: Full Polling Loop
# tests/test_polling_integration.py
import pytest
import asyncio
import httpx
from src.long_poll_server import app
@pytest.mark.asyncio
async def test_full_polling_loop_receives_all_events():
"""Simulate a complete polling loop that receives 5 events."""
received = []
async with httpx.AsyncClient(app=app, base_url="http://test") as ac:
async def poll_loop():
last_id = 0
while len(received) < 5:
response = await ac.get(f"/poll/events?timeout=5&last_id={last_id}")
data = response.json()
if data["status"] == "data":
received.append(data["event"])
last_id = data.get("timestamp", last_id)
async def publish_events():
await asyncio.sleep(0.2)
for i in range(5):
await ac.post("/publish/events", json={"seq": i})
await asyncio.sleep(0.05)
await asyncio.gather(
asyncio.wait_for(poll_loop(), timeout=15),
publish_events()
)
assert len(received) == 5
assert [e["seq"] for e in received] == list(range(5))End-to-End Testing
For applications using long polling in their UI, end-to-end tests validate that updates actually appear for users:
HelpMeTest can test long-polling-based real-time UIs:
*** Test Cases ***
Polling-Based Notifications Appear In UI
Go To https://your-app.com/notifications
Element Should Contain .notification-count 0
# Trigger server-side event via API
POST /api/events {"type": "alert", "message": "New alert!"}
Wait Until Element Contains .notification-list New alert! timeout=15s
Live Status Updates Via Polling
Go To https://your-app.com/job-status/job-123
Wait Until Element Contains .job-status processing timeout=5s
# Job completes on the server
Wait Until Element Contains .job-status completed timeout=60s

Summary
- Test timeout behavior — the server should return empty responses, not hang indefinitely
- Test channel isolation — events in one channel must not appear in another
- Test sequential delivery — events should be consumed in order across re-polls
- Test the polling loop — client must re-poll after every response (data or timeout)
- Test retry logic — network failures should trigger backoff, not a fast retry loop
- Test `last_id` deduplication — client should track the last event received
- Test concurrent clients — queue semantics under load matter for reliability
- Use end-to-end tests to verify that UI actually updates when the server publishes events