Apache Pulsar Functions Testing: Unit Tests, Mocks, and Multi-Tenant Scenarios

Apache Pulsar Functions are lightweight compute units that transform or route messages. They are easy to unit-test because process() is an ordinary method: you can call it directly with a stubbed Context (a Mockito mock or a small hand-written MockContext test double) and never start a broker. Stateful functions also need state store tests, multi-tenant applications need namespace isolation tests, and schema evolution tests catch changes that would otherwise break production data pipelines.

Key Takeaways

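Use a mock Context for pure unit tests. Pulsar does not ship a MockContext class. The examples below assume a small hand-written test double named MockContext. It implements org.apache.pulsar.functions.api.Context and records output messages, state store operations, log lines, and metrics in memory, so no running broker is needed. A Mockito mock of Context works as well.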

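Test stateful function state isolation. A function's state store is scoped to that function (tenant/namespace/name) and shared by all of its instances. If one function handles data for several customers or application-level tenants, verify that every state key includes that identifier. Otherwise one customer's entries can be read or overwritten by another's.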

Test schema evolution compatibility. Functions that consume Avro or Protobuf schemas must handle schema evolution. Test BACKWARD and FORWARD compatibility before deploying.

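Test error handling paths explicitly. What happens when process() throws? What happens when context.newOutputMessage(...).send() fails? Depending on the processing guarantee and configuration (for example maxMessageRetries and deadLetterTopic in FunctionConfig), Pulsar either redelivers the failed message or routes it to a dead-letter topic. Test which behavior your function actually exhibits.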

Test multi-tenant namespace isolation with Testcontainers. Pulsar containers support multi-tenancy; test that tenant A's messages don't appear in tenant B's subscription.
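In practice, the state-isolation takeaway comes down to how keys are built. Here is a minimal sketch using a hypothetical StateKeys helper (not a Pulsar API) and a '/'-separated key layout chosen for illustration. Each identifier is percent-encoded before joining. That way, an ID that itself contains the separator cannot produce another tenant's key, which plain string concatenation allows.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper for building tenant-scoped state keys; not part of Pulsar. */
final class StateKeys {
    private StateKeys() {}

    /** Naive concatenation: ("tenant-a", "x/y") and ("tenant-a/x", "y") give the same key. */
    static String naive(String prefix, String tenantId, String entityId) {
        return prefix + "/" + tenantId + "/" + entityId;
    }

    /**
     * Percent-encodes each identifier, so a '/' inside an ID becomes "%2F"
     * and can no longer be confused with the separator.
     */
    static String scoped(String prefix, String tenantId, String entityId) {
        return prefix + "/" + encode(tenantId) + "/" + encode(entityId);
    }

    private static String encode(String part) {
        // URLEncoder.encode(String, Charset) is available since Java 10
        return URLEncoder.encode(part, StandardCharsets.UTF_8);
    }
}
```

A function would then call context.putState(StateKeys.scoped("order.count", tenantId, customerId), value). Its tests can assert on exact key strings, including IDs that contain the separator.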

What Makes Pulsar Functions Different to Test

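Pulsar Functions are Pulsar-native stream processors: each incoming message triggers one process() call. Kafka Streams ships TopologyTestDriver, but Pulsar has no equivalent built-in test driver. Because process() is a plain method, you can still call it directly with a stubbed Context, which covers most scenarios. Pulsar's LocalRunner can also run a function in-process against a broker.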

The three layers to test:

  1. Unit tests with MockContext — no broker, fast, tests business logic
  2. Integration tests with Testcontainers — real broker, tests end-to-end behavior
  3. Multi-tenant tests — verifies namespace isolation

Setup

<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-functions-api</artifactId>
    <version>3.2.4</version>
</dependency>
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-functions-local-runner-original</artifactId>
    <version>3.2.4</version>
    <scope>test</scope>
</dependency>
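<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>pulsar</artifactId>
    <!-- version managed by the Testcontainers BOM (org.testcontainers:testcontainers-bom) -->
    <scope>test</scope>
</dependency>
<dependency>
    <!-- provides await() used in the integration test -->
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <version>4.2.0</version>
    <scope>test</scope>
</dependency>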

Unit Testing with MockContext

Consider a function that enriches order events with pricing tier information:

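import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class OrderEnrichmentFunction implements Function<OrderEvent, EnrichedOrder> {

    @Override
    public EnrichedOrder process(OrderEvent order, Context context) throws Exception {
        // getState returns a ByteBuffer (null when the key is absent), not a String
        ByteBuffer tierBytes = context.getState("pricing.tier." + order.getCustomerId());

        // Decode the stored tier, or fall back to the default tier for unknown customers
        String tierId = tierBytes != null
            ? StandardCharsets.UTF_8.decode(tierBytes).toString()
            : "STANDARD";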

        // Log processing
        context.getLogger().info("Processing order {} for customer {}", 
            order.getOrderId(), order.getCustomerId());

        // Record metric
        context.recordMetric("orders.processed", 1.0);

        return EnrichedOrder.builder()
            .orderId(order.getOrderId())
            .customerId(order.getCustomerId())
            .amount(order.getAmount())
            .pricingTier(tierId)
            .build();
    }
}
class OrderEnrichmentFunctionTest {

    private OrderEnrichmentFunction function;
    private MockContext mockContext;  // hand-written Context test double, not part of the Pulsar API

    @BeforeEach
    void setUp() {
        function = new OrderEnrichmentFunction();
        mockContext = MockContext.builder()
            .tenant("test-tenant")
            .namespace("test-namespace")
            .functionName("order-enrichment")
            .build();
    }

    @Test
    void enrichesOrderWithStoredPricingTier() throws Exception {
        // Pre-populate the mock's state store (UTF-8, matching how the function decodes it)
        mockContext.putState("pricing.tier.CUST-001",
            ByteBuffer.wrap("PREMIUM".getBytes(StandardCharsets.UTF_8)));

        OrderEvent order = new OrderEvent("ORD-001", "CUST-001", BigDecimal.valueOf(199.99));

        EnrichedOrder result = function.process(order, mockContext);

        assertNotNull(result);
        assertEquals("ORD-001", result.getOrderId());
        assertEquals("PREMIUM", result.getPricingTier());
    }

    @Test
    void usesDefaultTierForUnknownCustomer() throws Exception {
        OrderEvent order = new OrderEvent("ORD-002", "CUST-UNKNOWN", BigDecimal.valueOf(50.00));

        EnrichedOrder result = function.process(order, mockContext);

        assertEquals("STANDARD", result.getPricingTier());
    }

    @Test
    void recordsProcessingMetric() throws Exception {
        OrderEvent order = new OrderEvent("ORD-003", "CUST-001", BigDecimal.TEN);
        function.process(order, mockContext);

        // Verify metric was recorded
        Map<String, Double> metrics = mockContext.getMetrics();
        assertTrue(metrics.containsKey("orders.processed"));
        assertEquals(1.0, metrics.get("orders.processed"));
    }

    @Test
    void logsOrderProcessing() throws Exception {
        OrderEvent order = new OrderEvent("ORD-004", "CUST-001", BigDecimal.TEN);
        function.process(order, mockContext);

        // Verify log message was emitted
        List<String> logMessages = mockContext.getLogMessages();
        assertTrue(logMessages.stream()
            .anyMatch(msg -> msg.contains("ORD-004") && msg.contains("CUST-001")));
    }
}
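Since MockContext is not part of Pulsar, here is a minimal sketch of the in-memory bookkeeping such a test double could delegate to. The class name RecordingStore and its methods are hypothetical. A real MockContext would implement org.apache.pulsar.functions.api.Context and forward getState, putState, recordMetric, and logger output here. newOutputMessage would need a separate TypedMessageBuilder stub, which is omitted.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical in-memory bookkeeping for a hand-written MockContext.
 * A MockContext implementing org.apache.pulsar.functions.api.Context could
 * forward getState/putState/recordMetric and its logger's output here.
 */
final class RecordingStore {
    private final Map<String, ByteBuffer> state = new HashMap<>();
    private final Map<String, Double> metrics = new HashMap<>();
    private final List<String> logMessages = new ArrayList<>();

    /** Stores a private copy of the remaining bytes, leaving the caller's buffer untouched. */
    void putState(String key, ByteBuffer value) {
        ByteBuffer copy = ByteBuffer.allocate(value.remaining());
        copy.put(value.duplicate());
        copy.flip();
        state.put(key, copy);
    }

    /** Returns null for a missing key; otherwise a duplicate with its own position. */
    ByteBuffer getState(String key) {
        ByteBuffer stored = state.get(key);
        return stored == null ? null : stored.duplicate();
    }

    /** Sums repeated recordings of the same metric (a simplification). */
    void recordMetric(String name, double value) {
        metrics.merge(name, value, Double::sum);
    }

    /** Records an already-formatted log line. */
    void log(String message) {
        logMessages.add(message);
    }

    Set<String> getStateKeys() {
        return Collections.unmodifiableSet(state.keySet());
    }

    Map<String, Double> getMetrics() {
        return Collections.unmodifiableMap(metrics);
    }

    List<String> getLogMessages() {
        return Collections.unmodifiableList(logMessages);
    }
}
```

Copying on write and handing out duplicates on read is deliberate. A function that reads a state buffer with relative gets then cannot corrupt what the next process() call sees.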

Testing Functions with Output Topics

Functions that route to different output topics based on message content:

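import java.math.BigDecimal;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class OrderRoutingFunction implements Function<OrderEvent, Void> {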

    @Override
    public Void process(OrderEvent order, Context context) throws Exception {
        String outputTopic = order.getAmount().compareTo(BigDecimal.valueOf(1000)) >= 0
            ? "persistent://main/orders/high-value"
            : "persistent://main/orders/standard";

        context.newOutputMessage(outputTopic, Schema.JSON(OrderEvent.class))
            .value(order)
            .property("routed-by", "OrderRoutingFunction")
            .send();

        return null;  // returning null publishes nothing to the function's configured output topic
    }
}
class OrderRoutingFunctionTest {

    private OrderRoutingFunction function;
    private MockContext mockContext;

    @BeforeEach
    void setUp() {
        function = new OrderRoutingFunction();
        mockContext = MockContext.builder()
            .functionName("order-router")
            .build();
    }

    @Test
    void routesHighValueOrderToHighValueTopic() throws Exception {
        OrderEvent bigOrder = new OrderEvent("BIG-001", "VIP-001", BigDecimal.valueOf(5000));

        function.process(bigOrder, mockContext);

        List<MockContext.OutputRecord<?>> outputs = mockContext.getOutputRecords();
        assertEquals(1, outputs.size());
        assertEquals("persistent://main/orders/high-value", outputs.get(0).getTopicName());

        OrderEvent output = (OrderEvent) outputs.get(0).getValue();
        assertEquals("BIG-001", output.getOrderId());
    }

    @Test
    void routesStandardOrderToStandardTopic() throws Exception {
        OrderEvent smallOrder = new OrderEvent("STD-001", "CUST-001", BigDecimal.valueOf(50));

        function.process(smallOrder, mockContext);

        List<MockContext.OutputRecord<?>> outputs = mockContext.getOutputRecords();
        assertEquals("persistent://main/orders/standard", outputs.get(0).getTopicName());
    }

    @Test
    void attachesRoutingPropertyToOutput() throws Exception {
        OrderEvent order = new OrderEvent("ORD-001", "CUST-001", BigDecimal.TEN);
        function.process(order, mockContext);

        MockContext.OutputRecord<?> output = mockContext.getOutputRecords().get(0);
        assertEquals("OrderRoutingFunction", output.getProperties().get("routed-by"));
    }

    @Test
    void handlesExactBoundaryAmount() throws Exception {
        // Exactly 1000 goes to high-value
        OrderEvent boundaryOrder = new OrderEvent("BOUND-001", "CUST-001",
            BigDecimal.valueOf(1000));

        function.process(boundaryOrder, mockContext);

        assertEquals("persistent://main/orders/high-value",
            mockContext.getOutputRecords().get(0).getTopicName());
    }
}
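The boundary test above exercises the threshold through the whole function. A complementary option is to pull the routing decision into a pure static method and table-test it without any context. This is a sketch with a hypothetical OrderRoutes helper. It also shows why compareTo is the right comparison: 1000 and 1000.00 are the same amount but differ in scale, so BigDecimal.equals treats them as different.

```java
import java.math.BigDecimal;

/** Hypothetical helper: OrderRoutingFunction's routing decision as a pure, Context-free method. */
final class OrderRoutes {
    static final BigDecimal HIGH_VALUE_THRESHOLD = BigDecimal.valueOf(1000);
    static final String HIGH_VALUE_TOPIC = "persistent://main/orders/high-value";
    static final String STANDARD_TOPIC = "persistent://main/orders/standard";

    private OrderRoutes() {}

    static String chooseTopic(BigDecimal amount) {
        // compareTo ignores scale (1000 vs 1000.00); an equals-based boundary
        // check would treat 1000.00 as different from 1000.
        return amount.compareTo(HIGH_VALUE_THRESHOLD) >= 0 ? HIGH_VALUE_TOPIC : STANDARD_TOPIC;
    }
}
```

process() would then call context.newOutputMessage(OrderRoutes.chooseTopic(order.getAmount()), ...). The MockContext tests are left to cover only the message-building part.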

Testing Stateful Functions

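Pulsar Functions with state store access require tests that verify both read and write paths. For plain counters, Context also provides incrCounter() and getCounter(), which avoid manual ByteBuffer encoding. This example uses getState() and putState() so that both paths are explicit: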

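import java.nio.ByteBuffer;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class OrderCounterFunction implements Function<OrderEvent, CountSummary> {

    @Override
    public CountSummary process(OrderEvent order, Context context) throws Exception {
        String stateKey = "order.count." + order.getCustomerId();

        // Read current count. An absolute getLong() works for heap, direct and read-only
        // buffers and leaves the buffer's position untouched; array() does not.
        ByteBuffer currentBytes = context.getState(stateKey);
        long currentCount = currentBytes != null
            ? currentBytes.getLong(currentBytes.position())
            : 0L;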

        // Increment
        long newCount = currentCount + 1;
        ByteBuffer newBytes = ByteBuffer.allocate(Long.BYTES);
        newBytes.putLong(newCount);
        newBytes.flip();
        context.putState(stateKey, newBytes);

        return new CountSummary(order.getCustomerId(), newCount);
    }
}
class OrderCounterFunctionTest {

    private OrderCounterFunction function;
    private MockContext context;

    @BeforeEach
    void setUp() {
        function = new OrderCounterFunction();
        context = MockContext.builder().build();
    }

    @Test
    void startsCountAtOneForNewCustomer() throws Exception {
        CountSummary result = function.process(
            new OrderEvent("ORD-001", "CUST-NEW", BigDecimal.TEN), context);

        assertEquals("CUST-NEW", result.getCustomerId());
        assertEquals(1L, result.getCount());
    }

    @Test
    void incrementsExistingCount() throws Exception {
        // Process 3 orders for same customer
        function.process(new OrderEvent("ORD-001", "CUST-001", BigDecimal.TEN), context);
        function.process(new OrderEvent("ORD-002", "CUST-001", BigDecimal.TEN), context);
        CountSummary third = function.process(
            new OrderEvent("ORD-003", "CUST-001", BigDecimal.TEN), context);

        assertEquals(3L, third.getCount());
    }

    @Test
    void tracksCountsPerCustomerIndependently() throws Exception {
        function.process(new OrderEvent("ORD-001", "CUST-A", BigDecimal.TEN), context);
        function.process(new OrderEvent("ORD-002", "CUST-A", BigDecimal.TEN), context);
        CountSummary custB = function.process(
            new OrderEvent("ORD-003", "CUST-B", BigDecimal.TEN), context);

        assertEquals("CUST-B", custB.getCustomerId());
        assertEquals(1L, custB.getCount(), "CUST-B count should be independent of CUST-A");

        // Verify CUST-A's state wasn't affected
        ByteBuffer custAState = context.getState("order.count.CUST-A");
        assertNotNull(custAState);
        assertEquals(2L, custAState.getLong(custAState.position()));
    }

    @Test
    void stateKeyIsScopedCorrectly() throws Exception {
        function.process(new OrderEvent("ORD-001", "CUST-001", BigDecimal.TEN), context);

        // Verify the correct state key is written
        assertTrue(context.getStateKeys().contains("order.count.CUST-001"));
        assertFalse(context.getStateKeys().contains("order.count.CUST-002"));
    }
}
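Counter state is just eight bytes, and the encoding is where most bugs hide. Here is a minimal sketch of a hypothetical LongStateCodec helper that OrderCounterFunction could use on both the putState and getState side. decode reads with an absolute getLong, which works for heap, direct, and read-only buffers. Calling array() throws for the last two.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper for storing a long counter as an 8-byte state value. */
final class LongStateCodec {
    private LongStateCodec() {}

    /** allocate + putLong + flip leaves position 0 and limit 8, ready for putState. */
    static ByteBuffer encode(long value) {
        ByteBuffer buf = ByteBuffer.allocate(Long.BYTES);
        buf.putLong(value);
        buf.flip();
        return buf;
    }

    /**
     * Absolute read at the current position: works for heap, direct and read-only
     * buffers and does not move the position. A missing value (null) counts as 0.
     */
    static long decode(ByteBuffer buf) {
        if (buf == null) {
            return 0L;
        }
        if (buf.remaining() < Long.BYTES) {
            throw new IllegalArgumentException("Expected 8 bytes, got " + buf.remaining());
        }
        return buf.getLong(buf.position());
    }
}
```

With it, the read-modify-write in process() shrinks to decode(context.getState(stateKey)) followed by context.putState(stateKey, encode(currentCount + 1)). The codec can also be unit-tested on its own.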

Integration Testing with Testcontainers

For end-to-end tests with a real Pulsar broker:

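import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.*;

import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.junit.jupiter.api.*;
import org.testcontainers.containers.PulsarContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
class PulsarFunctionIntegrationTest {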

    @Container
    static PulsarContainer pulsar = new PulsarContainer(
        DockerImageName.parse("apachepulsar/pulsar:3.2.4"))
        .withFunctionsWorker();  // enables Functions Worker

    private PulsarClient client;
    private PulsarAdmin admin;

    @BeforeEach
    void setUp() throws Exception {
        client = PulsarClient.builder()
            .serviceUrl(pulsar.getPulsarBrokerUrl())
            .build();
        admin = PulsarAdmin.builder()
            .serviceHttpUrl(pulsar.getHttpServiceUrl())
            .build();

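        // The container is static and shared across tests, so create the tenant and namespace only once
        if (!admin.tenants().getTenants().contains("test")) {
            admin.tenants().createTenant("test",
                TenantInfo.builder()
                    .allowedClusters(Collections.singleton("standalone"))
                    .build());
        }
        if (!admin.namespaces().getNamespaces("test").contains("test/orders")) {
            admin.namespaces().createNamespace("test/orders");
        }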
    }

    @AfterEach
    void tearDown() throws Exception {
        client.close();
        admin.close();
    }

    @Test
    void functionProcessesMessagesEndToEnd() throws Exception {
        // Deploy the function
        FunctionConfig config = FunctionConfig.builder()
            .tenant("test")
            .namespace("orders")
            .name("order-enrichment")
            .className(OrderEnrichmentFunction.class.getName())
            .inputs(Collections.singletonList("persistent://test/orders/raw"))
            .output("persistent://test/orders/enriched")
            .build();

        admin.functions().createFunction(config,
            OrderEnrichmentFunction.class.getProtectionDomain()
                .getCodeSource().getLocation().getPath());

        // Wait for function to be running
        await().atMost(30, TimeUnit.SECONDS).until(() -> {
            FunctionStatus status = admin.functions().getFunctionStatus(
                "test", "orders", "order-enrichment");
            return status.getNumRunning() > 0;
        });

        // Subscribe to the output topic BEFORE producing: a new subscription starts at the
        // latest message by default, so subscribing afterwards could miss the enriched event
        Consumer<EnrichedOrder> consumer = client.newConsumer(Schema.JSON(EnrichedOrder.class))
            .topic("persistent://test/orders/enriched")
            .subscriptionName("test-sub")
            .subscribe();

        // Produce an order event
        Producer<OrderEvent> producer = client.newProducer(Schema.JSON(OrderEvent.class))
            .topic("persistent://test/orders/raw")
            .create();

        producer.send(new OrderEvent("ORD-E2E-001", "CUST-001", BigDecimal.valueOf(99.99)));

        // Consume the enriched order
        Message<EnrichedOrder> received = consumer.receive(30, TimeUnit.SECONDS);
        assertNotNull(received, "Should receive enriched order within 30 seconds");
        assertEquals("ORD-E2E-001", received.getValue().getOrderId());

        consumer.acknowledge(received);
        producer.close();
        consumer.close();
    }
}
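If you would rather not add Awaitility, a small polling loop does the same job. Here is a minimal sketch using a hypothetical Poll.until helper. It re-evaluates a condition at a fixed interval until the condition returns true or the timeout passes. Exceptions thrown by the condition count as "not ready yet", similar to Awaitility's ignoreExceptions().

```java
import java.time.Duration;
import java.util.concurrent.Callable;

/** Hypothetical polling helper; a dependency-free stand-in for await().until(...). */
final class Poll {
    private Poll() {}

    /**
     * Evaluates condition every interval; returns true as soon as it yields true,
     * or false once timeout has elapsed. Exceptions count as "not ready yet".
     *
     * Example: Poll.until(() -> admin.functions()
     *     .getFunctionStatus("test", "orders", "order-enrichment").getNumRunning() > 0,
     *     Duration.ofSeconds(30), Duration.ofMillis(500))
     */
    static boolean until(Callable<Boolean> condition, Duration timeout, Duration interval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            try {
                if (Boolean.TRUE.equals(condition.call())) {
                    return true;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            } catch (Exception e) {
                // Not ready yet, e.g. the admin API erroring while the function starts
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

Wrapping the call in assertTrue(...) gives the same fail-after-timeout behavior as the Awaitility version.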

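Testing Multi-Tenant Namespace Isolation

The test below belongs in the same integration test class, so it reuses client and admin. It checks that messages published to one tenant's topic are not delivered to a subscription on another tenant's topic. This verifies routing separation, not access control. Preventing tenant B's clients from subscribing to tenant A's topics at all requires authentication and authorization, and the default standalone container does not enable either.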

@Test
void tenantsCannotReadEachOthersMessages() throws Exception {
    // Create two tenants
    admin.tenants().createTenant("tenant-a",
        TenantInfo.builder().allowedClusters(Set.of("standalone")).build());
    admin.tenants().createTenant("tenant-b",
        TenantInfo.builder().allowedClusters(Set.of("standalone")).build());

    admin.namespaces().createNamespace("tenant-a/orders");
    admin.namespaces().createNamespace("tenant-b/orders");

    // Producer for tenant-a
    Producer<String> producerA = client.newProducer(Schema.STRING)
        .topic("persistent://tenant-a/orders/events")
        .create();

    // Consumer subscribed ONLY to tenant-a's topic
    Consumer<String> consumerA = client.newConsumer(Schema.STRING)
        .topic("persistent://tenant-a/orders/events")
        .subscriptionName("consumer-a")
        .subscribe();

    // Consumer subscribed ONLY to tenant-b's topic
    Consumer<String> consumerB = client.newConsumer(Schema.STRING)
        .topic("persistent://tenant-b/orders/events")
        .subscriptionName("consumer-b")
        .subscribe();

    // Send to tenant-a
    producerA.send("tenant-a-message");

    // tenant-a consumer receives it
    Message<String> msgA = consumerA.receive(5, TimeUnit.SECONDS);
    assertNotNull(msgA, "tenant-a consumer should receive the message");
    assertEquals("tenant-a-message", msgA.getValue());
    consumerA.acknowledge(msgA);

    // tenant-b consumer should NOT receive it
    Message<String> msgB = consumerB.receive(2, TimeUnit.SECONDS);
    assertNull(msgB, "tenant-b consumer must not receive tenant-a messages");

    producerA.close();
    consumerA.close();
    consumerB.close();
}
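Alongside the broker-level test, a cheap static check can catch a misconfigured function before deployment: every input and output topic in a tenant's FunctionConfig should name that tenant. Here is a minimal sketch using a hypothetical TopicTenantCheck helper that parses fully qualified names of the form persistent://tenant/namespace/topic. It deliberately rejects short names like my-topic, which Pulsar resolves to the public/default namespace. In production code, Pulsar's own org.apache.pulsar.common.naming.TopicName.get(topic).getTenant() does this parsing.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper; not part of Pulsar. */
final class TopicTenantCheck {
    private TopicTenantCheck() {}

    /** Tenant segment of a fully qualified name such as persistent://tenant-a/orders/events. */
    static String tenantOf(String topic) {
        int schemeEnd = topic.indexOf("://");
        if (schemeEnd < 0) {
            throw new IllegalArgumentException("Not a fully qualified topic: " + topic);
        }
        // Split "tenant/namespace/topic" into at most three parts
        String[] parts = topic.substring(schemeEnd + 3).split("/", 3);
        if (parts.length < 3 || parts[0].isEmpty()) {
            throw new IllegalArgumentException("Expected scheme://tenant/namespace/topic: " + topic);
        }
        return parts[0];
    }

    /** Topics from the list that belong to a tenant other than expectedTenant. */
    static List<String> foreignTopics(String expectedTenant, List<String> topics) {
        return topics.stream()
            .filter(topic -> !tenantOf(topic).equals(expectedTenant))
            .collect(Collectors.toList());
    }
}
```

A unit test can then feed it the inputs and output of each tenant's FunctionConfig and assert that foreignTopics(...) is empty.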

Testing Schema Evolution

Pulsar enforces schema compatibility — test it before deploying schema changes:

@Test
void backwardCompatibleSchemaEvolutionWorks() throws Exception {
    String topic = "persistent://test/orders/schema-test";

    // Register v1 schema
    SchemaInfo v1Schema = Schema.AVRO(OrderEventV1.class).getSchemaInfo();
    admin.schemas().createSchema(topic, PostSchemaPayload.builder()
        .type("AVRO")
        .schema(new String(v1Schema.getSchema(), StandardCharsets.UTF_8))
        .build());

    // Produce a v1 message
    Producer<OrderEventV1> v1Producer = client.newProducer(Schema.AVRO(OrderEventV1.class))
        .topic(topic)
        .create();
    v1Producer.send(new OrderEventV1("ORD-001", "CUST-001"));
    v1Producer.close();

    // V2 adds a nullable field with a default. V2 readers fill in the default when reading
    // V1 data (backward compatible), and V1 readers ignore the extra field when reading
    // V2 data (forward compatible), so the change passes even a FULL compatibility check.
    // The broker checks compatibility when the producer is created, so assert on create().
    Producer<OrderEventV2> v2Producer = assertDoesNotThrow(() ->
        client.newProducer(Schema.AVRO(OrderEventV2.class))
            .topic(topic)  // same topic
            .create());

    v2Producer.send(new OrderEventV2("ORD-002", "CUST-001", "PREMIUM"));
    v2Producer.close();

    // V2 consumer should be able to read V1 messages
    Consumer<OrderEventV2> consumer = client.newConsumer(Schema.AVRO(OrderEventV2.class))
        .topic(topic)
        .subscriptionName("compat-test")
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscribe();

    Message<OrderEventV2> v1AsV2 = consumer.receive(5, TimeUnit.SECONDS);
    assertNotNull(v1AsV2);
    assertEquals("ORD-001", v1AsV2.getValue().getOrderId());
    assertNull(v1AsV2.getValue().getTier(), "V1 message has no tier field — should be null in V2");

    consumer.close();
}

@Test
void incompatibleSchemaChangeIsRejected() throws Exception {
    String topic = "persistent://test/orders/strict-schema";

    admin.schemas().createSchema(topic, PostSchemaPayload.builder()
        .type("AVRO")
        .schema(new String(Schema.AVRO(OrderEventV1.class).getSchemaInfo().getSchema()))
        .build());

    // Incompatible change: rename required field without default
    // This should be rejected by the broker
    assertThrows(PulsarClientException.class, () -> {
        Producer<OrderEventIncompatible> badProducer =
            client.newProducer(Schema.AVRO(OrderEventIncompatible.class))
                .topic(topic)
                .create();
        badProducer.close();
    });
}
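To see why the first change passes and the second is rejected, it helps to model Avro's field-resolution rule. A reader can decode data written with another schema if every field the reader expects was either written or has a default. The sketch below is a deliberately simplified model (a hypothetical FieldCompat class that ignores types, aliases, unions, and nesting), not Pulsar's or Avro's actual checker. It does reproduce both cases: adding a field with a default is backward and forward compatible, while renaming a field that has no default is neither.

```java
import java.util.Map;
import java.util.Set;

/** Simplified, hypothetical model of Avro record resolution: field name -> "has a default". */
final class FieldCompat {
    private FieldCompat() {}

    /** Extra writer fields are ignored; reader fields the writer lacks need a default. */
    static boolean canRead(Map<String, Boolean> readerFields, Set<String> writerFields) {
        return readerFields.entrySet().stream()
            .allMatch(field -> writerFields.contains(field.getKey()) || field.getValue());
    }

    /** BACKWARD: consumers on the new schema can read data written with the old one. */
    static boolean backward(Map<String, Boolean> oldSchema, Map<String, Boolean> newSchema) {
        return canRead(newSchema, oldSchema.keySet());
    }

    /** FORWARD: consumers still on the old schema can read data written with the new one. */
    static boolean forward(Map<String, Boolean> oldSchema, Map<String, Boolean> newSchema) {
        return canRead(oldSchema, newSchema.keySet());
    }
}
```

The field names in the check below (orderId, customerId, tier, orderNumber) are illustrative stand-ins for OrderEventV1, OrderEventV2, and OrderEventIncompatible.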

Testing Dead Letter Topics

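Pulsar's dead letter topic (DLT) receives messages that still fail after a configured number of redeliveries. The test below exercises the client-side DeadLetterPolicy directly. For a deployed function, the corresponding settings are maxMessageRetries and deadLetterTopic in FunctionConfig. Pulsar documents dead letter topics for Shared and Key_Shared subscriptions, so make sure the test consumer uses one of those types: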

@Test
void failedMessagesRouteToDeadLetterTopic() throws Exception {
    String inputTopic = "persistent://test/orders/dlq-test";
    String dlqTopic = "persistent://test/orders/dlq-test-dlq";

    AtomicInteger processAttempts = new AtomicInteger(0);

    Consumer<String> consumer = client.newConsumer(Schema.STRING)
        .topic(inputTopic)
        .subscriptionName("dlq-test-sub")
        .subscriptionType(SubscriptionType.Shared)  // dead letter topics apply to Shared/Key_Shared
        .deadLetterPolicy(DeadLetterPolicy.builder()
            .maxRedeliverCount(3)
            .deadLetterTopic(dlqTopic)
            .build())
        .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
        .subscribe();

    Consumer<String> dlqConsumer = client.newConsumer(Schema.STRING)
        .topic(dlqTopic)
        .subscriptionName("dlq-reader")
        .subscribe();

    Producer<String> producer = client.newProducer(Schema.STRING)
        .topic(inputTopic)
        .create();

    producer.send("poison-pill-message");

    // Nack 4 deliveries: the original plus maxRedeliverCount=3 redeliveries; nacking the 4th sends it to the DLQ
    for (int i = 0; i < 4; i++) {
        Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
        assertNotNull(msg);
        processAttempts.incrementAndGet();
        consumer.negativeAcknowledge(msg);
        Thread.sleep(150);  // wait for redelivery delay
    }

    // After maxRedeliverCount exhausted, message lands in DLQ
    Message<String> dlqMsg = dlqConsumer.receive(10, TimeUnit.SECONDS);
    assertNotNull(dlqMsg, "Message should appear in DLQ after max redeliveries");
    assertEquals("poison-pill-message", dlqMsg.getValue());

    // Verify exactly 4 processing attempts
    assertEquals(4, processAttempts.get());

    producer.close();
    consumer.close();
    dlqConsumer.close();
}

What to Test vs. What to Skip

Test with MockContext:

  • Business logic and transformations
  • State store reads and writes
  • Output topic routing decisions
  • Metric and counter recording
  • Edge cases (null fields, boundary values)

Test with Testcontainers integration:

  • End-to-end message flow through real broker
  • Schema compatibility enforcement
  • Dead letter topic routing after max retries
  • Multi-tenant namespace isolation
  • Subscription type behavior (Exclusive, Shared, Failover, Key_Shared)

Skip:

  • Pulsar broker internals (message durability, replication)
  • Network partition scenarios (use chaos testing tools)
  • Performance tuning (benchmark concern, not correctness)
  • Pulsar Admin API behavior (tested by Pulsar project)
