DuckDB in Tests: Fast Analytical Query Testing for Data Pipelines
DuckDB is an in-process analytical database that starts in milliseconds, runs SQL against Parquet/CSV files natively, and integrates seamlessly with pandas and polars. It makes an ideal test fixture for data pipelines — replace your BigQuery or Redshift dependency with DuckDB in tests, keep your production SQL mostly unchanged, and get a sub-second feedback loop on every commit.
Key Takeaways
DuckDB needs zero infrastructure. It runs in-process with no server, no Docker, no credentials — import duckdb and you're done. Perfect for CI that has to run everywhere.
DuckDB reads Parquet and CSV directly. Your pipelines that read from S3 can be tested against local fixture files with identical SQL — just swap the path.
Use pytest fixtures to share a DuckDB connection across tests. A session-scoped in-memory connection, paired with a per-test fixture that creates and drops a schema, gives each test a clean slate without recreating the connection.
DuckDB's SQL dialect covers standard SQL plus window functions, CTEs, and QUALIFY. Much of the SQL written for BigQuery, Redshift, or Snowflake runs with few or no changes, which makes DuckDB a practical stand-in for warehouse unit tests.
Test pandas/polars pipeline logic by round-tripping through DuckDB. duckdb.from_df() and duckdb.arrow() let you query DataFrames with SQL, then hand results back to pandas without writing intermediate files.
Why DuckDB Works as a Test Fixture
Most data pipeline tests require one of three things: a live cloud warehouse (expensive, slow), a mocked client (brittle, misses SQL logic), or a PostgreSQL container (close but missing analytical SQL features). DuckDB sidesteps all three problems.
It runs in-process, speaks SQL with window functions, CTEs, QUALIFY, PIVOT, LIST aggregates, and UNNEST, and reads Parquet files natively. The SQL you write for BigQuery or Snowflake typically needs only minor adjustments for dialect differences, such as function names or identifier quoting, before it runs against DuckDB.
The result: data pipeline tests that run in under a second, require no infrastructure, and actually exercise your SQL logic.
Installation and Basic Setup
pip install duckdb pytest pandas polars pyarrow
DuckDB has no server process — just import and use:
import duckdb

# In-memory database (default)
con = duckdb.connect()

# Or file-backed (persists between test runs)
con = duckdb.connect("test.duckdb")
pytest Fixtures for DuckDB
A session-scoped fixture with an in-memory database keeps tests fast, while an autouse fixture isolates each test in its own schema:
# tests/conftest.py
import duckdb
import pytest


@pytest.fixture(scope="session")
def db():
    """Shared in-memory DuckDB connection for the test session."""
    con = duckdb.connect(":memory:")
    yield con
    con.close()


@pytest.fixture(autouse=True)
def clean_schema(db):
    """Create a fresh schema for each test and drop it after."""
    db.execute("CREATE SCHEMA IF NOT EXISTS test_run")
    db.execute("SET search_path = 'test_run'")
    yield
    # Point the connection back at main before dropping the per-test schema
    db.execute("SET search_path = 'main'")
    db.execute("DROP SCHEMA IF EXISTS test_run CASCADE")
Testing SQL Transformations
The core use case: test SQL transformation logic with known input data and assert on exact output.
# src/sql/daily_revenue.sql
# SELECT
#     CAST(order_ts AS DATE) AS order_date,
#     region,
#     SUM(amount_usd) AS total_revenue,
#     COUNT(*) AS order_count,
#     COUNT(DISTINCT customer_id) AS unique_customers
# FROM orders
# WHERE status = 'completed'
# GROUP BY 1, 2
# ORDER BY 1, 2

# tests/test_daily_revenue.py
import duckdb
import pytest


def test_daily_revenue_aggregation(db):
    db.execute("""
        CREATE TABLE orders (
            order_id VARCHAR,
            order_ts TIMESTAMP,
            region VARCHAR,
            customer_id VARCHAR,
            amount_usd DOUBLE,
            status VARCHAR
        )
    """)
    db.execute("""
        INSERT INTO orders VALUES
            ('o1', '2026-01-15 10:00:00', 'us-east', 'c1', 100.0, 'completed'),
            ('o2', '2026-01-15 11:00:00', 'us-east', 'c1', 200.0, 'completed'),
            ('o3', '2026-01-15 12:00:00', 'eu-west', 'c2', 150.0, 'completed'),
            ('o4', '2026-01-15 13:00:00', 'us-east', 'c3', 99.0, 'cancelled'),
            ('o5', '2026-01-16 09:00:00', 'us-east', 'c2', 75.0, 'completed')
    """)
    result = db.execute("""
        SELECT
            CAST(order_ts AS DATE) AS order_date,
            region,
            SUM(amount_usd) AS total_revenue,
            COUNT(*) AS order_count,
            COUNT(DISTINCT customer_id) AS unique_customers
        FROM orders
        WHERE status = 'completed'
        GROUP BY 1, 2
        ORDER BY 1, 2
    """).fetchdf()
    assert len(result) == 3  # jan15/us-east, jan15/eu-west, jan16/us-east
    jan15_us = result[
        (result["order_date"].astype(str) == "2026-01-15") &
        (result["region"] == "us-east")
    ].iloc[0]
    assert jan15_us["total_revenue"] == 300.0
    assert jan15_us["order_count"] == 2
    assert jan15_us["unique_customers"] == 1  # c1 appears twice, one unique customer


def test_daily_revenue_excludes_non_completed(db):
    db.execute("""
        CREATE TABLE orders (
            order_id VARCHAR, order_ts TIMESTAMP, region VARCHAR,
            customer_id VARCHAR, amount_usd DOUBLE, status VARCHAR
        )
    """)
    db.execute("""
        INSERT INTO orders VALUES
            ('o1', '2026-01-15 10:00:00', 'us-east', 'c1', 500.0, 'refunded'),
            ('o2', '2026-01-15 11:00:00', 'us-east', 'c2', 999.0, 'pending')
    """)
    result = db.execute("""
        SELECT COUNT(*) AS cnt FROM orders WHERE status = 'completed'
    """).fetchone()
    assert result[0] == 0
Testing Parquet and CSV File Transformations
DuckDB reads files directly — no loading step required. This mirrors how many pipelines read from S3 or a data lake.
# tests/test_parquet_pipeline.py
import duckdb
import pandas as pd
import pytest


@pytest.fixture
def parquet_fixture(tmp_path):
    """Write fixture data to Parquet files simulating a data lake partition."""
    df_jan = pd.DataFrame({
        "order_id": ["o1", "o2", "o3"],
        "amount_usd": [100.0, 200.0, 150.0],
        "status": ["completed", "completed", "refunded"],
        "region": ["us-east", "eu-west", "us-east"],
    })
    df_feb = pd.DataFrame({
        "order_id": ["o4", "o5"],
        "amount_usd": [75.0, 50.0],
        "status": ["completed", "cancelled"],
        "region": ["us-east", "eu-west"],
    })
    jan_path = tmp_path / "year=2026" / "month=01"
    feb_path = tmp_path / "year=2026" / "month=02"
    jan_path.mkdir(parents=True)
    feb_path.mkdir(parents=True)
    df_jan.to_parquet(jan_path / "data.parquet", index=False)
    df_feb.to_parquet(feb_path / "data.parquet", index=False)
    return str(tmp_path)


def test_read_partitioned_parquet(db, parquet_fixture):
    result = db.execute(f"""
        SELECT
            region,
            SUM(amount_usd) AS total,
            COUNT(*) AS cnt
        FROM read_parquet('{parquet_fixture}/**/*.parquet', hive_partitioning=true)
        WHERE status = 'completed'
        GROUP BY region
        ORDER BY region
    """).fetchdf()
    assert len(result) == 2
    eu = result[result["region"] == "eu-west"].iloc[0]
    assert eu["total"] == 200.0
    assert eu["cnt"] == 1


def test_csv_ingest_with_type_casting(db, tmp_path):
    csv_content = "id,score,label\n1,0.95,good\n2,0.12,bad\n3,,unknown\n"
    csv_file = tmp_path / "scores.csv"
    csv_file.write_text(csv_content)
    # header=true is explicit so the first row is never parsed as data
    result = db.execute(f"""
        SELECT id, score, label
        FROM read_csv('{csv_file}',
            header=true,
            columns={{'id': 'INTEGER', 'score': 'DOUBLE', 'label': 'VARCHAR'}},
            nullstr=''
        )
    """).fetchdf()
    assert len(result) == 3
    assert pd.isna(result.iloc[2]["score"])  # empty → NULL
Testing pandas and polars Pipeline Logic
DuckDB integrates directly with both pandas and polars DataFrames:
# tests/test_pandas_pipeline.py
import duckdb
import pandas as pd


def test_pandas_join_with_sql(db):
    orders = pd.DataFrame({
        "order_id": ["o1", "o2", "o3"],
        "customer_id": ["c1", "c2", "c1"],
        "amount_usd": [100.0, 200.0, 150.0],
    })
    customers = pd.DataFrame({
        "customer_id": ["c1", "c2"],
        "name": ["Alice", "Bob"],
        "tier": ["premium", "standard"],
    })
    # DuckDB can query DataFrames directly by name
    result = duckdb.query("""
        SELECT
            c.name,
            c.tier,
            SUM(o.amount_usd) AS lifetime_value,
            COUNT(*) AS order_count
        FROM orders o
        JOIN customers c USING (customer_id)
        GROUP BY c.name, c.tier
        ORDER BY lifetime_value DESC
    """).fetchdf()
    assert result.iloc[0]["name"] == "Alice"
    assert result.iloc[0]["lifetime_value"] == 250.0
    assert result.iloc[0]["order_count"] == 2
# tests/test_polars_pipeline.py
import duckdb
import polars as pl


def test_polars_window_function(db):
    df = pl.DataFrame({
        "customer_id": ["c1", "c1", "c2", "c2", "c3"],
        "order_date": ["2026-01-01", "2026-01-10", "2026-01-05", "2026-01-15", "2026-01-20"],
        "amount": [100.0, 200.0, 150.0, 50.0, 300.0],
    })
    # Use DuckDB to run window function on polars DataFrame
    result = duckdb.query("""
        SELECT
            customer_id,
            order_date,
            amount,
            SUM(amount) OVER (PARTITION BY customer_id ORDER BY order_date) AS running_total
        FROM df
        ORDER BY customer_id, order_date
    """).pl()  # Returns polars DataFrame
    c1_rows = result.filter(pl.col("customer_id") == "c1")
    assert c1_rows["running_total"].to_list() == [100.0, 300.0]
Testing Macro/SQL Template Logic
If your pipeline uses Jinja-templated SQL (like dbt), test the rendered SQL against DuckDB:
# tests/test_sql_templates.py
import duckdb
import pytest
from jinja2 import Template

REVENUE_TEMPLATE = """
SELECT
    DATE_TRUNC('{{ grain }}', order_ts) AS period,
    SUM(amount_usd) AS revenue
FROM orders
WHERE status = 'completed'
  AND order_ts >= '{{ start_date }}'
GROUP BY 1
ORDER BY 1
"""


@pytest.mark.parametrize("grain,expected_periods", [
    ("week", 2),
    ("month", 1),
])
def test_revenue_template_grain(db, grain, expected_periods):
    db.execute("""
        CREATE TABLE orders (
            order_id VARCHAR, order_ts TIMESTAMP,
            amount_usd DOUBLE, status VARCHAR
        )
    """)
    db.execute("""
        INSERT INTO orders VALUES
            ('o1', '2026-01-05 10:00:00', 100.0, 'completed'),
            ('o2', '2026-01-12 10:00:00', 200.0, 'completed'),
            ('o3', '2026-01-19 10:00:00', 150.0, 'cancelled')
    """)
    sql = Template(REVENUE_TEMPLATE).render(
        grain=grain, start_date="2026-01-01"
    )
    result = db.execute(sql).fetchdf()
    assert len(result) == expected_periods
What to Test vs. What to Skip
Test these:
- SQL transformation logic — aggregations, window functions, CTEs, QUALIFY clauses
- Parquet/CSV read behavior — column type inference, null handling, partition pruning
- pandas/polars pipeline logic that can be expressed as SQL or round-tripped through DuckDB
- Jinja/dbt SQL template rendering against expected output
- Data quality constraints — null checks, value ranges, referential integrity
- Edge cases — empty DataFrames, all-null columns, single-row inputs, very large integers
Skip these:
- Cloud warehouse-specific features that DuckDB doesn't emulate — Redshift COPY, BigQuery partitioned table metadata, Snowflake clustering depth
- Network I/O in pipelines — S3 reads in production; use local file fixtures instead
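- Concurrency and transaction isolation — DuckDB allows only one process to write to a database at a time and its concurrency model differs from a warehouse's, so such tests say little about production behavior
- DuckDB's own engine correctness — test your SQL, not the database itself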
- Performance benchmarks — DuckDB in-memory performance doesn't predict warehouse costs