Testing Async Rust with Tokio: Unit Tests, Mocking, and Timeouts

Testing Async Rust with Tokio: Unit Tests, Mocking, and Timeouts

Tokio is Rust's most popular async runtime. Testing async Rust code requires a Tokio runtime to drive futures to completion. Tokio provides #[tokio::test] for exactly that.

The Basics: #[tokio::test]

Add tokio with the test feature to Cargo.toml:

[dev-dependencies]
tokio = { version = "1", features = ["full"] }

Mark async test functions with #[tokio::test]:

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_fetch_data() {
        let result = fetch_data("https://example.com/api").await;
        assert!(result.is_ok());
        assert_eq!(result.unwrap().status, 200);
    }
}

#[tokio::test] creates a Tokio runtime for the test, runs the async function on it, and tears it down when done. Each test gets its own runtime — no shared state between tests.

Configuring the Runtime

By default, #[tokio::test] creates a multi-threaded runtime. Use flavor to control this:

// Multi-threaded runtime (default)
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_concurrent() {
    // ...
}

// Single-threaded runtime — useful for testing specific timing behaviors
#[tokio::test(flavor = "current_thread")]
async fn test_single_thread() {
    // ...
}

Single-threaded mode is deterministic — tasks execute in a specific order. Useful for testing task scheduling and avoiding flaky tests.

Testing with Timeouts

Wrap operations in tokio::time::timeout to verify they complete within bounds:

use tokio::time::{timeout, Duration};

#[tokio::test]
async fn test_operation_completes_in_time() {
    let result = timeout(Duration::from_millis(100), expensive_operation()).await;

    match result {
        Ok(value) => assert_eq!(value, "expected"),
        Err(_) => panic!("operation took too long"),
    }
}

Or use the more ergonomic approach:

#[tokio::test]
async fn test_fast_response() {
    let result = tokio::time::timeout(
        Duration::from_secs(1),
        fetch_user(42),
    )
    .await
    .expect("operation timed out")
    .expect("fetch failed");

    assert_eq!(result.id, 42);
}

Controlling Time with time::pause()

Tokio's time::pause() freezes the internal clock. Combine it with time::advance() to test time-dependent code without actually waiting:

use tokio::time;

#[tokio::test(start_paused = true)]
async fn test_cache_expiry() {
    let cache = Cache::new(Duration::from_secs(60));
    cache.insert("key", "value").await;

    // Advance time by 30 seconds — cache should still be valid
    time::advance(Duration::from_secs(30)).await;
    assert_eq!(cache.get("key").await, Some("value"));

    // Advance past expiry
    time::advance(Duration::from_secs(31)).await;
    assert_eq!(cache.get("key").await, None);
}

start_paused = true in the macro attribute is shorthand for pausing time at the start of the test.

Mocking Async Traits

Use mockall for mocking:

[dev-dependencies]
mockall = "0.12"

Define a mockable trait:

use mockall::{automock, predicate::*};

#[automock]
#[async_trait::async_trait]
pub trait UserRepository {
    async fn find_by_id(&self, id: u64) -> Option<User>;
    async fn save(&self, user: User) -> Result<User, DbError>;
}

Use the mock in tests:

#[tokio::test]
async fn test_user_service_create() {
    let mut mock_repo = MockUserRepository::new();

    mock_repo
        .expect_save()
        .with(predicate::always())
        .times(1)
        .returning(|user| Ok(User { id: 1, ..user }));

    let service = UserService::new(Arc::new(mock_repo));
    let result = service.create_user("alice@example.com").await;

    assert!(result.is_ok());
    assert_eq!(result.unwrap().id, 1);
}

Testing Cancellation

Rust's futures are lazy — they do nothing until polled. Cancellation happens when a future is dropped. Test that your code handles this correctly:

#[tokio::test]
async fn test_task_cancellation() {
    let (tx, rx) = tokio::sync::oneshot::channel::<()>();

    let task = tokio::spawn(async move {
        // Listen for cancellation signal
        rx.await.ok();
        "completed"
    });

    // Cancel by dropping the sender
    drop(tx);

    let result = task.await;
    // Task completed (receiver got Err from dropped sender)
    assert!(result.is_ok());
}

Test that resources are cleaned up on cancellation:

struct CleanupGuard {
    dropped: Arc<AtomicBool>,
}

impl Drop for CleanupGuard {
    fn drop(&mut self) {
        self.dropped.store(true, Ordering::SeqCst);
    }
}

