ZeroMQ Testing Guide: Testing Patterns in Python and C++

ZeroMQ Testing Guide: Testing Patterns in Python and C++

ZeroMQ is a brokerless messaging library — there's no server to start, which makes testing both easier and harder. Easier because you can test sockets in the same process. Harder because ZeroMQ's asynchronous socket behavior creates timing issues that require careful test design. This guide covers testing REQ/REP, PUB/SUB, and PUSH/PULL patterns in Python and C++.

Key Takeaways

ZeroMQ has no broker — test in-process. Unlike RabbitMQ or Pulsar, there's no external server to start. ZeroMQ sockets communicate peer-to-peer, so you can test sender and receiver in the same process or thread.

Use inproc:// transport for unit tests. The inproc:// transport is faster than tcp:// and avoids port conflicts, but both ends must be created from the same Context. Use it for unit tests; tcp:// for integration tests with real network hops.

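PUB/SUB has a slow joiner problem — add a sleep or use XPUB/XSUB. A subscriber joining a PUB socket misses every message published before its subscription reaches the publisher, because PUB silently drops messages that have no matching subscriber. Add a short sleep after subscribing, or publish from an XPUB socket. XPUB receives subscription messages, so the publisher can wait until a subscriber has actually joined.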

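REQ/REP must alternate strictly. A REQ socket must receive a reply before it can send again. A second send() without an intervening recv() fails with an EFSM error (zmq.ZMQError), and a REQ whose reply never arrives blocks in recv() forever. Use DEALER/ROUTER when a test needs requests that do not follow strict send/receive lockstep.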

Set LINGER to 0 in tests. socket.setsockopt(zmq.LINGER, 0) discards a socket's unsent messages when it is closed, so terminating the context (ctx.term()) does not block waiting for them to be delivered. Always set this in test fixtures.

ZeroMQ Testing Fundamentals

ZeroMQ differs from traditional message brokers: there is no server process, no broker to start, no queue to inspect. Sockets connect directly to each other. This has important implications for testing:

  • No broker fixture needed — just create sockets in your test
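  • Transport choice matters — inproc:// (same process), ipc:// (same machine), tcp:// (network)
  • Async by default — I/O runs on background threads, so send() normally returns once the message is queued. recv() still blocks until a message arrives unless you pass zmq.NOBLOCK or poll() first.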

Python Setup

pip install pyzmq pytest pytest-timeout

Python Fixtures

# conftest.py
import pytest
import zmq

@pytest.fixture
def zmq_context():
    """Give each test its own, freshly created ZeroMQ context.

    Teardown calls ``destroy(linger=0)``: any socket the test left open is
    closed and its unsent messages are dropped, so teardown never waits on
    undelivered traffic.
    """
    test_ctx = zmq.Context()
    yield test_ctx
    test_ctx.destroy(linger=0)

def make_pair(ctx, socket_type_a, socket_type_b, transport="inproc://test-{}"):
    """Create two connected sockets of the given types on a unique endpoint.

    Socket *a* binds and socket *b* connects. Both get ``LINGER=0`` so that
    tearing down the context never waits on unsent messages.

    Args:
        ctx: The ``zmq.Context`` that owns both sockets. For ``inproc://``
            endpoints both peers must share this context.
        socket_type_a: Socket type for the binding side (e.g. ``zmq.REP``).
        socket_type_b: Socket type for the connecting side (e.g. ``zmq.REQ``).
        transport: Endpoint template. ``{}`` is replaced by a random
            8-hex-digit suffix so separate tests do not collide on one name.

    Returns:
        ``(a, b, addr)``: the bound socket, the connected socket and the
        endpoint string they share.

    Raises:
        zmq.ZMQError: If creating, binding or connecting a socket fails. Any
            socket already created is closed before the error propagates.
    """
    import uuid

    addr = transport.format(uuid.uuid4().hex[:8])
    sockets = []
    try:
        for socket_type in (socket_type_a, socket_type_b):
            sock = ctx.socket(socket_type)
            sockets.append(sock)
            sock.setsockopt(zmq.LINGER, 0)
        a, b = sockets
        a.bind(addr)
        b.connect(addr)
    except Exception:
        # A failed bind/connect (e.g. endpoint already in use) must not leave
        # half-built sockets attached to the context.
        for sock in sockets:
            sock.close(linger=0)
        raise
    return a, b, addr

Testing REQ/REP (Request-Reply)

REQ/REP is the most common ZeroMQ pattern. Test that the server responds correctly:

# tests/test_req_rep.py
import zmq
import threading
import pytest

def echo_server(ctx, addr, stop_event):
    """Run a REP echo server until *stop_event* is set.

    Each JSON request is answered with ``{"echo": <request>, "status": "ok"}``.
    The loop polls with a 100 ms timeout instead of blocking in ``recv`` so it
    notices *stop_event* promptly.

    Args:
        ctx: ``zmq.Context`` that owns the server socket. For ``inproc://``
            the client must use the same context.
        addr: Endpoint to bind, e.g. ``"inproc://req-rep-test"``.
        stop_event: ``threading.Event`` the test sets to shut the server down.
    """
    server = ctx.socket(zmq.REP)
    try:
        server.setsockopt(zmq.LINGER, 0)
        server.bind(addr)
        while not stop_event.is_set():
            if server.poll(100):
                msg = server.recv_json()
                server.send_json({"echo": msg, "status": "ok"})
    finally:
        # Close even if bind() or recv_json() raises. Otherwise the socket
        # outlives the thread, and a plain ctx.term() would wait on it forever.
        server.close()

def test_req_rep_basic(zmq_context):
    """A REQ client gets its JSON request echoed back by ``echo_server``."""
    addr = "inproc://req-rep-test"
    stop = threading.Event()

    # daemon=True: a wedged server thread can never keep the test process alive.
    server_thread = threading.Thread(
        target=echo_server, args=(zmq_context, addr, stop), daemon=True
    )
    server_thread.start()

    client = zmq_context.socket(zmq.REQ)
    client.setsockopt(zmq.LINGER, 0)
    try:
        client.connect(addr)
        client.send_json({"action": "ping"})

        # Wait (bounded) for the reply before reading it. A non-blocking recv
        # straight after send would raise zmq.Again, because the server has not
        # answered yet.
        poller = zmq.Poller()
        poller.register(client, zmq.POLLIN)
        if client not in dict(poller.poll(2000)):
            pytest.fail("Timeout waiting for reply")
        reply = client.recv_json()

        assert reply["status"] == "ok"
        assert reply["echo"]["action"] == "ping"
    finally:
        # Always stop the server, even when an assertion above fails.
        stop.set()
        server_thread.join(timeout=2)
        client.close()

def test_req_rep_error_response(zmq_context):
    """A request with an unknown action gets an ``{"error": ...}`` reply."""
    addr = "inproc://req-rep-error"
    stop = threading.Event()

    def error_server(ctx, addr, stop_event):
        # REP server: replies {"error": "unknown action"} when the request's
        # "action" is "unknown", {"ok": True} otherwise.
        s = ctx.socket(zmq.REP)
        try:
            s.setsockopt(zmq.LINGER, 0)
            s.bind(addr)
            while not stop_event.is_set():
                if s.poll(100):
                    msg = s.recv_json()
                    if msg.get("action") == "unknown":
                        s.send_json({"error": "unknown action"})
                    else:
                        s.send_json({"ok": True})
        finally:
            # Close the socket even if bind() or recv_json() raises.
            s.close()

    # daemon=True: a wedged server thread can never keep the test process alive.
    t = threading.Thread(
        target=error_server, args=(zmq_context, addr, stop), daemon=True
    )
    t.start()

    client = zmq_context.socket(zmq.REQ)
    client.setsockopt(zmq.LINGER, 0)
    try:
        client.connect(addr)
        client.send_json({"action": "unknown"})

        poller = zmq.Poller()
        poller.register(client, zmq.POLLIN)
        ready = dict(poller.poll(2000))
        assert client in ready, "Timeout"
        reply = client.recv_json()
        assert reply == {"error": "unknown action"}
    finally:
        # Always stop the server, even when an assertion above fails.
        stop.set()
        t.join(timeout=2)
        client.close()

Testing PUB/SUB

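The slow joiner problem is real. The tests below first sleep briefly so each subscription can propagate. They then poll with a timeout rather than blocking in recv(), so a lost message fails the test instead of hanging it: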

# tests/test_pub_sub.py
import zmq
import time
import threading

def test_pub_sub_subscriber_receives_message(zmq_context):
    """A SUB socket subscribed to ``events.`` receives a matching message."""
    endpoint = "inproc://pubsub-test"
    publisher = zmq_context.socket(zmq.PUB)
    subscriber = zmq_context.socket(zmq.SUB)
    for sock in (publisher, subscriber):
        sock.setsockopt(zmq.LINGER, 0)

    publisher.bind(endpoint)
    subscriber.connect(endpoint)
    subscriber.setsockopt_string(zmq.SUBSCRIBE, "events.")

    # Slow joiner: give the subscription time to reach the publisher before
    # sending. PUB drops messages that have no matching subscriber.
    time.sleep(0.05)

    publisher.send_string("events.order hello-world")

    poller = zmq.Poller()
    poller.register(subscriber, zmq.POLLIN)
    events = dict(poller.poll(2000))
    assert subscriber in events, "Timed out waiting for pub/sub message"

    assert subscriber.recv_string() == "events.order hello-world"

    publisher.close()
    subscriber.close()

def test_pub_sub_filter_works(zmq_context):
    """Each SUB socket receives only messages matching its prefix filter.

    Each subscriber must get its own topic's message and must NOT get the
    other topic's message. The second check is what actually proves the
    filter works.
    """
    pub = zmq_context.socket(zmq.PUB)
    sub_orders = zmq_context.socket(zmq.SUB)
    sub_payments = zmq_context.socket(zmq.SUB)

    for s in [pub, sub_orders, sub_payments]:
        s.setsockopt(zmq.LINGER, 0)

    pub.bind("inproc://pubsub-filter")
    sub_orders.connect("inproc://pubsub-filter")
    sub_payments.connect("inproc://pubsub-filter")

    sub_orders.setsockopt_string(zmq.SUBSCRIBE, "orders.")
    sub_payments.setsockopt_string(zmq.SUBSCRIBE, "payments.")

    # Slow joiner mitigation: let both subscriptions reach the publisher.
    time.sleep(0.05)

    pub.send_string("orders.created event-data")
    pub.send_string("payments.completed pay-data")

    poller = zmq.Poller()
    poller.register(sub_orders, zmq.POLLIN)
    poller.register(sub_payments, zmq.POLLIN)

    received_orders = []
    received_payments = []

    # monotonic(): a wall-clock adjustment cannot stretch or cut the timeout.
    deadline = time.monotonic() + 2
    while time.monotonic() < deadline and not (received_orders and received_payments):
        ready = dict(poller.poll(200))
        if sub_orders in ready:
            received_orders.append(sub_orders.recv_string())
        if sub_payments in ready:
            received_payments.append(sub_payments.recv_string())

    assert received_orders == ["orders.created event-data"]
    assert received_payments == ["payments.completed pay-data"]

    # The loop stops as soon as each subscriber has one message, so check that
    # nothing else (i.e. the other topic) is still queued. Both messages were
    # sent before the loop started, so a short poll is enough.
    assert not sub_orders.poll(100), "sub_orders received a non-matching message"
    assert not sub_payments.poll(100), "sub_payments received a non-matching message"

    pub.close()
    sub_orders.close()
    sub_payments.close()

Testing PUSH/PULL (Pipeline)

PUSH/PULL distributes work across workers:

def test_push_pull_distributes_to_workers(zmq_context):
    """Every task pushed into the pipeline is pulled by one of two workers."""
    pusher = zmq_context.socket(zmq.PUSH)
    workers = {name: zmq_context.socket(zmq.PULL) for name in ("w1", "w2")}

    pusher.setsockopt(zmq.LINGER, 0)
    for sock in workers.values():
        sock.setsockopt(zmq.LINGER, 0)

    pusher.bind("inproc://pipeline")
    for sock in workers.values():
        sock.connect("inproc://pipeline")

    for task_no in range(4):
        pusher.send_string(f"task-{task_no}")

    poller = zmq.Poller()
    for sock in workers.values():
        poller.register(sock, zmq.POLLIN)

    # Collect (worker_name, task) pairs until all four arrive or 2 s pass.
    received = []
    deadline = time.time() + 2
    while len(received) < 4 and time.time() < deadline:
        ready = dict(poller.poll(200))
        for name, sock in workers.items():
            if sock in ready:
                received.append((name, sock.recv_string()))

    assert len(received) == 4, f"Expected 4 tasks, got {len(received)}"
    assert {task for _, task in received} == {"task-0", "task-1", "task-2", "task-3"}

    pusher.close()
    for sock in workers.values():
        sock.close()

C++ Testing with GoogleTest

ZeroMQ C++ testing follows the same patterns:

// tests/test_zmq_req_rep.cpp
#include <gtest/gtest.h>
#include <zmq.hpp>
#include <thread>
#include <chrono>

// Round-trips one request through a REP server running on its own thread.
// Both sockets get a 2 s receive timeout, so a missing message fails the test
// instead of hanging it. The server thread is joined before any fatal
// assertion, so it is never left joinable (which would call std::terminate).
TEST(ZeroMQTest, ReqRepEchoServer) {
    zmq::context_t ctx(1);

    // Server thread: answer exactly one request with "pong:<request>".
    std::thread server_thread([&ctx]() {
        zmq::socket_t rep(ctx, zmq::socket_type::rep);
        rep.set(zmq::sockopt::linger, 0);
        rep.set(zmq::sockopt::rcvtimeo, 2000);  // don't block forever without a request
        rep.bind("inproc://cpp-req-rep");

        zmq::message_t request;
        auto result = rep.recv(request, zmq::recv_flags::none);
        // ASSERT_* here only returns from the lambda; the client's receive
        // timeout then reports the missing reply.
        ASSERT_TRUE(result.has_value()) << "server timed out waiting for a request";

        std::string req_str(static_cast<char*>(request.data()), request.size());
        const std::string response = "pong:" + req_str;
        auto sent = rep.send(zmq::buffer(response), zmq::send_flags::none);
        EXPECT_TRUE(sent.has_value()) << "server failed to send its reply";
    });

    // Give the server a moment to bind. (libzmq >= 4.0 also accepts an inproc
    // connect() that happens before the matching bind().)
    std::this_thread::sleep_for(std::chrono::milliseconds(10));

    zmq::socket_t req(ctx, zmq::socket_type::req);
    req.set(zmq::sockopt::linger, 0);
    req.set(zmq::sockopt::rcvtimeo, 2000);  // fail, don't hang, if no reply comes
    req.connect("inproc://cpp-req-rep");

    const std::string ping = "ping";
    auto sent = req.send(zmq::buffer(ping), zmq::send_flags::none);

    zmq::message_t reply;
    auto received = req.recv(reply, zmq::recv_flags::none);

    // Join before any fatal assertion so the thread is never left joinable.
    server_thread.join();

    ASSERT_TRUE(sent.has_value()) << "client failed to send request";
    ASSERT_TRUE(received.has_value()) << "timed out waiting for reply";

    std::string reply_str(static_cast<char*>(reply.data()), reply.size());
    EXPECT_EQ(reply_str, "pong:ping");
}

Common Test Pitfalls

| Problem | Cause | Fix |
|---|---|---|
| Test hangs at recv() | Sender crashed before sending | Use poll() with a timeout before recv() |
| PUB/SUB messages never arrive | Slow joiner — subscriber not ready | Add time.sleep(0.05) after subscribe |
| REQ send raises EFSM, or REQ hangs in recv() | Sent twice without receiving, or the reply never arrived | Use DEALER for non-strict request flow; poll with a timeout before recv() |
| Port conflicts in CI | Tests use fixed TCP port | Use inproc:// or an ephemeral port (tcp://127.0.0.1:* or socket.bind_to_random_port()) |
| Tests hang at teardown | LINGER not set | Set socket.setsockopt(zmq.LINGER, 0) on every socket |

Summary

ZeroMQ testing requires no broker but demands careful attention to socket patterns and timing:

  • Use inproc:// transport for fast in-process unit tests
  • Poll with timeout instead of blocking recv() to avoid hanging tests
  • Handle the slow joiner in PUB/SUB with a short sleep or XPUB/XSUB
  • Set LINGER=0 on every socket in test fixtures

The slow joiner and LINGER problems cause the most flaky tests — address them in your base fixtures and your ZeroMQ tests will be reliable.

Read more