Real-Time Application Testing: How to Test Live Updates and Push Notifications
Real-time applications — live dashboards, collaborative tools, chat systems, notification feeds — are notoriously hard to test. Events arrive asynchronously, timing matters, and traditional request/response testing patterns don't apply. This guide covers every approach you need.
Why Real-Time Testing Is Different
Traditional REST API testing is synchronous: send request, assert response. Real-time apps are different:
- Events arrive when the server decides, not when you ask
- Multiple clients may be affected by one action
- Order of events matters
- Network conditions affect behavior
- Connection state (connected/disconnected/reconnecting) changes over time
You need to test the sequence of events, not just individual responses.
Testing Strategy by Layer
Layer 1: Unit Tests
└── Test event handlers, state machines, notification logic
Layer 2: Integration Tests
└── Test WebSocket/SSE connections, message routing
Layer 3: End-to-End Tests
└── Test real-time UI updates from the user's perspective
Layer 1: Unit Testing Event Handlers
Test your event processing logic without any network:
# src/event_processor.py
import asyncio
import inspect
from dataclasses import dataclass
from typing import Callable
@dataclass
class Event:
    """A single real-time event handed to ``EventProcessor.process``."""

    # Event name used to look up handlers, e.g. "user.joined".
    type: str
    # Event-specific data.
    payload: dict
    # Numeric timestamp; the accompanying tests pass time.time().
    timestamp: float
class EventProcessor:
    """Dispatch events to handlers registered per event type.

    Handlers registered under ``"*"`` are wildcard handlers: they run for
    every event, after the handlers registered for the event's own type.
    Handlers may be plain callables or coroutine functions.
    """

    def __init__(self):
        # Event type (or "*") -> handlers, in registration order.
        self.handlers: dict[str, list[Callable]] = {}
        # Every event passed to process(), in arrival order.
        self.processed_events: list[Event] = []
        # Not modified by any method in this class.
        self.state = "idle"

    def on(self, event_type: str, handler: Callable):
        """Register ``handler`` for ``event_type`` (``"*"`` matches all events)."""
        self.handlers.setdefault(event_type, []).append(handler)

    async def process(self, event: "Event"):
        """Record ``event`` and run its handlers sequentially.

        Type-specific handlers run first, then wildcard handlers. An
        exception raised by a handler propagates and stops the remaining
        handlers for this event.
        """
        self.processed_events.append(event)
        for handler in self.handlers.get(event.type, []):
            await self._invoke(handler, event)
        # An event whose type is literally "*" has already reached the
        # wildcard handlers in the loop above; do not run them twice.
        if event.type != "*":
            for handler in self.handlers.get("*", []):  # Wildcard handlers
                await self._invoke(handler, event)

    @staticmethod
    async def _invoke(handler: Callable, event: "Event"):
        """Call ``handler`` and await its result only if it is awaitable."""
        result = handler(event)
        if inspect.isawaitable(result):
            await result


# tests/test_event_processor.py
import pytest
import asyncio
from unittest.mock import AsyncMock, MagicMock
from src.event_processor import EventProcessor, Event
import time
@pytest.mark.asyncio
async def test_handler_called_for_matching_event():
    """A handler registered for an event type receives events of that type."""
    on_join = AsyncMock()
    processor = EventProcessor()
    processor.on("user.joined", on_join)

    joined = Event(type="user.joined", payload={"user_id": "abc"}, timestamp=time.time())
    await processor.process(joined)

    on_join.assert_called_once_with(joined)
@pytest.mark.asyncio
async def test_handler_not_called_for_different_event():
    """A handler is not invoked for an event type it was not registered for."""
    on_join = AsyncMock()
    processor = EventProcessor()
    processor.on("user.joined", on_join)

    left = Event("user.left", {}, time.time())
    await processor.process(left)

    on_join.assert_not_called()
