Redis Streams Testing: Consumer Groups, Pending Entries, and Testcontainers
Redis Streams differ from simple pub/sub — they persist messages and track consumer group acknowledgments via the Pending Entries List (PEL). Testing consumer groups means verifying XACK behavior, idle consumer claim logic, and re-delivery after consumer failure. Testcontainers provides a real Redis instance; Python's redis-py gives full Streams API access for assertions.
Key Takeaways
Test the Pending Entries List explicitly. After consuming but before ACKing, messages sit in the PEL. Verify XPENDING shows your message, then verify it's gone after XACK.
Test consumer failure and claim recovery. When a consumer crashes mid-processing, another consumer must claim the idle message via XCLAIM. Test this explicitly — unclaimed orphaned messages are one of the most common Redis Streams production bugs.
Test MAXLEN trimming behavior. Streams with MAXLEN trim old messages. Test that your consumer handles gaps (messages trimmed before consumption) without crashing.
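Use XREADGROUP with a bounded BLOCK timeout in tests (e.g. COUNT 1 BLOCK 1000), never BLOCK 0 — BLOCK 0 waits forever and hangs the test if no message arrives. Block with a timeout and assert on the result rather than sleeping or polling.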
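Test idempotent group creation. XGROUP CREATE ... MKSTREAM is not itself idempotent: MKSTREAM only creates a missing stream, and the command fails with a BUSYGROUP error when the group already exists. Verify your consumer startup catches BUSYGROUP so restarts are safe.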
Redis Streams vs Pub/Sub
Redis pub/sub is fire-and-forget: subscribers that aren't connected when a message is published miss it. Redis Streams persist messages and allow consumer groups to track delivery state — each message has an ID, and the broker tracks which consumers have acknowledged which messages.
This persistence introduces state that needs explicit test coverage:
- Which messages are pending (delivered but not yet acked)
- Which consumers are currently active
- What happens when a consumer crashes mid-processing
Test Setup with Testcontainers
# requirements-test.txt
pytest==8.2.0
pytest-asyncio==0.23.7
redis==5.0.4
testcontainers==4.5.0

# conftest.py
import pytest
from testcontainers.redis import RedisContainer
import redis
@pytest.fixture(scope="session")
def redis_container():
    """Provide one Redis 7.2 (Alpine) container shared by the whole session.

    The container is started when the fixture is first requested and is
    stopped when the session ends and the ``with`` block is left.
    """
    redis_image = "redis:7.2-alpine"
    with RedisContainer(redis_image) as running:
        yield running
@pytest.fixture
def r(redis_container):
    """Fresh Redis client, cleaned before and after each test.

    Flushing before the test guarantees an empty keyspace even when keys were
    left behind by a test that does not use this fixture (for example one
    with its own async client). Flushing afterwards keeps the shared container
    tidy. ``decode_responses=True`` makes replies ``str`` so assertions can
    compare against plain literals.
    """
    client = redis.Redis(
        host=redis_container.get_container_host_ip(),
        port=redis_container.get_exposed_port(6379),
        decode_responses=True,
    )
    try:
        client.flushdb()  # start every test from an empty keyspace
        yield client
        client.flushdb()  # leave nothing behind for the next test
    finally:
        # Release the connection even if a flush raises.
        client.close()


# Testing Basic Consumer Group Operations
def test_consumer_group_creation(r):
    """A freshly created group has no consumers and nothing pending."""
    key, group = "orders", "order-processors"

    # MKSTREAM creates an empty stream when the key does not exist yet;
    # id="0" means the group will be offered every entry from the beginning.
    r.xgroup_create(key, group, id="0", mkstream=True)

    groups = r.xinfo_groups(key)
    assert len(groups) == 1
    only_group = groups[0]
    assert only_group["name"] == group
    assert only_group["consumers"] == 0
    assert only_group["pending"] == 0
def test_group_creation_idempotent(r):
    """Creating an existing group raises BUSYGROUP instead of succeeding.

    XGROUP CREATE is not idempotent, so consumer start-up code must catch the
    BUSYGROUP error (and only that error) when the group may already exist.
    The rejected duplicate must leave the existing group untouched.
    """
    stream_key = "events"
    r.xgroup_create(stream_key, "processors", id="$", mkstream=True)

    # pytest.raises fails the test if no error is raised; a bare try/except
    # would silently pass if the duplicate create were (wrongly) accepted.
    with pytest.raises(redis.exceptions.ResponseError, match="BUSYGROUP"):
        r.xgroup_create(stream_key, "processors", id="$", mkstream=True)

    # The failed duplicate must not have created a second group.
    assert [g["name"] for g in r.xinfo_groups(stream_key)] == ["processors"]
def test_consumer_reads_from_group(r):
    """XREADGROUP with ">" delivers new entries, in order, to the calling consumer."""
    stream_key = "tasks"
    group_name = "workers"
    consumer_name = "worker-1"
    r.xgroup_create(stream_key, group_name, id="0", mkstream=True)

    # Produce messages
    msg_id_1 = r.xadd(stream_key, {"task": "process-invoice", "id": "INV-001"})
    msg_id_2 = r.xadd(stream_key, {"task": "send-email", "id": "EMAIL-001"})

    # Consumer reads (delivers to consumer, enters PEL)
    messages = r.xreadgroup(
        groupname=group_name,
        consumername=consumer_name,
        streams={stream_key: ">"},  # ">" = new messages not yet delivered
        count=10,
        block=1000,  # block up to 1 second instead of polling
    )
    # redis-py returns an empty list (not None) when the read times out, so
    # test truthiness; `is not None` would pass even if nothing arrived.
    assert messages, "XREADGROUP returned no entries"
    assert len(messages) == 1  # one stream
    stream_name, entries = messages[0]
    assert stream_name == stream_key
    assert [entry_id for entry_id, _ in entries] == [msg_id_1, msg_id_2]
    assert entries[0][1]["task"] == "process-invoice"
    assert entries[1][1]["task"] == "send-email"


# Testing the Pending Entries List
The PEL tracks messages delivered to consumers but not yet acknowledged. A growing PEL indicates a consumer that's consuming but not acking — a common bug.
def test_messages_enter_pel_after_delivery(r):
    """A delivered-but-unacked entry shows up in both forms of XPENDING."""
    stream_key, group_name, consumer_name = "jobs", "job-workers", "worker-1"
    r.xgroup_create(stream_key, group_name, id="0", mkstream=True)
    msg_id = r.xadd(stream_key, {"job": "build-report", "priority": "high"})

    # Delivery via ">" places the entry in the group's PEL.
    r.xreadgroup(group_name, consumer_name, {stream_key: ">"}, count=1)

    # Summary form: pending count plus smallest/largest pending IDs.
    summary = r.xpending(stream_key, group_name)
    assert summary["pending"] == 1
    assert summary["min"] == msg_id
    assert summary["max"] == msg_id

    # Extended form: one record per pending entry.
    details = r.xpending_range(stream_key, group_name, "-", "+", 10)
    assert len(details) == 1
    record = details[0]
    assert record["message_id"] == msg_id
    assert record["consumer"] == consumer_name
    assert record["times_delivered"] == 1
def test_xack_removes_from_pel(r):
    """XACK drops an acknowledged entry from the Pending Entries List."""
    stream_key = "tasks"
    group_name = "task-workers"
    r.xgroup_create(stream_key, group_name, id="0", mkstream=True)
    msg_id = r.xadd(stream_key, {"action": "notify-user"})

    def pending_count():
        return r.xpending(stream_key, group_name)["pending"]

    # Deliver to the consumer; the entry is now pending.
    r.xreadgroup(group_name, "worker-1", {stream_key: ">"}, count=1)
    assert pending_count() == 1

    # Acknowledge after "processing"; XACK reports how many IDs it removed.
    assert r.xack(stream_key, group_name, msg_id) == 1
    assert pending_count() == 0
def test_unacked_message_redeliverable(r):
    """Consumer reads but doesn't ack — message stays in PEL and can be re-read.

    Reading with ">" only returns entries never delivered to the group, so
    after the first read it yields nothing. Reading with "0" replays this
    consumer's own pending (unacked) entries, which is how a restarted
    consumer recovers its in-flight work.
    """
    stream_key = "events"
    group_name = "handlers"
    consumer = "handler-1"
    r.xgroup_create(stream_key, group_name, id="0", mkstream=True)
    msg_id = r.xadd(stream_key, {"event": "user-signup"})

    # Read without acking
    r.xreadgroup(group_name, consumer, {stream_key: ">"}, count=1)

    # ">" has nothing new to deliver: the entry is pending, not new.
    assert not r.xreadgroup(group_name, consumer, {stream_key: ">"}, count=10)

    # Read pending messages (id "0" reads from this consumer's PEL, not new messages)
    pending_read = r.xreadgroup(group_name, consumer, {stream_key: "0"}, count=10)
    assert pending_read, "expected the unacked entry to be replayed"
    entries = pending_read[0][1]
    assert len(entries) == 1
    assert entries[0][0] == msg_id
    assert entries[0][1]["event"] == "user-signup"


# Testing Consumer Failure and Claim Recovery
When a consumer crashes, its pending messages become orphaned. Another consumer must claim them using XCLAIM or XAUTOCLAIM.
def test_xclaim_recovers_orphaned_messages(r):
    """Simulate consumer crash and recovery via XCLAIM."""
    stream_key = "orders"
    group_name = "order-processors"
    crashed, rescuer = "processor-1", "processor-2"

    def pel():
        return r.xpending_range(stream_key, group_name, "-", "+", 10)

    r.xgroup_create(stream_key, group_name, id="0", mkstream=True)
    msg_id = r.xadd(stream_key, {"order": "ORDER-001", "amount": "99.99"})

    # processor-1 receives the entry, then "crashes" without acking it.
    r.xreadgroup(group_name, crashed, {stream_key: ">"}, count=1)
    assert pel()[0]["consumer"] == crashed

    # processor-2 takes ownership. min_idle_time=0 lets the test claim at once;
    # production code should use a realistic threshold such as 30000 ms (30s)
    # so entries are not stolen from consumers that are still working.
    claimed = r.xclaim(
        stream_key,
        group_name,
        rescuer,
        min_idle_time=0,
        message_ids=[msg_id],
    )
    assert len(claimed) == 1
    got_id, got_fields = claimed[0]
    assert got_id == msg_id
    assert got_fields["order"] == "ORDER-001"

    # Ownership moved and XCLAIM counted a second delivery.
    after_claim = pel()[0]
    assert after_claim["consumer"] == rescuer
    assert after_claim["times_delivered"] == 2

    # processor-2 finishes the work and acknowledges it.
    r.xack(stream_key, group_name, msg_id)
    assert r.xpending(stream_key, group_name)["pending"] == 0
def test_xautoclaim_batch_recovery(r):
    """XAUTOCLAIM atomically scans the PEL and claims idle entries in one call.

    XAUTOCLAIM exists since Redis 6.2, but the three-element reply unpacked
    below (next start ID, claimed entries, deleted IDs) requires Redis 7.0+;
    on 6.2 the reply has only two elements.
    """
    stream_key = "jobs"
    group_name = "job-runners"
    r.xgroup_create(stream_key, group_name, id="0", mkstream=True)
    ids = [r.xadd(stream_key, {"job": f"job-{i}"}) for i in range(5)]

    # Consumer 1 reads all 5 but crashes
    r.xreadgroup(group_name, "runner-1", {stream_key: ">"}, count=5)

    # Recovery consumer claims idle messages (min_idle_time=0 for test immediacy)
    next_id, claimed, deleted = r.xautoclaim(
        stream_key, group_name, "runner-2",
        min_idle_time=0, start_id="0-0", count=10,
    )

    # Every orphaned entry was claimed, in stream (PEL) order.
    assert [entry[0] for entry in claimed] == ids
    # "0-0" signals that the whole PEL was scanned in this single call.
    assert next_id == "0-0"
    # No claimed entry had been removed from the stream in the meantime.
    assert deleted == []
    # The PEL now attributes every entry to the recovery consumer.
    pending = r.xpending_range(stream_key, group_name, "-", "+", 10)
    assert {p["consumer"] for p in pending} == {"runner-2"}
def test_delivery_count_tracked(r):
    """Messages track how many times they've been delivered.

    Both the initial XREADGROUP delivery and a later XCLAIM increment the
    entry's delivery counter (useful e.g. for spotting messages that keep
    failing), and XCLAIM also transfers ownership to the claiming consumer.
    """
    stream_key = "notifications"
    group_name = "senders"
    r.xgroup_create(stream_key, group_name, id="0", mkstream=True)
    msg_id = r.xadd(stream_key, {"type": "push"})

    # First delivery
    r.xreadgroup(group_name, "sender-1", {stream_key: ">"}, count=1)
    pending = r.xpending_range(stream_key, group_name, "-", "+", 1)
    assert pending[0]["times_delivered"] == 1

    # Claim (second delivery)
    r.xclaim(stream_key, group_name, "sender-2", min_idle_time=0,
             message_ids=[msg_id])
    pending = r.xpending_range(stream_key, group_name, "-", "+", 1)
    assert pending[0]["consumer"] == "sender-2"
    assert pending[0]["times_delivered"] == 2


# Testing MAXLEN Trimming
Streams with MAXLEN automatically trim old messages. Test that your consumer handles gaps gracefully:
def test_consumer_handles_trimmed_messages(r):
    """Consumer must not crash when starting from an ID that has been trimmed."""
    stream_key = "events"

    # Exact (non-approximate) MAXLEN=10 keeps only the newest 10 of 100 entries.
    for seq in range(100):
        r.xadd(stream_key, {"seq": str(seq)}, maxlen=10, approximate=False)

    # The group starts at ID 0, which predates every surviving entry.
    r.xgroup_create(stream_key, "handlers", id="0", mkstream=False)

    # ">" hands out whatever survived trimming, without complaining about the gap.
    reply = r.xreadgroup("handlers", "worker-1", {stream_key: ">"}, count=100)
    survivors = reply[0][1]
    assert len(survivors) == 10
    # The newest surviving entry is the last one written.
    newest_seq = int(survivors[-1][1]["seq"])
    assert newest_seq == 99
def test_xlen_after_trim(r):
    """Exact MAXLEN trimming caps the stream length and keeps the newest entries."""
    stream_key = "logs"
    for i in range(50):
        # approximate=False forces exact trimming; "~" trimming may keep more.
        r.xadd(stream_key, {"log": f"entry-{i}"}, maxlen=20, approximate=False)
    assert r.xlen(stream_key) == 20
    # The survivors are the newest 20 entries: entry-30 .. entry-49.
    _, oldest_fields = r.xrange(stream_key, count=1)[0]
    assert oldest_fields["log"] == "entry-30"


# Testing Pub/Sub Alongside Streams
Sometimes you need both pub/sub (for ephemeral fan-out) and streams (for durable processing). Test them independently:
def test_pubsub_subscriber_receives_message(r):
    """Pub/sub for real-time notifications, streams for durable processing."""
    import time

    received = []

    def message_handler(msg):
        if msg["type"] == "message":
            received.append(msg["data"])

    pubsub = r.pubsub()
    pubsub.subscribe(**{"notifications": message_handler})
    # Listen in a background thread. When stopped, the worker's run loop
    # closes the PubSub connection itself, so no separate unsubscribe/close
    # is issued here (doing so after stop() could race the worker or reopen
    # a connection that is never released).
    thread = pubsub.run_in_thread(sleep_time=0.01)
    try:
        # Wait until the server reports the subscription instead of sleeping a
        # fixed time: PUBLISH only reaches subscribers already registered.
        deadline = time.monotonic() + 5
        while (r.pubsub_numsub("notifications")[0][1] < 1
               and time.monotonic() < deadline):
            time.sleep(0.01)

        # Publish
        r.publish("notifications", "user-logged-in:USER-001")
        r.publish("notifications", "order-placed:ORDER-001")

        # Wait for delivery
        deadline = time.monotonic() + 5
        while len(received) < 2 and time.monotonic() < deadline:
            time.sleep(0.05)

        # Messages published on one connection arrive in publish order.
        assert received == ["user-logged-in:USER-001", "order-placed:ORDER-001"]
    finally:
        thread.stop()
        thread.join(timeout=1)
def test_pubsub_misses_messages_when_not_subscribed(r):
    """Pub/sub doesn't persist — messages sent before subscribe are lost."""
    import time

    # Publish BEFORE subscribing. PUBLISH returns how many clients received
    # the message, so 0 shows directly that nobody was listening.
    assert r.publish("updates", "missed-message") == 0

    received = []

    def on_update(msg):
        if msg["type"] == "message":
            received.append(msg["data"])

    pubsub = r.pubsub()
    pubsub.subscribe(**{"updates": on_update})
    thread = pubsub.run_in_thread(sleep_time=0.01)
    try:
        time.sleep(0.2)
        # The message published before subscribe should not be received
        assert received == []
    finally:
        # stop() ends the worker loop, which closes the PubSub connection;
        # join so the thread is gone before the next test starts.
        thread.stop()
        thread.join(timeout=1)


# Async Consumer Testing with redis.asyncio
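For async Python applications, use `redis.asyncio` (bundled with redis-py). The `@pytest.mark.asyncio` test below needs the pytest-asyncio plugin. In pytest-asyncio's default strict mode, an async fixture must be declared with `@pytest_asyncio.fixture`. Alternatively, set `asyncio_mode = auto` in your pytest configuration so the plain `@pytest.fixture` shown here works: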
import asyncio
import pytest
import redis.asyncio as aioredis
@pytest.fixture
async def async_redis(redis_container):
    """Yield an asyncio Redis client pointed at the session's Redis container.

    NOTE(review): this async-generator fixture uses plain ``@pytest.fixture``,
    which pytest-asyncio only drives in ``asyncio_mode = auto``; in its default
    strict mode it needs ``@pytest_asyncio.fixture`` instead — confirm the
    project's pytest configuration.
    """
    connection_kwargs = {
        "host": redis_container.get_container_host_ip(),
        "port": redis_container.get_exposed_port(6379),
        "decode_responses": True,
    }
    client = aioredis.Redis(**connection_kwargs)
    yield client
    # Close the client's connections once the test is done with it.
    await client.aclose()
@pytest.mark.asyncio
async def test_async_consumer_group(async_redis):
    """The asyncio client supports the same create / read / ack cycle as the sync one."""
    r = async_redis
    stream_key = "async-events"
    group_name = "async-workers"
    await r.xgroup_create(stream_key, group_name, id="0", mkstream=True)
    msg_id = await r.xadd(stream_key, {"action": "process-payment"})

    messages = await r.xreadgroup(
        group_name, "worker-1",
        {stream_key: ">"},
        count=1, block=1000,
    )
    # A timed-out read yields an empty result rather than None, so test
    # truthiness; `is not None` would pass even if nothing was delivered.
    assert messages, "XREADGROUP returned no entries"
    entries = messages[0][1]
    assert len(entries) == 1
    assert entries[0][0] == msg_id

    # XACK reports how many IDs it acknowledged.
    assert await r.xack(stream_key, group_name, msg_id) == 1
    pending = await r.xpending(stream_key, group_name)
    assert pending["pending"] == 0


# Common Bugs Found in These Tests
Not handling BUSYGROUP on startup. Consumer group initialization code that calls XGROUP CREATE without catching BUSYGROUP will crash on restart. Test it.
Reading ">" when checking pending messages. ">" reads new messages. To re-read pending (unacked) messages after a crash, read from "0". Test the distinction.
Never claiming idle messages. If your consumer reads messages and never claims orphaned ones from crashed peers, the PEL grows unbounded. Test the claim path.
Acking too early. If your consumer acks a message before processing completes (a common async bug), a crash between the ack and the end of processing silently drops the message. Test that the ack happens only after processing succeeds.