BullMQ Testing: Unit Tests, Integration Tests, and Worker Testing Patterns

BullMQ Testing: Unit Tests, Integration Tests, and Worker Testing Patterns

BullMQ is one of the most popular Node.js queue libraries. It's built on Redis and supports job retries, priorities, rate limiting, and repeatable jobs. Testing BullMQ requires understanding its architecture: queues add jobs, workers process them, and both communicate through Redis.

Architecture for Testability

Structure your code to separate job logic from queue concerns:

jobs/
  process-payment.js    ← handler function (pure, testable)
  process-payment.worker.js  ← worker setup (uses handler)
queues/
  payment-queue.js      ← queue instance
// jobs/process-payment.js — pure business logic
/**
 * Charges the order referenced by a payment job and marks it as paid.
 *
 * @param {object} jobData - Job payload; `orderId`, `amount` and `currency` are read from it.
 * @returns {Promise<{success: boolean, chargeId: *}>} `success: true` plus the id of the created charge.
 * @throws {Error} If no order is found for `orderId`, or its status is not 'pending'.
 */
export async function processPaymentHandler(jobData) {
  const orderId = jobData.orderId
  const amount = jobData.amount
  const currency = jobData.currency

  // Guard clauses: nothing is charged unless a pending order exists.
  const pendingOrder = await db.orders.findById(orderId)
  if (!pendingOrder) {
    throw new Error(`Order ${orderId} not found`)
  }
  if (pendingOrder.status !== 'pending') {
    throw new Error(`Order already processed`)
  }

  const chargeRequest = { amount, currency, source: pendingOrder.stripeToken }
  const charge = await stripe.charges.create(chargeRequest)
  const chargeId = charge.id

  // Persist the outcome only after the charge has been created.
  const paymentRecord = {
    status: 'paid',
    stripeChargeId: chargeId,
    paidAt: new Date(),
  }
  await db.orders.update(orderId, paymentRecord)

  return { success: true, chargeId }
}
// jobs/process-payment.worker.js — BullMQ worker
import { Worker } from 'bullmq'
import { processPaymentHandler } from './process-payment'
import { connection } from '../redis'

// Worker for the 'payments' queue. Each job is delegated to
// processPaymentHandler, with coarse progress reported before and after.
export const paymentWorker = new Worker(
  'payments',
  async (job) => {
    // updateProgress returns a promise: await it so a failed progress write
    // fails the job instead of becoming an unhandled rejection.
    await job.updateProgress(10)
    const result = await processPaymentHandler(job.data)
    await job.updateProgress(100)
    return result
  },
  { connection, concurrency: 5 }
)

// Logs a failed payment job together with its error. Optional chaining keeps
// the log line working when no job object is passed to the listener.
function logPaymentFailure(job, err) {
  console.error(`Payment job ${job?.id} failed:`, err)
}

paymentWorker.on('failed', logPaymentFailure)

Unit Testing Handlers

// process-payment.test.js
import { vi, test, expect, beforeEach } from 'vitest'
import { processPaymentHandler } from './process-payment'

vi.mock('../db')
vi.mock('../stripe')

// The tests below configure db.* and stripe.* mock functions, so the mocked
// modules have to be imported into this file.
// NOTE(review): assumes both modules expose named exports `db` and `stripe`
// (matching `connection` in '../redis') — adjust to the real export shape.
import { db } from '../db'
import { stripe } from '../stripe'

// Clear recorded calls before each test so assertions only see their own calls.
beforeEach(() => vi.clearAllMocks())

// Happy path: a pending order is charged and then recorded as paid.
test('processes payment successfully', async () => {
  const pendingOrder = { id: 'ord-1', status: 'pending', stripeToken: 'tok_test' }
  const jobData = { orderId: 'ord-1', amount: 9999, currency: 'usd' }

  db.orders.findById.mockResolvedValue(pendingOrder)
  stripe.charges.create.mockResolvedValue({ id: 'ch_test123' })
  db.orders.update.mockResolvedValue(undefined)

  const outcome = await processPaymentHandler(jobData)

  expect(outcome).toEqual({ success: true, chargeId: 'ch_test123' })

  const expectedCharge = { amount: 9999, currency: 'usd', source: 'tok_test' }
  expect(stripe.charges.create).toHaveBeenCalledWith(expectedCharge)

  const expectedUpdate = {
    status: 'paid',
    stripeChargeId: 'ch_test123',
    paidAt: expect.any(Date)
  }
  expect(db.orders.update).toHaveBeenCalledWith('ord-1', expectedUpdate)
})

// A missing order must reject before any charge is attempted.
test('throws when order not found', async () => {
  db.orders.findById.mockResolvedValue(null)
  const jobData = { orderId: 'missing', amount: 100, currency: 'usd' }

  const pendingResult = processPaymentHandler(jobData)
  await expect(pendingResult).rejects.toThrow('Order missing not found')

  expect(stripe.charges.create).not.toHaveBeenCalled()
})

