OPA/Gatekeeper Policy Testing: Unit Tests for Admission Control Rules

OPA Gatekeeper extends Kubernetes admission control with custom policies written in Rego. When a pod, service, or configmap is created or updated, Gatekeeper evaluates it against your policies and either admits or rejects the request. The problem is that many teams ship Gatekeeper policies without any tests and discover failures only when a request is wrongly rejected or, worse, wrongly admitted.

This guide covers writing Gatekeeper ConstraintTemplates with proper unit tests, integration testing against a real cluster, and CI enforcement.

Note: This guide focuses on Gatekeeper's admission control context. For testing Rego policies against static files with conftest, see our Policy-as-Code Testing with OPA and conftest article.

Gatekeeper Architecture

Gatekeeper uses two Kubernetes CRDs:

  1. ConstraintTemplate: Defines the policy logic in Rego and declares the parameters the policy accepts
  2. Constraint: An instance of a template that specifies which resources to target and what parameter values to use

ConstraintTemplate (defines policy logic)
    └── Constraint (applies policy to scope)
            └── Admission webhook (enforces at resource creation)

When a resource is created, Kubernetes calls the Gatekeeper webhook, which evaluates the resource against all matching Constraints and returns allow or a list of violations.
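
The policies later in this guide read only two parts of the document Gatekeeper evaluates: `input.review.object` (the resource under review) and `input.parameters` (the Constraint's parameters). As a minimal sketch, the hypothetical helper `build_gatekeeper_input` below builds that shape from a plain manifest. A real admission request carries more fields than this, so treat it as an approximation that is enough for unit tests.

```python
import json


def build_gatekeeper_input(obj: dict, parameters: dict) -> dict:
    """Wrap a Kubernetes object in the two keys the Rego in this guide reads."""
    return {
        "parameters": parameters,
        "review": {"object": obj},
    }


# Illustrative pod; the names are made up for the example
pod = {
    "apiVersion": "v1",
    "kind": "Pod",
    "metadata": {"name": "demo", "namespace": "production"},
    "spec": {"containers": [{"name": "app", "image": "nginx:1.25.3"}]},
}

doc = build_gatekeeper_input(pod, {"resources": ["cpu", "memory"]})
print(json.dumps(doc, indent=2))
```

The printed JSON can be saved to a file and used as the input document when evaluating a policy by hand.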

Installing Gatekeeper

# Install via Helm
helm repo add gatekeeper https://open-policy-agent.github.io/gatekeeper/charts
helm repo update

helm install gatekeeper gatekeeper/gatekeeper \
  --namespace gatekeeper-system \
  --create-namespace \
  --set auditInterval=30

# Verify
kubectl get pods -n gatekeeper-system
kubectl get constrainttemplate

Writing a ConstraintTemplate

Example: Require Resource Limits

apiVersion: templates.gatekeeper.sh/v1
kind: ConstraintTemplate
metadata:
  name: k8srequiredresources
spec:
  crd:
    spec:
      names:
        kind: K8sRequiredResources
      validation:
        openAPIV3Schema:
          type: object
          properties:
            resources:
              type: array
              items:
                type: string
                enum: [cpu, memory]
  targets:
    - target: admission.k8s.gatekeeper.sh
      rego: |
        package k8srequiredresources

        violation[{"msg": msg}] {
          container := input.review.object.spec.containers[_]
          required_resource := input.parameters.resources[_]
          
          not _has_resource_limit(container, required_resource)
          
          msg := sprintf(
            "Container '%v' in '%v' is missing resource limit for '%v'",
            [container.name, input.review.object.metadata.name, required_resource]
          )
        }

        violation[{"msg": msg}] {
          container := input.review.object.spec.initContainers[_]
          required_resource := input.parameters.resources[_]
          
          not _has_resource_limit(container, required_resource)
          
          msg := sprintf(
            "Init container '%v' in '%v' is missing resource limit for '%v'",
            [container.name, input.review.object.metadata.name, required_resource]
          )
        }

        _has_resource_limit(container, resource) {
          container.resources.limits[resource]
        }

Applying the Constraint

apiVersion: constraints.gatekeeper.sh/v1beta1
kind: K8sRequiredResources
metadata:
  name: require-cpu-and-memory-limits
spec:
  match:
    kinds:
      - apiGroups: [""]
        kinds: ["Pod"]
    namespaces:
      - production
      - staging
  parameters:
    resources:
      - cpu
      - memory
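
To reason about which resources a `match` block like the one above selects, here is a deliberately simplified Python model. It only considers `kinds` and `namespaces`. Gatekeeper's real matching supports more criteria (label selectors, excluded namespaces, and so on), so `constraint_applies` is a hypothetical teaching aid, not Gatekeeper's algorithm.

```python
def constraint_applies(match: dict, obj: dict) -> bool:
    """Simplified scope check: kinds and namespaces only."""
    api_version = obj.get("apiVersion", "v1")
    # Core-group resources such as Pod have apiVersion "v1" and group ""
    group = api_version.split("/")[0] if "/" in api_version else ""
    kind = obj.get("kind", "")

    kind_rules = match.get("kinds", [])
    kind_ok = not kind_rules or any(
        (group in rule.get("apiGroups", []) or "*" in rule.get("apiGroups", []))
        and (kind in rule.get("kinds", []) or "*" in rule.get("kinds", []))
        for rule in kind_rules
    )

    namespaces = match.get("namespaces", [])
    namespace = obj.get("metadata", {}).get("namespace", "")
    namespace_ok = not namespaces or namespace in namespaces

    return kind_ok and namespace_ok


match = {
    "kinds": [{"apiGroups": [""], "kinds": ["Pod"]}],
    "namespaces": ["production", "staging"],
}
prod_pod = {"apiVersion": "v1", "kind": "Pod", "metadata": {"namespace": "production"}}
dev_pod = {"apiVersion": "v1", "kind": "Pod", "metadata": {"namespace": "development"}}
```

Under this model `prod_pod` is in scope and `dev_pod` is not, which is the behaviour the integration tests later in this guide check against a real cluster.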

Unit Testing Rego Policies

OPA has a built-in test framework. Policy unit tests go alongside the policy files.

Test File Structure

policies/
├── required-resources/
│   ├── constraint_template.yaml   # Kubernetes CRD
│   ├── policy.rego                # Extracted Rego logic
│   └── policy_test.rego           # Unit tests
├── required-labels/
│   ├── constraint_template.yaml
│   ├── policy.rego
│   └── policy_test.rego

Extract the Rego from the ConstraintTemplate into a standalone .rego file for testing:

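# policies/required-resources/policy.rego
package k8srequiredresources

violation[{"msg": msg}] {
  container := input.review.object.spec.containers[_]
  required_resource := input.parameters.resources[_]
  not _has_resource_limit(container, required_resource)
  msg := sprintf(
    "Container '%v' in '%v' is missing resource limit for '%v'",
    [container.name, input.review.object.metadata.name, required_resource]
  )
}

# Init containers get the same check, mirroring the ConstraintTemplate
violation[{"msg": msg}] {
  container := input.review.object.spec.initContainers[_]
  required_resource := input.parameters.resources[_]
  not _has_resource_limit(container, required_resource)
  msg := sprintf(
    "Init container '%v' in '%v' is missing resource limit for '%v'",
    [container.name, input.review.object.metadata.name, required_resource]
  )
}

_has_resource_limit(container, resource) {
  container.resources.limits[resource]
}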
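
Copying the Rego by hand invites drift between the template and policy.rego. As a rough sketch, the extraction could be scripted. The hypothetical `extract_rego` below uses only the standard library and assumes the template contains a single `rego: |` block scalar laid out as in the example above. A YAML parser would be more robust for anything less regular.

```python
import textwrap


def extract_rego(template_text: str) -> str:
    """Return the dedented body of the first 'rego: |' block in a template."""
    lines = template_text.splitlines()
    for i, line in enumerate(lines):
        stripped = line.lstrip()
        if not stripped.startswith("rego: |"):
            continue
        key_indent = len(line) - len(stripped)
        block = []
        for body_line in lines[i + 1:]:
            indent = len(body_line) - len(body_line.lstrip())
            # The block ends at the first non-blank line indented no deeper
            # than the 'rego:' key itself
            if body_line.strip() and indent <= key_indent:
                break
            block.append(body_line)
        return textwrap.dedent("\n".join(block)).strip() + "\n"
    raise ValueError("no 'rego: |' block found")
```

A CI step could compare `extract_rego(open(template).read())` with the checked-in policy.rego and fail when they differ.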

Writing Unit Tests

# policies/required-resources/policy_test.rego
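package k8srequiredresources_test

# `some ... in` below needs this import on pre-1.0 OPA
import future.keywords.in
import data.k8srequiredresources.violation

# Helper to build input for tests.
# The result variable must not be named `input`, which is reserved in Rego.
make_input(containers, params) = result {
  result := {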
    "parameters": params,
    "review": {
      "object": {
        "metadata": {"name": "test-pod"},
        "spec": {
          "containers": containers,
          "initContainers": []
        }
      }
    }
  }
}

# --- Passing cases (should produce no violations) ---

test_no_violation_when_all_limits_set {
  inp := make_input(
    [
      {
        "name": "app",
        "image": "nginx:latest",
        "resources": {
          "limits": {"cpu": "500m", "memory": "256Mi"},
          "requests": {"cpu": "100m", "memory": "128Mi"}
        }
      }
    ],
    {"resources": ["cpu", "memory"]}
  )
  count(violation) == 0 with input as inp
}

test_no_violation_when_no_resources_required {
  inp := make_input(
    [
      {
        "name": "app",
        "image": "nginx:latest",
        "resources": {}
      }
    ],
    {"resources": []}  # No requirements
  )
  count(violation) == 0 with input as inp
}

test_no_violation_when_only_cpu_required_and_set {
  inp := make_input(
    [
      {
        "name": "app",
        "image": "nginx:latest",
        "resources": {
          "limits": {"cpu": "1"}
          # Memory not required, so absence is fine
        }
      }
    ],
    {"resources": ["cpu"]}
  )
  count(violation) == 0 with input as inp
}

# --- Failing cases (should produce violations) ---

test_violation_when_cpu_limit_missing {
  inp := make_input(
    [
      {
        "name": "app",
        "image": "nginx:latest",
        "resources": {
          "limits": {"memory": "256Mi"}
          # cpu missing
        }
      }
    ],
    {"resources": ["cpu", "memory"]}
  )
  count(violation) == 1 with input as inp
}

test_violation_when_memory_limit_missing {
  inp := make_input(
    [
      {
        "name": "app",
        "image": "nginx:latest",
        "resources": {
          "limits": {"cpu": "500m"}
          # memory missing
        }
      }
    ],
    {"resources": ["cpu", "memory"]}
  )
  count(violation) == 1 with input as inp
}

test_violation_when_no_limits_at_all {
  inp := make_input(
    [
      {
        "name": "app",
        "image": "nginx:latest",
        "resources": {}
      }
    ],
    {"resources": ["cpu", "memory"]}
  )
  count(violation) == 2 with input as inp  # One per missing resource
}

test_violation_per_container {
  inp := make_input(
    [
      {
        "name": "app",
        "image": "nginx:latest",
        "resources": {}
      },
      {
        "name": "sidecar",
        "image": "busybox:latest",
        "resources": {}
      }
    ],
    {"resources": ["cpu"]}
  )
  # Both containers missing cpu limit = 2 violations
  count(violation) == 2 with input as inp
}

test_violation_message_includes_container_name {
  inp := make_input(
    [
      {
        "name": "my-specific-container",
        "image": "nginx:latest",
        "resources": {}
      }
    ],
    {"resources": ["cpu"]}
  )
  violations := violation with input as inp
  some v in violations
  contains(v.msg, "my-specific-container")
}

test_violation_message_includes_resource_name {
  inp := make_input(
    [
      {
        "name": "app",
        "image": "nginx:latest",
        "resources": {"limits": {"memory": "256Mi"}}
      }
    ],
    {"resources": ["cpu"]}
  )
  violations := violation with input as inp
  some v in violations
  contains(v.msg, "cpu")
}
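
The expected counts in these tests follow from `violation` being a set: one entry per distinct message, and each message is built from a container name and a resource name. The plain-Python model below is an illustration of that counting, not a replacement for the Rego. `model_violations` is a hypothetical name, and the model ignores init containers.

```python
def model_violations(pod_name, containers, required):
    """Mimic the containers rule: a set of messages, so duplicates collapse."""
    return {
        f"Container '{c['name']}' in '{pod_name}' is missing resource limit for '{r}'"
        for c in containers
        for r in required
        if r not in c.get("resources", {}).get("limits", {})
    }


two_distinct = [
    {"name": "app", "resources": {}},
    {"name": "sidecar", "resources": {}},
]
# Same name twice: both produce the identical message
two_same_name = [
    {"name": "app", "resources": {}},
    {"name": "app", "resources": {}},
]
```

With `["cpu"]` required, `two_distinct` yields two messages while `two_same_name` yields one. Kubernetes itself rejects duplicate container names, so this mainly matters when hand-building test inputs: reuse a name by accident and a count assertion will be off by one.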

Run tests:

# Run all tests in directory
# (with OPA 1.0 or later, add --v0-compatible: the rules in this guide
# use pre-1.0 Rego syntax)
opa test policies/ -v

# Run with coverage
opa test policies/ -v --coverage

# Run a specific policy directory
opa test policies/required-resources/ -v

Expected output (abbreviated; timings will vary):

policies/required-resources/policy_test.rego:
data.k8srequiredresources_test.test_no_violation_when_all_limits_set: PASS (210µs)
data.k8srequiredresources_test.test_no_violation_when_no_resources_required: PASS (89µs)
data.k8srequiredresources_test.test_violation_when_cpu_limit_missing: PASS (134µs)
...
--------------------------------------------------------------------------------
PASS: 9/9

More Policy Examples with Tests

Require Specific Labels

# policies/required-labels/policy.rego
package k8srequiredlabels

violation[{"msg": msg}] {
  provided := {label | input.review.object.metadata.labels[label]}
  required := {label | label := input.parameters.labels[_]}
  missing := required - provided
  count(missing) > 0
  msg := sprintf(
    "Resource '%v' is missing required labels: %v",
    [input.review.object.metadata.name, missing]
  )
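}

# policies/required-labels/policy_test.rego
package k8srequiredlabels_test

# `some ... in` below needs this import on pre-1.0 OPA
import future.keywords.in
import data.k8srequiredlabels.violation

# The result variable must not be named `input`, which is reserved in Rego
make_input(labels, required_labels) = result {
  result := {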
    "parameters": {"labels": required_labels},
    "review": {
      "object": {
        "metadata": {
          "name": "test-resource",
          "labels": labels
        }
      }
    }
  }
}

test_no_violation_when_all_labels_present {
  inp := make_input(
    {"app": "my-app", "env": "production", "team": "platform"},
    ["app", "env", "team"]
  )
  count(violation) == 0 with input as inp
}

test_no_violation_with_extra_labels {
  inp := make_input(
    {"app": "my-app", "env": "production", "extra": "label"},
    ["app", "env"]  # Only 2 required
  )
  count(violation) == 0 with input as inp
}

test_violation_when_label_missing {
  inp := make_input(
    {"app": "my-app"},  # Missing "env" and "team"
    ["app", "env", "team"]
  )
  count(violation) == 1 with input as inp  # One violation with all missing labels
}

test_violation_message_lists_missing_labels {
  inp := make_input(
    {"app": "my-app"},
    ["app", "env"]
  )
  violations := violation with input as inp
  some v in violations
  contains(v.msg, "env")
}

Block Latest Image Tag

# policies/no-latest-tag/policy.rego
package k8snolatesttag

violation[{"msg": msg}] {
  container := input.review.object.spec.containers[_]
  _uses_latest_tag(container.image)
  msg := sprintf(
    "Container '%v' uses ':latest' tag or no tag in image '%v'. Specify an explicit digest or version tag.",
    [container.name, container.image]
  )
}

_uses_latest_tag(image) {
  endswith(image, ":latest")
}

_uses_latest_tag(image) {
  not contains(image, ":")  # No tag at all defaults to latest
}

# policies/no-latest-tag/policy_test.rego
package k8snolatesttag_test

import data.k8snolatesttag.violation

# The result variable must not be named `input`, which is reserved in Rego
make_input(images) = result {
  result := {
    "parameters": {},
    "review": {
      "object": {
        "metadata": {"name": "test-pod"},
        "spec": {
          "containers": [
            {"name": sprintf("c%v", [i]), "image": image}
            | image := images[i]
          ]
        }
      }
    }
  }
}

test_no_violation_with_version_tag {
  inp := make_input(["nginx:1.25.3"])
  count(violation) == 0 with input as inp
}

test_no_violation_with_sha_digest {
  inp := make_input(["nginx@sha256:abc123def456"])
  count(violation) == 0 with input as inp
}

test_violation_with_latest_tag {
  inp := make_input(["nginx:latest"])
  count(violation) == 1 with input as inp
}

test_violation_with_no_tag {
  inp := make_input(["nginx"])
  count(violation) == 1 with input as inp
}

test_violation_for_each_container {
  inp := make_input(["nginx:latest", "redis:latest"])
  count(violation) == 2 with input as inp
}

test_mixed_containers {
  inp := make_input(["nginx:1.25.3", "redis:latest"])
  count(violation) == 1 with input as inp  # Only redis violates
}
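
One input these tests do not cover is an untagged image pulled from a registry with a port, such as the hypothetical `registry.example.com:5000/nginx`. The string contains a colon, so the "no tag at all" rule above does not fire even though the image resolves to latest. The Python sketch below shows one way to reason about the tag by looking only at the last path segment. It is an illustration of the edge case, not the policy's actual logic.

```python
def uses_latest_tag(image: str) -> bool:
    """Return True if the image reference resolves to the 'latest' tag."""
    # A digest pins the image content regardless of any tag
    if "@" in image:
        return False
    # Only the last path segment can carry a tag; earlier colons are ports
    last_segment = image.rsplit("/", 1)[-1]
    if ":" not in last_segment:
        return True
    return last_segment.rsplit(":", 1)[1] == "latest"


cases = [
    "nginx",
    "nginx:latest",
    "nginx:1.25.3",
    "nginx@sha256:abc123def456",
    "registry.example.com:5000/nginx",
    "registry.example.com:5000/nginx:1.25.3",
]
results = {image: uses_latest_tag(image) for image in cases}
```

If this case matters in your clusters, it is worth adding a matching Rego test and tightening the rule accordingly.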

Integration Testing Against Live Cluster

Unit tests validate Rego logic in isolation. Integration tests verify the full stack: ConstraintTemplate + Constraint + Kubernetes webhook.

import subprocess
import time

import pytest

# All cluster access in this file goes through kubectl, which reads the
# current kubeconfig context on its own.


def apply_manifest(yaml_str):
    result = subprocess.run(
        ["kubectl", "apply", "-f", "-"],
        input=yaml_str, capture_output=True, text=True
    )
    return result.returncode == 0, result.stderr


def try_create_pod(pod_spec_yaml):
    """Try to create a pod (dry-run server), return (success, error)."""
    result = subprocess.run(
        ["kubectl", "apply", "--dry-run=server", "-f", "-"],
        input=pod_spec_yaml, capture_output=True, text=True
    )
    return result.returncode == 0, result.stdout + result.stderr


@pytest.fixture(scope="session", autouse=True)
def deploy_constraint_template():
    """Apply ConstraintTemplate and Constraint before tests run."""
    import time

    with open("policies/required-resources/constraint_template.yaml") as f:
        ok, err = apply_manifest(f.read())
    assert ok, f"Failed to apply ConstraintTemplate:\n{err}"

    # Gatekeeper creates the constraint CRD asynchronously from the template,
    # so give it a moment before applying the Constraint
    time.sleep(5)

    with open("constraints/require-limits-production.yaml") as f:
        ok, err = apply_manifest(f.read())
    assert ok, f"Failed to apply Constraint:\n{err}"

    # Wait for Gatekeeper to sync
    time.sleep(5)


VALID_POD_YAML = """
apiVersion: v1
kind: Pod
metadata:
  name: valid-pod
  namespace: production
spec:
  containers:
  - name: app
    image: nginx:1.25.3
    resources:
      limits:
        cpu: "500m"
        memory: "256Mi"
"""

INVALID_POD_YAML = """
apiVersion: v1
kind: Pod
metadata:
  name: invalid-pod
  namespace: production
spec:
  containers:
  - name: app
    image: nginx:1.25.3
    resources: {}
"""


def test_pod_with_limits_is_accepted():
    success, output = try_create_pod(VALID_POD_YAML)
    assert success, f"Valid pod with resource limits was rejected:\n{output}"


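def test_pod_without_limits_is_rejected():
    success, output = try_create_pod(INVALID_POD_YAML)
    assert not success, "Pod without resource limits should be rejected"
    # Gatekeeper denials carry the constraint name and the policy's message,
    # not the constraint kind, so match on the message text from the Rego rule
    assert "missing resource limit" in output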


def test_violation_message_is_descriptive():
    success, output = try_create_pod(INVALID_POD_YAML)
    assert not success
    # Verify the violation message helps the developer understand what to fix
    assert "app" in output  # container name
    assert "cpu" in output.lower() or "memory" in output.lower()  # resource name


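def test_pod_outside_constrained_namespace_is_accepted():
    """The Constraint shown earlier targets only 'production' and 'staging'.

    Assumes the 'development' namespace already exists; a server-side
    dry run fails for a missing namespace regardless of Gatekeeper.
    """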
    pod_yaml = INVALID_POD_YAML.replace(
        "namespace: production",
        "namespace: development"  # Not in constraint's match scope
    ).replace("name: invalid-pod", "name: dev-pod")
    
    success, output = try_create_pod(pod_yaml)
    assert success, \
        f"Pod in unconstrained namespace should be accepted:\n{output}"
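
The fixed `time.sleep(5)` in the fixture above is either too long or, on a slow cluster, too short. A generic polling helper is one alternative. The sketch below is cluster-agnostic: `wait_until` is a hypothetical name, and the `sleep` and `clock` parameters exist only so the helper can be tested without real waiting.

```python
import time


def wait_until(predicate, timeout=30.0, interval=1.0,
               sleep=time.sleep, clock=time.monotonic):
    """Call predicate() until it returns True or the timeout elapses."""
    deadline = clock() + timeout
    while True:
        if predicate():
            return True
        if clock() >= deadline:
            return False
        sleep(interval)
```

In the fixture, the predicate could shell out to `kubectl get constrainttemplate k8srequiredresources -o jsonpath={.status.created}` and compare the output to `true`. That status field is an assumption about your Gatekeeper version, so check it against a live object first.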

Run:

pytest tests/gatekeeper/ -v

Testing Audit Mode

Gatekeeper has an audit mode that retrospectively evaluates existing resources against constraints. Test that audit catches pre-existing violations:

import time
from kubernetes import client, config

config.load_kube_config()
custom = client.CustomObjectsApi()


def get_constraint_status(constraint_kind, constraint_name):
    """Return the status block Gatekeeper writes on a Constraint."""
    constraint = custom.get_cluster_custom_object(
        group="constraints.gatekeeper.sh",
        version="v1beta1",
        # Gatekeeper registers each constraint CRD with the lowercased kind
        # as its plural, with no trailing "s"
        plural=constraint_kind.lower(),
        name=constraint_name
    )
    return constraint.get("status", {})


def test_audit_detects_existing_violations():
    """Audit should catch pods already running without resource limits."""
    # Wait for one audit cycle (auditInterval=30 in the Helm install above;
    # adjust if your interval differs)
    time.sleep(35)

    status = get_constraint_status(
        "K8sRequiredResources",
        "require-cpu-and-memory-limits"
    )

    # auditTimestamp is only set once an audit run has processed the constraint
    assert "auditTimestamp" in status, "Audit has not run for this constraint yet"

    # If you have known non-compliant pods, verify they appear in audit
    violation_names = [v["name"] for v in status.get("violations", [])]
    assert "known-non-compliant-pod" in violation_names, \
        f"Expected violation not found. Violations: {violation_names}"
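
Matching on name alone can pick up a same-named pod in another namespace. The sketch below filters audit entries by name and, optionally, namespace. It assumes each entry carries `name` and `namespace` keys, as the entries read in the test above do for `name`. `find_violations` is a hypothetical helper, and the sample status is made up.

```python
from typing import List, Optional


def find_violations(status: dict, name: str,
                    namespace: Optional[str] = None) -> List[dict]:
    """Return audit violation entries matching a resource name and namespace."""
    matches = []
    for entry in status.get("violations", []):
        if entry.get("name") != name:
            continue
        if namespace is not None and entry.get("namespace") != namespace:
            continue
        matches.append(entry)
    return matches


# Illustrative status block, not real cluster output
sample_status = {
    "violations": [
        {"kind": "Pod", "name": "web", "namespace": "production", "message": "m1"},
        {"kind": "Pod", "name": "web", "namespace": "staging", "message": "m2"},
    ]
}
```

An audit test can then assert on `find_violations(status, "known-non-compliant-pod", "production")` instead of a bare list of names.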

CI/CD Pipeline

name: Gatekeeper Policy CI

on:
  push:
    paths:
      - 'policies/**'
      - 'constraints/**'

jobs:
  unit-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      
      - name: Install OPA
        run: |
          curl -L -o opa https://openpolicyagent.org/downloads/latest/opa_linux_amd64_static
          chmod +x opa
          sudo mv opa /usr/local/bin/

      - name: Run Rego unit tests
        run: |
          # OPA 1.0+ parses Rego v1 by default; the rules in this guide use
          # pre-1.0 syntax, so run them in v0 compatibility mode
          opa test policies/ -v --v0-compatible --exit-zero-on-skipped
          echo "All unit tests passed"

      - name: Check test coverage
        run: |
          opa test policies/ --v0-compatible --coverage --format=json | \
            jq '.coverage | if . < 80 then error("Coverage \(.)% is below 80%") else "Coverage \(.)%: OK" end'

  validate-templates:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      
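      - name: Validate ConstraintTemplate YAML
        run: |
          # Note: kubectl may still contact an API server even with
          # --dry-run=client, so this job assumes a reachable cluster
          for template in policies/*/constraint_template.yaml; do
            kubectl apply --dry-run=client -f "$template"
            echo "OK: $template"
          done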

  integration-tests:
    runs-on: ubuntu-latest
    needs: unit-tests
    steps:
      - uses: actions/checkout@v4
      
      - name: Create KinD cluster
        uses: helm/kind-action@v1.8.0
      
      - name: Install Gatekeeper
        run: |
          helm repo add gatekeeper https://open-policy-agent.github.io/gatekeeper/charts
          helm install gatekeeper gatekeeper/gatekeeper \
            --namespace gatekeeper-system --create-namespace \
            --set auditInterval=10 \
            --wait
      
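      - name: Deploy policies
        run: |
          # Namespaces used by the test pods must exist for server-side dry runs
          kubectl create namespace production
          kubectl create namespace development
          kubectl apply -f policies/required-resources/constraint_template.yaml
          sleep 5  # Let Gatekeeper create the constraint CRD
          kubectl apply -f constraints/
          sleep 5  # Let Gatekeeper sync

      - name: Install test dependencies
        run: pip install pytest kubernetes pyyaml

      - name: Run integration tests
        run: pytest tests/gatekeeper/ -v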
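
The jq coverage gate in the workflow above can also be written as a small script, which is easier to extend with per-file reporting. This sketch relies on the same assumption the jq filter makes: the JSON report has a numeric top-level `coverage` field holding a percentage. `coverage_ok` is a hypothetical name.

```python
import json
from typing import Tuple


def coverage_ok(report_text: str, threshold: float = 80.0) -> Tuple[bool, float]:
    """Parse a coverage report and compare its percentage to a threshold."""
    report = json.loads(report_text)
    coverage = float(report["coverage"])
    return coverage >= threshold, coverage


# Illustrative report bodies, not real opa output
passing, pct_high = coverage_ok('{"coverage": 91.5}')
failing, pct_low = coverage_ok('{"coverage": 79.9}')
```

A CI step would pipe the report into a script that calls `coverage_ok` on standard input and exits non-zero when the first element of the result is false.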

Common Mistakes in Gatekeeper Policy Testing

Testing only the happy path: Most teams test that valid resources are accepted but skip testing that invalid resources are actually rejected. Always test both.

Not testing violation messages: The violation message is what your developers read when a deployment fails. Test that it includes actionable information (what's wrong, which container, what to fix).

Forgetting initContainers: Pod policies that check spec.containers often forget spec.initContainers. Test with pods that have init containers.

Constraint scope gaps: Constraints target specific namespaces, resource kinds, or label selectors. Test that resources outside the scope are not affected.

Race conditions in integration tests: Gatekeeper has a sync delay after deploying new constraints. Always add a sleep or poll for constraint status before running integration tests.

Rego logic with missing fields: Kubernetes resources have many optional fields. Test what happens when resources, securityContext, or labels are absent — Rego's undefined behavior can surprise you.

# Fragile: undefined if resources or limits is absent, so the rule body
# silently fails instead of raising an error
container.resources.limits.cpu

# Safer: default the missing objects with object.get
limits := object.get(object.get(container, "resources", {}), "limits", {})
limits.cpu  # Still undefined if cpu is missing, but limits itself is always defined

Summary

Testing OPA/Gatekeeper admission control requires three layers:

  1. Rego unit tests (opa test): Fast, isolated, test policy logic directly — both violation and non-violation cases, message content, and edge cases like missing fields
  2. Integration tests (live cluster with dry-run): Verify the complete Gatekeeper stack — ConstraintTemplate + Constraint + webhook — actually rejects what it should
  3. Audit verification: Confirm audit mode catches pre-existing violations in running clusters

Aim for 80%+ Rego coverage enforced in CI. Every new policy should ship with its tests. Policies without tests are policies you can't change safely — you won't know if a refactor broke the logic until something slips through in production.
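
The rule that every policy ships with tests can itself be checked mechanically. The sketch below assumes the directory layout used in this guide (one directory per policy, holding policy.rego and policy_test.rego). `policies_missing_tests` is a hypothetical helper name.

```python
from pathlib import Path
from typing import List


def policies_missing_tests(root: str) -> List[str]:
    """List policy directories that have policy.rego but no policy_test.rego."""
    missing = []
    for policy_dir in sorted(p for p in Path(root).iterdir() if p.is_dir()):
        has_policy = (policy_dir / "policy.rego").exists()
        has_test = (policy_dir / "policy_test.rego").exists()
        if has_policy and not has_test:
            missing.append(policy_dir.name)
    return missing
```

Running this over `policies/` in CI and failing on a non-empty result turns the convention into an enforced check.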
