Kafka Streams Testing: TopologyTestDriver and Stateful Stream Processors
Kafka Streams provides a TopologyTestDriver that runs your entire stream processing topology in-process, with no Kafka cluster needed. You pipe records in through TestInputTopic, read results from TestOutputTopic, set record timestamps explicitly to drive the stream time that windowed aggregations depend on, and advance the driver's mocked wall-clock time to fire wall-clock punctuators. For Schema Registry integration, use MockSchemaRegistryClient to avoid hitting a real registry.
Key Takeaways
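TopologyTestDriver runs your topology in-process. No Kafka, no ZooKeeper, no network. Each record you pipe in is fully processed before the call returns, so tests run synchronously and deterministically.
Control record timestamps for window tests. Windows open and close based on stream time, which advances only as records with later timestamps arrive. Pipe records with explicit Instant timestamps, or create the TestInputTopic with a start time and auto-advance interval, to move stream time across window boundaries. testDriver.advanceWallClockTime(Duration) does not affect windowed aggregations.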
Test state stores directly. After piping records through the topology, query the KeyValueStore directly via testDriver.getKeyValueStore("store-name") to assert intermediate state.
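Use MockSchemaRegistryClient for Avro/Protobuf. Share a single mock registry between your test serializers and the topology's serdes (for example, by pointing both at the same mock:// schema registry URL) so your tests work offline with any schema.
Test wall-clock punctuators by advancing wall-clock time. Punctuators scheduled with PunctuationType.WALL_CLOCK_TIME fire when testDriver.advanceWallClockTime(Duration) moves the mocked clock past their interval; STREAM_TIME punctuators fire only when record timestamps advance stream time.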
Why TopologyTestDriver Changes Everything
Integration-testing Kafka Streams applications used to mean spinning up a full Kafka cluster, deploying your application, and sending messages to real topics. That's slow, flaky, and hard to control.
TopologyTestDriver (part of kafka-streams-test-utils) runs a Topology, whether built with StreamsBuilder or the Processor API, entirely in-process. Record processing is synchronous: by the time pipeInput returns, the topology has processed the record and written any output. There's no consumer group, no network, and no background threads.
As a result, Kafka Streams topologies can be unit-tested quickly and deterministically, without any external infrastructure.
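To make the synchronous model concrete, here is a minimal sketch assuming kafka-streams and kafka-streams-test-utils 3.7 are on the classpath (see Project Setup). The class name, the topic names in and out, and the application ID are made up for illustration. Because processing happens inside pipeInput, the result can be read immediately, with no polling or sleeping.

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

class UppercaseDriverDemo {
    // Builds a one-step topology ("in" -> uppercase -> "out"), pipes one value,
    // and returns whatever the topology wrote to "out"
    static String run(String value) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("in", Consumed.with(Serdes.String(), Serdes.String()))
               .mapValues(v -> v.toUpperCase())
               .to("out", Produced.with(Serdes.String(), Serdes.String()));

        Properties props = new Properties();
        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "uppercase-demo");
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092"); // never contacted

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> in = driver.createInputTopic(
                    "in", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> out = driver.createOutputTopic(
                    "out", new StringDeserializer(), new StringDeserializer());
            in.pipeInput("key-1", value);
            // No waiting needed: the record was fully processed inside pipeInput
            return out.readValue();
        }
    }

    public static void main(String[] args) {
        System.out.println(run("hello"));
    }
}
```

The try-with-resources block closes the driver even if an assertion fails, which is the same cleanup the @AfterEach methods below perform.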
Project Setup
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>3.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams-test-utils</artifactId>
<version>3.7.0</version>
<scope>test</scope>
</dependency>
Your First Topology Test
Consider a simple order enrichment topology that reads raw orders and joins them with a customer table:
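public class OrderEnrichmentTopology {
public Topology build(StreamsBuilder builder) {
KStream<String, Order> orders = builder.stream("raw-orders",
Consumed.with(Serdes.String(), orderSerde()));
KTable<String, Customer> customers = builder.table("customers",
Consumed.with(Serdes.String(), customerSerde()));
// A KStream-KTable join matches on the record key. Orders arrive keyed by order ID,
// so re-key them by customer ID first (this adds an internal repartition topic)
orders.selectKey((orderId, order) -> order.getCustomerId())
.join(customers,
(order, customer) -> new EnrichedOrder(order, customer),
Joined.with(Serdes.String(), orderSerde(), customerSerde()))
.to("enriched-orders",
Produced.with(Serdes.String(), enrichedOrderSerde()));
return builder.build();
}
}
Test it with TopologyTestDriver. JsonSerializer and JsonDeserializer stand for whatever JSON serde your project uses; they must read and write the same format as orderSerde(), customerSerde(), and enrichedOrderSerde():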
class OrderEnrichmentTopologyTest {
private TopologyTestDriver testDriver;
private TestInputTopic<String, Order> orderInput;
private TestInputTopic<String, Customer> customerInput;
private TestOutputTopic<String, EnrichedOrder> enrichedOutput;
@BeforeEach
void setUp() {
StreamsBuilder builder = new StreamsBuilder();
Topology topology = new OrderEnrichmentTopology().build(builder);
Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");
testDriver = new TopologyTestDriver(topology, props);
orderInput = testDriver.createInputTopic(
"raw-orders", new StringSerializer(), new JsonSerializer<>());
customerInput = testDriver.createInputTopic(
"customers", new StringSerializer(), new JsonSerializer<>());
enrichedOutput = testDriver.createOutputTopic(
"enriched-orders", new StringDeserializer(), new JsonDeserializer<>());
}
@AfterEach
void tearDown() {
testDriver.close();
}
@Test
void enrichesOrderWithCustomerData() {
// Populate the KTable first
customerInput.pipeInput("CUST-001", new Customer("CUST-001", "Alice Smith", "alice@example.com"));
// Now send an order
orderInput.pipeInput("ORDER-001", new Order("ORDER-001", "CUST-001", BigDecimal.valueOf(99.99)));
// Assert the enriched output
assertFalse(enrichedOutput.isEmpty());
EnrichedOrder result = enrichedOutput.readValue();
assertEquals("Alice Smith", result.getCustomerName());
assertEquals("alice@example.com", result.getCustomerEmail());
assertEquals(BigDecimal.valueOf(99.99), result.getOrderAmount());
}
@Test
void producesNoOutputWhenCustomerNotFound() {
// Send order without corresponding customer in KTable
orderInput.pipeInput("ORDER-002", new Order("ORDER-002", "UNKNOWN-CUST", BigDecimal.valueOf(50.00)));
// An inner KStream-KTable join drops stream records with no matching table entry
assertTrue(enrichedOutput.isEmpty());
}
}
Testing Stateful Aggregations
State stores are where Kafka Streams applications get interesting — and where bugs hide. TopologyTestDriver lets you query state stores directly.
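Besides getKeyValueStore, the driver also offers getTimestampedKeyValueStore, which returns each value together with the timestamp stored alongside it. The sketch below uses a hypothetical click-count topology with String values (the class, topic, and store names are illustrative, not part of the order example that follows) and reads the store directly after piping three records.

```java
import java.time.Instant;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

class StoreQueryDemo {
    static final Instant T0 = Instant.parse("2026-01-01T10:00:00Z");

    // Counts click events per user, then returns the store entry for `user`
    // (null if that user never clicked)
    static ValueAndTimestamp<Long> storedCountFor(String user) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("clicks", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey()
               .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("click-counts")
                       .withKeySerde(Serdes.String())
                       .withValueSerde(Serdes.Long()));

        Properties props = new Properties();
        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "store-query-demo");
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> clicks = driver.createInputTopic(
                    "clicks", new StringSerializer(), new StringSerializer());
            clicks.pipeInput("alice", "home", T0);
            clicks.pipeInput("alice", "cart", T0.plusSeconds(60));
            clicks.pipeInput("bob", "home", T0.plusSeconds(120));

            // Timestamped view: value plus the timestamp recorded with the latest update
            KeyValueStore<String, ValueAndTimestamp<Long>> store =
                    driver.getTimestampedKeyValueStore("click-counts");
            return store.get(user);
        }
    }
}
```

For alice this should hold a count of 2 with the timestamp of her second click; a key that never appeared comes back as null.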
public class OrderCountTopology {
public static Topology build() {
StreamsBuilder builder = new StreamsBuilder();
builder.stream("orders", Consumed.with(Serdes.String(), orderSerde()))
.groupBy((key, order) -> order.getCustomerId(),
Grouped.with(Serdes.String(), orderSerde()))
.count(Materialized.as("order-counts"))
.toStream()
.to("order-count-events",
Produced.with(Serdes.String(), Serdes.Long()));
return builder.build();
}
}
class OrderCountTopologyTest {
private TopologyTestDriver testDriver;
private TestInputTopic<String, Order> input;
private TestOutputTopic<String, Long> output;
private KeyValueStore<String, Long> countStore;
@BeforeEach
void setUp() {
testDriver = new TopologyTestDriver(OrderCountTopology.build(), testProps());
input = testDriver.createInputTopic("orders",
new StringSerializer(), new JsonSerializer<>());
output = testDriver.createOutputTopic("order-count-events",
new StringDeserializer(), new LongDeserializer());
countStore = testDriver.getKeyValueStore("order-counts");
}
@AfterEach
void tearDown() {
testDriver.close();
}
@Test
void countsOrdersPerCustomer() {
input.pipeInput("k1", new Order("O1", "CUST-A", BigDecimal.TEN));
input.pipeInput("k2", new Order("O2", "CUST-A", BigDecimal.TEN));
input.pipeInput("k3", new Order("O3", "CUST-B", BigDecimal.TEN));
// Assert state store directly
assertEquals(2L, countStore.get("CUST-A"));
assertEquals(1L, countStore.get("CUST-B"));
// Also verify the output stream captured every update, in input order:
// (CUST-A, 1), (CUST-A, 2), (CUST-B, 1)
List<KeyValue<String, Long>> updates = output.readKeyValuesToList();
assertEquals(3, updates.size());
// The second update is the latest one for CUST-A
assertEquals(KeyValue.pair("CUST-A", 2L), updates.get(1));
}
}
Testing Windowed Aggregations
Tumbling and sliding windows are tricky to test because they depend on event timestamps. TopologyTestDriver lets you control timestamps precisely.
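Tumbling windows in Kafka Streams are aligned to the epoch, with an inclusive start and an exclusive end, so you can work out by hand which window a test timestamp lands in. This plain-Java helper is not a Kafka API (the class and method names are illustrative); it is just arithmetic for choosing timestamps and expected results before writing assertions.

```java
import java.time.Duration;
import java.time.Instant;

class TumblingWindowMath {
    // Start (inclusive) of the epoch-aligned tumbling window that contains `ts`
    static Instant windowStart(Instant ts, Duration size) {
        long tsMs = ts.toEpochMilli();
        long sizeMs = size.toMillis();
        return Instant.ofEpochMilli(tsMs - Math.floorMod(tsMs, sizeMs));
    }

    // End (exclusive) of that same window
    static Instant windowEnd(Instant ts, Duration size) {
        return windowStart(ts, size).plus(size);
    }

    public static void main(String[] args) {
        Duration hour = Duration.ofHours(1);
        Instant base = Instant.parse("2026-01-01T10:00:00Z");
        // Offsets used in aggregatesSalesWithinHourWindow, plus one just past the hour
        for (long offsetSeconds : new long[] {0, 1800, 3500, 3700}) {
            Instant ts = base.plusSeconds(offsetSeconds);
            System.out.println(ts + " -> [" + windowStart(ts, hour) + ", " + windowEnd(ts, hour) + ")");
        }
    }
}
```

The first three offsets map to the [10:00, 11:00) window and the last one to [11:00, 12:00), which is exactly the split the tests below rely on.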
public class HourlySalesTopology {
public static Topology build() {
StreamsBuilder builder = new StreamsBuilder();
builder.stream("sales", Consumed.with(Serdes.String(), saleSerde()))
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
.aggregate(
() -> BigDecimal.ZERO,
(key, sale, total) -> total.add(sale.getAmount()),
Materialized.<String, BigDecimal, WindowStore<Bytes, byte[]>>as("hourly-sales")
.withKeySerde(Serdes.String())
.withValueSerde(bigDecimalSerde())
)
.toStream()
.map((windowedKey, total) -> KeyValue.pair(windowedKey.key(), total))
.to("hourly-totals", Produced.with(Serdes.String(), bigDecimalSerde()));
return builder.build();
}
}
class HourlySalesTopologyTest {
private TopologyTestDriver testDriver;
private TestInputTopic<String, Sale> input;
private TestOutputTopic<String, BigDecimal> output;
@BeforeEach
void setUp() {
testDriver = new TopologyTestDriver(HourlySalesTopology.build(), testProps());
input = testDriver.createInputTopic("sales",
new StringSerializer(), new JsonSerializer<>(),
Instant.parse("2026-01-01T10:00:00Z"), Duration.ZERO); // start timestamp
output = testDriver.createOutputTopic("hourly-totals",
new StringDeserializer(), new BigDecimalDeserializer());
}
@AfterEach
void tearDown() {
testDriver.close();
}
@Test
void aggregatesSalesWithinHourWindow() {
Instant windowStart = Instant.parse("2026-01-01T10:00:00Z");
// Send 3 sales within the same window [10:00, 11:00)
input.pipeInput("store-A", new Sale(BigDecimal.valueOf(100)), windowStart);
input.pipeInput("store-A", new Sale(BigDecimal.valueOf(200)), windowStart.plusSeconds(1800));
input.pipeInput("store-A", new Sale(BigDecimal.valueOf(150)), windowStart.plusSeconds(3500));
// Without suppress(), every record emits the window's running total: 100, 300, 450.
// No advanceWallClockTime() needed: windows follow record timestamps (stream time).
List<BigDecimal> totals = output.readValuesToList();
assertEquals(3, totals.size());
// compareTo ignores scale differences that BigDecimal.equals would flag
assertEquals(0, BigDecimal.valueOf(450).compareTo(totals.get(2)));
}
@Test
void separatesSalesAcrossWindowBoundaries() {
Instant hour1 = Instant.parse("2026-01-01T10:30:00Z"); // window [10:00, 11:00)
Instant hour2 = Instant.parse("2026-01-01T11:30:00Z"); // window [11:00, 12:00)
input.pipeInput("store-B", new Sale(BigDecimal.valueOf(100)), hour1);
input.pipeInput("store-B", new Sale(BigDecimal.valueOf(200)), hour2);
List<BigDecimal> results = output.readValuesToList();
// Each sale opens its own window, so the second total is 200, not 300
assertEquals(2, results.size());
assertEquals(0, BigDecimal.valueOf(100).compareTo(results.get(0)));
assertEquals(0, BigDecimal.valueOf(200).compareTo(results.get(1)));
}
}
Testing Punctuators
Punctuators are scheduled callbacks that fire on wall-clock time or stream time. They're used for evictions, TTL logic, and periodic flushes.
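The punctuation type decides what makes a punctuator fire. The sketch below wires a hypothetical TickProcessor (a name invented for this example) into a Processor API topology and advances only the mocked wall clock, without piping any records. The WALL_CLOCK_TIME variant should tick; the STREAM_TIME variant should stay silent, because stream time only moves when records arrive.

```java
import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;

class PunctuationTypeDemo {
    // Forwards one "tick" record each time its 5-minute punctuator fires
    static class TickProcessor implements Processor<String, String, String, String> {
        private final PunctuationType type;

        TickProcessor(PunctuationType type) {
            this.type = type;
        }

        @Override
        public void init(ProcessorContext<String, String> context) {
            context.schedule(Duration.ofMinutes(5), type,
                    timestamp -> context.forward(new Record<>("tick", type.name(), timestamp)));
        }

        @Override
        public void process(Record<String, String> record) {
            // Input records are ignored; only the punctuator produces output
        }
    }

    // Advances only the mocked wall clock (no records piped) and counts the ticks
    static int ticksAfterWallClockAdvance(PunctuationType type) {
        ProcessorSupplier<String, String, String, String> supplier = () -> new TickProcessor(type);
        Topology topology = new Topology();
        topology.addSource("source", new StringDeserializer(), new StringDeserializer(), "events");
        topology.addProcessor("ticker", supplier, "source");
        topology.addSink("sink", "ticks", new StringSerializer(), new StringSerializer(), "ticker");

        Properties props = new Properties();
        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "punctuation-demo");
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");

        try (TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
            TestOutputTopic<String, String> ticks = driver.createOutputTopic(
                    "ticks", new StringDeserializer(), new StringDeserializer());
            driver.advanceWallClockTime(Duration.ofMinutes(6));
            return ticks.readValuesToList().size();
        }
    }

    public static void main(String[] args) {
        System.out.println("WALL_CLOCK_TIME ticks: " + ticksAfterWallClockAdvance(PunctuationType.WALL_CLOCK_TIME));
        System.out.println("STREAM_TIME ticks: " + ticksAfterWallClockAdvance(PunctuationType.STREAM_TIME));
    }
}
```

To exercise a STREAM_TIME punctuator in a test, pipe records whose timestamps step past the interval instead of calling advanceWallClockTime.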
public class SessionCleanupProcessor implements Processor<String, Event, String, String> {
private KeyValueStore<String, Session> sessionStore;
private ProcessorContext<String, String> context;
@Override
public void init(ProcessorContext<String, String> context) {
this.context = context;
sessionStore = context.getStateStore("sessions");
// Schedule cleanup every 5 minutes
context.schedule(Duration.ofMinutes(5), PunctuationType.WALL_CLOCK_TIME,
timestamp -> evictExpiredSessions(timestamp));
}
private void evictExpiredSessions(long currentTime) {
try (KeyValueIterator<String, Session> iter = sessionStore.all()) {
while (iter.hasNext()) {
KeyValue<String, Session> entry = iter.next();
if (entry.value.isExpired(currentTime)) {
sessionStore.delete(entry.key);
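// The Processor API (processor.api) forwards Record objects, not (key, value) pairs
context.forward(new Record<>(entry.key, "EXPIRED", currentTime));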
}
}
}
}
// ...
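}
A test for this processor, assuming a driver built from a topology that connects it to the "sessions" store, plus sessionInput (TestInputTopic<String, Event>) and output (TestOutputTopic<String, String>) fields created as in the earlier tests:
@Test
void evictsExpiredSessionsOnPunctuate() {
// TopologyTestDriver(topology, props) starts its mocked wall clock at the current system
// time, so timestamps relative to Instant.now() line up with the punctuator's timestamp
Instant now = Instant.now();
// Create sessions: one that will expire, one that won't
// (assumes the session TTL is longer than about 7 minutes and shorter than 2 hours)
sessionInput.pipeInput("user-A",
new Event("LOGIN", now.minus(Duration.ofHours(2))));
sessionInput.pipeInput("user-B",
new Event("LOGIN", now.minus(Duration.ofMinutes(1))));
// Advance the mocked wall clock past the 5-minute WALL_CLOCK_TIME interval
testDriver.advanceWallClockTime(Duration.ofMinutes(6));
// The processor forwards (sessionKey, "EXPIRED"), so look up keys, not values
Map<String, String> expired = output.readKeyValuesToMap();
assertEquals("EXPIRED", expired.get("user-A"));
assertFalse(expired.containsKey("user-B"));
// State store should no longer hold the expired session
KeyValueStore<String, Session> store = testDriver.getKeyValueStore("sessions");
assertNull(store.get("user-A"));
assertNotNull(store.get("user-B"));
}
Schema Registry Integration Testing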
If your topology uses Avro or Protobuf serialization with Schema Registry, inject a MockSchemaRegistryClient to avoid hitting a real registry:
class AvroTopologyTest {
// Serdes configured with "mock://" + SCOPE resolve to the same in-memory registry
private static final String SCOPE = "avro-topology-test";
private TopologyTestDriver testDriver;
private SchemaRegistryClient schemaRegistryClient;
private TestInputTopic<String, Order> input;
private TestOutputTopic<String, EnrichedOrder> output;
@BeforeEach
void setUp() throws Exception {
// Share one mock registry between the test serdes and the topology's serdes
schemaRegistryClient = MockSchemaRegistry.getClientForScope(SCOPE);
// Register schemas upfront (optional while auto.register.schemas keeps its default of true)
schemaRegistryClient.register("orders-value", new AvroSchema(Order.getClassSchema()));
schemaRegistryClient.register("enriched-orders-value", new AvroSchema(EnrichedOrder.getClassSchema()));
Map<String, Object> serdeConfig = new HashMap<>();
serdeConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://" + SCOPE);
// KafkaAvroSerializer/KafkaAvroDeserializer are not generic; the Specific* classes
// from kafka-streams-avro-serde are typed and read specific records by default
SpecificAvroSerializer<Order> serializer = new SpecificAvroSerializer<>(schemaRegistryClient);
serializer.configure(serdeConfig, false);
SpecificAvroDeserializer<EnrichedOrder> deserializer =
new SpecificAvroDeserializer<>(schemaRegistryClient);
deserializer.configure(serdeConfig, false);
Properties props = testProps();
props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://" + SCOPE);
testDriver = new TopologyTestDriver(buildTopology(schemaRegistryClient), props);
input = testDriver.createInputTopic("orders",
new StringSerializer(), serializer);
output = testDriver.createOutputTopic("enriched-orders",
new StringDeserializer(), deserializer);
}
@AfterEach
void tearDown() {
testDriver.close();
// Discard the scoped registry so its schemas don't leak into other tests
MockSchemaRegistry.dropScope(SCOPE);
}
@Test
void serializesAndDeserializesAvroMessages() {
Order order = Order.newBuilder()
.setOrderId("ORD-001")
.setCustomerId("CUST-001")
.setAmount(99.99)
.build();
input.pipeInput("ORD-001", order);
assertFalse(output.isEmpty());
EnrichedOrder enriched = output.readValue();
assertEquals("ORD-001", enriched.getOrderId());
}
}
Testing Error Handling and Dead Letter Queues
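The DLQ test in this section assumes a topology that reads orders-raw as raw bytes and does its own routing. Below is a minimal sketch of that idea, not the article's topology: DlqRoutingDemo, the orders output topic, and the rule that a valid payload is a UTF-8 integer are all hypothetical stand-ins for real JSON or Avro deserialization. Records that fail to parse go to orders-dlq with their original bytes; the rest continue as parsed values.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

class DlqRoutingDemo {
    // Hypothetical parse step: a valid payload is a UTF-8 decimal integer.
    // A real topology would call its JSON or Avro deserializer here instead.
    static Long tryParse(byte[] bytes) {
        try {
            return Long.parseLong(new String(bytes, StandardCharsets.UTF_8).trim());
        } catch (NumberFormatException e) {
            return null; // unparseable: routed to the DLQ below
        }
    }

    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        // Consume raw bytes so a bad payload never reaches a deserializer that could throw
        KStream<String, byte[]> raw = builder.stream("orders-raw",
                Consumed.with(Serdes.String(), Serdes.ByteArray()));
        // Failures go to the DLQ unchanged
        raw.filter((key, bytes) -> tryParse(bytes) == null)
           .to("orders-dlq", Produced.with(Serdes.String(), Serdes.ByteArray()));
        // Successes continue as parsed values
        raw.filter((key, bytes) -> tryParse(bytes) != null)
           .mapValues(bytes -> tryParse(bytes))
           .to("orders", Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }

    // Pipes each payload through the topology; returns {parsed count, DLQ count}
    static int[] route(List<String> payloads) {
        Properties props = new Properties();
        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dlq-demo");
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");
        try (TopologyTestDriver driver = new TopologyTestDriver(build(), props)) {
            TestInputTopic<String, byte[]> in = driver.createInputTopic(
                    "orders-raw", new StringSerializer(), new ByteArraySerializer());
            TestOutputTopic<String, Long> parsed = driver.createOutputTopic(
                    "orders", new StringDeserializer(), new LongDeserializer());
            TestOutputTopic<String, byte[]> dlq = driver.createOutputTopic(
                    "orders-dlq", new StringDeserializer(), new ByteArrayDeserializer());
            for (String payload : payloads) {
                in.pipeInput("key", payload.getBytes(StandardCharsets.UTF_8));
            }
            return new int[] {parsed.readValuesToList().size(), dlq.readValuesToList().size()};
        }
    }
}
```

Parsing once per filter keeps the sketch short; a production topology might parse once and branch with split() instead.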
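Production topologies need to handle deserialization errors and processing failures. In Kafka Streams 3.7, the built-in deserialization handlers (LogAndFailExceptionHandler and LogAndContinueExceptionHandler) either stop processing or skip the record, so routing bad records to a dead letter queue is typically done in the topology itself, for example by reading the topic as raw bytes and sending anything that fails to parse to a DLQ topic. Test that routing: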
@Test
void routesDeserializationErrorsToDLQ() {
// Send malformed bytes that can't be deserialized
byte[] malformed = "not-valid-json".getBytes(StandardCharsets.UTF_8);
TestInputTopic<String, byte[]> rawInput = testDriver.createInputTopic(
"orders-raw", new StringSerializer(), new ByteArraySerializer());
rawInput.pipeInput("bad-key", malformed);
// The DLQ should receive the failed record with its original payload
TestOutputTopic<String, byte[]> dlq = testDriver.createOutputTopic(
"orders-dlq", new StringDeserializer(), new ByteArrayDeserializer());
assertFalse(dlq.isEmpty());
assertArrayEquals(malformed, dlq.readValue());
}
Integration Tests with TestContainers
For end-to-end validation, run the topology against a real Kafka cluster using TestContainers:
@Testcontainers
class OrderEnrichmentIntegrationTest {
@Container
static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.0"));
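private KafkaStreams streams;
@BeforeAll
static void createTopics() throws Exception {
// Kafka Streams does not auto-create its source topics, so create them before starting
try (Admin admin = Admin.create(Map.<String, Object>of(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()))) {
admin.createTopics(List.of(
new NewTopic("raw-orders", 1, (short) 1),
new NewTopic("customers", 1, (short) 1),
new NewTopic("enriched-orders", 1, (short) 1))).all().get();
}
}
@BeforeEach
void setUp() throws InterruptedException {
Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test-" + UUID.randomUUID());
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
StreamsBuilder builder = new StreamsBuilder();
new OrderEnrichmentTopology().build(builder);
streams = new KafkaStreams(builder.build(), props);
streams.start();
// Wait (up to 30 s) for the client to join its group and receive its tasks
long deadline = System.currentTimeMillis() + 30_000;
while (streams.state() != KafkaStreams.State.RUNNING && System.currentTimeMillis() < deadline) {
Thread.sleep(100);
}
}
@AfterEach
void tearDown() {
streams.close();
// Remove this run's local state directory
streams.cleanUp();
}
@Test
void processesOrdersEndToEnd() throws Exception {
String bootstrapServers = kafka.getBootstrapServers();
// Seed the customers topic (KTable source)
try (Producer<String, String> producer = createProducer(bootstrapServers)) {
producer.send(new ProducerRecord<>("customers", "CUST-001",
"{\"id\":\"CUST-001\",\"name\":\"Alice\"}")).get();
}
// Give Streams a moment to load the customer into the KTable before the order arrives
Thread.sleep(500);
// Send an order
try (Producer<String, String> producer = createProducer(bootstrapServers)) {
producer.send(new ProducerRecord<>("raw-orders", "ORDER-001",
"{\"orderId\":\"ORDER-001\",\"customerId\":\"CUST-001\"}")).get();
}
// Poll repeatedly: a single poll() can come back empty while the consumer is still
// joining its group (assumes createConsumer subscribes with auto.offset.reset=earliest)
try (Consumer<String, String> consumer = createConsumer(bootstrapServers, "enriched-orders")) {
List<ConsumerRecord<String, String>> received = new ArrayList<>();
long deadline = System.currentTimeMillis() + 30_000;
while (received.isEmpty() && System.currentTimeMillis() < deadline) {
consumer.poll(Duration.ofMillis(500)).forEach(received::add);
}
assertFalse(received.isEmpty());
assertTrue(received.get(0).value().contains("Alice"));
}
}
}
What to Test vs. What to Skip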
Test with TopologyTestDriver:
- Record routing logic (filter, branch, split)
- State store reads and writes
- Windowed aggregation logic
- Join behavior (KStream-KTable, KStream-KStream)
- Punctuators and TTL logic
- Serialization/deserialization with mock schema registry
- Error handler routing
Test with TestContainers integration tests:
- End-to-end consumer group behavior
- Exactly-once semantics (EOS) configuration
- Rebalancing and reprocessing after restart
- Schema Registry compatibility checks
- Lag monitoring integration
Skip testing:
- Kafka broker internals (not your code)
- Partition assignment strategies (tested by Kafka project)
- Network failure simulation (use chaos testing tools for this)
Common Mistakes
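Not closing the test driver. Call testDriver.close() in @AfterEach, or use try-with-resources since TopologyTestDriver is Closeable. Leaving it open leaks its state stores (including RocksDB instances for persistent stores) and its on-disk state directory, which can interfere with later tests.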
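Using wall-clock time in windowed tests. Use pipeInput with explicit Instant timestamps, or create the TestInputTopic with a fixed start timestamp, for windowed aggregation tests. With default settings, the no-timestamp overload stamps every record with the system time at which the input topic was created, so window boundaries shift from run to run. Stream time advances based on record timestamps, not wall-clock time, so advanceWallClockTime() does not close windows.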
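A sketch of the fixed-start option: createInputTopic accepts a start Instant and an auto-advance Duration, and TestInputTopic.advanceTime moves the topic's clock without sending anything. The pass-through topology, class name, and topic names here are illustrative; the point is that the timestamps seen downstream are fully determined by the test.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.test.TestRecord;

class DeterministicTimestampsDemo {
    static final Instant START = Instant.parse("2026-01-01T10:00:00Z");

    // Pipes three records without explicit timestamps and returns the timestamps
    // they carried through a pass-through topology
    static List<Instant> pipeThree() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("in", Consumed.with(Serdes.String(), Serdes.String()))
               .to("out", Produced.with(Serdes.String(), Serdes.String()));

        Properties props = new Properties();
        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "timestamps-demo");
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            // Fixed start time; the topic's clock moves 10 minutes after each record
            TestInputTopic<String, String> in = driver.createInputTopic("in",
                    new StringSerializer(), new StringSerializer(), START, Duration.ofMinutes(10));
            TestOutputTopic<String, String> out = driver.createOutputTopic("out",
                    new StringDeserializer(), new StringDeserializer());

            in.pipeInput("k", "a");              // stamped 10:00
            in.pipeInput("k", "b");              // stamped 10:10
            in.advanceTime(Duration.ofHours(1)); // jump the topic clock ahead; nothing is sent
            in.pipeInput("k", "c");              // stamped 11:20

            return out.readRecordsToList().stream()
                      .map(TestRecord::getRecordTime)
                      .collect(Collectors.toList());
        }
    }
}
```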
Asserting on intermediate updates. KTable aggregations emit an update for every change. If you aggregate 3 records and assert output.readValuesToList().size() == 1, the test fails because there are 3 updates. Read all values and check the last one, or add suppress() to the topology if only final window results matter.
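When only the final result per window matters, one option is Suppressed.untilWindowCloses in the topology. The sketch below is a simplified variant of the hourly aggregation (a hypothetical event count with String values and invented topic names, not the sales topology above). Nothing is emitted while the window is open; a later record that pushes stream time past the window end releases a single final count.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

class FinalWindowResultDemo {
    // Returns [output before the window closes, output after a later record arrives]
    static List<List<Long>> run() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
               .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
               .count(Materialized.with(Serdes.String(), Serdes.Long()))
               // Buffer updates until stream time passes window end + grace (zero here)
               .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
               .toStream()
               .map((windowedKey, count) -> KeyValue.pair(windowedKey.key(), count))
               .to("final-counts", Produced.with(Serdes.String(), Serdes.Long()));

        Properties props = new Properties();
        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "final-result-demo");
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> in = driver.createInputTopic(
                    "events", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, Long> out = driver.createOutputTopic(
                    "final-counts", new StringDeserializer(), new LongDeserializer());

            Instant t0 = Instant.parse("2026-01-01T10:00:00Z");
            in.pipeInput("A", "x", t0);
            in.pipeInput("A", "y", t0.plus(Duration.ofMinutes(20)));
            in.pipeInput("A", "z", t0.plus(Duration.ofMinutes(40)));
            List<Long> beforeClose = out.readValuesToList(); // still buffered

            // A record at 11:05 moves stream time past the [10:00, 11:00) window end
            in.pipeInput("A", "next", t0.plus(Duration.ofMinutes(65)));
            List<Long> afterClose = out.readValuesToList();
            return List.of(beforeClose, afterClose);
        }
    }
}
```

Suppression trades latency for fewer updates, so only add it when downstream consumers need final results; for test-only convenience, reading the last update is simpler.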
Forgetting to seed KTables before KStream records. In a KStream-KTable join, the KTable must contain the join key before the stream record arrives. Pipe KTable records first in your test setup.
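The ordering matters because a KStream-KTable join is triggered only by stream-side records: a stream record that finds no matching key is dropped, and a later table update does not replay it. This small sketch (hypothetical class and topic names, String values, orders already keyed by customer ID) pipes an order before and after seeding the table.

```java
import java.util.List;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

class StreamTableJoinDemo {
    // Returns the joined values emitted for: order, then customer, then another order
    static List<String> run() {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> customers = builder.table("customers",
                Consumed.with(Serdes.String(), Serdes.String()));
        // Orders here are already keyed by customer ID, so no re-keying is needed
        builder.stream("orders", Consumed.with(Serdes.String(), Serdes.String()))
               .join(customers, (order, customerName) -> order + " for " + customerName)
               .to("enriched", Produced.with(Serdes.String(), Serdes.String()));

        Properties props = new Properties();
        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "join-order-demo");
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> orders = driver.createInputTopic(
                    "orders", new StringSerializer(), new StringSerializer());
            TestInputTopic<String, String> customerInput = driver.createInputTopic(
                    "customers", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> out = driver.createOutputTopic(
                    "enriched", new StringDeserializer(), new StringDeserializer());

            orders.pipeInput("CUST-001", "ORDER-001");      // table still empty: dropped
            customerInput.pipeInput("CUST-001", "Alice");   // table-side update: no output
            orders.pipeInput("CUST-001", "ORDER-002");      // key now present: joined
            return out.readValuesToList();
        }
    }
}
```

Only ORDER-002 should come out enriched; ORDER-001 is silently lost, which is exactly the failure mode this mistake produces in a test.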