// An order that is no longer pending must be rejected without side effects:
// charging it again would double-bill the customer.
test('throws when order already paid', async () => {
  db.orders.findById.mockResolvedValue({ id: 'ord-1', status: 'paid' })

  await expect(processPaymentHandler({ orderId: 'ord-1', amount: 100, currency: 'usd' }))
    .rejects.toThrow('Order already processed')

  // Same guarantee the "order not found" test checks: nothing was charged or written.
  expect(stripe.charges.create).not.toHaveBeenCalled()
  expect(db.orders.update).not.toHaveBeenCalled()
})

Integration Tests with Real Redis

// payment-queue.integration.test.js
import { Queue, Worker, QueueEvents } from 'bullmq'
import Redis from 'ioredis'
import { processPaymentHandler } from './process-payment'

// Redis connection shared by the queue, worker and queue events in this file.
// Defaults to a local Redis; REDIS_HOST / REDIS_PORT override it (e.g. in CI).
// `||` is used on purpose so empty or non-numeric values fall back to the defaults.
// maxRetriesPerRequest: null is the setting BullMQ expects on worker connections.
const connection = new Redis({
  host: process.env.REDIS_HOST || 'localhost',
  port: Number(process.env.REDIS_PORT) || 6379,
  maxRetriesPerRequest: null
})

// Assigned in beforeAll (queue, queueEvents) and by individual tests (worker).
let queue
let worker
let queueEvents

beforeAll(async () => {
  queue = new Queue('payments-test', { connection })
  queueEvents = new QueueEvents('payments-test', { connection })
  // Wait until the event listener is ready before any test adds jobs.
  await queueEvents.waitUntilReady()
})

// Tear everything down in order: worker, queue data, queue, events, connection.
afterAll(async () => {
  const teardownSteps = [
    () => worker?.close(),
    () => queue.obliterate({ force: true }),
    () => queue.close(),
    () => queueEvents.close(),
    () => connection.quit()
  ]

  // Run strictly one after another; each step is awaited before the next starts.
  for (const runStep of teardownSteps) {
    await runStep()
  }
})

// Give every test a clean queue. The previous test's worker is closed first,
// and awaited, so it cannot pick up jobs added by the next test; only then are
// the remaining waiting jobs removed.
beforeEach(async () => {
  await worker?.close()
  worker = undefined
  await queue.drain()
})

// Verifies the queue -> worker -> result round trip through Redis.
test('job completes and returns result', async () => {
  // A local stub replaces the payment handler. vi.mock() is not used here:
  // Vitest hoists it to the top of the module, so a call written inside one
  // test would silently apply to every test in the file.
  const handler = vi.fn().mockResolvedValue({ success: true, chargeId: 'ch_123' })
  const jobData = { orderId: 'ord-1', amount: 9999, currency: 'usd' }

  worker = new Worker('payments-test', async (job) => {
    return handler(job.data)
  }, { connection })

  const job = await queue.add('payment', jobData)

  const result = await job.waitUntilFinished(queueEvents, 5000)

  expect(result).toEqual({ success: true, chargeId: 'ch_123' })
  expect(handler).toHaveBeenCalledWith(jobData)
})

// A job that fails twice and succeeds on the third attempt must complete,
// given `attempts: 3` and a short fixed backoff.
test('failed job is retried', async () => {
  let attempts = 0

  // No worker-level `settings.backoffStrategy` here: a custom strategy is only
  // consulted for backoff types that are not built in, and this job uses the
  // built-in 'fixed' type.
  worker = new Worker('payments-test', async () => {
    attempts++
    if (attempts < 3) throw new Error('transient error')
    return { success: true }
  }, { connection })

  const job = await queue.add('payment', { orderId: 'ord-1' }, {
    attempts: 3,
    backoff: { type: 'fixed', delay: 100 }
  })

  const result = await job.waitUntilFinished(queueEvents, 10000)

  expect(result.success).toBe(true)
  expect(attempts).toBe(3)
})

Testing Job Progress

// Progress updates made by the worker must surface as 'progress' queue events.
test('worker reports progress', async () => {
  const progressEvents = []
  const onProgress = ({ jobId, data }) => {
    progressEvents.push({ jobId, data })
  }

  worker = new Worker('payments-test', async (job) => {
    await job.updateProgress(25)
    await job.updateProgress(75)
    await job.updateProgress(100)
    return { done: true }
  }, { connection })

  queueEvents.on('progress', onProgress)

  try {
    const job = await queue.add('task', {})
    await job.waitUntilFinished(queueEvents, 5000)

    // Only look at events belonging to this test's job.
    const progressValues = progressEvents
      .filter((event) => event.jobId === job.id)
      .map((event) => event.data)

    expect(progressValues).toContain(25)
    expect(progressValues).toContain(75)
    expect(progressValues).toContain(100)
  } finally {
    // queueEvents is shared by all tests; remove the listener so it does not
    // keep collecting events from later tests.
    queueEvents.off('progress', onProgress)
  }
})

