MQTT Testing Guide: Validate Message Brokers and IoT Messaging Pipelines

MQTT is the messaging backbone of many IoT systems. It is a lightweight publish-subscribe protocol designed for unreliable networks, and it connects millions of devices to cloud backends. When MQTT breaks, data stops flowing, alerts don't fire, and device control fails.

Testing MQTT systems requires understanding the protocol's semantics: QoS levels, retained messages, Last Will, topic hierarchies, and broker behavior under load.

MQTT Fundamentals for Testers

Before writing tests, understand what MQTT promises:

Quality of Service (QoS) levels:

  • QoS 0 — At most once: fire and forget. No delivery guarantee. Fastest.
  • QoS 1 — At least once: guaranteed delivery, possible duplicates. Client must handle deduplication.
  • QoS 2 — Exactly once: guaranteed single delivery. Highest overhead.
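Because QoS 1 can redeliver a message, consumers usually deduplicate on an application-level identifier. The following is a minimal sketch, assuming each message carries a unique ID (a hypothetical payload convention, not something MQTT provides) and that remembering a bounded window of recent IDs is enough for your traffic:

```python
from collections import OrderedDict


class Deduplicator:
    """Remember the most recent message IDs and reject repeats."""

    def __init__(self, max_ids=1000):
        self.max_ids = max_ids
        self._seen = OrderedDict()

    def is_new(self, msg_id):
        """Return True the first time msg_id is seen, False on redelivery."""
        if msg_id in self._seen:
            return False
        self._seen[msg_id] = True
        if len(self._seen) > self.max_ids:
            self._seen.popitem(last=False)  # evict the oldest remembered ID
        return True


dedup = Deduplicator(max_ids=3)
deliveries = ['a', 'b', 'a', 'c']  # 'a' is redelivered once
accepted = [m for m in deliveries if dedup.is_new(m)]
print(accepted)
```

The window size is a trade-off: too small and a late redelivery slips through, too large and memory grows with traffic.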

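Retained messages: The broker stores the most recent retained message on each topic and delivers it immediately to any new subscriber whose subscription matches that topic. This is used to publish "last known state": a new subscriber gets the current device status without waiting for the next publish. Publishing a retained message with an empty payload clears the stored one.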
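The retained-message rules can be captured in a toy model, which is handy for reasoning about what a test should expect. This is only an illustrative sketch of broker behavior (the class and method names are made up here), not how any real broker is implemented:

```python
class RetainedStore:
    """Toy model of a broker's retained-message table: one slot per topic."""

    def __init__(self):
        self._retained = {}

    def publish(self, topic, payload, retain=False):
        if not retain:
            return  # non-retained messages are never stored
        if payload == b'':
            self._retained.pop(topic, None)  # empty retained payload clears the slot
        else:
            self._retained[topic] = payload  # a newer retained message replaces the older one

    def on_subscribe(self, topic):
        """Return what a new subscriber to exactly this topic receives at once."""
        return self._retained.get(topic)


store = RetainedStore()
store.publish('devices/d1/status', b'online', retain=True)
store.publish('devices/d1/status', b'ignored', retain=False)
print(store.on_subscribe('devices/d1/status'))
store.publish('devices/d1/status', b'', retain=True)
print(store.on_subscribe('devices/d1/status'))
```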

Last Will and Testament (LWT): Configured at connection time. If the client disconnects abnormally (the connection drops or the keep-alive expires without the client sending a DISCONNECT packet), the broker publishes the LWT message on the client's behalf. Used for "device offline" notifications.
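As a rough model of the rule above, the sketch below shows when a will is published and how long a broker may wait before noticing a silent client. The names are illustrative; the 1.5 factor is the keep-alive grace period defined by the MQTT specification:

```python
from dataclasses import dataclass


@dataclass
class Will:
    topic: str
    payload: bytes
    retain: bool = False


def messages_on_connection_end(will, received_disconnect):
    """Return what the broker publishes when a client's connection ends."""
    if will is None or received_disconnect:
        return []  # clean DISCONNECT (or no will set): nothing is published
    return [will]  # socket dropped or keep-alive expired: the will is published


def keep_alive_deadline(keep_alive_s):
    """Longest silence (seconds) before the broker must drop the client."""
    return 1.5 * keep_alive_s


will = Will('devices/d1/status', b'{"status": "offline"}', retain=True)
print(messages_on_connection_end(will, received_disconnect=True))
print(messages_on_connection_end(will, received_disconnect=False))
print(keep_alive_deadline(60))
```

One practical consequence: a test that simulates a silent network drop (no socket close) needs a receive timeout longer than 1.5 times the client's keep-alive, or a shorter keep-alive on the test client.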

Topic wildcards:

  • + — single level: devices/+/temperature matches devices/device-001/temperature
  • # — multi-level: devices/# matches everything under devices/
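The wildcard rules can be written down as a small matcher, which is useful for computing the expected set of topics in a test. This is a sketch written for this guide; paho-mqtt also ships a `topic_matches_sub` helper for the same purpose:

```python
def topic_matches(topic_filter, topic):
    """Return True if an MQTT topic filter matches a concrete topic name."""
    filter_levels = topic_filter.split('/')
    topic_levels = topic.split('/')

    # Wildcards at the first level never match topics that start with '$'
    if topic.startswith('$') and filter_levels[0] in ('+', '#'):
        return False

    for i, level in enumerate(filter_levels):
        if level == '#':
            # '#' is only valid as the last level; it matches the parent
            # level itself and any number of levels below it
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False
        if level != '+' and level != topic_levels[i]:
            return False
    return len(filter_levels) == len(topic_levels)


print(topic_matches('devices/+/temperature', 'devices/device-001/temperature'))
print(topic_matches('devices/+/temperature', 'devices/device-001/humidity'))
print(topic_matches('devices/#', 'devices/device-001/status/battery'))
```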

Your tests must validate all of these behaviors, not just basic publish/subscribe.

Setting Up an MQTT Test Environment

Local Broker with Mosquitto

# docker-compose.yml: Mosquitto + test tooling
version: '3.8'
services:
  mosquitto:
    image: eclipse-mosquitto:2
    ports:
      - "1883:1883"
      - "8883:8883"  # TLS
      - "9001:9001"  # WebSocket
    volumes:
      - ./mosquitto.conf:/mosquitto/config/mosquitto.conf
      - ./certs:/mosquitto/certs  # server.crt and server.key for the TLS listener

  # MQTT explorer for debugging
  mqtt-explorer:
    image: smeagolworms4/mqtt-explorer
    ports:
      - "4000:4000"

# mosquitto.conf
listener 1883
allow_anonymous true

listener 8883
certfile /mosquitto/certs/server.crt
keyfile /mosquitto/certs/server.key
require_certificate false

# WebSocket listener backing the 9001 port mapping
listener 9001
protocol websockets

HiveMQ Broker for Production-Like Testing

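HiveMQ provides a production-grade broker for integration testing. The hivemq/hivemq4 image used below is the commercial broker, which runs in a connection-limited trial mode when no license is supplied and includes the Control Center web UI. The open-source Community Edition is published separately as hivemq/hivemq-ce and has no Control Center.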

hivemq:
  image: hivemq/hivemq4:latest
  ports:
    - "1883:1883"
    - "8080:8080"  # Control Center
  environment:
    - HIVEMQ_LOG_LEVEL=INFO

Testing with Python and Paho-MQTT

Test Framework Setup

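# conftest.py
# Written against the paho-mqtt 1.x callback API. On paho-mqtt 2.x, pass
# mqtt.CallbackAPIVersion.VERSION1 as the first argument to mqtt.Client().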
import paho.mqtt.client as mqtt
import pytest
import time
import json
import threading
from queue import Queue, Empty

class MQTTTestClient:
    """Test client with blocking message receipt."""

    def __init__(self, broker='localhost', port=1883, client_id=None):
        self.broker = broker
        self.port = port
        self.message_queue = Queue()
        self.connected = threading.Event()
        self.subscribed = threading.Event()
        self.disconnected = threading.Event()

        self.client = mqtt.Client(client_id=client_id or f'test-{id(self)}')
        self.client.on_connect = self._on_connect
        self.client.on_subscribe = self._on_subscribe
        self.client.on_message = self._on_message
        self.client.on_disconnect = self._on_disconnect

    def _on_connect(self, client, userdata, flags, rc):
        if rc == 0:
            self.connected.set()

    def _on_subscribe(self, client, userdata, mid, granted_qos):
        self.subscribed.set()

    def _on_message(self, client, userdata, msg):
        self.message_queue.put({
            'topic': msg.topic,
            'payload': msg.payload,
            'qos': msg.qos,
            'retain': bool(msg.retain),  # paho 1.x reports the flag as 0/1
        })

    def _on_disconnect(self, client, userdata, rc):
        self.connected.clear()
        self.disconnected.set()

    def connect(self, timeout=5):
        self.client.connect(self.broker, self.port)
        self.client.loop_start()
        assert self.connected.wait(timeout), f'Connection timeout after {timeout}s'
        return self

    def subscribe(self, topic, qos=0, timeout=5):
        self.subscribed.clear()
        self.client.subscribe(topic, qos=qos)
        # Wait for the broker's SUBACK instead of sleeping for a fixed time
        assert self.subscribed.wait(timeout), f'No SUBACK for {topic} after {timeout}s'
        return self
    
    def publish(self, topic, payload, qos=0, retain=False):
        if isinstance(payload, dict):
            payload = json.dumps(payload)
        result = self.client.publish(topic, payload, qos=qos, retain=retain)
        result.wait_for_publish(timeout=5)
        return self
    
    def receive(self, timeout=5):
        try:
            return self.message_queue.get(timeout=timeout)
        except Empty:
            return None
    
    def receive_all(self, timeout=2):
        messages = []
        deadline = time.time() + timeout
        while time.time() < deadline:
            msg = self.receive(timeout=max(0, deadline - time.time()))
            if msg:
                messages.append(msg)
        return messages
    
    def disconnect(self):
        # Send DISCONNECT while the network loop is still running, then stop it
        self.client.disconnect()
        self.client.loop_stop()

@pytest.fixture
def mqtt_client():
    client = MQTTTestClient().connect()
    yield client
    client.disconnect()

@pytest.fixture
def publisher():
    client = MQTTTestClient(client_id='test-publisher').connect()
    yield client
    client.disconnect()

@pytest.fixture
def subscriber():
    client = MQTTTestClient(client_id='test-subscriber').connect()
    yield client
    client.disconnect()

Core MQTT Test Cases

Basic Publish/Subscribe

def test_publish_subscribe_basic(publisher, subscriber):
    """A subscriber should receive messages published to its topic."""
    topic = 'test/basic'
    payload = {'sensor': 'temp', 'value': 22.5}
    
    subscriber.subscribe(topic)
    publisher.publish(topic, payload)
    
    msg = subscriber.receive(timeout=3)
    
    assert msg is not None, 'No message received'
    assert msg['topic'] == topic
    received = json.loads(msg['payload'])
    assert received['value'] == 22.5

def test_wildcard_subscription_single_level(publisher, subscriber):
    """Single-level wildcard + should match one topic level."""
    subscriber.subscribe('devices/+/temperature')
    
    publisher.publish('devices/device-001/temperature', {'value': 22.5})
    publisher.publish('devices/device-002/temperature', {'value': 18.1})
    publisher.publish('devices/device-001/humidity', {'value': 60})  # should NOT match
    
    messages = subscriber.receive_all(timeout=2)
    topics = [m['topic'] for m in messages]
    
    assert 'devices/device-001/temperature' in topics
    assert 'devices/device-002/temperature' in topics
    assert 'devices/device-001/humidity' not in topics

def test_wildcard_subscription_multi_level(publisher, subscriber):
    """Multi-level wildcard # should match all sub-topics."""
    subscriber.subscribe('devices/#')
    
    publisher.publish('devices/device-001/temperature', b'22.5')
    publisher.publish('devices/device-001/humidity', b'60')
    publisher.publish('devices/device-001/status/battery', b'85')
    
    messages = subscriber.receive_all(timeout=2)
    assert len(messages) == 3

QoS Level Testing

def test_qos0_fire_and_forget(publisher, subscriber):
    """QoS 0 messages should be delivered without acknowledgment."""
    subscriber.subscribe('test/qos0', qos=0)
    publisher.publish('test/qos0', 'hello', qos=0)
    
    msg = subscriber.receive(timeout=3)
    assert msg is not None
    assert msg['qos'] == 0

def test_qos1_at_least_once(publisher, subscriber):
    """QoS 1 should guarantee delivery."""
    subscriber.subscribe('test/qos1', qos=1)
    
    # Publish with QoS 1
    publisher.publish('test/qos1', 'important-data', qos=1)
    
    msg = subscriber.receive(timeout=3)
    assert msg is not None
    assert msg['payload'] == b'important-data'

def test_qos2_exactly_once(publisher, subscriber):
    """QoS 2 should deliver exactly once — no duplicates."""
    received_ids = []
    
    subscriber.subscribe('test/qos2', qos=2)
    
    for i in range(5):
        publisher.publish('test/qos2', json.dumps({'id': i}), qos=2)
    
    messages = subscriber.receive_all(timeout=5)
    ids = [json.loads(m['payload'])['id'] for m in messages]
    
    # No duplicates with QoS 2
    assert len(ids) == len(set(ids)), 'Duplicate messages received with QoS 2'
    assert set(ids) == {0, 1, 2, 3, 4}

Retained Messages

def test_retained_message_delivered_on_subscribe(publisher):
    """New subscriber should receive the retained message immediately."""
    topic = f'test/retained/{int(time.time())}'
    
    # Publish retained
    publisher.publish(topic, json.dumps({'status': 'online'}), retain=True)
    time.sleep(0.5)
    
    # New subscriber connects AFTER publish
    late_subscriber = MQTTTestClient(client_id='late-subscriber').connect()
    late_subscriber.subscribe(topic)
    
    msg = late_subscriber.receive(timeout=3)
    late_subscriber.disconnect()
    
    assert msg is not None, 'Retained message not delivered to new subscriber'
    assert msg['retain'], 'Retain flag not set on delivered message'
    assert json.loads(msg['payload'])['status'] == 'online'

def test_clear_retained_message(publisher):
    """Publishing empty payload to topic should clear retained message."""
    topic = f'test/retained/clear/{int(time.time())}'
    
    publisher.publish(topic, 'keep-me', retain=True)
    time.sleep(0.2)
    
    # Clear retained by publishing empty payload
    publisher.publish(topic, '', retain=True)
    time.sleep(0.2)
    
    late_subscriber = MQTTTestClient(client_id='clear-test-subscriber').connect()
    late_subscriber.subscribe(topic)
    
    msg = late_subscriber.receive(timeout=2)
    late_subscriber.disconnect()
    
    assert msg is None, 'Retained message was not cleared'

Last Will and Testament

def test_lwt_published_on_abnormal_disconnect():
    """Broker should publish LWT when client disconnects abnormally."""
    # Unique topic per run: the will is retained, so with a fixed topic a stale
    # message from an earlier run could satisfy the assertions below.
    lwt_topic = f'devices/device-lwt-{int(time.time())}/status'

    # Observer subscribes to LWT topic
    observer = MQTTTestClient(client_id='lwt-observer').connect()
    observer.subscribe(lwt_topic)

    # Client configures LWT
    lwt_client = mqtt.Client(client_id='lwt-device')
    lwt_client.will_set(
        lwt_topic,
        payload=json.dumps({'status': 'offline', 'reason': 'disconnected'}),
        qos=1,
        retain=True,
    )
    lwt_client.connect('localhost', 1883)
    lwt_client.loop_start()
    time.sleep(0.5)

    # Simulate abnormal disconnect (network drop)
    lwt_client.loop_stop()
    lwt_client.socket().close()  # force close without DISCONNECT packet

    # Broker should deliver LWT
    msg = observer.receive(timeout=10)
    observer.publish(lwt_topic, '', retain=True)  # clear the retained will
    observer.disconnect()

    assert msg is not None, 'LWT not published after abnormal disconnect'
    payload = json.loads(msg['payload'])
    assert payload['status'] == 'offline'

Load Testing MQTT Brokers

Use MQTT load testing tools to verify broker performance under production-like conditions:

With mqtt-bench

# 1000 publishers, 1000 messages each, QoS 1
mqtt-bench -action pub \
  -broker tcp://localhost:1883 \
  -clients 1000 \
  -count 1000 \
  -qos 1 \
  -topic 'benchmark/load/%d'

With Gatling MQTT Plugin

// Gatling IoT scenario
import scala.concurrent.duration._

import io.gatling.core.Predef._
import io.gatling.mqtt.Predef._

class MqttSimulation extends Simulation {
  val mqttConf = mqtt
    .broker("localhost", 1883)
    .cleanSession(true)
    .credentials("user", "pass")
    .qosAtLeastOnce
    .reconnectAttemptsMax(3)

  // deviceId, temperature, and timestamp must already be in the session
  // (for example, supplied by a feeder)
  val scn = scenario("IoT Device Simulation")
    .exec(mqtt("connect").connect)
    .repeat(100) {
      exec(mqtt("publish telemetry")
        .publish("devices/#{deviceId}/telemetry")
        .message(StringBody("""{"temp": #{temperature}, "ts": #{timestamp}}"""))
        .qosAtLeastOnce
      )
      .pause(1)
    }

  setUp(
    scn.inject(rampUsers(1000).during(60.seconds))
  ).protocols(mqttConf)
    .assertions(
      global.successfulRequests.percent.gte(99),
      global.responseTime.percentile3.lte(200)
    )
}

Testing MQTT with TLS

def test_tls_connection():
    """Client should connect securely with TLS."""
    client = mqtt.Client(client_id='tls-test')
    client.tls_set(
        ca_certs='certs/ca.crt',
        certfile='certs/client.crt',
        keyfile='certs/client.key',
    )
    
    connected = threading.Event()
    client.on_connect = lambda c, u, f, rc: connected.set() if rc == 0 else None
    
    client.connect('localhost', 8883)
    client.loop_start()
    
    assert connected.wait(timeout=5), 'TLS connection failed'
    
    client.loop_stop()
    client.disconnect()

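def test_rejected_without_certificate():
    """A client that does not speak TLS must not get a session on the TLS port."""
    client = mqtt.Client(client_id='no-cert-test')
    # Don't configure TLS

    connected = threading.Event()
    client.on_connect = lambda c, u, f, rc: connected.set() if rc == 0 else None

    try:
        # connect() only opens the TCP socket, so it does not raise here;
        # the broker drops the connection when the TLS handshake fails.
        client.connect('localhost', 8883)  # TLS port, no TLS configured
        client.loop_start()
        assert not connected.wait(timeout=3), 'Plaintext client connected on TLS port'
    except OSError:
        pass  # socket refused or reset during connect also counts as rejection
    finally:
        client.loop_stop()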

Running Tests in CI

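name: MQTT Integration Tests

on: [pull_request]

jobs:
  mqtt-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      # Mosquitto 2 accepts only local connections unless a listener is
      # configured, and service containers cannot override the image command,
      # so start the broker directly with the no-auth config shipped in the image.
      - name: Start Mosquitto
        run: |
          docker run -d --name mosquitto -p 1883:1883 \
            eclipse-mosquitto:2 mosquitto -c /mosquitto-no-auth.conf
          for i in $(seq 1 10); do
            docker exec mosquitto mosquitto_pub -h localhost -t test -n -q 1 && break
            sleep 1
          done
      - uses: actions/setup-python@v5
        with:
          python-version: '3.12'
      # The test code uses the paho-mqtt 1.x callback API
      - run: pip install "paho-mqtt<2" pytest
      # This broker has no TLS listener, so the TLS tests are deselected here
      - run: pytest tests/mqtt/ -v --tb=short -k "not tls and not certificate"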

Common MQTT Testing Mistakes

Not testing QoS semantics. QoS 0, 1, and 2 have meaningfully different guarantees. Test each explicitly: a message reaches a subscriber at the lower of the publish QoS and the subscription QoS, so bugs often appear when the two levels differ.
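The QoS a subscriber actually sees is the minimum of the publish QoS and the QoS granted to the subscription. A small sketch (the function name is illustrative) that can generate expected values for a parametrized test:

```python
def delivered_qos(publish_qos, subscription_qos):
    """QoS at which the broker forwards a message to one subscriber."""
    for qos in (publish_qos, subscription_qos):
        if qos not in (0, 1, 2):
            raise ValueError(f'invalid QoS level: {qos}')
    return min(publish_qos, subscription_qos)


# Rows: publish QoS 0..2; columns: subscription QoS 0..2
for pub in (0, 1, 2):
    print(pub, [delivered_qos(pub, sub) for sub in (0, 1, 2)])
```

For example, a QoS 2 publish to a QoS 0 subscription is delivered at QoS 0, so an "exactly once" assertion on that subscriber would be testing the wrong guarantee.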

Ignoring message ordering. MQTT preserves order only for messages from one publisher on the same topic at the same QoS; it makes no ordering promise across topics. Tests that assume cross-topic ordering will be flaky.
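One way to keep ordering assertions stable is to check order per topic rather than across the whole stream. The sketch below assumes each message carries a publisher-assigned `seq` counter, which is a hypothetical payload field rather than anything MQTT provides:

```python
def out_of_order_topics(messages):
    """Return the topics whose sequence numbers did not strictly increase."""
    last_seq = {}
    bad = set()
    for msg in messages:
        topic, seq = msg['topic'], msg['seq']
        if topic in last_seq and seq <= last_seq[topic]:
            bad.add(topic)
        last_seq[topic] = seq
    return bad


received = [
    {'topic': 'devices/d1/temperature', 'seq': 1},
    {'topic': 'devices/d2/temperature', 'seq': 1},
    {'topic': 'devices/d2/temperature', 'seq': 2},
    {'topic': 'devices/d1/temperature', 'seq': 2},  # interleaving across topics is fine
]
print(out_of_order_topics(received))
```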

Not testing retained messages. Retained messages are critical for device state synchronization. Skipping retained message tests leaves a common failure mode untested.

No LWT testing. LWT is how systems detect device disconnection. Without LWT tests, abnormal disconnection handling is untested.

Using production brokers for integration tests. Always test against a dedicated test broker. Using production MQTT means test messages pollute real device topics.
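A cheap safeguard is to refuse to run when the configured broker is not a known test host. The sketch below is one possible shape; the `MQTT_TEST_BROKER` variable and the allowlist entries are assumptions made for illustration:

```python
import os

# Hypothetical allowlist of hosts that are safe to publish test traffic to
ALLOWED_TEST_HOSTS = {'localhost', '127.0.0.1', 'mosquitto'}


def resolve_test_broker(env=None):
    """Return the broker host for tests, or raise if it is not allowlisted."""
    env = os.environ if env is None else env
    host = env.get('MQTT_TEST_BROKER', 'localhost')
    if host not in ALLOWED_TEST_HOSTS:
        raise RuntimeError(f'Refusing to run tests against non-test broker: {host}')
    return host


print(resolve_test_broker({}))
print(resolve_test_broker({'MQTT_TEST_BROKER': 'mosquitto'}))
```

Calling this from a session-scoped fixture makes a misconfigured environment fail fast, before any test message is published.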

No load tests. An MQTT broker that handles 10 devices in testing may collapse under 10,000 in production. Load test your broker before scaling.
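Whatever tool generates the load, the result usually reduces to two numbers: how many messages arrived and how long they took. A minimal sketch, assuming you record a send and a receive timestamp per message ID (the dictionary layout is an assumption) and use a nearest-rank 95th percentile:

```python
import math


def summarize(sent_at, received_at):
    """Summarize a load run from {message_id: timestamp_seconds} dicts."""
    latencies = sorted(
        received_at[mid] - sent_at[mid] for mid in received_at if mid in sent_at
    )
    delivery_ratio = len(latencies) / len(sent_at) if sent_at else 0.0
    if latencies:
        rank = max(1, math.ceil(0.95 * len(latencies)))  # nearest-rank percentile
        p95 = latencies[rank - 1]
    else:
        p95 = None
    return {'delivery_ratio': delivery_ratio, 'p95_latency_s': p95}


sent = {1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}
received = {1: 0.1, 2: 0.2, 3: 0.3}  # message 4 was lost
print(summarize(sent, received))
```

Tracking the delivery ratio alongside latency matters because a broker under pressure may keep latency low by dropping QoS 0 messages.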


