Rust SQLx Async Database Testing with Test Transactions and Testcontainers

Rust SQLx Async Database Testing with Test Transactions and Testcontainers

Database tests are the ones developers skip first. They're slow, they need real infrastructure, they leave state behind that breaks subsequent runs, and they're painful to set up in CI. SQLx and the Rust testing ecosystem have largely solved these problems — #[sqlx::test] auto-migrates and auto-rolls-back, testcontainers-rs spins up fresh PostgreSQL containers, and async test parallelism keeps the suite fast. This post covers all three approaches and the patterns that make database tests reliable.

The Problem with Database Tests

Traditional database tests fail for predictable reasons:

  • Shared state: one test inserts a row that another test's SELECT * picks up, causing ordering-dependent failures
  • Missing setup: tests assume a schema that hasn't been migrated in CI
  • No cleanup: tests leave data that accumulates across runs
  • Slow startup: every test suite waits for a database connection pool to initialize

SQLx's #[sqlx::test] attribute addresses all of these by creating a fresh database per test, running your migrations, and wrapping each test in a transaction that rolls back on completion. Let's see how it works.

Project Setup

[dependencies]
sqlx = { version = "0.7", features = [
    "runtime-tokio-rustls",
    "postgres",
    "macros",
    "migrate",
    "uuid",
    "chrono",
] }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
uuid = { version = "1", features = ["v4", "serde"] }
serde = { version = "1", features = ["derive"] }

[dev-dependencies]
testcontainers = "0.15"
testcontainers-modules = { version = "0.3", features = ["postgres"] }

Migrations live in migrations/:

