Rust Async Test Patterns: Testing Futures, Tokio Tasks, and Concurrent Code
Testing async Rust code requires an async runtime. Tokio provides the #[tokio::test] attribute macro that wraps your test in an async executor. This post covers patterns for testing futures, concurrent tasks, timeouts, and async traits.
Basic Async Tests
Use #[tokio::test] instead of #[test] for async test functions:
/// Looks up user 1 and checks that the returned record carries the requested
/// id and a non-empty name.
#[tokio::test]
async fn fetches_user_by_id() {
    let users = UserService::new();
    let found = users
        .get_user(1)
        .await
        .unwrap();

    assert_eq!(found.id, 1);
    assert!(!found.name.is_empty());
}
/// Requesting id 0 must fail, and the error must render as "invalid user id".
#[tokio::test]
async fn returns_error_on_invalid_id() {
    let users = UserService::new();
    let outcome = users.get_user(0).await;

    assert!(outcome.is_err());
    let message = outcome.unwrap_err().to_string();
    assert_eq!(message, "invalid user id");
}

Add to Cargo.toml:
[dev-dependencies]
tokio = { version = "1", features = ["full", "test-util"] }
Multi-Thread vs Current-Thread Runtime
By default, #[tokio::test] uses the current-thread scheduler. For tests that exercise multi-threaded behavior:
// Current-thread (default) — deterministic, simpler
// No flavor argument is given, so this test runs on the default scheduler.
#[tokio::test]
async fn single_threaded_test() { /* ... */ }
// Multi-thread — tests true parallelism
/// Spawns 100 tasks on a four-worker multi-thread runtime and verifies the
/// value produced by every task.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn multi_threaded_test() {
    // Tasks run on a real thread pool
    let handles: Vec<_> = (0..100)
        .map(|i| tokio::spawn(async move { i * 2 }))
        .collect();

    // join_all yields outputs in the order the handles were supplied; the
    // unwrap turns a panicked task into a test failure.
    let results: Vec<_> = futures::future::join_all(handles)
        .await
        .into_iter()
        .map(|joined| joined.unwrap())
        .collect();

    // Compare the values themselves, not only the count, so a task that
    // returns the wrong result is caught.
    let expected: Vec<i32> = (0..100).map(|i| i * 2).collect();
    assert_eq!(results, expected);
}

Testing Concurrent Tasks
use std::sync::{Arc, atomic::{AtomicU64, Ordering}};
use tokio::task::JoinSet;
/// Increments a shared atomic counter from 1000 spawned tasks and checks that
/// no increment is lost.
///
/// Runs on the multi-thread runtime: on the default current-thread scheduler
/// the tasks never execute in parallel, so the test could not expose a racy
/// counter.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn concurrent_increments_are_safe() {
    let counter = Arc::new(AtomicU64::new(0));
    let mut set = JoinSet::new();

    for _ in 0..1000 {
        let c = Arc::clone(&counter);
        set.spawn(async move {
            c.fetch_add(1, Ordering::SeqCst);
        });
    }

    // join_next returns None once every task in the set has finished.
    while let Some(result) = set.join_next().await {
        result.unwrap(); // propagate panics from tasks
    }

    assert_eq!(counter.load(Ordering::SeqCst), 1000);
}

Testing Timeout Behavior
use tokio::time::{timeout, Duration};
/// `some_async_operation` must finish before a 100 ms deadline elapses.
#[tokio::test]
async fn operation_completes_within_deadline() {
    let deadline = Duration::from_millis(100);
    let outcome = timeout(deadline, some_async_operation()).await;

    assert!(outcome.is_ok(), "operation timed out");
}
/// A one-second sleep wrapped in a 10 ms timeout must report the timeout.
#[tokio::test]
async fn slow_operation_times_out() {
    let limit = Duration::from_millis(10);
    let slow = tokio::time::sleep(Duration::from_secs(1));
    let outcome = timeout(limit, slow).await;

    assert!(outcome.is_err(), "expected timeout");
}

Deterministic Time with tokio-test
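tokio::time::pause() (available when tokio's test-util feature is enabled, and only on the current-thread runtime) freezes the clock and lets you advance it manually with time::advance() — no real waiting in tests: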
use tokio::time::{self, Duration, Instant};
/// An entry inserted into a cache with a 60-second TTL is readable right away
/// and gone once the paused clock has been moved 61 seconds ahead.
#[tokio::test]
async fn cache_expires_after_ttl() {
    // Freeze the runtime clock so the test never waits in real time.
    time::pause();

    let ttl = Duration::from_secs(60);
    let cache = Cache::with_ttl(ttl);
    cache.insert("key", "value");
    assert_eq!(cache.get("key"), Some("value"));

    // Jump past the TTL.
    // NOTE(review): presumably relies on Cache reading tokio's clock — confirm.
    time::advance(Duration::from_secs(61)).await;
    assert_eq!(cache.get("key"), None, "entry should have expired");
}

Testing Channels
use tokio::sync::{mpsc, oneshot};
/// A spawned producer sends 0 through 4 over a bounded channel; the receiver
/// drains the channel until it closes and must see every value in order.
#[tokio::test]
async fn producer_sends_all_messages() {
    let (sender, mut receiver) = mpsc::channel(10);

    // The sender is moved into the task and dropped when the task ends, which
    // closes the channel and lets the receive loop below terminate.
    tokio::spawn(async move {
        for value in 0..5 {
            sender.send(value).await.unwrap();
        }
    });

    let mut collected = Vec::new();
    while let Some(value) = receiver.recv().await {
        collected.push(value);
    }

    assert_eq!(collected, vec![0, 1, 2, 3, 4]);
}
/// A value sent through a oneshot channel from a spawned task reaches the
/// awaiting receiver.
#[tokio::test]
async fn oneshot_channel_sends_response() {
    let (reply_tx, reply_rx) = oneshot::channel::<String>();

    tokio::spawn(async move {
        let payload = "pong".to_string();
        reply_tx.send(payload).unwrap();
    });

    let reply = reply_rx.await.unwrap();
    assert_eq!(reply, "pong");
}

Mocking Async Traits with mockall
# Cargo.toml
[dev-dependencies]
mockall = "0.13"
use mockall::{automock, predicate::*};
/// Async repository interface for `User` records.
///
/// `#[automock]` is applied so tests can use a generated `MockUserRepository`.
#[automock]
#[async_trait::async_trait]
pub trait UserRepository {
/// Looks up a user by `id`.
async fn find_by_id(&self, id: u64) -> Result<User, Error>;
/// Saves `user`.
async fn save(&self, user: &User) -> Result<(), Error>;
}
#[tokio::test]
async fn service_returns_user_from_repository() {
let mut mock = MockUserRepository::new();
mock.expect_find_by_id()
.with(eq(42u64))
.times(1)
.returning(|_| Ok(User { id: 42, name: "Alice".into() }));
let service = UserService::new(mock);
let user = service.get(42).await.unwrap();
assert_eq!(user.id, 42);
assert_eq!(user.name, "Alice");
}Testing Error Propagation
/// Runs a two-step fallible pipeline inside an async block and checks that a
/// failure surfaces with the expected message.
#[tokio::test]
async fn error_propagates_through_chain() {
    // `?` inside an async block gives the compiler nothing to infer the error
    // type from, so it is spelled out here instead of being left as `_`.
    // NOTE(review): assumes the errors returned by fetch_data and parse_data
    // convert into Box<dyn Error> — confirm.
    let result: Result<String, Box<dyn std::error::Error>> = async {
        let data = fetch_data().await?; // could fail
        let parsed = parse_data(&data).await?; // could fail
        Ok(parsed.to_uppercase())
    }
    .await;

    match result {
        Err(e) => assert!(e.to_string().contains("expected error text")),
        Ok(_) => panic!("expected error but got success"),
    }
}

Testing Broadcast and Watch Channels
use tokio::sync::watch;
/// After a send, `changed()` resolves and the receiver observes the new value.
#[tokio::test]
async fn watch_channel_delivers_latest_value() {
    let (sender, mut receiver) = watch::channel("initial");

    sender.send("updated").unwrap();
    receiver.changed().await.unwrap();

    let current = *receiver.borrow();
    assert_eq!(current, "updated");
}
/// Every receiver cloned from a watch channel observes the value most
/// recently sent.
#[tokio::test]
async fn multiple_receivers_see_update() {
    let (tx, rx1) = watch::channel(0i32);
    // `borrow` only needs `&self`, so the clone does not have to be mutable.
    let rx2 = rx1.clone();

    tx.send(42).unwrap();

    assert_eq!(*rx1.borrow(), 42);
    assert_eq!(*rx2.borrow(), 42);
}

Using tokio_test::block_on in Sync Contexts
When a synchronous test needs to call async code, drive the future with tokio_test::block_on (from the tokio-test crate):
/// Drives `fetch_data` to completion from a plain synchronous `#[test]`.
#[test]
fn sync_test_calling_async_code() {
    let pending = async { fetch_data().await };
    let outcome = tokio_test::block_on(pending);

    assert!(outcome.is_ok());
}

Key Takeaways
- #[tokio::test] is the standard macro for async tests; add flavor = "multi_thread" when testing real parallelism
- Use JoinSet to spawn and collect results from concurrent tasks
- tokio::time::pause() + time::advance() eliminate real waiting in time-dependent tests
- mockall with #[automock] provides ergonomic async trait mocking
- Always propagate task panics with .unwrap() on JoinHandle results — otherwise test failures silently disappear