BullMQ Job Testing in Node.js: Queues, Workers, and Flow Producers

BullMQ Job Testing in Node.js: Queues, Workers, and Flow Producers

BullMQ is the modern successor to Bull for Node.js background job processing. It uses Redis for persistence and supports advanced patterns: priorities, rate limiting, flows (parent/child jobs), and job groups. Testing BullMQ requires understanding when to use real Redis versus mocks, and how to make workers testable.

Testing Architecture Choices

Unit tests with mocked Redis: Fast, no infrastructure, but may miss real-world Redis behaviors.

Integration tests with real Redis: Slower, requires Redis, catches connection issues and data consistency bugs.

Recommended approach:

  • Unit test job logic directly (pure functions, handlers)
  • Integration test queue/worker interaction with real Redis (test containers or local Redis)
  • Use ioredis-mock for most queue interaction tests

Setup

npm install --save-dev jest ioredis-mock @testcontainers/redis

Basic Configuration

// jest.config.js
module.exports = {
  testEnvironment: 'node',
  setupFilesAfterFramework: ['./jest.setup.js'],
  testTimeout: 30000,  // BullMQ tests may take longer
};
// jest.setup.js
jest.setTimeout(30000);

Testing Job Handlers Directly

The simplest approach: extract job logic into a pure function and test it independently of BullMQ.

// workers/email-worker.js
const { Worker } = require('bullmq');

// Pure function — testable without BullMQ
async function processEmailJob(job) {
  const { userId, templateId, variables } = job.data;
  
  const user = await UserService.findById(userId);
  if (!user) {
    throw new Error(`User ${userId} not found`);
  }
  
  const template = await EmailTemplate.findById(templateId);
  const content = template.render(variables);
  
  await EmailProvider.send({
    to: user.email,
    subject: content.subject,
    body: content.body,
  });
  
  await job.updateProgress(100);
  return { sent: true, email: user.email };
}

// Worker creation — separate from logic
function createEmailWorker(connection) {
  return new Worker('emails', processEmailJob, {
    connection,
    concurrency: 5,
  });
}

module.exports = { processEmailJob, createEmailWorker };
// workers/email-worker.test.js
const { processEmailJob } = require('./email-worker');

describe('processEmailJob', () => {
  let mockJob;
  
  beforeEach(() => {
    mockJob = {
      id: 'test-job-id',
      data: {
        userId: 1,
        templateId: 'welcome',
        variables: { firstName: 'Alice' },
      },
      updateProgress: jest.fn(),
    };
  });
  
  it('sends email to the correct user', async () => {
    jest.spyOn(UserService, 'findById').mockResolvedValue({
      id: 1,
      email: 'alice@example.com',
    });
    
    jest.spyOn(EmailTemplate, 'findById').mockResolvedValue({
      render: jest.fn().mockReturnValue({
        subject: 'Welcome!',
        body: 'Hello Alice',
      }),
    });
    
    const mockSend = jest.spyOn(EmailProvider, 'send').mockResolvedValue({ id: 'msg-123' });
    
    const result = await processEmailJob(mockJob);
    
    expect(mockSend).toHaveBeenCalledWith({
      to: 'alice@example.com',
      subject: 'Welcome!',
      body: 'Hello Alice',
    });
    expect(result).toEqual({ sent: true, email: 'alice@example.com' });
    expect(mockJob.updateProgress).toHaveBeenCalledWith(100);
  });
  
  it('throws when user is not found', async () => {
    jest.spyOn(UserService, 'findById').mockResolvedValue(null);
    
    await expect(processEmailJob(mockJob)).rejects.toThrow('User 1 not found');
  });
  
  it('throws when email provider fails', async () => {
    jest.spyOn(UserService, 'findById').mockResolvedValue({ id: 1, email: 'test@example.com' });
    jest.spyOn(EmailTemplate, 'findById').mockResolvedValue({
      render: jest.fn().mockReturnValue({ subject: 'Hi', body: 'Hello' }),
    });
    jest.spyOn(EmailProvider, 'send').mockRejectedValue(new Error('SMTP timeout'));
    
    await expect(processEmailJob(mockJob)).rejects.toThrow('SMTP timeout');
  });
});

Integration Testing with ioredis-mock

For testing queue interactions without Redis:

// test/helpers/queue-helper.js
const { Queue, Worker } = require('bullmq');
const IORedis = require('ioredis');

// For unit tests: use ioredis-mock
const IORedisMock = require('ioredis-mock');

function createTestConnection() {
  return new IORedisMock();
}

function createTestQueue(name, connection) {
  return new Queue(name, { connection });
}

module.exports = { createTestConnection, createTestQueue };
// workers/notification-worker.test.js
const { Queue, Worker } = require('bullmq');
const IORedisMock = require('ioredis-mock');
const { processNotificationJob } = require('./notification-worker');