@pytest.mark.asyncio
async def test_wildcard_handler_called_for_all_events():
    """A "*" handler sees every event; a typed handler sees only its own type."""
    on_any = AsyncMock()
    on_join = AsyncMock()
    processor = EventProcessor()
    processor.on("*", on_any)
    processor.on("user.joined", on_join)

    for event_type in ("user.joined", "message.sent"):
        await processor.process(Event(event_type, {}, time.time()))

    assert on_any.call_count == 2
    assert on_join.call_count == 1
@pytest.mark.asyncio
async def test_events_processed_in_order():
    """Events are delivered to handlers in the order they were processed."""
    seen_sequence = []

    def record_seq(event):
        seen_sequence.append(event.payload["seq"])

    processor = EventProcessor()
    processor.on("*", AsyncMock(side_effect=record_seq))

    for seq in range(5):
        tick = Event("tick", {"seq": seq}, time.time())
        await processor.process(tick)

    assert seen_sequence == [0, 1, 2, 3, 4]
@pytest.mark.asyncio
async def test_multiple_handlers_for_same_event():
    """Every handler registered for an event type is called exactly once."""
    update_handlers = [AsyncMock() for _ in range(3)]
    processor = EventProcessor()
    for update_handler in update_handlers:
        processor.on("update", update_handler)

    await processor.process(Event("update", {}, time.time()))

    for update_handler in update_handlers:
        update_handler.assert_called_once()


## Testing Notification Logic
# tests/test_notification_service.py
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from src.notification_service import NotificationService
class TestNotificationService:
    """Routing tests for ``NotificationService.notify_user``.

    Each test replaces the service's presence check and delivery methods
    with mocks, so no real push or WebSocket traffic is sent.
    """

    @pytest.fixture
    def service(self):
        """Provide a fresh ``NotificationService`` for each test."""
        return NotificationService()

    @pytest.mark.asyncio
    async def test_sends_push_when_user_offline(self, service):
        """An offline user is notified by push only."""
        service.is_user_online = MagicMock(return_value=False)
        service.send_push_notification = AsyncMock()
        service.send_websocket_message = AsyncMock()

        await service.notify_user("user-123", {"type": "message", "text": "Hello"})

        service.send_push_notification.assert_called_once()
        service.send_websocket_message.assert_not_called()

    @pytest.mark.asyncio
    async def test_sends_websocket_when_user_online(self, service):
        """An online user is notified over WebSocket only."""
        service.is_user_online = MagicMock(return_value=True)
        service.send_push_notification = AsyncMock()
        service.send_websocket_message = AsyncMock()

        await service.notify_user("user-123", {"type": "message", "text": "Hello"})

        service.send_websocket_message.assert_called_once()
        service.send_push_notification.assert_not_called()

    @pytest.mark.asyncio
    async def test_falls_back_to_push_if_websocket_fails(self, service):
        """A failed WebSocket delivery falls back to a push notification."""
        service.is_user_online = MagicMock(return_value=True)
        service.send_push_notification = AsyncMock()
        service.send_websocket_message = AsyncMock(side_effect=ConnectionError("WS closed"))

        await service.notify_user("user-123", {"type": "message", "text": "Hello"})

        # Confirm the WebSocket path was actually tried; otherwise a service
        # that always pushes would pass this test for the wrong reason.
        service.send_websocket_message.assert_called()
        service.send_push_notification.assert_called_once()

    @pytest.mark.asyncio
    async def test_batches_notifications_for_offline_users(self, service):
        """Rapid notifications to an offline user yield between 1 and 3 pushes."""
        service.is_user_online = MagicMock(return_value=False)
        service.send_push_notification = AsyncMock()

        # Send 3 notifications rapidly
        for i in range(3):
            await service.notify_user("user-123", {"seq": i})

        # Should batch into one push (or send individually — test your policy).
        # The lower bound matters: an upper bound alone also passes when
        # nothing is delivered. Tighten to `== 1` or `== 3` once the
        # batching policy is decided.
        assert 1 <= service.send_push_notification.call_count <= 3


