Testing Fluentd and Fluent Bit: Plugins, Log Routing, and Filter Chains
Fluentd and Fluent Bit are the workhorses of log aggregation. Fluentd's rich plugin ecosystem handles complex parsing and routing; Fluent Bit's lightweight C core handles high-throughput edge collection. Both have one thing in common: when their configs break, logs disappear silently.
This guide shows how to test Fluentd plugins, validate routing configurations, and verify filter chain correctness before deploying to production.
Why Log Pipeline Tests Get Skipped (and Why They Shouldn't)
Log pipelines feel simple until they aren't. Common failure modes:
- A Fluentd parser plugin with a wrong regex drops all logs from a service
- A routing rule sends production logs to a staging index
- A record_transformer filter adds a field in development but not in production because of an env var dependency
- Fluent Bit's grep filter silently drops records when the field doesn't exist
None of these produce errors. They just lose data.
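The last failure mode in the list is easy to underestimate, so here is a minimal Python sketch of it. This is a simplified model of a grep `Regex` rule, not Fluent Bit's implementation; the function names `grep_regex_keep` and `apply_grep` are hypothetical and exist only for illustration.

```python
import re


def grep_regex_keep(record: dict, key: str, pattern: str) -> bool:
    """Model of a grep Regex rule: keep a record only if the key exists and matches."""
    if key not in record:
        # Missing key: the record is dropped, and nothing is logged about it.
        return False
    return re.search(pattern, str(record[key])) is not None


def apply_grep(records: list, key: str, pattern: str) -> list:
    """Return only the records the rule would let through."""
    return [r for r in records if grep_regex_keep(r, key, pattern)]


records = [
    {"level": "error", "msg": "connection failed"},
    {"level": "debug", "msg": "verbose noise"},
    {"msg": "no level field here"},  # no 'level' key at all
]

kept = apply_grep(records, "level", "error")
print(f"kept={len(kept)} dropped={len(records) - len(kept)}")
```

Comparing the number of records going in with the number coming out is the only signal here, which is why the tests later in this guide assert on exact record counts.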
Testing Fluentd Plugins
Fluentd plugins are Ruby gems. Fluentd ships test drivers and helpers under fluent/test (loaded with require 'fluent/test'), which make unit testing with test-unit straightforward.
Testing an Input Plugin
# test/plugin/test_in_http_extended.rb
require 'net/http'
require 'uri'
require 'fluent/test'
require 'fluent/test/helpers'
require 'fluent/test/driver/input'
require 'fluent/plugin/in_http_extended'

class HTTPExtendedInputTest < Test::Unit::TestCase
  include Fluent::Test::Helpers

  setup do
    Fluent::Test.setup
  end

  def create_driver(conf)
    Fluent::Test::Driver::Input.new(Fluent::Plugin::HTTPExtendedInput).configure(conf)
  end

  test 'parses JSON body correctly' do
    d = create_driver(<<~CONF)
      port 9880
      bind 127.0.0.1
      format json
    CONF

    # Run the plugin until one record has been emitted or 5 seconds pass
    d.run(expect_records: 1, timeout: 5) do
      # Simulate incoming HTTP request
      Net::HTTP.post(
        URI('http://127.0.0.1:9880/test.tag'),
        '{"level":"info","message":"hello","service":"api"}',
        'Content-Type' => 'application/json'
      )
    end

    # Each event is a [tag, time, record] triple, so index 2 is the record
    records = d.events
    assert_equal 1, records.size
    assert_equal 'info', records[0][2]['level']
    assert_equal 'hello', records[0][2]['message']
    assert_equal 'api', records[0][2]['service']
  end
end
Testing a Filter Plugin
# test/plugin/test_filter_kubernetes_metadata.rb
require 'fluent/test'
require 'fluent/test/helpers'
require 'fluent/test/driver/filter'
require 'fluent/plugin/filter_kubernetes_metadata'

class KubernetesMetadataFilterTest < Test::Unit::TestCase
  include Fluent::Test::Helpers

  setup do
    Fluent::Test.setup
  end

  def create_driver(conf = '')
    Fluent::Test::Driver::Filter.new(Fluent::Plugin::KubernetesMetadataFilter).configure(conf)
  end

  test 'enriches record with pod metadata' do
    # NOTE: with kubernetes_url set, the plugin may contact the API server.
    # Stub those HTTP calls (for example with WebMock) so the test runs offline.
    d = create_driver(<<~CONF)
      kubernetes_url https://kubernetes.default.svc
      verify_ssl false
    CONF

    record = {
      'log' => 'Starting server',
      'kubernetes' => {
        'namespace_name' => 'production',
        'pod_name' => 'api-server-abc123',
        'container_name' => 'api'
      }
    }

    d.run do
      d.feed('kubernetes.production.api-server-abc123.api', event_time, record)
    end

    assert_equal 1, d.filtered_records.size
    filtered = d.filtered_records[0]
    assert_equal 'production', filtered['kubernetes']['namespace_name']
    assert_equal 'api-server-abc123', filtered['kubernetes']['pod_name']
  end

  test 'passes through records without kubernetes key' do
    d = create_driver
    record = {'log' => 'system log', 'host' => 'node1'}

    d.run { d.feed('system.log', event_time, record) }

    assert_equal 1, d.filtered_records.size
    assert_equal record, d.filtered_records[0]
  end

  test 'handles missing container_name gracefully' do
    d = create_driver
    record = {
      'log' => 'test',
      'kubernetes' => {'namespace_name' => 'prod', 'pod_name' => 'pod-1'}
      # no container_name
    }

    assert_nothing_raised do
      d.run { d.feed('test.tag', event_time, record) }
    end
  end
end
Testing an Output Plugin
# test/plugin/test_out_elasticsearch_custom.rb
require 'fluent/test'
require 'fluent/test/helpers'
require 'fluent/test/driver/output'
require 'fluent/plugin/out_elasticsearch_custom'
require 'webmock/test_unit'

class ElasticsearchCustomOutputTest < Test::Unit::TestCase
  include Fluent::Test::Helpers

  setup do
    Fluent::Test.setup
    WebMock.enable!
    # Answer the version probe the client sends on startup
    stub_request(:get, "http://localhost:9200/").to_return(
      status: 200,
      body: '{"version":{"number":"8.0.0"}}',
      headers: {'Content-Type' => 'application/json'}
    )
  end

  teardown do
    WebMock.disable!
  end

  def create_driver(conf)
    Fluent::Test::Driver::Output.new(Fluent::Plugin::ElasticsearchCustomOutput).configure(conf)
  end

  test 'indexes document with correct routing key' do
    stub = stub_request(:post, "http://localhost:9200/logs-2026.05/_bulk")
      .to_return(status: 200, body: '{"errors":false,"items":[]}')

    d = create_driver(<<~CONF)
      host localhost
      port 9200
      index_name logs-%Y.%m
      type_name _doc
    CONF

    # feed(time, record) needs a default tag; without one the driver raises ArgumentError
    d.run(default_tag: 'test') do
      # Pin the event time so logs-%Y.%m resolves to logs-2026.05 on any run date
      d.feed(event_time('2026-05-15 12:00:00 UTC'),
             {'level' => 'info', 'message' => 'test', 'service' => 'api'})
    end

    assert_requested(stub, times: 1)
  end
end
Validating Routing Configurations
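Fluentd routing is order-sensitive: each event goes to the first <match> block whose pattern matches its tag, so a misplaced rule can capture another route's logs without any error. Use fluentd --dry-run for config syntax validation, and write integration tests for routing logic.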
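Before starting a real Fluentd process, it can help to reason about routing with a small model. The Python sketch below is a simplified stand-in for Fluentd's tag matching, not its actual implementation: it assumes only the two documented wildcards (`*` for exactly one tag part, `**` for zero or more parts), ignores `{a,b}` groups and labels, and sends each tag to the first pattern that matches. The names `tag_matches`, `route`, and `ROUTES` are hypothetical.

```python
from typing import List, Optional, Sequence, Tuple


def _match(pattern_parts: Sequence[str], tag_parts: Sequence[str]) -> bool:
    """Recursively match dot-separated pattern parts against tag parts."""
    if not pattern_parts:
        return not tag_parts
    head, rest = pattern_parts[0], pattern_parts[1:]
    if head == "**":
        # '**' may consume zero or more tag parts
        return any(_match(rest, tag_parts[i:]) for i in range(len(tag_parts) + 1))
    if not tag_parts:
        return False
    if head == "*" or head == tag_parts[0]:
        # '*' consumes exactly one tag part; a literal must match exactly
        return _match(rest, tag_parts[1:])
    return False


def tag_matches(pattern: str, tag: str) -> bool:
    return _match(pattern.split("."), tag.split("."))


def route(tag: str, routes: List[Tuple[str, str]]) -> Optional[str]:
    """Return the destination of the first matching pattern, or None."""
    for pattern, destination in routes:
        if tag_matches(pattern, tag):
            return destination
    return None


# Same order as a config with production, staging, then a catch-all
ROUTES = [
    ("app.production.**", "production"),
    ("app.staging.**", "staging"),
    ("**", "null"),
]

print(route("app.production.api", ROUTES))
print(route("system.log", ROUTES))

# Moving the catch-all to the top shadows every other route
SHADOWED = [("**", "null")] + ROUTES[:2]
print(route("app.production.api", SHADOWED))
```

A table of tags and expected destinations checked against a model like this catches ordering mistakes cheaply; the integration test remains the source of truth for what Fluentd itself does.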
Config Validation Script
#!/bin/bash
# validate-fluentd-config.sh
CONFIG_FILE="${1:-/etc/fluent/fluent.conf}"
echo "Validating Fluentd configuration: $CONFIG_FILE"

# Syntax check
if ! fluentd --dry-run -c "$CONFIG_FILE" 2>&1; then
  echo "FAIL: Config syntax error"
  exit 1
fi
echo "PASS: Config syntax valid"

# Check for common routing mistakes
if grep -q "tag.*\*\.\*\.\*" "$CONFIG_FILE"; then
  echo "WARN: Triple-wildcard tag match detected; may capture unintended logs"
fi

# Warn when another <match> block follows the first catch-all, because
# Fluentd stops at the first matching block and never reaches the later one.
catch_all_line=$(grep -n '<match \*\*>' "$CONFIG_FILE" | head -n 1 | cut -d: -f1)
if [ -n "$catch_all_line" ] && tail -n +"$((catch_all_line + 1))" "$CONFIG_FILE" | grep -q '<match '; then
  echo "WARN: Catch-all match (**) may be shadowing more specific routes"
fi
Integration Test for Log Routing
# test_fluentd_routing.py
import glob
import os
import subprocess
import tempfile
import time

import pytest
from fluent import sender  # fluent-logger package

# The <buffer> sections flush every second so the tests can read the output
# files; without them out_file holds records in a buffer chunk for the day.
FLUENTD_CONFIG = """
<source>
  @type forward
  port 24224
  bind 127.0.0.1
</source>
<match app.production.**>
  @type file
  path /tmp/test-production
  <format>
    @type json
  </format>
  <buffer>
    flush_mode interval
    flush_interval 1s
  </buffer>
</match>
<match app.staging.**>
  @type file
  path /tmp/test-staging
  <format>
    @type json
  </format>
  <buffer>
    flush_mode interval
    flush_interval 1s
  </buffer>
</match>
<match **>
  @type null
</match>
"""


@pytest.fixture(scope="module")
def fluentd_process():
    with tempfile.NamedTemporaryFile(mode='w', suffix='.conf', delete=False) as f:
        f.write(FLUENTD_CONFIG)
        config_path = f.name
    proc = subprocess.Popen(
        ["fluentd", "-c", config_path],
        stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )
    time.sleep(2)  # Wait for Fluentd to start
    yield proc
    proc.terminate()
    proc.wait()
    os.unlink(config_path)


def send_log(tag, record):
    """Send a log record to Fluentd with the fluent-logger Python client."""
    s = sender.FluentSender(tag, host='127.0.0.1', port=24224)
    try:
        return s.emit(None, record)  # no label: the event uses the tag as-is
    finally:
        s.close()


def read_output(prefix):
    """Concatenate every flushed output file whose path starts with prefix."""
    content = ""
    for path in sorted(glob.glob(f"{prefix}*.log")):
        if os.path.isfile(path):  # skip the buffer directory
            with open(path) as f:
                content += f.read()
    return content


def test_production_logs_routed_correctly(fluentd_process):
    record = {'level': 'info', 'message': 'production event', 'env': 'production'}
    send_log('app.production.api', record)
    time.sleep(3)  # Allow for the 1s flush interval

    # Verify log landed in production file
    content = read_output('/tmp/test-production')
    assert content, "No production log file created"
    assert 'production event' in content


def test_staging_logs_not_in_production(fluentd_process):
    record = {'level': 'debug', 'message': 'staging only', 'env': 'staging'}
    send_log('app.staging.api', record)
    time.sleep(3)

    content = read_output('/tmp/test-production')
    assert 'staging only' not in content, "Staging log leaked into production file"
Testing Fluent Bit Configurations
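Fluent Bit has its own configuration formats: a classic INI-style format and, in newer releases, YAML. This section uses a declarative unit-test file run with a --unit-test flag, described here as available from v2.1; run fluent-bit --help to confirm that your build provides it before wiring it into CI.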
Built-in Unit Tests
# fluent-bit-unit-test.yaml
pipeline:
  filters:
    - name: lua
      match: "*"
      script: /etc/fluent-bit/scripts/enrich.lua
      call: enrich_record
tests:
  - name: lua filter adds environment field
    input:
      log: "server started"
      level: info
    expected:
      log: "server started"
      level: info
      environment: production
  - name: lua filter normalizes log level
    input:
      log: "connection refused"
      severity: ERROR
    expected:
      log: "connection refused"
      level: error
      severity: ERROR
Run with:
fluent-bit -c /dev/null --unit-test fluent-bit-unit-test.yaml
Testing the Grep Filter
Fluent Bit's grep filter is a common source of silent log loss. Test it explicitly:
# test_fluent_bit_grep.py
import json
import os
import subprocess
import tempfile


def run_fluent_bit_with_records(config: str, records: list) -> list:
    """Run Fluent Bit with given config and input records, return output."""
    with tempfile.NamedTemporaryFile(mode='w', suffix='.conf', delete=False) as f:
        f.write(config)
        config_path = f.name
    input_data = '\n'.join(json.dumps(r) for r in records) + '\n'
    try:
        # Generous timeout: leaves room for the flush and shutdown grace period
        result = subprocess.run(
            ['fluent-bit', '-c', config_path, '--quiet'],
            input=input_data, capture_output=True, text=True, timeout=15
        )
    finally:
        os.unlink(config_path)
    # Keep only JSON lines in case anything else is printed to stdout
    return [json.loads(line) for line in result.stdout.splitlines()
            if line.strip().startswith('{')]


def test_grep_filter_includes_matching():
    # Section headers start at column 0 and keys are indented under them
    config = """
[INPUT]
    Name stdin
[FILTER]
    Name grep
    Match *
    Regex level (info|warn|error)
[OUTPUT]
    Name stdout
    Match *
    Format json_lines
"""
    records = [
        {'level': 'info', 'msg': 'starting'},
        {'level': 'debug', 'msg': 'verbose noise'},
        {'level': 'error', 'msg': 'connection failed'},
    ]
    output = run_fluent_bit_with_records(config, records)
    levels = [r.get('level') for r in output]
    assert 'info' in levels
    assert 'error' in levels
    assert 'debug' not in levels, "Debug logs should be filtered out"


def test_grep_filter_missing_field_drops_record():
    """Verify behavior when the grep field doesn't exist in a record."""
    # The input and output sections are required; a filter alone emits nothing
    config = """
[INPUT]
    Name stdin
[FILTER]
    Name grep
    Match *
    Regex level error
[OUTPUT]
    Name stdout
    Match *
    Format json_lines
"""
    records = [
        {'msg': 'no level field here'},  # missing 'level'
        {'level': 'error', 'msg': 'has level'},
    ]
    output = run_fluent_bit_with_records(config, records)
    # Record without 'level' field should be dropped (Fluent Bit's default behavior)
    assert len(output) == 1
    assert output[0]['level'] == 'error'
CI Integration for Log Pipeline Tests
# .github/workflows/log-pipeline-tests.yml
name: Log Pipeline Tests
on: [push, pull_request]
jobs:
  fluentd-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: ruby/setup-ruby@v1
        with:
          ruby-version: '3.3'  # assumed version; match your Gemfile
          bundler-cache: true  # runs bundle install for the rake step below
      - name: Install Fluentd
        run: |
          gem install fluentd --no-document
          gem install fluent-plugin-elasticsearch --no-document
      - name: Validate configs
        run: |
          for conf in config/fluentd/*.conf; do
            fluentd --dry-run -c "$conf" || exit 1
          done
      - name: Run plugin unit tests
        run: bundle exec rake test
  fluent-bit-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Install Fluent Bit
        run: |
          curl https://raw.githubusercontent.com/fluent/fluent-bit/master/install.sh | sh
          # The package installs the binary under /opt/fluent-bit/bin
          echo "/opt/fluent-bit/bin" >> "$GITHUB_PATH"
      - name: Run unit tests
        run: |
          for test_file in tests/fluent-bit/*.yaml; do
            fluent-bit -c /dev/null --unit-test "$test_file" || exit 1
          done
Summary
Fluentd and Fluent Bit configurations are code and deserve the same rigor as application code. Use Fluentd's Ruby test drivers for plugin unit tests, fluentd --dry-run for config validation, and Python-based integration tests to verify routing logic. For Fluent Bit, use the unit test runner described above for filter chain validation, and always test grep filter behavior explicitly with records that have missing fields, since that is a common source of silent log loss.