Feature Store Testing and Data Lineage Validation
Feature stores solve the problem of feature reuse and training/serving consistency in ML systems. They also introduce a new category of bugs: features that compute correctly in isolation but produce wrong values when served at inference time due to point-in-time leakage, online/offline discrepancies, or stale data.
Testing a feature store means validating not just that features compute correctly, but that they're consistent, fresh, and available when the model needs them.
What Can Go Wrong in Feature Stores
Online/offline skew — The offline store (used for training) and online store (used for serving) compute features differently. The model trains on one distribution and serves on another.
Point-in-time contamination — A feature computed for training uses data from after the label event. Example: computing "purchases in the last 7 days" for a churn prediction using purchases that happened after the customer already churned.
Feature staleness — Online feature values are stale because the pipeline failed or ran late. The model receives yesterday's data while making today's decisions.
Schema changes — A feature's type or range changed due to an upstream schema migration. The model silently receives out-of-range values.
Testing Online/Offline Consistency
The most critical test: do the same features compute to the same values in online and offline stores?
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import pytest
class FeatureStoreConsistencyTester:
def __init__(self, offline_store, online_store, feature_view_name: str):
self.offline = offline_store
self.online = online_store
self.feature_view = feature_view_name
def test_consistency(self, entity_ids: list, as_of_timestamp: datetime,
tolerance: float = 1e-6) -> dict:
"""
Compare features for the same entities between online and offline stores.
Returns a report of inconsistencies.
"""
# Get features from offline store (point-in-time query)
offline_features = self.offline.get_historical_features(
entity_ids=entity_ids,
feature_view=self.feature_view,
timestamp=as_of_timestamp
)
# Get features from online store (current values)
online_features = self.online.get_online_features(
entity_ids=entity_ids,
feature_view=self.feature_view
)
inconsistencies = []
for entity_id in entity_ids:
offline_row = offline_features[entity_id]
online_row = online_features[entity_id]
for feature_name in offline_row:
offline_val = offline_row[feature_name]
online_val = online_row.get(feature_name)
if online_val is None:
inconsistencies.append({
"entity_id": entity_id,
"feature": feature_name,
"issue": "missing_in_online_store",
"offline_value": offline_val,
"online_value": None
})
elif isinstance(offline_val, float) and isinstance(online_val, float):
if abs(offline_val - online_val) > tolerance:
inconsistencies.append({
"entity_id": entity_id,
"feature": feature_name,
"issue": "value_mismatch",
"offline_value": offline_val,
"online_value": online_val,
"diff": abs(offline_val - online_val)
})
elif offline_val != online_val:
inconsistencies.append({
"entity_id": entity_id,
"feature": feature_name,
"issue": "value_mismatch",
"offline_value": offline_val,
"online_value": online_val
})
return {
"entities_tested": len(entity_ids),
"inconsistency_count": len(inconsistencies),
"inconsistency_rate": len(inconsistencies) / max(1, len(entity_ids)),
"inconsistencies": inconsistencies
}
def test_user_features_online_offline_consistent(feature_store):
tester = FeatureStoreConsistencyTester(
offline_store=feature_store.offline,
online_store=feature_store.online,
feature_view_name="user_purchase_features"
)
# Use a timestamp from the past where we know offline and online should agree
as_of = datetime.now() - timedelta(hours=6)
test_user_ids = ["user_001", "user_002", "user_003", "user_004", "user_005"]
result = tester.test_consistency(test_user_ids, as_of_timestamp=as_of)
assert result["inconsistency_rate"] < 0.01, \
f"Online/offline inconsistency rate {result['inconsistency_rate']:.2%} exceeds 1%\n" \
f"Inconsistencies:\n{result['inconsistencies'][:5]}"Testing Point-in-Time Correctness
Point-in-time (PIT) queries should only use data available before the label event:
def test_point_in_time_correctness(offline_store):
"""
Verify that historical feature queries don't use future data.
Test approach: create a known event at time T, verify that features
computed as of T-1 day don't include any data from after T-1 day.
"""
# Known entity with a purchase at a specific time
test_user_id = "test-user-pit-001"
purchase_time = datetime(2026, 1, 15, 10, 0, 0) # Jan 15 10am
# Seed test data
offline_store.seed_test_event(
entity_id=test_user_id,
event_type="purchase",
amount=500.0,
timestamp=purchase_time
)
# Query features as of 1 day before the purchase
as_of = purchase_time - timedelta(days=1)
features = offline_store.get_historical_features(
entity_ids=[test_user_id],
feature_view="user_purchase_features",
timestamp=as_of
)
user_features = features[test_user_id]
# The $500 purchase that happened AFTER as_of should NOT be included
# in any aggregation (total spend, purchase count, etc.)
# Get features as of 1 day AFTER the purchase (should include it)
as_of_after = purchase_time + timedelta(days=1)
features_after = offline_store.get_historical_features(
entity_ids=[test_user_id],
feature_view="user_purchase_features",
timestamp=as_of_after
)
user_features_after = features_after[test_user_id]
# Total spend should be higher after the purchase
assert user_features_after["total_spend_30d"] > user_features["total_spend_30d"], \
"Point-in-time query is not excluding future purchases correctly"
# Difference should match the seeded purchase amount
spend_diff = user_features_after["total_spend_30d"] - user_features["total_spend_30d"]
assert abs(spend_diff - 500.0) < 0.01, \
f"Expected $500 difference, got ${spend_diff:.2f} — possible PIT leakage"Testing Feature Freshness
Online feature stores must serve fresh data. Test that feature pipelines are running and producing up-to-date values:
from datetime import datetime, timezone
def test_feature_freshness(online_store, max_staleness_minutes: int = 30):
"""
Verify that all critical features were updated within the allowed staleness window.
"""
now = datetime.now(timezone.utc)
stale_features = []
critical_feature_views = [
"user_purchase_features",
"user_session_features",
"item_popularity_features"
]
for feature_view_name in critical_feature_views:
last_update = online_store.get_last_materialization_time(feature_view_name)
if last_update is None:
stale_features.append({
"feature_view": feature_view_name,
"issue": "never_materialized"
})
continue
staleness_minutes = (now - last_update).total_seconds() / 60
if staleness_minutes > max_staleness_minutes:
stale_features.append({
"feature_view": feature_view_name,
"last_update": last_update.isoformat(),
"staleness_minutes": staleness_minutes
})
assert len(stale_features) == 0, \
f"Stale features detected (max allowed: {max_staleness_minutes}min):\n" + \
"\n".join([f" {f['feature_view']}: {f.get('staleness_minutes', 'never')} min stale"
for f in stale_features])
def test_feature_serving_latency(online_store):
"""Feature serving should meet latency SLA for real-time inference."""
import time
test_entity_ids = ["user_001", "user_002", "user_003"]
latencies = []
for _ in range(100):
start = time.perf_counter_ns()
online_store.get_online_features(
entity_ids=test_entity_ids,
feature_view="user_purchase_features"
)
latencies.append((time.perf_counter_ns() - start) / 1_000_000)
latencies.sort()
p99 = latencies[int(0.99 * len(latencies))]
assert p99 < 20.0, f"Feature serving P99 {p99:.1f}ms exceeds 20ms SLA"Data Lineage Validation
Data lineage tracks how features are derived from source data. Testing lineage ensures that feature computation can be traced and audited:
class DataLineageValidator:
def __init__(self, lineage_registry):
self.registry = lineage_registry
def validate_feature_lineage(self, feature_name: str) -> dict:
"""
Verify that a feature has complete, valid lineage documentation.
"""
lineage = self.registry.get_lineage(feature_name)
issues = []
# Check lineage exists
if lineage is None:
return {"feature": feature_name, "valid": False,
"issues": ["No lineage registered"]}
# Check source is documented
if not lineage.get("source_tables"):
issues.append("Missing source table documentation")
# Check transformation logic is documented
if not lineage.get("transformation"):
issues.append("Missing transformation documentation")
# Check owner is assigned
if not lineage.get("owner"):
issues.append("No owner assigned")
# Check SLA is defined
if not lineage.get("freshness_sla_minutes"):
issues.append("No freshness SLA defined")
# Verify all upstream sources are themselves registered
for source in lineage.get("source_tables", []):
if not self.registry.table_is_registered(source):
issues.append(f"Source table '{source}' not in registry")
return {
"feature": feature_name,
"valid": len(issues) == 0,
"issues": issues,
"lineage": lineage
}
def test_all_critical_features_have_lineage(lineage_registry):
"""All features used in production models must have registered lineage."""
validator = DataLineageValidator(lineage_registry)
production_features = load_production_model_features()
all_issues = []
for feature in production_features:
result = validator.validate_feature_lineage(feature)
if not result["valid"]:
all_issues.append(f"{feature}: {', '.join(result['issues'])}")
assert len(all_issues) == 0, \
f"Features missing valid lineage:\n" + "\n".join(f" - {issue}" for issue in all_issues)Testing with Feast
If you're using Feast (a popular open-source feature store), here's how to write integration tests:
from feast import FeatureStore
import pandas as pd
import pytest
@pytest.fixture(scope="session")
def feast_store(tmp_path_factory):
"""Spin up a test Feast store with SQLite backend."""
store_path = tmp_path_factory.mktemp("feast")
store = FeatureStore(repo_path=str(store_path))
store.apply([user_feature_view, item_feature_view]) # Apply feature definitions
# Materialize test data
store.materialize(
start_date=pd.Timestamp("2026-01-01", tz="UTC"),
end_date=pd.Timestamp("2026-01-31", tz="UTC")
)
return store
def test_feast_online_features(feast_store):
entity_rows = [
{"user_id": "user_001"},
{"user_id": "user_002"}
]
features = feast_store.get_online_features(
features=["user_purchase_features:total_spend_30d",
"user_purchase_features:purchase_count_7d"],
entity_rows=entity_rows
).to_dict()
assert "total_spend_30d" in features
assert "purchase_count_7d" in features
assert len(features["total_spend_30d"]) == 2
def test_feast_historical_features(feast_store):
entity_df = pd.DataFrame({
"user_id": ["user_001", "user_002"],
"event_timestamp": [
pd.Timestamp("2026-01-20", tz="UTC"),
pd.Timestamp("2026-01-25", tz="UTC")
]
})
training_df = feast_store.get_historical_features(
entity_df=entity_df,
features=["user_purchase_features:total_spend_30d"]
).to_df()
assert len(training_df) == 2
assert "total_spend_30d" in training_df.columns
assert not training_df["total_spend_30d"].isna().all()Feature store tests are often overlooked until something goes wrong in production. A model that silently trains on leaked future data, or that's served stale features during a critical business period, can cause significant harm. Testing lineage, freshness, and online/offline consistency should be part of every ML deployment checklist.