## Layer 2: Integration Testing with Real Connections
Testing with asyncio and an in-process server
# tests/test_realtime_integration.py
import pytest
import asyncio
import aiohttp
from src.server import create_app
@pytest.fixture
async def app_server(aiohttp_server):
    """Start the application on the ``aiohttp_server`` test server and return it."""
    return await aiohttp_server(create_app())
@pytest.mark.asyncio
async def test_client_receives_broadcast(app_server):
    """A connected WebSocket client receives a message posted to /api/broadcast."""
    ws_url = f"{app_server.make_url('/ws')}"
    broadcast_url = app_server.make_url('/api/broadcast')
    messages = []

    async with aiohttp.ClientSession() as session:
        # Open the listening socket first so the broadcast cannot be missed.
        async with session.ws_connect(ws_url) as ws:
            # Trigger the broadcast over HTTP while the socket is open.
            async with session.post(broadcast_url, json={"message": "hello everyone"}) as resp:
                assert resp.status == 200

            # Bounded wait: a missing broadcast fails the test instead of hanging.
            broadcast = await asyncio.wait_for(ws.receive_json(), timeout=5.0)
            messages.append(broadcast)

    assert len(messages) == 1
    assert messages[0]["message"] == "hello everyone"
@pytest.mark.asyncio
async def test_private_message_only_reaches_target(app_server):
    """Verify that private messages don't leak to other clients."""
    client1_received = []
    client2_received = []

    async with aiohttp.ClientSession() as session:
        async with session.ws_connect(f"{app_server.make_url('/ws')}?userId=user1") as ws1, \
                session.ws_connect(f"{app_server.make_url('/ws')}?userId=user2") as ws2:
            # Send private message to user1. The context manager releases the
            # response, and the status check stops a rejected request from
            # showing up later as a confusing "nothing received" failure.
            async with session.post(
                app_server.make_url('/api/private'),
                json={"to": "user1", "message": "secret for user1"},
            ) as resp:
                assert resp.ok

            # user1 should receive it
            try:
                msg = await asyncio.wait_for(ws1.receive_json(), timeout=3.0)
                client1_received.append(msg)
            except asyncio.TimeoutError:
                # Left empty on purpose: the length assertion below reports it.
                pass

            # user2 should NOT receive it; a timeout here is the expected outcome.
            try:
                msg = await asyncio.wait_for(ws2.receive_json(), timeout=1.0)
                client2_received.append(msg)
            except asyncio.TimeoutError:
                pass

    assert len(client1_received) == 1
    assert len(client2_received) == 0
@pytest.mark.asyncio
async def test_live_counter_updates(app_server):
    """The /ws/counter stream emits counts that never decrease and do grow."""
    expected_updates = 5

    async with aiohttp.ClientSession() as session:
        async with session.ws_connect(app_server.make_url('/ws/counter')) as ws:
            # Each read is bounded so a stalled stream fails instead of hanging.
            counts = [
                (await asyncio.wait_for(ws.receive_json(), timeout=5.0))["count"]
                for _ in range(expected_updates)
            ]

    assert len(counts) == 5
    assert counts == sorted(counts)  # Counter only goes up
    assert counts[-1] > counts[0]  # Actually incremented


