Snowflake Data Testing: dbt + Great Expectations for Data Quality

Testing data quality in Snowflake requires both SQL-based tests (dbt works natively with Snowflake) and Python-based data validation (Great Expectations connects via the Snowflake SQLAlchemy adapter). Run dbt tests for schema and transformation validation, Great Expectations checkpoints for statistical and pattern-based assertions, and scheduled CI pipelines to catch issues before they reach dashboards.

Key Takeaways

dbt connects to Snowflake with a warehouse, database, and schema in profiles.yml. Tests run SQL directly against your Snowflake tables—no data leaves the warehouse.

Great Expectations uses the Snowflake SQLAlchemy connection string. Install snowflake-sqlalchemy, configure the connection, and point it at any Snowflake table or view.

Zero-copy cloning enables fast isolated test environments. Clone production databases in seconds with no storage cost for unmodified data, run tests in the clone, and drop it when done.

Time Travel supports historical validation. Query data as it existed at a previous point in time—useful for comparing current state against a known-good baseline.

Snowflake query tagging helps track test costs. Set the session's QUERY_TAG (ALTER SESSION SET QUERY_TAG = '...') before running tests so that test queries can be identified in usage data.

Why Snowflake Needs a Testing Strategy

Snowflake's elastic compute and separation of storage from compute make it attractive for data warehousing, but the same properties make data quality issues expensive: bad data can be queried by many concurrent warehouses before anyone notices. A testing strategy catches these issues at ingestion and transformation time, before downstream consumers see them.

The combination of dbt (SQL-based schema and transformation tests) and Great Expectations (statistical and pattern assertions) covers the two main classes of data quality issues: structural problems (wrong schema, broken foreign keys, duplicates) and semantic problems (values out of expected range, unusual distributions, suspicious null rates).
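
To make the two classes concrete, here is a minimal plain-Python sketch that runs one structural check and two semantic checks over a few made-up rows. The sample data and function names are illustrative assumptions, not part of dbt or Great Expectations; in practice both tools run the equivalent checks as SQL inside the warehouse.

```python
from collections import Counter

# Hypothetical sample rows standing in for a staged orders table.
rows = [
    {"order_id": 1, "customer_id": 10, "order_total": 25.0},
    {"order_id": 2, "customer_id": 11, "order_total": 40.0},
    {"order_id": 2, "customer_id": None, "order_total": -5.0},
]


def duplicate_keys(rows, key):
    """Structural check: return key values that appear more than once."""
    counts = Counter(row[key] for row in rows)
    return sorted(value for value, n in counts.items() if n > 1)


def out_of_range(rows, column, min_value, max_value):
    """Semantic check: return non-null values outside [min_value, max_value]."""
    return [
        row[column]
        for row in rows
        if row[column] is not None
        and not (min_value <= row[column] <= max_value)
    ]


def null_rate(rows, column):
    """Semantic check: fraction of rows where the column is null."""
    if not rows:
        return 0.0
    return sum(row[column] is None for row in rows) / len(rows)


print(duplicate_keys(rows, "order_id"))
print(out_of_range(rows, "order_total", 0, 100000))
print(round(null_rate(rows, "customer_id"), 2))
```

The duplicate check corresponds to a dbt unique test, while the range and null-rate checks correspond to the kind of assertions Great Expectations is used for later in this article.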

Snowflake + dbt Setup

Configure your Snowflake connection in ~/.dbt/profiles.yml:

my_project:
  target: dev
  outputs:
    dev:
      type: snowflake
      account: "{{ env_var('SNOWFLAKE_ACCOUNT') }}"
      user: "{{ env_var('SNOWFLAKE_USER') }}"
      password: "{{ env_var('SNOWFLAKE_PASSWORD') }}"
      role: TRANSFORMER
      database: RAW_DEV
      warehouse: TRANSFORM_WH
      schema: PUBLIC
      threads: 4
      client_session_keep_alive: false

    prod:
      type: snowflake
      account: "{{ env_var('SNOWFLAKE_ACCOUNT') }}"
      user: "{{ env_var('SNOWFLAKE_USER') }}"
      private_key_path: "{{ env_var('SNOWFLAKE_PRIVATE_KEY_PATH') }}"
      role: TRANSFORMER
      database: RAW_PROD
      warehouse: TRANSFORM_WH
      schema: PUBLIC
      threads: 8

Verify the connection:

dbt debug

dbt Schema Tests for Snowflake

Configure tests in schema.yml files. dbt generates SQL that runs directly against your Snowflake tables:

# models/staging/schema.yml
version: 2

models:
  - name: stg_orders
    description: "Staged orders from the raw layer"
    columns:
      - name: order_id
        tests:
          - not_null
          - unique
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('stg_customers')
              field: customer_id
      - name: order_status
        tests:
          - accepted_values:
              values: ['pending', 'processing', 'shipped', 'delivered', 'cancelled', 'refunded']
      - name: order_total
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 0
              max_value: 100000
      - name: created_at
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: "2020-01-01"
              max_value: "2030-12-31"
              parse_strings_as_datetimes: true

Run tests against Snowflake:

dbt test --target prod --select stg_orders

Testing Snowflake-Specific Patterns

Snowflake's VARIANT type (semi-structured data) requires specific test patterns:

      - name: raw_payload
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_json_parseable

For JSON fields within VARIANT columns, use a custom generic test:

-- tests/generic/assert_variant_field_not_null.sql
{% test assert_variant_field_not_null(model, column_name, field_path) %}

select *
from {{ model }}
where {{ column_name }}:{{ field_path }} is null

{% endtest %}

Apply it to the column in schema.yml:

      - name: raw_payload
        tests:
          - assert_variant_field_not_null:
              field_path: "order.customer_id"

Zero-Copy Cloning for Test Environments

Snowflake's zero-copy cloning lets you create isolated test databases instantly without duplicating storage:

-- Create a test environment from production
CREATE DATABASE RAW_TEST CLONE RAW_PROD;

-- Create a test schema
CREATE SCHEMA RAW_TEST.TESTING CLONE RAW_PROD.PUBLIC;

Use this in CI to run tests against real data in isolation:

#!/bin/bash
# ci-test.sh
# Exit on the first failure so failing tests fail the CI job
set -euo pipefail

# Create test database
snowsql -q "CREATE DATABASE ${CI_DATABASE} CLONE RAW_PROD;"

# Drop the clone on exit, even when the tests fail
trap 'snowsql -q "DROP DATABASE IF EXISTS ${CI_DATABASE};"' EXIT

# Run dbt tests against the clone
dbt test --target ci --vars "database: ${CI_DATABASE}"

This approach gives you:

  • Real production data shapes and volumes for tests
  • Complete isolation (tests don't affect production)
  • Near-zero cost for storage (only changed data incurs charges)
  • Fast setup (seconds, not minutes)
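
When several CI runs can overlap, each run needs its own clone name. The sketch below shows one way to derive a per-run database name and the matching CREATE and DROP statements. The RAW_CI prefix and both function names are assumptions made for illustration; the run identifier would typically come from the CI system.

```python
import re


def clone_database_name(run_id, prefix="RAW_CI"):
    """Build a per-run database name that needs no quoting in Snowflake."""
    # Unquoted identifiers may contain letters, digits, and underscores,
    # so anything else in the run id is replaced with an underscore.
    suffix = re.sub(r"[^A-Za-z0-9_]", "_", str(run_id)).upper()
    return f"{prefix}_{suffix}"


def clone_statements(run_id, source_db="RAW_PROD"):
    """Return the (create, drop) SQL pair for one CI run."""
    name = clone_database_name(run_id)
    create = f"CREATE DATABASE {name} CLONE {source_db};"
    drop = f"DROP DATABASE IF EXISTS {name};"
    return create, drop


create_sql, drop_sql = clone_statements("feature/x-42")
print(create_sql)
print(drop_sql)
```

The resulting name can be passed to the script as CI_DATABASE, so that concurrent runs never clone into or drop each other's database.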

Great Expectations + Snowflake

Install the required packages:

pip install great-expectations snowflake-sqlalchemy snowflake-connector-python

Configure a Snowflake Datasource

import os

import great_expectations as gx

context = gx.get_context()

# Add Snowflake as a SQL datasource
datasource = context.sources.add_or_update_sql(
    name="snowflake_datasource",
    connection_string=(
        "snowflake://{user}:{password}@{account}/{database}/{schema}"
        "?warehouse={warehouse}&role={role}"
    ).format(
        user=os.environ["SNOWFLAKE_USER"],
        password=os.environ["SNOWFLAKE_PASSWORD"],
        account=os.environ["SNOWFLAKE_ACCOUNT"],
        database="RAW_PROD",
        schema="PUBLIC",
        warehouse="VALIDATION_WH",
        role="VALIDATOR"
    )
)
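
One caveat with formatting credentials straight into a URL: a password containing characters such as @ or / changes how the URL is parsed. A minimal sketch of a safer builder, using only the standard library, is shown below. The function name and the sample values are hypothetical.

```python
from urllib.parse import quote_plus


def snowflake_url(user, password, account, database, schema, warehouse, role):
    """Build a snowflake:// SQLAlchemy URL with credentials percent-encoded."""
    return (
        f"snowflake://{quote_plus(user)}:{quote_plus(password)}"
        f"@{account}/{database}/{schema}"
        f"?warehouse={quote_plus(warehouse)}&role={quote_plus(role)}"
    )


# Made-up values; real ones would come from environment variables.
url = snowflake_url(
    user="svc_gx",
    password="p@ss/word",
    account="xy12345",
    database="RAW_PROD",
    schema="PUBLIC",
    warehouse="VALIDATION_WH",
    role="VALIDATOR",
)
print(url)
```

The returned string can be passed as the connection_string argument in place of the formatted string above.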

Add Table and Query Assets

# Test a specific table
table_asset = datasource.add_table_asset(
    name="stg_orders",
    table_name="STG_ORDERS",
    schema_name="PUBLIC"
)

# Test a specific query (useful for testing a subset)
query_asset = datasource.add_query_asset(
    name="recent_orders",
    query="SELECT * FROM STG_ORDERS WHERE CREATED_AT >= DATEADD(day, -7, CURRENT_DATE())"
)

Define and Run Expectations

# Create an Expectation Suite for orders
suite = context.add_or_update_expectation_suite("snowflake_orders_suite")

batch_request = table_asset.build_batch_request()
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="snowflake_orders_suite"
)

# Structural assertions
validator.expect_table_columns_to_match_set(
    column_set=["ORDER_ID", "CUSTOMER_ID", "ORDER_TOTAL", "STATUS", "CREATED_AT"],
    exact_match=False  # Allows additional columns
)
validator.expect_table_row_count_to_be_between(min_value=10000)

# Column-level assertions
validator.expect_column_values_to_not_be_null("ORDER_ID")
validator.expect_column_values_to_be_unique("ORDER_ID")
validator.expect_column_values_to_be_between(
    "ORDER_TOTAL",
    min_value=0,
    max_value=500000
)
validator.expect_column_values_to_be_in_set(
    "STATUS",
    value_set=["pending", "processing", "shipped", "delivered", "cancelled"]
)

# Statistical assertions (catch sudden distribution shifts)
validator.expect_column_mean_to_be_between("ORDER_TOTAL", min_value=50, max_value=1000)
validator.expect_column_stdev_to_be_between("ORDER_TOTAL", min_value=10, max_value=5000)

# Null rate checks
validator.expect_column_values_to_not_be_null("CUSTOMER_ID", mostly=0.99)

validator.save_expectation_suite()
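
The mostly argument in the null rate check sets the minimum fraction of rows that must satisfy the expectation. The plain-Python sketch below illustrates that threshold logic for a not-null check; passes_mostly is a hypothetical helper written for this example, not a Great Expectations function.

```python
def passes_mostly(values, predicate, mostly=1.0):
    """Return True when at least `mostly` of the values satisfy predicate."""
    if not values:
        return True  # No rows means nothing violates the expectation.
    passing = sum(1 for value in values if predicate(value))
    return passing / len(values) >= mostly


def is_not_null(value):
    return value is not None


# 0.5% nulls: within the 1% tolerance implied by mostly=0.99.
mostly_ok = [1] * 995 + [None] * 5
# 2% nulls: exceeds the tolerance.
too_many_nulls = [1] * 980 + [None] * 20

print(passes_mostly(mostly_ok, is_not_null, mostly=0.99))
print(passes_mostly(too_many_nulls, is_not_null, mostly=0.99))
```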

Snowflake Data Freshness Testing

Test that data is being loaded on schedule by checking the most recent record timestamp:

-- Custom dbt test for data freshness
-- tests/generic/assert_data_freshness.sql
{% test assert_data_freshness(model, column_name, max_age_hours) %}

select
    max({{ column_name }}) as latest_record,
    current_timestamp() as now,
    datediff('hour', max({{ column_name }}), current_timestamp()) as hours_since_latest
from {{ model }}
having datediff('hour', max({{ column_name }}), current_timestamp()) > {{ max_age_hours }}

{% endtest %}

Apply it at the model level in schema.yml:

models:
  - name: stg_orders
    tests:
      - assert_data_freshness:
          column_name: created_at
          max_age_hours: 25  # Data should be loaded daily

Alternatively, use dbt's built-in source freshness checks:

sources:
  - name: raw
    database: RAW_PROD
    schema: PUBLIC
    tables:
      - name: raw_orders
        loaded_at_field: _FIVETRAN_SYNCED
        freshness:
          warn_after: {count: 12, period: hour}
          error_after: {count: 24, period: hour}

Run freshness checks:

dbt source freshness
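
The warn_after and error_after thresholds amount to a two-level comparison on the age of the newest record. Here is a rough Python sketch of that decision, useful for reasoning about which threshold a given load delay would trip. The function name is hypothetical, and treating the boundary as strictly greater-than is an assumption rather than a statement about dbt's internals.

```python
from datetime import datetime, timedelta, timezone


def freshness_status(latest_loaded_at, now, warn_after, error_after):
    """Classify the age of the newest record as 'pass', 'warn', or 'error'."""
    age = now - latest_loaded_at
    if age > error_after:
        return "error"
    if age > warn_after:
        return "warn"
    return "pass"


# Fixed timestamps so the example is reproducible.
now = datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc)
warn = timedelta(hours=12)
error = timedelta(hours=24)

for hours_old in (6, 13, 30):
    loaded_at = now - timedelta(hours=hours_old)
    print(hours_old, freshness_status(loaded_at, now, warn, error))
```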

Snowflake Time Travel for Historical Validation

Use Snowflake's Time Travel to compare current data against a known-good historical baseline:

-- Compare today's order count against 24 hours ago
SELECT
    current_count,
    yesterday_count,
    current_count / NULLIF(yesterday_count, 0) AS ratio
FROM (
    SELECT
        (SELECT COUNT(*) FROM STG_ORDERS) AS current_count,
        (SELECT COUNT(*) FROM STG_ORDERS AT(OFFSET => -86400)) AS yesterday_count
);

Use this in a GX query asset:

time_travel_asset = datasource.add_query_asset(
    name="order_count_comparison",
    query="""
    SELECT 
        current_count,
        yesterday_count,
        ABS(current_count - yesterday_count) / NULLIF(yesterday_count, 0) as pct_change
    FROM (
        SELECT COUNT(*) as current_count FROM STG_ORDERS
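    ) cur
    CROSS JOIN (
        SELECT COUNT(*) as yesterday_count FROM STG_ORDERS AT(OFFSET => -86400)
    ) prev
    """
)

# Validate the comparison query itself, not the orders table
context.add_or_update_expectation_suite("order_count_comparison_suite")
comparison_validator = context.get_validator(
    batch_request=time_travel_asset.build_batch_request(),
    expectation_suite_name="order_count_comparison_suite"
)

# Assert that row count hasn't changed by more than 20%
comparison_validator.expect_column_values_to_be_between(
    "PCT_CHANGE",
    min_value=0,
    max_value=0.20
)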
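
The arithmetic behind the comparison is simple enough to check locally before wiring it into a suite. The sketch below mirrors the SQL expression in plain Python; both function names are hypothetical. One design point worth noting: when yesterday's count is zero, NULLIF makes the result NULL, and a range expectation may skip null values rather than fail on them, so this sketch treats a missing baseline as a failure explicitly.

```python
def row_count_pct_change(current_count, previous_count):
    """Mirror ABS(current - previous) / NULLIF(previous, 0)."""
    if previous_count == 0:
        return None  # Corresponds to SQL NULL when the baseline is empty.
    return abs(current_count - previous_count) / previous_count


def within_threshold(pct_change, max_change=0.20):
    """Treat a missing baseline (None) as a failure, not a silent pass."""
    return pct_change is not None and pct_change <= max_change


for current, previous in ((1100, 1000), (1300, 1000), (5, 0)):
    change = row_count_pct_change(current, previous)
    print(current, previous, change, within_threshold(change))
```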

Query Tagging for Cost Attribution

Tag test queries to track their cost in Snowflake usage data:

# In Great Expectations, set query tags via the connection
connection_string = (
    "snowflake://...?session_parameters="
    '{"QUERY_TAG":"great_expectations_data_quality"}'
)

In dbt, add a macro:

-- macros/set_query_tag.sql
{% macro set_query_tag() -%}
  {% if var('query_tag', none) is not none %}
    ALTER SESSION SET QUERY_TAG = '{{ var("query_tag") }}';
  {% endif %}
{%- endmacro %}

Call it in your dbt_project.yml:

on-run-start:
  - "{{ set_query_tag() }}"

Run with:

dbt test --vars '{"query_tag": "ci_data_quality_check"}'

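Test queries issued on that session are now tagged, which makes them easy to filter in SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY. dbt-snowflake also accepts a query_tag setting in profiles.yml, which applies the tag to every connection dbt opens.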
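
A plain string tag works, but a small JSON object is easier to parse back out of QUERY_HISTORY. The sketch below builds such a tag and the matching ALTER SESSION statement. The field names (tool, suite, run_id) and both function names are assumptions for illustration; the length check reflects Snowflake's 2000-character limit on query tags.

```python
import json

MAX_QUERY_TAG_LENGTH = 2000  # Snowflake's limit for QUERY_TAG values.


def build_query_tag(tool, suite, run_id):
    """Serialize tag fields as compact JSON so they can be parsed later."""
    tag = json.dumps(
        {"tool": tool, "suite": suite, "run_id": run_id},
        sort_keys=True,
        separators=(",", ":"),
    )
    if len(tag) > MAX_QUERY_TAG_LENGTH:
        raise ValueError("query tag exceeds 2000 characters")
    return tag


def alter_session_sql(tag):
    """Wrap the tag in ALTER SESSION, doubling single quotes for SQL."""
    escaped = tag.replace("'", "''")
    return f"ALTER SESSION SET QUERY_TAG = '{escaped}';"


tag = build_query_tag("dbt", "ci_data_quality_check", "1234")
print(alter_session_sql(tag))
```

Because the suite value still contains data_quality, a LIKE '%data_quality%' filter on QUERY_TAG continues to match these tags.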

CI Pipeline for Snowflake Testing

# .github/workflows/snowflake-data-quality.yml
name: Snowflake Data Quality
on:
  schedule:
    - cron: '0 7 * * *'  # Daily at 07:00 UTC
  push:
    paths:
      - 'dbt/**'
      - 'expectations/**'

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      
      - name: Install dependencies
        run: |
          pip install dbt-snowflake great-expectations snowflake-sqlalchemy
          cd dbt && dbt deps
      
      - name: Run dbt source freshness
        run: cd dbt && dbt source freshness
        env:
          SNOWFLAKE_ACCOUNT: ${{ secrets.SNOWFLAKE_ACCOUNT }}
          SNOWFLAKE_USER: ${{ secrets.SNOWFLAKE_USER }}
          SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_PASSWORD }}
      
      - name: Run dbt tests
        run: cd dbt && dbt test --target prod
        env:
          SNOWFLAKE_ACCOUNT: ${{ secrets.SNOWFLAKE_ACCOUNT }}
          SNOWFLAKE_USER: ${{ secrets.SNOWFLAKE_USER }}
          SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_PASSWORD }}
      
      - name: Run Great Expectations checkpoints
        run: python scripts/run_gx_checkpoints.py
        env:
          SNOWFLAKE_ACCOUNT: ${{ secrets.SNOWFLAKE_ACCOUNT }}
          SNOWFLAKE_USER: ${{ secrets.SNOWFLAKE_USER }}
          SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_PASSWORD }}

Cost Optimization for Testing

Snowflake charges based on compute time. Keep test costs low:

Use a small warehouse for tests. An X-Small or Small warehouse is usually sufficient for running dbt tests and GX validations. Avoid Medium or larger warehouses for testing.

Set auto-suspend. Configure test warehouses to suspend after 60 seconds of inactivity.

Push down aggregations. Write expectations that let Snowflake do the aggregation (statistical expectations run a single aggregation query) rather than loading full tables into Python.

Use query tags to monitor. Review QUERY_HISTORY weekly to see what's being spent on data quality checks.

SELECT 
    QUERY_TAG,
    SUM(CREDITS_USED_CLOUD_SERVICES) as cloud_credits,
    SUM(TOTAL_ELAPSED_TIME) / 1000 as elapsed_seconds,  -- rough proxy for warehouse compute time
    COUNT(*) as query_count
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE START_TIME >= DATEADD(day, -7, CURRENT_DATE())
    AND QUERY_TAG LIKE '%data_quality%'
GROUP BY 1
ORDER BY 2 DESC;
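
For a back-of-the-envelope view of why warehouse size and auto-suspend matter, the sketch below estimates the credits consumed by one test run that resumes a suspended warehouse. It assumes standard warehouse rates (X-Small at 1 credit per hour, doubling with each size) and the 60-second minimum billed on each resume; it ignores multi-cluster warehouses and other workloads sharing the warehouse. The function name is hypothetical.

```python
# Credits per hour for standard warehouses, doubling with each size.
CREDITS_PER_HOUR = {"XSMALL": 1, "SMALL": 2, "MEDIUM": 4, "LARGE": 8}
MIN_BILLED_SECONDS = 60  # Each resume is billed for at least 60 seconds.


def estimate_credits(size, run_seconds, auto_suspend_seconds=60):
    """Estimate credits for one run on a warehouse resumed for that run."""
    # The warehouse keeps billing after the last query until it suspends.
    billed_seconds = max(run_seconds + auto_suspend_seconds, MIN_BILLED_SECONDS)
    return CREDITS_PER_HOUR[size] * billed_seconds / 3600


# A five-minute test run on two warehouse sizes.
for size in ("XSMALL", "MEDIUM"):
    print(size, round(estimate_credits(size, run_seconds=300), 3))
```

Under these assumptions the same five-minute run costs four times as much on a Medium warehouse as on an X-Small one, and a long auto-suspend window adds idle time to every run.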

Summary

A complete Snowflake data quality testing strategy combines:

  1. dbt schema tests — not_null, unique, accepted_values, relationships for structural validation
  2. dbt-expectations — statistical and pattern-based assertions
  3. Great Expectations — Python-based checkpoints for complex validation logic
  4. Source freshness checks — verify data is being loaded on schedule
  5. Zero-copy clone environments — isolated test databases for CI
  6. Daily CI schedule — catch regressions in data pipelines before they reach dashboards

The investment in setup pays dividends: data quality issues surface in CI, not in stakeholder meetings.

By HelpMeTest