Testing OpenTelemetry Collector Pipelines, Processors, and Exporters
OpenTelemetry Collector sits at the heart of modern observability pipelines. It receives telemetry from your services, processes it, and exports it to your backends. When that pipeline misbehaves—dropping spans, garbling attributes, or misrouting logs—debugging it is painful. The solution is testing it properly before it ever touches production.
This guide covers how to test OTel Collector pipelines end-to-end, from individual processor unit tests to full pipeline integration tests using the collector-contrib test harness.
Why Testing the Collector Is Non-Trivial
The Collector runs as a separate process. Its configuration is YAML, not code. Processors transform data using a combination of OTTL expressions, regex patterns, and custom logic. All of this makes the standard unit testing approach awkward.
The risks if you skip testing:
- A broken
filterprocessor silently drops all your traces - An
attributesprocessor with a wrong regex corrupts span names - A misconfigured exporter causes back-pressure that crashes your services
Setting Up the Test Harness
The collector-contrib project ships a testbed package that spins up a real Collector subprocess for integration testing.
package pipeline_test
import (
"testing"
"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
"go.opentelemetry.io/collector/component/componenttest"
)
func TestTracePipeline(t *testing.T) {
sender := testbed.NewOTLPTraceDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t))
receiver := testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t))
runner := testbed.NewInProcessCollector(createProcessors())
validator := testbed.NewCorrectTestValidator(
sender.GenConfigYAMLStr(),
receiver.GenConfigYAMLStr(),
testbed.WithBatchedData(),
)
tc := testbed.NewTestCase(
t, sender, receiver, runner, validator,
testbed.WithDecisionFunc(testbed.OverrideDecisionFunc()),
)
defer tc.Stop()
tc.StartBackend()
tc.StartAgent()
tc.StartLoad(testbed.LoadOptions{DataItemsPerSecond: 100, ItemsPerBatch: 10})
tc.WaitFor(func() bool {
return tc.MockBackend.DataItemsReceived() >= 1000
}, "wait for data")
tc.StopLoad()
tc.ValidateData()
}Testing Individual Processors
For processors with complex logic, test them directly using processortest.NewNopCreateSettings().
Filter Processor
The filter processor drops telemetry matching certain conditions. Test that your filter does what you think it does:
func TestFilterProcessor(t *testing.T) {
cfg := &filterprocessor.Config{
Traces: filterprocessor.TraceFilters{
SpanConditions: []string{
`attributes["http.url"] == "/healthz"`,
},
},
}
factory := filterprocessor.NewFactory()
sink := new(consumertest.TracesSink)
proc, err := factory.CreateTracesProcessor(
context.Background(),
processortest.NewNopCreateSettings(),
cfg,
sink,
)
require.NoError(t, err)
require.NoError(t, proc.Start(context.Background(), componenttest.NewNopHost()))
defer proc.Shutdown(context.Background())
// Build test spans
td := ptrace.NewTraces()
rs := td.ResourceSpans().AppendEmpty()
ss := rs.ScopeSpans().AppendEmpty()
// Health check span — should be filtered
healthSpan := ss.Spans().AppendEmpty()
healthSpan.SetName("GET /healthz")
healthSpan.Attributes().PutStr("http.url", "/healthz")
// Real span — should pass through
realSpan := ss.Spans().AppendEmpty()
realSpan.SetName("GET /api/users")
realSpan.Attributes().PutStr("http.url", "/api/users")
require.NoError(t, proc.ConsumeTraces(context.Background(), td))
// Only the real span should reach the sink
require.Equal(t, 1, sink.SpanCount())
require.Equal(t, "GET /api/users", sink.AllTraces()[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Name())
}Attributes Processor
func TestAttributesProcessor_AddHTTPMethod(t *testing.T) {
cfg := &attributesprocessor.Config{
Actions: []attraction.ActionKeyValue{
{
Key: "http.method",
Value: "GET",
Action: attraction.INSERT,
},
},
}
factory := attributesprocessor.NewFactory()
sink := new(consumertest.TracesSink)
proc, err := factory.CreateTracesProcessor(
context.Background(),
processortest.NewNopCreateSettings(),
cfg,
sink,
)
require.NoError(t, err)
require.NoError(t, proc.Start(context.Background(), componenttest.NewNopHost()))
defer proc.Shutdown(context.Background())
td := createTestSpan("test-span", map[string]string{})
require.NoError(t, proc.ConsumeTraces(context.Background(), td))
spans := sink.AllTraces()[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans()
val, ok := spans.At(0).Attributes().Get("http.method")
require.True(t, ok)
require.Equal(t, "GET", val.Str())
}Testing OTTL Expressions
OTTL (OpenTelemetry Transformation Language) is increasingly used in transform processors. Unit test your expressions:
func TestOTTLExpression(t *testing.T) {
tests := []struct {
name string
expression string
input map[string]interface{}
wantAttr map[string]string
}{
{
name: "extract service from url",
expression: `set(attributes["service.name"], ExtractPatterns(attributes["http.url"], "^/(?P<service>[^/]+)/"))["service"]`,
input: map[string]interface{}{"http.url": "/payments/charge"},
wantAttr: map[string]string{"service.name": "payments"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := &transformprocessor.Config{
TraceStatements: []common.ContextStatements{
{
Context: "span",
Statements: []string{tt.expression},
},
},
}
// Run through processor and verify output attrs
verifyTransform(t, cfg, tt.input, tt.wantAttr)
})
}
}Testing Exporters
Exporters are tested with mock backends. The testbed package provides several:
func TestOTLPExporter(t *testing.T) {
// Start a mock OTLP backend
backend := testbed.NewMockBackend("test-backend.log", testbed.NewOTLPDataReceiver(4317))
require.NoError(t, backend.Start())
defer backend.Stop()
// Configure exporter pointing to mock backend
factory := otlpexporter.NewFactory()
cfg := factory.CreateDefaultConfig().(*otlpexporter.Config)
cfg.Endpoint = "localhost:4317"
cfg.TLSSetting = configtls.TLSClientSetting{Insecure: true}
exp, err := factory.CreateTracesExporter(
context.Background(),
exportertest.NewNopCreateSettings(),
cfg,
)
require.NoError(t, err)
require.NoError(t, exp.Start(context.Background(), componenttest.NewNopHost()))
defer exp.Shutdown(context.Background())
// Send test traces
td := createTestTraces(10)
require.NoError(t, exp.ConsumeTraces(context.Background(), td))
// Allow async export
require.Eventually(t, func() bool {
return backend.DataItemsReceived() == 10
}, 5*time.Second, 100*time.Millisecond)
}Configuration Validation Tests
Your Collector config YAML is code. Validate it:
# test_collector_config.py
import yaml
import subprocess
import tempfile
import os
import pytest
VALID_CONFIG = """
receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317
processors:
batch:
timeout: 5s
exporters:
logging:
loglevel: debug
service:
pipelines:
traces:
receivers: [otlp]
processors: [batch]
exporters: [logging]
"""
def validate_config(config_yaml: str) -> tuple[bool, str]:
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
f.write(config_yaml)
config_path = f.name
try:
result = subprocess.run(
["otelcol", "validate", "--config", config_path],
capture_output=True, text=True, timeout=10
)
return result.returncode == 0, result.stderr
finally:
os.unlink(config_path)
def test_valid_config():
ok, err = validate_config(VALID_CONFIG)
assert ok, f"Config validation failed: {err}"
def test_invalid_exporter_endpoint():
bad_config = VALID_CONFIG.replace("0.0.0.0:4317", "not-a-valid-address:::")
ok, _ = validate_config(bad_config)
# Should still validate (endpoint is checked at runtime), not at config parse time
assert ok # Adjust based on your collector version behavior
def test_missing_pipeline_component():
bad_config = """
receivers:
otlp:
protocols:
grpc: {}
exporters:
logging: {}
service:
pipelines:
traces:
receivers: [otlp]
processors: [nonexistent_processor]
exporters: [logging]
"""
ok, err = validate_config(bad_config)
assert not ok
assert "nonexistent_processor" in errCI Integration
Add collector tests to your CI pipeline:
# .github/workflows/collector-tests.yml
name: Collector Tests
on: [push, pull_request]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install OTel Collector
run: |
wget https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v0.95.0/otelcol_0.95.0_linux_amd64.tar.gz
tar -xzf otelcol_*.tar.gz
sudo mv otelcol /usr/local/bin/
- name: Run config validation tests
run: pytest tests/collector/ -v
- name: Run Go integration tests
run: go test ./pipeline/... -v -timeout 120sKey Takeaways
Testing OTel Collector pipelines requires a combination of approaches: processor unit tests for business logic, OTTL expression validation, exporter tests with mock backends, and config validation in CI. The testbed package makes integration testing practical without needing a full Kubernetes environment.
Start with config validation (fastest, catches the most errors), add processor unit tests for any complex OTTL or filter logic, and graduate to full pipeline integration tests for critical data paths.