Dagger Integration Tests: Running Real Databases and Services in CI
Integration tests that hit real databases and message queues are harder to run than unit tests. In GitHub Actions, you configure service containers in YAML and hope they start before your tests run. In Dagger, you define services as code, bind them to your test container, and get the same environment locally and in CI.
This guide covers practical Dagger patterns for integration tests: PostgreSQL, Redis, Kafka, and composing multiple services together.
Why Dagger for Integration Tests
The core problem with integration tests in CI:
- Service startup races: tests start before Postgres is ready
- Port conflicts: multiple jobs fight over port 5432
- Environment drift: local Docker Compose differs from CI service containers
- Debugging: you can't reproduce the exact CI environment locally
Dagger solves all four:
# This runs identically on your laptop and in GitHub Actions
postgres = (
dag.container()
.from_("postgres:16-alpine")
.with_env_variable("POSTGRES_PASSWORD", "test")
.with_exposed_port(5432)
.as_service()
)
result = await (
dag.container()
.with_service_binding("postgres", postgres)
# Postgres is DNS-resolvable as "postgres" inside this container
.with_env_variable("DATABASE_URL", "postgresql://postgres:test@postgres:5432/testdb")
...
)No port conflicts (Dagger uses isolated networks per pipeline run). No startup races (.as_service() ensures the container is healthy before binding). Same config locally and in CI.
PostgreSQL Integration Tests
Python / pytest
import dagger
from dagger import dag, function, object_type
@object_type
class Pipeline:
    @function
    async def integration_test(
        self,
        source: dagger.Directory,
        pg_password: dagger.Secret,
    ) -> str:
        """Run the pytest integration suite with a PostgreSQL service bound in.

        Args:
            source: Project directory; mounted at /src in the test container.
            pg_password: Secret exposed to the Postgres container as
                POSTGRES_PASSWORD.

        Returns:
            The value of `.stdout()` after the pytest step.
        """
        # Database service. DATABASE_URL below carries no password; the
        # service is started with POSTGRES_HOST_AUTH_METHOD=trust.
        db_base = dag.container().from_("postgres:16-alpine")
        db_configured = (
            db_base.with_env_variable("POSTGRES_USER", "testuser")
            .with_secret_variable("POSTGRES_PASSWORD", pg_password)
            .with_env_variable("POSTGRES_DB", "testdb")
            .with_env_variable("POSTGRES_HOST_AUTH_METHOD", "trust")
        )
        db_service = db_configured.with_exposed_port(5432).as_service()

        database_url = "postgresql+asyncpg://testuser@postgres:5432/testdb"
        pytest_cmd = [
            "pytest",
            "tests/integration/",
            "-v",
            "--tb=short",
            "-m", "integration",
        ]

        # Test container: the service is reachable under the hostname "postgres".
        runner = (
            dag.container()
            .from_("python:3.12-slim")
            .with_service_binding("postgres", db_service)
            .with_env_variable("DATABASE_URL", database_url)
            .with_directory("/src", source)
            .with_workdir("/src")
        )
        installed = runner.with_exec(["pip", "install", "-e", ".[dev]"])
        # Run migrations before tests
        migrated = installed.with_exec(["alembic", "upgrade", "head"])
        return await migrated.with_exec(pytest_cmd).stdout()
TypeScript / Node.js
/**
 * Run the Node.js integration tests with a PostgreSQL service bound in.
 *
 * @param source     Project directory; mounted at /app in the test container.
 * @param pgPassword Secret exposed to the Postgres container as POSTGRES_PASSWORD.
 * @returns          The value of `.stdout()` after the `npm run test:integration` step.
 */
