MQTT Testing Guide: Validate Message Brokers and IoT Messaging Pipelines
MQTT is the messaging backbone of most IoT systems. Lightweight, publish-subscribe, and designed for unreliable networks — it's the protocol that connects millions of devices to cloud backends. When MQTT breaks, data stops flowing, alerts don't fire, and device control fails.
Testing MQTT systems requires understanding the protocol's semantics: QoS levels, retained messages, Last Will, topic hierarchies, and broker behavior under load.
MQTT Fundamentals for Testers
Before writing tests, understand what MQTT promises:
Quality of Service (QoS) levels:
- QoS 0 — At most once: fire and forget. No delivery guarantee. Fastest.
- QoS 1 — At least once: guaranteed delivery, possible duplicates. Client must handle deduplication.
- QoS 2 — Exactly once: guaranteed single delivery. Highest overhead.
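Duplicates under QoS 1 show up after reconnects and retransmissions, so subscribers are usually written to be idempotent. Here is a minimal sketch of receiver-side deduplication. It assumes each payload carries an application-level ID. The msg_id field and the Deduplicator class are hypothetical. MQTT's own packet identifier is scoped to one connection hop and gets reused, so it cannot play this role.

```python
from collections import OrderedDict


class Deduplicator:
    """Drop repeated deliveries by application-level ID, remembering the last N IDs."""

    def __init__(self, capacity=10_000):
        self.capacity = capacity
        self._seen = OrderedDict()  # insertion-ordered "set" of recently seen IDs

    def is_new(self, msg_id):
        if msg_id in self._seen:
            self._seen.move_to_end(msg_id)  # refresh so frequently repeated IDs stay cached
            return False
        self._seen[msg_id] = None
        if len(self._seen) > self.capacity:
            self._seen.popitem(last=False)  # evict the oldest remembered ID
        return True


# Usage: the QoS 1 redelivery of msg_id 1 is ignored.
dedup = Deduplicator(capacity=2)
deliveries = [{'msg_id': 1}, {'msg_id': 2}, {'msg_id': 1}, {'msg_id': 3}]
processed = [d['msg_id'] for d in deliveries if dedup.is_new(d['msg_id'])]
print(processed)  # [1, 2, 3]
```

The bounded capacity is a deliberate trade-off. Remembering every ID forever leaks memory, and a window that is too small lets late redeliveries through.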
Retained messages: The broker stores the most recent retained message for each topic and delivers it immediately to every new subscriber of that topic. Used to publish "last known state": a new subscriber gets the current device status without waiting for the next publish.
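These rules are small enough to model directly, which gives you an oracle to compare broker behavior against. The broker keeps one retained message per topic. A newer retained publish replaces it, and a retained publish with an empty payload deletes it. The RetainedStore class below is a hypothetical toy that handles exact topics only. It is not how any broker actually stores retained messages.

```python
class RetainedStore:
    """Toy model of a broker's retained-message table (one entry per topic)."""

    def __init__(self):
        self._retained = {}

    def publish(self, topic, payload, retain=False):
        if not retain:
            return  # non-retained messages only reach current subscribers
        if payload == b'':
            self._retained.pop(topic, None)  # empty retained payload clears the topic
        else:
            self._retained[topic] = payload  # newest retained message wins

    def on_subscribe(self, topic):
        """Return what a new subscriber to this exact topic receives immediately."""
        return self._retained.get(topic)


store = RetainedStore()
store.publish('devices/d1/status', b'online', retain=True)
store.publish('devices/d1/status', b'offline', retain=True)
print(store.on_subscribe('devices/d1/status'))  # b'offline'
store.publish('devices/d1/status', b'', retain=True)
print(store.on_subscribe('devices/d1/status'))  # None
```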
Last Will and Testament (LWT): Configured at connection time. If the client disconnects abnormally (the network drops, the keep-alive expires, or the connection closes without a DISCONNECT packet), the broker publishes the LWT message on the client's behalf. Used for "device offline" notifications.
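How quickly the LWT appears depends on how the connection dies. A closed socket is noticed almost immediately. A silent drop, such as a pulled cable or an expired NAT mapping, is typically detected only through the keep-alive. MQTT 3.1.1 allows the broker to wait up to one and a half times the keep-alive interval before treating the client as gone. The sketch below computes the resulting test timeout budget. It uses a hypothetical helper and an assumed safety margin. For reference, paho-mqtt's connect() defaults to a 60 second keep-alive.

```python
def lwt_wait_budget(keepalive_s, margin_s=2.0):
    """Upper bound on how long to wait for an LWT after a *silent* network drop.

    The broker may wait up to 1.5x the keep-alive interval before giving up on
    the client; margin_s is an assumed allowance for delivery to the observer.
    """
    if keepalive_s <= 0:
        raise ValueError('keep-alive 0 disables the broker-side timeout')
    return 1.5 * keepalive_s + margin_s


print(lwt_wait_budget(60))  # 92.0
print(lwt_wait_budget(5))   # 9.5
```

In practice, tests that simulate silent drops usually connect with a short keep-alive so the budget stays in seconds rather than minutes.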
Topic wildcards:
- + (single level): devices/+/temperature matches devices/device-001/temperature
- # (multi-level, only allowed as the last level): devices/# matches everything under devices/, including devices itself
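Wildcard edge cases are easy to get wrong, so a small reference matcher helps when deciding which messages a subscriber should see. The sketch below implements the MQTT 3.1.1 matching rules and assumes the filter has already been validated. topic_matches is a hypothetical helper, not a paho-mqtt function.

```python
def topic_matches(topic_filter, topic):
    """Return True if an MQTT topic filter (with + and # wildcards) matches a topic."""
    # Topics starting with '$' (e.g. $SYS/...) are not matched by a leading wildcard.
    if topic.startswith('$') and topic_filter[:1] in ('+', '#'):
        return False
    filter_levels = topic_filter.split('/')
    topic_levels = topic.split('/')
    for i, level in enumerate(filter_levels):
        if level == '#':
            return True  # matches the parent level and everything below it
        if i >= len(topic_levels):
            return False  # filter is longer than the topic
        if level != '+' and level != topic_levels[i]:
            return False
    return len(filter_levels) == len(topic_levels)


cases = [
    ('devices/+/temperature', 'devices/device-001/temperature'),
    ('devices/+/temperature', 'devices/device-001/humidity'),
    ('devices/#', 'devices'),
    ('#', '$SYS/broker/uptime'),
]
for f, t in cases:
    print(f'{f!r} vs {t!r}: {topic_matches(f, t)}')
```

If paho-mqtt is already a dependency, its module-level topic_matches_sub(sub, topic) function serves the same purpose.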
Your tests must validate all of these behaviors, not just basic publish/subscribe.
Setting Up an MQTT Test Environment
Local Broker with Mosquitto
# Docker Compose: Mosquitto + test tooling
version: '3.8'
services:
  mosquitto:
    image: eclipse-mosquitto:2
    ports:
      - "1883:1883"
      - "8883:8883"  # TLS
      - "9001:9001"  # WebSocket
    volumes:
      - ./mosquitto.conf:/mosquitto/config/mosquitto.conf
      - ./certs:/mosquitto/certs  # CA, server certificate and key for the TLS listener
  # MQTT explorer for debugging
  mqtt-explorer:
    image: smeagolworms4/mqtt-explorer
    ports:
      - "4000:4000"

# mosquitto.conf
listener 1883
allow_anonymous true

listener 8883
cafile /mosquitto/certs/ca.crt
certfile /mosquitto/certs/server.crt
keyfile /mosquitto/certs/server.key
require_certificate false

listener 9001
protocol websockets
HiveMQ Broker for Production-Like Testing
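HiveMQ provides a production-grade broker for integration testing. The hivemq/hivemq4 image below runs the commercial edition in trial mode, which limits the number of concurrent connections. The open-source Community Edition is published as hivemq/hivemq-ce and has no Control Center. HiveMQ also binds port 1883, so run it instead of Mosquitto or remap one of the ports.
services:
  hivemq:
    image: hivemq/hivemq4:latest
    ports:
      - "1883:1883"
      - "8080:8080"  # Control Center
    environment:
      - HIVEMQ_LOG_LEVEL=INFO
Testing with Python and Paho-MQTT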
Test Framework Setup
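Containers report as started before the broker inside them is accepting connections. A readiness check at the start of the test session avoids spurious connection failures in the first tests. Below is a minimal stdlib sketch. wait_for_broker is a hypothetical helper. A successful TCP connect only proves the listener is up, not that authentication or TLS will succeed.

```python
import socket
import time


def wait_for_broker(host='localhost', port=1883, timeout=30.0, interval=0.5):
    """Poll until a TCP connection to host:port succeeds; return False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # Open and immediately close a plain TCP connection to the listener.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

Calling it from a session-scoped fixture, and failing the session when it returns False, turns a slow container start into one clear error instead of dozens of connection timeouts.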
# conftest.py
# Uses the paho-mqtt 1.x callback API. paho-mqtt 2.x changes the Client()
# constructor and callback signatures, so install with: pip install "paho-mqtt<2"
import json
import threading
import time
from queue import Queue, Empty

import paho.mqtt.client as mqtt
import pytest


class MQTTTestClient:
    """Test client with blocking message receipt."""

    def __init__(self, broker='localhost', port=1883, client_id=None):
        self.broker = broker
        self.port = port
        self.message_queue = Queue()  # received messages, filled by on_message
        self.suback_queue = Queue()   # message IDs of acknowledged SUBSCRIBEs
        self.connected = threading.Event()
        self.disconnected = threading.Event()
        self.client = mqtt.Client(client_id=client_id or f'test-{id(self)}')
        self.client.on_connect = self._on_connect
        self.client.on_message = self._on_message
        self.client.on_subscribe = self._on_subscribe
        self.client.on_disconnect = self._on_disconnect

    def _on_connect(self, client, userdata, flags, rc):
        if rc == 0:  # 0 means the broker accepted the connection
            self.connected.set()

    def _on_message(self, client, userdata, msg):
        self.message_queue.put({
            'topic': msg.topic,
            'payload': msg.payload,
            'qos': msg.qos,
            'retain': bool(msg.retain),  # paho 1.x reports this flag as an int
        })

    def _on_subscribe(self, client, userdata, mid, granted_qos):
        self.suback_queue.put(mid)

    def _on_disconnect(self, client, userdata, rc):
        self.connected.clear()
        self.disconnected.set()

    def connect(self, timeout=5):
        self.client.connect(self.broker, self.port)
        self.client.loop_start()  # network loop runs in a background thread
        assert self.connected.wait(timeout), f'Connection timeout after {timeout}s'
        return self

    def subscribe(self, topic, qos=0, timeout=5):
        result, mid = self.client.subscribe(topic, qos=qos)
        assert result == mqtt.MQTT_ERR_SUCCESS, f'Subscribe failed with code {result}'
        # Wait for the broker's SUBACK so messages published right after this
        # call are not missed (a fixed sleep only makes that race less likely)
        deadline = time.time() + timeout
        while True:
            try:
                if self.suback_queue.get(timeout=max(0, deadline - time.time())) == mid:
                    return self
            except Empty:
                raise AssertionError(f'No SUBACK for {topic!r} within {timeout}s') from None

    def publish(self, topic, payload, qos=0, retain=False):
        if isinstance(payload, dict):
            payload = json.dumps(payload)
        result = self.client.publish(topic, payload, qos=qos, retain=retain)
        # Returns once sent (QoS 0) or acknowledged by the broker (QoS 1/2)
        result.wait_for_publish(timeout=5)
        return self

    def receive(self, timeout=5):
        try:
            return self.message_queue.get(timeout=timeout)
        except Empty:
            return None

    def receive_all(self, timeout=2):
        """Collect every message that arrives within the timeout window."""
        messages = []
        deadline = time.time() + timeout
        while time.time() < deadline:
            msg = self.receive(timeout=max(0, deadline - time.time()))
            if msg:
                messages.append(msg)
        return messages

    def disconnect(self):
        self.client.disconnect()  # send DISCONNECT while the network loop still runs
        self.client.loop_stop()


@pytest.fixture
def mqtt_client():
    client = MQTTTestClient().connect()
    yield client
    client.disconnect()


@pytest.fixture
def publisher():
    client = MQTTTestClient(client_id='test-publisher').connect()
    yield client
    client.disconnect()


@pytest.fixture
def subscriber():
    client = MQTTTestClient(client_id='test-subscriber').connect()
    yield client
    client.disconnect()
Core MQTT Test Cases
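Retained messages stay on the broker for as long as it runs. The retained LWT in the Last Will test is one example. A broad subscription such as devices/# can therefore pick up leftovers from earlier tests or earlier runs. One way to isolate tests is to build every topic and filter from a random per-test prefix. The sketch below does this with a hypothetical topic_namespace helper.

```python
import uuid


def topic_namespace(root='test'):
    """Return a fresh topic prefix such as 'test/1b9e0c4d2f7a'.

    The random level contains only hex digits, so it never introduces
    wildcard characters into filters built from it.
    """
    return f'{root}/{uuid.uuid4().hex[:12]}'


# Usage inside a test: the filter and the topics share one namespace.
ns = topic_namespace()
subscription = f'{ns}/devices/#'
telemetry_topic = f'{ns}/devices/device-001/temperature'
print(subscription)
```

Wrapped in a pytest fixture, this gives each test its own namespace without changing the MQTTTestClient API.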
Basic Publish/Subscribe
# test_mqtt.py (same directory as conftest.py)
import json
import threading
import time

import paho.mqtt.client as mqtt
import pytest

from conftest import MQTTTestClient


def test_publish_subscribe_basic(publisher, subscriber):
    """A subscriber should receive messages published to its topic."""
    topic = 'test/basic'
    payload = {'sensor': 'temp', 'value': 22.5}
    subscriber.subscribe(topic)
    publisher.publish(topic, payload)
    msg = subscriber.receive(timeout=3)
    assert msg is not None, 'No message received'
    assert msg['topic'] == topic
    received = json.loads(msg['payload'])
    assert received['value'] == 22.5


def test_wildcard_subscription_single_level(publisher, subscriber):
    """Single-level wildcard + should match one topic level."""
    subscriber.subscribe('devices/+/temperature')
    publisher.publish('devices/device-001/temperature', {'value': 22.5})
    publisher.publish('devices/device-002/temperature', {'value': 18.1})
    publisher.publish('devices/device-001/humidity', {'value': 60})  # should NOT match
    messages = subscriber.receive_all(timeout=2)
    topics = [m['topic'] for m in messages]
    assert 'devices/device-001/temperature' in topics
    assert 'devices/device-002/temperature' in topics
    assert 'devices/device-001/humidity' not in topics


def test_wildcard_subscription_multi_level(publisher, subscriber):
    """Multi-level wildcard # should match all sub-topics."""
    # Unique root so retained messages from other tests or earlier runs
    # (e.g. a retained LWT under devices/) cannot inflate the count
    root = f'test/multi/{int(time.time())}/devices'
    subscriber.subscribe(f'{root}/#')
    publisher.publish(f'{root}/device-001/temperature', b'22.5')
    publisher.publish(f'{root}/device-001/humidity', b'60')
    publisher.publish(f'{root}/device-001/status/battery', b'85')
    messages = subscriber.receive_all(timeout=2)
    assert len(messages) == 3
QoS Level Testing
def test_qos0_fire_and_forget(publisher, subscriber):
    """QoS 0 messages should be delivered without acknowledgment."""
    subscriber.subscribe('test/qos0', qos=0)
    publisher.publish('test/qos0', 'hello', qos=0)
    msg = subscriber.receive(timeout=3)
    assert msg is not None
    assert msg['qos'] == 0


def test_qos1_at_least_once(publisher, subscriber):
    """QoS 1 should guarantee delivery."""
    subscriber.subscribe('test/qos1', qos=1)
    # Publish with QoS 1
    publisher.publish('test/qos1', 'important-data', qos=1)
    msg = subscriber.receive(timeout=3)
    assert msg is not None
    assert msg['payload'] == b'important-data'
    assert msg['qos'] == 1


def test_qos2_exactly_once(publisher, subscriber):
    """QoS 2 should deliver exactly once: no duplicates, nothing missing."""
    subscriber.subscribe('test/qos2', qos=2)
    for i in range(5):
        publisher.publish('test/qos2', json.dumps({'id': i}), qos=2)
    messages = subscriber.receive_all(timeout=5)
    ids = [json.loads(m['payload'])['id'] for m in messages]
    # No duplicates with QoS 2 (a healthy local link rarely triggers
    # retransmission, so this mainly checks completeness)
    assert len(ids) == len(set(ids)), 'Duplicate messages received with QoS 2'
    assert set(ids) == {0, 1, 2, 3, 4}
Retained Messages
def test_retained_message_delivered_on_subscribe(publisher):
    """New subscriber should receive the retained message immediately."""
    topic = f'test/retained/{int(time.time())}'
    # Publish retained
    publisher.publish(topic, json.dumps({'status': 'online'}), retain=True)
    time.sleep(0.5)
    # New subscriber connects AFTER publish
    late_subscriber = MQTTTestClient(client_id='late-subscriber').connect()
    late_subscriber.subscribe(topic)
    msg = late_subscriber.receive(timeout=3)
    late_subscriber.disconnect()
    assert msg is not None, 'Retained message not delivered to new subscriber'
    # paho 1.x delivers the retain flag as an int, so test truthiness, not identity
    assert msg['retain'], 'Message was not flagged as retained'
    assert json.loads(msg['payload'])['status'] == 'online'


def test_clear_retained_message(publisher):
    """Publishing empty payload to topic should clear retained message."""
    topic = f'test/retained/clear/{int(time.time())}'
    publisher.publish(topic, 'keep-me', retain=True)
    time.sleep(0.2)
    # Clear retained by publishing empty payload
    publisher.publish(topic, '', retain=True)
    time.sleep(0.2)
    late_subscriber = MQTTTestClient(client_id='clear-test-subscriber').connect()
    late_subscriber.subscribe(topic)
    msg = late_subscriber.receive(timeout=2)
    late_subscriber.disconnect()
    assert msg is None, 'Retained message was not cleared'
Last Will and Testament
def test_lwt_published_on_abnormal_disconnect():
    """Broker should publish LWT when client disconnects abnormally."""
    # Unique topic: the LWT is retained, so a fixed topic would hand this test
    # the stale "offline" message from a previous run and pass without a new LWT
    lwt_topic = f'devices/device-lwt-test-{int(time.time())}/status'
    # Observer subscribes to LWT topic
    observer = MQTTTestClient(client_id='lwt-observer').connect()
    observer.subscribe(lwt_topic)
    # Client configures LWT
    lwt_client = mqtt.Client(client_id='lwt-device')
    lwt_client.will_set(
        lwt_topic,
        payload=json.dumps({'status': 'offline', 'reason': 'disconnected'}),
        qos=1,
        retain=True,
    )
    lwt_client.connect('localhost', 1883)
    lwt_client.loop_start()
    time.sleep(0.5)
    # Simulate abnormal disconnect (network drop): stop the loop and close the
    # socket without sending a DISCONNECT packet
    lwt_client.loop_stop()
    lwt_client.socket().close()
    # Broker should deliver LWT
    msg = observer.receive(timeout=10)
    # Clear the retained LWT so it does not accumulate on the broker
    observer.publish(lwt_topic, '', retain=True)
    observer.disconnect()
    assert msg is not None, 'LWT not published after abnormal disconnect'
    payload = json.loads(msg['payload'])
    assert payload['status'] == 'offline'
Load Testing MQTT Brokers
Use MQTT load testing tools to verify broker performance under production-like conditions:
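Whatever generates the load, the pass/fail decision usually comes down to throughput and tail latency. The mqtt-bench run below, for example, has 1,000 clients sending 1,000 messages each, or 1,000,000 messages in total. If you record per-message latencies yourself (publish to PUBACK, or publish to receipt on a subscriber), a nearest-rank percentile gives a deterministic number to gate on. The sketch below uses hypothetical helper names and made-up sample data.

```python
import math


def percentile(samples, pct):
    """Nearest-rank percentile: the smallest sample with at least pct% of samples <= it."""
    if not samples:
        raise ValueError('no samples')
    ordered = sorted(samples)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


def summarize(latencies_ms, duration_s):
    """Reduce per-message latencies to the figures a load-test gate asserts on."""
    return {
        'messages': len(latencies_ms),
        'throughput_per_s': len(latencies_ms) / duration_s,
        'p50_ms': percentile(latencies_ms, 50),
        'p99_ms': percentile(latencies_ms, 99),
        'max_ms': max(latencies_ms),
    }


# Stand-in data: 100 messages with latencies of 1..100 ms collected over 10 s.
report = summarize(list(range(1, 101)), duration_s=10)
print(report)
assert report['p99_ms'] <= 200, 'p99 latency budget exceeded'
```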
With mqtt-bench
# 1000 publishers, 1000 messages each, QoS 1
mqtt-bench -action pub \
  -broker tcp://localhost:1883 \
  -clients 1000 \
  -count 1000 \
  -qos 1 \
  -topic 'benchmark/load/%d'
With Gatling MQTT Plugin
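// Gatling IoT scenario (requires the Gatling MQTT plugin)
import io.gatling.core.Predef._
import io.gatling.mqtt.Predef._
import scala.concurrent.duration._
import scala.util.Random

class MqttSimulation extends Simulation {
  val mqttConf = mqtt
    .broker("localhost", 1883)
    .cleanSession(true)
    .credentials("user", "pass")
    .qosAtLeastOnce // QoS 1 (at least once)
    .reconnectAttemptsMax(3)

  // One device ID per virtual user; the naming scheme is illustrative
  val deviceFeeder = Iterator.from(1).map(i => Map("deviceId" -> s"device-$i"))

  val scn = scenario("IoT Device Simulation")
    .feed(deviceFeeder)
    .exec(mqtt("connect").connect)
    .repeat(100) {
      // Fill the session attributes referenced in the payload template
      exec(_.set("temperature", 15 + Random.nextInt(15))
        .set("timestamp", System.currentTimeMillis()))
        .exec(
          mqtt("publish telemetry")
            .publish("devices/#{deviceId}/telemetry")
            .message(StringBody("""{"temp": #{temperature}, "ts": #{timestamp}}"""))
        )
        .pause(1)
    }

  setUp(
    scn.inject(rampUsers(1000).during(60.seconds))
  ).protocols(mqttConf)
    .assertions(
      global.successfulRequests.percent.gte(99),
      global.responseTime.percentile3.lte(200),
    )
}
Testing MQTT with TLS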
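def test_tls_connection():
    """Client should connect securely with TLS."""
    client = mqtt.Client(client_id='tls-test')
    client.tls_set(
        ca_certs='certs/ca.crt',
        certfile='certs/client.crt',
        keyfile='certs/client.key',
    )
    connected = threading.Event()
    client.on_connect = lambda c, u, f, rc: connected.set() if rc == 0 else None
    client.connect('localhost', 8883)
    client.loop_start()
    assert connected.wait(timeout=5), 'TLS connection failed'
    client.disconnect()
    client.loop_stop()


def test_plaintext_rejected_on_tls_port():
    """A plain-TCP client must not complete an MQTT handshake on the TLS port."""
    client = mqtt.Client(client_id='no-tls-test')  # TLS deliberately not configured
    connected = threading.Event()
    client.on_connect = lambda c, u, f, rc: connected.set() if rc == 0 else None
    try:
        # connect() can return normally: the TCP handshake succeeds and the broker
        # only drops the session when the TLS handshake fails, so check for CONNACK
        client.connect('localhost', 8883)
        client.loop_start()
        assert not connected.wait(timeout=3), 'Plain-TCP client was accepted on TLS port'
    except OSError:
        pass  # refused or reset during connect(): also a rejection
    finally:
        client.loop_stop()
Running Tests in CI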
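name: MQTT Integration Tests
on: [pull_request]
jobs:
  mqtt-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      # eclipse-mosquitto:2 only listens on the container's loopback interface unless
      # its config defines a listener, and service containers cannot override the
      # command, so start the broker with the image's bundled no-auth config instead
      - name: Start Mosquitto
        run: |
          docker run -d --name mosquitto -p 1883:1883 \
            eclipse-mosquitto:2 mosquitto -c /mosquitto-no-auth.conf
          for i in $(seq 1 15); do
            docker exec mosquitto mosquitto_pub -h localhost -t healthcheck -n -q 1 && break
            sleep 1
          done
      - uses: actions/setup-python@v5
        with:
          python-version: '3.12'
      - run: pip install "paho-mqtt<2" pytest
      - run: pytest tests/mqtt/ -v --tb=short
Common MQTT Testing Mistakes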
Not testing QoS semantics. QoS 0, 1, and 2 have meaningfully different guarantees. Test each explicitly — bugs often appear when QoS levels mismatch between publisher and subscriber.
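A useful rule when building that matrix is that each hop runs at its own QoS. The broker forwards a message to a subscriber at the lower of the publish QoS and the QoS it granted for the subscription. A QoS 2 publish therefore reaches a QoS 0 subscriber with no delivery guarantee at all. The sketch below enumerates the nine combinations for a parametrized test. effective_qos is a hypothetical helper.

```python
from itertools import product


def effective_qos(publish_qos, subscription_qos):
    """QoS at which a subscriber receives a message: the lower of the two levels."""
    for qos in (publish_qos, subscription_qos):
        if qos not in (0, 1, 2):
            raise ValueError(f'invalid QoS: {qos}')
    return min(publish_qos, subscription_qos)


# All nine publisher/subscriber combinations, e.g. for pytest.mark.parametrize.
matrix = [(pub, sub, effective_qos(pub, sub)) for pub, sub in product(range(3), repeat=2)]
for pub, sub, eff in matrix:
    print(f'publish QoS {pub}, subscription QoS {sub}: delivered at QoS {eff}')
```

In a broker test, each tuple becomes one case. The case publishes at pub, subscribes at sub, and asserts that the received msg['qos'] equals eff.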
Ignoring message ordering. MQTT doesn't guarantee message order across topics. Tests that assume ordering will be flaky.
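MQTT does promise ordering within narrower bounds. The broker must forward messages on the same topic, at the same QoS, from a given publisher in the order it received them. Asserting only that guarantee keeps tests both correct and stable. The sketch below assumes each payload carries a hypothetical per-topic sequence number, seq, starting at 0.

```python
from collections import defaultdict


def assert_ordered_per_topic(messages):
    """Assert that 'seq' strictly increases within each topic.

    messages: received messages in arrival order, each a dict with 'topic' and a
    decoded 'payload' dict. Interleaving between different topics is ignored.
    Strict increase also rejects QoS 1 redeliveries; deduplicate first if your
    test expects them.
    """
    last_seq = defaultdict(lambda: -1)
    for msg in messages:
        topic, seq = msg['topic'], msg['payload']['seq']
        assert seq > last_seq[topic], f'{topic}: seq {seq} arrived after seq {last_seq[topic]}'
        last_seq[topic] = seq


# Interleaved across topics but ordered within each topic: passes.
arrivals = [
    {'topic': 'devices/a/t', 'payload': {'seq': 0}},
    {'topic': 'devices/b/t', 'payload': {'seq': 0}},
    {'topic': 'devices/b/t', 'payload': {'seq': 1}},
    {'topic': 'devices/a/t', 'payload': {'seq': 1}},
]
assert_ordered_per_topic(arrivals)
```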
Not testing retained messages. Retained messages are critical for device state synchronization. Skipping retained message tests leaves a common failure mode untested.
No LWT testing. LWT is how systems detect device disconnection. Without LWT tests, abnormal disconnection handling is untested.
Using production brokers for integration tests. Always test against a dedicated test broker. Using production MQTT means test messages pollute real device topics.
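A cheap safeguard is to have the suite refuse to run against any broker that is not explicitly allowed. The sketch below assumes two hypothetical environment variables, MQTT_TEST_BROKER and MQTT_TEST_BROKER_ALLOWLIST. Neither is a paho-mqtt or pytest setting.

```python
import os


def resolve_test_broker(environ=os.environ):
    """Return the broker host for tests, refusing any host not explicitly allowed.

    MQTT_TEST_BROKER and MQTT_TEST_BROKER_ALLOWLIST are hypothetical variable
    names chosen for this sketch.
    """
    host = environ.get('MQTT_TEST_BROKER', 'localhost')
    allowlist = environ.get('MQTT_TEST_BROKER_ALLOWLIST', 'localhost,127.0.0.1')
    allowed = {h.strip() for h in allowlist.split(',') if h.strip()}
    if host not in allowed:
        raise RuntimeError(f'Refusing to run MQTT tests against non-test broker {host!r}')
    return host
```

Pass its result to MQTTTestClient(broker=...) instead of relying on the 'localhost' default. A mistyped CI variable then fails loudly rather than publishing test traffic onto real device topics.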
No load tests. An MQTT broker that handles 10 devices in testing may collapse under 10,000 in production. Load test your broker before scaling.
HelpMeTest helps QA teams build comprehensive test coverage for IoT applications, including the cloud application layer that processes MQTT data.