Feature Store Testing and Data Lineage Validation

Feature Store Testing and Data Lineage Validation

Feature stores solve the problem of feature reuse and training/serving consistency in ML systems. They also introduce a new category of bugs: features that compute correctly in isolation but produce wrong values when served at inference time due to point-in-time leakage, online/offline discrepancies, or stale data.

Testing a feature store means validating not just that features compute correctly, but that they're consistent, fresh, and available when the model needs them.

What Can Go Wrong in Feature Stores

Online/offline skew — The offline store (used for training) and online store (used for serving) compute features differently. The model trains on one distribution and serves on another.

Point-in-time contamination — A feature computed for training uses data from after the label event. Example: computing "purchases in the last 7 days" for a churn prediction using purchases that happened after the customer already churned.

Feature staleness — Online feature values are stale because the pipeline failed or ran late. The model receives yesterday's data while making today's decisions.

Schema changes — A feature's type or range changed due to an upstream schema migration. The model silently receives out-of-range values.

Testing Online/Offline Consistency

The most critical test: for the same entity and the same point in time, do the online and offline stores return the same feature values?

import numbers
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
import pytest

class FeatureStoreConsistencyTester:
    """Compare feature values between an offline and an online feature store.

    Both stores are duck-typed. The offline store must provide
    ``get_historical_features(entity_ids=..., feature_view=..., timestamp=...)``
    and the online store ``get_online_features(entity_ids=..., feature_view=...)``.
    Each result is indexed by entity id and yields a row that maps
    feature name -> value.
    """

    def __init__(self, offline_store, online_store, feature_view_name: str):
        self.offline = offline_store
        self.online = online_store
        self.feature_view = feature_view_name

    @staticmethod
    def _is_real_number(value) -> bool:
        """Return True for real numbers; bools are excluded so flags compare exactly."""
        return isinstance(value, numbers.Real) and not isinstance(value, bool)

    @staticmethod
    def _is_nan(value) -> bool:
        """Return True if *value* is NaN (the only real value not equal to itself)."""
        return bool(value != value)

    def test_consistency(self, entity_ids: list, as_of_timestamp: datetime,
                         tolerance: float = 1e-6) -> dict:
        """
        Compare features for the same entities between online and offline stores.

        The offline store is queried as of ``as_of_timestamp``; the online
        store is queried without a timestamp, i.e. for whatever it currently
        serves.

        Args:
            entity_ids: Entities to compare.
            as_of_timestamp: Point in time passed to the offline query.
            tolerance: Maximum absolute difference allowed between two
                numeric values before they count as a mismatch.

        Returns:
            A report dict with ``entities_tested``, ``inconsistency_count``,
            ``inconsistency_rate`` and ``inconsistencies``. The rate is
            inconsistencies per entity tested, so it can exceed 1.0 when one
            entity has several bad features. Each inconsistency carries
            ``entity_id``, ``feature``, ``issue``, ``offline_value`` and
            ``online_value`` (plus ``diff`` for numeric mismatches). ``issue``
            is one of ``missing_in_offline_store``, ``missing_in_online_store``
            or ``value_mismatch``.
        """
        # Get features from offline store (point-in-time query)
        offline_features = self.offline.get_historical_features(
            entity_ids=entity_ids,
            feature_view=self.feature_view,
            timestamp=as_of_timestamp
        )

        # Get features from online store (current values)
        online_features = self.online.get_online_features(
            entity_ids=entity_ids,
            feature_view=self.feature_view
        )

        inconsistencies = []

        for entity_id in entity_ids:
            # An entity absent from a store is a finding to report, not a crash.
            try:
                offline_row = offline_features[entity_id]
            except KeyError:
                inconsistencies.append({
                    "entity_id": entity_id,
                    "feature": None,
                    "issue": "missing_in_offline_store",
                    "offline_value": None,
                    "online_value": None
                })
                continue

            try:
                online_row = online_features[entity_id]
            except KeyError:
                online_row = None
            if online_row is None:
                # Treat a missing online entity as an empty row so every
                # offline feature is reported as missing online.
                online_row = {}

            for feature_name in offline_row:
                offline_val = offline_row[feature_name]
                online_val = online_row.get(feature_name)

                if online_val is None:
                    inconsistencies.append({
                        "entity_id": entity_id,
                        "feature": feature_name,
                        "issue": "missing_in_online_store",
                        "offline_value": offline_val,
                        "online_value": None
                    })
                elif self._is_real_number(offline_val) and self._is_real_number(online_val):
                    # Exact equality first: also covers inf == inf, whose
                    # difference would otherwise be NaN.
                    if offline_val == online_val:
                        continue
                    # Both stores agreeing on "no value" (NaN) is consistent.
                    if self._is_nan(offline_val) and self._is_nan(online_val):
                        continue
                    diff = abs(offline_val - online_val)
                    # `not (diff <= tolerance)` rather than `diff > tolerance`
                    # so a NaN on only one side is reported as a mismatch.
                    if not (diff <= tolerance):
                        inconsistencies.append({
                            "entity_id": entity_id,
                            "feature": feature_name,
                            "issue": "value_mismatch",
                            "offline_value": offline_val,
                            "online_value": online_val,
                            "diff": diff
                        })
                elif offline_val != online_val:
                    inconsistencies.append({
                        "entity_id": entity_id,
                        "feature": feature_name,
                        "issue": "value_mismatch",
                        "offline_value": offline_val,
                        "online_value": online_val
                    })

        return {
            "entities_tested": len(entity_ids),
            "inconsistency_count": len(inconsistencies),
            "inconsistency_rate": len(inconsistencies) / max(1, len(entity_ids)),
            "inconsistencies": inconsistencies
        }

