Testing Rust HTTP Middleware: Tower Layers, Error Handling, and Request Tracing
Tower middleware is the backbone of the Rust async HTTP ecosystem: axum and tonic are built directly on Tower's Service trait, and hyper-based servers routinely sit underneath Tower stacks. Yet testing Tower middleware is often handled poorly. Developers test only the happy path through integration tests and never exercise middleware in isolation. When a rate limiter has a subtle off-by-one bug, or an authentication middleware leaks timing information, you want a focused unit test that catches it immediately, not a flaky end-to-end scenario.
This post covers the full spectrum: unit testing custom Layer and Service implementations directly, using tower_test::mock for deterministic inner service behavior, testing tracing middleware with tracing-test, and integrating middleware stacks with axum for end-to-end middleware tests.
Understanding Tower Middleware Structure
Before writing tests, understand what you're testing. A Tower middleware consists of two traits:
Layer<S>: a factory that wraps a service S and returns a new service
Service<Request>: the actual request-processing logic
use tower::{Layer, Service};
use std::task::{Context, Poll};
use std::future::Future;
use std::pin::Pin;
// The Layer — created once, produces Services
#[derive(Clone)]
pub struct LoggingLayer {
prefix: String,
}
// The Service — wraps the inner service
pub struct LoggingService<S> {
inner: S,
prefix: String,
}
impl<S> Layer<S> for LoggingLayer {
type Service = LoggingService<S>;
fn layer(&self, inner: S) -> Self::Service {
LoggingService {
inner,
prefix: self.prefix.clone(),
}
}
}
impl<S, Req> Service<Req> for LoggingService<S>
where
S: Service<Req>,
S::Future: Send + 'static,
Req: std::fmt::Debug + Send + 'static,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, req: Req) -> Self::Future {
println!("[{}] Request: {:?}", self.prefix, req);
self.inner.call(req)
}
}
Unit Testing a Custom Service with tower_test::mock
tower-test provides a mock module that gives you a controllable inner service. You drive requests through your middleware and manually trigger responses from the mock:
[dev-dependencies]
tower = { version = "0.4", features = ["full"] }
tower-test = "0.4"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
http = "1"
http-body-util = "0.1"
#[cfg(test)]
mod tests {
use super::*;
use tower::ServiceExt;
use tower_test::mock;
#[tokio::test]
async fn logging_layer_passes_request_to_inner_service() {
let (mut service, mut handle) = mock::spawn_layer::<String, String, _>(
LoggingLayer { prefix: "TEST".into() }
);
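// The service should be ready. Spawn::poll_ready returns a Poll directly
// (it polls with its own mock task context), so there is nothing to .await.
assert!(
matches!(service.poll_ready(), std::task::Poll::Ready(Ok(()))),
"service not ready"
);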
// Spawn a request
let call = tokio::spawn(service.call("hello".to_string()));
// Drive the mock inner service: receive the request, send a response
let (req, send_response) = handle.next_request().await.unwrap();
assert_eq!(req, "hello");
send_response.send_response("world".to_string());
// Verify the response passes through
let response = call.await.unwrap().unwrap();
assert_eq!(response, "world");
}
}
Testing Rate Limiting Middleware
Rate limiting is where off-by-one bugs live. Test the boundary conditions explicitly:
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use tower::{Layer, Service};
#[derive(Clone)]
pub struct RateLimitLayer {
max_requests: u32,
window_secs: u64, // not read by RateLimitService below, so the counter never resets
}
#[derive(Clone)]
pub struct RateLimitService<S> {
inner: S,
counter: Arc<Mutex<u32>>,
limit: u32,
}
impl<S> Layer<S> for RateLimitLayer {
type Service = RateLimitService<S>;
fn layer(&self, inner: S) -> Self::Service {
RateLimitService {
inner,
counter: Arc::new(Mutex::new(0)),
limit: self.max_requests,
}
}
}
impl<S, Req> Service<Req> for RateLimitService<S>
where
S: Service<Req, Error = Box<dyn std::error::Error + Send + Sync>>,
S::Future: Send + 'static,
{
type Response = S::Response;
type Error = Box<dyn std::error::Error + Send + Sync>;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let count = *self.counter.lock().unwrap();
if count >= self.limit {
return Poll::Ready(Err("rate limit exceeded".into()));
}
self.inner.poll_ready(cx).map_err(Into::into)
}
fn call(&mut self, req: Req) -> Self::Future {
*self.counter.lock().unwrap() += 1;
let fut = self.inner.call(req);
Box::pin(async move { fut.await.map_err(Into::into) })
}
}
use tower_test::mock;
#[tokio::test]
async fn rate_limit_allows_requests_under_limit() {
let layer = RateLimitLayer { max_requests: 3, window_secs: 60 };
let (mut service, mut handle) = mock::spawn_layer::<&str, &str, _>(layer);
for _ in 0..3 {
// Spawn::poll_ready returns a Poll directly, so match on it instead of awaiting
assert!(
matches!(service.poll_ready(), std::task::Poll::Ready(Ok(()))),
"should be ready under limit"
);
let call = tokio::spawn(service.call("request"));
let (_req, send) = handle.next_request().await.unwrap();
send.send_response("ok");
call.await.unwrap().unwrap();
}
}
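The window_secs field on RateLimitLayer is never consulted by the service above, so the window boundary itself goes untested. As a minimal sketch of how that boundary could be made testable, the following std-only counter takes the current time as a parameter instead of reading the clock. FixedWindow and try_acquire are hypothetical names chosen for illustration, not part of tower, and the sketch assumes a simple fixed-window policy.

```rust
use std::time::{Duration, Instant};

/// Fixed-window request counter (hypothetical helper, not a tower type).
pub struct FixedWindow {
    limit: u32,
    window: Duration,
    window_start: Instant,
    count: u32,
}

impl FixedWindow {
    pub fn new(limit: u32, window: Duration, now: Instant) -> Self {
        Self { limit, window, window_start: now, count: 0 }
    }

    /// Records one request at time `now`.
    /// Returns false when the current window is already exhausted.
    pub fn try_acquire(&mut self, now: Instant) -> bool {
        // Start a fresh window once the current one has fully elapsed.
        if now.duration_since(self.window_start) >= self.window {
            self.window_start = now;
            self.count = 0;
        }
        if self.count >= self.limit {
            return false;
        }
        self.count += 1;
        true
    }
}

#[cfg(test)]
mod fixed_window_tests {
    use super::FixedWindow;
    use std::time::{Duration, Instant};

    #[test]
    fn resets_exactly_at_window_boundary() {
        let t0 = Instant::now();
        let mut w = FixedWindow::new(2, Duration::from_secs(60), t0);
        assert!(w.try_acquire(t0));
        assert!(w.try_acquire(t0 + Duration::from_secs(1)));
        // A third request inside the window is rejected.
        assert!(!w.try_acquire(t0 + Duration::from_secs(59)));
        // At exactly 60 seconds the window rolls over.
        assert!(w.try_acquire(t0 + Duration::from_secs(60)));
    }
}
```

Passing the time in as an argument keeps the test deterministic: there is no sleeping and no dependency on the wall clock. A service could hold such a counter behind the same Arc<Mutex<..>> used above and call try_acquire(Instant::now()) from poll_ready.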
#[tokio::test]
async fn rate_limit_rejects_at_limit() {
let layer = RateLimitLayer { max_requests: 2, window_secs: 60 };
let (mut service, mut handle) = mock::spawn_layer::<&str, &str, _>(layer);
// Exhaust the limit
for _ in 0..2 {
assert!(matches!(service.poll_ready(), std::task::Poll::Ready(Ok(()))));
let call = tokio::spawn(service.call("request"));
let (_, send) = handle.next_request().await.unwrap();
send.send_response("ok");
call.await.unwrap().unwrap();
}
// The next poll_ready should fail with the rate limit error
match service.poll_ready() {
std::task::Poll::Ready(Err(e)) => assert!(e.to_string().contains("rate limit exceeded")),
other => panic!("expected a rate limit error, got {other:?}"),
}
}
Testing Authentication Middleware
Authentication middleware needs to validate tokens, set identity in request extensions, and reject unauthorized requests — all testable in isolation:
use axum::{
extract::Request,
http::StatusCode,
middleware::Next,
response::Response,
Extension,
};
#[derive(Clone, Debug)]
pub struct AuthenticatedUser {
pub id: u64,
pub email: String,
}
pub async fn jwt_auth_middleware(
mut request: Request,
next: Next,
) -> Result<Response, StatusCode> {
let token = request
.headers()
.get("Authorization")
.and_then(|v| v.to_str().ok())
.and_then(|v| v.strip_prefix("Bearer "))
.ok_or(StatusCode::UNAUTHORIZED)?;
let user = verify_jwt(token).ok_or(StatusCode::UNAUTHORIZED)?;
request.extensions_mut().insert(user);
Ok(next.run(request).await)
}
fn verify_jwt(token: &str) -> Option<AuthenticatedUser> {
// In production: parse and verify JWT signature
if token == "valid-jwt-token" {
Some(AuthenticatedUser {
id: 42,
email: "user@example.com".into(),
})
} else {
None
}
}
use axum::{middleware, routing::get, Router};
use axum::body::Body;
use axum::http::Request;
use tower::ServiceExt;
async fn identity_handler(
Extension(user): Extension<AuthenticatedUser>,
) -> String {
format!("Hello, {}", user.email)
}
fn auth_router() -> Router {
Router::new()
.route("/me", get(identity_handler))
.layer(middleware::from_fn(jwt_auth_middleware))
}
#[tokio::test]
async fn auth_middleware_extracts_user_into_extensions() {
let app = auth_router();
let response = app
.oneshot(
Request::builder()
.uri("/me")
.header("Authorization", "Bearer valid-jwt-token")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
use http_body_util::BodyExt;
let body = response.into_body().collect().await.unwrap().to_bytes();
assert_eq!(&body[..], b"Hello, user@example.com");
}
#[tokio::test]
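// The stub verify_jwt rejects every token except "valid-jwt-token", so this
// exercises the generic rejection path rather than real expiry checking.
async fn auth_middleware_rejects_expired_token() {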
let app = auth_router();
let response = app
.oneshot(
Request::builder()
.uri("/me")
.header("Authorization", "Bearer expired-token")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
}
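The header parsing inside jwt_auth_middleware can also be tested without building a Router at all. The sketch below factors it into a pure function and checks the edge cases in a table. bearer_token is a hypothetical helper, and two of its behaviors are assumptions that differ from the strip_prefix("Bearer ") call above: it accepts any casing of the scheme name (HTTP treats authentication scheme names as case-insensitive) and it rejects an empty token.

```rust
/// Extracts the token from an `Authorization` header value.
/// Hypothetical helper for illustration; not part of axum or tower.
pub fn bearer_token(header: &str) -> Option<&str> {
    // Split "<scheme> <credentials>" at the first space.
    let (scheme, rest) = header.split_once(' ')?;
    // Assumption: accept any casing of the scheme name.
    if !scheme.eq_ignore_ascii_case("Bearer") {
        return None;
    }
    // Assumption: surrounding whitespace is ignored and empty tokens are rejected.
    let token = rest.trim();
    if token.is_empty() {
        None
    } else {
        Some(token)
    }
}

#[cfg(test)]
mod bearer_token_tests {
    use super::bearer_token;

    #[test]
    fn covers_header_edge_cases() {
        let cases = [
            ("Bearer abc", Some("abc")),
            ("bearer abc", Some("abc")),
            ("Bearer   abc  ", Some("abc")),
            ("Bearer ", None),
            ("Bearer", None),
            ("Basic abc", None),
            ("token-without-bearer", None),
            ("", None),
        ];
        for (header, expected) in cases {
            assert_eq!(bearer_token(header), expected, "header: {header:?}");
        }
    }
}
```

A table like this makes it cheap to add a new malformed input when a bug report comes in, while the Router-based tests stay focused on status codes and extensions.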
#[tokio::test]
async fn auth_middleware_rejects_malformed_header() {
let app = auth_router();
// No "Bearer " prefix
let response = app
.oneshot(
Request::builder()
.uri("/me")
.header("Authorization", "token-without-bearer")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
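}
Testing Tracing Middleware with tracing-test
Tracing middleware is notoriously hard to test because log output normally goes to a global subscriber that a test cannot inspect. The tracing-test crate solves this: its #[traced_test] attribute captures log output, scopes it to the current test, and brings the logs_contain helper into scope. By default it only captures events from the crate under test, so if the middleware lives in a different crate than the test (for example, an integration test under tests/), enable tracing-test's no-env-filter feature. The dependencies used here: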
[dev-dependencies]
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tracing-test = "0.2"
use tracing::info;
use axum::{extract::Request, middleware::Next, response::Response};
pub async fn tracing_middleware(request: Request, next: Next) -> Response {
let method = request.method().clone();
let uri = request.uri().clone();
info!(method = %method, path = %uri.path(), "incoming request");
let response = next.run(request).await;
info!(status = %response.status(), "request complete");
response
}
use axum::{body::Body, middleware, routing::get, Router};
use tower::ServiceExt; // for oneshot
use tracing_test::traced_test;
#[tokio::test]
#[traced_test]
async fn tracing_middleware_logs_method_and_path() {
use axum::{middleware, routing::get, Router};
use tower::ServiceExt;
let app = Router::new()
.route("/health", get(|| async { "ok" }))
.layer(middleware::from_fn(tracing_middleware));
let _ = app
.oneshot(
Request::builder()
.method("GET")
.uri("/health")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
// logs_contain does a substring search over the output captured for this test
assert!(logs_contain("incoming request"));
assert!(logs_contain("method=GET"));
assert!(logs_contain("path=/health"));
assert!(logs_contain("request complete"));
}
#[tokio::test]
#[traced_test]
async fn tracing_middleware_logs_response_status() {
let app = Router::new()
.route("/missing", get(|| async {
axum::http::StatusCode::NOT_FOUND
}))
.layer(middleware::from_fn(tracing_middleware));
let _ = app
.oneshot(
Request::builder()
.uri("/missing")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert!(logs_contain("status=404"));
}
Testing Error Mapping Layers
Error mapping transforms application errors into specific response shapes. In axum this is usually done by implementing IntoResponse on the error type rather than with a separate Tower layer. Test that the mapping is correct for each error variant:
use axum::{http::StatusCode, response::IntoResponse, Json};
#[derive(Debug)]
pub enum AppError {
NotFound(String),
Unauthorized,
Internal(String),
}
impl IntoResponse for AppError {
fn into_response(self) -> axum::response::Response {
match self {
AppError::NotFound(msg) => (StatusCode::NOT_FOUND,
Json(serde_json::json!({"error": msg}))).into_response(),
AppError::Unauthorized => StatusCode::UNAUTHORIZED.into_response(),
AppError::Internal(msg) => (StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": "internal", "detail": msg}))).into_response(),
}
}
}
#[tokio::test]
async fn error_mapping_layer_converts_not_found() {
async fn handler_that_errors() -> Result<String, AppError> {
Err(AppError::NotFound("item not found".into()))
}
let app = Router::new()
.route("/fail", get(handler_that_errors));
let response = app
.oneshot(
Request::builder()
.uri("/fail")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::NOT_FOUND);
use http_body_util::BodyExt; // brings collect() into scope for the response body
let body = response.into_body().collect().await.unwrap().to_bytes();
let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
assert_eq!(json["error"], "item not found");
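}
Testing a Composed Middleware Stack
Real applications stack multiple middleware layers. Test the full stack to ensure ordering is correct and layers don't interfere. ServiceBuilder applies layers in the order listed, so in the router below tracing is outermost, auth runs next, and the concurrency limit sits closest to the handler. AppState and data_handler stand in for your application's own state type and handler: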
fn production_router(state: AppState) -> Router {
Router::new()
.route("/api/data", get(data_handler))
.layer(
ServiceBuilder::new()
.layer(middleware::from_fn(tracing_middleware))
.layer(middleware::from_fn(jwt_auth_middleware))
.layer(tower::limit::ConcurrencyLimitLayer::new(100))
)
.with_state(state)
}
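To see why the order of the layers in production_router matters, it can help to model the stack without any framework. The following is a toy std-only sketch, not Tower code: Handler, with_tracing, with_auth, and build_stack are hypothetical names, and each "layer" is just a function that wraps a boxed closure. It assumes the same nesting ServiceBuilder produces, where the first layer listed becomes the outermost wrapper.

```rust
/// Toy handler: receives whether a token is present, appends to a trace log,
/// and returns an HTTP status code.
type Handler = Box<dyn Fn(bool, &mut Vec<&'static str>) -> u16>;

/// Stand-in for the tracing middleware: logs before and after the inner handler.
fn with_tracing(inner: Handler) -> Handler {
    Box::new(move |has_token: bool, log: &mut Vec<&'static str>| {
        log.push("tracing: incoming request");
        let status = inner(has_token, log);
        log.push("tracing: request complete");
        status
    })
}

/// Stand-in for the auth middleware: short-circuits with 401 when no token is present.
fn with_auth(inner: Handler) -> Handler {
    Box::new(move |has_token: bool, log: &mut Vec<&'static str>| {
        log.push("auth: checking token");
        if !has_token {
            return 401;
        }
        inner(has_token, log)
    })
}

fn handler() -> Handler {
    Box::new(|_has_token: bool, log: &mut Vec<&'static str>| {
        log.push("handler");
        200
    })
}

/// Models a builder that lists tracing first and auth second:
/// the first layer listed is the outermost wrapper.
fn build_stack() -> Handler {
    with_tracing(with_auth(handler()))
}

#[cfg(test)]
mod ordering_tests {
    use super::{build_stack, handler, with_auth, with_tracing};

    #[test]
    fn tracing_sees_rejected_requests_only_when_outermost() {
        let mut log = Vec::new();
        assert_eq!(build_stack()(false, &mut log), 401);
        assert_eq!(
            log,
            vec!["tracing: incoming request", "auth: checking token", "tracing: request complete"]
        );

        // Swap the order: auth now rejects before tracing ever runs.
        let swapped = with_auth(with_tracing(handler()));
        let mut log = Vec::new();
        assert_eq!(swapped(false, &mut log), 401);
        assert_eq!(log, vec!["auth: checking token"]);
    }
}
```

The same idea carries over to the real stack: if rejected requests must still be traced, assert on the captured logs for an unauthenticated request, not just on the 401 status.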
#[tokio::test]
#[traced_test]
async fn full_stack_authenticated_request_traced_correctly() {
let app = production_router(AppState::default());
let response = app
.oneshot(
Request::builder()
.uri("/api/data")
.header("Authorization", "Bearer valid-jwt-token")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
assert!(logs_contain("incoming request"));
assert!(logs_contain("request complete"));
}
#[tokio::test]
async fn full_stack_auth_layer_runs_before_handler() {
let app = production_router(AppState::default());
// Without a token, auth middleware should reject before handler runs
let response = app
.oneshot(
Request::builder()
.uri("/api/data")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
// Auth middleware rejected — 401, not 200
assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
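}
Polling poll_ready Correctly
A subtle source of bugs with Tower services: the Service contract requires poll_ready to return Poll::Ready(Ok(())) before each call. Middleware that reserves a resource in poll_ready may panic or misbehave when call is invoked without it (tower's ConcurrencyLimit, for example, acquires its permit in poll_ready and panics in call if none was acquired), so tests that skip the step give misleading results. Always poll first: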
#[tokio::test]
async fn service_is_ready_after_poll_ready() {
use std::future::poll_fn;
let layer = LoggingLayer { prefix: "TEST".into() };
// The 'static lifetime is spelled out because the returned future borrows req
let inner = tower::service_fn(|req: &'static str| async move {
Ok::<_, Box<dyn std::error::Error + Send + Sync>>(format!("response to {req}"))
});
let mut svc = layer.layer(inner);
// Must poll_ready before call — this mirrors production Tower usage
poll_fn(|cx| svc.poll_ready(cx)).await.unwrap();
let response = svc.call("test request").await.unwrap();
assert_eq!(response, "response to test request");
}
Continuous Testing with HelpMeTest
Tower middleware bugs are subtle — an authentication layer that works locally but behaves differently under concurrent load, a rate limiter that resets on every deploy, tracing that works in isolation but drops spans under certain request patterns. These problems surface in production, not in local cargo test runs.
HelpMeTest lets you run your middleware test suite continuously — on every commit, on a schedule, and against your staging environment — with automatic flakiness detection and regression alerts. For Tower middleware in particular, continuous testing with concurrent execution patterns catches race conditions that sequential local tests miss.
The patterns in this post — tower_test::mock for deterministic unit tests, tracing-test for log assertions, oneshot() for integration tests through the full stack — give you complete coverage of your middleware layer without requiring a running server or complex test setup.