Testing BullMQ Jobs with Jest: Queues, Workers, and Retries

Testing BullMQ Jobs with Jest: Queues, Workers, and Retries

BullMQ is the most widely used Node.js job queue library, built on Redis. Testing it well means covering three separate concerns: the producer that enqueues jobs, the worker that processes them, and the retry/failure handling when things go wrong. This guide shows how to do all three with Jest.

Setup

Install BullMQ and the testing dependencies:

npm install bullmq ioredis
npm install -D jest @types/jest ts-jest

For unit tests that don't need a real Redis instance, use ioredis-mock:

npm install -D ioredis-mock

Testing Job Producers

Test that your producer enqueues jobs with the correct data and options:

// jobs/emailQueue.ts
import { Queue } from 'bullmq'
import { redisConnection } from '../config/redis'

export const emailQueue = new Queue('email', { connection: redisConnection })

export async function queueWelcomeEmail(userId: string, email: string) {
  return emailQueue.add(
    'welcome',
    { userId, email },
    {
      attempts: 3,
      backoff: { type: 'exponential', delay: 2000 },
      removeOnComplete: 100,
    }
  )
}
// jobs/emailQueue.test.ts
import { Queue } from 'bullmq'
import { queueWelcomeEmail } from './emailQueue'

jest.mock('bullmq', () => ({
  Queue: jest.fn().mockImplementation(() => ({
    add: jest.fn().mockResolvedValue({ id: 'job-123' }),
    close: jest.fn(),
  })),
}))

describe('queueWelcomeEmail', () => {
  let mockAdd: jest.Mock

  beforeEach(() => {
    mockAdd = (Queue as jest.Mock).mock.results[0]?.value.add
    mockAdd?.mockClear()
  })

  it('enqueues a welcome job with correct data', async () => {
    await queueWelcomeEmail('user-1', 'alice@example.com')

    expect(mockAdd).toHaveBeenCalledWith(
      'welcome',
      { userId: 'user-1', email: 'alice@example.com' },
      expect.objectContaining({ attempts: 3 })
    )
  })

  it('returns the created job', async () => {
    const job = await queueWelcomeEmail('user-1', 'alice@example.com')
    expect(job).toHaveProperty('id', 'job-123')
  })
})

Testing Workers

Worker tests verify that your processing logic handles job data correctly:

// jobs/emailWorker.ts
import { Worker, Job } from 'bullmq'
import { redisConnection } from '../config/redis'
import { sendEmail } from '../services/email'

export function createEmailWorker() {
  return new Worker(
    'email',
    async (job: Job) => {
      if (job.name === 'welcome') {
        await sendEmail({
          to: job.data.email,
          template: 'welcome',
          data: { userId: job.data.userId },
        })
        return { sent: true, to: job.data.email }
      }
      throw new Error(`Unknown job type: ${job.name}`)
    },
    { connection: redisConnection }
  )
}

The key insight: test the processor function directly, not the Worker instance:

// jobs/emailProcessor.ts — extract the processor function
export async function processEmailJob(job: Job) {
  if (job.name === 'welcome') {
    await sendEmail({
      to: job.data.email,
      template: 'welcome',
      data: { userId: job.data.userId },
    })
    return { sent: true, to: job.data.email }
  }
  throw new Error(`Unknown job type: ${job.name}`)
}
// jobs/emailProcessor.test.ts
import { processEmailJob } from './emailProcessor'
import { sendEmail } from '../services/email'

jest.mock('../services/email')

const mockSendEmail = sendEmail as jest.Mock

function makeJob(name: string, data: Record<string, unknown>) {
  return { name, data, id: 'test-job-id' } as any
}

describe('processEmailJob', () => {
  beforeEach(() => {
    mockSendEmail.mockResolvedValue({ messageId: 'msg-123' })
  })

  afterEach(() => {
    jest.clearAllMocks()
  })

  it('sends a welcome email for welcome jobs', async () => {
    const result = await processEmailJob(
      makeJob('welcome', { userId: 'u1', email: 'alice@example.com' })
    )

    expect(mockSendEmail).toHaveBeenCalledWith({
      to: 'alice@example.com',
      template: 'welcome',
      data: { userId: 'u1' },
    })
    expect(result).toEqual({ sent: true, to: 'alice@example.com' })
  })

  it('throws for unknown job types', async () => {
    await expect(
      processEmailJob(makeJob('unknown-type', {}))
    ).rejects.toThrow('Unknown job type: unknown-type')
  })

  it('propagates email send failures', async () => {
    mockSendEmail.mockRejectedValue(new Error('SMTP connection refused'))

    await expect(
      processEmailJob(makeJob('welcome', { userId: 'u1', email: 'a@b.com' }))
    ).rejects.toThrow('SMTP connection refused')
  })
})

Integration Tests with Real Redis

For integration tests, use a real Redis instance (or ioredis-mock):

// jobs/emailQueue.integration.test.ts
import { Queue, Worker, QueueEvents } from 'bullmq'
import IORedis from 'ioredis'

