IoT End-to-End Testing: From Device Sensor to Cloud Dashboard

IoT End-to-End Testing: From Device Sensor to Cloud Dashboard

An IoT system has layers. The firmware handles sensor reading and protocol communication. The cloud platform handles ingestion, storage, and routing. The application displays data, triggers alerts, and accepts user commands.

Unit tests and protocol tests cover individual layers. End-to-end (E2E) testing validates the full pipeline — from a sensor reading on a physical device to a value displayed on a web dashboard, with all the data transformation, routing, and processing in between.

This guide covers how to design and implement E2E tests for IoT pipelines.

The IoT E2E Testing Challenge

IoT E2E testing is harder than web application E2E testing for several reasons:

Physical device dependency. Real sensor data requires real (or simulated) devices. You can't stub out hardware the way you stub out an API.

Time-based assertions. IoT data flows asynchronously. A temperature reading published via MQTT may take seconds to appear in a dashboard after routing, storage, and API processing. Tests must account for propagation delay.

Multiple system boundaries. A single user-visible action (a temperature alert) may cross firmware, MQTT broker, cloud functions, database, and API boundaries. Each is a potential failure point.

State synchronization. Device state, cloud state, and UI state must stay in sync. Testing that they're synchronized requires accessing multiple systems simultaneously.

Non-deterministic data. Real sensor readings vary. Tests must use range assertions, not exact value matches.

The IoT E2E Test Architecture

Effective IoT E2E tests use three test components in parallel:

Device controller: Sends commands to the device (real or simulated) and reads its state.

Platform verifier: Monitors the IoT platform — checks MQTT topics, queries APIs, reads databases.

Application tester: Validates the user-facing application — dashboards, alert UIs, control panels.

┌─────────────┐    MQTT     ┌──────────────┐    API    ┌───────────────┐
│   IoT       │ ──────────▶ │  Cloud       │ ────────▶ │  Web          │
│   Device    │             │  Platform    │           │  Dashboard    │
│   (real or  │ ◀────────── │  (AWS IoT,   │ ◀──────── │  (React,      │
│   simulated)│  commands   │   Azure IoT) │  updates  │   Vue, etc.)  │
└─────────────┘             └──────────────┘           └───────────────┘
       ▲                           ▲                           ▲
       │                           │                           │
  Device Controller         Platform Verifier         Application Tester

Your E2E test orchestrates all three.

Device Simulation for E2E Tests

Physical devices make E2E tests slow, fragile, and environment-dependent. For most scenarios, simulated devices are better.

Simple MQTT Device Simulator

# device_simulator.py
import paho.mqtt.client as mqtt
import json
import time
import random
import threading

class IoTDeviceSimulator:
    """Simulates an IoT sensor device for E2E testing."""
    
    def __init__(self, device_id, broker='localhost', port=1883):
        self.device_id = device_id
        self.broker = broker
        self.port = port
        self._running = False
        self._temperature = 22.0
        self._humidity = 60.0
        self._connected = threading.Event()
        
        self.client = mqtt.Client(client_id=device_id)
        self.client.will_set(
            f'devices/{device_id}/status',
            json.dumps({'online': False}),
            qos=1, retain=True
        )
        self.client.on_connect = self._on_connect
        self.client.on_message = self._on_message
    
    def _on_connect(self, client, userdata, flags, rc):
        if rc == 0:
            # Subscribe to command topics
            self.client.subscribe(f'devices/{self.device_id}/commands/#', qos=1)
            # Publish online status
            self.client.publish(
                f'devices/{self.device_id}/status',
                json.dumps({'online': True, 'firmware': '1.2.3'}),
                qos=1, retain=True
            )
            self._connected.set()
    
    def _on_message(self, client, userdata, msg):
        """Handle commands from cloud."""
        command = json.loads(msg.payload)
        if 'set_temperature_threshold' in command:
            self._threshold = command['set_temperature_threshold']
        elif 'reset' in command:
            self._temperature = 22.0
    
    def start(self):
        self.client.connect(self.broker, self.port)
        self.client.loop_start()
        self._connected.wait(timeout=5)
        self._running = True
        
        # Start publishing sensor data
        threading.Thread(target=self._publish_telemetry, daemon=True).start()
        return self
    
    def _publish_telemetry(self):
        while self._running:
            # Simulate realistic sensor readings with noise
            self._temperature += random.uniform(-0.1, 0.1)
            self._humidity += random.uniform(-0.5, 0.5)
            
            self.client.publish(
                f'devices/{self.device_id}/telemetry',
                json.dumps({
                    'device_id': self.device_id,
                    'temperature': round(self._temperature, 2),
                    'humidity': round(self._humidity, 1),
                    'timestamp': int(time.time()),
                }),
                qos=1
            )
            time.sleep(5)  # publish every 5 seconds
    
    def set_temperature(self, value):
        """Directly set temperature for test scenarios."""
        self._temperature = value
    
    def stop(self):
        self._running = False
        self.client.publish(
            f'devices/{self.device_id}/status',
            json.dumps({'online': False}),
            qos=1, retain=True
        )
        self.client.loop_stop()
        self.client.disconnect()

