Observability Pipeline Testing Best Practices: Schema, Cardinality, Latency, and Sampling

Observability Pipeline Testing Best Practices: Schema, Cardinality, Latency, and Sampling

Observability pipelines fail in subtle ways. A field name typo in a log goes unnoticed until a dashboard breaks. A new label with high cardinality silently degrades Prometheus performance over three weeks. A sampling rule with an off-by-one error drops 10% more traces than intended. None of these are caught by "does data flow through?" integration tests.

This guide covers advanced observability pipeline testing: schema enforcement, cardinality validation, pipeline latency benchmarking, and sampling correctness verification.

Schema Enforcement Testing

Logs and traces without enforced schemas drift. A service ships user_id as an integer in one version and a string in the next. Dashboards break, queries fail, and nobody knows why.

Define and Validate Log Schemas

# log_schema.py
from pydantic import BaseModel, validator
from typing import Optional, Literal
from datetime import datetime

class ApplicationLog(BaseModel):
    """Schema every application log record is expected to satisfy.

    Required fields: ``timestamp``, ``level``, ``message``, ``service_name``.
    The tracing/user correlation fields are optional and default to ``None``.
    """

    timestamp: datetime
    level: Literal["debug", "info", "warn", "error", "fatal"]
    message: str
    service_name: str
    trace_id: Optional[str] = None
    span_id: Optional[str] = None
    # Always string, never int.
    # NOTE(review): depending on the pydantic version/mode an int may be
    # coerced to str instead of rejected — confirm against the pinned version.
    user_id: Optional[str] = None

    @validator("trace_id")
    def validate_trace_id_format(cls, v):
        """Require trace_id, when present, to be exactly 32 hexadecimal characters.

        Raises:
            ValueError: if the length is not 32 or any character is not a hex digit.
        """
        if v is None:
            return v
        if len(v) != 32:
            raise ValueError(f"trace_id must be 32 hex chars, got {len(v)}")
        # Length alone is not enough: a 32-character non-hex string would
        # otherwise pass even though the error message promises hex.
        if not all(ch in "0123456789abcdefABCDEF" for ch in v):
            raise ValueError("trace_id must be 32 hex chars, got non-hex characters")
        return v

    @validator("service_name")
    def validate_service_name(cls, v):
        """Require service_name to be alphanumeric, allowing ``-`` and ``_``.

        An empty name (or one made only of separators) is rejected because
        ``"".isalnum()`` is False.

        Raises:
            ValueError: if any other character is present.
        """
        if not v.replace("-", "").replace("_", "").isalnum():
            raise ValueError("service_name must be alphanumeric with - and _")
        return v
# test_log_schema_enforcement.py
import json
import pytest
from confluent_kafka import Consumer, KafkaError
from log_schema import ApplicationLog

def consume_sample_logs(topic: str, count: int = 100) -> list[dict]:
    """Read up to ``count`` JSON log records from a Kafka topic.

    Args:
        topic: Kafka topic to subscribe to.
        count: Maximum number of records to return.

    Returns:
        The decoded records, in consumption order. May be shorter than
        ``count``: reading stops at the first poll that times out (2 s)
        without a message.

    Raises:
        json.JSONDecodeError / UnicodeDecodeError: if a message payload is
            not valid UTF-8 JSON. The consumer is still closed in that case.
    """
    consumer = Consumer({
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'schema-validator',
        'auto.offset.reset': 'earliest'
    })

    messages = []
    try:
        consumer.subscribe([topic])
        while len(messages) < count:
            msg = consumer.poll(timeout=2.0)
            if msg is None:
                # Poll timed out: treat the topic as drained for this sample.
                break
            if msg.error():
                # Skip errored events rather than failing the whole sample.
                continue
            messages.append(json.loads(msg.value().decode()))
    finally:
        # Always release the consumer, even when a payload fails to decode.
        consumer.close()

    return messages

