IoT Device Testing Guide: Firmware, Protocols, and MQTT Testing
IoT devices fail in ways that software-only systems never do. Firmware bugs can brick hardware. Protocol mismatches cause silent data loss. Power cycling under load reveals race conditions that unit tests miss entirely. Testing IoT devices requires a different mindset and a different toolchain.
This guide covers the full IoT testing stack: firmware validation, protocol-level testing with MQTT and CoAP, power and resource checks, and CI/CD.
Firmware Testing
Unit Testing Embedded Code
Embedded firmware is often written in C or C++, but that doesn't mean you can't unit test it. Use a framework like Unity (paired with CMock for mocks) or CppUTest to test logic without hardware:
// Unity framework example
#include <stdint.h>
#include "unity.h"
#include "sensor_driver.h"

void setUp(void) {
    sensor_driver_init();
}

void tearDown(void) {}  // Unity expects both setUp and tearDown to be defined

void test_temperature_conversion_celsius(void) {
    // Raw ADC value from temperature sensor
    uint16_t raw_adc = 512;
    float temp_c = sensor_convert_to_celsius(raw_adc);
    // Expect value within ±0.5°C of the expected 25.3°C
    TEST_ASSERT_FLOAT_WITHIN(0.5f, 25.3f, temp_c);
}

void test_out_of_range_returns_error(void) {
    uint16_t raw_adc = 0xFFFF; // Sensor fault value
    float temp_c = sensor_convert_to_celsius(raw_adc);
    TEST_ASSERT_EQUAL_FLOAT(SENSOR_ERROR_VALUE, temp_c);
}

int main(void) {
    UNITY_BEGIN();
    RUN_TEST(test_temperature_conversion_celsius);
    RUN_TEST(test_out_of_range_returns_error);
    return UNITY_END();
}

Run these tests on the host machine, not the target hardware, because fast feedback matters.
Hardware-in-Loop (HIL) Testing
Unit tests on the host can't catch hardware interactions. HIL testing runs firmware on real hardware and injects stimuli:
# Python test using pyserial to drive a device under test
import json
import time

import pytest
import serial


@pytest.fixture
def device():
    port = serial.Serial('/dev/ttyUSB0', baudrate=115200, timeout=5)
    yield port
    port.close()


def test_firmware_boot_sequence(device):
    # Discard stale output, then reset the device
    device.reset_input_buffer()
    device.write(b'RESET\r\n')
    time.sleep(2)
    # Read boot output
    output = device.read_until(b'READY').decode('utf-8', errors='replace')
    assert 'Boot OK' in output
    assert 'READY' in output
    assert 'ERROR' not in output


def test_sensor_reading_valid_range(device):
    device.write(b'READ_SENSOR\r\n')
    response = device.readline().decode().strip()
    # Parse JSON response: {"temp": 22.3, "humidity": 45.1}
    data = json.loads(response)
    assert 20.0 <= data['temp'] <= 30.0, f"Temperature {data['temp']} out of range"
    assert 30.0 <= data['humidity'] <= 70.0, f"Humidity {data['humidity']} out of range"

OTA Update Testing
Over-the-air update failures can leave devices in an unrecoverable state. Test the update path explicitly:
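The rollback asserted in the test below usually depends on a bootloader that keeps the previous image until the new one has been verified. Here is a minimal host-side sketch of that idea, assuming an A/B slot layout with a SHA-256 check. `Image`, `Bootloader`, `stage_update`, and `boot` are hypothetical names for illustration and not part of any particular device SDK.

```python
import hashlib
from dataclasses import dataclass
from typing import Optional


@dataclass
class Image:
    version: str
    payload: bytes
    sha256: str  # digest published alongside the image


def make_image(version: str, payload: bytes) -> Image:
    """Build an image whose recorded digest matches its payload."""
    return Image(version, payload, hashlib.sha256(payload).hexdigest())


class Bootloader:
    """Hypothetical A/B bootloader model: one active slot, one staged slot."""

    def __init__(self, factory_image: Image):
        self.active = factory_image
        self.staged: Optional[Image] = None

    def stage_update(self, image: Image) -> None:
        # The download lands in the inactive slot; the active slot is untouched.
        self.staged = image

    def boot(self) -> str:
        """Promote the staged image only if its digest verifies."""
        if self.staged is not None:
            digest = hashlib.sha256(self.staged.payload).hexdigest()
            if digest == self.staged.sha256:
                self.active = self.staged
            self.staged = None  # discard the staged image either way
        return self.active.version


bl = Bootloader(make_image("2.0.0", b"factory"))
bl.stage_update(make_image("2.1.0", b"good build"))
version_after_good = bl.boot()

corrupt = make_image("2.2.0", b"next build")
corrupt.payload = b"next bu\x00ld"  # simulate corruption in transit
bl.stage_update(corrupt)
version_after_corrupt = bl.boot()
```

In this model the corrupt image never replaces the active slot, so both boots report version 2.1.0, which is the same outcome the device-level test checks for on real hardware.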
# IoTDevice is a project-specific test helper (not shown here)
def test_ota_update_and_rollback():
    device = IoTDevice(device_id="test-unit-001")
    original_version = device.get_firmware_version()
    # Precondition: the unit must start on an older release
    assert original_version != "2.1.0"
    # Push a new firmware version
    device.trigger_ota_update(firmware_url="http://ota-server/v2.1.0.bin")
    # Wait for update to complete
    assert device.wait_for_boot(timeout_seconds=120)
    assert device.get_firmware_version() == "2.1.0"
    # Simulate update failure on next attempt
    device.trigger_ota_update(firmware_url="http://ota-server/v2.2.0-corrupt.bin")
    # Device should roll back to the previous known-good version
    assert device.wait_for_boot(timeout_seconds=120)
    assert device.get_firmware_version() == "2.1.0"  # Rolled back

Protocol Testing
MQTT Testing
MQTT is the dominant protocol for IoT messaging. Testing it requires both a broker (Mosquitto is the go-to) and client-side assertions.
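The tests in this section subscribe with wildcard filters such as `devices/+/telemetry`, so it helps to be precise about what a filter matches. The following is a small stand-alone sketch of MQTT filter matching, where `+` matches exactly one level and `#` matches all remaining levels. `topic_matches` is a hypothetical helper written for illustration, and it ignores special cases such as `$SYS` topics.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic matches a subscription filter."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")

    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: only valid as the last level,
            # and it matches everything from here on (including the parent).
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False  # filter has more levels than the topic
        if level != "+" and level != topic_levels[i]:
            return False  # literal level differs
    # No '#' seen: the level counts must agree exactly.
    return len(filter_levels) == len(topic_levels)


telemetry_match = topic_matches("devices/+/telemetry", "devices/sensor-01/telemetry")
status_match = topic_matches("devices/+/telemetry", "devices/sensor-01/status")
```

In real test code, paho-mqtt's own `topic_matches_sub(sub, topic)` can serve the same purpose; the hand-written version is only meant to make the rules visible.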
Setting up a test broker:
# docker-compose.yml for testing
services:
  mqtt-broker:
    image: eclipse-mosquitto:2.0
    ports:
      - "1883:1883"
      - "8883:8883" # TLS
      - "9001:9001" # WebSocket
    volumes:
      # mosquitto.conf must define the listeners; without one,
      # Mosquitto 2.x accepts only loopback connections
      - ./mosquitto.conf:/mosquitto/config/mosquitto.conf

Testing MQTT message flow:
import json
import time

import paho.mqtt.client as mqtt
import pytest


class MQTTTestCollector:
    def __init__(self, broker_host, topic):
        self.messages = []
        # paho-mqtt 2.x requires the callback API version; on 1.x use mqtt.Client()
        self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
        self.client.on_message = self._on_message
        self.client.connect(broker_host, 1883)
        # Subscribe at QoS 2 so the broker does not downgrade the publisher's QoS
        self.client.subscribe(topic, qos=2)
        self.client.loop_start()

    def _on_message(self, client, userdata, msg):
        self.messages.append({
            "topic": msg.topic,
            "payload": msg.payload.decode(),
            "qos": msg.qos,
            "timestamp": time.time()
        })

    def wait_for_messages(self, count, timeout=10):
        deadline = time.time() + timeout
        while len(self.messages) < count and time.time() < deadline:
            time.sleep(0.1)
        return len(self.messages) >= count


def test_sensor_publishes_to_correct_topic():
    collector = MQTTTestCollector("localhost", "devices/+/telemetry")
    # Trigger sensor reading
    device = IoTDevice("sensor-01")
    device.trigger_reading()
    assert collector.wait_for_messages(1, timeout=5)
    msg = collector.messages[0]
    assert msg["topic"] == "devices/sensor-01/telemetry"
    assert msg["qos"] == 1  # At-least-once delivery
    payload = json.loads(msg["payload"])
    assert "temperature" in payload
    assert "timestamp" in payload


def test_qos_levels():
    """Test that QoS 2 messages are delivered exactly once."""
    collector = MQTTTestCollector("localhost", "devices/critical/+")
    device = IoTDevice("critical-sensor-01")
    device.publish_critical_alert(qos=2)
    # Wait long enough for any duplicate delivery to show up
    time.sleep(2)
    # QoS 2 = exactly once
    critical_messages = [m for m in collector.messages
                         if "critical" in m["topic"]]
    assert len(critical_messages) == 1, \
        f"Expected exactly 1 critical message, got {len(critical_messages)}"

Testing MQTT Retain and Last Will
import json


def test_last_will_on_unexpected_disconnect():
    """Device should publish its last will when it disconnects unexpectedly."""
    # Note: a retained will left over from an earlier run would also satisfy
    # the wait below, so clear the retained message on this topic between runs.
    will_collector = MQTTTestCollector("localhost", "devices/sensor-01/status")
    # Connect a device with a last will
    device = IoTDeviceWithLastWill(
        device_id="sensor-01",
        will_topic="devices/sensor-01/status",
        will_payload='{"status": "offline", "reason": "unexpected"}',
        will_qos=1,
        will_retain=True
    )
    device.connect()
    # Force an unclean disconnect (kill TCP connection)
    device.force_disconnect()
    # Last will should be published
    assert will_collector.wait_for_messages(1, timeout=10)
    status = json.loads(will_collector.messages[0]["payload"])
    assert status["status"] == "offline"

CoAP Testing
CoAP is used in constrained devices where MQTT is too heavy:
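Part of what makes CoAP lighter is its fixed 4-byte binary header, defined in RFC 7252. The sketch below packs and unpacks that header using only the standard library, leaving out tokens, options, and payload. `pack_header` and `unpack_header` are illustrative helper names, not aiocoap functions.

```python
import struct

COAP_VERSION = 1
TYPE_CON = 0                 # confirmable message
CODE_GET = (0 << 5) | 1      # request code 0.01
CODE_CONTENT = (2 << 5) | 5  # response code 2.05


def pack_header(msg_type: int, code: int, message_id: int,
                token_length: int = 0) -> bytes:
    """Pack Ver(2 bits) | Type(2) | TKL(4), Code(8), Message ID(16, big-endian)."""
    first = (COAP_VERSION << 6) | (msg_type << 4) | token_length
    return struct.pack("!BBH", first, code, message_id)


def unpack_header(data: bytes) -> dict:
    """Decode the first four bytes of a CoAP message."""
    first, code, message_id = struct.unpack("!BBH", data[:4])
    return {
        "version": first >> 6,
        "type": (first >> 4) & 0x03,
        "token_length": first & 0x0F,
        "code": f"{code >> 5}.{code & 0x1F:02d}",  # class.detail, e.g. 2.05
        "message_id": message_id,
    }


header = pack_header(TYPE_CON, CODE_GET, message_id=0x1234)
decoded = unpack_header(header)
```

A confirmable GET with no token fits in the four bytes `40 01 12 34` before options, which is useful to know when inspecting packet captures from a device under test.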
import json

import pytest
from aiocoap import Context, Message, Code


@pytest.mark.asyncio  # async tests need a plugin such as pytest-asyncio
async def test_coap_sensor_resource():
    ctx = await Context.create_client_context()
    # GET sensor reading
    request = Message(code=Code.GET, uri="coap://192.168.1.100/sensors/temperature")
    response = await ctx.request(request).response
    assert response.code == Code.CONTENT
    assert response.payload  # Non-empty response
    data = json.loads(response.payload)
    assert "value" in data
    assert "unit" in data
    assert data["unit"] == "celsius"

Power and Resource Testing
IoT devices run on batteries or tight power budgets. Test power consumption and resource usage:
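A quick duty-cycle calculation shows why a sleep-current budget on the order of 100 µA matters so much. The sketch below uses illustrative numbers that are assumptions rather than measurements (a 1000 mAh cell, 2 s of activity at 20 mA every 10 minutes), and it ignores self-discharge and regulator losses.

```python
def average_current_ma(sleep_ma: float, active_ma: float,
                       active_s: float, period_s: float) -> float:
    """Time-weighted average current over one wake/sleep cycle."""
    sleep_s = period_s - active_s
    return (sleep_ma * sleep_s + active_ma * active_s) / period_s


def battery_life_days(capacity_mah: float, avg_ma: float) -> float:
    """Idealized runtime in days for a given average current."""
    return capacity_mah / avg_ma / 24.0


# Assumed profile: 2 s active at 20 mA every 600 s
within_budget = average_current_ma(sleep_ma=0.1, active_ma=20.0,
                                   active_s=2.0, period_s=600.0)
over_budget = average_current_ma(sleep_ma=1.0, active_ma=20.0,
                                 active_s=2.0, period_s=600.0)

life_within = battery_life_days(1000.0, within_budget)
life_over = battery_life_days(1000.0, over_budget)
```

Under these assumptions, a 0.1 mA sleep current gives roughly 250 days of runtime, while a 1 mA sleep current gives about 39 days, even though the active phase is identical in both cases.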
import time

# USBPowerMeter, IoTDevice, and the simulated_device fixture are
# project-specific helpers (not shown here)


def test_sleep_mode_power_consumption():
    power_meter = USBPowerMeter(port="/dev/ttyUSB1")
    device = IoTDevice("power-test-unit")
    device.enter_sleep_mode()
    time.sleep(5)  # Measure during sleep
    avg_current_ma = power_meter.average_current_ma(window_seconds=5)
    assert avg_current_ma < 0.1, \
        f"Sleep current {avg_current_ma:.3f}mA exceeds 100µA budget"


def test_memory_usage_after_24h_operation(simulated_device):
    """Memory should not leak over time."""
    initial_free_heap = simulated_device.get_free_heap()
    # Simulate 24 hours of operation (compressed in test)
    simulated_device.run_for(simulated_hours=24)
    final_free_heap = simulated_device.get_free_heap()
    heap_delta = initial_free_heap - final_free_heap
    assert heap_delta < 1024, \
        f"Memory leaked {heap_delta} bytes over 24h simulation"

CI/CD for IoT
Simulation-First Pipeline
# .github/workflows/iot-tests.yml
name: IoT Tests
on: [push, pull_request]
jobs:
  firmware-unit-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Install ARM toolchain
        run: sudo apt-get update && sudo apt-get install -y gcc-arm-none-eabi
      - name: Build and test firmware
        run: |
          cmake -B build -DTARGET=host-test
          cmake --build build
          ./build/firmware-tests
  protocol-tests:
    runs-on: ubuntu-latest
    services:
      mosquitto:
        # Note: Mosquitto 2.x accepts only loopback connections
        # until a listener is configured
        image: eclipse-mosquitto:2.0
        ports: ["1883:1883"]
    steps:
      - uses: actions/checkout@v4
      - run: pip install paho-mqtt pytest
      - run: pytest tests/protocol/ -v
  hil-tests:
    runs-on: self-hosted # Must have physical hardware attached
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v4
      # --device is a custom pytest option that the project's conftest.py must register
      - run: pytest tests/hil/ -v --device=/dev/ttyUSB0

Common IoT Testing Mistakes
Testing only the happy path — Network dropout, sensor faults, and power interruptions are normal operating conditions for IoT. Test them explicitly.
Ignoring clock drift — Edge devices often have poor RTC accuracy. Test what happens when timestamps are off by minutes.
Not testing reconnection — A device that connects once and never reconnects after failure is nearly useless. Make reconnection logic a first-class test target.
Assuming consistent hardware — Two "identical" sensor units from different production batches may behave differently. Test with multiple units when possible.
Skipping endurance tests — IoT devices run for months. A memory leak that takes a week to manifest won't show up in a 60-second test.
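Taking reconnection as an example, the retry schedule can be tested on the host with no network at all. This is a sketch, assuming capped exponential backoff with an injectable `connect` callable and `sleep` function. `reconnect_with_backoff` and its parameters are hypothetical, not taken from a specific MQTT client.

```python
from typing import Callable, List


def reconnect_with_backoff(connect: Callable[[], bool],
                           sleep: Callable[[float], None],
                           base_delay: float = 1.0,
                           max_delay: float = 60.0,
                           max_attempts: int = 10) -> bool:
    """Call connect() until it succeeds, doubling the wait after each failure."""
    delay = base_delay
    for attempt in range(max_attempts):
        if connect():
            return True
        if attempt < max_attempts - 1:
            sleep(delay)                       # wait before the next attempt
            delay = min(delay * 2, max_delay)  # double, but never exceed the cap
    return False


# Test double: the broker is unreachable for the first four attempts.
outcomes = iter([False, False, False, False, True])
recorded_delays: List[float] = []

connected = reconnect_with_backoff(
    connect=lambda: next(outcomes),
    sleep=recorded_delays.append,  # record delays instead of actually waiting
    max_delay=5.0,
)
```

Injecting `sleep` keeps the test instantaneous and lets it assert on the exact schedule, here 1, 2, 4, and 5 seconds. Production code would typically add random jitter so that a fleet of devices does not reconnect in lockstep after a broker outage.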
Robust IoT testing is a combination of fast simulation-based tests and periodic real-hardware validation. Build the simulation layer first so your team gets fast feedback, then close the gap with HIL tests on actual devices.