# pytest fixture
@pytest.fixture
def simulated_device():
    device = IoTDeviceSimulator('test-device-001').start()
    yield device
    device.stop()

For Real Device Testing

When real hardware is required (testing firmware behavior, power management, or physical interfaces), use a device under test (DUT) controller:

# dut_controller.py — controls real hardware via serial/USB
import serial
import json

class DUTController:
    """Controls a real device under test via serial console."""
    
    def __init__(self, port='/dev/ttyUSB0', baud=115200):
        self.serial = serial.Serial(port, baud, timeout=5)
    
    def send_command(self, command):
        self.serial.write((json.dumps(command) + '\n').encode())
    
    def read_response(self, timeout=5):
        deadline = time.time() + timeout
        while time.time() < deadline:
            line = self.serial.readline().decode().strip()
            if line:
                return json.loads(line)
        raise TimeoutError('No response from device')
    
    def simulate_sensor_spike(self, value):
        """Trigger a specific sensor reading via test command interface."""
        self.send_command({'test_mode': True, 'temperature_override': value})
        return self.read_response()

Cloud Platform Verification

After the device publishes data, verify the cloud platform received and processed it correctly.

AWS IoT Core Verification

import boto3

class AWSIoTVerifier:
    def __init__(self, region='us-east-1'):
        self.iot = boto3.client('iot', region_name=region)
        self.iot_data = boto3.client('iot-data', region_name=region)
        self.dynamodb = boto3.resource('dynamodb', region_name=region)
    
    def get_device_shadow(self, device_id):
        response = self.iot_data.get_thing_shadow(thingName=device_id)
        return json.loads(response['payload'])
    
    def wait_for_shadow_update(self, device_id, key, expected_value, timeout=30):
        deadline = time.time() + timeout
        while time.time() < deadline:
            shadow = self.get_device_shadow(device_id)
            reported = shadow.get('state', {}).get('reported', {})
            if reported.get(key) == expected_value:
                return True
            time.sleep(1)
        return False
    
    def get_latest_telemetry(self, device_id, table_name='device-telemetry'):
        table = self.dynamodb.Table(table_name)
        response = table.query(
            KeyConditionExpression='device_id = :d',
            ExpressionAttributeValues={':d': device_id},
            ScanIndexForward=False,
            Limit=1,
        )
        return response['Items'][0] if response['Items'] else None
    
    def wait_for_telemetry(self, device_id, min_timestamp, timeout=30):
        deadline = time.time() + timeout
        while time.time() < deadline:
            latest = self.get_latest_telemetry(device_id)
            if latest and latest['timestamp'] >= min_timestamp:
                return latest
            time.sleep(2)
        return None

Application Layer Testing with Playwright

The final layer is the user-facing application. Use Playwright to verify the dashboard reflects real device data.

# test_iot_e2e.py
import pytest
from playwright.sync_api import sync_playwright
import time