def test_all_logs_match_schema():
    """Every sampled record from ``app-logs`` must construct an ApplicationLog."""
    sampled = consume_sample_logs("app-logs", count=1000)
    assert len(sampled) > 0, "No logs consumed — is the topic populated?"

    def _check(index, record):
        # Return a violation message for this record, or None when it validates.
        try:
            ApplicationLog(**record)
        except Exception as exc:
            return f"Log {index}: {exc}"
        return None

    outcomes = (_check(index, record) for index, record in enumerate(sampled))
    problems = [message for message in outcomes if message is not None]

    # Only the first ten violations are listed to keep the failure readable.
    summary = f"{len(problems)} schema violations found:\n" + "\n".join(problems[:10])
    assert len(problems) == 0, summary

def test_user_id_is_always_string():
    """No sampled record may carry ``user_id`` as an integer."""
    offenders = []
    for record in consume_sample_logs("app-logs", count=500):
        if 'user_id' not in record:
            continue
        if isinstance(record['user_id'], int):
            offenders.append(record)

    failure = f"Found {len(offenders)} logs with integer user_id (should be string)"
    assert len(offenders) == 0, failure

def test_log_levels_are_valid():
    """Each sampled record's ``level`` must be one of the five allowed values."""
    valid_levels = {"debug", "info", "warn", "error", "fatal"}

    rejected = []
    for record in consume_sample_logs("app-logs", count=500):
        level = record.get('level')
        if level in valid_levels:
            continue
        # Keep the service name next to the bad level to aid triage.
        rejected.append((level, record.get('service_name')))

    assert len(rejected) == 0, f"Invalid log levels found: {rejected[:5]}"

OTel Schema Validation with Semantic Conventions

# test_otel_semantic_conventions.py
import requests

def get_jaeger_traces(service: str, limit: int = 50) -> list:
    """Fetch recent traces for a service from the local Jaeger query API.

    Args:
        service: Service name passed as the ``service`` query parameter.
        limit: Maximum number of traces requested.

    Returns:
        The list under the response's ``data`` key; an empty list when the
        key is missing or null.

    Raises:
        requests.HTTPError: on a non-2xx response.
        requests.Timeout: if Jaeger does not answer within 10 seconds.
    """
    resp = requests.get(
        "http://localhost:16686/api/traces",
        params={"service": service, "limit": limit},
        # Without a timeout a hung backend would block the test run forever.
        timeout=10,
    )
    resp.raise_for_status()
    # `.get("data", [])` alone would return None for an explicit JSON null.
    return resp.json().get("data") or []

def test_http_spans_have_required_attributes():
    """HTTP spans must carry http.method, http.url and http.status_code tags."""
    required_attrs = {"http.method", "http.url", "http.status_code"}

    # A span counts as an HTTP span when it has an http.method tag.
    http_spans = []
    for trace in get_jaeger_traces("api-gateway"):
        for span in trace.get("spans", []):
            if any(tag["key"] == "http.method" for tag in span.get("tags", [])):
                http_spans.append(span)

    assert len(http_spans) > 0, "No HTTP spans found"

    violations = [
        f"Span '{span['operationName']}' missing: {missing}"
        for span in http_spans
        if (missing := required_attrs - {tag["key"] for tag in span.get("tags", [])})
    ]

    # Report at most five offenders.
    assert len(violations) == 0, "\n".join(violations[:5])

def test_database_spans_have_query_attribute():
    """Database spans should include db.statement for debugging.

    Spans are treated as database spans when they carry a ``db.system`` tag.
    The test is skipped when the sample has none. Missing ``db.statement``
    produces a ``UserWarning`` rather than a failure.
    """
    # Local import: this block is the only user of the warnings module.
    import warnings

    traces = get_jaeger_traces("payments-service")
    db_spans = [
        span
        for trace in traces
        for span in trace.get("spans", [])
        if any(tag["key"] == "db.system" for tag in span.get("tags", []))
    ]

    if not db_spans:
        pytest.skip("No database spans found in sample")

    missing_statement = [
        span["operationName"] for span in db_spans
        if not any(tag["key"] == "db.statement" for tag in span.get("tags", []))
    ]

    # Warn rather than fail — db.statement may be intentionally omitted for security.
    # pytest.warns() is an assertion helper meant to be used as a context
    # manager; called bare it emits nothing, so issue a real warning instead.
    if missing_statement:
        warnings.warn(
            f"db.statement missing on {len(missing_statement)} database span(s): "
            f"{missing_statement[:5]}",
            UserWarning,
        )

