LocalStack Integration Testing: Full SQS + SNS + Lambda Stack Setup

LocalStack Integration Testing: Full SQS + SNS + Lambda Stack Setup

moto is great for unit tests but stops at service boundaries. When you need SQS triggering Lambda, SNS fan-out to multiple SQS queues, or Lambda updating DynamoDB — you need LocalStack. This guide sets up a full LocalStack integration test environment with Docker Compose, pytest fixtures, and reusable infrastructure helpers.

Key Takeaways

LocalStack is a real HTTP API — use the same boto3 code, different endpoint. Point endpoint_url to http://localhost:4566 and LocalStack handles the rest. Your application code doesn't change — only the endpoint in your test configuration.

Use Docker Compose healthchecks before running tests. LocalStack takes a few seconds to initialize. Use depends_on with healthcheck conditions to avoid race conditions in CI.

Deploy Lambda as ZIP files to LocalStack. LocalStack supports Lambda ZIP and container image deployments. For integration tests, package your Lambda handler into a ZIP and deploy it via boto3 — no console needed.

Tear down resources between test modules, not between individual tests. Creating and destroying queues/topics per test is slow. Use module-scoped fixtures and purge queues between tests.

Use unique resource names per test run. Include a timestamp or UUID in resource names to prevent conflicts when tests run in parallel or when LocalStack state persists between runs.

When to Use LocalStack vs. moto

Scenario Tool
Testing producer logic moto
Testing consumer logic moto
Testing SQS triggering Lambda LocalStack
Testing SNS fan-out to multiple SQS queues LocalStack
Testing Lambda writing to DynamoDB LocalStack
Testing SQS → Lambda → SQS pipelines LocalStack
CI fast feedback moto
Pre-production smoke tests LocalStack

Project Structure

myapp/
├── handlers/
│   └── order_processor.py      # Lambda handler
├── tests/
│   ├── unit/
│   │   └── test_producer.py    # moto tests
│   └── integration/
│       ├── conftest.py         # LocalStack fixtures
│       ├── test_fanout.py      # SNS → SQS fan-out
│       └── test_lambda.py      # SQS → Lambda
├── docker-compose.localstack.yml
└── requirements-test.txt

Docker Compose Setup

# docker-compose.localstack.yml
version: "3.9"
services:
  localstack:
    image: localstack/localstack:3
    ports:
      - "4566:4566"
    environment:
      - SERVICES=sqs,sns,lambda,dynamodb,iam
      - DEBUG=0
      - LAMBDA_EXECUTOR=docker
      - DOCKER_HOST=unix:///var/run/docker.sock
    volumes:
      - "/var/run/docker.sock:/var/run/docker.sock"
      - "./localstack-init:/etc/localstack/init/ready.d"
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:4566/_localstack/health"]
      interval: 5s
      timeout: 3s
      retries: 10
      start_period: 10s

Start LocalStack before running integration tests:

docker compose -f docker-compose.localstack.yml up -d --wait
pytest tests/integration/
docker compose -f docker-compose.localstack.yml down

Pytest Fixtures for LocalStack

# tests/integration/conftest.py
import boto3
import json
import time
import zipfile
import io
import pytest

LOCALSTACK_ENDPOINT = "http://localhost:4566"
REGION = "us-east-1"
AWS_CREDS = {
    "aws_access_key_id": "test",
    "aws_secret_access_key": "test",
    "region_name": REGION,
    "endpoint_url": LOCALSTACK_ENDPOINT,
}


@pytest.fixture(scope="session")
def sqs():
    return boto3.client("sqs", **AWS_CREDS)


@pytest.fixture(scope="session")
def sns():
    return boto3.client("sns", **AWS_CREDS)


@pytest.fixture(scope="session")
def lambda_client():
    return boto3.client("lambda", **AWS_CREDS)


@pytest.fixture(scope="session")
def iam():
    return boto3.client("iam", **AWS_CREDS)


@pytest.fixture(scope="session")
def run_id():
    """Unique suffix for all resources in this test run."""
    return str(int(time.time()))


def create_queue(sqs_client, name, **kwargs):
    resp = sqs_client.create_queue(QueueName=name, Attributes=kwargs.get("Attributes", {}))
    url = resp["QueueUrl"]
    arn = sqs_client.get_queue_attributes(
        QueueUrl=url, AttributeNames=["QueueArn"]
    )["Attributes"]["QueueArn"]
    return {"url": url, "arn": arn}


