ActiveMQ Testing Guide: Embedded Broker and Spring Integration Tests

ActiveMQ Testing Guide: Embedded Broker and Spring Integration Tests

ActiveMQ Classic provides an embedded broker you can start directly in your test JVM — no Docker, no external process. This makes JUnit tests fast and deterministic. This guide covers testing JMS queues and topics, transactional sending, and dead letter queue behavior using the embedded broker and Spring Boot's JMS autoconfiguration.

Key Takeaways

Use the embedded broker for unit and integration tests. BrokerService starts an in-memory ActiveMQ broker in the same JVM. Tests are fast and require no external infrastructure.

Always close consumers and producers in tests. JMS resources (sessions, consumers, producers) are not released by garbage collection alone; they stay open until they, or their connection, are closed. Resources left unclosed in tests can leak connections and cause flaky behavior.

Test transaction rollback explicitly. Transaction handling is a common source of JMS bugs. Test that a rolled-back send is never delivered, and that a rolled-back receive causes the message to be redelivered rather than lost.

Configure the redelivery policy explicitly when testing the DLQ. By default, ActiveMQ moves a message to ActiveMQ.DLQ only after 6 redelivery attempts. Set a lower limit in tests, and verify that the observed behavior matches the retry count you configured.

Use unique destination names per test. The broker keeps queue contents for as long as it is running, so messages left behind by one test are visible to the next unless you delete the queues or use unique names.

The Embedded Broker Advantage

ActiveMQ Classic's embedded broker is the most underused testing feature in the JMS ecosystem. Unlike Pulsar (requires Docker) or RabbitMQ (Docker or external), you can start a full ActiveMQ broker in your test process:

// Start an embedded broker inside the test JVM.
BrokerService broker = new BrokerService();
// BrokerService is persistent by default; switch that off so the broker keeps
// all of its state in memory, as the test base class in this guide does.
broker.setPersistent(false);
broker.addConnector("vm://localhost");
broker.start();

This runs the complete broker stack — queues, topics, transactions, DLQ — in memory. Tests start in milliseconds.

Setup

<!-- pom.xml -->
<!-- Embedded ActiveMQ broker (BrokerService); test scope only, version pinned here. -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-broker</artifactId>
    <version>5.18.3</version>
    <scope>test</scope>
</dependency>
<!-- Spring Boot ActiveMQ starter. No version is declared here; presumably it is
     managed by the Spring Boot parent/BOM (confirm against your build). -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- Spring Boot test support; test scope only. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>

Embedded Broker Base Class

// src/test/java/com/example/ActiveMQTestBase.java
package com.example;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;