Testing Repeatable Jobs

// Adding a job with `repeat.every` must register exactly one repeatable job.
test('repeatable job is scheduled at correct interval', async () => {
  const everyMs = 24 * 60 * 60 * 1000 // 24 hours

  await queue.add(
    'daily-report',
    { reportType: 'daily' },
    { repeat: { every: everyMs } }
  )

  const repeatableJobs = await queue.getRepeatableJobs()

  try {
    expect(repeatableJobs).toHaveLength(1)
    expect(repeatableJobs[0].name).toBe('daily-report')
    // The interval is read back from Redis and may arrive as a string, so
    // compare numerically instead of with strict identity.
    expect(Number(repeatableJobs[0].every)).toBe(everyMs)
  } finally {
    // drain() does not remove repeatable definitions; clean up explicitly so
    // the schedule does not leak into other tests.
    await Promise.all(
      repeatableJobs.map(({ key }) => queue.removeRepeatableByKey(key))
    )
  }
})

Testing Priority Queues

// With all jobs already waiting, the worker must pick the lowest priority
// number (= highest priority) first.
test('high priority jobs process before low priority', async () => {
  const processed = []

  // Queue everything BEFORE starting the worker. If the worker were already
  // running, it could grab the first low-priority job the moment it is added
  // and the ordering assertion would be racy.
  const lowFirst = await queue.add('task', {}, { priority: 10 })
  const lowSecond = await queue.add('task', {}, { priority: 10 })
  const urgent = await queue.add('urgent', {}, { priority: 1 })

  // No concurrency option is passed, so jobs are expected to run one at a time
  // and `processed` records the pick-up order.
  worker = new Worker('payments-test', async (job) => {
    processed.push(job.opts.priority)
  }, { connection })

  // Wait for real completion instead of sleeping for a fixed time.
  await Promise.all(
    [lowFirst, lowSecond, urgent].map((job) => job.waitUntilFinished(queueEvents, 5000))
  )

  expect(processed).toEqual([1, 10, 10])
})

Testing Queue Draining

// Ten jobs go in; the worker must see ten distinct job ids.
test('all jobs complete before queue drains', async () => {
  const seenJobIds = new Set()

  worker = new Worker('payments-test', async (job) => {
    seenJobIds.add(job.id)
  }, { connection })

  // Kick off all ten adds without awaiting in between, then wait for them together.
  const pendingAdds = []
  for (let index = 0; index < 10; index++) {
    pendingAdds.push(queue.add('task', { index }))
  }
  const jobs = await Promise.all(pendingAdds)

  // Block until every job has finished.
  const completions = jobs.map((queuedJob) => queuedJob.waitUntilFinished(queueEvents, 10000))
  await Promise.all(completions)

  expect(seenJobIds.size).toBe(10)
})

Mocking BullMQ in Unit Tests

When you want to test code that enqueues jobs (but not the jobs themselves):

// services/order-service.js
import { paymentQueue } from '../queues/payment-queue'

/**
 * Persists a new order and enqueues a 'process' payment job for it.
 *
 * @param {object} orderData - Passed unchanged to `db.orders.create`.
 * @returns {Promise<object>} The order returned by `db.orders.create`.
 */
export async function createOrder(orderData) {
  const createdOrder = await db.orders.create(orderData)

  // The payment job only carries the order id and the amount to charge.
  const paymentJobData = { orderId: createdOrder.id, amount: createdOrder.total }
  await paymentQueue.add('process', paymentJobData)

  return createdOrder
}

// services/order-service.test.js
// Replace the real queue with a stub so no Redis connection is needed.
vi.mock('../queues/payment-queue', () => ({
  paymentQueue: { add: vi.fn().mockResolvedValue({ id: 'job-1' }) }
}))
// createOrder also writes to the database; mock it so the unit test is isolated.
// NOTE(review): assumes '../db' exposes a named export `db` — adjust to the
// real export shape.
vi.mock('../db')

import { paymentQueue } from '../queues/payment-queue'
import { db } from '../db'
import { createOrder } from './order-service'

test('enqueues payment job after order creation', async () => {
  db.orders.create.mockResolvedValue({ id: 'ord-1', userId: 'u1', total: 9999, items: [] })

  const order = await createOrder({ userId: 'u1', total: 9999, items: [] })

  // The job payload is derived from the persisted order, not from the input.
  expect(order.id).toBe('ord-1')
  expect(paymentQueue.add).toHaveBeenCalledWith(
    'process',
    { orderId: 'ord-1', amount: 9999 }
  )
})

Summary

BullMQ testing has two tiers: unit tests for handler functions (fast, no Redis needed) and integration tests for queue mechanics (which require a running Redis instance). Keep handlers as plain functions that accept jobData and return results, with no dependency on BullMQ itself — this makes them trivial to unit test once their collaborators (the database and the payment client) are mocked. Use integration tests to verify retry logic, progress tracking, and prioritization. Mock the queue entirely when testing code that enqueues jobs.

Read more