Kafka Streams Testing: TopologyTestDriver and Stateful Stream Processors

Kafka Streams provides a TopologyTestDriver that runs your entire stream processing topology in-process, with no Kafka cluster needed. You pipe records in through TestInputTopic, read results from TestOutputTopic, set record timestamps explicitly to test windowed aggregations, and advance wall-clock time programmatically to fire punctuators. For Schema Registry integration, use MockSchemaRegistryClient to avoid hitting a real registry.

Key Takeaways

TopologyTestDriver runs your topology in-process. No Kafka, no ZooKeeper, no network. Your topology runs as a pure function you can call synchronously in tests.

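Control time programmatically for window tests. Record timestamps passed to pipeInput drive stream time, which decides which window a record falls into. testDriver.advanceWallClockTime(Duration) moves only the mock wall clock, which fires wall-clock punctuators. Neither requires waiting for real time to pass.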

Test state stores directly. After piping records through the topology, query the KeyValueStore directly via testDriver.getKeyValueStore("store-name") to assert intermediate state.

Use MockSchemaRegistryClient for Avro/Protobuf. Inject the same client into both the test serializers and the topology's serdes, and point the schema registry URL config at a mock:// address, so your tests work offline with any schema.

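Test punctuators by advancing wall-clock time. Punctuate callbacks scheduled with PunctuationType.WALL_CLOCK_TIME fire when you advance the wall clock past the schedule interval. Callbacks scheduled with PunctuationType.STREAM_TIME fire as record timestamps advance instead.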

Why TopologyTestDriver Changes Everything

Integration-testing Kafka Streams applications used to mean spinning up a full Kafka cluster, deploying your application, and sending messages to real topics. That's slow, flaky, and hard to control.

TopologyTestDriver (part of kafka-streams-test-utils) runs your Topology or StreamsBuilder pipeline entirely in-process. Record processing is synchronous — you put a record in and the topology processes it immediately. There's no consumer group, no network, and no background threads.

This makes Kafka Streams one of the most testable stream processing systems available.

Project Setup

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.7.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams-test-utils</artifactId>
    <version>3.7.0</version>
    <scope>test</scope>
</dependency>

Your First Topology Test

Consider a simple order enrichment topology that reads raw orders and joins them with a customer table:

public class OrderEnrichmentTopology {

    public Topology build(StreamsBuilder builder) {
        KStream<String, Order> orders = builder.stream("raw-orders",
            Consumed.with(Serdes.String(), orderSerde()));

        KTable<String, Customer> customers = builder.table("customers",
            Consumed.with(Serdes.String(), customerSerde()));

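        // Orders arrive keyed by order ID; re-key by customer ID so the
        // join key matches the key of the customers table
        orders.selectKey((orderId, order) -> order.getCustomerId())
              .join(customers,
                  (order, customer) -> new EnrichedOrder(order, customer),
                  Joined.with(Serdes.String(), orderSerde(), customerSerde()))
              .to("enriched-orders",
                  Produced.with(Serdes.String(), enrichedOrderSerde()));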

        return builder.build();
    }
}

Test it with TopologyTestDriver:

class OrderEnrichmentTopologyTest {

    private TopologyTestDriver testDriver;
    private TestInputTopic<String, Order> orderInput;
    private TestInputTopic<String, Customer> customerInput;
    private TestOutputTopic<String, EnrichedOrder> enrichedOutput;

    @BeforeEach
    void setUp() {
        StreamsBuilder builder = new StreamsBuilder();
        Topology topology = new OrderEnrichmentTopology().build(builder);

        Properties props = new Properties();
        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");

        testDriver = new TopologyTestDriver(topology, props);

        orderInput = testDriver.createInputTopic(
            "raw-orders", new StringSerializer(), new JsonSerializer<>());
        customerInput = testDriver.createInputTopic(
            "customers", new StringSerializer(), new JsonSerializer<>());
        enrichedOutput = testDriver.createOutputTopic(
            "enriched-orders", new StringDeserializer(), new JsonDeserializer<>());
    }

    @AfterEach
    void tearDown() {
        testDriver.close();
    }

    @Test
    void enrichesOrderWithCustomerData() {
        // Populate the KTable first
        customerInput.pipeInput("CUST-001", new Customer("CUST-001", "Alice Smith", "alice@example.com"));

        // Now send an order
        orderInput.pipeInput("ORDER-001", new Order("ORDER-001", "CUST-001", BigDecimal.valueOf(99.99)));

        // Assert the enriched output
        assertFalse(enrichedOutput.isEmpty());
        EnrichedOrder result = enrichedOutput.readValue();
        assertEquals("Alice Smith", result.getCustomerName());
        assertEquals("alice@example.com", result.getCustomerEmail());
        assertEquals(BigDecimal.valueOf(99.99), result.getOrderAmount());
    }

    @Test
    void producesNoOutputWhenCustomerNotFound() {
        // Send order without corresponding customer in KTable
        orderInput.pipeInput("ORDER-002", new Order("ORDER-002", "UNKNOWN-CUST", BigDecimal.valueOf(50.00)));

        // KTable join produces no output for missing keys
        assertTrue(enrichedOutput.isEmpty());
    }
}
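
One detail in the assertions above deserves care: assertEquals on BigDecimal uses equals, which also compares scale. Whether a value keeps its scale after a serialization round trip depends on the serde, so this is a caution rather than a known failure. The sketch below uses only the JDK, and the class and method names are illustrative:

```java
import java.math.BigDecimal;

final class BigDecimalEquality {

    // Numeric comparison that ignores scale (50.0 and 50.00 are the same number).
    static boolean sameNumber(BigDecimal a, BigDecimal b) {
        return a.compareTo(b) == 0;
    }

    public static void main(String[] args) {
        BigDecimal expected = BigDecimal.valueOf(50.00);  // built from "50.0", scale 1
        BigDecimal actual = new BigDecimal("50.00");      // scale 2, e.g. parsed from text

        System.out.println(expected.equals(actual));      // false: scales differ
        System.out.println(sameNumber(expected, actual)); // true: same numeric value
    }
}
```

In a test, assertEquals(0, expected.compareTo(actual)) expresses the scale-insensitive check.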

Testing Stateful Aggregations

State stores are where Kafka Streams applications get interesting — and where bugs hide. TopologyTestDriver lets you query state stores directly.

public class OrderCountTopology {

    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        builder.stream("orders", Consumed.with(Serdes.String(), orderSerde()))
               .groupBy((key, order) -> order.getCustomerId(),
                        Grouped.with(Serdes.String(), orderSerde()))
               .count(Materialized.as("order-counts"))
               .toStream()
               .to("order-count-events",
                   Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }
}

class OrderCountTopologyTest {

    private TopologyTestDriver testDriver;
    private TestInputTopic<String, Order> input;
    private TestOutputTopic<String, Long> output;
    private KeyValueStore<String, Long> countStore;

    @BeforeEach
    void setUp() {
        testDriver = new TopologyTestDriver(OrderCountTopology.build(), testProps());
        input = testDriver.createInputTopic("orders",
            new StringSerializer(), new JsonSerializer<>());
        output = testDriver.createOutputTopic("order-count-events",
            new StringDeserializer(), new LongDeserializer());
        countStore = testDriver.getKeyValueStore("order-counts");
    }

    @Test
    void countsOrdersPerCustomer() {
        input.pipeInput("k1", new Order("O1", "CUST-A", BigDecimal.TEN));
        input.pipeInput("k2", new Order("O2", "CUST-A", BigDecimal.TEN));
        input.pipeInput("k3", new Order("O3", "CUST-B", BigDecimal.TEN));

        // Assert state store directly
        assertEquals(2L, countStore.get("CUST-A"));
        assertEquals(1L, countStore.get("CUST-B"));

        // Also verify the output stream captured the updates
        List<KeyValue<String, Long>> updates = output.readKeyValuesToList();
        assertEquals(3, updates.size());
        // Last update for CUST-A should show count of 2
        assertEquals(2L, updates.get(1).value);
    }
}

Testing Windowed Aggregations

Tumbling and sliding windows are tricky to test because they depend on event timestamps. TopologyTestDriver lets you control timestamps precisely.

public class HourlySalesTopology {

    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        builder.stream("sales", Consumed.with(Serdes.String(), saleSerde()))
               .groupByKey()
               .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
               .aggregate(
                   () -> BigDecimal.ZERO,
                   (key, sale, total) -> total.add(sale.getAmount()),
                   Materialized.<String, BigDecimal, WindowStore<Bytes, byte[]>>as("hourly-sales")
                       .withKeySerde(Serdes.String())
                       .withValueSerde(bigDecimalSerde())
               )
               .toStream()
               .map((windowedKey, total) -> KeyValue.pair(windowedKey.key(), total))
               .to("hourly-totals", Produced.with(Serdes.String(), bigDecimalSerde()));

        return builder.build();
    }
}

class HourlySalesTopologyTest {

    private TopologyTestDriver testDriver;
    private TestInputTopic<String, Sale> input;
    private TestOutputTopic<String, BigDecimal> output;

    @BeforeEach
    void setUp() {
        testDriver = new TopologyTestDriver(HourlySalesTopology.build(), testProps());
        input = testDriver.createInputTopic("sales",
            new StringSerializer(), new JsonSerializer<>(),
            Instant.parse("2026-01-01T10:00:00Z"), Duration.ZERO);  // start timestamp
        output = testDriver.createOutputTopic("hourly-totals",
            new StringDeserializer(), new BigDecimalDeserializer());
    }

    @Test
    void aggregatesSalesWithinHourWindow() {
        Instant windowStart = Instant.parse("2026-01-01T10:00:00Z");

        // Send 3 sales within the same hour
        input.pipeInput("store-A", new Sale(BigDecimal.valueOf(100)), windowStart);
        input.pipeInput("store-A", new Sale(BigDecimal.valueOf(200)), windowStart.plusSeconds(1800));
        input.pipeInput("store-A", new Sale(BigDecimal.valueOf(150)), windowStart.plusSeconds(3500));

        // The topology has no suppress(), so every update is emitted right away.
        // Windows follow record timestamps (stream time), so there is no need
        // to advance the wall clock before reading the output.
        List<KeyValue<String, BigDecimal>> results = output.readKeyValuesToList();
        assertFalse(results.isEmpty());

        // The last update carries the running total for the 10:00-11:00 window
        BigDecimal windowTotal = results.get(results.size() - 1).value;
        assertEquals(0, BigDecimal.valueOf(450).compareTo(windowTotal));
    }

    @Test
    void separatesSalesAcrossWindowBoundaries() {
        Instant hour1 = Instant.parse("2026-01-01T10:30:00Z");
        Instant hour2 = Instant.parse("2026-01-01T11:30:00Z");

        input.pipeInput("store-B", new Sale(BigDecimal.valueOf(100)), hour1);
        input.pipeInput("store-B", new Sale(BigDecimal.valueOf(200)), hour2);

        // Windows are assigned from record timestamps, so no wall-clock advance
        // is needed; without suppress(), each update is emitted immediately

        List<BigDecimal> results = output.readValuesToList();
        // Each sale is in its own window
        assertTrue(results.stream().anyMatch(v -> v.compareTo(BigDecimal.valueOf(100)) == 0));
        assertTrue(results.stream().anyMatch(v -> v.compareTo(BigDecimal.valueOf(200)) == 0));
    }
}
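
The timestamps in these tests work because tumbling windows in Kafka Streams are aligned to the epoch: a record belongs to the window that starts at its timestamp rounded down to a multiple of the window size, and the window end is exclusive. The following is a plain-JDK sketch of that arithmetic for non-negative timestamps, useful for picking test timestamps. The class and method names are illustrative and not part of the Kafka API:

```java
import java.time.Duration;
import java.time.Instant;

final class TumblingWindowMath {

    // Start of the epoch-aligned tumbling window containing the timestamp.
    static Instant windowStart(Instant timestamp, Duration size) {
        long sizeMs = size.toMillis();
        long ts = timestamp.toEpochMilli();
        return Instant.ofEpochMilli(ts - Math.floorMod(ts, sizeMs));
    }

    // True when both timestamps fall into the same tumbling window.
    static boolean sameWindow(Instant a, Instant b, Duration size) {
        return windowStart(a, size).equals(windowStart(b, size));
    }

    public static void main(String[] args) {
        Duration hour = Duration.ofHours(1);
        Instant base = Instant.parse("2026-01-01T10:00:00Z");

        // 3500 seconds after 10:00 is still inside the 10:00 window
        System.out.println(windowStart(base.plusSeconds(3500), hour)); // 2026-01-01T10:00:00Z

        // Exactly one hour later is the first instant of the next window
        System.out.println(sameWindow(base, base.plusSeconds(3600), hour)); // false
    }
}
```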

Testing Punctuators

Punctuators are scheduled callbacks that fire on wall-clock time or stream time. They're used for evictions, TTL logic, and periodic flushes.

public class SessionCleanupProcessor implements Processor<String, Event, String, String> {

    private KeyValueStore<String, Session> sessionStore;
    private ProcessorContext<String, String> context;

    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;
        sessionStore = context.getStateStore("sessions");

        // Schedule cleanup every 5 minutes
        context.schedule(Duration.ofMinutes(5), PunctuationType.WALL_CLOCK_TIME,
            timestamp -> evictExpiredSessions(timestamp));
    }

    private void evictExpiredSessions(long currentTime) {
        try (KeyValueIterator<String, Session> iter = sessionStore.all()) {
            while (iter.hasNext()) {
                KeyValue<String, Session> entry = iter.next();
                if (entry.value.isExpired(currentTime)) {
                    sessionStore.delete(entry.key);
                    // The typed Processor API forwards Record objects
                    context.forward(new Record<>(entry.key, "EXPIRED", currentTime));
                }
            }
        }
    }
    // ...
}

@Test
void evictsExpiredSessionsOnPunctuate() {
    Instant now = Instant.now();

    // Create sessions — one that will expire, one that won't
    sessionInput.pipeInput("user-A",
        new Event("LOGIN", now.minus(Duration.ofHours(2))));
    sessionInput.pipeInput("user-B",
        new Event("LOGIN", now.minus(Duration.ofMinutes(1))));

    // Advance wall-clock past the punctuate interval
    testDriver.advanceWallClockTime(Duration.ofMinutes(6));

    // The processor forwards (sessionKey, "EXPIRED"), so check the output keys
    Map<String, String> expired = output.readKeyValuesToMap();
    assertTrue(expired.containsKey("user-A"));
    assertFalse(expired.containsKey("user-B"));

    // State store should no longer hold the expired session
    KeyValueStore<String, Session> store = testDriver.getKeyValueStore("sessions");
    assertNull(store.get("user-A"));
    assertNotNull(store.get("user-B"));
}

Schema Registry Integration Testing

If your topology uses Avro or Protobuf serialization with Schema Registry, inject a MockSchemaRegistryClient to avoid hitting a real registry:

class AvroTopologyTest {

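    private TopologyTestDriver testDriver;
    private MockSchemaRegistryClient schemaRegistryClient;
    private TestInputTopic<String, Order> input;
    private TestOutputTopic<String, EnrichedOrder> output;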

    @BeforeEach
    void setUp() throws Exception {
        schemaRegistryClient = new MockSchemaRegistryClient();

        // Register schemas upfront
        schemaRegistryClient.register("orders-value", Order.getClassSchema());
        schemaRegistryClient.register("enriched-orders-value", EnrichedOrder.getClassSchema());

        Map<String, Object> serdeConfig = new HashMap<>();
        serdeConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://test");
        serdeConfig.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

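        // KafkaAvroSerializer and KafkaAvroDeserializer are not generic (they
        // handle Object), so use SpecificAvroSerde for typed test topics
        SpecificAvroSerde<Order> orderSerde = new SpecificAvroSerde<>(schemaRegistryClient);
        orderSerde.configure(serdeConfig, false);
        Serializer<Order> serializer = orderSerde.serializer();

        SpecificAvroSerde<EnrichedOrder> enrichedSerde =
            new SpecificAvroSerde<>(schemaRegistryClient);
        enrichedSerde.configure(serdeConfig, false);
        Deserializer<EnrichedOrder> deserializer = enrichedSerde.deserializer();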

        Properties props = testProps();
        props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://test");

        testDriver = new TopologyTestDriver(buildTopology(schemaRegistryClient), props);

        input = testDriver.createInputTopic("orders",
            new StringSerializer(), serializer);
        output = testDriver.createOutputTopic("enriched-orders",
            new StringDeserializer(), deserializer);
    }

    @Test
    void serializesAndDeserializesAvroMessages() {
        Order order = Order.newBuilder()
            .setOrderId("ORD-001")
            .setCustomerId("CUST-001")
            .setAmount(99.99)
            .build();

        input.pipeInput("ORD-001", order);

        assertFalse(output.isEmpty());
        EnrichedOrder enriched = output.readValue();
        assertEquals("ORD-001", enriched.getOrderId());
    }
}

Testing Error Handling and Dead Letter Queues

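Production topologies need to handle deserialization errors and processing failures. Kafka Streams 3.7 has no built-in dead letter queue, so the test below assumes the topology itself routes records it cannot parse to an orders-dlq topic. TopologyTestDriver only captures records the topology produces, so a handler that writes to the DLQ through its own producer will not show up here. Test the error routing: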

@Test
void routesDeserializationErrorsToDLQ() {
    // Send malformed bytes that can't be deserialized
    TestInputTopic<String, byte[]> rawInput = testDriver.createInputTopic(
        "orders-raw", new StringSerializer(), new ByteArraySerializer());

    rawInput.pipeInput("bad-key", "not-valid-json".getBytes(StandardCharsets.UTF_8));

    // The DLQ should receive the failed record
    TestOutputTopic<String, byte[]> dlq = testDriver.createOutputTopic(
        "orders-dlq", new StringDeserializer(), new ByteArrayDeserializer());

    assertFalse(dlq.isEmpty());
    byte[] dlqRecord = dlq.readValue();
    assertNotNull(dlqRecord);
}

Integration Tests with TestContainers

For end-to-end validation, run the topology against a real Kafka cluster using TestContainers:

@Testcontainers
class OrderEnrichmentIntegrationTest {

    @Container
    static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.0"));

    private KafkaStreams streams;

    @BeforeEach
    void setUp() {
        Properties props = new Properties();
        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test-" + UUID.randomUUID());
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
            Serdes.String().getClass().getName());
        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
            Serdes.String().getClass().getName());

        StreamsBuilder builder = new StreamsBuilder();
        // Reuse the Topology returned by build() instead of building it twice
        Topology topology = new OrderEnrichmentTopology().build(builder);
        streams = new KafkaStreams(topology, props);
        streams.start();
    }

    @AfterEach
    void tearDown() {
        streams.close();
    }

    @Test
    void processesOrdersEndToEnd() throws Exception {
        String bootstrapServers = kafka.getBootstrapServers();

        // Seed the customers topic (KTable source)
        try (Producer<String, String> producer = createProducer(bootstrapServers)) {
            producer.send(new ProducerRecord<>("customers", "CUST-001",
                "{\"id\":\"CUST-001\",\"name\":\"Alice\"}")).get();
        }

        // Wait for KTable to be populated
        Thread.sleep(500);

        // Send an order
        try (Producer<String, String> producer = createProducer(bootstrapServers)) {
            producer.send(new ProducerRecord<>("raw-orders", "ORDER-001",
                "{\"orderId\":\"ORDER-001\",\"customerId\":\"CUST-001\"}")).get();
        }

        // Consume from the output topic
        try (Consumer<String, String> consumer = createConsumer(bootstrapServers, "enriched-orders")) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
            assertFalse(records.isEmpty());
            String value = records.iterator().next().value();
            assertTrue(value.contains("Alice"));
        }
    }
}
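
The fixed Thread.sleep(500) in this test is a guess about timing, which is the kind of flakiness the in-process driver avoids. One common alternative is to poll for a condition with a deadline. Here is a minimal JDK-only sketch; the Await class is a hypothetical helper, not part of Kafka or TestContainers:

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

final class Await {

    // Polls the condition until it holds or the timeout elapses.
    // Returns true if the condition became true, false on timeout or interrupt.
    static boolean until(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Example condition: becomes true roughly 100 ms after start
        boolean ok = until(() -> System.currentTimeMillis() - start >= 100,
            Duration.ofSeconds(2), Duration.ofMillis(10));
        System.out.println(ok);
    }
}
```

In the integration test, the condition could be, for instance, that streams.state() has reached RUNNING before any records are produced, or that the consumer has returned at least one record. Which condition is sufficient depends on the topology.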

What to Test vs. What to Skip

Test with TopologyTestDriver:

  • Record routing logic (filter, branch, split)
  • State store reads and writes
  • Windowed aggregation logic
  • Join behavior (KStream-KTable, KStream-KStream)
  • Punctuators and TTL logic
  • Serialization/deserialization with mock schema registry
  • Error handler routing

Test with TestContainers integration tests:

  • End-to-end consumer group behavior
  • Exactly-once semantics (EOS) configuration
  • Rebalancing and reprocessing after restart
  • Schema Registry compatibility checks
  • Lag monitoring integration

Skip testing:

  • Kafka broker internals (not your code)
  • Partition assignment strategies (tested by Kafka project)
  • Network failure simulation (use chaos testing tools for this)

Common Mistakes

Not closing the test driver. Call testDriver.close() in @AfterEach. Leaving it open leaks resources such as state store files and state directory locks, which can make later tests fail.

Using wall-clock time in windowed tests. Use pipeInput with explicit Instant timestamps, not the no-timestamp overload, for windowed aggregation tests. Stream time advances based on record timestamps, not wall-clock time.

Asserting on intermediate updates. KTables emit updates for every change. If you aggregate 3 records and assert output.readValuesToList().size() == 1, you'll fail — there are 3 updates. Read all values and check the final one.
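
"Read all values and check the final one" amounts to collapsing the update stream to the latest value per key. TestOutputTopic offers readKeyValuesToMap() for this. The JDK-only sketch below shows the same idea on a plain list, with Map.Entry standing in for Kafka's KeyValue; the class name is illustrative:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class LatestUpdates {

    // Collapses a changelog-style list of updates to the last value seen per key.
    static <K, V> Map<K, V> latestPerKey(List<Map.Entry<K, V>> updates) {
        Map<K, V> latest = new LinkedHashMap<>();
        for (Map.Entry<K, V> update : updates) {
            latest.put(update.getKey(), update.getValue()); // later updates overwrite earlier ones
        }
        return latest;
    }

    public static void main(String[] args) {
        // Three input records produce three updates, two of them for CUST-A
        List<Map.Entry<String, Long>> updates = List.of(
            Map.entry("CUST-A", 1L),
            Map.entry("CUST-A", 2L),
            Map.entry("CUST-B", 1L));

        System.out.println(latestPerKey(updates)); // {CUST-A=2, CUST-B=1}
    }
}
```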

Forgetting to seed KTables before KStream records. In a KStream-KTable join, the KTable must contain the join key before the stream record arrives. Pipe KTable records first in your test setup.
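
The ordering matters because a KStream-KTable inner join is a lookup at the moment the stream record arrives, and later table updates do not re-trigger the join. A simplified JDK-only model of that behaviour follows, assuming String keys and values. The StreamTableJoinModel class is purely illustrative and ignores timestamps, partitions, and tombstones:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class StreamTableJoinModel {

    private final Map<String, String> table = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    // A table-side update only changes the lookup state; it emits nothing.
    void tableUpdate(String key, String value) {
        table.put(key, value);
    }

    // A stream-side record is joined against the table as it is right now.
    void streamRecord(String key, String value) {
        String tableValue = table.get(key);
        if (tableValue != null) {
            output.add(value + "+" + tableValue);
        }
        // Inner join: no table entry means no output, now or later
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        StreamTableJoinModel join = new StreamTableJoinModel();
        join.streamRecord("CUST-001", "ORDER-001"); // arrives too early, dropped
        join.tableUpdate("CUST-001", "Alice");
        join.streamRecord("CUST-001", "ORDER-002"); // joins
        System.out.println(join.output()); // [ORDER-002+Alice]
    }
}
```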