def test_user_features_online_offline_consistent(feature_store):
    """Fail when the user purchase features disagree between the two stores.

    Checks five fixed users against the offline store as of six hours ago
    and requires the reported inconsistency rate to stay below 1%.
    """
    sample_users = [f"user_{index:03d}" for index in range(1, 6)]

    checker = FeatureStoreConsistencyTester(
        offline_store=feature_store.offline,
        online_store=feature_store.online,
        feature_view_name="user_purchase_features",
    )

    # Query the offline store at a point in the past rather than "now".
    six_hours_ago = datetime.now() - timedelta(hours=6)
    report = checker.test_consistency(sample_users, as_of_timestamp=six_hours_ago)

    assert report["inconsistency_rate"] < 0.01, (
        f"Online/offline inconsistency rate {report['inconsistency_rate']:.2%} exceeds 1%\n"
        f"Inconsistencies:\n{report['inconsistencies'][:5]}"
    )

Testing Point-in-Time Correctness

Point-in-time (PIT) queries should only use data available before the label event:

def test_point_in_time_correctness(offline_store):
    """
    Check that a historical query excludes events later than its timestamp.

    A $500 purchase is seeded at a known time. The 30-day spend is then read
    one day before and one day after that time; the later value must exceed
    the earlier one by the seeded amount (within one cent).
    """
    user = "test-user-pit-001"
    seeded_amount = 500.0
    seeded_at = datetime(2026, 1, 15, 10, 0, 0)  # Jan 15 10am
    one_day = timedelta(days=1)

    # NOTE(review): nothing here removes the seeded event. If
    # seed_test_event persists data, reruns could double-count it — confirm.
    offline_store.seed_test_event(
        entity_id=user,
        event_type="purchase",
        amount=seeded_amount,
        timestamp=seeded_at,
    )

    def features_as_of(moment):
        # Single-user historical lookup against the purchase feature view.
        snapshot = offline_store.get_historical_features(
            entity_ids=[user],
            feature_view="user_purchase_features",
            timestamp=moment,
        )
        return snapshot[user]

    # Before the purchase: it must not appear in any aggregate.
    before_purchase = features_as_of(seeded_at - one_day)
    # After the purchase: it must be included.
    after_purchase = features_as_of(seeded_at + one_day)

    spend_after = after_purchase["total_spend_30d"]
    spend_before = before_purchase["total_spend_30d"]

    assert spend_after > spend_before, \
        "Point-in-time query is not excluding future purchases correctly"

    # The gap between the two reads should be exactly the seeded purchase.
    spend_diff = spend_after - spend_before
    assert abs(spend_diff - seeded_amount) < 0.01, \
        f"Expected $500 difference, got ${spend_diff:.2f} — possible PIT leakage"

