Real-Time Application Testing: How to Test Live Updates and Push Notifications

Real-time applications (live dashboards, collaborative tools, chat systems, notification feeds) are notoriously hard to test. Events arrive asynchronously, timing matters, and traditional request/response testing patterns don't apply. This guide covers unit, integration, and end-to-end approaches, with examples for WebSocket messaging, push notifications, presence, and state synchronization.

Why Real-Time Testing Is Different

Traditional REST API testing is synchronous: send a request, assert on the response. Real-time apps break that model:

  • Events arrive when the server decides, not when you ask
  • Multiple clients may be affected by one action
  • Order of events matters
  • Network conditions affect behavior
  • Connection state (connected/disconnected/reconnecting) changes over time

You need to test the sequence of events, not just individual responses.
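
One way to assert on a sequence rather than a single response is to collect events into a queue and read them back with a timeout. The sketch below is illustrative only: `collect_events`, `demo`, and `fake_server` are hypothetical names, and the queue stands in for whatever transport your app uses.

```python
import asyncio


async def collect_events(queue: asyncio.Queue, count: int, timeout: float = 1.0) -> list:
    """Read exactly `count` events, failing fast if any one of them is late."""
    events = []
    for _ in range(count):
        # wait_for raises asyncio.TimeoutError if the next event does not arrive in time
        events.append(await asyncio.wait_for(queue.get(), timeout=timeout))
    return events


async def demo() -> list:
    queue: asyncio.Queue = asyncio.Queue()

    async def fake_server():
        # Stand-in for a server that pushes events on its own schedule
        for event_type in ("connected", "user.joined", "message.sent"):
            await asyncio.sleep(0.01)
            await queue.put({"type": event_type})

    producer = asyncio.create_task(fake_server())
    events = await collect_events(queue, count=3)
    await producer
    return [event["type"] for event in events]
```

Running `asyncio.run(demo())` returns the event types in arrival order, so a test can compare the whole list against the expected sequence in one assertion.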

Testing Strategy by Layer

Layer 1: Unit Tests
  └── Test event handlers, state machines, notification logic

Layer 2: Integration Tests  
  └── Test WebSocket/SSE connections, message routing

Layer 3: End-to-End Tests
  └── Test real-time UI updates from the user's perspective

Layer 1: Unit Testing Event Handlers

Test your event processing logic without any network:

# src/event_processor.py
from dataclasses import dataclass
from typing import Callable
import asyncio

@dataclass
class Event:
    type: str
    payload: dict
    timestamp: float

class EventProcessor:
    def __init__(self):
        self.handlers: dict[str, list[Callable]] = {}
        self.processed_events: list[Event] = []
        self.state = "idle"
    
    def on(self, event_type: str, handler: Callable):
        if event_type not in self.handlers:
            self.handlers[event_type] = []
        self.handlers[event_type].append(handler)
    
    async def process(self, event: Event):
        self.processed_events.append(event)
        for handler in self.handlers.get(event.type, []):
            await handler(event)
        for handler in self.handlers.get("*", []):  # Wildcard handlers
            await handler(event)

# tests/test_event_processor.py
import pytest
import asyncio
from unittest.mock import AsyncMock, MagicMock
from src.event_processor import EventProcessor, Event
import time

@pytest.mark.asyncio
async def test_handler_called_for_matching_event():
    processor = EventProcessor()
    handler = AsyncMock()
    processor.on("user.joined", handler)
    
    event = Event(type="user.joined", payload={"user_id": "abc"}, timestamp=time.time())
    await processor.process(event)
    
    handler.assert_called_once_with(event)

@pytest.mark.asyncio
async def test_handler_not_called_for_different_event():
    processor = EventProcessor()
    handler = AsyncMock()
    processor.on("user.joined", handler)
    
    await processor.process(Event("user.left", {}, time.time()))
    
    handler.assert_not_called()

@pytest.mark.asyncio
async def test_wildcard_handler_called_for_all_events():
    processor = EventProcessor()
    wildcard = AsyncMock()
    specific = AsyncMock()
    processor.on("*", wildcard)
    processor.on("user.joined", specific)
    
    await processor.process(Event("user.joined", {}, time.time()))
    await processor.process(Event("message.sent", {}, time.time()))
    
    assert wildcard.call_count == 2
    assert specific.call_count == 1

@pytest.mark.asyncio
async def test_events_processed_in_order():
    processor = EventProcessor()
    received = []
    processor.on("*", AsyncMock(side_effect=lambda e: received.append(e.payload["seq"])))
    
    for i in range(5):
        await processor.process(Event("tick", {"seq": i}, time.time()))
    
    assert received == [0, 1, 2, 3, 4]

@pytest.mark.asyncio
async def test_multiple_handlers_for_same_event():
    processor = EventProcessor()
    h1, h2, h3 = AsyncMock(), AsyncMock(), AsyncMock()
    for h in [h1, h2, h3]:
        processor.on("update", h)
    
    await processor.process(Event("update", {}, time.time()))
    
    h1.assert_called_once()
    h2.assert_called_once()
    h3.assert_called_once()

Testing Notification Logic

# tests/test_notification_service.py
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from src.notification_service import NotificationService

class TestNotificationService:
    @pytest.fixture
    def service(self):
        return NotificationService()
    
    @pytest.mark.asyncio
    async def test_sends_push_when_user_offline(self, service):
        service.is_user_online = MagicMock(return_value=False)
        service.send_push_notification = AsyncMock()
        service.send_websocket_message = AsyncMock()
        
        await service.notify_user("user-123", {"type": "message", "text": "Hello"})
        
        service.send_push_notification.assert_called_once()
        service.send_websocket_message.assert_not_called()
    
    @pytest.mark.asyncio
    async def test_sends_websocket_when_user_online(self, service):
        service.is_user_online = MagicMock(return_value=True)
        service.send_push_notification = AsyncMock()
        service.send_websocket_message = AsyncMock()
        
        await service.notify_user("user-123", {"type": "message", "text": "Hello"})
        
        service.send_websocket_message.assert_called_once()
        service.send_push_notification.assert_not_called()
    
    @pytest.mark.asyncio
    async def test_falls_back_to_push_if_websocket_fails(self, service):
        service.is_user_online = MagicMock(return_value=True)
        service.send_push_notification = AsyncMock()
        service.send_websocket_message = AsyncMock(side_effect=ConnectionError("WS closed"))
        
        await service.notify_user("user-123", {"type": "message", "text": "Hello"})
        
        service.send_push_notification.assert_called_once()
    
    @pytest.mark.asyncio
    async def test_batches_notifications_for_offline_users(self, service):
        service.is_user_online = MagicMock(return_value=False)
        service.send_push_notification = AsyncMock()
        
        # Send 3 notifications rapidly
        for i in range(3):
            await service.notify_user("user-123", {"seq": i})
        
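        # Placeholder bound: it passes for any delivery policy.
        # Replace it with an exact count for yours, for example == 3 for
        # immediate delivery, or == 1 once the batch window has flushed.
        assert service.send_push_notification.call_count <= 3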
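
The tests above import a `NotificationService` that is not shown. As a rough sketch of routing logic that would satisfy the first three tests (not the actual implementation, and with no batching), it might look like the following. The method names come from the tests; everything else, including the `FlakySocketService` test double, is an assumption.

```python
import asyncio


class NotificationService:
    """Hypothetical sketch: WebSocket for online users, push otherwise."""

    def is_user_online(self, user_id: str) -> bool:
        raise NotImplementedError  # e.g. look up a presence store

    async def send_websocket_message(self, user_id: str, payload: dict) -> None:
        raise NotImplementedError

    async def send_push_notification(self, user_id: str, payload: dict) -> None:
        raise NotImplementedError

    async def notify_user(self, user_id: str, payload: dict) -> None:
        if self.is_user_online(user_id):
            try:
                await self.send_websocket_message(user_id, payload)
                return
            except ConnectionError:
                # The socket closed between the presence check and the send,
                # so fall through to push delivery.
                pass
        await self.send_push_notification(user_id, payload)


class FlakySocketService(NotificationService):
    """Test double: the user looks online but the socket is already closed."""

    def __init__(self):
        self.sent = []

    def is_user_online(self, user_id: str) -> bool:
        return True

    async def send_websocket_message(self, user_id: str, payload: dict) -> None:
        raise ConnectionError("WS closed")

    async def send_push_notification(self, user_id: str, payload: dict) -> None:
        self.sent.append(("push", user_id, payload))
```

Catching only `ConnectionError` is a deliberate choice in this sketch: other exceptions from the WebSocket path still surface instead of being silently converted into a push.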

Layer 2: Integration Testing with Real Connections

Testing with asyncio and an in-process server

# tests/test_realtime_integration.py
import pytest
import asyncio
import aiohttp
from src.server import create_app

@pytest.fixture
async def app_server(aiohttp_server):
    app = create_app()
    server = await aiohttp_server(app)
    return server

@pytest.mark.asyncio
async def test_client_receives_broadcast(app_server):
    """Verify that a broadcast reaches all connected clients."""
    received = []
    
    async with aiohttp.ClientSession() as session:
        # Connect client
        async with session.ws_connect(app_server.make_url('/ws')) as ws:
            # Trigger a broadcast through the HTTP API
            async with session.post(app_server.make_url('/api/broadcast'),
                                    json={"message": "hello everyone"}) as resp:
                assert resp.status == 200
            
            # Read the broadcasted message
            msg = await asyncio.wait_for(ws.receive_json(), timeout=5.0)
            received.append(msg)
    
    assert len(received) == 1
    assert received[0]["message"] == "hello everyone"

@pytest.mark.asyncio
async def test_private_message_only_reaches_target(app_server):
    """Verify that private messages don't leak to other clients."""
    client1_received = []
    client2_received = []
    
    async with aiohttp.ClientSession() as session:
        async with session.ws_connect(f"{app_server.make_url('/ws')}?userId=user1") as ws1, \
                   session.ws_connect(f"{app_server.make_url('/ws')}?userId=user2") as ws2:
            
            # Send private message to user1 (the context manager releases the response)
            async with session.post(app_server.make_url('/api/private'),
                                    json={"to": "user1", "message": "secret for user1"}) as resp:
                assert resp.status == 200
            
            # user1 should receive it
            try:
                msg = await asyncio.wait_for(ws1.receive_json(), timeout=3.0)
                client1_received.append(msg)
            except asyncio.TimeoutError:
                pass
            
            # user2 should NOT receive it
            try:
                msg = await asyncio.wait_for(ws2.receive_json(), timeout=1.0)
                client2_received.append(msg)
            except asyncio.TimeoutError:
                pass
    
    assert len(client1_received) == 1
    assert len(client2_received) == 0

@pytest.mark.asyncio
async def test_live_counter_updates(app_server):
    """Test that a live counter updates correctly."""
    counts = []
    
    async with aiohttp.ClientSession() as session:
        async with session.ws_connect(app_server.make_url('/ws/counter')) as ws:
            # Collect 5 counter updates
            for _ in range(5):
                msg = await asyncio.wait_for(ws.receive_json(), timeout=5.0)
                counts.append(msg["count"])
    
    assert len(counts) == 5
    assert counts == sorted(counts)  # Counter only goes up
    assert counts[-1] > counts[0]   # Actually incremented
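
The private-message test repeats the same try/except around `asyncio.wait_for` to tell "message arrived" from "nothing arrived". A small helper can make that intent explicit. This is a sketch: `receive_or_none` is a hypothetical name, and the queues in `demo` stand in for WebSocket connections.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def receive_or_none(receive: Callable[[], Awaitable[Any]], timeout: float) -> Any:
    """Return the next message, or None if nothing arrives within `timeout`."""
    try:
        return await asyncio.wait_for(receive(), timeout=timeout)
    except asyncio.TimeoutError:
        return None


async def demo():
    # Queues stand in for two clients' inboxes
    inbox_user1: asyncio.Queue = asyncio.Queue()
    inbox_user2: asyncio.Queue = asyncio.Queue()
    await inbox_user1.put({"message": "secret for user1"})

    got1 = await receive_or_none(inbox_user1.get, timeout=0.5)
    got2 = await receive_or_none(inbox_user2.get, timeout=0.1)
    return got1, got2
```

In the test above, the two try/except blocks would then reduce to `receive_or_none(ws1.receive_json, timeout=3.0)` and `receive_or_none(ws2.receive_json, timeout=1.0)`, followed by assertions that the first is a message and the second is `None`. Keep the "should not arrive" timeout short, since that wait is paid on every passing run.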

Testing Push Notifications

For mobile/browser push notifications, mock the provider clients (APNs, FCM) and test your own integration layer:

# tests/test_push_notifications.py
import pytest
from unittest.mock import AsyncMock, patch, MagicMock

class TestPushNotifications:
    @pytest.mark.asyncio
    async def test_sends_apns_for_ios_device(self):
        with patch("src.push.apns_client") as mock_apns:
            mock_apns.send_notification = AsyncMock(return_value={"status": "sent"})
            
            from src.push import send_push
            result = await send_push(
                device_token="abc123",
                platform="ios",
                title="New message",
                body="You have a new message"
            )
            
            mock_apns.send_notification.assert_called_once()
            call_args = mock_apns.send_notification.call_args
            assert call_args.kwargs["token"] == "abc123"
            assert call_args.kwargs["title"] == "New message"

    @pytest.mark.asyncio
    async def test_sends_fcm_for_android_device(self):
        with patch("src.push.fcm_client") as mock_fcm:
            mock_fcm.send = AsyncMock(return_value={"message_id": "msg-123"})
            
            from src.push import send_push
            await send_push(
                device_token="android-token-xyz",
                platform="android",
                title="New message",
                body="You have a new message"
            )
            
            mock_fcm.send.assert_called_once()

    @pytest.mark.asyncio
    async def test_handles_expired_push_token(self):
        with patch("src.push.apns_client") as mock_apns:
            mock_apns.send_notification = AsyncMock(
                # "Unregistered" is the APNs reason for a device token that is no longer active
                side_effect=Exception("Unregistered")
            )
            
            from src.push import send_push, PushError
            with pytest.raises(PushError, match="token expired"):
                await send_push("expired-token", "ios", "Title", "Body")

    @pytest.mark.asyncio
    async def test_push_notification_payload_within_size_limit(self):
        """APNs limits regular remote notification payloads to 4 KB (4096 bytes)."""
        from src.push import build_apns_payload
        
        payload = build_apns_payload(
            title="T" * 50,
            body="B" * 200,
            data={"key": "v" * 100}
        )
        
        import json
        payload_size = len(json.dumps(payload).encode("utf-8"))
        assert payload_size <= 4096, f"Payload too large: {payload_size} bytes"
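
The size test relies on a `build_apns_payload` function that is not shown. A minimal sketch, assuming the standard `aps.alert` dictionary and a custom top-level `"data"` key (the key name and the `payload_size` helper are assumptions, not part of the original code):

```python
import json
from typing import Optional

APNS_MAX_PAYLOAD_BYTES = 4096  # limit cited in the test above


def build_apns_payload(title: str, body: str, data: Optional[dict] = None) -> dict:
    """Hypothetical builder: `aps.alert` dictionary plus optional custom data."""
    payload = {"aps": {"alert": {"title": title, "body": body}}}
    if data:
        # Custom keys live outside `aps`; the "data" key name is an assumption
        payload["data"] = data
    return payload


def payload_size(payload: dict) -> int:
    """Size in bytes of the compact UTF-8 JSON encoding of the payload."""
    encoded = json.dumps(payload, separators=(",", ":"), ensure_ascii=False)
    return len(encoded.encode("utf-8"))
```

The byte count depends on how the payload is serialized. `json.dumps` with default settings adds spaces after separators and escapes non-ASCII characters, so the measurement in the test above tends to overestimate compared with the compact encoding here. That errs on the safe side, but it is worth measuring the same encoding your push client actually sends.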

Testing Presence and Online Status

# tests/test_presence.py
import pytest
import asyncio
from src.presence import PresenceManager

class TestPresenceManager:
    @pytest.fixture
    def presence(self):
        return PresenceManager()
    
    @pytest.mark.asyncio
    async def test_user_marked_online_on_connect(self, presence):
        await presence.connect("user-123", "session-abc")
        assert await presence.is_online("user-123")

    @pytest.mark.asyncio
    async def test_user_marked_offline_on_disconnect(self, presence):
        await presence.connect("user-123", "session-abc")
        await presence.disconnect("user-123", "session-abc")
        assert not await presence.is_online("user-123")

    @pytest.mark.asyncio
    async def test_user_stays_online_with_multiple_sessions(self, presence):
        """User with 2 tabs open — closing one tab should keep them online."""
        await presence.connect("user-123", "session-1")
        await presence.connect("user-123", "session-2")
        await presence.disconnect("user-123", "session-1")
        assert await presence.is_online("user-123")  # Still connected via session-2

    @pytest.mark.asyncio
    async def test_last_seen_updated_on_disconnect(self, presence):
        import time
        before = time.time()
        await presence.connect("user-123", "session-abc")
        await presence.disconnect("user-123", "session-abc")
        
        last_seen = await presence.get_last_seen("user-123")
        assert last_seen >= before

    @pytest.mark.asyncio
    async def test_presence_change_triggers_subscribers(self, presence):
        changes = []
        presence.on_change(lambda user_id, status: changes.append((user_id, status)))
        
        await presence.connect("user-123", "session-abc")
        await presence.disconnect("user-123", "session-abc")
        
        assert ("user-123", "online") in changes
        assert ("user-123", "offline") in changes
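
For reference, here is a rough in-memory sketch of a `PresenceManager` that would pass the tests above. It is not the real implementation: the method names are taken from the tests, while the internal data structures and the `demo` function are assumptions. The key idea is to track a set of sessions per user and treat the user as online while that set is non-empty.

```python
import asyncio
import time
from collections import defaultdict
from typing import Callable, Optional


class PresenceManager:
    """Hypothetical sketch: a user is online while at least one session is open."""

    def __init__(self):
        self._sessions: dict = defaultdict(set)   # user_id -> set of session ids
        self._last_seen: dict = {}                # user_id -> unix timestamp
        self._subscribers: list = []

    def on_change(self, callback: Callable[[str, str], None]) -> None:
        self._subscribers.append(callback)

    def _notify(self, user_id: str, status: str) -> None:
        for callback in self._subscribers:
            callback(user_id, status)

    async def connect(self, user_id: str, session_id: str) -> None:
        was_online = bool(self._sessions[user_id])
        self._sessions[user_id].add(session_id)
        if not was_online:
            self._notify(user_id, "online")  # only the first session flips the status

    async def disconnect(self, user_id: str, session_id: str) -> None:
        sessions = self._sessions[user_id]
        if session_id not in sessions:
            return  # unknown or already closed session
        sessions.discard(session_id)
        if not sessions:
            self._last_seen[user_id] = time.time()
            self._notify(user_id, "offline")  # only the last session flips the status

    async def is_online(self, user_id: str) -> bool:
        return bool(self._sessions.get(user_id))

    async def get_last_seen(self, user_id: str) -> Optional[float]:
        return self._last_seen.get(user_id)


async def demo():
    presence = PresenceManager()
    changes = []
    presence.on_change(lambda user_id, status: changes.append((user_id, status)))
    await presence.connect("user-123", "session-1")
    await presence.connect("user-123", "session-2")
    await presence.disconnect("user-123", "session-1")
    online_with_one_tab = await presence.is_online("user-123")
    await presence.disconnect("user-123", "session-2")
    return online_with_one_tab, await presence.is_online("user-123"), changes
```

In this sketch, subscribers are notified only on the transitions between zero and one open sessions, so opening a second tab does not produce a duplicate "online" event. Whether that matches your product's behavior is a policy decision worth pinning down with its own test.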

Testing Real-Time State Synchronization

For collaborative apps, test that state stays consistent across clients:

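# tests/test_state_sync.py
# `realtime_server` is a project-specific fixture (not shown) whose connect()
# returns an async context manager yielding a client with send() and get_state().
import pytest
import asyncio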

class TestStateSync:
    @pytest.mark.asyncio
    async def test_concurrent_updates_resolve_consistently(self, realtime_server):
        """Two clients updating the same resource should converge."""
        client1_state = []
        client2_state = []
        
        async with realtime_server.connect("user1") as c1, \
                   realtime_server.connect("user2") as c2:
            
            # Both clients update simultaneously
            await asyncio.gather(
                c1.send({"action": "increment", "field": "counter"}),
                c2.send({"action": "increment", "field": "counter"}),
            )
            
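            # Wait for convergence (a fixed sleep is the simplest option,
            # but it can be flaky on slow CI and wasteful on fast machines)
            await asyncio.sleep(0.5)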
            
            s1 = await c1.get_state()
            s2 = await c2.get_state()
        
        # Both clients must see the same final state
        assert s1["counter"] == s2["counter"]
        assert s1["counter"] >= 2  # Both increments applied

    @pytest.mark.asyncio
    async def test_state_is_recovered_after_reconnect(self, realtime_server):
        """Client should recover state after reconnecting."""
        async with realtime_server.connect("user1") as c1:
            await c1.send({"action": "set", "key": "name", "value": "Alice"})
            state_before = await c1.get_state()
        
        # Disconnect and reconnect
        async with realtime_server.connect("user1") as c1:
            state_after = await c1.get_state()
        
        assert state_after["name"] == state_before["name"]
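
A fixed `asyncio.sleep` before checking state is either too short on a slow machine or wasted time on a fast one. A common alternative is to poll until a condition holds, with a deadline. The following is a sketch under that assumption: `wait_until` and `demo` are hypothetical names, and the dictionary stands in for shared server state.

```python
import asyncio
import time


async def wait_until(condition, timeout: float = 2.0, interval: float = 0.02):
    """Poll an async `condition` until it returns a truthy value or time runs out."""
    deadline = time.monotonic() + timeout
    while True:
        result = await condition()
        if result:
            return result
        if time.monotonic() >= deadline:
            raise AssertionError(f"condition not met within {timeout}s")
        await asyncio.sleep(interval)


async def demo() -> int:
    state = {"counter": 0}  # stand-in for state shared through the server

    async def apply_increments():
        # Simulates two client updates landing a little later
        for _ in range(2):
            await asyncio.sleep(0.03)
            state["counter"] += 1

    async def converged() -> bool:
        return state["counter"] >= 2

    task = asyncio.create_task(apply_increments())
    await wait_until(converged, timeout=1.0)
    await task
    return state["counter"]
```

In the concurrent-update test, the condition could compare `await c1.get_state()` with `await c2.get_state()` and return true once both report the same counter, which lets the test finish as soon as the clients converge.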

End-to-End Real-Time Testing with HelpMeTest

Unit and integration tests cover your event infrastructure. But users experience real-time features through a UI. HelpMeTest validates the complete user experience:

*** Test Cases ***
Live Chat Messages Appear Without Refresh
    Open Browser    https://your-app.com/chat/room-1    Chrome
    Open Browser    https://your-app.com/chat/room-1    Chrome    alias=user2
    Switch Browser    default
    Fill Text    #message-input    Hello from user 1
    Click    #send-button
    Switch Browser    user2
    Wait Until Element Contains    .message-list    Hello from user 1    timeout=5s

Notification Counter Updates In Real-Time
    Go To    https://your-app.com/dashboard
    ${before}=    Get Text    .notification-count
    # Trigger action that generates notification (via API)
    POST    /api/trigger-notification    {"userId": "test-user"}
    Wait Until Element Is Not    .notification-count    ${before}    timeout=10s

Live Dashboard Reflects Data Changes
    Go To    https://your-app.com/dashboard
    ${initial}=    Get Text    .live-metric
    Sleep    5s
    ${updated}=    Get Text    .live-metric
    Should Not Be Equal    ${initial}    ${updated}

Summary

  • Separate concerns — event handler logic (unit tests), connection/routing (integration), UI updates (e2e)
  • Use asyncio.wait_for with timeouts in async tests — never wait indefinitely
  • Test ordering — real-time events must arrive in the correct sequence
  • Test concurrent updates — multiple clients acting simultaneously is a common race condition source
  • Test presence logic — online/offline state with multiple sessions is tricky to get right
  • Test notification routing — online users get WebSocket, offline users get push
  • End-to-end tests verify users actually see live updates, not just that your server sends them
