MQTT Protocol Testing Strategies: From Unit Tests to Load Testing

MQTT Protocol Testing Strategies: From Unit Tests to Load Testing

MQTT is the backbone of most IoT systems — lightweight, broker-based, designed for unreliable networks. Testing MQTT is different from testing REST APIs: you're testing asynchronous publish/subscribe flows, quality of service levels, retained messages, and broker behavior under load.

This guide covers practical MQTT testing strategies from unit tests to load testing.

MQTT Testing Fundamentals

MQTT has a few concepts that affect how you test:

  • Topics — hierarchical strings (sensors/room1/temperature). Wildcards: + for one level, # for any depth.
  • QoS levels — 0 (at most once), 1 (at least once), 2 (exactly once)
  • Retained messages — broker stores last message per topic; new subscribers get it immediately
  • Last Will — message sent when a client disconnects unexpectedly

Each of these needs dedicated tests.

Setting Up a Test Broker

Use Mosquitto in Docker for local and CI testing:

# docker-compose.test.yml
services:
  mosquitto:
    image: eclipse-mosquitto:2
    ports:
      - "1883:1883"
      - "9001:9001"
    volumes:
      - ./mosquitto-test.conf:/mosquitto/config/mosquitto.conf

  app:
    build: .
    depends_on:
      - mosquitto
    environment:
      MQTT_BROKER: mqtt://mosquitto:1883
# mosquitto-test.conf
listener 1883
allow_anonymous true
persistence false  # No state between test runs
log_type all

For more advanced scenarios, HiveMQ offers a free Docker image with a web UI.

Unit Testing MQTT Clients

Test your publisher and subscriber logic in isolation with mocked clients:

# Python test with unittest.mock
import pytest
from unittest.mock import MagicMock, patch, call
from myapp.sensors import TemperatureSensor

def test_sensor_publishes_on_reading():
    mock_client = MagicMock()
    sensor = TemperatureSensor(client=mock_client, topic="sensors/temp1")
    
    sensor.publish_reading(23.5)
    
    mock_client.publish.assert_called_once_with(
        "sensors/temp1",
        '{"value": 23.5, "unit": "celsius"}',
        qos=1,
        retain=False
    )

def test_sensor_uses_qos_1_for_critical_readings():
    mock_client = MagicMock()
    sensor = TemperatureSensor(client=mock_client, topic="sensors/temp1")
    
    sensor.publish_alert(temperature=85.0, threshold=80.0)
    
    call_args = mock_client.publish.call_args
    assert call_args.kwargs['qos'] == 1  # At-least-once for alerts

For Node.js:

const { MqttClient } = require('mqtt');

jest.mock('mqtt', () => ({
  connect: jest.fn(() => ({
    publish: jest.fn(),
    subscribe: jest.fn(),
    on: jest.fn()
  }))
}));

const { SensorPublisher } = require('../src/sensors');

describe('SensorPublisher', () => {
  it('publishes telemetry to correct topic', () => {
    const client = require('mqtt').connect();
    const publisher = new SensorPublisher(client, 'building-a');

    publisher.publishTemperature('room-101', 22.3);

    expect(client.publish).toHaveBeenCalledWith(
      'building-a/room-101/temperature',
      JSON.stringify({ value: 22.3, timestamp: expect.any(Number) }),
      { qos: 0, retain: false },
      undefined
    );
  });
});

Integration Testing with a Real Broker

Mock tests verify logic; integration tests verify the MQTT flow works end-to-end:

import pytest
import paho.mqtt.client as mqtt
import json
import threading
import time

class MqttTestHelper:
    """Helper to receive MQTT messages in tests."""
    
    def __init__(self, broker_host, broker_port=1883):
        self.received = []
        self.event = threading.Event()
        
        self.client = mqtt.Client(client_id="test_subscriber")
        self.client.on_message = self._on_message
        self.client.connect(broker_host, broker_port)
        self.client.loop_start()
    
    def _on_message(self, client, userdata, msg):
        self.received.append({
            'topic': msg.topic,
            'payload': json.loads(msg.payload),
            'qos': msg.qos
        })
        self.event.set()
    
    def subscribe(self, topic, qos=0):
        self.client.subscribe(topic, qos)
        return self
    
    def wait_for_message(self, timeout=5):
        received = self.event.wait(timeout)
        self.event.clear()
        return received
    
    def disconnect(self):
        self.client.loop_stop()
        self.client.disconnect()


@pytest.fixture
def mqtt_broker():
    return "localhost"

def test_sensor_data_flows_through_broker(mqtt_broker):
    helper = MqttTestHelper(mqtt_broker)
    helper.subscribe("sensors/+/temperature")
    
    # Trigger your application to publish
    from myapp.sensors import TemperatureSensor
    sensor = TemperatureSensor(broker=mqtt_broker, topic="sensors/room1/temperature")
    sensor.simulate_reading(25.0)
    
    assert helper.wait_for_message(timeout=5), "No message received within 5 seconds"
    assert len(helper.received) == 1
    assert helper.received[0]['payload']['value'] == 25.0
    assert helper.received[0]['topic'] == "sensors/room1/temperature"
    
    helper.disconnect()

Testing QoS Levels

QoS 0 can drop messages; QoS 1 guarantees delivery but may duplicate. Test the right level for each message type:

def test_qos_1_delivers_despite_network_issues(mqtt_broker):
    """QoS 1 should redeliver if PUBACK is not received."""
    helper = MqttTestHelper(mqtt_broker)
    helper.subscribe("alerts/#", qos=1)
    
    publisher = mqtt.Client()
    publisher.connect(mqtt_broker)
    
    # Publish with QoS 1
    publisher.publish("alerts/temperature_high", json.dumps({"sensor": "1", "value": 90}), qos=1)
    
    assert helper.wait_for_message(timeout=5)
    assert helper.received[0]['qos'] == 1
    
    publisher.disconnect()
    helper.disconnect()