## Testing Push Notifications
For mobile/browser push notifications, test the integration layer:
# tests/test_push_notifications.py
import pytest
from unittest.mock import AsyncMock, patch, MagicMock
class TestPushNotifications:
    """Tests for platform routing and payload building in ``src.push``.

    The provider clients are patched, so nothing is sent to APNs or FCM.
    """

    @pytest.mark.asyncio
    async def test_sends_apns_for_ios_device(self):
        """An iOS device is routed to the APNs client with token and title."""
        with patch("src.push.apns_client") as mock_apns:
            mock_apns.send_notification = AsyncMock(return_value={"status": "sent"})

            from src.push import send_push
            await send_push(
                device_token="abc123",
                platform="ios",
                title="New message",
                body="You have a new message"
            )

            mock_apns.send_notification.assert_called_once()
            call_args = mock_apns.send_notification.call_args
            assert call_args.kwargs["token"] == "abc123"
            assert call_args.kwargs["title"] == "New message"

    @pytest.mark.asyncio
    async def test_sends_fcm_for_android_device(self):
        """An Android device is routed to the FCM client."""
        with patch("src.push.fcm_client") as mock_fcm:
            mock_fcm.send = AsyncMock(return_value={"message_id": "msg-123"})

            from src.push import send_push
            await send_push(
                device_token="android-token-xyz",
                platform="android",
                title="New message",
                body="You have a new message"
            )

            mock_fcm.send.assert_called_once()

    @pytest.mark.asyncio
    async def test_handles_expired_push_token(self):
        """A provider error for a bad token surfaces as ``PushError``."""
        with patch("src.push.apns_client") as mock_apns:
            # NOTE(review): APNs documents "Unregistered" (HTTP 410) as the
            # reason for an inactive token, and "DeviceTokenNotForTopic" for a
            # topic mismatch. Confirm which reason send_push maps to
            # "token expired".
            mock_apns.send_notification = AsyncMock(
                side_effect=Exception("DeviceTokenNotForTopic")
            )

            from src.push import send_push, PushError
            with pytest.raises(PushError, match="token expired"):
                await send_push("expired-token", "ios", "Title", "Body")

    def test_push_notification_payload_within_size_limit(self):
        """APNs has a 4KB payload limit."""
        # build_apns_payload is called without await, so this test needs
        # neither an event loop nor the asyncio marker.
        import json

        from src.push import build_apns_payload

        payload = build_apns_payload(
            title="T" * 50,
            body="B" * 200,
            data={"key": "v" * 100}
        )

        payload_size = len(json.dumps(payload).encode("utf-8"))
        assert payload_size <= 4096, f"Payload too large: {payload_size} bytes"


## Testing Presence and Online Status
# tests/test_presence.py
import pytest
import asyncio
from src.presence import PresenceManager
class TestPresenceManager:
    """Tests for online/offline tracking in ``PresenceManager``."""

    @pytest.fixture
    def presence(self):
        """Provide a fresh ``PresenceManager`` for each test."""
        return PresenceManager()

    @pytest.mark.asyncio
    async def test_user_marked_online_on_connect(self, presence):
        """Connecting a session marks the user online."""
        await presence.connect("user-123", "session-abc")
        assert await presence.is_online("user-123")

    @pytest.mark.asyncio
    async def test_user_marked_offline_on_disconnect(self, presence):
        """Disconnecting the user's only session marks them offline."""
        await presence.connect("user-123", "session-abc")
        await presence.disconnect("user-123", "session-abc")
        assert not await presence.is_online("user-123")

    @pytest.mark.asyncio
    async def test_user_stays_online_with_multiple_sessions(self, presence):
        """User with 2 tabs open — closing one tab should keep them online."""
        await presence.connect("user-123", "session-1")
        await presence.connect("user-123", "session-2")
        await presence.disconnect("user-123", "session-1")
        assert await presence.is_online("user-123")  # Still connected via session-2

    @pytest.mark.asyncio
    async def test_last_seen_updated_on_disconnect(self, presence):
        """``last_seen`` is stamped at disconnect time, not at connect time."""
        import time

        await presence.connect("user-123", "session-abc")
        # Taken after connect: a timestamp written only on connect would
        # fall before this bound and fail the assertion.
        before_disconnect = time.time()
        await presence.disconnect("user-123", "session-abc")
        after_disconnect = time.time()

        last_seen = await presence.get_last_seen("user-123")
        # Assumes last_seen is a time.time()-style epoch float — TODO confirm.
        assert before_disconnect <= last_seen <= after_disconnect

    @pytest.mark.asyncio
    async def test_presence_change_triggers_subscribers(self, presence):
        """Subscribers are told about going online, then going offline."""
        changes = []
        presence.on_change(lambda user_id, status: changes.append((user_id, status)))

        await presence.connect("user-123", "session-abc")
        await presence.disconnect("user-123", "session-abc")

        assert ("user-123", "online") in changes
        assert ("user-123", "offline") in changes
        # Order matters for real-time events: online must come first.
        assert changes.index(("user-123", "online")) < changes.index(("user-123", "offline"))


