Amazon Redshift Testing: Unit Tests, Data Validation, and CI Integration
Testing Redshift pipelines is expensive and slow when done against a live cluster. This guide covers running Redshift-compatible tests against LocalStack, validating COPY/UNLOAD workflows, asserting on distribution and sort key behavior, and integrating Great Expectations for ongoing data quality monitoring — all in a CI pipeline that costs nothing per run.
Key Takeaways
LocalStack gives you a Redshift-compatible endpoint locally. With LocalStack Pro, you can run most Redshift SQL including COPY from S3, stored procedures, and window functions without a live cluster.
Test COPY commands by mocking S3 with LocalStack. Stage files in a local S3 bucket, then run COPY against it — same code path as production, zero AWS cost.
Distribution keys affect correctness, not just performance. A wrong DISTKEY can cause silent data skew in aggregations — write tests that verify row counts match expected values after joins.
Great Expectations validates data contracts at pipeline boundaries. Define expectations on column types, value ranges, and null rates; run them as CI gates before downstream models execute.
Parametrize tests by environment. The same test suite should run against LocalStack in CI and a real Redshift dev cluster with a single env var flip.
The Redshift Testing Problem
Amazon Redshift is a columnar warehouse with a PostgreSQL-compatible wire protocol but significant behavioral differences: COPY from S3, distribution styles (KEY, ALL, EVEN, AUTO), sort keys (compound vs. interleaved), and Redshift-specific SQL extensions. Testing all of this against a live cluster means:
- Every test run costs real money
- Cluster startup takes minutes, not seconds
- Parallel test runs compete for WLM queues
- CI becomes slow and flaky
The solution is a layered approach: LocalStack for unit and integration tests, Great Expectations for data quality validation, and a real Redshift dev cluster for performance and distribution key validation.
Setting Up LocalStack for Redshift
LocalStack Pro includes a Redshift emulator. Set it up with Docker Compose:
# docker-compose.yml
version: "3.8"
services:
localstack:
image: localstack/localstack-pro:3.4
ports:
- "4566:4566"
environment:
SERVICES: s3,redshift,redshift-data
LOCALSTACK_AUTH_TOKEN: ${LOCALSTACK_AUTH_TOKEN}
DEBUG: 0
volumes:
- /var/run/docker.sock:/var/run/docker.sockCreate a Redshift cluster via the AWS CLI pointed at LocalStack:
export AWS_DEFAULT_REGION=us-east-1
<span class="hljs-built_in">export AWS_ACCESS_KEY_ID=<span class="hljs-built_in">test
<span class="hljs-built_in">export AWS_SECRET_ACCESS_KEY=<span class="hljs-built_in">test
<span class="hljs-built_in">export AWS_ENDPOINT_URL=http://localhost:4566
aws redshift create-cluster \
--cluster-identifier test-cluster \
--node-type dc2.large \
--number-of-nodes 1 \
--master-username admin \
--master-user-password TestPass1! \
--db-name testdbConnecting to LocalStack Redshift in Python
Use psycopg2 or redshift-connector — both work against LocalStack's PostgreSQL-compatible endpoint:
# tests/conftest.py
import pytest
import psycopg2
import boto3
@pytest.fixture(scope="session")
def redshift_conn():
conn = psycopg2.connect(
host="localhost",
port=5439,
database="testdb",
user="admin",
password="TestPass1!",
)
conn.autocommit = True
yield conn
conn.close()
@pytest.fixture(scope="session")
def s3_client():
return boto3.client(
"s3",
endpoint_url="http://localhost:4566",
aws_access_key_id="test",
aws_secret_access_key="test",
region_name="us-east-1",
)
@pytest.fixture(autouse=True)
def setup_schema(redshift_conn):
with redshift_conn.cursor() as cur:
cur.execute("CREATE SCHEMA IF NOT EXISTS analytics")
yield
with redshift_conn.cursor() as cur:
cur.execute("DROP SCHEMA IF EXISTS analytics CASCADE")Testing COPY and UNLOAD Commands
COPY is how data lands in Redshift. Test that your COPY statements parse files correctly, handle bad rows, and land data in the right shape.
# tests/test_copy_command.py
import json
import pytest
def test_copy_from_json(redshift_conn, s3_client):
# Create S3 bucket and upload fixture data
s3_client.create_bucket(Bucket="test-data")
records = [
{"order_id": "o1", "amount": 100.0, "status": "completed"},
{"order_id": "o2", "amount": 50.0, "status": "pending"},
]
s3_client.put_object(
Bucket="test-data",
Key="orders/2026-01-15.json",
Body="\n".join(json.dumps(r) for r in records),
)
with redshift_conn.cursor() as cur:
cur.execute("""
CREATE TABLE analytics.orders (
order_id VARCHAR(50),
amount DECIMAL(10,2),
status VARCHAR(20)
)
""")
cur.execute("""
COPY analytics.orders
FROM 's3://test-data/orders/'
ACCESS_KEY_ID 'test'
SECRET_ACCESS_KEY 'test'
FORMAT AS JSON 'auto'
REGION 'us-east-1'
""")
cur.execute("SELECT COUNT(*) FROM analytics.orders")
count = cur.fetchone()[0]
assert count == 2
cur.execute("SELECT amount FROM analytics.orders WHERE order_id = 'o1'")
amount = cur.fetchone()[0]
assert float(amount) == 100.0
def test_unload_produces_correct_output(redshift_conn, s3_client):
s3_client.create_bucket(Bucket="test-output")
with redshift_conn.cursor() as cur:
cur.execute("""
CREATE TABLE analytics.summary (
region VARCHAR(50),
total_sales DECIMAL(10,2)
)
""")
cur.execute("""
INSERT INTO analytics.summary VALUES
('us-east', 1500.00),
('eu-west', 2300.50)
""")
cur.execute("""
UNLOAD ('SELECT * FROM analytics.summary ORDER BY region')
TO 's3://test-output/summary/'
ACCESS_KEY_ID 'test'
SECRET_ACCESS_KEY 'test'
CSV HEADER
PARALLEL OFF
""")
objects = s3_client.list_objects_v2(Bucket="test-output", Prefix="summary/")
assert objects["KeyCount"] >= 1
content = s3_client.get_object(
Bucket="test-output",
Key=objects["Contents"][0]["Key"]
)["Body"].read().decode()
assert "us-east" in content
assert "1500.00" in contentTesting Distribution Keys and Sort Keys
Distribution keys determine how rows are spread across compute nodes. The wrong DISTKEY causes data skew — all rows land on one node, making joins and aggregations slow and potentially incorrect when using approximate counts.
# tests/test_distribution.py
def test_distkey_join_produces_correct_count(redshift_conn):
"""Verify that a DISTKEY join does not drop or duplicate rows."""
with redshift_conn.cursor() as cur:
cur.execute("""
CREATE TABLE analytics.customers (
customer_id INT,
name VARCHAR(100),
country VARCHAR(50)
)
DISTKEY(customer_id)
SORTKEY(customer_id)
""")
cur.execute("""
CREATE TABLE analytics.orders (
order_id INT,
customer_id INT,
amount DECIMAL(10,2)
)
DISTKEY(customer_id)
SORTKEY(customer_id, order_id)
""")
cur.execute("""
INSERT INTO analytics.customers VALUES
(1, 'Alice', 'US'),
(2, 'Bob', 'UK'),
(3, 'Carol', 'DE')
""")
cur.execute("""
INSERT INTO analytics.orders VALUES
(101, 1, 200.0),
(102, 1, 150.0),
(103, 2, 300.0)
-- customer 3 has no orders intentionally
""")
# Collocated join (both tables distributed on customer_id)
cur.execute("""
SELECT c.customer_id, COUNT(o.order_id) AS order_count
FROM analytics.customers c
LEFT JOIN analytics.orders o USING (customer_id)
GROUP BY c.customer_id
ORDER BY c.customer_id
""")
rows = cur.fetchall()
assert len(rows) == 3 # all customers present
assert rows[0] == (1, 2) # Alice: 2 orders
assert rows[1] == (2, 1) # Bob: 1 order
assert rows[2] == (3, 0) # Carol: 0 orders (LEFT JOIN preserves her)Data Quality with Great Expectations
Great Expectations (GX) validates data against a schema of expectations. Connect it to Redshift and run expectations as part of your CI pipeline.
pip install great-expectations sqlalchemy redshift-connector# tests/test_data_quality.py
import great_expectations as gx
def test_orders_data_quality():
context = gx.get_context()
datasource = context.sources.add_or_update_sql(
name="redshift_test",
connection_string=(
"redshift+redshift_connector://admin:TestPass1!"
"@localhost:5439/testdb"
),
)
asset = datasource.add_table_asset(
name="orders",
table_name="orders",
schema_name="analytics",
)
batch_request = asset.build_batch_request()
validator = context.get_validator(batch_request=batch_request)
# Column presence and types
validator.expect_column_to_exist("order_id")
validator.expect_column_to_exist("amount")
validator.expect_column_values_to_be_of_type("order_id", "TEXT")
# Null constraints
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_not_be_null("amount")
# Value ranges
validator.expect_column_values_to_be_between(
"amount", min_value=0.0, max_value=1_000_000.0
)
# Accepted values
validator.expect_column_values_to_be_in_set(
"status", ["pending", "completed", "refunded", "cancelled"]
)
# Row count sanity
validator.expect_table_row_count_to_be_between(min_value=1)
result = validator.validate()
assert result.success, f"Data quality failed: {result}"GitHub Actions CI Integration
# .github/workflows/redshift-tests.yml
name: Redshift Tests
on: [push, pull_request]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Start LocalStack
uses: LocalStack/setup-localstack@v2
with:
image-tag: "3.4"
use-pro: "true"
configuration: SERVICES=s3,redshift,redshift-data
env:
LOCALSTACK_AUTH_TOKEN: ${{ secrets.LOCALSTACK_AUTH_TOKEN }}
- name: Create Redshift cluster
run: |
aws --endpoint-url=http://localhost:4566 redshift create-cluster \
--cluster-identifier test-cluster \
--node-type dc2.large \
--number-of-nodes 1 \
--master-username admin \
--master-user-password TestPass1! \
--db-name testdb
env:
AWS_DEFAULT_REGION: us-east-1
AWS_ACCESS_KEY_ID: test
AWS_SECRET_ACCESS_KEY: test
- uses: actions/setup-python@v5
with:
python-version: "3.12"
- run: pip install -r requirements-test.txt
- run: pytest tests/ -v --tb=short
env:
REDSHIFT_HOST: localhost
REDSHIFT_PORT: 5439
REDSHIFT_DB: testdb
AWS_ENDPOINT_URL: http://localhost:4566For teams without LocalStack Pro, use a dedicated Redshift dev cluster and scope tests to a schema named after the branch (SET search_path TO branch_${BRANCH_NAME}), then drop it after the run.
What to Test vs. What to Skip
Test these:
- COPY command parsing — correct column mapping, JSON/CSV format handling, NULL handling
- SQL transformation logic — window functions, CTEs, aggregations against known fixture data
- UNLOAD output shape — correct column ordering, header presence, delimiter
- Distribution key join correctness — verify row counts after collocated joins
- Sort key query filtering — ensure range predicates return expected subsets
- Data quality contracts — null rates, value ranges, accepted categorical values
Skip these:
- Redshift WLM queue behavior — infrastructure configuration, not application logic
- Vacuum and analyze scheduling — operational concerns, not testable in unit tests
- Node count scaling effects — performance testing domain, not correctness
- Spectrum queries across S3 data lake — integration test against real S3, not a unit test target
- Redshift RA3 managed storage limits — infrastructure boundary, AWS responsibility