def test_qos_2_delivers_exactly_once(mqtt_broker):
    """QoS 2 should deliver exactly once, even on retry."""
    received_count = 0
    
    def count_messages(client, userdata, msg):
        nonlocal received_count
        received_count += 1
    
    subscriber = mqtt.Client()
    subscriber.on_message = count_messages
    subscriber.connect(mqtt_broker)
    subscriber.subscribe("commands/actuate", qos=2)
    subscriber.loop_start()
    
    publisher = mqtt.Client()
    publisher.connect(mqtt_broker)
    publisher.publish("commands/actuate", json.dumps({"action": "open_valve"}), qos=2)
    
    time.sleep(1)
    
    assert received_count == 1  # Exactly once
    
    subscriber.loop_stop()

Testing Retained Messages

Retained messages should be received by new subscribers immediately:

def test_retained_message_delivered_to_new_subscriber(mqtt_broker):
    # Publish a retained message
    publisher = mqtt.Client()
    publisher.connect(mqtt_broker)
    publisher.publish(
        "devices/device123/status",
        json.dumps({"online": True}),
        qos=1,
        retain=True
    )
    publisher.disconnect()
    time.sleep(0.2)  # Let broker store it
    
    # New subscriber should immediately receive retained message
    helper = MqttTestHelper(mqtt_broker)
    helper.subscribe("devices/device123/status")
    
    received = helper.wait_for_message(timeout=3)
    
    assert received, "New subscriber should get retained message immediately"
    assert helper.received[0]['payload']['online'] == True
    
    helper.disconnect()

def test_clear_retained_message(mqtt_broker):
    # Clear retained by publishing empty payload
    client = mqtt.Client()
    client.connect(mqtt_broker)
    client.publish("devices/device123/status", payload="", retain=True)
    client.disconnect()
    
    # New subscriber should NOT receive any message
    helper = MqttTestHelper(mqtt_broker)
    helper.subscribe("devices/device123/status")
    
    received = helper.wait_for_message(timeout=2)
    assert not received, "Cleared retained message should not be delivered"

Testing Last Will (LWT)

Last Will messages detect unexpected client disconnections:

def test_lwt_published_on_unexpected_disconnect(mqtt_broker):
    helper = MqttTestHelper(mqtt_broker)
    helper.subscribe("devices/device456/status")
    
    # Connect client with LWT configured
    device_client = mqtt.Client()
    device_client.will_set(
        "devices/device456/status",
        json.dumps({"online": False, "reason": "disconnected"}),
        qos=1,
        retain=True
    )
    device_client.connect(mqtt_broker)
    
    # Simulate unexpected disconnect (no DISCONNECT packet)
    device_client.socket().close()  # Force TCP close
    
    # Broker should publish LWT after keepalive timeout
    received = helper.wait_for_message(timeout=10)
    
    assert received
    assert helper.received[0]['payload']['online'] == False

Load Testing MQTT

For production confidence, load test your broker and subscribers:

import concurrent.futures
import time

def load_test_mqtt_throughput(broker_host, target_messages_per_second=1000, duration_seconds=30):
    received_count = 0
    start_time = time.time()
    
    helper = MqttTestHelper(broker_host)
    helper.subscribe("loadtest/#")
    
    def publish_batch(thread_id, messages_per_thread):
        client = mqtt.Client(client_id=f"loadtest_pub_{thread_id}")
        client.connect(broker_host)
        for i in range(messages_per_thread):
            client.publish(f"loadtest/{thread_id}", json.dumps({"seq": i}), qos=0)
        client.disconnect()
    
    num_threads = 10
    messages_per_thread = (target_messages_per_second * duration_seconds) // num_threads
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
        futures = [executor.submit(publish_batch, i, messages_per_thread) for i in range(num_threads)]
        concurrent.futures.wait(futures)
    
    elapsed = time.time() - start_time
    actual_rate = len(helper.received) / elapsed
    
    print(f"Published: {num_threads * messages_per_thread}")
    print(f"Received: {len(helper.received)}")
    print(f"Duration: {elapsed:.1f}s")
    print(f"Rate: {actual_rate:.0f} msg/s")
    print(f"Drop rate: {(1 - len(helper.received) / (num_threads * messages_per_thread)) * 100:.1f}%")
    
    helper.disconnect()

MQTT Testing in CI

name: MQTT Integration Tests

on: [push, pull_request]

jobs:
  mqtt-tests:
    runs-on: ubuntu-latest
    services:
      mosquitto:
        image: eclipse-mosquitto:2
        ports:
          - 1883:1883
        options: >-
          --health-cmd "mosquitto_pub -h localhost -t test -m test"
          --health-interval 5s
          --health-timeout 5s
          --health-retries 5

    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with: { python-version: "3.12" }
      
      - run: pip install paho-mqtt pytest
      
      - name: Run MQTT tests
        run: pytest tests/mqtt/ -v
        env:
          MQTT_BROKER: localhost
          MQTT_PORT: 1883

Summary

MQTT testing needs three layers:

  1. Unit tests — mock the client, test publish/subscribe logic in isolation
  2. Integration tests — use a real broker, test QoS, retained messages, and LWT
  3. Load tests — verify throughput, drop rates, and broker stability under pressure

The MqttTestHelper pattern (subscribe + wait for message) is your core tool for integration tests. Use it liberally — async message delivery is where most MQTT bugs hide.

Read more