import javax.jms.*;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Base class for JMS tests that run against an embedded, non-persistent
 * ActiveMQ broker living in the test JVM.
 *
 * <p>The broker is started once before the tests of a class and stopped after
 * them. Subclasses obtain connections from {@link #connectionFactory} and
 * should create their destinations through {@link #uniqueQueue(String)} or
 * {@link #uniqueTopic(String)} so that tests do not see each other's messages.
 */
public abstract class ActiveMQTestBase {

    /**
     * Sequence appended to generated destination names. A clock reading alone
     * is not guaranteed to differ between two back-to-back calls; a counter is.
     */
    private static final AtomicLong DESTINATION_SEQUENCE = new AtomicLong();

    /** The embedded broker shared by all tests of the running class. */
    static BrokerService broker;

    /** Factory for connections to the embedded broker over the in-VM transport. */
    static ConnectionFactory connectionFactory;

    /**
     * Starts the embedded broker with persistence and JMX disabled and creates
     * the shared connection factory.
     *
     * @throws Exception if the broker cannot be configured or started
     */
    @BeforeAll
    static void startBroker() throws Exception {
        broker = new BrokerService();
        broker.setPersistent(false);
        broker.setUseJmx(false);
        broker.addConnector("vm://localhost?broker.persistent=false");
        broker.start();

        connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
    }

    /**
     * Stops the embedded broker if one was created.
     *
     * @throws Exception if the broker fails to stop cleanly
     */
    @AfterAll
    static void stopBroker() throws Exception {
        if (broker != null) {
            broker.stop();
        }
    }

    /**
     * Builds a queue name that is distinct on every call.
     *
     * @param testName short label identifying the calling test
     * @return a name of the form {@code test.<testName>.<nanoTime>.<sequence>}
     */
    static String uniqueQueue(String testName) {
        return "test." + testName + "." + System.nanoTime()
            + "." + DESTINATION_SEQUENCE.incrementAndGet();
    }

    /**
     * Builds a topic name that is distinct on every call.
     *
     * @param testName short label identifying the calling test
     * @return a name of the form {@code test.topic.<testName>.<nanoTime>.<sequence>}
     */
    static String uniqueTopic(String testName) {
        return "test.topic." + testName + "." + System.nanoTime()
            + "." + DESTINATION_SEQUENCE.incrementAndGet();
    }
}

Testing Queue Send and Receive

/**
 * Point-to-point (queue) tests against the embedded broker.
 *
 * <p>Every JMS resource is opened in a try-with-resources statement so that it
 * is closed even when an assertion fails part-way through a test.
 */
class ActiveMQQueueTest extends ActiveMQTestBase {

    /** How long to wait for a message that is expected to arrive. */
    private static final long RECEIVE_TIMEOUT_MS = 3000;

    /** How long to wait before concluding that no message is coming. */
    private static final long EMPTY_TIMEOUT_MS = 500;

    /** A message sent to a queue is received with its text intact. */
    @Test
    void messagePublishedAndConsumedFromQueue() throws Exception {
        String queueName = uniqueQueue("basic");

        try (Connection conn = connectionFactory.createConnection()) {
            conn.start();

            try (Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
                Queue queue = session.createQueue(queueName);

                try (MessageProducer producer = session.createProducer(queue);
                     MessageConsumer consumer = session.createConsumer(queue)) {
                    // Send
                    producer.send(session.createTextMessage("hello-activemq"));

                    // Receive
                    TextMessage received = (TextMessage) consumer.receive(RECEIVE_TIMEOUT_MS);

                    assertNotNull(received, "Expected a message");
                    assertEquals("hello-activemq", received.getText());
                }
            }
        }
    }

    /** Receiving from a queue nobody has written to times out with {@code null}. */
    @Test
    void emptyQueueReturnsNull() throws Exception {
        String queueName = uniqueQueue("empty");

        try (Connection conn = connectionFactory.createConnection()) {
            conn.start();

            try (Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
                 MessageConsumer consumer = session.createConsumer(session.createQueue(queueName))) {
                Message msg = consumer.receive(EMPTY_TIMEOUT_MS);

                assertNull(msg, "Expected null from empty queue");
            }
        }
    }

    /** Custom string and int properties survive the trip through the queue. */
    @Test
    void queuePreservesMessageProperties() throws Exception {
        String queueName = uniqueQueue("props");

        try (Connection conn = connectionFactory.createConnection()) {
            conn.start();

            try (Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
                Queue queue = session.createQueue(queueName);

                try (MessageProducer producer = session.createProducer(queue);
                     MessageConsumer consumer = session.createConsumer(queue)) {
                    TextMessage msg = session.createTextMessage("data");
                    msg.setStringProperty("source", "order-service");
                    msg.setIntProperty("priority", 5);
                    producer.send(msg);

                    TextMessage received = (TextMessage) consumer.receive(RECEIVE_TIMEOUT_MS);

                    assertNotNull(received);
                    assertEquals("order-service", received.getStringProperty("source"));
                    assertEquals(5, received.getIntProperty("priority"));
                }
            }
        }
    }
}

Testing Topic Pub/Sub

Topics deliver messages to all subscribers. Test that multiple subscribers each receive the message:

/**
 * Publish/subscribe (topic) tests against the embedded broker.
 *
 * <p>Every JMS resource, including the publisher, is opened in a
 * try-with-resources statement so that it is closed even when an assertion
 * fails.
 */
class ActiveMQTopicTest extends ActiveMQTestBase {

    /** A single published message reaches two subscribers on separate connections. */
    @Test
    void topicDeliversToAllSubscribers() throws Exception {
        String topicName = uniqueTopic("fanout");

        try (Connection conn1 = connectionFactory.createConnection();
             Connection conn2 = connectionFactory.createConnection()) {
            conn1.start();
            conn2.start();

            try (Session session1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
                 Session session2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
                 Session pubSession = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
                Topic topic = session1.createTopic(topicName);

                // Both subscribers are created before the publisher sends, matching
                // the order of the scenario under test: subscribe first, then publish.
                try (MessageConsumer sub1 = session1.createConsumer(topic);
                     MessageConsumer sub2 = session2.createConsumer(topic);
                     MessageProducer publisher = pubSession.createProducer(topic)) {
                    // Publish
                    publisher.send(pubSession.createTextMessage("broadcast"));

                    // Both subscribers receive
                    TextMessage msg1 = (TextMessage) sub1.receive(3000);
                    TextMessage msg2 = (TextMessage) sub2.receive(3000);

                    assertNotNull(msg1, "Subscriber 1 should receive message");
                    assertNotNull(msg2, "Subscriber 2 should receive message");
                    assertEquals("broadcast", msg1.getText());
                    assertEquals("broadcast", msg2.getText());
                }
            }
        }
    }
}

Testing JMS Transactions

Transactions ensure messages are sent atomically — or rolled back:

/**
 * Tests for transacted sends: a rolled-back send must not reach the queue and
 * a committed send must.
 *
 * <p>Every JMS resource is opened in a try-with-resources statement so that it
 * is closed even when an assertion fails.
 */
class ActiveMQTransactionTest extends ActiveMQTestBase {

    /** A message sent in a transaction that is rolled back never becomes visible. */
    @Test
    void rolledBackTransactionDoesNotDeliverMessage() throws Exception {
        String queueName = uniqueQueue("tx-rollback");

        try (Connection conn = connectionFactory.createConnection()) {
            conn.start();

            // Transacted session for the send, non-transacted session for the check.
            try (Session txSession = conn.createSession(true, Session.SESSION_TRANSACTED);
                 Session readSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
                Queue queue = txSession.createQueue(queueName);

                try (MessageProducer producer = txSession.createProducer(queue)) {
                    producer.send(txSession.createTextMessage("should-not-arrive"));
                    txSession.rollback(); // Roll back — message should not appear
                }

                // The consumer is created only after the rollback, as a fresh reader.
                try (MessageConsumer consumer = readSession.createConsumer(queue)) {
                    Message msg = consumer.receive(500);

                    assertNull(msg, "Rolled-back message should not appear in queue");
                }
            }
        }
    }

    /** A message sent in a transaction that is committed is delivered. */
    @Test
    void committedTransactionDeliversMessage() throws Exception {
        String queueName = uniqueQueue("tx-commit");

        try (Connection conn = connectionFactory.createConnection()) {
            conn.start();

            try (Session txSession = conn.createSession(true, Session.SESSION_TRANSACTED);
                 Session readSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
                Queue queue = txSession.createQueue(queueName);

                try (MessageProducer producer = txSession.createProducer(queue)) {
                    producer.send(txSession.createTextMessage("committed"));
                    txSession.commit();
                }

                try (MessageConsumer consumer = readSession.createConsumer(queue)) {
                    TextMessage msg = (TextMessage) consumer.receive(3000);

                    assertNotNull(msg, "Committed message should be received");
                    assertEquals("committed", msg.getText());
                }
            }
        }
    }
}

Testing Dead Letter Queue Behavior

ActiveMQ moves messages to ActiveMQ.DLQ after max redelivery attempts. Test this:

/**
 * Verifies that a message whose redeliveries are exhausted ends up in
 * {@code ActiveMQ.DLQ}.
 */
class ActiveMQDLQTest extends ActiveMQTestBase {

    /**
     * Receives one message twice without acknowledging it, under a policy that
     * allows a single redelivery, then expects to find it on the dead letter queue.
     */
    @Test
    void exceededRedeliveriesGoToDLQ() throws Exception {
        // Configure the short redelivery policy on a factory of our own. Setting it
        // on the shared static connectionFactory would leak the policy into every
        // connection created afterwards by other tests.
        ActiveMQConnectionFactory sharedCf = (ActiveMQConnectionFactory) connectionFactory;
        ActiveMQConnectionFactory amqCf = new ActiveMQConnectionFactory(sharedCf.getBrokerURL());
        RedeliveryPolicy policy = new RedeliveryPolicy();
        policy.setMaximumRedeliveries(1);
        policy.setInitialRedeliveryDelay(0);
        amqCf.setRedeliveryPolicy(policy);

        String queueName = uniqueQueue("dlq-test");

        try (Connection conn = amqCf.createConnection()) {
            conn.start();

            // CLIENT_ACKNOWLEDGE: nothing is acknowledged unless the test does it,
            // and this test never does.
            try (Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE)) {
                Queue queue = session.createQueue(queueName);

                // Send message
                try (MessageProducer producer = session.createProducer(queue)) {
                    producer.send(session.createTextMessage("poison"));
                }

                // Consume without acknowledging (2 times = exceeds max 1 redelivery)
                try (MessageConsumer consumer = session.createConsumer(queue)) {
                    Message m1 = consumer.receive(3000);
                    assertNotNull(m1);
                    session.recover(); // Triggers redelivery

                    Message m2 = consumer.receive(3000);
                    assertNotNull(m2);
                    session.recover(); // Exceeds maxRedeliveries
                }

                // Check DLQ
                try (Session dlqSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
                     MessageConsumer dlqConsumer =
                         dlqSession.createConsumer(dlqSession.createQueue("ActiveMQ.DLQ"))) {
                    TextMessage dlqMsg = (TextMessage) dlqConsumer.receive(5000);
                    assertNotNull(dlqMsg, "Expected message in ActiveMQ.DLQ");
                    assertEquals("poison", dlqMsg.getText());
                }
            }
        }
    }
}

Testing with Spring Boot and @JmsListener

Testing Spring's @JmsListener annotation requires the Spring test context:

/**
 * Spring Boot integration test: sends an order message to {@code orders.queue}
 * and waits until {@code OrderProcessingService} reports the order as processed.
 * The listener that consumes the queue is not part of this class (presumably a
 * {@code @JmsListener} elsewhere in the application).
 */
@SpringBootTest
@ActiveProfiles("test")
class OrderListenerTest {

    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired
    private OrderProcessingService orderService;

    @Test
    void listenerProcessesOrderMessage() throws Exception {
        Map<String, Object> orderPayload = Map.of(
            "orderId", "o123",
            "total", 150.00
        );
        jmsTemplate.convertAndSend("orders.queue", orderPayload);

        // Processing happens asynchronously, so poll for up to five seconds
        // instead of asserting immediately.
        await()
            .atMost(5, TimeUnit.SECONDS)
            .until(() -> orderService.isProcessed("o123"));

        assertTrue(orderService.isProcessed("o123"));
    }
}
# src/test/resources/application-test.yaml
# Settings for the "test" profile.
spring:
  activemq:
    # In-VM transport URL; the broker.persistent=false option requests a non-persistent broker.
    broker-url: vm://localhost?broker.persistent=false
    # NOTE(review): Spring Boot may ignore in-memory once broker-url is set explicitly — confirm this line is still needed.
    in-memory: true

Summary

ActiveMQ's embedded broker makes JMS testing fast and dependency-free:

  • Embedded broker starts in milliseconds with no Docker
  • Queue and topic tests verify routing and fan-out behavior
  • Transaction tests confirm rollback doesn't deliver messages and commit does
  • DLQ tests verify poison message handling matches your retry policy

Test the transaction rollback and DLQ paths first — those are the scenarios that cause silent data loss in production and are never tested until something goes wrong.

Read more