Testing Message Ordering and Exactly-Once Delivery in Event-Driven Systems
Testing message ordering and exactly-once delivery in event-driven systems is one of the hardest testing problems in distributed systems engineering. The guarantees are subtle, the failure modes are nondeterministic, and most bugs only surface under specific timing or load conditions. This guide covers how to write deterministic tests for delivery semantics, idempotent consumers, and dead letter queue routing using Kafka and Testcontainers.
Understanding Delivery Semantics
Before writing tests, you need to be precise about which guarantee you're actually testing. There are three fundamentally different delivery guarantees:
At-Most-Once Delivery
Messages may be lost but will never be processed twice. Implemented by acknowledging before processing. Fast and simple, but data loss is possible.
Failure mode: Consumer crashes (or is partitioned away) after acknowledging but before processing finishes → the broker considers the message done, so it is dropped silently.
At-Least-Once Delivery
Messages will be delivered at least once, possibly more. Implemented by acknowledging after processing. Data loss is prevented, but duplicate processing is possible.
Failure mode: Consumer crashes after processing but before acknowledging → broker re-delivers → duplicate processing.
Exactly-Once Delivery (Effectively-Once)
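Each message's effect is applied exactly once. Kafka supports this inside the broker with idempotent, transactional producers and consumers that read with isolation.level=read_committed; side effects outside Kafka (a database write, an HTTP call) still need an idempotent consumer. RabbitMQ doesn't offer true exactly-once, so the pattern there is at-least-once delivery with idempotent consumers.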
Failure mode: Idempotency key collision (two different messages with the same key), or idempotency store failure.
| Guarantee | Duplicates Possible | Data Loss Possible | Performance |
|---|---|---|---|
| At-Most-Once | No | Yes | Highest |
| At-Least-Once | Yes | No | Medium |
| Exactly-Once | No | No | Lowest |
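The difference between the first two rows comes down to whether the acknowledgment happens before or after processing. The following is a minimal sketch of that ordering using a toy in-memory log rather than a real broker; `ToyPartition` and `AckOrderingDemo` are hypothetical names introduced only for illustration, and the "crash" is simulated by skipping the second step once.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy in-memory partition with a committed offset; stands in for a broker partition. */
final class ToyPartition {
    private final List<String> log = new ArrayList<>();
    private int committedOffset = 0;

    void append(String message) { log.add(message); }
    boolean hasNext() { return committedOffset < log.size(); }
    String peek() { return log.get(committedOffset); } // next uncommitted message
    void commit() { committedOffset++; }               // acknowledge it
}

final class AckOrderingDemo {
    /**
     * Consumes every message. The consumer "crashes" once on crashOn, after the
     * first step (ack or process) has happened but before the second one.
     * Returns the messages whose processing actually ran, in order.
     */
    static List<String> run(List<String> input, boolean ackBeforeProcessing, String crashOn) {
        ToyPartition partition = new ToyPartition();
        input.forEach(partition::append);
        List<String> processed = new ArrayList<>();
        boolean crashed = false;
        while (partition.hasNext()) {
            String msg = partition.peek();
            boolean crashNow = !crashed && msg.equals(crashOn);
            if (ackBeforeProcessing) {
                partition.commit();                         // at-most-once: ack first
                if (crashNow) { crashed = true; continue; } // crash: never processed
                processed.add(msg);
            } else {
                processed.add(msg);                         // at-least-once: process first
                if (crashNow) { crashed = true; continue; } // crash before ack: redelivered
                partition.commit();
            }
        }
        return processed;
    }
}
```

With input `a, b, c` and a crash on `b`, acknowledging first yields `a, c` (b is lost), while acknowledging last yields `a, b, b, c` (b is processed twice). Those are the two failure modes the tests in this guide try to provoke against a real broker.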
Project Setup with Testcontainers
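<!-- pom.xml -->
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <version>1.19.3</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.7.0</version>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>junit-jupiter</artifactId>
    <version>1.19.3</version>
    <scope>test</scope>
</dependency>

// BaseKafkaTest.java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
public abstract class BaseKafkaTest {
    @Container
    static KafkaContainer kafka = new KafkaContainer(
            DockerImageName.parse("confluentinc/cp-kafka:7.6.0"))
            .withKraft(); // KRaft mode, no Zookeeper needed

    protected KafkaProducer<String, String> producer;
    protected KafkaConsumer<String, String> consumer;

    @BeforeEach
    void setUpKafkaClients() {
        producer = new KafkaProducer<>(Map.of(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(),
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                ProducerConfig.ACKS_CONFIG, "all",              // wait for all in-sync replicas
                ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true  // idempotent producer
        ));
        consumer = new KafkaConsumer<>(Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(),
                // A fresh group per test, starting from the earliest offset
                ConsumerConfig.GROUP_ID_CONFIG, "test-group-" + UUID.randomUUID(),
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false, // manual ack
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class
        ));
    }

    @AfterEach
    void closeKafkaClients() {
        // Release sockets and background threads so clients do not pile up across tests
        producer.close();
        consumer.close();
    }

    // Polls until expectedCount records have arrived or the timeout elapses,
    // then commits the consumed offsets. May return fewer records than expected.
    protected List<ConsumerRecord<String, String>> consumeMessages(
            String topic, int expectedCount, Duration timeout) {
        consumer.subscribe(List.of(topic));
        List<ConsumerRecord<String, String>> records = new ArrayList<>();
        long deadline = System.currentTimeMillis() + timeout.toMillis();
        while (records.size() < expectedCount && System.currentTimeMillis() < deadline) {
            ConsumerRecords<String, String> polled = consumer.poll(Duration.ofMillis(100));
            polled.forEach(records::add);
        }
        consumer.commitSync();
        return records;
    }
}

Testing Idempotent Consumers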
The most important property to test: processing the same message twice produces the same result as processing it once.
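Before looking at the tests, it may help to see the kind of consumer logic they are meant to exercise. This is a minimal in-memory sketch, assuming deduplication on `event_id`; `IdempotentOrderHandler` is a hypothetical stand-in and is not the `OrderProcessor` used in the tests, whose implementation is not shown in this guide.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical idempotent handler: applies each event_id at most once. */
final class IdempotentOrderHandler {
    // Stands in for a durable deduplication store (for example a table keyed by event_id)
    private final Set<String> seenEventIds = new HashSet<>();
    // Stands in for the order table: order_id -> amount
    private final Map<String, Double> orders = new HashMap<>();

    /** Returns true if the event was applied, false if it was a duplicate and skipped. */
    boolean handle(String eventId, String orderId, double amount) {
        // Set.add returns false when the id is already present, so a redelivery is a no-op
        if (!seenEventIds.add(eventId)) {
            return false;
        }
        orders.put(orderId, amount);
        return true;
    }

    int orderCount() {
        return orders.size();
    }
}
```

In a real service the duplicate check and the order write would normally have to commit atomically (for instance in one database transaction); otherwise a crash between the two steps reintroduces the duplicate or the loss that the check was meant to prevent.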
// IdempotentConsumerTest.java
import java.time.Duration;
import java.util.List;
import java.util.UUID;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class IdempotentConsumerTest extends BaseKafkaTest {
    // Assumed to be re-created with empty state before each test (setup not shown);
    // otherwise the count() assertions below would see orders from earlier tests
    private OrderProcessor orderProcessor;
    private OrderRepository orderRepository;
    private String topic;

    @BeforeEach
    void useFreshTopic() {
        // Every test's consumer group starts at "earliest", so a shared topic
        // would replay records produced by previously run tests
        topic = "orders-" + UUID.randomUUID();
    }

    @Test
    void processingDuplicateMessageShouldNotCreateDuplicateOrder() throws Exception {
        String orderId = UUID.randomUUID().toString();
        String messagePayload = """
                {
                  "event_id": "%s",
                  "order_id": "%s",
                  "customer_id": "cust-123",
                  "amount": 99.99,
                  "event_type": "ORDER_CREATED"
                }
                """.formatted(orderId, orderId);

        // Send the same message twice (simulating at-least-once redelivery)
        ProducerRecord<String, String> record =
                new ProducerRecord<>(topic, orderId, messagePayload);
        producer.send(record).get();
        producer.send(record).get(); // duplicate
        producer.flush();

        // Consume and process both messages
        List<ConsumerRecord<String, String>> messages =
                consumeMessages(topic, 2, Duration.ofSeconds(10));
        Assertions.assertEquals(2, messages.size(), "Should have received 2 messages");
        for (ConsumerRecord<String, String> msg : messages) {
            orderProcessor.process(msg.value());
        }

        // Despite processing twice, only one order should exist
        List<Order> orders = orderRepository.findByOrderId(orderId);
        Assertions.assertEquals(1, orders.size(),
                "Idempotent consumer should create exactly one order despite duplicate messages");
    }

    @Test
    void idempotencyKeyShouldBeUsedNotMessageOffset() throws Exception {
        // Two records carrying the same event_id but different Kafka keys and offsets.
        // This simulates a producer that retried and re-sent an event it had already published.
        // buildOrderMessage is a test helper (not shown).
        String eventId = "evt-" + UUID.randomUUID();
        String message1 = buildOrderMessage(eventId, "ORDER_CREATED", 99.99);
        String message2 = buildOrderMessage(eventId, "ORDER_CREATED", 99.99); // same content
        producer.send(new ProducerRecord<>(topic, "key1", message1)).get();
        producer.send(new ProducerRecord<>(topic, "key2", message2)).get();
        producer.flush();

        List<ConsumerRecord<String, String>> messages =
                consumeMessages(topic, 2, Duration.ofSeconds(10));
        Assertions.assertEquals(2, messages.size(), "Should have received 2 messages");
        for (ConsumerRecord<String, String> msg : messages) {
            orderProcessor.process(msg.value());
        }

        // The idempotency check must be on event_id, not Kafka key or offset
        Assertions.assertEquals(1, orderRepository.count(),
                "Deduplication must use event_id, not Kafka offset");
    }

    @Test
    void differentEventIdsShouldCreateSeparateOrders() throws Exception {
        // Sanity check: different messages should not be deduplicated
        for (int i = 0; i < 5; i++) {
            String msg = buildOrderMessage(
                    "evt-" + UUID.randomUUID(), "ORDER_CREATED", 10.00 * i);
            producer.send(new ProducerRecord<>(topic, msg)).get();
        }
        producer.flush();

        List<ConsumerRecord<String, String>> messages =
                consumeMessages(topic, 5, Duration.ofSeconds(10));
        Assertions.assertEquals(5, messages.size(), "Should have received 5 messages");
        for (ConsumerRecord<String, String> msg : messages) {
            orderProcessor.process(msg.value());
        }

        Assertions.assertEquals(5, orderRepository.count(),
                "Five unique events should create five orders");
    }
}

Testing Out-of-Order Message Handling
Kafka guarantees ordering within a partition, but not across partitions. When you need ordering guarantees, use the same partition key for related messages. The two tests below cover both sides: a consumer that must tolerate events arriving out of order across partitions, and the per-partition ordering guarantee itself.
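One way a consumer can stay correct when events arrive out of order is to compare each event's own timestamp against the last one applied for that entity. The sketch below assumes that approach; `TimestampGuardedStateMachine` and its `State` enum are hypothetical and are not the `orderStateMachine` referenced in the tests.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical state machine that ignores events older than the last one it applied. */
final class TimestampGuardedStateMachine {
    enum State { CREATED, PAID, SHIPPED }

    private final Map<String, State> states = new HashMap<>();
    private final Map<String, Instant> lastEventTime = new HashMap<>();

    /** Applies the event only if it is strictly newer than the last applied event for the order. */
    boolean apply(String orderId, State newState, Instant eventTime) {
        Instant last = lastEventTime.get(orderId);
        if (last != null && !eventTime.isAfter(last)) {
            return false; // late or replayed event: keep the current state
        }
        states.put(orderId, newState);
        lastEventTime.put(orderId, eventTime);
        return true;
    }

    State stateOf(String orderId) {
        return states.get(orderId);
    }
}
```

Applying SHIPPED and then a PAID event stamped 60 seconds earlier leaves the order in SHIPPED. Note that this sketch simply drops the late event; a production system might still want to record that the payment happened, and it relies on producer clocks being reasonably well synchronized.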
@Test
void consumerShouldHandleOutOfOrderStateTransitions() throws Exception {
    String orderId = "order-" + UUID.randomUUID();

    // Explicit partitions 0 and 1 are used below, so the topic needs at least two
    // partitions (an auto-created topic has one by default). admin is the same
    // AdminClient used in the rebalancing test.
    admin.createTopics(List.of(new NewTopic("order-events", 2, (short) 1))).all().get();

    // Send SHIPPED before PAID (simulating out-of-order delivery across partitions)
    String shippedEvent = buildOrderEvent(orderId, "ORDER_SHIPPED", Instant.now());
    String paidEvent = buildOrderEvent(orderId, "ORDER_PAID",
            Instant.now().minusSeconds(60)); // paid 60s ago, but received late

    // Deliberately send to different partitions to simulate out-of-order
    producer.send(new ProducerRecord<>("order-events", 0, orderId, shippedEvent)).get();
    producer.send(new ProducerRecord<>("order-events", 1, orderId, paidEvent)).get();
    producer.flush();

    List<ConsumerRecord<String, String>> messages =
            consumeMessages("order-events", 2, Duration.ofSeconds(10));
    Assertions.assertEquals(2, messages.size(), "Should have received both events");

    // Arrival order across partitions is not deterministic, so force the worst case:
    // partition 0 (SHIPPED) is processed first, then partition 1 (the late PAID)
    messages.sort(Comparator.comparingInt(ConsumerRecord::partition));
    for (ConsumerRecord<String, String> msg : messages) {
        orderStateMachine.process(msg.value());
    }

    // Order should be in SHIPPED state (the logically later state)
    // NOT in PAID state (which would be a regression if processed naively in order)
    Order order = orderRepository.findById(orderId).orElseThrow();
    Assertions.assertEquals(OrderState.SHIPPED, order.getState(),
            "State machine should use event timestamp, not processing order, for state transitions");
}

@Test
void samePartitionKeyShouldGuaranteeOrdering() throws Exception {
    String customerId = "cust-" + UUID.randomUUID();

    // All events for the same customer go to the same partition
    // Kafka guarantees ordering within a partition
    List<String> events = List.of("ACCOUNT_CREATED", "EMAIL_VERIFIED", "FIRST_PURCHASE");
    for (String eventType : events) {
        producer.send(new ProducerRecord<>("customer-events",
                customerId, // partition key = customer ID
                buildCustomerEvent(customerId, eventType)
        )).get();
    }
    producer.flush();

    List<ConsumerRecord<String, String>> received =
            consumeMessages("customer-events", 3, Duration.ofSeconds(10));
    List<String> receivedEventTypes = received.stream()
            .map(r -> extractEventType(r.value()))
            .collect(Collectors.toList());
    Assertions.assertEquals(events, receivedEventTypes,
            "Messages with same partition key must be received in send order");
}

Testing Dead Letter Queue Handling
Messages that fail processing repeatedly should be routed to a dead letter queue (DLQ) for investigation. The tests below check that the routing happens correctly and that the DLQ message carries enough context to debug the failure.
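The retry-then-route behaviour being tested can be sketched without a broker. This is an illustrative outline only: `RetryingDispatcher` and `DeadLetter` are hypothetical names, the attempt limit is a parameter chosen by the caller, and the header names simply mirror the ones the tests assert on.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;

/** Hypothetical DLQ envelope: the untouched payload plus debugging headers. */
final class DeadLetter {
    final String payload;
    final Map<String, String> headers;

    DeadLetter(String payload, Map<String, String> headers) {
        this.payload = payload;
        this.headers = headers;
    }
}

final class RetryingDispatcher {
    /**
     * Calls the handler up to maxAttempts times. Returns empty on success, or a
     * DeadLetter describing the final failure once all attempts are exhausted.
     */
    static Optional<DeadLetter> dispatch(String topic, long offset, String payload,
                                         Consumer<String> handler, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException lastError = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(payload);
                return Optional.empty(); // processed, nothing to dead-letter
            } catch (RuntimeException e) {
                lastError = e;           // remember the failure and retry
            }
        }
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("x-original-topic", topic);
        headers.put("x-original-offset", Long.toString(offset));
        headers.put("x-exception-message", String.valueOf(lastError.getMessage()));
        return Optional.of(new DeadLetter(payload, headers)); // payload is preserved as-is
    }
}
```

A real implementation would usually add a backoff between attempts and distinguish retryable from non-retryable errors, so that an unparsable payload goes to the DLQ without burning the full retry budget.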
@Test
void unparsableMessageShouldGoToDeadLetterQueue() throws Exception {
    String invalidJson = "this is not valid json {{{";
    producer.send(new ProducerRecord<>("orders", "bad-key", invalidJson)).get();
    producer.flush();

    // The main consumer should not be able to process this
    // It should appear in the DLQ after max retry attempts
    List<ConsumerRecord<String, String>> dlqMessages =
            consumeMessages("orders.DLQ", 1, Duration.ofSeconds(15));
    Assertions.assertEquals(1, dlqMessages.size(),
            "Unparsable message should be routed to DLQ");

    // Verify DLQ message contains metadata needed for debugging
    ConsumerRecord<String, String> dlqMessage = dlqMessages.get(0);
    Assertions.assertNotNull(dlqMessage.headers().lastHeader("x-original-topic"),
            "DLQ message must include original topic header");
    Assertions.assertNotNull(dlqMessage.headers().lastHeader("x-exception-message"),
            "DLQ message must include exception message header");
    Assertions.assertNotNull(dlqMessage.headers().lastHeader("x-original-offset"),
            "DLQ message must include original offset for replay");

    // Original message payload must be preserved intact
    Assertions.assertEquals(invalidJson, dlqMessage.value(),
            "DLQ must preserve original message payload unchanged");
}

@Test
void transientFailureShouldRetryBeforeDLQ() throws Exception {
    AtomicInteger processAttempts = new AtomicInteger(0);

    // Simulate a consumer that fails twice then succeeds
    orderProcessor = new OrderProcessor(orderRepository) {
        @Override
        public void process(String message) {
            int attempt = processAttempts.incrementAndGet();
            if (attempt <= 2) {
                throw new TransientException("Simulated transient failure, attempt " + attempt);
            }
            super.process(message); // succeeds on attempt 3
        }
    };

    String payload = buildValidOrder();
    producer.send(new ProducerRecord<>("orders", "retry-test", payload)).get();
    producer.flush();

    // Wait until the third attempt has run (up to 10 s) instead of sleeping a fixed time
    long deadline = System.currentTimeMillis() + 10_000;
    while (processAttempts.get() < 3 && System.currentTimeMillis() < deadline) {
        Thread.sleep(100);
    }

    // Message should be processed successfully (not in DLQ).
    // Poll for the full 3 s window: an expected count of 0 would return without polling.
    // The DLQ may hold records from other tests, so match on this test's payload.
    List<ConsumerRecord<String, String>> dlqMessages =
            consumeMessages("orders.DLQ", Integer.MAX_VALUE, Duration.ofSeconds(3));
    Assertions.assertTrue(dlqMessages.stream().noneMatch(r -> payload.equals(r.value())),
            "Transient failures should not send message to DLQ if eventually successful");
    Assertions.assertEquals(3, processAttempts.get(),
            "Processor should have been called 3 times (2 failures + 1 success)");
    Assertions.assertEquals(1, orderRepository.count(),
            "Order should be created after successful retry");
}

Testing Consumer Group Rebalancing Edge Cases
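Consumer group rebalancing happens when consumers join or leave a group. Partitions are reassigned, and any messages the previous owner had processed but not yet committed are redelivered to the new owner. The test below checks that nothing is lost when a second consumer joins mid-stream.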
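A rebalance test is easier to diagnose when it reports both losses and redeliveries instead of a single count. The helper below is a small sketch of that idea, assuming each message carries a sequence number in the range 0 to N-1; `DeliveryAudit` is a hypothetical name, and it expects the consumers to record every processed sequence number in a list (duplicates included) rather than a set.

```java
import java.util.List;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

/** Reports which sequence numbers in [0, expectedCount) were never seen or seen more than once. */
final class DeliveryAudit {
    final SortedSet<Integer> missing = new TreeSet<>();
    // Sequence number -> times seen, only for numbers seen more than once
    final SortedMap<Integer, Integer> duplicates = new TreeMap<>();

    static DeliveryAudit of(List<Integer> processedSeqs, int expectedCount) {
        SortedMap<Integer, Integer> counts = new TreeMap<>();
        for (int seq : processedSeqs) {
            counts.merge(seq, 1, Integer::sum);
        }
        DeliveryAudit audit = new DeliveryAudit();
        for (int seq = 0; seq < expectedCount; seq++) {
            int seen = counts.getOrDefault(seq, 0);
            if (seen == 0) {
                audit.missing.add(seq);          // lost message
            } else if (seen > 1) {
                audit.duplicates.put(seq, seen); // redelivered message
            }
        }
        return audit;
    }
}
```

An empty `missing` set is the hard requirement for at-least-once delivery. A non-empty `duplicates` map is expected after a rebalance and is only a problem if the consumer under test is not idempotent.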
@Test
void messageInFlightDuringRebalanceShouldNotBeLost() throws Exception {
    String topic = "rebalance-test-" + UUID.randomUUID();
    admin.createTopics(List.of(new NewTopic(topic, 4, (short) 1))).all().get();

    // Send 100 messages
    for (int i = 0; i < 100; i++) {
        producer.send(new ProducerRecord<>(topic, "key-" + i,
                "{\"seq\":" + i + "}")).get();
    }
    producer.flush();

    String groupId = "rebalance-test-group";
    Set<Integer> processedSeqs = Collections.synchronizedSet(new HashSet<>());

    // createConsumer and processMessages are test helpers (not shown). processMessages is
    // assumed to poll, record each seq, commit after processing, and return once it has
    // handled the given number of records or the timeout elapses.

    // Start consumer 1
    KafkaConsumer<String, String> consumer1 = createConsumer(groupId);
    consumer1.subscribe(List.of(topic));

    // Process some messages with consumer 1
    processMessages(consumer1, processedSeqs, 20, Duration.ofSeconds(5));

    // Add consumer 2, which triggers a rebalance
    KafkaConsumer<String, String> consumer2 = createConsumer(groupId);
    consumer2.subscribe(List.of(topic));

    // Continue processing with both consumers. The split after the rebalance is not
    // guaranteed to be even, so each consumer may take up to all 80 remaining messages
    // and otherwise stops on the timeout.
    ExecutorService executor = Executors.newFixedThreadPool(2);
    executor.submit(() -> processMessages(consumer1, processedSeqs, 80, Duration.ofSeconds(10)));
    executor.submit(() -> processMessages(consumer2, processedSeqs, 80, Duration.ofSeconds(10)));
    executor.shutdown();
    executor.awaitTermination(15, TimeUnit.SECONDS);
    consumer1.close();
    consumer2.close();

    // Every sequence number should appear; the Set absorbs any redelivered duplicates
    Assertions.assertEquals(100, processedSeqs.size(),
            "All messages should be processed after rebalance. Missing: " +
                    IntStream.range(0, 100)
                            .filter(i -> !processedSeqs.contains(i))
                            .boxed().collect(Collectors.toList()));
}

Kafka Exactly-Once with Transactions
@Test
void transactionalProducerShouldProvideExactlyOnceSemantics() throws Exception {
    Properties txProps = new Properties();
    txProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
    txProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    txProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    txProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-transactional-id");
    txProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    KafkaProducer<String, String> txProducer = new KafkaProducer<>(txProps);
    txProducer.initTransactions();

    // Send two messages in a transaction: both or neither should be consumed
    txProducer.beginTransaction();
    txProducer.send(new ProducerRecord<>("payments", "tx-1", "{\"amount\":50.00}"));
    txProducer.send(new ProducerRecord<>("audit-log", "tx-1", "{\"action\":\"PAYMENT\"}"));
    txProducer.commitTransaction();
    txProducer.close();

    // Consumer with isolation.level=read_committed should see both messages
    KafkaConsumer<String, String> isolatedConsumer = new KafkaConsumer<>(Map.of(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(),
            ConsumerConfig.GROUP_ID_CONFIG, "isolated-" + UUID.randomUUID(),
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
            ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed", // key setting
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class
    ));
    // Subscribe to both topics written by the transaction
    isolatedConsumer.subscribe(List.of("payments", "audit-log"));

    List<ConsumerRecord<String, String>> committed = new ArrayList<>();
    long deadline = System.currentTimeMillis() + 10_000;
    while (committed.size() < 2 && System.currentTimeMillis() < deadline) {
        ConsumerRecords<String, String> polled = isolatedConsumer.poll(Duration.ofMillis(100));
        polled.forEach(committed::add);
    }
    isolatedConsumer.close();

    Assertions.assertEquals(1L,
            committed.stream().filter(r -> r.topic().equals("payments")).count(),
            "Exactly one payment message should be visible after committed transaction");
    Assertions.assertEquals(1L,
            committed.stream().filter(r -> r.topic().equals("audit-log")).count(),
            "Exactly one audit-log message should be visible after committed transaction");
}

Test Coverage Checklist
| Scenario | What Breaks Without It |
|---|---|
| Duplicate message → same outcome | Processing duplicates corrupts data |
| Out-of-order → correct final state | Naive processing regresses state machines |
| Poison pill → DLQ routing | Consumer blocks forever, backlog grows |
| DLQ metadata headers present | Ops team can't debug or replay failures |
| Transient failure → retry → success | Losing messages on transient errors |
| Max retries → DLQ | Infinite retry loop consumes all resources |
| Rebalance → no message loss | Messages dropped during scaling events |
| Same partition key → ordered | Cross-partition ordering assumptions break |
| Transactional read_committed | Consumers see partial transactions |
Event-driven systems fail in ways that are difficult to reproduce manually. Wrapping Kafka in Testcontainers gives you a real broker in CI where you can inject precise failure conditions and assert on the exact behavior. The tests above cover the failure modes that cause the worst production incidents: silent data loss, duplicate processing, and consumer stalls.