MQTT Protocol Testing Strategies: From Unit Tests to Load Testing
MQTT is the backbone of most IoT systems — lightweight, broker-based, designed for unreliable networks. Testing MQTT is different from testing REST APIs: you're testing asynchronous publish/subscribe flows, quality of service levels, retained messages, and broker behavior under load.
This guide covers practical MQTT testing strategies from unit tests to load testing.
MQTT Testing Fundamentals
MQTT has a few concepts that affect how you test:
- Topics — hierarchical strings (
sensors/room1/temperature). Wildcards:+for one level,#for any depth. - QoS levels — 0 (at most once), 1 (at least once), 2 (exactly once)
- Retained messages — broker stores last message per topic; new subscribers get it immediately
- Last Will — message sent when a client disconnects unexpectedly
Each of these needs dedicated tests.
Setting Up a Test Broker
Use Mosquitto in Docker for local and CI testing:
# docker-compose.test.yml
services:
mosquitto:
image: eclipse-mosquitto:2
ports:
- "1883:1883"
- "9001:9001"
volumes:
- ./mosquitto-test.conf:/mosquitto/config/mosquitto.conf
app:
build: .
depends_on:
- mosquitto
environment:
MQTT_BROKER: mqtt://mosquitto:1883# mosquitto-test.conf
listener 1883
allow_anonymous true
persistence false # No state between test runs
log_type allFor more advanced scenarios, HiveMQ offers a free Docker image with a web UI.
Unit Testing MQTT Clients
Test your publisher and subscriber logic in isolation with mocked clients:
# Python test with unittest.mock
import pytest
from unittest.mock import MagicMock, patch, call
from myapp.sensors import TemperatureSensor
def test_sensor_publishes_on_reading():
mock_client = MagicMock()
sensor = TemperatureSensor(client=mock_client, topic="sensors/temp1")
sensor.publish_reading(23.5)
mock_client.publish.assert_called_once_with(
"sensors/temp1",
'{"value": 23.5, "unit": "celsius"}',
qos=1,
retain=False
)
def test_sensor_uses_qos_1_for_critical_readings():
mock_client = MagicMock()
sensor = TemperatureSensor(client=mock_client, topic="sensors/temp1")
sensor.publish_alert(temperature=85.0, threshold=80.0)
call_args = mock_client.publish.call_args
assert call_args.kwargs['qos'] == 1 # At-least-once for alertsFor Node.js:
const { MqttClient } = require('mqtt');
jest.mock('mqtt', () => ({
connect: jest.fn(() => ({
publish: jest.fn(),
subscribe: jest.fn(),
on: jest.fn()
}))
}));
const { SensorPublisher } = require('../src/sensors');
describe('SensorPublisher', () => {
it('publishes telemetry to correct topic', () => {
const client = require('mqtt').connect();
const publisher = new SensorPublisher(client, 'building-a');
publisher.publishTemperature('room-101', 22.3);
expect(client.publish).toHaveBeenCalledWith(
'building-a/room-101/temperature',
JSON.stringify({ value: 22.3, timestamp: expect.any(Number) }),
{ qos: 0, retain: false },
undefined
);
});
});Integration Testing with a Real Broker
Mock tests verify logic; integration tests verify the MQTT flow works end-to-end:
import pytest
import paho.mqtt.client as mqtt
import json
import threading
import time
class MqttTestHelper:
"""Helper to receive MQTT messages in tests."""
def __init__(self, broker_host, broker_port=1883):
self.received = []
self.event = threading.Event()
self.client = mqtt.Client(client_id="test_subscriber")
self.client.on_message = self._on_message
self.client.connect(broker_host, broker_port)
self.client.loop_start()
def _on_message(self, client, userdata, msg):
self.received.append({
'topic': msg.topic,
'payload': json.loads(msg.payload),
'qos': msg.qos
})
self.event.set()
def subscribe(self, topic, qos=0):
self.client.subscribe(topic, qos)
return self
def wait_for_message(self, timeout=5):
received = self.event.wait(timeout)
self.event.clear()
return received
def disconnect(self):
self.client.loop_stop()
self.client.disconnect()
@pytest.fixture
def mqtt_broker():
return "localhost"
def test_sensor_data_flows_through_broker(mqtt_broker):
helper = MqttTestHelper(mqtt_broker)
helper.subscribe("sensors/+/temperature")
# Trigger your application to publish
from myapp.sensors import TemperatureSensor
sensor = TemperatureSensor(broker=mqtt_broker, topic="sensors/room1/temperature")
sensor.simulate_reading(25.0)
assert helper.wait_for_message(timeout=5), "No message received within 5 seconds"
assert len(helper.received) == 1
assert helper.received[0]['payload']['value'] == 25.0
assert helper.received[0]['topic'] == "sensors/room1/temperature"
helper.disconnect()Testing QoS Levels
QoS 0 can drop messages; QoS 1 guarantees delivery but may duplicate. Test the right level for each message type:
def test_qos_1_delivers_despite_network_issues(mqtt_broker):
"""QoS 1 should redeliver if PUBACK is not received."""
helper = MqttTestHelper(mqtt_broker)
helper.subscribe("alerts/#", qos=1)
publisher = mqtt.Client()
publisher.connect(mqtt_broker)
# Publish with QoS 1
publisher.publish("alerts/temperature_high", json.dumps({"sensor": "1", "value": 90}), qos=1)
assert helper.wait_for_message(timeout=5)
assert helper.received[0]['qos'] == 1
publisher.disconnect()
helper.disconnect()
def test_qos_2_delivers_exactly_once(mqtt_broker):
"""QoS 2 should deliver exactly once, even on retry."""
received_count = 0
def count_messages(client, userdata, msg):
nonlocal received_count
received_count += 1
subscriber = mqtt.Client()
subscriber.on_message = count_messages
subscriber.connect(mqtt_broker)
subscriber.subscribe("commands/actuate", qos=2)
subscriber.loop_start()
publisher = mqtt.Client()
publisher.connect(mqtt_broker)
publisher.publish("commands/actuate", json.dumps({"action": "open_valve"}), qos=2)
time.sleep(1)
assert received_count == 1 # Exactly once
subscriber.loop_stop()Testing Retained Messages
Retained messages should be received by new subscribers immediately:
def test_retained_message_delivered_to_new_subscriber(mqtt_broker):
# Publish a retained message
publisher = mqtt.Client()
publisher.connect(mqtt_broker)
publisher.publish(
"devices/device123/status",
json.dumps({"online": True}),
qos=1,
retain=True
)
publisher.disconnect()
time.sleep(0.2) # Let broker store it
# New subscriber should immediately receive retained message
helper = MqttTestHelper(mqtt_broker)
helper.subscribe("devices/device123/status")
received = helper.wait_for_message(timeout=3)
assert received, "New subscriber should get retained message immediately"
assert helper.received[0]['payload']['online'] == True
helper.disconnect()
def test_clear_retained_message(mqtt_broker):
# Clear retained by publishing empty payload
client = mqtt.Client()
client.connect(mqtt_broker)
client.publish("devices/device123/status", payload="", retain=True)
client.disconnect()
# New subscriber should NOT receive any message
helper = MqttTestHelper(mqtt_broker)
helper.subscribe("devices/device123/status")
received = helper.wait_for_message(timeout=2)
assert not received, "Cleared retained message should not be delivered"Testing Last Will (LWT)
Last Will messages detect unexpected client disconnections:
def test_lwt_published_on_unexpected_disconnect(mqtt_broker):
helper = MqttTestHelper(mqtt_broker)
helper.subscribe("devices/device456/status")
# Connect client with LWT configured
device_client = mqtt.Client()
device_client.will_set(
"devices/device456/status",
json.dumps({"online": False, "reason": "disconnected"}),
qos=1,
retain=True
)
device_client.connect(mqtt_broker)
# Simulate unexpected disconnect (no DISCONNECT packet)
device_client.socket().close() # Force TCP close
# Broker should publish LWT after keepalive timeout
received = helper.wait_for_message(timeout=10)
assert received
assert helper.received[0]['payload']['online'] == FalseLoad Testing MQTT
For production confidence, load test your broker and subscribers:
import concurrent.futures
import time
def load_test_mqtt_throughput(broker_host, target_messages_per_second=1000, duration_seconds=30):
received_count = 0
start_time = time.time()
helper = MqttTestHelper(broker_host)
helper.subscribe("loadtest/#")
def publish_batch(thread_id, messages_per_thread):
client = mqtt.Client(client_id=f"loadtest_pub_{thread_id}")
client.connect(broker_host)
for i in range(messages_per_thread):
client.publish(f"loadtest/{thread_id}", json.dumps({"seq": i}), qos=0)
client.disconnect()
num_threads = 10
messages_per_thread = (target_messages_per_second * duration_seconds) // num_threads
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
futures = [executor.submit(publish_batch, i, messages_per_thread) for i in range(num_threads)]
concurrent.futures.wait(futures)
elapsed = time.time() - start_time
actual_rate = len(helper.received) / elapsed
print(f"Published: {num_threads * messages_per_thread}")
print(f"Received: {len(helper.received)}")
print(f"Duration: {elapsed:.1f}s")
print(f"Rate: {actual_rate:.0f} msg/s")
print(f"Drop rate: {(1 - len(helper.received) / (num_threads * messages_per_thread)) * 100:.1f}%")
helper.disconnect()MQTT Testing in CI
name: MQTT Integration Tests
on: [push, pull_request]
jobs:
mqtt-tests:
runs-on: ubuntu-latest
services:
mosquitto:
image: eclipse-mosquitto:2
ports:
- 1883:1883
options: >-
--health-cmd "mosquitto_pub -h localhost -t test -m test"
--health-interval 5s
--health-timeout 5s
--health-retries 5
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with: { python-version: "3.12" }
- run: pip install paho-mqtt pytest
- name: Run MQTT tests
run: pytest tests/mqtt/ -v
env:
MQTT_BROKER: localhost
MQTT_PORT: 1883Summary
MQTT testing needs three layers:
- Unit tests — mock the client, test publish/subscribe logic in isolation
- Integration tests — use a real broker, test QoS, retained messages, and LWT
- Load tests — verify throughput, drop rates, and broker stability under pressure
The MqttTestHelper pattern (subscribe + wait for message) is your core tool for integration tests. Use it liberally — async message delivery is where most MQTT bugs hide.