QStash and Upstash Queue Testing: Message Delivery, Deduplication, and Delays
QStash and Upstash Queue are serverless message queue services built on Redis. QStash delivers HTTP callbacks with at-least-once semantics, deduplication, scheduling, and retry logic. Upstash Queue provides FIFO queues with consumer groups. Testing these systems requires understanding their delivery guarantees and designing tests that verify your message handlers behave correctly under retry, delay, and deduplication conditions.
QStash Architecture
QStash works by accepting an HTTP POST to its API, then delivering that message to your endpoint URL. Your endpoint must respond with a 2xx status within the timeout (defaults to 30s) or QStash will retry:
Client → QStash API → Your HTTP endpoint
                    ↑ retries on failure
This means your tests need to:
- Test the message handler logic (the function at your endpoint)
- Test that you're publishing the right payloads to QStash
- Optionally test end-to-end delivery in integration environments
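Because delivery is at-least-once, the same message can reach your handler more than once, so handler tests usually need a "delivered twice" case. The following is a minimal sketch of an idempotency guard, assuming the message ID is read from the Upstash-Message-Id request header by the caller. The name withIdempotency and the in-memory Set are illustrative assumptions, not part of the QStash SDK.

```typescript
// Wraps a handler so that a redelivered message (same ID) is acknowledged
// without running the side effects a second time.
// `seen` is an in-memory stand-in; real deployments need a durable store.
function withIdempotency<T>(
  seen: Set<string>,
  handler: (payload: T) => Promise<void>,
): (messageId: string, payload: T) => Promise<"processed" | "duplicate"> {
  return async (messageId, payload) => {
    if (seen.has(messageId)) return "duplicate";
    await handler(payload);
    // Record the ID only after success, so a failed attempt is still retried.
    seen.add(messageId);
    return "processed";
  };
}
```

A test can then call the wrapped handler twice with the same ID and assert that the underlying handler ran once. Two concurrent deliveries could still both pass the check in this sketch; a store with an atomic set-if-absent operation would be needed to close that gap.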
Testing Message Handler Logic
The most important tests run your handler function directly — no QStash infrastructure required:
// src/handlers/sendNotification.ts
export async function handleNotification(payload: {
  userId: string;
  type: "welcome" | "digest" | "alert";
  data: Record<string, unknown>;
}) {
  const user = await userService.get(payload.userId);
  if (!user) throw new Error(`User ${payload.userId} not found`);
  switch (payload.type) {
    case "welcome":
      return emailService.sendWelcome(user.email, payload.data);
    case "digest":
      return emailService.sendDigest(user.email, payload.data);
    case "alert":
      return smsService.sendAlert(user.phone, payload.data);
  }
}
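The handler above relies on its TypeScript payload type, but the body that arrives at the endpoint is untyped JSON. A small runtime check between the two is easy to unit test on its own. This is a sketch only: parseNotificationPayload is a hypothetical helper, and the type is copied from the handler signature above.

```typescript
type NotificationPayload = {
  userId: string;
  type: "welcome" | "digest" | "alert";
  data: Record<string, unknown>;
};

const NOTIFICATION_TYPES = ["welcome", "digest", "alert"] as const;

// Narrows an unknown JSON value to NotificationPayload,
// or returns null when the shape does not match.
function parseNotificationPayload(input: unknown): NotificationPayload | null {
  if (typeof input !== "object" || input === null) return null;
  const { userId, type, data } = input as Record<string, unknown>;
  if (typeof userId !== "string" || userId.length === 0) return null;
  const knownType = NOTIFICATION_TYPES.find((t) => t === type);
  if (knownType === undefined) return null;
  if (typeof data !== "object" || data === null || Array.isArray(data)) return null;
  return { userId, type: knownType, data: data as Record<string, unknown> };
}
```

A malformed payload will not become valid on a later retry, so the endpoint can reject it early instead of throwing deep inside the handler.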
// src/handlers/sendNotification.test.ts
import { handleNotification } from "./sendNotification";

it("sends welcome email for welcome notification type", async () => {
  const sendWelcome = vi.spyOn(emailService, "sendWelcome").mockResolvedValueOnce({ id: "msg_1" });
  vi.spyOn(userService, "get").mockResolvedValueOnce({ email: "test@example.com", phone: "+1555000" });
  await handleNotification({ userId: "usr_1", type: "welcome", data: { name: "Alice" } });
  expect(sendWelcome).toHaveBeenCalledWith("test@example.com", { name: "Alice" });
});

it("throws when user is not found", async () => {
  vi.spyOn(userService, "get").mockResolvedValueOnce(null);
  await expect(
    handleNotification({ userId: "missing", type: "welcome", data: {} })
  ).rejects.toThrow("not found");
});
Testing QStash Signature Verification
QStash signs every request and sends the signature in the Upstash-Signature header. Your endpoint must verify it so that forged requests, which did not come from QStash, are rejected. Test the verification logic:
import { Receiver } from "@upstash/qstash";
// Assumption: handleRequest is your route handler, which verifies the
// signature with a Receiver before dispatching; adjust the import to your code.
import { handleRequest } from "./middleware/qstashVerify";

// The middleware under test is expected to build its Receiver like this:
const receiver = new Receiver({
  currentSigningKey: process.env.QSTASH_CURRENT_SIGNING_KEY!,
  nextSigningKey: process.env.QSTASH_NEXT_SIGNING_KEY!,
});

it("rejects requests with invalid signature", async () => {
  const req = new Request("https://yourapp.com/api/webhook", {
    method: "POST",
    headers: {
      "content-type": "application/json",
      "upstash-signature": "invalid_signature",
    },
    body: JSON.stringify({ userId: "usr_1", type: "welcome" }),
  });
  const response = await handleRequest(req);
  expect(response.status).toBe(401);
});

it("accepts requests with valid signature", async () => {
  // Stub Receiver.verify so the test does not depend on a real signature.
  const verify = vi.spyOn(Receiver.prototype, "verify").mockResolvedValueOnce(true);
  const validBody = JSON.stringify({ userId: "usr_1", type: "welcome", data: {} });
  const req = new Request("https://yourapp.com/api/webhook", {
    method: "POST",
    headers: {
      "content-type": "application/json",
      "upstash-signature": "stubbed_signature",
    },
    body: validBody,
  });
  const response = await handleRequest(req);
  expect(verify).toHaveBeenCalledWith(
    expect.objectContaining({ signature: "stubbed_signature", body: validBody })
  );
  expect(response.status).toBe(200);
  verify.mockRestore();
});
Testing Message Publishing
Test that your application publishes messages with the correct payload and options:
// Share a single publishJSON mock across every Client instance, so the test
// can observe calls made by the Client that scheduleNotification creates.
const { publishJSON } = vi.hoisted(() => ({
  publishJSON: vi.fn().mockResolvedValue({ messageId: "msg_test_123" }),
}));

// Mock QStash client for unit tests
vi.mock("@upstash/qstash", () => ({
  Client: vi.fn(function () {
    return { publishJSON };
  }),
}));

import { scheduleNotification } from "./services/notificationService";

it("publishes welcome notification to correct endpoint", async () => {
  await scheduleNotification({ userId: "usr_1", type: "welcome" });
  expect(publishJSON).toHaveBeenCalledWith(
    expect.objectContaining({
      url: "https://yourapp.com/api/notifications",
      body: { userId: "usr_1", type: "welcome" },
    })
  );
});
Testing Deduplication
QStash deduplicates messages by deduplication ID: if you publish two messages with the same Upstash-Deduplication-Id within the deduplication window, only one is delivered. Test that your publisher derives this ID from stable, deterministic data:
import { Client } from "@upstash/qstash";

// The deduplication ID should be based on content, not random
export async function publishOrderEvent(orderId: string) {
  const client = new Client({ token: process.env.QSTASH_TOKEN! });
  return client.publishJSON({
    url: `${process.env.BASE_URL}/api/order-events`,
    body: { orderId },
    headers: {
      // Use orderId as dedup key to prevent duplicate processing
      "Upstash-Deduplication-Id": `order-created-${orderId}`,
    },
  });
}
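The publisher above has a natural key in orderId. When an event has no single natural key, one option is to derive the ID from the payload itself, which only works if equal payloads always serialize to the same string. The sketch below assumes JSON-compatible payloads; canonicalize and dedupKey are hypothetical helpers, and a long payload would normally be hashed before being used as an ID, which is left out here.

```typescript
// Serializes a JSON-compatible value with object keys sorted, so that
// { a: 1, b: 2 } and { b: 2, a: 1 } produce the same string.
function canonicalize(value: unknown): string {
  if (Array.isArray(value)) {
    return `[${value.map((item) => canonicalize(item)).join(",")}]`;
  }
  if (typeof value === "object" && value !== null) {
    const record = value as Record<string, unknown>;
    const entries = Object.keys(record)
      .sort()
      .map((key) => `${JSON.stringify(key)}:${canonicalize(record[key])}`);
    return `{${entries.join(",")}}`;
  }
  return JSON.stringify(value);
}

// Builds a deterministic deduplication ID from an event name and its payload.
function dedupKey(event: string, payload: unknown): string {
  return `${event}:${canonicalize(payload)}`;
}
```

The property worth testing is that key order does not change the result while any change in content does.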
// mockQStashClient is a project-specific test helper (not part of the SDK)
// that makes the code under test use a stubbed client.
it("uses order ID as deduplication key", async () => {
  const publishSpy = vi.fn().mockResolvedValue({ messageId: "msg_123" });
  mockQStashClient({ publishJSON: publishSpy });
  await publishOrderEvent("ord_999");
  expect(publishSpy).toHaveBeenCalledWith(
    expect.objectContaining({
      headers: expect.objectContaining({
        "Upstash-Deduplication-Id": "order-created-ord_999",
      }),
    })
  );
});
Testing Delay and Scheduling
QStash supports delayed delivery via delay (seconds) or notBefore (Unix timestamp). Test that your scheduler sets delays correctly:
export async function scheduleReminder(userId: string, delaySeconds: number) {
  const client = new Client({ token: process.env.QSTASH_TOKEN! });
  return client.publishJSON({
    url: `${process.env.BASE_URL}/api/reminders`,
    body: { userId },
    delay: delaySeconds,
  });
}

it("schedules reminder with correct delay", async () => {
  const publishSpy = vi.fn().mockResolvedValue({ messageId: "msg_delayed" });
  mockQStashClient({ publishJSON: publishSpy });
  const THREE_DAYS_SECONDS = 3 * 24 * 60 * 60;
  await scheduleReminder("usr_1", THREE_DAYS_SECONDS);
  expect(publishSpy).toHaveBeenCalledWith(
    expect.objectContaining({
      delay: THREE_DAYS_SECONDS,
      body: { userId: "usr_1" },
    })
  );
});
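The test above covers delay. For the notBefore variant, the detail most worth testing is the unit: notBefore is a Unix timestamp in seconds, while JavaScript dates are in milliseconds. A minimal sketch, where toUnixSeconds and reminderOptions are hypothetical helpers rather than SDK functions:

```typescript
// Converts a Date to a Unix timestamp in whole seconds.
function toUnixSeconds(date: Date): number {
  return Math.floor(date.getTime() / 1000);
}

// Builds publish options for a reminder that must not be delivered before `sendAt`.
function reminderOptions(userId: string, sendAt: Date) {
  return { body: { userId }, notBefore: toUnixSeconds(sendAt) };
}
```

Keeping the conversion in a pure function means the test can pass a fixed Date and assert the exact notBefore value without mocking the clock.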
// Test cron scheduling
export async function scheduleDailyReport(recipientEmail: string) {
  const client = new Client({ token: process.env.QSTASH_TOKEN! });
  return client.schedules.create({
    destination: `${process.env.BASE_URL}/api/reports`,
    cron: "0 9 * * 1-5", // Weekdays at 9am
    body: JSON.stringify({ recipientEmail }),
    headers: { "content-type": "application/json" },
  });
}

it("creates weekday schedule for daily reports", async () => {
  const createSpy = vi.fn().mockResolvedValue({ scheduleId: "sched_123" });
  mockQStashClient({ schedules: { create: createSpy } });
  await scheduleDailyReport("reports@example.com");
  expect(createSpy).toHaveBeenCalledWith(
    expect.objectContaining({
      cron: "0 9 * * 1-5",
      body: JSON.stringify({ recipientEmail: "reports@example.com" }),
    })
  );
});
Testing Upstash Queue (FIFO)
Upstash Queue provides traditional FIFO semantics with consumer groups. Test enqueue and dequeue patterns:
import { Queue } from "@upstash/queue";

// Production queue usage
export class JobQueue {
  private queue: Queue;

  constructor() {
    this.queue = new Queue({
      url: process.env.UPSTASH_REDIS_REST_URL!,
      token: process.env.UPSTASH_REDIS_REST_TOKEN!,
      name: "job-queue",
    });
  }

  async enqueue(job: { type: string; payload: unknown }) {
    return this.queue.enqueue(JSON.stringify(job));
  }

  async dequeue(): Promise<{ type: string; payload: unknown } | null> {
    const item = await this.queue.dequeue();
    if (!item) return null;
    return JSON.parse(item);
  }
}
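For unit tests that should not touch the network, an in-memory stand-in with the same enqueue and dequeue surface as the JobQueue class above can be passed to consumers instead. This is a sketch under that assumption; InMemoryJobQueue is a hypothetical test double, not something the Upstash packages provide.

```typescript
type Job = { type: string; payload: unknown };

// In-memory test double with the same enqueue/dequeue surface as JobQueue.
class InMemoryJobQueue {
  private items: string[] = [];

  async enqueue(job: Job): Promise<void> {
    // Round-trip through JSON, as the wrapper class does, so tests see
    // the same serialization behaviour (for example, Dates become strings).
    this.items.push(JSON.stringify(job));
  }

  async dequeue(): Promise<Job | null> {
    const item = this.items.shift();
    return item === undefined ? null : (JSON.parse(item) as Job);
  }
}
```

The double only checks your own logic around the queue; ordering and delivery guarantees of the real service still need the integration tests.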
// Test with a real Upstash Redis (use test credentials)
describe("JobQueue integration", () => {
  let queue: JobQueue;

  beforeEach(() => {
    // Use test Upstash credentials from environment
    process.env.UPSTASH_REDIS_REST_URL = process.env.TEST_UPSTASH_URL;
    process.env.UPSTASH_REDIS_REST_TOKEN = process.env.TEST_UPSTASH_TOKEN;
    queue = new JobQueue();
  });

  it("enqueues and dequeues in FIFO order", async () => {
    await queue.enqueue({ type: "email", payload: { to: "a@example.com" } });
    await queue.enqueue({ type: "email", payload: { to: "b@example.com" } });
    const first = await queue.dequeue();
    const second = await queue.dequeue();
    expect(first?.payload).toMatchObject({ to: "a@example.com" });
    expect(second?.payload).toMatchObject({ to: "b@example.com" });
  });

  it("returns null when queue is empty", async () => {
    const item = await queue.dequeue();
    expect(item).toBeNull();
  });
});
Testing Consumer Retry Logic
Build retry-aware consumers and test them:
export async function consumeWithRetry(
  queue: JobQueue,
  handler: (job: unknown) => Promise<void>,
  maxAttempts = 3
) {
  const job = await queue.dequeue();
  if (!job) return;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      await handler(job);
      return;
    } catch (error) {
      if (attempt === maxAttempts) throw error;
      await new Promise((r) => setTimeout(r, attempt * 1000)); // linear backoff: 1s, then 2s, ...
    }
  }
}
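The loop above waits attempt * 1000 ms between attempts, which grows linearly. If exponential growth is what you want, one possible shape is sketched below, with the sleep function injected so that tests can record delays instead of waiting. backoffDelayMs and retryWithBackoff are hypothetical names, and the base and cap values are arbitrary assumptions.

```typescript
// Delay before the retry that follows failed attempt number `attempt` (1-based):
// baseMs, 2 * baseMs, 4 * baseMs, ... capped at maxMs.
function backoffDelayMs(attempt: number, baseMs = 1000, maxMs = 30000): number {
  return Math.min(maxMs, baseMs * Math.pow(2, attempt - 1));
}

// Runs `task` up to `maxAttempts` times, sleeping between failed attempts.
// `sleep` defaults to a real timer; tests can pass a recording stub instead.
async function retryWithBackoff<T>(
  task: () => Promise<T>,
  maxAttempts = 3,
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms)),
): Promise<T> {
  for (let attempt = 1; ; attempt++) {
    try {
      return await task();
    } catch (error) {
      // Out of attempts: surface the last error to the caller.
      if (attempt >= maxAttempts) throw error;
      await sleep(backoffDelayMs(attempt));
    }
  }
}
```

Injecting sleep is a design choice that keeps the retry tests fast and lets them assert on the exact delay sequence.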
// Fake timers keep these tests from really waiting between attempts.
beforeEach(() => vi.useFakeTimers());
afterEach(() => vi.useRealTimers());

it("retries handler on transient failure", async () => {
  const mockQueue = {
    dequeue: vi.fn().mockResolvedValue({ type: "email", payload: {} }),
  };
  let callCount = 0;
  const handler = vi.fn().mockImplementation(async () => {
    callCount++;
    if (callCount < 3) throw new Error("Transient error");
  });
  const run = consumeWithRetry(mockQueue as any, handler, 3);
  await vi.runAllTimersAsync();
  await run;
  expect(handler).toHaveBeenCalledTimes(3);
});

it("throws after max attempts exceeded", async () => {
  const mockQueue = { dequeue: vi.fn().mockResolvedValue({ type: "email", payload: {} }) };
  const handler = vi.fn().mockRejectedValue(new Error("Persistent failure"));
  // Attach the rejection assertion before advancing timers so the rejection is never unhandled.
  const assertion = expect(consumeWithRetry(mockQueue as any, handler, 3)).rejects.toThrow("Persistent failure");
  await vi.runAllTimersAsync();
  await assertion;
  expect(handler).toHaveBeenCalledTimes(3);
});
Integration Testing with Real QStash
For end-to-end integration tests, QStash has to reach your endpoint, so a server running on localhost needs a public URL. One option is to expose it through a tunnel and publish to the tunnel URL (PUBLIC_URL below) with a real QStash token. Upstash also provides a local QStash development server for this purpose; consult its documentation for the token and signing keys it expects.
# Start local tunnel for QStash to reach your local server
npx cloudflared tunnel --url http://localhost:3000
// integration/qstash.test.ts
import { Client } from "@upstash/qstash";

const client = new Client({ token: process.env.QSTASH_TOKEN! });

// testStore is a placeholder for wherever your /api/test-receiver endpoint
// records the messages it receives (for example, a database table).
it("delivers message to endpoint within 30 seconds", async () => {
  await client.publishJSON({
    url: `${process.env.PUBLIC_URL}/api/test-receiver`,
    body: { testId: "integration-001", timestamp: Date.now() },
  });
  // Poll your database/store for the received message
  let message;
  for (let i = 0; i < 30; i++) {
    message = await testStore.getMessage("integration-001");
    if (message) break;
    await new Promise((r) => setTimeout(r, 1000));
  }
  expect(message).toBeDefined();
  expect(message.testId).toBe("integration-001");
}, 35_000);
Summary
Testing QStash and Upstash Queue effectively:
- Test handler logic directly: no QStash required for unit tests
- Test signature verification: cover both the rejected and the accepted path
- Mock the client for publishing tests: assert on payload, headers, delay, and deduplication ID
- Test deduplication by inspecting the Upstash-Deduplication-Id header: it should be stable and deterministic
- Test delay scheduling: assert the delay value matches your business rules
- Test consumer retry logic: both retry-until-success and max-attempts-exceeded paths
- Use real Upstash credentials for integration tests: Upstash's free tier is suitable for CI pipelines