Testing Fluentd and Fluent Bit: Plugins, Log Routing, and Filter Chains

Fluentd and Fluent Bit are the workhorses of log aggregation. Fluentd's rich plugin ecosystem handles complex parsing and routing; Fluent Bit's lightweight C core handles high-throughput edge collection. Both have one thing in common: when their configs break, logs disappear silently.

This guide shows how to unit test Fluentd plugins, validate routing configurations, and verify Fluent Bit filter chains before deploying to production.

Why Log Pipeline Tests Get Skipped (and Why They Shouldn't)

Log pipelines feel simple until they aren't. Common failure modes:

  • A Fluentd parser plugin with a wrong regex drops all logs from a service
  • A routing rule sends production logs to a staging index
  • A record_transformer filter adds a field in development but not in production because of an env var dependency
  • Fluent Bit's grep filter silently drops records when the field doesn't exist

None of these produce errors. They just lose data.
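
The first failure mode can be caught by running a parser regex against a few sample lines per service. The following is a minimal sketch; the pattern and the sample lines are hypothetical and do not come from a real service. Note that Python spells named groups `(?P<name>...)`, while Fluentd's Ruby regexes use `(?<name>...)`.

```python
import re

# Hypothetical parser pattern and sample lines, for illustration only.
PATTERN = re.compile(r'^(?P<time>\S+) (?P<level>[A-Z]+) (?P<message>.*)$')

SAMPLES = [
    "2026-05-15T10:00:00Z INFO server started",
    "2026-05-15T10:00:01Z ERROR connection refused",
    "2026-05-15T10:00:02Z warn lowercase level",  # does not match [A-Z]+
]

def unmatched_lines(pattern, lines):
    """Return the sample lines the parser regex would fail to parse."""
    return [line for line in lines if pattern.match(line) is None]

dropped = unmatched_lines(PATTERN, SAMPLES)
print(f"{len(dropped)} of {len(SAMPLES)} sample lines would not parse")
```

Keeping such samples next to the config turns a silent drop into a failing assertion.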

Testing Fluentd Plugins

Fluentd plugins are Ruby gems. Fluentd itself ships test drivers and helpers (the Fluent::Test::Driver classes and Fluent::Test::Helpers) that make unit testing straightforward.

Testing an Input Plugin

# test/plugin/test_in_http_extended.rb
require 'net/http'  # used below to send the test request
require 'fluent/test'
require 'fluent/test/helpers'
require 'fluent/test/driver/input'
require 'fluent/plugin/in_http_extended'

class HTTPExtendedInputTest < Test::Unit::TestCase
  include Fluent::Test::Helpers

  setup do
    Fluent::Test.setup
  end

  def create_driver(conf)
    Fluent::Test::Driver::Input.new(Fluent::Plugin::HTTPExtendedInput).configure(conf)
  end

  test 'parses JSON body correctly' do
    d = create_driver(<<~CONF)
      port 9880
      bind 127.0.0.1
      format json
    CONF

    d.run(expect_records: 1, timeout: 5) do
      # Simulate incoming HTTP request
      Net::HTTP.post(
        URI('http://127.0.0.1:9880/test.tag'),
        '{"level":"info","message":"hello","service":"api"}',
        'Content-Type' => 'application/json'
      )
    end

    records = d.events
    assert_equal 1, records.size
    assert_equal 'info', records[0][2]['level']
    assert_equal 'hello', records[0][2]['message']
    assert_equal 'api', records[0][2]['service']
  end
end

Testing a Filter Plugin

# test/plugin/test_filter_kubernetes_metadata.rb
require 'fluent/test'
require 'fluent/test/helpers'  # provides event_time
require 'fluent/test/driver/filter'
require 'fluent/plugin/filter_kubernetes_metadata'

class KubernetesMetadataFilterTest < Test::Unit::TestCase
  include Fluent::Test::Helpers

  setup do
    Fluent::Test.setup
  end

  def create_driver(conf = '')
    Fluent::Test::Driver::Filter.new(Fluent::Plugin::KubernetesMetadataFilter).configure(conf)
  end

  test 'enriches record with pod metadata' do
    d = create_driver(<<~CONF)
      kubernetes_url https://kubernetes.default.svc
      verify_ssl false
    CONF

    record = {
      'log' => 'Starting server',
      'kubernetes' => {
        'namespace_name' => 'production',
        'pod_name' => 'api-server-abc123',
        'container_name' => 'api'
      }
    }

    d.run do
      d.feed('kubernetes.production.api-server-abc123.api', event_time, record)
    end

    assert_equal 1, d.filtered_records.size
    filtered = d.filtered_records[0]
    assert_equal 'production', filtered['kubernetes']['namespace_name']
    assert_equal 'api-server-abc123', filtered['kubernetes']['pod_name']
  end

  test 'passes through records without kubernetes key' do
    d = create_driver

    record = {'log' => 'system log', 'host' => 'node1'}
    d.run { d.feed('system.log', event_time, record) }

    assert_equal 1, d.filtered_records.size
    assert_equal record, d.filtered_records[0]
  end

  test 'handles missing container_name gracefully' do
    d = create_driver

    record = {
      'log' => 'test',
      'kubernetes' => {'namespace_name' => 'prod', 'pod_name' => 'pod-1'}
      # no container_name
    }

    assert_nothing_raised do
      d.run { d.feed('test.tag', event_time, record) }
    end
  end
end

Testing an Output Plugin

# test/plugin/test_out_elasticsearch_custom.rb
require 'fluent/test'
require 'fluent/test/helpers'  # provides event_time
require 'fluent/test/driver/output'
require 'fluent/plugin/out_elasticsearch_custom'
require 'webmock/test_unit'

class ElasticsearchCustomOutputTest < Test::Unit::TestCase
  include Fluent::Test::Helpers

  setup do
    Fluent::Test.setup
    WebMock.enable!
    stub_request(:get, "http://localhost:9200/").to_return(
      status: 200,
      body: '{"version":{"number":"8.0.0"}}',
      headers: {'Content-Type' => 'application/json'}
    )
  end

  teardown do
    WebMock.disable!
  end

  def create_driver(conf)
    Fluent::Test::Driver::Output.new(Fluent::Plugin::ElasticsearchCustomOutput).configure(conf)
  end

  test 'indexes document with correct routing key' do
    stub = stub_request(:post, "http://localhost:9200/logs-2026.05/_bulk")
      .to_return(status: 200, body: '{"errors":false,"items":[]}')

    d = create_driver(<<~CONF)
      host localhost
      port 9200
      index_name logs-%Y.%m
      type_name _doc
    CONF

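    # Feeding a bare record requires a default tag; the event time is pinned
    # so that %Y.%m expands to the month used in the stubbed URL
    d.run(default_tag: 'app.test') do
      d.feed(event_time('2026-05-15 00:00:00 UTC'),
             {'level' => 'info', 'message' => 'test', 'service' => 'api'})
    end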

    assert_requested(stub, times: 1)
  end
end
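
The stubbed URL above contains a concrete month (logs-2026.05) while the config uses logs-%Y.%m, so the request only matches when the event time falls in that month. The sketch below shows the expansion, assuming the plugin resolves the placeholders against the event time in UTC. That is an assumption about the custom output plugin, not something the test confirms.

```python
from datetime import datetime, timezone

def index_for(event_time, pattern="logs-%Y.%m"):
    """Expand strftime placeholders in an index name using the event's UTC time."""
    return event_time.astimezone(timezone.utc).strftime(pattern)

# A fixed, timezone-aware event time keeps the expected index stable.
fixed = datetime(2026, 5, 15, tzinfo=timezone.utc)
print(index_for(fixed))  # logs-2026.05
```

A test that feeds the current time instead would start failing as soon as the calendar month changes.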

Validating Routing Configurations

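Fluentd routing is order-dependent and easy to get wrong. Use fluentd --dry-run to check that a config parses and that every plugin accepts its settings, and write integration tests for the routing logic itself, which a dry run does not exercise.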
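
To reason about routes before starting Fluentd, it can help to model the documented matching rules: `*` matches exactly one tag part, `**` matches zero or more parts, and the first matching `<match>` block wins. The sketch below is a simplified model written for illustration, not Fluentd's implementation. It ignores `{a,b}` alternation and partial wildcards inside a tag part, and the route table is hypothetical.

```python
def _match(pattern_parts, tag_parts):
    """Recursively match dot-separated pattern parts against tag parts."""
    if not pattern_parts:
        return not tag_parts
    head, rest = pattern_parts[0], pattern_parts[1:]
    if head == "**":
        # ** may consume zero or more tag parts
        return any(_match(rest, tag_parts[i:]) for i in range(len(tag_parts) + 1))
    if not tag_parts:
        return False
    if head == "*" or head == tag_parts[0]:
        return _match(rest, tag_parts[1:])
    return False

def tag_matches(pattern, tag):
    """Return True if the tag matches the pattern under the rules above."""
    return _match(pattern.split("."), tag.split("."))

# Hypothetical route table: (pattern, destination), in config order.
ROUTES = [
    ("app.production.**", "production"),
    ("app.staging.**", "staging"),
    ("**", "null"),
]

def route(tag, routes=ROUTES):
    """Return the destination of the first pattern that matches the tag."""
    for pattern, destination in routes:
        if tag_matches(pattern, tag):
            return destination
    return None

print(route("app.production.api"))  # production
print(route("system.log"))          # null
```

Moving the `**` entry to the top of the table makes every tag resolve to `null`, which is the shadowing mistake that a dry run cannot detect.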

Config Validation Script

#!/bin/bash
# validate-fluentd-config.sh

CONFIG_FILE="${1:-/etc/fluent/fluent.conf}"

echo "Validating Fluentd configuration: $CONFIG_FILE"

# Syntax check
if ! fluentd --dry-run -c "$CONFIG_FILE" 2>&1; then
    echo "FAIL: Config syntax error"
    exit 1
fi

echo "PASS: Config syntax valid"

# Check for common routing mistakes
if grep -q "tag.*\*\.\*\.\*" "$CONFIG_FILE"; then
    echo "WARN: Triple-wildcard tag match detected; may capture unintended logs"
fi

# <match> blocks are evaluated top to bottom, so a catch-all that is not the
# last <match> block shadows every route that follows it.
catch_all_line=$(grep -n '<match \*\*>' "$CONFIG_FILE" | head -1 | cut -d: -f1)
last_match_line=$(grep -n '<match ' "$CONFIG_FILE" | tail -1 | cut -d: -f1)
if [ -n "$catch_all_line" ] && [ "$catch_all_line" -lt "$last_match_line" ]; then
    echo "WARN: Catch-all match (**) may be shadowing more specific routes"
fi

Integration Test for Log Routing

# test_fluentd_routing.py
import subprocess
import tempfile
import time
import os
import json
import pytest

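FLUENTD_CONFIG = """
<source>
  @type forward
  port 24224
  bind 127.0.0.1
</source>

# The file output buffers by time slice by default, so nothing reaches the
# final file for a long time. Flushing immediately keeps the test fast.
<match app.production.**>
  @type file
  path /tmp/test-production
  <format>
    @type json
  </format>
  <buffer>
    flush_mode immediate
  </buffer>
</match>

<match app.staging.**>
  @type file
  path /tmp/test-staging
  <format>
    @type json
  </format>
  <buffer>
    flush_mode immediate
  </buffer>
</match>

<match **>
  @type null
</match>
"""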

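@pytest.fixture(scope="module")
def fluentd_process():
    with tempfile.NamedTemporaryFile(mode='w', suffix='.conf', delete=False) as f:
        f.write(FLUENTD_CONFIG)
        config_path = f.name

    # Discard output: an unread PIPE can fill up and block Fluentd
    proc = subprocess.Popen(
        ["fluentd", "-c", config_path],
        stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
    )
    time.sleep(2)  # Wait for Fluentd to start
    assert proc.poll() is None, "Fluentd exited during startup"

    yield proc

    proc.terminate()
    proc.wait(timeout=10)  # Let Fluentd shut down before removing its config
    os.unlink(config_path)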

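def send_log(tag, record):
    """Send a log record to Fluentd using the fluent-logger Python client."""
    from fluent import sender
    logger = sender.FluentSender(tag, host='127.0.0.1', port=24224)
    try:
        # No label, so the event is emitted with exactly `tag`
        return logger.emit(None, record)
    finally:
        logger.close()  # Flush pending data and close the socket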

def read_flushed(prefix):
    """Return the combined contents of flushed output files named /tmp/<prefix>*."""
    contents = []
    for name in os.listdir('/tmp'):
        path = os.path.join('/tmp', name)
        # Fluentd may also create a buffer directory at the configured path; skip it
        if name.startswith(prefix) and os.path.isfile(path):
            with open(path) as f:
                contents.append(f.read())
    return ''.join(contents)

def test_production_logs_routed_correctly(fluentd_process):
    record = {'level': 'info', 'message': 'production event', 'env': 'production'}
    send_log('app.production.api', record)
    time.sleep(0.5)

    # Verify log landed in a production file
    content = read_flushed('test-production')
    assert 'production event' in content, "Production log not found in output files"

def test_staging_logs_not_in_production(fluentd_process):
    record = {'level': 'debug', 'message': 'staging only', 'env': 'staging'}
    send_log('app.staging.api', record)
    time.sleep(0.5)

    # Confirm delivery first, otherwise the negative check passes vacuously
    assert 'staging only' in read_flushed('test-staging'), "Staging log never arrived"
    assert 'staging only' not in read_flushed('test-production'), "Staging log leaked into production file"
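
Fixed sleeps such as the two-second startup wait tend to be either too short on a loaded CI runner or longer than needed locally. One common alternative is to poll the forward port until it accepts connections. The helper below is a sketch using only the standard library; `wait_for_port` is a hypothetical name, not part of pytest or Fluentd.

```python
import socket
import time

def wait_for_port(host, port, timeout=10.0, interval=0.1):
    """Poll until a TCP connection to host:port succeeds or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Not listening yet (or refused); retry after a short pause
            time.sleep(interval)
    return False
```

In the fixture, `assert wait_for_port('127.0.0.1', 24224)` could replace the startup sleep. The same polling idea applies to waiting for an output file to contain the expected message.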

Testing Fluent Bit Configurations

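Fluent Bit uses its own config format (classic or YAML). The declarative test file below relies on a unit-test runner that this guide attributes to v2.1+. The tests: section and the --unit-test flag are not available in every build, so confirm that fluent-bit --help lists the flag on the version you deploy before wiring it into CI. Where it is missing, the subprocess-based tests in the next section cover the same filters.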

Built-in Unit Tests

# fluent-bit-unit-test.yaml
pipeline:
  filters:
    - name: lua
      match: "*"
      script: /etc/fluent-bit/scripts/enrich.lua
      call: enrich_record

tests:
  - name: lua filter adds environment field
    input:
      log: "server started"
      level: info
    expected:
      log: "server started"
      level: info
      environment: production

  - name: lua filter normalizes log level
    input:
      log: "connection refused"
      severity: ERROR
    expected:
      log: "connection refused"
      level: error
      severity: ERROR

Run with:

fluent-bit -c /dev/null --unit-test fluent-bit-unit-test.yaml

Testing the Grep Filter

Fluent Bit's grep filter is a common source of silent log loss. Test it explicitly:

# test_fluent_bit_grep.py
import subprocess
import json
import tempfile
import os

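def run_fluent_bit_with_records(config: str, records: list) -> list:
    """Run Fluent Bit with given config and input records, return output."""
    with tempfile.NamedTemporaryFile(mode='w', suffix='.conf', delete=False) as f:
        f.write(config)
        config_path = f.name

    # One JSON object per line; the trailing newline terminates the last record
    input_data = '\n'.join(json.dumps(r) for r in records) + '\n'

    try:
        # Fluent Bit is expected to exit once stdin reaches EOF; the timeout
        # leaves room for the final flush and shutdown
        result = subprocess.run(
            ['fluent-bit', '-c', config_path, '--quiet'],
            input=input_data, capture_output=True, text=True, timeout=15
        )
    finally:
        os.unlink(config_path)

    # Keep only JSON object lines in case anything else is printed to stdout
    return [json.loads(line) for line in result.stdout.splitlines() if line.startswith('{')]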

def test_grep_filter_includes_matching():
    config = """
[INPUT]
    Name   stdin

[FILTER]
    Name   grep
    Match  *
    Regex  level (info|warn|error)

[OUTPUT]
    Name   stdout
    Match  *
    Format json_lines
"""
    records = [
        {'level': 'info', 'msg': 'starting'},
        {'level': 'debug', 'msg': 'verbose noise'},
        {'level': 'error', 'msg': 'connection failed'},
    ]
    
    output = run_fluent_bit_with_records(config, records)
    levels = [r.get('level') for r in output]
    
    assert 'info' in levels
    assert 'error' in levels
    assert 'debug' not in levels, "Debug logs should be filtered out"

def test_grep_filter_missing_field_drops_record():
    """Verify behavior when the grep field doesn't exist in a record."""
    # The config needs its own input and output, or no records flow at all
    config = """
[INPUT]
    Name   stdin

[FILTER]
    Name   grep
    Match  *
    Regex  level error

[OUTPUT]
    Name   stdout
    Match  *
    Format json_lines
"""
    records = [
        {'msg': 'no level field here'},  # missing 'level'
        {'level': 'error', 'msg': 'has level'},
    ]
    
    output = run_fluent_bit_with_records(config, records)
    # Record without 'level' field should be dropped (Fluent Bit's default behavior)
    assert len(output) == 1
    assert output[0]['level'] == 'error'
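
The behavior these tests pin down can be summarized with a small model. The sketch below is a simplified illustration of a single Regex rule and a single Exclude rule, not Fluent Bit's implementation. It uses Python's `re` instead of the regex engine Fluent Bit embeds, and it ignores nested keys and multiple-rule logic.

```python
import re

def grep_regex(records, key, pattern):
    """Model of one `Regex key pattern` rule: keep records whose key exists and matches."""
    rx = re.compile(pattern)
    return [r for r in records if key in r and rx.search(str(r[key]))]

def grep_exclude(records, key, pattern):
    """Model of one `Exclude key pattern` rule: drop records whose key exists and matches."""
    rx = re.compile(pattern)
    return [r for r in records if not (key in r and rx.search(str(r[key])))]

records = [
    {"msg": "no level field here"},
    {"level": "error", "msg": "has level"},
    {"level": "debug", "msg": "verbose noise"},
]

print(grep_regex(records, "level", "error"))    # only the level=error record
print(grep_exclude(records, "level", "debug"))  # the record without level survives
```

The asymmetry is the point: under a Regex rule a missing field means the record is dropped, while under an Exclude rule it means the record is kept.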

CI Integration for Log Pipeline Tests

# .github/workflows/log-pipeline-tests.yml
name: Log Pipeline Tests

on: [push, pull_request]

jobs:
  fluentd-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

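      - name: Set up Ruby
        uses: ruby/setup-ruby@v1
        with:
          ruby-version: '3.2'
          bundler-cache: true  # runs bundle install so "bundle exec rake test" works

      - name: Install Fluentd
        run: |
          gem install fluentd --no-document
          gem install fluent-plugin-elasticsearch --no-document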

      - name: Validate configs
        run: |
          for conf in config/fluentd/*.conf; do
            fluentd --dry-run -c "$conf" || exit 1
          done

      - name: Run plugin unit tests
        run: bundle exec rake test

  fluent-bit-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

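      - name: Install Fluent Bit
        run: |
          curl https://raw.githubusercontent.com/fluent/fluent-bit/master/install.sh | sh
          # The packages install the binary under /opt/fluent-bit/bin, which is not on PATH
          echo "/opt/fluent-bit/bin" >> "$GITHUB_PATH"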

      - name: Run unit tests
        run: |
          for test_file in tests/fluent-bit/*.yaml; do
            fluent-bit -c /dev/null --unit-test "$test_file" || exit 1
          done

Summary

Fluentd and Fluent Bit configurations are code and deserve the same rigor as application code. Use Fluentd's Ruby test drivers for plugin unit tests, fluentd --dry-run for config validation, and Python-based integration tests to verify routing logic. For Fluent Bit, use the built-in unit test runner where your version provides it, and always test grep filter behavior explicitly with records that have missing fields, since that is a common source of silent log loss.
