Saga Pattern Testing: Distributed Transactions and Chaos Injection in Orchestration
The saga pattern solves distributed transaction consistency without two-phase commit. Instead of atomic transactions across services, sagas use a sequence of local transactions where each step has a corresponding compensation action. When step 4 of 6 fails, steps 1–3 are undone through their compensating transactions.
Sagas are correct in theory and notoriously buggy in practice. The compensation logic is tested far less than the happy path, and the failure modes are subtle: partial compensation, double compensation, compensation failures, and compensation ordering bugs. This guide covers how to test sagas systematically.
The Saga Testing Challenge
Standard integration tests exercise the happy path. Saga testing requires deliberately injecting failures at every step and verifying that compensation runs correctly.
For a 6-step saga, that means testing:
- Failure at step 1 → verify cleanup (minimal, possibly nothing)
- Failure at step 2 → verify step 1 is compensated
- Failure at step 3 → verify steps 1 and 2 are compensated
- ...and so on
- Failure during compensation → verify system remains consistent
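That is at least N forward-failure tests (one per step) plus up to N compensation-failure tests (one per compensating action) for an N-step saga, roughly 2N in total, plus idempotency tests for each step.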
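The enumeration above can be generated rather than written by hand. The following is a minimal sketch, not the article's implementation: `run_saga`, `compensations_after_failure`, and the `(name, action, compensation)` step tuple are hypothetical names introduced only for illustration. It injects a failure at each step index in turn and checks that every earlier step is compensated in reverse order.

```python
from typing import Callable, List, Tuple

# Hypothetical step representation: (name, action, compensation)
Step = Tuple[str, Callable[[], None], Callable[[], None]]


def run_saga(steps: List[Step]) -> None:
    """Run steps in order; on failure, compensate completed steps in reverse."""
    done: List[Step] = []
    try:
        for step in steps:
            step[1]()          # run the forward action
            done.append(step)  # record it only after it succeeded
    except Exception:
        for _, _, compensate in reversed(done):
            compensate()
        raise


def compensations_after_failure(n_steps: int, fail_at: int) -> List[str]:
    """Build an n-step saga that fails at index `fail_at` and return the
    compensation calls that were observed, in order."""
    log: List[str] = []

    def make_step(i: int) -> Step:
        def action() -> None:
            if i == fail_at:
                raise RuntimeError(f"injected failure at step {i}")

        def compensate() -> None:
            log.append(f"undo-{i}")

        return (f"step-{i}", action, compensate)

    try:
        run_saga([make_step(i) for i in range(n_steps)])
    except RuntimeError:
        pass  # the injected failure is expected
    return log


# One check per failure point: steps before the failure are undone in reverse.
for fail_at in range(6):
    expected = [f"undo-{i}" for i in reversed(range(fail_at))]
    assert compensations_after_failure(6, fail_at) == expected
```

In a pytest suite the final loop would typically become `@pytest.mark.parametrize("fail_at", range(6))`, so that each failure point is reported as its own test case.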
Example: E-Commerce Order Saga
# saga/order_saga.py
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional


class SagaStep(Enum):
    RESERVE_INVENTORY = "reserve_inventory"
    PROCESS_PAYMENT = "process_payment"
    SHIP_ORDER = "ship_order"
    SEND_CONFIRMATION = "send_confirmation"


@dataclass
class SagaContext:
    order_id: str
    customer_id: str
    items: list[str]
    amount: float
    # IDs returned by each service; set as the corresponding step completes
    reservation_id: Optional[str] = None
    transaction_id: Optional[str] = None
    shipment_id: Optional[str] = None
    # Steps that finished successfully, in execution order.
    # default_factory gives every context its own fresh list.
    completed_steps: list[SagaStep] = field(default_factory=list)

class OrderSaga:
    def __init__(self, inventory_service, payment_service, shipping_service, notification_service):
        self.inventory = inventory_service
        self.payment = payment_service
        self.shipping = shipping_service
        self.notifications = notification_service

    def execute(self, ctx: SagaContext) -> SagaContext:
        """Execute the order saga with automatic compensation on failure."""
        try:
            ctx = self._reserve_inventory(ctx)
            ctx = self._process_payment(ctx)
            ctx = self._ship_order(ctx)
            ctx = self._send_confirmation(ctx)
            return ctx
        except Exception as e:
            # Undo whatever completed, then re-raise the original failure
            self._compensate(ctx, e)
            raise

    def _reserve_inventory(self, ctx: SagaContext) -> SagaContext:
        reservation = self.inventory.reserve(ctx.order_id, ctx.items)
        ctx.reservation_id = reservation.id
        ctx.completed_steps.append(SagaStep.RESERVE_INVENTORY)
        return ctx

    def _process_payment(self, ctx: SagaContext) -> SagaContext:
        txn = self.payment.charge(ctx.customer_id, ctx.amount)
        ctx.transaction_id = txn.id
        ctx.completed_steps.append(SagaStep.PROCESS_PAYMENT)
        return ctx

    def _ship_order(self, ctx: SagaContext) -> SagaContext:
        shipment = self.shipping.create_shipment(ctx.order_id, ctx.items)
        ctx.shipment_id = shipment.id
        ctx.completed_steps.append(SagaStep.SHIP_ORDER)
        return ctx

    def _send_confirmation(self, ctx: SagaContext) -> SagaContext:
        self.notifications.send_order_confirmation(ctx.customer_id, ctx.order_id)
        ctx.completed_steps.append(SagaStep.SEND_CONFIRMATION)
        return ctx

    def _compensate(self, ctx: SagaContext, failure: Exception):
        """Execute compensating transactions in reverse order."""
        if SagaStep.SHIP_ORDER in ctx.completed_steps:
            try:
                self.shipping.cancel_shipment(ctx.shipment_id)
            except Exception as comp_error:
                # Log the compensation failure; this is a critical alert.
                # Do not raise; continue compensating other steps.
                print(f"COMPENSATION FAILED: cancel_shipment({ctx.shipment_id}): {comp_error}")
        if SagaStep.PROCESS_PAYMENT in ctx.completed_steps:
            try:
                self.payment.refund(ctx.transaction_id, ctx.amount)
            except Exception as comp_error:
                print(f"COMPENSATION FAILED: refund({ctx.transaction_id}): {comp_error}")
        if SagaStep.RESERVE_INVENTORY in ctx.completed_steps:
            try:
                self.inventory.release(ctx.reservation_id)
            except Exception as comp_error:
                print(f"COMPENSATION FAILED: release({ctx.reservation_id}): {comp_error}")

Testing Each Failure Point
# tests/test_order_saga.py
import pytest
from unittest.mock import MagicMock, call, patch
from saga.order_saga import OrderSaga, SagaContext, SagaStep


@pytest.fixture
def services():
    # One mock per downstream service, keyed by role
    return {
        "inventory": MagicMock(),
        "payment": MagicMock(),
        "shipping": MagicMock(),
        "notifications": MagicMock()
    }


@pytest.fixture
def saga(services):
    return OrderSaga(
        inventory_service=services["inventory"],
        payment_service=services["payment"],
        shipping_service=services["shipping"],
        notification_service=services["notifications"]
    )


@pytest.fixture
def base_context():
    return SagaContext(
        order_id="order-test-001",
        customer_id="customer-test-001",
        items=["widget-a", "widget-b"],
        amount=149.99
    )

class TestOrderSagaHappyPath:
    def test_all_steps_complete_in_order(self, saga, services, base_context):
        """Happy path: all steps complete, all services called once."""
        services["inventory"].reserve.return_value = MagicMock(id="res-001")
        services["payment"].charge.return_value = MagicMock(id="txn-001")
        services["shipping"].create_shipment.return_value = MagicMock(id="ship-001")

        result = saga.execute(base_context)

        assert len(result.completed_steps) == 4
        assert result.reservation_id == "res-001"
        assert result.transaction_id == "txn-001"
        assert result.shipment_id == "ship-001"
        # Each service called exactly once
        services["inventory"].reserve.assert_called_once()
        services["payment"].charge.assert_called_once()
        services["shipping"].create_shipment.assert_called_once()
        services["notifications"].send_order_confirmation.assert_called_once()

class TestOrderSagaCompensation:
    def test_payment_failure_releases_inventory(self, saga, services, base_context):
        """Payment failure must trigger inventory release compensation."""
        services["inventory"].reserve.return_value = MagicMock(id="res-001")
        services["payment"].charge.side_effect = Exception("Card declined")

        with pytest.raises(Exception, match="Card declined"):
            saga.execute(base_context)

        # Inventory must be released
        services["inventory"].release.assert_called_once_with("res-001")
        # Shipping must NOT be called (failed before that step)
        services["shipping"].create_shipment.assert_not_called()
        services["shipping"].cancel_shipment.assert_not_called()

    def test_shipping_failure_refunds_payment_and_releases_inventory(
        self, saga, services, base_context
    ):
        """Shipping failure must trigger payment refund AND inventory release."""
        services["inventory"].reserve.return_value = MagicMock(id="res-001")
        services["payment"].charge.return_value = MagicMock(id="txn-001")
        services["shipping"].create_shipment.side_effect = Exception("Carrier unavailable")

        with pytest.raises(Exception, match="Carrier unavailable"):
            saga.execute(base_context)

        # Both previous steps must be compensated
        services["payment"].refund.assert_called_once_with("txn-001", 149.99)
        services["inventory"].release.assert_called_once_with("res-001")

    def test_compensation_runs_in_reverse_order(self, saga, services, base_context):
        """Compensations must run in reverse order of the original steps."""
        services["inventory"].reserve.return_value = MagicMock(id="res-001")
        services["payment"].charge.return_value = MagicMock(id="txn-001")
        services["shipping"].create_shipment.side_effect = Exception("Carrier error")

        # Record the order in which the compensations are invoked
        compensation_order = []
        services["payment"].refund.side_effect = lambda *a: compensation_order.append("refund")
        services["inventory"].release.side_effect = lambda *a: compensation_order.append("release")

        with pytest.raises(Exception):
            saga.execute(base_context)

        assert compensation_order == ["refund", "release"], (
            "Expected compensation in reverse order [refund, release], "
            f"got {compensation_order}"
        )

    def test_inventory_failure_has_no_compensation(self, saga, services, base_context):
        """First step failure requires no compensation: nothing has been done yet."""
        services["inventory"].reserve.side_effect = Exception("Out of stock")

        with pytest.raises(Exception, match="Out of stock"):
            saga.execute(base_context)

        # Nothing should be compensated
        services["payment"].refund.assert_not_called()
        services["inventory"].release.assert_not_called()
        services["shipping"].cancel_shipment.assert_not_called()

class TestCompensationFailures:
    def test_compensation_failure_does_not_prevent_other_compensations(
        self, saga, services, base_context
    ):
        """If one compensation fails, remaining compensations still run."""
        services["inventory"].reserve.return_value = MagicMock(id="res-001")
        services["payment"].charge.return_value = MagicMock(id="txn-001")
        services["shipping"].create_shipment.side_effect = Exception("Carrier error")
        # Payment refund fails (gateway down during compensation)
        services["payment"].refund.side_effect = Exception("Refund gateway timeout")

        # The original exception should propagate, not the compensation exception
        with pytest.raises(Exception, match="Carrier error"):
            saga.execute(base_context)

        # Inventory release must still be attempted even though refund failed
        services["inventory"].release.assert_called_once_with("res-001")

Idempotency Testing
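Each saga step must be idempotent, meaning it is safe to call multiple times with the same input. A retried charge must not bill the customer twice, and a repeated release must not raise.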
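One common way to get this property for a charge is to remember the result under a caller-supplied idempotency key. The sketch below is illustrative only: `IdempotentCharger`, `ChargeResult`, and `fake_provider` are hypothetical names, not part of `myapp.payments`, and the in-memory dictionary stands in for what would need to be a durable, concurrency-safe store in a real service.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass(frozen=True)
class ChargeResult:
    id: str
    amount: float


class IdempotentCharger:
    """Hypothetical wrapper that remembers charge results by idempotency key."""

    def __init__(self, charge_fn: Callable[[float], ChargeResult]):
        self._charge_fn = charge_fn
        self._results: Dict[str, ChargeResult] = {}  # assumption: in-memory only

    def charge(self, amount: float, idempotency_key: str) -> ChargeResult:
        # A retry with a known key returns the stored result and never
        # reaches the underlying provider again.
        if idempotency_key in self._results:
            return self._results[idempotency_key]
        result = self._charge_fn(amount)
        self._results[idempotency_key] = result
        return result


# Stand-in provider that records every call it receives
calls: List[float] = []


def fake_provider(amount: float) -> ChargeResult:
    calls.append(amount)
    return ChargeResult(id=f"txn-{len(calls)}", amount=amount)


charger = IdempotentCharger(fake_provider)
first = charger.charge(99.99, idempotency_key="order-test-001-charge")
retry = charger.charge(99.99, idempotency_key="order-test-001-charge")

assert first.id == retry.id  # same transaction returned
assert len(calls) == 1       # provider was charged only once
```

The second assertion is the important one: comparing transaction IDs alone can pass even when the provider was called twice, so an idempotency test should also check how often the underlying call happened.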
# tests/test_idempotency.py
from unittest.mock import MagicMock, patch


class TestIdempotency:
    def test_payment_charge_is_idempotent(self):
        """Charging the same order twice must not double-charge the customer."""
        from myapp.payments import PaymentGateway
        from myapp.payments import IdempotencyKeyStore

        gateway = PaymentGateway()
        idempotency_key = "order-test-001-charge"

        with patch.object(gateway.client, "charge") as mock_charge:
            mock_charge.return_value = MagicMock(id="txn-001", status="success")
            result1 = gateway.charge(
                amount=99.99,
                idempotency_key=idempotency_key
            )

            # Simulate retry with the same idempotency key. The mock now
            # returns a different ID, so the test fails if the gateway
            # charges the provider a second time.
            mock_charge.reset_mock()
            mock_charge.return_value = MagicMock(id="txn-002", status="success")
            result2 = gateway.charge(
                amount=99.99,
                idempotency_key=idempotency_key
            )

            # Assumes the gateway deduplicates locally by idempotency key;
            # if deduplication is delegated to the provider, assert instead
            # that both calls carried the same key.
            mock_charge.assert_not_called()

        # Both return the same transaction ID (idempotent)
        assert result1.id == result2.id

    def test_inventory_release_is_idempotent(self):
        """Releasing the same reservation twice must not cause errors."""
        from myapp.inventory import InventoryService

        service = InventoryService()

        with patch.object(service, "_release_db_reservation") as mock_release:
            # First call succeeds
            mock_release.return_value = True
            result1 = service.release("res-001")

            # Second call: reservation already released
            mock_release.return_value = False  # Already released
            result2 = service.release("res-001")

        # Both must succeed without exception
        assert result1 is not None
        assert result2 is not None

Chaos Injection
Test saga resilience with systematic chaos:
# tests/chaos/test_saga_chaos.py
import pytest
import random
from unittest.mock import MagicMock
from saga.order_saga import OrderSaga, SagaContext


class ChaoticService:
    """A service wrapper that randomly injects failures."""

    def __init__(self, real_service, failure_rate: float = 0.3):
        self._service = real_service
        self.failure_rate = failure_rate
        self.call_log = []

    def __getattr__(self, name):
        # Only reached for names not set in __init__, i.e. the wrapped
        # service's own methods and attributes
        attr = getattr(self._service, name)
        if callable(attr):
            def chaotic_call(*args, **kwargs):
                # Record the attempt before deciding whether to fail it
                self.call_log.append({"method": name, "args": args})
                if random.random() < self.failure_rate:
                    raise ConnectionError(f"Chaos: {name} randomly failed")
                return attr(*args, **kwargs)
            return chaotic_call
        return attr

# Note: the `services` and `base_context` fixtures must live in a shared
# tests/conftest.py to be visible from this module.
@pytest.mark.parametrize("seed", range(10))
def test_saga_eventual_consistency_under_chaos(seed, services, base_context):
    """
    Under random failures, the saga must always leave the system in a consistent state.
    Either all steps complete, or all completed steps are compensated.
    Run multiple times with different seeds to exercise different failure combinations.
    """
    random.seed(seed)

    # Wrap services with chaos
    chaotic_inventory = ChaoticService(services["inventory"], failure_rate=0.3)
    chaotic_payment = ChaoticService(services["payment"], failure_rate=0.3)
    chaotic_shipping = ChaoticService(services["shipping"], failure_rate=0.3)

    services["inventory"].reserve.return_value = MagicMock(id=f"res-{seed}")
    services["payment"].charge.return_value = MagicMock(id=f"txn-{seed}")
    services["shipping"].create_shipment.return_value = MagicMock(id=f"ship-{seed}")

    # The saga must receive the chaotic wrappers; passing the plain mocks
    # would mean no failure is ever injected.
    saga = OrderSaga(
        inventory_service=chaotic_inventory,
        payment_service=chaotic_payment,
        shipping_service=chaotic_shipping,
        notification_service=services["notifications"]
    )

    try:
        result = saga.execute(base_context)
        # Success case: all 4 steps completed
        assert len(result.completed_steps) == 4
    except Exception:
        # Failure case: every completed step must have had its compensation
        # attempted. Attempts are read from the wrappers' call logs, because
        # chaos can also fail a compensation before it reaches the mock.
        completed = len(base_context.completed_steps)
        compensation_methods = {"release", "refund", "cancel_shipment"}
        compensation_calls = sum(
            1
            for svc in (chaotic_inventory, chaotic_payment, chaotic_shipping)
            for entry in svc.call_log
            if entry["method"] in compensation_methods
        )
        assert compensation_calls == completed, (
            f"Seed {seed}: Completed {completed} steps but {compensation_calls} "
            f"compensations were attempted. This indicates a missing or duplicated compensation."
        )

CI Pipeline
# .github/workflows/saga-tests.yml
name: Saga Pattern Tests
on: [push, pull_request]
jobs:
  saga-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      # pytest-repeat provides the --count option used by the chaos step
      - run: pip install pytest pytest-repeat
      - name: Unit tests
        run: pytest tests/test_order_saga.py -v
      - name: Idempotency tests
        run: pytest tests/test_idempotency.py -v
      - name: Chaos tests
        run: pytest tests/chaos/ -v --count=5  # Run each parametrized case 5x

Monitoring Distributed Transactions in Production
Saga compensation failures are silent by default. A failed refund or inventory release may not surface until a customer complains. HelpMeTest lets you schedule monitoring tests that query your saga state store and alert when compensation tasks are stuck in a failed state — turning silent consistency violations into actionable alerts.
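Whatever tool runs the check, the underlying query is simple. The sketch below assumes the saga persists one record per compensation attempt. `CompensationRecord`, its status values, and `find_stuck_compensations` are hypothetical names chosen for illustration, not the schema of any particular state store or product.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Iterable, List


@dataclass
class CompensationRecord:
    saga_id: str
    step: str
    status: str           # assumed values: "pending", "succeeded", "failed"
    updated_at: datetime


def find_stuck_compensations(
    records: Iterable[CompensationRecord],
    now: datetime,
    max_pending_age: timedelta = timedelta(minutes=15),
) -> List[CompensationRecord]:
    """Return compensations that failed, or that have been pending too long."""
    stuck = []
    for record in records:
        if record.status == "failed":
            stuck.append(record)
        elif record.status == "pending" and now - record.updated_at > max_pending_age:
            stuck.append(record)
    return stuck


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
records = [
    CompensationRecord("saga-1", "refund", "succeeded", now - timedelta(hours=1)),
    CompensationRecord("saga-2", "refund", "failed", now - timedelta(minutes=5)),
    CompensationRecord("saga-3", "release", "pending", now - timedelta(minutes=30)),
    CompensationRecord("saga-4", "release", "pending", now - timedelta(minutes=2)),
]

stuck_ids = [r.saga_id for r in find_stuck_compensations(records, now)]
assert stuck_ids == ["saga-2", "saga-3"]
```

A scheduled job that runs a check like this and alerts on a non-empty result would surface a failed refund within minutes rather than when a customer reports it. The 15-minute threshold is an arbitrary placeholder to tune per system.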
Conclusion
Testing the saga pattern requires a mindset shift from happy-path testing to systematic failure injection. For each step in your saga, write an explicit test that fails at that step and verifies all preceding steps are correctly compensated. Test compensation failures independently — they're the hardest failure mode to recover from. Add idempotency tests for every step that might be retried. Run chaos tests with multiple random seeds to catch compensation ordering bugs that deterministic tests miss. The saga pattern is correct in theory; systematic testing is what makes it correct in practice.