Cardinality Testing

High-cardinality labels are one of the most common causes of Prometheus performance degradation. Test for them before they reach production.

Cardinality Budget Tests

# test_cardinality.py
import requests
import pytest

# Base URL of the Prometheus server queried by the cardinality helpers and tests.
PROMETHEUS_URL = "http://localhost:9090"
# Per-metric cap on the value returned by get_metric_cardinality();
# test_metric_within_cardinality_budget is parametrized over these entries and
# test_cardinality_not_growing samples the same metric names.
CARDINALITY_BUDGET = {
    "http_requests_total": 1000,      # Max 1000 unique label combinations
    "database_query_duration_seconds": 500,
    "cache_hits_total": 200,
}

def get_metric_cardinality(metric_name: str) -> int:
    """Return the number of unique time series for a metric.

    Runs the instant query ``count(<metric_name>)`` against Prometheus, which
    counts every series of that name regardless of which labels distinguish
    them.

    Args:
        metric_name: Metric name (or selector) to count series for.

    Returns:
        The series count, or 0 when the query returns no result.

    Raises:
        requests.HTTPError: on a non-2xx response.
    """
    resp = requests.get(
        f"{PROMETHEUS_URL}/api/v1/query",
        # Do not aggregate by a fixed label subset first: grouping by
        # (job, instance) collapses all other labels and hides exactly the
        # label explosion the budget is meant to catch.
        params={"query": f"count({metric_name})"}
    )
    resp.raise_for_status()
    result = resp.json()["data"]["result"]
    if not result:
        return 0
    # Instant-vector samples are [timestamp, "<value as string>"].
    return int(result[0]["value"][1])

def get_label_values_count(metric: str, label: str) -> int:
    """Count the values Prometheus reports for ``label`` on series matching ``metric``.

    Raises ``requests.HTTPError`` on a non-2xx response.
    """
    endpoint = f"{PROMETHEUS_URL}/api/v1/label/{label}/values"
    selector = {"match[]": metric}

    resp = requests.get(endpoint, params=selector)
    resp.raise_for_status()

    payload = resp.json()
    return len(payload.get("data", []))

@pytest.mark.parametrize("metric,budget", CARDINALITY_BUDGET.items())
def test_metric_within_cardinality_budget(metric, budget):
    """Each budgeted metric must not exceed its configured series cap."""
    cardinality = get_metric_cardinality(metric)

    over_budget = (
        f"{metric} has {cardinality} series, exceeding budget of {budget}. "
        f"Check for unbounded labels (user_id, request_id, trace_id on metric labels)."
    )
    assert cardinality <= budget, over_budget

def test_no_unbounded_labels():
    """Detect metrics that have request-scoped labels (common cardinality bomb).

    For each suspicious label name, asks Prometheus for the label's values
    seen in the last hour and fails when there are 100 or more.

    Raises:
        requests.HTTPError: if Prometheus rejects a request. Errors are no
            longer skipped, because a skipped label is a vacuous pass.
    """
    import time

    dangerous_labels = ["user_id", "request_id", "trace_id", "session_id", "order_id"]

    # The label-values API takes an RFC 3339 or Unix timestamp for `start`;
    # a relative string such as "-1h" is not a valid timestamp.
    window_start = time.time() - 3600

    for label in dangerous_labels:
        resp = requests.get(
            f"{PROMETHEUS_URL}/api/v1/label/{label}/values",
            params={"start": window_start}
        )
        resp.raise_for_status()

        values = resp.json().get("data", [])
        assert len(values) < 100, (
            f"Label '{label}' has {len(values)} unique values in the last hour. "
            f"Request-scoped values in Prometheus labels cause cardinality explosions. "
            f"Use exemplars for trace_id instead of labels."
        )

