BullMQ Testing: Unit Tests, Integration Tests, and Worker Testing Patterns
BullMQ is the most popular Node.js queue library. It's built on Redis and supports job retries, priorities, rate limiting, and repeatable jobs. Testing BullMQ requires understanding its architecture: queues add jobs, workers process them, and both communicate through Redis.
Architecture for Testability
Structure your code to separate job logic from queue concerns:
jobs/
process-payment.js ← handler function (pure, testable)
process-payment.worker.js ← worker setup (uses handler)
queues/
payment-queue.js ← queue instance// jobs/process-payment.js — pure business logic
export async function processPaymentHandler(jobData) {
const { orderId, amount, currency } = jobData
const order = await db.orders.findById(orderId)
if (!order) throw new Error(`Order ${orderId} not found`)
if (order.status !== 'pending') throw new Error(`Order already processed`)
const result = await stripe.charges.create({ amount, currency, source: order.stripeToken })
await db.orders.update(orderId, {
status: 'paid',
stripeChargeId: result.id,
paidAt: new Date(),
})
return { success: true, chargeId: result.id }
}// jobs/process-payment.worker.js — BullMQ worker
import { Worker } from 'bullmq'
import { processPaymentHandler } from './process-payment'
import { connection } from '../redis'
export const paymentWorker = new Worker(
'payments',
async (job) => {
job.updateProgress(10)
const result = await processPaymentHandler(job.data)
job.updateProgress(100)
return result
},
{ connection, concurrency: 5 }
)
paymentWorker.on('failed', (job, err) => {
console.error(`Payment job ${job?.id} failed:`, err)
})Unit Testing Handlers
// process-payment.test.js
import { vi, test, expect, beforeEach } from 'vitest'
import { processPaymentHandler } from './process-payment'
vi.mock('../db')
vi.mock('../stripe')
beforeEach(() => vi.clearAllMocks())
test('processes payment successfully', async () => {
db.orders.findById.mockResolvedValue({
id: 'ord-1',
status: 'pending',
stripeToken: 'tok_test'
})
stripe.charges.create.mockResolvedValue({ id: 'ch_test123' })
db.orders.update.mockResolvedValue(undefined)
const result = await processPaymentHandler({
orderId: 'ord-1',
amount: 9999,
currency: 'usd'
})
expect(result).toEqual({ success: true, chargeId: 'ch_test123' })
expect(stripe.charges.create).toHaveBeenCalledWith({
amount: 9999,
currency: 'usd',
source: 'tok_test'
})
expect(db.orders.update).toHaveBeenCalledWith('ord-1', {
status: 'paid',
stripeChargeId: 'ch_test123',
paidAt: expect.any(Date)
})
})
test('throws when order not found', async () => {
db.orders.findById.mockResolvedValue(null)
await expect(processPaymentHandler({ orderId: 'missing', amount: 100, currency: 'usd' }))
.rejects.toThrow('Order missing not found')
expect(stripe.charges.create).not.toHaveBeenCalled()
})
test('throws when order already paid', async () => {
db.orders.findById.mockResolvedValue({ id: 'ord-1', status: 'paid' })
await expect(processPaymentHandler({ orderId: 'ord-1', amount: 100, currency: 'usd' }))
.rejects.toThrow('Order already processed')
})Integration Tests with Real Redis
// payment-queue.integration.test.js
import { Queue, Worker, QueueEvents } from 'bullmq'
import Redis from 'ioredis'
import { processPaymentHandler } from './process-payment'
const connection = new Redis({ host: 'localhost', port: 6379, maxRetriesPerRequest: null })
let queue
let worker
let queueEvents
beforeAll(async () => {
queue = new Queue('payments-test', { connection })
queueEvents = new QueueEvents('payments-test', { connection })
await queueEvents.waitUntilReady()
})
afterAll(async () => {
await worker?.close()
await queue.obliterate({ force: true })
await queue.close()
await queueEvents.close()
await connection.quit()
})
beforeEach(async () => {
await queue.drain()
worker?.close()
})
test('job completes and returns result', async () => {
// Mock the handler
vi.mock('./process-payment', () => ({
processPaymentHandler: vi.fn().mockResolvedValue({ success: true, chargeId: 'ch_123' })
}))
worker = new Worker('payments-test', async (job) => {
return processPaymentHandler(job.data)
}, { connection })
const job = await queue.add('payment', { orderId: 'ord-1', amount: 9999, currency: 'usd' })
const result = await job.waitUntilFinished(queueEvents, 5000)
expect(result).toEqual({ success: true, chargeId: 'ch_123' })
})
test('failed job is retried', async () => {
let attempts = 0
worker = new Worker('payments-test', async () => {
attempts++
if (attempts < 3) throw new Error('transient error')
return { success: true }
}, {
connection,
settings: { backoffStrategy: () => 100 }
})
const job = await queue.add('payment', { orderId: 'ord-1' }, {
attempts: 3,
backoff: { type: 'fixed', delay: 100 }
})
const result = await job.waitUntilFinished(queueEvents, 10000)
expect(result.success).toBe(true)
expect(attempts).toBe(3)
})Testing Job Progress
test('worker reports progress', async () => {
const progressValues = []
worker = new Worker('payments-test', async (job) => {
await job.updateProgress(25)
await job.updateProgress(75)
await job.updateProgress(100)
return { done: true }
}, { connection })
queueEvents.on('progress', ({ jobId, data }) => {
progressValues.push(data)
})
const job = await queue.add('task', {})
await job.waitUntilFinished(queueEvents, 5000)
expect(progressValues).toContain(25)
expect(progressValues).toContain(75)
expect(progressValues).toContain(100)
})Testing Repeatable Jobs
test('repeatable job is scheduled at correct interval', async () => {
await queue.add(
'daily-report',
{ reportType: 'daily' },
{ repeat: { every: 86400000 } } // 24 hours
)
const repeatableJobs = await queue.getRepeatableJobs()
expect(repeatableJobs).toHaveLength(1)
expect(repeatableJobs[0].name).toBe('daily-report')
expect(repeatableJobs[0].every).toBe(86400000)
})Testing Priority Queues
test('high priority jobs process before low priority', async () => {
const processed = []
worker = new Worker('payments-test', async (job) => {
processed.push(job.opts.priority)
await new Promise(r => setTimeout(r, 50))
}, { connection })
// Add low priority jobs first
await queue.add('task', {}, { priority: 10 })
await queue.add('task', {}, { priority: 10 })
// Add high priority job
await queue.add('urgent', {}, { priority: 1 })
await new Promise(r => setTimeout(r, 500))
// High priority (1) should have been processed
expect(processed[0]).toBe(1)
})Testing Queue Draining
test('all jobs complete before queue drains', async () => {
const processed = new Set()
worker = new Worker('payments-test', async (job) => {
processed.add(job.id)
}, { connection })
// Add 10 jobs
const jobs = await Promise.all(
Array.from({ length: 10 }, (_, i) =>
queue.add('task', { index: i })
)
)
// Wait for all to complete
await Promise.all(jobs.map(j => j.waitUntilFinished(queueEvents, 10000)))
expect(processed.size).toBe(10)
})Mocking BullMQ in Unit Tests
When you want to test code that enqueues jobs (but not the jobs themselves):
// services/order-service.js
import { paymentQueue } from '../queues/payment-queue'
export async function createOrder(orderData) {
const order = await db.orders.create(orderData)
await paymentQueue.add('process', { orderId: order.id, amount: order.total })
return order
}
// services/order-service.test.js
vi.mock('../queues/payment-queue', () => ({
paymentQueue: { add: vi.fn().mockResolvedValue({ id: 'job-1' }) }
}))
import { paymentQueue } from '../queues/payment-queue'
import { createOrder } from './order-service'
test('enqueues payment job after order creation', async () => {
const order = await createOrder({ userId: 'u1', total: 9999, items: [] })
expect(paymentQueue.add).toHaveBeenCalledWith(
'process',
{ orderId: order.id, amount: 9999 }
)
})Summary
BullMQ testing has two tiers: unit tests for handler functions (fast, no Redis needed) and integration tests for queue mechanics (require Redis). Keep handlers as pure functions that accept jobData and return results — this makes them trivial to unit test. Use integration tests to verify retry logic, progress tracking, and prioritization. Mock the queue entirely when testing code that enqueues jobs.