Async Worker Patterns: Testing Idempotency, Dead-Letter Queues, and Backpressure

Async Worker Patterns: Testing Idempotency, Dead-Letter Queues, and Backpressure

Background job systems fail in complex ways. A job can run twice due to a retry. A downstream service can be slow, causing queue buildup. A crash during execution can leave data in an inconsistent state. Testing these scenarios requires specific patterns that go beyond simple pass/fail test cases.

This guide covers the three hardest async worker testing problems: idempotency, dead-letter queue handling, and backpressure.

Part 1: Testing Idempotency

An idempotent job produces the same result regardless of how many times it runs with the same input. This is critical for background jobs because retries, duplicate deliveries, and at-least-once delivery guarantees mean your jobs will run more than once.

The Problem

# Non-idempotent job — dangerous
def process_order(order_id):
    """Anti-example: every invocation re-does all the work.

    Each run adds tax to total_cents again, charges the payment gateway
    again, and sends another receipt — so a single retry or duplicate
    delivery double-charges the customer. Kept broken on purpose; the
    patterns below show how to fix it.
    """
    order = Order.objects.get(id=order_id)
    
    # Problem: runs every time the job is called
    order.total_cents += calculate_tax(order.subtotal)
    order.save()
    
    # No guard: a second run charges and emails a second time.
    PaymentGateway.charge(order.customer_id, order.total_cents)
    EmailService.send_receipt(order)

If this runs twice, the customer is charged twice and receives two receipts.

Idempotency Patterns

Pattern 1: State checks

def process_order(order_id):
    """Process an order at most once, guarded by a status state machine.

    Safe to call repeatedly: completed/failed orders short-circuit, and a
    row lock serializes concurrent workers so only one can charge.

    Returns a dict with 'status' ('completed' or 'already_processed') and
    'order_id'.
    """
    with transaction.atomic():
        # FIX: the status check must happen *inside* the transaction with a
        # row lock. Checked outside (as before), two concurrent workers could
        # both observe 'pending' and both charge the customer.
        order = Order.objects.select_for_update().get(id=order_id)

        if order.status in ('completed', 'failed'):
            return {'status': 'already_processed', 'order_id': order_id}

        order.status = 'processing'
        order.save()

        # Now safe to proceed — any other worker is blocked on the row lock
        # and will see the final status once we commit.
        payment = PaymentGateway.charge(order.customer_id, order.total_cents)
        order.status = 'completed'
        order.payment_id = payment.id
        order.save()

    # Email outside the transaction: if sending fails after the charge
    # committed, a retry hits the 'already_processed' branch instead of
    # charging again.
    EmailService.send_receipt(order)
    return {'status': 'completed', 'order_id': order_id}

Pattern 2: Idempotency keys

import hashlib

def send_webhook(endpoint_url, event_type, payload, event_id):
    """Deliver a webhook at most once per (event, endpoint) pair.

    A cache entry keyed by a hash of the idempotency key records successful
    deliveries for 24 hours; repeat calls within that window are no-ops.
    Raises WebhookDeliveryError on any non-success HTTP status.
    """
    idempotency_key = f"webhook:{event_id}:{endpoint_url}"
    digest = hashlib.sha256(idempotency_key.encode()).hexdigest()
    cache_key = f"webhook_sent:{digest}"

    # Short-circuit: this event was already delivered to this endpoint.
    if cache.get(cache_key):
        return {'status': 'already_sent', 'idempotency_key': idempotency_key}

    # Attempt the delivery.
    response = requests.post(
        endpoint_url,
        json={'event': event_type, 'data': payload},
        headers={'X-Webhook-ID': event_id},
        timeout=5
    )

    if response.status_code not in (200, 201, 204):
        raise WebhookDeliveryError(f"HTTP {response.status_code}")

    # Record the success (with TTL) so retries become no-ops.
    cache.set(cache_key, True, timeout=86400)  # 24 hours
    return {'status': 'sent', 'http_status': response.status_code}

Pattern 3: Database-level deduplication

from django.db import IntegrityError

def create_invoice_for_order(order_id, billing_period):
    """Use unique constraint to prevent duplicate invoices"""
    try:
        # The (order_id, billing_period) unique constraint is the real
        # dedup guard: the database rejects a second insert atomically,
        # which is race-free in a way an application-level check is not.
        return Invoice.objects.create(
            order_id=order_id,
            billing_period=billing_period,
        )
    except IntegrityError:
        # Lost the race — another worker inserted first; return that row.
        return Invoice.objects.get(order_id=order_id, billing_period=billing_period)