@func()
async integrationTest(
  source: Directory,
  pgPassword: Secret,
): Promise<string> {
  const postgres = dag
    .container()
    .from("postgres:16-alpine")
    .withEnvVariable("POSTGRES_USER", "testuser")
    .withSecretVariable("POSTGRES_PASSWORD", pgPassword)
    .withEnvVariable("POSTGRES_DB", "testdb")
    // DATABASE_URL below carries no password, so the server must accept
    // passwordless connections (same setting as the Python example).
    .withEnvVariable("POSTGRES_HOST_AUTH_METHOD", "trust")
    .withExposedPort(5432)
    .asService()

  return dag
    .container()
    .from("node:22-alpine")
    .withServiceBinding("postgres", postgres)
    .withEnvVariable(
      "DATABASE_URL",
      "postgresql://testuser@postgres:5432/testdb"
    )
    .withDirectory("/app", source)
    .withWorkdir("/app")
    .withExec(["npm", "ci"])
    // Apply migrations before the tests run
    .withExec(["npx", "prisma", "migrate", "deploy"])
    .withExec(["npm", "run", "test:integration"])
    .stdout()
}
Redis Integration Tests
@function
async def test_with_redis(self, source: dagger.Directory) -> str:
redis = (
dag.container()
.from_("redis:7-alpine")
.with_exposed_port(6379)
.as_service()
)
return await (
dag.container()
.from_("python:3.12-slim")
.with_service_binding("redis", redis)
.with_env_variable("REDIS_URL", "redis://redis:6379/0")
.with_directory("/src", source)
.with_workdir("/src")
.with_exec(["pip", "install", "-e", ".[dev]"])
.with_exec(["pytest", "tests/integration/", "-k", "redis"])
.stdout()
)Multiple Services: PostgreSQL + Redis + RabbitMQ
Real applications often need multiple services. Dagger handles this cleanly:
@function
async def full_integration_test(
self,
source: dagger.Directory,
pg_password: dagger.Secret,
rabbitmq_password: dagger.Secret,
) -> str:
"""Run full integration suite with all services."""
postgres = (
dag.container()
.from_("postgres:16-alpine")
.with_env_variable("POSTGRES_USER", "testuser")
.with_secret_variable("POSTGRES_PASSWORD", pg_password)
.with_env_variable("POSTGRES_DB", "testdb")
.with_exposed_port(5432)
.as_service()
)
redis = (
dag.container()
.from_("redis:7-alpine")
.with_exposed_port(6379)
.as_service()
)
rabbitmq = (
dag.container()
.from_("rabbitmq:3-management-alpine")
.with_env_variable("RABBITMQ_DEFAULT_USER", "guest")
.with_secret_variable("RABBITMQ_DEFAULT_PASS", rabbitmq_password)
.with_exposed_port(5672)
.as_service()
)
return await (
dag.container()
.from_("python:3.12-slim")
.with_service_binding("postgres", postgres)
.with_service_binding("redis", redis)
.with_service_binding("rabbitmq", rabbitmq)
.with_env_variable("DATABASE_URL", "postgresql://testuser@postgres:5432/testdb")
.with_env_variable("REDIS_URL", "redis://redis:6379/0")
.with_env_variable("AMQP_URL", "amqp://guest@rabbitmq:5672/")
.with_directory("/src", source)
.with_workdir("/src")
.with_exec(["pip", "install", "-e", ".[dev]"])
.with_exec(["pytest", "tests/integration/", "-v"])
.stdout()
)All three services start in parallel. Dagger waits until each is healthy before binding.
Kafka Integration Tests
@function
async def kafka_test(self, source: dagger.Directory) -> str:
# KRaft mode: no ZooKeeper needed
kafka = (
dag.container()
.from_("apache/kafka:3.7.0")
.with_env_variable("KAFKA_NODE_ID", "1")
.with_env_variable("KAFKA_PROCESS_ROLES", "broker,controller")
.with_env_variable("KAFKA_LISTENERS", "PLAINTEXT://:9092,CONTROLLER://:9093")
.with_env_variable("KAFKA_ADVERTISED_LISTENERS", "PLAINTEXT://kafka:9092")
.with_env_variable("KAFKA_CONTROLLER_QUORUM_VOTERS", "1@localhost:9093")
.with_exposed_port(9092)
.as_service()
)
return await (
dag.container()
.from_("python:3.12-slim")
.with_service_binding("kafka", kafka)
.with_env_variable("KAFKA_BOOTSTRAP_SERVERS", "kafka:9092")
.with_directory("/src", source)
.with_workdir("/src")
.with_exec(["pip", "install", "-e", ".[dev]"])
.with_exec(["pytest", "tests/integration/", "-k", "kafka", "-v"])
.stdout()
)Waiting for Service Readiness
Services sometimes need a few seconds to start. Dagger's .as_service() handles basic TCP readiness, but for application-level readiness you can add a wait step:
@function
async def test_with_migration(
self,
source: dagger.Directory,
) -> str:
postgres = (
dag.container()
.from_("postgres:16-alpine")
.with_env_variable("POSTGRES_PASSWORD", "test")
.with_env_variable("POSTGRES_DB", "testdb")
.with_exposed_port(5432)
.as_service()
)
# Wait for postgres and run migrations in a separate step
await (
dag.container()
.from_("python:3.12-slim")
.with_service_binding("postgres", postgres)
.with_exec(["apt-get", "install", "-y", "postgresql-client"])
# Retry until pg_isready succeeds
.with_exec([
"sh", "-c",
"for i in $(seq 1 30); do pg_isready -h postgres && break; sleep 1; done"
])
.stdout()
)
return await (
dag.container()
.from_("python:3.12-slim")
.with_service_binding("postgres", postgres)
.with_env_variable("DATABASE_URL", "postgresql://postgres:test@postgres:5432/testdb")
.with_directory("/src", source)
.with_workdir("/src")
.with_exec(["pip", "install", "-e", ".[dev]"])
.with_exec(["alembic", "upgrade", "head"])
.with_exec(["pytest", "tests/integration/"])
.stdout()
)LocalStack for AWS Services
Test against emulated AWS APIs locally with LocalStack:
@function
async def aws_integration_test(self, source: dagger.Directory) -> str:
localstack = (
dag.container()
.from_("localstack/localstack:3.4")
.with_env_variable("SERVICES", "s3,sqs,dynamodb,secretsmanager")
.with_env_variable("DEBUG", "0")
.with_exposed_port(4566)
.as_service()
)
return await (
dag.container()
.from_("python:3.12-slim")
.with_service_binding("localstack", localstack)
.with_env_variable("AWS_ENDPOINT_URL", "http://localstack:4566")
.with_env_variable("AWS_ACCESS_KEY_ID", "test")
.with_env_variable("AWS_SECRET_ACCESS_KEY", "test")
.with_env_variable("AWS_DEFAULT_REGION", "us-east-1")
.with_directory("/src", source)
.with_workdir("/src")
.with_exec(["pip", "install", "-e", ".[dev]"])
.with_exec(["pytest", "tests/integration/aws/", "-v"])
.stdout()
)MongoDB Integration Tests
@function
async def mongodb_test(self, source: dagger.Directory) -> str:
mongo = (
dag.container()
.from_("mongo:7-jammy")
.with_env_variable("MONGO_INITDB_ROOT_USERNAME", "testuser")
.with_env_variable("MONGO_INITDB_ROOT_PASSWORD", "testpass")
.with_exposed_port(27017)
.as_service()
)
return await (
dag.container()
.from_("python:3.12-slim")
.with_service_binding("mongo", mongo)
.with_env_variable(
"MONGODB_URI",
"mongodb://testuser:testpass@mongo:27017/"
)
.with_directory("/src", source)
.with_workdir("/src")
.with_exec(["pip", "install", "-e", ".[dev]"])
.with_exec(["pytest", "tests/integration/", "-k", "mongo"])
.stdout()
)Parallel Integration Suites
Run integration tests for different modules in parallel:
import asyncio
@function
async def parallel_integration(self, source: dagger.Directory) -> str:
"""Run user, payment, and notification integration tests in parallel."""
# Share the same postgres instance
postgres = (
dag.container()
.from_("postgres:16-alpine")
.with_env_variable("POSTGRES_PASSWORD", "test")
.with_env_variable("POSTGRES_DB", "testdb")
.with_exposed_port(5432)
.as_service()
)
def make_test_container(test_path: str) -> dagger.Container:
return (
dag.container()
.from_("python:3.12-slim")
.with_service_binding("postgres", postgres)
.with_env_variable("DATABASE_URL", "postgresql://postgres:test@postgres:5432/testdb")
.with_directory("/src", source)
.with_workdir("/src")
.with_exec(["pip", "install", "-e", ".[dev]"])
.with_exec(["pytest", test_path, "-v"])
)
results = await asyncio.gather(
make_test_container("tests/integration/users/").stdout(),
make_test_container("tests/integration/payments/").stdout(),
make_test_container("tests/integration/notifications/").stdout(),
return_exceptions=True,
)
failed = [r for r in results if isinstance(r, Exception)]
if failed:
raise Exception(f"{len(failed)} integration suite(s) failed")
return f"All 3 integration suites passed"Connecting to External Test Databases
For persistent test data scenarios, connect to an external database:
@function
async def external_db_test(
self,
source: dagger.Directory,
db_url: dagger.Secret,
) -> str:
"""Run tests against an external (non-ephemeral) test database."""
return await (
dag.container()
.from_("python:3.12-slim")
.with_secret_variable("DATABASE_URL", db_url)
.with_directory("/src", source)
.with_workdir("/src")
.with_exec(["pip", "install", "-e", ".[dev]"])
.with_exec(["pytest", "tests/integration/", "-v", "--tb=long"])
.stdout()
)dagger call external-db-test --source . --db-url <span class="hljs-built_in">env:TEST_DATABASE_URLGitHub Actions Setup
# Runs the Dagger full-integration-test function on pushes to main and on pull requests.
name: Integration Tests
on:
  push:
    branches: [main]
  pull_request:
