Testing Apache Iceberg Tables: Schema Evolution, Partitioning, and Data Quality
Apache Iceberg tables support schema evolution, partition spec changes, time-travel queries, and row-level deletes — all of which are correctness-critical operations that are easy to break silently. This guide covers testing each of these operations using PyIceberg and a local catalog, verifying that column additions don't break readers, partition changes apply correctly, time-travel returns historical snapshots, and MERGE INTO operations produce the right row counts.
Key Takeaways
Test schema evolution with a local SQLite catalog. PyIceberg ships a SQL catalog (SqlCatalog) that can be backed by SQLite, so schema evolution tests need no external services.
Schema evolution must preserve backward compatibility. Adding optional columns and renaming columns are metadata-only changes that leave existing data intact; changing column types or dropping columns can break downstream readers. Write tests that verify both paths.
Partition spec changes are non-destructive but non-retroactive. New data uses the new partition spec; old data keeps the old one. Test that queries spanning both specs return correct row counts.
Time-travel tests verify audit and rollback semantics. Test that AS OF queries return the snapshot before a write, not after — this is your paper trail for data corrections.
Row-level delete tests (MERGE INTO) must verify both the delete and the preserve. Always assert what was removed AND what was kept — a test that only checks the final count can pass with a truncate.
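The last takeaway can be sketched as a small assertion helper. This is a minimal illustration in plain Python, assuming rows are identified by a key column; the name `assert_delete_and_preserve` and its parameters are hypothetical and not part of PyIceberg.

```python
def assert_delete_and_preserve(before_keys, after_keys, expected_deleted):
    """Check that exactly the expected keys were removed and all others kept.

    Hypothetical helper: each argument is an iterable of row keys, for
    example order IDs read from a scan before and after a delete.
    """
    before = set(before_keys)
    after = set(after_keys)
    deleted = set(expected_deleted)

    # The delete half: none of the targeted keys may survive.
    survivors = deleted & after
    assert not survivors, f"rows that should be gone: {sorted(survivors)}"

    # The preserve half: every untargeted key must still be present.
    lost = (before - deleted) - after
    assert not lost, f"rows that should have been kept: {sorted(lost)}"

    # Nothing new may appear as a side effect of the delete.
    extra = after - before
    assert not extra, f"unexpected new rows: {sorted(extra)}"


before = ["o1", "o2", "o3", "o4"]

# Correct delete of the pending orders o2 and o4: the helper passes.
assert_delete_and_preserve(before, ["o1", "o3"], ["o2", "o4"])

# A buggy delete that removed o1 and o3 instead leaves the same row count,
# so a count-only check would pass; the helper catches it.
wrong_after = ["o2", "o4"]
assert len(wrong_after) == 2
```

In a PyIceberg test, the key lists could come from `table.scan().to_arrow()["order_id"].to_pylist()` taken before and after the delete.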
Why Iceberg Operations Need Tests
Iceberg's power comes from its metadata-rich table format: snapshots, manifests, partition specs, and schema versions are all recorded in table metadata files, and the catalog tracks a pointer to the current one. Operations like schema evolution, partition changes, and row-level deletes modify this metadata in ways that can silently corrupt downstream queries if done incorrectly.
A column rename doesn't fail: it succeeds and writes new metadata. But if your reader code still references the old column name, queries either error out or, in pipelines that tolerate missing columns, silently return nulls. A partition spec change doesn't fail either, but old data files keep their original partition layout, so any job that assumes every file follows the new layout can return incomplete results. These failures are invisible unless you test explicitly.
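To see why a rename succeeds while name-based readers break, it helps to model how columns are resolved: data files record field IDs, and the table schema maps those IDs to the current column names. The following is a toy model in plain Python, not PyIceberg's implementation; `read_rows` and the dictionaries are illustrative assumptions.

```python
def read_rows(file_rows, schema):
    """Project stored rows through a schema.

    file_rows: list of dicts keyed by field ID (what a data file stores).
    schema: dict mapping field ID -> current column name.
    Columns that a file does not contain (added later) are read as None.
    """
    return [
        {name: row.get(field_id) for field_id, name in schema.items()}
        for row in file_rows
    ]


# A data file written under the original schema.
file_rows = [{1: "c1", 2: 500.0}, {1: "c2", 2: 300.0}]
schema_v1 = {1: "cust_id", 2: "rev"}

# Rename both columns and add a new one: only the metadata changes.
schema_v2 = {1: "customer_id", 2: "revenue", 3: "tier"}

rows_v2 = read_rows(file_rows, schema_v2)

# The data survives the rename; the new column is null for old rows.
assert rows_v2[0] == {"customer_id": "c1", "revenue": 500.0, "tier": None}

# A lenient reader that still uses the old name gets only nulls.
stale_values = [row.get("cust_id") for row in rows_v2]
assert stale_values == [None, None]
```

A stricter reader would raise on the missing key instead, which is the better failure mode; either way, a test has to exercise the reader against the evolved schema to find out.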
Setting Up PyIceberg with a Local Catalog
PyIceberg supports multiple catalog backends. For testing, use the in-memory catalog or a SQLite-backed catalog:

    pip install "pyiceberg[sql-sqlite,pyarrow]" pytest pandas pyarrow

    # tests/conftest.py
    import os

    import pytest
    from pyiceberg.catalog.sql import SqlCatalog


    @pytest.fixture(scope="function")
    def catalog(tmp_path):
        """Fresh SQLite catalog per test."""
        warehouse_path = str(tmp_path / "warehouse")
        os.makedirs(warehouse_path, exist_ok=True)
        cat = SqlCatalog(
            "test",
            **{
                "uri": f"sqlite:///{tmp_path}/catalog.db",
                "warehouse": f"file://{warehouse_path}",
            },
        )
        cat.create_namespace("testing")
        return cat

Testing Schema Evolution
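Iceberg supports seven schema evolution operations: add column, drop column, rename column, update column type (widening only, for example int to long or float to double), reorder columns, add required column (an incompatible change, safe only if the table is empty because existing rows would have no value for it), and update column doc.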
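The "widening only" rule for type updates can be expressed as a small lookup. The sketch below covers the primitive promotions defined in the Iceberg v1/v2 spec (int to long, float to double, and decimal precision increases with unchanged scale). The function name `is_safe_promotion` and the string encoding of types are assumptions made for illustration, not a PyIceberg API.

```python
import re

# Primitive promotions allowed by the Iceberg v1/v2 spec.
_WIDENING = {("int", "long"), ("float", "double")}
_DECIMAL = re.compile(r"decimal\((\d+),\s*(\d+)\)")


def is_safe_promotion(old_type: str, new_type: str) -> bool:
    """Return True if changing old_type to new_type only widens the type."""
    if old_type == new_type:
        return True  # no change at all
    if (old_type, new_type) in _WIDENING:
        return True
    old_dec = _DECIMAL.fullmatch(old_type)
    new_dec = _DECIMAL.fullmatch(new_type)
    if old_dec and new_dec:
        old_precision, old_scale = map(int, old_dec.groups())
        new_precision, new_scale = map(int, new_dec.groups())
        # Precision may grow; scale must stay the same.
        return new_precision >= old_precision and new_scale == old_scale
    return False


assert is_safe_promotion("int", "long")
assert not is_safe_promotion("long", "int")
assert is_safe_promotion("decimal(9,2)", "decimal(18,2)")
assert not is_safe_promotion("decimal(9,2)", "decimal(18,4)")
```

A check like this could run in CI against proposed schema changes before they reach a real table, so that a narrowing change is rejected in review rather than at commit time.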
Testing Add Column

    # tests/test_schema_evolution.py
    import pyarrow as pa
    import pyarrow.compute as pc
    import pytest
    from pyiceberg.schema import Schema
    from pyiceberg.types import (
        BooleanType,
        DoubleType,
        NestedField,
        StringType,
    )


    def test_add_column_preserves_existing_data(catalog):
        schema = Schema(
            NestedField(1, "order_id", StringType(), required=True),
            NestedField(2, "amount", DoubleType(), required=False),
        )
        catalog.create_table("testing.orders", schema=schema)
        table = catalog.load_table("testing.orders")

        # Write initial data. Using the table's own Arrow schema keeps
        # order_id non-nullable, matching the required Iceberg field.
        initial_data = pa.table(
            {"order_id": ["o1", "o2"], "amount": [100.0, 200.0]},
            schema=table.schema().as_arrow(),
        )
        table.append(initial_data)

        # Add a new optional column
        with table.update_schema() as update:
            update.add_column("status", StringType())

        # New data includes the column
        new_data = pa.table(
            {"order_id": ["o3"], "amount": [150.0], "status": ["completed"]},
            schema=table.schema().as_arrow(),
        )
        table.append(new_data)

        # Read all data: old rows should have null for the new column
        result = table.scan().to_arrow()
        assert len(result) == 3

        o1_status = result.filter(pc.equal(result["order_id"], "o1"))["status"][0].as_py()
        assert o1_status is None  # Old row: null for new column

        o3_status = result.filter(pc.equal(result["order_id"], "o3"))["status"][0].as_py()
        assert o3_status == "completed"


    def test_rename_column(catalog):
        schema = Schema(
            NestedField(1, "cust_id", StringType(), required=True),
            NestedField(2, "rev", DoubleType(), required=False),
        )
        catalog.create_table("testing.customers", schema=schema)
        table = catalog.load_table("testing.customers")
        table.append(
            pa.table(
                {"cust_id": ["c1", "c2"], "rev": [500.0, 300.0]},
                schema=table.schema().as_arrow(),  # cust_id stays non-nullable
            )
        )

        # Rename columns: should preserve data
        with table.update_schema() as update:
            update.rename_column("cust_id", "customer_id")
            update.rename_column("rev", "revenue")

        result = table.scan().to_arrow()
        assert "customer_id" in result.column_names
        assert "revenue" in result.column_names
        assert "cust_id" not in result.column_names

        # Data intact
        assert result["customer_id"].to_pylist() == ["c1", "c2"]
        assert result["revenue"].to_pylist() == [500.0, 300.0]


    def test_drop_column_removes_data(catalog):
        schema = Schema(
            NestedField(1, "order_id", StringType(), required=True),
            NestedField(2, "internal_flag", BooleanType(), required=False),
            NestedField(3, "amount", DoubleType(), required=False),
        )
        catalog.create_table("testing.orders_v2", schema=schema)
        table = catalog.load_table("testing.orders_v2")
        table.append(
            pa.table(
                {"order_id": ["o1"], "internal_flag": [True], "amount": [100.0]},
                schema=table.schema().as_arrow(),  # order_id stays non-nullable
            )
        )

        with table.update_schema() as update:
            update.delete_column("internal_flag")

        # The dropped column disappears from scans; the others are untouched
        result = table.scan().to_arrow()
        assert "internal_flag" not in result.column_names
        assert "amount" in result.column_names
        assert result["amount"].to_pylist() == [100.0]

Testing Partition Spec Changes
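Partition evolution is non-retroactive: each data file remembers the spec it was written under. The sketch below is a toy model in plain Python (not PyIceberg internals) of why a filter on region can prune files written under the new spec but must still read files written under the old one. `day_transform` follows Iceberg's definition of the day transform as days since 1970-01-01; the `DataFile` class and `files_to_scan` function are hypothetical.

```python
from dataclasses import dataclass
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def day_transform(ts: datetime) -> int:
    """Days since the Unix epoch, as in Iceberg's day transform."""
    return (ts - EPOCH).days


@dataclass
class DataFile:
    spec_id: int
    partition: dict  # partition field name -> partition value
    row_count: int


def files_to_scan(files, region):
    """Keep every file that may contain rows for the given region.

    A file is pruned only when its partition tuple carries a region value
    that differs from the filter. Files written before region became a
    partition field carry no such value and must always be read.
    """
    return [
        f for f in files
        if "region_id" not in f.partition or f.partition["region_id"] == region
    ]


jan15 = day_transform(datetime(2026, 1, 15, 10, tzinfo=timezone.utc))
jan16 = day_transform(datetime(2026, 1, 16, 10, tzinfo=timezone.utc))

files = [
    DataFile(0, {"event_day": jan15}, 2),  # old spec: day only
    DataFile(1, {"event_day": jan16, "region_id": "us-east"}, 1),
    DataFile(1, {"event_day": jan16, "region_id": "eu-west"}, 4),
]

selected = files_to_scan(files, "us-east")
assert [f.spec_id for f in selected] == [0, 1]
```

The old-spec file survives pruning even though it may hold rows from other regions, which is why a test on a table with an evolved spec should assert on the rows returned rather than on which files were touched.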

    # tests/test_partitioning.py
    from datetime import datetime, timezone

    import pyarrow as pa
    from pyiceberg.partitioning import PartitionField, PartitionSpec
    from pyiceberg.schema import Schema
    from pyiceberg.transforms import DayTransform, IdentityTransform
    from pyiceberg.types import NestedField, StringType, TimestamptzType


    def test_partition_spec_evolution(catalog):
        """Verify that data written before and after a partition spec change is both readable."""
        # TimestamptzType matches the UTC-aware Arrow timestamps written below
        schema = Schema(
            NestedField(1, "event_id", StringType(), required=True),
            NestedField(2, "event_ts", TimestamptzType(), required=True),
            NestedField(3, "region", StringType(), required=False),
        )
        # Initially partition by day
        initial_spec = PartitionSpec(
            PartitionField(source_id=2, field_id=1000, transform=DayTransform(), name="event_day")
        )
        catalog.create_table(
            "testing.events",
            schema=schema,
            partition_spec=initial_spec,
        )
        table = catalog.load_table("testing.events")
        arrow_schema = table.schema().as_arrow()  # keeps required fields non-nullable

        # Write data with day partitioning
        ts_jan15 = datetime(2026, 1, 15, 10, 0, 0, tzinfo=timezone.utc)
        table.append(pa.table({
            "event_id": ["e1", "e2"],
            "event_ts": pa.array([ts_jan15, ts_jan15], type=pa.timestamp("us", tz="UTC")),
            "region": ["us-east", "eu-west"],
        }, schema=arrow_schema))
        assert table.scan().to_arrow().num_rows == 2

        # Evolve partition spec: add region as secondary partition
        with table.update_spec() as update:
            update.add_field("region", IdentityTransform(), "region_id")

        # Write data with new spec
        ts_jan16 = datetime(2026, 1, 16, 10, 0, 0, tzinfo=timezone.utc)
        table.append(pa.table({
            "event_id": ["e3"],
            "event_ts": pa.array([ts_jan16], type=pa.timestamp("us", tz="UTC")),
            "region": ["us-east"],
        }, schema=arrow_schema))

        # All rows readable across both partition specs
        all_rows = table.scan().to_arrow()
        assert all_rows.num_rows == 3
        assert set(all_rows["event_id"].to_pylist()) == {"e1", "e2", "e3"}

Testing Time-Travel Queries
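Engines that support AS OF timestamps resolve them against the snapshot log: the query reads the latest snapshot committed at or before the requested time. A minimal sketch of that lookup follows, assuming log entries expose `snapshot_id` and `timestamp_ms` attributes, as the entries returned by `table.history()` do. The helper name `snapshot_id_as_of` and the `LogEntry` stand-in are hypothetical.

```python
from collections import namedtuple

# Stand-in for a snapshot log entry (assumed shape: snapshot_id, timestamp_ms).
LogEntry = namedtuple("LogEntry", ["snapshot_id", "timestamp_ms"])


def snapshot_id_as_of(history, timestamp_ms):
    """Return the ID of the latest snapshot committed at or before timestamp_ms.

    Returns None if the table had no snapshot yet at that time.
    """
    candidates = [entry for entry in history if entry.timestamp_ms <= timestamp_ms]
    if not candidates:
        return None
    return max(candidates, key=lambda entry: entry.timestamp_ms).snapshot_id


history = [
    LogEntry(snapshot_id=101, timestamp_ms=1_000),
    LogEntry(snapshot_id=102, timestamp_ms=2_000),
]

assert snapshot_id_as_of(history, 1_500) == 101  # between the two writes
assert snapshot_id_as_of(history, 2_000) == 102  # exactly at the second commit
assert snapshot_id_as_of(history, 500) is None   # before any snapshot existed
```

In a PyIceberg test, the returned ID could be passed to `table.scan(snapshot_id=...)`, which is the form of time travel the tests in this section use.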

    # tests/test_time_travel.py
    import pyarrow as pa
    from pyiceberg.schema import Schema
    from pyiceberg.types import DoubleType, NestedField, StringType


    def test_time_travel_returns_historical_snapshot(catalog):
        schema = Schema(
            NestedField(1, "product_id", StringType(), required=True),
            NestedField(2, "price", DoubleType(), required=False),
        )
        catalog.create_table("testing.products", schema=schema)
        table = catalog.load_table("testing.products")
        arrow_schema = table.schema().as_arrow()  # keeps product_id non-nullable

        # Snapshot 1: initial prices
        table.append(pa.table({
            "product_id": ["p1", "p2"],
            "price": [9.99, 24.99],
        }, schema=arrow_schema))
        snapshot_1 = table.current_snapshot().snapshot_id

        # Snapshot 2: a new product is added
        table.append(pa.table({
            "product_id": ["p3"],
            "price": [49.99],
        }, schema=arrow_schema))
        snapshot_2 = table.current_snapshot().snapshot_id
        assert snapshot_1 != snapshot_2

        # Time-travel to snapshot 1: should only see 2 rows
        historical = table.scan(snapshot_id=snapshot_1).to_arrow()
        assert historical.num_rows == 2
        product_ids_at_s1 = set(historical["product_id"].to_pylist())
        assert product_ids_at_s1 == {"p1", "p2"}

        # Current state: should see all 3 rows
        current = table.scan().to_arrow()
        assert current.num_rows == 3


    def test_snapshot_rollback(catalog):
        """Verify that after rollback, the table returns to the historical state."""
        schema = Schema(
            NestedField(1, "id", StringType(), required=True),
            NestedField(2, "value", DoubleType(), required=False),
        )
        catalog.create_table("testing.rollback_test", schema=schema)
        table = catalog.load_table("testing.rollback_test")
        arrow_schema = table.schema().as_arrow()  # keeps id non-nullable

        table.append(pa.table({"id": ["r1"], "value": [1.0]}, schema=arrow_schema))
        good_snapshot = table.current_snapshot().snapshot_id
        table.append(pa.table({"id": ["r2"], "value": [2.0]}, schema=arrow_schema))

        # Roll back to the snapshot before the bad write.
        # Assumes a PyIceberg release whose manage_snapshots() API
        # provides rollback_to_snapshot.
        table.manage_snapshots().rollback_to_snapshot(good_snapshot).commit()

        result = table.scan().to_arrow()
        assert result.num_rows == 1
        assert result["id"][0].as_py() == "r1"

Testing Row-Level Deletes (MERGE INTO)
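The Iceberg format supports row-level deletes via equality deletes and position deletes. The tests below use PyIceberg's delete and overwrite operations to simulate MERGE INTO semantics, and they assert on the rows a scan returns, whichever mechanism the writer uses underneath: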

    # tests/test_row_level_operations.py
    import pyarrow as pa
    import pyarrow.compute as pc
    from pyiceberg.expressions import EqualTo, In
    from pyiceberg.schema import Schema
    from pyiceberg.types import DoubleType, NestedField, StringType


    def test_equality_delete(catalog):
        schema = Schema(
            NestedField(1, "order_id", StringType(), required=True),
            NestedField(2, "status", StringType(), required=False),
            NestedField(3, "amount", DoubleType(), required=False),
        )
        catalog.create_table("testing.orders_del", schema=schema)
        table = catalog.load_table("testing.orders_del")
        table.append(pa.table({
            "order_id": ["o1", "o2", "o3", "o4"],
            "status": ["completed", "pending", "refunded", "pending"],
            "amount": [100.0, 50.0, 200.0, 75.0],
        }, schema=table.schema().as_arrow()))  # keeps order_id non-nullable
        assert table.scan().to_arrow().num_rows == 4

        # Delete all pending orders
        table.delete(delete_filter=EqualTo("status", "pending"))

        result = table.scan().to_arrow()
        assert result.num_rows == 2

        # Assert what was removed AND what was kept
        remaining_statuses = set(result["status"].to_pylist())
        assert "pending" not in remaining_statuses
        assert "completed" in remaining_statuses
        assert "refunded" in remaining_statuses
        assert set(result["order_id"].to_pylist()) == {"o1", "o3"}


    def test_upsert_via_overwrite(catalog):
        """Simulate MERGE INTO: update existing rows, insert new ones."""
        schema = Schema(
            NestedField(1, "customer_id", StringType(), required=True),
            NestedField(2, "email", StringType(), required=False),
            NestedField(3, "tier", StringType(), required=False),
        )
        catalog.create_table("testing.customers_upsert", schema=schema)
        table = catalog.load_table("testing.customers_upsert")
        arrow_schema = table.schema().as_arrow()  # keeps customer_id non-nullable

        # Initial state
        table.append(pa.table({
            "customer_id": ["c1", "c2"],
            "email": ["alice@example.com", "bob@example.com"],
            "tier": ["standard", "standard"],
        }, schema=arrow_schema))

        # Upsert: c1 gets upgraded to premium, c3 is new
        upsert_data = pa.table({
            "customer_id": ["c1", "c3"],
            "email": ["alice@example.com", "carol@example.com"],
            "tier": ["premium", "standard"],
        }, schema=arrow_schema)

        # Overwrite rows matching customer_id in upsert set
        table.overwrite(
            df=upsert_data,
            overwrite_filter=In("customer_id", ["c1", "c3"]),
        )

        result = table.scan().to_arrow()
        # c1 (updated), c2 (unchanged), c3 (inserted)
        assert result.num_rows == 3

        c1 = result.filter(pc.equal(result["customer_id"], "c1"))
        assert c1["tier"][0].as_py() == "premium"

        c2 = result.filter(pc.equal(result["customer_id"], "c2"))
        assert c2["tier"][0].as_py() == "standard"

Testing Iceberg Metadata Tables
Iceberg exposes metadata through virtual tables — history, snapshots, manifests, files. Query them to write tests that verify operational invariants:

    # tests/test_metadata.py
    import pyarrow as pa
    from pyiceberg.schema import Schema
    from pyiceberg.types import DoubleType, NestedField, StringType


    def test_snapshot_count_after_writes(catalog):
        schema = Schema(
            NestedField(1, "id", StringType(), required=True),
        )
        catalog.create_table("testing.meta_test", schema=schema)
        table = catalog.load_table("testing.meta_test")
        arrow_schema = table.schema().as_arrow()  # keeps id non-nullable

        for i in range(3):
            table.append(pa.table({"id": [f"row_{i}"]}, schema=arrow_schema))

        history = table.history()
        assert len(history) == 3

        # Each append creates exactly one snapshot
        snapshot_ids = [h.snapshot_id for h in history]
        assert len(set(snapshot_ids)) == 3  # all unique


    def test_data_files_per_partition(catalog):
        from pyiceberg.partitioning import PartitionSpec, PartitionField
        from pyiceberg.transforms import IdentityTransform

        schema = Schema(
            NestedField(1, "region", StringType(), required=True),
            NestedField(2, "sales", DoubleType(), required=False),
        )
        spec = PartitionSpec(
            PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="region")
        )
        catalog.create_table("testing.sales_by_region", schema=schema, partition_spec=spec)
        table = catalog.load_table("testing.sales_by_region")
        table.append(pa.table({
            "region": ["us-east", "eu-west", "us-east"],
            "sales": [100.0, 200.0, 150.0],
        }, schema=table.schema().as_arrow()))  # keeps region non-nullable

        files = list(table.scan().plan_files())
        # A data file belongs to exactly one partition, so two distinct regions
        # need at least two files (the exact count depends on the writer)
        assert len(files) >= 2

What to Test vs. What to Skip
Test these:
- Schema evolution — add column leaves nulls in old rows, rename preserves data, drop removes column from scans
- Partition spec changes — data written under both old and new specs is readable
- Time-travel — snapshot_id queries return historical row counts, not current
- Row-level deletes — both the deleted rows are gone AND the preserved rows remain
- Snapshot rollback — after rollback, table reflects pre-rollback state exactly
- Metadata tables — snapshot count, file count per partition, history length after N writes
Skip these:
- Iceberg catalog implementations (Hive, Glue, Nessie) — they differ in transactional semantics that are hard to mock faithfully
- Compaction (rewriting data files) — operational concern, results should be identical pre/post
- Iceberg REST catalog authentication — security boundary testing, needs real credentials
- Cross-engine compatibility (Spark reading PyIceberg-written tables) — integration test requiring a real Spark cluster
- Object storage durability — S3/GCS/ADLS data durability is the cloud provider's responsibility