describe('Notification Queue Integration', () => {
  let connection;
  let queue;
  let worker;
  
  beforeEach(async () => {
    connection = new IORedisMock();
    queue = new Queue('notifications', { connection });
  });
  
  afterEach(async () => {
    await queue.close();
    await connection.quit();
  });
  
  it('adds job to queue with correct data', async () => {
    const jobData = { userId: 1, type: 'order_shipped', orderId: 42 };
    
    const job = await queue.add('send-notification', jobData);
    
    expect(job.id).toBeDefined();
    expect(job.name).toBe('send-notification');
    expect(job.data).toEqual(jobData);
  });
  
  it('processes job with correct handler', async () => {
    const handler = jest.fn().mockResolvedValue({ sent: true });
    
    worker = new Worker('notifications', handler, { connection });
    
    await queue.add('send-notification', { userId: 1, type: 'welcome' });
    
    // Wait for job to be processed
    await new Promise((resolve) => {
      worker.on('completed', resolve);
    });
    
    expect(handler).toHaveBeenCalledTimes(1);
    expect(handler.mock.calls[0][0].data).toEqual({ userId: 1, type: 'welcome' });
  });
});

Integration Testing with Real Redis

For more reliable tests, use real Redis via Docker or @testcontainers/redis:

// test/integration/queue.integration.test.js
const { Queue, Worker, QueueEvents } = require('bullmq');
const { GenericContainer } = require('testcontainers');
const IORedis = require('ioredis');

describe('Queue Integration Tests', () => {
  let redisContainer;
  let connection;
  let queue;
  
  beforeAll(async () => {
    redisContainer = await new GenericContainer('redis:7')
      .withExposedPorts(6379)
      .start();
    
    connection = new IORedis({
      host: redisContainer.getHost(),
      port: redisContainer.getMappedPort(6379),
    });
  });
  
  afterAll(async () => {
    await connection.quit();
    await redisContainer.stop();
  });
  
  beforeEach(async () => {
    queue = new Queue('test-queue', { connection });
    await queue.obliterate({ force: true });  // Clean slate
  });
  
  afterEach(async () => {
    await queue.close();
  });
  
  it('completes a job end to end', async () => {
    const results = [];
    
    const worker = new Worker(
      'test-queue',
      async (job) => {
        results.push(job.data.value * 2);
        return { doubled: job.data.value * 2 };
      },
      { connection }
    );
    
    const job = await queue.add('double', { value: 21 });
    
    // Wait for completion
    const completedJob = await job.waitUntilFinished(
      new QueueEvents('test-queue', { connection })
    );
    
    expect(completedJob).toEqual({ doubled: 42 });
    expect(results).toContain(42);
    
    await worker.close();
  });
});

Testing Retries and Failure Handling

// workers/retry-worker.js
const { Worker } = require('bullmq');

async function processWithRetry(job) {
  const { url, maxAttempts = 3 } = job.data;
  
  try {
    const response = await fetch(url);
    if (!response.ok) {
      throw new Error(`HTTP ${response.status}`);
    }
    return { success: true, status: response.status };
  } catch (error) {
    // BullMQ will retry based on job options
    throw error;
  }
}

// Queue with retry options
async function addRetryableJob(queue, url) {
  return queue.add(
    'fetch-url',
    { url },
    {
      attempts: 3,
      backoff: {
        type: 'exponential',
        delay: 1000,  // 1s, 2s, 4s
      },
    }
  );
}
// Test retry behavior
describe('Retry handling', () => {
  it('retries on failure and eventually succeeds', async () => {
    let attempts = 0;
    
    const worker = new Worker(
      'test-queue',
      async (job) => {
        attempts++;
        if (attempts < 3) throw new Error('Transient error');
        return { success: true };
      },
      {
        connection,
        limiter: { max: 10, duration: 1000 },
      }
    );
    
    const job = await queue.add(
      'retry-test',
      { data: 'test' },
      {
        attempts: 3,
        backoff: { type: 'fixed', delay: 10 },  // Short delay for tests
      }
    );
    
    const queueEvents = new QueueEvents('test-queue', { connection });
    const result = await job.waitUntilFinished(queueEvents, 10000);
    
    expect(result).toEqual({ success: true });
    expect(attempts).toBe(3);
    
    await worker.close();
    await queueEvents.close();
  });
  
  it('moves job to failed after max retries', async () => {
    const worker = new Worker(
      'test-queue',
      async () => { throw new Error('Always fails'); },
      { connection }
    );
    
    const job = await queue.add(
      'fail-test',
      { data: 'test' },
      {
        attempts: 2,
        backoff: { type: 'fixed', delay: 10 },
      }
    );
    
    const queueEvents = new QueueEvents('test-queue', { connection });
    
    await expect(
      job.waitUntilFinished(queueEvents, 10000)
    ).rejects.toThrow('Always fails');
    
    const failedJob = await queue.getJob(job.id);
    expect(failedJob.attemptsMade).toBe(2);
    
    await worker.close();
    await queueEvents.close();
  });
});

Testing Flow Producers (Parent/Child Jobs)

BullMQ flows allow parent jobs to spawn child jobs and wait for all children to complete:

// flows/data-pipeline.js
const { FlowProducer } = require('bullmq');

