Observability Pipeline Testing Best Practices: Schema, Cardinality, Latency, and Sampling
Observability pipelines fail in subtle ways. A field name typo in a log goes unnoticed until a dashboard breaks. A new label with high cardinality silently degrades Prometheus performance over three weeks. A sampling rule with an off-by-one error drops 10% more traces than intended. None of these are caught by "does data flow through?" integration tests.
This guide covers advanced observability pipeline testing: schema enforcement, cardinality validation, pipeline latency benchmarking, and sampling correctness verification.
Schema Enforcement Testing
Logs and traces without enforced schemas drift. A service ships user_id as an integer in one version and a string in the next. Dashboards break, queries fail, and nobody knows why.
Define and Validate Log Schemas
# log_schema.py
from pydantic import BaseModel, validator
from typing import Optional, Literal
from datetime import datetime
class ApplicationLog(BaseModel):
    """Schema every structured application log record must satisfy.

    Besides the type annotations, field validators enforce:
      * ``trace_id`` (when present) is exactly 32 hexadecimal characters;
      * ``user_id`` (when present) is a real string, not a coerced number;
      * ``service_name`` contains only alphanumerics, ``-`` and ``_``.
    """

    timestamp: datetime
    level: Literal["debug", "info", "warn", "error", "fatal"]
    message: str
    service_name: str
    trace_id: Optional[str] = None
    span_id: Optional[str] = None
    user_id: Optional[str] = None  # Always string, never int (enforced below)

    @validator("trace_id")
    def validate_trace_id_format(cls, v):
        """Reject trace IDs that are not exactly 32 hexadecimal characters."""
        if v is None:
            return v
        if len(v) != 32:
            raise ValueError(f"trace_id must be 32 hex chars, got {len(v)}")
        # A length check alone lets values such as "z" * 32 through.
        if any(ch not in "0123456789abcdefABCDEF" for ch in v):
            raise ValueError("trace_id must be 32 hex chars, got non-hex characters")
        return v

    @validator("user_id", pre=True)
    def validate_user_id_is_string(cls, v):
        """Reject non-string user IDs before type coercion can hide them."""
        # pydantic v1 coerces 12345 -> "12345" for a ``str`` field. A pre-validator
        # sees the raw input, so an integer user_id is reported instead of masked.
        if v is not None and not isinstance(v, str):
            raise ValueError(f"user_id must be a string, got {type(v).__name__}")
        return v

    @validator("service_name")
    def validate_service_name(cls, v):
        """Allow only alphanumerics plus ``-`` and ``_`` (empty names are rejected)."""
        if not v.replace("-", "").replace("_", "").isalnum():
            raise ValueError("service_name must be alphanumeric with - and _")
        return v


# test_log_schema_enforcement.py
import json
import pytest
from confluent_kafka import Consumer, KafkaError
from log_schema import ApplicationLog
def consume_sample_logs(
    topic: str,
    count: int = 100,
    *,
    poll_timeout: float = 2.0,
    max_wait: float = 30.0,
) -> list[dict]:
    """Read up to *count* JSON log records from *topic*, starting at the earliest offset.

    Args:
        topic: Kafka topic to sample.
        count: Maximum number of records to return.
        poll_timeout: Seconds a single ``poll`` may block.
        max_wait: Overall cap, in seconds, on how long to keep polling.

    Returns:
        Decoded JSON payloads, in consumption order. Records with a Kafka error
        or a null payload are skipped. Fewer than *count* records are returned
        if the topic goes quiet after data started flowing, or *max_wait* elapses.

    The broker address comes from ``KAFKA_BROKERS`` when set (as in CI),
    otherwise ``localhost:9092``.
    """
    import os
    import time

    consumer = Consumer({
        'bootstrap.servers': os.environ.get('KAFKA_BROKERS') or 'localhost:9092',
        'group.id': 'schema-validator',
        'auto.offset.reset': 'earliest',
        # Never commit offsets: otherwise each call (each test) resumes where
        # the previous one stopped and may see no records at all.
        'enable.auto.commit': False,
    })
    messages = []
    try:
        consumer.subscribe([topic])
        deadline = time.monotonic() + max_wait
        while len(messages) < count and time.monotonic() < deadline:
            msg = consumer.poll(timeout=poll_timeout)
            if msg is None:
                # Empty polls are normal while the consumer group is still
                # joining; only treat silence as "drained" once data has arrived.
                if messages:
                    break
                continue
            if msg.error():
                continue
            value = msg.value()
            if value is None:  # tombstone / null payload
                continue
            messages.append(json.loads(value.decode()))
    finally:
        # Always release the consumer, even if a payload fails to decode.
        consumer.close()
    return messages
def test_all_logs_match_schema():
    """Every sampled log record on ``app-logs`` must validate against ApplicationLog.

    Up to the first 10 violations are included in the failure message.
    """
    logs = consume_sample_logs("app-logs", count=1000)
    assert len(logs) > 0, "No logs consumed — is the topic populated?"
    violations = []
    for i, log in enumerate(logs):
        try:
            ApplicationLog(**log)
        # pydantic's ValidationError subclasses ValueError; TypeError covers
        # records that are not JSON objects (``**`` needs a mapping). Any other
        # exception is a bug in the test itself and should surface, not be counted.
        except (ValueError, TypeError) as e:
            violations.append(f"Log {i}: {e}")
    assert len(violations) == 0, (
        f"{len(violations)} schema violations found:\n" + "\n".join(violations[:10])
    )
def test_user_id_is_always_string():
    """``user_id``, when present and non-null, must be a JSON string.

    Catches integers as well as floats, booleans, lists, etc.
    """
    logs = consume_sample_logs("app-logs", count=500)
    # Without this, an empty or unreachable topic would pass vacuously.
    assert len(logs) > 0, "No logs consumed — is the topic populated?"
    non_string_user_ids = [
        log for log in logs
        if isinstance(log, dict)
        and log.get('user_id') is not None
        and not isinstance(log['user_id'], str)
    ]
    assert len(non_string_user_ids) == 0, (
        f"Found {len(non_string_user_ids)} logs with non-string user_id (should be string)"
    )
def test_log_levels_are_valid():
valid_levels = {"debug", "info", "warn", "error", "fatal"}
logs = consume_sample_logs("app-logs", count=500)
invalid_levels = [
(log.get('level'), log.get('service_name'))
for log in logs
if log.get('level') not in valid_levels
]
assert len(invalid_levels) == 0, (
f"Invalid log levels found: {invalid_levels[:5]}"
)OTel Schema Validation with Semantic Conventions
# test_otel_semantic_conventions.py
import requests
def get_jaeger_traces(service: str, limit: int = 50, *, timeout: float = 10.0) -> list:
    """Fetch up to *limit* recent traces for *service* from the Jaeger query API.

    Args:
        service: Service name as registered in Jaeger.
        limit: Maximum number of traces to request.
        timeout: Seconds to wait for the HTTP response.

    Returns:
        The ``data`` list from the response (empty if the key is absent).

    Raises:
        requests.HTTPError: on a non-2xx response.
    """
    resp = requests.get(
        "http://localhost:16686/api/traces",
        params={"service": service, "limit": limit},
        # Without a timeout, an unreachable Jaeger hangs the test run forever.
        timeout=timeout,
    )
    resp.raise_for_status()
    return resp.json().get("data", [])
def test_http_spans_have_required_attributes():
    """HTTP spans must follow OTel semantic conventions.

    A span is treated as an HTTP span when it carries an ``http.method`` tag;
    every such span must also carry ``http.url`` and ``http.status_code``.
    Up to five offending spans are listed on failure.
    """
    required_attrs = {"http.method", "http.url", "http.status_code"}

    def tag_keys_of(span):
        return {tag["key"] for tag in span.get("tags", [])}

    http_spans = []
    for trace in get_jaeger_traces("api-gateway"):
        for span in trace.get("spans", []):
            if "http.method" in tag_keys_of(span):
                http_spans.append(span)
    assert len(http_spans) > 0, "No HTTP spans found"

    violations = []
    for span in http_spans:
        absent = required_attrs - tag_keys_of(span)
        if absent:
            violations.append(f"Span '{span['operationName']}' missing: {absent}")
    assert len(violations) == 0, "\n".join(violations[:5])
def test_database_spans_have_query_attribute():
"""Database spans should include db.statement for debugging."""
traces = get_jaeger_traces("payments-service")
db_spans = [
span
for trace in traces
for span in trace.get("spans", [])
if any(tag["key"] == "db.system" for tag in span.get("tags", []))
]
if len(db_spans) == 0:
pytest.skip("No database spans found in sample")
missing_statement = [
span["operationName"] for span in db_spans
if not any(tag["key"] == "db.statement" for tag in span.get("tags", []))
]
# Warn rather than fail — db.statement may be intentionally omitted for security
if missing_statement:
pytest.warns(UserWarning, match="db.statement missing")Cardinality Testing
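High-cardinality Prometheus labels are one of the most common causes of Prometheus performance degradation: every unique label combination becomes a separate time series that must be indexed and held in memory. Test for it before it hits production.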
Cardinality Budget Tests
# test_cardinality.py
import requests
import pytest
import os

# Base URL of the Prometheus server. The CI workflow sets PROMETHEUS_URL; an
# unset or empty value falls back to a local Prometheus for development.
PROMETHEUS_URL = os.environ.get("PROMETHEUS_URL") or "http://localhost:9090"
# Maximum number of time series (unique label combinations) allowed per metric.
CARDINALITY_BUDGET = {
    "http_requests_total": 1000,  # Max 1000 unique label combinations
    "database_query_duration_seconds": 500,
    "cache_hits_total": 200,
}
def get_metric_cardinality(metric_name: str, *, timeout: float = 10.0) -> int:
    """Return the number of unique time series for a metric.

    Counts every series named *metric_name* and, for histograms and summaries,
    its ``_bucket`` / ``_count`` / ``_sum`` series. Otherwise a histogram
    would report 0 because no series carries the bare name.

    Args:
        metric_name: Prometheus metric name (not a full selector).
        timeout: Seconds to wait for the HTTP response.

    Returns:
        The series count, or 0 when Prometheus has no matching series.

    Raises:
        requests.HTTPError: on a non-2xx response.
    """
    # Count raw series. Aggregating by a subset of labels first (e.g. by job and
    # instance) would collapse exactly the unbounded labels this check exists to catch.
    query = f'count({{__name__=~"{metric_name}(_bucket|_count|_sum)?"}})'
    resp = requests.get(
        f"{PROMETHEUS_URL}/api/v1/query",
        params={"query": query},
        timeout=timeout,
    )
    resp.raise_for_status()
    result = resp.json()["data"]["result"]
    if not result:
        return 0
    return int(result[0]["value"][1])
def get_label_values_count(metric: str, label: str, *, timeout: float = 10.0) -> int:
    """Return how many distinct values *label* takes on series matching *metric*.

    Args:
        metric: Sent as the ``match[]`` series selector (a metric name or selector).
        label: Label whose distinct values are counted.
        timeout: Seconds to wait for the HTTP response.

    Raises:
        requests.HTTPError: on a non-2xx response.
    """
    resp = requests.get(
        f"{PROMETHEUS_URL}/api/v1/label/{label}/values",
        params={"match[]": metric},
        # Without a timeout, an unreachable Prometheus hangs the test run forever.
        timeout=timeout,
    )
    resp.raise_for_status()
    return len(resp.json().get("data", []))
@pytest.mark.parametrize("metric,budget", CARDINALITY_BUDGET.items())
def test_metric_within_cardinality_budget(metric, budget):
    """Fail when a budgeted metric has more series than its allowance.

    Parametrized over CARDINALITY_BUDGET, so each metric is reported separately.
    """
    series_count = get_metric_cardinality(metric)
    over_budget = series_count > budget
    assert not over_budget, (
        f"{metric} has {series_count} series, exceeding budget of {budget}. "
        f"Check for unbounded labels (user_id, request_id, trace_id on metric labels)."
    )
def test_no_unbounded_labels():
    """Detect metrics that have request-scoped labels (common cardinality bomb).

    For each label name that is usually request-scoped, fails if Prometheus
    reports 100 or more distinct values for it over the last hour.
    """
    import time

    dangerous_labels = ["user_id", "request_id", "trace_id", "session_id", "order_id"]
    # The label-values API needs an absolute RFC 3339 or Unix timestamp; a
    # relative value such as "-1h" is rejected with HTTP 400.
    one_hour_ago = time.time() - 3600
    for label in dangerous_labels:
        resp = requests.get(
            f"{PROMETHEUS_URL}/api/v1/label/{label}/values",
            params={"start": one_hour_ago},
            timeout=10,
        )
        # A label that does not exist yields 200 with an empty list, so any
        # non-2xx status is a real error and must not let the test pass silently.
        resp.raise_for_status()
        values = resp.json().get("data", [])
        assert len(values) < 100, (
            f"Label '{label}' has {len(values)} unique values in the last hour. "
            f"Request-scoped values in Prometheus labels cause cardinality explosions. "
            f"Use exemplars for trace_id instead of labels."
        )
def test_cardinality_not_growing():
"""Run twice, 60 seconds apart, and compare cardinality."""
import time
baseline = {m: get_metric_cardinality(m) for m in CARDINALITY_BUDGET}
time.sleep(60)
current = {m: get_metric_cardinality(m) for m in CARDINALITY_BUDGET}
for metric in CARDINALITY_BUDGET:
growth = current[metric] - baseline[metric]
growth_pct = (growth / max(baseline[metric], 1)) * 100
assert growth_pct < 10, (
f"{metric} cardinality grew {growth_pct:.1f}% in 60 seconds "
f"({baseline[metric]} → {current[metric]}). Possible cardinality leak."
)Pipeline Latency Benchmarking
The time between an event happening and it appearing in your observability backend matters for incident response.
Measuring End-to-End Latency
# test_pipeline_latency.py
import time
import uuid
import json
import requests
from confluent_kafka import Producer
# SLA targets
# Upper bounds, in seconds, on how long an event may take to become queryable.
MAX_LOG_LATENCY_SECONDS = 5.0 # Log must appear in Loki within 5s
# NOTE(review): no test in this snippet reads MAX_TRACE_LATENCY_SECONDS yet.
MAX_TRACE_LATENCY_SECONDS = 3.0 # Trace must appear in Jaeger within 3s
def test_log_pipeline_latency():
    """Produce one uniquely marked log to Kafka and time how long Loki takes to return it.

    Fails if Kafka does not acknowledge the record, if Loki never returns it,
    or if the end-to-end latency exceeds MAX_LOG_LATENCY_SECONDS. The broker
    and Loki addresses come from KAFKA_BROKERS / LOKI_URL when those are set.
    """
    import os

    kafka_brokers = os.environ.get('KAFKA_BROKERS') or 'localhost:9092'
    loki_url = os.environ.get('LOKI_URL') or 'http://localhost:3100'
    marker = f"latency-test-{uuid.uuid4().hex[:8]}"
    send_time = time.time()  # wall clock: stored as the record's timestamp
    send_mono = time.monotonic()  # monotonic clock: used to measure latency

    # Produce a uniquely identifiable log
    producer = Producer({'bootstrap.servers': kafka_brokers})
    producer.produce('app-logs', json.dumps({
        'timestamp': int(send_time * 1e9),
        'level': 'info',
        'message': marker,
        'service_name': 'latency-test'
    }).encode())
    # flush() returns how many messages are still undelivered after the timeout.
    undelivered = producer.flush(10)
    assert undelivered == 0, (
        f"Kafka did not acknowledge the latency-test log ({undelivered} undelivered)"
    )

    # Poll Loki until the log appears. Log (non-metric) LogQL queries are served
    # by /query_range; recent Loki versions reject them on the instant /query
    # endpoint with HTTP 400. The range starts a minute before sending to
    # tolerate clock skew between this host and the pipeline.
    query_params = {
        'query': f'{{service_name="latency-test"}} |= "{marker}"',
        'start': int((send_time - 60) * 1e9),
        'limit': 1,
    }
    max_wait = MAX_LOG_LATENCY_SECONDS + 2
    start_poll = time.monotonic()
    latency = None
    while time.monotonic() - start_poll < max_wait:
        try:
            resp = requests.get(
                f'{loki_url}/loki/api/v1/query_range', params=query_params, timeout=2
            )
        except requests.RequestException:
            resp = None  # transient network error: keep polling until the deadline
        if resp is not None and resp.status_code == 200:
            if resp.json().get('data', {}).get('result', []):
                latency = time.monotonic() - send_mono
                break
        time.sleep(0.2)

    assert latency is not None, (
        f"Log with marker '{marker}' not found in Loki within {max_wait}s"
    )
    assert latency <= MAX_LOG_LATENCY_SECONDS, (
        f"Pipeline latency {latency:.2f}s exceeds SLA of {MAX_LOG_LATENCY_SECONDS}s"
    )
    print(f"Log pipeline latency: {latency:.3f}s")
def benchmark_pipeline_throughput():
"""Measure how many logs/second the pipeline handles before latency degrades."""
rates = [100, 500, 1000, 2000, 5000]
results = {}
for rate in rates:
latencies = send_logs_and_measure_latency(
logs_per_second=rate,
duration_seconds=10,
sample_size=20
)
p99 = sorted(latencies)[int(len(latencies) * 0.99)]
results[rate] = p99
print(f"Rate: {rate} logs/s → p99 latency: {p99:.3f}s")
if p99 > MAX_LOG_LATENCY_SECONDS:
print(f"SLA breached at {rate} logs/s")
break
return resultsSampling Correctness Verification
When you sample traces (head-based or tail-based), you need to verify the math is right and that critical traces are never dropped.
Verifying Sample Rate
# test_sampling.py
import requests
import time
import math
# Fraction of requests expected to end up as stored traces.
EXPECTED_SAMPLE_RATE = 0.10 # 10% sampling
# Absolute allowed deviation between the observed and expected sampling rate.
TOLERANCE = 0.02 # ±2%
def count_traces_in_window(
    service: str,
    start: str,
    end: str,
    *,
    limit: int = 10000,
    timeout: float = 30.0,
) -> int:
    """Count traces in Jaeger for a time window.

    Args:
        service: Service name as registered in Jaeger.
        start: Window start, passed as-is to Jaeger's ``start`` parameter
            (Jaeger expects microseconds since the Unix epoch).
        end: Window end, same format as *start*.
        limit: Maximum traces Jaeger returns. The count can never exceed
            this, so a busy service may be under-counted.
        timeout: Seconds to wait for the HTTP response.

    Raises:
        requests.HTTPError: on a non-2xx response, instead of silently counting 0.
    """
    resp = requests.get(
        "http://localhost:16686/api/traces",
        params={"service": service, "start": start, "end": end, "limit": limit},
        timeout=timeout,
    )
    resp.raise_for_status()
    return len(resp.json().get("data", []))
def count_requests_in_prometheus(service: str, window: str = "5m", *, timeout: float = 10.0) -> int:
    """Count actual request volume from Prometheus metrics.

    Evaluates ``sum(increase(http_requests_total{service="<service>"}[<window>]))``.
    ``increase`` extrapolates, so the value is an estimate; it is truncated to an int.

    Returns:
        The request count, or 0 when no series match.

    Raises:
        requests.HTTPError: on a non-2xx response.
    """
    resp = requests.get(
        "http://localhost:9090/api/v1/query",
        params={"query": f'sum(increase(http_requests_total{{service="{service}"}}[{window}]))'},
        timeout=timeout,
    )
    resp.raise_for_status()
    result = resp.json()["data"]["result"]
    if not result:
        return 0
    return int(float(result[0]["value"][1]))
def test_sampling_rate_within_tolerance():
    """The stored-trace / request ratio over the last 5 minutes must match EXPECTED_SAMPLE_RATE.

    The allowed deviation is TOLERANCE, widened to three binomial standard
    errors when traffic is low. Otherwise normal sampling noise near the
    100-request minimum would fail the test.
    """
    import pytest

    window_minutes = 5
    end_time = int(time.time() * 1e6)  # microseconds for Jaeger
    start_time = end_time - window_minutes * 60 * 1_000_000
    actual_traces = count_traces_in_window(
        "api-gateway",
        str(start_time),
        str(end_time)
    )
    total_requests = count_requests_in_prometheus("api-gateway", f"{window_minutes}m")
    if total_requests < 100:
        pytest.skip("Not enough traffic for meaningful sampling test")
    actual_rate = actual_traces / total_requests
    error = abs(actual_rate - EXPECTED_SAMPLE_RATE)
    # Standard error of a sampled proportion: at 100 requests and p = 0.10 it is
    # 0.03, already wider than a fixed ±0.02, so a 3-sigma bound is used there.
    # NOTE(review): if error traces are always kept, the observed rate is biased
    # upward by the error share of traffic — confirm against the sampler config.
    std_err = math.sqrt(
        EXPECTED_SAMPLE_RATE * (1 - EXPECTED_SAMPLE_RATE) / total_requests
    )
    allowed = max(TOLERANCE, 3 * std_err)
    assert error <= allowed, (
        f"Sampling rate {actual_rate:.3f} deviates from expected {EXPECTED_SAMPLE_RATE} "
        f"by {error:.3f} (tolerance: {allowed:.3f}). "
        f"Traces: {actual_traces}, Requests: {total_requests}"
    )
def test_error_traces_never_sampled_away():
    """Critical: error traces must always be kept regardless of sample rate.

    Compares the number of HTTP 500s that Prometheus counted for api-gateway
    over the last 5 minutes with the number of Jaeger traces tagged
    ``error=true`` for the same service and the same window. Skips when
    there were no 500s.
    """
    import pytest

    window_minutes = 5
    end_us = int(time.time() * 1e6)  # Jaeger expects microseconds
    start_us = end_us - window_minutes * 60 * 1_000_000

    # Query for 500 errors in Prometheus, scoped to the same service as the Jaeger query
    resp = requests.get(
        "http://localhost:9090/api/v1/query",
        params={"query": (
            'sum(increase(http_requests_total'
            f'{{service="api-gateway",status_code="500"}}[{window_minutes}m]))'
        )},
        timeout=10,
    )
    resp.raise_for_status()
    result = resp.json()["data"]["result"]
    # With no matching series Prometheus returns an empty result, not a 0 sample.
    # NOTE(review): increase() extrapolates, so this count is an estimate.
    error_count = int(float(result[0]["value"][1])) if result else 0
    if error_count == 0:
        pytest.skip("No 500 errors in window — inject some for this test")

    # Query Jaeger for error traces in the same window
    resp = requests.get(
        "http://localhost:16686/api/traces",
        params={
            "service": "api-gateway",
            "tags": '{"error":"true"}',
            "start": start_us,
            "end": end_us,
            "limit": 10000
        },
        timeout=30,
    )
    resp.raise_for_status()
    error_traces = len(resp.json().get("data", []))
    # Every error should have a trace (100% sampling for errors)
    assert error_traces >= error_count, (
        f"Found {error_traces} error traces but {error_count} 500 errors in Prometheus. "
        f"Error traces are being sampled away — this should never happen."
    )
def test_dropping_correctness():
"""Verify that dropped logs represent the right fraction."""
# Get drop statistics from OTel Collector metrics
resp = requests.get(
"http://localhost:8888/metrics" # OTel Collector's own metrics endpoint
)
lines = resp.text.splitlines()
accepted = dropped = 0
for line in lines:
if 'otelcol_processor_accepted_log_records' in line and not line.startswith('#'):
accepted = float(line.split()[-1])
if 'otelcol_processor_refused_log_records' in line and not line.startswith('#'):
dropped = float(line.split()[-1])
total = accepted + dropped
if total == 0:
pytest.skip("No log records processed yet")
drop_rate = dropped / total
# Our filter should drop ~20% (health check noise)
assert 0.15 <= drop_rate <= 0.25, (
f"Drop rate {drop_rate:.2%} outside expected 15-25% range. "
f"Check filter processor configuration."
)Alerting on Pipeline Health
Wire these tests into your CI pipeline, but also run cardinality and latency checks as scheduled monitors:
# .github/workflows/observability-pipeline-tests.yml
name: Observability Pipeline Tests
on:
  schedule:
    - cron: '*/15 * * * *'  # Every 15 minutes
  push:
    paths:
      - 'observability/**'
      - 'vector.toml'
      - 'otelcol/*.yaml'
jobs:
  schema-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      # A managed Python avoids "externally-managed-environment" pip errors
      # on recent Ubuntu runner images.
      - uses: actions/setup-python@v5
        with:
          python-version: '3.12'
      - run: pip install -r requirements-test.txt
      - run: pytest tests/observability/test_log_schema_enforcement.py -v
        env:
          KAFKA_BROKERS: ${{ secrets.KAFKA_BROKERS }}
  cardinality-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.12'
      # Each job runs on a fresh runner, so dependencies must be installed per job.
      - run: pip install -r requirements-test.txt
      - run: pytest tests/observability/test_cardinality.py -v
        env:
          PROMETHEUS_URL: ${{ secrets.PROMETHEUS_URL }}
  latency-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.12'
      # requirements-test.txt must include pytest-timeout for --timeout below.
      - run: pip install -r requirements-test.txt
      - run: pytest tests/observability/test_pipeline_latency.py -v --timeout=60
        env:
          KAFKA_BROKERS: ${{ secrets.KAFKA_BROKERS }}
What Good Observability Pipeline Testing Looks Like
A mature observability pipeline test suite has four layers:
- Schema tests — run in CI on every deploy, validate against Pydantic/JSON Schema models
- Cardinality tests — run on a schedule (daily), alert when a metric exceeds its budget
- Latency tests — run on a schedule (every 15 min), alert when p99 latency exceeds SLA
- Sampling tests — run after every sampler config change, validate error traces are always captured
Each layer catches different failure modes. Schema tests catch developer mistakes. Cardinality tests catch growth patterns before they become incidents. Latency tests catch infrastructure degradation. Sampling tests catch configuration drift.
Start with schema enforcement — it's the highest value per hour of implementation, and it prevents entire categories of dashboard breakage.