BullMQ Advanced Testing: Flow Producers, Delayed Jobs, and Rate Limiters

BullMQ's advanced features (flow producers, delayed jobs, rate limiters, priorities, and, in BullMQ Pro, job groups) enable sophisticated background processing patterns, but they are also the hardest to test correctly. This guide covers testing strategies for each of these patterns, building on basic queue testing to handle the orchestration scenarios you'll encounter in production.

Prerequisites

You should be comfortable with basic BullMQ testing using real Redis or ioredis-mock. This guide focuses on advanced patterns. For BullMQ basics, see our BullMQ job testing guide.

All examples use Vitest + real Redis via Testcontainers:

import { afterAll, beforeAll, expect, it, vi } from "vitest";
import { GenericContainer, StartedTestContainer } from "testcontainers";
import { Queue, Worker, FlowProducer } from "bullmq";
import IORedis from "ioredis";

let redisContainer: StartedTestContainer;
let connection: IORedis;

beforeAll(async () => {
  redisContainer = await new GenericContainer("redis:7-alpine")
    .withExposedPorts(6379)
    .start();
  
  connection = new IORedis({
    host: redisContainer.getHost(),
    port: redisContainer.getMappedPort(6379),
    maxRetriesPerRequest: null, // BullMQ workers require this for their blocking connection
  });
}, 30_000);

afterAll(async () => {
  await connection.quit();
  await redisContainer.stop();
});
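Several tests below call small wait helpers (waitForJobCompletion, waitForAllJobsComplete, waitForDLQEntry) that this guide does not define. A minimal sketch of a generic polling primitive they could be built on; `waitFor` and its options are hypothetical names, not BullMQ APIs:

```typescript
interface WaitForOptions {
  intervalMs?: number; // how often to re-run the check
  label?: string;      // included in the timeout error message
}

/**
 * Hypothetical polling primitive (not part of BullMQ): resolves as soon as
 * `check` returns true, rejects once `timeoutMs` has elapsed without success.
 */
export async function waitFor(
  check: () => boolean | Promise<boolean>,
  timeoutMs: number,
  { intervalMs = 50, label = "condition" }: WaitForOptions = {}
): Promise<void> {
  const deadline = Date.now() + timeoutMs;
  for (;;) {
    if (await check()) return;
    if (Date.now() >= deadline) {
      throw new Error(`Timed out after ${timeoutMs}ms waiting for ${label}`);
    }
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
}
```

For instance, on BullMQ 4+ (where prioritized jobs have their own state), a waitForAllJobsComplete built on this could poll `queue.getJobCounts("waiting", "prioritized", "delayed", "active")` until every count is zero. Polling Redis state is slower than listening for events, but it cannot miss an event that fired before a listener was attached.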

Testing Flow Producers

Flow producers orchestrate parent/child job relationships: a parent job waits in the waiting-children state and is only processed after all of its children have completed. Tests should verify both the tree structure and the completion order:

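import { FlowProducer, Worker, type FlowJob } from "bullmq";

interface FlowResult {
  jobId: string;
  name: string;
  data: unknown;
  returnValue: unknown;
}

// Minimal structural view of a flow node (the queueName/children fields of FlowJob)
type FlowNode = { queueName: string; children?: FlowNode[] };

// Every distinct queue name in the tree, so we can start one worker per queue
function collectQueueNames(node: FlowNode, names = new Set<string>()): string[] {
  names.add(node.queueName);
  for (const child of node.children ?? []) collectQueueNames(child, names);
  return [...names];
}

// Total number of jobs (parent plus all descendants) in the tree
function countJobs(node: FlowNode): number {
  return 1 + (node.children ?? []).reduce((sum, child) => sum + countJobs(child), 0);
}

async function runFlowToCompletion(
  flowProducer: FlowProducer,
  flow: FlowJob,
  workerHandlers: Record<string, (data: unknown) => Promise<unknown>>,
  timeoutMs = 10_000
): Promise<FlowResult[]> {
  const completed: FlowResult[] = [];
  const workers: Worker[] = [];
  // Count jobs, not queues: one queue can hold several jobs of the same flow
  const expectedJobs = countJobs(flow);

  // Create a worker for each queue name in the flow; queues without a handler get a no-op
  for (const queueName of collectQueueNames(flow)) {
    const handler = workerHandlers[queueName] ?? (async () => ({ done: true }));
    const worker = new Worker(queueName, async (job) => handler(job.data), { connection });
    worker.on("completed", (job, returnValue) => {
      completed.push({ jobId: job.id!, name: job.name, data: job.data, returnValue });
    });
    workers.push(worker);
  }

  try {
    await flowProducer.add(flow);

    // Poll until every job in the tree, including the parent, has completed
    const deadline = Date.now() + timeoutMs;
    while (completed.length < expectedJobs) {
      if (Date.now() > deadline) {
        throw new Error(`Flow timeout: ${completed.length}/${expectedJobs} jobs completed`);
      }
      await new Promise((r) => setTimeout(r, 100));
    }
    return completed;
  } finally {
    // Close workers even on timeout so the test process can exit cleanly
    await Promise.all(workers.map((w) => w.close()));
  }
}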

it("completes children before parent in flow", async () => {
  const flowProducer = new FlowProducer({ connection });

  const completionOrder: string[] = [];

  const results = await runFlowToCompletion(
    flowProducer,
    {
      name: "generate-report",
      queueName: "reports",
      data: { reportId: "rpt_001" },
      children: [
        { name: "fetch-data", queueName: "data-fetch", data: { source: "sales" } },
        { name: "fetch-metrics", queueName: "data-fetch", data: { source: "metrics" } },
      ],
    },
    {
      "data-fetch": async (data: any) => {
        completionOrder.push(`child:${data.source}`);
        return { rows: 100 };
      },
      reports: async (data) => {
        completionOrder.push("parent");
        return { generated: true };
      },
    }
  );

  // Children must complete before parent
  expect(completionOrder.indexOf("parent")).toBeGreaterThan(
    completionOrder.indexOf("child:sales")
  );
  expect(completionOrder.indexOf("parent")).toBeGreaterThan(
    completionOrder.indexOf("child:metrics")
  );

  await flowProducer.close();
});
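The two indexOf assertions above only cover a one-level tree. For deeper flows, a small recursive check generalizes them: every child must appear in the completion log before its parent. A sketch; `OrderNode` and `findOrderViolations` are hypothetical, and labels are assumed to be unique strings that your handlers push into completionOrder:

```typescript
// Hypothetical tree shape: each node carries the label its handler logs on completion
interface OrderNode {
  label: string;
  children?: OrderNode[];
}

// Returns [parent, child] pairs where the child is missing from the log
// or completed after its parent. Checking each direct edge is enough:
// the "children before parent" rule then holds transitively for the whole tree.
export function findOrderViolations(node: OrderNode, completionOrder: string[]): [string, string][] {
  const violations: [string, string][] = [];
  const parentIdx = completionOrder.indexOf(node.label);
  for (const child of node.children ?? []) {
    const childIdx = completionOrder.indexOf(child.label);
    if (parentIdx === -1 || childIdx === -1 || childIdx > parentIdx) {
      violations.push([node.label, child.label]);
    }
    violations.push(...findOrderViolations(child, completionOrder));
  }
  return violations;
}
```

In a test, `expect(findOrderViolations(tree, completionOrder)).toEqual([])` fails with the offending pairs listed, which is easier to debug than a bare index comparison.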

Testing Child Job Return Values in Parent

A parent job can read its children's return values with job.getChildrenValues(). Test that the values arrive intact:

it("parent receives children return values", async () => {
  const flowProducer = new FlowProducer({ connection });
  let parentReceivedChildren: Record<string, unknown> | undefined;

  const parentWorker = new Worker(
    "report-assembly",
    async (job) => {
      // Children's results are not merged into job.data; the parent reads them explicitly.
      // The result maps each child's job key to that child's return value.
      parentReceivedChildren = await job.getChildrenValues();
      return { assembled: true };
    },
    { connection }
  );

  // The parent sits in "waiting-children" until both sections finish, so wait for its
  // own "completed" event instead of waiting for the queue to look empty
  const parentCompleted = new Promise<void>((resolve, reject) => {
    const timer = setTimeout(() => reject(new Error("Parent job did not complete")), 5000);
    parentWorker.once("completed", () => {
      clearTimeout(timer);
      resolve();
    });
  });

  await flowProducer.add({
    name: "assemble",
    queueName: "report-assembly",
    data: {},
    children: [
      { name: "section-1", queueName: "sections", data: { id: 1 } },
      { name: "section-2", queueName: "sections", data: { id: 2 } },
    ],
  });

  const sectionWorker = new Worker(
    "sections",
    async (job) => ({ content: `Section ${job.data.id} content` }),
    { connection }
  );

  await parentCompleted;

  expect(parentReceivedChildren).toBeDefined();
  const values = Object.values(parentReceivedChildren!);
  expect(values).toHaveLength(2);
  expect(values).toContainEqual(expect.objectContaining({ content: "Section 1 content" }));
  expect(values).toContainEqual(expect.objectContaining({ content: "Section 2 content" }));

  await parentWorker.close();
  await sectionWorker.close();
  await flowProducer.close();
});
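When a parent has children on several queues, it helps to group the getChildrenValues() result by queue before asserting. A sketch, assuming the keys follow BullMQ's "<prefix>:<queueName>:<jobId>" job-key format (for example "bull:sections:1" with the default prefix); inspect a real key in your BullMQ version before relying on it. `childValuesByQueue` is a hypothetical helper:

```typescript
// Hypothetical helper: groups child return values by the queue segment of each job key.
// Assumes keys look like "<prefix>:<queueName>:<jobId>"; a custom prefix may itself
// contain colons, so the queue name is read from the second-to-last segment.
export function childValuesByQueue<T>(values: Record<string, T>): Record<string, T[]> {
  const grouped: Record<string, T[]> = {};
  for (const [key, value] of Object.entries(values)) {
    const segments = key.split(":");
    // Fall back to the whole key when it doesn't match the assumed format
    const queueName = segments.length >= 3 ? segments[segments.length - 2] : key;
    const bucket = grouped[queueName] ?? [];
    bucket.push(value);
    grouped[queueName] = bucket;
  }
  return grouped;
}
```

With it, the test above could assert `expect(childValuesByQueue(parentReceivedChildren!)["sections"]).toHaveLength(2)`.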

Testing Delayed Jobs

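Delayed jobs are tricky because you don't want tests to wait in real time. Rather than sleeping through the delay, promote the job: job.promote() moves a delayed job to the waiting list so a worker can pick it up right away. Note that queue.drain(true) does not do this; it removes all waiting and delayed jobs from the queue.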

import { Queue, Worker, Job } from "bullmq";

it("delayed job processes after delay", async () => {
  const queue = new Queue("delayed-jobs", { connection });
  const processed: string[] = [];

  const worker = new Worker(
    "delayed-jobs",
    async (job: Job) => {
      processed.push(job.id!);
    },
    { connection }
  );

  // Add job with 5-second delay
  const job = await queue.add("send-reminder", { userId: "usr_1" }, { delay: 5000 });

  // Job should not be processed immediately
  await new Promise((r) => setTimeout(r, 100));
  expect(processed).not.toContain(job.id);

  // Promote the job: moves it from delayed to waiting so the worker picks it up now.
  // (queue.drain(true) would delete the delayed job instead of running it.)
  await job.promote();

  // Now job should process quickly
  await waitForJobCompletion(job, 3000);
  expect(processed).toContain(job.id);

  await worker.close();
  await queue.close();
});

// Testing that delay values are set correctly
it("sets correct delay for 3-day reminder", async () => {
  const queue = new Queue("reminders", { connection });
  const THREE_DAYS_MS = 3 * 24 * 60 * 60 * 1000;

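  // scheduleReminder is the application function under test; it is assumed to
  // enqueue its job on the "reminders" queue opened above
  const job = await scheduleReminder("usr_1", "3d");
  const jobDetails = await queue.getJob(job.id!);

  expect(jobDetails?.opts.delay).toBeCloseTo(THREE_DAYS_MS, -3); // |difference| < 500 ms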
  
  await queue.close();
});
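Because the delay is just a millisecond value in job.opts, the conversion from a human-readable interval can be unit-tested without Redis. scheduleReminder's implementation isn't shown in this guide; the sketch below assumes it accepts strings like "3d" and uses a hypothetical `parseDelay` helper to compute the delay option:

```typescript
// Milliseconds per supported unit: seconds, minutes, hours, days
const UNIT_MS = { s: 1_000, m: 60_000, h: 3_600_000, d: 86_400_000 } as const;

// Hypothetical parser for "<integer><unit>" specs such as "90s" or "3d"
export function parseDelay(spec: string): number {
  const match = /^(\d+)([smhd])$/.exec(spec);
  if (!match) throw new Error(`Unsupported delay spec: ${spec}`);
  const amount = Number(match[1]);
  const unit = match[2] as keyof typeof UNIT_MS;
  return amount * UNIT_MS[unit];
}
```

With a parser like this, scheduleReminder might call `queue.add("send-reminder", { userId }, { delay: parseDelay(interval) })`. Since that delay is computed exactly rather than from the current time, the integration test could then use toBe instead of toBeCloseTo.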

Testing Rate Limiters

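BullMQ's rate limiter, set with the worker's limiter option, caps how many jobs are processed per window (max jobs per duration milliseconds), and the limit applies to the whole queue rather than to each worker separately. Test that your limiter config prevents burst processing: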

it("respects rate limit of 3 jobs per second", async () => {
  const queue = new Queue("rate-limited", { connection });
  const processingTimestamps: number[] = [];

  const worker = new Worker(
    "rate-limited",
    async () => {
      processingTimestamps.push(Date.now());
    },
    {
      connection,
      limiter: {
        max: 3,      // max 3 jobs
        duration: 1000, // per 1 second
      },
    }
  );

  // Add 6 jobs — should process in 2 batches
  await Promise.all(
    Array.from({ length: 6 }, (_, i) =>
      queue.add(`job-${i}`, { index: i })
    )
  );

  await waitForAllJobsComplete(queue, 5000);

  // First 3 should process quickly, next 3 should be delayed ~1s
  const firstBatch = processingTimestamps.slice(0, 3);
  const secondBatch = processingTimestamps.slice(3, 6);

  const firstBatchSpread = Math.max(...firstBatch) - Math.min(...firstBatch);
  const batchGap = Math.min(...secondBatch) - Math.max(...firstBatch);

  expect(firstBatchSpread).toBeLessThan(500); // First 3 process close together
  expect(batchGap).toBeGreaterThan(800); // Then ~1s gap before next batch

  await worker.close();
  await queue.close();
});
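Slicing the timestamps at fixed positions (slice(0, 3), slice(3, 6)) hard-codes the batch layout. A more general check splits the timestamps wherever the gap between consecutive jobs exceeds a threshold. A sketch; `splitIntoBatches` is hypothetical and assumes each job runs much faster than the limiter window, so batches are clearly separated in time:

```typescript
// Hypothetical helper: sorts timestamps and starts a new batch whenever the gap
// to the previous timestamp is larger than minGapMs
export function splitIntoBatches(timestamps: number[], minGapMs: number): number[][] {
  const sorted = [...timestamps].sort((a, b) => a - b);
  const batches: number[][] = [];
  for (const t of sorted) {
    const current = batches[batches.length - 1];
    if (current && t - current[current.length - 1] <= minGapMs) {
      current.push(t);
    } else {
      batches.push([t]);
    }
  }
  return batches;
}
```

In the test above, `const batches = splitIntoBatches(processingTimestamps, 500)` followed by `expect(batches).toHaveLength(2)` and `expect(batches.every((b) => b.length <= 3)).toBe(true)` keeps working if you change the number of jobs.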

Testing Job Groups (Concurrency Groups)

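Job groups are a BullMQ Pro feature (QueuePro and WorkerPro from @taskforcesh/bullmq-pro). A group concurrency of 1 prevents parallel execution of jobs in the same group, which is useful when jobs for one user must run one at a time: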

import { QueuePro, WorkerPro } from "@taskforcesh/bullmq-pro";

it("processes same-group jobs serially", async () => {
  const queue = new QueuePro("user-jobs", { connection });
  const executionLog: { jobId: string; start: number; end: number }[] = [];

  const worker = new WorkerPro(
    "user-jobs",
    async (job) => {
      const start = Date.now();
      await new Promise((r) => setTimeout(r, 200)); // Simulate 200ms work
      const end = Date.now();
      executionLog.push({ jobId: job.id!, start, end });
    },
    {
      connection,
      // Allow 3 jobs at once overall. With the default concurrency of 1 the jobs would
      // run serially anyway, and the test would pass even if grouping were broken.
      concurrency: 3,
      group: { concurrency: 1 }, // ...but only one job per group at a time
    }
  );

  // Add 3 jobs for the same user; they must not overlap
  await queue.add("process", { userId: "usr_1" }, { group: { id: "usr_1" } });
  await queue.add("process", { userId: "usr_1" }, { group: { id: "usr_1" } });
  await queue.add("process", { userId: "usr_1" }, { group: { id: "usr_1" } });

  // Wait until all three jobs have run; polling the log avoids depending on
  // how BullMQ Pro reports grouped jobs in the regular job counts
  const deadline = Date.now() + 5000;
  while (executionLog.length < 3) {
    if (Date.now() > deadline) throw new Error("Timed out waiting for grouped jobs");
    await new Promise((r) => setTimeout(r, 50));
  }

  // Sort by start time
  executionLog.sort((a, b) => a.start - b.start);

  // Verify no overlap: each job's start must be at or after the previous job's end
  for (let i = 1; i < executionLog.length; i++) {
    expect(executionLog[i].start).toBeGreaterThanOrEqual(executionLog[i - 1].end);
  }

  await worker.close();
  await queue.close();
});
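The loop above assumes a single group. To also cover the other half of the contract (jobs in different groups remain free to run concurrently), the log can record each job's group and be checked pairwise within each group. A sketch; `ExecutionEntry` and `findSameGroupOverlaps` are hypothetical, and executionLog would need an extra groupId field (for example job.data.userId):

```typescript
interface ExecutionEntry {
  groupId: string;
  jobId: string;
  start: number;
  end: number;
}

// Returns [jobIdA, jobIdB] pairs from the same group whose [start, end) intervals overlap.
// Touching intervals (one ends exactly when the next starts) do not count as overlap.
export function findSameGroupOverlaps(log: ExecutionEntry[]): [string, string][] {
  const byGroup = new Map<string, ExecutionEntry[]>();
  for (const entry of log) {
    const entries = byGroup.get(entry.groupId) ?? [];
    entries.push(entry);
    byGroup.set(entry.groupId, entries);
  }

  const overlaps: [string, string][] = [];
  for (const entries of Array.from(byGroup.values())) {
    for (let i = 0; i < entries.length; i++) {
      for (let j = i + 1; j < entries.length; j++) {
        const a = entries[i];
        const b = entries[j];
        if (a.start < b.end && b.start < a.end) overlaps.push([a.jobId, b.jobId]);
      }
    }
  }
  return overlaps;
}
```

The pairwise comparison is quadratic per group, which is irrelevant for the handful of jobs a test enqueues and keeps the overlap rule easy to verify by eye.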

Testing Priority Queues

BullMQ supports job priorities, where a lower number means a higher priority (1 is the highest). Test that high-priority jobs are processed first:

it("processes high-priority jobs before low-priority", async () => {
  const queue = new Queue("priority-test", { connection });
  const processedOrder: string[] = [];

  // Create the worker with autorun disabled so it cannot fetch anything
  // until every job has been enqueued
  const worker = new Worker(
    "priority-test",
    async (job) => {
      processedOrder.push(job.data.label);
    },
    { connection, autorun: false }
  );

  // Enqueue low-priority jobs first
  await queue.add("task", { label: "low-1" }, { priority: 10 });
  await queue.add("task", { label: "low-2" }, { priority: 10 });

  // Then enqueue high-priority
  await queue.add("task", { label: "high-1" }, { priority: 1 });
  await queue.add("task", { label: "high-2" }, { priority: 1 });

  // Start processing; run() only resolves when the worker closes, so don't await it
  void worker.run();
  await waitForAllJobsComplete(queue, 3000);

  // High priority jobs should be processed first
  expect(processedOrder.indexOf("high-1")).toBeLessThan(processedOrder.indexOf("low-1"));
  expect(processedOrder.indexOf("high-2")).toBeLessThan(processedOrder.indexOf("low-2"));

  await worker.close();
  await queue.close();
});
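The two assertions above only compare matching pairs. A stricter check covers the whole processing order: priority numbers must never decrease from one processed job to the next. A sketch; `isPriorityOrdered` is hypothetical, and the check assumes the worker runs one job at a time (the default concurrency of 1) so that processing order matches fetch order:

```typescript
// Hypothetical check: true if priority numbers never decrease along the processing order
// (1 = highest priority, so smaller numbers must come first)
export function isPriorityOrdered(processed: string[], priorityOf: Record<string, number>): boolean {
  let previous = -Infinity;
  for (const label of processed) {
    const priority = priorityOf[label];
    // Fail loudly on unknown labels instead of silently passing the comparison
    if (priority === undefined) throw new Error(`No priority recorded for ${label}`);
    if (priority < previous) return false;
    previous = priority;
  }
  return true;
}
```

In the test above: `expect(isPriorityOrdered(processedOrder, { "high-1": 1, "high-2": 1, "low-1": 10, "low-2": 10 })).toBe(true)`.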

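Testing Stalled Job Recovery

If a worker crashes mid-job, the job stays in the active state until its lock expires. BullMQ's stalled-job checker, which workers run every stalledInterval milliseconds, then moves it back to waiting (or to failed once it has stalled more than maxStalledCount times). Test that a replacement worker picks the job up: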

it("recovers stalled jobs after worker crash", async () => {
  const queue = new Queue("stall-test", { connection });
  const completedJobIds: string[] = [];

  // Add a job
  const job = await queue.add("risky-task", { data: "important" });

  // Simulate a worker that acquires the job and then crashes
  const crashingWorker = new Worker(
    "stall-test",
    async () => {
      await new Promise(() => {}); // Hang forever: the job never finishes
    },
    {
      connection,
      // Short lock so it expires soon after the "crash"; the default lockDuration
      // of 30s would outlast this test's timeout
      lockDuration: 1000,
    }
  );

  // Wait until the job has actually been picked up
  await new Promise((resolve) => crashingWorker.once("active", resolve));

  // Force close without waiting for the active job (simulating a crash)
  await crashingWorker.close(true);

  // Spin up a replacement worker
  const reliableWorker = new Worker(
    "stall-test",
    async (job) => {
      completedJobIds.push(job.id!);
      return { rescued: true };
    },
    {
      connection,
      stalledInterval: 500, // Check for stalled jobs every 500 ms
      maxStalledCount: 1,   // Recover a job once; a second stall moves it to failed
    }
  );

  // Once the lock expires, the replacement worker's stalled check moves the job
  // back to waiting and then processes it
  await waitForJobCompletion(job, 8000);
  expect(completedJobIds).toContain(job.id);

  await reliableWorker.close();
  await queue.close();
}, 15_000);
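The 8-second wait and 15-second test timeout above are easier to reason about when derived from the worker settings. A rough sketch, not a BullMQ formula: the job cannot be flagged until the crashed worker's lock expires, after which a stalled check has to run; two check intervals plus a fixed margin are allowed here to stay conservative. `stallRecoveryBudgetMs` and its parameters are hypothetical:

```typescript
interface StallTimings {
  lockDurationMs: number;    // lockDuration of the worker that crashed
  stalledIntervalMs: number; // stalledInterval of the worker doing the recovery
  processingMs: number;      // how long the replacement handler takes
  marginMs?: number;         // slack for Redis round-trips and CI jitter
}

// Conservative upper bound on how long to wait for a stalled job to be recovered and completed
export function stallRecoveryBudgetMs({
  lockDurationMs,
  stalledIntervalMs,
  processingMs,
  marginMs = 1_000,
}: StallTimings): number {
  return lockDurationMs + 2 * stalledIntervalMs + processingMs + marginMs;
}
```

With a 1-second lock and a 500 ms stalled interval this gives 3 seconds. With BullMQ's default lockDuration of 30 seconds, the same budget is 32 seconds, far beyond a 15-second test timeout.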

Testing Dead Letter Queue Patterns

A common pattern is to move permanently failed jobs to a separate dead letter queue from the worker's failed event:

import { Queue, Worker } from "bullmq";

// processJob is injected so tests can pass a mock in place of the real job logic
export async function setupWithDLQ(
  queueName: string,
  processJob: (data: unknown) => Promise<unknown>
) {
  const mainQueue = new Queue(queueName, { connection });
  const dlqQueue = new Queue(`${queueName}-dlq`, { connection });

  // Retry timing comes from each job's own attempts/backoff options, set when it is added
  const worker = new Worker(queueName, async (job) => processJob(job.data), { connection });

  // Move a job to the DLQ only after its final attempt has failed
  worker.on("failed", async (job, err) => {
    if (!job) return;
    const maxAttempts = job.opts.attempts ?? 1;
    if (job.attemptsMade >= maxAttempts) {
      await dlqQueue.add("failed-job", {
        originalJob: { name: job.name, data: job.data },
        error: err.message,
        failedAt: new Date().toISOString(),
      });
    }
  });

  return { mainQueue, dlqQueue, worker };
}

it("moves exhausted jobs to dead letter queue", async () => {
  const processJob = vi.fn().mockRejectedValue(new Error("Permanent failure"));
  const { mainQueue, dlqQueue, worker } = await setupWithDLQ("dlq-test", processJob);

  await mainQueue.add("will-fail", { input: "bad-data" }, {
    attempts: 3,
    backoff: { type: "fixed", delay: 100 },
  });

  await waitForDLQEntry(dlqQueue, 10_000);

  const dlqJobs = await dlqQueue.getJobs(["waiting"]);
  expect(dlqJobs).toHaveLength(1);
  expect(dlqJobs[0].data.originalJob.name).toBe("will-fail");
  expect(dlqJobs[0].data.error).toBe("Permanent failure");
  expect(processJob).toHaveBeenCalledTimes(3); // one call per attempt

  await worker.close();
  await mainQueue.close();
  await dlqQueue.close();
}, 15_000);
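Pulling the "is this the final attempt?" decision and the DLQ payload out of the failed listener makes both unit-testable without Redis. A sketch; `FailedJobInfo`, `isExhausted` and `toDeadLetterPayload` are hypothetical, with FailedJobInfo mirroring job.name, job.data, job.attemptsMade and job.opts.attempts:

```typescript
// Hypothetical, BullMQ-independent view of the fields the failed listener reads
interface FailedJobInfo {
  name: string;
  data: unknown;
  attemptsMade: number;
  attempts?: number; // job.opts.attempts; BullMQ treats a missing value as a single attempt
}

// True once the job has used up all of its configured attempts
export function isExhausted(job: FailedJobInfo): boolean {
  return job.attemptsMade >= (job.attempts ?? 1);
}

// Builds the DLQ job data; `now` is injectable so tests get a stable timestamp
export function toDeadLetterPayload(job: FailedJobInfo, err: Error, now: Date = new Date()) {
  return {
    originalJob: { name: job.name, data: job.data },
    error: err.message,
    attemptsMade: job.attemptsMade,
    failedAt: now.toISOString(),
  };
}
```

Inside the listener, the worker would map the job explicitly, e.g. `const info = { name: job.name, data: job.data, attemptsMade: job.attemptsMade, attempts: job.opts.attempts }`, then add `toDeadLetterPayload(info, err)` to the DLQ when `isExhausted(info)` is true.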

Summary

Testing BullMQ advanced patterns:

  1. Flow producers: Verify children complete before parent; assert on child return value propagation
  2. Delayed jobs: Use job.promote() to skip the delay in tests (queue.drain(true) deletes delayed jobs); assert delay values in job options
  3. Rate limiters: Time batch processing to verify throttling; assert second batch starts after duration
  4. Job groups (BullMQ Pro): Give the worker spare concurrency, enqueue several same-group jobs, and verify no temporal overlap between them
  5. Priority queues: Create the worker with autorun: false, enqueue everything, then call worker.run(); verify high-priority jobs come out first
  6. Stalled job recovery: Simulate a worker crash with worker.close(true) and a short lockDuration; verify a replacement worker recovers the job
  7. Dead letter queues: Use worker.on("failed") to move exhausted jobs; assert DLQ entry contents