def test_temperature_sensor_to_dashboard(simulated_device):
    """Full pipeline: device publishes temperature → cloud processes → dashboard shows value."""
    
    device_id = 'test-device-001'
    verifier = AWSIoTVerifier()
    
    # Set a known temperature on the device
    target_temp = 35.7
    publish_time = int(time.time())
    simulated_device.set_temperature(target_temp)
    
    # Wait for telemetry to reach cloud (MQTT → IoT Core → DynamoDB)
    telemetry = verifier.wait_for_telemetry(device_id, publish_time, timeout=30)
    assert telemetry is not None, 'Telemetry did not reach cloud within 30 seconds'
    assert abs(telemetry['temperature'] - target_temp) < 0.5, \
        f"Expected ~{target_temp}°C in cloud, got {telemetry['temperature']}"
    
    # Verify dashboard shows the value
    with sync_playwright() as p:
        browser = p.chromium.launch()
        page = browser.new_page()
        page.goto(f'https://dashboard.youriot.com/devices/{device_id}')
        page.wait_for_selector('[data-testid="temperature-reading"]')
        
        displayed_temp = float(
            page.locator('[data-testid="temperature-reading"]').inner_text()
        )
        assert abs(displayed_temp - target_temp) < 1.0, \
            f"Dashboard shows {displayed_temp}°C, expected ~{target_temp}°C"
        
        browser.close()

def test_high_temperature_alert_fires(simulated_device):
    """When temperature exceeds threshold, alert should appear in dashboard."""
    
    device_id = 'test-device-001'
    verifier = AWSIoTVerifier()
    alert_threshold = 40.0
    
    # Trigger high temperature reading
    simulated_device.set_temperature(alert_threshold + 5)  # 45°C — above threshold
    trigger_time = int(time.time())
    
    # Verify telemetry reached cloud
    telemetry = verifier.wait_for_telemetry(device_id, trigger_time, timeout=30)
    assert telemetry is not None
    
    # Verify alert in dashboard
    with sync_playwright() as p:
        browser = p.chromium.launch()
        page = browser.new_page()
        page.goto(f'https://dashboard.youriot.com/devices/{device_id}/alerts')
        
        # Wait for alert to appear (processing delay expected)
        page.wait_for_selector('[data-testid="active-alert"]', timeout=15000)
        
        alert_text = page.locator('[data-testid="active-alert"]').inner_text()
        assert 'temperature' in alert_text.lower()
        assert str(int(alert_threshold)) in alert_text
        
        browser.close()

def test_device_command_reaches_device(simulated_device):
    """Command issued from dashboard should be received by device."""
    
    device_id = 'test-device-001'
    received_commands = []
    
    # Monitor device command topic
    def command_received(msg):
        received_commands.append(json.loads(msg.payload))
    
    monitor = MQTTTestClient().connect()
    monitor.subscribe(f'devices/{device_id}/commands/#')
    
    # Send command from dashboard
    with sync_playwright() as p:
        browser = p.chromium.launch()
        page = browser.new_page()
        page.goto(f'https://dashboard.youriot.com/devices/{device_id}/controls')
        
        page.locator('[data-testid="set-threshold-input"]').fill('38')
        page.locator('[data-testid="apply-threshold-btn"]').click()
        page.wait_for_selector('[data-testid="command-sent-confirmation"]')
        
        browser.close()
    
    # Verify command reached device
    time.sleep(5)  # allow MQTT propagation
    monitor.disconnect()
    
    assert len(received_commands) > 0, 'No command received by device'
    assert any(
        cmd.get('set_temperature_threshold') == 38
        for cmd in received_commands
    ), 'Expected threshold command not received'

def test_device_offline_status_shown(simulated_device):
    """When device disconnects, dashboard should show offline status."""
    
    device_id = 'test-device-001'
    
    with sync_playwright() as p:
        browser = p.chromium.launch()
        page = browser.new_page()
        page.goto(f'https://dashboard.youriot.com/devices/{device_id}')
        
        # Verify initially online
        page.wait_for_selector('[data-testid="device-status-online"]')
        
        # Disconnect device
        simulated_device.stop()
        
        # LWT message should trigger offline status
        page.wait_for_selector('[data-testid="device-status-offline"]', timeout=15000)
        
        status_text = page.locator('[data-testid="device-status-offline"]').inner_text()
        assert 'offline' in status_text.lower()
        
        browser.close()

