BullMQ Job Testing in Node.js: Queues, Workers, and Flow Producers
BullMQ is the modern successor to Bull for Node.js background job processing. It uses Redis for persistence and supports advanced patterns: priorities, rate limiting, flows (parent/child jobs), and job groups. Testing BullMQ requires understanding when to use real Redis versus mocks, and how to make workers testable.
Testing Architecture Choices
Unit tests with mocked Redis: Fast, no infrastructure, but may miss real-world Redis behaviors.
Integration tests with real Redis: Slower, requires Redis, catches connection issues and data consistency bugs.
Recommended approach:
- Unit test job logic directly (pure functions, handlers)
- Integration test queue/worker interaction with real Redis (test containers or local Redis)
- Use
ioredis-mockfor most queue interaction tests
Setup
npm install --save-dev jest ioredis-mock @testcontainers/redisBasic Configuration
// jest.config.js
module.exports = {
testEnvironment: 'node',
setupFilesAfterFramework: ['./jest.setup.js'],
testTimeout: 30000, // BullMQ tests may take longer
};// jest.setup.js
jest.setTimeout(30000);Testing Job Handlers Directly
The simplest approach: extract job logic into a pure function and test it independently of BullMQ.
// workers/email-worker.js
const { Worker } = require('bullmq');
// Pure function — testable without BullMQ
async function processEmailJob(job) {
const { userId, templateId, variables } = job.data;
const user = await UserService.findById(userId);
if (!user) {
throw new Error(`User ${userId} not found`);
}
const template = await EmailTemplate.findById(templateId);
const content = template.render(variables);
await EmailProvider.send({
to: user.email,
subject: content.subject,
body: content.body,
});
await job.updateProgress(100);
return { sent: true, email: user.email };
}
// Worker creation — separate from logic
function createEmailWorker(connection) {
return new Worker('emails', processEmailJob, {
connection,
concurrency: 5,
});
}
module.exports = { processEmailJob, createEmailWorker };// workers/email-worker.test.js
const { processEmailJob } = require('./email-worker');
describe('processEmailJob', () => {
let mockJob;
beforeEach(() => {
mockJob = {
id: 'test-job-id',
data: {
userId: 1,
templateId: 'welcome',
variables: { firstName: 'Alice' },
},
updateProgress: jest.fn(),
};
});
it('sends email to the correct user', async () => {
jest.spyOn(UserService, 'findById').mockResolvedValue({
id: 1,
email: 'alice@example.com',
});
jest.spyOn(EmailTemplate, 'findById').mockResolvedValue({
render: jest.fn().mockReturnValue({
subject: 'Welcome!',
body: 'Hello Alice',
}),
});
const mockSend = jest.spyOn(EmailProvider, 'send').mockResolvedValue({ id: 'msg-123' });
const result = await processEmailJob(mockJob);
expect(mockSend).toHaveBeenCalledWith({
to: 'alice@example.com',
subject: 'Welcome!',
body: 'Hello Alice',
});
expect(result).toEqual({ sent: true, email: 'alice@example.com' });
expect(mockJob.updateProgress).toHaveBeenCalledWith(100);
});
it('throws when user is not found', async () => {
jest.spyOn(UserService, 'findById').mockResolvedValue(null);
await expect(processEmailJob(mockJob)).rejects.toThrow('User 1 not found');
});
it('throws when email provider fails', async () => {
jest.spyOn(UserService, 'findById').mockResolvedValue({ id: 1, email: 'test@example.com' });
jest.spyOn(EmailTemplate, 'findById').mockResolvedValue({
render: jest.fn().mockReturnValue({ subject: 'Hi', body: 'Hello' }),
});
jest.spyOn(EmailProvider, 'send').mockRejectedValue(new Error('SMTP timeout'));
await expect(processEmailJob(mockJob)).rejects.toThrow('SMTP timeout');
});
});Integration Testing with ioredis-mock
For testing queue interactions without Redis:
// test/helpers/queue-helper.js
const { Queue, Worker } = require('bullmq');
const IORedis = require('ioredis');
// For unit tests: use ioredis-mock
const IORedisMock = require('ioredis-mock');
function createTestConnection() {
return new IORedisMock();
}
function createTestQueue(name, connection) {
return new Queue(name, { connection });
}
module.exports = { createTestConnection, createTestQueue };// workers/notification-worker.test.js
const { Queue, Worker } = require('bullmq');
const IORedisMock = require('ioredis-mock');
const { processNotificationJob } = require('./notification-worker');
describe('Notification Queue Integration', () => {
let connection;
let queue;
let worker;
beforeEach(async () => {
connection = new IORedisMock();
queue = new Queue('notifications', { connection });
});
afterEach(async () => {
await queue.close();
await connection.quit();
});
it('adds job to queue with correct data', async () => {
const jobData = { userId: 1, type: 'order_shipped', orderId: 42 };
const job = await queue.add('send-notification', jobData);
expect(job.id).toBeDefined();
expect(job.name).toBe('send-notification');
expect(job.data).toEqual(jobData);
});
it('processes job with correct handler', async () => {
const handler = jest.fn().mockResolvedValue({ sent: true });
worker = new Worker('notifications', handler, { connection });
await queue.add('send-notification', { userId: 1, type: 'welcome' });
// Wait for job to be processed
await new Promise((resolve) => {
worker.on('completed', resolve);
});
expect(handler).toHaveBeenCalledTimes(1);
expect(handler.mock.calls[0][0].data).toEqual({ userId: 1, type: 'welcome' });
});
});Integration Testing with Real Redis
For more reliable tests, use real Redis via Docker or @testcontainers/redis:
// test/integration/queue.integration.test.js
const { Queue, Worker, QueueEvents } = require('bullmq');
const { GenericContainer } = require('testcontainers');
const IORedis = require('ioredis');
describe('Queue Integration Tests', () => {
let redisContainer;
let connection;
let queue;
beforeAll(async () => {
redisContainer = await new GenericContainer('redis:7')
.withExposedPorts(6379)
.start();
connection = new IORedis({
host: redisContainer.getHost(),
port: redisContainer.getMappedPort(6379),
});
});
afterAll(async () => {
await connection.quit();
await redisContainer.stop();
});
beforeEach(async () => {
queue = new Queue('test-queue', { connection });
await queue.obliterate({ force: true }); // Clean slate
});
afterEach(async () => {
await queue.close();
});
it('completes a job end to end', async () => {
const results = [];
const worker = new Worker(
'test-queue',
async (job) => {
results.push(job.data.value * 2);
return { doubled: job.data.value * 2 };
},
{ connection }
);
const job = await queue.add('double', { value: 21 });
// Wait for completion
const completedJob = await job.waitUntilFinished(
new QueueEvents('test-queue', { connection })
);
expect(completedJob).toEqual({ doubled: 42 });
expect(results).toContain(42);
await worker.close();
});
});Testing Retries and Failure Handling
// workers/retry-worker.js
const { Worker } = require('bullmq');
async function processWithRetry(job) {
const { url, maxAttempts = 3 } = job.data;
try {
const response = await fetch(url);
if (!response.ok) {
throw new Error(`HTTP ${response.status}`);
}
return { success: true, status: response.status };
} catch (error) {
// BullMQ will retry based on job options
throw error;
}
}
// Queue with retry options
async function addRetryableJob(queue, url) {
return queue.add(
'fetch-url',
{ url },
{
attempts: 3,
backoff: {
type: 'exponential',
delay: 1000, // 1s, 2s, 4s
},
}
);
}// Test retry behavior
describe('Retry handling', () => {
it('retries on failure and eventually succeeds', async () => {
let attempts = 0;
const worker = new Worker(
'test-queue',
async (job) => {
attempts++;
if (attempts < 3) throw new Error('Transient error');
return { success: true };
},
{
connection,
limiter: { max: 10, duration: 1000 },
}
);
const job = await queue.add(
'retry-test',
{ data: 'test' },
{
attempts: 3,
backoff: { type: 'fixed', delay: 10 }, // Short delay for tests
}
);
const queueEvents = new QueueEvents('test-queue', { connection });
const result = await job.waitUntilFinished(queueEvents, 10000);
expect(result).toEqual({ success: true });
expect(attempts).toBe(3);
await worker.close();
await queueEvents.close();
});
it('moves job to failed after max retries', async () => {
const worker = new Worker(
'test-queue',
async () => { throw new Error('Always fails'); },
{ connection }
);
const job = await queue.add(
'fail-test',
{ data: 'test' },
{
attempts: 2,
backoff: { type: 'fixed', delay: 10 },
}
);
const queueEvents = new QueueEvents('test-queue', { connection });
await expect(
job.waitUntilFinished(queueEvents, 10000)
).rejects.toThrow('Always fails');
const failedJob = await queue.getJob(job.id);
expect(failedJob.attemptsMade).toBe(2);
await worker.close();
await queueEvents.close();
});
});Testing Flow Producers (Parent/Child Jobs)
BullMQ flows allow parent jobs to spawn child jobs and wait for all children to complete:
// flows/data-pipeline.js
const { FlowProducer } = require('bullmq');
async function runDataPipeline(datasetId) {
const flow = new FlowProducer({ connection });
return flow.add({
name: 'aggregate-results',
queueName: 'pipeline',
data: { datasetId },
children: [
{
name: 'process-chunk',
queueName: 'pipeline',
data: { datasetId, chunk: 1 },
},
{
name: 'process-chunk',
queueName: 'pipeline',
data: { datasetId, chunk: 2 },
},
{
name: 'process-chunk',
queueName: 'pipeline',
data: { datasetId, chunk: 3 },
},
],
});
}describe('Data Pipeline Flow', () => {
it('creates parent job with children', async () => {
const flow = new FlowProducer({ connection });
const result = await flow.add({
name: 'aggregate',
queueName: 'test-flow',
data: { id: 1 },
children: [
{ name: 'child-1', queueName: 'test-flow', data: { part: 1 } },
{ name: 'child-2', queueName: 'test-flow', data: { part: 2 } },
],
});
expect(result.job.name).toBe('aggregate');
expect(result.children).toHaveLength(2);
// Parent should wait for children
const parentJob = await queue.getJob(result.job.id);
expect(parentJob.data.id).toBe(1);
await flow.close();
});
it('parent receives children results', async () => {
const childResults = [];
const worker = new Worker(
'test-flow',
async (job) => {
if (job.name === 'child') {
return { processed: job.data.value * 2 };
}
if (job.name === 'parent') {
// Children results available via job.data.childrenValues
const childrenValues = await job.getChildrenValues();
return { total: Object.values(childrenValues).reduce((sum, v) => sum + v.processed, 0) };
}
},
{ connection }
);
const flow = new FlowProducer({ connection });
const { job: parentJob } = await flow.add({
name: 'parent',
queueName: 'test-flow',
data: {},
children: [
{ name: 'child', queueName: 'test-flow', data: { value: 5 } },
{ name: 'child', queueName: 'test-flow', data: { value: 10 } },
],
});
const queueEvents = new QueueEvents('test-flow', { connection });
const result = await parentJob.waitUntilFinished(queueEvents, 30000);
expect(result.total).toBe(30); // (5*2) + (10*2)
await worker.close();
await flow.close();
await queueEvents.close();
});
});Testing Rate Limiting
describe('Rate limited queue', () => {
it('respects rate limit', async () => {
const processedAt = [];
const worker = new Worker(
'rate-limited',
async (job) => {
processedAt.push(Date.now());
return { processed: true };
},
{
connection,
limiter: {
max: 2, // Max 2 jobs
duration: 500, // Per 500ms
},
}
);
// Add 4 jobs
for (let i = 0; i < 4; i++) {
await queue.add('job', { index: i });
}
// Wait for all to complete
await new Promise(resolve => setTimeout(resolve, 2000));
// Should have taken at least 1 second (4 jobs / 2 per 500ms = 2 windows)
const totalTime = processedAt[3] - processedAt[0];
expect(totalTime).toBeGreaterThan(900);
await worker.close();
});
});Testing Job Events
describe('Job lifecycle events', () => {
it('emits events in correct order', async () => {
const events = [];
const queueEvents = new QueueEvents('test-queue', { connection });
queueEvents.on('active', ({ jobId }) => events.push(`active:${jobId}`));
queueEvents.on('completed', ({ jobId }) => events.push(`completed:${jobId}`));
queueEvents.on('failed', ({ jobId }) => events.push(`failed:${jobId}`));
const worker = new Worker(
'test-queue',
async (job) => ({ done: true }),
{ connection }
);
const job = await queue.add('test', { data: 1 });
await job.waitUntilFinished(queueEvents);
expect(events).toContain(`active:${job.id}`);
expect(events).toContain(`completed:${job.id}`);
await worker.close();
await queueEvents.close();
});
});Cleanup and Teardown
Always clean up BullMQ resources in tests to avoid port conflicts and memory leaks:
// test/helpers/bullmq-cleanup.js
async function cleanupBullMQ(queue, worker, queueEvents, connection) {
const cleanups = [];
if (worker) cleanups.push(worker.close());
if (queueEvents) cleanups.push(queueEvents.close());
if (queue) cleanups.push(queue.close());
await Promise.all(cleanups);
if (connection) await connection.quit();
}
// Obliterate all queue data for a fresh test
async function resetQueue(queue) {
await queue.obliterate({ force: true });
}BullMQ's strong TypeScript support and separation of Queue/Worker/QueueEvents makes it more testable than its predecessor. The key is keeping job handler logic pure and separate from the BullMQ wiring — pure functions are easy to unit test, and integration tests with real Redis catch the connection and lifecycle issues that mocks miss.