End-to-End Log Pipeline Integration Testing with Testcontainers
Unit testing individual pipeline components catches local bugs. But the integration between Kafka, OTel Collector, and your backends is where real failures hide. A schema mismatch between Kafka producer and OTel receiver. A Loki label cardinality issue that only surfaces under load. An OTel Collector exporter that retries forever when Jaeger is temporarily unavailable.
This guide shows how to build integration tests for the full observability pipeline — Kafka → OTel Collector → Jaeger/Loki — using Testcontainers. These tests run in CI, require no persistent infrastructure, and catch the failures that unit tests miss.
Architecture Under Test
The pipeline we're testing:
Application → Kafka topic (structured logs)
→ OTel Collector (consumes Kafka, transforms, routes)
→ Jaeger (traces)
→ Loki (logs)
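→ Prometheus (metrics)

Each arrow is a potential failure point. (In the tests below, traces reach the collector directly over OTLP rather than through Kafka, and only the Jaeger and Loki legs are exercised.)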
Setting Up Testcontainers
Go Setup
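// go.mod dependencies:
// github.com/testcontainers/testcontainers-go v0.32.0 or later (kafka.Run was introduced in v0.32.0)
// github.com/testcontainers/testcontainers-go/modules/kafka
// github.com/testcontainers/testcontainers-go/modules/jaeger
// The test functions additionally use github.com/stretchr/testify,
// github.com/segmentio/kafka-go, google.golang.org/grpc and the
// go.opentelemetry.io/otel SDK with its otlptracegrpc exporter.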
package pipeline_test
import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"os"
	"testing"
	"time"

	"github.com/testcontainers/testcontainers-go"
	"github.com/testcontainers/testcontainers-go/modules/kafka"
	"github.com/testcontainers/testcontainers-go/wait"
)
// PipelineTestSuite bundles the pipeline's containers with the addresses the
// test process uses to talk to them.
type PipelineTestSuite struct {
	ctx context.Context

	// Container handles for the pipeline components.
	kafkaContainer  *kafka.KafkaContainer
	jaegerContainer testcontainers.Container
	lokiContainer   testcontainers.Container
	otelContainer   testcontainers.Container

	// Host-side endpoints, usable from the test process.
	kafkaBroker string // bootstrap broker address (host:port)
	jaegerURL   string // base URL of Jaeger's query HTTP port (16686)
	lokiURL     string // base URL of Loki's HTTP port (3100)
}
func setupPipeline(t *testing.T) *PipelineTestSuite {
t.Helper()
ctx := context.Background()
suite := &PipelineTestSuite{ctx: ctx}
// Start Kafka
kafkaContainer, err := kafka.Run(ctx, "confluentinc/confluent-local:7.5.0",
kafka.WithClusterID("test-cluster"),
)
if err != nil {
t.Fatalf("failed to start Kafka: %v", err)
}
t.Cleanup(func() { kafkaContainer.Terminate(ctx) })
brokers, err := kafkaContainer.Brokers(ctx)
if err != nil {
t.Fatalf("failed to get Kafka brokers: %v", err)
}
suite.kafkaBroker = brokers[0]
suite.kafkaContainer = kafkaContainer
// Start Jaeger
jaegerReq := testcontainers.ContainerRequest{
Image: "jaegertracing/all-in-one:1.55",
ExposedPorts: []string{"16686/tcp", "14250/tcp"},
WaitingFor: wait.ForHTTP("/").WithPort("16686/tcp"),
}
jaegerContainer, err := testcontainers.GenericContainer(ctx,
testcontainers.GenericContainerRequest{
ContainerRequest: jaegerReq,
Started: true,
})
if err != nil {
t.Fatalf("failed to start Jaeger: %v", err)
}
t.Cleanup(func() { jaegerContainer.Terminate(ctx) })
jaegerHost, _ := jaegerContainer.Host(ctx)
jaegerPort, _ := jaegerContainer.MappedPort(ctx, "16686")
suite.jaegerURL = fmt.Sprintf("http://%s:%s", jaegerHost, jaegerPort.Port())
// Start Loki
lokiReq := testcontainers.ContainerRequest{
Image: "grafana/loki:2.9.4",
ExposedPorts: []string{"3100/tcp"},
Cmd: []string{"-config.file=/etc/loki/local-config.yaml"},
WaitingFor: wait.ForHTTP("/ready").WithPort("3100/tcp"),
}
lokiContainer, err := testcontainers.GenericContainer(ctx,
testcontainers.GenericContainerRequest{
ContainerRequest: lokiReq,
Started: true,
})
if err != nil {
t.Fatalf("failed to start Loki: %v", err)
}
t.Cleanup(func() { lokiContainer.Terminate(ctx) })
lokiHost, _ := lokiContainer.Host(ctx)
lokiPort, _ := lokiContainer.MappedPort(ctx, "3100")
suite.lokiURL = fmt.Sprintf("http://%s:%s", lokiHost, lokiPort.Port())
// Start OTel Collector with config pointing to the started containers
suite.otelContainer = startOtelCollector(t, ctx, suite)
return suite
}Starting OTel Collector with Dynamic Config
func startOtelCollector(t *testing.T, ctx context.Context, suite *PipelineTestSuite) testcontainers.Container {
t.Helper()
// Generate config with actual container addresses
jaegerGRPCPort, _ := suite.jaegerContainer.MappedPort(ctx, "14250")
jaegerHost, _ := suite.jaegerContainer.Host(ctx)
otelConfig := fmt.Sprintf(`
receivers:
kafka:
brokers: [%s]
topic: app-logs
protocol_version: 2.0.0
encoding: json
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317
processors:
batch:
timeout: 1s
resource:
attributes:
- key: environment
value: test
action: upsert
exporters:
jaeger:
endpoint: %s:%s
tls:
insecure: true
loki:
endpoint: %s/loki/api/v1/push
labels:
resource_attributes:
- service.name
- environment
logging:
loglevel: debug
service:
pipelines:
traces:
receivers: [otlp]
processors: [batch, resource]
exporters: [jaeger]
logs:
receivers: [kafka]
processors: [batch, resource]
exporters: [loki, logging]
`, suite.kafkaBroker, jaegerHost, jaegerGRPCPort.Port(), suite.lokiURL)
// Write config to a temp file and mount it
configFile, err := os.CreateTemp("", "otel-collector-*.yaml")
require.NoError(t, err)
configFile.WriteString(otelConfig)
configFile.Close()
t.Cleanup(func() { os.Remove(configFile.Name()) })
req := testcontainers.ContainerRequest{
Image: "otel/opentelemetry-collector-contrib:0.95.0",
ExposedPorts: []string{"4317/tcp"},
Mounts: testcontainers.ContainerMounts{
testcontainers.BindMount(configFile.Name(), "/etc/otelcol/config.yaml"),
},
Cmd: []string{"--config=/etc/otelcol/config.yaml"},
WaitingFor: wait.ForLog("Everything is ready").WithStartupTimeout(30 * time.Second),
}
container, err := testcontainers.GenericContainer(ctx,
testcontainers.GenericContainerRequest{ContainerRequest: req, Started: true})
require.NoError(t, err)
t.Cleanup(func() { container.Terminate(ctx) })
return container
}Writing the Integration Tests
Test: Logs Flow from Kafka to Loki
func TestLogsFlowKafkaToLoki(t *testing.T) {
suite := setupPipeline(t)
// Produce a log event to Kafka
writer := &kafka.Writer{
Addr: kafka.TCP(suite.kafkaBroker),
Topic: "app-logs",
}
defer writer.Close()
logEvent := map[string]interface{}{
"timestamp": time.Now().UnixNano(),
"level": "error",
"message": "database connection failed",
"service.name": "payments",
"trace_id": "abc123def456",
}
payload, _ := json.Marshal(logEvent)
err := writer.WriteMessages(suite.ctx, kafka.Message{Value: payload})
require.NoError(t, err)
// Poll Loki until the log appears (up to 10 seconds)
lokiClient := loki.NewClient(suite.lokiURL)
var found bool
require.Eventually(t, func() bool {
result, err := lokiClient.Query(`{service_name="payments"} |= "database connection failed"`, 10)
if err != nil {
return false
}
found = result.TotalEntries() > 0
return found
}, 10*time.Second, 500*time.Millisecond, "log did not reach Loki within timeout")
// Verify log content
entries := lokiClient.QueryRange(`{service_name="payments"}`, time.Now().Add(-1*time.Minute), time.Now(), 10)
require.Greater(t, len(entries), 0)
entry := entries[0]
assert.Contains(t, entry.Line, "database connection failed")
assert.Equal(t, "error", entry.Labels["level"])
}Test: Traces Flow from OTLP to Jaeger
// TestTracesFlowOTLPToJaeger sends one span to the collector's OTLP gRPC
// receiver and asserts that Jaeger returns it for the test-service service.
func TestTracesFlowOTLPToJaeger(t *testing.T) {
	suite := setupPipeline(t)

	// Collector's OTLP gRPC endpoint as seen from the test process.
	otelHost, err := suite.otelContainer.Host(suite.ctx)
	require.NoError(t, err)
	otelPort, err := suite.otelContainer.MappedPort(suite.ctx, "4317")
	require.NoError(t, err)

	// Send a trace
	conn, err := grpc.DialContext(suite.ctx,
		fmt.Sprintf("%s:%s", otelHost, otelPort.Port()),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	require.NoError(t, err)
	defer conn.Close()

	// WithBatcher needs a SpanExporter: otlptracegrpc.New returns one, whereas
	// otlptracegrpc.NewClient only returns the lower-level otlptrace.Client.
	exporter, err := otlptracegrpc.New(suite.ctx, otlptracegrpc.WithGRPCConn(conn))
	require.NoError(t, err)
	traceProvider := sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(exporter),
		sdktrace.WithResource(resource.NewWithAttributes(
			semconv.SchemaURL,
			semconv.ServiceName("test-service"),
		)),
	)
	// Deferred after conn.Close, so it runs first (LIFO) while the connection
	// is still open.
	defer func() {
		if err := traceProvider.Shutdown(suite.ctx); err != nil {
			t.Logf("tracer provider shutdown: %v", err)
		}
	}()

	tracer := traceProvider.Tracer("test")
	_, span := tracer.Start(suite.ctx, "integration-test-span")
	span.SetAttributes(attribute.String("test.id", "HEL-318"))
	span.End()
	require.NoError(t, traceProvider.ForceFlush(suite.ctx), "flushing spans to the collector")

	httpClient := &http.Client{Timeout: 5 * time.Second}
	// getJSON decodes a 200 JSON response from target into out; any transport,
	// status or decoding problem is returned as an error.
	getJSON := func(target string, out interface{}) error {
		resp, err := httpClient.Get(target)
		if err != nil {
			return err
		}
		defer resp.Body.Close()
		if resp.StatusCode != http.StatusOK {
			return fmt.Errorf("GET %s: unexpected status %s", target, resp.Status)
		}
		return json.NewDecoder(resp.Body).Decode(out)
	}

	// Wait until Jaeger knows about the service.
	serviceURL := fmt.Sprintf("%s/api/services", suite.jaegerURL)
	require.Eventually(t, func() bool {
		var services struct {
			Data []string `json:"data"`
		}
		if err := getJSON(serviceURL, &services); err != nil {
			return false
		}
		for _, s := range services.Data {
			if s == "test-service" {
				return true
			}
		}
		return false
	}, 30*time.Second, 500*time.Millisecond, "service not found in Jaeger")

	// Verify the trace contains the span that was sent.
	var traces struct {
		Data []struct {
			Spans []struct {
				OperationName string `json:"operationName"`
			} `json:"spans"`
		} `json:"data"`
	}
	tracesURL := fmt.Sprintf("%s/api/traces?service=test-service&limit=10", suite.jaegerURL)
	require.NoError(t, getJSON(tracesURL, &traces))
	require.NotEmpty(t, traces.Data, "no traces found in Jaeger")
	var operations []string
	for _, tr := range traces.Data {
		for _, sp := range tr.Spans {
			operations = append(operations, sp.OperationName)
		}
	}
	assert.Contains(t, operations, "integration-test-span")
}
Test: Pipeline Handles Malformed Kafka Messages
// TestPipelineHandlesMalformedMessages writes three payloads that are not
// valid JSON, followed by one valid event, and asserts the valid event still
// reaches Loki, i.e. undecodable messages do not wedge the pipeline.
//
// NOTE(review): kafka.Writer, kafka.TCP and kafka.Message are from
// github.com/segmentio/kafka-go, whose package name collides with the
// testcontainers kafka module imported by this file; import one of them under
// an alias and adjust the references. loki.NewClient is assumed to be a small
// helper around Loki's HTTP query API that is not shown here.
func TestPipelineHandlesMalformedMessages(t *testing.T) {
	suite := setupPipeline(t)
	writer := &kafka.Writer{
		Addr:  kafka.TCP(suite.kafkaBroker),
		Topic: "app-logs",
		// kafka-go does not create a missing topic unless asked to.
		AllowAutoTopicCreation: true,
	}
	defer writer.Close()

	// Send malformed JSON
	messages := []kafka.Message{
		{Value: []byte("not json at all")},
		{Value: []byte(`{"incomplete": `)},
		{Value: []byte("")},
		// Valid message after malformed ones
		{Value: []byte(`{"level":"info","message":"valid after malformed","service.name":"api"}`)},
	}
	// A failed write would make the assertion below meaningless, so fail fast.
	for i, msg := range messages {
		require.NoError(t, writer.WriteMessages(suite.ctx, msg), "writing message %d", i)
	}

	// Collector should still process the valid message. Each rejected message
	// may make the kafka receiver's consumer group rejoin, so allow more time
	// than the happy-path test.
	lokiClient := loki.NewClient(suite.lokiURL)
	require.Eventually(t, func() bool {
		result, err := lokiClient.Query(`{service_name="api"} |= "valid after malformed"`, 5)
		// On error the result may be nil; treat it as "not there yet".
		return err == nil && result.TotalEntries() > 0
	}, 60*time.Second, 500*time.Millisecond, "valid message not received after malformed messages")
}
Java Version with Testcontainers
// LogPipelineIntegrationTest.java
@Testcontainers
class LogPipelineIntegrationTest {
    // The collector runs in its own container and cannot reach the host-mapped
    // addresses the test JVM uses (kafka.getBootstrapServers(), getLokiUrl());
    // every container joins this network so the collector can use aliases.
    static final Network network = Network.newNetwork();

    @Container
    static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.5.0"))
            .withNetwork(network)
            .withNetworkAliases("kafka");

    @Container
    static GenericContainer<?> loki = new GenericContainer<>(DockerImageName.parse("grafana/loki:2.9.4"))
            .withNetwork(network)
            .withNetworkAliases("loki")
            .withExposedPorts(3100)
            .waitingFor(Wait.forHttp("/ready").forPort(3100));

    // Started in @BeforeAll (not via @Container) because its config depends on
    // the containers above, so it is also stopped explicitly in @AfterAll.
    static GenericContainer<?> otelCollector;

    @BeforeAll
    static void startCollector() {
        // In-network addresses. NOTE(review): assumes KafkaContainer serves other
        // containers on <alias>:9092 and that buildCollectorConfig (not shown)
        // accepts a plain host:port broker list and a Loki base URL.
        String collectorConfig = buildCollectorConfig("kafka:9092", "http://loki:3100");
        otelCollector = new GenericContainer<>(DockerImageName.parse("otel/opentelemetry-collector-contrib:0.95.0"))
                .withNetwork(network)
                .withExposedPorts(4317)
                .withCopyToContainer(
                        Transferable.of(collectorConfig.getBytes(UTF_8)),
                        "/etc/otelcol/config.yaml"
                )
                .withCommand("--config=/etc/otelcol/config.yaml")
                .waitingFor(Wait.forLogMessage(".*Everything is ready.*", 1));
        otelCollector.start();
    }

    @AfterAll
    static void stopCollector() {
        if (otelCollector != null) {
            otelCollector.stop();
        }
    }

    @Test
    void logsFlowFromKafkaToLoki() throws Exception {
        String logJson = """
            {"level":"warn","message":"memory threshold exceeded","service.name":"cache"}
            """;
        // try-with-resources closes the producer; get() waits for the broker's
        // acknowledgement and rethrows a failed send, which flush() does not.
        try (KafkaProducer<String, String> producer = createKafkaProducer()) {
            producer.send(new ProducerRecord<>("app-logs", logJson)).get();
        }

        // Leave room for the collector's consumer group to join.
        await()
            .atMost(Duration.ofSeconds(30))
            .pollInterval(Duration.ofMillis(500))
            .until(() -> queryLoki("{service_name=\"cache\"}", "memory threshold exceeded") > 0);
    }

    private long queryLoki(String selector, String filter) {
        String url = getLokiUrl() + "/loki/api/v1/query?query=" +
            URLEncoder.encode(selector + " |= \"" + filter + "\"", UTF_8);
        // Parse Loki response and return entry count
        // ...
    }
}
CI Pipeline
# .github/workflows/pipeline-integration-tests.yml
name: Pipeline Integration Tests
on:
  push:
    branches: [main]
  pull_request:
jobs:
  integration:
    runs-on: ubuntu-latest
    # Every test starts its own Kafka, Jaeger, Loki and collector containers,
    # and a cold runner must pull all images first; cap the whole job.
    timeout-minutes: 20
    steps:
      - uses: actions/checkout@v4
      - name: Set up Go
        uses: actions/setup-go@v5
        with:
          go-version: '1.22'
      - name: Run integration tests
        # 120s is easily exceeded by image pulls plus per-test stack start-up.
        # -tags integration only has an effect if the test files carry a
        # "//go:build integration" constraint.
        run: go test ./tests/integration/... -v -timeout 15m -tags integration
        env:
          TESTCONTAINERS_DOCKER_SOCKET_OVERRIDE: /var/run/docker.sock
What to Test (and What Not To)
Test with Testcontainers:
- Happy path: data flows end-to-end
- Error recovery: malformed messages don't break the pipeline
- Schema validation: wrong field types are rejected or handled
- Routing: logs go to the right backend based on metadata
Don't test with Testcontainers:
- Performance benchmarks (use dedicated load testing)
- Collector transform logic (unit test it in isolation, e.g. with processor-level or OTTL statement tests)
- Backend storage behavior (that's Jaeger/Loki's responsibility)
Integration tests are expensive to run. Keep them focused on the integration boundaries, not the internal logic.
Summary
End-to-end integration testing with Testcontainers gives you confidence that Kafka, OTel Collector, Jaeger, and Loki actually work together. The key pattern: use dynamic config generation to point each container at the others, use Eventually assertions with reasonable timeouts, and always test failure recovery to ensure one bad message can't block the whole pipeline. Run these tests in CI on every pull request and every push to main, as the workflow above does.