AWS Messaging Testing Best Practices: DLQ, Retry, Idempotency, and Message Ordering

The failure modes of SQS, SNS, and EventBridge are well-known: duplicate delivery, out-of-order messages, poison pills that fill DLQs, and missing idempotency guards that corrupt state. This post covers the testing patterns that catch these bugs before they reach production.

Key Takeaways

Test idempotency explicitly. Standard SQS queues deliver messages at least once, so duplicates are expected. Your handler must leave the system in the same state whether it processes a message once or several times. Write a test that delivers the same message N times and asserts that state is updated only once.

DLQ tests are regression tests for your retry policy. When you change maxReceiveCount or your retry logic, the DLQ test is the first thing that should fail if routing breaks. Run it every time you touch the consumer.

Test message ordering assumptions separately per queue type. Standard queues offer best-effort ordering; FIFO queues guarantee ordering within a message group. A test that asserts delivery order on a standard queue encodes a guarantee SQS does not make.

Retry backoff logic should be tested without real time passing. Use a mock clock or parameterize sleep intervals to test exponential backoff in milliseconds, not minutes.

Observability belongs in tests too. Verify that your consumer emits the right CloudWatch metrics or structured logs when it processes a message, retries it, or sends it to the DLQ.

The Five Failure Modes Worth Testing

Every SQS/SNS/EventBridge application should have explicit tests for:

  1. Dead-letter routing: messages received maxReceiveCount times without being deleted land in the DLQ
  2. Retry behavior — transient failures cause retries with appropriate backoff
  3. Idempotency — duplicate messages don't corrupt state
  4. Message ordering — ordering guarantees are either asserted (FIFO) or explicitly not assumed (standard)
  5. Poison pill isolation — malformed messages don't block valid messages

Testing DLQ Routing

The DLQ is your safety net. Test that it actually catches what it's supposed to catch.

# tests/test_dlq_routing.py
import boto3
import json
import pytest
from moto import mock_aws
from unittest.mock import MagicMock, patch


@pytest.fixture
def aws_env(monkeypatch):
    monkeypatch.setenv("AWS_ACCESS_KEY_ID", "test")
    monkeypatch.setenv("AWS_SECRET_ACCESS_KEY", "test")
    monkeypatch.setenv("AWS_DEFAULT_REGION", "us-east-1")


def setup_queue_with_dlq(sqs_client, max_receive_count: int = 3):
    dlq = sqs_client.create_queue(QueueName="test-dlq")
    dlq_arn = sqs_client.get_queue_attributes(
        QueueUrl=dlq["QueueUrl"], AttributeNames=["QueueArn"]
    )["Attributes"]["QueueArn"]

    main = sqs_client.create_queue(
        QueueName="test-main",
        Attributes={
            "VisibilityTimeout": "0",  # Instant re-visibility for testing
            "RedrivePolicy": json.dumps({
                "deadLetterTargetArn": dlq_arn,
                "maxReceiveCount": str(max_receive_count),
            }),
        },
    )
    return main["QueueUrl"], dlq["QueueUrl"]


def test_message_moves_to_dlq_after_max_retries(aws_env):
    with mock_aws():
        sqs = boto3.client("sqs", region_name="us-east-1")
        main_url, dlq_url = setup_queue_with_dlq(sqs, max_receive_count=3)

        sqs.send_message(
            QueueUrl=main_url,
            MessageBody=json.dumps({"payload": "poison"}),
        )

        # Simulate 3 failed receive attempts (never delete)
        for attempt in range(3):
            resp = sqs.receive_message(QueueUrl=main_url, MaxNumberOfMessages=1)
            assert "Messages" in resp, f"Message missing on attempt {attempt + 1}"
            # Don't delete: simulates processing failure

        # The redrive happens on the next receive after maxReceiveCount is
        # reached, so poll the main queue once more. It should come back empty.
        main_msgs = sqs.receive_message(QueueUrl=main_url, MaxNumberOfMessages=10)
        assert main_msgs.get("Messages", []) == []

        # The message should now be in the DLQ
        dlq_msg = sqs.receive_message(QueueUrl=dlq_url, MaxNumberOfMessages=1)
        assert "Messages" in dlq_msg, "Message did not route to DLQ"
        body = json.loads(dlq_msg["Messages"][0]["Body"])
        assert body["payload"] == "poison"


def test_dlq_alert_fires_on_message_arrival():
    """
    Test that your monitoring code publishes the DLQ depth as a CloudWatch
    metric, which an alarm can then watch.
    """
    from myapp.dlq_monitor import check_dlq_depth

    mock_cw = MagicMock()
    check_dlq_depth(dlq_depth=5, cloudwatch_client=mock_cw)
    mock_cw.put_metric_data.assert_called_once()
    call_args = mock_cw.put_metric_data.call_args[1]
    assert call_args["MetricData"][0]["Value"] == 5
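
The test above imports check_dlq_depth from myapp.dlq_monitor, which this post does not show. Here is a minimal sketch of what such a function might look like. The namespace, metric name, and signature are assumptions chosen for illustration; only put_metric_data and its Namespace/MetricData parameters come from the CloudWatch API.

```python
# myapp/dlq_monitor.py (hypothetical sketch; names are illustrative assumptions)
from unittest.mock import MagicMock


def check_dlq_depth(dlq_depth: int, cloudwatch_client, namespace: str = "MyApp/Messaging") -> None:
    """Publish the current DLQ depth as a custom CloudWatch metric."""
    cloudwatch_client.put_metric_data(
        Namespace=namespace,
        MetricData=[
            {
                "MetricName": "DLQDepth",  # assumed metric name
                "Value": dlq_depth,
                "Unit": "Count",
            }
        ],
    )


# Usage with a mock client, mirroring the test above
mock_cw = MagicMock()
check_dlq_depth(dlq_depth=5, cloudwatch_client=mock_cw)
```

Taking the client as a parameter is what makes the function testable without AWS credentials: the test hands in a MagicMock and inspects the keyword arguments.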

Testing Retry Logic with Backoff

# myapp/consumer_with_retry.py
import time
import logging

logger = logging.getLogger(__name__)


def process_with_retry(
    handler,
    message: dict,
    max_retries: int = 3,
    base_delay: float = 1.0,
    sleep_fn=time.sleep,
) -> bool:
    """Process a message with exponential backoff retry."""
    for attempt in range(max_retries + 1):
        try:
            handler(message)
            return True
        except Exception as exc:
            if attempt == max_retries:
                logger.error("Exhausted retries for message: %s", exc)
                return False
            delay = base_delay * (2 ** attempt)
            logger.warning("Attempt %d failed, retrying in %.1fs: %s", attempt + 1, delay, exc)
            sleep_fn(delay)
    return False


# tests/test_retry_logic.py
from unittest.mock import MagicMock, call
import pytest
from myapp.consumer_with_retry import process_with_retry


def test_retries_on_transient_failure():
    """Handler fails twice, succeeds on third attempt."""
    handler = MagicMock(side_effect=[Exception("timeout"), Exception("timeout"), None])
    mock_sleep = MagicMock()

    result = process_with_retry(
        handler,
        message={"order_id": "ord-001"},
        max_retries=3,
        base_delay=1.0,
        sleep_fn=mock_sleep,
    )

    assert result is True
    assert handler.call_count == 3
    # Backoff: 1s after attempt 1, 2s after attempt 2
    mock_sleep.assert_has_calls([call(1.0), call(2.0)])


def test_exhausts_retries_and_returns_false():
    """Handler always fails — should give up after max_retries."""
    handler = MagicMock(side_effect=Exception("permanent error"))
    mock_sleep = MagicMock()

    result = process_with_retry(
        handler,
        message={"order_id": "bad-order"},
        max_retries=2,
        base_delay=0.5,
        sleep_fn=mock_sleep,
    )

    assert result is False
    assert handler.call_count == 3  # initial + 2 retries
    mock_sleep.assert_has_calls([call(0.5), call(1.0)])


def test_no_sleep_on_immediate_success():
    """Successful handler should not trigger any sleep."""
    handler = MagicMock()
    mock_sleep = MagicMock()

    process_with_retry(handler, message={"order_id": "good"}, sleep_fn=mock_sleep)

    assert handler.call_count == 1
    mock_sleep.assert_not_called()
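
The tests above pass a MagicMock as sleep_fn. Another option is a small fake clock that records each requested delay and advances virtual time, so a test can assert on the whole schedule and on the total wait. In the sketch below, FakeClock and backoff_schedule are hypothetical helper names; the formula base_delay * 2 ** attempt is the one used in process_with_retry.

```python
class FakeClock:
    """Test double for time.sleep that advances virtual time instantly."""

    def __init__(self):
        self.now = 0.0
        self.sleeps = []

    def sleep(self, seconds: float) -> None:
        # Record the requested delay instead of actually waiting.
        self.sleeps.append(seconds)
        self.now += seconds


def backoff_schedule(base_delay: float, max_retries: int) -> list:
    """Delays expected between attempts: base_delay * 2 ** attempt."""
    return [base_delay * (2 ** attempt) for attempt in range(max_retries)]


# Simulate a caller that fails every attempt and sleeps between them.
clock = FakeClock()
for delay in backoff_schedule(base_delay=0.5, max_retries=4):
    clock.sleep(delay)

print(clock.sleeps, clock.now)
```

Passing clock.sleep as sleep_fn to process_with_retry with a handler that always fails should record the same delays, and clock.now then gives the total time a real consumer would have spent waiting.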

Testing Idempotency

Standard SQS queues deliver at least once. Your consumer must be idempotent: processing the same message twice must produce the same result as processing it once.

# myapp/idempotent_handler.py
import hashlib
import json


class IdempotentOrderHandler:
    def __init__(self, db, processed_ids_cache):
        self.db = db
        self.cache = processed_ids_cache

    def handle(self, message: dict) -> bool:
        message_id = message.get("message_id") or self._hash_message(message)

        if self.cache.exists(message_id):
            return False  # Already processed — skip

        self.db.update_order(message["order_id"], message["status"])
        self.cache.set(message_id, ttl_seconds=3600)
        return True

    @staticmethod
    def _hash_message(message: dict) -> str:
        body = json.dumps(message, sort_keys=True).encode()
        return hashlib.sha256(body).hexdigest()


# tests/test_idempotency.py
from unittest.mock import MagicMock, patch
import pytest
from myapp.idempotent_handler import IdempotentOrderHandler


@pytest.fixture
def handler():
    db = MagicMock()
    cache = MagicMock()
    cache.exists.return_value = False
    return IdempotentOrderHandler(db=db, processed_ids_cache=cache), db, cache


def test_first_delivery_processes_and_caches(handler):
    h, db, cache = handler
    msg = {"message_id": "msg-001", "order_id": "ord-A", "status": "shipped"}

    result = h.handle(msg)

    assert result is True
    db.update_order.assert_called_once_with("ord-A", "shipped")
    cache.set.assert_called_once_with("msg-001", ttl_seconds=3600)


def test_duplicate_delivery_is_skipped(handler):
    h, db, cache = handler
    cache.exists.return_value = True  # Already seen this message
    msg = {"message_id": "msg-001", "order_id": "ord-A", "status": "shipped"}

    result = h.handle(msg)

    assert result is False
    db.update_order.assert_not_called()


def test_messages_without_id_use_content_hash(handler):
    h, db, cache = handler
    msg = {"order_id": "ord-B", "status": "pending"}

    h.handle(msg)

    # Should have called cache with a hash string
    call_args = cache.set.call_args[0]
    assert len(call_args[0]) == 64  # SHA-256 hex digest


def test_two_different_messages_are_both_processed():
    """Same handler, two distinct messages — both must be processed."""
    db = MagicMock()
    cache = {}

    class LocalCache:
        def exists(self, key):
            return key in cache
        def set(self, key, **kwargs):
            cache[key] = True

    h = IdempotentOrderHandler(db=db, processed_ids_cache=LocalCache())
    h.handle({"message_id": "id-1", "order_id": "ord-1", "status": "shipped"})
    h.handle({"message_id": "id-2", "order_id": "ord-2", "status": "shipped"})

    assert db.update_order.call_count == 2
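
The duplicate test above fakes the "already seen" state by setting cache.exists.return_value. A test closer to real at-least-once delivery hands the same message to the handler N times against a cache that actually remembers keys. The sketch below is self-contained, so it uses a condensed stand-in for IdempotentOrderHandler.handle; InMemoryCache, handle_once, and deliver_n_times are hypothetical names.

```python
from unittest.mock import MagicMock


class InMemoryCache:
    """Hypothetical stand-in for the processed-IDs cache (ignores TTL)."""

    def __init__(self):
        self._seen = set()

    def exists(self, key: str) -> bool:
        return key in self._seen

    def set(self, key: str, ttl_seconds: int = 3600) -> None:
        self._seen.add(key)


def handle_once(message: dict, db, cache) -> bool:
    """Condensed version of IdempotentOrderHandler.handle for this sketch."""
    message_id = message["message_id"]
    if cache.exists(message_id):
        return False  # duplicate delivery: skip
    db.update_order(message["order_id"], message["status"])
    cache.set(message_id, ttl_seconds=3600)
    return True


def deliver_n_times(message: dict, n: int, db, cache) -> list:
    """Simulate at-least-once delivery by handing over the same message n times."""
    return [handle_once(message, db, cache) for _ in range(n)]


db = MagicMock()
results = deliver_n_times(
    {"message_id": "msg-001", "order_id": "ord-A", "status": "shipped"},
    n=5,
    db=db,
    cache=InMemoryCache(),
)
```

Only the first delivery should report True, and the database should see exactly one update no matter how large n is.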

Testing Message Ordering

Standard Queue — Test That You Don't Assume Order

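import json

import boto3
from moto import mock_aws


# aws_env is the fixture from the DLQ section; move it to conftest.py to share it across test files.
def test_standard_queue_consumer_handles_any_order(aws_env):
    """Standard queues don't guarantee order, so the consumer must not assume it."""
    with mock_aws():
        sqs = boto3.client("sqs", region_name="us-east-1")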
        url = sqs.create_queue(QueueName="standard-queue")["QueueUrl"]

        # Send messages in order
        for i in range(5):
            sqs.send_message(QueueUrl=url, MessageBody=json.dumps({"seq": i}))

        # Receive all messages
        received_seqs = []
        for _ in range(5):
            resp = sqs.receive_message(QueueUrl=url, MaxNumberOfMessages=1)
            if "Messages" in resp:
                body = json.loads(resp["Messages"][0]["Body"])
                received_seqs.append(body["seq"])
                sqs.delete_message(
                    QueueUrl=url,
                    ReceiptHandle=resp["Messages"][0]["ReceiptHandle"],
                )

        # Assert all messages arrived (don't assert order)
        assert sorted(received_seqs) == [0, 1, 2, 3, 4]
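
The test above checks that every message arrives, not that the consumer copes with reordering. One common way to make a consumer order-tolerant is to carry a version number in each message and ignore updates older than what is already stored. The sketch below assumes that design; the version field and the apply_update helper are hypothetical and do not appear elsewhere in this post.

```python
import itertools


def apply_update(state: dict, message: dict) -> bool:
    """Apply message to state unless an equal or newer version is already stored."""
    order_id = message["order_id"]
    current = state.get(order_id)
    if current is not None and current["version"] >= message["version"]:
        return False  # stale or duplicate update: ignore
    state[order_id] = {"version": message["version"], "status": message["status"]}
    return True


messages = [
    {"order_id": "ord-1", "version": 1, "status": "pending"},
    {"order_id": "ord-1", "version": 2, "status": "paid"},
    {"order_id": "ord-1", "version": 3, "status": "shipped"},
]

# Every possible delivery order must converge on the same final state.
final_states = []
for ordering in itertools.permutations(messages):
    state = {}
    for msg in ordering:
        apply_update(state, msg)
    final_states.append(state["ord-1"])
```

Running all permutations is cheap for a handful of messages and turns "don't assume order" into an assertion: every entry in final_states should be the version 3 record.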

FIFO Queue — Test That Order Is Preserved

def test_fifo_queue_preserves_order_within_group(aws_env):
    with mock_aws():
        import boto3
        sqs = boto3.client("sqs", region_name="us-east-1")
        url = sqs.create_queue(
            QueueName="ordered.fifo",
            Attributes={
                "FifoQueue": "true",
                "ContentBasedDeduplication": "true",
            },
        )["QueueUrl"]

        for i in range(5):
            sqs.send_message(
                QueueUrl=url,
                MessageBody=json.dumps({"seq": i, "nonce": str(i)}),
                MessageGroupId="group-1",
            )

        received = []
        for _ in range(5):
            resp = sqs.receive_message(QueueUrl=url, MaxNumberOfMessages=1)
            if "Messages" in resp:
                msg = resp["Messages"][0]
                received.append(json.loads(msg["Body"])["seq"])
                sqs.delete_message(QueueUrl=url, ReceiptHandle=msg["ReceiptHandle"])

        assert received == [0, 1, 2, 3, 4], "FIFO ordering violated"
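
FIFO ordering holds per MessageGroupId, so a test that uses several groups should compare order inside each group rather than across the whole queue. Here is a small helper for that, assuming each received message has already been reduced to a (group_id, seq) pair; the helper name and the tuple shape are illustrative assumptions.

```python
from collections import defaultdict


def order_by_group(received):
    """Group (group_id, seq) pairs by group, preserving arrival order."""
    groups = defaultdict(list)
    for group_id, seq in received:
        groups[group_id].append(seq)
    return dict(groups)


def groups_in_order(received) -> bool:
    """True if sequence numbers are non-decreasing within every group."""
    return all(seqs == sorted(seqs) for seqs in order_by_group(received).values())


# Interleaving across groups is fine; order inside each group must hold.
received = [("g1", 0), ("g2", 0), ("g1", 1), ("g2", 1), ("g1", 2)]
per_group = order_by_group(received)
```

With this helper, the single-group assertion above generalizes to assert groups_in_order(received) for any number of groups.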

Testing Poison Pill Isolation

A poison pill (malformed or unprocessable message) should not block valid messages behind it.

# myapp/robust_consumer.py
import json
import logging

logger = logging.getLogger(__name__)


def consume_batch(sqs_client, queue_url: str, handler) -> dict:
    """Process a batch of messages, isolating poison pills."""
    resp = sqs_client.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=10,
        WaitTimeSeconds=0,
    )
    messages = resp.get("Messages", [])
    processed = failed = 0

    for msg in messages:
        try:
            body = json.loads(msg["Body"])
            handler(body)
            sqs_client.delete_message(
                QueueUrl=queue_url,
                ReceiptHandle=msg["ReceiptHandle"],
            )
            processed += 1
        except json.JSONDecodeError:
            logger.error("Poison pill (invalid JSON): %s", msg["MessageId"])
            failed += 1
        except Exception as exc:
            logger.error("Handler failed for %s: %s", msg["MessageId"], exc)
            failed += 1

    return {"processed": processed, "failed": failed}


# Test (assumes mock_aws and the aws_env fixture from the DLQ section)
def test_poison_pill_does_not_block_valid_messages(aws_env):
    with mock_aws():
        import boto3
        from myapp.robust_consumer import consume_batch
        from unittest.mock import MagicMock

        sqs = boto3.client("sqs", region_name="us-east-1")
        url = sqs.create_queue(QueueName="mixed-queue")["QueueUrl"]

        sqs.send_message(QueueUrl=url, MessageBody='{"order_id": "good-1"}')
        sqs.send_message(QueueUrl=url, MessageBody="{invalid json")  # poison pill
        sqs.send_message(QueueUrl=url, MessageBody='{"order_id": "good-2"}')

        handler = MagicMock()
        result = consume_batch(sqs, url, handler)

        # 2 valid messages processed, 1 failed
        assert result["processed"] == 2
        assert result["failed"] == 1

        # Valid messages reached the handler
        handled_ids = [call[0][0]["order_id"] for call in handler.call_args_list]
        assert "good-1" in handled_ids
        assert "good-2" in handled_ids
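
It is also worth asserting the other half of isolation: the poison pill must not be deleted, otherwise it never reaches the DLQ. The sketch below checks this with a stubbed SQS client instead of moto. It carries a trimmed copy of consume_batch so it runs on its own; the queue URL and receipt handles are made-up values.

```python
import json
from unittest.mock import MagicMock


def consume_batch(sqs_client, queue_url, handler):
    """Trimmed copy of consume_batch from above (logging omitted)."""
    resp = sqs_client.receive_message(
        QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=0
    )
    processed = failed = 0
    for msg in resp.get("Messages", []):
        try:
            handler(json.loads(msg["Body"]))
            sqs_client.delete_message(
                QueueUrl=queue_url, ReceiptHandle=msg["ReceiptHandle"]
            )
            processed += 1
        except Exception:
            failed += 1  # not deleted, so SQS can redeliver or redrive it
    return {"processed": processed, "failed": failed}


# Stub client returning one valid message and one poison pill.
sqs = MagicMock()
sqs.receive_message.return_value = {
    "Messages": [
        {"MessageId": "m1", "ReceiptHandle": "rh-good", "Body": '{"order_id": "good-1"}'},
        {"MessageId": "m2", "ReceiptHandle": "rh-poison", "Body": "{invalid json"},
    ]
}
result = consume_batch(sqs, "https://example.invalid/queue", MagicMock())
deleted_handles = [c.kwargs["ReceiptHandle"] for c in sqs.delete_message.call_args_list]
```

Only rh-good should appear in deleted_handles. The undeleted poison pill becomes visible again after the visibility timeout and is eventually moved by the redrive policy.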

Testing Observability

Your consumer should emit metrics. Test that it does.

# tests/test_consumer_metrics.py
from unittest.mock import MagicMock, patch
import pytest


def test_consumer_emits_processing_time_metric():
    """Consumer should record processing latency for each message."""
    from myapp.instrumented_consumer import InstrumentedConsumer

    mock_metrics = MagicMock()
    consumer = InstrumentedConsumer(metrics=mock_metrics)

    consumer.handle_message({"order_id": "ord-metric-001"})

    mock_metrics.record_latency.assert_called_once()
    assert mock_metrics.record_latency.call_args[0][0] >= 0  # latency is a non-negative number of ms


def test_consumer_increments_dlq_counter_on_failure():
    """Consumer should increment DLQ counter when it fails maxReceiveCount times."""
    from myapp.instrumented_consumer import InstrumentedConsumer

    mock_metrics = MagicMock()
    consumer = InstrumentedConsumer(metrics=mock_metrics)

    consumer.record_dlq_event(queue_name="orders-dlq", message_id="msg-fail-001")

    mock_metrics.increment.assert_called_with(
        "dlq.messages_received",
        tags={"queue": "orders-dlq"},
    )
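
Both tests import InstrumentedConsumer from myapp.instrumented_consumer, which is not shown in this post. Here is a minimal sketch of a class that would satisfy them. The metrics client interface (record_latency, increment) is inferred from the assertions above; the handler parameter and the log line are assumptions.

```python
# myapp/instrumented_consumer.py (hypothetical sketch)
import logging
import time
from unittest.mock import MagicMock

logger = logging.getLogger(__name__)


class InstrumentedConsumer:
    """Wraps a message handler and reports latency and DLQ events."""

    def __init__(self, metrics, handler=None):
        self.metrics = metrics
        # Default to a no-op handler so the class can be exercised on its own.
        self.handler = handler or (lambda message: None)

    def handle_message(self, message: dict) -> None:
        start = time.perf_counter()
        try:
            self.handler(message)
        finally:
            # Record latency in milliseconds even if the handler raised.
            elapsed_ms = (time.perf_counter() - start) * 1000
            self.metrics.record_latency(elapsed_ms)

    def record_dlq_event(self, queue_name: str, message_id: str) -> None:
        logger.warning("Message %s sent to DLQ %s", message_id, queue_name)
        self.metrics.increment("dlq.messages_received", tags={"queue": queue_name})


metrics = MagicMock()
consumer = InstrumentedConsumer(metrics=metrics)
consumer.handle_message({"order_id": "ord-metric-001"})
consumer.record_dlq_event(queue_name="orders-dlq", message_id="msg-fail-001")
```

Recording latency in a finally block means failed messages are measured too, which is usually what you want when investigating slow failures.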

Test Coverage Checklist

Before shipping any SQS/SNS/EventBridge consumer to production:

  • Happy path: message received, processed, deleted
  • DLQ: message fails N times → appears in DLQ, disappears from main queue
  • Idempotency: same message delivered twice → state updated once; the duplicate is skipped
  • Poison pill: invalid message doesn't block valid ones in the same batch
  • Retry backoff: exponential delay is applied, not constant
  • Ordering: FIFO queues test ordering; standard queues don't assert it
  • Filter policies: matching messages arrive, non-matching don't
  • Metrics/logs: consumer emits events for monitoring

Summary

The failure modes in AWS messaging systems are predictable. Build a test suite that explicitly covers DLQ routing, retry backoff, idempotency, message ordering by queue type, and poison pill isolation. These tests aren't defensive over-engineering — they're the spec for how your system should behave under real-world conditions. Run them on every change to your consumer code.
