BullMQ Advanced Testing: Flow Producers, Delayed Jobs, and Rate Limiters
BullMQ's advanced features — flow producers, delayed jobs, rate limiters, and job groups — enable sophisticated background processing patterns. But these features are also the hardest to test correctly. This guide covers testing strategies for each advanced BullMQ pattern, building on basic queue testing to handle the complex orchestration scenarios you'll encounter in production.
Prerequisites
You should be comfortable with basic BullMQ testing using real Redis or ioredis-mock. This guide focuses on advanced patterns. For BullMQ basics, see our BullMQ job testing guide.
All examples use Vitest + real Redis via Testcontainers:
import { GenericContainer, StartedTestContainer } from "testcontainers";
import { Queue, Worker, FlowProducer } from "bullmq";
import IORedis from "ioredis";
// Shared by every test in this file; both are assigned in the beforeAll hook.
let redisContainer: StartedTestContainer;
let connection: IORedis;

/**
 * Starts a throwaway Redis 7 container and opens the ioredis connection
 * that the queues, workers and flow producers below are constructed with.
 * The hook is given 30 seconds to finish.
 */
beforeAll(async () => {
  const redisPort = 6379;
  const definition = new GenericContainer("redis:7-alpine").withExposedPorts(redisPort);
  redisContainer = await definition.start();

  // The container port is mapped to a host port, so ask the container for it.
  const host = redisContainer.getHost();
  const port = redisContainer.getMappedPort(redisPort);

  // NOTE(review): maxRetriesPerRequest is kept at null; presumably this is what
  // BullMQ expects for its blocking connections — confirm against BullMQ docs.
  connection = new IORedis({ host, port, maxRetriesPerRequest: null });
}, 30_000);
/**
 * Closes the shared Redis connection and stops the container.
 *
 * The container is stopped in a `finally` so it is not left running when
 * `quit()` rejects, and both handles are accessed with `?.` because they are
 * still undefined if the `beforeAll` hook failed before assigning them.
 */
afterAll(async () => {
  try {
    await connection?.quit();
  } finally {
    await redisContainer?.stop();
  }
});
Testing Flow Producers
Flow producers orchestrate parent/child job relationships. The parent job only completes after all children succeed. Testing requires verifying both the tree structure and completion order:
import { FlowProducer, Worker } from "bullmq";
/**
 * One completed job observed while a flow was running.
 * Instances are built from the worker "completed" event in runFlowToCompletion.
 */
interface FlowResult {
// Id of the completed job (taken from job.id).
jobId: string;
// Name the job was added with (job.name).
name: string;
// Payload the job was processed with (job.data).
data: unknown;
// Value reported alongside the job by the "completed" event.
returnValue: unknown;
}
/** Minimal structural view of a flow node: its queue and its optional children. */
interface FlowNodeShape {
  queueName: string;
  children?: FlowNodeShape[];
}

/**
 * Walks a flow tree and returns the distinct queue names it touches together
 * with the total number of jobs (root included).
 */
function summarizeFlow(root: FlowNodeShape): { queueNames: string[]; jobCount: number } {
  const queueNames = new Set<string>();
  let jobCount = 0;
  const pending: FlowNodeShape[] = [root];

  for (let node = pending.pop(); node !== undefined; node = pending.pop()) {
    jobCount += 1;
    queueNames.add(node.queueName);
    pending.push(...(node.children ?? []));
  }

  return { queueNames: [...queueNames], jobCount };
}

/**
 * Adds `flow` through `flowProducer`, runs one worker per distinct queue in the
 * flow, and resolves once every job in the tree has completed.
 *
 * @param flowProducer   Producer used to enqueue the flow.
 * @param flow           Flow tree to run (same shape `FlowProducer.add` accepts).
 * @param workerHandlers Handler per queue name; receives the job's data. Queues
 *                       without a handler resolve with `{ done: true }`.
 * @param timeoutMs      How long to wait for the whole flow (default 10 s).
 * @returns One entry per completed job, in the order completions were observed.
 * @throws Error("Flow timeout") when the flow has not finished within `timeoutMs`.
 */
async function runFlowToCompletion(
  flowProducer: FlowProducer,
  flow: Parameters<FlowProducer["add"]>[0],
  workerHandlers: Record<string, (data: unknown) => Promise<unknown>>,
  timeoutMs = 10_000
): Promise<FlowResult[]> {
  // Completion is measured in jobs, not queues: several jobs may share one
  // queue, so counting queues would let the wait end before the parent ran.
  const { queueNames, jobCount } = summarizeFlow(flow);
  const completed: FlowResult[] = [];
  const workers: Worker[] = [];
  let timeoutTimer: ReturnType<typeof setTimeout> | undefined;
  let pollTimer: ReturnType<typeof setInterval> | undefined;

  try {
    // One worker per distinct queue name in the flow.
    for (const queueName of queueNames) {
      const handler = workerHandlers[queueName] ?? (async () => ({ done: true }));
      const worker = new Worker(queueName, async (job) => handler(job.data), { connection });
      worker.on("completed", (job, returnValue) => {
        completed.push({ jobId: job.id!, name: job.name, data: job.data, returnValue });
      });
      workers.push(worker);
    }

    await flowProducer.add(flow);

    // Poll until every job of the tree has reported completion.
    await new Promise<void>((resolve, reject) => {
      timeoutTimer = setTimeout(() => reject(new Error("Flow timeout")), timeoutMs);
      pollTimer = setInterval(() => {
        if (completed.length >= jobCount) {
          resolve();
        }
      }, 100);
    });

    return completed;
  } finally {
    // Runs on success and on timeout, so neither timers nor workers leak.
    clearTimeout(timeoutTimer);
    clearInterval(pollTimer);
    await Promise.all(workers.map((w) => w.close()));
  }
}
/**
 * Runs a parent job with two children and checks that both children were
 * processed before the parent handler ran.
 */
it("completes children before parent in flow", async () => {
  const flowProducer = new FlowProducer({ connection });
  const completionOrder: string[] = [];

  try {
    await runFlowToCompletion(
      flowProducer,
      {
        name: "generate-report",
        queueName: "reports",
        data: { reportId: "rpt_001" },
        children: [
          { name: "fetch-data", queueName: "data-fetch", data: { source: "sales" } },
          { name: "fetch-metrics", queueName: "data-fetch", data: { source: "metrics" } },
        ],
      },
      {
        "data-fetch": async (data) => {
          const { source } = data as { source: string };
          completionOrder.push(`child:${source}`);
          return { rows: 100 };
        },
        reports: async () => {
          completionOrder.push("parent");
          return { generated: true };
        },
      }
    );

    // Both children must actually have run; otherwise indexOf() yields -1 and
    // the ordering comparisons below would pass without proving anything.
    expect(completionOrder).toContain("child:sales");
    expect(completionOrder).toContain("child:metrics");

    // Children must complete before parent
    const parentIndex = completionOrder.indexOf("parent");
    expect(parentIndex).toBeGreaterThan(completionOrder.indexOf("child:sales"));
    expect(parentIndex).toBeGreaterThan(completionOrder.indexOf("child:metrics"));
  } finally {
    // Close the producer even when an assertion above throws.
    await flowProducer.close();
  }
});
Testing Child Job Return Values in Parent
Parent jobs can access their children's return values. Test this dependency:
/**
 * Adds a parent job with two child "sections" jobs and checks that the parent
 * processor can read both children's return values.
 */
it("parent receives children return values", async () => {
  const flowProducer = new FlowProducer({ connection });
  const workers: Worker[] = [];
  let parentReceivedChildren: unknown;

  try {
    const parentWorker = new Worker(
      "report-assembly",
      async (job) => {
        // The parent reads its children's results through getChildrenValues().
        parentReceivedChildren = await job.getChildrenValues();
        return { assembled: true };
      },
      { connection }
    );
    workers.push(parentWorker);

    await flowProducer.add({
      name: "assemble",
      queueName: "report-assembly",
      data: {},
      children: [
        { name: "section-1", queueName: "sections", data: { id: 1 } },
        { name: "section-2", queueName: "sections", data: { id: 2 } },
      ],
    });

    const sectionWorker = new Worker(
      "sections",
      async (job) => ({ content: `Section ${job.data.id} content` }),
      { connection }
    );
    workers.push(sectionWorker);

    // Wait for the parent job itself rather than for an "empty" queue: until
    // its children finish, the parent is neither waiting nor active, so an
    // emptiness check on "report-assembly" could return before it has run.
    await new Promise<void>((resolve, reject) => {
      const timer = setTimeout(
        () => reject(new Error("Parent job did not complete within 5000ms")),
        5000
      );
      parentWorker.once("completed", () => {
        clearTimeout(timer);
        resolve();
      });
      parentWorker.once("failed", (_job, err) => {
        clearTimeout(timer);
        reject(err);
      });
    });

    expect(parentReceivedChildren).toBeDefined();
    const values = Object.values(parentReceivedChildren as Record<string, unknown>);
    expect(values).toHaveLength(2);
    expect(values).toContainEqual(expect.objectContaining({ content: "Section 1 content" }));
    expect(values).toContainEqual(expect.objectContaining({ content: "Section 2 content" }));
  } finally {
    // Release workers and producer even when an assertion above throws.
    await Promise.all(workers.map((w) => w.close()));
    await flowProducer.close();
  }
});
Testing Delayed Jobs
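Delayed jobs are tricky — you don't want tests to wait real time. Instead of waiting, promote the delayed job with job.promote() so it becomes eligible for processing immediately. Note that queue.drain(true) removes delayed jobs rather than running them, and the worker's drainDelay option only controls how long an idle worker waits for new jobs — it does not shorten job delays: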
import { Queue, Worker, Job } from "bullmq";
/**
 * Adds a job with a 5 second delay, checks that it is not processed right
 * away, then promotes it and checks that the worker picks it up.
 */
it("delayed job processes after delay", async () => {
  const queue = new Queue("delayed-jobs", { connection });
  const processed: string[] = [];
  const worker = new Worker(
    "delayed-jobs",
    async (job: Job) => {
      processed.push(job.id!);
    },
    { connection }
  );

  try {
    // Add job with 5-second delay
    const job = await queue.add("send-reminder", { userId: "usr_1" }, { delay: 5000 });

    // Job should not be processed immediately
    await new Promise((r) => setTimeout(r, 100));
    expect(processed).not.toContain(job.id);

    // Promote the job so it is processed now instead of after the delay.
    // queue.drain(true) must not be used here: it deletes delayed jobs, so
    // the job would never run.
    await job.promote();

    // Now job should process quickly
    await waitForJobCompletion(job, 3000);
    expect(processed).toContain(job.id);
  } finally {
    // Close worker and queue even when an assertion above throws.
    await worker.close();
    await queue.close();
  }
});
// Checks the delay stored on the job instead of waiting for it to elapse.
it("sets correct delay for 3-day reminder", async () => {
  const remindersQueue = new Queue("reminders", { connection });
  const msPerDay = 24 * 60 * 60 * 1000;
  const expectedDelayMs = 3 * msPerDay;

  const scheduled = await scheduleReminder("usr_1", "3d");
  const stored = await remindersQueue.getJob(scheduled.id!);
  const actualDelayMs = stored?.opts.delay;

  // Precision -3 accepts a difference below 10^3 / 2, i.e. within 500 ms
  // (per the toBeCloseTo definition in Jest/Vitest).
  expect(actualDelayMs).toBeCloseTo(expectedDelayMs, -3);

  await remindersQueue.close();
});
Testing Rate Limiters
BullMQ rate limiters throttle job execution. Test that your rate limiter config prevents burst processing:
/**
 * Queues 6 jobs on a worker limited to 3 jobs per second and checks that they
 * are processed as two batches roughly one second apart.
 */
it("respects rate limit of 3 jobs per second", async () => {
  const queue = new Queue("rate-limited", { connection });
  const processingTimestamps: number[] = [];
  const worker = new Worker(
    "rate-limited",
    async () => {
      processingTimestamps.push(Date.now());
    },
    {
      connection,
      limiter: {
        max: 3, // max 3 jobs
        duration: 1000, // per 1 second
      },
    }
  );

  try {
    // Add 6 jobs — should process in 2 batches
    await Promise.all(
      Array.from({ length: 6 }, (_, i) => queue.add(`job-${i}`, { index: i }))
    );
    await waitForAllJobsComplete(queue, 5000);

    // Guard against a vacuous pass: with fewer than 6 timestamps the second
    // batch would be empty, Math.min() of nothing is Infinity, and the gap
    // assertion below would succeed without any throttling having happened.
    expect(processingTimestamps).toHaveLength(6);

    // First 3 should process quickly, next 3 should be delayed ~1s
    const firstBatch = processingTimestamps.slice(0, 3);
    const secondBatch = processingTimestamps.slice(3, 6);
    const firstBatchSpread = Math.max(...firstBatch) - Math.min(...firstBatch);
    const batchGap = Math.min(...secondBatch) - Math.max(...firstBatch);

    expect(firstBatchSpread).toBeLessThan(500); // First 3 process close together
    expect(batchGap).toBeGreaterThan(800); // Then ~1s gap before next batch
  } finally {
    // Close worker and queue even when an assertion above throws.
    await worker.close();
    await queue.close();
  }
});
Testing Job Groups (Concurrency Groups)
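Job groups prevent parallel execution of jobs in the same group — useful when processing per-user. Note that groups are a BullMQ Pro feature: the open-source bullmq package does not support the group option, so this example requires the Pro queue and worker classes: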
/**
 * Queues three jobs in the same group and checks that their execution windows
 * never overlap.
 *
 * NOTE(review): the `group` options are a BullMQ Pro feature as far as the
 * BullMQ documentation describes them — confirm the Queue/Worker classes in
 * scope are the Pro variants, otherwise the option has no effect.
 */
it("processes same-group jobs serially", async () => {
  const queue = new Queue("user-jobs", { connection });
  const executionLog: { jobId: string; start: number; end: number }[] = [];
  const worker = new Worker(
    "user-jobs",
    async (job) => {
      const start = Date.now();
      await new Promise((r) => setTimeout(r, 200)); // Simulate 200ms work
      const end = Date.now();
      executionLog.push({ jobId: job.id!, start, end });
    },
    {
      connection,
      // Allow the worker itself to run jobs in parallel. With the default
      // worker concurrency of 1 nothing can overlap, so the test would pass
      // even if the group limit below were ignored.
      concurrency: 3,
      group: { concurrency: 1 }, // One job per group at a time
    }
  );

  try {
    // Add 3 jobs for the same user — should not overlap
    await queue.add("process", { userId: "usr_1" }, { group: { id: "usr_1" } });
    await queue.add("process", { userId: "usr_1" }, { group: { id: "usr_1" } });
    await queue.add("process", { userId: "usr_1" }, { group: { id: "usr_1" } });
    await waitForAllJobsComplete(queue, 5000);

    // All three jobs must have run, or the overlap loop proves nothing.
    expect(executionLog).toHaveLength(3);

    // Sort by start time
    executionLog.sort((a, b) => a.start - b.start);

    // Verify no overlap: each job's start must be after previous job's end
    for (let i = 1; i < executionLog.length; i++) {
      expect(executionLog[i].start).toBeGreaterThanOrEqual(executionLog[i - 1].end);
    }
  } finally {
    // Close worker and queue even when an assertion above throws.
    await worker.close();
    await queue.close();
  }
});
Testing Priority Queues
BullMQ supports job priorities (1 = highest). Test that high-priority jobs process first:
/**
 * Enqueues two low-priority jobs followed by two high-priority jobs while the
 * worker is paused, then resumes it and checks that both high-priority jobs
 * were processed before either low-priority job.
 */
it("processes high-priority jobs before low-priority", async () => {
  const queue = new Queue("priority-test", { connection });
  const processedOrder: string[] = [];

  // Add worker but pause processing initially
  const worker = new Worker(
    "priority-test",
    async (job) => {
      processedOrder.push(job.data.label);
    },
    { connection }
  );

  try {
    await worker.pause();

    // Enqueue low-priority jobs first
    await queue.add("task", { label: "low-1" }, { priority: 10 });
    await queue.add("task", { label: "low-2" }, { priority: 10 });
    // Then enqueue high-priority
    await queue.add("task", { label: "high-1" }, { priority: 1 });
    await queue.add("task", { label: "high-2" }, { priority: 1 });

    // Resume — jobs process in priority order
    await worker.resume();
    await waitForAllJobsComplete(queue, 3000);

    // All four jobs must have run; comparing indexOf() results alone would
    // pass when a high-priority label is missing (indexOf returns -1).
    expect(processedOrder).toHaveLength(4);

    // The first two processed jobs are the high-priority ones, the last two
    // the low-priority ones. Order inside a priority level is not asserted.
    expect([...processedOrder.slice(0, 2)].sort()).toEqual(["high-1", "high-2"]);
    expect([...processedOrder.slice(2)].sort()).toEqual(["low-1", "low-2"]);
  } finally {
    // Close worker and queue even when an assertion above throws.
    await worker.close();
    await queue.close();
  }
});
Testing Job Staleness and Stalled Jobs
Workers can crash mid-job, leaving stalled jobs. Test stalled job recovery:
/**
 * Lets one worker pick up a job and then force-closes it mid-processing, and
 * checks that a second worker eventually processes the same job.
 */
it("recovers stalled jobs after worker crash", async () => {
  const queue = new Queue("stall-test", { connection });
  const completedJobIds: string[] = [];
  let reliableWorker: Worker | undefined;

  try {
    // Add a job
    const job = await queue.add("risky-task", { data: "important" });

    // Simulate worker that acquires the job but crashes
    const crashingWorker = new Worker(
      "stall-test",
      async () => {
        // Never settles, so the job stays active until the worker is closed.
        await new Promise(() => {}); // Hang forever
      },
      {
        connection,
        // Keep the job lock short. NOTE(review): per the BullMQ worker docs the
        // default lock lasts 30 s and a job only counts as stalled once its
        // lock has expired, which would exceed the 8 s wait below — confirm.
        lockDuration: 1000,
        stalledInterval: 1000, // Check for stalled jobs every 1s
        maxStalledCount: 1, // Fail after 1 stall
      }
    );

    // Wait for job to be picked up
    await new Promise((r) => setTimeout(r, 200));

    // Force close the worker (simulating crash)
    await crashingWorker.close(true);

    // Spin up a replacement worker
    reliableWorker = new Worker(
      "stall-test",
      async (recovered) => {
        completedJobIds.push(recovered.id!);
        return { rescued: true };
      },
      {
        connection,
        stalledInterval: 500,
      }
    );

    // Wait until the replacement worker has finished the recovered job.
    await waitForJobCompletion(job, 8000);
    expect(completedJobIds).toContain(job.id);
  } finally {
    // Close worker and queue even when an assertion above throws.
    await reliableWorker?.close();
    await queue.close();
  }
}, 15_000);
Testing Dead Letter Queue Patterns
Move permanently failed jobs to a dead letter queue:
import { Queue, Worker, QueueEvents } from "bullmq";
/**
 * Creates a queue, its companion dead letter queue (`<queueName>-dlq`) and a
 * worker that copies a failed job into the dead letter queue once the job has
 * been attempted at least `maxAttempts` times.
 *
 * @param queueName   Name of the main queue; the DLQ is named `${queueName}-dlq`.
 * @param maxAttempts Number of attempts after which a failure is sent to the DLQ.
 *                    NOTE(review): this is compared with `job.attemptsMade`
 *                    only; a job added with fewer `attempts` than this never
 *                    reaches the DLQ — keep the two values in sync.
 * @param processor   Job logic, called with the job's data. Defaults to the
 *                    module-level `processJob`; tests can pass a mock here.
 * @returns The main queue, the dead letter queue and the worker.
 */
export async function setupWithDLQ(
  queueName: string,
  maxAttempts: number,
  processor: (data: unknown) => Promise<unknown> = processJob
) {
  const mainQueue = new Queue(queueName, { connection });
  const dlqQueue = new Queue(`${queueName}-dlq`, { connection });

  const worker = new Worker(
    queueName,
    async (job) => {
      // Your job logic
      await processor(job.data);
    },
    {
      connection,
      settings: { backoffStrategy: (attemptsMade) => attemptsMade * 1000 },
    }
  );

  // Move failed jobs to DLQ
  worker.on("failed", async (job, err) => {
    if (!job || job.attemptsMade < maxAttempts) return;
    try {
      await dlqQueue.add("failed-job", {
        originalJob: { name: job.name, data: job.data },
        error: err.message,
        failedAt: new Date().toISOString(),
      });
    } catch (dlqError: unknown) {
      // An event listener has no caller to reject to; report the failure
      // instead of leaving an unhandled promise rejection.
      console.error(`Failed to move job ${job.id} to the dead letter queue`, dlqError);
    }
  });

  return { mainQueue, dlqQueue, worker };
}

/**
 * Runs a job whose processor always rejects and checks that, after its three
 * attempts, exactly one entry describing it lands in the dead letter queue.
 */
it("moves exhausted jobs to dead letter queue", async () => {
  const processJob = vi.fn().mockRejectedValue(new Error("Permanent failure"));
  // Hand the mock to the worker; a local variable alone is never called by it.
  const { mainQueue, dlqQueue, worker } = await setupWithDLQ("dlq-test", 3, processJob);

  try {
    await mainQueue.add("will-fail", { input: "bad-data" }, {
      attempts: 3,
      backoff: { type: "fixed", delay: 100 },
    });

    await waitForDLQEntry(dlqQueue, 10_000);

    const dlqJobs = await dlqQueue.getJobs(["waiting"]);
    expect(dlqJobs).toHaveLength(1);
    expect(dlqJobs[0].data.originalJob.name).toBe("will-fail");
    expect(dlqJobs[0].data.error).toBe("Permanent failure");
  } finally {
    // Close worker and queues even when an assertion above throws.
    await worker.close();
    await mainQueue.close();
    await dlqQueue.close();
  }
}, 15_000);
Summary
Testing BullMQ advanced patterns:
- Flow producers: Verify children complete before parent; assert on child return value propagation
- Delayed jobs: Use `job.promote()` to skip the delay in tests (`queue.drain(true)` deletes delayed jobs instead of running them); assert delay values in job options
- Rate limiters: Time batch processing to verify throttling; assert second batch starts after duration
- Job groups: Pause worker, enqueue all, resume — verify no temporal overlap between same-group jobs
- Priority queues: Pause worker before enqueueing; verify high-priority jobs come out first after resume
- Stalled job recovery: Simulate worker crash with `worker.close(true)`; verify replacement worker recovers
- Dead letter queues: Use `worker.on("failed")` to move exhausted jobs; assert DLQ entry contents