Snowflake Data Testing: dbt + Great Expectations for Data Quality
Testing data quality in Snowflake requires both SQL-based tests (dbt works natively with Snowflake) and Python-based data validation (Great Expectations connects via the Snowflake SQLAlchemy adapter). Run dbt tests for schema and transformation validation, Great Expectations checkpoints for statistical and pattern-based assertions, and scheduled CI pipelines to catch issues before they reach dashboards.
Key Takeaways
dbt connects to Snowflake with a warehouse, database, and schema in profiles.yml. Tests run SQL directly against your Snowflake tables—no data leaves the warehouse.
Great Expectations uses the Snowflake SQLAlchemy connection string. Install sqlalchemy-snowflake, configure the connection, and point it at any Snowflake table or view.
Zero-copy cloning enables fast isolated test environments. Clone production databases in seconds with no storage cost for unmodified data, run tests in the clone, and drop it when done.
Time Travel supports historical validation. Query data as it existed at a previous point in time—useful for comparing current state against a known-good baseline.
Snowflake query tagging helps track test costs. Set ALTER SESSION SET QUERY_TAG before running tests to identify test queries in usage data.
Why Snowflake Needs a Testing Strategy
Snowflake's elastic compute and separation of storage from compute makes it attractive for data warehousing, but these same properties make data quality issues expensive: bad data gets queried by many concurrent warehouses before anyone notices. A testing strategy catches data quality issues at ingestion and transformation time, not after the fact.
The combination of dbt (SQL-based schema and transformation tests) and Great Expectations (statistical and pattern assertions) covers the two main classes of data quality issues: structural problems (wrong schema, broken foreign keys, duplicates) and semantic problems (values out of expected range, unusual distributions, suspicious null rates).
Snowflake + dbt Setup
Configure your Snowflake connection in ~/.dbt/profiles.yml:
my_project:
target: dev
outputs:
dev:
type: snowflake
account: "{{ env_var('SNOWFLAKE_ACCOUNT') }}"
user: "{{ env_var('SNOWFLAKE_USER') }}"
password: "{{ env_var('SNOWFLAKE_PASSWORD') }}"
role: TRANSFORMER
database: RAW_DEV
warehouse: TRANSFORM_WH
schema: PUBLIC
threads: 4
client_session_keep_alive: false
prod:
type: snowflake
account: "{{ env_var('SNOWFLAKE_ACCOUNT') }}"
user: "{{ env_var('SNOWFLAKE_USER') }}"
private_key_path: "{{ env_var('SNOWFLAKE_PRIVATE_KEY_PATH') }}"
role: TRANSFORMER
database: RAW_PROD
warehouse: TRANSFORM_WH
schema: PUBLIC
threads: 8Verify the connection:
dbt debugdbt Schema Tests for Snowflake
Configure tests in schema.yml files. dbt generates SQL that runs directly against your Snowflake tables:
# models/staging/schema.yml
version: 2
models:
- name: stg_orders
description: "Staged orders from the raw layer"
columns:
- name: order_id
tests:
- not_null
- unique
- name: customer_id
tests:
- not_null
- relationships:
to: ref('stg_customers')
field: customer_id
- name: order_status
tests:
- accepted_values:
values: ['pending', 'processing', 'shipped', 'delivered', 'cancelled', 'refunded']
- name: order_total
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_between:
min_value: 0
max_value: 100000
- name: created_at
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_between:
min_value: "2020-01-01"
max_value: "2030-12-31"
parse_strings_as_datetimes: trueRun tests against Snowflake:
dbt test --target prod --<span class="hljs-keyword">select stg_ordersTesting Snowflake-Specific Patterns
Snowflake's VARIANT type (semi-structured data) requires specific test patterns:
- name: raw_payload
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_json_parseableFor JSON fields within VARIANT columns, use a custom generic test:
-- tests/generic/assert_variant_field_not_null.sql
{% test assert_variant_field_not_null(model, column_name, field_path) %}
select *
from {{ model }}
where {{ column_name }}:{{ field_path }} is null
{% endtest %} - name: raw_payload
tests:
- assert_variant_field_not_null:
field_path: "order.customer_id"Zero-Copy Cloning for Test Environments
Snowflake's zero-copy cloning lets you create isolated test databases instantly without duplicating storage:
-- Create a test environment from production
CREATE DATABASE RAW_TEST CLONE RAW_PROD;
-- Create a test schema
CREATE SCHEMA RAW_TEST.TESTING CLONE RAW_PROD.PUBLIC;Use this in CI to run tests against real data in isolation:
#!/bin/bash
<span class="hljs-comment"># ci-test.sh
<span class="hljs-comment"># Create test database
snowsql -q <span class="hljs-string">"CREATE DATABASE ${CI_DATABASE} CLONE RAW_PROD;"
<span class="hljs-comment"># Run dbt tests against the clone
dbt <span class="hljs-built_in">test --target ci --vars <span class="hljs-string">"database: ${CI_DATABASE}"
<span class="hljs-comment"># Drop the clone when done
snowsql -q <span class="hljs-string">"DROP DATABASE ${CI_DATABASE};"This approach gives you:
- Real production data shapes and volumes for tests
- Complete isolation (tests don't affect production)
- Near-zero cost for storage (only changed data incurs charges)
- Fast setup (seconds, not minutes)
Great Expectations + Snowflake
Install the required packages:
pip install great-expectations sqlalchemy-snowflake snowflake-connector-pythonConfigure a Snowflake Datasource
import great_expectations as gx
context = gx.get_context()
# Add Snowflake as a SQL datasource
datasource = context.sources.add_or_update_sql(
name="snowflake_datasource",
connection_string=(
"snowflake://{user}:{password}@{account}/{database}/{schema}"
"?warehouse={warehouse}&role={role}"
).format(
user=os.environ["SNOWFLAKE_USER"],
password=os.environ["SNOWFLAKE_PASSWORD"],
account=os.environ["SNOWFLAKE_ACCOUNT"],
database="RAW_PROD",
schema="PUBLIC",
warehouse="VALIDATION_WH",
role="VALIDATOR"
)
)Add Table and Query Assets
# Test a specific table
table_asset = datasource.add_table_asset(
name="stg_orders",
table_name="STG_ORDERS",
schema_name="PUBLIC"
)
# Test a specific query (useful for testing a subset)
query_asset = datasource.add_query_asset(
name="recent_orders",
query="SELECT * FROM STG_ORDERS WHERE CREATED_AT >= DATEADD(day, -7, CURRENT_DATE())"
)Define and Run Expectations
# Create an Expectation Suite for orders
suite = context.add_or_update_expectation_suite("snowflake_orders_suite")
batch_request = table_asset.build_batch_request()
validator = context.get_validator(
batch_request=batch_request,
expectation_suite_name="snowflake_orders_suite"
)
# Structural assertions
validator.expect_table_columns_to_match_set(
column_set=["ORDER_ID", "CUSTOMER_ID", "ORDER_TOTAL", "STATUS", "CREATED_AT"],
exact_match=False # Allows additional columns
)
validator.expect_table_row_count_to_be_between(min_value=10000)
# Column-level assertions
validator.expect_column_values_to_not_be_null("ORDER_ID")
validator.expect_column_values_to_be_unique("ORDER_ID")
validator.expect_column_values_to_be_between(
"ORDER_TOTAL",
min_value=0,
max_value=500000
)
validator.expect_column_values_to_be_in_set(
"STATUS",
value_set=["pending", "processing", "shipped", "delivered", "cancelled"]
)
# Statistical assertions (catch sudden distribution shifts)
validator.expect_column_mean_to_be_between("ORDER_TOTAL", min_value=50, max_value=1000)
validator.expect_column_stdev_to_be_between("ORDER_TOTAL", min_value=10, max_value=5000)
# Null rate checks
validator.expect_column_values_to_not_be_null("CUSTOMER_ID", mostly=0.99)
validator.save_expectation_suite()Snowflake Data Freshness Testing
Test that data is being loaded on schedule by checking the most recent record timestamp:
-- Custom dbt test for data freshness
-- tests/generic/assert_data_freshness.sql
{% test assert_data_freshness(model, column_name, max_age_hours) %}
select
max({{ column_name }}) as latest_record,
current_timestamp() as now,
datediff('hour', max({{ column_name }}), current_timestamp()) as hours_since_latest
from {{ model }}
having datediff('hour', max({{ column_name }}), current_timestamp()) > {{ max_age_hours }}
{% endtest %}models:
- name: stg_orders
tests:
- assert_data_freshness:
column_name: created_at
max_age_hours: 25 # Data should be loaded dailyUsing dbt's built-in source freshness:
sources:
- name: raw
database: RAW_PROD
schema: PUBLIC
tables:
- name: raw_orders
loaded_at_field: _FIVETRAN_SYNCED
freshness:
warn_after: {count: 12, period: hour}
error_after: {count: 24, period: hour}Run freshness checks:
dbt source freshnessSnowflake Time Travel for Historical Validation
Use Snowflake's Time Travel to compare current data against a known-good historical baseline:
-- Compare today's order count against 24 hours ago
SELECT
(SELECT COUNT(*) FROM STG_ORDERS) as current_count,
(SELECT COUNT(*) FROM STG_ORDERS AT(OFFSET => -86400)) as yesterday_count,
RATIO_TO_REPORT(COUNT(*)) OVER () as ratio
;Use this in a GX query asset:
time_travel_asset = datasource.add_query_asset(
name="order_count_comparison",
query="""
SELECT
current_count,
yesterday_count,
ABS(current_count - yesterday_count) / NULLIF(yesterday_count, 0) as pct_change
FROM (
SELECT COUNT(*) as current_count FROM STG_ORDERS
) current
CROSS JOIN (
SELECT COUNT(*) as yesterday_count FROM STG_ORDERS AT(OFFSET => -86400)
) yesterday
"""
)
# Assert that row count hasn't changed by more than 20%
validator.expect_column_values_to_be_between(
"PCT_CHANGE",
min_value=0,
max_value=0.20
)Query Tagging for Cost Attribution
Tag test queries to track their cost in Snowflake usage data:
# In Great Expectations, set query tags via the connection
connection_string = (
"snowflake://...?session_parameters="
'{"QUERY_TAG":"great_expectations_data_quality"}'
)In dbt, add a macro:
-- macros/set_query_tag.sql
{% macro set_query_tag() -%}
{% if var('query_tag', none) is not none %}
ALTER SESSION SET QUERY_TAG = '{{ var("query_tag") }}';
{% endif %}
{%- endmacro %}Call it in your dbt_project.yml:
on-run-start:
- "{{ set_query_tag() }}"Run with:
dbt test --vars <span class="hljs-string">'{"query_tag": "ci_data_quality_check"}'Now all test queries are tagged, making it easy to filter them in SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY.
CI Pipeline for Snowflake Testing
# .github/workflows/snowflake-data-quality.yml
name: Snowflake Data Quality
on:
schedule:
- cron: '0 7 * * *' # Daily at 7am
push:
paths:
- 'dbt/**'
- 'expectations/**'
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.11"
- name: Install dependencies
run: |
pip install dbt-snowflake great-expectations sqlalchemy-snowflake
cd dbt && dbt deps
- name: Run dbt source freshness
run: cd dbt && dbt source freshness
env:
SNOWFLAKE_ACCOUNT: ${{ secrets.SNOWFLAKE_ACCOUNT }}
SNOWFLAKE_USER: ${{ secrets.SNOWFLAKE_USER }}
SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_PASSWORD }}
- name: Run dbt tests
run: cd dbt && dbt test --target prod
env:
SNOWFLAKE_ACCOUNT: ${{ secrets.SNOWFLAKE_ACCOUNT }}
SNOWFLAKE_USER: ${{ secrets.SNOWFLAKE_USER }}
SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_PASSWORD }}
- name: Run Great Expectations checkpoints
run: python scripts/run_gx_checkpoints.py
env:
SNOWFLAKE_ACCOUNT: ${{ secrets.SNOWFLAKE_ACCOUNT }}
SNOWFLAKE_USER: ${{ secrets.SNOWFLAKE_USER }}
SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_PASSWORD }}Cost Optimization for Testing
Snowflake charges based on compute time. Keep test costs low:
Use a small warehouse for tests. A XSMALL or SMALL warehouse is sufficient for running dbt tests and GX validations. Don't use MEDIUM+ warehouses for testing.
Set auto-suspend. Configure test warehouses to suspend after 60 seconds of inactivity.
Push down aggregations. Write Great Expectations that let Snowflake do the aggregation (statistical expectations run a single aggregation query) rather than loading full tables into Python.
Use query tags to monitor. Review QUERY_HISTORY weekly to see what's being spent on data quality checks.
SELECT
QUERY_TAG,
SUM(CREDITS_USED_CLOUD_SERVICES) as cloud_credits,
COUNT(*) as query_count
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE START_TIME >= DATEADD(day, -7, CURRENT_DATE())
AND QUERY_TAG LIKE '%data_quality%'
GROUP BY 1
ORDER BY 2 DESC;Summary
A complete Snowflake data quality testing strategy combines:
- dbt schema tests — not_null, unique, accepted_values, relationships for structural validation
- dbt-expectations — statistical and pattern-based assertions
- Great Expectations — Python-based checkpoints for complex validation logic
- Source freshness checks — verify data is being loaded on schedule
- Zero-copy clone environments — isolated test databases for CI
- Daily CI schedule — catch regressions in data pipelines before they reach dashboards
The investment in setup pays dividends: data quality issues surface in CI, not in stakeholder meetings.