## Testing Real-Time State Synchronization
Collaborative apps need to test state consistency across clients:
# tests/test_state_sync.py
import pytest
import asyncio
class TestStateSync:
    """State-consistency tests for clients sharing one real-time server.

    NOTE(review): the ``realtime_server`` fixture is not defined in this
    snippet; presumably it comes from a conftest.py — confirm.
    """

    @pytest.mark.asyncio
    async def test_concurrent_updates_resolve_consistently(self, realtime_server):
        """Two clients updating the same resource should converge."""
        async with realtime_server.connect("user1") as c1, \
                realtime_server.connect("user2") as c2:
            # Record the starting value so the final check does not rely on
            # the counter starting at 0.
            # Assumes get_state() returns a dict — TODO confirm.
            initial = (await c1.get_state()).get("counter", 0)
            expected = initial + 2

            # Both clients update simultaneously
            await asyncio.gather(
                c1.send({"action": "increment", "field": "counter"}),
                c2.send({"action": "increment", "field": "counter"}),
            )

            # Wait for convergence by polling with a deadline instead of a
            # fixed sleep: fast when the server is fast, tolerant when slow.
            loop = asyncio.get_running_loop()
            deadline = loop.time() + 5.0
            while True:
                s1 = await c1.get_state()
                s2 = await c2.get_state()
                converged = s1.get("counter") == s2.get("counter") == expected
                if converged or loop.time() >= deadline:
                    break
                await asyncio.sleep(0.05)

            # Both clients must see the same final state
            assert s1["counter"] == s2["counter"]
            assert s1["counter"] == expected  # Both increments applied

    @pytest.mark.asyncio
    async def test_state_is_recovered_after_reconnect(self, realtime_server):
        """Client should recover state after reconnecting."""
        async with realtime_server.connect("user1") as c1:
            await c1.send({"action": "set", "key": "name", "value": "Alice"})
            state_before = await c1.get_state()

        # Disconnect and reconnect
        async with realtime_server.connect("user1") as c1:
            state_after = await c1.get_state()

        assert state_after["name"] == state_before["name"]


## End-to-End Real-Time Testing with HelpMeTest
Unit and integration tests cover your event infrastructure. But users experience real-time features through a UI. HelpMeTest validates the complete user experience:
*** Test Cases ***
Live Chat Messages Appear Without Refresh
    Open Browser    https://your-app.com/chat/room-1    Chrome
    Open Browser    https://your-app.com/chat/room-1    Chrome    alias=user2
    Switch Browser    default
    # A cell that starts with "#" is parsed as a comment, so CSS id selectors
    # must be escaped as "\#".
    Fill Text    \#message-input    Hello from user 1
    Click    \#send-button
    Switch Browser    user2
    Wait Until Element Contains    .message-list    Hello from user 1    timeout=5s

Notification Counter Updates In Real-Time
    Go To    https://your-app.com/dashboard
    ${before}=    Get Text    .notification-count
    # Trigger action that generates notification (via API)
    POST    /api/trigger-notification    {"userId": "test-user"}
    Wait Until Element Is Not    .notification-count    ${before}    timeout=10s

Live Dashboard Reflects Data Changes
    Go To    https://your-app.com/dashboard
    ${initial}=    Get Text    .live-metric
    Sleep    5s
    ${updated}=    Get Text    .live-metric
    Should Not Be Equal    ${initial}    ${updated}

## Summary
- Separate concerns — event handler logic (unit tests), connection/routing (integration), UI updates (e2e)
- Use asyncio.wait_for with timeouts in async tests — never wait indefinitely
- Test ordering — real-time events must arrive in the correct sequence
- Test concurrent updates — multiple clients acting simultaneously is a common race condition source
- Test presence logic — online/offline state with multiple sessions is tricky to get right
- Test notification routing — online users get WebSocket, offline users get push
- End-to-end tests verify users actually see live updates, not just that your server sends them