AI API Rate Limit Testing: Test Retry Logic and Exponential Backoff

Every production application that calls OpenAI, Anthropic, or Google Gemini will eventually hit a rate limit. The question isn't whether you'll get a 429 error — it's whether your application handles it gracefully. Here's how to test every aspect of rate limit handling without actually exhausting your API quota.

Understanding LLM API Rate Limits

Each provider has different rate limit dimensions:

OpenAI (GPT-4):

  • RPM (requests per minute)
  • TPM (tokens per minute)
  • RPD (requests per day)
  • Error: 429 Too Many Requests

Anthropic (Claude):

  • RPM per model
  • TPM per model
  • Error: 429 with {"type": "rate_limit_error"}

Google Gemini:

  • RPM per model
  • TPD (tokens per day on free tier)
  • Error: 429 or ResourceExhausted

Your retry strategy needs to handle all of these — plus the difference between "slow down" (transient, retry) and "you're over quota for the day" (non-transient, don't retry).

The Retry Pattern to Test

Before writing tests, define what correct behavior looks like:

# src/retry_client.py
import time
import random
import openai
import anthropic
import logging
from typing import Callable, TypeVar, Any

T = TypeVar("T")
logger = logging.getLogger(__name__)

def exponential_backoff_retry(
    func: Callable[..., T],
    *args,
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: bool = True,
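    retryable_errors: tuple = (
        openai.RateLimitError,
        # 5xx only. The broader openai.APIStatusError would also catch
        # 400/401 errors, which must not be retried.
        openai.InternalServerError,
    ),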
    **kwargs
) -> T:
    """
    Retry with exponential backoff.
    
    Delay formula: min(base_delay * 2^attempt, max_delay), plus up to 10% jitter
    """
    last_error = None
    
    for attempt in range(max_retries + 1):
        try:
            return func(*args, **kwargs)
        except retryable_errors as e:
            last_error = e
            
            # Check for non-retryable 429s (daily quota exhausted)
            if hasattr(e, "status_code") and e.status_code == 429:
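                error_body = getattr(e, "body", None) or {}
                # The SDK may hand back the error object itself or wrap it in {"error": {...}}
                error_info = error_body.get("error") or error_body
                error_code = error_info.get("code", "")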
                if error_code == "insufficient_quota":
                    logger.error("Daily quota exhausted — not retrying")
                    raise
            
            if attempt == max_retries:
                logger.error(f"Max retries ({max_retries}) exceeded")
                raise
            
            delay = min(base_delay * (2 ** attempt), max_delay)
            if jitter:
                delay += random.uniform(0, delay * 0.1)
            
            logger.warning(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.2f}s")
            time.sleep(delay)
    
    raise last_error
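
To make the delay arithmetic concrete, the sketch below pulls it into a standalone helper and prints the schedule for the default settings. The name compute_delay is hypothetical and not part of the article's module; it only mirrors the three lines that compute delay in the function above.

```python
import random


def compute_delay(attempt: int, base_delay: float = 1.0,
                  max_delay: float = 60.0, jitter: bool = True) -> float:
    """Mirror the delay arithmetic of exponential_backoff_retry (hypothetical helper)."""
    delay = min(base_delay * (2 ** attempt), max_delay)
    if jitter:
        # Up to 10% extra is added after the cap, so the result can
        # slightly exceed max_delay when jitter is enabled.
        delay += random.uniform(0, delay * 0.1)
    return delay


# Deterministic schedule (jitter disabled) for attempts 0 through 7
schedule = [compute_delay(attempt, jitter=False) for attempt in range(8)]
print(schedule)  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

With the default max_retries=5, the function sleeps after attempts 0 through 4, which adds up to 1 + 2 + 4 + 8 + 16 = 31 seconds of waiting before the final attempt, ignoring jitter.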

Testing Rate Limit Detection

First, test that your code correctly identifies rate limit errors vs other errors:

# tests/test_rate_limit_detection.py
import pytest
import openai
import anthropic
from unittest.mock import MagicMock
from src.error_classifier import is_retryable

class TestOpenAIErrorClassification:
    def make_openai_error(self, status_code: int, error_code: str = None):
        return openai.APIStatusError(
            message="API error",
            response=MagicMock(status_code=status_code),
            body={"error": {"code": error_code}} if error_code else {}
        )
    
    def test_rate_limit_is_retryable(self):
        error = openai.RateLimitError(
            message="Rate limit exceeded",
            response=MagicMock(status_code=429),
            body={"error": {"type": "rate_limit_error"}}
        )
        assert is_retryable(error) is True

    def test_quota_exhausted_is_not_retryable(self):
        error = openai.RateLimitError(
            message="You exceeded your quota",
            response=MagicMock(status_code=429),
            body={"error": {"type": "insufficient_quota", "code": "insufficient_quota"}}
        )
        assert is_retryable(error) is False

    def test_server_error_is_retryable(self):
        error = self.make_openai_error(500)
        assert is_retryable(error) is True

    def test_overloaded_is_retryable(self):
        error = self.make_openai_error(529)
        assert is_retryable(error) is True

    def test_auth_error_is_not_retryable(self):
        error = openai.AuthenticationError(
            message="Invalid API key",
            response=MagicMock(status_code=401),
            body={}
        )
        assert is_retryable(error) is False

    def test_bad_request_is_not_retryable(self):
        error = openai.BadRequestError(
            message="Context length exceeded",
            response=MagicMock(status_code=400),
            body={}
        )
        assert is_retryable(error) is False

class TestAnthropicErrorClassification:
    def test_claude_rate_limit_is_retryable(self):
        error = anthropic.RateLimitError(
            message="Rate limit reached for model claude-3-5-sonnet",
            response=MagicMock(status_code=429),
            body={"error": {"type": "rate_limit_error"}}
        )
        assert is_retryable(error) is True

    def test_claude_overloaded_is_retryable(self):
        error = anthropic.APIStatusError(
            message="Overloaded",
            response=MagicMock(status_code=529),
            body={"error": {"type": "overloaded_error"}}
        )
        assert is_retryable(error) is True
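
The tests above import is_retryable from src/error_classifier.py, which the article does not show. One possible shape for it is sketched here, assuming the exception exposes status_code and body attributes (as the OpenAI and Anthropic status errors do). The helper _error_details and the constant NON_RETRYABLE_429_CODES are hypothetical names, and the policy of treating errors without an HTTP status as non-retryable is an assumption.

```python
from typing import Any

# Hypothetical contents of src/error_classifier.py (not shown in the article).
NON_RETRYABLE_429_CODES = {"insufficient_quota"}


def _error_details(error: BaseException) -> dict:
    """Return the error payload, whether or not it is wrapped in {"error": ...}."""
    body: Any = getattr(error, "body", None)
    if not isinstance(body, dict):
        return {}
    inner = body.get("error", body)
    return inner if isinstance(inner, dict) else {}


def is_retryable(error: BaseException) -> bool:
    """Classify an SDK exception by duck typing on status_code and body."""
    status = getattr(error, "status_code", None)
    if not isinstance(status, int):
        # Assumption: errors without an HTTP status are handled elsewhere.
        return False
    if status == 429:
        # A 429 is transient unless the payload says the quota is exhausted.
        return _error_details(error).get("code") not in NON_RETRYABLE_429_CODES
    # Server-side failures (500, 503, 529, ...) are worth retrying; 4xx are not.
    return status >= 500
```

Duck typing keeps the classifier independent of any one SDK, so the same function can serve both test classes above. A stricter version could check isinstance against each provider's exception types instead.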

Testing Retry Count and Timing

The critical tests: does your retry logic actually retry the right number of times?

# tests/test_retry_logic.py
# The `mocker` fixture used below comes from the pytest-mock plugin.
import pytest
from unittest.mock import MagicMock
import openai
from src.retry_client import exponential_backoff_retry

class TestRetryCount:
    def test_succeeds_on_first_try_no_retry(self, mocker):
        mock_func = mocker.MagicMock(return_value="success")
        result = exponential_backoff_retry(mock_func, max_retries=3, base_delay=0.001)
        assert result == "success"
        assert mock_func.call_count == 1

    def test_retries_on_rate_limit_and_succeeds(self, mocker):
        rate_limit_error = openai.RateLimitError(
            message="Rate limit exceeded",
            response=MagicMock(status_code=429),
            body={"error": {"type": "rate_limit_error"}}
        )
        mock_func = mocker.MagicMock(side_effect=[
            rate_limit_error,
            rate_limit_error,
            "success on third try"
        ])
        
        result = exponential_backoff_retry(
            mock_func, max_retries=5, base_delay=0.001
        )
        
        assert result == "success on third try"
        assert mock_func.call_count == 3

    def test_raises_after_max_retries_exceeded(self, mocker):
        rate_limit_error = openai.RateLimitError(
            message="Rate limit exceeded",
            response=MagicMock(status_code=429),
            body={}
        )
        mock_func = mocker.MagicMock(side_effect=rate_limit_error)
        
        with pytest.raises(openai.RateLimitError):
            exponential_backoff_retry(
                mock_func, max_retries=3, base_delay=0.001
            )
        
        assert mock_func.call_count == 4  # 1 initial + 3 retries

    def test_does_not_retry_quota_exhausted(self, mocker):
        quota_error = openai.RateLimitError(
            message="You exceeded your current quota",
            response=MagicMock(status_code=429),
            body={"error": {"type": "insufficient_quota", "code": "insufficient_quota"}}
        )
        mock_func = mocker.MagicMock(side_effect=quota_error)
        
        with pytest.raises(openai.RateLimitError):
            exponential_backoff_retry(
                mock_func, max_retries=3, base_delay=0.001
            )
        
        # Should not retry — stops immediately
        assert mock_func.call_count == 1

    def test_does_not_retry_auth_error(self, mocker):
        auth_error = openai.AuthenticationError(
            message="Invalid API key",
            response=MagicMock(status_code=401),
            body={}
        )
        mock_func = mocker.MagicMock(side_effect=auth_error)
        
        with pytest.raises(openai.AuthenticationError):
            exponential_backoff_retry(mock_func, max_retries=3, base_delay=0.001)
        
        assert mock_func.call_count == 1

Testing Backoff Timing

Verify the delay between retries follows the expected pattern:

# tests/test_backoff_timing.py
import pytest
import time
from unittest.mock import MagicMock, patch
import openai
from src.retry_client import exponential_backoff_retry

class TestBackoffTiming:
    def test_delay_increases_exponentially(self, mocker):
        """With jitter disabled, delays should be exactly 1s, 2s, 4s."""
        rate_limit = openai.RateLimitError(
            message="Rate limit",
            response=MagicMock(status_code=429),
            body={}
        )
        mock_func = mocker.MagicMock(side_effect=[
            rate_limit, rate_limit, rate_limit, "success"
        ])
        
        sleep_calls = []
        with patch("src.retry_client.time.sleep") as mock_sleep:
            mock_sleep.side_effect = lambda d: sleep_calls.append(d)
            result = exponential_backoff_retry(
                mock_func, max_retries=5, base_delay=1.0, jitter=False
            )
        
        assert result == "success"
        assert len(sleep_calls) == 3
        assert sleep_calls[0] == 1.0   # 1 * 2^0
        assert sleep_calls[1] == 2.0   # 1 * 2^1
        assert sleep_calls[2] == 4.0   # 1 * 2^2

    def test_delay_capped_at_max(self, mocker):
        """Delay should never exceed max_delay."""
        rate_limit = openai.RateLimitError(
            message="Rate limit",
            response=MagicMock(status_code=429),
            body={}
        )
        mock_func = mocker.MagicMock(side_effect=[
            rate_limit, rate_limit, rate_limit, rate_limit, "success"
        ])
        
        sleep_calls = []
        with patch("src.retry_client.time.sleep") as mock_sleep:
            mock_sleep.side_effect = lambda d: sleep_calls.append(d)
            exponential_backoff_retry(
                mock_func, max_retries=5, base_delay=10.0, max_delay=15.0, jitter=False
            )
        
        # 10.0 * 2^attempt gives 10, 20, 40, 80; everything above 15 is clamped
        assert sleep_calls == [10.0, 15.0, 15.0, 15.0]

    def test_jitter_adds_randomness(self, mocker):
        """With jitter enabled, the first delay should vary from run to run."""
        rate_limit = openai.RateLimitError(
            message="Rate limit",
            response=MagicMock(status_code=429),
            body={}
        )
        
        # Run the retry logic multiple times, collect all delays
        all_delays = []
        for _ in range(5):
            mock_func = mocker.MagicMock(side_effect=[rate_limit, "success"])
            sleep_calls = []
            with patch("src.retry_client.time.sleep") as mock_sleep:
                mock_sleep.side_effect = lambda d: sleep_calls.append(d)
                # No try/except here: a swallowed exception would hide real failures
                exponential_backoff_retry(
                    mock_func, max_retries=2, base_delay=1.0, jitter=True
                )
            all_delays.extend(sleep_calls)
        
        # With jitter, not all delays should be exactly equal
        assert len(set(round(d, 3) for d in all_delays)) > 1

Testing Concurrent Rate Limit Handling

In production, multiple threads or async tasks may hit rate limits simultaneously. Test that they don't interfere:

# tests/test_concurrent_retry.py
import pytest
import asyncio
import openai
from unittest.mock import MagicMock, AsyncMock
from src.async_retry_client import async_retry_with_backoff

class TestConcurrentRetry:
    @pytest.mark.asyncio
    async def test_concurrent_requests_all_eventually_succeed(self):
        """Multiple concurrent requests should all retry independently."""
        call_count = 0
        rate_limit = openai.RateLimitError(
            message="Rate limit",
            response=MagicMock(status_code=429),
            body={}
        )
        
        async def mock_api_call(request_id: int):
            nonlocal call_count
            call_count += 1
            if call_count <= 6:  # First 6 calls fail (2 failures per 3 concurrent)
                raise rate_limit
            return f"success-{request_id}"
        
        tasks = [
            async_retry_with_backoff(mock_api_call, i, max_retries=5, base_delay=0.001)
            for i in range(3)
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # All should eventually succeed
        assert all(isinstance(r, str) for r in results)
        assert all(r.startswith("success-") for r in results)

    @pytest.mark.asyncio
    async def test_retry_uses_asyncio_sleep_not_time_sleep(self, mocker):
        """Async retry must use asyncio.sleep, not time.sleep (would block)."""
        rate_limit = openai.RateLimitError(
            message="Rate limit",
            response=MagicMock(status_code=429),
            body={}
        )
        mock_func = AsyncMock(side_effect=[rate_limit, "success"])
        
        asyncio_sleep = mocker.patch("src.async_retry_client.asyncio.sleep", new_callable=AsyncMock)
        time_sleep = mocker.patch("time.sleep")
        
        await async_retry_with_backoff(mock_func, max_retries=2, base_delay=0.001)
        
        assert asyncio_sleep.called
        assert not time_sleep.called  # Must not block the event loop
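
The tests above exercise async_retry_with_backoff from src/async_retry_client.py, which the article does not show. A minimal sketch of what it might look like follows. The should_retry parameter and the _looks_transient policy (retry on HTTP 429 and 5xx, judged by a status_code attribute) are assumptions made for illustration, and the daily-quota check from the synchronous version is left out for brevity.

```python
import asyncio
import random
from typing import Any, Awaitable, Callable


def _looks_transient(exc: BaseException) -> bool:
    """Assumed policy: retry HTTP 429 and 5xx, based on a status_code attribute."""
    status = getattr(exc, "status_code", None)
    return isinstance(status, int) and (status == 429 or status >= 500)


async def async_retry_with_backoff(
    func: Callable[..., Awaitable[Any]],
    *args: Any,
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: bool = True,
    should_retry: Callable[[BaseException], bool] = _looks_transient,
    **kwargs: Any,
) -> Any:
    """Await func, retrying transient failures with exponential backoff."""
    if max_retries < 0:
        raise ValueError("max_retries must be >= 0")
    for attempt in range(max_retries + 1):
        try:
            return await func(*args, **kwargs)
        except Exception as exc:
            # Give up on the last attempt or on errors that are not transient.
            if attempt == max_retries or not should_retry(exc):
                raise
            delay = min(base_delay * (2 ** attempt), max_delay)
            if jitter:
                delay += random.uniform(0, delay * 0.1)
            # asyncio.sleep yields to the event loop, so other tasks keep running.
            await asyncio.sleep(delay)
```

Because each call keeps its own attempt counter, concurrent tasks back off independently, which is the property the first test checks. Calling asyncio.sleep through the module attribute is also what lets the second test patch src.async_retry_client.asyncio.sleep.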

Testing Retry-After Header Handling

Some APIs return a Retry-After header indicating when to retry:

# tests/test_retry_after_header.py
import pytest
from unittest.mock import MagicMock, patch
import openai
from src.retry_client import retry_with_retry_after_header

class TestRetryAfterHeader:
    def test_respects_retry_after_header(self, mocker):
        """Should sleep for the duration specified in Retry-After header."""
        response = MagicMock(status_code=429)
        response.headers = {"Retry-After": "5"}
        
        rate_limit = openai.RateLimitError(
            message="Rate limit",
            response=response,
            body={}
        )
        mock_func = mocker.MagicMock(side_effect=[rate_limit, "success"])
        
        sleep_calls = []
        with patch("src.retry_client.time.sleep") as mock_sleep:
            mock_sleep.side_effect = lambda d: sleep_calls.append(d)
            retry_with_retry_after_header(mock_func, base_delay=1.0)
        
        # Should have used the header value (5s), not exponential backoff (1s)
        assert sleep_calls[0] == 5.0

    def test_falls_back_to_exponential_if_no_header(self, mocker):
        """Without Retry-After header, falls back to exponential backoff."""
        response = MagicMock(status_code=429)
        response.headers = {}
        
        rate_limit = openai.RateLimitError(
            message="Rate limit",
            response=response,
            body={}
        )
        mock_func = mocker.MagicMock(side_effect=[rate_limit, "success"])
        
        sleep_calls = []
        with patch("src.retry_client.time.sleep") as mock_sleep:
            mock_sleep.side_effect = lambda d: sleep_calls.append(d)
            retry_with_retry_after_header(mock_func, base_delay=2.0)
        
        # Should use exponential: 2.0 * 2^0 = 2.0
        assert sleep_calls[0] == 2.0
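
The function under test, retry_with_retry_after_header, is not shown in the article. The sketch below is one way it could be written so that both tests above pass. It assumes the exception carries a response object with a headers mapping and a status_code attribute. The helper _retry_after_seconds and the default of max_retries=3 are hypothetical, and only the numeric form of Retry-After is parsed (an HTTP-date value falls back to exponential backoff).

```python
import math
import time
from typing import Any, Callable, Optional


def _retry_after_seconds(exc: BaseException) -> Optional[float]:
    """Read a numeric Retry-After value from exc.response.headers, if present."""
    headers = getattr(getattr(exc, "response", None), "headers", None)
    if headers is None:
        return None
    raw = headers.get("Retry-After") or headers.get("retry-after")
    if not isinstance(raw, str):
        return None
    try:
        seconds = float(raw.strip())
    except ValueError:
        return None  # e.g. an HTTP-date, which this sketch does not parse
    return seconds if math.isfinite(seconds) and seconds >= 0 else None


def retry_with_retry_after_header(
    func: Callable[..., Any],
    *args: Any,
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    **kwargs: Any,
) -> Any:
    """Retry 429s, preferring the server's Retry-After hint over backoff."""
    if max_retries < 0:
        raise ValueError("max_retries must be >= 0")
    for attempt in range(max_retries + 1):
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            if getattr(exc, "status_code", None) != 429 or attempt == max_retries:
                raise
            hinted = _retry_after_seconds(exc)
            backoff = base_delay * (2 ** attempt)
            # Cap either value so a large hint cannot stall the caller indefinitely.
            time.sleep(min(hinted if hinted is not None else backoff, max_delay))
```

Capping the server's hint at max_delay is a design choice: it bounds worst-case latency, at the cost of possibly retrying earlier than the server asked. If the provider's hint should always win, drop the min() around the hinted value.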

Integration Test: Real Rate Limiting

To exercise real rate limiting, which is useful for validating production behavior but unsuitable for CI, gate the test behind an environment variable:

# tests/test_rate_limit_integration.py
import pytest
import asyncio
import openai
import os

pytestmark = pytest.mark.skipif(
    os.environ.get("RUN_RATE_LIMIT_TEST") != "true",
    reason="Only run manually: RUN_RATE_LIMIT_TEST=true"
)

@pytest.mark.asyncio
async def test_burst_requests_trigger_rate_limit():
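    """Deliberately burst requests to observe real 429 responses."""
    # max_retries=0 turns off the SDK's built-in retries so 429s surface as exceptions
    client = openai.AsyncOpenAI(api_key=os.environ["OPENAI_API_KEY"], max_retries=0)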
    
    # Fire many requests simultaneously to trigger rate limits
    tasks = [
        client.chat.completions.create(
            model="gpt-4o-mini",
            max_tokens=10,
            messages=[{"role": "user", "content": f"Say: test {i}"}]
        )
        for i in range(50)  # 50 concurrent requests
    ]
    
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    rate_limited = [r for r in results if isinstance(r, openai.RateLimitError)]
    succeeded = [r for r in results if not isinstance(r, Exception)]
    
    print(f"Succeeded: {len(succeeded)}/50")
    print(f"Rate limited: {len(rate_limited)}/50")
    
    # At least some should succeed
    assert len(succeeded) > 0

Testing with Real Rate Limits Using HelpMeTest

For end-to-end validation that your LLM-powered application handles rate limits gracefully from the user's perspective, you need to test at the application level.

HelpMeTest can run continuous monitoring tests that check your app's behavior under load — verifying that users see appropriate loading states, not raw error messages, when rate limits kick in:

*** Test Cases ***
App Shows Loading State During AI Processing
    Go To    https://your-app.com/ai-feature
    Submit Prompt    Generate a long analysis
    Element Should Be Visible    .loading-spinner
    Wait Until Element Is Visible    .result    timeout=60s
    Element Should Not Be Visible    .error-message

App Handles AI Timeout Gracefully
    Go To    https://your-app.com/ai-feature  
    Submit Prompt    Generate analysis
    # Simulate slow response
    Wait    35 seconds
    Element Should Contain    .status    Please wait or try again
    Element Should Not Contain    .status    rate limit

Key Takeaways

  • Test every error type — rate limits, quota exhaustion, auth errors, and server errors have different retry semantics
  • Mock time.sleep in retry tests — never actually sleep in unit tests
  • Verify retry count explicitly — mock_func.call_count tells you exactly how many attempts were made
  • Test non-retryable errors separately — quota exhaustion and auth failures must NOT be retried
  • Test concurrent retry behavior — async applications need asyncio.sleep, not time.sleep
  • Test the Retry-After header — some APIs tell you exactly how long to wait
  • Add end-to-end tests to verify users see graceful degradation, not raw error messages
