Stress Testing REST APIs with Locust: A Practical Guide

Locust is a Python-based load testing tool with a simple, expressive API. Unlike k6 (JavaScript) or Gatling (Scala/Java), Locust lets you write tests in plain Python — making it especially accessible for backend developers and data engineers already working in the language.

This guide focuses on stress testing REST APIs with Locust: realistic user flows, ramp-up strategies, distributed testing, and actionable result interpretation.

Why Locust for API Stress Testing?

Locust's advantages:

  • Python everywhere: use requests, authentication libraries, data factories — any Python package
  • Realistic user modeling: define user classes with weighted tasks and think time
  • Built-in web UI: real-time monitoring without external dashboards
  • Distributed mode: scale to millions of users across multiple workers
  • Custom failure conditions: define what "failure" means in application terms, not just HTTP codes

Installation

pip install locust

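Locust requires a supported Python 3 release. Older 2.x versions ran on Python 3.7, but recent releases have dropped 3.7 and 3.8, so check the requirement listed on Locust's PyPI page for the version you install.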

Basic REST API Stress Test

from locust import HttpUser, task, between

class APIUser(HttpUser):
    wait_time = between(1, 3)  # simulated think time between tasks
    
    @task
    def get_items(self):
        self.client.get("/api/items")
    
    @task(3)
    def get_item_detail(self):
        # 3x more likely to run than get_items
        self.client.get("/api/items/1")
    
    @task
    def create_item(self):
        self.client.post("/api/items", json={
            "name": "Test Item",
            "price": 9.99
        })

Run it:

locust -f locustfile.py --host https://api.example.com

Open the web UI at http://localhost:8089, set user count and spawn rate, and start the test.
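
To make the task weights in the locustfile above concrete, here is a minimal sketch in plain Python (no Locust needed) that converts weights into expected shares of executed tasks. It assumes Locust picks each next task at random in proportion to its weight, which is how the @task(n) argument is described; the task_shares helper is hypothetical and exists only for illustration.

```python
from fractions import Fraction


def task_shares(weights):
    """Return each task's expected share of executions, given integer weights."""
    total = sum(weights.values())
    return {name: Fraction(weight, total) for name, weight in weights.items()}


# Weights from the locustfile above: a bare @task counts as 1, @task(3) as 3.
shares = task_shares({"get_items": 1, "get_item_detail": 3, "create_item": 1})

for name, share in shares.items():
    print(f"{name}: {float(share):.0%}")
```

With these weights, about three in five requests go to the detail endpoint and one in five to each of the other two.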

Modeling Realistic API Flows

Real users don't just hit one endpoint. Model complete user journeys:

from locust import HttpUser, task, between, SequentialTaskSet

class CheckoutFlow(SequentialTaskSet):
    
    def on_start(self):
        # Authenticate once per user
        resp = self.client.post("/auth/login", json={
            "email": "test@example.com",
            "password": "password123"
        })
        self.token = resp.json()["token"]
        self.headers = {"Authorization": f"Bearer {self.token}"}
    
    @task
    def browse_products(self):
        self.client.get("/api/products", headers=self.headers)
    
    @task
    def view_product(self):
        self.client.get("/api/products/42", headers=self.headers)
    
    @task
    def add_to_cart(self):
        self.client.post("/api/cart/items", 
            json={"product_id": 42, "quantity": 1},
            headers=self.headers
        )
    
    @task
    def checkout(self):
        resp = self.client.post("/api/orders", 
            json={"payment_method": "test_card"},
            headers=self.headers
        )
        if resp.status_code == 201:
            order_id = resp.json()["id"]
            self.client.get(f"/api/orders/{order_id}", headers=self.headers)


class ShoppingUser(HttpUser):
    tasks = [CheckoutFlow]
    wait_time = between(1, 5)

SequentialTaskSet runs tasks in order, modeling a realistic user flow rather than random endpoint hammering.

Parameterized Requests

Realistic tests use varying data, not the same request repeated:

import random

from faker import Faker  # pip install faker
from locust import HttpUser, task, between

fake = Faker()

class APIUser(HttpUser):
    wait_time = between(0.5, 2)
    
    def on_start(self):
        self.user_ids = list(range(1, 10001))
    
    @task
    def get_user_profile(self):
        user_id = random.choice(self.user_ids)
        self.client.get(f"/api/users/{user_id}")
    
    @task
    def search_products(self):
        query = fake.word()
        self.client.get("/api/products/search", params={"q": query})
    
    @task
    def create_review(self):
        self.client.post("/api/reviews", json={
            "product_id": random.randint(1, 500),
            "rating": random.randint(1, 5),
            "comment": fake.sentence()
        })

Randomized user IDs spread requests across many records, so the cache hit rate is not inflated the way it would be if every request asked for the same fixed ID.
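
Uniformly random IDs are only one option; real traffic often concentrates on a small set of popular records. If that matters for your cache, a sketch like the following draws IDs with a skew instead. The 1/rank weighting and the pick_ids helper are assumptions made for illustration, not something measured from a real system; where possible, substitute a distribution derived from your own access logs.

```python
import random
from collections import Counter


def pick_ids(count, max_id, rng):
    """Draw `count` IDs from 1..max_id, favoring low IDs (weight 1/rank)."""
    ids = range(1, max_id + 1)
    weights = [1 / rank for rank in ids]
    return rng.choices(ids, weights=weights, k=count)


rng = random.Random(42)  # seeded so the sketch is reproducible
sample = pick_ids(10_000, 100, rng)
counts = Counter(sample)

print("most requested IDs:", counts.most_common(3))
```

In a locustfile, a call like this could stand in for random.choice(self.user_ids) inside get_user_profile, ideally with the weights computed once in on_start rather than on every request.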

Stress Test Configuration

For stress testing specifically, you want to ramp up aggressively and find the breaking point:

Headless Mode with Ramp-Up

# Ramp from 0 to 500 users over 10 minutes, then hold for 20 minutes
locust -f locustfile.py \
  --host https://api.example.com \
  --headless \
  --users 500 \
  --spawn-rate 0.83 \
  --run-time 30m \
  --csv results

--spawn-rate 0.83 starts ~50 users per minute: 500 users / 600 seconds ≈ 0.83 users per second.
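
The same arithmetic works for any target and ramp duration. The helper below is a hypothetical convenience, not part of Locust; it only reproduces the division described above.

```python
def spawn_rate(target_users, ramp_seconds):
    """Users to start per second so that target_users is reached after ramp_seconds."""
    if ramp_seconds <= 0:
        raise ValueError("ramp_seconds must be positive")
    return target_users / ramp_seconds


rate = spawn_rate(500, 10 * 60)  # 500 users over 10 minutes
print(f"--spawn-rate {rate:.2f}")
print(f"{rate * 60:.0f} users per minute")
```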

Custom Load Shape

For more complex ramp patterns, override the load shape:

from locust import LoadTestShape

class StressTestShape(LoadTestShape):
    """
    Ramp up in stages, finding the breaking point.
    """
    stages = [
        {"duration": 120, "users": 100, "spawn_rate": 10},
        {"duration": 240, "users": 300, "spawn_rate": 10},
        {"duration": 360, "users": 600, "spawn_rate": 10},
        {"duration": 480, "users": 1000, "spawn_rate": 20},
        {"duration": 600, "users": 1500, "spawn_rate": 20},
        {"duration": 660, "users": 0, "spawn_rate": 100},  # cooldown
    ]

    def tick(self):
        run_time = self.get_run_time()
        for stage in self.stages:
            if run_time < stage["duration"]:
                return stage["users"], stage["spawn_rate"]
        return None  # stop test

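Add this class to your locustfile and Locust picks it up automatically. When a shape class is present it controls the user count and spawn rate, so --users and --spawn-rate are ignored.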
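
One detail that is easy to misread: each "duration" in the stages list is a cumulative cutoff in seconds since the test started, not the length of that stage. The sketch below repeats the lookup from tick() as a plain function (the name target_at is hypothetical) so you can check which target applies at a given elapsed time without starting a test.

```python
STAGES = [
    {"duration": 120, "users": 100, "spawn_rate": 10},
    {"duration": 240, "users": 300, "spawn_rate": 10},
    {"duration": 360, "users": 600, "spawn_rate": 10},
    {"duration": 480, "users": 1000, "spawn_rate": 20},
    {"duration": 600, "users": 1500, "spawn_rate": 20},
    {"duration": 660, "users": 0, "spawn_rate": 100},  # cooldown
]


def target_at(run_time, stages=STAGES):
    """Return (users, spawn_rate) for the elapsed seconds, or None once all stages are done."""
    for stage in stages:
        if run_time < stage["duration"]:
            return stage["users"], stage["spawn_rate"]
    return None


for elapsed in (0, 119, 120, 599, 600, 660):
    print(elapsed, target_at(elapsed))
```

Every stage here spans 120 seconds except the cooldown, which covers the final 60 seconds before the test stops at 660.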

Custom Failure Conditions

HTTP 200 doesn't mean success. Define application-level failure conditions:

from locust import HttpUser, task, between

class APIUser(HttpUser):
    wait_time = between(1, 2)
    
    @task
    def search(self):
        with self.client.get(
            "/api/search",
            params={"q": "test"},
            catch_response=True
        ) as response:
            if response.status_code == 200:
                data = response.json()
                if "results" not in data:
                    response.failure("Response missing 'results' key")
                elif len(data["results"]) == 0:
                    # Empty results might be an app bug, not a search miss
                    response.failure("Empty results for guaranteed query")
                else:
                    response.success()
            elif response.status_code == 429:
                response.failure("Rate limited")
            else:
                response.failure(f"Unexpected status: {response.status_code}")

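catch_response=True lets you override Locust's default verdict, which is based on the HTTP status code. Inside the with block, call response.failure() or response.success() to set the result explicitly; if you call neither, the default status-code check still applies.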
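
One way to keep these rules testable without running a load test is to move them into a plain function. The sketch below mirrors the checks in the search task above; check_search_response is a hypothetical helper name, and the extra isinstance guard is an addition for payloads that are not JSON objects.

```python
def check_search_response(status_code, data):
    """Return a failure message, or None if the response counts as a success."""
    if status_code == 429:
        return "Rate limited"
    if status_code != 200:
        return f"Unexpected status: {status_code}"
    if not isinstance(data, dict) or "results" not in data:
        return "Response missing 'results' key"
    if len(data["results"]) == 0:
        return "Empty results for guaranteed query"
    return None


print(check_search_response(200, {"results": [1, 2]}))
print(check_search_response(200, {}))
print(check_search_response(429, None))
```

Inside the with block, the task would call this helper, pass any returned message to response.failure(), and call response.success() when it returns None.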

Distributed Stress Testing

For high load (>10,000 concurrent users), run Locust in distributed mode:

# Master node
locust -f locustfile.py --master --host https://api.example.com

# Worker nodes (run on separate machines)
locust -f locustfile.py --worker --master-host=<master-ip>
locust -f locustfile.py --worker --master-host=<master-ip>
locust -f locustfile.py --worker --master-host=<master-ip>

Each worker adds its own capacity for simulated users. A Locust process uses a single CPU core, so run one worker per core. The master aggregates metrics and controls the test.

For cloud-based distributed testing:

# Launch 10 workers via Docker Compose
version: '3'
services:
  master:
    image: locustio/locust
    ports: ["8089:8089"]
    volumes: [".:/mnt/locust"]
    command: -f /mnt/locust/locustfile.py --master

  worker:
    image: locustio/locust
    volumes: [".:/mnt/locust"]
    command: -f /mnt/locust/locustfile.py --worker --master-host master
    deploy:
      replicas: 10

Reading Locust Results

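Running with --csv results writes several files, including results_stats.csv (per-endpoint totals) and results_stats_history.csv (the same metrics sampled over time). Key columns:

  • 50%: median response time in ms
  • 95%: 95th percentile in ms, a common primary SLA metric
  • 99%: tail latency in ms
  • Failure Count: total failures
  • Requests/s: throughput (averaged over the run in the stats file, sampled over time in the history file)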

Look for the inflection point where:

  1. Requests/s stops increasing despite more users
  2. 95%ile latency starts climbing steeply
  3. Failures start to appear

This is your breaking point.
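
The three signs above can be turned into a rough automated check. The sketch below is one possible heuristic, not a Locust feature: it flags the first load step where throughput stalls despite more users while p95 latency jumps or failures appear. The key names are assumed to match the history CSV ('User Count', 'Requests/s', '95%', 'Failures/s'), the thresholds are arbitrary, and the sample rows are invented for illustration rather than measured.

```python
def find_breaking_point(rows, rps_gain=0.05, latency_jump=1.5):
    """Return the first user count where throughput stalls while latency jumps or failures appear.

    rows: dicts ordered by time, one per load step, with the keys
    'User Count', 'Requests/s', '95%' and 'Failures/s'.
    """
    for prev, cur in zip(rows, rows[1:]):
        more_users = cur["User Count"] > prev["User Count"]
        throughput_flat = cur["Requests/s"] < prev["Requests/s"] * (1 + rps_gain)
        latency_climbing = cur["95%"] > prev["95%"] * latency_jump
        failing = cur["Failures/s"] > 0
        if more_users and throughput_flat and (latency_climbing or failing):
            return cur["User Count"]
    return None


# Invented sample data for illustration only, not real measurements.
steps = [
    {"User Count": 100, "Requests/s": 90.0, "95%": 120, "Failures/s": 0.0},
    {"User Count": 300, "Requests/s": 265.0, "95%": 140, "Failures/s": 0.0},
    {"User Count": 600, "Requests/s": 510.0, "95%": 210, "Failures/s": 0.0},
    {"User Count": 1000, "Requests/s": 520.0, "95%": 900, "Failures/s": 0.0},
    {"User Count": 1500, "Requests/s": 500.0, "95%": 2400, "Failures/s": 12.0},
]

print(find_breaking_point(steps))
```

In this made-up data, throughput barely moves between 600 and 1000 users while p95 latency more than quadruples, so the check reports 1000 users as the breaking point.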

CI Integration

locust -f locustfile.py \
  --host https://api.staging.example.com \
  --headless \
  --users 200 \
  --spawn-rate 10 \
  --run-time 5m \
  --csv results \
  --exit-code-on-error 1

Parse results in CI:

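import csv
import sys

P95_LIMIT_MS = 1000
FAILURE_LIMIT_PCT = 1

with open('results_stats.csv', newline='') as f:
    # The 'Aggregated' row summarizes all endpoints
    aggregated = next((r for r in csv.DictReader(f) if r['Name'] == 'Aggregated'), None)

# Fail loudly instead of passing silently when nothing was measured
if aggregated is None or float(aggregated['Request Count']) == 0:
    print("FAIL: no requests recorded in results_stats.csv")
    sys.exit(1)

p95 = float(aggregated['95%'])
failure_pct = float(aggregated['Failure Count']) / float(aggregated['Request Count']) * 100

if p95 > P95_LIMIT_MS:
    print(f"FAIL: p95 {p95}ms exceeds {P95_LIMIT_MS}ms threshold")
    sys.exit(1)
if failure_pct > FAILURE_LIMIT_PCT:
    print(f"FAIL: {failure_pct:.1f}% failure rate exceeds {FAILURE_LIMIT_PCT}% threshold")
    sys.exit(1)

print(f"PASS: p95={p95}ms, failure_rate={failure_pct:.2f}%")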

Conclusion

Locust's Python-native API makes it easy to write realistic API stress tests that model real user behavior. The combination of SequentialTaskSet for flow modeling, custom load shapes for ramp-up patterns, and catch_response for application-level failure detection gives you precise control over stress testing scenarios.

Once you've identified API limits through stress testing, pair with continuous functional monitoring. HelpMeTest monitors your API endpoints 24/7, catching functional regressions before they compound with performance issues in production.