Testing Feature Freshness

Online feature stores must serve fresh data. Test that feature pipelines are running and producing up-to-date values:

from datetime import datetime, timezone

def test_feature_freshness(online_store, max_staleness_minutes: int = 30):
    """
    Verify that all critical features were updated within the allowed staleness window.

    Args:
        online_store: Object providing
            ``get_last_materialization_time(feature_view_name)``, which
            returns a datetime or ``None`` when the view was never
            materialized.
        max_staleness_minutes: Maximum allowed age, in minutes, of the last
            materialization of each critical feature view.

    Raises:
        AssertionError: If any critical feature view was never materialized
            or is older than ``max_staleness_minutes``.
    """
    now = datetime.now(timezone.utc)
    stale_features = []

    critical_feature_views = [
        "user_purchase_features",
        "user_session_features",
        "item_popularity_features"
    ]

    for feature_view_name in critical_feature_views:
        last_update = online_store.get_last_materialization_time(feature_view_name)

        if last_update is None:
            stale_features.append({
                "feature_view": feature_view_name,
                "issue": "never_materialized"
            })
            continue

        if last_update.utcoffset() is None:
            # Subtracting a naive datetime from the aware `now` raises
            # TypeError. NOTE(review): naive timestamps are assumed to be
            # UTC here — confirm against the store's convention.
            last_update = last_update.replace(tzinfo=timezone.utc)

        staleness_minutes = (now - last_update).total_seconds() / 60

        if staleness_minutes > max_staleness_minutes:
            stale_features.append({
                "feature_view": feature_view_name,
                "last_update": last_update.isoformat(),
                "staleness_minutes": staleness_minutes
            })

    assert not stale_features, (
        f"Stale features detected (max allowed: {max_staleness_minutes}min):\n"
        + "\n".join(
            f"  {entry['feature_view']}: never materialized"
            if entry.get("issue") == "never_materialized"
            else f"  {entry['feature_view']}: {entry['staleness_minutes']:.1f} min stale"
            for entry in stale_features
        )
    )

def test_feature_serving_latency(online_store):
    """Feature serving should meet latency SLA for real-time inference.

    Times 100 sequential ``get_online_features`` calls for three users and
    asserts that the 99th-percentile latency is below 20 ms.

    Raises:
        AssertionError: If the measured P99 latency is 20 ms or more.
    """
    import time

    test_entity_ids = ["user_001", "user_002", "user_003"]

    latencies = []
    for _ in range(100):
        start = time.perf_counter_ns()
        online_store.get_online_features(
            entity_ids=test_entity_ids,
            feature_view="user_purchase_features"
        )
        # perf_counter_ns returns nanoseconds; convert to milliseconds.
        latencies.append((time.perf_counter_ns() - start) / 1_000_000)

    latencies.sort()
    # Nearest-rank percentile: rank = ceil(0.99 * n), computed in integer
    # arithmetic. For 100 samples this is the 99th value (index 98);
    # int(0.99 * n) would pick index 99, which is the maximum, not the P99.
    p99_rank = (99 * len(latencies) + 99) // 100
    p99 = latencies[p99_rank - 1]

    assert p99 < 20.0, f"Feature serving P99 {p99:.1f}ms exceeds 20ms SLA"

Data Lineage Validation

Data lineage tracks how features are derived from source data. Testing lineage ensures that feature computation can be traced and audited:

