Row-Level Security Testing in PostgreSQL for Multi-Tenant SaaS

Row-Level Security Testing in PostgreSQL for Multi-Tenant SaaS

Row-Level Security (RLS) is PostgreSQL's mechanism for enforcing data access policies at the database layer. For multi-tenant SaaS applications, RLS is a powerful defense-in-depth tool — even if application-layer authorization has a bug, RLS prevents cross-tenant data leakage at the database level.

But RLS policies themselves can have bugs. This guide covers testing PostgreSQL RLS policies: policy unit tests in SQL, integration tests via your application layer, and tenant isolation verification.

Understanding RLS Policies

A typical RLS policy for a multi-tenant SaaS:

-- Enable RLS on the orders table.
-- FORCE makes the policies apply to the table owner as well; superusers and
-- roles with BYPASSRLS are still exempt.
ALTER TABLE orders ENABLE ROW LEVEL SECURITY;
ALTER TABLE orders FORCE ROW LEVEL SECURITY;

-- Permissive policy: grants access to rows of the caller's tenant.
-- current_setting() without missing_ok raises an error when the setting is
-- absent, so a session with no tenant context fails closed.
CREATE POLICY tenant_isolation_orders ON orders
  USING (tenant_id = current_setting('app.current_tenant_id')::uuid);

-- Restrictive policy: narrows the rows granted above to the caller's own orders.
-- It must be RESTRICTIVE: permissive policies are combined with OR, so a second
-- permissive policy would be a no-op next to tenant_isolation_orders.
CREATE POLICY user_own_orders ON orders
  AS RESTRICTIVE
  USING (
    tenant_id = current_setting('app.current_tenant_id')::uuid
    AND user_id = current_setting('app.current_user_id')::uuid
  );

Unit Testing RLS Policies in SQL

Test policies directly in SQL — no application layer needed:

-- tests/rls/test_tenant_isolation.sql
-- Run with: pg_prove -d <db> tests/rls/test_tenant_isolation.sql  (or psql -f)
--
-- The script must be started by a role that can seed fixtures regardless of
-- RLS and create roles (e.g. a superuser). The assertions themselves run under
-- an unprivileged role, because superusers and BYPASSRLS roles skip every policy.
-- Everything, including the temporary role, is rolled back at the end.

BEGIN;

-- Must equal the number of assertions below, otherwise pgTAP reports a bad plan.
SELECT plan(5);

-- Fixture: one row per tenant, plus the user that owns that tenant's orders.
-- Keeping user_id here lets later steps switch context without reading
-- `orders`, which is filtered by RLS once the unprivileged role is active.
CREATE TEMPORARY TABLE test_tenants AS
  SELECT gen_random_uuid() AS id, 'Tenant A' AS name, gen_random_uuid() AS user_id
  UNION ALL
  SELECT gen_random_uuid(), 'Tenant B', gen_random_uuid();

INSERT INTO tenants (id, name)
  SELECT id, name FROM test_tenants;

-- Seed orders for each tenant while still running as the privileged role
DO $$
DECLARE
  tenant_a_id uuid;
  tenant_b_id uuid;
  user_a_id uuid;
  user_b_id uuid;
BEGIN
  SELECT id, user_id INTO tenant_a_id, user_a_id FROM test_tenants WHERE name = 'Tenant A';
  SELECT id, user_id INTO tenant_b_id, user_b_id FROM test_tenants WHERE name = 'Tenant B';

  INSERT INTO orders (id, tenant_id, user_id, amount, status)
  VALUES
    (gen_random_uuid(), tenant_a_id, user_a_id, 100, 'paid'),
    (gen_random_uuid(), tenant_a_id, user_a_id, 200, 'pending'),
    (gen_random_uuid(), tenant_b_id, user_b_id, 300, 'paid');
END $$;

-- Switch to a role that is subject to RLS for the rest of the transaction.
-- NOTE(review): assumes `orders` lives in a schema this role may USE (e.g. public).
CREATE ROLE rls_test_app NOLOGIN;
GRANT SELECT, INSERT ON orders TO rls_test_app;
GRANT SELECT ON test_tenants TO rls_test_app;
SET LOCAL ROLE rls_test_app;

-- Set app settings for Tenant A (is_local = true: scoped to this transaction)
DO $$
DECLARE
  tenant_a_id uuid;
  user_a_id uuid;
BEGIN
  SELECT id, user_id INTO tenant_a_id, user_a_id FROM test_tenants WHERE name = 'Tenant A';
  PERFORM set_config('app.current_tenant_id', tenant_a_id::text, true);
  PERFORM set_config('app.current_user_id', user_a_id::text, true);
END $$;

-- Test 1: Tenant A can see their own orders
SELECT is(
  (SELECT COUNT(*) FROM orders)::int,
  2,
  'Tenant A sees exactly 2 orders (their own)'
);

-- Test 2: Tenant A cannot see Tenant B orders
SELECT is(
  (SELECT COUNT(*) FROM orders WHERE tenant_id = (
    SELECT id FROM test_tenants WHERE name = 'Tenant B'
  ))::int,
  0,
  'Tenant A cannot see Tenant B orders even with explicit filter'
);

-- Switch to Tenant B context
DO $$
DECLARE
  tenant_b_id uuid;
  user_b_id uuid;
BEGIN
  -- Read the user from the fixture table: Tenant B's orders are invisible
  -- while Tenant A's context is still active.
  SELECT id, user_id INTO tenant_b_id, user_b_id FROM test_tenants WHERE name = 'Tenant B';
  PERFORM set_config('app.current_tenant_id', tenant_b_id::text, true);
  PERFORM set_config('app.current_user_id', user_b_id::text, true);
END $$;

-- Test 3: Tenant B sees only their order
SELECT is(
  (SELECT COUNT(*) FROM orders)::int,
  1,
  'Tenant B sees exactly 1 order (their own)'
);

-- Test 4: Tenant B cannot read Tenant A data by guessing IDs
SELECT is(
  (SELECT COUNT(*) FROM orders WHERE tenant_id = (
    SELECT id FROM test_tenants WHERE name = 'Tenant A'
  ))::int,
  0,
  'Tenant B cannot read Tenant A orders by specifying tenant_id'
);

-- Test 5: INSERT without tenant_id fails (if set via trigger or default)
-- With three arguments, throws_ok treats the last one as the expected error message.
SELECT throws_ok(
  $$ INSERT INTO orders (id, amount, status) VALUES (gen_random_uuid(), 50, 'pending') $$,
  'P0001',
  'Orders require tenant_id'
);

-- Not asserted here: owner/superuser behaviour.
-- FORCE ROW LEVEL SECURITY removes only the table owner's exemption; superusers
-- and BYPASSRLS roles always bypass RLS, which is why the assertions above run
-- under rls_test_app rather than the session role.

RESET ROLE;
SELECT finish();
ROLLBACK;

Integration Testing via Application Layer

Test RLS through your actual application code:

// tests/multitenancy/rls-integration.test.ts
import { describe, it, expect, beforeAll, afterAll, beforeEach } from 'vitest';
import { Pool } from 'pg';
import { createTenantContext, TenantContext } from '~/db/tenantContext';

// Shared connection pool for this suite; closed in afterAll.
// NOTE(review): RLS is only exercised if TEST_DATABASE_URL connects as a role
// that is neither a superuser nor BYPASSRLS — confirm against the test setup.
const pool = new Pool({ connectionString: process.env.TEST_DATABASE_URL });

/**
 * Runs `fn` inside a transaction whose RLS settings identify the given tenant
 * and user, then rolls the transaction back so tests never persist changes.
 *
 * @param tenantId value for `app.current_tenant_id`
 * @param userId   value for `app.current_user_id`
 * @param fn       callback that receives a context bound to the scoped client
 * @returns whatever `fn` resolves to; rejections from `fn` propagate unchanged
 */
async function withTenantContext<T>(
  tenantId: string,
  userId: string,
  fn: (ctx: TenantContext) => Promise<T>
): Promise<T> {
  const client = await pool.connect();
  try {
    await client.query('BEGIN');
    // SET / SET LOCAL cannot take bind parameters ($1 is a syntax error there).
    // set_config(name, value, is_local => true) is the parameterizable
    // equivalent and is likewise scoped to the current transaction.
    await client.query(
      'SELECT set_config($1, $2, true), set_config($3, $4, true)',
      ['app.current_tenant_id', tenantId, 'app.current_user_id', userId]
    );
    return await fn(createTenantContext(client));
  } finally {
    // Test isolation — always roll back, including when `fn` throws, so the
    // client never returns to the pool with an open transaction. If the
    // ROLLBACK itself fails, pass the error to release() to discard the client.
    await client.query('ROLLBACK').then(
      () => client.release(),
      (err) => client.release(err)
    );
  }
}

// End-to-end checks that queries issued through the tenant context only ever
// see rows belonging to that tenant.
describe('RLS tenant isolation', () => {
  let tenantA: { id: string; userId: string };
  let tenantB: { id: string; userId: string };

  beforeAll(async () => {
    // Test fixtures (these exist in test DB — seeded before tests).
    // Both ids must be valid UUIDs: the policies cast the settings with ::uuid,
    // so any other string makes every query fail with a syntax error.
    // NOTE(review): keep these values in sync with the seed script.
    tenantA = {
      id: 'aaaaaaaa-0000-0000-0000-000000000001',
      userId: 'aaaaaaaa-0000-0000-0000-0000000000a1',
    };
    tenantB = {
      id: 'bbbbbbbb-0000-0000-0000-000000000001',
      userId: 'bbbbbbbb-0000-0000-0000-0000000000b1',
    };
  });

  afterAll(() => pool.end());

  it('tenant A query returns only tenant A records', async () => {
    const orders = await withTenantContext(tenantA.id, tenantA.userId, async (ctx) => {
      return ctx.query('SELECT * FROM orders');
    });

    // Every returned order must belong to tenant A
    for (const order of orders.rows) {
      expect(order.tenant_id).toBe(tenantA.id);
    }
  });

  it('tenant A cannot read tenant B record by primary key', async () => {
    // Get a real tenant B order ID
    const tenantBOrderId = await withTenantContext(tenantB.id, tenantB.userId, async (ctx) => {
      const result = await ctx.query('SELECT id FROM orders LIMIT 1');
      return result.rows[0]?.id;
    });

    if (!tenantBOrderId) {
      console.log('No tenant B orders to test — skipping');
      return;
    }

    // Try to read it as tenant A
    const result = await withTenantContext(tenantA.id, tenantA.userId, async (ctx) => {
      return ctx.query('SELECT * FROM orders WHERE id = $1', [tenantBOrderId]);
    });

    // RLS must hide the row
    expect(result.rows).toHaveLength(0);
  });

  it('tenant A count does not include tenant B records', async () => {
    const countA = await withTenantContext(tenantA.id, tenantA.userId, async (ctx) => {
      const result = await ctx.query('SELECT COUNT(*) FROM orders');
      // COUNT(*) is a bigint, which pg returns as a string
      return parseInt(result.rows[0].count, 10);
    });

    const countB = await withTenantContext(tenantB.id, tenantB.userId, async (ctx) => {
      const result = await ctx.query('SELECT COUNT(*) FROM orders');
      return parseInt(result.rows[0].count, 10);
    });

    // NOTE(review): this unscoped query only yields the true total if `pool`
    // connects as a role that bypasses RLS; under an RLS-bound role with no
    // tenant context the policy's current_setting() call raises. Confirm which
    // role the suite is meant to use.
    const countAll = await pool.query('SELECT COUNT(*) FROM orders');
    const totalCount = parseInt(countAll.rows[0].count, 10);

    // Both tenant counts should be less than total, and they shouldn't "double count"
    expect(countA).toBeLessThan(totalCount);
    expect(countB).toBeLessThan(totalCount);
    expect(countA + countB).toBeLessThanOrEqual(totalCount);
  });

  it('INSERT without tenant context fails', async () => {
    // Attempt to insert without setting tenant context
    const client = await pool.connect();
    try {
      await expect(
        client.query(
          'INSERT INTO orders (id, amount, status) VALUES ($1, $2, $3)',
          ['00000000-0000-0000-0000-000000000099', 100, 'pending']
        )
      ).rejects.toThrow();
    } finally {
      client.release();
    }
  });
});

Testing the Application's RLS Context Setup

Your application must set the tenant context for every request. Test that it does so correctly:

// src/middleware/tenantContext.ts
import { Request, Response, NextFunction } from 'express';
import { pool } from '../db/pool';

/**
 * Express middleware that opens a transaction on a dedicated pool client and
 * sets `app.current_tenant_id` / `app.current_user_id` for the authenticated
 * user, exposing the client as `req.dbClient`.
 *
 * Unauthenticated requests (no `req.user`) pass through without touching the pool.
 * The transaction is committed when the response finishes; if the connection
 * closes before that, the client is discarded instead of being reused.
 * Any setup failure is forwarded to `next(err)`.
 */
export async function tenantContextMiddleware(req: Request, res: Response, next: NextFunction) {
  if (!req.user) {
    return next();
  }

  // Acquire a client for this request. Route connection failures to the error
  // handler instead of leaving a rejected promise Express never observes.
  const client = await pool.connect().catch((err) => {
    next(err);
    return null;
  });
  if (!client) {
    return;
  }

  try {
    await client.query('BEGIN');
    // is_local = true scopes both settings to this transaction, so they cannot
    // leak to the next request that borrows the same connection.
    await client.query(
      'SELECT set_config($1, $2, true), set_config($3, $4, true)',
      [
        'app.current_tenant_id', req.user.tenantId,
        'app.current_user_id', req.user.id,
      ]
    );
  } catch (err) {
    // The connection may be inside a failed transaction; destroy it rather
    // than handing it back to the pool in that state.
    client.release(true);
    return next(err);
  }

  req.dbClient = client;

  // 'close' fires after 'finish' on a normal response and on its own when the
  // peer disconnects early; the flag ensures the client is released exactly once.
  let released = false;
  const settle = (responseFinished: boolean) => {
    if (released) {
      return;
    }
    released = true;

    if (!responseFinished) {
      // Aborted request: a handler may still hold req.dbClient, so destroy the
      // connection (the server rolls back its open transaction) instead of
      // recycling it for another tenant.
      client.release(true);
      return;
    }

    client.query('COMMIT').then(
      () => client.release(),
      (err) => client.release(err) // failed COMMIT: discard the connection
    );
  };

  res.once('finish', () => settle(true));
  res.once('close', () => settle(false));

  next();
}
// tests/middleware/tenantContext.test.ts
import { describe, it, expect, vi } from 'vitest';
import request from 'supertest';
import app from '~/app';
import { pool } from '~/db/pool';

// Verifies that the middleware issues set_config for authenticated requests
// and stays away from the database for unauthenticated ones.
describe('Tenant context middleware', () => {
  /**
   * Replaces pool.connect with a fake client that records every query.
   * Each entry is the SQL text followed by its bind parameters, because the
   * middleware passes the setting names as parameters rather than inlining
   * them in the SQL string.
   */
  function interceptQueries() {
    const queriesCalled: string[] = [];
    const spy = vi.spyOn(pool, 'connect').mockImplementation(async () => ({
      query: async (sql: string, params: unknown[] = []) => {
        queriesCalled.push([sql, ...params.map(String)].join(' '));
        return { rows: [], rowCount: 0 };
      },
      release: vi.fn(),
    } as any));

    // restore() puts the real pool.connect back so the spy cannot leak into other tests
    return { queriesCalled, restore: () => spy.mockRestore() };
  }

  it('sets tenant context for authenticated requests', async () => {
    const { queriesCalled, restore } = interceptQueries();

    try {
      await request(app)
        .get('/api/orders')
        .set('Authorization', 'Bearer valid-user-token');
    } finally {
      restore();
    }

    const setConfigQuery = queriesCalled.find((q) => q.includes('set_config'));
    expect(setConfigQuery).toBeDefined();
    expect(setConfigQuery).toContain('app.current_tenant_id');
    expect(setConfigQuery).toContain('app.current_user_id');
  });

  it('does not set tenant context for unauthenticated requests', async () => {
    const { queriesCalled, restore } = interceptQueries();

    try {
      await request(app).get('/api/public-endpoint');
    } finally {
      restore();
    }

    const setConfigQuery = queriesCalled.find((q) => q.includes('app.current_tenant_id'));
    expect(setConfigQuery).toBeUndefined();
  });
});

Testing RLS Policy Coverage with pgTAP

Run policy tests in CI using pgTAP:

# .github/workflows/rls-tests.yml
# Runs the pgTAP RLS suites against a throwaway PostgreSQL service container.
name: RLS Policy Tests
on: [push, pull_request]

jobs:
  rls-tests:
    runs-on: ubuntu-latest
    services:
      postgres:
        image: postgres:16
        env:
          POSTGRES_PASSWORD: test
          POSTGRES_DB: testdb
        # Steps run on the runner host, so the port must be published for
        # `-h localhost` to reach the container.
        ports:
          - 5432:5432
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5

    steps:
      - uses: actions/checkout@v4

      - name: Install pgTAP
        run: |
          # The extension files must exist on the database server, i.e. inside
          # the service container, not on the runner.
          docker exec "${{ job.services.postgres.id }}" \
            sh -c 'apt-get update && apt-get install -y postgresql-16-pgtap'
          # psql and pg_prove (the TAP runner) on the runner itself
          sudo apt-get update
          sudo apt-get install -y postgresql-client libtap-parser-sourcehandler-pgtap-perl
          psql -h localhost -U postgres -d testdb -v ON_ERROR_STOP=1 -c "CREATE EXTENSION IF NOT EXISTS pgtap"
        env:
          PGPASSWORD: test

      - name: Run migrations
        # ON_ERROR_STOP makes psql exit non-zero on the first SQL error
        run: |
          psql -h localhost -U postgres -d testdb -v ON_ERROR_STOP=1 -f db/schema.sql
          psql -h localhost -U postgres -d testdb -v ON_ERROR_STOP=1 -f db/rls-policies.sql
        env:
          PGPASSWORD: test

      - name: Seed test data
        run: psql -h localhost -U postgres -d testdb -v ON_ERROR_STOP=1 -f db/test-seed.sql
        env:
          PGPASSWORD: test

      - name: Run pgTAP RLS tests
        # pg_prove parses the TAP output and fails the step on any "not ok";
        # plain `psql -f` would exit 0 even when assertions fail.
        run: |
          pg_prove -h localhost -U postgres -d testdb \
            tests/rls/test_tenant_isolation.sql \
            tests/rls/test_user_isolation.sql \
            tests/rls/test_admin_override.sql
        env:
          PGPASSWORD: test

Verifying No Bypass via Database Functions

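Some SQL constructs can bypass RLS — most notably SECURITY DEFINER functions and views, which by default run with their owner's privileges rather than the caller's. Test that yours don't, and confirm that ordinary aggregates, joins, and subqueries stay tenant-scoped: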

// tests/rls/bypass-prevention.test.ts
// Checks that common SQL constructs (aggregates, joins, subqueries) stay
// tenant-scoped when executed inside a tenant context.
describe('RLS bypass prevention', () => {
  it('COUNT(*) respects RLS — no total count leakage', async () => {
    const tenantACount = await withTenantContext(tenantA.id, tenantA.userId, async (ctx) => {
      const result = await ctx.query('SELECT COUNT(*) FROM orders');
      // COUNT(*) is a bigint, which pg returns as a string
      return parseInt(result.rows[0].count, 10);
    });

    // Tenant A should not be able to determine total row count across all tenants.
    // NOTE(review): this unscoped query only yields the true total if `pool`
    // connects as a role that bypasses RLS — confirm.
    const totalCount = await pool.query('SELECT COUNT(*) FROM orders');
    const actualTotal = parseInt(totalCount.rows[0].count, 10);

    // Precondition: the seed data contains orders for more than one tenant.
    // Assert unconditionally — guarding the expectation with
    // `actualTotal > tenantACount` would make it impossible to fail.
    expect(tenantACount).toBeLessThan(actualTotal);
  });

  it('JOIN to other tables does not bypass RLS', async () => {
    // Joining orders to products should not leak orders from other tenants
    const result = await withTenantContext(tenantA.id, tenantA.userId, async (ctx) => {
      return ctx.query(`
        SELECT o.id, o.tenant_id, p.name 
        FROM orders o
        JOIN products p ON o.product_id = p.id
      `);
    });

    // All joined orders must still belong to tenant A
    for (const row of result.rows) {
      expect(row.tenant_id).toBe(tenantA.id);
    }
  });

  it('Subqueries respect RLS', async () => {
    const result = await withTenantContext(tenantA.id, tenantA.userId, async (ctx) => {
      return ctx.query(`
        SELECT * FROM orders 
        WHERE id IN (SELECT id FROM orders WHERE amount > 0)
      `);
    });

    // Both the outer query and the subquery read `orders`, so every row must be tenant A's
    for (const row of result.rows) {
      expect(row.tenant_id).toBe(tenantA.id);
    }
  });
});

Summary

Testing PostgreSQL Row-Level Security requires:

  • SQL unit tests with pgTAP — test policies directly in the database without an application layer
  • Context-based isolation tests — verify each tenant sees only their data, even when querying by primary key
  • Middleware tests — confirm the application sets app.current_tenant_id and app.current_user_id for every authenticated request
  • Bypass prevention — test COUNT, JOINs, and subqueries to confirm RLS holds across SQL constructs
  • CI integration — run pgTAP tests in a Postgres service container on every PR

RLS is a second line of defense, not a replacement for application-layer authorization. Together, they create defense-in-depth: application code prevents unauthorized actions at the API level, and RLS prevents data leakage even if application code has a bug. Test each layer independently, so that a gap in one is never masked by the other.

Read more