Apache Pulsar Functions Testing: Unit Tests, Mocks, and Multi-Tenant Scenarios
Apache Pulsar Functions are lightweight compute units that transform or route messages. They're easy to unit-test because everything broker-related reaches the function through the Context interface, which a test double can replace to eliminate the broker dependency. Stateful functions need state store tests, multi-tenant applications need namespace isolation tests, and schema evolution tests prevent production data pipeline breaks.
Key Takeaways
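Use a MockContext test double for pure unit tests. Pulsar's org.apache.pulsar.functions.api.Context is an interface, so a test double can capture output records, state store operations, and metrics without a running broker. The MockContext used throughout this article is such a double that you write yourself (a Mockito mock of Context also works); it is not a class shipped in pulsar-functions-api.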
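Test stateful function state isolation. Pulsar Functions state is scoped to a single function (its tenant, namespace, and name) and shared by all of that function's instances. Within that scope, verify that state keys are partitioned correctly (for example, one key per customer) so one customer's or tenant's data can't leak into another's.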
Test schema evolution compatibility. Functions that consume Avro or Protobuf schemas must handle schema evolution. Test BACKWARD and FORWARD compatibility before deploying.
Test error handling paths explicitly. What happens when your function throws? What happens when context.newOutputMessage() fails? Depending on the function's processing guarantee and its maxMessageRetries and deadLetterTopic settings, Pulsar redelivers the message or routes it to a dead letter topic. Test which behavior your function actually exhibits.
Test multi-tenant namespace isolation with Testcontainers. Pulsar containers support multi-tenancy; test that tenant A's messages don't appear in tenant B's subscription.
What Makes Pulsar Functions Different to Test
Pulsar Functions are Pulsar-native stream processors: each incoming message triggers one synchronous process() call. Kafka Streams ships TopologyTestDriver, but Pulsar Functions have no dedicated test driver. Because process() receives its Context as an interface, though, a hand-written MockContext or a Mockito mock covers most unit-testing scenarios.
The three layers to test:
- Unit tests with MockContext: no broker, fast, tests business logic
- Integration tests with Testcontainers: real broker, tests end-to-end behavior
- Multi-tenant tests: verify namespace isolation
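The unit-test layer relies on a double that records what the function did. Below is a minimal sketch of the state-and-metrics part of such a fake in plain Java. FakeStateStore and its methods are hypothetical names. A full MockContext would implement org.apache.pulsar.functions.api.Context and delegate its getState, putState and recordMetric calls to something like this.

```java
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical recording fake: keeps function state in memory and accumulates
// metrics, so a test can assert on what a function wrote without a broker.
final class FakeStateStore {
    private final Map<String, ByteBuffer> state = new HashMap<>();
    private final Map<String, Double> metrics = new LinkedHashMap<>();

    // Like Context.getState: null when the key has never been written.
    // A read-only view keeps callers from mutating the stored bytes.
    ByteBuffer getState(String key) {
        ByteBuffer value = state.get(key);
        return value == null ? null : value.asReadOnlyBuffer();
    }

    // Like Context.putState: stores a private copy of the buffer's remaining bytes,
    // leaving the caller's buffer position unchanged.
    void putState(String key, ByteBuffer value) {
        ByteBuffer copy = ByteBuffer.allocate(value.remaining());
        copy.put(value.duplicate());
        copy.flip();
        state.put(key, copy);
    }

    // Like Context.recordMetric; summing repeated values is a choice of this fake.
    void recordMetric(String name, double value) {
        metrics.merge(name, value, Double::sum);
    }

    Set<String> getStateKeys() {
        return Collections.unmodifiableSet(state.keySet());
    }

    Map<String, Double> getMetrics() {
        return Collections.unmodifiableMap(metrics);
    }
}
```

Returning read-only views is stricter than necessary. The benefit is that code which calls array() on a state buffer fails fast in unit tests, since the Context interface only promises a ByteBuffer, not an array-backed one.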
Setup
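<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-functions-api</artifactId>
    <version>3.2.4</version>
</dependency>
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-functions-local-runner-original</artifactId>
    <version>3.2.4</version>
    <scope>test</scope>
</dependency>
<!-- Testcontainers versions assumed to be managed by org.testcontainers:testcontainers-bom -->
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>pulsar</artifactId>
    <scope>test</scope>
</dependency>
<!-- Provides the @Testcontainers and @Container JUnit 5 annotations -->
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>junit-jupiter</artifactId>
    <scope>test</scope>
</dependency>
<!-- await() in the integration test -->
<dependency>
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <version>4.2.0</version>
    <scope>test</scope>
</dependency>
Unit Testing with MockContext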
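Context.getState returns a ByteBuffer (null for a key that was never written). A function that keeps strings in state therefore needs one explicit encoding, shared by the code that writes state (including a test's putState) and the code that reads it. Here is a small sketch assuming UTF-8. StateCodec and its methods are hypothetical, not Pulsar API:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helpers for storing String values in Pulsar Functions state
final class StateCodec {
    private StateCodec() {}

    // Encode a String as UTF-8 bytes, ready for putState
    static ByteBuffer encodeString(String value) {
        return ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8));
    }

    // Decode a value read with getState, falling back to a default when the key is absent.
    // Decoding a duplicate() keeps the caller's buffer position unchanged.
    static String decodeStringOrDefault(ByteBuffer value, String defaultValue) {
        if (value == null) {
            return defaultValue;
        }
        return StandardCharsets.UTF_8.decode(value.duplicate()).toString();
    }
}
```

With these helpers, the enrichment function below could read its tier as StateCodec.decodeStringOrDefault(context.getState(key), "STANDARD"), and a test could seed state with StateCodec.encodeString("PREMIUM"). Decoding a duplicate() matters when a test double hands back the same buffer instance on every call. Decoding the original would advance its position and leave an empty string for the next read.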
Consider a function that enriches order events with pricing tier information:
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class OrderEnrichmentFunction implements Function<OrderEvent, EnrichedOrder> {
    @Override
    public EnrichedOrder process(OrderEvent order, Context context) throws Exception {
        // getState returns a ByteBuffer (null if the key was never written), not a String
        ByteBuffer tierBytes = context.getState("pricing.tier." + order.getCustomerId());
        // Default tier for unknown customers; duplicate() leaves the buffer's position untouched
        String tierId = tierBytes != null
                ? StandardCharsets.UTF_8.decode(tierBytes.duplicate()).toString()
                : "STANDARD";
        // Log processing
        context.getLogger().info("Processing order {} for customer {}",
                order.getOrderId(), order.getCustomerId());
        // Record metric
        context.recordMetric("orders.processed", 1.0);
        return EnrichedOrder.builder()
                .orderId(order.getOrderId())
                .customerId(order.getCustomerId())
                .amount(order.getAmount())
                .pricingTier(tierId)
                .build();
    }
}

// OrderEnrichmentFunctionTest.java (MockContext is the hand-written test double)
import static org.junit.jupiter.api.Assertions.*;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class OrderEnrichmentFunctionTest {
    private OrderEnrichmentFunction function;
    private MockContext mockContext;

    @BeforeEach
    void setUp() {
        function = new OrderEnrichmentFunction();
        mockContext = MockContext.builder()
                .tenant("test-tenant")
                .namespace("test-namespace")
                .functionName("order-enrichment")
                .build();
    }

    @Test
    void enrichesOrderWithStoredPricingTier() throws Exception {
        // Pre-populate state store with a UTF-8 encoded tier
        mockContext.putState("pricing.tier.CUST-001",
                ByteBuffer.wrap("PREMIUM".getBytes(StandardCharsets.UTF_8)));
        OrderEvent order = new OrderEvent("ORD-001", "CUST-001", BigDecimal.valueOf(199.99));
        EnrichedOrder result = function.process(order, mockContext);
        assertNotNull(result);
        assertEquals("ORD-001", result.getOrderId());
        assertEquals("PREMIUM", result.getPricingTier());
    }

    @Test
    void usesDefaultTierForUnknownCustomer() throws Exception {
        OrderEvent order = new OrderEvent("ORD-002", "CUST-UNKNOWN", BigDecimal.valueOf(50.00));
        EnrichedOrder result = function.process(order, mockContext);
        assertEquals("STANDARD", result.getPricingTier());
    }

    @Test
    void recordsProcessingMetric() throws Exception {
        OrderEvent order = new OrderEvent("ORD-003", "CUST-001", BigDecimal.TEN);
        function.process(order, mockContext);
        // Verify metric was recorded
        Map<String, Double> metrics = mockContext.getMetrics();
        assertTrue(metrics.containsKey("orders.processed"));
        assertEquals(1.0, metrics.get("orders.processed"));
    }

    @Test
    void logsOrderProcessing() throws Exception {
        OrderEvent order = new OrderEvent("ORD-004", "CUST-001", BigDecimal.TEN);
        function.process(order, mockContext);
        // Verify log message was emitted (assumes MockContext's logger records formatted messages)
        List<String> logMessages = mockContext.getLogMessages();
        assertTrue(logMessages.stream()
                .anyMatch(msg -> msg.contains("ORD-004") && msg.contains("CUST-001")));
    }
}
Testing Functions with Output Topics
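Boundary tests such as handlesExactBoundaryAmount, later in this section, are cheaper when the routing decision is a pure function of the message. The sketch below assumes you factor the rule out of OrderRoutingFunction into a hypothetical OrderRoutes helper. The threshold and topic names are the ones the function below uses.

```java
import java.math.BigDecimal;

// Hypothetical helper holding OrderRoutingFunction's routing rule
final class OrderRoutes {
    static final BigDecimal HIGH_VALUE_THRESHOLD = BigDecimal.valueOf(1000);
    static final String HIGH_VALUE_TOPIC = "persistent://main/orders/high-value";
    static final String STANDARD_TOPIC = "persistent://main/orders/standard";

    private OrderRoutes() {}

    // compareTo ignores scale, so 1000 and 1000.00 are both treated as high value
    static String topicFor(BigDecimal amount) {
        return amount.compareTo(HIGH_VALUE_THRESHOLD) >= 0 ? HIGH_VALUE_TOPIC : STANDARD_TOPIC;
    }
}
```

process() would then call context.newOutputMessage(OrderRoutes.topicFor(order.getAmount()), ...), leaving the MockContext tests to check only that a message is sent with the right value and properties. Using compareTo rather than equals matters here: new BigDecimal("1000.00").equals(BigDecimal.valueOf(1000)) is false because the scales differ.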
Functions that route to different output topics based on message content:
import java.math.BigDecimal;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class OrderRoutingFunction implements Function<OrderEvent, Void> {
    @Override
    public Void process(OrderEvent order, Context context) throws Exception {
        // Orders of 1000 or more go to the high-value topic
        String outputTopic = order.getAmount().compareTo(BigDecimal.valueOf(1000)) >= 0
                ? "persistent://main/orders/high-value"
                : "persistent://main/orders/standard";
        context.newOutputMessage(outputTopic, Schema.JSON(OrderEvent.class))
                .value(order)
                .property("routed-by", "OrderRoutingFunction")
                .send();
        // Returning null publishes nothing to the function's configured output topic
        return null;
    }
}

// OrderRoutingFunctionTest.java
import static org.junit.jupiter.api.Assertions.*;

import java.math.BigDecimal;
import java.util.List;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class OrderRoutingFunctionTest {
    private OrderRoutingFunction function;
    private MockContext mockContext;

    @BeforeEach
    void setUp() {
        function = new OrderRoutingFunction();
        mockContext = MockContext.builder()
                .functionName("order-router")
                .build();
    }

    @Test
    void routesHighValueOrderToHighValueTopic() throws Exception {
        OrderEvent bigOrder = new OrderEvent("BIG-001", "VIP-001", BigDecimal.valueOf(5000));
        function.process(bigOrder, mockContext);
        List<MockContext.OutputRecord<?>> outputs = mockContext.getOutputRecords();
        assertEquals(1, outputs.size());
        assertEquals("persistent://main/orders/high-value", outputs.get(0).getTopicName());
        OrderEvent output = (OrderEvent) outputs.get(0).getValue();
        assertEquals("BIG-001", output.getOrderId());
    }

    @Test
    void routesStandardOrderToStandardTopic() throws Exception {
        OrderEvent smallOrder = new OrderEvent("STD-001", "CUST-001", BigDecimal.valueOf(50));
        function.process(smallOrder, mockContext);
        List<MockContext.OutputRecord<?>> outputs = mockContext.getOutputRecords();
        assertEquals(1, outputs.size());
        assertEquals("persistent://main/orders/standard", outputs.get(0).getTopicName());
    }

    @Test
    void attachesRoutingPropertyToOutput() throws Exception {
        OrderEvent order = new OrderEvent("ORD-001", "CUST-001", BigDecimal.TEN);
        function.process(order, mockContext);
        MockContext.OutputRecord<?> output = mockContext.getOutputRecords().get(0);
        assertEquals("OrderRoutingFunction", output.getProperties().get("routed-by"));
    }

    @Test
    void handlesExactBoundaryAmount() throws Exception {
        // Exactly 1000 goes to high-value
        OrderEvent boundaryOrder = new OrderEvent("BOUND-001", "CUST-001",
                BigDecimal.valueOf(1000));
        function.process(boundaryOrder, mockContext);
        assertEquals("persistent://main/orders/high-value",
                mockContext.getOutputRecords().get(0).getTopicName());
    }
}
Testing Stateful Functions
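The OrderCounterFunction in this section stores its count as an 8-byte long. Isolating that encoding makes the read-modify-write easy to test on its own. CounterCodec below is a hypothetical helper, not Pulsar API. It reads with an absolute getLong, so it works on heap, direct, and read-only buffers alike. For plain counters, Context also provides incrCounter(key, amount) and getCounter(key). A getState/putState read-modify-write can lose updates when several instances of the function update the same key at once.

```java
import java.nio.ByteBuffer;

// Hypothetical helpers for a long counter stored as 8 big-endian bytes in function state
final class CounterCodec {
    private CounterCodec() {}

    // Absolute get: no array() call and no change to the buffer's position
    static long decode(ByteBuffer value) {
        return value == null ? 0L : value.getLong(value.position());
    }

    static ByteBuffer encode(long count) {
        ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
        buffer.putLong(count);
        buffer.flip(); // make the 8 written bytes readable by putState
        return buffer;
    }

    // The step OrderCounterFunction performs between getState and putState
    static ByteBuffer increment(ByteBuffer current) {
        return encode(decode(current) + 1);
    }
}
```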
Pulsar Functions with state store access require tests that verify both read and write paths:
import java.nio.ByteBuffer;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class OrderCounterFunction implements Function<OrderEvent, CountSummary> {
    @Override
    public CountSummary process(OrderEvent order, Context context) throws Exception {
        String stateKey = "order.count." + order.getCustomerId();
        // Read current count; an absolute getLong works for heap, direct and read-only
        // buffers, whereas array() fails on the latter two
        ByteBuffer currentBytes = context.getState(stateKey);
        long currentCount = currentBytes != null
                ? currentBytes.getLong(currentBytes.position())
                : 0L;
        // Increment
        long newCount = currentCount + 1;
        ByteBuffer newBytes = ByteBuffer.allocate(Long.BYTES);
        newBytes.putLong(newCount);
        newBytes.flip();
        context.putState(stateKey, newBytes);
        return new CountSummary(order.getCustomerId(), newCount);
    }
}

// OrderCounterFunctionTest.java
import static org.junit.jupiter.api.Assertions.*;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class OrderCounterFunctionTest {
    private OrderCounterFunction function;
    private MockContext context;

    @BeforeEach
    void setUp() {
        function = new OrderCounterFunction();
        context = MockContext.builder().build();
    }

    @Test
    void startsCountAtOneForNewCustomer() throws Exception {
        CountSummary result = function.process(
                new OrderEvent("ORD-001", "CUST-NEW", BigDecimal.TEN), context);
        assertEquals("CUST-NEW", result.getCustomerId());
        assertEquals(1L, result.getCount());
    }

    @Test
    void incrementsExistingCount() throws Exception {
        // Process 3 orders for the same customer
        function.process(new OrderEvent("ORD-001", "CUST-001", BigDecimal.TEN), context);
        function.process(new OrderEvent("ORD-002", "CUST-001", BigDecimal.TEN), context);
        CountSummary third = function.process(
                new OrderEvent("ORD-003", "CUST-001", BigDecimal.TEN), context);
        assertEquals(3L, third.getCount());
    }

    @Test
    void tracksCountsPerCustomerIndependently() throws Exception {
        function.process(new OrderEvent("ORD-001", "CUST-A", BigDecimal.TEN), context);
        function.process(new OrderEvent("ORD-002", "CUST-A", BigDecimal.TEN), context);
        CountSummary custB = function.process(
                new OrderEvent("ORD-003", "CUST-B", BigDecimal.TEN), context);
        assertEquals("CUST-B", custB.getCustomerId());
        assertEquals(1L, custB.getCount(), "CUST-B count should be independent of CUST-A");
        // Verify CUST-A's state wasn't affected
        ByteBuffer custAState = context.getState("order.count.CUST-A");
        assertNotNull(custAState);
        assertEquals(2L, custAState.getLong(custAState.position()));
    }

    @Test
    void stateKeyIsScopedCorrectly() throws Exception {
        function.process(new OrderEvent("ORD-001", "CUST-001", BigDecimal.TEN), context);
        // Verify the correct state key is written
        assertTrue(context.getStateKeys().contains("order.count.CUST-001"));
        assertFalse(context.getStateKeys().contains("order.count.CUST-002"));
    }
}
Integration Testing with Testcontainers
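The integration test in this section waits for the function with Awaitility's await().atMost(...).until(...). If you'd rather not add that dependency, the same wait can be written with the JDK alone. Below is a minimal sketch. Wait.waitUntil is a hypothetical helper, and it treats an exception from the condition as "not ready yet".

```java
import java.time.Duration;
import java.util.concurrent.Callable;

// Hypothetical JDK-only stand-in for Awaitility's await().atMost(...).until(...)
final class Wait {
    private Wait() {}

    // Polls condition until it returns true or the timeout elapses. Exceptions from the
    // condition count as "not yet", e.g. the admin API not finding the function yet.
    static void waitUntil(Callable<Boolean> condition, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            try {
                if (Boolean.TRUE.equals(condition.call())) {
                    return;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            } catch (Exception notReadyYet) {
                // fall through and poll again
            }
            if (System.nanoTime() - deadline >= 0) {
                throw new IllegalStateException("Condition not met within " + timeout);
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
        }
    }
}
```

In the integration test, the wait would read Wait.waitUntil(() -> admin.functions().getFunctionStatus("test", "orders", "order-enrichment").getNumRunning() > 0, Duration.ofSeconds(30), Duration.ofMillis(500)). Awaitility, by contrast, fails on an exception thrown by the condition unless you call ignoreExceptions().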
For end-to-end tests with a real Pulsar broker:
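import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.*;

import java.math.BigDecimal;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Set; // used by the multi-tenant test
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; // used by the dead letter test
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.PulsarContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
class PulsarFunctionIntegrationTest {
    @Container
    static PulsarContainer pulsar = new PulsarContainer(
            DockerImageName.parse("apachepulsar/pulsar:3.2.4"))
            .withFunctionsWorker(); // enables the Functions Worker

    private PulsarClient client;
    private PulsarAdmin admin;

    @BeforeEach
    void setUp() throws Exception {
        client = PulsarClient.builder()
                .serviceUrl(pulsar.getPulsarBrokerUrl())
                .build();
        admin = PulsarAdmin.builder()
                .serviceHttpUrl(pulsar.getHttpServiceUrl())
                .build();
        // The static container is shared by all tests, so tolerate an existing tenant
        try {
            admin.tenants().createTenant("test",
                    TenantInfo.builder()
                            .allowedClusters(Collections.singleton("standalone"))
                            .build());
            admin.namespaces().createNamespace("test/orders");
        } catch (PulsarAdminException.ConflictException alreadyCreated) {
            // created by an earlier test
        }
    }

    @AfterEach
    void tearDown() throws Exception {
        client.close();
        admin.close();
    }

    @Test
    void functionProcessesMessagesEndToEnd() throws Exception {
        // Hypothetical path: createFunction uploads this file to the Functions Worker, so it
        // must be a packaged JAR containing OrderEnrichmentFunction, OrderEvent and
        // EnrichedOrder (a classes directory, as returned by getCodeSource(), won't work)
        Path functionJar = Path.of("target", "order-functions.jar");

        // Deploy the function
        FunctionConfig config = FunctionConfig.builder()
                .tenant("test")
                .namespace("orders")
                .name("order-enrichment")
                .runtime(FunctionConfig.Runtime.JAVA)
                .className(OrderEnrichmentFunction.class.getName())
                .inputs(Collections.singletonList("persistent://test/orders/raw"))
                .output("persistent://test/orders/enriched")
                .build();
        admin.functions().createFunction(config, functionJar.toAbsolutePath().toString());

        // Wait for the function instance to be running
        await().atMost(30, TimeUnit.SECONDS).until(() -> {
            FunctionStatus status = admin.functions().getFunctionStatus(
                    "test", "orders", "order-enrichment");
            return status.getNumRunning() > 0;
        });

        // Subscribe to the output topic BEFORE producing, so the enriched message
        // can't be published ahead of the subscription and missed
        Consumer<EnrichedOrder> consumer = client.newConsumer(Schema.JSON(EnrichedOrder.class))
                .topic("persistent://test/orders/enriched")
                .subscriptionName("test-sub")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        // Produce an order event
        Producer<OrderEvent> producer = client.newProducer(Schema.JSON(OrderEvent.class))
                .topic("persistent://test/orders/raw")
                .create();
        producer.send(new OrderEvent("ORD-E2E-001", "CUST-001", BigDecimal.valueOf(99.99)));

        // Consume the enriched order
        Message<EnrichedOrder> received = consumer.receive(30, TimeUnit.SECONDS);
        assertNotNull(received, "Should receive enriched order within 30 seconds");
        assertEquals("ORD-E2E-001", received.getValue().getOrderId());
        consumer.acknowledge(received);
        producer.close();
        consumer.close();
    }
}
Testing Multi-Tenant Namespace Isolation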
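Isolation checks often reduce to asserting which tenant and namespace a fully qualified topic belongs to. A typical example is checking that every topic a tenant's function writes to is under that tenant. Below is a minimal JDK-only sketch of that parsing. TopicNames is a hypothetical helper; if pulsar-common is on the classpath, its org.apache.pulsar.common.naming.TopicName class offers comparable parsing.

```java
// Hypothetical parser for fully qualified topic names in the current layout:
// {persistent|non-persistent}://tenant/namespace/topic
final class TopicNames {
    private TopicNames() {}

    // Returns {tenant, namespace, localName}
    static String[] split(String fullName) {
        int sep = fullName.indexOf("://");
        if (sep < 0) {
            throw new IllegalArgumentException("Not a fully qualified topic name: " + fullName);
        }
        String domain = fullName.substring(0, sep);
        if (!domain.equals("persistent") && !domain.equals("non-persistent")) {
            throw new IllegalArgumentException("Unknown domain: " + domain);
        }
        String[] parts = fullName.substring(sep + 3).split("/", 3);
        if (parts.length != 3 || parts[0].isEmpty() || parts[1].isEmpty() || parts[2].isEmpty()) {
            throw new IllegalArgumentException("Expected domain://tenant/namespace/topic: " + fullName);
        }
        return parts;
    }

    static String tenantOf(String fullName) {
        return split(fullName)[0];
    }

    // Namespace in the "tenant/namespace" form used by the admin API
    static String namespaceOf(String fullName) {
        String[] parts = split(fullName);
        return parts[0] + "/" + parts[1];
    }
}
```

In OrderRoutingFunctionTest, for example, assertEquals("main", TopicNames.tenantOf(output.getTopicName())) would flag a routing change that starts writing outside the main tenant.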
// Another method of PulsarFunctionIntegrationTest, reusing its client and admin fields
@Test
void tenantsCannotReadEachOthersMessages() throws Exception {
    // Create two tenants
    admin.tenants().createTenant("tenant-a",
            TenantInfo.builder().allowedClusters(Set.of("standalone")).build());
    admin.tenants().createTenant("tenant-b",
            TenantInfo.builder().allowedClusters(Set.of("standalone")).build());
    admin.namespaces().createNamespace("tenant-a/orders");
    admin.namespaces().createNamespace("tenant-b/orders");

    // Producer for tenant-a
    Producer<String> producerA = client.newProducer(Schema.STRING)
            .topic("persistent://tenant-a/orders/events")
            .create();
    // Consumer subscribed ONLY to tenant-a's topic
    Consumer<String> consumerA = client.newConsumer(Schema.STRING)
            .topic("persistent://tenant-a/orders/events")
            .subscriptionName("consumer-a")
            .subscribe();
    // Consumer subscribed ONLY to tenant-b's topic
    Consumer<String> consumerB = client.newConsumer(Schema.STRING)
            .topic("persistent://tenant-b/orders/events")
            .subscriptionName("consumer-b")
            .subscribe();

    // Send to tenant-a
    producerA.send("tenant-a-message");

    // tenant-a consumer receives it
    Message<String> msgA = consumerA.receive(5, TimeUnit.SECONDS);
    assertNotNull(msgA, "tenant-a consumer should receive the message");
    assertEquals("tenant-a-message", msgA.getValue());
    consumerA.acknowledge(msgA);

    // tenant-b consumer should NOT receive it. This checks topic separation only;
    // preventing tenant-b's credentials from subscribing to tenant-a topics at all
    // additionally requires authentication and authorization on the broker
    Message<String> msgB = consumerB.receive(2, TimeUnit.SECONDS);
    assertNull(msgB, "tenant-b consumer must not receive tenant-a messages");

    producerA.close();
    consumerA.close();
    consumerB.close();
}
Testing Schema Evolution
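Pulsar checks every new schema against a topic's existing schemas using the namespace's compatibility strategy (the broker default is FULL), so test schema changes before deploying them: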
@Test
void backwardCompatibleSchemaEvolutionWorks() throws Exception {
    String topic = "persistent://test/orders/schema-test";
    // Register the v1 schema
    admin.schemas().createSchema(topic, Schema.AVRO(OrderEventV1.class).getSchemaInfo());

    // Produce a v1 message
    Producer<OrderEventV1> v1Producer = client.newProducer(Schema.AVRO(OrderEventV1.class))
            .topic(topic)
            .create();
    v1Producer.send(new OrderEventV1("ORD-001", "CUST-001"));
    v1Producer.close();

    // V2 adds a nullable field with a default. That is backward compatible (V2 readers
    // can read V1 data) and also forward compatible (V1 readers ignore the new field).
    // The broker checks compatibility when the producer is created, not on send()
    Producer<OrderEventV2> v2Producer = assertDoesNotThrow(() ->
            client.newProducer(Schema.AVRO(OrderEventV2.class))
                    .topic(topic) // same topic
                    .create());
    v2Producer.send(new OrderEventV2("ORD-002", "CUST-001", "PREMIUM"));
    v2Producer.close();

    // A V2 consumer should be able to read V1 messages
    Consumer<OrderEventV2> consumer = client.newConsumer(Schema.AVRO(OrderEventV2.class))
            .topic(topic)
            .subscriptionName("compat-test")
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .subscribe();
    Message<OrderEventV2> v1AsV2 = consumer.receive(5, TimeUnit.SECONDS);
    assertNotNull(v1AsV2);
    assertEquals("ORD-001", v1AsV2.getValue().getOrderId());
    assertNull(v1AsV2.getValue().getTier(), "V1 message has no tier field, so it should be null in V2");
    consumer.close();
}

@Test
void incompatibleSchemaChangeIsRejected() throws Exception {
    String topic = "persistent://test/orders/strict-schema";
    admin.schemas().createSchema(topic, Schema.AVRO(OrderEventV1.class).getSchemaInfo());
    // OrderEventIncompatible (not shown) must differ from V1 in a way the namespace's
    // compatibility strategy rejects, e.g. renaming a field that has no default or
    // changing a field's type. The broker rejects it when the producer is created
    assertThrows(PulsarClientException.class, () -> {
        Producer<OrderEventIncompatible> badProducer =
                client.newProducer(Schema.AVRO(OrderEventIncompatible.class))
                        .topic(topic)
                        .create();
        badProducer.close();
    });
}
Testing Dead Letter Topics
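The test in this section nacks four times because maxRedeliverCount counts redeliveries, not deliveries. The first delivery plus three redeliveries reach the consumer, and the nack after the last of them sends the message to the dead letter topic (DLT) instead. The toy model below captures that arithmetic. It is a simplification for sizing test loops, not Pulsar's client implementation.

```java
// Toy model of a consumer DeadLetterPolicy (a simplification, not Pulsar's client code)
final class DeadLetterSimulation {
    private final int maxRedeliverCount;
    private int redeliveryCount = 0; // what Message.getRedeliveryCount() would report
    private boolean deadLettered = false;

    DeadLetterSimulation(int maxRedeliverCount) {
        if (maxRedeliverCount < 1) {
            throw new IllegalArgumentException("model assumes maxRedeliverCount >= 1");
        }
        this.maxRedeliverCount = maxRedeliverCount;
    }

    // Called when the consumer nacks the current delivery.
    // Returns true if the message will be redelivered, false if it moves to the DLT.
    boolean nack() {
        if (deadLettered) {
            throw new IllegalStateException("already dead-lettered");
        }
        if (redeliveryCount >= maxRedeliverCount) {
            deadLettered = true;
            return false;
        }
        redeliveryCount++;
        return true;
    }

    boolean isDeadLettered() {
        return deadLettered;
    }

    // Deliveries seen by a consumer that nacks every time
    static int deliveriesBeforeDeadLetter(int maxRedeliverCount) {
        DeadLetterSimulation sim = new DeadLetterSimulation(maxRedeliverCount);
        int deliveries = 1; // the first delivery
        while (sim.nack()) {
            deliveries++;
        }
        return deliveries;
    }
}
```

Consider a loop that nacks only maxRedeliverCount times. The last redelivery would never be nacked, so the message would not be dead-lettered and the DLT receive would time out.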
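A consumer's DeadLetterPolicy moves a message to a dead letter topic (DLT) once it has used up maxRedeliverCount redeliveries. The test below exercises that policy with a plain consumer. A deployed function configures the equivalent behavior through the maxMessageRetries and deadLetterTopic fields of its FunctionConfig: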
@Test
void failedMessagesRouteToDeadLetterTopic() throws Exception {
    String inputTopic = "persistent://test/orders/dlq-test";
    String dlqTopic = "persistent://test/orders/dlq-test-dlq";
    AtomicInteger processAttempts = new AtomicInteger(0);

    // Pulsar documents dead letter topics for Shared and Key_Shared subscriptions,
    // so set the type explicitly (the default is Exclusive)
    Consumer<String> consumer = client.newConsumer(Schema.STRING)
            .topic(inputTopic)
            .subscriptionName("dlq-test-sub")
            .subscriptionType(SubscriptionType.Shared)
            .deadLetterPolicy(DeadLetterPolicy.builder()
                    .maxRedeliverCount(3)
                    .deadLetterTopic(dlqTopic)
                    .build())
            .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
            .subscribe();
    Consumer<String> dlqConsumer = client.newConsumer(Schema.STRING)
            .topic(dlqTopic)
            .subscriptionName("dlq-reader")
            .subscribe();
    Producer<String> producer = client.newProducer(Schema.STRING)
            .topic(inputTopic)
            .create();
    producer.send("poison-pill-message");

    // Nack the message 4 times: the first delivery plus maxRedeliverCount (3) redeliveries.
    // The 4th nack sends it to the DLQ
    for (int i = 0; i < 4; i++) {
        Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
        assertNotNull(msg);
        processAttempts.incrementAndGet();
        consumer.negativeAcknowledge(msg);
        Thread.sleep(150); // wait for redelivery delay
    }

    // After maxRedeliverCount is exhausted, the message lands in the DLQ
    Message<String> dlqMsg = dlqConsumer.receive(10, TimeUnit.SECONDS);
    assertNotNull(dlqMsg, "Message should appear in DLQ after max redeliveries");
    assertEquals("poison-pill-message", dlqMsg.getValue());

    // Verify exactly 4 processing attempts
    assertEquals(4, processAttempts.get());
    producer.close();
    consumer.close();
    dlqConsumer.close();
}
What to Test vs. What to Skip
Test with MockContext:
- Business logic and transformations
- State store reads and writes
- Output topic routing decisions
- Metric and counter recording
- Edge cases (null fields, boundary values)
Test with Testcontainers integration:
- End-to-end message flow through real broker
- Schema compatibility enforcement
- Dead letter topic routing after max retries
- Multi-tenant namespace isolation
- Subscription-type behavior (Shared, Failover, Key_Shared)
Skip:
- Pulsar broker internals (message durability, replication)
- Network partition scenarios (use chaos testing tools)
- Performance tuning (benchmark concern, not correctness)
- Pulsar Admin API behavior (tested by the Pulsar project)