Testing Idempotency

class TestOrderProcessingIdempotency:
    """Verify process_order and send_webhook are safe to run more than once."""

    @pytest.fixture
    def pending_order(self, db):
        # One fresh, unprocessed order per test.
        return Order.objects.create(
            customer_id=1,
            total_cents=9999,
            status='pending'
        )
    
    def test_double_processing_charges_once(self, pending_order, mocker):
        """Running the job twice should only charge the customer once"""
        mock_charge = mocker.patch('myapp.PaymentGateway.charge')
        mock_charge.return_value = MagicMock(id='txn_123')
        
        process_order(pending_order.id)
        process_order(pending_order.id)  # Second run — should be a no-op
        
        assert mock_charge.call_count == 1
    
    def test_double_processing_sends_one_receipt(self, pending_order, mocker):
        """Receipt email should only be sent once"""
        mocker.patch('myapp.PaymentGateway.charge', return_value=MagicMock(id='txn_123'))
        mock_email = mocker.patch('myapp.EmailService.send_receipt')
        
        process_order(pending_order.id)
        process_order(pending_order.id)
        
        assert mock_email.call_count == 1
    
    def test_second_call_returns_already_processed(self, pending_order, mocker):
        """The repeat call should report that the work was already done."""
        mocker.patch('myapp.PaymentGateway.charge', return_value=MagicMock(id='txn_123'))
        
        process_order(pending_order.id)
        result = process_order(pending_order.id)
        
        assert result['status'] == 'already_processed'
    
    def test_concurrent_processing_is_safe(self, pending_order, mocker):
        """Simulate concurrent execution with threads"""
        import threading
        
        mock_charge = mocker.patch('myapp.PaymentGateway.charge')
        mock_charge.return_value = MagicMock(id='txn_123')
        
        results = []
        errors = []
        
        def run_job():
            try:
                result = process_order(pending_order.id)
                results.append(result)
            except Exception as e:
                errors.append(e)
        
        threads = [threading.Thread(target=run_job) for _ in range(5)]
        # FIX: plain loops instead of list comprehensions — comprehensions
        # are for building values, not for side effects.
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        
        assert not errors
        assert mock_charge.call_count == 1
    
    def test_webhook_idempotency_key_prevents_duplicates(self, mocker):
        """A repeated event_id must not produce a second HTTP delivery."""
        mock_post = mocker.patch('requests.post')
        mock_post.return_value = MagicMock(status_code=200)
        
        event_id = 'evt_abc123'
        
        send_webhook('https://example.com/hook', 'order.created', {}, event_id)
        send_webhook('https://example.com/hook', 'order.created', {}, event_id)
        
        # Only one HTTP request should have been made
        assert mock_post.call_count == 1

Part 2: Testing Dead-Letter Queues

Dead-letter queues (DLQs) receive jobs that have exhausted their retry attempts. Testing DLQ behavior ensures your failure handling works correctly.

Setting Up DLQ Testing

// BullMQ DLQ configuration
const { Queue, Worker } = require('bullmq');