class DataLineageValidator:
    """Check lineage records held by a registry for completeness.

    The registry must provide ``get_lineage(feature_name)`` and
    ``table_is_registered(table_name)``.
    """

    def __init__(self, lineage_registry):
        self.registry = lineage_registry

    def validate_feature_lineage(self, feature_name: str) -> dict:
        """
        Report whether a feature's lineage record is complete.

        Returns a dict with ``feature``, ``valid`` and ``issues``; when a
        lineage record exists it is also returned under ``lineage``.
        """
        record = self.registry.get_lineage(feature_name)

        # Without a record there is nothing further to inspect.
        if record is None:
            return {
                "feature": feature_name,
                "valid": False,
                "issues": ["No lineage registered"],
            }

        # (record key, message reported when the key is missing or empty),
        # evaluated in this order.
        required_fields = (
            ("source_tables", "Missing source table documentation"),
            ("transformation", "Missing transformation documentation"),
            ("owner", "No owner assigned"),
            ("freshness_sla_minutes", "No freshness SLA defined"),
        )
        problems = [
            message for key, message in required_fields if not record.get(key)
        ]

        # Every upstream table must itself be known to the registry.
        problems.extend(
            f"Source table '{table}' not in registry"
            for table in record.get("source_tables", [])
            if not self.registry.table_is_registered(table)
        )

        return {
            "feature": feature_name,
            "valid": not problems,
            "issues": problems,
            "lineage": record,
        }

def test_all_critical_features_have_lineage(lineage_registry):
    """Fail if any production model feature lacks a valid lineage record."""
    validator = DataLineageValidator(lineage_registry)

    # Pair each production feature with its validation report.
    reports = (
        (name, validator.validate_feature_lineage(name))
        for name in load_production_model_features()
    )
    problems = [
        f"{name}: {', '.join(report['issues'])}"
        for name, report in reports
        if not report["valid"]
    ]

    assert not problems, \
        "Features missing valid lineage:\n" + "\n".join(f"  - {line}" for line in problems)

Testing with Feast

If you're using Feast (a popular open-source feature store), here's how to write integration tests:

from feast import FeatureStore
import pandas as pd
import pytest

@pytest.fixture(scope="session")
def feast_store(tmp_path_factory):
    """Build one Feast store per test session, rooted in a temporary directory.

    Applies the user and item feature views, then materializes January 2026
    (UTC) so the tests have data to read.
    """
    # NOTE(review): nothing here writes a feature_store.yaml into the temp
    # directory; confirm how the repo config reaches FeatureStore.
    repo_dir = tmp_path_factory.mktemp("feast")
    feast = FeatureStore(repo_path=str(repo_dir))

    # Register the feature definitions with the store.
    feature_definitions = [user_feature_view, item_feature_view]
    feast.apply(feature_definitions)

    # Load the test window into the store.
    window_start = pd.Timestamp("2026-01-01", tz="UTC")
    window_end = pd.Timestamp("2026-01-31", tz="UTC")
    feast.materialize(start_date=window_start, end_date=window_end)

    return feast

def test_feast_online_features(feast_store):
    """Online lookup for two users returns both requested feature columns."""
    user_ids = ("user_001", "user_002")
    entity_rows = [{"user_id": user_id} for user_id in user_ids]
    requested = [
        "user_purchase_features:total_spend_30d",
        "user_purchase_features:purchase_count_7d",
    ]

    response = feast_store.get_online_features(
        features=requested,
        entity_rows=entity_rows,
    )
    features = response.to_dict()

    for column in ("total_spend_30d", "purchase_count_7d"):
        assert column in features
    # One value per requested entity row.
    assert len(features["total_spend_30d"]) == 2

def test_feast_historical_features(feast_store):
    """Historical retrieval returns one row per entity with a populated spend column."""
    label_dates = {"user_001": "2026-01-20", "user_002": "2026-01-25"}
    entity_df = pd.DataFrame({
        "user_id": list(label_dates),
        "event_timestamp": [
            pd.Timestamp(day, tz="UTC") for day in label_dates.values()
        ],
    })

    retrieval = feast_store.get_historical_features(
        entity_df=entity_df,
        features=["user_purchase_features:total_spend_30d"],
    )
    training_df = retrieval.to_df()

    spend_column = "total_spend_30d"
    assert len(training_df) == 2
    assert spend_column in training_df.columns
    # At least one row must carry a real value.
    assert not training_df[spend_column].isna().all()

Feature store tests are often overlooked until something goes wrong in production. A model that silently trains on leaked future data, or that's served stale features during a critical business period, can cause significant harm. Testing lineage, freshness, and online/offline consistency should be part of every ML deployment checklist.

Read more