Amazon Deequ Data Quality Testing at Scale with Apache Spark

Amazon Deequ is an open-source data quality library built on Apache Spark, developed and used internally at Amazon to validate the quality of datasets at petabyte scale. Unlike SQL-based tools that query a single database, Deequ runs inside your Spark application, which makes it a good fit for validating data that Spark reads from S3 or Redshift in EMR and AWS Glue pipelines. This guide covers Deequ's constraint system, metrics repository, anomaly detection, and how to integrate data quality checks into production Spark pipelines.

Why Deequ for Spark Pipelines

Deequ's advantages for large-scale data processing:

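  • Runs inside your Spark application: metrics are computed on the DataFrames you already have, with no separate query against a database
  • Efficient computation: metrics that can share a scan (size, completeness, min/max, compliance, and similar) are computed together in one pass over the data; grouping-based metrics such as uniqueness need an additional pass
  • Metrics repository: stores historical metrics to detect drift and anomalies
  • Scala and Python: Deequ is a Scala library, and PyDeequ wraps it for PySpark
  • Unit testable: checks run on DataFrames, so they work against small synthetic data in a local Spark session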
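
To make the single-pass idea concrete, here is a minimal pure-Python sketch, not Deequ code, that accumulates several scan-shareable metrics (row count, per-column completeness, per-column minimum) in one loop over the rows. The function name single_pass_profile and the rows-as-dicts input are illustrative assumptions; Deequ does the equivalent in Spark by combining such aggregations into one aggregation query.

```python
from typing import Any, Dict, Iterable, List, Optional


def single_pass_profile(rows: Iterable[Dict[str, Any]], columns: List[str]) -> Dict[str, Any]:
    """Compute size, per-column completeness, and per-column minimum in one scan.

    Illustrative sketch: rows are plain dicts, not Spark rows.
    """
    size = 0
    non_null = {c: 0 for c in columns}
    minimum: Dict[str, Optional[Any]] = {c: None for c in columns}

    for row in rows:  # the only pass over the input
        size += 1
        for c in columns:
            value = row.get(c)
            if value is None:
                continue
            non_null[c] += 1
            if minimum[c] is None or value < minimum[c]:
                minimum[c] = value

    return {
        "size": size,
        # Completeness = fraction of non-null values; undefined (None) for empty input
        "completeness": {c: (non_null[c] / size if size else None) for c in columns},
        "minimum": minimum,
    }
```

Because the function never revisits a row, it also works on a one-shot iterator, which is the property that scan sharing relies on.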

Basic Setup

Add the Deequ dependency to your Spark project:

// build.sbt
libraryDependencies += "com.amazon.deequ" % "deequ" % "2.0.7-spark-3.3"

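For PySpark, install the PyDeequ wrapper. PyDeequ drives the Deequ JAR through Py4J, so the SparkSession must also load the Deequ package (for example via spark.jars.packages), and PyDeequ expects the SPARK_VERSION environment variable to be set when it is imported: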

pip install pydeequ

Writing Constraint Checks (Scala)

import com.amazon.deequ.{VerificationResult, VerificationSuite}
import com.amazon.deequ.checks.{Check, CheckLevel, CheckStatus}

// Assumes an existing SparkSession named spark with Deequ on the classpath
val ordersDF = spark.read.parquet("s3://my-bucket/orders/")

val verificationResult = VerificationSuite()
  .onData(ordersDF)
  .addCheck(
    Check(CheckLevel.Error, "Orders data quality")
      // Row count checks (hints are Option[String])
      .hasSize(_ > 1000, Some("Dataset must have more than 1000 rows"))

      // Completeness checks
      .isComplete("order_id", Some("order_id must never be null"))
      .isComplete("customer_id", Some("customer_id must never be null"))
      .isComplete("amount", Some("amount must never be null"))

      // Uniqueness checks
      .isUnique("order_id", Some("order_id must be unique"))

      // Value range checks
      .isNonNegative("amount", hint = Some("amount must be non-negative"))
      .isContainedIn("status", Array("pending", "confirmed", "shipped", "delivered", "cancelled"),
        Some("status must be a valid value"))

      // Pattern checks: Deequ's method is hasPattern, which takes a scala.util.matching.Regex
      .hasPattern("customer_id", """^CUST-\d{6}$""".r, hint = Some("customer_id must match CUST-NNNNNN"))

      // Custom SQL predicate checks
      .satisfies(
        "created_at <= current_timestamp()",
        "created_at must not be in the future"
      )
  )
  .run()

// One row per constraint, with its status and message
val checkResults = VerificationResult.checkResultsAsDataFrame(spark, verificationResult)
checkResults.show(false)

if (verificationResult.status != CheckStatus.Success) {
  throw new RuntimeException(s"Data quality checks failed: ${verificationResult.status}")
}
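
Each constraint in the check above pairs a metric with an assertion on that metric's value: isComplete asserts that completeness (the fraction of non-null values) equals 1.0, and hasSize(_ > 1000) asserts on the row count. The Python sketch below models that pairing on in-memory rows; Constraint, completeness, size, is_complete, and has_size are hypothetical names, and this is a simplification rather than Deequ's implementation.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]


@dataclass
class Constraint:
    """Hypothetical stand-in for a Deequ constraint: a metric plus an assertion on it."""
    name: str
    metric: Callable[[List[Row]], float]
    assertion: Callable[[float], bool]

    def evaluate(self, rows: List[Row]) -> bool:
        return self.assertion(self.metric(rows))


def completeness(column: str) -> Callable[[List[Row]], float]:
    def metric(rows: List[Row]) -> float:
        if not rows:
            # Undefined on empty input; NaN makes assertions such as v == 1.0 fail
            return float("nan")
        return sum(r.get(column) is not None for r in rows) / len(rows)
    return metric


def size() -> Callable[[List[Row]], float]:
    return lambda rows: float(len(rows))


def is_complete(column: str) -> Constraint:
    # Same default assertion as Deequ's isComplete: completeness must be exactly 1.0
    return Constraint(f"isComplete({column})", completeness(column), lambda v: v == 1.0)


def has_size(assertion: Callable[[float], bool]) -> Constraint:
    return Constraint("hasSize", size(), assertion)
```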

PySpark Data Quality Checks

import pydeequ  # PyDeequ reads the SPARK_VERSION environment variable (e.g. "3.3") on import
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite, VerificationResult

# Assumes a SparkSession named spark that has the Deequ JAR on its classpath
orders_df = spark.read.parquet("s3://my-bucket/orders/")

verification_result = (
    VerificationSuite(spark)
    .onData(orders_df)
    .addCheck(
        Check(spark, CheckLevel.Error, "Orders quality")
        .hasSize(lambda x: x > 1000)
        .isComplete("order_id")
        .isComplete("customer_id")
        .isUnique("order_id")
        .isNonNegative("amount")
        .isContainedIn("status", ["pending", "confirmed", "shipped", "delivered", "cancelled"])
        .satisfies("amount <= 1000000", "Amount within range", lambda x: x == 1.0)
        .hasPattern("customer_id", r"^CUST-\d{6}$")
    )
    .run()
)

# Get results as a DataFrame with one row per constraint
results_df = VerificationResult.checkResultsAsDataFrame(spark, verification_result)
results_df.show(truncate=False)

# Fail the job if any Error-level check failed; PyDeequ exposes status as a string
if verification_result.status == "Error":
    failed = results_df.filter("constraint_status != 'Success'")
    failed.show(truncate=False)
    raise Exception(f"Data quality checks failed with status {verification_result.status}")
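
The overall status checked above is a roll-up of per-constraint results: a check is Success only if none of its constraints failed, a check with a failure takes its own level (Error or Warning), and the verification run reports the most severe check status. A minimal Python sketch of that roll-up, with hypothetical function names and statuses represented as strings:

```python
from typing import Iterable, List, Tuple

# Severity order mirrors Deequ's CheckStatus: Success < Warning < Error
_SEVERITY = {"Success": 0, "Warning": 1, "Error": 2}


def check_status(level: str, constraint_statuses: Iterable[str]) -> str:
    """Status of one check, given its level ("Error" or "Warning") and its constraint results."""
    any_failure = any(s == "Failure" for s in constraint_statuses)
    return level if any_failure else "Success"


def verification_status(checks: List[Tuple[str, List[str]]]) -> str:
    """Overall status: the most severe status among all checks (Success if there are none)."""
    statuses = [check_status(level, results) for level, results in checks]
    return max(statuses, key=_SEVERITY.__getitem__, default="Success")
```

With this model, a failing Warning-level check alone yields "Warning", which is why the job above only aborts on "Error".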

Unit Testing Deequ Checks

Test your Deequ checks against synthetic DataFrames in a local-mode Spark session, without a cluster:

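# pydeequ reads SPARK_VERSION on import, so run pytest with e.g. SPARK_VERSION=3.3
import pytest
from pyspark.sql import SparkSession
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite, VerificationResult

@pytest.fixture(scope="session")
def spark():
    session = (
        SparkSession.builder
        .master("local[2]")
        .appName("deequ-tests")
        # The Deequ build must match the local PySpark version (Spark 3.3 here)
        .config("spark.jars.packages", "com.amazon.deequ:deequ:2.0.7-spark-3.3")
        # Excluded in PyDeequ's documented session setup
        .config("spark.jars.excludes", "net.sourceforge.f2j:arpack_combined_all")
        .getOrCreate()
    )
    yield session
    # Shutdown sequence from the PyDeequ README; the callback server serves Python lambdas
    session.sparkContext._gateway.shutdown_callback_server()
    session.stop()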

@pytest.fixture
def valid_orders(spark):
    return spark.createDataFrame([
        ("order-1", "CUST-000001", 100.0, "confirmed"),
        ("order-2", "CUST-000002", 250.0, "delivered"),
        ("order-3", "CUST-000003", 75.5, "pending"),
    ], ["order_id", "customer_id", "amount", "status"])

@pytest.fixture
def invalid_orders(spark):
    return spark.createDataFrame([
        ("order-1", "CUST-000001", -50.0, "confirmed"),   # negative amount
        (None, "CUST-000002", 250.0, "delivered"),          # null order_id
        ("order-1", "CUST-000003", 75.5, "refunded"),      # duplicate + invalid status
    ], ["order_id", "customer_id", "amount", "status"])

def run_checks(spark, df):
    return (
        VerificationSuite(spark)
        .onData(df)
        .addCheck(
            Check(spark, CheckLevel.Error, "Orders")
            .isComplete("order_id")
            .isUnique("order_id")
            .isNonNegative("amount")
            .isContainedIn("status", ["pending", "confirmed", "shipped", "delivered", "cancelled"])
        )
        .run()
    )

def test_valid_orders_pass_all_checks(spark, valid_orders):
    result = run_checks(spark, valid_orders)
    assert result.status == "Success", f"Expected success, got: {result.status}"

def test_invalid_orders_fail_expected_checks(spark, invalid_orders):
    result = run_checks(spark, invalid_orders)

    results_df = VerificationResult.checkResultsAsDataFrame(spark, result)
    # The constraint column holds Deequ's constraint descriptions
    # (e.g. CompletenessConstraint(Completeness(order_id, ...))), not method names
    failed_checks = {
        row["constraint"]
        for row in results_df.filter("constraint_status == 'Failure'").collect()
    }

    assert any("Completeness" in c for c in failed_checks), "Should fail null check"
    assert any("Uniqueness" in c for c in failed_checks), "Should fail uniqueness check"
    assert any("is non-negative" in c for c in failed_checks), "Should fail amount check"
    assert any("contained in" in c for c in failed_checks), "Should fail status check"

def test_empty_dataset_fails_size_check(spark, valid_orders):
    # Reuse the fixture's schema so the empty DataFrame has typed columns
    empty_df = spark.createDataFrame([], valid_orders.schema)
    result = (
        VerificationSuite(spark)
        .onData(empty_df)
        .addCheck(
            Check(spark, CheckLevel.Error, "Size")
            .hasSize(lambda x: x > 0, "Dataset must not be empty")
        )
        .run()
    )
    assert result.status == "Error"
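
isNonNegative, isContainedIn, and satisfies are compliance constraints: the metric is the fraction of rows that meet a predicate, and by default it must equal 1.0. Deequ builds the isNonNegative and isContainedIn predicates so that null values count as compliant. The sketch below recomputes those two metrics for the invalid_orders rows in plain Python; compliance, non_negative, and contained_in are hypothetical helpers, not PyDeequ functions.

```python
from typing import Any, Callable, Dict, List, Optional, Set

Row = Dict[str, Any]

ALLOWED_STATUSES = {"pending", "confirmed", "shipped", "delivered", "cancelled"}


def compliance(rows: List[Row], predicate: Callable[[Row], bool]) -> Optional[float]:
    """Fraction of rows satisfying the predicate (None for empty input)."""
    if not rows:
        return None
    return sum(1 for r in rows if predicate(r)) / len(rows)


def non_negative(column: str) -> Callable[[Row], bool]:
    # Like COALESCE(column, 0.0) >= 0: a null value counts as compliant
    return lambda r: (r[column] if r[column] is not None else 0.0) >= 0


def contained_in(column: str, allowed: Set[str]) -> Callable[[Row], bool]:
    # Like "column IS NULL OR column IN (...)": a null value counts as compliant
    return lambda r: r[column] is None or r[column] in allowed


invalid_orders = [
    {"order_id": "order-1", "customer_id": "CUST-000001", "amount": -50.0, "status": "confirmed"},
    {"order_id": None, "customer_id": "CUST-000002", "amount": 250.0, "status": "delivered"},
    {"order_id": "order-1", "customer_id": "CUST-000003", "amount": 75.5, "status": "refunded"},
]

amount_compliance = compliance(invalid_orders, non_negative("amount"))
status_compliance = compliance(invalid_orders, contained_in("status", ALLOWED_STATUSES))
```

Both metrics come out at 2/3, below the required 1.0, which is why the test expects both constraints to fail.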

Metrics Repository for Historical Tracking

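Deequ's metrics repository stores the metrics computed in each run under a ResultKey (a timestamp plus optional tags), which is the history that anomaly detection reads. FileSystemMetricsRepository keeps them as JSON in a single file at the path you give it, on any file system Spark can reach (local disk, HDFS, or S3):

import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.checks.{Check, CheckLevel}
import com.amazon.deequ.repository.fs.FileSystemMetricsRepository
import com.amazon.deequ.repository.ResultKey

// The path names a single JSON file, not a directory
val metricsRepo = FileSystemMetricsRepository(spark, "s3://my-bucket/deequ-metrics/orders/metrics.json")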

val resultKey = ResultKey(
  dataSetDate = System.currentTimeMillis(),
  tags = Map("pipeline_run" -> "2026-05-17", "environment" -> "production")
)

VerificationSuite()
  .onData(ordersDF)
  .useRepository(metricsRepo)
  .saveOrAppendResult(resultKey)
  .addCheck(
    Check(CheckLevel.Error, "Orders")
      .isComplete("order_id")
      .hasSize(_ > 1000)
  )
  .run()
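
Conceptually, the repository maps each ResultKey (a timestamp plus tags) to the metrics computed in that run, and later runs read back a chronological series for one metric, filtered by tags and time. The in-memory Python sketch below illustrates only that model; InMemoryMetricsRepository and its methods are hypothetical and do not mirror Deequ's API (in Deequ you query history with metricsRepo.load() and filters such as withTagValues and after).

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple


@dataclass(frozen=True)
class ResultKey:
    """Hypothetical mirror of Deequ's ResultKey: a timestamp plus tags."""
    data_set_date: int  # epoch milliseconds
    tags: Tuple[Tuple[str, str], ...] = ()


@dataclass
class InMemoryMetricsRepository:
    """Hypothetical in-memory store: ResultKey -> {metric name: value}."""
    _results: Dict[ResultKey, Dict[str, float]] = field(default_factory=dict)

    def save_or_append(self, key: ResultKey, metrics: Dict[str, float]) -> None:
        # Like saveOrAppendResult: merge new metrics into any existing entry for the key
        self._results.setdefault(key, {}).update(metrics)

    def history(self, metric: str, tags: Optional[Dict[str, str]] = None,
                before: Optional[int] = None) -> List[float]:
        """Values of one metric in chronological order, optionally filtered by tags and time."""
        wanted = set((tags or {}).items())
        selected = [
            (key.data_set_date, values[metric])
            for key, values in self._results.items()
            if metric in values
            and wanted <= set(key.tags)
            and (before is None or key.data_set_date < before)
        ]
        return [value for _, value in sorted(selected)]
```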

Anomaly Detection with Historical Metrics

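Anomaly checks compare the metric computed in the current run with the history stored in the repository. Unless you pass an AnomalyCheckConfig, they are reported at Warning level:

import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.analyzers.{Completeness, Size}
import com.amazon.deequ.anomalydetection.RelativeRateOfChangeStrategy

// Rates are ratios of the new value to the previous one (new / old),
// so "within 50% either way" means bounds of 0.5 and 1.5
VerificationSuite()
  .onData(ordersDF)
  .useRepository(metricsRepo)
  .saveOrAppendResult(resultKey)
  .addAnomalyCheck(
    RelativeRateOfChangeStrategy(maxRateDecrease = Some(0.5), maxRateIncrease = Some(1.5)),
    Size()  // Alert if row count changes by more than 50% from the previous run
  )
  .addAnomalyCheck(
    RelativeRateOfChangeStrategy(maxRateDecrease = Some(0.9)),
    Completeness("customer_id")  // Alert if completeness drops below 90% of the previous run's value
  )
  .run()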
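
RelativeRateOfChangeStrategy works on ratios rather than differences: for each run it divides the new metric value by the previous one and flags the value when that ratio is below maxRateDecrease or above maxRateIncrease. A tolerance of 50% in either direction therefore corresponds to bounds of 0.5 and 1.5, not 0.5 and 0.5. The Python sketch below implements that first-order comparison on a plain list of metric values; relative_rate_anomalies is a hypothetical name, and skipping a zero previous value is this sketch's own choice.

```python
from typing import List, Optional


def relative_rate_anomalies(
    values: List[float],
    max_rate_decrease: Optional[float] = None,
    max_rate_increase: Optional[float] = None,
) -> List[int]:
    """Indices i (i >= 1) where values[i] / values[i - 1] falls outside the bounds.

    A ratio strictly below max_rate_decrease or strictly above max_rate_increase
    is flagged; a missing bound means no limit on that side.
    """
    lower = max_rate_decrease if max_rate_decrease is not None else float("-inf")
    upper = max_rate_increase if max_rate_increase is not None else float("inf")
    anomalies = []
    for i in range(1, len(values)):
        previous = values[i - 1]
        if previous == 0:
            continue  # ratio undefined; this sketch simply skips it
        ratio = values[i] / previous
        if ratio < lower or ratio > upper:
            anomalies.append(i)
    return anomalies
```

Since each value is compared only with the one before it, a slow drift of a few percent per run never exceeds these bounds; Deequ offers other strategies, such as OnlineNormalStrategy, that compare against the longer history.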

AWS Glue Integration

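Use Deequ in AWS Glue ETL jobs by attaching the Deequ JAR built for the job's Spark version with the --extra-jars job parameter and installing PyDeequ with --additional-python-modules pydeequ: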

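# glue_job.py
import os
import sys

# PyDeequ reads SPARK_VERSION on import; Glue 4.0 runs Spark 3.3 (adjust for your Glue version)
os.environ.setdefault("SPARK_VERSION", "3.3")

from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import pydeequ
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite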

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read source data
dynamic_frame = glueContext.create_dynamic_frame.from_catalog(
    database="my-database",
    table_name="raw_orders"
)
orders_df = dynamic_frame.toDF()

# Run Deequ checks
verification_result = (
    VerificationSuite(spark)
    .onData(orders_df)
    .addCheck(
        Check(spark, CheckLevel.Error, "Raw orders validation")
        .isComplete("order_id")
        .isComplete("customer_id")
        .isNonNegative("amount")
        .hasSize(lambda x: x > 0)
    )
    .run()
)

# Fail the Glue job if checks fail
if verification_result.status != "Success":
    raise Exception("Data quality checks failed — aborting Glue job")

# Continue processing...
job.commit()
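
A bare status makes a failed Glue run hard to triage. One option is to collect the check results, for example rows = [r.asDict() for r in VerificationResult.checkResultsAsDataFrame(spark, verification_result).collect()] with VerificationResult imported from pydeequ.verification, and format the failing constraints into the exception message. The helper below is a hypothetical sketch that relies only on the constraint, constraint_status, and constraint_message columns of that DataFrame.

```python
from typing import Any, Dict, List


def failed_constraint_summary(rows: List[Dict[str, Any]]) -> str:
    """Build a readable error message from check-result rows (hypothetical helper).

    Returns an empty string when every constraint succeeded.
    """
    failures = [r for r in rows if r.get("constraint_status") != "Success"]
    if not failures:
        return ""
    lines = [f"{len(failures)} data quality constraint(s) failed:"]
    for r in failures:
        message = r.get("constraint_message") or "no message"
        lines.append(f"  - {r['constraint']}: {message}")
    return "\n".join(lines)
```

In the job above, the returned text could replace the fixed string in the raise statement.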

CI/CD Pipeline for Deequ Checks

# .github/workflows/spark-dq-tests.yml
name: Deequ Data Quality Tests

on:
  push:
    paths:
      - 'spark/**'

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
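      - uses: actions/checkout@v4

      - name: Set up Java
        uses: actions/setup-java@v4
        with:
          distribution: 'temurin'
          java-version: '11'

      - name: Set up sbt
        uses: sbt/setup-sbt@v1

      - name: Run Deequ unit tests
        run: |
          sbt test
        working-directory: spark/

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.10'

      - name: Run PySpark Deequ tests
        env:
          SPARK_VERSION: '3.3'
        run: |
          pip install "pyspark==3.3.*" pydeequ pytest
          pytest tests/data_quality/ -v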

Summary

Amazon Deequ enables data quality testing at Spark scale with: constraint checks covering completeness, uniqueness, value ranges, patterns, and custom SQL predicates; unit tests against local Spark sessions with synthetic DataFrames for both valid and invalid data scenarios; a metrics repository that stores each run's metrics as the history for anomaly detection; relative rate of change anomaly detection that flags sudden run-over-run changes using bounds expressed relative to the previous run rather than fixed absolute thresholds; and AWS Glue integration to fail ETL jobs when data quality checks don't pass. Because Deequ computes scan-shareable metrics together in a single pass over the data, checks stay affordable even on very large datasets, so add them to every Spark pipeline that produces data consumed by downstream teams.
