AWS SQS Testing Guide: Unit and Integration Testing with LocalStack, moto, and boto3
Testing SQS consumers and producers without hitting real AWS is the only way to run fast, isolated tests. moto gives you an in-process mock for unit tests. LocalStack gives you a full local AWS stack for integration tests. This guide covers both, with patterns for FIFO queues, dead-letter queues, visibility timeout, and message attribute validation.
Key Takeaways
Use moto for unit tests, LocalStack for integration tests. moto intercepts boto3 calls in-process — no network, no Docker, extremely fast. LocalStack runs a real SQS API over HTTP — use it when you need to test Lambda triggers, cross-service flows, or infrastructure-as-code.
Always test DLQ routing. SQS dead-letter queue behavior (after maxReceiveCount) is easy to misconfigure. Write an explicit test that sends a message, fails processing N times, and asserts the message appears in the DLQ.
Visibility timeout is the main source of double-processing bugs. Test that your consumer extends visibility timeout for long-running jobs, and that messages reappear after timeout expiry.
FIFO queues require MessageGroupId and MessageDeduplicationId. Tests that omit these will raise InvalidParameterValue. Always test both happy-path deduplication and cross-group ordering.
Purge queues between tests. Leftover messages from one test pollute the next. Call purge_queue() in teardown, or use a fresh queue name per test.
Why Test SQS Locally?
Real AWS SQS costs money, is slow (network round-trips), and requires credentials in CI. Beyond cost and latency, real SQS makes tests flaky — messages can linger across test runs, DLQ behavior depends on timing, and FIFO deduplication windows are time-based.
The solution is a two-tier testing strategy:
- Unit tests with moto — test your producer/consumer logic in isolation, no Docker, no network
- Integration tests with LocalStack — test cross-service flows (SQS → Lambda, SQS → SNS) against a real HTTP API
Setting Up moto for Unit Tests
Install dependencies:
pip install boto3 moto pytestmoto works by patching boto3 with a decorator or context manager. The key is that the mock must be active before you import or instantiate any boto3 client.
# tests/test_sqs_producer.py
import json
import boto3
import pytest
from moto import mock_aws
from myapp.producer import SQSProducer
@pytest.fixture
def aws_credentials(monkeypatch):
"""Fake credentials so boto3 doesn't try to find real ones."""
monkeypatch.setenv("AWS_ACCESS_KEY_ID", "testing")
monkeypatch.setenv("AWS_SECRET_ACCESS_KEY", "testing")
monkeypatch.setenv("AWS_SECURITY_TOKEN", "testing")
monkeypatch.setenv("AWS_SESSION_TOKEN", "testing")
monkeypatch.setenv("AWS_DEFAULT_REGION", "us-east-1")
@pytest.fixture
def sqs_queue(aws_credentials):
with mock_aws():
sqs = boto3.resource("sqs", region_name="us-east-1")
queue = sqs.create_queue(QueueName="test-orders")
yield queue
def test_producer_sends_order_message(sqs_queue):
with mock_aws():
producer = SQSProducer(queue_url=sqs_queue.url)
producer.send_order(order_id="ord-123", amount=49.99)
messages = sqs_queue.receive_messages(MaxNumberOfMessages=1)
assert len(messages) == 1
body = json.loads(messages[0].body)
assert body["order_id"] == "ord-123"
assert body["amount"] == 49.99The producer under test:
# myapp/producer.py
import json
import boto3
class SQSProducer:
def __init__(self, queue_url: str, region: str = "us-east-1"):
self.sqs = boto3.client("sqs", region_name=region)
self.queue_url = queue_url
def send_order(self, order_id: str, amount: float) -> dict:
message = json.dumps({"order_id": order_id, "amount": amount})
return self.sqs.send_message(
QueueUrl=self.queue_url,
MessageBody=message,
MessageAttributes={
"EventType": {
"DataType": "String",
"StringValue": "order.created",
}
},
)Testing Message Attributes
Message attributes let consumers filter or route messages. Always test that your producer sets them correctly and your consumer reads them.
def test_producer_sets_event_type_attribute(sqs_queue):
with mock_aws():
producer = SQSProducer(queue_url=sqs_queue.url)
producer.send_order(order_id="ord-456", amount=99.00)
messages = sqs_queue.receive_messages(
MaxNumberOfMessages=1,
MessageAttributeNames=["All"],
)
attrs = messages[0].message_attributes
assert attrs["EventType"]["StringValue"] == "order.created"Testing the Consumer
A consumer pulls messages, processes them, and deletes them. The critical paths to test: successful processing + deletion, processing failure + message visibility, and poison messages going to the DLQ.
# myapp/consumer.py
import json
import boto3
import logging
logger = logging.getLogger(__name__)
class SQSConsumer:
def __init__(self, queue_url: str, region: str = "us-east-1"):
self.sqs = boto3.client("sqs", region_name=region)
self.queue_url = queue_url
def process_one(self, handler) -> bool:
response = self.sqs.receive_message(
QueueUrl=self.queue_url,
MaxNumberOfMessages=1,
WaitTimeSeconds=0,
)
messages = response.get("Messages", [])
if not messages:
return False
msg = messages[0]
try:
body = json.loads(msg["Body"])
handler(body)
self.sqs.delete_message(
QueueUrl=self.queue_url,
ReceiptHandle=msg["ReceiptHandle"],
)
return True
except Exception as exc:
logger.error("Processing failed: %s", exc)
return False# tests/test_sqs_consumer.py
from unittest.mock import MagicMock, patch
from moto import mock_aws
import boto3
import json
import pytest
from myapp.consumer import SQSConsumer
@pytest.fixture
def populated_queue(aws_credentials):
with mock_aws():
sqs = boto3.resource("sqs", region_name="us-east-1")
queue = sqs.create_queue(QueueName="test-consume")
queue.send_message(MessageBody=json.dumps({"order_id": "ord-789"}))
yield queue
def test_consumer_calls_handler_and_deletes_message(populated_queue):
with mock_aws():
handler = MagicMock()
consumer = SQSConsumer(queue_url=populated_queue.url)
result = consumer.process_one(handler)
assert result is True
handler.assert_called_once_with({"order_id": "ord-789"})
# Message should be deleted — queue should be empty
remaining = populated_queue.receive_messages(MaxNumberOfMessages=10)
assert len(remaining) == 0
def test_consumer_does_not_delete_on_handler_failure(populated_queue):
with mock_aws():
def failing_handler(body):
raise ValueError("downstream unavailable")
consumer = SQSConsumer(queue_url=populated_queue.url)
result = consumer.process_one(failing_handler)
assert result is False
# Message should still be in the queue (visibility timeout resets)
# In moto, messages become visible again immediately after the timeoutTesting Dead-Letter Queues
DLQ routing is controlled by maxReceiveCount on the redrive policy. In moto, you can simulate this by receiving a message multiple times without deleting it.
def test_message_routes_to_dlq_after_max_receive_count(aws_credentials):
with mock_aws():
sqs = boto3.client("sqs", region_name="us-east-1")
dlq = sqs.create_queue(QueueName="orders-dlq")
dlq_arn = sqs.get_queue_attributes(
QueueUrl=dlq["QueueUrl"],
AttributeNames=["QueueArn"],
)["Attributes"]["QueueArn"]
main_queue = sqs.create_queue(
QueueName="orders",
Attributes={
"RedrivePolicy": json.dumps({
"deadLetterTargetArn": dlq_arn,
"maxReceiveCount": "3",
}),
"VisibilityTimeout": "0", # instant re-visibility for testing
},
)
sqs.send_message(
QueueUrl=main_queue["QueueUrl"],
MessageBody=json.dumps({"order_id": "bad-message"}),
)
# Receive but never delete — simulate repeated failures
for _ in range(3):
sqs.receive_message(
QueueUrl=main_queue["QueueUrl"],
MaxNumberOfMessages=1,
)
# After maxReceiveCount, moto routes to DLQ
dlq_response = sqs.receive_message(
QueueUrl=dlq["QueueUrl"],
MaxNumberOfMessages=1,
)
assert "Messages" in dlq_response
assert json.loads(dlq_response["Messages"][0]["Body"])["order_id"] == "bad-message"Testing FIFO Queues
FIFO queues guarantee ordering within a group and deduplicate messages within a 5-minute window.
def test_fifo_queue_ordering_within_group(aws_credentials):
with mock_aws():
sqs = boto3.client("sqs", region_name="us-east-1")
queue = sqs.create_queue(
QueueName="orders.fifo",
Attributes={
"FifoQueue": "true",
"ContentBasedDeduplication": "true",
},
)
for i in range(3):
sqs.send_message(
QueueUrl=queue["QueueUrl"],
MessageBody=json.dumps({"seq": i}),
MessageGroupId="order-group-1",
)
received = []
for _ in range(3):
resp = sqs.receive_message(
QueueUrl=queue["QueueUrl"],
MaxNumberOfMessages=1,
)
msg = resp["Messages"][0]
received.append(json.loads(msg["Body"])["seq"])
sqs.delete_message(
QueueUrl=queue["QueueUrl"],
ReceiptHandle=msg["ReceiptHandle"],
)
assert received == [0, 1, 2] # FIFO order guaranteed within groupIntegration Testing with LocalStack
For cross-service tests (SQS triggering Lambda, SQS fed by SNS), use LocalStack:
# docker-compose.yml
services:
localstack:
image: localstack/localstack:3
ports:
- <span class="hljs-string">"4566:4566"
environment:
- SERVICES=sqs,sns,lambda# tests/integration/test_sqs_localstack.py
import boto3
import json
import pytest
import time
LOCALSTACK_ENDPOINT = "http://localhost:4566"
@pytest.fixture(scope="session")
def localstack_sqs():
return boto3.client(
"sqs",
region_name="us-east-1",
endpoint_url=LOCALSTACK_ENDPOINT,
aws_access_key_id="test",
aws_secret_access_key="test",
)
@pytest.fixture
def test_queue(localstack_sqs):
resp = localstack_sqs.create_queue(QueueName=f"test-{int(time.time())}")
url = resp["QueueUrl"]
yield url
localstack_sqs.delete_queue(QueueUrl=url)
def test_send_and_receive_via_localstack(localstack_sqs, test_queue):
localstack_sqs.send_message(
QueueUrl=test_queue,
MessageBody=json.dumps({"event": "test"}),
)
response = localstack_sqs.receive_message(
QueueUrl=test_queue,
MaxNumberOfMessages=1,
WaitTimeSeconds=2,
)
assert "Messages" in response
body = json.loads(response["Messages"][0]["Body"])
assert body["event"] == "test"Common Pitfalls
Forgetting to mock at the right level. moto must wrap the boto3 client creation, not just the call site. If your code creates clients at module import time, use mock_aws() as a class decorator or patch the client in the constructor.
Not testing long-polling. WaitTimeSeconds=20 in production means your consumer blocks. In tests, always set WaitTimeSeconds=0 and send messages before calling receive_message.
Ignoring ApproximateNumberOfMessages. This attribute is eventually consistent. Don't use it in assertions — receive messages directly instead.
Parallel tests with shared queue names. Use unique queue names per test (e.g., f"test-{uuid4()}") to avoid cross-test contamination.
Summary
Test SQS with moto for fast unit tests and LocalStack for integration tests. Cover the happy path (send → receive → delete), the failure path (handler error → no delete → requeue), and DLQ routing (maxReceiveCount exceeded → message in DLQ). FIFO queues need explicit ordering and deduplication tests. Keep queue names unique per test and always purge or delete queues in teardown.