End-to-End Testing of Multi-Agent Pipelines: Deterministic Replay, Agent Mocks, and Pipeline Assertions

A multi-agent pipeline is a chain of AI agents where each agent's output becomes the next agent's input. Agent A researches. Agent B synthesizes. Agent C writes. Agent D edits.

When the final output is wrong, you have a debugging problem: which agent introduced the failure? Was it wrong data from the researcher, a bad synthesis step, or a writing style mismatch? Without a testing strategy, you debug by reading long LLM output and guessing.

End-to-end testing of multi-agent pipelines requires three techniques: deterministic replay to make LLM behavior predictable, agent mock injection to isolate failures, and pipeline output assertions to validate correctness at each stage. Here's how to apply all three.

The Non-Determinism Problem

The fundamental challenge in testing AI pipelines is that LLM output is non-deterministic: with sampling enabled, the same input can produce different text on different runs. That makes traditional exact-match assertions ("assert output equals X") unreliable against a live model.

The solution isn't to accept flakiness. It's to make your tests deterministic by controlling what the LLM returns.

Three strategies:

  1. Mock the LLM — replace it with a deterministic function that returns predefined outputs
  2. Record and replay — capture real LLM outputs in a recording, replay them in tests
  3. Semantic assertions — instead of exact matching, assert on properties that should be stable

Most pipelines need all three, applied at different levels.
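
To make the third strategy concrete, here is a minimal sketch with no LLM dependency. `simulated_llm` is a hypothetical stand-in that picks one of several paraphrases to imitate run-to-run variation, and `check_summary_properties` is an illustrative helper, not part of any library.

```python
import random

# Hypothetical stand-in for a real model: same prompt, varying wording.
PARAPHRASES = [
    "The pipeline has three stages: research, writing, and editing.",
    "Research, writing, and editing are the three stages of the pipeline.",
    "There are three pipeline stages (research, writing, editing).",
]

def simulated_llm(prompt: str, rng: random.Random) -> str:
    """Return one of several equivalent answers, like a sampled LLM response."""
    return rng.choice(PARAPHRASES)

def check_summary_properties(text: str) -> list[str]:
    """Return the properties the text violates; an empty list means it passes."""
    problems = []
    for stage in ("research", "writing", "editing"):
        if stage not in text.lower():
            problems.append(f"missing stage: {stage}")
    if len(text.split()) > 30:
        problems.append("summary longer than 30 words")
    return problems

rng = random.Random(0)
outputs = [simulated_llm("Summarize the pipeline", rng) for _ in range(20)]

# An exact-match assertion depends on which paraphrase happened to be sampled...
exact_matches = sum(out == PARAPHRASES[0] for out in outputs)
# ...while the property check holds for every sample.
property_failures = [out for out in outputs if check_summary_properties(out)]
```

The property check is deliberately loose: it pins down what must be true of any acceptable answer and leaves the wording free.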

Setting Up Deterministic Agent Mocks

The cleanest approach is replacing LLM calls with mock functions that return scripted outputs.

from unittest.mock import patch  # used by the stage tests further down

class DeterministicLLM:
    """An LLM mock that returns scripted responses based on input patterns."""
    
    def __init__(self, response_map: dict[str, str], default: str = ""):
        self.response_map = response_map
        self.default = default
        self.call_log = []
    
    def invoke(self, messages):
        from langchain_core.messages import AIMessage
        
        # Get the last user message content
        last_message = messages[-1] if isinstance(messages, list) else messages
        content = getattr(last_message, "content", str(last_message))
        
        self.call_log.append(content)
        
        # Find matching response
        for pattern, response in self.response_map.items():
            if pattern.lower() in content.lower():
                return AIMessage(content=response)
        
        return AIMessage(content=self.default)
    
    def __call__(self, *args, **kwargs):
        return self.invoke(args[0] if args else kwargs.get("messages", []))

# Create a deterministic pipeline for testing
def create_test_pipeline():
    researcher_llm = DeterministicLLM(
        response_map={
            "research": "Found 3 key findings:\n1. Finding A\n2. Finding B\n3. Finding C",
            "search": "Search results: [result1, result2, result3]"
        },
        default="No information found for this query"
    )
    
    writer_llm = DeterministicLLM(
        response_map={
            "write": "# Title\n\nIntroduction paragraph.\n\n## Section 1\nContent based on findings.",
            "draft": "Draft content based on research findings"
        },
        default="Draft content"
    )
    
    return researcher_llm, writer_llm
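
One property of the substring matching above is worth pinning down: the first pattern in `response_map` that appears in the prompt wins, so a prompt containing two patterns is resolved by dict insertion order. The sketch below reproduces the matching logic with a plain `Reply` dataclass in place of `AIMessage` (an assumption made only to keep the example dependency-free) and shows how `call_log` can back assertions about what was sent. `ScriptedLLM` and `Reply` are illustrative names.

```python
from dataclasses import dataclass, field

@dataclass
class Reply:
    """Dependency-free stand-in for a chat message object."""
    content: str

@dataclass
class ScriptedLLM:
    """Same first-match-wins substring logic as DeterministicLLM above."""
    response_map: dict[str, str]
    default: str = ""
    call_log: list[str] = field(default_factory=list)

    def invoke(self, prompt: str) -> Reply:
        self.call_log.append(prompt)
        for pattern, response in self.response_map.items():
            if pattern.lower() in prompt.lower():
                return Reply(response)
        return Reply(self.default)

llm = ScriptedLLM(
    response_map={"research": "FINDINGS", "write": "DRAFT"},
    default="FALLBACK",
)

# Both patterns occur in this prompt; "research" was inserted first, so it wins.
ambiguous = llm.invoke("Write an article from this research")
plain = llm.invoke("Write an introduction")
unmatched = llm.invoke("Translate to French")
```

If stage prompts share vocabulary, more specific patterns, or one mock per stage as in `create_test_pipeline`, help avoid accidental matches.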

Testing Pipeline Stages in Isolation

Before testing the full pipeline, test each stage independently. This isolates failures to a single agent.

# ResearchOutput is used below; importing it from your_pipeline is an assumption
from your_pipeline import ResearchStage, WritingStage, EditingStage, ResearchOutput

class TestPipelineStages:
    def test_research_stage_output_format(self):
        researcher_llm, _ = create_test_pipeline()
        
        with patch("your_pipeline.research.llm", researcher_llm):
            stage = ResearchStage()
            output = stage.run(topic="AI testing frameworks 2026")
        
        # Research output should be structured
        assert output.findings is not None
        assert len(output.findings) >= 1
        assert output.sources is not None
    
    def test_writing_stage_uses_research_context(self):
        _, writer_llm = create_test_pipeline()
        
        research_output = ResearchOutput(
            findings=["Finding A", "Finding B", "Finding C"],
            sources=["https://example.com"],
            summary="Test summary"
        )
        
        with patch("your_pipeline.writing.llm", writer_llm):
            stage = WritingStage()
            output = stage.run(research=research_output, style="technical")
        
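        assert output.content  # a non-empty draft came back
        # The scripted responses are short, so check what the stage sent to the
        # LLM rather than output length (assumes findings are in the last message)
        assert any("Finding A" in prompt for prompt in writer_llm.call_log)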
    
    def test_editing_stage_improves_readability(self):
        draft_content = """
        AI testing frameworks, they exist in 2026. Many frameworks. LangGraph is one.
        Testing is hard. Mocking helps. Use deterministic mocks.
        """
        
        editor_llm = DeterministicLLM(
            response_map={
                "edit": "AI testing frameworks have matured significantly in 2026. "
                       "LangGraph has emerged as the leading framework. "
                       "Using deterministic mocks is now standard practice."
            }
        )
        
        with patch("your_pipeline.editing.llm", editor_llm):
            stage = EditingStage()
            output = stage.run(draft=draft_content, guidelines="improve clarity")
        
        assert len(output.content) > 50
        assert output.content != draft_content  # Something was changed
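
Output-only assertions cannot tell a stage that uses its input from one that ignores it, because a mocked LLM returns the same text either way. A complementary check is to capture the prompt and assert that the upstream data reached it. The sketch below uses constructor injection instead of `patch`; this `WritingStage`, `PromptCapturingLLM`, and `missing_findings` are hypothetical and only illustrate the idea.

```python
from dataclasses import dataclass

@dataclass
class ResearchOutput:
    findings: list[str]
    sources: list[str]
    summary: str

class PromptCapturingLLM:
    """Returns a fixed draft and remembers every prompt it received."""

    def __init__(self, draft: str):
        self.draft = draft
        self.prompts: list[str] = []

    def invoke(self, prompt: str) -> str:
        self.prompts.append(prompt)
        return self.draft

class WritingStage:
    """Hypothetical stage: formats findings into a prompt and calls the LLM."""

    def __init__(self, llm):
        self.llm = llm

    def run(self, research: ResearchOutput, style: str) -> str:
        bullet_list = "\n".join(f"- {finding}" for finding in research.findings)
        prompt = f"Write a {style} article using these findings:\n{bullet_list}"
        return self.llm.invoke(prompt)

def missing_findings(llm: PromptCapturingLLM, research: ResearchOutput) -> list[str]:
    """Findings that never appeared in any prompt sent to the LLM."""
    return [finding for finding in research.findings
            if not any(finding in prompt for prompt in llm.prompts)]

research = ResearchOutput(
    findings=["Finding A", "Finding B"],
    sources=["https://example.com"],
    summary="Test summary",
)
llm = PromptCapturingLLM(draft="# Title\n\nDraft body.")
draft = WritingStage(llm).run(research=research, style="technical")
```

A test would then assert `missing_findings(llm, research) == []`, which fails as soon as the stage drops a finding from its prompt.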

Agent Mock Injection

For testing multi-agent coordination, replace entire agents with mock implementations that return scripted outputs.

from abc import ABC, abstractmethod

class BaseAgent(ABC):
    @abstractmethod
    def run(self, input_data: dict) -> dict:
        pass

class MockResearcherAgent(BaseAgent):
    """Mock researcher that returns deterministic research output."""
    
    def __init__(self, findings: list[str], sources: list[str]):
        self.findings = findings
        self.sources = sources
        self.run_count = 0
        self.last_input = None
    
    def run(self, input_data: dict) -> dict:
        self.run_count += 1
        self.last_input = input_data
        return {
            "findings": self.findings,
            "sources": self.sources,
            "topic": input_data.get("topic", "unknown"),
            "word_count": 500
        }

class MockWriterAgent(BaseAgent):
    """Mock writer that returns deterministic content."""
    
    def __init__(self, content_template: str = "Article about: {topic}"):
        self.content_template = content_template
        self.last_input = None
    
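    def run(self, input_data: dict) -> dict:
        self.last_input = input_data
        # Pass both fields so templates may reference {topic} and/or {findings};
        # str.format ignores keyword arguments the template does not use.
        content = self.content_template.format(
            topic=input_data.get("topic", "unknown topic"),
            findings=input_data.get("findings", []),
        )
        return {"content": content, "word_count": len(content.split())}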

class TestPipelineWithMockAgents:
    def test_pipeline_passes_research_to_writer(self):
        mock_researcher = MockResearcherAgent(
            findings=["Finding A", "Finding B"],
            sources=["https://source1.com"]
        )
        mock_writer = MockWriterAgent(
            content_template="Based on research: {findings}\n\nArticle content here."
        )
        
        from your_pipeline import ArticlePipeline
        pipeline = ArticlePipeline(
            researcher=mock_researcher,
            writer=mock_writer
        )
        
        result = pipeline.run(topic="AI testing in 2026")
        
        # Writer should have received research findings
        assert mock_writer.last_input is not None
        assert "findings" in mock_writer.last_input
        assert mock_writer.last_input["findings"] == ["Finding A", "Finding B"]
    
    def test_pipeline_handles_researcher_failure(self):
        class FailingResearcher(BaseAgent):
            def run(self, input_data: dict) -> dict:
                raise ValueError("Research API unavailable")
        
        mock_writer = MockWriterAgent()
        
        from your_pipeline import ArticlePipeline
        pipeline = ArticlePipeline(
            researcher=FailingResearcher(),
            writer=mock_writer
        )
        
        # Pipeline should handle failure gracefully
        result = pipeline.run(topic="test")
        
        assert result.error is not None or result.status == "failed"
        # Writer should NOT have been called
        assert mock_writer.last_input is None
    
    def test_pipeline_calls_agents_in_correct_order(self):
        call_order = []
        
        class OrderTrackingResearcher(BaseAgent):
            def run(self, input_data: dict) -> dict:
                call_order.append("researcher")
                return {"findings": ["f1"], "sources": ["https://s.com"]}
        
        class OrderTrackingWriter(BaseAgent):
            def run(self, input_data: dict) -> dict:
                call_order.append("writer")
                return {"content": "draft content"}
        
        class OrderTrackingEditor(BaseAgent):
            def run(self, input_data: dict) -> dict:
                call_order.append("editor")
                return {"content": "final content"}
        
        from your_pipeline import ArticlePipeline
        pipeline = ArticlePipeline(
            researcher=OrderTrackingResearcher(),
            writer=OrderTrackingWriter(),
            editor=OrderTrackingEditor()
        )
        
        pipeline.run(topic="test")
        
        assert call_order == ["researcher", "writer", "editor"]
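
The tests above assume a pipeline that accepts its agents as constructor arguments, forwards each agent's output to the next, and stops at the first failure. The following is one possible shape for such a pipeline, not the actual `your_pipeline.ArticlePipeline`; `PipelineResult` and the stub agents are hypothetical.

```python
from dataclasses import dataclass
from typing import Optional, Protocol

class Agent(Protocol):
    def run(self, input_data: dict) -> dict: ...

@dataclass
class PipelineResult:
    status: str                      # "success" or "failed"
    content: str = ""
    error: Optional[str] = None

class ArticlePipeline:
    """Hypothetical pipeline: runs injected agents in order, merging outputs."""

    def __init__(self, researcher: Agent, writer: Agent, editor: Optional[Agent] = None):
        # Agents are constructor arguments so tests can pass in mocks.
        self.stages = [("researcher", researcher), ("writer", writer)]
        if editor is not None:
            self.stages.append(("editor", editor))

    def run(self, topic: str) -> PipelineResult:
        state = {"topic": topic}
        for name, agent in self.stages:
            try:
                # Each agent receives a copy of everything produced so far.
                state.update(agent.run(dict(state)))
            except Exception as exc:
                # Stop at the first failure; later agents never run.
                return PipelineResult(status="failed", error=f"{name}: {exc}")
        return PipelineResult(status="success", content=state.get("content", ""))

class StubResearcher:
    def run(self, input_data: dict) -> dict:
        return {"findings": ["f1"], "sources": []}

class BrokenResearcher:
    def run(self, input_data: dict) -> dict:
        raise ValueError("Research API unavailable")

class RecordingWriter:
    def __init__(self):
        self.last_input = None

    def run(self, input_data: dict) -> dict:
        self.last_input = input_data
        return {"content": f"Article on {input_data['topic']}"}

writer = RecordingWriter()
ok = ArticlePipeline(StubResearcher(), writer).run(topic="testing")

idle_writer = RecordingWriter()
failed = ArticlePipeline(BrokenResearcher(), idle_writer).run(topic="testing")
```

Merging every agent's output into a shared state dict is one design choice among several. Passing only the previous agent's output is stricter and makes missing fields fail earlier.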

Record and Replay Testing

For integration tests that need realistic LLM behavior, record real LLM interactions during development and replay them in tests.

import json
import hashlib
from pathlib import Path
from unittest.mock import patch

import pytest  # needed for pytest.skip in the usage example below

class RecordingLLM:
    """Records LLM calls to a file for later replay."""
    
    def __init__(self, real_llm, recording_path: Path):
        self.real_llm = real_llm
        self.recording_path = recording_path
        self.recordings = self._load_recordings()
    
    def _load_recordings(self) -> dict:
        if self.recording_path.exists():
            return json.loads(self.recording_path.read_text())
        return {}
    
    def _save_recordings(self):
        self.recording_path.write_text(json.dumps(self.recordings, indent=2))
    
    def _input_hash(self, messages) -> str:
        content = str(messages)
        return hashlib.sha256(content.encode()).hexdigest()[:16]
    
    def invoke(self, messages):
        key = self._input_hash(messages)
        
        if key in self.recordings:
            # Replay recorded response
            from langchain_core.messages import AIMessage
            return AIMessage(content=self.recordings[key])
        
        # Make real call and record
        response = self.real_llm.invoke(messages)
        self.recordings[key] = response.content
        self._save_recordings()
        return response

class ReplayingLLM:
    """Replays recorded LLM calls from a file."""
    
    def __init__(self, recording_path: Path):
        self.recordings = json.loads(recording_path.read_text())
    
    def _input_hash(self, messages) -> str:
        content = str(messages)
        return hashlib.sha256(content.encode()).hexdigest()[:16]
    
    def invoke(self, messages):
        from langchain_core.messages import AIMessage
        key = self._input_hash(messages)
        
        if key not in self.recordings:
            raise ValueError(f"No recording found for input hash {key}. Run with RecordingLLM first.")
        
        return AIMessage(content=self.recordings[key])

# Usage in tests:
RECORDINGS_DIR = Path("tests/recordings")

def test_pipeline_with_recorded_llm():
    recording_file = RECORDINGS_DIR / "research_pipeline.json"
    
    if not recording_file.exists():
        pytest.skip("No recordings available. Run with RECORD=true to generate.")
    
    replaying_llm = ReplayingLLM(recording_file)
    
    with patch("your_pipeline.llm", replaying_llm):
        from your_pipeline import ArticlePipeline
        pipeline = ArticlePipeline()
        result = pipeline.run(topic="AI testing frameworks")
    
    assert result.content is not None
    assert len(result.content) > 100

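Commit your recordings to the repository. They become stable test fixtures that capture real LLM behavior without requiring LLM calls in CI. Because each recording is keyed by a hash of the exact input, re-record whenever a prompt changes; otherwise ReplayingLLM raises for the unmatched input.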
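
Replay only works if the same logical input always hashes to the same key. Hashing `str(messages)` ties the key to whatever the message objects print, which may include fields unrelated to the prompt. A possible alternative, sketched below, hashes a canonical JSON form of just the role and content. Representing messages as plain dicts is an assumption made for this example, and `stable_key`, `CountingLLM`, and `RecordReplay` are hypothetical names.

```python
import hashlib
import json
import tempfile
from pathlib import Path

def stable_key(messages: list[dict]) -> str:
    """Hash only role and content, serialized canonically."""
    canonical = json.dumps(
        [{"role": m["role"], "content": m["content"]} for m in messages],
        sort_keys=True,
        ensure_ascii=False,
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

class CountingLLM:
    """Hypothetical 'real' model; counts calls so replay is observable."""

    def __init__(self):
        self.calls = 0

    def complete(self, messages: list[dict]) -> str:
        self.calls += 1
        return f"answer to: {messages[-1]['content']}"

class RecordReplay:
    """Calls the wrapped model on a cache miss, replays from disk on a hit."""

    def __init__(self, llm, path: Path):
        self.llm = llm
        self.path = path
        self.cache = json.loads(path.read_text()) if path.exists() else {}

    def complete(self, messages: list[dict]) -> str:
        key = stable_key(messages)
        if key not in self.cache:
            self.cache[key] = self.llm.complete(messages)
            self.path.write_text(json.dumps(self.cache, indent=2))
        return self.cache[key]

messages = [{"role": "user", "content": "Summarize AI testing frameworks"}]
# Extra metadata (ids, timestamps) should not change the key.
noisy = [{"role": "user", "content": "Summarize AI testing frameworks", "id": "run-123"}]

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "recording.json"
    model = CountingLLM()
    first = RecordReplay(model, path).complete(messages)   # cache miss: records
    second = RecordReplay(model, path).complete(noisy)     # cache hit: replays
    calls_made = model.calls
```

The second wrapper is a fresh instance reading the file written by the first, which mirrors a later test run picking up a committed recording.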

Pipeline Output Assertions

Final output assertions should be semantic rather than exact. Assert on properties and structure, not specific wording.

class PipelineOutputValidator:
    def validate_article(self, output: dict) -> list[str]:
        """Returns list of validation errors. Empty list = valid."""
        errors = []
        
        # Structure assertions
        if not output.get("title"):
            errors.append("Missing title")
        
        if not output.get("content"):
            errors.append("Missing content")
        elif len(output["content"].split()) < 500:
            errors.append(f"Content too short: {len(output['content'].split())} words < 500")
        
        # Content quality assertions
        content = output.get("content", "")
        if output.get("topic") and output["topic"].lower() not in content.lower():
            errors.append(f"Content doesn't mention the topic: {output['topic']}")
        
        # Format assertions
        if "##" not in content and "\n\n" not in content:
            errors.append("Content has no structure (no headers or paragraphs)")
        
        # Sources assertions
        if not output.get("sources"):
            errors.append("No sources cited")
        
        return errors

class TestPipelineOutputAssertions:
    def test_output_passes_validation(self):
        validator = PipelineOutputValidator()
        
        mock_output = {
            "title": "Testing AI Pipelines in 2026",
            # 2 heading words + 3 * 200 = 602 words, above the validator's 500-word minimum
            "content": "## Introduction\n\n" + "Testing AI pipelines " * 200,
            "topic": "AI Pipelines",
            "sources": ["https://example.com"],
            "word_count": 602
        }
        
        errors = validator.validate_article(mock_output)
        assert errors == [], f"Validation failed: {errors}"
    
    def test_semantic_topic_coverage(self):
        """Assert the output covers key aspects of the topic, not exact wording."""
        required_concepts = ["testing", "agent", "pipeline"]
        
        content = """
        Multi-agent workflows require systematic quality assurance. When building agent 
        pipelines, each stage must be validated independently before integration testing.
        """
        
        missing = [c for c in required_concepts if c not in content.lower()]
        assert missing == [], f"Output doesn't cover: {missing}"
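
Plain keyword checks like the one above reject valid output that uses a different phrasing, for example "quality assurance" instead of "testing". One way to loosen the check without giving up determinism is to accept any of several variants per concept. The variant lists below are illustrative assumptions, and `missing_concepts` is a hypothetical helper.

```python
# Each concept maps to phrasings that count as covering it (illustrative lists).
CONCEPT_VARIANTS = {
    "testing": ["testing", "tests", "quality assurance", "validated"],
    "agent": ["agent"],
    "pipeline": ["pipeline", "workflow"],
}

def missing_concepts(text: str, concepts: dict[str, list[str]]) -> list[str]:
    """Names of concepts for which no variant appears in the text."""
    lowered = text.lower()
    return [name for name, variants in concepts.items()
            if not any(variant in lowered for variant in variants)]

on_topic = "Multi-agent workflows require systematic quality assurance at every stage."
off_topic = "Quarterly revenue grew by ten percent."

# A single-keyword check flags the on-topic text...
naive_missing = [c for c in CONCEPT_VARIANTS if c not in on_topic.lower()]
# ...while the variant-aware check accepts it and still rejects off-topic text.
on_topic_missing = missing_concepts(on_topic, CONCEPT_VARIANTS)
off_topic_missing = missing_concepts(off_topic, CONCEPT_VARIANTS)
```

This is still substring matching, so it stays fast and reproducible; it trades some precision for tolerance of paraphrase.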

Failure Mode Testing

Test that your pipeline handles each failure mode gracefully:

from your_pipeline import ArticlePipeline

class TestPipelineFailureModes:
    def test_handles_rate_limit_from_researcher(self):
        class RateLimitedResearcher(BaseAgent):
            def run(self, input_data):
                raise Exception("Rate limit: 429 Too Many Requests")
        
        pipeline = ArticlePipeline(researcher=RateLimitedResearcher())
        result = pipeline.run(topic="test")
        
        assert result.status in ["failed", "error"]
        assert "rate limit" in result.error_message.lower()
    
    def test_handles_empty_research_results(self):
        empty_researcher = MockResearcherAgent(findings=[], sources=[])
        pipeline = ArticlePipeline(researcher=empty_researcher, writer=MockWriterAgent())
        
        result = pipeline.run(topic="obscure topic")
        
        # Should either fail gracefully or produce non-empty content
        # (for example, a note that the research was insufficient)
        assert result is not None
        if result.status == "success":
            assert len(result.content) > 0
    
    def test_handles_writer_producing_no_content(self):
        class EmptyWriter(BaseAgent):
            def run(self, input_data):
                return {"content": "", "word_count": 0}
        
        pipeline = ArticlePipeline(
            researcher=MockResearcherAgent(["finding"], ["https://s.com"]),
            writer=EmptyWriter()
        )
        
        result = pipeline.run(topic="test")
        
        assert result.status != "success" or result.content != ""
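
Some failures are transient: a rate-limited agent may succeed on a later attempt. If your pipeline retries such calls (an assumption; the tests above only require that it fails cleanly), a mock that fails a fixed number of times makes the retry path deterministic. `TransientError`, `FlakyAgent`, and `run_with_retry` below are hypothetical names for a sketch of that idea.

```python
class TransientError(Exception):
    """Hypothetical error type for failures worth retrying (e.g. HTTP 429)."""

class FlakyAgent:
    """Fails a fixed number of times, then returns a scripted result."""

    def __init__(self, failures_before_success: int, result: dict):
        self.remaining_failures = failures_before_success
        self.result = result
        self.attempts = 0

    def run(self, input_data: dict) -> dict:
        self.attempts += 1
        if self.remaining_failures > 0:
            self.remaining_failures -= 1
            raise TransientError("Rate limit: 429 Too Many Requests")
        return self.result

def run_with_retry(agent, input_data: dict, max_attempts: int = 3) -> dict:
    """Retry only on TransientError; re-raise once attempts are exhausted.

    No sleep or backoff here, to keep tests fast; production code would add one.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return agent.run(input_data)
        except TransientError:
            if attempt == max_attempts:
                raise

# Two failures, then success on the third attempt.
recovering = FlakyAgent(2, {"findings": ["f1"]})
recovered = run_with_retry(recovering, {"topic": "test"})

# Five failures exceed the three allowed attempts, so the error propagates.
exhausted = FlakyAgent(5, {"findings": ["f1"]})
try:
    run_with_retry(exhausted, {"topic": "test"})
    gave_up = False
except TransientError:
    gave_up = True
```

Asserting on `attempts` checks both that retries happened and that they stopped at the configured limit.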

Multi-agent pipeline testing rewards the investment. Every bug caught in a mock-injected test is a bug that didn't make it to production, where it would have cost real API calls to reproduce.
