How to Test OAuth2 Flows: Authorization Code, PKCE, and Mock Servers
OAuth2 testing requires either a real OAuth provider in test mode or a mock OAuth server for unit and integration tests. The authorization code flow involves browser redirects that are hard to automate — mock servers eliminate the browser redirect while still testing your token exchange and validation logic. This guide covers testing OAuth2 flows with oauth2-mock-server, testing PKCE verification, and handling token refresh.
Key Takeaways
Use oauth2-mock-server for integration tests. oauth2-mock-server is a lightweight Node.js server that implements the OAuth2 authorization code flow. It gives your integration tests a real OAuth server without depending on any external service.
Test the authorization code flow at the redirect URI level. You don't need to test the login UI to test OAuth2. Start your test after the redirect — simulate receiving the authorization code at your callback URL and verify your token exchange logic.
PKCE verification is a security requirement, not just a flow detail. Test that your server rejects token exchange requests with incorrect code verifiers. If the code_verifier doesn't match the code_challenge, the exchange should fail.
Test token refresh before access tokens expire. Proactive token refresh (before expiry) avoids the user experiencing a 401. Verify your refresh logic runs within the last 10% of the token's lifetime.
Test state parameter validation. The state parameter prevents CSRF attacks in OAuth2. Test that your callback handler rejects responses with a state value it didn't issue.
OAuth2 Authorization Code Flow Overview
The authorization code flow has these steps:
- Authorization request — redirect user to OAuth provider with
response_type=code,client_id,redirect_uri,scope,state - User authenticates — at the provider, outside your app
- Authorization code — provider redirects to your
redirect_uriwithcodeandstate - Token exchange — your server sends
code+client_secretto provider's token endpoint - Tokens — provider returns
access_token,id_token,refresh_token - API calls — use
access_tokenas Bearer token for API requests
Your code owns steps 1, 3, 4, 5, and 6. Step 2 is the provider's responsibility. This means you need to test your authorization request construction, callback handling, and token exchange — not the provider's login UI.
Mock OAuth Server Setup
oauth2-mock-server runs a full OAuth2 server in tests:
npm install --save-dev @axa-group/oauth2-mock-serverStart mock server in Jest
// test/setup.js
const { OAuth2Server } = require('@axa-group/oauth2-mock-server');
let oauthServer;
beforeAll(async () => {
oauthServer = new OAuth2Server();
await oauthServer.issuer.keys.generate('RS256');
await oauthServer.start(8080, 'localhost');
// Configure OAuth client credentials
process.env.OAUTH_ISSUER = `http://localhost:8080`;
process.env.OAUTH_CLIENT_ID = 'test-client';
process.env.OAUTH_CLIENT_SECRET = 'test-secret';
});
afterAll(async () => {
await oauthServer.stop();
});Python: oauth2-proxy or a simple Flask mock
For Python tests, you can run a minimal Flask OAuth server:
# tests/mock_oauth_server.py
from flask import Flask, request, jsonify, redirect
import threading
import jwt
import time
app = Flask(__name__)
MOCK_PORT = 9090
MOCK_CODE = "test_auth_code_12345"
MOCK_ACCESS_TOKEN = "mock_access_token"
ISSUED_TOKENS = {}
@app.route("/oauth/authorize")
def authorize():
state = request.args.get("state")
redirect_uri = request.args.get("redirect_uri")
return redirect(f"{redirect_uri}?code={MOCK_CODE}&state={state}")
@app.route("/oauth/token", methods=["POST"])
def token():
code = request.form.get("code")
if code != MOCK_CODE:
return jsonify({"error": "invalid_grant"}), 400
access_token = jwt.encode(
{
"sub": "test-user-123",
"iss": f"http://localhost:{MOCK_PORT}",
"aud": "test-api",
"iat": int(time.time()),
"exp": int(time.time()) + 3600,
"scope": "openid profile email",
},
"test-secret",
algorithm="HS256",
)
return jsonify({
"access_token": access_token,
"token_type": "Bearer",
"expires_in": 3600,
"refresh_token": "mock_refresh_token",
})
@app.route("/oauth/userinfo")
def userinfo():
auth = request.headers.get("Authorization", "")
if not auth.startswith("Bearer "):
return jsonify({"error": "unauthorized"}), 401
return jsonify({
"sub": "test-user-123",
"email": "test@example.com",
"name": "Test User",
})
@app.route("/.well-known/openid-configuration")
def openid_config():
base = f"http://localhost:{MOCK_PORT}"
return jsonify({
"issuer": base,
"authorization_endpoint": f"{base}/oauth/authorize",
"token_endpoint": f"{base}/oauth/token",
"userinfo_endpoint": f"{base}/oauth/userinfo",
"jwks_uri": f"{base}/.well-known/jwks.json",
})
@pytest.fixture(scope="session")
def mock_oauth_server():
server = threading.Thread(target=lambda: app.run(port=MOCK_PORT), daemon=True)
server.start()
import time as t
t.sleep(0.5) # Wait for server to start
yield f"http://localhost:{MOCK_PORT}"Testing the Authorization Request
Test that your app constructs the correct authorization URL:
def test_authorization_url_construction(client):
response = client.get("/auth/login", follow_redirects=False)
assert response.status_code == 302
location = response.headers["Location"]
# Parse the redirect URL
from urllib.parse import urlparse, parse_qs
parsed = urlparse(location)
params = parse_qs(parsed.query)
# Verify required OAuth2 parameters
assert parsed.netloc in ("login.microsoftonline.com", "accounts.google.com", "localhost:9090")
assert params["response_type"] == ["code"]
assert params["client_id"] == [os.environ["OAUTH_CLIENT_ID"]]
assert "redirect_uri" in params
assert "state" in params
assert "scope" in params
# Store state for callback test
return params["state"][0]Testing the Callback Handler
def test_callback_with_valid_code(client, mock_oauth_server):
# First, initiate login to get the state
login_response = client.get("/auth/login", follow_redirects=False)
state = parse_qs(urlparse(login_response.headers["Location"]).query)["state"][0]
# Simulate OAuth provider redirecting back to your callback
response = client.get(f"/auth/callback?code={MOCK_CODE}&state={state}")
assert response.status_code == 302
assert response.headers["Location"] == "/" # or your post-login redirect
# Verify session was created
with client.session_transaction() as sess:
assert "access_token" in sess
assert "user_id" in sess
def test_callback_with_wrong_state_rejected(client):
"""CSRF protection: wrong state should be rejected."""
response = client.get("/auth/callback?code=valid_code&state=tampered_state")
assert response.status_code in (400, 403)
def test_callback_with_error_from_provider(client):
"""Provider can return errors — handle them gracefully."""
response = client.get("/auth/callback?error=access_denied&error_description=User+denied+access")
assert response.status_code == 302
# Should redirect to error page or home with error message
location = response.headers["Location"]
assert "error" in location or location == "/login?error=access_denied"Testing PKCE (Proof Key for Code Exchange)
PKCE is required for public clients (SPAs, mobile apps) and recommended for all OAuth2 flows in OAuth 2.1. Test that your server correctly verifies the code_verifier:
import hashlib
import base64
import secrets
def generate_pkce():
"""Generate PKCE code verifier and challenge."""
verifier = base64.urlsafe_b64encode(secrets.token_bytes(32)).rstrip(b'=').decode()
challenge = base64.urlsafe_b64encode(
hashlib.sha256(verifier.encode()).digest()
).rstrip(b'=').decode()
return verifier, challenge
def test_pkce_code_exchange_with_correct_verifier(client):
verifier, challenge = generate_pkce()
# Authorization request with code_challenge
login_response = client.get(
"/auth/login",
query_string={"code_challenge": challenge, "code_challenge_method": "S256"},
follow_redirects=False,
)
state = extract_state(login_response)
# Token exchange with correct verifier
response = client.get(
f"/auth/callback?code={MOCK_CODE}&state={state}",
# code_verifier sent in token exchange request to the server
)
assert response.status_code == 302 # Successful login
def test_pkce_token_exchange_fails_with_wrong_verifier():
"""Server MUST reject token exchange if code_verifier doesn't match challenge."""
verifier, challenge = generate_pkce()
wrong_verifier = base64.urlsafe_b64encode(secrets.token_bytes(32)).rstrip(b'=').decode()
# This tests your OAuth server's PKCE validation
# If using Auth0/Cognito/Keycloak, this is validated server-side automatically
response = requests.post(
TOKEN_ENDPOINT,
data={
"grant_type": "authorization_code",
"code": MOCK_CODE,
"client_id": CLIENT_ID,
"redirect_uri": REDIRECT_URI,
"code_verifier": wrong_verifier, # Wrong!
}
)
assert response.status_code == 400
assert response.json()["error"] == "invalid_grant"Testing Token Refresh
def test_token_refresh_with_valid_refresh_token(client):
"""App should transparently refresh the access token."""
# Login and get tokens
login_response = simulate_oauth_login(client)
# Fast-forward time past access token expiry
with freeze_time(datetime.now() + timedelta(hours=2)):
# Make an API request — should trigger token refresh
response = client.get("/api/profile")
assert response.status_code == 200
# Verify new access token was obtained
with client.session_transaction() as sess:
assert sess.get("access_token_refreshed_at") > time.time() - 60
def test_expired_refresh_token_redirects_to_login(client):
"""When refresh token expires, user should be redirected to login."""
# Inject an expired refresh token
with client.session_transaction() as sess:
sess["access_token"] = "expired_access_token"
sess["refresh_token"] = "expired_refresh_token"
response = client.get("/api/profile", follow_redirects=False)
assert response.status_code == 302
assert "/auth/login" in response.headers["Location"]Testing Scopes
def test_insufficient_scope_returns_403(client, auth_token):
"""Tokens with read scope should not access write endpoints."""
# Token with only 'read' scope
read_token = get_token_with_scope("read")
response = client.post(
"/api/items",
headers={"Authorization": f"Bearer {read_token}"},
json={"name": "New Item"},
)
assert response.status_code == 403
def test_write_scope_allows_creation(client):
"""Tokens with write scope should access write endpoints."""
write_token = get_token_with_scope("read write")
response = client.post(
"/api/items",
headers={"Authorization": f"Bearer {write_token}"},
json={"name": "New Item"},
)
assert response.status_code == 201CI Setup
Run OAuth2 integration tests against the mock server in CI:
# .github/workflows/oauth-tests.yml
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Start mock OAuth server
run: node tests/mock-oauth-server.js &
- name: Wait for mock server
run: sleep 1
- name: Run OAuth tests
run: pytest tests/test_oauth/ -v
env:
OAUTH_ISSUER: http://localhost:9090
OAUTH_CLIENT_ID: test-clientSummary
OAuth2 testing focuses on your code — the authorization request, state validation, token exchange, and refresh logic. Mock OAuth servers eliminate the external dependency while giving you a real HTTP OAuth flow to test against. Test PKCE verification, state validation, and scope enforcement explicitly — these are the security properties that matter, and they're often undertested.