Testing Async Rust with Tokio: Unit Tests, Mocking, and Timeouts
Tokio is Rust's most popular async runtime. Testing async Rust code requires a Tokio runtime to drive futures to completion. Tokio provides #[tokio::test] for exactly that.
The Basics: #[tokio::test]
Add tokio with the test feature to Cargo.toml:
[dev-dependencies]
tokio = { version = "1", features = ["full"] }Mark async test functions with #[tokio::test]:
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_fetch_data() {
let result = fetch_data("https://example.com/api").await;
assert!(result.is_ok());
assert_eq!(result.unwrap().status, 200);
}
}#[tokio::test] creates a Tokio runtime for the test, runs the async function on it, and tears it down when done. Each test gets its own runtime — no shared state between tests.
Configuring the Runtime
By default, #[tokio::test] creates a multi-threaded runtime. Use flavor to control this:
// Multi-threaded runtime (default)
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_concurrent() {
// ...
}
// Single-threaded runtime — useful for testing specific timing behaviors
#[tokio::test(flavor = "current_thread")]
async fn test_single_thread() {
// ...
}Single-threaded mode is deterministic — tasks execute in a specific order. Useful for testing task scheduling and avoiding flaky tests.
Testing with Timeouts
Wrap operations in tokio::time::timeout to verify they complete within bounds:
use tokio::time::{timeout, Duration};
#[tokio::test]
async fn test_operation_completes_in_time() {
let result = timeout(Duration::from_millis(100), expensive_operation()).await;
match result {
Ok(value) => assert_eq!(value, "expected"),
Err(_) => panic!("operation took too long"),
}
}Or use the more ergonomic approach:
#[tokio::test]
async fn test_fast_response() {
let result = tokio::time::timeout(
Duration::from_secs(1),
fetch_user(42),
)
.await
.expect("operation timed out")
.expect("fetch failed");
assert_eq!(result.id, 42);
}Controlling Time with time::pause()
Tokio's time::pause() freezes the internal clock. Combine it with time::advance() to test time-dependent code without actually waiting:
use tokio::time;
#[tokio::test(start_paused = true)]
async fn test_cache_expiry() {
let cache = Cache::new(Duration::from_secs(60));
cache.insert("key", "value").await;
// Advance time by 30 seconds — cache should still be valid
time::advance(Duration::from_secs(30)).await;
assert_eq!(cache.get("key").await, Some("value"));
// Advance past expiry
time::advance(Duration::from_secs(31)).await;
assert_eq!(cache.get("key").await, None);
}start_paused = true in the macro attribute is shorthand for pausing time at the start of the test.
Mocking Async Traits
Use mockall for mocking:
[dev-dependencies]
mockall = "0.12"Define a mockable trait:
use mockall::{automock, predicate::*};
#[automock]
#[async_trait::async_trait]
pub trait UserRepository {
async fn find_by_id(&self, id: u64) -> Option<User>;
async fn save(&self, user: User) -> Result<User, DbError>;
}Use the mock in tests:
#[tokio::test]
async fn test_user_service_create() {
let mut mock_repo = MockUserRepository::new();
mock_repo
.expect_save()
.with(predicate::always())
.times(1)
.returning(|user| Ok(User { id: 1, ..user }));
let service = UserService::new(Arc::new(mock_repo));
let result = service.create_user("alice@example.com").await;
assert!(result.is_ok());
assert_eq!(result.unwrap().id, 1);
}Testing Cancellation
Rust's futures are lazy — they do nothing until polled. Cancellation happens when a future is dropped. Test that your code handles this correctly:
#[tokio::test]
async fn test_task_cancellation() {
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
let task = tokio::spawn(async move {
// Listen for cancellation signal
rx.await.ok();
"completed"
});
// Cancel by dropping the sender
drop(tx);
let result = task.await;
// Task completed (receiver got Err from dropped sender)
assert!(result.is_ok());
}Test that resources are cleaned up on cancellation:
struct CleanupGuard {
dropped: Arc<AtomicBool>,
}
impl Drop for CleanupGuard {
fn drop(&mut self) {
self.dropped.store(true, Ordering::SeqCst);
}
}
#[tokio::test]
async fn test_cleanup_on_cancel() {
let dropped = Arc::new(AtomicBool::new(false));
let dropped_clone = dropped.clone();
let task = tokio::spawn(async move {
let _guard = CleanupGuard { dropped: dropped_clone };
tokio::time::sleep(Duration::from_secs(60)).await; // will be cancelled
"done"
});
// Give task time to start
tokio::task::yield_now().await;
// Cancel the task
task.abort();
let _ = task.await;
assert!(dropped.load(Ordering::SeqCst), "cleanup should have run");
}Testing Channels
Tokio's channels are central to async communication. Test both sending and receiving sides:
use tokio::sync::mpsc;
#[tokio::test]
async fn test_mpsc_channel() {
let (tx, mut rx) = mpsc::channel(10);
// Send from a spawned task
tokio::spawn(async move {
for i in 0..5 {
tx.send(i).await.expect("receiver dropped");
}
});
// Receive all messages
let mut received = Vec::new();
while let Some(val) = rx.recv().await {
received.push(val);
}
assert_eq!(received, vec![0, 1, 2, 3, 4]);
}Test backpressure:
#[tokio::test]
async fn test_channel_backpressure() {
let (tx, _rx) = mpsc::channel(1); // capacity 1
tx.send("first").await.unwrap();
// This should fail immediately — channel is full and no receiver is reading
let result = tx.try_send("second");
assert!(result.is_err());
}Concurrent Task Testing
Test that multiple tasks execute concurrently and produce the expected results:
#[tokio::test]
async fn test_concurrent_processing() {
let (results_tx, mut results_rx) = mpsc::channel(100);
let mut handles = Vec::new();
for i in 0..10 {
let tx = results_tx.clone();
let handle = tokio::spawn(async move {
let result = process_item(i).await;
tx.send(result).await.unwrap();
});
handles.push(handle);
}
// Wait for all tasks
for handle in handles {
handle.await.unwrap();
}
drop(results_tx);
let mut results = Vec::new();
while let Some(r) = results_rx.recv().await {
results.push(r);
}
assert_eq!(results.len(), 10);
}Integration Testing with a Real Server
Use tokio::test for end-to-end tests against a real server started in the test:
use axum::{routing::get, Router};
use tokio::net::TcpListener;
async fn start_test_server() -> String {
let app = Router::new().route("/health", get(|| async { "ok" }));
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});
format!("http://{}", addr)
}
#[tokio::test]
async fn test_health_endpoint() {
let base_url = start_test_server().await;
let response = reqwest::get(format!("{}/health", base_url))
.await
.unwrap();
assert_eq!(response.status(), 200);
assert_eq!(response.text().await.unwrap(), "ok");
}Using tokio-test for Finer Control
The tokio-test crate provides utilities for testing Tokio code:
[dev-dependencies]
tokio-test = "0.4"assert_ready! and assert_pending! for testing futures:
use tokio_test::{assert_pending, assert_ready_ok, task};
#[test]
fn test_future_not_ready() {
let mut task = task::spawn(pending_future());
assert_pending!(task.poll());
}
#[test]
fn test_future_resolves() {
let mut task = task::spawn(ready_future());
assert_ready_ok!(task.poll());
}End-to-End Testing Async APIs
Unit tests with mocked traits verify logic. Integration tests verify that services connect correctly. For full end-to-end testing of async APIs in production — with real network calls, real timing, and real concurrency — HelpMeTest runs tests against your live service:
Scenario: async job processing
Given a job is submitted to the queue
When the worker processes it asynchronously
Then the job status becomes "complete" within 10 seconds
And the result is stored correctlyThis catches issues that neither unit tests nor Tokio's simulated time reveal.
Key Takeaways
#[tokio::test]provides a per-test Tokio runtime — no shared state between tests- Use
start_paused = trueandtime::advance()to test time-dependent code without sleeping mockallwith#[automock]generates mock implementations of async traits- Test cancellation by aborting tasks and verifying
Dropcleanup runs - Single-thread flavor (
current_thread) gives deterministic task scheduling for timing-sensitive tests