Monte Carlo Data Observability Testing: Detecting Anomalies and Pipeline Failures
Monte Carlo is a data observability platform that monitors data pipelines using ML-powered anomaly detection rather than manually defined thresholds. Instead of writing checks for every column, Monte Carlo learns your data's normal behavior and alerts when something changes unexpectedly. This guide covers how to test with Monte Carlo, integrate it into your pipelines, and use it for effective data incident response.
Monte Carlo's Approach vs. Threshold-Based Testing
Traditional data quality tools (Soda Core, Great Expectations) require you to define explicit rules:
# Traditional threshold-based
- row_count > 1000
- missing_percent(amount) < 1%
Monte Carlo learns baselines automatically:
- It observes your data for 1-2 weeks
- Builds statistical models of normal behavior
- Alerts when metrics deviate from learned patterns
- No manual threshold configuration needed
This catches anomalies you didn't think to write rules for, such as a pipeline running at 3am when it normally runs at 6am, or row counts that pass a fixed threshold but are 40% lower than usual.
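To make the contrast concrete, here is a minimal sketch of why a learned baseline flags a drop that a fixed threshold accepts. It uses a plain z-score over recent row counts purely for illustration; this guide does not describe Monte Carlo's actual models, and the function names, sample history, and cutoff of 3.0 are all assumptions.

```python
from statistics import mean, stdev


def passes_static_threshold(row_count: int, minimum: int = 1000) -> bool:
    """Threshold-style check: only asks whether the count clears a fixed floor."""
    return row_count > minimum


def is_anomalous(history: list[int], observed: int, z_cutoff: float = 3.0) -> bool:
    """Baseline-style check: compares the observation with recent history.

    Illustrative z-score only; not Monte Carlo's real detection model.
    """
    mu = mean(history)
    sigma = stdev(history)
    if sigma == 0:
        # A perfectly flat history: any change at all is a deviation.
        return observed != mu
    return abs(observed - mu) / sigma > z_cutoff


# Hypothetical daily row counts for the past week, averaging 10,000.
history = [10_000, 10_200, 9_900, 10_100, 9_800, 10_050, 9_950]
today = 6_000  # 40% lower than usual, yet still well above 1,000

print(passes_static_threshold(today))  # True: the fixed rule sees nothing wrong
print(is_anomalous(history, today))    # True: the baseline flags the drop
```

The fixed rule passes because 6,000 is above the floor, while the baseline check fails because 6,000 is far outside the spread of the recent history.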
Table Health Monitors
Monte Carlo's five table health monitors automatically track every table you connect:
- Volume — row count and table size changes
- Freshness — time since last update
- Schema — column additions, deletions, and type changes
- Field health — null rates, unique rates, and distribution for every column
- Distribution — value distribution shifts for categorical and numerical columns
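As a rough illustration of what the schema monitor tracks, the sketch below compares two snapshots of a table's columns and reports additions, deletions, and type changes. The `diff_schema` helper and the sample snapshots are hypothetical and written for this guide; they are not part of Monte Carlo or pycarlo.

```python
def diff_schema(previous: dict[str, str], current: dict[str, str]) -> dict[str, list[str]]:
    """Compare two {column_name: type} snapshots of the same table.

    Hypothetical helper for illustration; not a Monte Carlo API.
    """
    prev_cols, curr_cols = set(previous), set(current)
    return {
        # Columns present now that were absent before
        'added': sorted(curr_cols - prev_cols),
        # Columns that existed before and are now gone
        'removed': sorted(prev_cols - curr_cols),
        # Columns present in both snapshots whose type differs
        'type_changed': sorted(
            col for col in prev_cols & curr_cols if previous[col] != current[col]
        ),
    }


# Assumed example snapshots of an orders table, taken one day apart.
yesterday = {'order_id': 'INT64', 'amount': 'FLOAT64', 'test_field': 'STRING'}
today_schema = {'order_id': 'INT64', 'amount': 'STRING', 'currency': 'STRING'}

changes = diff_schema(yesterday, today_schema)
print(changes)
```

Any non-empty list in the result corresponds to one of the three schema change categories listed above.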
Enabling Auto-Monitors via API
from pycarlo.core import Client, Query, Mutation

client = Client()

# Query existing monitors for a table
query = Query()
query.get_table_monitors(
    mcon='MCON++account-id++bigquery++bigquery++project.dataset.orders'
)
result = client(query)
monitors = result.get_table_monitors

# Enable a specific monitor
mutation = Mutation()
mutation.set_table_monitor(
    mcon='MCON++account-id++bigquery++bigquery++project.dataset.orders',
    monitor_type='VOLUME',
    enabled=True
)
client(mutation)
Writing Custom SQL Monitors
Beyond automatic monitors, define custom SQL rules for business logic:
# Create a custom SQL monitor via API
mutation = Mutation()
mutation.create_custom_sql_monitor(
    connection_id='your-connection-id',
    name='No negative amounts on delivered orders',
    description='Delivered orders must have positive amounts',
    query="""
        SELECT COUNT(*) as count
        FROM `project.dataset.orders`
        WHERE status = 'delivered'
        AND amount <= 0
    """,
    threshold_type='ANOMALY',  # ML-based, or 'ABSOLUTE' for fixed threshold
    severity='HIGH',
    schedules=[{'intervalMinutes': 60}]
)
result = client(mutation)
print(f"Monitor created: {result.create_custom_sql_monitor.monitor.uuid}")
Testing Data Lineage
Monte Carlo builds lineage graphs automatically. Use the API to verify your expected lineage:
def test_orders_lineage():
    """Verify the orders table has expected upstream dependencies"""
    client = Client()
    query = Query()
    query.get_table_lineage(
        mcon='MCON++account-id++bigquery++bigquery++project.dataset.orders',
        direction='UPSTREAM',
        hops=2
    )
    result = client(query)
    upstream_tables = {
        node.mcon
        for node in result.get_table_lineage.nodes
    }
    # Verify expected sources are upstream
    assert any('raw_orders' in mcon for mcon in upstream_tables), \
        "raw_orders should be upstream of orders"
    assert any('customers' in mcon for mcon in upstream_tables), \
        "customers should be upstream of orders"

def test_no_unexpected_downstream_dependencies():
    """Detect unexpected consumers that weren't in the original design"""
    # Create the client inside the test so it does not rely on a module-level variable
    client = Client()
    query = Query()
    query.get_table_lineage(
        mcon='MCON++account-id++bigquery++bigquery++project.dataset.orders',
        direction='DOWNSTREAM',
        hops=1
    )
    result = client(query)
    downstream = {node.full_table_id for node in result.get_table_lineage.nodes}
    allowed_downstream = {
        'project.dataset.orders_summary',
        'project.dataset.customer_orders',
        'project.dataset.revenue_metrics',
    }
    unexpected = downstream - allowed_downstream
    assert not unexpected, \
        f"Unexpected downstream consumers detected: {unexpected}"
Querying Monitor Status Programmatically
Use Monte Carlo's API to query monitor status in your CI/CD pipeline:
import time
from pycarlo.core import Client, Query

def get_table_health(mcon: str, lookback_hours: int = 4) -> dict:
    """Query Monte Carlo for recent anomalies on a table"""
    client = Client()
    query = Query()
    query.get_anomalies(
        mcon=mcon,
        start_time=int((time.time() - lookback_hours * 3600) * 1000),
        end_time=int(time.time() * 1000),
    )
    result = client(query)
    anomalies = result.get_anomalies.anomalies or []
    return {
        'table_mcon': mcon,
        'anomaly_count': len(anomalies),
        'critical_count': sum(1 for a in anomalies if a.severity == 'HIGH'),
        'anomalies': [
            {
                'type': a.type,
                'severity': a.severity,
                'field': a.field_name,
                'description': a.description,
                'detected_at': a.created_time,
            }
            for a in anomalies
        ],
    }

def fail_pipeline_on_critical_anomalies(table_mcon: str):
    """Use in ETL pipelines to gate on data quality"""
    health = get_table_health(table_mcon, lookback_hours=2)
    if health['critical_count'] > 0:
        critical = [a for a in health['anomalies'] if a['severity'] == 'HIGH']
        raise Exception(
            f"Blocking pipeline: {health['critical_count']} critical anomalies detected on {table_mcon}:\n" +
            "\n".join(f" - {a['type']}: {a['description']}" for a in critical)
        )
CI/CD Integration
Gate your pipeline deployments on Monte Carlo health:
# scripts/check_data_health.py
"""
Pre-deploy check: verify no active anomalies on critical tables
"""
import sys
from monte_carlo_client import get_table_health

CRITICAL_TABLES = [
    'MCON++account++bigquery++bigquery++project.dataset.orders',
    'MCON++account++bigquery++bigquery++project.dataset.customers',
    'MCON++account++bigquery++bigquery++project.dataset.events',
]

def main():
    issues = []
    for table_mcon in CRITICAL_TABLES:
        health = get_table_health(table_mcon, lookback_hours=4)
        if health['critical_count'] > 0:
            for anomaly in health['anomalies']:
                if anomaly['severity'] == 'HIGH':
                    issues.append({
                        'table': table_mcon.split('++')[-1],
                        'issue': f"{anomaly['type']}: {anomaly['description']}",
                    })
    if issues:
        print("BLOCKING DEPLOY: Active data quality issues detected:")
        for issue in issues:
            print(f" [{issue['table']}] {issue['issue']}")
        sys.exit(1)
    print("All critical tables healthy. Proceeding with deploy.")
    sys.exit(0)

if __name__ == '__main__':
    main()
# .github/workflows/deploy-pipeline.yml
steps:
  - name: Check data health before deploy
    run: python scripts/check_data_health.py
    env:
      MONTE_CARLO_API_ID: ${{ secrets.MC_API_ID }}
      MONTE_CARLO_API_TOKEN: ${{ secrets.MC_API_TOKEN }}
  - name: Deploy pipeline (only if health check passes)
    run: dbt run --select orders_pipeline
Testing Monte Carlo's Schema Change Detection
Schema changes are common breaking incidents. Verify Monte Carlo detects them:
import time

# get_open_incidents, execute_sql, TEST_TABLE, and TEST_TABLE_MCON are assumed
# to be helpers and constants defined elsewhere in your own test suite.
def test_schema_change_creates_incident():
    """Integration test: verify Monte Carlo fires on schema changes"""
    # Note: This test runs against a test data source, not production
    # Snapshot the currently open incidents
    initial_incidents = get_open_incidents(table_mcon=TEST_TABLE_MCON)
    # Simulate a schema change (drop a column)
    execute_sql(f"ALTER TABLE {TEST_TABLE} DROP COLUMN test_field")
    try:
        # Wait for Monte Carlo to detect the change (up to 10 minutes)
        max_wait = 600
        interval = 30
        waited = 0
        new_incident = None
        while waited < max_wait:
            time.sleep(interval)
            waited += interval
            current_incidents = get_open_incidents(table_mcon=TEST_TABLE_MCON)
            # Incident objects must be hashable for this set difference to work
            new_incidents = set(current_incidents) - set(initial_incidents)
            if new_incidents:
                new_incident = next(iter(new_incidents))
                break
        assert new_incident is not None, \
            f"Monte Carlo did not create an incident after schema change (waited {waited}s)"
        assert 'SCHEMA' in new_incident.type.upper(), \
            f"Expected schema change incident, got: {new_incident.type}"
    finally:
        # Restore the column even if an assertion above fails
        execute_sql(f"ALTER TABLE {TEST_TABLE} ADD COLUMN test_field STRING")
Incident Response Testing
Monte Carlo's value extends to incident response. Test your response procedures:
def test_incident_response_runbook():
    """Verify the incident response automation works correctly"""
    # Simulate a Monte Carlo webhook payload
    webhook_payload = {
        "event_type": "anomaly_detected",
        "severity": "HIGH",
        "table_mcon": "MCON++test++bigquery++bigquery++test.dataset.orders",
        "anomaly_type": "VOLUME",
        "description": "Row count dropped 65% vs baseline",
        "detected_at": "2026-05-17T14:23:00Z",
    }
    # Test your incident handler
    response = handle_monte_carlo_incident(webhook_payload)
    assert response.slack_notification_sent is True
    assert response.on_call_paged is True  # HIGH severity should page
    assert response.incident_ticket_created is True
    assert response.runbook_link is not None
Summary
Testing with Monte Carlo comes down to six practices: using pycarlo to query monitor status and anomalies in CI pipelines, writing custom SQL monitors for business logic rules, testing lineage relationships to catch unexpected dependencies, gating ETL deployments on active anomaly checks, running integration tests that verify Monte Carlo fires on schema changes, and running incident response tests that validate your automation handles webhooks correctly. Monte Carlo's ML-based detection catches the anomalies you didn't think to write rules for, but you still need to verify that the detection and response pipeline works end to end.