Continuous LLM Evaluation: Building an Evals Pipeline for Production AI
Deploying an LLM is not a one-time event. Prompts change. Models get updated. Retrieval indexes get refreshed. Each of these changes can silently degrade the quality of your AI application — and without a continuous evaluation pipeline, you won't know until users start complaining.
This guide covers how to build an automated evals pipeline that runs in CI/CD, monitors production, and catches quality regressions before they reach users.
The Problem: LLM Changes Break Silently
Traditional software breaks loudly — an error is thrown, a test fails, a monitor fires. LLM degradation is subtle:
- A prompt change causes the model to format responses differently, breaking downstream parsing
- A new model version scores 2% better on MMLU but 15% worse on your specific use cases
- A RAG index refresh introduces stale documents, degrading answer accuracy
- An unnoticed change to the sampling temperature raises the hallucination rate from 3% to 11%
None of these cause errors. All of them hurt your users. The only way to catch them is continuous automated evaluation.
Architecture Overview
A complete continuous evaluation pipeline has three layers:
Pre-commit   →   CI Eval Suite    →   Production Sampling
    ↓                  ↓                      ↓
Fast checks      Full regression      Live monitoring
(<2 min)         (10-30 min)          (continuous)

Pre-commit: Runs in under two minutes. Catches obvious regressions on a small golden set and surfaces them early, without slowing the developer workflow.
CI Eval Suite: Runs on every PR. Full evaluation suite on your complete test set. Quality gates block merging if scores drop below thresholds.
Production Sampling: Continuously samples live traffic, evaluates responses, and alerts on drift. Catches problems that only appear at scale or with real user inputs.
Building the Golden Dataset
Your evaluation pipeline is only as good as your test data. Build a golden dataset: a curated set of inputs with ground-truth outputs or evaluation criteria.
What makes a good golden dataset:
- Representative sampling. Covers the distribution of real user inputs — not just edge cases, not just easy cases.
- Adequate size. Minimum 100 examples for meaningful statistics. 500+ for reliable confidence intervals.
- Versioned and stable. The golden dataset should not change frequently. When you need to add examples, version the dataset so you can compare runs across versions.
- Labeled correctly. Ground truth labels need domain expert review, not just model-generated labels.
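To make the size guidance concrete, the sketch below computes a 95% Wilson score interval for an observed pass rate at 100 and 500 examples. The 85% pass rate is an illustrative assumption, not a figure from any real run, and `wilson_interval` is a hypothetical helper name.

```python
import math


def wilson_interval(passed: int, total: int, z: float = 1.96) -> tuple[float, float]:
    """Wilson score interval for a pass rate (z=1.96 gives roughly 95% coverage)."""
    if total <= 0:
        raise ValueError("total must be positive")
    p = passed / total
    denom = 1 + z**2 / total
    center = (p + z**2 / (2 * total)) / denom
    half = z * math.sqrt(p * (1 - p) / total + z**2 / (4 * total**2)) / denom
    return center - half, center + half


# Same observed pass rate (85%), two dataset sizes
for passed, total in ((85, 100), (425, 500)):
    low, high = wilson_interval(passed, total)
    print(f"n={total}: pass rate 0.85, 95% interval [{low:.3f}, {high:.3f}]")
```

At 100 examples the interval is roughly 14 percentage points wide, so a few-point drop is indistinguishable from noise. At 500 it narrows to roughly 6 points.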
# golden_dataset.py
import json
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from pathlib import Path


@dataclass
class GoldenExample:
    id: str
    input: str
    context: str | None
    expected_output: str
    evaluation_criteria: dict  # e.g., {"faithfulness": 0.9, "relevancy": 0.85}
    tags: list[str]  # e.g., ["edge_case", "multilingual", "numerical"]
    added_at: str
    added_by: str


class GoldenDataset:
    def __init__(self, path: str):
        self.path = Path(path)
        self.version: str | None = None  # set on load or save
        self.examples: list[GoldenExample] = []
        if self.path.exists():
            self._load()

    def _load(self):
        with open(self.path) as f:
            data = json.load(f)
        self.version = data['version']
        self.examples = [GoldenExample(**e) for e in data['examples']]

    def save(self, version: str):
        self.version = version
        with open(self.path, 'w') as f:
            json.dump({
                'version': version,
                # Timezone-aware timestamp; datetime.utcnow() is deprecated since Python 3.12
                'updated_at': datetime.now(timezone.utc).isoformat(),
                'examples': [asdict(e) for e in self.examples]
            }, f, indent=2)

    def add_example(self, example: GoldenExample):
        # Raise rather than assert: assertions are skipped under `python -O`
        if any(e.id == example.id for e in self.examples):
            raise ValueError(f"Example {example.id} already exists")
        self.examples.append(example)

    def filter_by_tag(self, tag: str) -> list[GoldenExample]:
        return [e for e in self.examples if tag in e.tags]

Pre-Commit Checks
Fast checks that run locally before a developer pushes:
#!/bin/bash
# .git/hooks/pre-commit or in a Makefile
echo "Running LLM quick eval..."
# Only run on the smoke test subset (10-20 examples)
python eval/run_eval.py \
    --dataset golden_dataset.json \
    --filter-tag smoke_test \
    --model "claude-opus-4-6" \
    --fail-threshold 0.80
if [ $? -ne 0 ]; then
    echo "❌ Quick eval failed. Fix issues before committing."
    exit 1
fi
echo "✅ Quick eval passed"

The smoke test subset should cover the most important capabilities and known failure modes. Keep it to 20 examples or fewer so it runs in under 2 minutes.
CI Eval Suite
The full evaluation suite runs in CI on every PR that touches:
- Prompt templates
- Model configuration
- RAG pipeline
- Retrieval indexes
# .github/workflows/llm-eval.yml
name: LLM Eval Suite
on:
  pull_request:
    paths:
      - 'prompts/**'
      - 'config/model*.yaml'
      - 'retrieval/**'
      - 'eval/**'
permissions:
  contents: read
  pull-requests: write  # needed to post the results comment
jobs:
  eval:
    runs-on: ubuntu-latest
    timeout-minutes: 60
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.12'
      - name: Install dependencies
        run: pip install -r requirements.txt  # assumes a requirements.txt at the repo root
      - name: Run evaluation suite
        env:
          ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
        run: |
          python eval/run_eval.py \
            --dataset golden_dataset.json \
            --model ${{ vars.EVAL_MODEL }} \
            --output eval_results.json \
            --compare-baseline baseline_results.json
      - name: Check quality gates
        run: |
          python eval/check_gates.py \
            --results eval_results.json \
            --gates eval/quality_gates.yaml
      - name: Post results to PR
        if: always()  # still post the comment when the gate step fails
        uses: actions/github-script@v7
        with:
          script: |
            const fs = require('fs');
            const results = JSON.parse(fs.readFileSync('eval_results.json', 'utf8'));
            // formatEvalResults is not defined in this snippet; supply your own formatter
            const comment = formatEvalResults(results);
            await github.rest.issues.createComment({
              issue_number: context.issue.number,
              owner: context.repo.owner,
              repo: context.repo.repo,
              body: comment
            });

The eval runner:
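# eval/run_eval.py
import asyncio

import anthropic

# build_prompt() and LLMEvaluator are project-specific helpers not shown in this
# guide; import them from wherever your project defines them.

# Async client, so concurrent evaluations do not block the event loop
client = anthropic.AsyncAnthropic()


async def evaluate_example(example, model: str, evaluator) -> dict:
    # Generate response
    response = await client.messages.create(
        model=model,
        max_tokens=1024,
        messages=[{"role": "user", "content": build_prompt(example)}]
    )
    actual = response.content[0].text
    # Evaluate
    result = await evaluator.evaluate(
        question=example.input,
        expected=example.expected_output,
        actual=actual,
        context=example.context
    )
    return {
        'example_id': example.id,
        'tags': example.tags,
        'score': result.overall,
        'passed': result.passed,
        'details': {
            'rouge_l': result.rouge_l,
            'bert_f1': result.bert_f1,
            'faithfulness': result.llm_faithfulness,
            'relevancy': result.llm_relevancy,
        },
        'actual_output': actual
    }


async def run_eval_suite(dataset, model: str, max_concurrency: int = 10) -> dict:
    evaluator = LLMEvaluator()
    semaphore = asyncio.Semaphore(max_concurrency)

    async def bounded_eval(example):
        async with semaphore:
            return await evaluate_example(example, model, evaluator)

    tasks = [bounded_eval(ex) for ex in dataset.examples]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    # Filter out exceptions and log them
    valid_results = []
    for r in results:
        if isinstance(r, Exception):
            print(f"Eval error: {r}")
        else:
            valid_results.append(r)
    return aggregate_results(valid_results)


def percentile_10(values: list[float]) -> float:
    return sorted(values)[len(values) // 10]


def compute_by_tag(results: list[dict]) -> dict:
    """Per-tag aggregates (one possible implementation; the original helper is not shown)."""
    grouped: dict[str, list[dict]] = {}
    for r in results:
        for tag in r['tags']:
            grouped.setdefault(tag, []).append(r)
    return {
        tag: {
            'total': len(rs),
            'pass_rate': sum(r['passed'] for r in rs) / len(rs),
            'mean_score': sum(r['score'] for r in rs) / len(rs),
        }
        for tag, rs in grouped.items()
    }


def aggregate_results(results: list[dict]) -> dict:
    if not results:
        raise ValueError("No successful evaluations to aggregate")
    scores = [r['score'] for r in results]
    passed = sum(r['passed'] for r in results)
    return {
        'total': len(results),
        'passed': passed,
        'pass_rate': passed / len(results),
        'mean_score': sum(scores) / len(scores),
        'min_score': min(scores),
        'p10_score': percentile_10(scores),
        # Metrics referenced by eval/quality_gates.yaml
        'catastrophic_failure_rate': sum(s < 0.4 for s in scores) / len(scores),
        'p10_faithfulness': percentile_10([r['details']['faithfulness'] for r in results]),
        'failed_cases': [r for r in results if not r['passed']],
        'by_tag': compute_by_tag(results),
        # Per-example results, read by eval/compare_baseline.py
        'all_results': results,
    }

Quality Gates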
Define explicit thresholds that block deployment:
# eval/quality_gates.yaml
gates:
  - name: mean_score
    metric: mean_score
    minimum: 0.78
    critical: 0.72
    description: "Overall mean quality score"
  - name: pass_rate
    metric: pass_rate
    minimum: 0.85
    critical: 0.75
    description: "Fraction of examples passing individual thresholds"
  - name: catastrophic_failure_rate
    metric: catastrophic_failure_rate  # fraction with score < 0.4
    maximum: 0.03
    description: "No more than 3% of responses completely wrong"
  - name: faithfulness_p10
    metric: p10_faithfulness
    minimum: 0.70
    description: "10th percentile faithfulness (catches tail failures)"
  # Tag-specific gates for critical capabilities
  - name: critical_tag_pass_rate
    metric: by_tag.critical.pass_rate
    minimum: 0.95
    description: "Examples tagged 'critical' must pass at 95%+"

# eval/check_gates.py
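import argparse
import json
import sys

import yaml


def get_nested(data: dict, dotted_key: str):
    """Resolve a dotted metric path such as 'by_tag.critical.pass_rate'; None if absent."""
    current = data
    for part in dotted_key.split('.'):
        if not isinstance(current, dict) or part not in current:
            return None
        current = current[part]
    return current


def check_gates(results: dict, gates_config: dict) -> tuple[bool, list[str]]:
    failures = []
    for gate in gates_config['gates']:
        value = get_nested(results, gate['metric'])
        if value is None:
            print(f"Warning: metric {gate['metric']} not found")
            continue
        if 'minimum' in gate and value < gate['minimum']:
            # A gate without a 'critical' threshold blocks as soon as it misses 'minimum'
            critical = gate.get('critical', gate['minimum'])
            level = 'CRITICAL' if value < critical else 'WARNING'
            failures.append(
                f"[{level}] {gate['name']}: {value:.3f} < {gate['minimum']} minimum\n"
                f"  {gate['description']}"
            )
        if 'maximum' in gate and value > gate['maximum']:
            failures.append(
                f"[CRITICAL] {gate['name']}: {value:.3f} > {gate['maximum']} maximum\n"
                f"  {gate['description']}"
            )
    critical_failures = [f for f in failures if '[CRITICAL]' in f]
    return len(critical_failures) == 0, failures


if __name__ == '__main__':
    # Named flags instead of positional sys.argv indexes, matching the CI invocation
    parser = argparse.ArgumentParser()
    parser.add_argument('--results', required=True)
    parser.add_argument('--gates', required=True)
    args = parser.parse_args()
    with open(args.results) as f:
        results = json.load(f)
    with open(args.gates) as f:
        gates = yaml.safe_load(f)
    passed, failures = check_gates(results, gates)
    if failures:
        print("\n".join(failures))
    if not passed:
        print("\n❌ Critical quality gates failed. Blocking merge.")
        sys.exit(1)
    print("✅ No critical quality gate failures")

Baseline Comparison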
Detect regressions by comparing against a stored baseline:
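# eval/compare_baseline.py
from scipy import stats


def compare_to_baseline(current: dict, baseline: dict) -> dict:
    """Statistical comparison between current run and baseline."""
    # Pair scores by example ID; ttest_rel needs aligned, equal-length inputs
    baseline_by_id = {r['example_id']: r['score'] for r in baseline['all_results']}
    pairs = [
        (r['score'], baseline_by_id[r['example_id']])
        for r in current['all_results']
        if r['example_id'] in baseline_by_id
    ]
    current_scores = [c for c, _ in pairs]
    baseline_scores = [b for _, b in pairs]
    t_stat, p_value = stats.ttest_rel(current_scores, baseline_scores)
    delta = current['mean_score'] - baseline['mean_score']
    return {
        'delta_mean': delta,
        'p_value': float(p_value),
        'statistically_significant': bool(p_value < 0.05),
        'direction': 'improvement' if delta > 0 else 'regression' if delta < 0 else 'unchanged',
        'new_failures': find_new_failures(current, baseline),
        'fixed_failures': find_fixed_failures(current, baseline),
    }


def find_new_failures(current, baseline) -> list[str]:
    """Examples that passed in baseline but fail now."""
    baseline_passed = {r['example_id'] for r in baseline['all_results'] if r['passed']}
    return [
        r['example_id']
        for r in current['all_results']
        if not r['passed'] and r['example_id'] in baseline_passed
    ]


def find_fixed_failures(current, baseline) -> list[str]:
    """Examples that failed in baseline but pass now (mirror of find_new_failures)."""
    baseline_failed = {r['example_id'] for r in baseline['all_results'] if not r['passed']}
    return [
        r['example_id']
        for r in current['all_results']
        if r['passed'] and r['example_id'] in baseline_failed
    ]

Production Sampling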
Once deployed, continuously evaluate a sample of live traffic:
# monitoring/production_sampler.py
import asyncio
import random
from datetime import datetime, timezone
from typing import Callable


class ProductionSampler:
    def __init__(
        self,
        evaluator,
        sample_rate: float = 0.02,  # 2% of traffic
        alert_callback: Callable | None = None
    ):
        self.evaluator = evaluator
        self.sample_rate = sample_rate
        self.alert_callback = alert_callback
        self.buffer = []
        self.buffer_size = 50
        # Keep references so background tasks are not garbage-collected mid-run
        self._tasks: set[asyncio.Task] = set()

    async def on_request(self, input: str, output: str, context: str | None = None):
        """Call this for every production request."""
        if random.random() >= self.sample_rate:
            return
        # Evaluate in the background so the request is not blocked
        task = asyncio.create_task(self._evaluate_and_buffer(input, output, context))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    async def _evaluate_and_buffer(self, input, output, context):
        result = await self.evaluator.evaluate(
            question=input,
            expected=None,  # No ground truth for live traffic
            actual=output,
            context=context,
            use_llm_judge=True
        )
        self.buffer.append({
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'score': result.overall,
            'faithfulness': result.llm_faithfulness,
            'relevancy': result.llm_relevancy,
        })
        if len(self.buffer) >= self.buffer_size:
            await self._flush_and_check()

    async def _flush_and_check(self):
        # Swap the buffer first so samples that arrive during the awaits below are kept
        batch, self.buffer = self.buffer, []
        scores = [r['score'] for r in batch]
        mean = sum(scores) / len(scores)
        # Alert if the mean of this batch drops below threshold
        if mean < 0.75 and self.alert_callback:
            await self.alert_callback({
                'alert': 'quality_degradation',
                'mean_score': mean,
                'sample_size': len(batch),
                'timestamp': datetime.now(timezone.utc).isoformat()
            })
        # Send metrics to your observability platform
        await self._emit_metrics(batch)

    async def _emit_metrics(self, batch: list[dict]):
        # Placeholder (not in the original): forward the batch to your metrics backend
        pass

Integrate into your API handler:
# In your FastAPI or similar framework
sampler = ProductionSampler(evaluator=LLMEvaluator(), sample_rate=0.02)


@app.post("/chat")
async def chat(request: ChatRequest):
    # Generate response
    response = await generate_response(request.message, request.context)
    # Sample for evaluation (non-blocking: on_request only schedules a background task)
    await sampler.on_request(
        input=request.message,
        output=response.text,
        context=request.context
    )
    return response

Alerting and Dashboards
Emit evaluation metrics to your observability stack:
# Using Prometheus metrics
from prometheus_client import Counter, Gauge

llm_quality_score = Gauge(
    'llm_quality_score',
    'Rolling mean quality score',
    ['model', 'endpoint']
)
llm_faithfulness = Gauge(
    'llm_faithfulness_score',
    'Rolling mean faithfulness',
    ['model', 'endpoint']
)
llm_low_quality_total = Counter(
    'llm_low_quality_responses_total',
    'Count of responses scoring below threshold',
    ['model', 'endpoint']
)

# Alerting rules (evaluated by Prometheus; Alertmanager routes the resulting alerts)
alert_rules = """
groups:
  - name: llm_quality
    rules:
      - alert: LLMQualityDegradation
        expr: llm_quality_score < 0.75
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "LLM quality score degraded"
          description: "Quality score {{ $value }} below threshold 0.75"
      - alert: LLMFaithfulnessDrop
        expr: llm_faithfulness_score < 0.80
        for: 10m
        labels:
          severity: critical
        annotations:
          summary: "LLM faithfulness critically low"
"""

When Eval Pipelines Go Wrong
False negatives: Pipeline passes but real quality degraded.
- Cause: Test set not representative, thresholds too lenient
- Fix: Regularly audit failed production samples and add them to the golden dataset
False positives: Pipeline fails but model is actually fine.
- Cause: Thresholds too tight, high variance in small test sets
- Fix: Use statistical significance tests; require changes to be significant at p<0.05
Eval flakiness: Same code produces different scores on each run.
- Cause: LLM-as-judge non-determinism (temperature > 0)
- Fix: Set temperature=0 for the judge (this reduces run-to-run variance but may not remove it entirely); average 3 runs for borderline cases
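The "average 3 runs for borderline cases" fix can be sketched as a small wrapper around the judge call. This is a minimal illustration under stated assumptions: `stable_judge_score` is a hypothetical helper, `judge` stands in for whatever callable returns a judge score, and the 0.75 threshold and 0.05 margin are placeholder values.

```python
from statistics import mean
from typing import Callable


def stable_judge_score(
    judge: Callable[[], float],
    threshold: float = 0.75,  # pass/fail cut-off (placeholder value)
    margin: float = 0.05,     # how close to the cut-off counts as borderline
    total_runs: int = 3,
) -> float:
    """Call the judge once; re-run and average only when the score is borderline."""
    first = judge()
    if abs(first - threshold) > margin:
        return first  # clear pass or clear fail: one call is enough
    scores = [first] + [judge() for _ in range(total_runs - 1)]
    return mean(scores)


# Demo with a scripted stand-in for a noisy judge
scripted = iter([0.74, 0.80, 0.77])
print(stable_judge_score(lambda: next(scripted)))
```

Re-running only near the threshold keeps the extra judge cost limited to the cases where noise could flip the pass/fail outcome.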
Slow feedback loops: CI eval takes 45 minutes.
- Fix: Parallelize evaluations; cache LLM judge responses with hash of (input, output); use smaller models for initial screening
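One way to cache judge responses by a hash of (input, output) is sketched below. `JudgeCache` is a hypothetical class that keeps an in-memory dictionary, and the judge is assumed to be any callable taking the input and output text and returning a score. A real pipeline would likely persist the cache between CI runs and include the judge prompt and model version in the key.

```python
import hashlib
import json
from typing import Callable


class JudgeCache:
    """Memoize judge scores keyed by a SHA-256 hash of (input, output)."""

    def __init__(self, judge: Callable[[str, str], float]):
        self._judge = judge
        self._store: dict[str, float] = {}
        self.misses = 0  # number of real judge calls made

    @staticmethod
    def key(input_text: str, output_text: str) -> str:
        # JSON-encode the pair so ("ab", "c") and ("a", "bc") hash differently
        payload = json.dumps([input_text, output_text], ensure_ascii=False)
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()

    def score(self, input_text: str, output_text: str) -> float:
        k = self.key(input_text, output_text)
        if k not in self._store:
            self.misses += 1
            self._store[k] = self._judge(input_text, output_text)
        return self._store[k]


# Demo with a trivial stand-in judge
cache = JudgeCache(lambda question, answer: 1.0 if answer else 0.0)
cache.score("What is 2+2?", "4")
cache.score("What is 2+2?", "4")  # served from the cache
print(cache.misses)
```

Unchanged outputs then cost nothing to re-score, so only examples whose responses actually changed hit the judge model.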
HelpMeTest Integration
HelpMeTest provides automated monitoring for AI applications without requiring you to build and maintain the entire sampling and alerting infrastructure. You define evaluation tests in plain language, and the platform:
- Runs them on a configurable schedule
- Samples production traffic for evaluation
- Alerts you when quality metrics drop below thresholds
- Maintains historical data for trend analysis
This lets you focus on improving your AI application rather than maintaining evaluation infrastructure.
Summary
A production LLM eval pipeline has three layers:
- Pre-commit: Fast smoke tests on a curated subset. Catches obvious regressions before code review.
- CI suite: Full evaluation on every prompt/model change. Quality gates block bad changes from merging.
- Production sampling: Continuous evaluation of live traffic. Catches real-world drift that test suites miss.
The investment pays off quickly. A single hallucination incident caught by automated evaluation — before users see it — easily justifies the setup cost of an entire eval pipeline.