#[tokio::test]
async fn test_cleanup_on_cancel() {
    let dropped = Arc::new(AtomicBool::new(false));
    let dropped_clone = dropped.clone();

    let task = tokio::spawn(async move {
        let _guard = CleanupGuard { dropped: dropped_clone };
        tokio::time::sleep(Duration::from_secs(60)).await; // will be cancelled
        "done"
    });

    // Give task time to start
    tokio::task::yield_now().await;

    // Cancel the task
    task.abort();
    let _ = task.await;

    assert!(dropped.load(Ordering::SeqCst), "cleanup should have run");
}

Testing Channels

Tokio's channels are central to async communication. Test both sending and receiving sides:

use tokio::sync::mpsc;

#[tokio::test]
async fn test_mpsc_channel() {
    let (tx, mut rx) = mpsc::channel(10);

    // Send from a spawned task
    tokio::spawn(async move {
        for i in 0..5 {
            tx.send(i).await.expect("receiver dropped");
        }
    });

    // Receive all messages
    let mut received = Vec::new();
    while let Some(val) = rx.recv().await {
        received.push(val);
    }

    assert_eq!(received, vec![0, 1, 2, 3, 4]);
}

Test backpressure:

#[tokio::test]
async fn test_channel_backpressure() {
    let (tx, _rx) = mpsc::channel(1); // capacity 1

    tx.send("first").await.unwrap();

    // This should fail immediately — channel is full and no receiver is reading
    let result = tx.try_send("second");
    assert!(result.is_err());
}

Concurrent Task Testing

Test that multiple tasks execute concurrently and produce the expected results:

#[tokio::test]
async fn test_concurrent_processing() {
    let (results_tx, mut results_rx) = mpsc::channel(100);

    let mut handles = Vec::new();
    for i in 0..10 {
        let tx = results_tx.clone();
        let handle = tokio::spawn(async move {
            let result = process_item(i).await;
            tx.send(result).await.unwrap();
        });
        handles.push(handle);
    }

    // Wait for all tasks
    for handle in handles {
        handle.await.unwrap();
    }
    drop(results_tx);

    let mut results = Vec::new();
    while let Some(r) = results_rx.recv().await {
        results.push(r);
    }

    assert_eq!(results.len(), 10);
}

Integration Testing with a Real Server

Use tokio::test for end-to-end tests against a real server started in the test:

use axum::{routing::get, Router};
use tokio::net::TcpListener;

async fn start_test_server() -> String {
    let app = Router::new().route("/health", get(|| async { "ok" }));

    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let addr = listener.local_addr().unwrap();

    tokio::spawn(async move {
        axum::serve(listener, app).await.unwrap();
    });

    format!("http://{}", addr)
}

#[tokio::test]
async fn test_health_endpoint() {
    let base_url = start_test_server().await;

    let response = reqwest::get(format!("{}/health", base_url))
        .await
        .unwrap();

    assert_eq!(response.status(), 200);
    assert_eq!(response.text().await.unwrap(), "ok");
}

Using tokio-test for Finer Control

The tokio-test crate provides utilities for testing Tokio code:

[dev-dependencies]
tokio-test = "0.4"

assert_ready! and assert_pending! for testing futures:

use tokio_test::{assert_pending, assert_ready_ok, task};

#[test]
fn test_future_not_ready() {
    let mut task = task::spawn(pending_future());
    assert_pending!(task.poll());
}

#[test]
fn test_future_resolves() {
    let mut task = task::spawn(ready_future());
    assert_ready_ok!(task.poll());
}

End-to-End Testing Async APIs

Unit tests with mocked traits verify logic. Integration tests verify that services connect correctly. For full end-to-end testing of async APIs in production — with real network calls, real timing, and real concurrency — HelpMeTest runs tests against your live service:

Scenario: async job processing
  Given a job is submitted to the queue
  When the worker processes it asynchronously
  Then the job status becomes "complete" within 10 seconds
  And the result is stored correctly

This catches issues that neither unit tests nor Tokio's simulated time reveal.

Key Takeaways

  • #[tokio::test] provides a per-test Tokio runtime — no shared state between tests
  • Use start_paused = true and time::advance() to test time-dependent code without sleeping
  • mockall with #[automock] generates mock implementations of async traits
  • Test cancellation by aborting tasks and verifying Drop cleanup runs
  • Single-thread flavor (current_thread) gives deterministic task scheduling for timing-sensitive tests

Read more