Rust Async Test Patterns: Testing Futures, Tokio Tasks, and Concurrent Code

Rust Async Test Patterns: Testing Futures, Tokio Tasks, and Concurrent Code

Testing async Rust code requires an async runtime. Tokio provides the #[tokio::test] attribute macro that wraps your test in an async executor. This post covers patterns for testing futures, concurrent tasks, timeouts, and async traits.

Basic Async Tests

Use #[tokio::test] instead of #[test] for async test functions:

#[tokio::test]
async fn fetches_user_by_id() {
    let service = UserService::new();
    let user = service.get_user(1).await.unwrap();
    assert_eq!(user.id, 1);
    assert!(!user.name.is_empty());
}

#[tokio::test]
async fn returns_error_on_invalid_id() {
    let service = UserService::new();
    let result = service.get_user(0).await;
    assert!(result.is_err());
    assert_eq!(result.unwrap_err().to_string(), "invalid user id");
}

Add to Cargo.toml:

[dev-dependencies]
tokio = { version = "1", features = ["full", "test-util"] }

Multi-Thread vs Current-Thread Runtime

By default, #[tokio::test] uses the current-thread scheduler. For tests that exercise multi-threaded behavior:

// Current-thread (default) — deterministic, simpler
#[tokio::test]
async fn single_threaded_test() { /* ... */ }

// Multi-thread — tests true parallelism
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn multi_threaded_test() {
    // Tasks run on a real thread pool
    let handles: Vec<_> = (0..100)
        .map(|i| tokio::spawn(async move { i * 2 }))
        .collect();

    let results: Vec<_> = futures::future::join_all(handles)
        .await
        .into_iter()
        .map(|r| r.unwrap())
        .collect();

    assert_eq!(results.len(), 100);
}

Testing Concurrent Tasks

use std::sync::{Arc, atomic::{AtomicU64, Ordering}};
use tokio::task::JoinSet;

#[tokio::test]
async fn concurrent_increments_are_safe() {
    let counter = Arc::new(AtomicU64::new(0));
    let mut set = JoinSet::new();

    for _ in 0..1000 {
        let c = Arc::clone(&counter);
        set.spawn(async move {
            c.fetch_add(1, Ordering::SeqCst);
        });
    }

    while let Some(result) = set.join_next().await {
        result.unwrap(); // propagate panics from tasks
    }

    assert_eq!(counter.load(Ordering::SeqCst), 1000);
}

Testing Timeout Behavior

use tokio::time::{timeout, Duration};

#[tokio::test]
async fn operation_completes_within_deadline() {
    let result = timeout(
        Duration::from_millis(100),
        some_async_operation(),
    )
    .await;

    assert!(result.is_ok(), "operation timed out");
}

#[tokio::test]
async fn slow_operation_times_out() {
    let result = timeout(
        Duration::from_millis(10),
        tokio::time::sleep(Duration::from_secs(1)),
    )
    .await;

    assert!(result.is_err(), "expected timeout");
}

Deterministic Time with tokio-test

tokio::time::pause() freezes the clock and lets you advance it manually — no real waiting in tests:

use tokio::time::{self, Duration, Instant};

#[tokio::test]
async fn cache_expires_after_ttl() {
    time::pause(); // freeze time

    let cache = Cache::with_ttl(Duration::from_secs(60));
    cache.insert("key", "value");

    assert_eq!(cache.get("key"), Some("value"));

    time::advance(Duration::from_secs(61)).await; // jump forward

    assert_eq!(cache.get("key"), None, "entry should have expired");
}

Testing Channels

use tokio::sync::{mpsc, oneshot};

#[tokio::test]
async fn producer_sends_all_messages() {
    let (tx, mut rx) = mpsc::channel(10);

    tokio::spawn(async move {
        for i in 0..5 {
            tx.send(i).await.unwrap();
        }
    });

    let mut received = vec![];
    while let Some(msg) = rx.recv().await {
        received.push(msg);
    }

    assert_eq!(received, vec![0, 1, 2, 3, 4]);
}

#[tokio::test]
async fn oneshot_channel_sends_response() {
    let (tx, rx) = oneshot::channel::<String>();

    tokio::spawn(async move {
        tx.send("pong".to_string()).unwrap();
    });

    let response = rx.await.unwrap();
    assert_eq!(response, "pong");
}

Mocking Async Traits with mockall

# Cargo.toml
[dev-dependencies]
mockall = "0.13"
use mockall::{automock, predicate::*};

#[automock]
#[async_trait::async_trait]
pub trait UserRepository {
    async fn find_by_id(&self, id: u64) -> Result<User, Error>;
    async fn save(&self, user: &User) -> Result<(), Error>;
}

#[tokio::test]
async fn service_returns_user_from_repository() {
    let mut mock = MockUserRepository::new();

    mock.expect_find_by_id()
        .with(eq(42u64))
        .times(1)
        .returning(|_| Ok(User { id: 42, name: "Alice".into() }));

    let service = UserService::new(mock);
    let user = service.get(42).await.unwrap();

    assert_eq!(user.id, 42);
    assert_eq!(user.name, "Alice");
}

Testing Error Propagation

#[tokio::test]
async fn error_propagates_through_chain() {
    let result: Result<String, _> = async {
        let data = fetch_data().await?;          // could fail
        let parsed = parse_data(&data).await?;   // could fail
        Ok(parsed.to_uppercase())
    }
    .await;

    match result {
        Err(e) => assert!(e.to_string().contains("expected error text")),
        Ok(_) => panic!("expected error but got success"),
    }
}

Testing Broadcast and Watch Channels

use tokio::sync::watch;

#[tokio::test]
async fn watch_channel_delivers_latest_value() {
    let (tx, mut rx) = watch::channel("initial");

    tx.send("updated").unwrap();

    rx.changed().await.unwrap();
    assert_eq!(*rx.borrow(), "updated");
}

#[tokio::test]
async fn multiple_receivers_see_update() {
    let (tx, rx1) = watch::channel(0i32);
    let mut rx2 = rx1.clone();

    tx.send(42).unwrap();

    assert_eq!(*rx1.borrow(), 42);
    assert_eq!(*rx2.borrow(), 42);
}

Using tokio_test::block_on in Sync Contexts

When writing tests in a sync test context that needs to call async code:

#[test]
fn sync_test_calling_async_code() {
    let result = tokio_test::block_on(async {
        fetch_data().await
    });
    assert!(result.is_ok());
}

Key Takeaways

  • #[tokio::test] is the standard macro for async tests; add flavor = "multi_thread" when testing real parallelism
  • Use JoinSet to spawn and collect results from concurrent tasks
  • tokio::time::pause() + time::advance() eliminates real waiting in time-dependent tests
  • mockall with #[automock] provides ergonomic async trait mocking
  • Always propagate task panics with .unwrap() on JoinHandle results — otherwise test failures silently disappear

Read more