jobs:
  integration:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: dagger/dagger-for-github@v7
        with:
          # NOTE(review): "latest" floats between runs; consider pinning the
          # Dagger version your module was developed against.
          version: "latest"
      - name: Run integration tests
        run: |
          dagger call full-integration-test \
            --source . \
            --pg-password env:PG_PASSWORD \
            --rabbitmq-password env:RABBITMQ_PASSWORD
        # The env: references above read these variables.
        env:
          PG_PASSWORD: ${{ secrets.PG_PASSWORD }}
          RABBITMQ_PASSWORD: ${{ secrets.RABBITMQ_PASSWORD }}
          DAGGER_CLOUD_TOKEN: ${{ secrets.DAGGER_CLOUD_TOKEN }}
Summary
Dagger's service binding model makes integration tests significantly easier:
- No port conflicts: each Dagger run gets an isolated network
- No startup races: services are ready before your tests run
- Local parity: the exact same containers run on your laptop and in CI
- Parallelism: start all services simultaneously, not sequentially
- Debugging: reproduce any CI failure locally with dagger call --interactive
For integration tests specifically, the local reproducibility alone is worth the migration from GitHub Actions service containers. When tests fail at 2am, you debug in your terminal — not by pushing commits and waiting for CI.
Pair Dagger integration tests with HelpMeTest for end-to-end browser testing, and you have a complete testing pipeline that runs anywhere.