Row-Level Security Testing in PostgreSQL for Multi-Tenant SaaS
Row-Level Security (RLS) is PostgreSQL's mechanism for enforcing data access policies at the database layer. For multi-tenant SaaS applications, RLS is a powerful defense-in-depth tool — even if application-layer authorization has a bug, RLS prevents cross-tenant data leakage at the database level.
But RLS policies themselves can have bugs. This guide covers testing PostgreSQL RLS policies: policy unit tests in SQL, integration tests via your application layer, and tenant isolation verification.
Understanding RLS Policies
A typical RLS policy for a multi-tenant SaaS:
-- Enable RLS on the orders table
ALTER TABLE orders ENABLE ROW LEVEL SECURITY;
ALTER TABLE orders FORCE ROW LEVEL SECURITY;
-- Policy: users can only see orders from their own tenant
CREATE POLICY tenant_isolation_orders ON orders
USING (tenant_id = current_setting('app.current_tenant_id')::uuid);
-- Policy: users can only see their own orders within the tenant
CREATE POLICY user_own_orders ON orders
USING (
tenant_id = current_setting('app.current_tenant_id')::uuid
AND user_id = current_setting('app.current_user_id')::uuid
);Unit Testing RLS Policies in SQL
Test policies directly in SQL — no application layer needed:
-- tests/rls/test_tenant_isolation.sql
-- Run with pgTAP: SELECT * FROM runtests();
BEGIN;
SELECT plan(8);
-- Set up test data
CREATE TEMPORARY TABLE test_tenants AS
SELECT gen_random_uuid() as id, 'Tenant A' as name
UNION SELECT gen_random_uuid(), 'Tenant B';
INSERT INTO tenants (id, name)
SELECT id, name FROM test_tenants;
-- Create test users and orders for each tenant
DO $$
DECLARE
tenant_a_id uuid;
tenant_b_id uuid;
user_a_id uuid := gen_random_uuid();
user_b_id uuid := gen_random_uuid();
BEGIN
SELECT id INTO tenant_a_id FROM test_tenants WHERE name = 'Tenant A';
SELECT id INTO tenant_b_id FROM test_tenants WHERE name = 'Tenant B';
INSERT INTO orders (id, tenant_id, user_id, amount, status)
VALUES
(gen_random_uuid(), tenant_a_id, user_a_id, 100, 'paid'),
(gen_random_uuid(), tenant_a_id, user_a_id, 200, 'pending'),
(gen_random_uuid(), tenant_b_id, user_b_id, 300, 'paid');
-- Set app settings for Tenant A
PERFORM set_config('app.current_tenant_id', tenant_a_id::text, true);
PERFORM set_config('app.current_user_id', user_a_id::text, true);
END $$;
-- Test 1: Tenant A can see their own orders
SELECT is(
(SELECT COUNT(*) FROM orders)::int,
2,
'Tenant A sees exactly 2 orders (their own)'
);
-- Test 2: Tenant A cannot see Tenant B orders
SELECT is(
(SELECT COUNT(*) FROM orders WHERE tenant_id = (
SELECT id FROM test_tenants WHERE name = 'Tenant B'
))::int,
0,
'Tenant A cannot see Tenant B orders even with explicit filter'
);
-- Switch to Tenant B context
DO $$
DECLARE
tenant_b_id uuid;
user_b_id uuid;
BEGIN
SELECT id INTO tenant_b_id FROM test_tenants WHERE name = 'Tenant B';
SELECT user_id INTO user_b_id FROM orders WHERE tenant_id = tenant_b_id LIMIT 1;
PERFORM set_config('app.current_tenant_id', tenant_b_id::text, true);
PERFORM set_config('app.current_user_id', user_b_id::text, true);
END $$;
-- Test 3: Tenant B sees only their order
SELECT is(
(SELECT COUNT(*) FROM orders)::int,
1,
'Tenant B sees exactly 1 order (their own)'
);
-- Test 4: Tenant B cannot read Tenant A data by guessing IDs
SELECT is(
(SELECT COUNT(*) FROM orders WHERE tenant_id = (
SELECT id FROM test_tenants WHERE name = 'Tenant A'
))::int,
0,
'Tenant B cannot read Tenant A orders by specifying tenant_id'
);
-- Test 5: INSERT without tenant_id fails (if set via trigger or default)
SELECT throws_ok(
$$ INSERT INTO orders (id, amount, status) VALUES (gen_random_uuid(), 50, 'pending') $$,
'P0001',
'Orders require tenant_id'
);
-- Test 6: Superuser bypasses RLS (FORCE ROW LEVEL SECURITY prevents this for table owner)
-- Test that the table owner is NOT exempt
-- (FORCE ROW LEVEL SECURITY means even table owner must follow policies)
SELECT finish();
ROLLBACK;Integration Testing via Application Layer
Test RLS through your actual application code:
// tests/multitenancy/rls-integration.test.ts
import { describe, it, expect, beforeAll, afterAll, beforeEach } from 'vitest';
import { Pool } from 'pg';
import { createTenantContext, TenantContext } from '~/db/tenantContext';
const pool = new Pool({ connectionString: process.env.TEST_DATABASE_URL });
async function withTenantContext<T>(
tenantId: string,
userId: string,
fn: (ctx: TenantContext) => Promise<T>
): Promise<T> {
const client = await pool.connect();
try {
await client.query('BEGIN');
await client.query(`SET LOCAL app.current_tenant_id = $1`, [tenantId]);
await client.query(`SET LOCAL app.current_user_id = $1`, [userId]);
const ctx = createTenantContext(client);
const result = await fn(ctx);
await client.query('ROLLBACK'); // Test isolation — always rollback
return result;
} finally {
client.release();
}
}
describe('RLS tenant isolation', () => {
let tenantA: { id: string; userId: string };
let tenantB: { id: string; userId: string };
beforeAll(async () => {
// Create test fixtures (these exist in test DB — seeded before tests)
tenantA = { id: 'aaaaaaaa-0000-0000-0000-000000000001', userId: 'user-a-001' };
tenantB = { id: 'bbbbbbbb-0000-0000-0000-000000000001', userId: 'user-b-001' };
});
afterAll(() => pool.end());
it('tenant A query returns only tenant A records', async () => {
const orders = await withTenantContext(tenantA.id, tenantA.userId, async (ctx) => {
return ctx.query('SELECT * FROM orders');
});
// Every returned order must belong to tenant A
for (const order of orders.rows) {
expect(order.tenant_id).toBe(tenantA.id);
}
});
it('tenant A cannot read tenant B record by primary key', async () => {
// Get a real tenant B order ID
const tenantBOrderId = await withTenantContext(tenantB.id, tenantB.userId, async (ctx) => {
const result = await ctx.query('SELECT id FROM orders LIMIT 1');
return result.rows[0]?.id;
});
if (!tenantBOrderId) {
console.log('No tenant B orders to test — skipping');
return;
}
// Try to read it as tenant A
const result = await withTenantContext(tenantA.id, tenantA.userId, async (ctx) => {
return ctx.query('SELECT * FROM orders WHERE id = $1', [tenantBOrderId]);
});
// RLS must hide the row
expect(result.rows).toHaveLength(0);
});
it('tenant A count does not include tenant B records', async () => {
const countA = await withTenantContext(tenantA.id, tenantA.userId, async (ctx) => {
const result = await ctx.query('SELECT COUNT(*) FROM orders');
return parseInt(result.rows[0].count);
});
const countB = await withTenantContext(tenantB.id, tenantB.userId, async (ctx) => {
const result = await ctx.query('SELECT COUNT(*) FROM orders');
return parseInt(result.rows[0].count);
});
const countAll = await pool.query('SELECT COUNT(*) FROM orders');
const totalCount = parseInt(countAll.rows[0].count);
// Both tenant counts should be less than total, and they shouldn't "double count"
expect(countA).toBeLessThan(totalCount);
expect(countB).toBeLessThan(totalCount);
expect(countA + countB).toBeLessThanOrEqual(totalCount);
});
it('INSERT without tenant context fails', async () => {
// Attempt to insert without setting tenant context
const client = await pool.connect();
try {
await expect(
client.query(
'INSERT INTO orders (id, amount, status) VALUES ($1, $2, $3)',
['00000000-0000-0000-0000-000000000099', 100, 'pending']
)
).rejects.toThrow();
} finally {
client.release();
}
});
});Testing the Application's RLS Context Setup
Your application must set the tenant context for every request. Test that it does so correctly:
// src/middleware/tenantContext.ts
import { Request, Response, NextFunction } from 'express';
import { pool } from '../db/pool';
export async function tenantContextMiddleware(req: Request, res: Response, next: NextFunction) {
if (!req.user) {
return next();
}
// Acquire a client and set tenant context for this request
const client = await pool.connect();
try {
await client.query('BEGIN');
await client.query(
'SELECT set_config($1, $2, true), set_config($3, $4, true)',
[
'app.current_tenant_id', req.user.tenantId,
'app.current_user_id', req.user.id,
]
);
req.dbClient = client;
res.on('finish', () => {
client.query('COMMIT').finally(() => client.release());
});
next();
} catch (err) {
client.release();
next(err);
}
}// tests/middleware/tenantContext.test.ts
import { describe, it, expect, vi } from 'vitest';
import request from 'supertest';
import app from '~/app';
describe('Tenant context middleware', () => {
it('sets tenant context for authenticated requests', async () => {
const queriesCalled: string[] = [];
// Intercept DB queries to verify set_config is called
vi.spyOn(pool, 'connect').mockImplementation(async () => ({
query: async (sql: string, params?: any[]) => {
queriesCalled.push(sql);
return { rows: [], rowCount: 0 };
},
release: vi.fn(),
} as any));
await request(app)
.get('/api/orders')
.set('Authorization', 'Bearer valid-user-token');
const setConfigQuery = queriesCalled.find((q) => q.includes('set_config'));
expect(setConfigQuery).toBeDefined();
expect(setConfigQuery).toContain('app.current_tenant_id');
expect(setConfigQuery).toContain('app.current_user_id');
});
it('does not set tenant context for unauthenticated requests', async () => {
const queriesCalled: string[] = [];
vi.spyOn(pool, 'connect').mockImplementation(async () => ({
query: async (sql: string) => {
queriesCalled.push(sql);
return { rows: [], rowCount: 0 };
},
release: vi.fn(),
} as any));
await request(app).get('/api/public-endpoint');
const setConfigQuery = queriesCalled.find((q) => q.includes('app.current_tenant_id'));
expect(setConfigQuery).toBeUndefined();
});
});Testing RLS Policy Coverage with pgTAP
Run policy tests in CI using pgTAP:
# .github/workflows/rls-tests.yml
name: RLS Policy Tests
on: [push, pull_request]
jobs:
rls-tests:
runs-on: ubuntu-latest
services:
postgres:
image: postgres:16
env:
POSTGRES_PASSWORD: test
POSTGRES_DB: testdb
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- uses: actions/checkout@v4
- name: Install pgTAP
run: |
sudo apt-get install -y postgresql-client
psql -h localhost -U postgres -d testdb -c "CREATE EXTENSION IF NOT EXISTS pgtap"
env:
PGPASSWORD: test
- name: Run migrations
run: |
psql -h localhost -U postgres -d testdb -f db/schema.sql
psql -h localhost -U postgres -d testdb -f db/rls-policies.sql
env:
PGPASSWORD: test
- name: Seed test data
run: psql -h localhost -U postgres -d testdb -f db/test-seed.sql
env:
PGPASSWORD: test
- name: Run pgTAP RLS tests
run: |
psql -h localhost -U postgres -d testdb -f tests/rls/test_tenant_isolation.sql
psql -h localhost -U postgres -d testdb -f tests/rls/test_user_isolation.sql
psql -h localhost -U postgres -d testdb -f tests/rls/test_admin_override.sql
env:
PGPASSWORD: testVerifying No Bypass via Database Functions
Some SQL functions and views can bypass RLS — test that yours don't:
// tests/rls/bypass-prevention.test.ts
describe('RLS bypass prevention', () => {
it('COUNT(*) respects RLS — no total count leakage', async () => {
const tenantACount = await withTenantContext(tenantA.id, tenantA.userId, async (ctx) => {
const result = await ctx.query('SELECT COUNT(*) FROM orders');
return parseInt(result.rows[0].count);
});
// Tenant A should not be able to determine total row count across all tenants
const totalCount = await pool.query('SELECT COUNT(*) FROM orders');
const actualTotal = parseInt(totalCount.rows[0].count);
// If there are multiple tenants with data, tenant count should be less than total
if (actualTotal > tenantACount) {
expect(tenantACount).toBeLessThan(actualTotal);
}
});
it('JOIN to other tables does not bypass RLS', async () => {
// Joining orders to products should not leak orders from other tenants
const result = await withTenantContext(tenantA.id, tenantA.userId, async (ctx) => {
return ctx.query(`
SELECT o.id, o.tenant_id, p.name
FROM orders o
JOIN products p ON o.product_id = p.id
`);
});
// All joined orders must still belong to tenant A
for (const row of result.rows) {
expect(row.tenant_id).toBe(tenantA.id);
}
});
it('Subqueries respect RLS', async () => {
const result = await withTenantContext(tenantA.id, tenantA.userId, async (ctx) => {
return ctx.query(`
SELECT * FROM orders
WHERE id IN (SELECT id FROM orders WHERE amount > 0)
`);
});
for (const row of result.rows) {
expect(row.tenant_id).toBe(tenantA.id);
}
});
});Summary
Testing PostgreSQL Row-Level Security requires:
- SQL unit tests with pgTAP — test policies directly in the database without an application layer
- Context-based isolation tests — verify each tenant sees only their data, even when querying by primary key
- Middleware tests — confirm the application sets
app.current_tenant_idandapp.current_user_idfor every authenticated request - Bypass prevention — test COUNT, JOINs, and subqueries to confirm RLS holds across SQL constructs
- CI integration — run pgTAP tests in a Postgres service container on every PR
RLS is a second line of defense, not a replacement for application-layer authorization. Together, they create defense-in-depth: application code prevents unauthorized actions at the API level, and RLS prevents data leakage even if application code has a bug. Test both layers independently to verify both layers work.