Testing Event-Driven Architectures: Kafka, RabbitMQ, and SQS
Event-driven architectures are notoriously difficult to test. The fundamental challenge: your service publishes an event and returns immediately. The effect of that event — a record updated in another service, a notification sent, an inventory count decremented — happens asynchronously, possibly seconds later, possibly on a different server. Traditional request/response testing assumes synchronous causality. Event-driven systems break that assumption.
This guide covers practical testing strategies for the three most common message brokers: Kafka, RabbitMQ, and SQS. The same patterns apply broadly: spin up real broker instances with Testcontainers, use polling-based eventually() helpers instead of fixed sleep() calls, and test failure paths (dead letter queues, poison messages, timeout behavior) as rigorously as happy paths.
The Core Challenge: Asynchrony and Eventual Consistency
A test for an event-driven system typically looks like this:
- Arrange: set up consumers, producers, and any required state
- Act: publish an event (or trigger an action that publishes one)
- Assert: wait for the downstream effect to appear
The Assert step is the hard part. The effect might appear in 10ms or 2 seconds depending on load, consumer lag, and retry behavior. Fixed setTimeout delays are a losing trade-off: too short and tests fail intermittently in CI, too long and the suite takes forever.
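A minimal in-memory sketch makes the problem concrete. It assumes a hypothetical createSystem() whose publish() schedules delivery on a timer, the way a broker delivers after the producer has already moved on. Reading state right after publishing sees the old value, and only a later read sees the effect.

```javascript
// Hypothetical in-memory stand-in for a broker plus one downstream consumer.
// Delivery is scheduled on a timer, never run synchronously inside publish().
function createSystem(deliveryDelayMs) {
  const orders = new Map([['order-123', { status: 'pending' }]]);
  function publish(event) {
    setTimeout(() => {
      // Downstream consumer: marks the order as processed
      orders.get(event.orderId).status = 'processed';
    }, deliveryDelayMs);
    // publish() returns before the consumer has run
  }
  return { orders, publish };
}

async function demo() {
  const { orders, publish } = createSystem(50);
  publish({ orderId: 'order-123' });
  const immediately = orders.get('order-123').status; // Assert step run too early
  // A fixed sleep that happens to be long enough here
  await new Promise((r) => setTimeout(r, 100));
  const later = orders.get('order-123').status;
  return { immediately, later };
}

demo().then(({ immediately, later }) => console.log(immediately, later)); // pending processed
```

The 100ms sleep passes here only because delivery takes 50ms; under real load nothing guarantees that margin.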
The solution is a polling eventually() helper:
// test-helpers/eventually.js
async function eventually(assertion, options = {}) {
  const { timeout = 5000, interval = 100, message = 'Condition not met' } = options;
  const deadline = Date.now() + timeout;
  while (Date.now() < deadline) {
    try {
      await assertion();
      return; // passed
    } catch {
      // not yet, keep polling
    }
    await new Promise((r) => setTimeout(r, interval));
  }
  // Final attempt: if it still fails, prefix the assertion error with the caller's message
  try {
    await assertion();
  } catch (err) {
    if (err instanceof Error) err.message = `${message} (after ${timeout}ms): ${err.message}`;
    throw err;
  }
}

module.exports = { eventually };
Use it everywhere an async effect needs to be observed:
await eventually(async () => {
  const record = await db.findById('order-123');
  expect(record.status).toBe('processed');
}, { timeout: 10000, message: 'Order was not processed in time' });
Testcontainers for Kafka Integration Tests
Testcontainers spins up real Docker containers in your test suite and tears them down afterward. No external broker needed.
npm install --save-dev @testcontainers/kafka kafkajs
// kafka/integration.test.js
const { KafkaContainer } = require('@testcontainers/kafka');
const { Kafka } = require('kafkajs');
const { eventually } = require('../test-helpers/eventually');

let kafkaContainer, kafka, producer;
const consumers = [];

// KafkaJS does not allow subscribe() on a consumer that is already running, so each
// test starts its own consumer in its own group. fromBeginning: true avoids a race in
// which a brand-new group starts at the latest offset after the test has already produced.
async function startConsumer(topic, eachMessage) {
  const consumer = kafka.consumer({ groupId: `test-${topic}-${consumers.length}` });
  consumers.push(consumer);
  await consumer.connect();
  await consumer.subscribe({ topic, fromBeginning: true });
  await consumer.run({ eachMessage });
  return consumer;
}

beforeAll(async () => {
  kafkaContainer = await new KafkaContainer('confluentinc/cp-kafka:7.5.0')
    .withExposedPorts(9093)
    .start();
  const brokers = [`${kafkaContainer.getHost()}:${kafkaContainer.getMappedPort(9093)}`];
  kafka = new Kafka({ clientId: 'test', brokers });
  producer = kafka.producer();
  await producer.connect();
}, 60000); // container startup can take 30-40s

afterAll(async () => {
  await Promise.all(consumers.map((c) => c.disconnect()));
  await producer.disconnect();
  await kafkaContainer.stop();
});

test('order-placed event triggers inventory deduction', async () => {
  // Assumes the inventory service under test is running against this broker:
  // it consumes order-placed and publishes the resulting change to inventory-updates.
  const received = [];
  await startConsumer('inventory-updates', async ({ message }) => {
    received.push(JSON.parse(message.value.toString()));
  });

  // Publish the triggering event
  await producer.send({
    topic: 'order-placed',
    messages: [{
      key: 'order-123',
      value: JSON.stringify({ orderId: 'order-123', sku: 'WIDGET-1', qty: 3 }),
    }],
  });

  await eventually(
    () => {
      const update = received.find((r) => r.sku === 'WIDGET-1');
      if (!update) throw new Error('No inventory update received');
      expect(update.delta).toBe(-3);
    },
    { timeout: 15000 }
  );
});

test('consumer handles malformed messages without crashing', async () => {
  const errors = [];
  const valid = [];
  // Inline handler for illustration; in a real suite, call your production handler here
  await startConsumer('order-placed', async ({ message }) => {
    try {
      valid.push(JSON.parse(message.value.toString()));
    } catch {
      errors.push(message.value.toString());
    }
  });

  // Send a malformed message
  await producer.send({
    topic: 'order-placed',
    messages: [{ value: 'not valid json' }],
  });
  // Send a valid message immediately after
  await producer.send({
    topic: 'order-placed',
    messages: [{ value: JSON.stringify({ orderId: 'order-456', sku: 'GADGET-2', qty: 1 }) }],
  });

  await eventually(() => {
    expect(errors).toEqual(['not valid json']);
    // Processing continued after the bad message. fromBeginning also replays
    // order-123 from the previous test, so look for the new order specifically.
    expect(valid.map((v) => v.orderId)).toContain('order-456');
  }, { timeout: 10000 });
});
Testing Dead Letter Queues
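A dead letter queue (DLQ) holds messages that could not be processed, typically after a configured number of failed attempts. Testing DLQ behavior is critical: if your DLQ routing is broken, you lose messages silently. RabbitMQ (dead letter exchanges) and SQS (redrive policies) provide DLQs through broker configuration, but Kafka has none built in: routing a failed message to a dead letter topic is the consumer's job, and the .DLT suffix used below is a naming convention (Spring Kafka's default), not a broker feature.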
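Because the retry-then-dead-letter decision is plain code, it can also be unit-tested without any broker. Here is a minimal sketch, assuming a hypothetical withDeadLetter() wrapper (not part of KafkaJS, amqplib, or the AWS SDK) that retries a handler in-process and then passes the message to an injected sendToDeadLetter function:

```javascript
// Hypothetical broker-agnostic wrapper: retry a handler in-process,
// then hand the message to a dead-letter sink instead of retrying forever.
function withDeadLetter(handler, { maxAttempts = 3, sendToDeadLetter }) {
  return async function handle(message) {
    let lastError;
    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        await handler(message);
        return { status: 'processed', attempts: attempt };
      } catch (err) {
        lastError = err;
      }
    }
    // Retries exhausted: route to the DLQ with the failure reason attached
    await sendToDeadLetter({ message, error: lastError.message, attempts: maxAttempts });
    return { status: 'dead-lettered', attempts: maxAttempts };
  };
}

// Usage with in-memory fakes: an always-failing handler and an array standing in for the DLQ
const dlq = [];
const handle = withDeadLetter(
  async () => { throw new Error('Simulated processing failure'); },
  { maxAttempts: 3, sendToDeadLetter: async (entry) => { dlq.push(entry); } }
);
handle({ orderId: 'poison-order' }).then((result) => console.log(result.status, dlq.length));
// dead-lettered 1
```

In production, sendToDeadLetter would publish to the DLQ topic or queue. With Kafka, the same loop can be exercised against a real broker: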
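// kafka/dlq.test.js
// Assumes a beforeAll/afterAll setup like the one in kafka/integration.test.js: a fresh
// container per file (so topics start empty), the kafka client, and a connected producer.
const { eventually } = require('../test-helpers/eventually');
const MAX_ATTEMPTS = 3;

test('message ends up in DLQ after max retry exhaustion', async () => {
  const dlqMessages = [];
  let processAttempts = 0;
  // Stand-in for the real processing logic: always fails
  const processOrder = async () => {
    processAttempts++;
    throw new Error('Simulated processing failure');
  };

  // Observe the DLQ topic with its own consumer
  const dlqConsumer = kafka.consumer({ groupId: 'dlq-observer' });
  await dlqConsumer.connect();
  await dlqConsumer.subscribe({ topic: 'order-placed.DLT', fromBeginning: true });
  await dlqConsumer.run({
    eachMessage: async ({ message }) => {
      dlqMessages.push(JSON.parse(message.value.toString()));
    },
  });

  // Main consumer: retry in-process, then forward to the DLT. Simply throwing from
  // eachMessage would make KafkaJS redeliver the same message; it never reaches a DLT on its own.
  const consumer = kafka.consumer({ groupId: 'order-processor' });
  await consumer.connect();
  await consumer.subscribe({ topic: 'order-placed', fromBeginning: true });
  await consumer.run({
    eachMessage: async ({ message }) => {
      let lastError;
      for (let attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
        try {
          await processOrder(JSON.parse(message.value.toString()));
          return; // processed successfully
        } catch (err) {
          lastError = err;
        }
      }
      // Retries exhausted: forward the original message, with the failure reason in a header
      await producer.send({
        topic: 'order-placed.DLT',
        messages: [{ key: message.key, value: message.value, headers: { 'x-error': lastError.message } }],
      });
    },
  });

  await producer.send({
    topic: 'order-placed',
    messages: [{
      key: 'poison-order',
      value: JSON.stringify({ orderId: 'poison-order', sku: 'BAD-SKU' }),
    }],
  });

  // Wait for the message to exhaust retries and land in the DLQ
  await eventually(() => {
    expect(dlqMessages).toHaveLength(1);
    expect(dlqMessages[0].orderId).toBe('poison-order');
  }, { timeout: 30000 });
  // Verify it was retried the configured number of times before being dead-lettered
  expect(processAttempts).toBe(MAX_ATTEMPTS);

  await Promise.all([consumer.disconnect(), dlqConsumer.disconnect()]);
});
Testcontainers for RabbitMQ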
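The DLX test below checks that the payload arrives in the DLQ. It can also check why: when RabbitMQ dead-letters a message, it adds an x-death header, an array with one entry per queue and reason (rejected, expired, maxlen, ...), each carrying a running count. Here is a minimal sketch of reading it from an amqplib message (msg.properties.headers), using a hypothetical rejectionCount() helper:

```javascript
// Hypothetical helper: how many times has RabbitMQ dead-lettered this message
// out of `queue` because a consumer rejected it?
function rejectionCount(msg, queue) {
  const headers = msg.properties.headers || {};
  const deaths = headers['x-death'] || []; // one entry per (queue, reason) pair
  return deaths
    .filter((d) => d.queue === queue && d.reason === 'rejected')
    .reduce((sum, d) => sum + Number(d.count || 0), 0);
}

// Shape of a message after one rejection from test.orders (other x-death fields omitted)
const example = {
  properties: { headers: { 'x-death': [{ queue: 'test.orders', reason: 'rejected', count: 1 }] } },
};
console.log(rejectionCount(example, 'test.orders')); // 1
```

In the DLX test, a check such as expect(rejectionCount(msg, 'test.orders')).toBe(1) inside the DLQ consumer would confirm the reason as well as the payload. The integration tests need the RabbitMQ container module and amqplib: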
npm install --save-dev @testcontainers/rabbitmq amqplib
// rabbitmq/integration.test.js
const { RabbitMQContainer } = require('@testcontainers/rabbitmq');
const amqplib = require('amqplib');
const { eventually } = require('../test-helpers/eventually');

let container, connection, channel;

beforeAll(async () => {
  container = await new RabbitMQContainer('rabbitmq:3.12-management').start();
  connection = await amqplib.connect(container.getAmqpUrl());
  channel = await connection.createChannel();
}, 60000);

afterAll(async () => {
  await channel.close();
  await connection.close();
  await container.stop();
});
test('dead letter exchange receives rejected messages', async () => {
  const dlxName = 'test.dlx';
  const dlqName = 'test.dlq';
  const mainQueue = 'test.orders';

  // Set up DLX infrastructure
  await channel.assertExchange(dlxName, 'direct', { durable: false });
  await channel.assertQueue(dlqName, { durable: false });
  await channel.bindQueue(dlqName, dlxName, 'dead');

  // Main queue routes rejected messages to the DLX
  await channel.assertQueue(mainQueue, {
    durable: false,
    arguments: {
      'x-dead-letter-exchange': dlxName,
      'x-dead-letter-routing-key': 'dead',
    },
  });

  const dlqReceived = [];
  // Consume from the DLQ
  await channel.consume(dlqName, (msg) => {
    dlqReceived.push(JSON.parse(msg.content.toString()));
    channel.ack(msg);
  });

  // Consumer that rejects every message (nack with requeue: false)
  await channel.consume(mainQueue, (msg) => {
    channel.nack(msg, false, false); // reject, don't requeue → goes to DLX
  });

  // sendToQueue() returns a boolean (write-buffer status), not a promise
  channel.sendToQueue(mainQueue, Buffer.from(JSON.stringify({ id: 'bad-msg' })));

  await eventually(() => {
    expect(dlqReceived).toHaveLength(1);
    expect(dlqReceived[0].id).toBe('bad-msg');
  }, { timeout: 5000 });
});
test('fanout exchange delivers to all bound queues', async () => {
  const exchange = 'test.notifications';
  await channel.assertExchange(exchange, 'fanout', { durable: false });

  const queues = ['queue-email', 'queue-sms', 'queue-push'];
  const received = { 'queue-email': [], 'queue-sms': [], 'queue-push': [] };

  for (const q of queues) {
    await channel.assertQueue(q, { durable: false });
    await channel.bindQueue(q, exchange, '');
    await channel.consume(q, (msg) => {
      received[q].push(JSON.parse(msg.content.toString()));
      channel.ack(msg);
    });
  }

  channel.publish(exchange, '', Buffer.from(JSON.stringify({ type: 'order-shipped', orderId: '789' })));

  await eventually(() => {
    for (const q of queues) {
      expect(received[q]).toHaveLength(1);
      expect(received[q][0].orderId).toBe('789');
    }
  }, { timeout: 3000 });
});
LocalStack for SQS and SNS Testing
LocalStack runs AWS services locally. Use it for SQS/SNS integration tests:
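The first LocalStack test below exercises SQS's visibility timeout: receiving a message hides it rather than removing it, and unless the consumer deletes it before the timeout expires, it becomes receivable again. Here is a toy in-memory model of that rule, with hypothetical names and an injectable clock. It is not the AWS SDK and ignores details such as SQS issuing a fresh receipt handle on every receive.

```javascript
// Toy model of SQS visibility-timeout semantics; not the AWS API.
class ToyQueue {
  constructor(visibilityTimeoutMs, now = () => Date.now()) {
    this.visibilityTimeoutMs = visibilityTimeoutMs;
    this.now = now;
    this.messages = []; // { id, body, invisibleUntil }
    this.nextId = 1;
  }

  send(body) {
    this.messages.push({ id: String(this.nextId++), body, invisibleUntil: 0 });
  }

  // Returns one visible message (or undefined) and hides it for the visibility timeout
  receive() {
    const now = this.now();
    const msg = this.messages.find((m) => m.invisibleUntil <= now);
    if (!msg) return undefined;
    msg.invisibleUntil = now + this.visibilityTimeoutMs;
    return { receiptHandle: msg.id, body: msg.body }; // simplified: handle is the message id
  }

  // Only an explicit delete removes the message for good
  delete(receiptHandle) {
    this.messages = this.messages.filter((m) => m.id !== receiptHandle);
  }
}

// Usage with a fake clock, so no real waiting is needed
let fakeNow = 0;
const queue = new ToyQueue(1000, () => fakeNow);
queue.send('rpt-001');
console.log(queue.receive().body); // rpt-001
console.log(queue.receive()); // undefined: in flight
fakeNow = 1000; // timeout expired and the message was never deleted
console.log(queue.receive().body); // rpt-001 again
```

The LocalStack tests themselves need the container module and both SDK clients: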
npm install --save-dev @testcontainers/localstack @aws-sdk/client-sqs @aws-sdk/client-sns
// sqs/integration.test.js
const { LocalstackContainer } = require('@testcontainers/localstack');
const {
  SQSClient, CreateQueueCommand, SendMessageCommand, ReceiveMessageCommand,
  DeleteMessageCommand, GetQueueAttributesCommand,
} = require('@aws-sdk/client-sqs');
const { SNSClient, CreateTopicCommand, SubscribeCommand, PublishCommand } = require('@aws-sdk/client-sns');
const { eventually } = require('../test-helpers/eventually');

let container, sqsClient, snsClient;

beforeAll(async () => {
  container = await new LocalstackContainer('localstack/localstack:3.2').start();
  const endpoint = container.getConnectionUri();
  const config = { endpoint, region: 'us-east-1', credentials: { accessKeyId: 'test', secretAccessKey: 'test' } };
  sqsClient = new SQSClient(config);
  snsClient = new SNSClient(config);
}, 90000);

afterAll(() => container.stop());
test('SQS message survives visibility timeout and is redelivered', async () => {
  const { QueueUrl } = await sqsClient.send(new CreateQueueCommand({
    QueueName: 'test-redelivery',
    Attributes: { VisibilityTimeout: '1' }, // 1 second for fast tests
  }));

  await sqsClient.send(new SendMessageCommand({
    QueueUrl,
    MessageBody: JSON.stringify({ task: 'process-report', id: 'rpt-001' }),
  }));

  // Receive but don't delete (simulating a consumer crash mid-processing)
  const first = await sqsClient.send(new ReceiveMessageCommand({
    QueueUrl,
    MaxNumberOfMessages: 1,
  }));
  expect(first.Messages).toHaveLength(1);

  // While the visibility timeout runs, the message is hidden from other receivers
  const invisible = await sqsClient.send(new ReceiveMessageCommand({ QueueUrl }));
  expect(invisible.Messages ?? []).toHaveLength(0);

  // Once the timeout expires the message reappears; poll for it instead of sleeping
  await eventually(async () => {
    const redelivered = await sqsClient.send(new ReceiveMessageCommand({ QueueUrl }));
    expect(redelivered.Messages).toHaveLength(1);
    expect(JSON.parse(redelivered.Messages[0].Body).id).toBe('rpt-001');
  }, { timeout: 5000, interval: 250 });
});
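test('SNS fan-out delivers to multiple SQS queues', async () => {
  const { TopicArn } = await snsClient.send(new CreateTopicCommand({ Name: 'order-events' }));
  const queues = [];
  for (const name of ['svc-inventory', 'svc-analytics', 'svc-email']) {
    const { QueueUrl } = await sqsClient.send(new CreateQueueCommand({ QueueName: name }));
    const attrs = await sqsClient.send(new GetQueueAttributesCommand({
      QueueUrl,
      AttributeNames: ['QueueArn'],
    }));
    queues.push({ QueueUrl, arn: attrs.Attributes.QueueArn });
    // On real AWS the queue also needs an access policy allowing SNS to send to it;
    // LocalStack does not enforce IAM policies by default.
    await snsClient.send(new SubscribeCommand({
      TopicArn,
      Protocol: 'sqs',
      Endpoint: attrs.Attributes.QueueArn,
    }));
  }

  await snsClient.send(new PublishCommand({
    TopicArn,
    Message: JSON.stringify({ type: 'order-placed', orderId: 'ord-999' }),
  }));

  // Remember what each queue returned: a received message stays hidden for the
  // visibility timeout, so re-polling a queue that already delivered would come back empty.
  const received = new Map();
  await eventually(async () => {
    for (const q of queues) {
      if (received.has(q.QueueUrl)) continue;
      const result = await sqsClient.send(new ReceiveMessageCommand({
        QueueUrl: q.QueueUrl,
        MaxNumberOfMessages: 1,
      }));
      if (result.Messages?.length) received.set(q.QueueUrl, result.Messages[0]);
    }
    expect(received.size).toBe(queues.length);
  }, { timeout: 10000 });

  for (const msg of received.values()) {
    // Without raw message delivery, SNS wraps the payload in a JSON envelope
    expect(JSON.parse(JSON.parse(msg.Body).Message).orderId).toBe('ord-999');
  }
});
Testing Idempotency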
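In their usual configurations, Kafka consumers, RabbitMQ consumers with manual acks, and SQS standard queues all deliver at least once, so a consumer will sooner or later see the same message twice, for example after a crash between processing and acknowledging. Test that your consumers are idempotent: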
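One common approach is to record each event's unique ID and skip IDs that were already processed. Here is a minimal in-memory sketch, assuming events carry an eventId as in the test below and using a hypothetical IdempotentProcessor class. In production, the processed-ID record has to be durable and ideally written in the same transaction as the side effect; otherwise a crash between the two reintroduces duplicates.

```javascript
// Hypothetical idempotent wrapper: dedupe by eventId before applying side effects.
// Not safe for concurrent duplicates; a unique constraint or transaction covers that in production.
class IdempotentProcessor {
  constructor(applyEvent) {
    this.applyEvent = applyEvent; // the real side effect, e.g. charging a card
    this.processedIds = new Set(); // in-memory here; durable storage in production
  }

  async handle(event) {
    if (this.processedIds.has(event.eventId)) {
      return false; // duplicate delivery: acknowledge, but do nothing
    }
    await this.applyEvent(event);
    this.processedIds.add(event.eventId); // mark only after the effect succeeded
    return true;
  }
}

// Usage: the second delivery of the same event is skipped
const charges = [];
const processor = new IdempotentProcessor(async (event) => { charges.push(event.amount); });
const evt = { eventId: 'evt-dedup-123', orderId: 'order-456', amount: 99.99 };
processor.handle(evt)
  .then(() => processor.handle(evt))
  .then(() => console.log(charges)); // [ 99.99 ]
```

Whatever the mechanism, the test is the same: deliver the event twice and compare the resulting state.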
test('processing the same event twice produces the same result', async () => {
  // orderProcessor and db stand for your application's consumer and data layer
  const event = { eventId: 'evt-dedup-123', orderId: 'order-456', amount: 99.99 };

  // Process once
  await orderProcessor.handle(event);
  const afterFirst = await db.getOrder('order-456');

  // Process again (duplicate delivery)
  await orderProcessor.handle(event);
  const afterSecond = await db.getOrder('order-456');

  // State should be identical: no double-charging, no duplicate records
  expect(afterSecond).toEqual(afterFirst);
  expect(afterSecond.chargeCount).toBe(1); // not 2
});
CI Configuration
Event-driven integration tests are slow. Keep them in a separate suite:
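{
  "jest": {
    "projects": [
      {
        "displayName": "unit",
        "testMatch": ["**/*.test.js"],
        "testPathIgnorePatterns": ["/node_modules/", "/kafka/", "/rabbitmq/", "/sqs/"]
      },
      {
        "displayName": "integration",
        "testMatch": ["**/kafka/**/*.test.js", "**/rabbitmq/**/*.test.js", "**/sqs/**/*.test.js"]
      }
    ]
  }
}
The patterns match the kafka/, rabbitmq/, and sqs/ directories used in the examples above; adjust them to your own layout. Overriding testPathIgnorePatterns replaces Jest's default, so /node_modules/ is listed explicitly. maxWorkers is a global Jest option rather than a per-project one, so the worker count and the long test timeout are passed on the command line instead. Run integration tests with:
# .github/workflows/integration.yml (excerpt: one step from a job's steps list)
- name: Run integration tests
  run: npx jest --selectProjects integration --runInBand --testTimeout=120000
Testcontainers needs Docker available in the CI environment. GitHub Actions' hosted Ubuntu runners have Docker installed by default; GitLab CI needs a docker:dind service (services: docker:dind) with DOCKER_HOST pointing at it.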
Event-driven testing is slower and more complex than REST testing, but the bugs it catches — lost messages, DLQ routing failures, idempotency violations, visibility timeout behavior — are the ones that cause production incidents. The investment in a proper eventually() helper and Testcontainers setup pays off the first time it catches a message processing bug before deployment.