End-to-End Log Pipeline Integration Testing with Testcontainers

Unit testing individual pipeline components catches local bugs, but the real failures hide in the integration between Kafka, the OTel Collector, and your backends: a schema mismatch between the Kafka producer and the OTel receiver, a Loki label cardinality issue that only surfaces under load, or an OTel Collector exporter that retries forever when Jaeger is temporarily unavailable.

This guide shows how to build integration tests for the full observability pipeline — Kafka → OTel Collector → Jaeger/Loki — using Testcontainers. These tests run in CI, require no persistent infrastructure, and catch the failures that unit tests miss.

Architecture Under Test

The pipeline we're testing:

Application → Kafka topic (structured logs)
            → OTel Collector (consumes Kafka, transforms, routes)
                → Jaeger (traces)
                → Loki (logs)
                → Prometheus (metrics)

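Each arrow is a potential failure point. The tests in this guide exercise the Kafka → Loki and OTLP → Jaeger paths; the Prometheus leg is listed for completeness and is not covered by the tests shown here.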

Setting Up Testcontainers

Go Setup

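//go:build integration

// The build tag above matches the -tags integration flag used in CI.
//
// go.mod dependencies:
// github.com/testcontainers/testcontainers-go v0.32.0 or later
//   (assumed minimum for kafka.Run; older releases such as v0.30.0 expose kafka.RunContainer instead)
// github.com/testcontainers/testcontainers-go/modules/kafka
// github.com/segmentio/kafka-go (producer used by the tests)
// github.com/stretchr/testify (require, assert)

package pipeline_test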

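// The test functions further down additionally need encoding/json, net/http,
// testify's assert, gRPC, and the OpenTelemetry SDK packages.
import (
    "context"
    "fmt"
    "os"
    "testing"
    "time"

    // Producer client used by the tests; aliased to avoid clashing with the
    // testcontainers kafka module below.
    kafkago "github.com/segmentio/kafka-go"
    "github.com/stretchr/testify/require"
    "github.com/testcontainers/testcontainers-go"
    "github.com/testcontainers/testcontainers-go/modules/kafka"
    "github.com/testcontainers/testcontainers-go/wait"
)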

type PipelineTestSuite struct {
    ctx             context.Context
    kafkaContainer  *kafka.KafkaContainer
    jaegerContainer testcontainers.Container
    lokiContainer   testcontainers.Container
    otelContainer   testcontainers.Container
    kafkaBroker     string
    jaegerURL       string
    lokiURL         string
}

func setupPipeline(t *testing.T) *PipelineTestSuite {
    t.Helper()
    ctx := context.Background()
    suite := &PipelineTestSuite{ctx: ctx}

    // Start Kafka
    kafkaContainer, err := kafka.Run(ctx, "confluentinc/confluent-local:7.5.0",
        kafka.WithClusterID("test-cluster"),
    )
    if err != nil {
        t.Fatalf("failed to start Kafka: %v", err)
    }
    t.Cleanup(func() { kafkaContainer.Terminate(ctx) })

    brokers, err := kafkaContainer.Brokers(ctx)
    if err != nil {
        t.Fatalf("failed to get Kafka brokers: %v", err)
    }
    suite.kafkaBroker = brokers[0]
    suite.kafkaContainer = kafkaContainer

    // Start Jaeger. Port 16686 serves the query API and UI; port 4317 accepts
    // OTLP over gRPC, which the collector's otlp exporter targets.
    jaegerReq := testcontainers.ContainerRequest{
        Image:        "jaegertracing/all-in-one:1.55",
        ExposedPorts: []string{"16686/tcp", "4317/tcp"},
        Env:          map[string]string{"COLLECTOR_OTLP_ENABLED": "true"},
        WaitingFor:   wait.ForHTTP("/").WithPort("16686/tcp"),
    }
    jaegerContainer, err := testcontainers.GenericContainer(ctx,
        testcontainers.GenericContainerRequest{
            ContainerRequest: jaegerReq,
            Started:          true,
        })
    if err != nil {
        t.Fatalf("failed to start Jaeger: %v", err)
    }
    t.Cleanup(func() { jaegerContainer.Terminate(ctx) })
    suite.jaegerContainer = jaegerContainer // read later by startOtelCollector

    jaegerHost, err := jaegerContainer.Host(ctx)
    require.NoError(t, err)
    jaegerPort, err := jaegerContainer.MappedPort(ctx, "16686/tcp")
    require.NoError(t, err)
    suite.jaegerURL = fmt.Sprintf("http://%s:%s", jaegerHost, jaegerPort.Port())

    // Start Loki
    lokiReq := testcontainers.ContainerRequest{
        Image:        "grafana/loki:2.9.4",
        ExposedPorts: []string{"3100/tcp"},
        Cmd:          []string{"-config.file=/etc/loki/local-config.yaml"},
        WaitingFor:   wait.ForHTTP("/ready").WithPort("3100/tcp"),
    }
    lokiContainer, err := testcontainers.GenericContainer(ctx,
        testcontainers.GenericContainerRequest{
            ContainerRequest: lokiReq,
            Started:          true,
        })
    if err != nil {
        t.Fatalf("failed to start Loki: %v", err)
    }
    t.Cleanup(func() { lokiContainer.Terminate(ctx) })
    suite.lokiContainer = lokiContainer

    lokiHost, err := lokiContainer.Host(ctx)
    require.NoError(t, err)
    lokiPort, err := lokiContainer.MappedPort(ctx, "3100/tcp")
    require.NoError(t, err)
    suite.lokiURL = fmt.Sprintf("http://%s:%s", lokiHost, lokiPort.Port())

    // Start OTel Collector with config pointing to the started containers
    suite.otelContainer = startOtelCollector(t, ctx, suite)

    return suite
}

Starting OTel Collector with Dynamic Config

func startOtelCollector(t *testing.T, ctx context.Context, suite *PipelineTestSuite) testcontainers.Container {
    t.Helper()

    // Resolve Jaeger's OTLP gRPC address from the running container.
    jaegerOTLPPort, err := suite.jaegerContainer.MappedPort(ctx, "4317/tcp")
    require.NoError(t, err)
    jaegerHost, err := suite.jaegerContainer.Host(ctx)
    require.NoError(t, err)

    // NOTE: the Kafka, Jaeger, and Loki addresses used below are the host-mapped
    // ones that the test process uses. The collector runs in its own container,
    // where "localhost" is the collector itself, so depending on your Docker setup
    // you may need host networking (Linux) or a shared Docker network with aliases
    // for these endpoints to resolve.
    otelConfig := fmt.Sprintf(`
receivers:
  kafka:
    brokers: [%s]
    topic: app-logs
    protocol_version: 2.0.0
    # With json encoding the payload becomes the log body. The label queries in
    # the tests assume service.name and level have been promoted to attributes
    # (for example with a transform processor); that step is not shown here.
    encoding: json
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317

processors:
  batch:
    timeout: 1s
  resource:
    attributes:
      - key: environment
        value: test
        action: upsert
      # Hint for the loki exporter: turn these resource attributes into labels.
      - key: loki.resource.labels
        value: service.name, environment
        action: insert

exporters:
  # The dedicated jaeger exporter is no longer part of collector-contrib at
  # this version, so traces go to Jaeger's OTLP endpoint instead.
  otlp/jaeger:
    endpoint: %s:%s
    tls:
      insecure: true
  loki:
    endpoint: %s/loki/api/v1/push
  debug:
    verbosity: detailed

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch, resource]
      exporters: [otlp/jaeger]
    logs:
      receivers: [kafka]
      processors: [batch, resource]
      exporters: [loki, debug]
`, suite.kafkaBroker, jaegerHost, jaegerOTLPPort.Port(), suite.lokiURL)

    // Write config to a temp file and copy it into the container
    configFile, err := os.CreateTemp("", "otel-collector-*.yaml")
    require.NoError(t, err)
    _, err = configFile.WriteString(otelConfig)
    require.NoError(t, err)
    require.NoError(t, configFile.Close())
    t.Cleanup(func() { os.Remove(configFile.Name()) })

    req := testcontainers.ContainerRequest{
        Image:        "otel/opentelemetry-collector-contrib:0.95.0",
        ExposedPorts: []string{"4317/tcp"},
        // Copy rather than bind-mount: os.CreateTemp creates the file with mode
        // 0600, which the collector's non-root user may not be able to read.
        Files: []testcontainers.ContainerFile{{
            HostFilePath:      configFile.Name(),
            ContainerFilePath: "/etc/otelcol/config.yaml",
            FileMode:          0o644,
        }},
        Cmd:        []string{"--config=/etc/otelcol/config.yaml"},
        WaitingFor: wait.ForLog("Everything is ready").WithStartupTimeout(30 * time.Second),
    }

    container, err := testcontainers.GenericContainer(ctx,
        testcontainers.GenericContainerRequest{ContainerRequest: req, Started: true})
    require.NoError(t, err)
    t.Cleanup(func() { container.Terminate(ctx) })

    return container
}
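
The positional fmt.Sprintf call above only works while the order of the %s verbs matches the argument list. A minimal sketch of the same dynamic-config idea with named fields, using text/template from the standard library. The type collectorEndpoints, the function renderCollectorConfig, and the trimmed YAML fragment are hypothetical names for illustration, and the otlp/jaeger exporter assumes traces are sent to Jaeger over OTLP.

```go
package main

import (
    "bytes"
    "fmt"
    "text/template"
)

// collectorEndpoints holds the addresses discovered at test time.
// The type and its field names are illustrative, not part of any library.
type collectorEndpoints struct {
    KafkaBroker string
    JaegerOTLP  string
    LokiURL     string
}

// Trimmed config fragment: only the lines that take a dynamic value.
const collectorConfigTmpl = `receivers:
  kafka:
    brokers: [{{ .KafkaBroker }}]
    topic: app-logs
exporters:
  otlp/jaeger:
    endpoint: {{ .JaegerOTLP }}
  loki:
    endpoint: {{ .LokiURL }}/loki/api/v1/push
`

// renderCollectorConfig fills the template from named fields, so reordering
// the YAML cannot silently swap two addresses.
func renderCollectorConfig(ep collectorEndpoints) (string, error) {
    tmpl, err := template.New("otel").Parse(collectorConfigTmpl)
    if err != nil {
        return "", err
    }
    var buf bytes.Buffer
    if err := tmpl.Execute(&buf, ep); err != nil {
        return "", err
    }
    return buf.String(), nil
}

func main() {
    cfg, err := renderCollectorConfig(collectorEndpoints{
        KafkaBroker: "localhost:9093",
        JaegerOTLP:  "localhost:4317",
        LokiURL:     "http://localhost:3100",
    })
    if err != nil {
        panic(err)
    }
    fmt.Print(cfg)
}
```

In the test suite, the rendered string would take the place of otelConfig before it is written to the temp file.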

Writing the Integration Tests

Test: Logs Flow from Kafka to Loki

func TestLogsFlowKafkaToLoki(t *testing.T) {
    suite := setupPipeline(t)

    // Produce a log event to Kafka. kafkago is github.com/segmentio/kafka-go,
    // imported under an alias so it does not clash with the testcontainers
    // kafka module.
    writer := &kafkago.Writer{
        Addr:  kafkago.TCP(suite.kafkaBroker),
        Topic: "app-logs",
        // The topic is not created up front, so let the first write create it.
        AllowAutoTopicCreation: true,
    }
    defer writer.Close()

    logEvent := map[string]interface{}{
        "timestamp":    time.Now().UnixNano(),
        "level":        "error",
        "message":      "database connection failed",
        "service.name": "payments",
        "trace_id":     "abc123def456",
    }

    payload, err := json.Marshal(logEvent)
    require.NoError(t, err)
    err = writer.WriteMessages(suite.ctx, kafkago.Message{Value: payload})
    require.NoError(t, err)

    // Poll Loki until the log appears (up to 10 seconds).
    // loki is assumed to be a small project-local helper around Loki's HTTP
    // query API; its definition is not shown in this guide.
    lokiClient := loki.NewClient(suite.lokiURL)

    require.Eventually(t, func() bool {
        result, err := lokiClient.Query(`{service_name="payments"} |= "database connection failed"`, 10)
        if err != nil {
            return false
        }
        return result.TotalEntries() > 0
    }, 10*time.Second, 500*time.Millisecond, "log did not reach Loki within timeout")

    // Verify log content
    entries := lokiClient.QueryRange(`{service_name="payments"}`, time.Now().Add(-1*time.Minute), time.Now(), 10)
    require.Greater(t, len(entries), 0)

    entry := entries[0]
    assert.Contains(t, entry.Line, "database connection failed")
    assert.Equal(t, "error", entry.Labels["level"])
}
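
The loki client used in the test above is not defined anywhere in this guide. As a rough sketch of what such a helper might look like, the code below queries Loki's /loki/api/v1/query_range endpoint and flattens the returned streams into entries. The names lokiEntry, parseLokiResponse, and queryLoki are hypothetical, and only the response fields needed here are modelled.

```go
package main

import (
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "net/url"
)

// lokiEntry is one log line plus the labels of the stream it belongs to.
type lokiEntry struct {
    Labels map[string]string
    Line   string
}

// lokiResponse mirrors the parts of Loki's query response used here.
type lokiResponse struct {
    Status string `json:"status"`
    Data   struct {
        Result []struct {
            Stream map[string]string `json:"stream"`
            Values [][2]string       `json:"values"` // [timestamp in ns, log line]
        } `json:"result"`
    } `json:"data"`
}

// parseLokiResponse decodes a query response body into a flat entry list.
func parseLokiResponse(body []byte) ([]lokiEntry, error) {
    var resp lokiResponse
    if err := json.Unmarshal(body, &resp); err != nil {
        return nil, fmt.Errorf("decode loki response: %w", err)
    }
    if resp.Status != "success" {
        return nil, fmt.Errorf("loki returned status %q", resp.Status)
    }
    var entries []lokiEntry
    for _, stream := range resp.Data.Result {
        for _, v := range stream.Values {
            entries = append(entries, lokiEntry{Labels: stream.Stream, Line: v[1]})
        }
    }
    return entries, nil
}

// queryLoki runs a LogQL query over Loki's default time window.
func queryLoki(baseURL, logQL string, limit int) ([]lokiEntry, error) {
    params := url.Values{}
    params.Set("query", logQL)
    params.Set("limit", fmt.Sprint(limit))
    resp, err := http.Get(baseURL + "/loki/api/v1/query_range?" + params.Encode())
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    body, err := io.ReadAll(resp.Body)
    if err != nil {
        return nil, err
    }
    if resp.StatusCode != http.StatusOK {
        return nil, fmt.Errorf("loki query failed: %s: %s", resp.Status, body)
    }
    return parseLokiResponse(body)
}

func main() {
    // Canned response so the parser can be exercised without a running Loki.
    sample := []byte(`{"status":"success","data":{"resultType":"streams","result":[
        {"stream":{"service_name":"payments"},"values":[["1700000000000000000","database connection failed"]]}]}}`)
    entries, err := parseLokiResponse(sample)
    if err != nil {
        panic(err)
    }
    fmt.Println(len(entries), entries[0].Labels["service_name"], entries[0].Line)
}
```

Inside require.Eventually, the condition would then be something like len(entries) > 0 on the result of queryLoki(suite.lokiURL, ...), returning false whenever the call errors.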

Test: Traces Flow from OTLP to Jaeger

func TestTracesFlowOTLPToJaeger(t *testing.T) {
    suite := setupPipeline(t)

    // Get OTel Collector's OTLP endpoint
    otelHost, _ := suite.otelContainer.Host(suite.ctx)
    otelPort, _ := suite.otelContainer.MappedPort(suite.ctx, "4317")

    // Send a trace
    conn, err := grpc.DialContext(suite.ctx,
        fmt.Sprintf("%s:%s", otelHost, otelPort.Port()),
        grpc.WithTransportCredentials(insecure.NewCredentials()),
    )
    require.NoError(t, err)
    defer conn.Close()

    // otlptracegrpc.New returns a span exporter, which is what WithBatcher expects.
    exporter, err := otlptracegrpc.New(suite.ctx, otlptracegrpc.WithGRPCConn(conn))
    require.NoError(t, err)
    traceProvider := sdktrace.NewTracerProvider(
        sdktrace.WithBatcher(exporter),
        sdktrace.WithResource(resource.NewWithAttributes(
            semconv.SchemaURL,
            semconv.ServiceName("test-service"),
        )),
    )
    defer traceProvider.Shutdown(suite.ctx)

    tracer := traceProvider.Tracer("test")
    _, span := tracer.Start(suite.ctx, "integration-test-span")
    span.SetAttributes(attribute.String("test.id", "HEL-318"))
    span.End()

    // Flush so the span is exported before Jaeger is queried.
    require.NoError(t, traceProvider.ForceFlush(suite.ctx))

    // Query Jaeger for the trace
    serviceURL := fmt.Sprintf("%s/api/services", suite.jaegerURL)
    require.Eventually(t, func() bool {
        resp, err := http.Get(serviceURL)
        if err != nil {
            return false
        }
        defer resp.Body.Close()
        var result map[string]interface{}
        json.NewDecoder(resp.Body).Decode(&result)
        services, _ := result["data"].([]interface{})
        for _, s := range services {
            if s == "test-service" {
                return true
            }
        }
        return false
    }, 10*time.Second, 500*time.Millisecond, "service not found in Jaeger")

    // Verify the trace
    tracesURL := fmt.Sprintf("%s/api/traces?service=test-service&limit=10", suite.jaegerURL)
    resp, err := http.Get(tracesURL)
    require.NoError(t, err)
    defer resp.Body.Close()

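    var result map[string]interface{}
    require.NoError(t, json.NewDecoder(resp.Body).Decode(&result))
    // Checked assertion: a missing or null "data" field fails the test instead of panicking.
    data, ok := result["data"].([]interface{})
    require.True(t, ok, "unexpected Jaeger response shape")
    require.Greater(t, len(data), 0, "no traces found in Jaeger")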
}
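
Decoding into map[string]interface{} forces a type assertion at every access. A small sketch of the same check against Jaeger's /api/services response using a typed struct; jaegerServices and hasService are hypothetical names, and only the data field of the response is modelled.

```go
package main

import (
    "encoding/json"
    "fmt"
)

// jaegerServices models the one field of the /api/services response used here.
type jaegerServices struct {
    Data []string `json:"data"`
}

// hasService reports whether name appears in the service list of the response body.
// A null or missing "data" field decodes to an empty list rather than an error.
func hasService(body []byte, name string) (bool, error) {
    var resp jaegerServices
    if err := json.Unmarshal(body, &resp); err != nil {
        return false, fmt.Errorf("decode jaeger services: %w", err)
    }
    for _, svc := range resp.Data {
        if svc == name {
            return true, nil
        }
    }
    return false, nil
}

func main() {
    // Canned response so the decoder can be exercised without a running Jaeger.
    body := []byte(`{"data":["jaeger-all-in-one","test-service"],"total":2}`)
    found, err := hasService(body, "test-service")
    if err != nil {
        panic(err)
    }
    fmt.Println(found)
}
```

Inside the polling closure, a decode error would simply count as "not found yet" and the next poll would retry.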

Test: Pipeline Handles Malformed Kafka Messages

func TestPipelineHandlesMalformedMessages(t *testing.T) {
    suite := setupPipeline(t)

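    // kafkago is github.com/segmentio/kafka-go, imported under an alias so it
    // does not clash with the testcontainers kafka module.
    writer := &kafkago.Writer{
        Addr:  kafkago.TCP(suite.kafkaBroker),
        Topic: "app-logs",
        // The topic is not created up front, so let the first write create it.
        AllowAutoTopicCreation: true,
    }
    defer writer.Close()

    // Three malformed payloads followed by one valid message
    messages := []kafkago.Message{
        {Value: []byte("not json at all")},
        {Value: []byte(`{"incomplete": `)},
        {Value: []byte("")},
        // Valid message after malformed ones
        {Value: []byte(`{"level":"info","message":"valid after malformed","service.name":"api"}`)},
    }

    for _, msg := range messages {
        // Kafka itself accepts any bytes, so a failed write is a test setup problem.
        require.NoError(t, writer.WriteMessages(suite.ctx, msg))
    }

    // Collector should still process the valid message.
    // loki is assumed to be a small project-local helper; its definition is not shown.
    lokiClient := loki.NewClient(suite.lokiURL)
    require.Eventually(t, func() bool {
        result, err := lokiClient.Query(`{service_name="api"} |= "valid after malformed"`, 5)
        if err != nil {
            return false
        }
        return result.TotalEntries() > 0
    }, 15*time.Second, 500*time.Millisecond, "valid message not received after malformed messages")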
}

Java Version with Testcontainers

// LogPipelineIntegrationTest.java
@Testcontainers
class LogPipelineIntegrationTest {

    @Container
    static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.5.0"));

    @Container
    static GenericContainer<?> loki = new GenericContainer<>(DockerImageName.parse("grafana/loki:2.9.4"))
        .withExposedPorts(3100)
        .waitingFor(Wait.forHttp("/ready").forPort(3100));

    static GenericContainer<?> otelCollector;

    @BeforeAll
    static void startCollector() {
        String collectorConfig = buildCollectorConfig(kafka.getBootstrapServers(), getLokiUrl());
        
        otelCollector = new GenericContainer<>(DockerImageName.parse("otel/opentelemetry-collector-contrib:0.95.0"))
            .withExposedPorts(4317)
            .withCopyToContainer(
                Transferable.of(collectorConfig.getBytes()),
                "/etc/otelcol/config.yaml"
            )
            .withCommand("--config=/etc/otelcol/config.yaml")
            .waitingFor(Wait.forLogMessage(".*Everything is ready.*", 1));
        
        otelCollector.start();
    }

    @Test
    void logsFlowFromKafkaToLoki() throws Exception {
        KafkaProducer<String, String> producer = createKafkaProducer();
        
        String logJson = """
            {"level":"warn","message":"memory threshold exceeded","service.name":"cache"}
            """;
        
        producer.send(new ProducerRecord<>("app-logs", logJson));
        producer.flush();

        await()
            .atMost(Duration.ofSeconds(10))
            .pollInterval(Duration.ofMillis(500))
            .until(() -> queryLoki("{service_name=\"cache\"}", "memory threshold exceeded") > 0);
    }

    private long queryLoki(String selector, String filter) {
        String url = getLokiUrl() + "/loki/api/v1/query?query=" + 
            URLEncoder.encode(selector + " |= \"" + filter + "\"", UTF_8);
        // Parse Loki response and return entry count
        // ...
    }
}

CI Pipeline

# .github/workflows/pipeline-integration-tests.yml
name: Pipeline Integration Tests

on:
  push:
    branches: [main]
  pull_request:

jobs:
  integration:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Set up Go
        uses: actions/setup-go@v5
        with:
          go-version: '1.22'

      - name: Run integration tests
        run: go test ./tests/integration/... -v -timeout 120s -tags integration
        env:
          TESTCONTAINERS_DOCKER_SOCKET_OVERRIDE: /var/run/docker.sock

What to Test (and What Not To)

Test with Testcontainers:

  • Happy path: data flows end-to-end
  • Error recovery: malformed messages don't break the pipeline
  • Schema validation: wrong field types are rejected or handled
  • Routing: logs go to the right backend based on metadata

Don't test with Testcontainers:

  • Performance benchmarks (use dedicated load testing)
  • Collector transform logic (unit test it at the processor level, or with vector test if the transforms live in Vector)
  • Backend storage behavior (that's Jaeger/Loki's responsibility)

Integration tests are expensive to run. Keep them focused on the integration boundaries, not the internal logic.

Summary

End-to-end integration testing with Testcontainers gives you confidence that Kafka, OTel Collector, Jaeger, and Loki actually work together. The key pattern: use dynamic config generation to point each container at the others, use Eventually assertions with reasonable timeouts, and always test failure recovery to ensure one bad message can't block the whole pipeline. Run these tests in CI on every pull request and on every push to main.