def test_cardinality_not_growing():
    """Sample each budgeted metric's cardinality twice, 60 seconds apart.

    Fails when any metric's series count grew by 10% or more between the
    two samples. A baseline of 0 is treated as 1 to avoid division by zero.
    """
    import time

    baseline = {m: get_metric_cardinality(m) for m in CARDINALITY_BUDGET}
    time.sleep(60)
    current = {m: get_metric_cardinality(m) for m in CARDINALITY_BUDGET}

    for metric in CARDINALITY_BUDGET:
        growth = current[metric] - baseline[metric]
        growth_pct = (growth / max(baseline[metric], 1)) * 100
        # Separate the two counts; printed back-to-back they read as one number.
        assert growth_pct < 10, (
            f"{metric} cardinality grew {growth_pct:.1f}% in 60 seconds "
            f"({baseline[metric]} → {current[metric]}). Possible cardinality leak."
        )

Pipeline Latency Benchmarking

The time between an event happening and it appearing in your observability backend matters for incident response.

Measuring End-to-End Latency

# test_pipeline_latency.py
import time
import uuid
import json
import requests
from confluent_kafka import Producer

# SLA targets
# Upper bound asserted by test_log_pipeline_latency and used as the stop
# threshold in benchmark_pipeline_throughput.
MAX_LOG_LATENCY_SECONDS = 5.0    # Log must appear in Loki within 5s
# NOTE(review): not referenced by any test visible in this file.
MAX_TRACE_LATENCY_SECONDS = 3.0  # Trace must appear in Jaeger within 3s

def test_log_pipeline_latency():
    """Produce one uniquely marked log to Kafka and time its arrival in Loki.

    Loki is polled every 0.2 s for up to ``MAX_LOG_LATENCY_SECONDS + 2``
    seconds; the measured latency runs from just before the produce call to
    the first poll that returns a result.
    """
    marker = f"latency-test-{uuid.uuid4().hex[:8]}"
    send_time = time.time()

    # The marker doubles as the log message so Loki can filter on it.
    record = {
        'timestamp': int(send_time * 1e9),
        'level': 'info',
        'message': marker,
        'service_name': 'latency-test'
    }
    producer = Producer({'bootstrap.servers': 'localhost:9092'})
    producer.produce('app-logs', json.dumps(record).encode())
    producer.flush()

    loki_params = {
        'query': f'{{service_name="latency-test"}} |= "{marker}"',
        'limit': 1
    }
    poll_budget = MAX_LOG_LATENCY_SECONDS + 2
    start_poll = time.time()
    latency = None

    while time.time() - start_poll < poll_budget:
        resp = requests.get('http://localhost:3100/loki/api/v1/query', params=loki_params)

        # Non-200 responses are ignored and retried on the next tick.
        if resp.status_code == 200 and resp.json().get('data', {}).get('result', []):
            latency = time.time() - send_time
            break

        time.sleep(0.2)

    found = latency is not None
    assert found, f"Log with marker '{marker}' not found in Loki within {MAX_LOG_LATENCY_SECONDS}s"
    assert latency <= MAX_LOG_LATENCY_SECONDS, (
        f"Pipeline latency {latency:.2f}s exceeds SLA of {MAX_LOG_LATENCY_SECONDS}s"
    )

    print(f"Log pipeline latency: {latency:.3f}s")

def benchmark_pipeline_throughput():
    """Step through increasing log rates and record the p99 latency at each.

    Stops after the first rate whose p99 exceeds ``MAX_LOG_LATENCY_SECONDS``.
    Returns a dict mapping each tested rate (logs/s) to its p99 latency.
    """
    p99_by_rate = {}

    for rate in (100, 500, 1000, 2000, 5000):
        samples = send_logs_and_measure_latency(
            logs_per_second=rate,
            duration_seconds=10,
            sample_size=20
        )
        ordered = sorted(samples)
        # Index-based percentile: element at floor(0.99 * n) of the sorted samples.
        p99 = ordered[int(len(ordered) * 0.99)]
        p99_by_rate[rate] = p99
        print(f"Rate: {rate} logs/s → p99 latency: {p99:.3f}s")

        if p99 > MAX_LOG_LATENCY_SECONDS:
            print(f"SLA breached at {rate} logs/s")
            break

    return p99_by_rate

Sampling Correctness Verification

When you sample traces (head-based or tail-based), you need to verify the math is right and that critical traces are never dropped.

