Testing Message Ordering and Exactly-Once Delivery in Event-Driven Systems

Testing message ordering and exactly-once delivery in event-driven systems is one of the hardest testing problems in distributed systems engineering. The guarantees are subtle, the failure modes are nondeterministic, and most bugs only surface under specific timing or load conditions. This guide covers how to write deterministic tests for delivery semantics, idempotent consumers, and dead letter queue routing using Kafka and Testcontainers.

Understanding Delivery Semantics

Before writing tests, you need to be precise about which guarantee you're actually testing. There are three fundamentally different delivery guarantees:

At-Most-Once Delivery

Messages may be lost but will never be processed twice. Implemented by acknowledging before processing. Fast and simple, but data loss is possible.

Failure mode: Consumer crashes (or is cut off by a network partition) after acknowledging but before processing completes → message dropped silently.

At-Least-Once Delivery

Messages will be delivered at least once, possibly more. Implemented by acknowledging after processing. Data loss is prevented, but duplicate processing is possible.

Failure mode: Consumer crashes after processing but before acknowledging → broker re-delivers → duplicate processing.

Exactly-Once Delivery (Effectively-Once)

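Each message takes effect exactly once, even if it is delivered more than once. Within Kafka itself, this is achieved with idempotent, transactional producers combined with consumers that read with isolation.level=read_committed; side effects outside Kafka (database writes, API calls) still need an idempotent consumer. RabbitMQ doesn't offer true exactly-once, so the pattern there is at-least-once with idempotent consumers.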

Failure mode: Idempotency key collision (two different messages with the same key), or idempotency store failure.

Guarantee     | Duplicates Possible | Data Loss Possible | Performance
At-Most-Once  | No                  | Yes                | Highest
At-Least-Once | Yes                 | No                 | Medium
Exactly-Once  | No                  | No                 | Lowest
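
The difference between the first two guarantees comes down to whether the acknowledgment happens before or after processing. The sketch below models that with a hypothetical in-memory queue and a simulated crash on the first attempt. No Kafka is involved, and `InMemoryQueue`, `runAtMostOnce`, and `runAtLeastOnce` are illustrative names, not part of any library.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical queue: a message stays at the head until it is acknowledged. */
class InMemoryQueue {
    private final Deque<String> pending = new ArrayDeque<>();

    void publish(String message) { pending.addLast(message); }

    /** Returns the next message without removing it, or null when the queue is empty. */
    String peek() { return pending.peekFirst(); }

    /** Acknowledge the head message: the queue forgets it. */
    void ack() { pending.pollFirst(); }
}

class DeliverySemanticsDemo {

    /** At-most-once: acknowledge first, then process. A crash in between loses the message. */
    static List<String> runAtMostOnce(InMemoryQueue queue, boolean crashOnFirstAttempt) {
        List<String> processed = new ArrayList<>();
        boolean crashed = false;
        String message;
        while ((message = queue.peek()) != null) {
            queue.ack();                       // ack BEFORE processing
            if (crashOnFirstAttempt && !crashed) {
                crashed = true;                // simulated crash: the work never happens
                continue;
            }
            processed.add(message);
        }
        return processed;
    }

    /** At-least-once: process first, then acknowledge. A crash in between causes redelivery. */
    static List<String> runAtLeastOnce(InMemoryQueue queue, boolean crashOnFirstAttempt) {
        List<String> processed = new ArrayList<>();
        boolean crashed = false;
        String message;
        while ((message = queue.peek()) != null) {
            processed.add(message);            // the side effect happens
            if (crashOnFirstAttempt && !crashed) {
                crashed = true;                // simulated crash before the ack
                continue;                      // the message is still pending
            }
            queue.ack();                       // ack AFTER processing
        }
        return processed;
    }

    public static void main(String[] args) {
        InMemoryQueue q1 = new InMemoryQueue();
        q1.publish("m1");
        q1.publish("m2");
        System.out.println("at-most-once:  " + runAtMostOnce(q1, true));   // [m2]

        InMemoryQueue q2 = new InMemoryQueue();
        q2.publish("m1");
        q2.publish("m2");
        System.out.println("at-least-once: " + runAtLeastOnce(q2, true));  // [m1, m1, m2]
    }
}
```

With the crash enabled, the at-most-once run never processes m1, while the at-least-once run processes it twice. That duplicate is what an idempotent consumer has to absorb.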

Project Setup with Testcontainers

<!-- pom.xml -->
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <version>1.19.3</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.7.0</version>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>junit-jupiter</artifactId>
    <version>1.19.3</version>
    <scope>test</scope>
</dependency>
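
// BaseKafkaTest.java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Testcontainers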
public abstract class BaseKafkaTest {

    @Container
    static KafkaContainer kafka = new KafkaContainer(
        DockerImageName.parse("confluentinc/cp-kafka:7.6.0"))
        .withKraft(); // KRaft mode — no Zookeeper needed

    protected KafkaProducer<String, String> producer;
    protected KafkaConsumer<String, String> consumer;

    @BeforeEach
    void setUpKafkaClients() {
        producer = new KafkaProducer<>(Map.of(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(),
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
            ProducerConfig.ACKS_CONFIG, "all",            // wait for all in-sync replicas
            ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true // idempotent producer
        ));

        consumer = new KafkaConsumer<>(Map.of(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(),
            ConsumerConfig.GROUP_ID_CONFIG, "test-group-" + UUID.randomUUID(),
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false, // manual ack
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class
        ));
    }

    @AfterEach
    void closeKafkaClients() {
        // Release sockets and buffers so clients do not accumulate across tests
        producer.close();
        consumer.close();
    }

    protected List<ConsumerRecord<String, String>> consumeMessages(
            String topic, int expectedCount, Duration timeout) {
        consumer.subscribe(List.of(topic));
        List<ConsumerRecord<String, String>> records = new ArrayList<>();
        long deadline = System.currentTimeMillis() + timeout.toMillis();

        while (records.size() < expectedCount && System.currentTimeMillis() < deadline) {
            ConsumerRecords<String, String> polled = consumer.poll(Duration.ofMillis(100));
            polled.forEach(records::add);
        }
        consumer.commitSync();
        return records;
    }
}

Testing Idempotent Consumers

The most important property to test: processing the same message twice produces the same result as processing it once.

// IdempotentConsumerTest.java
public class IdempotentConsumerTest extends BaseKafkaTest {

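    // Application classes under test. Initialize both in a @BeforeEach against an
    // empty store; the count() assertions below assume no orders from earlier tests.
    private OrderProcessor orderProcessor;
    private OrderRepository orderRepository;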

    @Test
    void processingDuplicateMessageShouldNotCreateDuplicateOrder() throws Exception {
        String orderId = UUID.randomUUID().toString();
        String messagePayload = """
            {
                "event_id": "%s",
                "order_id": "%s",
                "customer_id": "cust-123",
                "amount": 99.99,
                "event_type": "ORDER_CREATED"
            }
            """.formatted(orderId, orderId);

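        // Use a topic unique to this test: the broker container is shared across tests and
        // every consumer group starts from "earliest", so a fixed topic name would leak
        // records between tests (this relies on the broker auto-creating topics)
        String topic = "orders-" + UUID.randomUUID();

        // Send the same message twice (simulating at-least-once redelivery)
        ProducerRecord<String, String> record =
            new ProducerRecord<>(topic, orderId, messagePayload);
        producer.send(record).get();
        producer.send(record).get(); // duplicate
        producer.flush();

        // Consume and process both messages
        List<ConsumerRecord<String, String>> messages =
            consumeMessages(topic, 2, Duration.ofSeconds(10));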
        Assertions.assertEquals(2, messages.size(), "Should have received 2 messages");

        for (ConsumerRecord<String, String> msg : messages) {
            orderProcessor.process(msg.value());
        }

        // Despite processing twice, only one order should exist
        List<Order> orders = orderRepository.findByOrderId(orderId);
        Assertions.assertEquals(1, orders.size(),
            "Idempotent consumer should create exactly one order despite duplicate messages");
    }

    @Test
    void idempotencyKeyShouldBeUsedNotMessageOffset() throws Exception {
        // Two different messages with the same business key (event_id)
        // This simulates a producer that retried and generated duplicate event IDs
        String eventId = "evt-" + UUID.randomUUID();
        String message1 = buildOrderMessage(eventId, "ORDER_CREATED", 99.99);
        String message2 = buildOrderMessage(eventId, "ORDER_CREATED", 99.99); // same content

        // Unique topic so records written by other tests on the shared broker are not consumed here
        String topic = "orders-" + UUID.randomUUID();
        producer.send(new ProducerRecord<>(topic, "key1", message1)).get();
        producer.send(new ProducerRecord<>(topic, "key2", message2)).get();
        producer.flush();

        List<ConsumerRecord<String, String>> messages =
            consumeMessages(topic, 2, Duration.ofSeconds(10));

        for (ConsumerRecord<String, String> msg : messages) {
            orderProcessor.process(msg.value());
        }

        // The idempotency check must be on event_id, not Kafka offset
        Assertions.assertEquals(1, orderRepository.count(),
            "Deduplication must use event_id, not Kafka offset");
    }

    @Test
    void differentEventIdsShouldCreateSeparateOrders() throws Exception {
        // Sanity check: different messages should not be deduplicated
        // Unique topic so records written by other tests on the shared broker are not consumed here
        String topic = "orders-" + UUID.randomUUID();
        for (int i = 0; i < 5; i++) {
            String msg = buildOrderMessage(
                "evt-" + UUID.randomUUID(), "ORDER_CREATED", 10.00 * i);
            producer.send(new ProducerRecord<>(topic, msg)).get();
        }
        producer.flush();

        List<ConsumerRecord<String, String>> messages =
            consumeMessages(topic, 5, Duration.ofSeconds(10));

        for (ConsumerRecord<String, String> msg : messages) {
            orderProcessor.process(msg.value());
        }

        Assertions.assertEquals(5, orderRepository.count(),
            "Five unique events should create five orders");
    }
}
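
The tests above treat `OrderProcessor` as a black box. As a rough illustration of the behavior they expect, here is a minimal in-memory sketch of deduplication keyed on `event_id`. `IdempotentOrderProcessor` and its method signatures are assumptions made for this example; a real implementation would keep the seen-set in a durable store and update it in the same transaction as the order write.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical processor: deduplicates on event_id, never on topic, partition, or offset. */
class IdempotentOrderProcessor {
    private final Set<String> seenEventIds = ConcurrentHashMap.newKeySet();
    private final Map<String, Double> ordersById = new ConcurrentHashMap<>();

    /** Returns true if the event was applied, false if it was recognized as a duplicate. */
    boolean process(String eventId, String orderId, double amount) {
        // add() on a concurrent set is atomic: only the first caller for an event_id gets true
        if (!seenEventIds.add(eventId)) {
            return false;
        }
        ordersById.put(orderId, amount);
        return true;
    }

    int orderCount() { return ordersById.size(); }
}

class IdempotentConsumerDemo {
    public static void main(String[] args) {
        IdempotentOrderProcessor processor = new IdempotentOrderProcessor();
        System.out.println(processor.process("evt-1", "order-1", 99.99)); // true: first delivery
        System.out.println(processor.process("evt-1", "order-1", 99.99)); // false: redelivery
        System.out.println(processor.orderCount());                       // 1
    }
}
```

Because the check never looks at the Kafka offset, a redelivered record and a producer-side duplicate with the same `event_id` are both rejected, even though they sit at different offsets.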

Testing Out-of-Order Message Handling

Kafka guarantees ordering within a partition, but not across partitions. When you need ordering guarantees, use the same partition key for related messages. Test that your consumers handle out-of-order messages gracefully:

@Test
void consumerShouldHandleOutOfOrderStateTransitions() throws Exception {
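    String orderId = "order-" + UUID.randomUUID();

    // The test writes to partitions 0 and 1 explicitly, so the topic needs at least two
    // partitions; an auto-created topic has only one by default. `admin` is the same
    // Kafka Admin client that the rebalancing test uses.
    String topic = "order-events-" + UUID.randomUUID();
    admin.createTopics(List.of(new NewTopic(topic, 2, (short) 1))).all().get();

    // SHIPPED carries the later timestamp; PAID happened 60s earlier but arrives late
    String shippedEvent = buildOrderEvent(orderId, "ORDER_SHIPPED", Instant.now());
    String paidEvent = buildOrderEvent(orderId, "ORDER_PAID",
        Instant.now().minusSeconds(60));

    // Deliberately send to different partitions to simulate out-of-order
    producer.send(new ProducerRecord<>(topic, 0, orderId, shippedEvent)).get();
    producer.send(new ProducerRecord<>(topic, 1, orderId, paidEvent)).get();
    producer.flush();

    List<ConsumerRecord<String, String>> messages =
        consumeMessages(topic, 2, Duration.ofSeconds(10));

    // Kafka gives no ordering across partitions, so the poll order is not deterministic.
    // Sort by partition to force the problematic order: SHIPPED (p0) first, then PAID (p1).
    messages.sort(Comparator.comparingInt(ConsumerRecord::partition));
    for (ConsumerRecord<String, String> msg : messages) {
        orderStateMachine.process(msg.value());
    }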

    // Order should be in SHIPPED state (the logically later state)
    // NOT in PAID state (which would be a regression if processed naively in order)
    Order order = orderRepository.findById(orderId).orElseThrow();
    Assertions.assertEquals(OrderState.SHIPPED, order.getState(),
        "State machine should use event timestamp, not processing order, for state transitions");
}

@Test
void samePartitionKeyShouldGuaranteeOrdering() throws Exception {
    String customerId = "cust-" + UUID.randomUUID();

    // All events for the same customer go to the same partition
    // Kafka guarantees ordering within a partition
    List<String> events = List.of("ACCOUNT_CREATED", "EMAIL_VERIFIED", "FIRST_PURCHASE");
    for (String eventType : events) {
        producer.send(new ProducerRecord<>("customer-events",
            customerId, // partition key = customer ID
            buildCustomerEvent(customerId, eventType)
        )).get();
    }
    producer.flush();

    List<ConsumerRecord<String, String>> received =
        consumeMessages("customer-events", 3, Duration.ofSeconds(10));

    List<String> receivedEventTypes = received.stream()
        .map(r -> extractEventType(r.value()))
        .collect(toList());

    Assertions.assertEquals(events, receivedEventTypes,
        "Messages with same partition key must be received in send order");
}
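
The out-of-order test expects the state machine to decide by event time rather than arrival order. One way to get that behavior is sketched below, assuming a hypothetical `TimestampOrderedStateMachine` that keeps the latest applied event per order and discards anything older. Relying on timestamps assumes producer clocks are trustworthy; a per-order sequence number is a common alternative when they are not.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical state machine: an event is applied only if it is newer than the current state. */
class TimestampOrderedStateMachine {

    private static final class Entry {
        final String state;
        final Instant eventTime;

        Entry(String state, Instant eventTime) {
            this.state = state;
            this.eventTime = eventTime;
        }
    }

    private final Map<String, Entry> latestByOrder = new HashMap<>();

    /** Returns true if the event changed the state, false if it was stale or a duplicate. */
    boolean apply(String orderId, String state, Instant eventTime) {
        Entry current = latestByOrder.get(orderId);
        if (current != null && !eventTime.isAfter(current.eventTime)) {
            return false; // older than (or the same age as) what we already applied
        }
        latestByOrder.put(orderId, new Entry(state, eventTime));
        return true;
    }

    /** Returns the current state, or null if no event has been applied for this order. */
    String stateOf(String orderId) {
        Entry current = latestByOrder.get(orderId);
        return current == null ? null : current.state;
    }
}

class OutOfOrderDemo {
    public static void main(String[] args) {
        TimestampOrderedStateMachine machine = new TimestampOrderedStateMachine();
        Instant shippedAt = Instant.parse("2024-01-01T12:00:00Z");
        Instant paidAt = shippedAt.minusSeconds(60);

        machine.apply("order-1", "SHIPPED", shippedAt); // arrives first
        machine.apply("order-1", "PAID", paidAt);       // arrives late, is ignored
        System.out.println(machine.stateOf("order-1")); // SHIPPED
    }
}
```

This sketch drops the late event entirely. If the late event carries data the order still needs (a payment reference, for example), the handler would have to merge it in without regressing the state.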

Testing Dead Letter Queue Handling

Messages that fail processing repeatedly should be routed to a dead letter queue (DLQ) for investigation. Test that the routing happens correctly and the DLQ message contains enough context to debug the failure:

@Test
void unparsableMessageShouldGoToDeadLetterQueue() throws Exception {
    String invalidJson = "this is not valid json {{{";

    producer.send(new ProducerRecord<>("orders", "bad-key", invalidJson)).get();
    producer.flush();

    // The main consumer should not be able to process this
    // It should appear in the DLQ after max retry attempts
    List<ConsumerRecord<String, String>> dlqMessages =
        consumeMessages("orders.DLQ", 1, Duration.ofSeconds(15));

    Assertions.assertEquals(1, dlqMessages.size(),
        "Unparsable message should be routed to DLQ");

    // Verify DLQ message contains metadata needed for debugging
    ConsumerRecord<String, String> dlqMessage = dlqMessages.get(0);
    Assertions.assertNotNull(dlqMessage.headers().lastHeader("x-original-topic"),
        "DLQ message must include original topic header");
    Assertions.assertNotNull(dlqMessage.headers().lastHeader("x-exception-message"),
        "DLQ message must include exception message header");
    Assertions.assertNotNull(dlqMessage.headers().lastHeader("x-original-offset"),
        "DLQ message must include original offset for replay");

    // Original message payload must be preserved intact
    Assertions.assertEquals(invalidJson, dlqMessage.value(),
        "DLQ must preserve original message payload unchanged");
}

@Test
void transientFailureShouldRetryBeforeDLQ() throws Exception {
    AtomicInteger processAttempts = new AtomicInteger(0);

    // Simulate a consumer that fails twice then succeeds
    orderProcessor = new OrderProcessor(orderRepository) {
        @Override
        public void process(String message) {
            int attempt = processAttempts.incrementAndGet();
            if (attempt <= 2) {
                throw new TransientException("Simulated transient failure, attempt " + attempt);
            }
            super.process(message); // succeeds on attempt 3
        }
    };

    producer.send(new ProducerRecord<>("orders", "retry-test", buildValidOrder())).get();
    producer.flush();

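    // Wait until the processor has been invoked three times, with an upper bound,
    // instead of sleeping for a fixed interval
    long deadline = System.currentTimeMillis() + 10_000;
    while (processAttempts.get() < 3 && System.currentTimeMillis() < deadline) {
        Thread.sleep(100);
    }

    // Message should be processed successfully (not in DLQ).
    // Ask for 1 record so the helper keeps polling for the full timeout; with an
    // expected count of 0 it returns immediately and the check below is vacuous.
    List<ConsumerRecord<String, String>> dlqMessages =
        consumeMessages("orders.DLQ", 1, Duration.ofSeconds(3));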

    Assertions.assertTrue(dlqMessages.isEmpty(),
        "Transient failures should not send message to DLQ if eventually successful");
    Assertions.assertEquals(3, processAttempts.get(),
        "Processor should have been called 3 times (2 failures + 1 success)");
    Assertions.assertEquals(1, orderRepository.count(),
        "Order should be created after successful retry");
}
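
Both DLQ tests assume a consumer wrapper that retries a bounded number of times and then forwards the original payload with debugging headers. A minimal sketch of that logic follows, with the Kafka producer replaced by an in-memory list. `RetryingHandler` and `DeadLetter` are hypothetical names, and the header keys simply mirror the ones asserted in the test above. The sketch has no backoff and does not distinguish transient from permanent errors.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical DLQ record: the untouched payload plus headers for debugging and replay. */
class DeadLetter {
    final String payload;
    final Map<String, String> headers;

    DeadLetter(String payload, Map<String, String> headers) {
        this.payload = payload;
        this.headers = headers;
    }
}

class RetryingHandler {
    private final int maxAttempts;
    final List<DeadLetter> deadLetters = new ArrayList<>(); // stands in for the DLQ topic

    RetryingHandler(int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        this.maxAttempts = maxAttempts;
    }

    /** Returns true if processing succeeded, false if the message was dead-lettered. */
    boolean handle(String topic, long offset, String payload, Consumer<String> processor) {
        RuntimeException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                processor.accept(payload);
                return true;
            } catch (RuntimeException e) {
                lastFailure = e; // remember the most recent failure for the DLQ header
            }
        }
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("x-original-topic", topic);
        headers.put("x-original-offset", Long.toString(offset));
        headers.put("x-exception-message", String.valueOf(lastFailure.getMessage()));
        deadLetters.add(new DeadLetter(payload, headers)); // payload is forwarded unchanged
        return false;
    }
}

class DeadLetterDemo {
    public static void main(String[] args) {
        RetryingHandler handler = new RetryingHandler(3);
        boolean ok = handler.handle("orders", 42L, "this is not valid json {{{",
            payload -> { throw new IllegalArgumentException("cannot parse payload"); });
        System.out.println(ok);                                        // false
        System.out.println(handler.deadLetters.get(0).headers.keySet());
    }
}
```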

Testing Consumer Group Rebalancing Edge Cases

Consumer group rebalancing happens when consumers join or leave a group. Messages in flight during a rebalance can be re-delivered. Test that your consumers handle this correctly:

@Test
void messageInFlightDuringRebalanceShouldNotBeLost() throws Exception {
    String topic = "rebalance-test-" + UUID.randomUUID();
    admin.createTopics(List.of(new NewTopic(topic, 4, (short) 1))).all().get();

    // Send 100 messages
    for (int i = 0; i < 100; i++) {
        producer.send(new ProducerRecord<>(topic, "key-" + i,
            "{\"seq\":" + i + "}")).get();
    }
    producer.flush();

    String groupId = "rebalance-test-group";
    Set<Integer> processedSeqs = Collections.synchronizedSet(new HashSet<>());

    // Start consumer 1
    KafkaConsumer<String, String> consumer1 = createConsumer(groupId);
    consumer1.subscribe(List.of(topic));

    // Process some messages with consumer 1
    processMessages(consumer1, processedSeqs, 20, Duration.ofSeconds(5));

    // Add consumer 2 — triggers rebalance
    KafkaConsumer<String, String> consumer2 = createConsumer(groupId);
    consumer2.subscribe(List.of(topic));

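    // Continue processing with both consumers. How the remaining records split between
    // them depends on the partition assignment after the rebalance, so neither consumer
    // is capped at a fixed share: each may take up to 100 and otherwise stops at its timeout.
    ExecutorService executor = Executors.newFixedThreadPool(2);
    executor.submit(() -> processMessages(consumer1, processedSeqs, 100, Duration.ofSeconds(10)));
    executor.submit(() -> processMessages(consumer2, processedSeqs, 100, Duration.ofSeconds(10)));
    executor.shutdown();
    executor.awaitTermination(15, TimeUnit.SECONDS);
    consumer1.close();
    consumer2.close();

    // Every sequence number must have been processed at least once; the Set absorbs
    // any records that were redelivered during the rebalance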
    Assertions.assertEquals(100, processedSeqs.size(),
        "All messages should be processed after rebalance. Missing: " +
        IntStream.range(0, 100)
            .filter(i -> !processedSeqs.contains(i))
            .boxed().collect(toList()));
}
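
Redelivery after a rebalance follows from how offsets work: the new owner of a partition resumes from the last committed offset, not from the last record the previous owner processed. The toy model below shows the effect with a single in-memory partition. `PartitionLog` and `RebalanceSimulation` are illustrative names, and nothing here talks to a real broker.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical single partition with one committed offset for the consumer group. */
class PartitionLog {
    private final List<Integer> records = new ArrayList<>();
    private int committedOffset = 0;

    void append(int seq) { records.add(seq); }

    int committedOffset() { return committedOffset; }

    void commit(int nextOffset) { committedOffset = nextOffset; }

    /** Returns up to max records starting at the given offset. */
    List<Integer> readFrom(int offset, int max) {
        int end = Math.min(records.size(), offset + max);
        return new ArrayList<>(records.subList(offset, end));
    }
}

class RebalanceSimulation {
    final Set<Integer> processedSeqs = new HashSet<>();
    int deliveries = 0;

    /** Processes up to count records from the committed offset; commits only if asked to. */
    void consume(PartitionLog log, int count, boolean commitAfterwards) {
        int start = log.committedOffset();
        List<Integer> batch = log.readFrom(start, count);
        for (int seq : batch) {
            deliveries++;
            processedSeqs.add(seq); // set membership makes reprocessing harmless
        }
        if (commitAfterwards) {
            log.commit(start + batch.size());
        }
    }

    public static void main(String[] args) {
        PartitionLog log = new PartitionLog();
        for (int i = 0; i < 10; i++) {
            log.append(i);
        }
        RebalanceSimulation sim = new RebalanceSimulation();
        sim.consume(log, 3, true);   // owner A: processes 0..2 and commits offset 3
        sim.consume(log, 4, false);  // owner A: processes 3..6, rebalance hits before the commit
        sim.consume(log, 10, true);  // owner B: resumes at offset 3, so 3..6 are redelivered
        System.out.println("deliveries=" + sim.deliveries
            + " unique=" + sim.processedSeqs.size()); // deliveries=14 unique=10
    }
}
```

Four records are delivered twice and none are lost, which is the outcome the rebalancing test asserts through its `Set` of sequence numbers.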

Kafka Exactly-Once with Transactions

@Test
void transactionalProducerShouldProvideExactlyOnceSemantics() throws Exception {
    Properties txProps = new Properties();
    txProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
    txProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    txProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    txProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-transactional-id");
    txProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

    KafkaProducer<String, String> txProducer = new KafkaProducer<>(txProps);
    txProducer.initTransactions();

    // Send two messages in a transaction — both or neither should be consumed
    txProducer.beginTransaction();
    txProducer.send(new ProducerRecord<>("payments", "tx-1", "{\"amount\":50.00}"));
    txProducer.send(new ProducerRecord<>("audit-log", "tx-1", "{\"action\":\"PAYMENT\"}"));
    txProducer.commitTransaction();

    // Consumer with isolation.level=read_committed should see both messages
    KafkaConsumer<String, String> isolatedConsumer = new KafkaConsumer<>(Map.of(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(),
        ConsumerConfig.GROUP_ID_CONFIG, "isolated-" + UUID.randomUUID(),
        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
        ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed", // key setting
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class
    ));

    // Read both topics written by the transaction: after the commit, both records
    // must be visible to a read_committed consumer
    isolatedConsumer.subscribe(List.of("payments", "audit-log"));
    List<ConsumerRecord<String, String>> committedMessages = new ArrayList<>();
    long deadline = System.currentTimeMillis() + 10000;
    while (committedMessages.size() < 2 && System.currentTimeMillis() < deadline) {
        ConsumerRecords<String, String> polled = isolatedConsumer.poll(Duration.ofMillis(100));
        polled.forEach(committedMessages::add);
    }
    isolatedConsumer.close();
    txProducer.close();

    Assertions.assertEquals(2, committedMessages.size(),
        "Both records of the committed transaction should be visible");
}
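
To make the visibility rule behind `read_committed` concrete, here is a toy model of a single partition whose records are tagged with a transaction ID. It is a simplification written for this guide, not Kafka's implementation: `TransactionalLog` is a hypothetical class, and real brokers track this with control records and a last stable offset per partition. A read_committed reader skips records from aborted transactions and stops at the first record that belongs to a still-open transaction.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical single-partition log with transactional visibility. */
class TransactionalLog {
    enum TxState { OPEN, COMMITTED, ABORTED }

    private static final class Entry {
        final String txId;
        final String value;

        Entry(String txId, String value) {
            this.txId = txId;
            this.value = value;
        }
    }

    private final List<Entry> entries = new ArrayList<>();
    private final Map<String, TxState> transactions = new HashMap<>();

    void begin(String txId) { transactions.put(txId, TxState.OPEN); }

    void send(String txId, String value) {
        if (transactions.get(txId) != TxState.OPEN) {
            throw new IllegalStateException("no open transaction: " + txId);
        }
        entries.add(new Entry(txId, value)); // written immediately, but not yet visible
    }

    void commit(String txId) { transactions.put(txId, TxState.COMMITTED); }

    void abort(String txId) { transactions.put(txId, TxState.ABORTED); }

    /** read_uncommitted: every record that was written, regardless of outcome. */
    List<String> readUncommitted() {
        List<String> all = new ArrayList<>();
        for (Entry e : entries) {
            all.add(e.value);
        }
        return all;
    }

    /** read_committed: committed records only, and nothing past the first open transaction. */
    List<String> readCommitted() {
        List<String> visible = new ArrayList<>();
        for (Entry e : entries) {
            TxState state = transactions.get(e.txId);
            if (state == TxState.OPEN) {
                break;                     // cannot read beyond an undecided transaction
            }
            if (state == TxState.COMMITTED) {
                visible.add(e.value);      // aborted records are silently skipped
            }
        }
        return visible;
    }
}

class ReadCommittedDemo {
    public static void main(String[] args) {
        TransactionalLog log = new TransactionalLog();
        log.begin("tx-1");
        log.send("tx-1", "payment");
        log.send("tx-1", "audit");
        System.out.println(log.readCommitted()); // [] while the transaction is open
        log.commit("tx-1");
        System.out.println(log.readCommitted()); // [payment, audit]
    }
}
```

A useful companion to the committed-path test is the aborted path: send inside a transaction, call `abortTransaction()`, and assert that a read_committed consumer sees nothing.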

Test Coverage Checklist

Scenario                            | What Breaks Without It
Duplicate message → same outcome    | Processing duplicates corrupts data
Out-of-order → correct final state  | Naive processing regresses state machines
Poison pill → DLQ routing           | Consumer blocks forever, backlog grows
DLQ metadata headers present        | Ops team can't debug or replay failures
Transient failure → retry → success | Losing messages on transient errors
Max retries → DLQ                   | Infinite retry loop consumes all resources
Rebalance → no message loss         | Messages dropped during scaling events
Same partition key → ordered        | Cross-partition ordering assumptions break
Transactional read_committed        | Consumers see partial transactions

Event-driven systems fail in ways that are difficult to reproduce manually. Wrapping Kafka in Testcontainers gives you a real broker in CI where you can inject precise failure conditions and assert on the exact behavior. The tests above cover the failure modes that cause the worst production incidents: silent data loss, duplicate processing, and consumer stalls.