// Build the main queue, its dead-letter queue, and a worker that forwards
// terminally-failed jobs (with error context) to the DLQ before re-throwing.
async function setupQueues(connection) {
  const mainQueue = new Queue('orders', { connection });
  const dlq = new Queue('orders-dlq', { connection });
  
  const worker = new Worker(
    'orders',
    async (job) => {
      try {
        await processOrder(job.data);
      } catch (error) {
        // FIX: `attempts` is undefined when the producer never set it
        // (BullMQ then defaults to a single attempt), and `undefined - 1`
        // is NaN — the comparison is always false and the DLQ move silently
        // never happens. Fall back to 1 explicitly.
        const maxAttempts = job.opts.attempts ?? 1;

        // attemptsMade counts attempts made *before* this one, so this is
        // the final attempt when attemptsMade === maxAttempts - 1.
        if (job.attemptsMade >= maxAttempts - 1) {
          await dlq.add('failed-order', {
            originalJob: job.data,
            error: error.message,
            failedAt: new Date().toISOString(),
            attempts: job.attemptsMade + 1,
          });
        }
        throw error;  // Still throw to trigger retry/failure
      }
    },
    { connection }
  );
  
  return { mainQueue, dlq, worker };
}
// Test DLQ behavior
describe('Dead Letter Queue', () => {
  
  it('moves job to DLQ after max retries', async () => {
    const { mainQueue, dlq, worker } = await setupQueues(connection);
    
    // Add job that will always fail
    await mainQueue.add(
      'process-order',
      { orderId: 999 },
      { attempts: 3, backoff: { type: 'fixed', delay: 10 } }
    );
    
    // Wait for the terminal failure (emitted after retries are exhausted)
    const queueEvents = new QueueEvents('orders', { connection });
    
    await new Promise((resolve) => {
      queueEvents.on('failed', resolve);
    });
    
    // Give time for DLQ move
    await new Promise(r => setTimeout(r, 100));
    
    const dlqJobs = await dlq.getJobs(['waiting']);
    expect(dlqJobs).toHaveLength(1);
    expect(dlqJobs[0].data.originalJob.orderId).toBe(999);
    expect(dlqJobs[0].data.attempts).toBe(3);
    
    await worker.close();
    await queueEvents.close();
  });
  
  it('preserves original job data in DLQ', async () => {
    const originalData = {
      orderId: 42,
      customerId: 7,
      amount: 9999,
      items: [{ sku: 'ABC123', quantity: 2 }]
    };
    
    const { mainQueue, dlq, worker } = await setupQueues(connection);
    
    await mainQueue.add('process-order', originalData, {
      attempts: 1,  // Fail immediately
    });
    
    // Wait for DLQ
    await new Promise(r => setTimeout(r, 500));
    
    const [dlqJob] = await dlq.getJobs(['waiting']);
    expect(dlqJob.data.originalJob).toEqual(originalData);
    
    await worker.close();
  });
  
  it('can replay jobs from DLQ', async () => {
    // FIX: `mainQueue` and `dlq` were consts scoped inside the earlier
    // it() blocks, so this test threw a ReferenceError. Queues are
    // addressed by name, so fresh handles see the same Redis-backed data.
    const mainQueue = new Queue('orders', { connection });
    const dlq = new Queue('orders-dlq', { connection });
    
    const processedOrders = [];
    
    // Fix the handler — now it works
    const recoveryWorker = new Worker(
      'orders',
      async (job) => {
        processedOrders.push(job.data.orderId);
      },
      { connection }
    );
    
    // Re-queue failed jobs from DLQ
    const failedJobs = await dlq.getJobs(['waiting']);
    
    for (const job of failedJobs) {
      await mainQueue.add('process-order', job.data.originalJob);
      await job.remove();  // Remove from DLQ
    }
    
    await new Promise(r => setTimeout(r, 500));
    
    expect(processedOrders).toContain(999);  // From previous test
    
    await recoveryWorker.close();
  });
});

Python DLQ Testing (Celery)

class TestDeadLetterQueue:
    """DLQ behavior for Celery tasks: callback, inspection, and replay."""

    @pytest.fixture(autouse=True)
    def setup_dlq(self, celery_app):
        # Late acks mean a job is only acknowledged after it finishes, and
        # reject-on-worker-lost re-queues jobs whose worker died mid-run —
        # both required for at-least-once delivery into the DLQ flow.
        celery_app.conf.task_acks_late = True
        celery_app.conf.task_reject_on_worker_lost = True
    
    def test_exhausted_task_triggers_callback(self, mocker):
        """Verify that tasks exhausting retries call the DLQ callback"""
        mock_dlq = mocker.patch('myapp.tasks.DeadLetterQueue.add')
        
        exc = Order.DoesNotExist("Order 999 not found")
        
        # FIX: the original called a Sidekiq (Ruby) API —
        # `sidekiq_retries_exhausted_block.call(msg, exc)` — which does not
        # exist in Celery. Celery invokes Task.on_failure(exc, task_id,
        # args, kwargs, einfo) once retries are exhausted, so drive that
        # hook directly with the arguments the worker would pass.
        ProcessOrderWorker().on_failure(
            exc,
            'test-job-id',
            [999],  # Non-existent order
            {},
            None,
        )
        
        mock_dlq.assert_called_once_with(
            job_id='test-job-id',
            queue='orders',
            payload=[999],
            error='Order 999 not found',
        )
    
    def test_dlq_contents_can_be_inspected(self):
        """Can enumerate jobs in the DLQ"""
        DeadLetterQueue.add(
            job_id='test-1',
            queue='orders',
            payload={'order_id': 1},
            error='Timeout',
        )
        
        dlq_jobs = DeadLetterQueue.all()
        
        assert len(dlq_jobs) >= 1
        assert any(j['job_id'] == 'test-1' for j in dlq_jobs)
    
    def test_dlq_jobs_can_be_replayed(self, mocker):
        """Can replay a DLQ job after fixing the underlying issue"""
        mock_process = mocker.patch('myapp.services.OrderService.process')
        mock_process.return_value = {'status': 'success'}
        
        # Add to DLQ
        DeadLetterQueue.add(
            job_id='test-replay',
            queue='orders',
            payload={'order_id': 42},
            error='Transient network error',
        )
        
        # Replay
        DeadLetterQueue.replay('test-replay')
        
        # Should have been processed
        mock_process.assert_called_once_with(order_id=42)