Verifying Sample Rate

# test_sampling.py
import requests
import time
import math

# Target ratio of traces to requests checked by test_sampling_rate_within_tolerance.
EXPECTED_SAMPLE_RATE = 0.10  # 10% sampling
# Absolute allowed deviation (percentage points, not relative): the test
# asserts abs(actual_rate - EXPECTED_SAMPLE_RATE) <= TOLERANCE.
TOLERANCE = 0.02  # ±2%

def count_traces_in_window(service: str, start: str, end: str) -> int:
    """Count traces in Jaeger for a time window.

    Args:
        service: Service name to query.
        start: Window start, passed through as the ``start`` query parameter
            (the caller in this file supplies microseconds since the epoch).
        end: Window end, passed through as the ``end`` query parameter.

    Returns:
        Number of traces returned, capped by the request's ``limit`` of 10000.

    Raises:
        requests.HTTPError: on a non-2xx response, instead of silently
            counting an error payload as zero traces.
    """
    resp = requests.get(
        "http://localhost:16686/api/traces",
        params={"service": service, "start": start, "end": end, "limit": 10000}
    )
    resp.raise_for_status()
    # Treat a missing or null "data" field as no traces.
    return len(resp.json().get("data") or [])

def count_requests_in_prometheus(service: str, window: str = "5m") -> int:
    """Count actual request volume from Prometheus metrics.

    Evaluates ``sum(increase(http_requests_total{service="<service>"}[<window>]))``.

    Args:
        service: Value of the ``service`` label to filter on.
        window: PromQL range duration, e.g. ``"5m"``.

    Returns:
        The summed increase truncated to an int, or 0 when no series match.

    Raises:
        requests.HTTPError: on a non-2xx response, rather than a KeyError
            from indexing into an error payload.
    """
    resp = requests.get(
        "http://localhost:9090/api/v1/query",
        params={"query": f'sum(increase(http_requests_total{{service="{service}"}}[{window}]))'}
    )
    resp.raise_for_status()
    result = resp.json()["data"]["result"]
    if not result:
        return 0
    # increase() yields a fractional value encoded as a string.
    return int(float(result[0]["value"][1]))

def test_sampling_rate_within_tolerance():
    """Compare the traces-to-requests ratio over five minutes with the target rate.

    Skips when fewer than 100 requests were recorded in the window.
    """
    window_minutes = 5
    # Jaeger window bounds are expressed in microseconds.
    window_end_us = int(time.time() * 1e6)
    window_start_us = window_end_us - (window_minutes * 60 * 1e6)

    actual_traces = count_traces_in_window(
        "api-gateway",
        str(int(window_start_us)),
        str(int(window_end_us))
    )
    total_requests = count_requests_in_prometheus("api-gateway", f"{window_minutes}m")

    if total_requests < 100:
        pytest.skip("Not enough traffic for meaningful sampling test")

    actual_rate = actual_traces / total_requests
    error = abs(actual_rate - EXPECTED_SAMPLE_RATE)

    report = (
        f"Sampling rate {actual_rate:.3f} deviates from expected {EXPECTED_SAMPLE_RATE} "
        f"by {error:.3f} (tolerance: {TOLERANCE}). "
        f"Traces: {actual_traces}, Requests: {total_requests}"
    )
    assert error <= TOLERANCE, report

def test_error_traces_never_sampled_away():
    """Critical: error traces must always be kept regardless of sample rate.

    Compares the number of HTTP 500s Prometheus saw in the last five minutes
    with the number of error-tagged traces Jaeger returns for ``api-gateway``.
    Skips when Prometheus reports no 500s.

    Raises:
        requests.HTTPError: if either backend returns a non-2xx response.
    """
    # Query for 500 errors in Prometheus
    resp = requests.get(
        "http://localhost:9090/api/v1/query",
        params={"query": 'sum(increase(http_requests_total{status_code="500"}[5m]))'}
    )
    resp.raise_for_status()
    result = resp.json()["data"]["result"]
    # With no matching series the result list is empty; indexing [0]
    # unconditionally would raise IndexError before the skip below could run.
    error_count = int(float(result[0]["value"][1])) if result else 0

    if error_count == 0:
        pytest.skip("No 500 errors in window — inject some for this test")

    # Query Jaeger for error traces
    # NOTE(review): this request carries no explicit time window while the
    # Prometheus query covers 5m — confirm the two ranges are comparable.
    resp = requests.get(
        "http://localhost:16686/api/traces",
        params={
            "service": "api-gateway",
            "tags": '{"error":"true"}',
            "limit": 10000
        }
    )
    resp.raise_for_status()
    error_traces = len(resp.json().get("data") or [])

    # Every error should have a trace (100% sampling for errors)
    assert error_traces >= error_count, (
        f"Found {error_traces} error traces but {error_count} 500 errors in Prometheus. "
        f"Error traces are being sampled away — this should never happen."
    )