const connection = new IORedis({ maxRetriesPerRequest: null })

async function wait(ms: number) {
  return new Promise((resolve) => setTimeout(resolve, ms))
}

describe('email queue integration', () => {
  let queue: Queue
  let worker: Worker
  let queueEvents: QueueEvents

  beforeEach(() => {
    queue = new Queue('email-test', { connection })
    queueEvents = new QueueEvents('email-test', { connection })
  })

  afterEach(async () => {
    await worker?.close()
    await queue.obliterate({ force: true })
    await queue.close()
    await queueEvents.close()
  })

  afterAll(async () => {
    await connection.quit()
  })

  it('processes a job and returns a result', async () => {
    const processedJobs: string[] = []

    worker = new Worker(
      'email-test',
      async (job) => {
        processedJobs.push(job.data.email)
        return { sent: true }
      },
      { connection }
    )

    const job = await queue.add('welcome', { email: 'alice@example.com' })
    await job.waitUntilFinished(queueEvents, 5000)

    expect(processedJobs).toContain('alice@example.com')
    const completedJob = await queue.getJob(job.id!)
    expect(completedJob?.returnvalue).toEqual({ sent: true })
  })

  it('retries a failing job up to the configured limit', async () => {
    let attempts = 0

    worker = new Worker(
      'email-test',
      async () => {
        attempts++
        if (attempts < 3) throw new Error('Temporary failure')
        return { sent: true }
      },
      { connection }
    )

    const job = await queue.add(
      'welcome',
      { email: 'b@example.com' },
      { attempts: 3, backoff: { type: 'fixed', delay: 100 } }
    )

    await job.waitUntilFinished(queueEvents, 10000)
    expect(attempts).toBe(3)
  })
})

Testing Retry Logic

Test what happens when all retries are exhausted:

it('moves job to failed after exhausting retries', async () => {
  worker = new Worker(
    'email-test',
    async () => {
      throw new Error('Permanent failure')
    },
    { connection }
  )

  const failedJobIds: string[] = []
  queueEvents.on('failed', ({ jobId }) => failedJobIds.push(jobId))

  const job = await queue.add(
    'welcome',
    { email: 'c@example.com' },
    { attempts: 2, backoff: { type: 'fixed', delay: 50 } }
  )

  // Wait for failure event
  await new Promise<void>((resolve) => {
    queueEvents.on('failed', resolve)
  })

  expect(failedJobIds).toContain(job.id)

  const failedJob = await queue.getJob(job.id!)
  expect(await failedJob?.isFailed()).toBe(true)
  expect(failedJob?.failedReason).toContain('Permanent failure')
})

Testing Delayed Jobs

it('does not process a delayed job immediately', async () => {
  const processed: boolean[] = []

  worker = new Worker(
    'email-test',
    async () => {
      processed.push(true)
    },
    { connection }
  )

  await queue.add('welcome', {}, { delay: 5000 })
  await wait(500) // give worker time to pick up non-delayed jobs

  expect(processed).toHaveLength(0)
})

Testing Job Progress Tracking

// processor that reports progress
async function processWithProgress(job: Job) {
  await job.updateProgress(10)
  // ... first step
  await job.updateProgress(50)
  // ... second step
  await job.updateProgress(100)
  return { done: true }
}

it('tracks progress updates', async () => {
  const progressEvents: number[] = []

  worker = new Worker('email-test', processWithProgress, { connection })
  queueEvents.on('progress', ({ data }) => {
    progressEvents.push(data as number)
  })

  const job = await queue.add('welcome', {})
  await job.waitUntilFinished(queueEvents, 5000)

  expect(progressEvents).toEqual(expect.arrayContaining([10, 50, 100]))
})

What to Mock vs What to Test for Real

Test type Use mock Redis Use real Redis
Unit: processor logic
Unit: producer enqueue call
Integration: queue + worker lifecycle
Integration: retry exhaustion
Integration: delayed/scheduled jobs

Keep unit tests fast by mocking BullMQ entirely and testing your processor functions in isolation. Run real-Redis integration tests in CI against a test Redis instance or use ioredis-mock for environments where a real Redis isn't available.

What Automated Tests Miss

Unit and integration tests verify your job logic but won't catch:

  • Memory leaks in long-running worker processes
  • Redis connection pool exhaustion under high concurrency
  • Clock drift in delayed jobs across multiple servers
  • Worker crashes during job processing and the impact on job state

HelpMeTest can run your BullMQ-dependent flows end-to-end on a schedule, catching failures that only show up when the full stack is running. The $100/month Pro plan gives you unlimited tests with parallel execution.

Summary

Testing BullMQ effectively:

  • Extract processor functions — test them independently of the Worker class
  • Mock BullMQ for unit tests — fast, no Redis required
  • Use real Redis for integration tests — verify lifecycle, retries, and failure handling
  • Use QueueEvents — the right way to observe job state changes in tests
  • Test the failure path — verify failed job state, failedReason, and DLQ behaviour

Read more