Async Worker Patterns: Testing Idempotency, Dead-Letter Queues, and Backpressure
Background job systems fail in complex ways. A job can run twice due to a retry. A downstream service can be slow, causing queue buildup. A crash during execution can leave data in an inconsistent state. Testing these scenarios requires specific patterns that go beyond simple pass/fail test cases.
This guide covers the three hardest async worker testing problems: idempotency, dead-letter queue handling, and backpressure.
Part 1: Testing Idempotency
An idempotent job produces the same result regardless of how many times it runs with the same input. This is critical for background jobs because retries, duplicate deliveries, and at-least-once delivery guarantees mean your jobs will run more than once.
The Problem
# Non-idempotent job — dangerous
def process_order(order_id):
    order = Order.objects.get(id=order_id)
    # Problem: the tax is added again on every run
    order.total_cents += calculate_tax(order.subtotal)
    order.save()
    PaymentGateway.charge(order.customer_id, order.total_cents)
    EmailService.send_receipt(order)

If this runs twice, the tax is added twice, the customer is charged twice, and two receipts go out.
Idempotency Patterns
Pattern 1: State checks
from django.db import transaction

def process_order(order_id):
    with transaction.atomic():
        # Lock the row so concurrent workers serialize on the status check
        order = Order.objects.select_for_update().get(id=order_id)
        if order.status in ('processing', 'completed', 'failed'):
            return {'status': 'already_processed', 'order_id': order_id}
        order.status = 'processing'
        order.save()

    # Only one worker can get past the guard for a given order
    payment = PaymentGateway.charge(order.customer_id, order.total_cents)
    order.status = 'completed'
    order.payment_id = payment.id
    order.save()
    EmailService.send_receipt(order)
    return {'status': 'completed', 'order_id': order_id}

Pattern 2: Idempotency keys
import hashlib

import requests
from django.core.cache import cache

def send_webhook(endpoint_url, event_type, payload, event_id):
    """Idempotency key prevents duplicate webhook deliveries"""
    idempotency_key = f"webhook:{event_id}:{endpoint_url}"

    # Check if already delivered
    cache_key = f"webhook_sent:{hashlib.sha256(idempotency_key.encode()).hexdigest()}"
    if cache.get(cache_key):
        return {'status': 'already_sent', 'idempotency_key': idempotency_key}

    # Attempt delivery
    response = requests.post(
        endpoint_url,
        json={'event': event_type, 'data': payload},
        headers={'X-Webhook-ID': event_id},
        timeout=5,
    )
    if response.status_code in (200, 201, 204):
        # Mark as sent with TTL
        cache.set(cache_key, True, timeout=86400)  # 24 hours
        return {'status': 'sent', 'http_status': response.status_code}
    raise WebhookDeliveryError(f"HTTP {response.status_code}")

Pattern 3: Database-level deduplication
from django.db import IntegrityError

def create_invoice_for_order(order_id, billing_period):
    """Use a unique constraint to prevent duplicate invoices"""
    try:
        # Unique constraint on (order_id, billing_period)
        return Invoice.objects.create(
            order_id=order_id,
            billing_period=billing_period,
        )
    except IntegrityError:
        # Already exists — return the existing invoice
        return Invoice.objects.get(order_id=order_id, billing_period=billing_period)
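This pattern only works if uniqueness is enforced by the database itself, not by application code. A minimal sketch of the assumed Invoice model with the constraint declared in Django (field types are guesses):

from django.db import models

class Invoice(models.Model):
    order_id = models.IntegerField()
    billing_period = models.CharField(max_length=7)  # e.g. '2024-06'

    class Meta:
        constraints = [
            # The database rejects the second insert, even from a
            # concurrent worker in another process
            models.UniqueConstraint(
                fields=['order_id', 'billing_period'],
                name='unique_invoice_per_order_period',
            ),
        ]

Testing Idempotency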
import pytest
from unittest.mock import MagicMock

class TestOrderProcessingIdempotency:
    @pytest.fixture
    def pending_order(self, db):
        return Order.objects.create(
            customer_id=1,
            total_cents=9999,
            status='pending',
        )

    def test_double_processing_charges_once(self, pending_order, mocker):
        """Running the job twice should only charge the customer once"""
        mock_charge = mocker.patch('myapp.PaymentGateway.charge')
        mock_charge.return_value = MagicMock(id='txn_123')

        process_order(pending_order.id)
        process_order(pending_order.id)  # Second run — should be a no-op

        assert mock_charge.call_count == 1

    def test_double_processing_sends_one_receipt(self, pending_order, mocker):
        """Receipt email should only be sent once"""
        mocker.patch('myapp.PaymentGateway.charge', return_value=MagicMock(id='txn_123'))
        mock_email = mocker.patch('myapp.EmailService.send_receipt')

        process_order(pending_order.id)
        process_order(pending_order.id)

        assert mock_email.call_count == 1

    def test_second_call_returns_already_processed(self, pending_order, mocker):
        mocker.patch('myapp.PaymentGateway.charge', return_value=MagicMock(id='txn_123'))

        process_order(pending_order.id)
        result = process_order(pending_order.id)

        assert result['status'] == 'already_processed'

    def test_concurrent_processing_is_safe(self, pending_order, mocker):
        """Simulate concurrent execution with threads"""
        import threading

        mock_charge = mocker.patch('myapp.PaymentGateway.charge')
        mock_charge.return_value = MagicMock(id='txn_123')
        results = []
        errors = []

        def run_job():
            try:
                result = process_order(pending_order.id)
                results.append(result)
            except Exception as e:
                errors.append(e)

        threads = [threading.Thread(target=run_job) for _ in range(5)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        assert not errors
        assert mock_charge.call_count == 1

    def test_webhook_idempotency_key_prevents_duplicates(self, mocker):
        mock_post = mocker.patch('requests.post')
        mock_post.return_value = MagicMock(status_code=200)
        event_id = 'evt_abc123'

        send_webhook('https://example.com/hook', 'order.created', {}, event_id)
        send_webhook('https://example.com/hook', 'order.created', {}, event_id)

        # Only one HTTP request should have been made
        assert mock_post.call_count == 1

Part 2: Testing Dead-Letter Queues
Dead-letter queues (DLQs) receive jobs that have exhausted their retry attempts. Testing DLQ behavior ensures your failure handling works correctly.
Setting Up DLQ Testing
// BullMQ DLQ configuration
const { Queue, Worker, QueueEvents } = require('bullmq');

async function setupQueues(connection) {
  const mainQueue = new Queue('orders', { connection });
  const dlq = new Queue('orders-dlq', { connection });

  const worker = new Worker(
    'orders',
    async (job) => {
      try {
        await processOrder(job.data);
      } catch (error) {
        // If job has failed all attempts, move to DLQ
        if (job.attemptsMade >= job.opts.attempts - 1) {
          await dlq.add('failed-order', {
            originalJob: job.data,
            error: error.message,
            failedAt: new Date().toISOString(),
            attempts: job.attemptsMade + 1,
          });
        }
        throw error; // Still throw to trigger retry/failure
      }
    },
    { connection }
  );

  return { mainQueue, dlq, worker };
}

// Test DLQ behavior
describe('Dead Letter Queue', () => {
  it('moves job to DLQ after max retries', async () => {
    const { mainQueue, dlq, worker } = await setupQueues(connection);

    // Add job that will always fail
    await mainQueue.add(
      'process-order',
      { orderId: 999 },
      { attempts: 3, backoff: { type: 'fixed', delay: 10 } }
    );

    // Wait for processing to complete/fail
    const queueEvents = new QueueEvents('orders', { connection });
    await new Promise((resolve) => {
      queueEvents.on('failed', resolve);
    });

    // Give time for the DLQ move
    await new Promise(r => setTimeout(r, 100));

    const dlqJobs = await dlq.getJobs(['waiting']);
    expect(dlqJobs).toHaveLength(1);
    expect(dlqJobs[0].data.originalJob.orderId).toBe(999);
    expect(dlqJobs[0].data.attempts).toBe(3);

    await worker.close();
    await queueEvents.close();
  });

  it('preserves original job data in DLQ', async () => {
    const originalData = {
      orderId: 42,
      customerId: 7,
      amount: 9999,
      items: [{ sku: 'ABC123', quantity: 2 }]
    };
    const { mainQueue, dlq, worker } = await setupQueues(connection);

    await mainQueue.add('process-order', originalData, {
      attempts: 1, // Fail immediately
    });

    // Wait for the DLQ move
    await new Promise(r => setTimeout(r, 500));

    const [dlqJob] = await dlq.getJobs(['waiting']);
    expect(dlqJob.data.originalJob).toEqual(originalData);

    await worker.close();
  });

  it('can replay jobs from DLQ', async () => {
    const processedOrders = [];
    const mainQueue = new Queue('orders', { connection });
    const dlq = new Queue('orders-dlq', { connection });

    // Fix the handler — now it works
    const recoveryWorker = new Worker(
      'orders',
      async (job) => {
        processedOrders.push(job.data.orderId);
      },
      { connection }
    );

    // Re-queue failed jobs from DLQ
    const failedJobs = await dlq.getJobs(['waiting']);
    for (const job of failedJobs) {
      await mainQueue.add('process-order', job.data.originalJob);
      await job.remove(); // Remove from DLQ
    }

    await new Promise(r => setTimeout(r, 500));
    expect(processedOrders).toContain(999); // Failed job from the first test
    await recoveryWorker.close();
    await mainQueue.close();
    await dlq.close();
  });
});

Python DLQ Testing (Celery)
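These tests assume a Celery task whose on_failure hook forwards exhausted jobs to a DeadLetterQueue helper. Neither is shown in this guide, so here is a minimal sketch; DeadLetterQueue and OrderService stand in for hypothetical application code:

from celery import Task, shared_task

class DLQTask(Task):
    """Base class that routes exhausted jobs to the dead-letter queue."""

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        # Celery invokes this hook once the final retry has failed
        DeadLetterQueue.add(  # hypothetical helper in myapp.tasks
            job_id=task_id,
            queue='orders',
            payload=list(args),
            error=str(exc),
        )

@shared_task(base=DLQTask, autoretry_for=(Exception,), max_retries=3)
def process_order_task(order_id):
    return OrderService.process(order_id=order_id)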
class TestDeadLetterQueue:
    @pytest.fixture(autouse=True)
    def setup_dlq(self, celery_app):
        celery_app.conf.task_acks_late = True
        celery_app.conf.task_reject_on_worker_lost = True

    def test_exhausted_task_triggers_dlq_handler(self, mocker):
        """Verify that tasks exhausting retries call the DLQ handler"""
        mock_dlq = mocker.patch('myapp.tasks.DeadLetterQueue.add')
        exc = Order.DoesNotExist("Order 999 not found")

        # Invoke the on_failure hook directly, exactly as Celery does
        # once the final retry has failed
        process_order_task.on_failure(
            exc=exc,
            task_id='test-job-id',
            args=[999],  # Non-existent order
            kwargs={},
            einfo=None,
        )

        mock_dlq.assert_called_once_with(
            job_id='test-job-id',
            queue='orders',
            payload=[999],
            error='Order 999 not found',
        )

    def test_dlq_contents_can_be_inspected(self):
        """Can enumerate jobs in the DLQ"""
        DeadLetterQueue.add(
            job_id='test-1',
            queue='orders',
            payload={'order_id': 1},
            error='Timeout',
        )

        dlq_jobs = DeadLetterQueue.all()
        assert len(dlq_jobs) >= 1
        assert any(j['job_id'] == 'test-1' for j in dlq_jobs)

    def test_dlq_jobs_can_be_replayed(self, mocker):
        """Can replay a DLQ job after fixing the underlying issue"""
        mock_process = mocker.patch('myapp.services.OrderService.process')
        mock_process.return_value = {'status': 'success'}

        # Add to DLQ
        DeadLetterQueue.add(
            job_id='test-replay',
            queue='orders',
            payload={'order_id': 42},
            error='Transient network error',
        )

        # Replay
        DeadLetterQueue.replay('test-replay')

        # Should have been processed
        mock_process.assert_called_once_with(order_id=42)

Part 3: Testing Backpressure
Backpressure occurs when the producer adds jobs faster than workers can process them. Untested backpressure handling can cause memory exhaustion, dropped messages, or cascading failures.
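The simplest defense is a producer-side guard that refuses to enqueue past a depth limit. A Python sketch, assuming a Redis broker (Celery's Redis transport keeps pending messages in a list named after the queue); QueueFullError is a hypothetical exception type:

import redis

MAX_QUEUE_SIZE = 100
redis_client = redis.Redis()

class QueueFullError(Exception):
    """Raised when backpressure rejects a new job."""

def enqueue_with_backpressure(task, payload, queue='celery'):
    # LLEN on the queue's list approximates current queue depth
    depth = redis_client.llen(queue)
    if depth >= MAX_QUEUE_SIZE:
        raise QueueFullError(f"{queue} has {depth} pending jobs")
    return task.apply_async(args=[payload], queue=queue)

The BullMQ tests below exercise the same idea from the JavaScript side.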
Testing Queue Depth Limits
describe('Queue backpressure', () => {
  it('rejects new jobs once the depth limit is reached', async () => {
    const queue = new Queue('bounded', {
      connection,
      defaultJobOptions: {
        removeOnComplete: true,
        removeOnFail: true,
      }
    });
    const MAX_QUEUE_SIZE = 100;

    // Custom check before adding
    async function addJobWithBackpressure(data) {
      const count = await queue.count();
      if (count >= MAX_QUEUE_SIZE) {
        throw new Error('Queue full — backpressure activated');
      }
      return queue.add('job', data);
    }

    // Fill to limit
    for (let i = 0; i < MAX_QUEUE_SIZE; i++) {
      await addJobWithBackpressure({ index: i });
    }

    // Next add should fail
    await expect(
      addJobWithBackpressure({ index: MAX_QUEUE_SIZE + 1 })
    ).rejects.toThrow('Queue full');

    await queue.obliterate({ force: true });
    await queue.close();
  });

  it('recovers from backpressure when workers drain queue', async () => {
    const queue = new Queue('backpressure-test', { connection });
    const MAX_SIZE = 5;
    const processed = [];

    // Start worker
    const worker = new Worker(
      'backpressure-test',
      async (job) => {
        await new Promise(r => setTimeout(r, 50)); // Slow worker
        processed.push(job.data.index);
      },
      { connection, concurrency: 1 }
    );

    // Add MAX_SIZE jobs (fills queue)
    for (let i = 0; i < MAX_SIZE; i++) {
      await queue.add('job', { index: i });
    }

    // Wait for processing
    await new Promise(r => setTimeout(r, MAX_SIZE * 100));

    // All should have been processed
    expect(processed.length).toBe(MAX_SIZE);

    // Now we can add more without backpressure
    const count = await queue.count();
    expect(count).toBe(0);

    await worker.close();
    await queue.obliterate({ force: true });
    await queue.close();
  });
});

Testing Slow Consumer Resilience
import time

class TestBackpressureHandling:
    def test_fast_producer_with_slow_consumer(self, mocker):
        """Producer should not overwhelm slow consumers"""
        processing_times = []

        def slow_processor(job_data):
            time.sleep(0.01)  # 10ms per job
            processing_times.append(time.time())
            return {'processed': True}

        # Patch the task body rather than the task object, so .delay()
        # still routes through Celery (assumes task_always_eager in tests)
        mocker.patch('myapp.tasks.process_item.run', side_effect=slow_processor)

        # Add 20 jobs rapidly
        for i in range(20):
            process_item.delay({'index': i})

        # Drain the queue
        time.sleep(1)

        # Verify all were processed eventually
        assert len(processing_times) == 20
        # Verify reasonable throughput (not dropped)
        total_time = processing_times[-1] - processing_times[0]
        assert total_time < 2.0  # Should complete within 2s

    def test_memory_does_not_grow_unbounded(self):
        """Queue should not accumulate unbounded with rate limiting"""
        import tracemalloc

        tracemalloc.start()
        baseline = tracemalloc.take_snapshot()

        # Add 1000 jobs
        for i in range(1000):
            cheap_task.delay({'index': i})

        # Check memory hasn't grown excessively
        current = tracemalloc.take_snapshot()
        stats = current.compare_to(baseline, 'lineno')
        total_added = sum(s.size_diff for s in stats if s.size_diff > 0)
        assert total_added < 10 * 1024 * 1024  # Less than 10MB
        tracemalloc.stop()

Testing Circuit Breaker for Worker Dependencies
import requests
from circuitbreaker import circuit

@circuit(failure_threshold=5, recovery_timeout=30)
def call_external_api(data):
    response = requests.post('https://api.example.com/process', json=data, timeout=3)
    response.raise_for_status()
    return response.json()
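The second test below exercises a worker that falls back gracefully when the breaker is open. That worker is not defined in this guide; a hypothetical sketch consistent with the test's assertions:

from circuitbreaker import CircuitBreakerError

class ProcessWithExternalApiWorker:
    """Hypothetical worker that degrades gracefully on an open circuit."""

    def perform_with_fallback(self, data):
        try:
            return {'status': 'completed', 'result': call_external_api(data)}
        except CircuitBreakerError:
            # Circuit is open: park the job instead of hammering the API
            return {'status': 'queued_for_retry', 'retry_after': 30}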
class TestCircuitBreaker:
    def test_circuit_opens_after_threshold_failures(self, mocker):
        """After 5 failures, circuit should open"""
        mocker.patch('requests.post', side_effect=ConnectionError("API down"))

        # First 5 calls should raise ConnectionError and increment the counter
        for _ in range(5):
            with pytest.raises(ConnectionError):
                call_external_api({'test': 'data'})

        # 6th call should raise CircuitBreakerError immediately
        from circuitbreaker import CircuitBreakerError
        with pytest.raises(CircuitBreakerError):
            call_external_api({'test': 'data'})

    def test_job_handles_open_circuit(self, mocker):
        """Worker should gracefully handle circuit breaker trips"""
        from circuitbreaker import CircuitBreakerError
        mocker.patch('myapp.tasks.call_external_api',
                     side_effect=CircuitBreakerError('API circuit open'))

        job = ProcessWithExternalApiWorker()
        result = job.perform_with_fallback({'data': 'test'})

        # Should use fallback, not crash
        assert result['status'] == 'queued_for_retry'
        assert result['retry_after'] is not None

Combining All Three: Chaos Testing for Workers
The most realistic test: inject random failures and verify the system maintains correctness:
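The test relies on a retrying task and a transient error type that the guide does not define; a hypothetical sketch of those supporting pieces (handle_order stands in for the real processing function):

from celery import shared_task

class TransientError(Exception):
    """A failure type that is safe to retry."""

@shared_task(autoretry_for=(TransientError,), max_retries=10, retry_backoff=True)
def process_order_with_retry(order_id):
    # handle_order is patched to the chaotic processor in the test below
    handle_order(order_id)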
import random
import threading
import time

class TestWorkerChaos:
    def test_correct_processing_under_chaos(self, db, mocker):
        """With random failures, all jobs should eventually complete correctly"""
        orders_to_process = [create_order() for _ in range(50)]
        processed_order_ids = set()
        lock = threading.Lock()
        fail_count = 0

        def chaotic_processor(order_id):
            nonlocal fail_count
            # Random failure simulation (30% failure rate)
            if random.random() < 0.3:
                with lock:
                    fail_count += 1
                raise TransientError("Random chaos failure")
            # Simulate actual processing
            order = Order.objects.get(id=order_id)
            order.mark_processed()
            with lock:
                processed_order_ids.add(order_id)

        # Route the task's work through the chaotic processor
        mocker.patch('myapp.tasks.handle_order', side_effect=chaotic_processor)

        # Queue all orders
        for order in orders_to_process:
            process_order_with_retry.delay(order.id)

        # Wait for processing (with generous timeout)
        max_wait = 30  # seconds
        start = time.time()
        while len(processed_order_ids) < len(orders_to_process):
            if time.time() - start > max_wait:
                break
            time.sleep(0.1)

        # All orders should be processed despite chaos
        assert len(processed_order_ids) == len(orders_to_process), (
            f"Only {len(processed_order_ids)}/{len(orders_to_process)} processed. "
            f"Fail count: {fail_count}"
        )

        # Verify idempotency: no double-processing
        for order in orders_to_process:
            order.refresh_from_db()
            assert order.processing_count == 1, (
                f"Order {order.id} processed {order.processing_count} times"
            )

Testing async workers at this level — idempotency, DLQs, backpressure, chaos — is what separates systems that degrade gracefully from those that lose data under pressure. Most bugs in background job systems aren't straightforward logic errors; they're failures that only manifest under retry, duplication, or load.