ZeroMQ Testing Guide: Testing Patterns in Python and C++
ZeroMQ is a brokerless messaging library — there's no server to start, which makes testing both easier and harder. Easier because you can test sockets in the same process. Harder because ZeroMQ's asynchronous socket behavior creates timing issues that require careful test design. This guide covers testing REQ/REP, PUB/SUB, and PUSH/PULL patterns in Python and C++.
Key Takeaways
ZeroMQ has no broker — test in-process. Unlike RabbitMQ or Pulsar, there's no external server to start. ZeroMQ sockets communicate peer-to-peer, so you can test sender and receiver in the same process or thread.
Use inproc:// transport for unit tests. The inproc:// transport is faster than tcp:// and avoids port conflicts. Use it for unit tests; tcp:// for integration tests with real network hops.
PUB/SUB has a slow joiner problem — add a sleep or use XPUB/XSUB. A subscriber joining a PUB socket misses messages published before the subscription is established. Add a short sleep after subscribe, or use XPUB/XSUB which supports subscription handshake detection.
REQ/REP must alternate strictly. A REQ socket must receive before it can send again. Tests that send twice without receiving will deadlock. Use DEALER/ROUTER for non-blocking patterns.
Set LINGER to 0 in tests. socket.setsockopt(zmq.LINGER, 0) prevents sockets from blocking at close when there are unsent messages. Always set this in test fixtures.
ZeroMQ Testing Fundamentals
ZeroMQ differs from traditional message brokers: there is no server process, no broker to start, no queue to inspect. Sockets connect directly to each other. This has important implications for testing:
- No broker fixture needed — just create sockets in your test
- Transport choice matters —
inproc://(same process),ipc://(same machine),tcp://(network) - Async by default — send/receive are non-blocking unless you use
recv()withoutNOBLOCK
Python Setup
pip install pyzmq pytest pytest-timeoutPython Fixtures
# conftest.py
import pytest
import zmq
@pytest.fixture
def zmq_context():
ctx = zmq.Context()
yield ctx
ctx.destroy(linger=0)
def make_pair(ctx, socket_type_a, socket_type_b, transport="inproc://test-{}"):
import uuid
addr = transport.format(uuid.uuid4().hex[:8])
a = ctx.socket(socket_type_a)
b = ctx.socket(socket_type_b)
a.setsockopt(zmq.LINGER, 0)
b.setsockopt(zmq.LINGER, 0)
a.bind(addr)
b.connect(addr)
return a, b, addrTesting REQ/REP (Request-Reply)
REQ/REP is the most common ZeroMQ pattern. Test that the server responds correctly:
# tests/test_req_rep.py
import zmq
import threading
import pytest
def echo_server(ctx, addr, stop_event):
server = ctx.socket(zmq.REP)
server.setsockopt(zmq.LINGER, 0)
server.bind(addr)
while not stop_event.is_set():
if server.poll(100):
msg = server.recv_json()
server.send_json({"echo": msg, "status": "ok"})
server.close()
def test_req_rep_basic(zmq_context):
addr = "inproc://req-rep-test"
stop = threading.Event()
server_thread = threading.Thread(target=echo_server, args=(zmq_context, addr, stop))
server_thread.start()
client = zmq_context.socket(zmq.REQ)
client.setsockopt(zmq.LINGER, 0)
client.connect(addr)
client.send_json({"action": "ping"})
reply = client.recv_json(flags=zmq.NOBLOCK | zmq.POLLIN)
# Retry with poll
poller = zmq.Poller()
poller.register(client, zmq.POLLIN)
sockets = dict(poller.poll(2000))
if client in sockets:
reply = client.recv_json()
else:
pytest.fail("Timeout waiting for reply")
assert reply["status"] == "ok"
assert reply["echo"]["action"] == "ping"
stop.set()
server_thread.join(timeout=2)
client.close()
def test_req_rep_error_response(zmq_context):
addr = "inproc://req-rep-error"
stop = threading.Event()
def error_server(ctx, addr, stop_event):
s = ctx.socket(zmq.REP)
s.setsockopt(zmq.LINGER, 0)
s.bind(addr)
while not stop_event.is_set():
if s.poll(100):
msg = s.recv_json()
if msg.get("action") == "unknown":
s.send_json({"error": "unknown action"})
else:
s.send_json({"ok": True})
s.close()
t = threading.Thread(target=error_server, args=(zmq_context, addr, stop))
t.start()
client = zmq_context.socket(zmq.REQ)
client.setsockopt(zmq.LINGER, 0)
client.connect(addr)
client.send_json({"action": "unknown"})
poller = zmq.Poller()
poller.register(client, zmq.POLLIN)
ready = dict(poller.poll(2000))
assert client in ready, "Timeout"
reply = client.recv_json()
assert "error" in reply
stop.set()
t.join(timeout=2)
client.close()Testing PUB/SUB
The slow joiner problem is real. Use a poll-based approach to handle it:
# tests/test_pub_sub.py
import zmq
import time
import threading
def test_pub_sub_subscriber_receives_message(zmq_context):
pub = zmq_context.socket(zmq.PUB)
sub = zmq_context.socket(zmq.SUB)
pub.setsockopt(zmq.LINGER, 0)
sub.setsockopt(zmq.LINGER, 0)
pub.bind("inproc://pubsub-test")
sub.connect("inproc://pubsub-test")
sub.setsockopt_string(zmq.SUBSCRIBE, "events.")
# Slow joiner mitigation — wait for subscription to propagate
time.sleep(0.05)
pub.send_string("events.order hello-world")
poller = zmq.Poller()
poller.register(sub, zmq.POLLIN)
ready = dict(poller.poll(2000))
assert sub in ready, "Timed out waiting for pub/sub message"
msg = sub.recv_string()
assert msg == "events.order hello-world"
pub.close()
sub.close()
def test_pub_sub_filter_works(zmq_context):
pub = zmq_context.socket(zmq.PUB)
sub_orders = zmq_context.socket(zmq.SUB)
sub_payments = zmq_context.socket(zmq.SUB)
for s in [pub, sub_orders, sub_payments]:
s.setsockopt(zmq.LINGER, 0)
pub.bind("inproc://pubsub-filter")
sub_orders.connect("inproc://pubsub-filter")
sub_payments.connect("inproc://pubsub-filter")
sub_orders.setsockopt_string(zmq.SUBSCRIBE, "orders.")
sub_payments.setsockopt_string(zmq.SUBSCRIBE, "payments.")
time.sleep(0.05)
pub.send_string("orders.created event-data")
pub.send_string("payments.completed pay-data")
poller = zmq.Poller()
poller.register(sub_orders, zmq.POLLIN)
poller.register(sub_payments, zmq.POLLIN)
received_orders = []
received_payments = []
deadline = time.time() + 2
while time.time() < deadline and (len(received_orders) < 1 or len(received_payments) < 1):
ready = dict(poller.poll(200))
if sub_orders in ready:
received_orders.append(sub_orders.recv_string())
if sub_payments in ready:
received_payments.append(sub_payments.recv_string())
assert len(received_orders) == 1
assert len(received_payments) == 1
assert "orders." in received_orders[0]
assert "payments." in received_payments[0]
pub.close()
sub_orders.close()
sub_payments.close()Testing PUSH/PULL (Pipeline)
PUSH/PULL distributes work across workers:
def test_push_pull_distributes_to_workers(zmq_context):
pusher = zmq_context.socket(zmq.PUSH)
worker1 = zmq_context.socket(zmq.PULL)
worker2 = zmq_context.socket(zmq.PULL)
for s in [pusher, worker1, worker2]:
s.setsockopt(zmq.LINGER, 0)
pusher.bind("inproc://pipeline")
worker1.connect("inproc://pipeline")
worker2.connect("inproc://pipeline")
for i in range(4):
pusher.send_string(f"task-{i}")
poller = zmq.Poller()
poller.register(worker1, zmq.POLLIN)
poller.register(worker2, zmq.POLLIN)
received = []
deadline = time.time() + 2
while len(received) < 4 and time.time() < deadline:
ready = dict(poller.poll(200))
if worker1 in ready:
received.append(("w1", worker1.recv_string()))
if worker2 in ready:
received.append(("w2", worker2.recv_string()))
assert len(received) == 4, f"Expected 4 tasks, got {len(received)}"
tasks = [r[1] for r in received]
assert set(tasks) == {"task-0", "task-1", "task-2", "task-3"}
pusher.close()
worker1.close()
worker2.close()C++ Testing with GoogleTest
ZeroMQ C++ testing follows the same patterns:
// tests/test_zmq_req_rep.cpp
#include <gtest/gtest.h>
#include <zmq.hpp>
#include <thread>
#include <chrono>
TEST(ZeroMQTest, ReqRepEchoServer) {
zmq::context_t ctx(1);
// Server thread
std::thread server_thread([&ctx]() {
zmq::socket_t rep(ctx, zmq::socket_type::rep);
rep.set(zmq::sockopt::linger, 0);
rep.bind("inproc://cpp-req-rep");
zmq::message_t request;
auto result = rep.recv(request, zmq::recv_flags::none);
ASSERT_TRUE(result.has_value());
std::string req_str(static_cast<char*>(request.data()), request.size());
rep.send(zmq::buffer("pong:" + req_str), zmq::send_flags::none);
});
std::this_thread::sleep_for(std::chrono::milliseconds(10));
zmq::socket_t req(ctx, zmq::socket_type::req);
req.set(zmq::sockopt::linger, 0);
req.connect("inproc://cpp-req-rep");
req.send(zmq::buffer(std::string("ping")), zmq::send_flags::none);
zmq::message_t reply;
req.recv(reply, zmq::recv_flags::none);
std::string reply_str(static_cast<char*>(reply.data()), reply.size());
EXPECT_EQ(reply_str, "pong:ping");
server_thread.join();
}Common Test Pitfalls
| Problem | Cause | Fix |
|---|---|---|
Test hangs at recv() |
Sender crashed before sending | Use poll() with timeout before recv() |
| PUB/SUB messages never arrive | Slow joiner — subscriber not ready | Add time.sleep(0.05) after subscribe |
| REQ socket deadlocks | Sent twice without receiving | Use DEALER for non-strict request flow |
| Port conflicts in CI | Tests use fixed TCP port | Use inproc:// or port 0 binding |
| Tests hang at teardown | LINGER not set | Set socket.setsockopt(zmq.LINGER, 0) on every socket |
Summary
ZeroMQ testing requires no broker but demands careful attention to socket patterns and timing:
- Use
inproc://transport for fast in-process unit tests - Poll with timeout instead of blocking
recv()to avoid hanging tests - Handle the slow joiner in PUB/SUB with a short sleep or XPUB/XSUB
- Set LINGER=0 on every socket in test fixtures
The slow joiner and LINGER problems cause the most flaky tests — address them in your base fixtures and your ZeroMQ tests will be reliable.