LocalStack Integration Testing: Full SQS + SNS + Lambda Stack Setup
moto is great for unit tests but stops at service boundaries. When you need SQS to trigger Lambda, SNS to fan out to multiple SQS queues, or Lambda to update DynamoDB, you need LocalStack. This guide sets up a full LocalStack integration test environment with Docker Compose, pytest fixtures, and reusable infrastructure helpers.
Key Takeaways
LocalStack is a real HTTP API — use the same boto3 code, different endpoint. Point endpoint_url to http://localhost:4566 and LocalStack handles the rest. Your application code doesn't change — only the endpoint in your test configuration.
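Use Docker Compose healthchecks before running tests. LocalStack takes a few seconds to initialize. Define a healthcheck and either start the stack with docker compose up --wait or use depends_on with condition: service_healthy, so tests never race LocalStack's startup in CI.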
Deploy Lambda as ZIP files to LocalStack. LocalStack supports Lambda ZIP and container image deployments. For integration tests, package your Lambda handler into a ZIP and deploy it via boto3 — no console needed.
Tear down resources between test modules, not between individual tests. Creating and destroying queues/topics per test is slow. Use module-scoped fixtures and purge queues between tests.
Use unique resource names per test run. Include a timestamp or UUID in resource names to prevent conflicts when tests run in parallel or when LocalStack state persists between runs.
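As a minimal sketch of the unique-naming takeaway, the helper below appends a short UUID-derived suffix to a base name. The function name `unique_name` and the 8-character suffix length are illustrative assumptions, not part of any LocalStack or boto3 API.

```python
import uuid


def unique_name(base: str, suffix_len: int = 8) -> str:
    """Return base plus a random hex suffix, for example 'orders-3f9a1c2e'.

    Hypothetical helper: the name and suffix length are assumptions.
    SQS queue names allow letters, digits, hyphens and underscores
    (up to 80 characters), so a hex suffix is always valid.
    """
    suffix = uuid.uuid4().hex[:suffix_len]
    return f"{base}-{suffix}"


queue_name = unique_name("fulfillment")
topic_name = unique_name("orders")
```

A random suffix stays unique even when two runs start within the same second, which a timestamp alone does not guarantee.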
When to Use LocalStack vs. moto
| Scenario | Tool |
|---|---|
| Testing producer logic | moto |
| Testing consumer logic | moto |
| Testing SQS triggering Lambda | LocalStack |
| Testing SNS fan-out to multiple SQS queues | LocalStack |
| Testing Lambda writing to DynamoDB | LocalStack |
| Testing SQS → Lambda → SQS pipelines | LocalStack |
| CI fast feedback | moto |
| Pre-production smoke tests | LocalStack |
Project Structure
myapp/
├── handlers/
│   └── order_processor.py          # Lambda handler
├── tests/
│   ├── unit/
│   │   └── test_producer.py        # moto tests
│   └── integration/
│       ├── conftest.py             # LocalStack fixtures
│       ├── test_fanout.py          # SNS → SQS fan-out
│       └── test_lambda.py          # SQS → Lambda
├── docker-compose.localstack.yml
└── requirements-test.txt

Docker Compose Setup
# docker-compose.localstack.yml
version: "3.9"
services:
  localstack:
    image: localstack/localstack:3
    ports:
      - "4566:4566"
    environment:
      - SERVICES=sqs,sns,lambda,dynamodb,iam
      - DEBUG=0
      - LAMBDA_EXECUTOR=docker
      - DOCKER_HOST=unix:///var/run/docker.sock
    volumes:
      - "/var/run/docker.sock:/var/run/docker.sock"
      - "./localstack-init:/etc/localstack/init/ready.d"
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:4566/_localstack/health"]
      interval: 5s
      timeout: 3s
      retries: 10
      start_period: 10s

Start LocalStack before running integration tests:
docker compose -f docker-compose.localstack.yml up -d --wait
pytest tests/integration/
docker compose -f docker-compose.localstack.yml down

Pytest Fixtures for LocalStack
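If LocalStack is started some other way than with `docker compose up --wait`, the test session can poll the same health endpoint that the Compose healthcheck uses. The following is a minimal sketch using only the standard library; `wait_for_localstack` and its default timeout and interval are hypothetical names and values chosen for illustration.

```python
import time
import urllib.error
import urllib.request


def wait_for_localstack(base_url="http://localhost:4566", timeout=60.0, interval=1.0):
    """Poll /_localstack/health until it answers 200 or the timeout expires.

    Hypothetical helper: the name and defaults are assumptions for this sketch.
    Returns True once the endpoint responds, False if the deadline passes.
    """
    deadline = time.monotonic() + timeout
    url = f"{base_url}/_localstack/health"
    while time.monotonic() < deadline:
        try:
            with urllib.request.urlopen(url, timeout=3) as resp:
                if resp.status == 200:
                    return True
        except (urllib.error.URLError, OSError):
            pass  # LocalStack is not accepting connections yet
        time.sleep(interval)
    return False
```

One way to use it is from a session-scoped autouse fixture that calls `pytest.exit(...)` when the function returns False, so a missing LocalStack fails fast instead of producing a wall of connection errors.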
# tests/integration/conftest.py
import io
import json
import os
import time
import zipfile

import boto3
import pytest

# CI can override the endpoint through the LOCALSTACK_ENDPOINT environment variable.
LOCALSTACK_ENDPOINT = os.environ.get("LOCALSTACK_ENDPOINT", "http://localhost:4566")
REGION = "us-east-1"
AWS_CREDS = {
    "aws_access_key_id": "test",
    "aws_secret_access_key": "test",
    "region_name": REGION,
    "endpoint_url": LOCALSTACK_ENDPOINT,
}


@pytest.fixture(scope="session")
def sqs():
    return boto3.client("sqs", **AWS_CREDS)


@pytest.fixture(scope="session")
def sns():
    return boto3.client("sns", **AWS_CREDS)


@pytest.fixture(scope="session")
def lambda_client():
    return boto3.client("lambda", **AWS_CREDS)


@pytest.fixture(scope="session")
def iam():
    return boto3.client("iam", **AWS_CREDS)


@pytest.fixture(scope="session")
def run_id():
    """Unique suffix for all resources in this test run."""
    return str(int(time.time()))


def create_queue(sqs_client, name, **kwargs):
    """Create a queue and return its URL and ARN."""
    resp = sqs_client.create_queue(QueueName=name, Attributes=kwargs.get("Attributes", {}))
    url = resp["QueueUrl"]
    arn = sqs_client.get_queue_attributes(
        QueueUrl=url, AttributeNames=["QueueArn"]
    )["Attributes"]["QueueArn"]
    return {"url": url, "arn": arn}


def package_lambda(handler_code: str) -> bytes:
    """Package a Python handler string into a ZIP archive."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as zf:
        zf.writestr("handler.py", handler_code)
    return buffer.getvalue()

Testing SNS → SQS Fan-Out
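Unless raw message delivery is enabled on the subscription, SQS receives each SNS notification wrapped in a JSON envelope, with the published payload stored as a string in the `Message` field. That is why the tests in this section parse the body twice. A small helper makes the two layers explicit; `unwrap_sns` is a hypothetical name, and the sample envelope is hand-written for illustration with only a few of the fields SNS actually sends.

```python
import json


def unwrap_sns(sqs_body: str) -> dict:
    """Decode an SQS message body that carries an SNS notification envelope."""
    envelope = json.loads(sqs_body)          # outer layer: the SNS envelope
    return json.loads(envelope["Message"])   # inner layer: the published JSON payload


# Hand-built envelope for illustration (abbreviated field set).
sample_body = json.dumps({
    "Type": "Notification",
    "TopicArn": "arn:aws:sns:us-east-1:000000000000:orders-example",
    "Message": json.dumps({"order_id": "ord-fan-001", "status": "shipped"}),
})

payload = unwrap_sns(sample_body)
print(payload["order_id"])  # ord-fan-001
```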
# tests/integration/test_fanout.py
import json
import time

import pytest

from conftest import create_queue


@pytest.fixture(scope="module")
def fanout_setup(sqs, sns, run_id):
    """Create topic + 2 queues + subscriptions for fan-out tests."""
    topic_arn = sns.create_topic(Name=f"orders-{run_id}")["TopicArn"]
    queue_a = create_queue(sqs, f"fulfillment-{run_id}")
    queue_b = create_queue(sqs, f"analytics-{run_id}")
    sub_a = sns.subscribe(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_a["arn"])
    sub_b = sns.subscribe(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_b["arn"])
    yield {
        "topic_arn": topic_arn,
        "queue_a": queue_a,
        "queue_b": queue_b,
        "sub_a_arn": sub_a["SubscriptionArn"],
        "sub_b_arn": sub_b["SubscriptionArn"],
    }
    # Teardown
    sqs.delete_queue(QueueUrl=queue_a["url"])
    sqs.delete_queue(QueueUrl=queue_b["url"])
    sns.delete_topic(TopicArn=topic_arn)


def test_publish_reaches_both_queues(sqs, sns, fanout_setup):
    setup = fanout_setup
    sns.publish(
        TopicArn=setup["topic_arn"],
        Message=json.dumps({"order_id": "ord-fan-001", "status": "shipped"}),
    )
    time.sleep(0.5)
    for label, queue in [("fulfillment", setup["queue_a"]), ("analytics", setup["queue_b"])]:
        resp = sqs.receive_message(
            QueueUrl=queue["url"],
            MaxNumberOfMessages=1,
            WaitTimeSeconds=3,
        )
        assert "Messages" in resp, f"{label} queue did not receive message"
        # The SQS body is an SNS envelope; the payload sits in its "Message" field.
        envelope = json.loads(resp["Messages"][0]["Body"])
        payload = json.loads(envelope["Message"])
        assert payload["order_id"] == "ord-fan-001"
        sqs.delete_message(
            QueueUrl=queue["url"],
            ReceiptHandle=resp["Messages"][0]["ReceiptHandle"],
        )


def test_filter_policy_routes_selectively(sqs, sns, fanout_setup, run_id):
    """Add a filtered subscription and verify only matching messages arrive."""
    shipped_queue = create_queue(sqs, f"shipped-only-{run_id}")
    sns.subscribe(
        TopicArn=fanout_setup["topic_arn"],
        Protocol="sqs",
        Endpoint=shipped_queue["arn"],
        Attributes={
            "FilterPolicy": json.dumps({"status": ["shipped"]}),
        },
    )
    # Publish shipped
    sns.publish(
        TopicArn=fanout_setup["topic_arn"],
        Message=json.dumps({"order_id": "shipped-001"}),
        MessageAttributes={"status": {"DataType": "String", "StringValue": "shipped"}},
    )
    # Publish cancelled (should NOT reach shipped_queue)
    sns.publish(
        TopicArn=fanout_setup["topic_arn"],
        Message=json.dumps({"order_id": "cancelled-001"}),
        MessageAttributes={"status": {"DataType": "String", "StringValue": "cancelled"}},
    )
    time.sleep(0.5)
    msgs = sqs.receive_message(
        QueueUrl=shipped_queue["url"], MaxNumberOfMessages=10, WaitTimeSeconds=2
    )
    bodies = [json.loads(json.loads(m["Body"])["Message"]) for m in msgs.get("Messages", [])]
    order_ids = [b["order_id"] for b in bodies]
    assert "shipped-001" in order_ids
    assert "cancelled-001" not in order_ids
    sqs.delete_queue(QueueUrl=shipped_queue["url"])

Testing SQS → Lambda Integration
Deploy a Lambda handler to LocalStack and verify it processes messages from an SQS event source mapping.
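Before deploying, it can help to see the shape of the event that an SQS event source mapping delivers. The sketch below isolates the handler's transformation step and runs it against a hand-built event. `mark_processed` is a hypothetical helper, and the fake event carries only a placeholder `messageId` plus the `body` field that the handler in this guide reads; real SQS events include more fields per record, such as `receiptHandle`.

```python
import json


def mark_processed(event: dict) -> list:
    """Return the JSON bodies to forward, one per SQS record, with processed=True."""
    outputs = []
    for record in event["Records"]:
        body = json.loads(record["body"])  # SQS delivers the message body as a string
        body["processed"] = True
        outputs.append(json.dumps(body))
    return outputs


# Hand-built event for illustration (abbreviated record).
fake_event = {
    "Records": [
        {"messageId": "example-1", "body": json.dumps({"order_id": "ord-lambda-001"})},
    ]
}

for out in mark_processed(fake_event):
    print(out)  # {"order_id": "ord-lambda-001", "processed": true}
```

Keeping the transformation in a plain function like this lets unit tests cover it without LocalStack, leaving the integration test to verify only the wiring.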
# tests/integration/test_lambda.py
import json
import time

import pytest

from conftest import package_lambda, create_queue

LAMBDA_HANDLER_CODE = """
import json
import boto3
import os

def handler(event, context):
    sqs = boto3.client(
        "sqs",
        region_name="us-east-1",
        endpoint_url=os.environ.get("SQS_ENDPOINT", "http://localstack:4566"),
        aws_access_key_id="test",
        aws_secret_access_key="test",
    )
    output_url = os.environ["OUTPUT_QUEUE_URL"]
    for record in event["Records"]:
        body = json.loads(record["body"])
        body["processed"] = True
        sqs.send_message(QueueUrl=output_url, MessageBody=json.dumps(body))
    return {"statusCode": 200}
"""


@pytest.fixture(scope="module")
def lambda_setup(sqs, lambda_client, iam, run_id):
    input_queue = create_queue(sqs, f"input-{run_id}")
    output_queue = create_queue(sqs, f"output-{run_id}")
    # Create IAM role for Lambda
    assume_policy = json.dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "lambda.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }],
    })
    role_name = f"lambda-role-{run_id}"
    role = iam.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=assume_policy,
    )
    role_arn = role["Role"]["Arn"]
    # Deploy Lambda
    zip_bytes = package_lambda(LAMBDA_HANDLER_CODE)
    fn = lambda_client.create_function(
        FunctionName=f"order-processor-{run_id}",
        Runtime="python3.12",
        Role=role_arn,
        Handler="handler.handler",
        Code={"ZipFile": zip_bytes},
        Environment={
            "Variables": {
                "OUTPUT_QUEUE_URL": output_queue["url"],
                "SQS_ENDPOINT": "http://localstack:4566",
            }
        },
        Timeout=30,
    )
    # Wait for function to be active
    waiter = lambda_client.get_waiter("function_active")
    waiter.wait(FunctionName=fn["FunctionName"])
    # Create event source mapping (SQS → Lambda)
    mapping = lambda_client.create_event_source_mapping(
        EventSourceArn=input_queue["arn"],
        FunctionName=fn["FunctionName"],
        BatchSize=1,
        Enabled=True,
    )
    time.sleep(2)  # Allow mapping to activate
    yield {
        "input_queue": input_queue,
        "output_queue": output_queue,
        "function_name": fn["FunctionName"],
    }
    # Teardown (the IAM role is removed too, so repeated runs start clean)
    lambda_client.delete_event_source_mapping(UUID=mapping["UUID"])
    lambda_client.delete_function(FunctionName=fn["FunctionName"])
    iam.delete_role(RoleName=role_name)
    sqs.delete_queue(QueueUrl=input_queue["url"])
    sqs.delete_queue(QueueUrl=output_queue["url"])


def test_lambda_processes_sqs_message(sqs, lambda_setup):
    setup = lambda_setup
    sqs.send_message(
        QueueUrl=setup["input_queue"]["url"],
        MessageBody=json.dumps({"order_id": "ord-lambda-001"}),
    )
    # Wait for Lambda to process and write to output queue
    deadline = time.time() + 15
    output_msg = None
    while time.time() < deadline:
        resp = sqs.receive_message(
            QueueUrl=setup["output_queue"]["url"],
            MaxNumberOfMessages=1,
            WaitTimeSeconds=2,
        )
        if "Messages" in resp:
            output_msg = json.loads(resp["Messages"][0]["Body"])
            break
    assert output_msg is not None, "Lambda did not produce output within timeout"
    assert output_msg["order_id"] == "ord-lambda-001"
    assert output_msg["processed"] is True

CI Configuration
# .github/workflows/integration-tests.yml
name: Integration Tests
on: [push, pull_request]
jobs:
  integration:
    runs-on: ubuntu-latest
    services:
      localstack:
        image: localstack/localstack:3
        ports:
          - 4566:4566
        env:
          # iam is included because the Lambda test creates an execution role
          SERVICES: sqs,sns,lambda,dynamodb,iam
        options: >-
          --health-cmd "curl -f http://localhost:4566/_localstack/health"
          --health-interval 5s
          --health-timeout 3s
          --health-retries 10
          --health-start-period 10s
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - name: Install dependencies
        run: pip install -r requirements-test.txt
      - name: Run integration tests
        run: pytest tests/integration/ -v --tb=short
        env:
          LOCALSTACK_ENDPOINT: http://localhost:4566

Performance Tips
Use session-scoped fixtures for infrastructure. Creating queues and topics takes time. Scope fixtures to session or module and purge between tests rather than recreating.
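One way to clear a queue between tests without recreating it is to drain it: receive in batches and delete whatever comes back until a receive returns nothing. The helper below is a sketch under that assumption; `drain_queue` and `max_rounds` are hypothetical names, and the function only relies on the boto3 SQS client methods `receive_message` and `delete_message_batch`.

```python
def drain_queue(sqs_client, queue_url, max_rounds=50):
    """Receive and delete messages until the queue comes back empty.

    Hypothetical helper for this sketch. sqs_client is a boto3 SQS client
    (or any object exposing the same two methods).
    Returns the number of messages removed.
    """
    removed = 0
    for _ in range(max_rounds):
        resp = sqs_client.receive_message(
            QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=0
        )
        messages = resp.get("Messages", [])
        if not messages:
            break
        # Batch deletes accept at most 10 entries, matching the receive limit.
        sqs_client.delete_message_batch(
            QueueUrl=queue_url,
            Entries=[
                {"Id": str(i), "ReceiptHandle": m["ReceiptHandle"]}
                for i, m in enumerate(messages)
            ],
        )
        removed += len(messages)
    return removed
```

A function-scoped fixture could call this after `yield` for each shared queue. `sqs.purge_queue(QueueUrl=...)` is the shorter alternative, but real SQS accepts only one purge per queue every 60 seconds, so a drain loop behaves the same against LocalStack and AWS.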
Parallelize with pytest-xdist carefully. Parallel test workers sharing LocalStack can conflict on resource names. Use run_id derived from worker_id:
@pytest.fixture(scope="session")
def run_id(worker_id):
    return worker_id if worker_id != "master" else "main"

Skip integration tests in unit test runs. Mark integration tests and use pytest -m "not integration" for fast local feedback:
pytestmark = pytest.mark.integration

Summary
LocalStack enables true integration tests for AWS SQS + SNS + Lambda flows without hitting real AWS. Use Docker Compose to manage LocalStack lifecycle, session-scoped pytest fixtures for shared infrastructure, and unique resource names to avoid parallel test conflicts. Test fan-out patterns, filter policies, Lambda event source mappings, and cross-service pipelines in isolation before deploying to production.