Testing Event-Driven Architectures: Kafka, RabbitMQ, and SQS

Event-driven architectures are notoriously difficult to test. The fundamental challenge: your service publishes an event and returns immediately. The effect of that event — a record updated in another service, a notification sent, an inventory count decremented — happens asynchronously, possibly seconds later, possibly on a different server. Traditional request/response testing assumes synchronous causality. Event-driven systems break that assumption.

This guide covers practical testing strategies for three widely used message brokers: Kafka, RabbitMQ, and SQS. The same patterns apply across all three: spin up real (or, for SQS, emulated) broker instances with Testcontainers, use polling-based eventually() helpers instead of fixed sleep() calls, and test failure paths (dead letter queues, poison messages, timeout behavior) as rigorously as happy paths.

The Core Challenge: Asynchrony and Eventual Consistency

A test for an event-driven system typically looks like this:

  1. Arrange: set up consumers, producers, and any required state
  2. Act: publish an event (or trigger an action that publishes one)
  3. Assert: wait for the downstream effect to appear

Step 3 is the hard part. The effect might appear in 10ms or 2 seconds depending on load, consumer lag, and retry behavior. Fixed setTimeout calls make tests flaky — too short and they fail in CI, too long and the test suite takes forever.
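A minimal in-process sketch makes the problem concrete. Here a Node EventEmitter stands in for the broker and a 50ms setTimeout stands in for consumer lag. Both are assumptions for illustration, not how any real client behaves. The publish returns at once, and an assertion made immediately afterwards still sees the old state.

```javascript
// Sketch only: EventEmitter plays the broker, setTimeout plays consumer lag.
const { EventEmitter } = require('node:events');

const broker = new EventEmitter();
const inventory = new Map([['WIDGET-1', 10]]);

// Arrange: a consumer that applies order events asynchronously
broker.on('order-placed', (event) => {
  setTimeout(() => {
    inventory.set(event.sku, inventory.get(event.sku) - event.qty);
  }, 50);
});

// Act: publishing returns immediately, before the effect has happened
broker.emit('order-placed', { orderId: 'order-123', sku: 'WIDGET-1', qty: 3 });

// Assert too early: the consumer has not applied the event yet
const immediately = inventory.get('WIDGET-1');
console.log('immediately after publish:', immediately); // 10

// A fixed wait works here only because we know the lag is 50ms
setTimeout(() => {
  console.log('after 100ms:', inventory.get('WIDGET-1')); // 7
}, 100);
```

Against a real broker the lag is not known in advance, which is exactly why a fixed wait is either flaky or wasteful.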

The solution is a polling eventually() helper:

// test-helpers/eventually.js
async function eventually(assertion, options = {}) {
  const { timeout = 5000, interval = 100, message = 'Condition not met' } = options;
  const deadline = Date.now() + timeout;

  while (Date.now() < deadline) {
    try {
      await assertion();
      return; // passed
    } catch {
      // not yet, keep polling
    }
    await new Promise((r) => setTimeout(r, interval));
  }

  // Final attempt after the deadline: on failure, report the caller's
  // message together with the last assertion error
  try {
    await assertion();
  } catch (err) {
    throw new Error(`${message} after ${timeout}ms: ${err.message}`, { cause: err });
  }
}

module.exports = { eventually };

Use it everywhere an async effect needs to be observed:

await eventually(async () => {
  const record = await db.findById('order-123');
  expect(record.status).toBe('processed');
}, { timeout: 10000, message: 'Order was not processed in time' });

Testcontainers for Kafka Integration Tests

Testcontainers spins up real Docker containers in your test suite and tears them down afterward. No external broker needed.

npm install --save-dev @testcontainers/kafka kafkajs
// kafka/integration.test.js
const { KafkaContainer } = require('@testcontainers/kafka');
const { Kafka } = require('kafkajs');
const { eventually } = require('../test-helpers/eventually');

let kafkaContainer, kafka, producer;
const consumers = [];

// kafkajs won't let a running consumer subscribe to another topic, so each
// test starts its own consumer with a fresh group ID instead of sharing one.
// fromBeginning: true avoids a race where messages sent while the group is
// still joining are skipped by the default "latest" offset reset.
async function startConsumer(topic, onMessage) {
  const consumer = kafka.consumer({ groupId: `test-${topic}-${Date.now()}` });
  consumers.push(consumer);
  await consumer.connect();
  await consumer.subscribe({ topic, fromBeginning: true });
  await consumer.run({ eachMessage: async ({ message }) => onMessage(message) });
}

beforeAll(async () => {
  kafkaContainer = await new KafkaContainer('confluentinc/cp-kafka:7.5.0')
    .withExposedPorts(9093)
    .start();

  const brokers = [`${kafkaContainer.getHost()}:${kafkaContainer.getMappedPort(9093)}`];

  kafka = new Kafka({ clientId: 'test', brokers });
  producer = kafka.producer();
  await producer.connect();
}, 60000); // container startup can take 30-40s

afterEach(async () => {
  // Disconnect this test's consumers so they stop polling
  await Promise.all(consumers.splice(0).map((c) => c.disconnect()));
});

afterAll(async () => {
  await producer.disconnect();
  await kafkaContainer.stop();
});

test('order-placed event triggers inventory deduction', async () => {
  // Assumes the inventory service under test is running against this broker,
  // consuming 'order-placed' and publishing to 'inventory-updates'
  const received = [];
  await startConsumer('inventory-updates', (message) => {
    received.push(JSON.parse(message.value.toString()));
  });

  // Publish the triggering event
  await producer.send({
    topic: 'order-placed',
    messages: [{
      key: 'order-123',
      value: JSON.stringify({ orderId: 'order-123', sku: 'WIDGET-1', qty: 3 }),
    }],
  });

  await eventually(
    () => {
      const update = received.find((r) => r.sku === 'WIDGET-1');
      if (!update) throw new Error('No inventory update received');
      expect(update.delta).toBe(-3);
    },
    { timeout: 15000 }
  );
});

test('consumer handles malformed messages without crashing', async () => {
  const errors = [];
  const valid = [];

  await startConsumer('order-placed', (message) => {
    try {
      valid.push(JSON.parse(message.value.toString()));
    } catch {
      errors.push(message.value.toString());
    }
  });

  // Send a malformed message
  await producer.send({
    topic: 'order-placed',
    messages: [{ value: 'not valid json' }],
  });

  // Send a valid message immediately after
  await producer.send({
    topic: 'order-placed',
    messages: [{ value: JSON.stringify({ orderId: 'order-456', sku: 'GADGET-2', qty: 1 }) }],
  });

  await eventually(() => {
    expect(errors).toEqual(['not valid json']);
    // Processing continued after the bad message (events from earlier tests
    // are replayed too, because the consumer reads from the beginning)
    expect(valid.map((v) => v.orderId)).toContain('order-456');
  }, { timeout: 10000 });
});
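The test above only distinguishes "parses as JSON" from "doesn't", but a payload can be valid JSON and still unusable (a missing sku, a zero qty). One option is a decoder that returns a result instead of throwing, so the handler can log, count, or dead-letter the bad message and keep going. decodeOrderPlaced is a hypothetical name, and its field rules are assumptions based on the order-placed payload used in this guide.

```javascript
// Hypothetical decoder for this guide's order-placed payload.
// Returns { ok: true, event } or { ok: false, reason } instead of throwing.
function decodeOrderPlaced(raw) {
  let data;
  try {
    data = JSON.parse(raw.toString()); // accepts a Buffer or a string
  } catch (err) {
    return { ok: false, reason: `invalid JSON: ${err.message}` };
  }
  if (data === null || typeof data !== 'object' || Array.isArray(data)) {
    return { ok: false, reason: 'payload is not an object' };
  }
  // Field rules below are assumptions for illustration
  if (typeof data.orderId !== 'string' || data.orderId === '') {
    return { ok: false, reason: 'missing orderId' };
  }
  if (typeof data.sku !== 'string' || data.sku === '') {
    return { ok: false, reason: 'missing sku' };
  }
  if (!Number.isInteger(data.qty) || data.qty <= 0) {
    return { ok: false, reason: 'qty must be a positive integer' };
  }
  return { ok: true, event: data };
}

// Usage: valid JSON is not the same thing as a valid event
console.log(decodeOrderPlaced(Buffer.from('not valid json')).ok); // false
console.log(decodeOrderPlaced('{"orderId":"order-456","sku":"GADGET-2"}').reason); // qty must be a positive integer
```

Inside eachMessage, the handler would branch on result.ok, and each kind of malformed input can be covered by a fast unit test with no broker running.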

Testing Dead Letter Queues

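Dead letter queues (DLQs) hold messages that have failed processing a set number of times. Testing DLQ behavior is critical: if your DLQ routing is broken, you lose messages silently. Kafka has no broker-side DLQ, so dead-lettering there is done by the consumer (or a framework on its behalf) republishing the failed message to a separate topic, and that code path needs its own test.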

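// kafka/dlq.test.js
// Assumes this file sets up its own container, `kafka` client and connected
// `producer` as in kafka/integration.test.js, so its topics start out empty.
const { eventually } = require('../test-helpers/eventually');

const MAX_ATTEMPTS = 3;

// Stand-in for real processing that always fails, to force retries
async function processOrder() {
  throw new Error('Simulated processing failure');
}

test('message ends up in DLQ after max retry exhaustion', async () => {
  const dlqMessages = [];
  let processAttempts = 0;
  const dlqConsumer = kafka.consumer({ groupId: 'dlq-test-dlt' });
  const consumer = kafka.consumer({ groupId: 'dlq-test-main' });

  try {
    await dlqConsumer.connect();
    await consumer.connect();

    // Subscribe to the DLQ topic
    await dlqConsumer.subscribe({ topic: 'order-placed.DLT', fromBeginning: true });
    await dlqConsumer.run({
      eachMessage: async ({ message }) => {
        dlqMessages.push(JSON.parse(message.value.toString()));
      },
    });

    // Main topic: kafkajs has no built-in DLQ, so this loop stands in for
    // your application's retry-then-dead-letter logic
    await consumer.subscribe({ topic: 'order-placed', fromBeginning: true });
    await consumer.run({
      eachMessage: async ({ message }) => {
        for (let attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
          processAttempts++;
          try {
            await processOrder(message);
            return;
          } catch (err) {
            if (attempt === MAX_ATTEMPTS) {
              await producer.send({
                topic: 'order-placed.DLT',
                messages: [{ key: message.key, value: message.value, headers: { error: err.message } }],
              });
            }
          }
        }
      },
    });

    await producer.send({
      topic: 'order-placed',
      messages: [{
        key: 'poison-order',
        value: JSON.stringify({ orderId: 'poison-order', sku: 'BAD-SKU' }),
      }],
    });

    // Wait for the message to exhaust retries and land in the DLQ
    await eventually(() => {
      expect(dlqMessages).toHaveLength(1);
      expect(dlqMessages[0].orderId).toBe('poison-order');
    }, { timeout: 30000 });

    // Verify every attempt ran before it was dead-lettered
    expect(processAttempts).toBe(MAX_ATTEMPTS);
  } finally {
    await consumer.disconnect();
    await dlqConsumer.disconnect();
  }
});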
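The retry-then-dead-letter policy itself is plain logic, so it can also be unit-tested without a broker. The sketch below assumes a hypothetical withDeadLetter wrapper that takes your handler and a sendToDlq callback; with kafkajs, that callback might call producer.send on the dead-letter topic. Retries here are immediate, whereas production code would usually back off between attempts.

```javascript
// Hypothetical retry-then-dead-letter wrapper; not part of kafkajs or any
// other client library. `handler` throws on failure; `sendToDlq` forwards
// a message that has used up all of its attempts.
function withDeadLetter(handler, { maxAttempts = 3, sendToDlq }) {
  return async function handle(message) {
    let lastError;
    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        await handler(message, attempt);
        return { status: 'processed', attempts: attempt };
      } catch (err) {
        lastError = err; // immediate retry; real code would usually back off here
      }
    }
    // All attempts failed: hand the original message and last error to the DLQ
    await sendToDlq({ message, error: lastError.message, attempts: maxAttempts });
    return { status: 'dead-lettered', attempts: maxAttempts };
  };
}

// Usage with in-memory fakes: a handler that always fails gets dead-lettered
const dlq = [];
const handle = withDeadLetter(
  async () => { throw new Error('Simulated processing failure'); },
  { maxAttempts: 3, sendToDlq: async (entry) => { dlq.push(entry); } },
);

handle({ orderId: 'poison-order' }).then((result) => {
  console.log(result.status, dlq.length); // dead-lettered 1
});
```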

Testcontainers for RabbitMQ

npm install --save-dev @testcontainers/rabbitmq amqplib
// rabbitmq/integration.test.js
const { RabbitMQContainer } = require('@testcontainers/rabbitmq');
const amqplib = require('amqplib');
const { eventually } = require('../test-helpers/eventually');

let container, connection, channel;

beforeAll(async () => {
  container = await new RabbitMQContainer('rabbitmq:3.12-management').start();
  connection = await amqplib.connect(container.getAmqpUrl());
  channel = await connection.createChannel();
}, 60000);

afterAll(async () => {
  await channel.close();
  await connection.close();
  await container.stop();
});

test('dead letter exchange receives rejected messages', async () => {
  const dlxName = 'test.dlx';
  const dlqName = 'test.dlq';
  const mainQueue = 'test.orders';

  // Set up DLX infrastructure
  await channel.assertExchange(dlxName, 'direct', { durable: false });
  await channel.assertQueue(dlqName, { durable: false });
  await channel.bindQueue(dlqName, dlxName, 'dead');

  // Main queue routes rejected messages to DLX
  await channel.assertQueue(mainQueue, {
    durable: false,
    arguments: {
      'x-dead-letter-exchange': dlxName,
      'x-dead-letter-routing-key': 'dead',
    },
  });

  const dlqReceived = [];

  // Consume from DLQ
  await channel.consume(dlqName, (msg) => {
    dlqReceived.push(JSON.parse(msg.content.toString()));
    channel.ack(msg);
  });

  // Consumer that rejects the message (nack with requeue: false)
  await channel.consume(mainQueue, (msg) => {
    channel.nack(msg, false, false); // reject, don't requeue → goes to DLX
  });

  // sendToQueue returns a boolean (write-buffer status), not a promise
  channel.sendToQueue(mainQueue, Buffer.from(JSON.stringify({ id: 'bad-msg' })));

  await eventually(() => {
    expect(dlqReceived).toHaveLength(1);
    expect(dlqReceived[0].id).toBe('bad-msg');
  }, { timeout: 5000 });
});
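Beyond checking that the message arrived, a DLX test can check why it was dead-lettered. RabbitMQ records this in an x-death header on the dead-lettered message: an array whose entries carry fields such as queue, reason and count. The helper below is a hypothetical sketch, exercised on a hand-built object shaped like what amqplib passes to a consume callback rather than on a live broker.

```javascript
// Hypothetical helper: summarize the x-death header RabbitMQ attaches to
// dead-lettered messages. Headers live under msg.properties.headers.
function deathSummary(msg) {
  const headers = msg.properties.headers || {};
  const deaths = Array.isArray(headers['x-death']) ? headers['x-death'] : [];
  return {
    // Each entry counts deaths for one (queue, reason) pair
    total: deaths.reduce((sum, d) => sum + Number(d.count), 0),
    reasons: deaths.map((d) => `${d.queue}:${d.reason}`),
  };
}

// Usage with a hand-built message like the one the DLX test receives
const dead = {
  content: Buffer.from(JSON.stringify({ id: 'bad-msg' })),
  properties: {
    headers: {
      'x-death': [{
        queue: 'test.orders', reason: 'rejected', count: 1,
        exchange: '', 'routing-keys': ['test.orders'],
      }],
    },
  },
};

console.log(deathSummary(dead)); // { total: 1, reasons: [ 'test.orders:rejected' ] }
```

In the DLQ consumer, an assertion such as expect(deathSummary(msg).reasons).toContain('test.orders:rejected') would then pin down that the message was rejected rather than, say, expired.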

test('fanout exchange delivers to all bound queues', async () => {
  const exchange = 'test.notifications';
  await channel.assertExchange(exchange, 'fanout', { durable: false });

  const queues = ['queue-email', 'queue-sms', 'queue-push'];
  const received = { 'queue-email': [], 'queue-sms': [], 'queue-push': [] };

  for (const q of queues) {
    await channel.assertQueue(q, { durable: false });
    await channel.bindQueue(q, exchange, '');
    await channel.consume(q, (msg) => {
      received[q].push(JSON.parse(msg.content.toString()));
      channel.ack(msg);
    });
  }

  channel.publish(exchange, '', Buffer.from(JSON.stringify({ type: 'order-shipped', orderId: '789' })));

  await eventually(() => {
    for (const q of queues) {
      expect(received[q]).toHaveLength(1);
      expect(received[q][0].orderId).toBe('789');
    }
  }, { timeout: 3000 });
});

LocalStack for SQS and SNS Testing

LocalStack emulates AWS services locally inside a single container. Use it for SQS/SNS integration tests:

npm install --save-dev @testcontainers/localstack @aws-sdk/client-sqs @aws-sdk/client-sns
// sqs/integration.test.js
const { LocalstackContainer } = require('@testcontainers/localstack');
const { SQSClient, CreateQueueCommand, SendMessageCommand, ReceiveMessageCommand,
        DeleteMessageCommand, GetQueueAttributesCommand } = require('@aws-sdk/client-sqs');
const { SNSClient, CreateTopicCommand, SubscribeCommand, PublishCommand } = require('@aws-sdk/client-sns');
const { eventually } = require('../test-helpers/eventually');

let container, sqsClient, snsClient;

beforeAll(async () => {
  container = await new LocalstackContainer('localstack/localstack:3.2').start();

  const endpoint = container.getConnectionUri();
  const config = { endpoint, region: 'us-east-1', credentials: { accessKeyId: 'test', secretAccessKey: 'test' } };

  sqsClient = new SQSClient(config);
  snsClient = new SNSClient(config);
}, 90000);

afterAll(() => container.stop());

test('SQS message survives visibility timeout and is redelivered', async () => {
  const { QueueUrl } = await sqsClient.send(new CreateQueueCommand({
    QueueName: 'test-redelivery',
    Attributes: { VisibilityTimeout: '1' }, // 1 second for fast tests
  }));

  await sqsClient.send(new SendMessageCommand({
    QueueUrl,
    MessageBody: JSON.stringify({ task: 'process-report', id: 'rpt-001' }),
  }));

  // Receive but don't delete (simulating a crash mid-processing).
  // WaitTimeSeconds enables long polling, so the call waits for the message
  const first = await sqsClient.send(new ReceiveMessageCommand({
    QueueUrl,
    MaxNumberOfMessages: 1,
    WaitTimeSeconds: 5,
  }));

  expect(first.Messages).toHaveLength(1);

  // While the visibility timeout runs, the message is hidden from other receivers
  const invisible = await sqsClient.send(new ReceiveMessageCommand({ QueueUrl }));
  expect(invisible.Messages ?? []).toHaveLength(0);

  // Long-poll instead of sleeping: the call returns as soon as the 1-second
  // visibility timeout expires and the message becomes visible again
  const redelivered = await sqsClient.send(new ReceiveMessageCommand({
    QueueUrl,
    WaitTimeSeconds: 5,
  }));
  expect(redelivered.Messages).toHaveLength(1);
  expect(JSON.parse(redelivered.Messages[0].Body).id).toBe('rpt-001');

  // Each receive issues a new receipt handle; delete with the latest one
  await sqsClient.send(new DeleteMessageCommand({
    QueueUrl,
    ReceiptHandle: redelivered.Messages[0].ReceiptHandle,
  }));
});
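Any test against LocalStack has to wait out the visibility timeout in real time. When the logic under test is your consumer's handling of redelivery rather than SQS itself, an in-memory fake driven by an injected clock can model the same semantics with no waiting. FakeVisibilityQueue is a hypothetical test double that captures only visibility-timeout behavior; it is not the AWS SDK API.

```javascript
// Hypothetical test double: models only SQS-style visibility timeouts.
// Time comes from an injected clock so tests can jump forward instantly.
class FakeVisibilityQueue {
  constructor({ visibilityTimeoutMs, now = () => Date.now() }) {
    this.visibilityTimeoutMs = visibilityTimeoutMs;
    this.now = now;
    this.messages = [];
    this.nextId = 1;
  }

  send(body) {
    this.messages.push({ id: this.nextId++, body, visibleAt: 0, receiveCount: 0 });
  }

  // Return the first visible message and hide it for the visibility timeout
  receive() {
    const t = this.now();
    const msg = this.messages.find((m) => m.visibleAt <= t);
    if (!msg) return null;
    msg.visibleAt = t + this.visibilityTimeoutMs;
    msg.receiveCount += 1;
    return {
      body: msg.body,
      receiveCount: msg.receiveCount,
      // A new handle per receive; in this model only the latest one deletes
      receiptHandle: `${msg.id}:${msg.receiveCount}`,
    };
  }

  delete(receiptHandle) {
    this.messages = this.messages.filter(
      (m) => `${m.id}:${m.receiveCount}` !== receiptHandle,
    );
  }
}

// Usage: a manual clock replaces the real 1-second wait
let clock = 0;
const queue = new FakeVisibilityQueue({ visibilityTimeoutMs: 1000, now: () => clock });
queue.send(JSON.stringify({ task: 'process-report', id: 'rpt-001' }));

const first = queue.receive();      // received, now hidden
console.log(queue.receive());       // null: still within the visibility timeout
clock += 1000;                      // never deleted, and the timeout expires
const again = queue.receive();
console.log(again.receiveCount);    // 2
queue.delete(again.receiptHandle);
console.log(queue.messages.length); // 0
```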

test('SNS fan-out delivers to multiple SQS queues', async () => {
  const { TopicArn } = await snsClient.send(new CreateTopicCommand({ Name: 'order-events' }));

  const queues = [];
  for (const name of ['svc-inventory', 'svc-analytics', 'svc-email']) {
    const { QueueUrl } = await sqsClient.send(new CreateQueueCommand({ QueueName: name }));
    const attrs = await sqsClient.send(new GetQueueAttributesCommand({
      QueueUrl,
      AttributeNames: ['QueueArn'],
    }));
    queues.push({ QueueUrl, arn: attrs.Attributes.QueueArn });

    await snsClient.send(new SubscribeCommand({
      TopicArn,
      Protocol: 'sqs',
      Endpoint: attrs.Attributes.QueueArn,
    }));
  }

  await snsClient.send(new PublishCommand({
    TopicArn,
    Message: JSON.stringify({ type: 'order-placed', orderId: 'ord-999' }),
  }));

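  // Collect messages across polls: a receive hides a message for the queue's
  // visibility timeout (30s by default), so receiving afresh inside every
  // eventually() attempt would miss messages already seen on earlier attempts
  const received = new Map(queues.map((q) => [q.QueueUrl, []]));

  await eventually(async () => {
    for (const q of queues) {
      const result = await sqsClient.send(new ReceiveMessageCommand({
        QueueUrl: q.QueueUrl,
        MaxNumberOfMessages: 1,
      }));
      received.get(q.QueueUrl).push(...(result.Messages ?? []));
    }
    for (const q of queues) {
      expect(received.get(q.QueueUrl)).toHaveLength(1);
    }
  }, { timeout: 10000 });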
});
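The fan-out test only counts messages. To assert on their contents, note that SNS delivers to SQS wrapped in a JSON notification envelope (with fields such as Type, TopicArn and Message) unless the subscription enables RawMessageDelivery, so the original payload sits inside the Message string. unwrapSnsBody is a hypothetical helper that handles both cases, assuming JSON payloads like the ones in this guide.

```javascript
// Hypothetical helper: return the original JSON payload from an SQS message
// body, whether or not SNS wrapped it in a notification envelope.
function unwrapSnsBody(body) {
  const parsed = JSON.parse(body);
  if (parsed && parsed.Type === 'Notification' && typeof parsed.Message === 'string') {
    return JSON.parse(parsed.Message); // envelope: the payload is a JSON string inside
  }
  return parsed; // raw message delivery: the body is the payload itself
}

// Usage with a trimmed-down envelope (real ones also carry MessageId,
// Timestamp, signature fields and more)
const body = JSON.stringify({
  Type: 'Notification',
  TopicArn: 'arn:aws:sns:us-east-1:000000000000:order-events',
  Message: JSON.stringify({ type: 'order-placed', orderId: 'ord-999' }),
});

console.log(unwrapSnsBody(body).orderId); // ord-999
```

Applied to each received message's Body in the fan-out test, this lets you check that every queue got orderId 'ord-999', not just some message.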

Testing Idempotency

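Kafka consumers, RabbitMQ with manual acks, and standard SQS queues all deliver at least once in their usual configurations, so a consumer can see the same message twice (after a crash before the ack, a visibility timeout, or a consumer-group rebalance). Test that your consumers are idempotent: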

test('processing the same event twice produces the same result', async () => {
  // orderProcessor and db stand for your application's handler and data access
  const event = { eventId: 'evt-dedup-123', orderId: 'order-456', amount: 99.99 };

  // Process once
  await orderProcessor.handle(event);
  const afterFirst = await db.getOrder('order-456');

  // Process again (duplicate)
  await orderProcessor.handle(event);
  const afterSecond = await db.getOrder('order-456');

  // State should be identical — no double-charging, no duplicate records
  expect(afterSecond).toEqual(afterFirst);
  expect(afterSecond.chargeCount).toBe(1); // not 2
});
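One common way to make a consumer pass this test is to record each processed eventId and skip repeats. The sketch below wraps a handler (makeIdempotent is a hypothetical name) and uses an in-memory Set purely for illustration. A real service would store processed IDs durably, ideally in the same transaction as the side effect, so that a crash between the two cannot cause a double charge.

```javascript
// Hypothetical wrapper that makes a handler idempotent on event.eventId.
// The Set is in-memory for illustration only, and it is not safe against two
// copies of an event being handled concurrently (both would pass the check).
function makeIdempotent(handle, processed = new Set()) {
  return async function handleOnce(event) {
    if (processed.has(event.eventId)) {
      return { skipped: true };
    }
    const result = await handle(event);
    // Record the ID only after success, so a failed attempt can be retried
    processed.add(event.eventId);
    return { skipped: false, result };
  };
}

// Usage: the second delivery of the same event is skipped
let chargeCount = 0;
const handleOrder = makeIdempotent(async (event) => {
  chargeCount += 1; // stands in for charging the customer
  return event.amount;
});

const event = { eventId: 'evt-dedup-123', orderId: 'order-456', amount: 99.99 };
handleOrder(event)
  .then(() => handleOrder(event))
  .then((second) => console.log(second.skipped, chargeCount)); // true 1
```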

CI Configuration

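Event-driven integration tests are slow. Keep them in a separate Jest project so the unit suite stays fast. The patterns below assume the directory layout used in this guide (kafka/, rabbitmq/, sqs/); note that setting testPathIgnorePatterns replaces Jest's default, so /node_modules/ has to be listed again:

{
  "jest": {
    "projects": [
      {
        "displayName": "unit",
        "testMatch": ["**/*.test.js"],
        "testPathIgnorePatterns": ["/node_modules/", "/(kafka|rabbitmq|sqs)/"]
      },
      {
        "displayName": "integration",
        "testMatch": ["**/{kafka,rabbitmq,sqs}/**/*.test.js"]
      }
    ]
  }
}

Run integration tests serially with a generous timeout. testTimeout and maxWorkers are global Jest options rather than per-project settings, so pass them on the command line:

# .github/workflows/integration.yml
- name: Run integration tests
  run: npx jest --selectProjects integration --maxWorkers=1 --testTimeout=120000
  env:
    DOCKER_BUILDKIT: "1"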

Testcontainers needs Docker available in the CI environment. GitHub-hosted Ubuntu runners have Docker preinstalled; on GitLab CI, add a docker:dind service and point DOCKER_HOST at it.

Event-driven testing is slower and more complex than REST testing, but the bugs it catches — lost messages, DLQ routing failures, idempotency violations, visibility timeout behavior — are the ones that cause production incidents. The investment in a proper eventually() helper and Testcontainers setup pays off the first time it catches a message processing bug before deployment.
