Testing Apache Flink Applications: Unit, Integration, and Stateful Stream Testing

Testing Apache Flink requires specialized tools at each layer: MiniClusterWithClientResource for topology-level tests, KeyedOneInputStreamOperatorTestHarness for stateful operators, and EmbeddedKafkaCluster for end-to-end integration. Event-time semantics and exactly-once guarantees demand explicit test harness control over watermarks and checkpoints.

Key Takeaways

Unit test operators in isolation. Use KeyedOneInputStreamOperatorTestHarness to feed elements and watermarks directly into a single operator without running a full Flink cluster.

Simulate event-time windows precisely. Advancing watermarks manually in test harnesses lets you trigger window computations on demand and assert on exactly the records you expect — no sleeping or polling required.

Validate exactly-once semantics (EOS) explicitly. Testing EOS requires injecting failures mid-stream and verifying that downstream sinks see each record exactly once, not zero or multiple times.

Apache Flink occupies a unique space in the stream-processing world. Its stateful, event-time-aware execution model is powerful, but it makes testing significantly harder than testing a REST API or a batch job. Three things conspire against you:

  1. State is invisible by default. A ValueState<Long> lives inside a managed state backend. You cannot inspect it with a System.out.println.
  2. Time is decoupled from the system clock. Event-time processing advances via watermarks, not System.currentTimeMillis(). A test that sleeps and hopes a window fires is unreliable.
  3. Exactly-once semantics involve checkpointing. Verifying that your sink writes each record exactly once requires simulating failures — something a simple unit test cannot do.

Flink's testing libraries solve all three problems. This guide walks through each layer.

Setting Up the Test Dependencies

Add the following to your pom.xml. The flink-test-utils artifact brings in MiniClusterWithClientResource; the operator test harnesses live in the test-jar of flink-streaming-java, which is why that dependency carries the tests classifier.

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils</artifactId>
    <version>1.18.1</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.18.1</version>
    <classifier>tests</classifier>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>3.1.0-1.18</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.13</artifactId>
    <version>3.6.1</version>
    <scope>test</scope>
</dependency>

Unit Testing with MiniClusterWithClientResource

MiniClusterWithClientResource spins up a single-JVM Flink cluster for your test. It handles the JobManager, TaskManager, and cluster lifecycle. Use it when you want to test a full Flink DataStream topology end-to-end without external infrastructure.

@ClassRule
public static final MiniClusterWithClientResource flinkCluster =
    new MiniClusterWithClientResource(
        new MiniClusterResourceConfiguration.Builder()
            .setNumberSlotsPerTaskManager(2)
            .setNumberTaskManagers(1)
            .build());

@Test
public void testWordCountTopology() throws Exception {
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    DataStream<String> words = env.fromElements(
        "the quick brown fox",
        "the fox jumped",
        "the quick rabbit"
    );

    DataStream<Tuple2<String, Integer>> counts = words
        .flatMap(new Tokenizer())
        .keyBy(t -> t.f0)
        .sum(1);

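    // Flink serializes sink functions before running them, so collect
    // through a static field rather than a captured local list.
    CollectSink.VALUES.clear();
    counts.addSink(new CollectSink());

    env.execute("word-count-test");

    // sum() emits a running total per key, so keep the highest count seen.
    Map<String, Integer> map = CollectSink.VALUES.stream()
        .collect(Collectors.toMap(t -> t.f0, t -> t.f1, Integer::max));

    assertEquals(3, (int) map.get("the"));
    assertEquals(2, (int) map.get("quick"));
    assertEquals(2, (int) map.get("fox"));
}

// Test-only sink that gathers results in a static, thread-safe list.
private static class CollectSink implements SinkFunction<Tuple2<String, Integer>> {
    static final List<Tuple2<String, Integer>> VALUES =
        Collections.synchronizedList(new ArrayList<>());

    @Override
    public void invoke(Tuple2<String, Integer> value, Context context) {
        VALUES.add(value);
    }
}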

The key constraint: env.execute() blocks until the job finishes. With a bounded source such as fromElements, the job completes before execute() returns, so you can safely inspect the collected results afterward.
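Flink serializes user functions before it runs them, which is why test sinks usually collect into a static field instead of an instance field or a captured local list. The following plain-Java sketch imitates that behavior with ordinary Java serialization. It uses no Flink classes; SinkSerializationDemo, InstanceSink, StaticSink, and shipToTask are hypothetical names chosen for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins for sink functions; these are not Flink classes.
class SinkSerializationDemo {

    /** Writes into an instance field, which is copied during serialization. */
    static class InstanceSink implements Serializable {
        private static final long serialVersionUID = 1L;
        final List<String> values = new ArrayList<>();

        void invoke(String value) {
            values.add(value);
        }
    }

    /** Writes into a static field, which is shared by every copy in the same JVM. */
    static class StaticSink implements Serializable {
        private static final long serialVersionUID = 1L;
        static final List<String> VALUES =
            Collections.synchronizedList(new ArrayList<>());

        void invoke(String value) {
            VALUES.add(value);
        }
    }

    /** Round-trips a function through serialization, roughly as job submission does. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T shipToTask(T function) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(function);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                     new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        InstanceSink declared = new InstanceSink();
        shipToTask(declared).invoke("the");            // the copy receives the record
        System.out.println(declared.values.size());    // prints 0

        StaticSink.VALUES.clear();
        shipToTask(new StaticSink()).invoke("the");
        System.out.println(StaticSink.VALUES.size());  // prints 1
    }
}
```

The static list only works because the MiniCluster runs in the same JVM as the test; clear it before each test so results do not leak between test methods.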

Testing Stateful Operators with KeyedOneInputStreamOperatorTestHarness

The MiniCluster approach is coarse. When you need to inspect internal state, trigger timers, or control watermarks precisely, use KeyedOneInputStreamOperatorTestHarness. It wraps a single operator and gives you direct control.

Consider a SessionWindowCounter that tracks per-user event counts within a 30-second session gap:

public class SessionWindowCounter
    extends KeyedProcessFunction<String, UserEvent, SessionResult> {

    private ValueState<Long> countState;
    private ValueState<Long> timerState;
    private static final long SESSION_GAP_MS = 30_000L;

    @Override
    public void open(Configuration parameters) {
        countState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("count", Long.class));
        timerState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("timer", Long.class));
    }

    @Override
    public void processElement(UserEvent event, Context ctx,
                               Collector<SessionResult> out) throws Exception {
        Long count = countState.value();
        countState.update(count == null ? 1L : count + 1);

        Long currentTimer = timerState.value();
        if (currentTimer != null) {
            ctx.timerService().deleteEventTimeTimer(currentTimer);
        }
        long newTimer = event.timestamp + SESSION_GAP_MS;
        ctx.timerService().registerEventTimeTimer(newTimer);
        timerState.update(newTimer);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<SessionResult> out) throws Exception {
        Long count = countState.value();
        out.collect(new SessionResult(ctx.getCurrentKey(), count, timestamp));
        countState.clear();
        timerState.clear();
    }
}

Now test it without a full cluster:

@Test
public void testSessionWindowFires() throws Exception {
    SessionWindowCounter operator = new SessionWindowCounter();

    KeyedOneInputStreamOperatorTestHarness<String, UserEvent, SessionResult> harness =
        new KeyedOneInputStreamOperatorTestHarness<>(
            new KeyedProcessOperator<>(operator),
            event -> event.userId,
            Types.STRING
        );

    harness.open();

    // Feed two events for user "alice" at t=1000 and t=5000
    harness.processElement(new UserEvent("alice", 1000L), 1000L);
    harness.processElement(new UserEvent("alice", 5000L), 5000L);

    // Advance watermark to 5000 — no output yet (timer is at 35000)
    harness.processWatermark(5000L);
    assertEquals(0, harness.extractOutputValues().size());

    // Advance watermark past the session gap timer (5000 + 30000 = 35000)
    harness.processWatermark(35001L);

    List<SessionResult> output = harness.extractOutputValues();
    assertEquals(1, output.size());
    assertEquals("alice", output.get(0).userId);
    assertEquals(2L, (long) output.get(0).count);

    harness.close();
}

Notice: no sleeping, no polling, no timing sensitivity. An event-time timer fires as soon as the watermark reaches its timestamp, so advancing the watermark to 35000 or beyond makes the operator fire deterministically.
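To see why the test is deterministic, it can help to model the timer logic on its own. The sketch below is a simplified plain-Java model of event-time timers, not Flink's implementation; EventTimeTimerModel and its methods are hypothetical names. It replays the two "alice" events with the same 30-second gap.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Simplified model of event-time timers; an illustration, not Flink's implementation.
class EventTimeTimerModel {
    private final TreeSet<Long> timers = new TreeSet<>();

    void register(long timestamp) {
        timers.add(timestamp);
    }

    void delete(long timestamp) {
        timers.remove(timestamp);
    }

    /** Fires, in timestamp order, every timer at or below the watermark. */
    List<Long> advanceWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= watermark) {
            fired.add(timers.pollFirst());
        }
        return fired;
    }

    public static void main(String[] args) {
        long gap = 30_000L;
        EventTimeTimerModel model = new EventTimeTimerModel();

        model.register(1_000L + gap);   // event at t=1000 sets a timer at 31000
        model.delete(1_000L + gap);     // event at t=5000 cancels it...
        model.register(5_000L + gap);   // ...and sets a new timer at 35000

        System.out.println(model.advanceWatermark(5_000L));   // prints []
        System.out.println(model.advanceWatermark(34_999L));  // prints []
        System.out.println(model.advanceWatermark(35_000L));  // prints [35000]
    }
}
```

Nothing in the model reads the system clock, which is the property the harness test relies on.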

Inspecting State Directly

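You can also read keyed state mid-test. The snippet below shows the idea; depending on your Flink version, setCurrentKey and getState may not be available on the harness itself, in which case reach the state through harness.getOperator() or assert on harness.numKeyedStateEntries() instead: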

harness.processElement(new UserEvent("bob", 2000L), 2000L);

// Read state for key "bob" without triggering output
harness.setCurrentKey("bob");
Long count = harness.getState(
    new ValueStateDescriptor<>("count", Long.class)).value();
assertEquals(1L, (long) count);

Testing Event-Time Windows

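Tumbling and sliding windows follow the same principle: a window fires once the watermark passes its end. A test can either run a bounded pipeline and let the final watermark at end of input flush every window, or drive the watermark by hand in a harness. The pipeline variant looks like this: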

@Test
public void testTumblingWindowAggregation() throws Exception {
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    // Event time is the default since Flink 1.12; setStreamTimeCharacteristic is deprecated.

    DataStream<Tuple2<String, Long>> input = env
        .addSource(new FiniteEventSource())
        .assignTimestampsAndWatermarks(
            WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(
                    Duration.ofSeconds(0))
                .withTimestampAssigner((e, ts) -> e.f1));

    DataStream<String> windowed = input
        .keyBy(t -> t.f0)
        .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))  // timestamps are in ms
        .aggregate(new CountAgg(), new WindowResultFunction());

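    // Collect through a static list; Flink serializes sink functions,
    // so a captured local list would never see the results.
    StringCollectSink.VALUES.clear();
    windowed.addSink(new StringCollectSink());

    env.execute();

    // FiniteEventSource emits events at t=1,5,9,15 for key "x"
    // Window [0,10) should contain 3 events, window [10,20) should contain 1
    assertTrue(StringCollectSink.VALUES.contains("x:3"));
    assertTrue(StringCollectSink.VALUES.contains("x:1"));
}

// Test-only sink backed by a static, thread-safe list.
private static class StringCollectSink implements SinkFunction<String> {
    static final List<String> VALUES =
        Collections.synchronizedList(new ArrayList<>());

    @Override
    public void invoke(String value, Context context) {
        VALUES.add(value);
    }
}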

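For finer control, drive the window operator through a test harness instead. Here createWindowHarness() is a helper that wraps the keyed window operator in a harness: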

OneInputStreamOperatorTestHarness<Tuple2<String, Long>, String> harness =
    createWindowHarness();

harness.processElement(Tuple2.of("x", 1L), 1L);
harness.processElement(Tuple2.of("x", 5L), 5L);
harness.processElement(Tuple2.of("x", 9L), 9L);

// Window [0,10) hasn't fired yet
assertEquals(0, harness.extractOutputValues().size());

// Advance past window boundary
harness.processWatermark(10L);

List<String> out = harness.extractOutputValues();
assertEquals(1, out.size());
assertEquals("x:3", out.get(0));
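The boundaries used in these assertions come from simple arithmetic. As a rough sketch in plain Java (TumblingWindowMath is a hypothetical class, and it assumes no window offset and non-negative millisecond timestamps), a tumbling window starts at the timestamp rounded down to a multiple of the window size, and it fires once the watermark reaches the last timestamp it contains:

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper illustrating tumbling-window arithmetic; not a Flink class.
class TumblingWindowMath {

    /** Start of the window containing the timestamp (no offset, timestamp >= 0). */
    static long windowStart(long timestamp, long sizeMs) {
        return timestamp - (timestamp % sizeMs);
    }

    /** Last timestamp inside the window; the window fires when the watermark reaches it. */
    static long maxTimestamp(long windowStart, long sizeMs) {
        return windowStart + sizeMs - 1;
    }

    /** Counts how many timestamps fall into each window, keyed by window start. */
    static Map<Long, Integer> countPerWindow(long[] timestamps, long sizeMs) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestamps) {
            counts.merge(windowStart(ts, sizeMs), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] timestamps = {1L, 5L, 9L, 15L};
        System.out.println(countPerWindow(timestamps, 10L));  // prints {0=3, 10=1}
        System.out.println(maxTimestamp(0L, 10L));            // prints 9
    }
}
```

With a 10 ms window, timestamps 1, 5, and 9 land in [0,10) and 15 lands in [10,20). The first window's last timestamp is 9, so a watermark of 10 is enough to fire it.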

Integration Testing with Embedded Kafka

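Unit tests validate operator logic. Integration tests validate the full pipeline: source connectors, transformations, sinks, and serialization. Use EmbeddedKafkaCluster to run a real Kafka broker in-process. It is an internal test utility that ships in Kafka's test artifacts (for example the kafka-streams test-jar, not kafka-streams-test-utils), so add that test-jar to your pom and expect its API to vary between Kafka versions.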

public class KafkaPipelineIntegrationTest {

    private static EmbeddedKafkaCluster kafka;
    private static final String INPUT_TOPIC = "raw-events";
    private static final String OUTPUT_TOPIC = "aggregated-events";

    @BeforeClass
    public static void startKafka() throws Exception {
        Properties brokerProps = new Properties();
        brokerProps.setProperty("auto.create.topics.enable", "true");
        kafka = new EmbeddedKafkaCluster(1, brokerProps);
        kafka.start();
        kafka.createTopic(INPUT_TOPIC, 1, 1);
        kafka.createTopic(OUTPUT_TOPIC, 1, 1);
    }

    @AfterClass
    public static void stopKafka() {
        kafka.stop();
    }

    @Test
    public void testEndToEndAggregation() throws Exception {
        // Produce test messages
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            kafka.bootstrapServers());
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer =
                 new KafkaProducer<>(producerProps)) {
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>(INPUT_TOPIC, "user-1",
                    "{\"userId\":\"user-1\",\"event\":\"click\",\"ts\":" + (i * 1000) + "}"));
            }
            producer.flush();
        }

        // Run the Flink job against embedded Kafka
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers(kafka.bootstrapServers())
            .setTopics(INPUT_TOPIC)
            .setGroupId("test-group")
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setBounded(OffsetsInitializer.latest())  // finite source for test
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();

        KafkaSink<String> sink = KafkaSink.<String>builder()
            .setBootstrapServers(kafka.bootstrapServers())
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(OUTPUT_TOPIC)
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
            .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
            .map(json -> parseAndAggregate(json))
            .filter(Objects::nonNull)
            .sinkTo(sink);

        env.execute("integration-test-job");

        // Consume and assert output
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            kafka.bootstrapServers());
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer");
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());

        List<String> outputs = new ArrayList<>();
        try (KafkaConsumer<String, String> consumer =
                 new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(Collections.singletonList(OUTPUT_TOPIC));
            ConsumerRecords<String, String> records =
                consumer.poll(Duration.ofSeconds(10));
            records.forEach(r -> outputs.add(r.value()));
        }

        assertFalse("Expected output records", outputs.isEmpty());
        assertTrue(outputs.stream().anyMatch(o -> o.contains("user-1")));
    }
}

Testing Exactly-Once Semantics

Exactly-once delivery requires checkpointing. Testing it means:

  1. Enable checkpointing in the test job.
  2. Inject a failure after some records are processed.
  3. Verify the sink received each record exactly once after recovery.

@Test
public void testExactlyOnceAfterFailure() throws Exception {
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(500);  // checkpoint every 500ms
    env.setRestartStrategy(
        RestartStrategies.fixedDelayRestart(1, Time.milliseconds(100)));

    // FailingSource emits records 0-99 and throws once, at record 50, on the first attempt
    DataStream<Integer> source = env.addSource(new FailingSource(100, 50));

    CollectingSink<Integer> sink = new CollectingSink<>();
    source.addSink(sink);

    // The restart strategy lets the job recover and finish, so execute() should
    // return normally. Let any exception fail the test instead of swallowing it.
    env.execute("eos-test");

    List<Integer> received = CollectingSink.getCollected();

    // Each value 0-99 must appear exactly once
    Map<Integer, Long> counts = received.stream()
        .collect(Collectors.groupingBy(i -> i, Collectors.counting()));

    for (int i = 0; i < 100; i++) {
        assertEquals("Record " + i + " should appear exactly once",
            1L, (long) counts.getOrDefault(i, 0L));
    }
}

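For this to work, FailingSource must checkpoint its read offset (for example by implementing CheckpointedFunction; the older ListCheckpointed interface is deprecated), so on recovery it resumes from the last checkpoint rather than replaying from the beginning. Records emitted after that checkpoint but before the failure are replayed, so the assertion only holds if CollectingSink is idempotent or transactional, for instance by deduplicating on the record value or by committing buffered records only when a checkpoint completes.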
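The interaction between checkpoints, replay, and the sink can be illustrated without Flink. The toy model below is a plain-Java sketch under simplifying assumptions: a single-threaded source, a checkpoint every fixed number of records, and one failure. ReplayModel and run are hypothetical names, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.IntConsumer;

// Toy model of checkpoint-and-replay; not Flink code.
class ReplayModel {

    /**
     * Emits 0..total-1 into the sink. The offset is checkpointed every
     * checkpointEvery records. The run fails once, just before emitting
     * failAt, and then resumes from the last checkpointed offset.
     */
    static void run(int total, int checkpointEvery, int failAt, IntConsumer sink) {
        int checkpointedOffset = 0;
        boolean failed = false;
        int offset = 0;
        while (offset < total) {
            if (!failed && offset == failAt) {
                failed = true;
                offset = checkpointedOffset;  // recovery rewinds to the last checkpoint
                continue;
            }
            sink.accept(offset);
            offset++;
            if (offset % checkpointEvery == 0) {
                checkpointedOffset = offset;
            }
        }
    }

    public static void main(String[] args) {
        List<Integer> naiveSink = new ArrayList<>();
        run(100, 20, 50, naiveSink::add);
        System.out.println(naiveSink.size());       // prints 110: records 40-49 arrive twice

        Set<Integer> idempotentSink = new HashSet<>();
        run(100, 20, 50, idempotentSink::add);
        System.out.println(idempotentSink.size());  // prints 100
    }
}
```

The source side is exactly-once with respect to its own offset, yet the naive sink still sees duplicates. That gap is what an idempotent or transactional sink closes.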

Testing Async I/O Operators

Flink's AsyncDataStream pattern is common for enriching stream records with database lookups. Testing it requires a mock async function:

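@Test
public void testAsyncEnrichment() throws Exception {
    // ResultFuture.complete expects a Collection, so wrap the single result.
    AsyncFunction<String, EnrichedRecord> mockEnricher =
        (input, resultFuture) -> CompletableFuture
            .supplyAsync(() -> new EnrichedRecord(input, "enriched-" + input))
            .thenAccept(r -> resultFuture.complete(Collections.singleton(r)));

    // AsyncDataStream offers no harness factory, so wrap the operator factory directly.
    // AsyncWaitOperatorFactory is an internal class; its constructor may differ
    // between Flink releases. StringSerializer here is Flink's, not Kafka's.
    OneInputStreamOperatorTestHarness<String, EnrichedRecord> harness =
        new OneInputStreamOperatorTestHarness<>(
            new AsyncWaitOperatorFactory<>(
                mockEnricher, 1000L, 10, AsyncDataStream.OutputMode.UNORDERED),
            StringSerializer.INSTANCE);

    harness.open();
    harness.processElement("event-1", 100L);
    harness.processElement("event-2", 200L);

    // Wait for async completions
    harness.endInput();

    // Unordered mode does not guarantee output order, so avoid index-based assertions.
    List<EnrichedRecord> results = harness.extractOutputValues();
    assertEquals(2, results.size());
    assertTrue(results.stream().anyMatch(r -> r.original.equals("event-1")));

    harness.close();
}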
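Because unordered async results can arrive in any order, assertions on them should not depend on position. The plain-Java sketch below shows one way to assert order-insensitively using CompletableFuture alone. UnorderedAsyncDemo and enrichAll are hypothetical names, and the enrichment is a stand-in for a real lookup.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo of order-insensitive assertions on async results; not Flink code.
class UnorderedAsyncDemo {

    /** Enriches every input asynchronously and returns results in completion order. */
    static List<String> enrichAll(List<String> inputs) {
        Queue<String> completed = new ConcurrentLinkedQueue<>();
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (String input : inputs) {
            futures.add(CompletableFuture
                .supplyAsync(() -> "enriched-" + input)
                .thenAccept(completed::add));
        }
        // Block until every lookup has finished, as a test would before asserting.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
        return new ArrayList<>(completed);
    }

    public static void main(String[] args) {
        List<String> results = enrichAll(List.of("event-1", "event-2"));

        // Compare as sets: completion order is not guaranteed.
        Set<String> expected = Set.of("enriched-event-1", "enriched-event-2");
        System.out.println(new HashSet<>(results).equals(expected));  // prints true
    }
}
```

If ordering does matter for your pipeline, use the ordered output mode and assert on the list directly.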

Best Practices

Isolate time from logic. Never use System.currentTimeMillis() inside Flink operators. Use the Flink timer service and process functions so tests can control time via watermarks.

Keep operators small. A KeyedProcessFunction that does one thing is far easier to test than a mega-operator with branching logic. Decompose before you test.

Use @ClassRule for MiniCluster. Starting and stopping a Flink cluster per test is slow (2-5 seconds). A @ClassRule shares the cluster across all tests in a class.

Snapshot and restore state. Test state migration by snapshotting a harness, creating a new harness with a modified operator, restoring from the snapshot, and asserting on behavior:

OperatorSubtaskState snapshot = harness.snapshot(1L, 100L);

// Create a new harness around the updated operator (createHarness() is a test helper)
KeyedOneInputStreamOperatorTestHarness<String, UserEvent, SessionResult> newHarness =
    createHarness();
newHarness.initializeState(snapshot);
newHarness.open();

// Old state is visible to new operator version
newHarness.processElement(new UserEvent("alice", 40000L), 40000L);
newHarness.processWatermark(70001L);

// Should see count=3 (2 from old harness + 1 new)
assertEquals(3L, (long) newHarness.extractOutputValues().get(0).count);

Parallelize integration tests carefully. Embedded Kafka brokers bind to ports. Use 0 for the port to get a random available port, and always share the cluster at @BeforeClass scope.
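Binding to port 0 asks the operating system for any free port. As a small plain-Java sketch (FreePort is a hypothetical helper, not part of Kafka or Flink), the same mechanism can be seen with a ServerSocket:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

// Hypothetical helper showing how port 0 yields an OS-assigned free port.
class FreePort {

    /** Binds to port 0, reads the port the OS picked, and releases it again. */
    static int find() {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("free port: " + find());
    }
}
```

A port found this way can be claimed by another process before you reuse it, so where possible let the broker itself bind to port 0 and read the address back from it, as kafka.bootstrapServers() does in the integration test.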

Putting It All Together

A complete Flink testing strategy has three layers:

Layer         | Tool                                   | What It Tests                             | Speed
Operator unit | KeyedOneInputStreamOperatorTestHarness | Single operator logic, state, timers      | < 100ms
Topology unit | MiniClusterWithClientResource          | Full DAG, serialization, parallelism      | 2-5s
Integration   | Embedded Kafka + MiniCluster           | Source/sink connectors, EOS, backpressure | 10-30s

Run operator and topology tests on every commit. Reserve integration tests for CI pipelines where you can afford the overhead.