Data Pipeline Verification

Beyond the UI layer, verify intermediate data pipeline components:

def test_telemetry_stored_correctly():
    """Verify telemetry data is stored with correct schema in database."""
    device_id = 'test-device-001'
    device = IoTDeviceSimulator(device_id).start()
    
    try:
        device.set_temperature(22.5)
        publish_time = int(time.time())
        
        verifier = AWSIoTVerifier()
        record = verifier.wait_for_telemetry(device_id, publish_time, timeout=30)
        
        assert record is not None
        assert record['device_id'] == device_id
        assert 'temperature' in record
        assert 'humidity' in record
        assert 'timestamp' in record
        assert record['timestamp'] >= publish_time
        # Verify temperature is within realistic range
        assert -40 <= record['temperature'] <= 85
    finally:
        device.stop()

def test_data_not_stored_for_unknown_device():
    """Telemetry from unregistered devices should be rejected."""
    unknown_device = IoTDeviceSimulator('unregistered-device-999').start()
    
    try:
        unknown_device.set_temperature(22.0)
        publish_time = int(time.time())
        
        time.sleep(10)  # allow processing time
        
        verifier = AWSIoTVerifier()
        # Nothing should be stored for unregistered device
        record = verifier.get_latest_telemetry('unregistered-device-999')
        assert record is None or record['timestamp'] < publish_time
    finally:
        unknown_device.stop()

Test Environment Management

IoT E2E tests need careful environment management to avoid test interference:

# conftest.py
@pytest.fixture(scope='session')
def test_environment():
    """Create isolated test resources for the session."""
    env = {
        'device_id': f'test-device-{int(time.time())}',
        'alert_rule_id': None,
    }
    
    # Create test device in IoT platform
    iot = boto3.client('iot')
    iot.create_thing(thingName=env['device_id'])
    
    yield env
    
    # Cleanup: delete test device and data
    iot.delete_thing(thingName=env['device_id'])
    # Clean up DynamoDB test records
    cleanup_test_telemetry(env['device_id'])

@pytest.fixture
def simulated_device(test_environment):
    device = IoTDeviceSimulator(
        test_environment['device_id']
    ).start()
    yield device
    device.stop()

CI/CD Integration

name: IoT E2E Tests

on:
  push:
    branches: [main]

jobs:
  e2e-tests:
    runs-on: ubuntu-latest
    environment: staging
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.12'
      
      - name: Install dependencies
        run: |
          pip install paho-mqtt pytest playwright boto3
          playwright install chromium
      
      - name: Start MQTT broker (for protocol tests)
        run: docker run -d -p 1883:1883 eclipse-mosquitto:2
      
      - name: Run IoT E2E tests
        run: pytest tests/e2e/iot/ -v --timeout=120
        env:
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          DASHBOARD_URL: ${{ vars.STAGING_DASHBOARD_URL }}
          IOT_BROKER_URL: ${{ vars.STAGING_IOT_BROKER }}

Common IoT E2E Testing Mistakes

Testing without timing margins. IoT pipelines are asynchronous. Hard-coded sleeps are unreliable. Use polling with timeouts — wait for state to be true, not assume it will be after N seconds.

No device cleanup. Test devices created in the IoT platform must be cleaned up. Stale test devices and old test data accumulate and confuse future tests.

Testing against production. IoT E2E tests create real MQTT traffic, trigger real alerts, and store real data. Always use isolated staging environments.

Ignoring data schema evolution. IoT pipelines often transform data between layers. Verify the schema at each layer — what the device sends, what the database stores, and what the API returns may all differ intentionally.

One-way testing only. Test both directions: device to cloud (telemetry) AND cloud to device (commands). Many teams test telemetry thoroughly but skip command delivery testing.


HelpMeTest helps teams build and run the application layer tests in IoT E2E pipelines — dashboard validation, API verification, and alert testing — without writing complex Playwright infrastructure from scratch. Start free.

Read more