def package_lambda(handler_code: str) -> bytes:
    """Package a Python handler string into a ZIP archive."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as zf:
        zf.writestr("handler.py", handler_code)
    return buffer.getvalue()

Testing SNS → SQS Fan-Out

# tests/integration/test_fanout.py
import json
import time
import pytest


@pytest.fixture(scope="module")
def fanout_setup(sqs, sns, run_id):
    """Create topic + 2 queues + subscriptions for fan-out tests."""
    topic_arn = sns.create_topic(Name=f"orders-{run_id}")["TopicArn"]

    queue_a = create_queue(sqs, f"fulfillment-{run_id}")
    queue_b = create_queue(sqs, f"analytics-{run_id}")

    sub_a = sns.subscribe(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_a["arn"])
    sub_b = sns.subscribe(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_b["arn"])

    yield {
        "topic_arn": topic_arn,
        "queue_a": queue_a,
        "queue_b": queue_b,
        "sub_a_arn": sub_a["SubscriptionArn"],
        "sub_b_arn": sub_b["SubscriptionArn"],
    }

    # Teardown
    sqs.delete_queue(QueueUrl=queue_a["url"])
    sqs.delete_queue(QueueUrl=queue_b["url"])
    sns.delete_topic(TopicArn=topic_arn)


def test_publish_reaches_both_queues(sqs, sns, fanout_setup):
    setup = fanout_setup
    sns.publish(
        TopicArn=setup["topic_arn"],
        Message=json.dumps({"order_id": "ord-fan-001", "status": "shipped"}),
    )
    time.sleep(0.5)

    for label, queue in [("fulfillment", setup["queue_a"]), ("analytics", setup["queue_b"])]:
        resp = sqs.receive_message(
            QueueUrl=queue["url"],
            MaxNumberOfMessages=1,
            WaitTimeSeconds=3,
        )
        assert "Messages" in resp, f"{label} queue did not receive message"
        envelope = json.loads(resp["Messages"][0]["Body"])
        payload = json.loads(envelope["Message"])
        assert payload["order_id"] == "ord-fan-001"
        sqs.delete_message(
            QueueUrl=queue["url"],
            ReceiptHandle=resp["Messages"][0]["ReceiptHandle"],
        )


def test_filter_policy_routes_selectively(sqs, sns, fanout_setup, run_id):
    """Add a filtered subscription and verify only matching messages arrive."""
    shipped_queue = create_queue(sqs, f"shipped-only-{run_id}")

    sns.subscribe(
        TopicArn=fanout_setup["topic_arn"],
        Protocol="sqs",
        Endpoint=shipped_queue["arn"],
        Attributes={
            "FilterPolicy": json.dumps({"status": ["shipped"]}),
        },
    )

    # Publish shipped
    sns.publish(
        TopicArn=fanout_setup["topic_arn"],
        Message=json.dumps({"order_id": "shipped-001"}),
        MessageAttributes={"status": {"DataType": "String", "StringValue": "shipped"}},
    )
    # Publish cancelled (should NOT reach shipped_queue)
    sns.publish(
        TopicArn=fanout_setup["topic_arn"],
        Message=json.dumps({"order_id": "cancelled-001"}),
        MessageAttributes={"status": {"DataType": "String", "StringValue": "cancelled"}},
    )

    time.sleep(0.5)

    msgs = sqs.receive_message(
        QueueUrl=shipped_queue["url"], MaxNumberOfMessages=10, WaitTimeSeconds=2
    )
    bodies = [json.loads(json.loads(m["Body"])["Message"]) for m in msgs.get("Messages", [])]
    order_ids = [b["order_id"] for b in bodies]

    assert "shipped-001" in order_ids
    assert "cancelled-001" not in order_ids

    sqs.delete_queue(QueueUrl=shipped_queue["url"])

Testing SQS → Lambda Integration

Deploy a Lambda handler to LocalStack and verify it processes messages from an SQS event source mapping.

# tests/integration/test_lambda.py
import json
import time
import pytest
from conftest import package_lambda, create_queue, AWS_CREDS
import boto3


LAMBDA_HANDLER_CODE = """
import json
import boto3
import os

def handler(event, context):
    sqs = boto3.client(
        "sqs",
        region_name="us-east-1",
        endpoint_url=os.environ.get("SQS_ENDPOINT", "http://localstack:4566"),
        aws_access_key_id="test",
        aws_secret_access_key="test",
    )
    output_url = os.environ["OUTPUT_QUEUE_URL"]
    for record in event["Records"]:
        body = json.loads(record["body"])
        body["processed"] = True
        sqs.send_message(QueueUrl=output_url, MessageBody=json.dumps(body))
    return {"statusCode": 200}
