Testing Apache Flink Applications: Unit, Integration, and Stateful Stream Testing
Testing Apache Flink requires specialized tools at each layer: MiniClusterWithClientResource for topology-level tests, KeyedOneInputStreamOperatorTestHarness for stateful operators, and EmbeddedKafkaCluster for end-to-end integration. Event-time semantics and exactly-once guarantees demand explicit test harness control over watermarks and checkpoints.
Key Takeaways
Unit test operators in isolation. Use KeyedOneInputStreamOperatorTestHarness to feed elements and watermarks directly into a single operator without running a full Flink cluster.
Simulate event-time windows precisely. Advancing watermarks manually in test harnesses lets you trigger window computations on demand and assert on exactly the records you expect — no sleeping or polling required.
Validate exactly-once semantics explicitly. Testing EOS requires injecting failures mid-stream and verifying that downstream sinks see each record exactly once, not zero or multiple times.
Why Flink Testing Is Hard
Apache Flink occupies a unique space in the stream-processing world. Its stateful, event-time-aware execution model is powerful, but it makes testing significantly harder than testing a REST API or a batch job. Three things conspire against you:
- State is invisible by default. A ValueState<Long> lives inside a managed state backend. You cannot inspect it with a System.out.println.
- Time is decoupled from the system clock. Event-time processing advances via watermarks, not System.currentTimeMillis(). A test that sleeps and hopes a window fires is unreliable.
- Exactly-once semantics involve checkpointing. Verifying that your sink writes each record exactly once requires simulating failures, something a simple unit test cannot do.
Flink's testing libraries solve all three problems. This guide walks through each layer.
Setting Up the Test Dependencies
Add the following to your pom.xml. The flink-test-utils artifact brings in MiniClusterWithClientResource. The operator test harnesses live in the test jar of flink-streaming-java, which Maven resolves through the tests classifier.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>1.18.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.18.1</version>
<!-- the operator test harnesses are published in the test jar -->
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>3.1.0-1.18</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<version>3.6.1</version>
<scope>test</scope>
</dependency>
Unit Testing with MiniClusterWithClientResource
MiniClusterWithClientResource spins up a single-JVM Flink cluster for your test. It handles the JobManager, TaskManager, and cluster lifecycle. Use it when you want to test a full Flink DataStream topology end-to-end without external infrastructure.
@ClassRule
public static final MiniClusterWithClientResource flinkCluster =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(2)
.setNumberTaskManagers(1)
.build());
@Test
public void testWordCountTopology() throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<String> words = env.fromElements(
"the quick brown fox",
"the fox jumped",
"the quick rabbit"
);
DataStream<Tuple2<String, Integer>> counts = words
.flatMap(new Tokenizer())
.keyBy(t -> t.f0)
.sum(1);
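// Pull results back to the client. A locally captured list would not work:
// Flink serializes sink functions, so the job would only fill a copy of it.
List<Tuple2<String, Integer>> result = new ArrayList<>();
try (CloseableIterator<Tuple2<String, Integer>> it =
counts.executeAndCollect("word-count-test")) {
it.forEachRemaining(result::add);
}
// sum(1) emits a running total for every record, so keep the largest value per word
Map<String, Integer> map = result.stream()
.collect(Collectors.toMap(t -> t.f0, t -> t.f1, Integer::max));
assertEquals(3, (int) map.get("the"));
assertEquals(2, (int) map.get("quick"));
assertEquals(2, (int) map.get("fox"));
}
The key constraint: results must travel back to the test through Flink, not through a captured local variable. executeAndCollect() submits the job and streams its output to the client. Because the source is bounded, the iterator is exhausted once the job finishes, so you can safely inspect result afterward.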
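The assertions above depend on how a keyed running sum behaves: it emits an updated total for every input record, not one final value per key. The following plain-Java sketch models that behavior without any Flink classes. RunningSumModel and its method names are hypothetical, and it assumes the tokenizer splits on whitespace. It shows why a test has to keep the largest emitted value per word, and why adding the emitted values together overcounts.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of a keyed running sum. Illustration only, not Flink code. */
final class RunningSumModel {

    /** Emits one (word, runningTotal) pair per input word, as a keyed sum would. */
    static List<Map.Entry<String, Integer>> emitRunningTotals(List<String> lines) {
        Map<String, Integer> totals = new HashMap<>();
        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split("\\s+")) {
                int updated = totals.merge(word, 1, Integer::sum);
                emitted.add(Map.entry(word, updated));
            }
        }
        return emitted;
    }

    /** Keeps the largest emitted total per word, which equals the final count. */
    static Map<String, Integer> finalCounts(List<Map.Entry<String, Integer>> emitted) {
        Map<String, Integer> result = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : emitted) {
            result.merge(e.getKey(), e.getValue(), Integer::max);
        }
        return result;
    }

    /** Adds up every emitted total per word. This overcounts repeated words. */
    static Map<String, Integer> summedTotals(List<Map.Entry<String, Integer>> emitted) {
        Map<String, Integer> result = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : emitted) {
            result.merge(e.getKey(), e.getValue(), Integer::sum);
        }
        return result;
    }
}
```

For the three input lines used in the test, "the" is emitted with totals 1, 2 and 3. finalCounts reports 3 for it, while summedTotals reports 6.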
Testing Stateful Operators with KeyedOneInputStreamOperatorTestHarness
The MiniCluster approach is coarse. When you need to inspect internal state, trigger timers, or control watermarks precisely, use KeyedOneInputStreamOperatorTestHarness. It wraps a single operator and gives you direct control.
Consider a SessionWindowCounter that tracks per-user event counts within a 30-second session gap:
public class SessionWindowCounter
extends KeyedProcessFunction<String, UserEvent, SessionResult> {
private ValueState<Long> countState;
private ValueState<Long> timerState;
private static final long SESSION_GAP_MS = 30_000L;
@Override
public void open(Configuration parameters) {
countState = getRuntimeContext().getState(
new ValueStateDescriptor<>("count", Long.class));
timerState = getRuntimeContext().getState(
new ValueStateDescriptor<>("timer", Long.class));
}
@Override
public void processElement(UserEvent event, Context ctx,
Collector<SessionResult> out) throws Exception {
Long count = countState.value();
countState.update(count == null ? 1L : count + 1);
Long currentTimer = timerState.value();
if (currentTimer != null) {
ctx.timerService().deleteEventTimeTimer(currentTimer);
}
long newTimer = event.timestamp + SESSION_GAP_MS;
ctx.timerService().registerEventTimeTimer(newTimer);
timerState.update(newTimer);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx,
Collector<SessionResult> out) throws Exception {
Long count = countState.value();
out.collect(new SessionResult(ctx.getCurrentKey(), count, timestamp));
countState.clear();
timerState.clear();
}
}
Now test it without a full cluster:
@Test
public void testSessionWindowFires() throws Exception {
SessionWindowCounter operator = new SessionWindowCounter();
KeyedOneInputStreamOperatorTestHarness<String, UserEvent, SessionResult> harness =
new KeyedOneInputStreamOperatorTestHarness<>(
new KeyedProcessOperator<>(operator),
event -> event.userId,
Types.STRING
);
harness.open();
// Feed two events for user "alice" at t=1000 and t=5000
harness.processElement(new UserEvent("alice", 1000L), 1000L);
harness.processElement(new UserEvent("alice", 5000L), 5000L);
// Advance watermark to 5000 — no output yet (timer is at 35000)
harness.processWatermark(5000L);
assertEquals(0, harness.extractOutputValues().size());
// Advance watermark past the session gap timer (5000 + 30000 = 35000)
harness.processWatermark(35001L);
List<SessionResult> output = harness.extractOutputValues();
assertEquals(1, output.size());
assertEquals("alice", output.get(0).userId);
assertEquals(2L, (long) output.get(0).count);
harness.close();
}
Notice: no sleeping, no polling, no timing sensitivity. Once the watermark reaches the timer's timestamp, the operator fires deterministically.
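The determinism comes from one rule: an event-time timer fires as soon as the watermark is greater than or equal to the timer's timestamp. The sketch below models that rule for a single key in plain Java. SessionTimerModel is a hypothetical stand-in for the operator plus harness, not a Flink class, and it keeps only the one pending timer that SessionWindowCounter would keep.

```java
import java.util.ArrayList;
import java.util.List;

/** Single-key model of a session counter driven by watermarks. Illustration only. */
final class SessionTimerModel {
    private static final long SESSION_GAP_MS = 30_000L;

    private long count = 0;
    private Long timer = null; // pending event-time timer, or null if none

    /** Counts the event and moves the timer to eventTimestamp + gap. */
    void processElement(long eventTimestamp) {
        count++;
        timer = eventTimestamp + SESSION_GAP_MS; // replaces any earlier timer
    }

    /** Returns the session counts emitted by timers whose timestamp is <= watermark. */
    List<Long> advanceWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        if (timer != null && timer <= watermark) {
            fired.add(count);
            count = 0;      // mirrors countState.clear()
            timer = null;   // mirrors timerState.clear()
        }
        return fired;
    }
}
```

With events at 1000 and 5000, the pending timer sits at 35000. A watermark of 34999 produces nothing, and a watermark of 35000 emits a single session with count 2.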
Inspecting State Directly
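The harness does not return state values itself, but it exposes the wrapped operator. In Flink 1.18, one way to read internal state mid-test is to set the current key on that operator and query its keyed state store:
harness.processElement(new UserEvent("bob", 2000L), 2000L);
// Read state for key "bob" without triggering output.
// The cast assumes the harness wraps an AbstractStreamOperator subclass such as KeyedProcessOperator.
AbstractStreamOperator<SessionResult> op =
(AbstractStreamOperator<SessionResult>) harness.getOperator();
op.setCurrentKey("bob");
Long count = op.getKeyedStateStore()
.getState(new ValueStateDescriptor<>("count", Long.class)).value();
assertEquals(1L, (long) count);
Testing Event-Time Windows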
Tumbling and sliding window operators are tested the same way — advance the watermark past the window boundary and assert on the output.
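Which window a record lands in, and when that window may fire, is plain arithmetic on timestamps. The sketch below models both rules in plain Java under the assumption of a zero window offset. TumblingWindowModel and its methods are hypothetical names, not Flink APIs. A window [start, start + size) covers timestamps up to start + size - 1, and it becomes eligible to fire once the watermark reaches that last timestamp.

```java
import java.util.Map;
import java.util.TreeMap;

/** Arithmetic model of tumbling event-time windows with offset 0. Illustration only. */
final class TumblingWindowModel {

    /** Start of the tumbling window that contains the given timestamp. */
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    /** True once the watermark has reached the window's last timestamp (end - 1). */
    static boolean canFire(long windowStart, long size, long watermark) {
        return watermark >= windowStart + size - 1;
    }

    /** Counts events per window start, ordered by window start. */
    static Map<Long, Integer> countPerWindow(long[] timestamps, long size) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestamps) {
            counts.merge(windowStart(ts, size), 1, Integer::sum);
        }
        return counts;
    }
}
```

For timestamps 1, 5, 9 and 15 with a 10 ms window, countPerWindow yields three events in the window starting at 0 and one in the window starting at 10. The first window can fire at watermark 9 but not at watermark 8.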
@Test
public void testTumblingWindowAggregation() throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Event time is the default since Flink 1.12, so no time characteristic needs to be set
DataStream<Tuple2<String, Long>> input = env
.addSource(new FiniteEventSource())
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(
Duration.ofSeconds(0))
.withTimestampAssigner((e, ts) -> e.f1));
DataStream<String> windowed = input
.keyBy(t -> t.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.aggregate(new CountAgg(), new WindowResultFunction());
// Pull results back to the client; a captured local list would not be filled by the job
List<String> results = new ArrayList<>();
try (CloseableIterator<String> it = windowed.executeAndCollect()) {
it.forEachRemaining(results::add);
}
// FiniteEventSource emits events at t=1,5,9,15 for key "x"
// Window [0,10) should contain 3 events, window [10,20) should contain 1
assertTrue(results.contains("x:3"));
assertTrue(results.contains("x:1"));
}
For finer control, drive the window operator through a test harness instead:
OneInputStreamOperatorTestHarness<Tuple2<String, Long>, String> harness =
createWindowHarness();
harness.processElement(Tuple2.of("x", 1L), 1L);
harness.processElement(Tuple2.of("x", 5L), 5L);
harness.processElement(Tuple2.of("x", 9L), 9L);
// Window [0,10) hasn't fired yet
assertEquals(0, harness.extractOutputValues().size());
// Advance past window boundary
harness.processWatermark(10L);
List<String> out = harness.extractOutputValues();
assertEquals(1, out.size());
assertEquals("x:3", out.get(0));
Integration Testing with Embedded Kafka
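Unit tests validate operator logic. Integration tests validate the full pipeline: source connectors, transformations, sinks, and serialization. Use EmbeddedKafkaCluster to run a real Kafka broker in-process. The class ships in Kafka's own test jars (the kafka-streams test artifact in the 3.x line), not in kafka-streams-test-utils. It is an internal test utility, so expect its API to shift between Kafka versions.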
public class KafkaPipelineIntegrationTest {
private static EmbeddedKafkaCluster kafka;
private static final String INPUT_TOPIC = "raw-events";
private static final String OUTPUT_TOPIC = "aggregated-events";
@BeforeClass
public static void startKafka() throws Exception {
Properties brokerProps = new Properties();
brokerProps.setProperty("auto.create.topics.enable", "true");
kafka = new EmbeddedKafkaCluster(1, brokerProps);
kafka.start();
kafka.createTopic(INPUT_TOPIC, 1, 1);
kafka.createTopic(OUTPUT_TOPIC, 1, 1);
}
@AfterClass
public static void stopKafka() {
kafka.stop();
}
@Test
public void testEndToEndAggregation() throws Exception {
// Produce test messages
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafka.bootstrapServers());
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
try (KafkaProducer<String, String> producer =
new KafkaProducer<>(producerProps)) {
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<>(INPUT_TOPIC, "user-1",
"{\"userId\":\"user-1\",\"event\":\"click\",\"ts\":" + (i * 1000) + "}"));
}
producer.flush();
}
// Run the Flink job against embedded Kafka
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(kafka.bootstrapServers())
.setTopics(INPUT_TOPIC)
.setGroupId("test-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setBounded(OffsetsInitializer.latest()) // finite source for test
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(kafka.bootstrapServers())
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(OUTPUT_TOPIC)
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.build();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
.map(json -> parseAndAggregate(json))
.filter(Objects::nonNull)
.sinkTo(sink);
env.execute("integration-test-job");
// Consume and assert output
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafka.bootstrapServers());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer");
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
List<String> outputs = new ArrayList<>();
try (KafkaConsumer<String, String> consumer =
new KafkaConsumer<>(consumerProps)) {
consumer.subscribe(Collections.singletonList(OUTPUT_TOPIC));
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(10));
records.forEach(r -> outputs.add(r.value()));
}
assertFalse("Expected output records", outputs.isEmpty());
assertTrue(outputs.stream().anyMatch(o -> o.contains("user-1")));
}
}
Testing Exactly-Once Semantics
Exactly-once delivery requires checkpointing. Testing it means:
- Enable checkpointing in the test job.
- Inject a failure after some records are processed.
- Verify the sink received each record exactly once after recovery.
@Test
public void testExactlyOnceAfterFailure() throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(500); // checkpoint every 500ms
env.setRestartStrategy(
RestartStrategies.fixedDelayRestart(1, Time.milliseconds(100)));
// FailingSource emits 100 records in total and throws at record 50 on the first attempt only
DataStream<Integer> source = env.addSource(new FailingSource(100, 50));
CollectingSink<Integer> sink = new CollectingSink<>();
source.addSink(sink);
// Let exceptions propagate: if the job cannot recover, the test should fail
env.execute("eos-test");
List<Integer> received = CollectingSink.getCollected();
// Each value 0-99 must appear exactly once
Map<Integer, Long> counts = received.stream()
.collect(Collectors.groupingBy(i -> i, Collectors.counting()));
for (int i = 0; i < 100; i++) {
assertEquals("Record " + i + " should appear exactly once",
1L, (long) counts.getOrDefault(i, 0L));
}
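}
The FailingSource uses a ListCheckpointed counter to track progress, so on recovery it resumes from the last checkpoint rather than replaying from the beginning. ListCheckpointed is deprecated in favor of CheckpointedFunction, which is the better choice for new code. Records emitted after the last completed checkpoint are still replayed after the restart, so the per-record assertion only holds if CollectingSink is idempotent or publishes its output when a checkpoint completes.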
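Why the sink matters can be seen in a small simulation. The following plain-Java sketch is a deliberately simplified model, not Flink's actual checkpointing or two-phase commit protocol, and CheckpointReplayModel is a hypothetical name. It replays a source from its last checkpointed offset after one failure and compares a sink that writes on arrival with a sink that only publishes records when a checkpoint is taken.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of checkpoint, failure and replay. Illustration only. */
final class CheckpointReplayModel {

    /** What each kind of sink holds after the simulated run. */
    static final class Result {
        final List<Integer> eagerSink = new ArrayList<>();     // writes on arrival
        final List<Integer> committedSink = new ArrayList<>(); // writes on checkpoint
    }

    static Result run(int total, int failAt, int checkpointEvery) {
        Result result = new Result();
        List<Integer> pending = new ArrayList<>(); // records of the open transaction
        int checkpointedOffset = 0;
        boolean failedOnce = false;
        int offset = 0;
        while (offset < total) {
            if (!failedOnce && offset == failAt) {
                // Simulated crash: drop uncommitted work, rewind to the last checkpoint.
                failedOnce = true;
                pending.clear();
                offset = checkpointedOffset;
                continue;
            }
            result.eagerSink.add(offset);
            pending.add(offset);
            offset++;
            if (offset % checkpointEvery == 0) {
                // Checkpoint: remember the source offset and publish the transaction.
                checkpointedOffset = offset;
                result.committedSink.addAll(pending);
                pending.clear();
            }
        }
        // End of input: publish whatever is still pending.
        result.committedSink.addAll(pending);
        return result;
    }
}
```

With 100 records, a failure at record 50 and a checkpoint every 20 records, the job rewinds to offset 40. The eager sink ends up with 110 entries because records 40 to 49 arrive twice, while the committed sink holds each of the 100 records exactly once.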
Testing Async I/O Operators
Flink's AsyncDataStream pattern is common for enriching stream records with database lookups. Testing it requires a mock async function:
@Test
public void testAsyncEnrichment() throws Exception {
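AsyncFunction<String, EnrichedRecord> mockEnricher =
(input, resultFuture) -> CompletableFuture
.supplyAsync(() -> new EnrichedRecord(input, "enriched-" + input))
// ResultFuture.complete expects a collection of results
.thenAccept(r -> resultFuture.complete(Collections.singleton(r)));
// AsyncDataStream offers no harness factory, so wrap the operator factory directly.
// StringSerializer here is Flink's org.apache.flink.api.common.typeutils.base.StringSerializer.
OneInputStreamOperatorTestHarness<String, EnrichedRecord> harness =
new OneInputStreamOperatorTestHarness<>(
new AsyncWaitOperatorFactory<>(mockEnricher, 1000L, 10,
AsyncDataStream.OutputMode.UNORDERED),
StringSerializer.INSTANCE);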
harness.open();
harness.processElement("event-1", 100L);
harness.processElement("event-2", 200L);
// Wait for async completions
harness.endInput();
List<EnrichedRecord> results = harness.extractOutputValues();
assertEquals(2, results.size());
assertTrue(results.stream().anyMatch(r -> r.original.equals("event-1")));
}
Best Practices
Isolate time from logic. Never use System.currentTimeMillis() inside Flink operators. Use the Flink timer service and process functions so tests can control time via watermarks.
Keep operators small. A KeyedProcessFunction that does one thing is far easier to test than a mega-operator with branching logic. Decompose before you test.
Use @ClassRule for MiniCluster. Starting and stopping a Flink cluster per test is slow (2-5 seconds). A @ClassRule shares the cluster across all tests in a class.
Snapshot and restore state. Test state migration by snapshotting a harness, creating a new harness with a modified operator, restoring from the snapshot, and asserting on behavior:
OperatorSubtaskState snapshot = harness.snapshot(1L, 100L);
// Create new harness with updated operator
KeyedOneInputStreamOperatorTestHarness<...> newHarness = createHarness();
newHarness.initializeState(snapshot);
newHarness.open();
// Old state is visible to new operator version
newHarness.processElement(new UserEvent("alice", 40000L), 40000L);
newHarness.processWatermark(70001L);
// Should see count=3 (2 from old harness + 1 new)
assertEquals(3L, (long) newHarness.extractOutputValues().get(0).count);
Parallelize integration tests carefully. Embedded Kafka brokers bind to ports. Use 0 for the port to get a random available port, and always share the cluster at @BeforeClass scope.
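The port-0 advice relies on the operating system picking a free ephemeral port at bind time. The sketch below shows that mechanism with a plain ServerSocket. FreePort is a hypothetical helper, not part of Kafka or Flink. Where the broker accepts port 0 directly, passing 0 to it is the safer choice, because a probed port can be taken by another process between the probe and the broker's own bind.

```java
import java.io.IOException;
import java.net.ServerSocket;

/** Probes for a free TCP port by binding to port 0. Illustration only. */
final class FreePort {

    /** Binds to port 0, reads the port the OS assigned, then releases it. */
    static int find() throws IOException {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        }
    }
}
```

Two test classes that each call such a probe, or each start their broker on port 0, avoid colliding on a hard-coded port when the build runs them in parallel.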
Putting It All Together
A complete Flink testing strategy has three layers:
| Layer | Tool | What It Tests | Speed |
|---|---|---|---|
| Operator unit | KeyedOneInputStreamOperatorTestHarness | Single operator logic, state, timers | < 100ms |
| Topology unit | MiniClusterWithClientResource | Full DAG, serialization, parallelism | 2-5s |
| Integration | Embedded Kafka + MiniCluster | Source/sink connectors, EOS, backpressure | 10-30s |
Run operator and topology tests on every commit. Reserve integration tests for CI pipelines where you can afford the overhead.