def test_dropping_correctness():
    """Check that the collector's refused/(accepted+refused) log ratio is 15-25%."""
    # Scrape the collector's own Prometheus-format metrics page.
    metrics_text = requests.get("http://localhost:8888/metrics").text

    accepted = dropped = 0
    for sample in metrics_text.splitlines():
        if sample.startswith('#'):
            # HELP/TYPE comment lines carry no sample value.
            continue
        # The sample value is the last whitespace-separated token on the line;
        # a later matching line overwrites an earlier one.
        if 'otelcol_processor_accepted_log_records' in sample:
            accepted = float(sample.split()[-1])
        if 'otelcol_processor_refused_log_records' in sample:
            dropped = float(sample.split()[-1])

    total = accepted + dropped
    if total == 0:
        pytest.skip("No log records processed yet")

    # The filter is expected to drop roughly 20% (health check noise).
    drop_rate = dropped / total
    out_of_range = (
        f"Drop rate {drop_rate:.2%} outside expected 15-25% range. "
        f"Check filter processor configuration."
    )
    assert 0.15 <= drop_rate <= 0.25, out_of_range

Alerting on Pipeline Health

Wire these tests into your CI pipeline, but also run cardinality and latency checks as scheduled monitors:

# .github/workflows/observability-pipeline-tests.yml
# Runs the observability pipeline test suites on a 15-minute schedule and on
# pushes that touch pipeline configuration.
name: Observability Pipeline Tests

on:
  schedule:
    - cron: '*/15 * * * *'  # Every 15 minutes
  push:
    paths:
      - 'observability/**'
      - 'vector.toml'
      - 'otelcol/*.yaml'

jobs:
  schema-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: pip install -r requirements-test.txt
      - run: pytest tests/observability/test_log_schema_enforcement.py -v
        env:
          # NOTE(review): confirm the test module actually reads this variable.
          KAFKA_BROKERS: ${{ secrets.KAFKA_BROKERS }}

  cardinality-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      # Each job starts on a fresh runner, so dependencies must be installed
      # here too; they are not inherited from schema-tests.
      - run: pip install -r requirements-test.txt
      - run: pytest tests/observability/test_cardinality.py -v
        env:
          # NOTE(review): confirm the test module actually reads this variable.
          PROMETHEUS_URL: ${{ secrets.PROMETHEUS_URL }}

  latency-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: pip install -r requirements-test.txt
      # --timeout is provided by the pytest-timeout plugin; presumably listed
      # in requirements-test.txt — confirm.
      - run: pytest tests/observability/test_pipeline_latency.py -v --timeout=60

What Good Observability Pipeline Testing Looks Like

A mature observability pipeline test suite has four layers:

  1. Schema tests — run in CI on every deploy, validate against Pydantic/JSON Schema models
  2. Cardinality tests — run on a schedule (daily), alert when a metric exceeds its budget
  3. Latency tests — run on a schedule (every 15 min), alert when p99 latency exceeds SLA
  4. Sampling tests — run after every sampler config change, validate error traces are always captured

Each layer catches different failure modes. Schema tests catch developer mistakes. Cardinality tests catch growth patterns before they become incidents. Latency tests catch infrastructure degradation. Sampling tests catch configuration drift.

Start with schema enforcement — it's the highest value per hour of implementation, and it prevents entire categories of dashboard breakage.

Read more