AWS SNS Testing Strategies: Topics, Fan-Out Patterns, and DLQ Handling

AWS SNS Testing Strategies: Topics, Fan-Out Patterns, and DLQ Handling

SNS is a pub/sub service where one publisher sends to a topic and multiple subscribers receive. Testing SNS means verifying: messages reach the right subscribers, message filters work correctly, delivery failures go to the DLQ, and fan-out to SQS queues behaves as expected. moto handles all of this in-process without any running infrastructure.

Key Takeaways

SNS fan-out to SQS is the most common pattern — test the full chain. Create a topic, subscribe an SQS queue, publish to the topic, and assert the message arrives in the queue. moto supports this end-to-end.

Message filtering cuts delivery costs — test filter policies explicitly. An SNS subscription with a filter policy only delivers messages whose attributes match. Test that matching messages arrive and non-matching messages don't.

Subscription confirmation is automatic in moto and LocalStack. In production, HTTP/HTTPS endpoints must confirm subscriptions. In tests, skip this — use SQS subscriptions which auto-confirm, or verify your confirmation handler separately.

SNS DLQs are configured per subscription, not per topic. Each SQS subscription can have its own DLQ for delivery failures. Test that failed deliveries (e.g., queue doesn't exist) route to the subscription DLQ.

Test raw message delivery vs. SNS envelope. By default, SQS receives SNS messages wrapped in a JSON envelope. With RawMessageDelivery=true, SQS gets the raw body. Your consumers must handle whichever format they'll receive in production.

SNS Testing Challenges

SNS is deceptively simple to publish to but complex to test correctly. The main challenges:

  • Fan-out: one publish → multiple subscribers → need to assert each one received the right message
  • Filtering: message attribute-based subscription filters mean the same publish produces different delivery outcomes per subscriber
  • Envelope format: by default, SQS queues receive SNS messages wrapped in a JSON envelope, not the raw body your application code expects
  • Cross-service dependencies: SNS often feeds SQS, Lambda, or HTTP endpoints simultaneously

The standard approach is moto for isolated unit tests and LocalStack for multi-service integration tests.

Basic SNS Setup with moto

pip install boto3 moto pytest
# tests/conftest.py
import boto3
import pytest
from moto import mock_aws


@pytest.fixture(autouse=True)
def aws_credentials(monkeypatch):
    monkeypatch.setenv("AWS_ACCESS_KEY_ID", "testing")
    monkeypatch.setenv("AWS_SECRET_ACCESS_KEY", "testing")
    monkeypatch.setenv("AWS_SECURITY_TOKEN", "testing")
    monkeypatch.setenv("AWS_SESSION_TOKEN", "testing")
    monkeypatch.setenv("AWS_DEFAULT_REGION", "us-east-1")


@pytest.fixture
def sns_client():
    with mock_aws():
        yield boto3.client("sns", region_name="us-east-1")


@pytest.fixture
def sqs_client():
    with mock_aws():
        yield boto3.client("sqs", region_name="us-east-1")

Testing a Publisher

# myapp/publisher.py
import json
import boto3


class EventPublisher:
    def __init__(self, topic_arn: str, region: str = "us-east-1"):
        self.sns = boto3.client("sns", region_name=region)
        self.topic_arn = topic_arn

    def publish_order_event(self, order_id: str, status: str) -> dict:
        return self.sns.publish(
            TopicArn=self.topic_arn,
            Message=json.dumps({"order_id": order_id, "status": status}),
            Subject="OrderStatusChanged",
            MessageAttributes={
                "status": {
                    "DataType": "String",
                    "StringValue": status,
                },
                "source": {
                    "DataType": "String",
                    "StringValue": "order-service",
                },
            },
        )
# tests/test_publisher.py
from moto import mock_aws
import boto3
import pytest
from myapp.publisher import EventPublisher


def test_publisher_sends_to_topic(aws_credentials):
    with mock_aws():
        sns = boto3.client("sns", region_name="us-east-1")
        topic = sns.create_topic(Name="order-events")
        topic_arn = topic["TopicArn"]

        publisher = EventPublisher(topic_arn=topic_arn)
        response = publisher.publish_order_event("ord-001", "shipped")

        # Successful publish returns a MessageId
        assert "MessageId" in response
        assert len(response["MessageId"]) > 0

Testing Fan-Out to SQS

The fan-out pattern sends one message to multiple SQS queues via SNS subscriptions. Test that every subscriber receives the message.

def test_fanout_delivers_to_all_subscribers(aws_credentials):
    with mock_aws():
        sns = boto3.client("sns", region_name="us-east-1")
        sqs = boto3.client("sqs", region_name="us-east-1")

        topic_arn = sns.create_topic(Name="order-events")["TopicArn"]

        # Create two subscriber queues
        queue_a_url = sqs.create_queue(QueueName="fulfillment-queue")["QueueUrl"]
        queue_b_url = sqs.create_queue(QueueName="analytics-queue")["QueueUrl"]

        def get_queue_arn(url):
            return sqs.get_queue_attributes(
                QueueUrl=url, AttributeNames=["QueueArn"]
            )["Attributes"]["QueueArn"]

        queue_a_arn = get_queue_arn(queue_a_url)
        queue_b_arn = get_queue_arn(queue_b_url)

        # Subscribe both queues
        sns.subscribe(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_a_arn)
        sns.subscribe(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_b_arn)

        # Publish one message
        sns.publish(
            TopicArn=topic_arn,
            Message='{"order_id": "ord-999", "status": "shipped"}',
        )

        # Both queues should receive it
        for url in [queue_a_url, queue_b_url]:
            msgs = sqs.receive_message(QueueUrl=url, MaxNumberOfMessages=1)
            assert "Messages" in msgs, f"Queue {url} did not receive the message"

Testing Message Filtering

Subscription filter policies let you route messages selectively. This is critical to test — a misconfigured filter silently drops messages.

import json


def test_filter_policy_routes_only_matching_messages(aws_credentials):
    with mock_aws():
        sns = boto3.client("sns", region_name="us-east-1")
        sqs = boto3.client("sqs", region_name="us-east-1")

        topic_arn = sns.create_topic(Name="order-events")["TopicArn"]

        # Shipped queue only receives "shipped" status
        shipped_url = sqs.create_queue(QueueName="shipped-queue")["QueueUrl"]
        shipped_arn = sqs.get_queue_attributes(
            QueueUrl=shipped_url, AttributeNames=["QueueArn"]
        )["Attributes"]["QueueArn"]

        # Cancelled queue only receives "cancelled" status
        cancelled_url = sqs.create_queue(QueueName="cancelled-queue")["QueueUrl"]
        cancelled_arn = sqs.get_queue_attributes(
            QueueUrl=cancelled_url, AttributeNames=["QueueArn"]
        )["Attributes"]["QueueArn"]

        sns.subscribe(
            TopicArn=topic_arn,
            Protocol="sqs",
            Endpoint=shipped_arn,
            Attributes={
                "FilterPolicy": json.dumps({"status": ["shipped"]}),
            },
        )
        sns.subscribe(
            TopicArn=topic_arn,
            Protocol="sqs",
            Endpoint=cancelled_arn,
            Attributes={
                "FilterPolicy": json.dumps({"status": ["cancelled"]}),
            },
        )

        # Publish shipped event
        sns.publish(
            TopicArn=topic_arn,
            Message='{"order_id": "ord-001"}',
            MessageAttributes={
                "status": {"DataType": "String", "StringValue": "shipped"}
            },
        )

        # Publish cancelled event
        sns.publish(
            TopicArn=topic_arn,
            Message='{"order_id": "ord-002"}',
            MessageAttributes={
                "status": {"DataType": "String", "StringValue": "cancelled"}
            },
        )

        # Shipped queue: only ord-001
        msgs = sqs.receive_message(QueueUrl=shipped_url, MaxNumberOfMessages=10)
        assert len(msgs.get("Messages", [])) == 1

        # Cancelled queue: only ord-002
        msgs = sqs.receive_message(QueueUrl=cancelled_url, MaxNumberOfMessages=10)
        assert len(msgs.get("Messages", [])) == 1

Testing the SNS Envelope vs. Raw Delivery

By default, SQS receives messages wrapped in an SNS JSON envelope. If your consumer parses the raw body, it needs to handle this wrapper.

import json


def test_sqs_receives_sns_envelope_by_default(aws_credentials):
    with mock_aws():
        sns = boto3.client("sns", region_name="us-east-1")
        sqs = boto3.client("sqs", region_name="us-east-1")

        topic_arn = sns.create_topic(Name="wrapped-topic")["TopicArn"]
        queue_url = sqs.create_queue(QueueName="wrapped-queue")["QueueUrl"]
        queue_arn = sqs.get_queue_attributes(
            QueueUrl=queue_url, AttributeNames=["QueueArn"]
        )["Attributes"]["QueueArn"]

        sns.subscribe(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)

        sns.publish(TopicArn=topic_arn, Message="hello from sns")

        msgs = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=1)
        raw_body = msgs["Messages"][0]["Body"]
        envelope = json.loads(raw_body)

        # SNS wraps the message
        assert envelope["Type"] == "Notification"
        assert envelope["Message"] == "hello from sns"
        assert "TopicArn" in envelope


