End-to-End Testing of Multi-Agent Pipelines: Deterministic Replay, Agent Mocks, and Pipeline Assertions
A multi-agent pipeline is a chain of AI agents where each agent's output becomes the next agent's input. Agent A researches. Agent B synthesizes. Agent C writes. Agent D edits.
When the final output is wrong, you have a debugging problem: which agent introduced the failure? Was it wrong data from the researcher, a bad synthesis step, or a writing style mismatch? Without a testing strategy, you debug by reading long LLM output and guessing.
End-to-end testing of multi-agent pipelines requires three techniques: deterministic replay to make LLM behavior predictable, agent mock injection to isolate failures, and pipeline output assertions to validate correctness at each stage. Here's how to apply all three.
The Non-Determinism Problem
The fundamental challenge in testing AI pipelines is that LLMs are non-deterministic: the same input can produce different outputs on different runs. That makes traditional exact-match assertions — "assert output equals X" — unreliable.
The solution isn't to accept flakiness. It's to make your tests deterministic by controlling what the LLM returns.
Three strategies:
- Mock the LLM — replace it with a deterministic function that returns predefined outputs
- Record and replay — capture real LLM outputs in a recording, replay them in tests
- Semantic assertions — instead of exact matching, assert on properties that should be stable
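Most pipelines need all three, applied at different levels: mocks for stage-level unit tests, recorded replays for integration tests, and semantic assertions on the final output.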
Setting Up Deterministic Agent Mocks
The cleanest approach is replacing LLM calls with mock functions that return scripted outputs.
from unittest.mock import MagicMock, patch
from typing import Callable
class DeterministicLLM:
    """An LLM mock that returns scripted responses based on input patterns.

    Each call takes the content of the final message, appends it to
    ``call_log``, and replies with the response of the first ``response_map``
    key found in that content (case-insensitive substring match, checked in
    the dict's insertion order). When no key matches, ``default`` is returned.
    Replies are wrapped in a LangChain ``AIMessage``.
    """

    def __init__(self, response_map: dict[str, str], default: str = ""):
        self.response_map = response_map
        self.default = default
        # Prompt text of every call, in order, so tests can assert on what the
        # code under test actually sent to the LLM.
        self.call_log: list[str] = []

    @staticmethod
    def _last_content(messages) -> str:
        """Return the text of the final message in *messages* ("" when there is none)."""
        if isinstance(messages, list):
            if not messages:
                # e.g. __call__() with no arguments passes []; treat as an empty
                # prompt instead of failing with IndexError.
                return ""
            last_message = messages[-1]
        else:
            last_message = messages
        if isinstance(last_message, dict):
            # OpenAI-style {"role": ..., "content": ...} messages.
            content = last_message.get("content", str(last_message))
        else:
            content = getattr(last_message, "content", str(last_message))
        # Content may be a list of content parts rather than a str; normalise so
        # the case-insensitive matching cannot crash.
        return content if isinstance(content, str) else str(content)

    def _select_response(self, content: str) -> str:
        """Return the scripted reply for *content*, or ``default`` if no pattern matches."""
        haystack = content.lower()  # lowercase the prompt once, not per pattern
        for pattern, response in self.response_map.items():
            if pattern.lower() in haystack:
                return response
        return self.default

    def invoke(self, messages):
        """Log the final message's text and return the matching scripted ``AIMessage``."""
        from langchain_core.messages import AIMessage

        content = self._last_content(messages)
        self.call_log.append(content)
        return AIMessage(content=self._select_response(content))

    def __call__(self, *args, **kwargs):
        """Alias for :meth:`invoke`; accepts messages positionally or as ``messages=``."""
        return self.invoke(args[0] if args else kwargs.get("messages", []))
# Create a deterministic pipeline for testing
def create_test_pipeline():
researcher_llm = DeterministicLLM(
response_map={
"research": "Found 3 key findings:\n1. Finding A\n2. Finding B\n3. Finding C",
"search": "Search results: [result1, result2, result3]"
},
default="No information found for this query"
)
writer_llm = DeterministicLLM(
response_map={
"write": "# Title\n\nIntroduction paragraph.\n\n## Section 1\nContent based on findings.",
"draft": "Draft content based on research findings"
},
default="Draft content"
)
return researcher_llm, writer_llmTesting Pipeline Stages in Isolation
Before testing the full pipeline, test each stage independently. This isolates failures to a single agent.
from your_pipeline import ResearchStage, WritingStage, EditingStage
class TestPipelineStages:
def test_research_stage_output_format(self):
researcher_llm, _ = create_test_pipeline()
with patch("your_pipeline.research.llm", researcher_llm):
stage = ResearchStage()
output = stage.run(topic="AI testing frameworks 2026")
# Research output should be structured
assert output.findings is not None
assert len(output.findings) >= 1
assert output.sources is not None
def test_writing_stage_uses_research_context(self):
_, writer_llm = create_test_pipeline()
research_output = ResearchOutput(
findings=["Finding A", "Finding B", "Finding C"],
sources=["https://example.com"],
summary="Test summary"
)
with patch("your_pipeline.writing.llm", writer_llm):
stage = WritingStage()
output = stage.run(research=research_output, style="technical")
assert output.content is not None
assert len(output.content) > 100
def test_editing_stage_improves_readability(self):
draft_content = """
AI testing frameworks, they exist in 2026. Many frameworks. LangGraph is one.
Testing is hard. Mocking helps. Use deterministic mocks.
"""
editor_llm = DeterministicLLM(
response_map={
"edit": "AI testing frameworks have matured significantly in 2026. "
"LangGraph has emerged as the leading framework. "
"Using deterministic mocks is now standard practice."
}
)
with patch("your_pipeline.editing.llm", editor_llm):
stage = EditingStage()
output = stage.run(draft=draft_content, guidelines="improve clarity")
assert len(output.content) > 50
assert output.content != draft_content # Something was changedAgent Mock Injection
For testing multi-agent coordination, replace entire agents with mock implementations that return scripted outputs.
from abc import ABC, abstractmethod
class BaseAgent(ABC):
    """Common interface for pipeline agents.

    An agent takes a dict of inputs and returns a dict of outputs, which lets
    scripted mocks stand in for real agents in the tests.
    """

    @abstractmethod
    def run(self, input_data: dict) -> dict:
        """Process *input_data* and return this agent's output dict."""
class MockResearcherAgent(BaseAgent):
    """Mock researcher that returns deterministic research output.

    Tracks how many times it ran (``run_count``) and the input of the most
    recent call (``last_input``) so tests can assert on how the pipeline drove it.

    Args:
        findings: Findings returned on every run.
        sources: Sources returned on every run.
        word_count: Value reported under ``"word_count"`` (default 500).
    """

    def __init__(self, findings: list[str], sources: list[str], word_count: int = 500):
        self.findings = findings
        self.sources = sources
        self.word_count = word_count
        self.run_count = 0
        self.last_input = None

    def run(self, input_data: dict) -> dict:
        self.run_count += 1
        # Snapshot the input so later mutation by the pipeline cannot rewrite
        # what this call actually received.
        self.last_input = dict(input_data)
        return {
            # Return copies: a pipeline that mutates these lists must not corrupt
            # the scripted fixture used by later runs and by test assertions.
            "findings": list(self.findings),
            "sources": list(self.sources),
            "topic": input_data.get("topic", "unknown"),
            "word_count": self.word_count,
        }
class MockWriterAgent(BaseAgent):
    """Mock writer that returns deterministic content.

    ``content_template`` is rendered with :meth:`str.format_map` against the
    input dict, so it may reference any input key (``{topic}``, ``{findings}``,
    ...). ``topic`` defaults to ``"unknown topic"``. Any other placeholder
    missing from the input is left in the output verbatim instead of raising
    ``KeyError``.
    """

    class _KeepMissing(dict):
        """``format_map`` mapping that renders unknown placeholders as literal text."""

        def __missing__(self, key):
            return "{" + key + "}"

    def __init__(self, content_template: str = "Article about: {topic}"):
        self.content_template = content_template
        self.run_count = 0
        self.last_input = None

    def run(self, input_data: dict) -> dict:
        self.run_count += 1
        # Snapshot so later mutation by the pipeline cannot rewrite the record.
        self.last_input = dict(input_data)
        fields = self._KeepMissing(input_data)
        fields.setdefault("topic", "unknown topic")
        content = self.content_template.format_map(fields)
        return {
            "content": content,
            # Count the words actually produced, not the words of the raw template.
            "word_count": len(content.split()),
        }
class TestPipelineWithMockAgents:
def test_pipeline_passes_research_to_writer(self):
mock_researcher = MockResearcherAgent(
findings=["Finding A", "Finding B"],
sources=["https://source1.com"]
)
mock_writer = MockWriterAgent(
content_template="Based on research: {findings}\n\nArticle content here."
)
from your_pipeline import ArticlePipeline
pipeline = ArticlePipeline(
researcher=mock_researcher,
writer=mock_writer
)
result = pipeline.run(topic="AI testing in 2026")
# Writer should have received research findings
assert mock_writer.last_input is not None
assert "findings" in mock_writer.last_input
assert mock_writer.last_input["findings"] == ["Finding A", "Finding B"]
def test_pipeline_handles_researcher_failure(self):
class FailingResearcher(BaseAgent):
def run(self, input_data: dict) -> dict:
raise ValueError("Research API unavailable")
mock_writer = MockWriterAgent()
from your_pipeline import ArticlePipeline
pipeline = ArticlePipeline(
researcher=FailingResearcher(),
writer=mock_writer
)
# Pipeline should handle failure gracefully
result = pipeline.run(topic="test")
assert result.error is not None or result.status == "failed"
# Writer should NOT have been called
assert mock_writer.last_input is None
def test_pipeline_calls_agents_in_correct_order(self):
call_order = []
class OrderTrackingResearcher(BaseAgent):
def run(self, input_data: dict) -> dict:
call_order.append("researcher")
return {"findings": ["f1"], "sources": ["https://s.com"]}
class OrderTrackingWriter(BaseAgent):
def run(self, input_data: dict) -> dict:
call_order.append("writer")
return {"content": "draft content"}
class OrderTrackingEditor(BaseAgent):
def run(self, input_data: dict) -> dict:
call_order.append("editor")
return {"content": "final content"}
from your_pipeline import ArticlePipeline
pipeline = ArticlePipeline(
researcher=OrderTrackingResearcher(),
writer=OrderTrackingWriter(),
editor=OrderTrackingEditor()
)
pipeline.run(topic="test")
assert call_order == ["researcher", "writer", "editor"]Record and Replay Testing
For integration tests that need realistic LLM behavior, record real LLM interactions during development and replay them in tests.
import json
import hashlib
from pathlib import Path
class RecordingLLM:
    """Records LLM calls to a file for later replay.

    Wraps ``real_llm``. A prompt already present in the recording file is
    answered from the file. Otherwise ``real_llm.invoke`` is called and the
    reply's ``.content`` is persisted under a hash of the prompt.
    """

    def __init__(self, real_llm, recording_path: Path):
        self.real_llm = real_llm
        self.recording_path = recording_path
        self.recordings = self._load_recordings()

    def _load_recordings(self) -> dict:
        """Return the saved ``{prompt_hash: response_text}`` map, or {} if no file exists yet."""
        if self.recording_path.exists():
            return json.loads(self.recording_path.read_text(encoding="utf-8"))
        return {}

    def _save_recordings(self):
        """Persist all recordings to ``recording_path``."""
        # Create the fixture directory on first use instead of failing with
        # FileNotFoundError.
        self.recording_path.parent.mkdir(parents=True, exist_ok=True)
        # Write a sibling temp file and swap it in, so an interrupted run never
        # leaves a truncated, unparseable recording behind. Sorted keys keep
        # committed fixtures diff-stable.
        tmp_path = self.recording_path.with_name(self.recording_path.name + ".tmp")
        tmp_path.write_text(
            json.dumps(self.recordings, indent=2, sort_keys=True), encoding="utf-8"
        )
        tmp_path.replace(self.recording_path)

    def _input_hash(self, messages) -> str:
        """Key a prompt by the first 16 hex digits of the SHA-256 of ``str(messages)``."""
        # NOTE(review): if the message objects' str() includes per-run fields
        # (ids, timestamps), identical prompts will hash differently — confirm.
        content = str(messages)
        return hashlib.sha256(content.encode()).hexdigest()[:16]

    def invoke(self, messages):
        """Return the recorded reply for *messages*, calling and recording the real LLM on a miss."""
        key = self._input_hash(messages)
        if key in self.recordings:
            # Replay recorded response
            from langchain_core.messages import AIMessage

            return AIMessage(content=self.recordings[key])
        # Make real call and record
        response = self.real_llm.invoke(messages)
        self.recordings[key] = response.content
        self._save_recordings()
        return response
class ReplayingLLM:
    """Serves LLM replies strictly from a recording file; never calls a real model.

    Prompts are keyed by the first 16 hex digits of the SHA-256 of
    ``str(messages)``, so a recording only matches an identical prompt.
    """

    def __init__(self, recording_path: Path):
        raw_json = recording_path.read_text()
        self.recordings = json.loads(raw_json)

    def _input_hash(self, messages) -> str:
        digest = hashlib.sha256(str(messages).encode())
        return digest.hexdigest()[:16]

    def invoke(self, messages):
        """Return the recorded reply as an ``AIMessage``; raise ValueError if none exists."""
        from langchain_core.messages import AIMessage

        prompt_key = self._input_hash(messages)
        if prompt_key in self.recordings:
            return AIMessage(content=self.recordings[prompt_key])
        raise ValueError(
            f"No recording found for input hash {prompt_key}. Run with RecordingLLM first."
        )
# Usage in tests:
RECORDINGS_DIR = Path("tests/recordings")
def test_pipeline_with_recorded_llm():
recording_file = RECORDINGS_DIR / "research_pipeline.json"
if not recording_file.exists():
pytest.skip("No recordings available. Run with RECORD=true to generate.")
replaying_llm = ReplayingLLM(recording_file)
with patch("your_pipeline.llm", replaying_llm):
from your_pipeline import ArticlePipeline
pipeline = ArticlePipeline()
result = pipeline.run(topic="AI testing frameworks")
assert result.content is not None
assert len(result.content) > 100Commit your recordings to the repository. They become stable test fixtures that capture real LLM behavior without requiring LLM calls in CI.
Pipeline Output Assertions
Final output assertions should be semantic rather than exact. Assert on properties and structure, not specific wording.
class PipelineOutputValidator:
    """Semantic checks for a finished article; collects problems instead of raising."""

    def validate_article(self, output: dict, min_words: int = 500) -> list[str]:
        """Return the list of validation errors for *output*. Empty list = valid.

        Args:
            output: Article dict; reads the ``title``, ``content``, ``topic``
                and ``sources`` keys.
            min_words: Minimum whitespace-separated word count for ``content``.
        """
        errors = []
        # Treat present-but-None content like missing content so the string
        # checks below cannot raise TypeError/AttributeError.
        content = output.get("content") or ""

        # Structure assertions
        if not output.get("title"):
            errors.append("Missing title")
        if not content:
            errors.append("Missing content")
        else:
            word_count = len(content.split())
            if word_count < min_words:
                errors.append(f"Content too short: {word_count} words < {min_words}")

        # Content quality assertions
        topic = output.get("topic")
        if topic and topic.lower() not in content.lower():
            errors.append(f"Content doesn't mention the topic: {topic}")

        # Format assertions
        if "##" not in content and "\n\n" not in content:
            errors.append("Content has no structure (no headers or paragraphs)")

        # Sources assertions
        if not output.get("sources"):
            errors.append("No sources cited")
        return errors
class TestPipelineOutputAssertions:
def test_output_passes_validation(self):
validator = PipelineOutputValidator()
mock_output = {
"title": "Testing AI Pipelines in 2026",
"content": "## Introduction\n\n" + "Testing AI pipelines " * 100,
"topic": "AI Pipelines",
"sources": ["https://example.com"],
"word_count": 500
}
errors = validator.validate_article(mock_output)
assert errors == [], f"Validation failed: {errors}"
def test_semantic_topic_coverage(self):
"""Assert the output covers key aspects of the topic, not exact wording."""
required_concepts = ["testing", "agent", "pipeline"]
content = """
Multi-agent workflows require systematic quality assurance. When building agent
pipelines, each stage must be validated independently before integration testing.
"""
missing = [c for c in required_concepts if c not in content.lower()]
assert missing == [], f"Output doesn't cover: {missing}"Failure Mode Testing
Test that your pipeline handles each failure mode gracefully:
class TestPipelineFailureModes:
def test_handles_rate_limit_from_researcher(self):
class RateLimitedResearcher(BaseAgent):
def run(self, input_data):
raise Exception("Rate limit: 429 Too Many Requests")
pipeline = ArticlePipeline(researcher=RateLimitedResearcher())
result = pipeline.run(topic="test")
assert result.status in ["failed", "error"]
assert "rate limit" in result.error_message.lower()
def test_handles_empty_research_results(self):
empty_researcher = MockResearcherAgent(findings=[], sources=[])
pipeline = ArticlePipeline(researcher=empty_researcher, writer=MockWriterAgent())
result = pipeline.run(topic="obscure topic")
# Should either fail gracefully or produce minimal content
assert result is not None
if result.status == "success":
assert "insufficient" in result.content.lower() or len(result.content) > 0
def test_handles_writer_producing_no_content(self):
class EmptyWriter(BaseAgent):
def run(self, input_data):
return {"content": "", "word_count": 0}
pipeline = ArticlePipeline(
researcher=MockResearcherAgent(["finding"], ["https://s.com"]),
writer=EmptyWriter()
)
result = pipeline.run(topic="test")
assert result.status != "success" or result.content != ""Multi-agent pipeline testing rewards the investment. Every bug caught in a mock-injected test is a bug that didn't make it to production, where it would have cost real API calls to reproduce.