Testing Rate Limiting and Throttling in APIs and Microservices
Testing rate limiting is deceptively hard. An implementation often works correctly in isolation but fails under concurrent load, leaks when clients use different keys that should share a limit, or applies the wrong limit tier to authenticated users. Testing it properly requires concurrent request generation, careful timing, and validation of the HTTP semantics (status codes, headers) that clients depend on. This guide covers how to test the major rate limiting algorithms and the failure modes that bite teams in production.
Rate Limiting Algorithm Fundamentals
Before writing tests, understand what you're testing. Different algorithms have different edge cases.
Token Bucket
Requests consume tokens from a bucket. Tokens refill at a fixed rate. Allows short bursts up to bucket capacity.
- Testable property: A client can consume all tokens instantly, then must wait for refill.
- Edge case: Two clients consuming tokens concurrently may both succeed when a single token remains (race condition in non-atomic implementations).
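To make the testable property concrete, here is a minimal in-memory sketch of a token bucket with an injectable clock, so a test can advance time without sleeping. The names `TokenBucket`, `FakeClock`, and `allow` are illustrative assumptions, not part of any particular library, and the class is deliberately not thread-safe.

```python
import time
from typing import Callable


class FakeClock:
    """Hypothetical test clock that only moves when told to."""

    def __init__(self) -> None:
        self.now = 0.0

    def __call__(self) -> float:
        return self.now

    def advance(self, seconds: float) -> None:
        self.now += seconds


class TokenBucket:
    """Illustrative token bucket; not thread-safe."""

    def __init__(self, capacity: float, refill_rate: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.capacity = capacity          # maximum burst size
        self.refill_rate = refill_rate    # tokens added per second
        self.clock = clock
        self.tokens = capacity            # start with a full bucket
        self.last_refill = clock()

    def allow(self) -> bool:
        # Top up the bucket for the time that has passed, capped at capacity.
        now = self.clock()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


clock = FakeClock()
bucket = TokenBucket(capacity=3, refill_rate=1.0, clock=clock)
burst = [bucket.allow() for _ in range(4)]  # the whole bucket is spent at once
clock.advance(1.0)                          # one second refills one token
after_refill = bucket.allow()
```

Because `allow` reads and writes `tokens` in separate steps, two concurrent callers could both observe the last token, which is the race described in the edge case above. A production limiter would need a lock or an atomic operation around that section.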
Leaky Bucket
Requests enter a queue (the bucket) and are processed at a fixed output rate. Requests that arrive while the queue is full overflow the bucket.
- Testable property: Smooths traffic, so no burst reaches the backend. Processed requests are evenly spaced.
- Edge case: Queue overflow behavior (drop silently vs reject with an error vs make the caller wait).
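The following sketch models a leaky bucket as a bounded queue drained at a fixed interval. It assumes the "reject" overflow policy; `LeakyBucket`, `offer`, and `drain` are hypothetical names chosen for illustration, and the drain schedule is computed rather than run in real time.

```python
from collections import deque
from typing import Deque, List, Tuple


class LeakyBucket:
    """Illustrative leaky bucket that rejects requests once the queue is full."""

    def __init__(self, capacity: int, leak_interval: float) -> None:
        self.capacity = capacity            # maximum queued requests
        self.leak_interval = leak_interval  # seconds between processed requests
        self.queue: Deque[int] = deque()

    def offer(self, request_id: int) -> bool:
        """Enqueue a request; return False when the bucket overflows."""
        if len(self.queue) >= self.capacity:
            return False
        self.queue.append(request_id)
        return True

    def drain(self, start: float = 0.0) -> List[Tuple[int, float]]:
        """Return (request_id, processing_time) pairs at the fixed output rate."""
        schedule = []
        t = start
        while self.queue:
            schedule.append((self.queue.popleft(), t))
            t += self.leak_interval
        return schedule


bucket = LeakyBucket(capacity=3, leak_interval=0.5)
accepted = [bucket.offer(i) for i in range(5)]  # five arrivals in one burst
schedule = bucket.drain()                       # accepted requests leave evenly spaced
```

A test against a real service can check the same two things: how many requests of a burst are accepted, and whether the accepted ones are processed at the configured spacing.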
Fixed Window Counter
Count requests in a fixed time window (e.g., 100 requests per minute from :00 to :59).
- Testable property: At window boundary, the counter resets. A client can send 100 requests at :59 and 100 more at :00 — 200 requests in 2 seconds — without being rate limited.
- Edge case: Window boundary burst is a known weakness; test it explicitly.
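The boundary burst is easy to reproduce with a simulated clock. This is a minimal sketch, assuming a counter keyed by `now // window`; `FixedWindowCounter` and its `allow(now)` signature are illustrative, not taken from a specific library.

```python
from typing import Optional


class FixedWindowCounter:
    """Illustrative fixed window counter driven by caller-supplied timestamps."""

    def __init__(self, limit: int, window: int) -> None:
        self.limit = limit
        self.window = window                      # window length in seconds
        self.current_window: Optional[int] = None
        self.count = 0

    def allow(self, now: float) -> bool:
        window_id = int(now // self.window)
        if window_id != self.current_window:
            # A new window started, so the counter resets.
            self.current_window = window_id
            self.count = 0
        if self.count < self.limit:
            self.count += 1
            return True
        return False


limiter = FixedWindowCounter(limit=100, window=60)
late = sum(limiter.allow(59.0) for _ in range(100))   # last second of window 0
early = sum(limiter.allow(60.0) for _ in range(100))  # first second of window 1
extra = limiter.allow(60.5)                           # window 1 is now full
```

In this sketch all 200 requests around the boundary are admitted even though the limit is 100 per minute, and only the 201st is refused.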
Sliding Window Log / Sliding Window Counter
Tracks requests over a rolling window rather than fixed boundaries. More accurate than fixed window.
- Testable property: No boundary burst — the limit is enforced continuously.
- Edge case: Memory usage grows with request log size (sliding window log). The counter variant uses approximation.
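One common form of the counter approximation weights the previous window's count by how much of it still overlaps the rolling window. The sketch below assumes that formula; `SlidingWindowCounter` is an illustrative name, and timestamps are passed in so the behavior can be checked without waiting.

```python
class SlidingWindowCounter:
    """Illustrative sliding window counter using a weighted previous window."""

    def __init__(self, limit: int, window: int) -> None:
        self.limit = limit
        self.window = window          # window length in seconds
        self.current_window = 0
        self.current_count = 0
        self.previous_count = 0

    def allow(self, now: float) -> bool:
        window_id = int(now // self.window)
        if window_id != self.current_window:
            # Carry the old count over only if the windows are adjacent.
            adjacent = window_id == self.current_window + 1
            self.previous_count = self.current_count if adjacent else 0
            self.current_count = 0
            self.current_window = window_id
        # Fraction of the current window that has elapsed.
        elapsed_fraction = (now % self.window) / self.window
        estimated = self.previous_count * (1 - elapsed_fraction) + self.current_count
        if estimated + 1 <= self.limit:
            self.current_count += 1
            return True
        return False


limiter = SlidingWindowCounter(limit=100, window=60)
late = sum(limiter.allow(59.0) for _ in range(100))         # fills the first window
at_boundary = sum(limiter.allow(60.0) for _ in range(100))  # previous window still counts fully
mid_window = sum(limiter.allow(90.0) for _ in range(100))   # half of the previous window has aged out
```

Here the burst at the boundary is refused entirely, and halfway through the next window only half the limit has been freed. The estimate assumes requests in the previous window were evenly spread, which is where the approximation comes from.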
| Algorithm | Burst Allowed | Memory | Accuracy | Complexity |
|---|---|---|---|---|
| Token Bucket | Yes | O(1) | Exact | Low |
| Leaky Bucket | No | O(queue) | Exact | Low |
| Fixed Window | At boundary | O(1) | Approx | Low |
| Sliding Window Log | No | O(requests) | Exact | High |
| Sliding Window Counter | No | O(1) | Approx | Medium |
Setting Up Rate Limit Tests
Python Test Framework
# test_rate_limiting.py
# Requires: pytest, pytest-asyncio (for @pytest.mark.asyncio), aiohttp
import asyncio
import time
from typing import List, Tuple

import aiohttp
import pytest

BASE_URL = "http://localhost:8000"
API_KEY = "test-key-free-tier"
PRO_API_KEY = "test-key-pro-tier"


async def make_request(session: aiohttp.ClientSession,
                       url: str,
                       headers: dict) -> Tuple[int, dict]:
    """Make a single request and return (status_code, headers)."""
    async with session.get(url, headers=headers) as resp:
        return resp.status, dict(resp.headers)


async def make_concurrent_requests(url: str,
                                   headers: dict,
                                   count: int) -> List[Tuple[int, dict]]:
    """Fire `count` requests concurrently and return all results."""
    async with aiohttp.ClientSession() as session:
        tasks = [make_request(session, url, headers) for _ in range(count)]
        return await asyncio.gather(*tasks, return_exceptions=False)
Testing Basic Rate Limit Enforcement
@pytest.mark.asyncio
async def test_rate_limit_enforced_after_threshold():
    """Verify that requests exceeding the rate limit receive 429."""
    headers = {"X-API-Key": API_KEY}
    url = f"{BASE_URL}/api/data"
    # Free tier: 10 requests per minute
    RATE_LIMIT = 10

    # Send exactly the limit; all should succeed
    results = await make_concurrent_requests(url, headers, RATE_LIMIT)
    success_count = sum(1 for status, _ in results if status == 200)
    assert success_count == RATE_LIMIT, \
        f"Expected {RATE_LIMIT} successes, got {success_count}"

    # One more request should be rate limited
    async with aiohttp.ClientSession() as session:
        status, _ = await make_request(session, url, headers)
    assert status == 429, f"Expected 429 Too Many Requests, got {status}"


@pytest.mark.asyncio
async def test_rate_limit_429_response_format():
    """Verify 429 response includes required headers for client retry logic."""
    headers = {"X-API-Key": API_KEY}
    url = f"{BASE_URL}/api/data"

    # Exhaust the rate limit
    await make_concurrent_requests(url, headers, 11)

    async with aiohttp.ClientSession() as session:
        async with session.get(url, headers=headers) as resp:
            assert resp.status == 429
            # RFC 6585 defines 429 and allows a Retry-After header.
            # The X-RateLimit-* headers are a widely used convention, not part of that RFC,
            # but clients depend on them.
            assert "Retry-After" in resp.headers, \
                "429 must include Retry-After header"
            assert "X-RateLimit-Limit" in resp.headers, \
                "429 must include X-RateLimit-Limit header"
            assert "X-RateLimit-Remaining" in resp.headers, \
                "429 must include X-RateLimit-Remaining header"
            assert "X-RateLimit-Reset" in resp.headers, \
                "429 must include X-RateLimit-Reset header"

            # Assumes the delay-seconds form; Retry-After may also be an HTTP-date
            retry_after = int(resp.headers["Retry-After"])
            assert 0 < retry_after <= 60, \
                f"Retry-After should be seconds until window reset, got {retry_after}"

            body = await resp.json()
            assert "error" in body
            assert "rate_limit_exceeded" in body.get("code", ""), \
                "Error body should include machine-readable error code"


@pytest.mark.asyncio
async def test_rate_limit_headers_on_success():
    """Verify rate limit headers are present on successful responses too."""
    # Assumes this key has not been used yet in the current window;
    # run this test in isolation or with a key reserved for it.
    headers = {"X-API-Key": API_KEY}
    async with aiohttp.ClientSession() as session:
        async with session.get(f"{BASE_URL}/api/data", headers=headers) as resp:
            assert resp.status == 200
            assert "X-RateLimit-Limit" in resp.headers
            assert "X-RateLimit-Remaining" in resp.headers
            limit = int(resp.headers["X-RateLimit-Limit"])
            remaining = int(resp.headers["X-RateLimit-Remaining"])
            assert remaining == limit - 1, \
                "After first request, remaining should be limit minus 1"
Testing Fixed Window Boundary Burst
This is the classic fixed window weakness — exploitable by clients that know the window reset time:
@pytest.mark.asyncio
async def test_fixed_window_boundary_burst():
    """
    With a fixed window, a client can burst up to 2x the limit across a window boundary.
    If your implementation has this weakness, this test documents it.
    If you've implemented sliding window, this test should pass without the burst.
    """
    headers = {"X-API-Key": f"burst-test-{time.time()}"}  # fresh key
    url = f"{BASE_URL}/api/data"
    LIMIT = 10

    # Get current window reset time (this probe also consumes one request).
    # Assumes X-RateLimit-Reset is a Unix timestamp in seconds, not a delta.
    async with aiohttp.ClientSession() as session:
        async with session.get(url, headers=headers) as resp:
            reset_at = int(resp.headers.get("X-RateLimit-Reset", 0))

    # Wait until 1 second before window reset
    now = time.time()
    wait_time = reset_at - now - 1
    if wait_time > 0:
        await asyncio.sleep(wait_time)

    # Send LIMIT requests in the last second of the current window
    results_before = await make_concurrent_requests(url, headers, LIMIT)
    successes_before = sum(1 for s, _ in results_before if s == 200)

    # Wait for window to reset
    await asyncio.sleep(1.5)

    # Send LIMIT more requests in the new window
    results_after = await make_concurrent_requests(url, headers, LIMIT)
    successes_after = sum(1 for s, _ in results_after if s == 200)

    total_successes = successes_before + successes_after
    if total_successes > LIMIT:
        pytest.xfail(
            f"Fixed window boundary burst: {total_successes} requests succeeded "
            f"across the window boundary, more than the limit of {LIMIT}. "
            "Consider sliding window implementation."
        )
Testing Per-Tier Rate Limits
@pytest.mark.asyncio
async def test_pro_tier_higher_limit_than_free():
    """Pro tier should have higher rate limit than free tier."""
    # Reuse the tier keys defined at the top of the file
    free_headers = {"X-API-Key": API_KEY}
    pro_headers = {"X-API-Key": PRO_API_KEY}
    url = f"{BASE_URL}/api/data"

    async with aiohttp.ClientSession() as session:
        async with session.get(url, headers=free_headers) as resp:
            free_limit = int(resp.headers["X-RateLimit-Limit"])
        async with session.get(url, headers=pro_headers) as resp:
            pro_limit = int(resp.headers["X-RateLimit-Limit"])

    assert pro_limit > free_limit, \
        f"Pro tier limit ({pro_limit}) should exceed free tier ({free_limit})"


@pytest.mark.asyncio
async def test_rate_limits_are_per_client_not_global():
    """
    Rate limiting should be scoped per API key.
    One client exhausting their limit should not affect another client.
    """
    client_a = {"X-API-Key": "client-a-key"}
    client_b = {"X-API-Key": "client-b-key"}
    url = f"{BASE_URL}/api/data"

    # Exhaust client A's limit
    await make_concurrent_requests(url, client_a, 20)

    # Client B should still be able to make requests
    async with aiohttp.ClientSession() as session:
        status, _ = await make_request(session, url, client_b)
    assert status == 200, \
        f"Client B should not be affected by Client A's rate limit exhaustion, got {status}"
Testing Rate Limit Bypass Attempts
Security testing for rate limiting is as important as functional testing:
@pytest.mark.asyncio
async def test_rate_limit_not_bypassable_with_different_headers():
    """Rate limit should not be bypassable by rotating User-Agent or other headers."""
    url = f"{BASE_URL}/api/data"

    # Exhaust limit with one User-Agent
    base_headers = {"X-API-Key": "bypass-test-key", "User-Agent": "TestClient/1.0"}
    await make_concurrent_requests(url, base_headers, 15)

    # Try to bypass by changing User-Agent
    bypass_headers = {"X-API-Key": "bypass-test-key", "User-Agent": "DifferentClient/2.0"}
    async with aiohttp.ClientSession() as session:
        status, _ = await make_request(session, url, bypass_headers)
    assert status == 429, \
        "Changing User-Agent should not bypass rate limiting; the limit is per API key"


@pytest.mark.asyncio
async def test_rate_limit_not_bypassable_via_ip_spoofing():
    """Rate limiting by IP should not be bypassable via X-Forwarded-For spoofing."""
    url = f"{BASE_URL}/api/public"  # endpoint with IP-based rate limiting

    # Exhaust limit from real IP (assumes the public limit is 5 or fewer)
    await make_concurrent_requests(url, {}, 5)

    # Try to spoof a different IP via header
    spoof_headers = {"X-Forwarded-For": "10.0.0.1"}
    async with aiohttp.ClientSession() as session:
        status, _ = await make_request(session, url, spoof_headers)

    # If the server trusts X-Forwarded-For from untrusted sources, this would be 200
    # It should be 429 because the real IP is still rate limited
    assert status == 429, \
        "X-Forwarded-For spoofing should not bypass IP-based rate limiting"
Testing Distributed Rate Limiting (Redis-backed)
In distributed systems, rate limiting state usually lives in a shared store such as Redis so that every application instance sees the same counters. The tests in this section check that shared state directly.
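As a point of reference for what such tests exercise, here is a minimal sketch of a Redis-backed fixed window counter. It is an assumption about how a limiter might be written, not the implementation under test: the key format `rl:{api_key}:{window}` follows the example format used in this section, and `allow_request` and `CounterStore` are hypothetical names. The function accepts any object exposing `incr` and `expire`, which a `redis.Redis` client does.

```python
from typing import Protocol


class CounterStore(Protocol):
    """The two Redis commands this sketch needs."""

    def incr(self, name: str) -> int: ...

    def expire(self, name: str, time: int) -> bool: ...


def allow_request(store: CounterStore, api_key: str, now: float,
                  limit: int = 10, window: int = 60) -> bool:
    """Count one request against the caller's current window."""
    window_id = int(now // window)
    key = f"rl:{api_key}:{window_id}"  # hypothetical key format
    # INCR is atomic in Redis, so concurrent app instances cannot lose updates.
    count = store.incr(key)
    if count == 1:
        # First request in this window: let the key expire with the window.
        store.expire(key, window)
    return count <= limit
```

With a real client this would be called as `allow_request(redis.Redis(), "some-key", time.time())`. One caveat of this simple pattern: if the process dies between `incr` and `expire`, the key never expires, which is why a TTL check on rate limit keys is worth having. Wrapping both commands in a Lua script or a transaction is one way to close that gap.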
# test_distributed_rate_limiting.py
import aiohttp
import pytest
import redis
import requests

# Reuse the URL and helper from test_rate_limiting.py
# (assumes both test files sit in the same directory)
from test_rate_limiting import BASE_URL, make_concurrent_requests


@pytest.fixture
def redis_client():
    # Must point at the same Redis DB the service under test writes to
    return redis.Redis(host="localhost", port=6379, db=1)  # test DB


def test_rate_limit_key_structure(redis_client):
    """Verify rate limit keys in Redis have correct TTL and structure."""
    api_key = "redis-test-key"
    requests.get(f"{BASE_URL}/api/data", headers={"X-API-Key": api_key})

    # Check that a rate limit key was created in Redis
    # Key format depends on your implementation, e.g., "rl:{api_key}:{window}"
    keys = redis_client.keys(f"rl:{api_key}:*")
    assert len(keys) > 0, "Rate limit key should exist in Redis after first request"

    # Verify the key has an appropriate TTL (should expire with the window)
    ttl = redis_client.ttl(keys[0])
    assert 0 < ttl <= 60, f"Rate limit key TTL should be within the window, got {ttl}s"


@pytest.mark.asyncio
async def test_rate_limit_survives_application_restart():
    """Rate limit state should persist in Redis across application restarts."""
    headers = {"X-API-Key": "persistence-test-key"}
    url = f"{BASE_URL}/api/data"

    # Use 8 out of 10 allowed requests
    await make_concurrent_requests(url, headers, 8)

    # Simulate app restart (e.g., restart the service container)
    # In a real test, you'd restart the service here

    # The 9th request should still succeed and report 1 remaining,
    # which shows the first 8 were not forgotten
    async with aiohttp.ClientSession() as session:
        async with session.get(url, headers=headers) as resp:
            assert resp.status == 200
            remaining = int(resp.headers["X-RateLimit-Remaining"])
            assert remaining == 1, \
                f"After 9 total requests, remaining should be 1, got {remaining}"
Load Testing to Find the Actual Limit
Sometimes the configured limit and the actual limit under load diverge due to race conditions. Use Locust to find the actual breaking point:
# locustfile.py
from locust import HttpUser, constant, task


class RateLimitProbeUser(HttpUser):
    wait_time = constant(0)  # No wait: maximum pressure

    @task
    def probe_rate_limit(self):
        with self.client.get(
            "/api/data",
            headers={"X-API-Key": "load-test-key"},
            catch_response=True,
            name="probe_rate_limit",
        ) as response:
            if response.status_code == 200:
                response.success()
            elif response.status_code == 429:
                # 429 is expected behavior, not a failure.
                # To measure the limiting rate, count these separately from the 200s.
                response.success()
            else:
                response.failure(f"Unexpected status: {response.status_code}")
Run with: locust --headless -u 50 -r 50 --run-time 60s --host http://localhost:8000
The key metric is the ratio of 200 to 429 responses under sustained load. If you configured 100 req/min and see 150 successful requests within a single minute before 429s start, you most likely have a concurrency bug in your rate limiter.
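To turn that comparison into a number, you can record a timestamp and status code for each response during the load run and compute the largest count of 200s seen in any rolling window. The helper below is a sketch under that assumption; `max_successes_in_window` and the `(timestamp, status)` sample format are hypothetical and not produced by Locust out of the box.

```python
from typing import Iterable, Tuple


def max_successes_in_window(samples: Iterable[Tuple[float, int]],
                            window: float = 60.0) -> int:
    """Largest number of 200 responses observed in any rolling `window` seconds."""
    times = sorted(t for t, status in samples if status == 200)
    best = 0
    start = 0
    for end, t in enumerate(times):
        # Shrink the window from the left until it spans less than `window` seconds.
        while t - times[start] >= window:
            start += 1
        best = max(best, end - start + 1)
    return best


# Illustrative data: 150 successes in 15 seconds, then the limiter kicks in.
samples = [(i * 0.1, 200) for i in range(150)] + [(15.0 + i, 429) for i in range(5)]
observed = max_successes_in_window(samples, window=60.0)
spaced = max_successes_in_window([(0.0, 200), (30.0, 200), (60.0, 200)], window=60.0)
```

Comparing `observed` with the configured limit gives a direct pass or fail signal. For a fixed window limiter, a rolling measurement can legitimately exceed the limit at a window boundary, so counting per fixed window would be the fairer comparison there.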
Rate Limit Test Checklist
| Test | What It Catches |
|---|---|
| Basic 429 after threshold | Limit not enforced |
| 429 includes Retry-After | Clients can't retry intelligently |
| Headers on 200 responses | Clients can't track their usage |
| Per-client isolation | Global limit shared across clients |
| Boundary burst (fixed window) | Algorithm weakness |
| User-Agent rotation bypass | Keying on mutable headers |
| X-Forwarded-For spoofing | Trusting untrusted proxy headers |
| Correct tier applied | Auth/tier mapping bug |
| Redis TTL on rate limit keys | State leaks across windows |
| Actual limit under load | Race condition in counter logic |
Rate limiting is infrastructure that sits between your users and your system. Getting it wrong in either direction — too permissive or too aggressive — has direct revenue impact. Testing it at the algorithm level, the HTTP semantics level, and the load level is the only way to be confident it works correctly before attackers or high-traffic events find the bugs for you.