Apache Pulsar Testing Guide: Consumer and Producer Testing with Testcontainers
Apache Pulsar integration tests need a real broker to be meaningful, because the client library has no simulation mode. Testcontainers solves this with a Pulsar module that runs the official apachepulsar/pulsar image and is ready in roughly 10-30 seconds. This guide covers producer/consumer testing, schema validation, subscription types, and dead letter topic handling in Java with JUnit 5.
Key Takeaways
Pulsar requires a real broker — no in-memory mock exists. Unlike NATS (embedded server) or RabbitMQ (partial mocking), Pulsar's client has no simulation mode. Use Testcontainers for all integration tests.
Use unique topic names per test. Pulsar persists topics even after tests finish. Prefix topics with the test class name or a UUID to prevent cross-test message bleed.
Test schema serialization explicitly. Pulsar's schema registry can reject messages with incompatible schemas. Test schema evolution before deploying schema changes.
Each subscription type behaves differently. Exclusive, Shared, Failover, and Key_Shared subscriptions each deliver messages in their own way, so test every subscription type your consumers actually use.
Dead letter topics require explicit configuration. DLT routing doesn't happen by default. Test that your DeadLetterPolicy is configured correctly by forcing consumer failures.
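The unique-topic advice above can be sketched as a small helper. This is a minimal illustration rather than anything shipped with the Pulsar client: the class name `TopicNames` and its `uniqueTopic` method are hypothetical, and the sketch assumes the default `public/default` tenant and namespace of a standalone broker.

```java
import java.util.UUID;

// Hypothetical helper (not part of the Pulsar client): builds a topic name
// that is unique per call, so tests never read each other's messages.
final class TopicNames {
    private TopicNames() {}

    // Assumes the default "public/default" tenant and namespace.
    static String uniqueTopic(Class<?> testClass, String label) {
        return "persistent://public/default/"
                + testClass.getSimpleName() + "-" + label + "-" + UUID.randomUUID();
    }
}
```

Including the test class name makes leftover topics easy to trace back to the test that created them, while the UUID keeps repeated runs against the same broker from colliding.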
Why Pulsar Testing Is Harder Than Other Brokers
Apache Pulsar's architecture separates compute (brokers) from storage (BookKeeper), so there is no practical in-memory mode: you need the full stack running to test meaningful behavior. The Testcontainers Pulsar module runs the official apachepulsar/pulsar image in standalone mode (broker, BookKeeper, and metadata store in a single process), which is suitable for testing.
The result: Pulsar integration tests take 10-30 seconds to start but then run fast. Budget your CI accordingly.
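One way to budget from a measured figure rather than a guess is to time the startup step. The sketch below is a generic timing helper; `StartupTimer` is a hypothetical name, and nothing in it is specific to Pulsar or Testcontainers.

```java
import java.time.Duration;

// Hypothetical helper: times any startup action so that a measured value
// can inform CI timeouts.
final class StartupTimer {
    private StartupTimer() {}

    // Runs the action once and returns how long it took.
    static Duration measure(Runnable startup) {
        long begin = System.nanoTime();
        startup.run();
        return Duration.ofNanos(System.nanoTime() - begin);
    }
}
```

In a fixture this could wrap the container start, for example `StartupTimer.measure(pulsarContainer::start)`, with the result written to the test log.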
Setup
<!-- pom.xml -->
<dependency>
  <groupId>org.apache.pulsar</groupId>
  <artifactId>pulsar-client</artifactId>
  <version>3.2.0</version>
</dependency>
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>pulsar</artifactId>
  <version>1.19.7</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.junit.jupiter</groupId>
  <artifactId>junit-jupiter</artifactId>
  <version>5.10.2</version>
  <scope>test</scope>
</dependency>
Shared Pulsar Container Fixture
// src/test/java/com/example/PulsarTestBase.java
package com.example;
import org.apache.pulsar.client.api.PulsarClient;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.testcontainers.containers.PulsarContainer;
import org.testcontainers.utility.DockerImageName;
public abstract class PulsarTestBase {
    static PulsarContainer pulsarContainer;
    static PulsarClient pulsarClient;
    @BeforeAll
    static void startContainer() throws Exception {
        pulsarContainer = new PulsarContainer(
            DockerImageName.parse("apachepulsar/pulsar:3.2.0")
        );
        pulsarContainer.start();
        pulsarClient = PulsarClient.builder()
            .serviceUrl(pulsarContainer.getPulsarBrokerUrl())
            .build();
    }
    @AfterAll
    static void stopContainer() throws Exception {
        if (pulsarClient != null) pulsarClient.close();
        if (pulsarContainer != null) pulsarContainer.stop();
    }
    static String uniqueTopic(String testName) {
        return "persistent://public/default/" + testName + "-" + System.nanoTime();
    }
}
Basic Producer and Consumer Tests
// src/test/java/com/example/PulsarProducerConsumerTest.java
package com.example;
import org.apache.pulsar.client.api.*;
import org.junit.jupiter.api.Test;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.*;
class PulsarProducerConsumerTest extends PulsarTestBase {
    @Test
    void producerPublishesAndConsumerReceives() throws Exception {
        String topic = uniqueTopic("basic");
        try (
            Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic(topic)
                .create();
            Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                .topic(topic)
                .subscriptionName("test-sub")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe()
        ) {
            producer.send("hello-pulsar");
            Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
            assertNotNull(msg, "Expected to receive a message");
            assertEquals("hello-pulsar", msg.getValue());
            consumer.acknowledge(msg);
        }
    }
    @Test
    void consumerReceivesNoMessageFromEmptyTopic() throws Exception {
        String topic = uniqueTopic("empty");
        try (Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                .topic(topic)
                .subscriptionName("empty-sub")
                .subscribe()) {
            Message<String> msg = consumer.receive(1, TimeUnit.SECONDS);
            assertNull(msg, "Expected null for empty topic");
        }
    }
    @Test
    void multipleMessagesDeliveredInOrder() throws Exception {
        String topic = uniqueTopic("ordered");
        try (
            Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic(topic)
                .create();
            Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                .topic(topic)
                .subscriptionName("order-sub")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe()
        ) {
            String[] messages = {"first", "second", "third"};
            for (String m : messages) {
                producer.send(m);
            }
            for (String expected : messages) {
                Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
                assertNotNull(msg);
                assertEquals(expected, msg.getValue());
                consumer.acknowledge(msg);
            }
        }
    }
}
Testing Avro Schema
Pulsar's schema registry enforces schema compatibility. Test that schema validation works:
import org.apache.pulsar.client.api.*;
import org.junit.jupiter.api.Test;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.*;
record OrderEvent(String orderId, double total, String status) {}
class PulsarSchemaTest extends PulsarTestBase {
    @Test
    void avroSchemaEnforcedOnProducer() throws Exception {
        String topic = uniqueTopic("schema");
        try (
            Producer<OrderEvent> producer = pulsarClient.newProducer(Schema.AVRO(OrderEvent.class))
                .topic(topic)
                .create();
            Consumer<OrderEvent> consumer = pulsarClient.newConsumer(Schema.AVRO(OrderEvent.class))
                .topic(topic)
                .subscriptionName("schema-sub")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe()
        ) {
            OrderEvent event = new OrderEvent("ord-1", 99.99, "pending");
            producer.send(event);
            Message<OrderEvent> msg = consumer.receive(5, TimeUnit.SECONDS);
            assertNotNull(msg);
            assertEquals("ord-1", msg.getValue().orderId());
            assertEquals(99.99, msg.getValue().total(), 0.001);
            consumer.acknowledge(msg);
        }
    }
}
Testing Subscription Types
Pulsar's subscription types change message delivery behavior. Test each type you use:
class PulsarSubscriptionTypeTest extends PulsarTestBase {
    @Test
    void sharedSubscriptionDistributesMessages() throws Exception {
        String topic = uniqueTopic("shared");
        String sub = "shared-workers";
        try (
            Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic(topic).create();
            Consumer<String> consumer1 = pulsarClient.newConsumer(Schema.STRING)
                .topic(topic).subscriptionName(sub)
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();
            Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING)
                .topic(topic).subscriptionName(sub)
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe()
        ) {
            // Publish 4 messages
            for (int i = 0; i < 4; i++) {
                producer.send("msg-" + i);
            }
            // Drain both consumers. The split between them is not guaranteed,
            // so the assertion checks only that together they see all 4 messages.
            int count1 = 0, count2 = 0;
            for (int i = 0; i < 4; i++) {
                Message<String> m1 = consumer1.receive(500, TimeUnit.MILLISECONDS);
                Message<String> m2 = consumer2.receive(500, TimeUnit.MILLISECONDS);
                if (m1 != null) { consumer1.acknowledge(m1); count1++; }
                if (m2 != null) { consumer2.acknowledge(m2); count2++; }
            }
            assertEquals(4, count1 + count2, "All messages should be received");
        }
    }
}
Testing Dead Letter Topics
Dead letter topics (DLT) catch messages that fail processing. Test that your policy routes failures correctly:
class PulsarDeadLetterTest extends PulsarTestBase {
    @Test
    void failedMessageRoutedToDeadLetterTopic() throws Exception {
        String topic = uniqueTopic("dlq-source");
        String dlqTopic = topic + "-DLQ";
        String sub = "dlq-sub";
        try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic(topic).create()) {
            producer.send("bad-message");
        }
        // Source consumer with DLQ policy (max 1 redelivery) plus a DLQ reader.
        // Both stay open until the assertion: the client forwards the message to
        // the DLQ asynchronously, after the negative-ack delay has elapsed.
        try (
            Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                .topic(topic)
                .subscriptionName(sub)
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .deadLetterPolicy(DeadLetterPolicy.builder()
                    .maxRedeliverCount(1)
                    .deadLetterTopic(dlqTopic)
                    .build())
                .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
                .subscribe();
            Consumer<String> dlqConsumer = pulsarClient.newConsumer(Schema.STRING)
                .topic(dlqTopic)
                .subscriptionName("dlq-reader")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe()
        ) {
            // Receive and negativeAck twice: the initial delivery plus 1 redelivery
            for (int attempt = 0; attempt < 2; attempt++) {
                Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
                assertNotNull(msg, "Expected delivery attempt " + (attempt + 1));
                consumer.negativeAcknowledge(msg);
            }
            // Verify message in DLQ
            Message<String> dlqMsg = dlqConsumer.receive(10, TimeUnit.SECONDS);
            assertNotNull(dlqMsg, "Message should appear in DLQ after max redeliveries");
            assertEquals("bad-message", dlqMsg.getValue());
            dlqConsumer.acknowledge(dlqMsg);
        }
    }
}
Testing Partitioned Topics
Partitioned topics enable parallel consumption. Test that messages are produced to partitions and consumers receive from all partitions:
@Test
void partitionedTopicReceivesAllMessages() throws Exception {
    String topic = "persistent://public/default/partitioned-" + System.nanoTime();
    // Create the partitioned topic with the pulsar-admin CLI inside the container
    var result = pulsarContainer.execInContainer(
        "bin/pulsar-admin", "topics", "create-partitioned-topic",
        "--partitions", "3", topic
    );
    assertEquals(0, result.getExitCode(), result.getStderr());
    try (
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
            .topic(topic).create();
        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
            .topic(topic)
            .subscriptionName("partition-sub")
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .subscribe()
    ) {
        for (int i = 0; i < 9; i++) {
            producer.send("msg-" + i);
        }
        int received = 0;
        while (received < 9) {
            Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
            if (msg == null) break;
            consumer.acknowledge(msg);
            received++;
        }
        assertEquals(9, received, "All messages should be received across partitions");
    }
}
CI Configuration
Pulsar containers take 10-30 seconds to start. Use a shared container to amortize startup cost:
# .github/workflows/test.yml
- name: Run Pulsar integration tests
  run: mvn test -Dtest="Pulsar*Test"
  timeout-minutes: 10
The @BeforeAll shared container approach (one container per test class) is the right trade-off: faster than one container per test, simpler than a Compose setup.
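To make the trade-off concrete, the sketch below totals container startup overhead for the two strategies. The inputs are illustrative assumptions, not measurements: 20 seconds per container start and a hypothetical suite of 4 test classes holding 20 test methods. `ContainerBudget` is a hypothetical name.

```java
// Hypothetical arithmetic sketch: total container startup overhead per strategy.
// All inputs are illustrative assumptions, not measurements.
final class ContainerBudget {
    private ContainerBudget() {}

    // One container started for every test method.
    static long perTestSeconds(int testMethods, long startupSeconds) {
        return testMethods * startupSeconds;
    }

    // One container started per test class (the @BeforeAll approach).
    static long perClassSeconds(int testClasses, long startupSeconds) {
        return testClasses * startupSeconds;
    }
}
```

Under these assumptions, `ContainerBudget.perTestSeconds(20, 20)` gives 400 seconds of startup overhead, while `ContainerBudget.perClassSeconds(4, 20)` gives 80 seconds, which leaves comfortable headroom inside a 10-minute job timeout.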
Summary
Apache Pulsar testing requires a real broker, and Testcontainers delivers one reliably in CI. Key tests to write:
- Basic produce/consume — confirms broker connectivity and schema round-trip
- Message ordering — verifies sequential delivery within a topic
- Subscription types — validates Shared vs. Exclusive vs. Failover routing
- Dead letter topics — tests the path most teams skip until production
Start with the DLQ test — understanding how Pulsar handles poison messages is critical before any consumer goes to production.