Testing Vector: Transforms, Sinks, and Unit Tests in vector.toml
Vector is fast. The Rust-based observability data pipeline handles logs, metrics, and traces with impressive throughput. But speed means nothing if your transforms mangle data or your sinks silently drop events. Vector's built-in unit test framework makes it practical to test pipelines without running a full stack.
This guide covers Vector's native unit tests, transform testing, sink integration testing, and end-to-end pipeline validation.
Vector's Built-In Unit Test Framework
Vector ships with native unit testing support directly in vector.toml. No external framework needed.
Basic Unit Test Structure
# vector.toml
# Unit tests bypass sources and sinks: events are injected directly into a
# transform (insert_at) and captured at a transform's output (extract_from).
[sources.app_logs]
type = "stdin"
[transforms.parse_nginx]
type = "remap"
inputs = ["app_logs"]
source = '''
. = parse_nginx_log!(string!(.message), "combined")
.environment = get_env_var!("DEPLOY_ENV")
'''
[sinks.console_out]
type = "console"
inputs = ["parse_nginx"]
encoding.codec = "json"
# Unit test definitions
[[tests]]
name = "parse valid nginx log"
[[tests.inputs]]
insert_at = "parse_nginx"
type = "log"
[tests.inputs.log_fields]
message = '127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326 "http://www.example.com/start.html" "Mozilla/4.08"'
[[tests.outputs]]
extract_from = "parse_nginx"
[[tests.outputs.conditions]]
type = "vrl"
source = '''
assert_eq!(.status, 200)
assert_eq!(.method, "GET")
assert_eq!(.path, "/apache_pb.gif")
assert_eq!(.client, "127.0.0.1")
true
'''
Run tests (the transform reads DEPLOY_ENV, so set it for the test run):
DEPLOY_ENV=test vector test vector.toml
Testing Multiple Scenarios
# The "combined" format expects quoted referer and user-agent fields,
# so each sample line ends with them ("-" stands for an empty value).
[[tests]]
name = "parse 200 response"
[[tests.inputs]]
insert_at = "parse_nginx"
type = "log"
[tests.inputs.log_fields]
message = '10.0.0.1 - - [18/May/2026:10:00:00 +0000] "GET /api/health HTTP/1.1" 200 42 "-" "curl/8.0"'
[[tests.outputs]]
extract_from = "parse_nginx"
[[tests.outputs.conditions]]
type = "vrl"
source = '''
assert_eq!(.status, 200)
assert_eq!(.path, "/api/health")
true
'''
[[tests]]
name = "parse 500 response"
[[tests.inputs]]
insert_at = "parse_nginx"
type = "log"
[tests.inputs.log_fields]
message = '10.0.0.1 - - [18/May/2026:10:00:01 +0000] "POST /api/payments HTTP/1.1" 500 0 "-" "curl/8.0"'
[[tests.outputs]]
extract_from = "parse_nginx"
[[tests.outputs.conditions]]
type = "vrl"
source = '''
assert_eq!(.status, 500)
assert_eq!(.method, "POST")
true
'''
Testing VRL Transforms
Vector Remap Language (VRL) is the heart of Vector transforms. Test VRL expressions in isolation with the VRL CLI before embedding them in configs.
VRL CLI Testing
# The VRL CLI ships with Vector as the `vector vrl` subcommand
# Test a transform against a sample event read from stdin;
# --print-object prints the modified event instead of the last expression's value
echo '{"message": "ERROR: connection refused to 10.0.0.5:5432"}' | \
vector vrl --print-object '
. = parse_kv!(.message, field_delimiter: " ", key_value_delimiter: ":")
.severity = "error"
del(.message)
'
Unit Testing VRL Programs
# test-transforms.toml
[sources.raw_logs]
type = "stdin"
[transforms.enrich_logs]
type = "remap"
inputs = ["raw_logs"]
source = '''
# Extract log level
.level, err = parse_regex(.message, r'^(?P<level>DEBUG|INFO|WARN|ERROR)')
if err != null {
  .level = "unknown"
} else {
  .level = downcase!(.level.level)
}
# Add metadata
.ingested_at = now()
.pipeline_version = "2.0"
# Drop noisy health check logs
if .path == "/health" || .path == "/healthz" {
  abort
}
'''
[sinks.enriched]
type = "console"
inputs = ["enrich_logs"]
encoding.codec = "json"
# Test: INFO log level extraction
[[tests]]
name = "extracts info level"
[[tests.inputs]]
insert_at = "enrich_logs"
type = "log"
[tests.inputs.log_fields]
message = "INFO: user logged in"
path = "/api/auth"
[[tests.outputs]]
extract_from = "enrich_logs"
[[tests.outputs.conditions]]
type = "vrl"
source = 'assert_eq!(.level, "info"); true'
# Test: health check logs are dropped
[[tests]]
name = "drops health check logs"
# Assert that the transform emits nothing for this input
no_outputs_from = ["enrich_logs"]
[[tests.inputs]]
insert_at = "enrich_logs"
type = "log"
[tests.inputs.log_fields]
message = "INFO: OK"
path = "/healthz"
When a transform aborts, the event should not appear at the output. The remap transform drops aborted events by default (drop_on_abort = true), and no_outputs_from makes the test fail if any listed transform emits an event for the test input.
Testing Metric Transforms
# metrics-test.toml
[sources.app_metrics]
type = "host_metrics"
[transforms.add_dimensions]
type = "remap"
inputs = ["app_metrics"]
source = '''
# Requires AWS_REGION in the environment, including during `vector test`
.tags.region = get_env_var!("AWS_REGION")
.tags.cluster = "prod-us-east-1"
# Normalize metric name format
.name = replace(string!(.name), r'[^a-zA-Z0-9_]', "_")
'''
[sinks.dimensioned_metrics]
type = "console"
inputs = ["add_dimensions"]
encoding.codec = "json"
[[tests]]
name = "adds cluster tag to metrics"
[[tests.inputs]]
insert_at = "add_dimensions"
type = "metric"
[tests.inputs.metric]
name = "http.requests.total"
kind = "incremental"
[tests.inputs.metric.counter]
value = 42.0
[[tests.outputs]]
extract_from = "add_dimensions"
[[tests.outputs.conditions]]
type = "vrl"
source = '''
assert_eq!(.tags.cluster, "prod-us-east-1")
assert_eq!(.name, "http_requests_total")
true
'''
Integration Testing Sinks
Unit tests catch transform logic bugs. Integration tests catch sink configuration issues.
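Integration tests against a real sink are asynchronous: Vector batches events before sending, and the backend may take a moment to make them visible. Fixed sleeps tend to be either flaky or slow, so a polling helper is a common alternative. The following is a minimal sketch; the name wait_for and its default timings are assumptions for illustration, not part of Vector or pytest.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def wait_for(
    probe: Callable[[], Optional[T]],
    timeout: float = 10.0,
    interval: float = 0.25,
) -> T:
    """Call `probe` repeatedly until it returns a truthy value.

    Raises TimeoutError if no truthy value is seen within `timeout` seconds.
    """
    deadline = time.monotonic() + timeout
    while True:
        result = probe()
        if result:
            return result
        if time.monotonic() >= deadline:
            raise TimeoutError(f"condition not met within {timeout:.1f}s")
        time.sleep(interval)
```

In a sink test, the probe would typically be a query against the backend that returns the matching documents or messages, so the test proceeds as soon as the event arrives and fails with a clear error if it never does.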
Testing the Elasticsearch Sink
# test_vector_elasticsearch_sink.py
import json
import os
import subprocess
import tempfile
import time

import pytest
from elasticsearch import Elasticsearch

VECTOR_CONFIG = """
[sources.json_logs]
type = "stdin"
decoding.codec = "json"  # parse each stdin line as a JSON event

[transforms.add_metadata]
type = "remap"
inputs = ["json_logs"]
source = '''
.ingested_at = now()
.pipeline = "test"
'''

[sinks.elasticsearch]
type = "elasticsearch"
inputs = ["add_metadata"]
endpoints = ["http://localhost:9200"]
bulk.index = "test-logs-%Y-%m-%d"
"""


@pytest.fixture(scope="module")
def elasticsearch():
    es = Elasticsearch("http://localhost:9200")
    yield es
    # Clean up every index the test run created
    es.indices.delete(index="test-logs-*", ignore_unavailable=True)


@pytest.fixture(scope="module")
def vector_process():
    with tempfile.NamedTemporaryFile(mode='w', suffix='.toml', delete=False) as f:
        f.write(VECTOR_CONFIG)
        config_path = f.name
    proc = subprocess.Popen(
        ['vector', '--config', config_path],
        stdin=subprocess.PIPE,
        # Discard output so an unread pipe can never block Vector
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    time.sleep(2)  # give Vector time to start
    yield proc
    proc.terminate()
    proc.wait(timeout=10)
    os.unlink(config_path)


def test_log_reaches_elasticsearch(vector_process, elasticsearch):
    log_record = {'level': 'info', 'message': 'test event', 'service': 'api'}
    vector_process.stdin.write((json.dumps(log_record) + '\n').encode())
    vector_process.stdin.flush()
    time.sleep(3)  # the sink flushes batches on a timer, so allow for one flush
    elasticsearch.indices.refresh(index='test-logs-*')  # make the document searchable
    result = elasticsearch.search(
        index='test-logs-*',
        query={'match': {'message': 'test event'}},
    )
    assert result['hits']['total']['value'] == 1
    hit = result['hits']['hits'][0]['_source']
    assert hit['level'] == 'info'
    assert hit['pipeline'] == 'test'
    assert 'ingested_at' in hit


def test_malformed_json_does_not_crash_vector(vector_process, elasticsearch):
    """Vector should skip malformed JSON, not crash."""
    vector_process.stdin.write(b'not valid json\n')
    vector_process.stdin.flush()
    time.sleep(0.5)
    # Process should still be running
    assert vector_process.poll() is None
Testing the Kafka Sink
# kafka-sink-test.toml
[sources.events]
type = "stdin"
decoding.codec = "json"
[sinks.kafka_output]
type = "kafka"
inputs = ["events"]
bootstrap_servers = "localhost:9092"
topic = "test-events"
encoding.codec = "json"
# No [[tests]] block here: Vector unit tests inject into and extract from
# transforms, so they cannot verify what a sink actually writes.
# Use an integration test instead.
For Kafka sink verification, use Docker Compose with Redpanda (Kafka-compatible) and poll the topic:
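import json

from kafka import KafkaConsumer


def test_events_reach_kafka(vector_process):
    # Assumes a vector_process fixture started with kafka-sink-test.toml
    event = {'service': 'payments', 'event_type': 'charge.succeeded', 'amount': 2999}
    vector_process.stdin.write((json.dumps(event) + '\n').encode())
    vector_process.stdin.flush()
    consumer = KafkaConsumer(
        'test-events',
        bootstrap_servers='localhost:9092',
        auto_offset_reset='earliest',
        consumer_timeout_ms=3000,  # stop iterating after 3s without messages
        value_deserializer=lambda m: json.loads(m.decode()),
    )
    messages = list(consumer)
    consumer.close()
    assert len(messages) > 0
    assert messages[0].value['service'] == 'payments'
Testing Pipelines With Docker Compose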
For full end-to-end testing, run Vector with its actual dependencies:
# docker-compose.test.yml
# Inside this network, services reach each other by name
# (for example http://elasticsearch:9200), not via localhost.
version: '3.8'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.13.0
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
      # Elasticsearch 8 rejects wildcard index deletes unless this is disabled
      - action.destructive_requires_name=false
    ports:
      - "9200:9200"
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9200/_cluster/health"]
      interval: 10s
      timeout: 5s
      retries: 5
  vector:
    image: timberio/vector:0.37.0-alpine
    volumes:
      - ./vector.toml:/etc/vector/vector.toml
    depends_on:
      elasticsearch:
        condition: service_healthy
    command: ["--config", "/etc/vector/vector.toml"]
  test-runner:
    image: python:3.12-slim
    depends_on:
      - vector
    volumes:
      - ./tests:/tests
    # The slim image ships without pytest, so install test dependencies first
    command: ["sh", "-c", "pip install pytest elasticsearch kafka-python && python -m pytest /tests/integration/ -v"]
Run the full integration test suite:
docker compose -f docker-compose.test.yml up --abort-on-container-exit --exit-code-from test-runner
CI Configuration
# .github/workflows/vector-tests.yml
name: Vector Pipeline Tests
on: [push, pull_request]
jobs:
  unit-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Install Vector
        run: |
          curl --proto '=https' --tlsv1.2 -sSfL https://sh.vector.dev | bash -s -- -y
          echo "$HOME/.vector/bin" >> "$GITHUB_PATH"
      - name: Run unit tests
        env:
          DEPLOY_ENV: test  # read by get_env_var! in the transform under test
        run: vector test vector.toml
  integration-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Run integration tests
        # --exit-code-from makes the job fail when the test-runner container fails
        run: docker compose -f docker-compose.test.yml up --abort-on-container-exit --exit-code-from test-runner
Summary
Vector's built-in unit test framework handles most transform and routing validation, so use it heavily. Write a test for every VRL transform, especially around edge cases like missing fields, type mismatches, and abort conditions. For sink testing, Docker Compose integration tests are the right tool. Validate config syntax with vector validate before every deploy, and run unit tests in CI on every push.