"""


@pytest.fixture(scope="module")
def lambda_setup(sqs, lambda_client, iam, run_id):
    input_queue = create_queue(sqs, f"input-{run_id}")
    output_queue = create_queue(sqs, f"output-{run_id}")

    # Create IAM role for Lambda
    assume_policy = json.dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "lambda.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }],
    })
    role = iam.create_role(
        RoleName=f"lambda-role-{run_id}",
        AssumeRolePolicyDocument=assume_policy,
    )
    role_arn = role["Role"]["Arn"]

    # Deploy Lambda
    zip_bytes = package_lambda(LAMBDA_HANDLER_CODE)
    fn = lambda_client.create_function(
        FunctionName=f"order-processor-{run_id}",
        Runtime="python3.12",
        Role=role_arn,
        Handler="handler.handler",
        Code={"ZipFile": zip_bytes},
        Environment={
            "Variables": {
                "OUTPUT_QUEUE_URL": output_queue["url"],
                "SQS_ENDPOINT": "http://localstack:4566",
            }
        },
        Timeout=30,
    )

    # Wait for function to be active
    waiter = lambda_client.get_waiter("function_active")
    waiter.wait(FunctionName=fn["FunctionName"])

    # Create event source mapping (SQS → Lambda)
    mapping = lambda_client.create_event_source_mapping(
        EventSourceArn=input_queue["arn"],
        FunctionName=fn["FunctionName"],
        BatchSize=1,
        Enabled=True,
    )

    time.sleep(2)  # Allow mapping to activate

    yield {
        "input_queue": input_queue,
        "output_queue": output_queue,
        "function_name": fn["FunctionName"],
    }

    # Teardown
    lambda_client.delete_event_source_mapping(UUID=mapping["UUID"])
    lambda_client.delete_function(FunctionName=fn["FunctionName"])
    sqs.delete_queue(QueueUrl=input_queue["url"])
    sqs.delete_queue(QueueUrl=output_queue["url"])


def test_lambda_processes_sqs_message(sqs, lambda_setup):
    setup = lambda_setup
    sqs.send_message(
        QueueUrl=setup["input_queue"]["url"],
        MessageBody=json.dumps({"order_id": "ord-lambda-001"}),
    )

    # Wait for Lambda to process and write to output queue
    deadline = time.time() + 15
    output_msg = None
    while time.time() < deadline:
        resp = sqs.receive_message(
            QueueUrl=setup["output_queue"]["url"],
            MaxNumberOfMessages=1,
            WaitTimeSeconds=2,
        )
        if "Messages" in resp:
            output_msg = json.loads(resp["Messages"][0]["Body"])
            break

    assert output_msg is not None, "Lambda did not produce output within timeout"
    assert output_msg["order_id"] == "ord-lambda-001"
    assert output_msg["processed"] is True

CI Configuration

# .github/workflows/integration-tests.yml
name: Integration Tests

on: [push, pull_request]

jobs:
  integration:
    runs-on: ubuntu-latest
    services:
      localstack:
        image: localstack/localstack:3
        ports:
          - 4566:4566
        env:
          SERVICES: sqs,sns,lambda,dynamodb
        options: >-
          --health-cmd "curl -f http://localhost:4566/_localstack/health"
          --health-interval 5s
          --health-timeout 3s
          --health-retries 10
          --health-start-period 10s

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.12"

      - name: Install dependencies
        run: pip install -r requirements-test.txt

      - name: Run integration tests
        run: pytest tests/integration/ -v --tb=short
        env:
          LOCALSTACK_ENDPOINT: http://localhost:4566

Performance Tips

Use session-scoped fixtures for infrastructure. Creating queues and topics takes time. Scope fixtures to session or module and purge between tests rather than recreating.

Parallelize with pytest-xdist carefully. Parallel test workers sharing LocalStack can conflict on resource names. Use run_id derived from worker_id:

@pytest.fixture(scope="session")
def run_id(worker_id):
    return worker_id if worker_id != "master" else "main"

Skip integration tests in unit test runs. Mark integration tests and use pytest -m "not integration" for fast local feedback:

pytestmark = pytest.mark.integration

Summary

LocalStack enables true integration tests for AWS SQS + SNS + Lambda flows without hitting real AWS. Use Docker Compose to manage LocalStack lifecycle, session-scoped pytest fixtures for shared infrastructure, and unique resource names to avoid parallel test conflicts. Test fan-out patterns, filter policies, Lambda event source mappings, and cross-service pipelines in isolation before deploying to production.

Read more