Continuous LLM Evaluation: Building an Evals Pipeline for Production AI

Deploying an LLM is not a one-time event. Prompts change. Models get updated. Retrieval indexes get refreshed. Each of these changes can silently degrade the quality of your AI application — and without a continuous evaluation pipeline, you won't know until users start complaining.

This guide covers how to build an automated evals pipeline that runs in CI/CD, monitors production, and catches quality regressions before they reach users.

The Problem: LLM Changes Break Silently

Traditional software breaks loudly — an error is thrown, a test fails, a monitor fires. LLM degradation is subtle:

  • A prompt change causes the model to format responses differently, breaking downstream parsing
  • A new model version scores 2% better on MMLU but 15% worse on your specific use cases
  • A RAG index refresh introduces stale documents, degrading answer accuracy
  • A change to the sampling temperature raises the hallucination rate from 3% to 11%

None of these cause errors. All of them hurt your users. The only way to catch them is continuous automated evaluation.

Architecture Overview

A complete continuous evaluation pipeline has three layers:

Pre-commit → CI Eval Suite → Production Sampling
    ↓              ↓                ↓
 Fast checks   Full regression   Live monitoring
  (<2 min)      (10-30 min)      (continuous)

Pre-commit: Runs in under two minutes. Catches obvious regressions on a small smoke-test subset of the golden set. It is fast enough not to slow the developer workflow, and it surfaces problems before code is pushed.

CI Eval Suite: Runs on every PR that touches prompts, model configuration, or the retrieval pipeline. Runs the full evaluation suite on your complete test set. Quality gates block merging if scores drop below thresholds.

Production Sampling: Continuously samples live traffic, evaluates responses, and alerts on drift. Catches problems that only appear at scale or with real user inputs.

Building the Golden Dataset

Your evaluation pipeline is only as good as your test data. Build a golden dataset: a curated set of inputs with ground-truth outputs or evaluation criteria.

What makes a good golden dataset:

  1. Representative sampling. Covers the distribution of real user inputs — not just edge cases, not just easy cases.
  2. Adequate size. Minimum 100 examples for meaningful statistics. 500+ for reliable confidence intervals.
  3. Versioned and stable. The golden dataset should not change frequently. When you need to add examples, version the dataset so you can compare runs across versions.
  4. Labeled correctly. Ground truth labels need domain expert review, not just model-generated labels.

# golden_dataset.py
import json
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from pathlib import Path

@dataclass
class GoldenExample:
    id: str
    input: str
    context: str | None
    expected_output: str
    evaluation_criteria: dict  # e.g., {"faithfulness": 0.9, "relevancy": 0.85}
    tags: list[str]  # e.g., ["edge_case", "multilingual", "numerical"]
    added_at: str
    added_by: str

class GoldenDataset:
    def __init__(self, path: str):
        self.path = Path(path)
        self.version: str | None = None  # set by _load() or save()
        self.examples: list[GoldenExample] = []
        if self.path.exists():
            self._load()
    
    def _load(self):
        with open(self.path) as f:
            data = json.load(f)
        self.version = data['version']
        self.examples = [GoldenExample(**e) for e in data['examples']]
    
    def save(self, version: str):
        self.version = version
        with open(self.path, 'w') as f:
            json.dump({
                'version': version,
                # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated)
                'updated_at': datetime.now(timezone.utc).isoformat(),
                'examples': [asdict(e) for e in self.examples]
            }, f, indent=2)
    
    def add_example(self, example: GoldenExample):
        # Raise instead of assert: asserts are stripped under `python -O`
        if any(e.id == example.id for e in self.examples):
            raise ValueError(f"Example {example.id} already exists")
        self.examples.append(example)
    
    def filter_by_tag(self, tag: str) -> list[GoldenExample]:
        return [e for e in self.examples if tag in e.tags]
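
The size guidance above (100 examples as a floor, 500+ for reliable confidence intervals) can be made concrete with a small calculation. The sketch below is an illustration, not part of the original pipeline: it uses the Wilson score interval, one common choice for a binomial proportion, to show how the uncertainty around an observed pass rate narrows as the dataset grows. The function name `wilson_interval` and the 85% pass rate are assumptions chosen for the example.

```python
import math

def wilson_interval(passed: int, total: int, z: float = 1.96) -> tuple[float, float]:
    """Wilson score interval for a pass rate (z=1.96 is roughly 95% confidence)."""
    if total <= 0:
        raise ValueError("total must be positive")
    p = passed / total
    denom = 1 + z**2 / total
    center = (p + z**2 / (2 * total)) / denom
    half_width = z * math.sqrt(p * (1 - p) / total + z**2 / (4 * total**2)) / denom
    return center - half_width, center + half_width

# Same observed pass rate, two dataset sizes
for n in (100, 500):
    low, high = wilson_interval(round(0.85 * n), n)
    print(f"n={n}: pass rate 0.85, 95% interval = [{low:.3f}, {high:.3f}]")
```

At 100 examples the interval is roughly twice as wide as at 500, so a gate threshold that sits a few points below your typical pass rate is much more likely to trip on noise with the smaller set.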

Pre-Commit Checks

Fast checks that run locally before a developer pushes:

#!/bin/bash
# .git/hooks/pre-commit or in a Makefile

echo "Running LLM quick eval..."

# Only run on the smoke test subset (10-20 examples)
python eval/run_eval.py \
  --dataset golden_dataset.json \
  --filter-tag smoke_test \
  --model "claude-opus-4-6" \
  --fail-threshold 0.80

if [ $? -ne 0 ]; then
  echo "❌ Quick eval failed. Fix issues before committing."
  exit 1
fi

echo "✅ Quick eval passed"

The smoke test subset should cover the most important capabilities and known failure modes. Keep it under 20 examples so it runs in under 2 minutes.

CI Eval Suite

The full evaluation suite runs in CI on every PR that touches:

  • Prompt templates
  • Model configuration
  • RAG pipeline
  • Retrieval indexes

# .github/workflows/llm-eval.yml
name: LLM Eval Suite

on:
  pull_request:
    paths:
      - 'prompts/**'
      - 'config/model*.yaml'
      - 'retrieval/**'
      - 'eval/**'

jobs:
  eval:
    runs-on: ubuntu-latest
    timeout-minutes: 60
    
    steps:
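      - uses: actions/checkout@v4
      
      - uses: actions/setup-python@v5
        with:
          python-version: '3.12'
      
      # Assumes dependencies (anthropic, pyyaml, scipy, ...) are listed in requirements.txt
      - name: Install dependencies
        run: pip install -r requirements.txt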
      
      - name: Run evaluation suite
        env:
          ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
        run: |
          python eval/run_eval.py \
            --dataset golden_dataset.json \
            --model ${{ vars.EVAL_MODEL }} \
            --output eval_results.json \
            --compare-baseline baseline_results.json
      
      - name: Check quality gates
        run: |
          python eval/check_gates.py \
            --results eval_results.json \
            --gates eval/quality_gates.yaml
      
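      - name: Post results to PR
        if: always()  # still post results when a quality gate fails
        uses: actions/github-script@v7
        with:
          # Commenting may require pull-requests: write permission on the workflow token
          script: |
            const fs = require('fs');
            const results = JSON.parse(fs.readFileSync('eval_results.json', 'utf8'));
            // Build a short summary from the aggregate fields written by run_eval.py
            const comment = [
              '### LLM Eval Results',
              `- Pass rate: ${(results.pass_rate * 100).toFixed(1)}% (${results.passed}/${results.total})`,
              `- Mean score: ${results.mean_score.toFixed(3)}`,
              `- Min score: ${results.min_score.toFixed(3)}`,
            ].join('\n');
            await github.rest.issues.createComment({
              issue_number: context.issue.number,
              owner: context.repo.owner,
              repo: context.repo.repo,
              body: comment
            });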

The eval runner:

# eval/run_eval.py
import json
import asyncio
import anthropic

# Async client, so concurrent evaluations do not block the event loop
client = anthropic.AsyncAnthropic()

async def evaluate_example(example, model: str, evaluator) -> dict:
    # Generate response
    response = await client.messages.create(
        model=model,
        max_tokens=1024,
        messages=[{"role": "user", "content": build_prompt(example)}]
    )
    actual = response.content[0].text
    
    # Evaluate
    result = await evaluator.evaluate(
        question=example.input,
        expected=example.expected_output,
        actual=actual,
        context=example.context
    )
    
    return {
        'example_id': example.id,
        'tags': example.tags,
        'score': result.overall,
        'passed': result.passed,
        'details': {
            'rouge_l': result.rouge_l,
            'bert_f1': result.bert_f1,
            'faithfulness': result.llm_faithfulness,
            'relevancy': result.llm_relevancy,
        },
        'actual_output': actual
    }

async def run_eval_suite(dataset, model: str, max_concurrency: int = 10) -> dict:
    evaluator = LLMEvaluator()
    semaphore = asyncio.Semaphore(max_concurrency)
    
    async def bounded_eval(example):
        async with semaphore:
            return await evaluate_example(example, model, evaluator)
    
    tasks = [bounded_eval(ex) for ex in dataset.examples]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    # Filter out exceptions and log them
    valid_results = []
    for r in results:
        if isinstance(r, Exception):
            print(f"Eval error: {r}")
        else:
            valid_results.append(r)
    
    return aggregate_results(valid_results)

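def aggregate_results(results: list[dict]) -> dict:
    if not results:
        raise ValueError("No successful evaluations to aggregate")
    
    scores = sorted(r['score'] for r in results)
    faithfulness = sorted(r['details']['faithfulness'] for r in results)
    passed = sum(r['passed'] for r in results)
    
    return {
        'total': len(results),
        'passed': passed,
        'pass_rate': passed / len(results),
        'mean_score': sum(scores) / len(scores),
        'min_score': scores[0],
        'p10_score': scores[len(scores) // 10],
        # Metrics referenced by eval/quality_gates.yaml
        'p10_faithfulness': faithfulness[len(faithfulness) // 10],
        'catastrophic_failure_rate': sum(s < 0.4 for s in scores) / len(scores),
        'failed_cases': [r for r in results if not r['passed']],
        'by_tag': compute_by_tag(results),
        # Per-example results, read by eval/compare_baseline.py
        'all_results': results,
    }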
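
`aggregate_results` calls `compute_by_tag`, which the article does not show. The following is a minimal sketch of what it might look like, assuming each result dict carries the `tags`, `score`, and `passed` fields returned by `evaluate_example`, and assuming the gate path `by_tag.critical.pass_rate` expects one summary dict per tag. The summary field names are illustrative.

```python
from collections import defaultdict

def compute_by_tag(results: list[dict]) -> dict:
    """Group per-example results by tag and summarize each group."""
    grouped: dict[str, list[dict]] = defaultdict(list)
    for result in results:
        # An example with several tags contributes to every one of them
        for tag in result.get('tags', []):
            grouped[tag].append(result)

    summary = {}
    for tag, rows in grouped.items():
        passed = sum(1 for r in rows if r['passed'])
        summary[tag] = {
            'total': len(rows),
            'passed': passed,
            'pass_rate': passed / len(rows),
            'mean_score': sum(r['score'] for r in rows) / len(rows),
        }
    return summary
```

Untagged examples do not appear in any group, so a tag-specific gate only sees the examples that were explicitly labeled for it.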

Quality Gates

Define explicit thresholds that block deployment:

# eval/quality_gates.yaml
gates:
  - name: mean_score
    metric: mean_score
    minimum: 0.78
    critical: 0.72
    description: "Overall mean quality score"
  
  - name: pass_rate
    metric: pass_rate
    minimum: 0.85
    critical: 0.75
    description: "Fraction of examples passing individual thresholds"
  
  - name: catastrophic_failure_rate
    metric: catastrophic_failure_rate  # fraction with score < 0.4
    maximum: 0.03
    description: "No more than 3% of responses completely wrong"
  
  - name: faithfulness_p10
    metric: p10_faithfulness
    minimum: 0.70
    description: "10th percentile faithfulness (catches tail failures)"
  
  # Tag-specific gates for critical capabilities
  - name: critical_tag_pass_rate
    metric: by_tag.critical.pass_rate
    minimum: 0.95
    description: "Examples tagged 'critical' must pass at 95%+"

# eval/check_gates.py
import yaml
import json
import sys

def check_gates(results: dict, gates_config: dict) -> tuple[bool, list[str]]:
    failures = []
    
    for gate in gates_config['gates']:
        value = get_nested(results, gate['metric'])
        if value is None:
            print(f"Warning: metric {gate['metric']} not found")
            continue
        
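        if 'minimum' in gate and value < gate['minimum']:
            # Without an explicit 'critical' threshold, any miss of 'minimum' blocks the merge
            critical = gate.get('critical', gate['minimum'])
            level = 'CRITICAL' if value < critical else 'WARNING'
            failures.append(
                f"[{level}] {gate['name']}: {value:.3f} < {gate['minimum']} minimum\n"
                f"  {gate['description']}"
            )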
        
        if 'maximum' in gate and value > gate['maximum']:
            failures.append(
                f"[CRITICAL] {gate['name']}: {value:.3f} > {gate['maximum']} maximum\n"
                f"  {gate['description']}"
            )
    
    critical_failures = [f for f in failures if '[CRITICAL]' in f]
    return len(critical_failures) == 0, failures

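if __name__ == '__main__':
    import argparse
    
    # Parse --results and --gates by name rather than by position in sys.argv
    parser = argparse.ArgumentParser()
    parser.add_argument('--results', required=True)
    parser.add_argument('--gates', required=True)
    args = parser.parse_args()
    
    with open(args.results) as f:
        results = json.load(f)
    with open(args.gates) as f:
        gates = yaml.safe_load(f)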
    
    passed, failures = check_gates(results, gates)
    
    if failures:
        print("\n".join(failures))
    
    if not passed:
        print("\n❌ Critical quality gates failed. Blocking merge.")
        sys.exit(1)
    
    print("✅ All quality gates passed")
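
`check_gates` relies on a `get_nested` helper that is not defined in the article. A minimal sketch, assuming metric names such as `by_tag.critical.pass_rate` are dot-separated paths into nested dicts and that a missing key should yield `None` (which is what `check_gates` tests for):

```python
def get_nested(data: dict, dotted_path: str, default=None):
    """Resolve a dotted path such as 'by_tag.critical.pass_rate' in nested dicts."""
    current = data
    for key in dotted_path.split('.'):
        # Stop as soon as the path leaves the dict structure or a key is absent
        if not isinstance(current, dict) or key not in current:
            return default
        current = current[key]
    return current

# Usage with a results dict shaped like the eval runner's output
results = {'mean_score': 0.81, 'by_tag': {'critical': {'pass_rate': 0.97}}}
print(get_nested(results, 'mean_score'))
print(get_nested(results, 'by_tag.critical.pass_rate'))
print(get_nested(results, 'by_tag.multilingual.pass_rate'))
```

One consequence of this design is that tag names containing a dot cannot be addressed; if your tags may contain dots, a different separator or an explicit list of keys would be needed.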

Baseline Comparison

Detect regressions by comparing against a stored baseline:

# eval/compare_baseline.py
import json
from scipy import stats

def compare_to_baseline(current: dict, baseline: dict) -> dict:
    """Statistical comparison between current run and baseline."""
    # Pair scores by example ID: ttest_rel needs equal-length, aligned samples
    baseline_by_id = {r['example_id']: r['score'] for r in baseline['all_results']}
    pairs = [
        (r['score'], baseline_by_id[r['example_id']])
        for r in current['all_results']
        if r['example_id'] in baseline_by_id
    ]
    current_scores = [c for c, _ in pairs]
    baseline_scores = [b for _, b in pairs]
    
    # Paired t-test over the examples present in both runs
    t_stat, p_value = stats.ttest_rel(current_scores, baseline_scores)
    
    delta = current['mean_score'] - baseline['mean_score']
    
    return {
        'delta_mean': delta,
        # Cast NumPy scalars so the result is JSON-serializable
        'p_value': float(p_value),
        'statistically_significant': bool(p_value < 0.05),
        'direction': 'improvement' if delta > 0 else 'regression' if delta < 0 else 'unchanged',
        'new_failures': find_new_failures(current, baseline),
        'fixed_failures': find_fixed_failures(current, baseline),
    }

def find_new_failures(current, baseline) -> list[str]:
    """Examples that passed in baseline but fail now."""
    baseline_passed = {r['example_id'] for r in baseline['all_results'] if r['passed']}
    return [
        r['example_id'] 
        for r in current['all_results'] 
        if not r['passed'] and r['example_id'] in baseline_passed
    ]
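
`compare_to_baseline` also calls `find_fixed_failures`, which is not shown. A sketch that mirrors `find_new_failures`, assuming the same `all_results` structure with `example_id` and `passed` on every entry:

```python
def find_fixed_failures(current: dict, baseline: dict) -> list[str]:
    """Examples that failed in the baseline but pass now."""
    baseline_failed = {
        r['example_id'] for r in baseline['all_results'] if not r['passed']
    }
    return [
        r['example_id']
        for r in current['all_results']
        if r['passed'] and r['example_id'] in baseline_failed
    ]
```

Examples that exist only in the current run are ignored by both helpers, so examples newly added to the golden dataset show up as neither new failures nor fixes.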

Production Sampling

Once deployed, continuously evaluate a sample of live traffic:

# monitoring/production_sampler.py
import random
import asyncio
from datetime import datetime
from typing import Callable

class ProductionSampler:
    def __init__(
        self,
        evaluator,
        sample_rate: float = 0.02,  # 2% of traffic
        alert_callback: Callable | None = None
    ):
        self.evaluator = evaluator
        self.sample_rate = sample_rate
        self.alert_callback = alert_callback
        self.buffer = []
        self.buffer_size = 50
        # Strong references so background tasks are not garbage-collected mid-run
        self._tasks: set[asyncio.Task] = set()
    
    async def on_request(self, input: str, output: str, context: str | None = None):
        """Call this for every production request."""
        if random.random() >= self.sample_rate:
            return
        
        # Don't block the request: evaluate in the background
        task = asyncio.create_task(self._evaluate_and_buffer(input, output, context))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
    
    async def _evaluate_and_buffer(self, input, output, context):
        result = await self.evaluator.evaluate(
            question=input,
            expected=None,  # No ground truth for live traffic
            actual=output,
            context=context,
            use_llm_judge=True
        )
        
        self.buffer.append({
            'timestamp': datetime.utcnow().isoformat(),
            'score': result.overall,
            'faithfulness': result.llm_faithfulness,
            'relevancy': result.llm_relevancy,
        })
        
        if len(self.buffer) >= self.buffer_size:
            await self._flush_and_check()
    
    async def _flush_and_check(self):
        # Swap the buffer out before awaiting, so results appended by
        # concurrent tasks during the awaits below are not discarded
        batch, self.buffer = self.buffer, []
        scores = [r['score'] for r in batch]
        mean = sum(scores) / len(scores)
        
        # Alert if the mean of this batch drops below threshold
        if mean < 0.75 and self.alert_callback:
            await self.alert_callback({
                'alert': 'quality_degradation',
                'mean_score': mean,
                'sample_size': len(batch),
                'timestamp': datetime.utcnow().isoformat()
            })
        
        # Send metrics to your observability platform
        await self._emit_metrics(batch)
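
The sampler calls `_emit_metrics`, which is left undefined. One way to fill it in is to push each flushed batch into the Prometheus metrics defined in the Alerting and Dashboards section. The sketch below is an assumption about how that wiring could look: `emit_metrics` and its parameters are hypothetical names, and the metric objects are passed in as arguments (anything exposing `labels(...).set()` or `labels(...).inc()`, as `prometheus_client` gauges and counters do) so the function stays free of global state.

```python
def emit_metrics(batch, quality_gauge, faithfulness_gauge, low_quality_counter,
                 model: str, endpoint: str, threshold: float = 0.75) -> None:
    """Push one batch of sampled results into Prometheus-style metrics."""
    if not batch:
        return
    labels = {'model': model, 'endpoint': endpoint}

    # Gauges hold the mean of the most recent batch
    mean_score = sum(r['score'] for r in batch) / len(batch)
    mean_faithfulness = sum(r['faithfulness'] for r in batch) / len(batch)
    quality_gauge.labels(**labels).set(mean_score)
    faithfulness_gauge.labels(**labels).set(mean_faithfulness)

    # The counter accumulates low-scoring responses across batches
    low_quality = sum(1 for r in batch if r['score'] < threshold)
    if low_quality:
        low_quality_counter.labels(**labels).inc(low_quality)
```

Inside `ProductionSampler`, `_emit_metrics` could then be a thin async wrapper that calls `emit_metrics(batch, llm_quality_score, llm_faithfulness, llm_low_quality_total, model=..., endpoint=...)` with whatever model and endpoint labels your service uses.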

Integrate into your API handler:

# In your FastAPI or similar framework
sampler = ProductionSampler(evaluator=LLMEvaluator(), sample_rate=0.02)

@app.post("/chat")
async def chat(request: ChatRequest):
    # Generate response
    response = await generate_response(request.message, request.context)
    
    # Sample for evaluation (non-blocking)
    await sampler.on_request(
        input=request.message,
        output=response.text,
        context=request.context
    )
    
    return response

Alerting and Dashboards

Emit evaluation metrics to your observability stack:

# Using Prometheus metrics
from prometheus_client import Gauge, Histogram, Counter

llm_quality_score = Gauge(
    'llm_quality_score', 
    'Rolling mean quality score',
    ['model', 'endpoint']
)
llm_faithfulness = Gauge(
    'llm_faithfulness_score',
    'Rolling mean faithfulness',
    ['model', 'endpoint']
)
llm_low_quality_total = Counter(
    'llm_low_quality_responses_total',
    'Count of responses scoring below threshold',
    ['model', 'endpoint']
)

# Alert rules (Prometheus rule file; Alertmanager routes the resulting alerts)
alert_rules = """
groups:
  - name: llm_quality
    rules:
      - alert: LLMQualityDegradation
        expr: llm_quality_score < 0.75
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "LLM quality score degraded"
          description: "Quality score {{ $value }} below threshold 0.75"
      
      - alert: LLMFaithfulnessDrop
        expr: llm_faithfulness_score < 0.80
        for: 10m
        labels:
          severity: critical
        annotations:
          summary: "LLM faithfulness critically low"
"""

When Eval Pipelines Go Wrong

False negatives: Pipeline passes but real quality degraded.

  • Cause: Test set not representative, thresholds too lenient
  • Fix: Regularly audit failed production samples and add them to golden dataset

False positives: Pipeline fails but model is actually fine.

  • Cause: Thresholds too tight, high variance in small test sets
  • Fix: Use statistical significance tests; require changes to be significant at p<0.05

Eval flakiness: Same code produces different scores on each run.

  • Cause: LLM-as-judge non-determinism (temperature > 0)
  • Fix: Set temperature=0 for the judge, which reduces but may not fully eliminate variance; average 3 runs for borderline cases
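
The "average 3 runs for borderline cases" fix can be sketched as follows. This is an illustration under stated assumptions: `judge` is any zero-argument callable that returns a score for one fixed (input, output) pair, and "borderline" is taken to mean within a `margin` of the pass threshold. The names `stable_judge_score` and `margin` are hypothetical.

```python
from statistics import mean
from typing import Callable

def stable_judge_score(judge: Callable[[], float], threshold: float,
                       margin: float = 0.05, runs: int = 3) -> float:
    """Score once; if the result is within `margin` of `threshold`, average `runs` calls."""
    first = judge()
    if abs(first - threshold) > margin:
        # Clearly above or below the threshold: one call is enough
        return first
    # Borderline: spend extra judge calls to reduce run-to-run noise
    scores = [first] + [judge() for _ in range(runs - 1)]
    return mean(scores)
```

Only borderline examples pay for the extra judge calls, which keeps the added cost proportional to the number of examples whose pass/fail outcome could actually flip.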

Slow feedback loops: CI eval takes 45 minutes.

  • Fix: Parallelize evaluations; cache LLM judge responses with hash of (input, output); use smaller models for initial screening
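
Caching judge responses by a hash of (input, output) might look like the sketch below. It is an in-memory illustration only: `JudgeCache` is a hypothetical name, `judge` stands in for whatever function calls your LLM judge, and a real pipeline would likely persist the cache between CI runs.

```python
import hashlib
import json
from typing import Callable

class JudgeCache:
    """In-memory cache of judge scores keyed by a hash of (input, output)."""

    def __init__(self, judge: Callable[[str, str], float]):
        self._judge = judge
        self._scores: dict[str, float] = {}
        self.hits = 0
        self.misses = 0

    @staticmethod
    def _key(input_text: str, output_text: str) -> str:
        # JSON-encode the pair so ("ab", "c") and ("a", "bc") hash differently
        payload = json.dumps([input_text, output_text])
        return hashlib.sha256(payload.encode('utf-8')).hexdigest()

    def score(self, input_text: str, output_text: str) -> float:
        key = self._key(input_text, output_text)
        if key in self._scores:
            self.hits += 1
            return self._scores[key]
        self.misses += 1
        result = self._judge(input_text, output_text)
        self._scores[key] = result
        return result
```

Because the key covers only the input and output, the cache should be cleared (or the key extended) whenever the judge prompt or judge model changes; otherwise stale scores from the old judge would be reused.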

HelpMeTest Integration

HelpMeTest provides automated monitoring for AI applications without requiring you to build and maintain the entire sampling and alerting infrastructure. You define evaluation tests in plain language, and the platform:

  • Runs them on a configurable schedule
  • Samples production traffic for evaluation
  • Alerts you when quality metrics drop below thresholds
  • Maintains historical data for trend analysis

This lets you focus on improving your AI application rather than maintaining evaluation infrastructure.

Summary

A production LLM eval pipeline has three layers:

  1. Pre-commit: Fast smoke tests on a curated subset. Catches obvious regressions before code review.
  2. CI suite: Full evaluation on every prompt/model change. Quality gates block bad changes from merging.
  3. Production sampling: Continuous evaluation of live traffic. Catches real-world drift that test suites miss.

The investment pays off quickly. A single hallucination incident caught by automated evaluation — before users see it — easily justifies the setup cost of an entire eval pipeline.
