Testing Tenant Data Isolation: PostgreSQL RLS, Schema-Per-Tenant, and Row Filters
Multi-tenant data isolation can be implemented in several ways: shared tables with a tenant_id column, separate schemas per tenant, or separate databases per tenant. Each requires different testing approaches. This post focuses on testing shared-table and RLS-based isolation — the most common pattern in modern SaaS.
Isolation Strategies
| Strategy | Approach | Complexity | Test Approach |
|---|---|---|---|
Shared table + tenant_id |
App-level filtering | Low | Test query filters |
| PostgreSQL RLS | DB-level enforcement | Medium | Test with/without policy |
| Schema per tenant | Separate namespaces | High | Test schema switching |
| Database per tenant | Full isolation | Very High | Integration tests per DB |
Testing Shared Tables with tenant_id
The simplest and riskiest approach — missing a WHERE tenant_id = ? clause exposes all data.
Unit Testing Query Builders
// repositories/project-repository.js
export class ProjectRepository {
constructor(db) { this.db = db }
async findAll(tenantId) {
return this.db('projects').where({ tenant_id: tenantId }).select('*')
}
async findById(tenantId, projectId) {
const project = await this.db('projects')
.where({ tenant_id: tenantId, id: projectId })
.first()
return project || null
}
}// repositories/project-repository.test.js
import knex from 'knex'
const db = knex({ client: 'sqlite3', connection: ':memory:' })
beforeAll(async () => {
await db.schema.createTable('projects', table => {
table.string('id').primary()
table.string('tenant_id').notNullable()
table.string('name').notNullable()
})
await db('projects').insert([
{ id: 'p1', tenant_id: 'tenant-a', name: 'Alpha' },
{ id: 'p2', tenant_id: 'tenant-b', name: 'Beta' },
])
})
test('findAll returns only tenant A projects', async () => {
const repo = new ProjectRepository(db)
const projects = await repo.findAll('tenant-a')
expect(projects).toHaveLength(1)
expect(projects[0].name).toBe('Alpha')
expect(projects[0].tenant_id).toBe('tenant-a')
})
test('findById returns null for wrong tenant', async () => {
const repo = new ProjectRepository(db)
const project = await repo.findById('tenant-a', 'p2') // p2 belongs to tenant-b
expect(project).toBeNull()
})
test('findById returns project for correct tenant', async () => {
const repo = new ProjectRepository(db)
const project = await repo.findById('tenant-a', 'p1')
expect(project.name).toBe('Alpha')
})Testing PostgreSQL Row Level Security
RLS policies enforce isolation at the database level. Test that they work correctly.
Setup RLS Policy
-- Enable RLS on the projects table
ALTER TABLE projects ENABLE ROW LEVEL SECURITY;
ALTER TABLE projects FORCE ROW LEVEL SECURITY;
-- Policy: users only see rows matching their tenant
CREATE POLICY tenant_isolation ON projects
USING (tenant_id = current_setting('app.tenant_id'));Test RLS in Node.js
import pg from 'pg'
async function withTenant(tenantId, fn) {
const client = await pool.connect()
try {
await client.query('SET app.tenant_id = $1', [tenantId])
return await fn(client)
} finally {
client.release()
}
}
test('RLS allows tenant A to see only their projects', async () => {
const projects = await withTenant('tenant-a', async (client) => {
const result = await client.query('SELECT * FROM projects')
return result.rows
})
expect(projects.every(p => p.tenant_id === 'tenant-a')).toBe(true)
expect(projects.some(p => p.tenant_id === 'tenant-b')).toBe(false)
})
test('RLS prevents tenant A from reading tenant B data via raw query', async () => {
const projects = await withTenant('tenant-a', async (client) => {
// Even a direct query without WHERE clause is filtered by RLS
const result = await client.query("SELECT * FROM projects WHERE id = 'proj-b1'")
return result.rows
})
expect(projects).toHaveLength(0)
})
test('superuser bypass RLS to verify total count', async () => {
// Test setup: use superuser to check total data
const superClient = await superuserPool.connect()
const result = await superClient.query('SET row_security = off; SELECT COUNT(*) FROM projects')
expect(parseInt(result.rows[0].count)).toBe(2) // Both tenants' data exists
superClient.release()
})Test RLS in Python (psycopg2)
import psycopg2
import pytest
def with_tenant(conn, tenant_id):
with conn.cursor() as cur:
cur.execute("SET app.tenant_id = %s", (tenant_id,))
def test_rls_filters_by_tenant(db_connection):
with_tenant(db_connection, 'tenant-a')
with db_connection.cursor() as cur:
cur.execute("SELECT * FROM projects")
rows = cur.fetchall()
tenant_ids = {row['tenant_id'] for row in rows}
assert tenant_ids == {'tenant-a'}
assert 'tenant-b' not in tenant_idsTesting Context Propagation
The app must set app.tenant_id correctly on every connection before querying:
// middleware/tenant-context.js
export function tenantContextMiddleware(req, res, next) {
const tenantId = req.user?.tenantId
if (!tenantId) return res.status(401).json({ error: 'No tenant context' })
// Store for use in DB queries
req.tenantId = tenantId
next()
}
// Test: middleware correctly sets tenant context
test('middleware sets tenant context from JWT', async () => {
const req = {
user: { tenantId: 'tenant-a', id: 'user-1' }
}
const res = { status: vi.fn().mockReturnThis(), json: vi.fn() }
const next = vi.fn()
tenantContextMiddleware(req, res, next)
expect(req.tenantId).toBe('tenant-a')
expect(next).toHaveBeenCalled()
})
test('middleware blocks requests without tenant context', async () => {
const req = { user: null }
const res = { status: vi.fn().mockReturnThis(), json: vi.fn() }
const next = vi.fn()
tenantContextMiddleware(req, res, next)
expect(res.status).toHaveBeenCalledWith(401)
expect(next).not.toHaveBeenCalled()
})Schema-Per-Tenant Testing
// db/tenant-connection.js
export function getTenantConnection(tenantId) {
return knex({
client: 'pg',
connection: {
...baseConfig,
searchPath: `tenant_${tenantId}, public`
}
})
}
// Test schema isolation
test('each tenant uses their own schema', async () => {
const dbA = getTenantConnection('tenant-a')
const dbB = getTenantConnection('tenant-b')
await dbA('projects').insert({ name: 'A Project' })
await dbB('projects').insert({ name: 'B Project' })
const projectsA = await dbA('projects').select('*')
const projectsB = await dbB('projects').select('*')
expect(projectsA).toHaveLength(1)
expect(projectsA[0].name).toBe('A Project')
expect(projectsB).toHaveLength(1)
expect(projectsB[0].name).toBe('B Project')
await dbA.destroy()
await dbB.destroy()
})Detecting Missing Tenant Filters
Add a query logger in test mode to detect queries without tenant filters:
// test-helpers/query-audit.js
export function auditTenantFilters(knexInstance, tables = ['projects', 'users', 'orders']) {
const violations = []
knexInstance.on('query', (query) => {
const sql = query.sql.toLowerCase()
for (const table of tables) {
if (sql.includes(`from ${table}`) || sql.includes(`update ${table}`)) {
if (!sql.includes('tenant_id')) {
violations.push({ sql: query.sql, bindings: query.bindings })
}
}
}
})
return {
getViolations: () => violations,
assertNoViolations: () => {
if (violations.length > 0) {
throw new Error(
`${violations.length} queries missing tenant filter:\n` +
violations.map(v => v.sql).join('\n')
)
}
}
}
}test('no tenant-unscoped queries during request processing', async () => {
const audit = auditTenantFilters(db)
await request(app)
.get('/api/projects')
.set('Authorization', `Bearer ${tokenForUser(userA)}`)
audit.assertNoViolations()
})Summary
Data isolation testing requires proving that the database layer enforces boundaries, not just the application layer. Test your repository methods with wrong-tenant IDs. Test RLS policies directly with SET app.tenant_id. Add a query auditor in test mode to catch queries missing tenant filters. Schema-per-tenant strategies need tests that verify the correct schema path is used per request. The goal is a test suite where any missing tenant filter immediately fails a test.