Testing BullMQ Jobs with Jest: Queues, Workers, and Retries
BullMQ is the most widely used Node.js job queue library, built on Redis. Testing it well means covering three separate concerns: the producer that enqueues jobs, the worker that processes them, and the retry/failure handling when things go wrong. This guide shows how to do all three with Jest.
Setup
Install BullMQ and the testing dependencies:
npm install bullmq ioredis
npm install -D jest @types/jest ts-jestFor unit tests that don't need a real Redis instance, use ioredis-mock:
npm install -D ioredis-mockTesting Job Producers
Test that your producer enqueues jobs with the correct data and options:
// jobs/emailQueue.ts
import { Queue } from 'bullmq'
import { redisConnection } from '../config/redis'
export const emailQueue = new Queue('email', { connection: redisConnection })
export async function queueWelcomeEmail(userId: string, email: string) {
return emailQueue.add(
'welcome',
{ userId, email },
{
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
removeOnComplete: 100,
}
)
}// jobs/emailQueue.test.ts
import { Queue } from 'bullmq'
import { queueWelcomeEmail } from './emailQueue'
jest.mock('bullmq', () => ({
Queue: jest.fn().mockImplementation(() => ({
add: jest.fn().mockResolvedValue({ id: 'job-123' }),
close: jest.fn(),
})),
}))
describe('queueWelcomeEmail', () => {
let mockAdd: jest.Mock
beforeEach(() => {
mockAdd = (Queue as jest.Mock).mock.results[0]?.value.add
mockAdd?.mockClear()
})
it('enqueues a welcome job with correct data', async () => {
await queueWelcomeEmail('user-1', 'alice@example.com')
expect(mockAdd).toHaveBeenCalledWith(
'welcome',
{ userId: 'user-1', email: 'alice@example.com' },
expect.objectContaining({ attempts: 3 })
)
})
it('returns the created job', async () => {
const job = await queueWelcomeEmail('user-1', 'alice@example.com')
expect(job).toHaveProperty('id', 'job-123')
})
})Testing Workers
Worker tests verify that your processing logic handles job data correctly:
// jobs/emailWorker.ts
import { Worker, Job } from 'bullmq'
import { redisConnection } from '../config/redis'
import { sendEmail } from '../services/email'
export function createEmailWorker() {
return new Worker(
'email',
async (job: Job) => {
if (job.name === 'welcome') {
await sendEmail({
to: job.data.email,
template: 'welcome',
data: { userId: job.data.userId },
})
return { sent: true, to: job.data.email }
}
throw new Error(`Unknown job type: ${job.name}`)
},
{ connection: redisConnection }
)
}The key insight: test the processor function directly, not the Worker instance:
// jobs/emailProcessor.ts — extract the processor function
export async function processEmailJob(job: Job) {
if (job.name === 'welcome') {
await sendEmail({
to: job.data.email,
template: 'welcome',
data: { userId: job.data.userId },
})
return { sent: true, to: job.data.email }
}
throw new Error(`Unknown job type: ${job.name}`)
}// jobs/emailProcessor.test.ts
import { processEmailJob } from './emailProcessor'
import { sendEmail } from '../services/email'
jest.mock('../services/email')
const mockSendEmail = sendEmail as jest.Mock
function makeJob(name: string, data: Record<string, unknown>) {
return { name, data, id: 'test-job-id' } as any
}
describe('processEmailJob', () => {
beforeEach(() => {
mockSendEmail.mockResolvedValue({ messageId: 'msg-123' })
})
afterEach(() => {
jest.clearAllMocks()
})
it('sends a welcome email for welcome jobs', async () => {
const result = await processEmailJob(
makeJob('welcome', { userId: 'u1', email: 'alice@example.com' })
)
expect(mockSendEmail).toHaveBeenCalledWith({
to: 'alice@example.com',
template: 'welcome',
data: { userId: 'u1' },
})
expect(result).toEqual({ sent: true, to: 'alice@example.com' })
})
it('throws for unknown job types', async () => {
await expect(
processEmailJob(makeJob('unknown-type', {}))
).rejects.toThrow('Unknown job type: unknown-type')
})
it('propagates email send failures', async () => {
mockSendEmail.mockRejectedValue(new Error('SMTP connection refused'))
await expect(
processEmailJob(makeJob('welcome', { userId: 'u1', email: 'a@b.com' }))
).rejects.toThrow('SMTP connection refused')
})
})Integration Tests with Real Redis
For integration tests, use a real Redis instance (or ioredis-mock):
// jobs/emailQueue.integration.test.ts
import { Queue, Worker, QueueEvents } from 'bullmq'
import IORedis from 'ioredis'
const connection = new IORedis({ maxRetriesPerRequest: null })
async function wait(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms))
}
describe('email queue integration', () => {
let queue: Queue
let worker: Worker
let queueEvents: QueueEvents
beforeEach(() => {
queue = new Queue('email-test', { connection })
queueEvents = new QueueEvents('email-test', { connection })
})
afterEach(async () => {
await worker?.close()
await queue.obliterate({ force: true })
await queue.close()
await queueEvents.close()
})
afterAll(async () => {
await connection.quit()
})
it('processes a job and returns a result', async () => {
const processedJobs: string[] = []
worker = new Worker(
'email-test',
async (job) => {
processedJobs.push(job.data.email)
return { sent: true }
},
{ connection }
)
const job = await queue.add('welcome', { email: 'alice@example.com' })
await job.waitUntilFinished(queueEvents, 5000)
expect(processedJobs).toContain('alice@example.com')
const completedJob = await queue.getJob(job.id!)
expect(completedJob?.returnvalue).toEqual({ sent: true })
})
it('retries a failing job up to the configured limit', async () => {
let attempts = 0
worker = new Worker(
'email-test',
async () => {
attempts++
if (attempts < 3) throw new Error('Temporary failure')
return { sent: true }
},
{ connection }
)
const job = await queue.add(
'welcome',
{ email: 'b@example.com' },
{ attempts: 3, backoff: { type: 'fixed', delay: 100 } }
)
await job.waitUntilFinished(queueEvents, 10000)
expect(attempts).toBe(3)
})
})Testing Retry Logic
Test what happens when all retries are exhausted:
it('moves job to failed after exhausting retries', async () => {
worker = new Worker(
'email-test',
async () => {
throw new Error('Permanent failure')
},
{ connection }
)
const failedJobIds: string[] = []
queueEvents.on('failed', ({ jobId }) => failedJobIds.push(jobId))
const job = await queue.add(
'welcome',
{ email: 'c@example.com' },
{ attempts: 2, backoff: { type: 'fixed', delay: 50 } }
)
// Wait for failure event
await new Promise<void>((resolve) => {
queueEvents.on('failed', resolve)
})
expect(failedJobIds).toContain(job.id)
const failedJob = await queue.getJob(job.id!)
expect(await failedJob?.isFailed()).toBe(true)
expect(failedJob?.failedReason).toContain('Permanent failure')
})Testing Delayed Jobs
it('does not process a delayed job immediately', async () => {
const processed: boolean[] = []
worker = new Worker(
'email-test',
async () => {
processed.push(true)
},
{ connection }
)
await queue.add('welcome', {}, { delay: 5000 })
await wait(500) // give worker time to pick up non-delayed jobs
expect(processed).toHaveLength(0)
})Testing Job Progress Tracking
// processor that reports progress
async function processWithProgress(job: Job) {
await job.updateProgress(10)
// ... first step
await job.updateProgress(50)
// ... second step
await job.updateProgress(100)
return { done: true }
}
it('tracks progress updates', async () => {
const progressEvents: number[] = []
worker = new Worker('email-test', processWithProgress, { connection })
queueEvents.on('progress', ({ data }) => {
progressEvents.push(data as number)
})
const job = await queue.add('welcome', {})
await job.waitUntilFinished(queueEvents, 5000)
expect(progressEvents).toEqual(expect.arrayContaining([10, 50, 100]))
})What to Mock vs What to Test for Real
| Test type | Use mock Redis | Use real Redis |
|---|---|---|
| Unit: processor logic | ✓ | — |
| Unit: producer enqueue call | ✓ | — |
| Integration: queue + worker lifecycle | — | ✓ |
| Integration: retry exhaustion | — | ✓ |
| Integration: delayed/scheduled jobs | — | ✓ |
Keep unit tests fast by mocking BullMQ entirely and testing your processor functions in isolation. Run real-Redis integration tests in CI against a test Redis instance or use ioredis-mock for environments where a real Redis isn't available.
What Automated Tests Miss
Unit and integration tests verify your job logic but won't catch:
- Memory leaks in long-running worker processes
- Redis connection pool exhaustion under high concurrency
- Clock drift in delayed jobs across multiple servers
- Worker crashes during job processing and the impact on job state
HelpMeTest can run your BullMQ-dependent flows end-to-end on a schedule, catching failures that only show up when the full stack is running. The $100/month Pro plan gives you unlimited tests with parallel execution.
Summary
Testing BullMQ effectively:
- Extract processor functions — test them independently of the Worker class
- Mock BullMQ for unit tests — fast, no Redis required
- Use real Redis for integration tests — verify lifecycle, retries, and failure handling
- Use
QueueEvents— the right way to observe job state changes in tests - Test the failure path — verify failed job state,
failedReason, and DLQ behaviour