Monte Carlo Data Observability Testing: Detecting Anomalies and Pipeline Failures

Monte Carlo is a data observability platform that monitors data pipelines using ML-powered anomaly detection rather than manually defined thresholds. Instead of writing checks for every column, Monte Carlo learns your data's normal behavior and alerts when something changes unexpectedly. This guide covers how to test with Monte Carlo, integrate it into your pipelines, and use it for effective data incident response.

Monte Carlo's Approach vs. Threshold-Based Testing

Traditional data quality tools (Soda Core, Great Expectations) require you to define explicit rules:

# Threshold-based checks (SodaCL syntax)
checks for orders:
  - row_count > 1000
  - missing_percent(amount) < 1%

Monte Carlo learns baselines automatically:

  • It observes your data for 1-2 weeks
  • Builds statistical models of normal behavior
  • Alerts when metrics deviate from learned patterns
  • No manual threshold configuration needed

This catches anomalies you didn't think to write rules for — like a pipeline running at 3am when it normally runs at 6am, or row counts that are in the "valid" range but are 40% lower than usual.
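To make the difference concrete, here is a deliberately simple sketch of a row count that passes a static threshold but stands out against a learned baseline. It uses a plain z-score against historical daily counts. That is a stand-in technique chosen for illustration, not Monte Carlo's actual model; the function names and the 3-sigma cutoff are assumptions.

```python
from statistics import mean, stdev


def passes_static_threshold(row_count: int, minimum: int = 1000) -> bool:
    """Threshold-based rule equivalent to `row_count > 1000`."""
    return row_count > minimum


def is_anomalous(history: list[int], row_count: int, z_cutoff: float = 3.0) -> bool:
    """Flag values more than z_cutoff standard deviations from the historical mean."""
    mu = mean(history)
    sigma = stdev(history)
    if sigma == 0:
        # A perfectly flat history: any change at all is unusual
        return row_count != mu
    return abs(row_count - mu) / sigma > z_cutoff


# Two weeks of daily row counts hovering around 10,000
history = [9800, 10100, 9950, 10200, 10050, 9900, 10000,
           10150, 9850, 10020, 9980, 10080, 9920, 10010]
today = 6000  # 40% below the usual volume

print(passes_static_threshold(today))  # True: the static rule is satisfied
print(is_anomalous(history, today))    # True: far outside the learned baseline
```

The static rule only knows "more than 1,000 rows"; the baseline knows that this table normally lands near 10,000, which is what lets it catch the 40% drop.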

Table Health Monitors

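Monte Carlo's table health monitoring covers five areas. Freshness, volume, and schema monitoring are applied to connected tables automatically, while deeper checks such as field health are typically enabled per table: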

  1. Volume — row count and table size changes
  2. Freshness — time since last update
  3. Schema — column additions, deletions, and type changes
  4. Field health — null rates, unique rates, and distribution for every column
  5. Distribution — value distribution shifts for categorical and numerical columns
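Field health monitoring comes down to tracking per-column statistics over time and comparing each new run against its history. As an illustration only (Monte Carlo computes these metrics in your warehouse; `field_health` is a hypothetical local stand-in), this computes a null rate and a unique rate for one column of sample rows. The sketch divides both rates by the total row count, which is one common definition among several.

```python
from typing import Any


def field_health(rows: list[dict[str, Any]], column: str) -> dict[str, float]:
    """Compute null rate and unique rate for one column of sample rows."""
    values = [row.get(column) for row in rows]
    total = len(values)
    if total == 0:
        return {"null_rate": 0.0, "unique_rate": 0.0}
    non_null = [v for v in values if v is not None]
    return {
        # Share of rows where the column is missing or NULL
        "null_rate": (total - len(non_null)) / total,
        # Distinct non-null values relative to all rows
        "unique_rate": len(set(non_null)) / total,
    }


rows = [
    {"order_id": 1, "amount": 25.0, "status": "delivered"},
    {"order_id": 2, "amount": None, "status": "pending"},
    {"order_id": 3, "amount": 40.0, "status": "delivered"},
    {"order_id": 4, "amount": 25.0, "status": "delivered"},
]
print(field_health(rows, "amount"))  # {'null_rate': 0.25, 'unique_rate': 0.5}
```

A monitor would record these numbers on every run; a sudden jump in `null_rate` for `amount` is the kind of shift that gets flagged.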

Enabling Auto-Monitors via API

from pycarlo.core import Client, Query, Mutation

client = Client()

# Query existing monitors for a table
query = Query()
query.get_table_monitors(
    mcon='MCON++account-id++bigquery++bigquery++project.dataset.orders'
)
result = client(query)
monitors = result.get_table_monitors

# Enable a specific monitor
mutation = Mutation()
mutation.set_table_monitor(
    mcon='MCON++account-id++bigquery++bigquery++project.dataset.orders',
    monitor_type='VOLUME',
    enabled=True
)
client(mutation)

Writing Custom SQL Monitors

Beyond automatic monitors, define custom SQL rules for business logic:

# Create a custom SQL monitor via API
mutation = Mutation()
mutation.create_custom_sql_monitor(
    connection_id='your-connection-id',
    name='No non-positive amounts on delivered orders',
    description='Delivered orders must have positive amounts',
    query="""
        SELECT COUNT(*) as count
        FROM `project.dataset.orders`
        WHERE status = 'delivered'
          AND amount <= 0
    """,
    threshold_type='ANOMALY',  # ML baseline; for a must-be-zero rule like this, 'ABSOLUTE' is stricter
    severity='HIGH',
    schedules=[{'intervalMinutes': 60}]
)
result = client(mutation)
print(f"Monitor created: {result.create_custom_sql_monitor.monitor.uuid}")
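Before registering a custom SQL monitor, it helps to unit-test the query itself against a small fixture so you know it counts exactly the rows you intend. The sketch below uses an in-memory SQLite database as a stand-in for the warehouse. That is an assumption: SQLite's dialect differs from BigQuery, so the tested SQL sticks to portable constructs and uses a plain `orders` table name instead of the backtick-qualified BigQuery path.

```python
import sqlite3

# Same rule as the monitor above, with a portable table name
MONITOR_SQL = """
    SELECT COUNT(*) AS count
    FROM orders
    WHERE status = 'delivered'
      AND amount <= 0
"""


def run_monitor_query(conn: sqlite3.Connection) -> int:
    """Return the number of rows violating the rule (0 means healthy)."""
    (count,) = conn.execute(MONITOR_SQL).fetchone()
    return count


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, status TEXT, amount REAL)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?, ?)",
    [
        (1, "delivered", 25.0),   # valid
        (2, "delivered", 0.0),    # violation: zero amount
        (3, "delivered", -5.0),   # violation: negative amount
        (4, "cancelled", -5.0),   # ignored: not delivered
    ],
)
print(run_monitor_query(conn))  # 2
```

The fixture deliberately includes a zero amount and a non-delivered row, which pins down the edge cases the rule's wording is easy to get wrong on.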

Testing Data Lineage

Monte Carlo builds lineage graphs automatically. Use the API to verify your expected lineage:

from pycarlo.core import Client, Query

def test_orders_lineage():
    """Verify the orders table has expected upstream dependencies"""
    client = Client()
    query = Query()
    
    query.get_table_lineage(
        mcon='MCON++account-id++bigquery++bigquery++project.dataset.orders',
        direction='UPSTREAM',
        hops=2
    )
    result = client(query)
    
    upstream_tables = {
        node.mcon 
        for node in result.get_table_lineage.nodes
    }
    
    # Verify expected sources are upstream
    assert any('raw_orders' in mcon for mcon in upstream_tables), \
        "raw_orders should be upstream of orders"
    assert any('customers' in mcon for mcon in upstream_tables), \
        "customers should be upstream of orders"

def test_no_unexpected_downstream_dependencies():
    """Detect unexpected consumers that weren't in the original design"""
    client = Client()  # `client` in the previous test is local to that function
    query = Query()
    query.get_table_lineage(
        mcon='MCON++account-id++bigquery++bigquery++project.dataset.orders',
        direction='DOWNSTREAM',
        hops=1
    )
    result = client(query)
    
    downstream = {node.full_table_id for node in result.get_table_lineage.nodes}
    allowed_downstream = {
        'project.dataset.orders_summary',
        'project.dataset.customer_orders',
        'project.dataset.revenue_metrics',
    }
    
    unexpected = downstream - allowed_downstream
    assert not unexpected, \
        f"Unexpected downstream consumers detected: {unexpected}"
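The `hops` parameter limits how far the lineage traversal walks from the starting table. The same idea is easy to reproduce offline, which is handy for encoding the lineage you expect as a fixture. This sketch does a depth-limited breadth-first walk over a hypothetical adjacency map (table to its direct upstream sources); the map and the function name are illustrative, not Monte Carlo data structures.

```python
from collections import deque


def upstream_within(graph: dict[str, list[str]], table: str, hops: int) -> set[str]:
    """Breadth-first walk over upstream edges, stopping after `hops` levels."""
    seen: set[str] = set()
    queue = deque([(table, 0)])
    while queue:
        node, depth = queue.popleft()
        if depth == hops:
            # Reached the hop limit; do not expand this node's parents
            continue
        for parent in graph.get(node, []):
            if parent not in seen:
                seen.add(parent)
                queue.append((parent, depth + 1))
    return seen


# Hypothetical lineage: orders is built from stg_orders and customers
graph = {
    "orders": ["stg_orders", "customers"],
    "stg_orders": ["raw_orders"],
    "raw_orders": ["source_api_export"],
}
print(sorted(upstream_within(graph, "orders", hops=2)))
# ['customers', 'raw_orders', 'stg_orders']
```

With `hops=2`, `raw_orders` is reached but `source_api_export` (three hops away) is not, mirroring why the lineage test above asks for two hops to see `raw_orders`.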

Querying Monitor Status Programmatically

Use Monte Carlo's API to pull recent anomalies for a table and gate your pipeline on them:

import time
from pycarlo.core import Client, Query

def get_table_health(mcon: str, lookback_hours: int = 4) -> dict:
    """Query Monte Carlo for recent anomalies on a table"""
    client = Client()
    query = Query()
    
    query.get_anomalies(
        mcon=mcon,
        start_time=int((time.time() - lookback_hours * 3600) * 1000),
        end_time=int(time.time() * 1000),
    )
    result = client(query)
    
    anomalies = result.get_anomalies.anomalies or []
    return {
        'table_mcon': mcon,
        'anomaly_count': len(anomalies),
        'critical_count': sum(1 for a in anomalies if a.severity == 'HIGH'),
        'anomalies': [
            {
                'type': a.type,
                'severity': a.severity,
                'field': a.field_name,
                'description': a.description,
                'detected_at': a.created_time,
            }
            for a in anomalies
        ],
    }

def fail_pipeline_on_critical_anomalies(table_mcon: str):
    """Use in ETL pipelines to gate on data quality"""
    health = get_table_health(table_mcon, lookback_hours=2)
    
    if health['critical_count'] > 0:
        critical = [a for a in health['anomalies'] if a['severity'] == 'HIGH']
        raise RuntimeError(
            f"Blocking pipeline: {health['critical_count']} critical anomalies detected on {table_mcon}:\n" +
            "\n".join(f"  - {a['type']}: {a['description']}" for a in critical)
        )
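Because the gate depends on a live API call, its decision logic is easiest to unit-test by injecting the health lookup. The sketch below is a hypothetical refactoring of the gate above: the `fetch_health` parameter and the `DataQualityGateError` class are illustrative names, not part of pycarlo. In production you would pass `get_table_health`; in tests, a function returning a canned dict.

```python
from typing import Callable


class DataQualityGateError(RuntimeError):
    """Raised when critical anomalies should block the pipeline."""


def fail_pipeline_on_critical_anomalies(
    table_mcon: str,
    fetch_health: Callable[[str, int], dict],
) -> None:
    health = fetch_health(table_mcon, 2)  # (mcon, lookback_hours)
    critical = [a for a in health["anomalies"] if a["severity"] == "HIGH"]
    if critical:
        details = "\n".join(f"  - {a['type']}: {a['description']}" for a in critical)
        raise DataQualityGateError(
            f"Blocking pipeline: {len(critical)} critical anomalies on {table_mcon}:\n{details}"
        )


def fake_health(mcon: str, lookback_hours: int) -> dict:
    # Canned response shaped like get_table_health()'s return value
    return {
        "table_mcon": mcon,
        "anomaly_count": 2,
        "critical_count": 1,
        "anomalies": [
            {"type": "VOLUME", "severity": "HIGH", "description": "Row count dropped 65%"},
            {"type": "FIELD_HEALTH", "severity": "LOW", "description": "Null rate up 2%"},
        ],
    }


try:
    fail_pipeline_on_critical_anomalies("MCON++test++orders", fake_health)
except DataQualityGateError as err:
    print(err)
```

Only the HIGH-severity anomaly appears in the message; the LOW one is ignored, which is exactly the behavior worth pinning down in a test.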

CI/CD Integration

Gate your pipeline deployments on Monte Carlo health:

# scripts/check_data_health.py
"""
Pre-deploy check: verify no active anomalies on critical tables
"""
import sys
from monte_carlo_client import get_table_health  # get_table_health() from the previous section

CRITICAL_TABLES = [
    'MCON++account++bigquery++bigquery++project.dataset.orders',
    'MCON++account++bigquery++bigquery++project.dataset.customers',
    'MCON++account++bigquery++bigquery++project.dataset.events',
]

def main():
    issues = []
    
    for table_mcon in CRITICAL_TABLES:
        health = get_table_health(table_mcon, lookback_hours=4)
        
        if health['critical_count'] > 0:
            for anomaly in health['anomalies']:
                if anomaly['severity'] == 'HIGH':
                    issues.append({
                        'table': table_mcon.split('++')[-1],
                        'issue': f"{anomaly['type']}: {anomaly['description']}",
                    })
    
    if issues:
        print("BLOCKING DEPLOY: Active data quality issues detected:")
        for issue in issues:
            print(f"  [{issue['table']}] {issue['issue']}")
        sys.exit(1)
    
    print("All critical tables healthy. Proceeding with deploy.")
    sys.exit(0)

if __name__ == '__main__':
    main()
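
Run the script as a step ahead of the deploy; a non-zero exit fails the job and skips the remaining steps:

# .github/workflows/deploy-pipeline.yml
jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: pip install pycarlo dbt-bigquery
      - name: Check data health before deploy
        run: python scripts/check_data_health.py
        env:
          # pycarlo's Client() reads credentials from these variables
          MCD_DEFAULT_API_ID: ${{ secrets.MC_API_ID }}
          MCD_DEFAULT_API_TOKEN: ${{ secrets.MC_API_TOKEN }}
      - name: Deploy pipeline (only if health check passes)
        run: dbt run --select orders_pipeline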

Testing Monte Carlo's Schema Change Detection

Schema changes are a common cause of breaking incidents. Verify that Monte Carlo detects them:

import time

# Hypothetical helpers/fixtures assumed by this test:
#   get_open_incidents(table_mcon) -> list of incident objects with .uuid and .type
#   execute_sql(sql) runs a statement against the test warehouse
#   TEST_TABLE / TEST_TABLE_MCON identify a dedicated test table
def test_schema_change_creates_incident():
    """Integration test: verify Monte Carlo fires on schema changes"""
    # Run this against a dedicated test table, never production

    # Record incidents that already exist so only new ones are counted
    initial_ids = {i.uuid for i in get_open_incidents(table_mcon=TEST_TABLE_MCON)}

    # Simulate a schema change (drop a column)
    execute_sql(f"ALTER TABLE {TEST_TABLE} DROP COLUMN test_field")

    try:
        # Poll for a new incident. Detection latency depends on how often
        # Monte Carlo collects metadata for this table, so tune max_wait.
        max_wait = 600
        interval = 30
        waited = 0
        new_incident = None
        while waited < max_wait:
            time.sleep(interval)
            waited += interval
            new = [i for i in get_open_incidents(table_mcon=TEST_TABLE_MCON)
                   if i.uuid not in initial_ids]
            if new:
                new_incident = new[0]
                break

        assert new_incident is not None, \
            f"Monte Carlo did not create an incident after schema change (waited {waited}s)"
        assert 'SCHEMA' in new_incident.type.upper(), \
            f"Expected schema change incident, got: {new_incident.type}"
    finally:
        # Restore the column even if an assertion fails
        execute_sql(f"ALTER TABLE {TEST_TABLE} ADD COLUMN test_field STRING")
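The hand-rolled polling loop above can be factored into a reusable helper with a monotonic deadline, so other integration tests (freshness, volume) can share it. A minimal sketch; `wait_for` is a hypothetical helper name, not a Monte Carlo or pycarlo API.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def wait_for(
    check: Callable[[], Optional[T]],
    timeout_s: float,
    interval_s: float,
) -> Optional[T]:
    """Poll `check` until it returns something other than None or time runs out."""
    deadline = time.monotonic() + timeout_s
    while True:
        result = check()
        if result is not None:
            return result
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return None
        # Never sleep past the deadline
        time.sleep(min(interval_s, remaining))


# Usage with a stand-in check that "detects" an incident on the third poll
attempts = {"n": 0}


def fake_check() -> Optional[str]:
    attempts["n"] += 1
    return "SCHEMA_CHANGE" if attempts["n"] >= 3 else None


print(wait_for(fake_check, timeout_s=5, interval_s=0.01))  # SCHEMA_CHANGE
```

In the schema-change test, `check` would fetch open incidents and return the first one that was not present before the column was dropped, or None. Using `time.monotonic()` keeps the deadline correct even if the system clock jumps.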

Incident Response Testing

Monte Carlo's value extends to incident response. Test your response procedures:

def test_incident_response_runbook():
    """Verify the incident response automation works correctly"""
    
    # Simplified, illustrative payload; match the field names to the
    # webhook format your Monte Carlo account actually delivers
    webhook_payload = {
        "event_type": "anomaly_detected",
        "severity": "HIGH",
        "table_mcon": "MCON++test++bigquery++bigquery++test.dataset.orders",
        "anomaly_type": "VOLUME",
        "description": "Row count dropped 65% vs baseline",
        "detected_at": "2026-05-17T14:23:00Z",
    }
    
    # Test your own incident handler (application code, not part of pycarlo)
    response = handle_monte_carlo_incident(webhook_payload)
    
    assert response.slack_notification_sent is True
    assert response.on_call_paged is True  # HIGH severity should page
    assert response.incident_ticket_created is True
    assert response.runbook_link is not None
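For a test like this to pass, `handle_monte_carlo_incident` needs to route alerts by severity. Here is a minimal sketch of such a handler. The response fields mirror the assertions above, while the injected notifier callables, the "only HIGH pages and opens a ticket" routing rule, and the runbook URL pattern are hypothetical placeholders for your own Slack, paging, and ticketing integrations.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class IncidentResponse:
    slack_notification_sent: bool = False
    on_call_paged: bool = False
    incident_ticket_created: bool = False
    runbook_link: Optional[str] = None


# Hypothetical runbook location, keyed by anomaly type
RUNBOOK_BASE = "https://wiki.example.com/runbooks/data-incidents"


def handle_monte_carlo_incident(
    payload: dict,
    notify_slack: Callable[[str], None] = lambda msg: None,
    page_on_call: Callable[[str], None] = lambda msg: None,
    create_ticket: Callable[[str], None] = lambda msg: None,
) -> IncidentResponse:
    response = IncidentResponse()
    table = payload["table_mcon"].split("++")[-1]
    message = (
        f"[{payload['severity']}] {payload['anomaly_type']} on {table}: "
        f"{payload['description']}"
    )

    # Every anomaly is announced in Slack
    notify_slack(message)
    response.slack_notification_sent = True

    # Only HIGH severity pages on-call and opens a ticket
    if payload["severity"] == "HIGH":
        page_on_call(message)
        response.on_call_paged = True
        create_ticket(message)
        response.incident_ticket_created = True

    response.runbook_link = f"{RUNBOOK_BASE}/{payload['anomaly_type'].lower()}"
    return response


# Usage: a LOW-severity alert notifies Slack but does not page anyone
slack_messages: list[str] = []
low = handle_monte_carlo_incident(
    {
        "severity": "LOW",
        "table_mcon": "MCON++test++bigquery++bigquery++test.dataset.orders",
        "anomaly_type": "FRESHNESS",
        "description": "Update arrived 40 minutes late",
    },
    notify_slack=slack_messages.append,
)
print(low.on_call_paged, len(slack_messages))  # False 1
```

Injecting the notifiers as callables lets a test record exactly what was sent (here, `slack_messages.append`) without touching real Slack or paging services.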

Summary

Testing with Monte Carlo comes down to six practices: using pycarlo to query monitor status and anomalies from CI pipelines; writing custom SQL monitors for business rules; testing lineage relationships to catch unexpected dependencies; gating ETL deployments on active anomalies; running integration tests that confirm Monte Carlo fires on schema changes; and testing that your incident response automation handles webhooks correctly. Monte Carlo's ML-based detection catches the anomalies you didn't think to write rules for, but you still need to verify that detection and response work end to end.