Part 3: Testing Backpressure

Backpressure occurs when the producer adds jobs faster than workers can process them. Untested backpressure handling can cause memory exhaustion, dropped messages, or cascading failures.

Testing Queue Depth Limits

describe('Queue backpressure', () => {
  
  it('respects maxJobs configuration', async () => {
    const queue = new Queue('bounded', {
      connection,
      defaultJobOptions: {
        removeOnComplete: true,
        removeOnFail: true,
      }
    });
    
    const MAX_QUEUE_SIZE = 100;
    
    // Producer-side bound: reject the add when the queue is at capacity.
    // NOTE(review): count-then-add is racy with concurrent producers; fine
    // for a single-producer test, not for production enforcement.
    async function addJobWithBackpressure(data) {
      const count = await queue.count();
      if (count >= MAX_QUEUE_SIZE) {
        throw new Error('Queue full — backpressure activated');
      }
      return queue.add('job', data);
    }
    
    // Fill to limit
    for (let i = 0; i < MAX_QUEUE_SIZE; i++) {
      await addJobWithBackpressure({ index: i });
    }
    
    // Next add should fail
    await expect(
      addJobWithBackpressure({ index: MAX_QUEUE_SIZE + 1 })
    ).rejects.toThrow('Queue full');
    
    await queue.obliterate({ force: true });
    await queue.close();
  });
  
  it('recovers from backpressure when workers drain queue', async () => {
    const queue = new Queue('backpressure-test', { connection });
    const MAX_SIZE = 5;
    const processed = [];
    
    // Start worker
    const worker = new Worker(
      'backpressure-test',
      async (job) => {
        await new Promise(r => setTimeout(r, 50));  // Slow worker
        processed.push(job.data.index);
      },
      { connection, concurrency: 1 }
    );
    
    // Add MAX_SIZE jobs (fills queue)
    for (let i = 0; i < MAX_SIZE; i++) {
      await queue.add('job', { index: i });
    }
    
    // FIX: poll until drained (with a hard deadline) instead of one fixed
    // sleep — a single fixed sleep makes this test flaky on slow CI hosts.
    const deadline = Date.now() + 5000;
    while (processed.length < MAX_SIZE && Date.now() < deadline) {
      await new Promise(r => setTimeout(r, 25));
    }
    
    // All should have been processed
    expect(processed.length).toBe(MAX_SIZE);
    
    // Now we can add more without backpressure
    const count = await queue.count();
    expect(count).toBe(0);
    
    await worker.close();
    await queue.obliterate({ force: true });
    await queue.close();
  });
});

Testing Slow Consumer Resilience

class TestBackpressureHandling:
    """A burst of fast producers must neither drop jobs nor leak memory."""

    def test_fast_producer_with_slow_consumer(self, mocker):
        """Producer should not overwhelm slow consumers"""
        completion_stamps = []

        def slow_processor(job_data):
            time.sleep(0.01)  # 10ms per job
            completion_stamps.append(time.time())
            return {'processed': True}

        mocker.patch('myapp.tasks.process_item', side_effect=slow_processor)

        # Enqueue a rapid burst of 20 jobs.
        start = time.time()
        for index in range(20):
            process_item.delay({'index': index})

        # Drain the queue
        time.sleep(1)

        # Every job must eventually run — nothing dropped under pressure.
        assert len(completion_stamps) == 20

        # Throughput sanity check: the whole batch finished promptly.
        elapsed = completion_stamps[-1] - completion_stamps[0]
        assert elapsed < 2.0  # Should complete within 2s

    def test_memory_does_not_grow_unbounded(self):
        """Queue should not accumulate unbounded with rate limiting"""
        import tracemalloc

        tracemalloc.start()
        before = tracemalloc.take_snapshot()

        # Enqueue 1000 cheap jobs.
        for index in range(1000):
            cheap_task.delay({'index': index})

        # Compare allocations against the baseline snapshot.
        after = tracemalloc.take_snapshot()
        deltas = after.compare_to(before, 'lineno')

        growth = sum(d.size_diff for d in deltas if d.size_diff > 0)
        assert growth < 10 * 1024 * 1024  # Less than 10MB

        tracemalloc.stop()