async function runDataPipeline(datasetId) {
  const flow = new FlowProducer({ connection });
  
  return flow.add({
    name: 'aggregate-results',
    queueName: 'pipeline',
    data: { datasetId },
    children: [
      {
        name: 'process-chunk',
        queueName: 'pipeline',
        data: { datasetId, chunk: 1 },
      },
      {
        name: 'process-chunk',
        queueName: 'pipeline',
        data: { datasetId, chunk: 2 },
      },
      {
        name: 'process-chunk',
        queueName: 'pipeline',
        data: { datasetId, chunk: 3 },
      },
    ],
  });
}
describe('Data Pipeline Flow', () => {
  it('creates parent job with children', async () => {
    const flow = new FlowProducer({ connection });
    
    const result = await flow.add({
      name: 'aggregate',
      queueName: 'test-flow',
      data: { id: 1 },
      children: [
        { name: 'child-1', queueName: 'test-flow', data: { part: 1 } },
        { name: 'child-2', queueName: 'test-flow', data: { part: 2 } },
      ],
    });
    
    expect(result.job.name).toBe('aggregate');
    expect(result.children).toHaveLength(2);
    
    // Parent should wait for children
    const parentJob = await queue.getJob(result.job.id);
    expect(parentJob.data.id).toBe(1);
    
    await flow.close();
  });
  
  it('parent receives children results', async () => {
    const childResults = [];
    
    const worker = new Worker(
      'test-flow',
      async (job) => {
        if (job.name === 'child') {
          return { processed: job.data.value * 2 };
        }
        
        if (job.name === 'parent') {
          // Children results available via job.data.childrenValues
          const childrenValues = await job.getChildrenValues();
          return { total: Object.values(childrenValues).reduce((sum, v) => sum + v.processed, 0) };
        }
      },
      { connection }
    );
    
    const flow = new FlowProducer({ connection });
    
    const { job: parentJob } = await flow.add({
      name: 'parent',
      queueName: 'test-flow',
      data: {},
      children: [
        { name: 'child', queueName: 'test-flow', data: { value: 5 } },
        { name: 'child', queueName: 'test-flow', data: { value: 10 } },
      ],
    });
    
    const queueEvents = new QueueEvents('test-flow', { connection });
    const result = await parentJob.waitUntilFinished(queueEvents, 30000);
    
    expect(result.total).toBe(30);  // (5*2) + (10*2)
    
    await worker.close();
    await flow.close();
    await queueEvents.close();
  });
});

Testing Rate Limiting

describe('Rate limited queue', () => {
  it('respects rate limit', async () => {
    const processedAt = [];
    
    const worker = new Worker(
      'rate-limited',
      async (job) => {
        processedAt.push(Date.now());
        return { processed: true };
      },
      {
        connection,
        limiter: {
          max: 2,       // Max 2 jobs
          duration: 500, // Per 500ms
        },
      }
    );
    
    // Add 4 jobs
    for (let i = 0; i < 4; i++) {
      await queue.add('job', { index: i });
    }
    
    // Wait for all to complete
    await new Promise(resolve => setTimeout(resolve, 2000));
    
    // Should have taken at least 1 second (4 jobs / 2 per 500ms = 2 windows)
    const totalTime = processedAt[3] - processedAt[0];
    expect(totalTime).toBeGreaterThan(900);
    
    await worker.close();
  });
});

Testing Job Events

describe('Job lifecycle events', () => {
  it('emits events in correct order', async () => {
    const events = [];
    const queueEvents = new QueueEvents('test-queue', { connection });
    
    queueEvents.on('active', ({ jobId }) => events.push(`active:${jobId}`));
    queueEvents.on('completed', ({ jobId }) => events.push(`completed:${jobId}`));
    queueEvents.on('failed', ({ jobId }) => events.push(`failed:${jobId}`));
    
    const worker = new Worker(
      'test-queue',
      async (job) => ({ done: true }),
      { connection }
    );
    
    const job = await queue.add('test', { data: 1 });
    await job.waitUntilFinished(queueEvents);
    
    expect(events).toContain(`active:${job.id}`);
    expect(events).toContain(`completed:${job.id}`);
    
    await worker.close();
    await queueEvents.close();
  });
});

Cleanup and Teardown

Always clean up BullMQ resources in tests to avoid port conflicts and memory leaks:

// test/helpers/bullmq-cleanup.js
async function cleanupBullMQ(queue, worker, queueEvents, connection) {
  const cleanups = [];
  
  if (worker) cleanups.push(worker.close());
  if (queueEvents) cleanups.push(queueEvents.close());
  if (queue) cleanups.push(queue.close());
  
  await Promise.all(cleanups);
  
  if (connection) await connection.quit();
}

// Obliterate all queue data for a fresh test
async function resetQueue(queue) {
  await queue.obliterate({ force: true });
}

BullMQ's strong TypeScript support and separation of Queue/Worker/QueueEvents makes it more testable than its predecessor. The key is keeping job handler logic pure and separate from the BullMQ wiring — pure functions are easy to unit test, and integration tests with real Redis catch the connection and lifecycle issues that mocks miss.

Read more