Testing Async Messaging with Kafka: Consumer and Producer Tests

Testing Kafka producers and consumers requires handling asynchrony — you publish a message and the consumer processes it on its own schedule. Embedded Kafka (for unit-like tests) and TestContainers Kafka (for integration tests) solve the "real Kafka without real infrastructure" problem. Consumer group offset management, idempotency, and retry logic need dedicated tests.

Key Takeaways

Embedded Kafka for unit tests, TestContainers for integration tests. Embedded Kafka runs the broker inside the test JVM, so it starts quickly but is less representative of a production deployment. TestContainers runs a real Kafka broker in Docker, catching issues that embedded Kafka misses.

Use polling loops, not fixed sleeps. Thread.sleep(2000) is the wrong way to wait for async consumption. Poll with a timeout (Awaitility in Java, a small polling helper in Python) until the condition is met or the timeout expires.

Test consumer group offset commits separately. Verify that your consumer commits offsets at the right point: after successful processing, not before. A consumer that commits before processing can silently drop messages: if it crashes after the commit but before processing finishes, it resumes after the lost message on restart.

Test your dead letter queue handling. What happens when a message fails processing repeatedly? Verify the DLQ receives the message and your monitoring alerts correctly.

Test message ordering assumptions. If your consumer assumes messages arrive in order, test what happens when they don't. Kafka guarantees order within a partition, not across partitions.

The Challenge of Testing Async Systems

Synchronous systems are straightforward to test: call function, check result. Async message-driven systems are harder because the causal chain is indirect: publish message → broker routes → consumer processes → side effects appear.

The timing is non-deterministic. The consumer might process the message in 5ms or 500ms depending on load, network conditions, and consumer group state. Tests that pass locally then fail in CI, where machines are often slower and more heavily loaded.

The right approach uses real or realistic Kafka infrastructure with proper async waiting rather than fixed delays. This guide covers the patterns.

Setting Up a Test Kafka Environment

Embedded Kafka (Spring Boot)

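For Spring Boot projects, spring-kafka-test provides an embedded Kafka broker. The examples below also use Awaitility for asynchronous assertions; with Spring Boot's dependency management, neither dependency needs an explicit version:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <scope>test</scope>
</dependency>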
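
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;

@SpringBootTest
@EmbeddedKafka(
    partitions = 1,
    topics = {"orders", "order-events"},
    // Point Spring Boot's Kafka auto-configuration at the embedded broker
    bootstrapServersProperty = "spring.kafka.bootstrap-servers",
    brokerProperties = {
        "auto.create.topics.enable=true",
        "log.retention.ms=5000"
    }
)
class OrderEventConsumerTest {

    @Autowired
    private KafkaTemplate<String, OrderEvent> kafkaTemplate;

    @Autowired
    private OrderRepository orderRepository;

    @Test
    void processesOrderCreatedEvent() {
        OrderEvent event = new OrderEvent("ORDER-123", "CREATED", Instant.now());
        
        kafkaTemplate.send("order-events", "ORDER-123", event);
        
        // Wait for consumer to process, up to 5 seconds
        await().atMost(5, TimeUnit.SECONDS)
               .untilAsserted(() -> {
                   Optional<Order> order = orderRepository.findById("ORDER-123");
                   assertThat(order).isPresent();
                   assertThat(order.get().getStatus()).isEqualTo("CREATED");
               });
    }
}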

The await() from Awaitility replaces Thread.sleep(). It polls the assertion until it passes or the timeout expires, failing with a clear message if the timeout is reached.
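Python's standard library has no direct equivalent. As a rough analog, here is a hypothetical `until_asserted` helper (our name, not a library API) that re-runs an assertion function until it passes and, on timeout, re-raises the last `AssertionError` so the failure says what was still wrong:

```python
import time
from typing import Callable


def until_asserted(
    assertion: Callable[[], None],
    timeout_seconds: float = 5.0,
    poll_interval: float = 0.1,
) -> None:
    """Re-run `assertion` until it stops raising AssertionError or the timeout expires.

    On timeout the most recent AssertionError propagates, so the test report
    shows the last observed failure (similar in spirit to Awaitility's untilAsserted).
    """
    deadline = time.monotonic() + timeout_seconds  # monotonic: immune to wall-clock jumps
    while True:
        try:
            assertion()
            return  # the assertion passed
        except AssertionError:
            if time.monotonic() >= deadline:
                raise  # timed out: surface the last failure message
        time.sleep(poll_interval)  # not yet passing and time remains: try again
```

Re-raising the last failure, instead of returning a boolean, keeps the failure message as specific as the assertion itself. The `wait_for` helper later in this guide takes the boolean route instead.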

TestContainers Kafka

For tests that need a real, separately running broker (the same Kafka version you run in production, real network listeners, realistic consumer group coordination), use TestContainers:

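import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
@SpringBootTest
class KafkaIntegrationTest {

    @Container
    static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.5.0"));

    @DynamicPropertySource
    static void kafkaProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
        registry.add("spring.kafka.consumer.group-id", () -> "test-consumer-group");
        registry.add("spring.kafka.consumer.auto-offset-reset", () -> "earliest");
    }

    // Test methods use real Kafka...
}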

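TestContainers starts a real Kafka Docker container before the tests run and stops it after. Because the field is static, one container is shared by every test in the class rather than restarted per test. The startup time (~10-15 seconds) is the tradeoff for real Kafka behavior.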

Python Test Setup with TestContainers

import pytest
from testcontainers.kafka import KafkaContainer
from kafka import KafkaProducer, KafkaConsumer
import json

@pytest.fixture(scope="session")
def kafka_container():
    with KafkaContainer("confluentinc/cp-kafka:7.5.0") as kafka:
        yield kafka

@pytest.fixture
def producer(kafka_container):
    p = KafkaProducer(
        bootstrap_servers=kafka_container.get_bootstrap_server(),
        value_serializer=lambda v: json.dumps(v).encode()
    )
    yield p
    p.close()

@pytest.fixture
def consumer(kafka_container):
    c = KafkaConsumer(
        "test-topic",
        bootstrap_servers=kafka_container.get_bootstrap_server(),
        group_id="test-group",
        auto_offset_reset="earliest",
        consumer_timeout_ms=5000,
        value_deserializer=lambda v: json.loads(v.decode())
    )
    yield c
    c.close()

Testing Kafka Producers

Producer tests verify that:

  1. The correct topic receives the message
  2. The message is serialized correctly
  3. The message key is set correctly (affects partition routing)
  4. Errors are handled (broker unavailable, serialization failure)
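Item 4 is the one most often left untested. A minimal sketch, assuming a hypothetical `publish_event` wrapper that serializes to JSON before calling a send function you inject: passing a fake `send` lets a test trigger both failure modes without a broker. `PublishError`, and the use of `ConnectionError` to stand in for an unavailable broker, are illustrative assumptions rather than the behavior of any Kafka client:

```python
import json
from typing import Callable


class PublishError(Exception):
    """Hypothetical domain error raised when an event cannot be published."""


def publish_event(
    send: Callable[[str, bytes, bytes], None], topic: str, key: str, event: dict
) -> None:
    """Serialize `event` as JSON and hand (topic, key, payload) to `send`.

    `send` stands in for a real producer call. Serialization and transport
    failures are both wrapped in PublishError so callers handle one error type.
    """
    try:
        payload = json.dumps(event).encode()
    except (TypeError, ValueError) as exc:  # e.g. a set, which JSON cannot encode
        raise PublishError(f"cannot serialize event for {topic}: {exc}") from exc
    try:
        send(topic, key.encode(), payload)
    except ConnectionError as exc:  # stand-in for "broker unavailable"
        raise PublishError(f"broker unavailable for {topic}: {exc}") from exc
```

Because the original exception is chained with `from exc`, a test can assert on both the domain error and its underlying cause.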

Verifying Message Content

@Test
void publishesOrderCreatedEventWithCorrectFields() throws Exception {
    String topic = "order-events";
    
    // Create a consumer to intercept what the producer sends
    Consumer<String, String> testConsumer = createTestConsumer(topic);
    
    // Trigger the producer under test
    orderService.createOrder(new CreateOrderRequest("user-123", List.of(
        new Item("product-1", 2, new BigDecimal("9.99"))
    )));
    
    // Wait for the message and assert its content
    ConsumerRecords<String, String> records = testConsumer.poll(Duration.ofSeconds(5));
    
    assertThat(records.count()).isEqualTo(1);
    ConsumerRecord<String, String> record = records.iterator().next();
    
    OrderEvent event = objectMapper.readValue(record.value(), OrderEvent.class);
    assertThat(event.getEventType()).isEqualTo("ORDER_CREATED");
    assertThat(event.getUserId()).isEqualTo("user-123");
    assertThat(event.getItems()).hasSize(1);
    assertThat(event.getTimestamp()).isNotNull();
}

Testing Message Keys

Message keys determine partition routing and ordering guarantees. Test that your producer uses the right key:

@Test
void usesOrderIdAsMessageKey() {
    String orderId = "ORDER-" + UUID.randomUUID();
    Consumer<String, String> testConsumer = createTestConsumer("order-events");
    
    orderService.createOrder(new CreateOrderRequest(orderId, "user-123"));
    
    ConsumerRecords<String, String> records = testConsumer.poll(Duration.ofSeconds(5));
    assertThat(records.count()).isEqualTo(1);  // fail clearly instead of NoSuchElementException
    ConsumerRecord<String, String> record = records.iterator().next();
    
    // Key should be the order ID for correct partition routing
    assertThat(record.key()).isEqualTo(orderId);
}
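The routing rule behind this test can be sketched in a few lines. This is a simplified model, not Kafka's partitioner: the Java client's default partitioner hashes the serialized key with murmur2, while the sketch uses `zlib.crc32` as a dependency-free stand-in, and `partition_for`, `route`, and `per_key_order_preserved` are hypothetical names. The property it demonstrates is the same: a given key always maps to the same partition, so per-key order survives even though there is no ordering across partitions.

```python
import zlib
from collections import defaultdict


def partition_for(key: str, num_partitions: int) -> int:
    # Stand-in hash: Kafka's Java client uses murmur2 on the serialized key;
    # crc32 keeps this sketch free of dependencies.
    return zlib.crc32(key.encode()) % num_partitions


def route(messages: list[tuple[str, int]], num_partitions: int) -> dict[int, list[tuple[str, int]]]:
    """Append each (key, seq) message to its partition's log in send order."""
    partitions: dict[int, list[tuple[str, int]]] = defaultdict(list)
    for key, seq in messages:
        partitions[partition_for(key, num_partitions)].append((key, seq))
    return dict(partitions)


def per_key_order_preserved(partitions: dict[int, list[tuple[str, int]]]) -> bool:
    """True if, inside every partition log, each key's sequence numbers only increase."""
    for log in partitions.values():
        last_seen: dict[str, int] = {}
        for key, seq in log:
            if key in last_seen and seq <= last_seen[key]:
                return False
            last_seen[key] = seq
    return True
```

The mapping depends on the partition count, so adding partitions to an existing topic can send a key's new messages to a different partition than its old ones, and their relative order is no longer guaranteed.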

Testing Kafka Consumers

Consumer tests verify that:

  1. Messages are consumed from the correct topic
  2. Messages are deserialized correctly
  3. Business logic executes correctly based on the message content
  4. Offsets are committed at the right time
  5. Failed messages are handled correctly (DLQ, retry)
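Item 4 is easiest to reason about as a simulation. The sketch below (all names hypothetical) models a partition as a list and the committed offset as the position of the next record to read, which matches Kafka's convention of committing "last processed offset + 1". It crashes the consumer once at offset 1 and then restarts it from whatever was committed:

```python
from typing import Callable, Optional


def run_consumer(
    log: list[str],
    committed: int,
    handle: Callable[[str], None],
    commit_before_processing: bool,
    crash_at: Optional[int] = None,
) -> int:
    """Consume log[committed:] and return the committed offset when the consumer stops.

    If `crash_at` is given, the consumer "dies" at that offset before `handle`
    runs, and whatever offset was committed so far is returned.
    """
    for offset in range(committed, len(log)):
        if commit_before_processing:
            committed = offset + 1  # wrong order: position saved before the work
        if offset == crash_at:
            return committed  # simulated crash: this record's side effect never happens
        handle(log[offset])
        if not commit_before_processing:
            committed = offset + 1  # right order: position saved after the work
    return committed


def process_with_restart(commit_before_processing: bool) -> list[str]:
    """Crash once at offset 1, restart from the committed offset, return what was processed."""
    log = ["m0", "m1", "m2"]
    processed: list[str] = []
    committed = run_consumer(log, 0, processed.append, commit_before_processing, crash_at=1)
    run_consumer(log, committed, processed.append, commit_before_processing)  # restart
    return processed
```

With commit-before-processing the restart skips `m1`; with commit-after it replays `m1`. Committing after processing trades loss for possible duplicates (a crash after `handle` but before the commit replays that record), which is why the idempotency test below matters.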

Basic Consumer Test

@Test
void processesPaymentProcessedEvent() {
    // Arrange: publish a message the consumer will read
    PaymentProcessedEvent event = PaymentProcessedEvent.builder()
        .orderId("ORDER-123")
        .amount(new BigDecimal("99.99"))
        .status("SUCCESS")
        .transactionId("TXN-456")
        .build();
    
    kafkaTemplate.send("payment-events", "ORDER-123", event);
    
    // Act + Assert: wait for the consumer's side effect
    await().atMost(10, TimeUnit.SECONDS)
           .untilAsserted(() -> {
               Order order = orderRepository.findById("ORDER-123").orElseThrow();
               assertThat(order.getStatus()).isEqualTo("PAYMENT_CONFIRMED");
               assertThat(order.getPaymentTransactionId()).isEqualTo("TXN-456");
           });
}

Testing Consumer Error Handling

What happens when processing fails? Test the retry and DLQ behavior:

@Test
void sendsToDeadLetterQueueAfterMaxRetries() {
    // Arrange: publish a message with an invalid format that will fail deserialization
    // or trigger business logic failure
    String invalidPayload = "{\"orderId\": null, \"amount\": \"not-a-number\"}";
    kafkaTemplate.send("payment-events", "ORDER-INVALID", invalidPayload);
    
    // Assert: the message ends up in the DLQ
    Consumer<String, String> dlqConsumer = createTestConsumer("payment-events.DLT");
    
    await().atMost(30, TimeUnit.SECONDS)  // Longer timeout to allow retries
           .untilAsserted(() -> {
               ConsumerRecords<String, String> dlqRecords = dlqConsumer.poll(Duration.ofSeconds(2));
               assertThat(dlqRecords.count()).isGreaterThan(0);
               
               ConsumerRecord<String, String> dlqRecord = dlqRecords.iterator().next();
               // Verify DLQ message has error metadata headers
               Header exceptionHeader = dlqRecord.headers().lastHeader("kafka_dlt-exception-message");
               assertThat(exceptionHeader).isNotNull();
           });
}
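The retry-then-dead-letter flow this test exercises can be modeled without a broker. A minimal sketch, assuming a fixed number of attempts and no backoff between them; the `kafka_dlt-exception-message` header name mirrors the one asserted above, and everything else (`handle_with_retries`, `DeadLetter`) is hypothetical:

```python
from dataclasses import dataclass, field
from typing import Callable, Optional


@dataclass
class DeadLetter:
    key: str
    value: str
    headers: dict[str, str] = field(default_factory=dict)


def handle_with_retries(
    key: str,
    value: str,
    process: Callable[[str], None],
    dlq: list[DeadLetter],
    max_attempts: int = 3,
) -> int:
    """Try `process` up to `max_attempts` times; on exhaustion, dead-letter the record.

    Returns the number of attempts made.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    last_error: Optional[Exception] = None
    for attempt in range(1, max_attempts + 1):
        try:
            process(value)
            return attempt  # success: nothing goes to the DLQ
        except Exception as exc:  # any processing failure counts as a failed attempt
            last_error = exc
    # All attempts failed: keep the payload and record why, as a DLT header would
    dlq.append(DeadLetter(key, value, {"kafka_dlt-exception-message": str(last_error)}))
    return max_attempts
```

A flaky handler that succeeds on a later attempt never reaches the DLQ, which is worth asserting alongside the permanent-failure case.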

Testing Idempotency

Kafka consumers can receive the same message more than once (at-least-once delivery). Test that your consumer handles duplicates correctly:

@Test
void processesOrderEventOnlyOnceWhenDeliveredTwice() {
    OrderEvent event = new OrderEvent("ORDER-123", "CREATED", Instant.now());
    
    // Publish the same message twice (simulating redelivery)
    kafkaTemplate.send("order-events", "ORDER-123", event);
    kafkaTemplate.send("order-events", "ORDER-123", event);
    
    // during(): the assertion must keep passing for 2 seconds, so a duplicate
    // processed after the first message cannot slip past the check
    await().during(2, TimeUnit.SECONDS)
           .atMost(10, TimeUnit.SECONDS)
           .untilAsserted(() -> {
               // Database should have exactly one record, not two
               List<Order> orders = orderRepository.findByExternalId("ORDER-123");
               assertThat(orders).hasSize(1);
               
               // The event ID should be recorded so later redeliveries are recognized
               Order order = orders.get(0);
               assertThat(order.getProcessedEventIds()).contains(event.getEventId());
           });
}
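The consumer-side logic that makes this test pass is usually a check against previously seen event IDs. A minimal in-memory sketch, assuming each event carries a unique `event_id` (the names `OrderStore` and `apply` are hypothetical):

```python
from dataclasses import dataclass, field


@dataclass
class OrderStore:
    """In-memory stand-in for the order table plus a processed-event-ID set."""

    orders: dict[str, dict] = field(default_factory=dict)
    processed_event_ids: set[str] = field(default_factory=set)

    def apply(self, event: dict) -> bool:
        """Apply an order event once; return False if this event ID was already applied."""
        if event["event_id"] in self.processed_event_ids:
            return False  # duplicate delivery: skip the side effect
        self.orders[event["order_id"]] = {"status": event["status"]}
        self.processed_event_ids.add(event["event_id"])
        return True
```

In a real service, the order write and the processed-ID record should be committed in the same database transaction; otherwise a crash between them reopens the duplicate window.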

Testing Consumer Groups

When multiple consumers form a consumer group, each partition is assigned to exactly one consumer. Test that your consumer group behavior is correct:

@Test
void distributesMessagesAcrossConsumerGroup() {
    // Requires a topic with multiple partitions and more than one consumer in the group,
    // e.g. a listener container with spring.kafka.listener.concurrency=3
    String topic = "high-throughput-events";
    int messageCount = 100;
    
    // 10 distinct keys hash to at most 10 partitions (fewer if keys collide)
    for (int i = 0; i < messageCount; i++) {
        String key = "user-" + (i % 10);
        kafkaTemplate.send(topic, key, new ProcessingEvent(i));
    }
    
    // Every message should be processed: distinct processed IDs must equal messages sent
    await().atMost(30, TimeUnit.SECONDS)
           .untilAsserted(() -> {
               long processedCount = processingRepository.countDistinctEventIds();
               assertThat(processedCount).isEqualTo(messageCount);
           });
}
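Which consumer reads which partition is decided by the group's assignment strategy (the consumer property `partition.assignment.strategy`). The sketch below is a simplified, single-topic model of range-style assignment; `range_assign` is a hypothetical name, and the real assignment is computed during a group rebalance:

```python
def range_assign(partitions: list[int], consumers: list[str]) -> dict[str, list[int]]:
    """Give each consumer a contiguous range of partitions, range-assignor style.

    Consumers are sorted by ID; the first (len(partitions) % len(consumers))
    consumers receive one extra partition. Each partition goes to exactly one consumer.
    """
    ordered_consumers = sorted(consumers)
    ordered_partitions = sorted(partitions)
    per_consumer, extra = divmod(len(ordered_partitions), len(ordered_consumers))
    assignment: dict[str, list[int]] = {}
    start = 0
    for i, consumer in enumerate(ordered_consumers):
        count = per_consumer + (1 if i < extra else 0)
        assignment[consumer] = ordered_partitions[start:start + count]
        start += count
    return assignment
```

Consumers beyond the partition count receive nothing, so a topic created with `partitions = 1` (as in the `@EmbeddedKafka` example earlier) cannot exercise distribution across a group.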

Handling Timing in Async Tests

The most common failure mode in Kafka tests is incorrect timing handling.

Wrong: fixed sleep

kafkaTemplate.send("events", event);
Thread.sleep(2000);  // Fragile — works sometimes, fails in CI
assertThat(repository.findById(event.getId())).isPresent();

Wrong: no wait at all

kafkaTemplate.send("events", event);
assertThat(repository.findById(event.getId())).isPresent();  // Almost always fails: the consumer hasn't run yet

Right: poll until condition met

kafkaTemplate.send("events", event);
await()
    .atMost(10, TimeUnit.SECONDS)
    .pollInterval(100, TimeUnit.MILLISECONDS)
    .untilAsserted(() ->
        assertThat(repository.findById(event.getId())).isPresent()
    );

Awaitility's pollInterval controls how often it checks the condition. The default is 100ms, which is fine for most tests. For faster tests that should succeed quickly, reduce the poll interval. For slower operations, increase the timeout.

Python Timing Patterns

import time
from typing import Callable

def wait_for(condition: Callable[[], bool], timeout_seconds: float = 10, poll_interval: float = 0.1) -> bool:
    """Poll until condition is True or timeout."""
    # time.monotonic() cannot jump backwards or forwards like the wall clock
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        if condition():
            return True
        time.sleep(poll_interval)
    return False

def test_consumer_processes_order_event(producer, consumer, order_repository):
    event = {"event_type": "ORDER_CREATED", "order_id": "ORD-123", "user_id": "user-456"}
    
    producer.send("order-events", key=b"ORD-123", value=event)
    producer.flush()
    
    assert wait_for(
        lambda: order_repository.find("ORD-123") is not None,
        timeout_seconds=10
    ), "Order was not created within 10 seconds"
    
    order = order_repository.find("ORD-123")
    assert order.status == "PENDING"
    assert order.user_id == "user-456"

Testing Schema Registry Integration

When using Confluent Schema Registry with Avro or Protobuf:

@Test
void serializesEventWithCorrectAvroSchema() {
    // End-to-end check: the consumer must deserialize the Avro event without schema errors
    OrderCreatedEvent event = OrderCreatedEvent.newBuilder()
        .setOrderId("ORDER-123")
        .setUserId("user-456")
        .setTotal(9999L)  // cents
        .build();
    
    kafkaTemplate.send("order-events", "ORDER-123", event);
    
    // Consumer should deserialize without schema mismatch errors
    await().atMost(5, TimeUnit.SECONDS)
           .untilAsserted(() ->
               assertThat(orderRepository.findById("ORDER-123")).isPresent()
           );
}

@Test
void handlesSchemaEvolutionBackwardCompatibly() {
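    // Publish an event in the old format (missing the new optional field).
    // A raw JSON string only works if the topic uses JSON serialization. On an Avro topic,
    // produce the old event through the Avro serializer with the previous schema version;
    // otherwise the consumer fails at deserialization before reaching your handler.
    String oldFormatJson = "{\"order_id\": \"ORDER-123\", \"user_id\": \"user-456\"}";
    rawKafkaTemplate.send("order-events", "ORDER-123", oldFormatJson);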
    
    // Consumer should handle missing optional field gracefully
    await().atMost(5, TimeUnit.SECONDS)
           .untilAsserted(() -> {
               Order order = orderRepository.findById("ORDER-123").orElseThrow();
               assertThat(order.getNewOptionalField()).isNull();  // Default value
           });
}
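Backward compatibility works because Avro schema resolution fills a field the writer did not have from the reader schema's default, rejects the record if that field has no default, and ignores writer fields the reader does not know. The sketch below models just those rules with plain dicts; it is a simplification (no type promotion, aliases, or nested records), and `resolve` and `REQUIRED` are hypothetical names:

```python
from typing import Any

REQUIRED = object()  # sentinel: the reader schema gives this field no default


def resolve(record: dict[str, Any], reader_fields: dict[str, Any]) -> dict[str, Any]:
    """Project a record written with an older schema onto the reader schema.

    Missing fields take the reader's default; a missing field without a default
    is an incompatibility. Fields the reader does not declare are dropped.
    """
    resolved: dict[str, Any] = {}
    for name, default in reader_fields.items():
        if name in record:
            resolved[name] = record[name]
        elif default is REQUIRED:
            raise ValueError(f"field {name!r} missing and has no default")
        else:
            resolved[name] = default
    return resolved
```

This is why a new field must be added with a default (typically a nullable union defaulting to null) to keep old messages readable.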

Summary

Kafka tests require real infrastructure (embedded or TestContainers), proper async waiting (Awaitility), and dedicated tests for error scenarios (DLQ, retries, idempotency).

The most valuable tests to have:

  1. Producer tests that verify message format and key
  2. Consumer tests that verify correct side effects
  3. Error handling tests that verify DLQ behavior
  4. Idempotency tests that verify duplicate messages are handled

The most common mistake is Thread.sleep(). Replace every instance with Awaitility or a polling helper: fixed delays are one of the most common causes of flaky Kafka tests.
