Apache Pulsar Testing Guide: Consumer and Producer Testing with Testcontainers

Meaningful Apache Pulsar integration tests need a real broker, because the client library has no simulation mode. Testcontainers fills the gap with a Pulsar module that runs the official apachepulsar/pulsar image and is typically ready in 10-30 seconds. This guide covers producer/consumer testing, schema validation, subscription types, and dead letter topic handling in Java with JUnit 5.

Key Takeaways

Pulsar requires a real broker — no in-memory mock exists. Unlike NATS (embedded server) or RabbitMQ (partial mocking), Pulsar's client has no simulation mode. Use Testcontainers for all integration tests.

Use unique topic names per test. Pulsar persists topics even after tests finish. Prefix topics with the test class name or a UUID to prevent cross-test message bleed.

Test schema serialization explicitly. Pulsar's schema registry can reject messages with incompatible schemas. Test schema evolution before deploying schema changes.

Each subscription type has different behavior — test all you use. Exclusive, Shared, Failover, and Key_Shared subscriptions deliver messages differently. Test the subscription type your consumers actually use.

Dead letter topics require explicit configuration. DLT routing doesn't happen by default. Test that your DeadLetterPolicy is configured correctly by forcing consumer failures.

Why Pulsar Testing Is Harder Than Other Brokers

Apache Pulsar's architecture separates compute (brokers) from storage (BookKeeper). This means there's no practical in-memory mode: you need the full stack running to test meaningful behavior. The Testcontainers Pulsar module runs the official apachepulsar/pulsar image as a standalone server (broker, BookKeeper, and metadata store in one process), which is suitable for testing.

The result: Pulsar integration tests take 10-30 seconds to start but then run fast. Budget your CI accordingly.

Setup

<!-- pom.xml -->
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <version>3.2.0</version>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>pulsar</artifactId>
    <version>1.19.7</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.junit.jupiter</groupId>
    <artifactId>junit-jupiter</artifactId>
    <version>5.10.2</version>
    <scope>test</scope>
</dependency>

Shared Pulsar Container Fixture

// src/test/java/com/example/PulsarTestBase.java
package com.example;

import org.apache.pulsar.client.api.PulsarClient;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.testcontainers.containers.PulsarContainer;
import org.testcontainers.utility.DockerImageName;

public abstract class PulsarTestBase {

    static PulsarContainer pulsarContainer;
    static PulsarClient pulsarClient;

    @BeforeAll
    static void startContainer() throws Exception {
        pulsarContainer = new PulsarContainer(
            DockerImageName.parse("apachepulsar/pulsar:3.2.0")
        );
        pulsarContainer.start();

        pulsarClient = PulsarClient.builder()
            .serviceUrl(pulsarContainer.getPulsarBrokerUrl())
            .build();
    }

    @AfterAll
    static void stopContainer() throws Exception {
        if (pulsarClient != null) pulsarClient.close();
        if (pulsarContainer != null) pulsarContainer.stop();
    }

    static String uniqueTopic(String testName) {
        return "persistent://public/default/" + testName + "-" + System.nanoTime();
    }
}
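
The uniqueTopic helper above relies on System.nanoTime() for uniqueness. A variant that follows the earlier advice (prefix with the test class name or a UUID) might look like the sketch below. TopicNames and its unique method are hypothetical names, not part of Pulsar or Testcontainers; the only thing taken from Pulsar is the persistent://tenant/namespace/topic naming format.

```java
import java.util.UUID;

/** Hypothetical helper (not a Pulsar API) that builds collision-free topic names for tests. */
final class TopicNames {

    private TopicNames() {}

    /**
     * Returns a topic in the public/default namespace named after the test class,
     * a short label, and a random UUID, so repeated runs do not share a topic.
     */
    static String unique(Class<?> testClass, String label) {
        return "persistent://public/default/"
            + testClass.getSimpleName() + "-" + label + "-" + UUID.randomUUID();
    }
}
```

Including the class name makes leftover topics easier to trace back to the test that created them when inspecting the broker after a failed run.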

Basic Producer and Consumer Tests

// src/test/java/com/example/PulsarProducerConsumerTest.java
package com.example;

import org.apache.pulsar.client.api.*;
import org.junit.jupiter.api.Test;
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.*;

class PulsarProducerConsumerTest extends PulsarTestBase {

    @Test
    void producerPublishesAndConsumerReceives() throws Exception {
        String topic = uniqueTopic("basic");

        try (
            Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic(topic)
                .create();
            Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                .topic(topic)
                .subscriptionName("test-sub")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe()
        ) {
            producer.send("hello-pulsar");

            Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
            assertNotNull(msg, "Expected to receive a message");
            assertEquals("hello-pulsar", msg.getValue());
            consumer.acknowledge(msg);
        }
    }

    @Test
    void consumerReceivesNoMessageFromEmptyTopic() throws Exception {
        String topic = uniqueTopic("empty");

        try (Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                .topic(topic)
                .subscriptionName("empty-sub")
                .subscribe()) {

            Message<String> msg = consumer.receive(1, TimeUnit.SECONDS);
            assertNull(msg, "Expected null for empty topic");
        }
    }

    @Test
    void multipleMessagesDeliveredInOrder() throws Exception {
        String topic = uniqueTopic("ordered");

        try (
            Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic(topic)
                .create();
            Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                .topic(topic)
                .subscriptionName("order-sub")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe()
        ) {
            String[] messages = {"first", "second", "third"};
            for (String m : messages) {
                producer.send(m);
            }

            for (String expected : messages) {
                Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
                assertNotNull(msg);
                assertEquals(expected, msg.getValue());
                consumer.acknowledge(msg);
            }
        }
    }
}

Testing Avro Schema

Pulsar's schema registry enforces schema compatibility. Start with a round-trip test for the schema your producers and consumers share:

import org.apache.pulsar.client.api.*;
import org.junit.jupiter.api.Test;
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.*;

// Plain class rather than a record: Avro reflection needs a no-arg constructor
// and writable fields, which Java records do not provide.
class OrderEvent {
    public String orderId;
    public double total;
    public String status;

    public OrderEvent() {}

    public OrderEvent(String orderId, double total, String status) {
        this.orderId = orderId;
        this.total = total;
        this.status = status;
    }
}

class PulsarSchemaTest extends PulsarTestBase {

    @Test
    void avroSchemaEnforcedOnProducer() throws Exception {
        String topic = uniqueTopic("schema");

        try (
            Producer<OrderEvent> producer = pulsarClient.newProducer(Schema.AVRO(OrderEvent.class))
                .topic(topic)
                .create();
            Consumer<OrderEvent> consumer = pulsarClient.newConsumer(Schema.AVRO(OrderEvent.class))
                .topic(topic)
                .subscriptionName("schema-sub")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe()
        ) {
            OrderEvent event = new OrderEvent("ord-1", 99.99, "pending");
            producer.send(event);

            // The consumer decodes the payload with the same Avro schema
            Message<OrderEvent> msg = consumer.receive(5, TimeUnit.SECONDS);
            assertNotNull(msg);
            assertEquals("ord-1", msg.getValue().orderId);
            assertEquals(99.99, msg.getValue().total, 0.001);
            consumer.acknowledge(msg);
        }
    }
}

Testing Subscription Types

Pulsar's subscription types change message delivery behavior. Test each type you use:

class PulsarSubscriptionTypeTest extends PulsarTestBase {

    @Test
    void sharedSubscriptionDistributesMessages() throws Exception {
        String topic = uniqueTopic("shared");
        String sub = "shared-workers";

        try (
            Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic(topic).create();
            Consumer<String> consumer1 = pulsarClient.newConsumer(Schema.STRING)
                .topic(topic).subscriptionName(sub)
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();
            Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING)
                .topic(topic).subscriptionName(sub)
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe()
        ) {
            // Publish 4 messages
            for (int i = 0; i < 4; i++) {
                producer.send("msg-" + i);
            }

            // A Shared subscription splits messages across both consumers. The split
            // is not guaranteed to be even, so only the combined total is asserted.
            int count1 = 0, count2 = 0;
            for (int i = 0; i < 4; i++) {
                Message<String> m1 = consumer1.receive(500, TimeUnit.MILLISECONDS);
                Message<String> m2 = consumer2.receive(500, TimeUnit.MILLISECONDS);
                if (m1 != null) { consumer1.acknowledge(m1); count1++; }
                if (m2 != null) { consumer2.acknowledge(m2); count2++; }
            }

            assertEquals(4, count1 + count2, "All messages should be received");
        }
    }
}
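
The receive loops in these tests repeat one pattern: poll with a short timeout until the expected number of messages arrives or a deadline passes. A minimal sketch of that pattern as a reusable helper follows. Drain, Poller, and upTo are hypothetical names introduced for illustration, and the helper is deliberately independent of the Pulsar client so it can be exercised on its own.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical test helper (not a Pulsar API): polls until enough items arrive or time runs out. */
final class Drain {

    /** A poll that may block briefly and returns null when nothing is available. */
    @FunctionalInterface
    interface Poller<T> {
        T poll() throws Exception;
    }

    private Drain() {}

    /** Collects up to {@code expected} non-null items, giving up once {@code timeout} has elapsed. */
    static <T> List<T> upTo(int expected, Duration timeout, Poller<T> poller) {
        List<T> received = new ArrayList<>();
        long deadline = System.nanoTime() + timeout.toNanos();
        // Subtracting before comparing keeps the check correct if nanoTime wraps around
        while (received.size() < expected && System.nanoTime() - deadline < 0) {
            T item;
            try {
                item = poller.poll();
            } catch (Exception e) {
                // Surface checked client exceptions as an unchecked failure in the test
                throw new IllegalStateException("poll failed", e);
            }
            if (item != null) {
                received.add(item);
            }
        }
        return received;
    }
}
```

With a Pulsar consumer, the poller would be a lambda such as () -> consumer.receive(200, TimeUnit.MILLISECONDS), and the test would then assert on the size of the returned list. A blocking receive with a timeout is what keeps the loop from spinning.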

Testing Dead Letter Topics

Dead letter topics (DLT) catch messages that fail processing. Test that your policy routes failures correctly:

class PulsarDeadLetterTest extends PulsarTestBase {

    @Test
    void failedMessageRoutedToDeadLetterTopic() throws Exception {
        String topic = uniqueTopic("dlq-source");
        String dlqTopic = topic + "-DLQ";
        String sub = "dlq-sub";

        try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic(topic).create()) {
            producer.send("bad-message");
        }

        // Consumer with DLQ policy: max 1 redelivery.
        // The client publishes to the DLQ when the final redelivery fires, so the
        // source consumer must stay open until the DLQ has been checked.
        try (
            Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                .topic(topic)
                .subscriptionName(sub)
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .deadLetterPolicy(DeadLetterPolicy.builder()
                    .maxRedeliverCount(1)
                    .deadLetterTopic(dlqTopic)
                    .build())
                .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
                .subscribe();
            Consumer<String> dlqConsumer = pulsarClient.newConsumer(Schema.STRING)
                .topic(dlqTopic)
                .subscriptionName("dlq-reader")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe()
        ) {
            // Receive and negativeAck twice (initial delivery + 1 redelivery)
            for (int attempt = 0; attempt < 2; attempt++) {
                Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
                assertNotNull(msg, "Expected delivery attempt " + attempt);
                consumer.negativeAcknowledge(msg);
            }

            // Verify message in DLQ
            Message<String> dlqMsg = dlqConsumer.receive(10, TimeUnit.SECONDS);
            assertNotNull(dlqMsg, "Message should appear in DLQ after max redeliveries");
            assertEquals("bad-message", dlqMsg.getValue());
            dlqConsumer.acknowledge(dlqMsg);
        }
    }
}
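
The test above names the dead letter topic explicitly. When deadLetterTopic is left unset, the Pulsar documentation describes a default name of the form <topicname>-<subscriptionname>-DLQ. A small helper can keep that convention in one place for tests that rely on the default; DeadLetterNames and defaultDlqTopic are hypothetical names, not client API.

```java
/** Hypothetical helper (not a Pulsar API) mirroring the documented default DLQ naming convention. */
final class DeadLetterNames {

    private DeadLetterNames() {}

    /** Builds the name the client is documented to use when no dead letter topic is configured. */
    static String defaultDlqTopic(String topic, String subscription) {
        return topic + "-" + subscription + "-DLQ";
    }
}
```

A test that omits deadLetterTopic could subscribe its DLQ reader to defaultDlqTopic(topic, sub) instead of a hand-built string.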

Testing Partitioned Topics

Partitioned topics enable parallel consumption. Test that messages are produced to partitions and consumers receive from all partitions:

@Test
void partitionedTopicReceivesAllMessages() throws Exception {
    String topic = "persistent://public/default/partitioned-" + System.nanoTime();

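    // Create partitioned topic via the pulsar-admin CLI inside the container
    var result = pulsarContainer.execInContainer(
        "bin/pulsar-admin", "topics", "create-partitioned-topic",
        "--partitions", "3", topic
    );
    // Fail fast if the topic was not created
    assertEquals(0, result.getExitCode(), result.getStderr());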

    try (
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
            .topic(topic).create();
        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
            .topic(topic)
            .subscriptionName("partition-sub")
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .subscribe()
    ) {
        for (int i = 0; i < 9; i++) {
            producer.send("msg-" + i);
        }

        int received = 0;
        while (received < 9) {
            Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
            if (msg == null) break;
            consumer.acknowledge(msg);
            received++;
        }
        assertEquals(9, received, "All messages should be received across partitions");
    }
}
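
Under the hood, each partition of a partitioned topic is its own internal topic named <topic>-partition-<index>, with indexes starting at 0. The sketch below lists those names, which can be handy when a test needs to subscribe to a single partition. PartitionNames is a hypothetical helper, not part of the Pulsar client.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not a Pulsar API) that lists the internal topic names of a partitioned topic. */
final class PartitionNames {

    private PartitionNames() {}

    /** Returns topic-partition-0 through topic-partition-(partitions - 1). */
    static List<String> of(String topic, int partitions) {
        if (partitions < 1) {
            throw new IllegalArgumentException("partitions must be >= 1");
        }
        List<String> names = new ArrayList<>(partitions);
        for (int i = 0; i < partitions; i++) {
            names.add(topic + "-partition-" + i);
        }
        return names;
    }
}
```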

CI Configuration

Pulsar containers take 10-30 seconds to start. Use a shared container to amortize startup cost:

# .github/workflows/test.yml
- name: Run Pulsar integration tests
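  # Pattern matches PulsarProducerConsumerTest, PulsarSchemaTest, etc.
  # (-v is omitted: in Maven it prints the version and exits.)
  run: mvn test -Dtest="Pulsar*Test"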
  timeout-minutes: 10

The @BeforeAll shared container approach (one container per test class) is the right trade-off — faster than one container per test, simpler than a Compose setup.

Summary

Apache Pulsar testing requires a real broker, and Testcontainers delivers one reliably in CI. Key tests to write:

  • Basic produce/consume — confirms broker connectivity and schema round-trip
  • Message ordering — verifies sequential delivery within a topic
  • Subscription types — validates Shared vs. Exclusive vs. Failover routing
  • Dead letter topics — tests the path most teams skip until production

Start with the DLQ test — understanding how Pulsar handles poison messages is critical before any consumer goes to production.
