AI API Rate Limit Testing: Test Retry Logic and Exponential Backoff
Every production application that calls OpenAI, Anthropic, or Google Gemini will eventually hit a rate limit. The question isn't whether you'll get a 429 error — it's whether your application handles it gracefully. Here's how to test every aspect of rate limit handling without actually exhausting your API quota.
Understanding LLM API Rate Limits
Each provider has different rate limit dimensions:
OpenAI (GPT-4):
- RPM (requests per minute)
- TPM (tokens per minute)
- RPD (requests per day)
- Error: `429 Too Many Requests`
Anthropic (Claude):
- RPM per model
- TPM per model
- Error: `429` with `{"type": "rate_limit_error"}`
Google Gemini:
- RPM per model
- TPD (tokens per day on free tier)
- Error: `429` or `ResourceExhausted`
Your retry strategy needs to handle all of these — plus the difference between "slow down" (transient, retry) and "you're over quota for the day" (non-transient, don't retry).
The Retry Pattern to Test
Before writing tests, define what correct behavior looks like:
# src/retry_client.py
import time
import random
import openai
import anthropic
import logging
from typing import Callable, TypeVar, Any
T = TypeVar("T")
logger = logging.getLogger(__name__)
def exponential_backoff_retry(
    func: Callable[..., T],
    *args,
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: bool = True,
    retryable_errors: tuple = (
        openai.RateLimitError,
        openai.APIStatusError,
    ),
    **kwargs
) -> T:
    """
    Call ``func(*args, **kwargs)``, retrying transient failures with
    exponential backoff.

    Delay formula: min(base_delay * 2^attempt + jitter, max_delay)

    Only exceptions listed in ``retryable_errors`` are caught. A caught
    error that carries an integer ``status_code`` is retried only for
    408, 409, 429 and 5xx; any other status (for example 400 or 401) is
    re-raised immediately. A 429 whose body reports ``insufficient_quota``
    is also re-raised immediately, because waiting will not help.

    Args:
        func: Callable to invoke.
        *args: Positional arguments forwarded to ``func``.
        max_retries: Number of retries allowed after the first attempt.
        base_delay: Delay in seconds before the first retry.
        max_delay: Upper bound in seconds for a single delay, jitter included.
        jitter: When true, add a random extra of up to 10% of the delay.
        retryable_errors: Exception types that are eligible for a retry.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        Whatever ``func`` returns on its first successful call.

    Raises:
        ValueError: If ``max_retries`` is negative.
        Exception: The caught error is re-raised unchanged when it is not
            retryable or when ``max_retries`` is exhausted.
    """
    if max_retries < 0:
        raise ValueError(f"max_retries must be >= 0, got {max_retries}")

    # Kept up to date so the function's closing `raise last_error`
    # always refers to the most recent failure.
    last_error = None
    for attempt in range(max_retries + 1):
        try:
            return func(*args, **kwargs)
        except retryable_errors as e:
            last_error = e
            status_code = getattr(e, "status_code", None)

            # openai.APIStatusError is the base class of the 4xx client
            # errors too, so the status has to be inspected: retrying a
            # 400 or 401 can never succeed.
            if isinstance(status_code, int):
                transient = status_code in (408, 409, 429) or status_code >= 500
                if not transient:
                    logger.error(
                        "Non-retryable status %s — not retrying", status_code
                    )
                    raise

            # Check for non-retryable 429s (daily quota exhausted)
            if status_code == 429:
                error_body = getattr(e, "body", None)
                error_info = (
                    error_body.get("error")
                    if isinstance(error_body, dict)
                    else None
                )
                # The body may be None, a string, or lack an "error" dict.
                if not isinstance(error_info, dict):
                    error_info = {}
                quota_markers = (error_info.get("code"), error_info.get("type"))
                if "insufficient_quota" in quota_markers:
                    logger.error("Daily quota exhausted — not retrying")
                    raise

            if attempt == max_retries:
                logger.error("Max retries (%d) exceeded", max_retries)
                raise

            delay = base_delay * (2 ** attempt)
            if jitter:
                delay += random.uniform(0, delay * 0.1)
            # Cap after adding jitter so max_delay is a true upper bound.
            delay = min(delay, max_delay)
            logger.warning(
                "Attempt %d failed: %s. Retrying in %.2fs", attempt + 1, e, delay
            )
            time.sleep(delay)
    raise last_error
Testing Rate Limit Detection
First, test that your code correctly identifies rate limit errors vs other errors:
# tests/test_rate_limit_detection.py
import pytest
import openai
import anthropic
from unittest.mock import MagicMock
from src.error_classifier import classify_openai_error, is_retryable
class TestOpenAIErrorClassification:
    """Checks which OpenAI SDK errors ``is_retryable`` reports as retryable."""

    def make_openai_error(self, status_code: int, error_code: str = None):
        """Build a generic ``openai.APIStatusError`` with the given status.

        The body is ``{"error": {"code": error_code}}`` when ``error_code``
        is truthy and an empty dict otherwise.
        """
        if error_code:
            payload = {"error": {"code": error_code}}
        else:
            payload = {}
        fake_response = MagicMock(status_code=status_code)
        return openai.APIStatusError(
            message="API error",
            response=fake_response,
            body=payload,
        )

    def test_rate_limit_is_retryable(self):
        """A plain 429 rate-limit error must be classified as retryable."""
        fake_response = MagicMock(status_code=429)
        exc = openai.RateLimitError(
            message="Rate limit exceeded",
            response=fake_response,
            body={"error": {"type": "rate_limit_error"}},
        )
        verdict = is_retryable(exc)
        assert verdict is True

    def test_quota_exhausted_is_not_retryable(self):
        """A 429 reporting ``insufficient_quota`` must not be retried."""
        quota_body = {
            "error": {"type": "insufficient_quota", "code": "insufficient_quota"}
        }
        exc = openai.RateLimitError(
            message="You exceeded your quota",
            response=MagicMock(status_code=429),
            body=quota_body,
        )
        verdict = is_retryable(exc)
        assert verdict is False

    def test_server_error_is_retryable(self):
        """A 500 status error must be classified as retryable."""
        exc = self.make_openai_error(500)
        assert is_retryable(exc) is True

    def test_overloaded_is_retryable(self):
        """A 529 status error must be classified as retryable."""
        exc = self.make_openai_error(529)
        assert is_retryable(exc) is True

    def test_auth_error_is_not_retryable(self):
        """A 401 authentication error must not be retried."""
        fake_response = MagicMock(status_code=401)
        exc = openai.AuthenticationError(
            message="Invalid API key",
            response=fake_response,
            body={},
        )
        verdict = is_retryable(exc)
        assert verdict is False

    def test_bad_request_is_not_retryable(self):
        """A 400 bad-request error must not be retried."""
        fake_response = MagicMock(status_code=400)
        exc = openai.BadRequestError(
            message="Context length exceeded",
            response=fake_response,
            body={},
        )
        verdict = is_retryable(exc)
        assert verdict is False
class TestAnthropicErrorClassification:
    """Checks which Anthropic SDK errors ``is_retryable`` reports as retryable."""

    def test_claude_rate_limit_is_retryable(self):
        """A 429 ``rate_limit_error`` from Claude must be retryable."""
        fake_response = MagicMock(status_code=429)
        exc = anthropic.RateLimitError(
            message="Rate limit reached for model claude-3-5-sonnet",
            response=fake_response,
            body={"error": {"type": "rate_limit_error"}},
        )
        verdict = is_retryable(exc)
        assert verdict is True

    def test_claude_overloaded_is_retryable(self):
        """A 529 ``overloaded_error`` from Claude must be retryable."""
        overloaded_response = MagicMock(status_code=529)
        error = anthropic.APIStatusError(
            message="Overloaded",
            response=overloaded_response,
            body={"error": {"type": "overloaded_error"}},
        )
        assert is_retryable(error) is True
Testing Retry Count and Timing
The critical tests: does your retry logic actually retry the right number of times?
# tests/test_retry_logic.py
import pytest
import time
from unittest.mock import MagicMock, call, patch
import openai
from src.retry_client import exponential_backoff_retry
class TestRetryCount:
    """Verifies how often ``exponential_backoff_retry`` invokes the wrapped call."""

    def test_succeeds_on_first_try_no_retry(self, mocker):
        """A call that succeeds immediately is invoked exactly once."""
        target = mocker.MagicMock(return_value="success")

        outcome = exponential_backoff_retry(target, max_retries=3, base_delay=0.001)

        assert outcome == "success"
        assert target.call_count == 1

    def test_retries_on_rate_limit_and_succeeds(self, mocker):
        """Two rate-limit failures followed by a success take three calls."""
        throttled = openai.RateLimitError(
            message="Rate limit exceeded",
            response=MagicMock(status_code=429),
            body={"error": {"type": "rate_limit_error"}},
        )
        scripted_outcomes = [throttled, throttled, "success on third try"]
        target = mocker.MagicMock(side_effect=scripted_outcomes)

        outcome = exponential_backoff_retry(
            target,
            max_retries=5,
            base_delay=0.001,
        )

        assert outcome == "success on third try"
        assert target.call_count == 3

    def test_raises_after_max_retries_exceeded(self, mocker):
        """A call that always rate-limits raises after the retry budget."""
        throttled = openai.RateLimitError(
            message="Rate limit exceeded",
            response=MagicMock(status_code=429),
            body={},
        )
        target = mocker.MagicMock(side_effect=throttled)

        with pytest.raises(openai.RateLimitError):
            exponential_backoff_retry(
                target,
                max_retries=3,
                base_delay=0.001,
            )

        assert target.call_count == 4  # 1 initial + 3 retries

    def test_does_not_retry_quota_exhausted(self, mocker):
        """An ``insufficient_quota`` 429 is raised without any retry."""
        quota_body = {
            "error": {"type": "insufficient_quota", "code": "insufficient_quota"}
        }
        exhausted = openai.RateLimitError(
            message="You exceeded your current quota",
            response=MagicMock(status_code=429),
            body=quota_body,
        )
        target = mocker.MagicMock(side_effect=exhausted)

        with pytest.raises(openai.RateLimitError):
            exponential_backoff_retry(
                target,
                max_retries=3,
                base_delay=0.001,
            )

        # Should not retry — stops immediately
        assert target.call_count == 1

    def test_does_not_retry_auth_error(self, mocker):
        """A 401 authentication error is raised without any retry."""
        unauthorized_response = MagicMock(status_code=401)
        auth_error = openai.AuthenticationError(
            message="Invalid API key",
            response=unauthorized_response,
            body={},
        )
        mock_func = mocker.MagicMock(side_effect=auth_error)

        with pytest.raises(openai.AuthenticationError):
            exponential_backoff_retry(mock_func, max_retries=3, base_delay=0.001)
        assert mock_func.call_count == 1
Testing Backoff Timing
Verify the delay between retries follows the expected pattern:
# tests/test_backoff_timing.py
import pytest
import time
from unittest.mock import MagicMock, patch
import openai
from src.retry_client import exponential_backoff_retry
class TestBackoffTiming:
    """Verifies the delays ``exponential_backoff_retry`` passes to ``time.sleep``.

    ``src.retry_client.time.sleep`` is patched in every test, so no test
    actually waits; the requested delays are recorded instead.
    """

    def test_delay_increases_exponentially(self, mocker):
        """Three failures without jitter should sleep exactly 1s, 2s, 4s."""
        rate_limit = openai.RateLimitError(
            message="Rate limit",
            response=MagicMock(status_code=429),
            body={}
        )
        mock_func = mocker.MagicMock(side_effect=[
            rate_limit, rate_limit, rate_limit, "success"
        ])
        sleep_calls = []
        with patch("src.retry_client.time.sleep") as mock_sleep:
            mock_sleep.side_effect = sleep_calls.append
            result = exponential_backoff_retry(
                mock_func, max_retries=5, base_delay=1.0, jitter=False
            )
        assert result == "success"
        # base_delay * 2^attempt for attempts 0, 1 and 2
        assert sleep_calls == [1.0, 2.0, 4.0]

    def test_delay_capped_at_max(self, mocker):
        """Delay should never exceed max_delay."""
        rate_limit = openai.RateLimitError(
            message="Rate limit",
            response=MagicMock(status_code=429),
            body={}
        )
        mock_func = mocker.MagicMock(side_effect=[
            rate_limit, rate_limit, rate_limit, rate_limit, "success"
        ])
        sleep_calls = []
        with patch("src.retry_client.time.sleep") as mock_sleep:
            mock_sleep.side_effect = sleep_calls.append
            exponential_backoff_retry(
                mock_func, max_retries=5, base_delay=10.0, max_delay=15.0, jitter=False
            )
        # Pin the exact sequence: all() alone would also pass on an empty
        # list, i.e. if the retry loop never slept at all.
        assert sleep_calls == [10.0, 15.0, 15.0, 15.0]
        assert all(d <= 15.0 for d in sleep_calls)

    def test_jitter_adds_randomness(self, mocker):
        """With jitter enabled, delays should vary across retries."""
        rate_limit = openai.RateLimitError(
            message="Rate limit",
            response=MagicMock(status_code=429),
            body={}
        )
        # Run the retry logic multiple times, collect all delays
        all_delays = []
        for _ in range(5):
            mock_func = mocker.MagicMock(side_effect=[rate_limit, "success"])
            sleep_calls = []
            with patch("src.retry_client.time.sleep") as mock_sleep:
                mock_sleep.side_effect = sleep_calls.append
                # No try/except here: an unexpected exception must fail the
                # test instead of being silently swallowed.
                exponential_backoff_retry(
                    mock_func, max_retries=2, base_delay=1.0, jitter=True
                )
            all_delays.extend(sleep_calls)
        # Each run fails once and then succeeds, so it sleeps exactly once.
        assert len(all_delays) == 5
        # With jitter, not all delays should be exactly equal
        assert len(set(round(d, 3) for d in all_delays)) > 1
Testing Concurrent Rate Limit Handling
In production, multiple threads or async tasks may hit rate limits simultaneously. Test that they don't interfere:
# tests/test_concurrent_retry.py
import pytest
import asyncio
import openai
from unittest.mock import MagicMock, AsyncMock
from src.async_retry_client import async_retry_with_backoff
class TestConcurrentRetry:
    """Exercises ``async_retry_with_backoff`` when several calls run at once."""

    @pytest.mark.asyncio
    async def test_concurrent_requests_all_eventually_succeed(self):
        """Multiple concurrent requests should all retry independently."""
        invocations = []
        throttled = openai.RateLimitError(
            message="Rate limit",
            response=MagicMock(status_code=429),
            body={},
        )

        async def mock_api_call(request_id: int):
            invocations.append(request_id)
            # First 6 calls fail (2 failures per 3 concurrent)
            if len(invocations) <= 6:
                raise throttled
            return f"success-{request_id}"

        pending = []
        for request_id in range(3):
            pending.append(
                async_retry_with_backoff(
                    mock_api_call, request_id, max_retries=5, base_delay=0.001
                )
            )
        outcomes = await asyncio.gather(*pending, return_exceptions=True)

        # All should eventually succeed
        assert all(isinstance(item, str) for item in outcomes)
        assert all(item.startswith("success-") for item in outcomes)

    @pytest.mark.asyncio
    async def test_retry_uses_asyncio_sleep_not_time_sleep(self, mocker):
        """Async retry must use asyncio.sleep, not time.sleep (would block)."""
        throttled = openai.RateLimitError(
            message="Rate limit",
            response=MagicMock(status_code=429),
            body={},
        )
        mock_func = AsyncMock(side_effect=[throttled, "success"])
        asyncio_sleep = mocker.patch(
            "src.async_retry_client.asyncio.sleep",
            new_callable=AsyncMock,
        )
        time_sleep = mocker.patch("time.sleep")

        await async_retry_with_backoff(mock_func, max_retries=2, base_delay=0.001)

        assert asyncio_sleep.called
        assert not time_sleep.called # Must not block the event loop
Testing Retry-After Header Handling
Some APIs return a Retry-After header indicating when to retry:
# tests/test_retry_after_header.py
import pytest
from unittest.mock import MagicMock, patch
import openai
from src.retry_client import retry_with_retry_after_header
class TestRetryAfterHeader:
    """Checks how ``retry_with_retry_after_header`` picks its sleep duration."""

    def test_respects_retry_after_header(self, mocker):
        """Should sleep for the duration specified in Retry-After header."""
        throttled_response = MagicMock(status_code=429)
        throttled_response.headers = {"Retry-After": "5"}
        throttled = openai.RateLimitError(
            message="Rate limit",
            response=throttled_response,
            body={},
        )
        target = mocker.MagicMock(side_effect=[throttled, "success"])
        recorded_delays = []

        with patch("src.retry_client.time.sleep") as fake_sleep:
            fake_sleep.side_effect = recorded_delays.append
            retry_with_retry_after_header(target, base_delay=1.0)

        # Should have used the header value (5s), not exponential backoff (1s)
        assert recorded_delays[0] == 5.0

    def test_falls_back_to_exponential_if_no_header(self, mocker):
        """Without Retry-After header, falls back to exponential backoff."""
        bare_response = MagicMock(status_code=429)
        bare_response.headers = {}
        throttled = openai.RateLimitError(
            message="Rate limit",
            response=bare_response,
            body={},
        )
        target = mocker.MagicMock(side_effect=[throttled, "success"])
        sleep_calls = []

        with patch("src.retry_client.time.sleep") as fake_sleep:
            fake_sleep.side_effect = sleep_calls.append
            retry_with_retry_after_header(target, base_delay=2.0)

        # Should use exponential: 2.0 * 2^0 = 2.0
        assert sleep_calls[0] == 2.0
Integration Test: Real Rate Limiting
If you want to test real rate limiting behavior (useful for validating production behavior, not for CI):
# tests/test_rate_limit_integration.py
import pytest
import asyncio
import openai
import os
# Skip the whole module unless the opt-in environment flag is set to "true".
_rate_limit_test_disabled = os.environ.get("RUN_RATE_LIMIT_TEST") != "true"
pytestmark = pytest.mark.skipif(
    _rate_limit_test_disabled,
    reason="Only run manually: RUN_RATE_LIMIT_TEST=true",
)


@pytest.mark.asyncio
async def test_burst_requests_trigger_rate_limit():
    """Send 50 real chat requests at once and tally successes and 429s."""
    client = openai.AsyncOpenAI(api_key=os.environ["OPENAI_API_KEY"])

    def build_request(index):
        # Returns the un-awaited request so asyncio.gather can run them together.
        return client.chat.completions.create(
            model="gpt-4o-mini",
            max_tokens=10,
            messages=[{"role": "user", "content": f"Say: test {index}"}],
        )

    # Fire many requests simultaneously to trigger rate limits
    burst = [build_request(i) for i in range(50)]  # 50 concurrent requests
    results = await asyncio.gather(*burst, return_exceptions=True)

    rate_limited = []
    succeeded = []
    for outcome in results:
        if isinstance(outcome, openai.RateLimitError):
            rate_limited.append(outcome)
        if not isinstance(outcome, Exception):
            succeeded.append(outcome)

    print(f"Succeeded: {len(succeeded)}/50")
    print(f"Rate limited: {len(rate_limited)}/50")
    # At least some should succeed
    assert len(succeeded) > 0
Testing with Real Rate Limits Using HelpMeTest
For end-to-end validation that your LLM-powered application handles rate limits gracefully from the user's perspective, you need to test at the application level.
HelpMeTest can run continuous monitoring tests that check your app's behavior under load — verifying that users see appropriate loading states, not raw error messages, when rate limits kick in:
*** Test Cases ***
# NOTE(review): cell separators and step indentation appear to have been lost
# in extraction — restore them before running this suite.
# Happy path: submit a prompt, expect the loading spinner, then a result
# within 60s and no visible error message.
App Shows Loading State During AI Processing
Go To https://your-app.com/ai-feature
Submit Prompt Generate a long analysis
Element Should Be Visible .loading-spinner
Wait Until Element Is Visible .result timeout=60s
Element Should Not Be Visible .error-message
# Slow path: after a 35 second wait the status element should show the
# "Please wait or try again" text and must not contain "rate limit".
App Handles AI Timeout Gracefully
Go To https://your-app.com/ai-feature
Submit Prompt Generate analysis
# Simulate slow response
Wait 35 seconds
Element Should Contain .status Please wait or try again
Element Should Not Contain .status rate limit
Key Takeaways
- Test every error type — rate limits, quota exhaustion, auth errors, and server errors have different retry semantics
- Mock `time.sleep` in retry tests — never actually sleep in unit tests
- Verify retry count explicitly — `mock_func.call_count` tells you exactly how many attempts were made
- Test non-retryable errors separately — quota exhaustion and auth failures must NOT be retried
- Test concurrent retry behavior — async applications need `asyncio.sleep`, not `time.sleep`
- Test the Retry-After header — some APIs tell you exactly how long to wait
- Add end-to-end tests to verify users see graceful degradation, not raw error messages