Testing Atlantis Terraform PR Automation: Workflows, Plan Verification, and Policy Enforcement

Atlantis automates Terraform plan and apply through pull requests, but the automation itself needs testing: workflow configuration, plan output validation, policy enforcement, and server health. This guide covers testing the atlantis.yaml configuration locally, validating plan output with custom scripts, enforcing Terraform policies with OPA and Conftest, and monitoring Atlantis server health in production.

Key Takeaways

Test Atlantis workflows before pushing. The atlantis.yaml workflow configuration is easy to misconfigure. Check it locally with structural tests before a failed PR workflow blocks your team.

Validate plan output in CI before applying. Run terraform plan -out=tfplan, convert the result with terraform show -json tfplan, and parse the JSON to detect risky changes (resource deletions, security group changes, IAM modifications) before they're applied.

Use OPA/Conftest for policy-as-code. Enforce Terraform policies (no public S3 buckets, required tags, approved instance types) with Rego policies that evaluate the plan JSON inside the Atlantis workflow, after the plan step.

Monitor Atlantis server health continuously. Atlantis processes webhooks from GitHub; if it's down, all Terraform PRs stall. Health checks should alert within minutes of downtime.

Test Atlantis webhook delivery. GitHub webhook delivery failures are silent. Set up tests that verify webhooks reach Atlantis and trigger the expected plan commands.

What to Test in Atlantis

Atlantis has multiple testable layers:

  1. Workflow configuration (atlantis.yaml): correct project detection, workflow steps, apply requirements
  2. Plan output validation: detect risky changes before they're applied
  3. Policy enforcement: OPA/Conftest policies that evaluate the plan output before apply
  4. Server configuration: repos allowlist, apply requirements, authentication
  5. Server health: webhook delivery, plan execution, apply execution

Layer 1: Testing atlantis.yaml Configuration

Local Configuration Validation

# The Atlantis binary ships no offline validator for atlantis.yaml; the server
# parses and validates the file when it handles a pull request.
# Locally, start by confirming the file is well-formed YAML:
python -c "import yaml; yaml.safe_load(open('atlantis.yaml'))"

# Start a throwaway Atlantis server against a fork of the example repo
atlantis testdrive

# Plans for a single project are requested with a PR comment, not a local command:
#   atlantis plan -d environments/staging/networking -w default

atlantis.yaml Structure

# atlantis.yaml
version: 3

automerge: false
parallel_plan: true
parallel_apply: false

projects:
  - name: networking-prod
    dir: environments/prod/networking
    workspace: default
    workflow: standard
    autoplan:
      when_modified:
        - "*.tf"
        - "*.tfvars"
        - "../../../modules/networking/**/*.tf"
      enabled: true
    apply_requirements:
      - approved
      - mergeable

  - name: networking-staging
    dir: environments/staging/networking
    workspace: default
    workflow: standard
    autoplan:
      when_modified:
        - "*.tf"
        - "*.tfvars"
        - "../../../modules/networking/**/*.tf"
      enabled: true
    apply_requirements:
      - approved

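workflows:
  standard:
    plan:
      steps:
        - init:
            extra_args: ["-upgrade"]
        - plan:
            extra_args: ["-var-file", "terraform.tfvars"]
        # Conftest needs the plan as JSON, so it runs after the plan step.
        # The policies live in package "terraform", not Conftest's default "main".
        # run steps execute in the project directory; adjust the policy path to match.
        - run: terraform show -json $PLANFILE > plan.json
        - run: conftest test plan.json --policy policies/ --namespace terraform
    apply:
      steps:
        - apply
        - run: echo "Apply complete for $PROJECT_NAME in $WORKSPACE"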
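
One ordering constraint in the workflow is worth a test of its own: Conftest evaluates plan output, so a conftest run step only makes sense after the plan step. The sketch below checks that ordering on the dictionary that yaml.safe_load would return for atlantis.yaml. The function name and both sample configs are illustrative assumptions, not part of Atlantis.

```python
def conftest_before_plan(config: dict) -> list[str]:
    """Return workflow names whose plan stage runs conftest before the plan step."""
    offenders = []
    for name, workflow in (config.get("workflows") or {}).items():
        steps = ((workflow or {}).get("plan") or {}).get("steps") or []
        plan_seen = False
        for step in steps:
            # A step is either a bare string ("plan") or a one-key mapping ({"plan": {...}})
            if step == "plan" or (isinstance(step, dict) and "plan" in step):
                plan_seen = True
            elif isinstance(step, dict) and "conftest" in str(step.get("run", "")):
                if not plan_seen:
                    offenders.append(name)
                    break
    return offenders


# Hypothetical configs, written as the dicts yaml.safe_load would return
good = {"workflows": {"standard": {"plan": {"steps": [
    {"init": {"extra_args": ["-upgrade"]}},
    {"plan": {"extra_args": ["-var-file", "terraform.tfvars"]}},
    {"run": "terraform show -json $PLANFILE > plan.json"},
    {"run": "conftest test plan.json --policy policies/"},
]}}}}
bad = {"workflows": {"early": {"plan": {"steps": [
    "init",
    {"run": "conftest test $PLANFILE --policy policies/"},
    "plan",
]}}}}

print(conftest_before_plan(good))  # []
print(conftest_before_plan(bad))   # ['early']
```

In a pytest suite this would become a single assertion that the returned list is empty for the repository's own atlantis.yaml.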

Testing Project Detection

# tests/test_atlantis_config.py
import yaml
import os
import glob
import pytest
from pathlib import Path

@pytest.fixture(scope="module")
def atlantis_config():
    with open("atlantis.yaml") as f:
        return yaml.safe_load(f)

def test_all_terraform_dirs_have_project(atlantis_config):
    """Every environment directory with .tf files should have an Atlantis project."""
    tf_dirs = set()
    for pattern in ["environments/**/*.tf", "stacks/**/*.tf"]:
        for tf_file in glob.glob(pattern, recursive=True):
            tf_dirs.add(os.path.dirname(tf_file))
    
    configured_dirs = {
        p["dir"] for p in atlantis_config.get("projects", [])
    }
    
    unconfigured = tf_dirs - configured_dirs
    assert not unconfigured, (
        f"Terraform directories not configured in atlantis.yaml:\n"
        + "\n".join(f"  - {d}" for d in sorted(unconfigured))
    )

def test_prod_projects_require_approval(atlantis_config):
    """All prod projects must require approval before apply."""
    for project in atlantis_config.get("projects", []):
        if "prod" in project.get("name", "") or "prod" in project.get("dir", ""):
            apply_requirements = project.get("apply_requirements", [])
            assert "approved" in apply_requirements, (
                f"Project {project['name']} is prod but doesn't require approval. "
                f"Current requirements: {apply_requirements}"
            )

def test_no_projects_in_modules_dir(atlantis_config):
    """Reusable modules under modules/ shouldn't be Atlantis projects; only environments should."""
    for project in atlantis_config.get("projects", []):
        dir_path = project.get("dir", "")
        assert not dir_path.startswith("modules/"), (
            f"Project {project['name']} points to modules/ directory. "
            "Reusable modules should not be planned directly; plan the environments that reference them."
        )

def test_workflow_exists_for_each_project(atlantis_config):
    """Each project must reference a workflow that exists."""
    defined_workflows = set(atlantis_config.get("workflows", {}).keys())
    defined_workflows.add("default")  # Atlantis built-in
    
    for project in atlantis_config.get("projects", []):
        workflow = project.get("workflow", "default")
        assert workflow in defined_workflows, (
            f"Project {project['name']} references undefined workflow: {workflow}"
        )

def test_autoplan_patterns_exist(atlantis_config):
    """Warn when a when_modified pattern matches no files in the repository."""
    import warnings  # imported here so the check stays self-contained

    for project in atlantis_config.get("projects", []):
        autoplan = project.get("autoplan", {})
        patterns = autoplan.get("when_modified", [])
        dir_path = project.get("dir", ".")

        for pattern in patterns:
            # Resolve relative pattern from project dir
            if not pattern.startswith("/"):
                full_pattern = str(Path(dir_path) / pattern)
            else:
                full_pattern = pattern

            # A pattern that matches nothing is usually a typo or a stale path.
            # pytest.warns() only asserts that a warning was raised, so emit one explicitly.
            if not glob.glob(full_pattern, recursive=True):
                warnings.warn(
                    f"Pattern {pattern} in project {project['name']} matches no files"
                )
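
The tests above check that projects and patterns exist, but not which projects a given change would trigger. The following sketch approximates that: it resolves each when_modified pattern relative to the project directory and matches it against a list of changed files. The glob translation is a simplified assumption ("**/" spans zero or more directories, "*" stays inside one path segment) and not Atlantis's exact matcher; the function names are hypothetical.

```python
import posixpath
import re


def glob_to_regex(pattern: str) -> re.Pattern:
    """Translate a when_modified-style glob into a regex (simplified rules)."""
    out, i = [], 0
    while i < len(pattern):
        if pattern.startswith("**/", i):
            out.append("(?:.*/)?")   # zero or more directories
            i += 3
        elif pattern.startswith("**", i):
            out.append(".*")
            i += 2
        elif pattern[i] == "*":
            out.append("[^/]*")      # stays within one path segment
            i += 1
        else:
            out.append(re.escape(pattern[i]))
            i += 1
    return re.compile("".join(out) + r"\Z")


def projects_to_plan(projects: list[dict], changed_files: list[str]) -> list[str]:
    """Return names of projects whose when_modified patterns match a changed file."""
    hits = []
    for project in projects:
        autoplan = project.get("autoplan", {})
        if not autoplan.get("enabled", True):
            continue
        for pattern in autoplan.get("when_modified", []):
            # Patterns are relative to the project dir; normpath resolves "../"
            full = posixpath.normpath(posixpath.join(project["dir"], pattern))
            regex = glob_to_regex(full)
            if any(regex.match(path) for path in changed_files):
                hits.append(project["name"])
                break
    return hits


patterns = ["*.tf", "*.tfvars", "../../../modules/networking/**/*.tf"]
projects = [
    {"name": "networking-prod", "dir": "environments/prod/networking",
     "autoplan": {"enabled": True, "when_modified": patterns}},
    {"name": "networking-staging", "dir": "environments/staging/networking",
     "autoplan": {"enabled": True, "when_modified": patterns}},
]

print(projects_to_plan(projects, ["modules/networking/vpc/main.tf"]))
print(projects_to_plan(projects, ["environments/staging/networking/main.tf"]))
```

A shared-module change should list both projects, while a change inside one environment should list only that environment's project.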

Layer 2: Plan Output Validation

Validate Terraform plans before they're applied:

# scripts/validate_plan.py
"""
Called from Atlantis workflow as a pre-apply script.
Parses terraform plan JSON and blocks apply for risky changes.
"""

import json
import sys
import subprocess
from typing import Any

def get_plan_json(plan_file: str) -> dict:
    result = subprocess.run(
        ["terraform", "show", "-json", plan_file],
        capture_output=True, text=True
    )
    if result.returncode != 0:
        print(f"Error reading plan: {result.stderr}", file=sys.stderr)
        sys.exit(1)
    return json.loads(result.stdout)

def check_deletions(plan: dict) -> list[str]:
    """Flag resource deletions that require human review."""
    # Data stores whose deletion is always flagged
    risky_types = {
        "aws_db_instance", "aws_rds_cluster", "aws_dynamodb_table",
        "aws_s3_bucket", "aws_elasticache_cluster",
        "google_sql_database_instance",
    }
    risky_deletions = []

    for change in plan.get("resource_changes", []):
        actions = change.get("change", {}).get("actions", [])
        if "delete" not in actions:
            continue

        resource_type = change.get("type", "")
        resource_name = change.get("address", "")

        if resource_type in risky_types:
            risky_deletions.append(
                f"DATA STORE DELETION: {resource_name} ({resource_type})"
            )
        elif "create" in actions:
            # Terraform reports a replacement as ["delete", "create"] or
            # ["create", "delete"]; there is no literal "replace" action.
            risky_deletions.append(
                f"DESTRUCTIVE REPLACEMENT: {resource_name} ({resource_type}) "
                "will be destroyed and recreated"
            )

    return risky_deletions

def check_security_changes(plan: dict) -> list[str]:
    """Flag security-sensitive changes."""
    warnings = []

    for change in plan.get("resource_changes", []):
        resource_type = change.get("type", "")
        # "after" is null for deletions, so fall back to an empty dict
        after = change.get("change", {}).get("after") or {}

        # Security group changes
        if "security_group" in resource_type:
            rules = (after.get("ingress") or []) + (after.get("ingress_rules") or [])
            for rule in rules:
                if "0.0.0.0/0" in (rule.get("cidr_blocks") or []):
                    port_range = f"{rule.get('from_port', '?')}-{rule.get('to_port', '?')}"
                    warnings.append(
                        f"OPEN SECURITY GROUP: {change['address']} allows 0.0.0.0/0 "
                        f"on ports {port_range}"
                    )

        # IAM changes
        if "iam_" in resource_type and "policy" in resource_type:
            policy_str = after.get("policy") or after.get("policy_document") or ""
            try:
                policy = json.loads(policy_str) if isinstance(policy_str, str) else {}
            except json.JSONDecodeError:
                policy = {}  # empty or malformed policy: nothing to inspect

            statements = policy.get("Statement", [])
            if isinstance(statements, dict):  # a single statement may be a bare object
                statements = [statements]

            for statement in statements:
                if statement.get("Effect") != "Allow":
                    continue
                # Action and Resource may each be a string or a list; normalize to
                # lists so "*" is matched exactly and not as a substring of "s3:*"
                actions = statement.get("Action", [])
                resources = statement.get("Resource", [])
                actions = [actions] if isinstance(actions, str) else actions
                resources = [resources] if isinstance(resources, str) else resources

                if "*" in actions:
                    warnings.append(
                        f"WILDCARD IAM ACTION: {change['address']} has Action=*"
                    )
                if "*" in resources:
                    warnings.append(
                        f"WILDCARD IAM RESOURCE: {change['address']} has Resource=*"
                    )

    return warnings

def check_scale_changes(plan: dict, threshold: int = 10) -> list[str]:
    """Flag plans that change more resources than expected."""
    warnings = []
    
    resource_changes = plan.get("resource_changes", [])
    change_count = sum(
        1 for c in resource_changes
        if c.get("change", {}).get("actions", []) != ["no-op"]
    )
    
    if change_count > threshold:
        warnings.append(
            f"LARGE CHANGE: This plan modifies {change_count} resources "
            f"(threshold: {threshold}). Review carefully."
        )
    
    return warnings

def main():
    plan_file = sys.argv[1] if len(sys.argv) > 1 else "tfplan"
    
    plan = get_plan_json(plan_file)
    
    errors = []
    warnings = []
    
    # Hard blocks (fail the apply)
    errors.extend(check_deletions(plan))
    
    # Warnings (human must explicitly approve)
    warnings.extend(check_security_changes(plan))
    warnings.extend(check_scale_changes(plan, threshold=20))
    
    if warnings:
        print("\n⚠️  PLAN WARNINGS (review required):")
        for w in warnings:
            print(f"  WARNING: {w}")
    
    if errors:
        print("\n❌ PLAN BLOCKED:")
        for e in errors:
            print(f"  ERROR: {e}")
        print(
            "\nThis plan has been blocked by automated checks. "
            "Review the flagged changes, then update the Terraform code or "
            "the validation rules and re-run the plan."
        )
        sys.exit(1)
    
    print(f"\n✅ Plan validation passed ({len(warnings)} warnings, 0 errors)")

if __name__ == "__main__":
    main()
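
When reading plan JSON it helps to know that Terraform encodes a replacement as a pair of actions, ["delete", "create"] or ["create", "delete"], not as a single "replace" value. The sketch below collapses each actions list into one label and counts the labels. The function names and the sample plan are made up for illustration.

```python
from collections import Counter


def classify(actions: list[str]) -> str:
    """Collapse a resource_changes[].change.actions list into one label."""
    if "delete" in actions and "create" in actions:
        return "replace"  # ["delete", "create"] or ["create", "delete"]
    if actions in (["no-op"], ["read"]):
        return "unchanged"
    return actions[0] if actions else "unchanged"


def summarize(plan: dict) -> Counter:
    """Count planned changes by kind."""
    return Counter(
        classify(rc.get("change", {}).get("actions", []))
        for rc in plan.get("resource_changes", [])
    )


# Hand-written fixture shaped like `terraform show -json` output (addresses are made up)
sample_plan = {"resource_changes": [
    {"address": "aws_instance.web", "change": {"actions": ["update"]}},
    {"address": "aws_db_instance.main", "change": {"actions": ["delete", "create"]}},
    {"address": "aws_s3_bucket.logs", "change": {"actions": ["delete"]}},
    {"address": "aws_vpc.main", "change": {"actions": ["no-op"]}},
]}

print(dict(summarize(sample_plan)))
```

A summary like this is a cheap way to build fixtures for unit-testing the validation script without running Terraform.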

Layer 3: Policy Enforcement with OPA/Conftest

# policies/terraform.rego
package terraform

import rego.v1

# Deny public S3 buckets, whether the ACL is set inline or via aws_s3_bucket_acl
public_acls := {"public-read", "public-read-write"}

deny contains msg if {
    resource := input.resource_changes[_]
    resource.type in {"aws_s3_bucket", "aws_s3_bucket_acl"}
    acl := resource.change.after.acl
    acl in public_acls
    msg := sprintf("S3 bucket ACL cannot be %v: %v", [acl, resource.address])
}

# Require approved instance types
approved_instance_types := {
    "t3.micro", "t3.small", "t3.medium", "t3.large",
    "t3a.micro", "t3a.small", "t3a.medium",
    "m6i.large", "m6i.xlarge", "m6i.2xlarge",
    "c6i.large", "c6i.xlarge"
}

deny contains msg if {
    resource := input.resource_changes[_]
    resource.type == "aws_instance"
    instance_type := resource.change.after.instance_type
    not instance_type in approved_instance_types
    msg := sprintf(
        "Instance type %v is not in the approved list. Approved: %v",
        [instance_type, approved_instance_types]
    )
}

# Require required tags on all taggable resources
required_tags := {"Environment", "Team", "CostCenter"}

taggable_resources := {
    "aws_instance", "aws_s3_bucket", "aws_db_instance",
    "aws_eks_cluster", "aws_lambda_function"
}

deny contains msg if {
    resource := input.resource_changes[_]
    resource.type in taggable_resources
    resource.change.actions[_] in {"create", "update"}
    
    existing_tags := object.keys(resource.change.after.tags)
    missing := required_tags - existing_tags
    count(missing) > 0
    
    msg := sprintf(
        "%v (%v) is missing required tags: %v",
        [resource.address, resource.type, missing]
    )
}

Run the policies against a real plan, then run the policy unit tests:

# Test Conftest policies against a plan
terraform plan -out=tfplan
terraform show -json tfplan > plan.json

# Test against policies (the rules live in package "terraform", not Conftest's default "main")
conftest test plan.json --policy policies/ --namespace terraform

# Run policy tests (Conftest has built-in test support)
conftest verify --policy policies/

The unit tests sit next to the policies:

# policies/terraform_test.rego
package terraform_test

import rego.v1

# The rules under test live in package terraform
import data.terraform

# Test that public S3 ACL is denied
test_deny_public_s3_acl if {
    "S3 bucket ACL cannot be public-read: aws_s3_bucket_acl.example" in terraform.deny with input as {
        "resource_changes": [{
            "address": "aws_s3_bucket_acl.example",
            "type": "aws_s3_bucket_acl",
            "change": {
                "actions": ["create"],
                "after": {"acl": "public-read"}
            }
        }]
    }
}

# Test that private S3 ACL is allowed
test_allow_private_s3_acl if {
    count(terraform.deny) == 0 with input as {
        "resource_changes": [{
            "address": "aws_s3_bucket_acl.example",
            "type": "aws_s3_bucket_acl",
            "change": {
                "actions": ["create"],
                "after": {"acl": "private"}
            }
        }]
    }
}

# Test required tags enforcement
test_deny_missing_tags if {
    # Evaluate deny with the mocked input first, then inspect the messages
    result := terraform.deny with input as {
        "resource_changes": [{
            "address": "aws_instance.example",
            "type": "aws_instance",
            "change": {
                "actions": ["create"],
                "after": {
                    "instance_type": "t3.micro",
                    "tags": {"Name": "test"}  # Missing Team, CostCenter, Environment
                }
            }
        }]
    }
    some msg in result
    contains(msg, "missing required tags")
}

Layer 4: Atlantis Server Health Testing

# tests/test_atlantis_server.py
import hashlib
import hmac
import json
import os
import warnings

import requests

ATLANTIS_URL = os.environ.get("ATLANTIS_URL", "https://atlantis.internal.example.com")


def compute_hmac(body: bytes, secret: str) -> str:
    """Build the X-Hub-Signature-256 value GitHub would send for this body."""
    digest = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    return f"sha256={digest}"


class TestAtlantisHealth:

    def test_health_endpoint(self):
        """Atlantis /healthz must return 200."""
        response = requests.get(f"{ATLANTIS_URL}/healthz", timeout=10)
        assert response.status_code == 200, f"Atlantis health check failed: {response.text}"

    def test_status_endpoint(self):
        """Atlantis /status must report the number of in-progress operations."""
        response = requests.get(f"{ATLANTIS_URL}/status", timeout=10)
        assert response.status_code == 200

        # Field name as exposed by recent Atlantis releases; confirm against your version
        data = response.json()
        assert "in_progress_operations" in data

        # Warn (without failing) if many operations are in flight
        ops = data["in_progress_operations"]
        if ops > 10:
            warnings.warn(f"Atlantis has {ops} running operations, possible stuck plans")

    def test_webhook_deliverable(self):
        """
        Test that Atlantis can receive webhooks from GitHub.
        Sends a signed ping event and verifies the endpoint answers.
        """
        test_payload = {
            "zen": "Responsive is better than fast.",
            "hook_id": 12345,
        }
        # Sign and send the exact same bytes, otherwise the signature check fails
        body = json.dumps(test_payload).encode()

        response = requests.post(
            f"{ATLANTIS_URL}/events",
            data=body,
            headers={
                "Content-Type": "application/json",
                "X-GitHub-Event": "ping",
                "X-Hub-Signature-256": compute_hmac(body, os.environ["ATLANTIS_WEBHOOK_SECRET"]),
            },
            timeout=10
        )

        # Accept 200 or 400: either way the endpoint is reachable and answering
        assert response.status_code in [200, 400], (
            f"Atlantis webhook endpoint returned unexpected status: {response.status_code}"
        )
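
Posting a synthetic event proves the endpoint is reachable, but not that GitHub's own deliveries are succeeding. GitHub's REST API lists recent deliveries for a repository webhook, and the sketch below filters that list for non-2xx responses. The function names are illustrative, and owner, repo, hook_id, and token are placeholders you would supply; only the filtering step is exercised in the example.

```python
import json
import urllib.request


def fetch_deliveries(owner: str, repo: str, hook_id: int, token: str) -> list[dict]:
    """List recent deliveries for one repository webhook via the GitHub REST API."""
    url = f"https://api.github.com/repos/{owner}/{repo}/hooks/{hook_id}/deliveries"
    request = urllib.request.Request(url, headers={
        "Authorization": f"Bearer {token}",
        "Accept": "application/vnd.github+json",
    })
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.load(response)


def failed_deliveries(deliveries: list[dict]) -> list[dict]:
    """Return deliveries that did not receive a 2xx response."""
    return [d for d in deliveries if not 200 <= d.get("status_code", 0) < 300]


# Made-up sample shaped like the API response (only the fields used here)
sample = [
    {"id": 1, "event": "pull_request", "status_code": 200},
    {"id": 2, "event": "issue_comment", "status_code": 502},
    {"id": 3, "event": "push", "status_code": 0},
]

for delivery in failed_deliveries(sample):
    print(f"delivery {delivery['id']} ({delivery['event']}) failed with {delivery['status_code']}")
```

Running the filter on a schedule and alerting on a non-empty result turns otherwise silent delivery failures into a visible signal.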

CI Integration

# .github/workflows/atlantis-validation.yml
name: Atlantis Configuration Validation

on:
  pull_request:
    paths:
      - 'atlantis.yaml'
      - 'policies/**'
      - 'environments/**/*.tf'

jobs:
  validate-config:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      
      # The Atlantis binary has no offline validator for atlantis.yaml,
      # so parse the file here and let the pytest suite check its structure
      - name: Check atlantis.yaml parses
        run: |
          pip install pyyaml
          python -c "import yaml; yaml.safe_load(open('atlantis.yaml'))"
      
      - name: Run configuration tests
        run: |
          pip install pytest pyyaml
          pytest tests/test_atlantis_config.py -v
  
  validate-policies:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      
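      - name: Install Conftest
        run: |
          # Release asset names include the version, so resolve the latest tag first
          VERSION=$(curl -sSL https://api.github.com/repos/open-policy-agent/conftest/releases/latest | grep '"tag_name":' | sed -E 's/.*"v([^"]+)".*/\1/')
          curl -sSL "https://github.com/open-policy-agent/conftest/releases/download/v${VERSION}/conftest_${VERSION}_Linux_x86_64.tar.gz" | tar xz
          sudo mv conftest /usr/local/bin/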
      
      - name: Run policy unit tests
        run: conftest verify --policy policies/

Monitoring Atlantis with HelpMeTest

Atlantis is a critical piece of infrastructure — if it's down, all Terraform changes via PR are blocked. HelpMeTest can run continuous health checks against your Atlantis instance:

*** Test Cases ***
Atlantis Server Health
    [Documentation]    Verify Atlantis is healthy and processing webhooks
    ${response}=    GET    ${ATLANTIS_URL}/healthz
    Status Should Be    200    ${response}
    
    ${status}=    GET    ${ATLANTIS_URL}/status
    ${ops}=    Get JSON Value    ${status}    in_progress_operations
    Should Be True    ${ops} < 20    msg=Atlantis has ${ops} operations in progress

Set up a 5-minute monitoring interval — if Atlantis goes down, you'll know before your team starts asking why their PRs aren't getting plan comments.
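
If you script the check yourself, one common choice is to alert only after consecutive failures so a single dropped request does not page anyone. The sketch below is a minimal version under that assumption: probe and should_alert are hypothetical names, and the threshold of 2 is an arbitrary default.

```python
import urllib.error
import urllib.request


def probe(base_url: str, timeout: float = 10.0) -> bool:
    """Return True if GET {base_url}/healthz answers with HTTP 200."""
    try:
        with urllib.request.urlopen(f"{base_url}/healthz", timeout=timeout) as resp:
            return resp.status == 200
    except (urllib.error.URLError, OSError):
        return False


def should_alert(results: list[bool], threshold: int = 2) -> bool:
    """Return True when the most recent `threshold` checks all failed."""
    if threshold < 1 or len(results) < threshold:
        return False
    return not any(results[-threshold:])


# History of probe results, oldest first (True = healthy)
print(should_alert([True, False, False]))   # True: two failures in a row
print(should_alert([False, True, False]))   # False: the failures are not consecutive
```

With a 5-minute interval and a threshold of 2, an outage is reported roughly ten minutes after it starts; lower the interval or the threshold if that is too slow for your team.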

Summary

Testing Atlantis comprehensively means testing:

  1. Workflow configuration (atlantis.yaml) — structure validation, project detection coverage, approval requirements, workflow references
  2. Plan validation scripts — detect risky changes (data store deletions, open security groups, wildcard IAM) before apply
  3. OPA/Conftest policies — policy-as-code for required tags, approved instance types, no public S3; test policies themselves with Conftest's built-in test framework
  4. Server health — continuous /healthz monitoring, webhook delivery validation, stuck operation detection

Atlantis is the gatekeeper for all your Terraform changes. Test it like the critical infrastructure it is.