-- migrations/0001_create_users.sql
CREATE TABLE users (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    email TEXT NOT NULL UNIQUE,
    name TEXT NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- migrations/0002_create_posts.sql
CREATE TABLE posts (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    title TEXT NOT NULL,
    body TEXT NOT NULL,
    published BOOLEAN NOT NULL DEFAULT false,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX posts_user_id_idx ON posts(user_id);
CREATE INDEX posts_published_idx ON posts(published) WHERE published = true;

The #[sqlx::test] Macro

The #[sqlx::test] attribute is the fastest way to get isolated, properly-migrated database tests. It requires a DATABASE_URL environment variable pointing to a PostgreSQL instance with permission to create databases:

// src/repository.rs
use sqlx::PgPool;
use uuid::Uuid;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct User {
    pub id: Uuid,
    pub email: String,
    pub name: String,
}

#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct Post {
    pub id: Uuid,
    pub user_id: Uuid,
    pub title: String,
    pub body: String,
    pub published: bool,
}

pub async fn create_user(pool: &PgPool, email: &str, name: &str) -> sqlx::Result<User> {
    sqlx::query_as!(
        User,
        "INSERT INTO users (email, name) VALUES ($1, $2) RETURNING id, email, name",
        email,
        name
    )
    .fetch_one(pool)
    .await
}

pub async fn find_user_by_email(pool: &PgPool, email: &str) -> sqlx::Result<Option<User>> {
    sqlx::query_as!(
        User,
        "SELECT id, email, name FROM users WHERE email = $1",
        email
    )
    .fetch_optional(pool)
    .await
}

pub async fn create_post(
    pool: &PgPool,
    user_id: Uuid,
    title: &str,
    body: &str,
) -> sqlx::Result<Post> {
    sqlx::query_as!(
        Post,
        r#"
        INSERT INTO posts (user_id, title, body)
        VALUES ($1, $2, $3)
        RETURNING id, user_id, title, body, published
        "#,
        user_id,
        title,
        body
    )
    .fetch_one(pool)
    .await
}

pub async fn list_published_posts(pool: &PgPool, user_id: Uuid) -> sqlx::Result<Vec<Post>> {
    sqlx::query_as!(
        Post,
        r#"
        SELECT id, user_id, title, body, published
        FROM posts
        WHERE user_id = $1 AND published = true
        ORDER BY created_at DESC
        "#,
        user_id
    )
    .fetch_all(pool)
    .await
}

pub async fn publish_post(pool: &PgPool, post_id: Uuid) -> sqlx::Result<Option<Post>> {
    sqlx::query_as!(
        Post,
        r#"
        UPDATE posts SET published = true
        WHERE id = $1
        RETURNING id, user_id, title, body, published
        "#,
        post_id
    )
    .fetch_optional(pool)
    .await
}

Now the tests — each one receives a PgPool connected to its own isolated database:

#[cfg(test)]
mod tests {
    use super::*;
    use sqlx::PgPool;

    #[sqlx::test(migrations = "migrations")]
    async fn test_create_user_returns_user_with_id(pool: PgPool) {
        let user = create_user(&pool, "alice@example.com", "Alice")
            .await
            .expect("create_user should succeed");

        assert_eq!(user.email, "alice@example.com");
        assert_eq!(user.name, "Alice");
        // id is generated server-side — just verify it's a valid UUID
        assert!(!user.id.is_nil());
    }

    #[sqlx::test(migrations = "migrations")]
    async fn test_find_user_by_email_returns_none_for_missing(pool: PgPool) {
        let result = find_user_by_email(&pool, "nobody@example.com")
            .await
            .expect("query should not fail");

        assert!(result.is_none());
    }

    #[sqlx::test(migrations = "migrations")]
    async fn test_duplicate_email_returns_error(pool: PgPool) {
        create_user(&pool, "bob@example.com", "Bob")
            .await
            .expect("first insert should succeed");

        let result = create_user(&pool, "bob@example.com", "Bob Duplicate").await;

        assert!(result.is_err());
        let err = result.unwrap_err();
        let db_err = err.as_database_error().unwrap();
        // PostgreSQL unique violation = error code 23505
        assert_eq!(db_err.code().unwrap(), "23505");
    }
}

Transaction Isolation in Tests

For tests that need to verify transactional behavior — like ensuring a multi-step operation is atomic — use sqlx::Transaction directly:

pub async fn transfer_post_ownership(
    pool: &PgPool,
    post_id: Uuid,
    new_owner_id: Uuid,
) -> sqlx::Result<()> {
    let mut tx = pool.begin().await?;

    // Verify the new owner exists
    let owner_exists: bool = sqlx::query_scalar!(
        "SELECT EXISTS(SELECT 1 FROM users WHERE id = $1)",
        new_owner_id
    )
    .fetch_one(&mut *tx)
    .await?
    .unwrap_or(false);

    if !owner_exists {
        tx.rollback().await?;
        return Err(sqlx::Error::RowNotFound);
    }

    sqlx::query!(
        "UPDATE posts SET user_id = $1 WHERE id = $2",
        new_owner_id,
        post_id
    )
    .execute(&mut *tx)
    .await?;

    tx.commit().await?;
    Ok(())
}
#[sqlx::test(migrations = "migrations")]
async fn test_transfer_post_ownership_succeeds(pool: PgPool) {
    let alice = create_user(&pool, "alice@example.com", "Alice").await.unwrap();
    let bob = create_user(&pool, "bob@example.com", "Bob").await.unwrap();
    let post = create_post(&pool, alice.id, "Alice's Post", "content").await.unwrap();

    transfer_post_ownership(&pool, post.id, bob.id)
        .await
        .expect("transfer should succeed");

    // Verify the post now belongs to Bob
    let updated: Post = sqlx::query_as!(
        Post,
        "SELECT id, user_id, title, body, published FROM posts WHERE id = $1",
        post.id
    )
    .fetch_one(&pool)
    .await
    .unwrap();

    assert_eq!(updated.user_id, bob.id);
}

#[sqlx::test(migrations = "migrations")]
async fn test_transfer_to_nonexistent_user_rolls_back(pool: PgPool) {
    let alice = create_user(&pool, "alice@example.com", "Alice").await.unwrap();
    let post = create_post(&pool, alice.id, "Alice's Post", "content").await.unwrap();
    let fake_user_id = Uuid::new_v4();

    let result = transfer_post_ownership(&pool, post.id, fake_user_id).await;
    assert!(result.is_err());

    // Verify the post still belongs to Alice — the transaction rolled back
    let unchanged: Post = sqlx::query_as!(
        Post,
        "SELECT id, user_id, title, body, published FROM posts WHERE id = $1",
        post.id
    )
    .fetch_one(&pool)
    .await
    .unwrap();

    assert_eq!(unchanged.user_id, alice.id);
}

Using testcontainers-rs for a Disposable PostgreSQL Instance

When you don't have a PostgreSQL instance available (fresh CI environments, local development without a running database), testcontainers-rs spins up a container on demand:

use testcontainers::{clients::Cli, Container};
use testcontainers_modules::postgres::Postgres;
use sqlx::{PgPool, postgres::PgPoolOptions};

async fn setup_test_db() -> (PgPool, Container<'static, Postgres>) {
    let docker = Cli::default();
    let container = docker.run(Postgres::default().with_db_name("testdb"));

    let port = container.get_host_port_ipv4(5432);
    let database_url = format!(
        "postgres://postgres:postgres@localhost:{}/testdb",
        port
    );

    let pool = PgPoolOptions::new()
        .max_connections(5)
        .connect(&database_url)
        .await
        .expect("failed to connect to test database");

    // Run migrations
    sqlx::migrate!("./migrations")
        .run(&pool)
        .await
        .expect("migrations failed");

    (pool, container)
}

#[tokio::test]
async fn test_with_testcontainers_postgres() {
    let (pool, _container) = setup_test_db().await;

    let user = create_user(&pool, "test@example.com", "Test User")
        .await
        .expect("create should succeed");

    assert_eq!(user.email, "test@example.com");

    // _container is dropped here, stopping the Docker container
}

Note: _container must stay in scope for the duration of the test. Binding it to _ (without the name) would drop it immediately, stopping the container before the test runs.

Testing Raw Query Correctness

sqlx::query! performs compile-time verification against your actual schema. In tests, you can verify query behavior beyond what the macro checks:

#[sqlx::test(migrations = "migrations")]
async fn test_published_posts_index_used_correctly(pool: PgPool) {
    let user = create_user(&pool, "writer@example.com", "Writer").await.unwrap();

    // Create mixed published/unpublished posts
    let published = create_post(&pool, user.id, "Published Post", "body").await.unwrap();
    let _draft = create_post(&pool, user.id, "Draft Post", "body").await.unwrap();

    // Publish only the first
    publish_post(&pool, published.id).await.unwrap();

    // Should return only the published post
    let posts = list_published_posts(&pool, user.id).await.unwrap();
    assert_eq!(posts.len(), 1);
    assert_eq!(posts[0].id, published.id);
    assert!(posts[0].published);
}

#[sqlx::test(migrations = "migrations")]
async fn test_cascade_delete_removes_posts(pool: PgPool) {
    let user = create_user(&pool, "doomed@example.com", "Doomed User").await.unwrap();
    let _post1 = create_post(&pool, user.id, "Post 1", "body").await.unwrap();
    let _post2 = create_post(&pool, user.id, "Post 2", "body").await.unwrap();

    // Delete the user
    sqlx::query!("DELETE FROM users WHERE id = $1", user.id)
        .execute(&pool)
        .await
        .unwrap();

    // Posts should be gone due to CASCADE
    let remaining: Vec<Post> = sqlx::query_as!(
        Post,
        "SELECT id, user_id, title, body, published FROM posts WHERE user_id = $1",
        user.id
    )
    .fetch_all(&pool)
    .await
    .unwrap();

    assert!(remaining.is_empty(), "cascade delete should have removed all posts");
}

Testing Migrations with sqlx::migrate!

Test that your migrations apply cleanly and are idempotent:

#[tokio::test]
async fn migrations_apply_without_errors() {
    // Use a testcontainers instance for a clean slate
    let docker = testcontainers::clients::Cli::default();
    let container = docker.run(testcontainers_modules::postgres::Postgres::default());
    let port = container.get_host_port_ipv4(5432);

    let pool = PgPoolOptions::new()
        .connect(&format!("postgres://postgres:postgres@localhost:{}/postgres", port))
        .await
        .unwrap();

    // Applying migrations on a fresh database should succeed
    sqlx::migrate!("./migrations")
        .run(&pool)
        .await
        .expect("initial migration should succeed");

    // Applying migrations again should be idempotent (already applied)
    sqlx::migrate!("./migrations")
        .run(&pool)
        .await
        .expect("re-applying migrations should be idempotent");
}

#[tokio::test]
async fn schema_matches_expected_structure() {
    let docker = testcontainers::clients::Cli::default();
    let container = docker.run(testcontainers_modules::postgres::Postgres::default());
    let port = container.get_host_port_ipv4(5432);

    let pool = PgPoolOptions::new()
        .connect(&format!("postgres://postgres:postgres@localhost:{}/postgres", port))
        .await
        .unwrap();

    sqlx::migrate!("./migrations").run(&pool).await.unwrap();

    // Verify expected tables exist
    let tables: Vec<String> = sqlx::query_scalar!(
        "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'"
    )
    .fetch_all(&pool)
    .await
    .unwrap();

    assert!(tables.contains(&"users".to_string()));
    assert!(tables.contains(&"posts".to_string()));
}

Parallel Test Safety

#[sqlx::test] creates a separate database per test (naming them with a timestamp suffix), making parallel execution safe by default. However, tests that share a database connection pool need careful isolation:

// Each #[sqlx::test] gets its own pool — no shared state between tests
#[sqlx::test(migrations = "migrations")]
async fn test_a_creates_user(pool: PgPool) {
    let user = create_user(&pool, "a@example.com", "A").await.unwrap();
    assert_eq!(user.email, "a@example.com");
    // Pool is dropped, database is deleted
}

#[sqlx::test(migrations = "migrations")]
async fn test_b_also_creates_user(pool: PgPool) {
    // Different pool, different database — no interference from test_a
    let user = create_user(&pool, "a@example.com", "A").await.unwrap();
    assert_eq!(user.email, "a@example.com");
}

For testcontainers-based tests running in parallel, each test must start its own container or use a shared container fixture:

use std::sync::OnceLock;

static POOL: OnceLock<PgPool> = OnceLock::new();

async fn get_shared_pool() -> &'static PgPool {
    // Note: In practice, use a per-test transaction + rollback for isolation
    // This pattern is for read-only integration tests only
    POOL.get_or_init(|| {
        tokio::runtime::Handle::current().block_on(async {
            let (pool, _container) = setup_test_db().await;
            pool
        })
    })
}

Continuous Testing with HelpMeTest

Database tests are exactly the kind of tests that need continuous execution — schema migrations break things in non-obvious ways, query performance degrades as data grows, and concurrency bugs only appear under parallel test runs. Running them only locally means catching these regressions only when a developer happens to run the full suite.

HelpMeTest integrates with Rust CI pipelines to run your SQLx test suite continuously, track test history, detect flaky database tests, and alert when a previously-passing migration test starts failing. For teams using PostgreSQL with SQLx, that means catching a broken migration or a query regression in your pipeline — before it reaches a production database.

The combination of #[sqlx::test] for fast isolated tests, testcontainers-rs for CI environments without a running database, and continuous execution via HelpMeTest covers the full lifecycle of database testing in production Rust applications.

Read more