Testing Circuit Breaker for Worker Dependencies

from circuitbreaker import circuit

# Circuit opens after 5 consecutive failures; while open, calls fail fast
# with CircuitBreakerError instead of hitting the network. After 30s the
# breaker half-opens and lets a probe call through.
@circuit(failure_threshold=5, recovery_timeout=30)
def call_external_api(data):
    """POST *data* to the processing API and return the decoded JSON body.

    Raises on connection errors and (via raise_for_status) on non-2xx
    responses — both count as failures toward the circuit threshold.
    """
    response = requests.post('https://api.example.com/process', json=data, timeout=3)
    response.raise_for_status()
    return response.json()


class TestCircuitBreaker:
    """Exercises the breaker around call_external_api, closed -> open."""

    def test_circuit_opens_after_threshold_failures(self, mocker):
        """After 5 failures, circuit should open"""
        from circuitbreaker import CircuitBreakerError

        mocker.patch('requests.post', side_effect=ConnectionError("API down"))

        # The first five calls reach requests.post and fail normally,
        # bumping the breaker's failure counter each time.
        for attempt in range(5):
            with pytest.raises(ConnectionError):
                call_external_api({'test': 'data'})

        # Threshold reached: the sixth call is rejected immediately,
        # without touching the network.
        with pytest.raises(CircuitBreakerError):
            call_external_api({'test': 'data'})

    def test_job_handles_open_circuit(self, mocker):
        """Worker should gracefully handle circuit breaker trips"""
        from circuitbreaker import CircuitBreakerError

        mocker.patch('myapp.tasks.call_external_api',
                     side_effect=CircuitBreakerError('API circuit open'))

        worker = ProcessWithExternalApiWorker()
        outcome = worker.perform_with_fallback({'data': 'test'})

        # The job falls back to a deferred retry rather than crashing.
        assert outcome['status'] == 'queued_for_retry'
        assert outcome['retry_after'] is not None

Combining All Three: Chaos Testing for Workers

The most realistic test: inject random failures and verify the system maintains correctness:

import random
import threading
from concurrent.futures import ThreadPoolExecutor

class TestWorkerChaos:
    """Inject random transient failures and assert eventual, exactly-once
    processing of every order."""

    def test_correct_processing_under_chaos(self, db, mocker):
        """With random failures, all jobs should eventually complete correctly"""
        
        orders_to_process = [create_order() for _ in range(50)]
        processed_order_ids = set()
        lock = threading.Lock()
        
        fail_count = 0
        
        def chaotic_processor(order_id):
            nonlocal fail_count
            
            # Random failure simulation (30% failure rate)
            if random.random() < 0.3:
                # FIX: `+=` on a shared int is not atomic across threads,
                # so guard the counter with the same lock as the result set.
                with lock:
                    fail_count += 1
                raise TransientError("Random chaos failure")
            
            # Simulate actual processing
            order = Order.objects.get(id=order_id)
            order.mark_processed()
            
            with lock:
                processed_order_ids.add(order_id)
        
        # FIX: chaotic_processor was defined but never wired in, so the
        # queued jobs ran the real handler and processed_order_ids could
        # never fill — the test as written could not pass. Patch the task's
        # core processing function with it (adjust the target to whatever
        # process_order_with_retry actually calls — TODO confirm path).
        mocker.patch('myapp.tasks.process_order_core',
                     side_effect=chaotic_processor)
        
        # Queue all orders
        for order in orders_to_process:
            process_order_with_retry.delay(order.id)
        
        # Wait for processing (with generous timeout)
        max_wait = 30  # seconds
        start = time.time()
        
        while len(processed_order_ids) < len(orders_to_process):
            if time.time() - start > max_wait:
                break
            time.sleep(0.1)
        
        # All orders should be processed despite chaos
        assert len(processed_order_ids) == len(orders_to_process), (
            f"Only {len(processed_order_ids)}/{len(orders_to_process)} processed. "
            f"Fail count: {fail_count}"
        )
        
        # Verify idempotency: no double-processing
        for order in orders_to_process:
            order.refresh_from_db()
            assert order.processing_count == 1, (
                f"Order {order.id} processed {order.processing_count} times"
            )

Testing async workers at this level — idempotency, DLQs, backpressure, chaos — is what separates systems that degrade gracefully from those that lose data under pressure. Most bugs in background job systems aren't logic bugs; they're correctness bugs that only manifest under retry, duplication, or load conditions.

Read more