Testing OpenTelemetry Collector Pipelines, Processors, and Exporters

The OpenTelemetry Collector sits at the heart of many modern observability pipelines. It receives telemetry from your services, processes it, and exports it to your backends. When that pipeline misbehaves (dropping spans, garbling attributes, or misrouting logs), debugging it is painful. The answer is to test it properly before it ever touches production.

This guide covers how to test OTel Collector pipelines end-to-end, from individual processor unit tests to full pipeline integration tests using the collector-contrib test harness.

Why Testing the Collector Is Non-Trivial

The Collector runs as a separate process. Its configuration is YAML, not code. Processors transform data using a combination of OTTL expressions, regex patterns, and custom logic. All of this makes the standard unit testing approach awkward.

The risks if you skip testing:

  • A broken filter processor silently drops all your traces
  • An attributes processor with a wrong regex writes garbage into span attributes
  • A misconfigured exporter causes back-pressure that fills the Collector's queues until it starts refusing or dropping data

Setting Up the Test Harness

The collector-contrib project ships a testbed package that runs a real Collector, either in-process or as a child process, between a load-generating sender and a mock backend.

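package pipeline_test

import (
    "testing"

    "github.com/stretchr/testify/require"

    "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/datareceivers"
    "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/datasenders"
    "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
)

func TestTracePipeline(t *testing.T) {
    options := testbed.LoadOptions{DataItemsPerSecond: 100, ItemsPerBatch: 10}

    // The sender plays the instrumented service; the receiver plays the backend.
    sender := datasenders.NewOTLPTraceDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t))
    receiver := datareceivers.NewOTLPDataReceiver(testbed.GetAvailablePort(t))

    // Run the Collector in-process. componentFactories and pipelineConfig are
    // hypothetical helpers: the first returns the otelcol.Factories your build
    // registers, the second stitches sender.GenConfigYAMLStr(),
    // receiver.GenConfigYAMLStr() and your processors into one config.
    // testbed signatures shift between releases; check them against your pinned version.
    runner := testbed.NewInProcessCollector(componentFactories(t))
    cleanup, err := runner.PrepareConfig(pipelineConfig(sender, receiver))
    require.NoError(t, err)
    defer cleanup()

    // The validator compares what the sender generated with what the backend received.
    provider := testbed.NewPerfTestDataProvider(options)
    validator := testbed.NewCorrectTestValidator(sender.ProtocolName(), receiver.ProtocolName(), provider)

    tc := testbed.NewTestCase(t, provider, sender, receiver, runner, validator, &testbed.CorrectnessResults{})
    defer tc.Stop()

    tc.StartBackend()
    tc.StartAgent()
    tc.StartLoad(options)

    tc.WaitFor(func() bool {
        return tc.MockBackend.DataItemsReceived() >= 1000
    }, "backend received 1000 items")

    tc.StopLoad()

    // Let in-flight batches drain before comparing sent and received data.
    tc.WaitFor(func() bool {
        return tc.LoadGenerator.DataItemsSent() == tc.MockBackend.DataItemsReceived()
    }, "all sent items received")
    tc.ValidateData()
}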

Testing Individual Processors

For processors with complex logic, test them directly: build the processor from its factory with processortest.NewNopCreateSettings() and capture its output in a consumertest.TracesSink.

Filter Processor

The filter processor drops telemetry that matches its OTTL conditions. Test that your conditions match exactly what you intend, and nothing else:

import (
    "context"
    "testing"

    "github.com/stretchr/testify/require"
    "go.opentelemetry.io/collector/component/componenttest"
    "go.opentelemetry.io/collector/consumer/consumertest"
    "go.opentelemetry.io/collector/pdata/ptrace"
    "go.opentelemetry.io/collector/processor/processortest"

    "github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor"
)

func TestFilterProcessor(t *testing.T) {
    factory := filterprocessor.NewFactory()

    // Start from the factory defaults (including error_mode) and add one OTTL span condition.
    cfg := factory.CreateDefaultConfig().(*filterprocessor.Config)
    cfg.Traces.SpanConditions = []string{
        `attributes["http.url"] == "/healthz"`,
    }

    sink := new(consumertest.TracesSink)
    proc, err := factory.CreateTracesProcessor(
        context.Background(),
        processortest.NewNopCreateSettings(),
        cfg,
        sink,
    )
    require.NoError(t, err)
    require.NoError(t, proc.Start(context.Background(), componenttest.NewNopHost()))
    defer proc.Shutdown(context.Background())

    // Build test spans
    td := ptrace.NewTraces()
    rs := td.ResourceSpans().AppendEmpty()
    ss := rs.ScopeSpans().AppendEmpty()

    // Health check span: matches the condition, so it should be dropped
    healthSpan := ss.Spans().AppendEmpty()
    healthSpan.SetName("GET /healthz")
    healthSpan.Attributes().PutStr("http.url", "/healthz")

    // Real span: should pass through
    realSpan := ss.Spans().AppendEmpty()
    realSpan.SetName("GET /api/users")
    realSpan.Attributes().PutStr("http.url", "/api/users")

    require.NoError(t, proc.ConsumeTraces(context.Background(), td))

    // Only the real span should reach the sink
    require.Equal(t, 1, sink.SpanCount())
    got := sink.AllTraces()[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0)
    require.Equal(t, "GET /api/users", got.Name())
}
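The Go test above exercises the processor in isolation. To run the same check against a built Collector binary, you can send it the same two spans as OTLP/JSON. The Python sketch below builds that payload; it assumes the Collector's otlp receiver has its http protocol enabled on the default port 4318 and that the filter processor sits in the traces pipeline. The helper names (otlp_json_span, build_filter_payload, post_traces) are illustrative, not part of any library.

```python
import json
import os
import time
import urllib.request


def otlp_json_span(name: str, attrs: dict[str, str]) -> dict:
    """One span in OTLP/JSON form: hex IDs, nanosecond timestamps as strings."""
    now = time.time_ns()
    return {
        "traceId": os.urandom(16).hex(),
        "spanId": os.urandom(8).hex(),
        "name": name,
        "kind": 2,  # SPAN_KIND_SERVER
        "startTimeUnixNano": str(now - 1_000_000),
        "endTimeUnixNano": str(now),
        "attributes": [
            {"key": k, "value": {"stringValue": v}} for k, v in attrs.items()
        ],
    }


def build_filter_payload() -> dict:
    """Same two spans as the Go test: one health check, one real request."""
    spans = [
        otlp_json_span("GET /healthz", {"http.url": "/healthz"}),
        otlp_json_span("GET /api/users", {"http.url": "/api/users"}),
    ]
    return {
        "resourceSpans": [
            {
                "resource": {
                    "attributes": [
                        {"key": "service.name", "value": {"stringValue": "filter-test"}}
                    ]
                },
                "scopeSpans": [{"scope": {"name": "filter-test"}, "spans": spans}],
            }
        ]
    }


def post_traces(payload: dict, endpoint: str = "http://localhost:4318/v1/traces") -> int:
    """POST the payload to a Collector's OTLP/HTTP receiver (assumed endpoint); return the status."""
    req = urllib.request.Request(
        endpoint,
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=5) as resp:
        return resp.status
```

OTLP/JSON encodes trace and span IDs as hex strings and 64-bit timestamps as decimal strings, which is why the sketch formats them that way.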

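Attributes Processor

The attributes processor inserts, updates, and deletes attributes. This test checks that an insert action adds http.method to a span that does not already have it: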

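import (
    "context"
    "testing"

    "github.com/stretchr/testify/require"
    "go.opentelemetry.io/collector/component/componenttest"
    "go.opentelemetry.io/collector/confmap"
    "go.opentelemetry.io/collector/consumer/consumertest"
    "go.opentelemetry.io/collector/pdata/ptrace"
    "go.opentelemetry.io/collector/processor/processortest"

    "github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor"
)

func TestAttributesProcessor_AddHTTPMethod(t *testing.T) {
    factory := attributesprocessor.NewFactory()
    cfg := factory.CreateDefaultConfig()

    // The action types live in an internal contrib package (attraction), so
    // describe the action the same way the YAML config would and unmarshal it.
    require.NoError(t, confmap.NewFromStringMap(map[string]any{
        "actions": []any{
            map[string]any{"key": "http.method", "value": "GET", "action": "insert"},
        },
    }).Unmarshal(cfg))

    sink := new(consumertest.TracesSink)
    proc, err := factory.CreateTracesProcessor(
        context.Background(),
        processortest.NewNopCreateSettings(),
        cfg,
        sink,
    )
    require.NoError(t, err)
    require.NoError(t, proc.Start(context.Background(), componenttest.NewNopHost()))
    defer proc.Shutdown(context.Background())

    // A span with no attributes: "insert" only adds keys that are absent.
    td := ptrace.NewTraces()
    span := td.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty()
    span.SetName("test-span")
    require.NoError(t, proc.ConsumeTraces(context.Background(), td))

    spans := sink.AllTraces()[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans()
    val, ok := spans.At(0).Attributes().Get("http.method")
    require.True(t, ok)
    require.Equal(t, "GET", val.Str())
}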
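When a test drives the real binary instead, the processed spans come back as OTLP/JSON, for example the lines the contrib file exporter writes. A small flattening helper makes attribute assertions like the one above easy to express from Python. This is a sketch: span_attributes and spans_from_file are illustrative names, spans are keyed by name (assuming names are unique within a test), and only scalar attribute values are unwrapped.

```python
import json


def _any_value(v: dict):
    """Unwrap an OTLP/JSON AnyValue (only scalar kinds are handled here)."""
    for kind in ("stringValue", "boolValue", "doubleValue"):
        if kind in v:
            return v[kind]
    if "intValue" in v:
        return int(v["intValue"])  # int64 values are usually encoded as JSON strings
    return v  # arrays, maps and bytes are returned as-is in this sketch


def span_attributes(request: dict) -> dict[str, dict]:
    """Map each span name to its attributes for one ExportTraceServiceRequest."""
    out = {}
    for rs in request.get("resourceSpans", []):
        for ss in rs.get("scopeSpans", []):
            for span in ss.get("spans", []):
                out[span["name"]] = {
                    a["key"]: _any_value(a["value"]) for a in span.get("attributes", [])
                }
    return out


def spans_from_file(path: str) -> dict[str, dict]:
    """Read a file holding one OTLP/JSON request per line and merge the results."""
    merged = {}
    with open(path) as f:
        for line in f:
            if line.strip():
                merged.update(span_attributes(json.loads(line)))
    return merged
```

With this in place, the black-box version of the attributes check reduces to asserting spans_from_file(path)["test-span"]["http.method"] == "GET".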

Testing OTTL Expressions

OTTL (OpenTelemetry Transformation Language) drives the transform processor and the filter processor's conditions. Table-driven tests let you pin each statement to an input and the attributes you expect:

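import (
    "context"
    "testing"

    "github.com/stretchr/testify/require"
    "go.opentelemetry.io/collector/component/componenttest"
    "go.opentelemetry.io/collector/confmap"
    "go.opentelemetry.io/collector/consumer/consumertest"
    "go.opentelemetry.io/collector/pdata/ptrace"
    "go.opentelemetry.io/collector/processor/processortest"

    "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor"
)

func TestOTTLExpression(t *testing.T) {
    tests := []struct {
        name      string
        statement string
        input     map[string]any
        wantAttr  map[string]string
    }{
        {
            name:      "extract service from url",
            statement: `set(attributes["service.name"], ExtractPatterns(attributes["http.url"], "^/(?P<service>[^/]+)/")["service"])`,
            input:     map[string]any{"http.url": "/payments/charge"},
            wantAttr:  map[string]string{"service.name": "payments"},
        },
    }

    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            factory := transformprocessor.NewFactory()
            // ContextStatements lives in an internal package, so overlay the
            // statement onto the defaults the same way a YAML config would.
            cfg := factory.CreateDefaultConfig()
            require.NoError(t, confmap.NewFromStringMap(map[string]any{
                "trace_statements": []any{
                    map[string]any{"context": "span", "statements": []any{tt.statement}},
                },
            }).Unmarshal(cfg))

            sink := new(consumertest.TracesSink)
            proc, err := factory.CreateTracesProcessor(context.Background(),
                processortest.NewNopCreateSettings(), cfg, sink)
            require.NoError(t, err)
            require.NoError(t, proc.Start(context.Background(), componenttest.NewNopHost()))
            defer proc.Shutdown(context.Background())

            // Run one span carrying the input attributes through the processor
            td := ptrace.NewTraces()
            span := td.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty()
            require.NoError(t, span.Attributes().FromRaw(tt.input))
            require.NoError(t, proc.ConsumeTraces(context.Background(), td))

            got := sink.AllTraces()[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes()
            for k, want := range tt.wantAttr {
                v, ok := got.Get(k)
                require.True(t, ok, "missing attribute %q", k)
                require.Equal(t, want, v.Str())
            }
        })
    }
}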
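Many failures in statements like this one come from the regex rather than the OTTL around it, so it can pay to check the pattern on its own first. The sketch below approximates ExtractPatterns in Python by returning the named groups of the first match. It is only an approximation: OTTL compiles patterns with Go's RE2-based regexp package, which rejects lookarounds and backreferences that Python's re accepts, so use this to catch logic errors and keep the Go test as the final word. extract_named_groups, check_pattern and CASES are illustrative names.

```python
import re


def extract_named_groups(value: str, pattern: str) -> dict[str, str]:
    """Approximate ExtractPatterns: named groups of the first match, or {} when nothing matches."""
    compiled = re.compile(pattern)
    if not compiled.groupindex:
        # The pattern is only useful here if it names what it captures.
        raise ValueError("pattern has no named capture groups")
    m = compiled.search(value)
    if m is None:
        return {}
    return {k: v for k, v in m.groupdict().items() if v is not None}


def check_pattern(pattern: str, cases: dict[str, dict[str, str]]) -> list[str]:
    """Describe every case whose extracted groups differ from the expectation."""
    failures = []
    for value, want in cases.items():
        got = extract_named_groups(value, pattern)
        if got != want:
            failures.append(f"{value!r}: got {got}, want {want}")
    return failures


SERVICE_PATTERN = r"^/(?P<service>[^/]+)/"

CASES = {
    "/payments/charge": {"service": "payments"},
    "/payments/": {"service": "payments"},
    "/healthz": {},  # no second slash, so the pattern does not match
    "payments/charge": {},  # no leading slash
}
```

The /healthz case is worth keeping in the table: the pattern requires a second slash, so single-segment paths produce no service at all.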

Testing Exporters

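Exporters are tested against mock backends. The testbed's MockBackend wraps one of the testbed's protocol-specific data receivers, such as the OTLP receiver used here, and counts the items that arrive: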

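import (
    "context"
    "fmt"
    "testing"
    "time"

    "github.com/stretchr/testify/require"
    "go.opentelemetry.io/collector/component/componenttest"
    "go.opentelemetry.io/collector/exporter/exportertest"
    "go.opentelemetry.io/collector/exporter/otlpexporter"
    "go.opentelemetry.io/collector/pdata/ptrace"

    "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/datareceivers"
    "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
)

func TestOTLPExporter(t *testing.T) {
    // Start a mock OTLP backend on a free port; 4317 may already be held by a local Collector
    port := testbed.GetAvailablePort(t)
    backend := testbed.NewMockBackend("test-backend.log", datareceivers.NewOTLPDataReceiver(port))
    require.NoError(t, backend.Start())
    defer backend.Stop()

    // Point the exporter at the mock backend over plaintext gRPC
    factory := otlpexporter.NewFactory()
    cfg := factory.CreateDefaultConfig().(*otlpexporter.Config)
    cfg.Endpoint = fmt.Sprintf("localhost:%d", port)
    cfg.TLSSetting.Insecure = true

    exp, err := factory.CreateTracesExporter(
        context.Background(),
        exportertest.NewNopCreateSettings(),
        cfg,
    )
    require.NoError(t, err)
    require.NoError(t, exp.Start(context.Background(), componenttest.NewNopHost()))
    defer exp.Shutdown(context.Background())

    // Send 10 test spans
    td := ptrace.NewTraces()
    spans := td.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans()
    for i := 0; i < 10; i++ {
        spans.AppendEmpty().SetName(fmt.Sprintf("span-%d", i))
    }
    require.NoError(t, exp.ConsumeTraces(context.Background(), td))

    // Export is asynchronous (the default config enables a sending queue), so poll
    require.Eventually(t, func() bool {
        return backend.DataItemsReceived() == 10
    }, 5*time.Second, 100*time.Millisecond)
}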
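The testbed mock backend is Go-only. For black-box tests of the otelcol binary from a Python suite, a plain HTTP server can stand in for the backend. This sketch assumes the Collector's otlphttp exporter is configured with endpoint http://127.0.0.1:<port> (that exporter appends /v1/traces for trace data); MockOTLPBackend and post_json are illustrative names. It records raw request bodies and does not decode protobuf.

```python
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer


class MockOTLPBackend:
    """Records every POST to /v1/traces so tests can assert on what was exported."""

    def __init__(self, host: str = "127.0.0.1", port: int = 0):
        received = self.received = []  # list of (content_type, raw body bytes)
        lock = self._lock = threading.Lock()

        class Handler(BaseHTTPRequestHandler):
            def do_POST(self):
                body = self.rfile.read(int(self.headers.get("Content-Length", 0)))
                if self.path != "/v1/traces":
                    self.send_response(404)
                    self.send_header("Content-Length", "0")
                    self.end_headers()
                    return
                content_type = self.headers.get("Content-Type", "application/x-protobuf")
                with lock:
                    received.append((content_type, body))
                # Empty ExportTraceServiceResponse: "{}" in JSON, zero bytes in protobuf.
                reply = b"{}" if "json" in content_type else b""
                self.send_response(200)
                self.send_header("Content-Type", content_type)
                self.send_header("Content-Length", str(len(reply)))
                self.end_headers()
                self.wfile.write(reply)

            def log_message(self, *args):  # keep test output quiet
                pass

        # Port 0 lets the OS pick a free port, like GetAvailablePort in the Go test.
        self._server = ThreadingHTTPServer((host, port), Handler)
        self.port = self._server.server_address[1]
        self._thread = threading.Thread(target=self._server.serve_forever, daemon=True)

    def __enter__(self):
        self._thread.start()
        return self

    def __exit__(self, *exc):
        self._server.shutdown()
        self._server.server_close()

    def request_count(self) -> int:
        with self._lock:
            return len(self.received)


def post_json(url: str, body: bytes) -> tuple[int, bytes]:
    """Send one JSON POST, e.g. to smoke-test the mock by hand."""
    req = urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"}, method="POST"
    )
    with urllib.request.urlopen(req, timeout=5) as resp:
        return resp.status, resp.read()
```

Poll request_count() in a loop with a deadline, the same way the Go test polls DataItemsReceived(), since the exporter sends asynchronously.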

Configuration Validation Tests

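Your Collector config YAML is code, so validate it. The otelcol validate subcommand loads a config and checks each component's settings and the pipeline wiring without starting any pipelines: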

# test_collector_config.py
import os
import subprocess
import tempfile

VALID_CONFIG = """
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
processors:
  batch:
    timeout: 5s
exporters:
  debug:
    verbosity: detailed
service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch]
      exporters: [debug]
"""

def validate_config(config_yaml: str) -> tuple[bool, str]:
    """Run `otelcol validate` on a config string; return (ok, combined output)."""
    with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f:
        f.write(config_yaml)
        config_path = f.name

    try:
        result = subprocess.run(
            ["otelcol", "validate", "--config", config_path],
            capture_output=True, text=True, timeout=10,
        )
        return result.returncode == 0, result.stdout + result.stderr
    finally:
        os.unlink(config_path)

def test_valid_config():
    ok, err = validate_config(VALID_CONFIG)
    assert ok, f"Config validation failed: {err}"

def test_unparseable_receiver_endpoint():
    bad_config = VALID_CONFIG.replace("0.0.0.0:4317", "not-a-valid-address:::")
    ok, _ = validate_config(bad_config)
    # Expected to pass: the endpoint is checked when the receiver binds at startup,
    # not at config parse time. Adjust to the behavior of the version you ship.
    assert ok

def test_missing_pipeline_component():
    bad_config = """
receivers:
  otlp:
    protocols:
      grpc: {}
exporters:
  debug: {}
service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [nonexistent_processor]
      exporters: [debug]
"""
    ok, err = validate_config(bad_config)
    assert not ok
    assert "nonexistent_processor" in err
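otelcol validate needs the binary on the PATH. A pure-Python pre-check can catch the most common wiring mistake, a pipeline that names a component its section never defines, before the binary is involved. The sketch below works on an already-parsed config dict (for example from PyYAML's yaml.safe_load, which is an assumption here) and treats connectors as valid receivers and exporters. It complements otelcol validate rather than replacing it, because it does not check any component's own settings; undefined_references is an illustrative name.

```python
PIPELINE_KINDS = ("receivers", "processors", "exporters")


def undefined_references(config: dict) -> list[str]:
    """List every pipeline entry that names a component missing from its section.

    Connectors may appear as receivers or exporters in a pipeline, so they
    count as defined for both.
    """
    connectors = set((config.get("connectors") or {}).keys())
    pipelines = (config.get("service") or {}).get("pipelines") or {}
    problems = []
    for pipeline_id, pipeline in pipelines.items():
        for kind in PIPELINE_KINDS:
            declared = set((config.get(kind) or {}).keys())
            if kind != "processors":
                declared |= connectors
            for name in pipeline.get(kind) or []:
                if name not in declared:
                    # kind[:-1] turns "processors" into "processor" for the message
                    problems.append(f"{pipeline_id}: {kind[:-1]} {name!r} is not defined")
    return problems
```

Running it over every config file in the repository makes a cheap first pytest case; otelcol validate then covers everything this check cannot see.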

CI Integration

Add collector tests to your CI pipeline:

# .github/workflows/collector-tests.yml
name: Collector Tests
on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - uses: actions/setup-go@v5
        with:
          go-version-file: go.mod

      # Keep this version in step with the collector module versions in go.mod
      - name: Install OTel Collector
        run: |
          wget https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v0.95.0/otelcol_0.95.0_linux_amd64.tar.gz
          tar -xzf otelcol_*.tar.gz
          sudo mv otelcol /usr/local/bin/

      - name: Run config validation tests
        run: |
          pip install pytest
          pytest tests/collector/ -v

      - name: Run Go integration tests
        run: go test ./pipeline/... -v -timeout 120s

Key Takeaways

Testing OTel Collector pipelines requires a combination of approaches: processor unit tests for business logic, OTTL expression validation, exporter tests with mock backends, and config validation in CI. The testbed package makes integration testing practical without needing a full Kubernetes environment.

Start with config validation (it is the fastest check and catches wiring mistakes such as undefined components), add processor unit tests for any complex OTTL or filter logic, and graduate to full pipeline integration tests for critical data paths.