def test_raw_delivery_sends_plain_body(aws_credentials):
    with mock_aws():
        sns = boto3.client("sns", region_name="us-east-1")
        sqs = boto3.client("sqs", region_name="us-east-1")

        topic_arn = sns.create_topic(Name="raw-topic")["TopicArn"]
        queue_url = sqs.create_queue(QueueName="raw-queue")["QueueUrl"]
        queue_arn = sqs.get_queue_attributes(
            QueueUrl=queue_url, AttributeNames=["QueueArn"]
        )["Attributes"]["QueueArn"]

        sub = sns.subscribe(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)
        sns.set_subscription_attributes(
            SubscriptionArn=sub["SubscriptionArn"],
            AttributeName="RawMessageDelivery",
            AttributeValue="true",
        )

        sns.publish(TopicArn=topic_arn, Message="raw message body")

        msgs = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=1)
        assert msgs["Messages"][0]["Body"] == "raw message body"

Testing DLQ for Failed Deliveries

Each SNS subscription can specify a DLQ for messages that fail to deliver (e.g., SQS queue doesn't exist or is full).

def test_subscription_dlq_configured(aws_credentials):
    """Verify the DLQ is attached to the subscription attributes."""
    with mock_aws():
        sns = boto3.client("sns", region_name="us-east-1")
        sqs = boto3.client("sqs", region_name="us-east-1")

        topic_arn = sns.create_topic(Name="events-topic")["TopicArn"]
        dlq_url = sqs.create_queue(QueueName="events-dlq")["QueueUrl"]
        dlq_arn = sqs.get_queue_attributes(
            QueueUrl=dlq_url, AttributeNames=["QueueArn"]
        )["Attributes"]["QueueArn"]

        target_url = sqs.create_queue(QueueName="events-queue")["QueueUrl"]
        target_arn = sqs.get_queue_attributes(
            QueueUrl=target_url, AttributeNames=["QueueArn"]
        )["Attributes"]["QueueArn"]

        sub = sns.subscribe(TopicArn=topic_arn, Protocol="sqs", Endpoint=target_arn)

        sns.set_subscription_attributes(
            SubscriptionArn=sub["SubscriptionArn"],
            AttributeName="RedrivePolicy",
            AttributeValue=json.dumps({"deadLetterTargetArn": dlq_arn}),
        )

        attrs = sns.get_subscription_attributes(
            SubscriptionArn=sub["SubscriptionArn"]
        )["Attributes"]
        redrive = json.loads(attrs["RedrivePolicy"])
        assert redrive["deadLetterTargetArn"] == dlq_arn

Integration Test with LocalStack

# tests/integration/test_sns_localstack.py
import boto3
import json
import time
import pytest

ENDPOINT = "http://localhost:4566"
REGION = "us-east-1"
CREDS = {"aws_access_key_id": "test", "aws_secret_access_key": "test"}


@pytest.fixture(scope="module")
def clients():
    sns = boto3.client("sns", region_name=REGION, endpoint_url=ENDPOINT, **CREDS)
    sqs = boto3.client("sqs", region_name=REGION, endpoint_url=ENDPOINT, **CREDS)
    return sns, sqs


def test_sns_fanout_integration(clients):
    sns, sqs = clients
    suffix = str(int(time.time()))

    topic_arn = sns.create_topic(Name=f"orders-{suffix}")["TopicArn"]
    q1_url = sqs.create_queue(QueueName=f"consumer1-{suffix}")["QueueUrl"]
    q2_url = sqs.create_queue(QueueName=f"consumer2-{suffix}")["QueueUrl"]

    for url in [q1_url, q2_url]:
        arn = sqs.get_queue_attributes(
            QueueUrl=url, AttributeNames=["QueueArn"]
        )["Attributes"]["QueueArn"]
        sns.subscribe(TopicArn=topic_arn, Protocol="sqs", Endpoint=arn)

    sns.publish(TopicArn=topic_arn, Message=json.dumps({"event": "test"}))

    for url in [q1_url, q2_url]:
        resp = sqs.receive_message(QueueUrl=url, MaxNumberOfMessages=1, WaitTimeSeconds=2)
        assert "Messages" in resp

Common Mistakes

Not setting FilterPolicyScope. By default, SNS filter policies apply to message attributes. If you want to filter on message body fields, set FilterPolicyScope to MessageBody. Tests that work against attribute filters will fail against body filters if the scope is wrong.

Assuming subscriptions auto-confirm for HTTP endpoints. SQS subscriptions auto-confirm; HTTP/HTTPS do not. If you're testing an HTTP subscriber, you need to handle the confirmation POST before any messages arrive.

Publishing to a deleted topic. In cleanup-heavy test suites, publishing to a topic that was deleted in a previous test raises TopicNotFound. Use unique topic names or always check existence before publishing.

Summary

Test SNS by verifying the publish → subscribe → deliver chain end-to-end. Use moto for fast unit tests that cover fan-out, filter policies, and envelope format. Use LocalStack for integration tests that span SNS, SQS, and Lambda. Always test the DLQ configuration and verify that filtered messages arrive only at the intended subscriber.

Read more