Testing Apache Iceberg Tables: Schema Evolution, Partitioning, and Data Quality

Apache Iceberg tables support schema evolution, partition spec changes, time-travel queries, and row-level deletes — all of which are correctness-critical operations that are easy to break silently. This guide covers testing each of these operations using PyIceberg and a local catalog, verifying that column additions don't break readers, partition changes apply correctly, time-travel returns historical snapshots, and MERGE INTO operations produce the right row counts.

Key Takeaways

Test schema evolution with a local SQLite catalog. PyIceberg ships a SQL catalog that runs on SQLite (install the sql-sqlite extra), so schema evolution tests need no external services.

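Schema evolution must preserve backward compatibility. Adding optional columns and widening types are safe for existing readers. Renames are safe inside Iceberg, which tracks columns by field ID, but they break consumers that still reference the old name. Dropping columns and non-widening type changes can break downstream readers. Write tests that cover both the safe and the breaking paths.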

Partition spec changes are non-destructive but non-retroactive. New data uses the new partition spec; old data keeps the old one. Test that queries spanning both specs return correct row counts.

Time-travel tests verify audit and rollback semantics. Test that AS OF queries return the snapshot before a write, not after — this is your paper trail for data corrections.

Row-level delete tests (MERGE INTO) must verify both the delete and the preserve. Always assert what was removed AND what was kept: a test that only checks that the deleted rows are gone also passes after a truncate.

Why Iceberg Operations Need Tests

Iceberg's power comes from its metadata-rich table format: snapshots, manifests, partition specs, and schema versions are all stored in the catalog. Operations like schema evolution, partition changes, and row-level deletes modify this metadata in ways that can silently corrupt downstream queries if done incorrectly.

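A column rename doesn't fail: it succeeds and writes new metadata. But reader code that still references the old column name now breaks, and pipelines that tolerate missing columns may silently fill in nulls. A partition spec change doesn't fail either, but files written earlier keep their old layout, so any job that assumes every file follows the new layout (for example, by listing partition directories directly) can return incomplete results. These failures stay invisible unless you test for them explicitly.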
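One way to catch the rename failure mode is a small reader-contract check that compares the columns a consumer expects with the columns a scan actually returns. The sketch below is illustrative only: `missing_columns` and the column names are hypothetical, and the function needs nothing more than a list of names such as `table.scan().to_arrow().column_names`.

```python
def missing_columns(expected, actual):
    """Return the expected column names that are absent from actual, sorted."""
    return sorted(set(expected) - set(actual))


# Hypothetical reader contract written before cust_id was renamed to customer_id.
reader_contract = ["cust_id", "rev"]
columns_after_rename = ["customer_id", "rev"]

# Any name listed here is one the reader still asks for but the table no longer has.
stale = missing_columns(reader_contract, columns_after_rename)
```

A test can assert that `stale` is empty after every schema change, which turns a silent rename into a visible failure.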

Setting Up PyIceberg with a Local Catalog

PyIceberg supports multiple catalog backends. For testing, use the in-memory catalog or a SQLite-backed catalog:

pip install "pyiceberg[sql-sqlite,pyarrow]" pytest pandas pyarrow

# tests/conftest.py
import os

import pytest
from pyiceberg.catalog.sql import SqlCatalog

@pytest.fixture(scope="function")
def catalog(tmp_path):
    """Fresh SQLite catalog per test."""
    warehouse_path = str(tmp_path / "warehouse")
    os.makedirs(warehouse_path, exist_ok=True)

    cat = SqlCatalog(
        "test",
        **{
            "uri": f"sqlite:///{tmp_path}/catalog.db",
            "warehouse": f"file://{warehouse_path}",
        },
    )
    cat.create_namespace("testing")
    return cat

Testing Schema Evolution

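Iceberg supports a small set of schema evolution operations: add column, drop column, rename column, update column type (widening only, such as int to long or float to double), reorder columns, and update column doc. Adding a required column is an incompatible change because existing rows have no value for it, so treat it as safe only when the table is empty.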
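The "widening only" rule can be captured in a small predicate that a test suite uses to decide whether a proposed type change should be allowed. This is a sketch, not a PyIceberg API: `is_safe_promotion` is a hypothetical helper, the type names are plain strings chosen for illustration, and the rules encoded are the promotions allowed in Iceberg format versions 1 and 2 (int to long, float to double, and decimal precision increases with unchanged scale).

```python
import re

# Primitive widenings allowed for existing columns (format versions 1 and 2).
_SAFE_PRIMITIVE = {("int", "long"), ("float", "double")}
_DECIMAL = re.compile(r"decimal\((\d+),\s*(\d+)\)")


def is_safe_promotion(old: str, new: str) -> bool:
    """Return True if changing a column from old to new only widens the type."""
    if old == new:
        return True
    if (old, new) in _SAFE_PRIMITIVE:
        return True
    old_dec, new_dec = _DECIMAL.fullmatch(old), _DECIMAL.fullmatch(new)
    if old_dec and new_dec:
        old_p, old_s = map(int, old_dec.groups())
        new_p, new_s = map(int, new_dec.groups())
        # Precision may grow; scale must stay fixed.
        return new_s == old_s and new_p >= old_p
    return False
```

A migration test could call this before applying a type change and fail fast on anything that narrows or reinterprets the column.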

Testing Add, Rename, and Drop Column

# tests/test_schema_evolution.py
import pyarrow as pa
import pyarrow.compute  # makes pa.compute available to the filters below
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, DoubleType, BooleanType

def test_add_column_preserves_existing_data(catalog):
    schema = Schema(
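        # Optional, because pa.table() infers nullable columns and PyIceberg
        # rejects nullable Arrow data for a required field
        NestedField(1, "order_id", StringType(), required=False),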
        NestedField(2, "amount", DoubleType(), required=False),
    )
    catalog.create_table("testing.orders", schema=schema)
    table = catalog.load_table("testing.orders")

    # Write initial data
    initial_data = pa.table({
        "order_id": ["o1", "o2"],
        "amount": [100.0, 200.0],
    })
    table.append(initial_data)

    # Add a new column
    with table.update_schema() as update:
        update.add_column("status", StringType())

    # New data includes the column
    new_data = pa.table({
        "order_id": ["o3"],
        "amount": [150.0],
        "status": ["completed"],
    })
    table.append(new_data)

    # Read all data — old rows should have null for the new column
    result = table.scan().to_arrow()
    assert len(result) == 3

    o1_status = result.filter(
        pa.compute.equal(result["order_id"], "o1")
    )["status"][0].as_py()
    assert o1_status is None  # Old row: null for new column

    o3_status = result.filter(
        pa.compute.equal(result["order_id"], "o3")
    )["status"][0].as_py()
    assert o3_status == "completed"


def test_rename_column(catalog):
    schema = Schema(
        # Optional to match the nullable columns pa.table() infers
        NestedField(1, "cust_id", StringType(), required=False),
        NestedField(2, "rev", DoubleType(), required=False),
    )
    catalog.create_table("testing.customers", schema=schema)
    table = catalog.load_table("testing.customers")

    table.append(pa.table({"cust_id": ["c1", "c2"], "rev": [500.0, 300.0]}))

    # Rename column — should preserve data
    with table.update_schema() as update:
        update.rename_column("cust_id", "customer_id")
        update.rename_column("rev", "revenue")

    result = table.scan().to_arrow()
    assert "customer_id" in result.column_names
    assert "revenue" in result.column_names
    assert "cust_id" not in result.column_names

    # Data intact
    assert result["customer_id"].to_pylist() == ["c1", "c2"]
    assert result["revenue"].to_pylist() == [500.0, 300.0]


def test_drop_column_removes_data(catalog):
    schema = Schema(
        # Optional to match the nullable columns pa.table() infers
        NestedField(1, "order_id", StringType(), required=False),
        NestedField(2, "internal_flag", BooleanType(), required=False),
        NestedField(3, "amount", DoubleType(), required=False),
    )
    catalog.create_table("testing.orders_v2", schema=schema)
    table = catalog.load_table("testing.orders_v2")

    table.append(pa.table({
        "order_id": ["o1"],
        "internal_flag": [True],
        "amount": [100.0],
    }))

    with table.update_schema() as update:
        update.delete_column("internal_flag")

    result = table.scan().to_arrow()
    assert "internal_flag" not in result.column_names
    assert "amount" in result.column_names
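
Because Iceberg tracks columns by field ID rather than by name, a schema change can be classified by comparing two ID-to-name mappings. The helper below is a hypothetical sketch of that comparison. With PyIceberg, a mapping could be built as `{f.field_id: f.name for f in table.schema().fields}` before and after the change, assuming those attributes are available in your version.

```python
def diff_by_field_id(old, new):
    """Compare two {field_id: name} mappings and classify each change."""
    added = {fid: name for fid, name in new.items() if fid not in old}
    dropped = {fid: name for fid, name in old.items() if fid not in new}
    renamed = {
        fid: (old[fid], new[fid])
        for fid in old.keys() & new.keys()
        if old[fid] != new[fid]
    }
    return {"added": added, "dropped": dropped, "renamed": renamed}


# Hypothetical schemas: one rename, one drop, one add.
before = {1: "cust_id", 2: "rev", 3: "internal_flag"}
after = {1: "customer_id", 2: "rev", 4: "status"}
changes = diff_by_field_id(before, after)
```

Asserting on `changes` makes the intent of a migration explicit: a rename shows up under `renamed` with the same ID, while a drop followed by an add of the same name would show up as two separate entries.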

Testing Partition Spec Changes

# tests/test_partitioning.py
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import DayTransform, IdentityTransform
from pyiceberg.types import TimestamptzType, StringType, NestedField
from pyiceberg.schema import Schema
import pyarrow as pa
from datetime import datetime, timezone

def test_partition_spec_evolution(catalog):
    """Verify that data written before and after a partition spec change is both readable."""
    schema = Schema(
        # Fields are optional to match the nullable columns pa.table() infers
        NestedField(1, "event_id", StringType(), required=False),
        # Timestamptz matches the UTC-aware Arrow timestamps written below
        NestedField(2, "event_ts", TimestamptzType(), required=False),
        NestedField(3, "region", StringType(), required=False),
    )

    # Initially partition by day
    initial_spec = PartitionSpec(
        PartitionField(source_id=2, field_id=1000, transform=DayTransform(), name="event_day")
    )
    catalog.create_table(
        "testing.events",
        schema=schema,
        partition_spec=initial_spec,
    )
    table = catalog.load_table("testing.events")

    # Write data with day partitioning
    ts_jan15 = datetime(2026, 1, 15, 10, 0, 0, tzinfo=timezone.utc)
    table.append(pa.table({
        "event_id": ["e1", "e2"],
        "event_ts": pa.array([ts_jan15, ts_jan15], type=pa.timestamp("us", tz="UTC")),
        "region": ["us-east", "eu-west"],
    }))

    assert table.scan().to_arrow().num_rows == 2

    # Evolve partition spec — add region as secondary partition
    with table.update_spec() as update:
        update.add_field("region", IdentityTransform(), "region_id")

    # Write data with new spec
    ts_jan16 = datetime(2026, 1, 16, 10, 0, 0, tzinfo=timezone.utc)
    table.append(pa.table({
        "event_id": ["e3"],
        "event_ts": pa.array([ts_jan16], type=pa.timestamp("us", tz="UTC")),
        "region": ["us-east"],
    }))

    # All rows readable across both partition specs
    all_rows = table.scan().to_arrow()
    assert all_rows.num_rows == 3
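
A total row count does not show which partition spec each file was written under. The sketch below groups record counts by spec ID so a test can assert that both the old and the new spec contributed rows. `rows_by_spec` is a hypothetical helper that works on plain `(spec_id, record_count)` pairs. With PyIceberg, the pairs might be collected from `table.scan().plan_files()` via `task.file.spec_id` and `task.file.record_count`, assuming those attributes exist in your version.

```python
from collections import defaultdict


def rows_by_spec(files):
    """Sum record counts per partition spec id from (spec_id, record_count) pairs."""
    totals = defaultdict(int)
    for spec_id, record_count in files:
        totals[spec_id] += record_count
    return dict(totals)


# Hypothetical planned files: one two-row file under spec 0, one one-row file under spec 1.
planned = [(0, 2), (1, 1)]
totals = rows_by_spec(planned)
```

Checking that `totals` has one entry per spec, and that the values sum to the expected row count, confirms the evolution was non-destructive and that new writes picked up the new spec.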

Testing Time-Travel Queries

# tests/test_time_travel.py
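import pyarrow as pa
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, DoubleType

def test_time_travel_returns_historical_snapshot(catalog):
    schema = Schema(
        # Optional to match the nullable columns pa.table() infers
        NestedField(1, "product_id", StringType(), required=False),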
        NestedField(2, "price", DoubleType(), required=False),
    )
    catalog.create_table("testing.products", schema=schema)
    table = catalog.load_table("testing.products")

    # Snapshot 1: initial prices
    table.append(pa.table({
        "product_id": ["p1", "p2"],
        "price": [9.99, 24.99],
    }))
    snapshot_1 = table.current_snapshot().snapshot_id

    # Snapshot 2: price update
    table.append(pa.table({
        "product_id": ["p3"],
        "price": [49.99],
    }))
    snapshot_2 = table.current_snapshot().snapshot_id

    assert snapshot_1 != snapshot_2

    # Time-travel to snapshot 1 — should only see 2 rows
    historical = table.scan(snapshot_id=snapshot_1).to_arrow()
    assert historical.num_rows == 2

    product_ids_at_s1 = set(historical["product_id"].to_pylist())
    assert product_ids_at_s1 == {"p1", "p2"}

    # Current state — should see all 3 rows
    current = table.scan().to_arrow()
    assert current.num_rows == 3


def test_snapshot_rollback(catalog):
    """Verify that after rollback, the table returns to the historical state."""
    schema = Schema(
        # Optional to match the nullable columns pa.table() infers
        NestedField(1, "id", StringType(), required=False),
        NestedField(2, "value", DoubleType(), required=False),
    )
    catalog.create_table("testing.rollback_test", schema=schema)
    table = catalog.load_table("testing.rollback_test")

    table.append(pa.table({"id": ["r1"], "value": [1.0]}))
    good_snapshot = table.current_snapshot().snapshot_id

    table.append(pa.table({"id": ["r2"], "value": [2.0]}))

    # Rollback to snapshot before bad write.
    # rollback_to_snapshot may not exist in older PyIceberg releases; check your version.
    table.manage_snapshots().rollback_to_snapshot(good_snapshot).commit()

    result = table.scan().to_arrow()
    assert result.num_rows == 1
    assert result["id"][0].as_py() == "r1"
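
The tests above travel by snapshot ID, but an AS OF query is keyed by timestamp. The resolution rule is to pick the latest snapshot committed at or before the requested time, and it can be sketched in plain Python. `snapshot_as_of` is a hypothetical helper. With PyIceberg, the input could be built as `[(h.timestamp_ms, h.snapshot_id) for h in table.history()]`, assuming the history entries are in commit order.

```python
from bisect import bisect_right


def snapshot_as_of(history, ts_ms):
    """Return the id of the latest snapshot committed at or before ts_ms.

    history is a list of (timestamp_ms, snapshot_id) pairs sorted by time.
    Returns None when ts_ms precedes the first snapshot.
    """
    timestamps = [t for t, _ in history]
    idx = bisect_right(timestamps, ts_ms)
    return history[idx - 1][1] if idx else None


# Hypothetical history with three commits.
history = [(1_000, 11), (2_000, 22), (3_000, 33)]
```

Passing the resolved ID to `table.scan(snapshot_id=...)` gives a timestamp-based read. The boundary case, a timestamp exactly equal to a commit time, is the one most worth asserting on.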

Testing Row-Level Deletes (MERGE INTO)

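The Iceberg format defines row-level deletes through equality-delete and position-delete files. PyIceberg's table.delete() takes a filter and, at the time of writing, applies it by dropping or rewriting the affected data files (copy-on-write) rather than by writing delete files. The observable contract is the same either way: matching rows disappear and all other rows stay. Combine it with overwrite to simulate MERGE INTO semantics: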

# tests/test_row_level_operations.py
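import pyarrow as pa
import pyarrow.compute  # makes pa.compute available to the filters below
from pyiceberg.expressions import EqualTo, In
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, DoubleType

def test_equality_delete(catalog):
    schema = Schema(
        # Optional to match the nullable columns pa.table() infers
        NestedField(1, "order_id", StringType(), required=False),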
        NestedField(2, "status", StringType(), required=False),
        NestedField(3, "amount", DoubleType(), required=False),
    )
    catalog.create_table("testing.orders_del", schema=schema)
    table = catalog.load_table("testing.orders_del")

    table.append(pa.table({
        "order_id": ["o1", "o2", "o3", "o4"],
        "status": ["completed", "pending", "refunded", "pending"],
        "amount": [100.0, 50.0, 200.0, 75.0],
    }))

    assert table.scan().to_arrow().num_rows == 4

    # Delete all pending orders
    table.delete(delete_filter=EqualTo("status", "pending"))

    result = table.scan().to_arrow()
    assert result.num_rows == 2

    remaining_statuses = set(result["status"].to_pylist())
    assert "pending" not in remaining_statuses
    assert "completed" in remaining_statuses
    assert "refunded" in remaining_statuses


def test_upsert_via_overwrite(catalog):
    """Simulate MERGE INTO: update existing rows, insert new ones."""
    schema = Schema(
        # Optional to match the nullable columns pa.table() infers
        NestedField(1, "customer_id", StringType(), required=False),
        NestedField(2, "email", StringType(), required=False),
        NestedField(3, "tier", StringType(), required=False),
    )
    catalog.create_table("testing.customers_upsert", schema=schema)
    table = catalog.load_table("testing.customers_upsert")

    # Initial state
    table.append(pa.table({
        "customer_id": ["c1", "c2"],
        "email": ["alice@example.com", "bob@example.com"],
        "tier": ["standard", "standard"],
    }))

    # Upsert: c1 gets upgraded to premium, c3 is new
    upsert_data = pa.table({
        "customer_id": ["c1", "c3"],
        "email": ["alice@example.com", "carol@example.com"],
        "tier": ["premium", "standard"],
    })

    # Overwrite rows matching customer_id in upsert set
    table.overwrite(
        df=upsert_data,
        overwrite_filter=In("customer_id", ["c1", "c3"]),
    )

    result = table.scan().to_arrow()
    # c1 (updated), c2 (unchanged), c3 (inserted), each present exactly once
    assert result.num_rows == 3
    assert sorted(result["customer_id"].to_pylist()) == ["c1", "c2", "c3"]

    c1 = result.filter(pa.compute.equal(result["customer_id"], "c1"))
    assert c1["tier"][0].as_py() == "premium"

    c2 = result.filter(pa.compute.equal(result["customer_id"], "c2"))
    assert c2["tier"][0].as_py() == "standard"
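
The "assert the delete and the preserve" rule can be factored into a helper that compares key sets before and after the operation. This is a sketch with a hypothetical name, `check_delete`. It works on plain key lists, such as `result["order_id"].to_pylist()`, so it does not depend on any particular engine.

```python
def check_delete(before, after, deleted):
    """Report rows that violate a delete: not removed, wrongly removed, or new."""
    before, after, deleted = set(before), set(after), set(deleted)
    return {
        "still_present": deleted & after,    # rows that should be gone
        "lost": (before - deleted) - after,  # rows that should have survived
        "unexpected": after - before,        # rows that appeared from nowhere
    }


before = ["o1", "o2", "o3", "o4"]
# A correct delete of the two pending orders.
ok = check_delete(before, after=["o1", "o3"], deleted=["o2", "o4"])
# A truncate: the deleted rows are gone, but so is everything else.
truncated = check_delete(before, after=[], deleted=["o2", "o4"])
```

A test passes only when all three sets are empty. The truncate case shows why: `still_present` is empty there too, and only `lost` exposes the problem.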

Testing Iceberg Metadata Tables

Iceberg exposes metadata through virtual tables — history, snapshots, manifests, files. Query them to write tests that verify operational invariants:

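# tests/test_metadata.py
import pyarrow as pa
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, DoubleType

def test_snapshot_count_after_writes(catalog):
    schema = Schema(
        # Optional to match the nullable columns pa.table() infers
        NestedField(1, "id", StringType(), required=False),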
    )
    catalog.create_table("testing.meta_test", schema=schema)
    table = catalog.load_table("testing.meta_test")

    for i in range(3):
        table.append(pa.table({"id": [f"row_{i}"]}))

    history = table.history()
    assert len(history) == 3

    # Each append creates exactly one snapshot
    snapshot_ids = [h.snapshot_id for h in history]
    assert len(set(snapshot_ids)) == 3  # all unique


def test_data_files_per_partition(catalog):
    from pyiceberg.partitioning import PartitionSpec, PartitionField
    from pyiceberg.transforms import IdentityTransform

    schema = Schema(
        # Optional to match the nullable columns pa.table() infers
        NestedField(1, "region", StringType(), required=False),
        NestedField(2, "sales", DoubleType(), required=False),
    )
    spec = PartitionSpec(
        PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="region")
    )
    catalog.create_table("testing.sales_by_region", schema=schema, partition_spec=spec)
    table = catalog.load_table("testing.sales_by_region")

    table.append(pa.table({
        "region": ["us-east", "eu-west", "us-east"],
        "sales": [100.0, 200.0, 150.0],
    }))

    files = list(table.scan().plan_files())
    # Two distinct regions were written, so expect at least one file per
    # partition value (the exact count depends on how the writer splits data)
    assert len(files) >= 2
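
A stronger invariant than a total file count is that every expected partition value has at least one data file. The sketch below counts files per partition from a plain list of partition values. `files_per_partition` is a hypothetical helper, and how you obtain the values is an assumption: with PyIceberg, one option might be `str(task.file.partition)` for each task returned by `plan_files()`.

```python
from collections import Counter


def files_per_partition(partitions):
    """Count data files per partition value."""
    return Counter(partitions)


# Hypothetical partition values, one entry per planned data file.
counts = files_per_partition(["us-east", "eu-west", "us-east"])

# Partitions the test expected to see but that have no data files.
expected = {"us-east", "eu-west", "ap-south"}
empty = expected - set(counts)
```

Asserting that `empty` is the empty set catches writes that silently skipped a partition, which a bare `len(files)` check cannot do.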

What to Test vs. What to Skip

Test these:

  • Schema evolution — add column leaves nulls in old rows, rename preserves data, drop removes column from scans
  • Partition spec changes — data written under both old and new specs is readable
  • Time-travel — snapshot_id queries return historical row counts, not current
  • Row-level deletes — both the deleted rows are gone AND the preserved rows remain
  • Snapshot rollback — after rollback, table reflects pre-rollback state exactly
  • Metadata tables — snapshot count, file count per partition, history length after N writes

Skip these:

  • Iceberg catalog implementations (Hive, Glue, Nessie) — they differ in transactional semantics that are hard to mock faithfully
  • Compaction (rewriting data files) — operational concern, results should be identical pre/post
  • Iceberg REST catalog authentication — security boundary testing, needs real credentials
  • Cross-engine compatibility (Spark reading PyIceberg-written tables) — integration test requiring a real Spark cluster
  • Object storage durability — S3/GCS/ADLS data durability is the cloud provider's responsibility
