Testing Kubernetes Operators and Custom Controllers

Kubernetes operators are some of the most complex software in a typical infrastructure stack — they implement reconciliation loops that must handle partial failures, retries, status conditions, and concurrent updates correctly. Testing them requires more than unit tests with mock functions. You need to exercise the full controller machinery: the informer cache, the work queue, the event recorder, and the API server interaction.

controller-runtime provides envtest for this purpose. Together with kind for integration tests, it gives you thorough test coverage without a dedicated test cluster.

The Testing Pyramid for Operators

Operator testing has three layers:

Unit tests with fake clients test individual reconcile logic in isolation. They're fast and precise but miss the full controller machinery.

envtest tests run your controller against a real API server backed by etcd, both started as local processes. Nothing else from Kubernetes runs: there is no kubelet, no scheduler, and none of the built-in controllers. This is where most operator testing should happen.

Integration tests with kind verify the full end-to-end behavior on a complete cluster, including admission webhooks served by the deployed operator, node selection, and interactions with other controllers.

envtest: Testing Against a Real API Server

The envtest package (sigs.k8s.io/controller-runtime/pkg/envtest) starts a real Kubernetes API server and etcd process locally. Your tests register CRDs, start controllers, and interact with the API server exactly as a real operator would.

Setup

Install the test binaries:

# Using setup-envtest (recommended)
go install sigs.k8s.io/controller-runtime/tools/setup-envtest@latest

# Download the API server and etcd binaries for a specific Kubernetes version
setup-envtest use 1.29.x --bin-dir /usr/local/kubebuilder/bin

# Point envtest at the versioned directory created under the same --bin-dir
export KUBEBUILDER_ASSETS="$(setup-envtest use 1.29.x --bin-dir /usr/local/kubebuilder/bin -p path)"

Test Suite Bootstrap

// internal/controller/suite_test.go
package controller // same package as MyAppReconciler, which is referenced unqualified below

import (
    "context"
    "path/filepath"
    "testing"

    . "github.com/onsi/ginkgo/v2"
    . "github.com/onsi/gomega"
    "k8s.io/client-go/kubernetes/scheme"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/envtest"
    logf "sigs.k8s.io/controller-runtime/pkg/log"
    "sigs.k8s.io/controller-runtime/pkg/log/zap"

    myappv1 "github.com/mycompany/myoperator/api/v1"
)

var (
    ctx       context.Context
    cancel    context.CancelFunc
    k8sClient client.Client
    testEnv   *envtest.Environment
)

func TestControllers(t *testing.T) {
    RegisterFailHandler(Fail)
    RunSpecs(t, "Controller Suite")
}

var _ = BeforeSuite(func() {
    logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))
    ctx, cancel = context.WithCancel(context.TODO())

    testEnv = &envtest.Environment{
        CRDDirectoryPaths: []string{
            filepath.Join("..", "..", "config", "crd", "bases"),
        },
        ErrorIfCRDPathMissing: true,
    }

    cfg, err := testEnv.Start()
    Expect(err).NotTo(HaveOccurred())
    Expect(cfg).NotTo(BeNil())

    err = myappv1.AddToScheme(scheme.Scheme)
    Expect(err).NotTo(HaveOccurred())

    k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
    Expect(err).NotTo(HaveOccurred())

    mgr, err := ctrl.NewManager(cfg, ctrl.Options{Scheme: scheme.Scheme})
    Expect(err).NotTo(HaveOccurred())

    err = (&MyAppReconciler{
        Client: mgr.GetClient(),
        Scheme: mgr.GetScheme(),
    }).SetupWithManager(mgr)
    Expect(err).NotTo(HaveOccurred())

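    go func() {
        defer GinkgoRecover()
        // Assert on the return value directly instead of assigning to the
        // outer err, which the suite goroutine may still be using.
        Expect(mgr.Start(ctx)).To(Succeed(), "failed to run manager")
    }()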
})

var _ = AfterSuite(func() {
    cancel()
    err := testEnv.Stop()
    Expect(err).NotTo(HaveOccurred())
})

Testing Reconciliation Loops

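// internal/controller/myapp_controller_test.go
package controller // must match the package of suite_test.go

import (
    "context"
    "time"

    . "github.com/onsi/ginkgo/v2"
    . "github.com/onsi/gomega"
    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/utils/ptr"

    myappv1 "github.com/mycompany/myoperator/api/v1"
)
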
var _ = Describe("MyApp Controller", func() {
    const (
        timeout  = time.Second * 30
        interval = time.Millisecond * 250
    )

    Context("when creating a MyApp resource", func() {
        It("should create a Deployment and Service", func() {
            ctx := context.Background()
            namespace := "default"

            myapp := &myappv1.MyApp{
                ObjectMeta: metav1.ObjectMeta{
                    Name:      "test-myapp",
                    Namespace: namespace,
                },
                Spec: myappv1.MyAppSpec{
                    Replicas: ptr.To(int32(2)),
                    Image:    "nginx:1.25",
                },
            }

            Expect(k8sClient.Create(ctx, myapp)).Should(Succeed())

            // Verify Deployment is created
            deploymentKey := types.NamespacedName{Name: "test-myapp", Namespace: namespace}
            createdDeployment := &appsv1.Deployment{}

            Eventually(func() error {
                return k8sClient.Get(ctx, deploymentKey, createdDeployment)
            }, timeout, interval).Should(Succeed())

            Expect(createdDeployment.Spec.Replicas).Should(Equal(ptr.To(int32(2))))
            Expect(createdDeployment.Spec.Template.Spec.Containers[0].Image).Should(Equal("nginx:1.25"))

            // Verify Service is created
            serviceKey := types.NamespacedName{Name: "test-myapp", Namespace: namespace}
            createdService := &corev1.Service{}

            Eventually(func() error {
                return k8sClient.Get(ctx, serviceKey, createdService)
            }, timeout, interval).Should(Succeed())

            Expect(createdService.Spec.Selector).Should(HaveKeyWithValue("app", "test-myapp"))

            // Cleanup
            Expect(k8sClient.Delete(ctx, myapp)).Should(Succeed())
        })
    })

    Context("when updating replicas", func() {
        It("should update the Deployment replica count", func() {
            ctx := context.Background()

            myapp := &myappv1.MyApp{
                ObjectMeta: metav1.ObjectMeta{
                    Name:      "test-scale",
                    Namespace: "default",
                },
                Spec: myappv1.MyAppSpec{
                    Replicas: ptr.To(int32(1)),
                    Image:    "nginx:1.25",
                },
            }

            Expect(k8sClient.Create(ctx, myapp)).Should(Succeed())

            // Wait for initial Deployment
            deploymentKey := types.NamespacedName{Name: "test-scale", Namespace: "default"}
            Eventually(func() error {
                d := &appsv1.Deployment{}
                return k8sClient.Get(ctx, deploymentKey, d)
            }, timeout, interval).Should(Succeed())

            // Scale up
            Expect(k8sClient.Get(ctx, types.NamespacedName{Name: "test-scale", Namespace: "default"}, myapp)).Should(Succeed())
            myapp.Spec.Replicas = ptr.To(int32(5))
            Expect(k8sClient.Update(ctx, myapp)).Should(Succeed())

            // Verify Deployment is updated
            Eventually(func() int32 {
                d := &appsv1.Deployment{}
                if err := k8sClient.Get(ctx, deploymentKey, d); err != nil {
                    return 0
                }
                if d.Spec.Replicas == nil {
                    return 0
                }
                return *d.Spec.Replicas
            }, timeout, interval).Should(Equal(int32(5)))

            Expect(k8sClient.Delete(ctx, myapp)).Should(Succeed())
        })
    })
})

Testing Status Conditions

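Status conditions are how the operator communicates state to users, so test them explicitly. envtest runs no Deployment controller or kubelet, so nothing will ever populate a Deployment's status on its own; the tests below write that status by hand to simulate what a real cluster would report.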

Context("status conditions", func() {
    It("should set Ready condition to True when deployment is available", func() {
        ctx := context.Background()

        myapp := &myappv1.MyApp{
            ObjectMeta: metav1.ObjectMeta{
                Name:      "test-conditions",
                Namespace: "default",
            },
            Spec: myappv1.MyAppSpec{
                Replicas: ptr.To(int32(1)),
                Image:    "nginx:1.25",
            },
        }

        Expect(k8sClient.Create(ctx, myapp)).Should(Succeed())

        key := types.NamespacedName{Name: "test-conditions", Namespace: "default"}

        // Simulate deployment becoming available
        Eventually(func() error {
            d := &appsv1.Deployment{}
            if err := k8sClient.Get(ctx, key, d); err != nil {
                return err
            }
            d.Status.AvailableReplicas = 1
            d.Status.ReadyReplicas = 1
            d.Status.Replicas = 1
            return k8sClient.Status().Update(ctx, d)
        }, timeout, interval).Should(Succeed())

        // Verify the operator sets the Ready condition
        Eventually(func() bool {
            app := &myappv1.MyApp{}
            if err := k8sClient.Get(ctx, key, app); err != nil {
                return false
            }
            return meta.IsStatusConditionTrue(app.Status.Conditions, myappv1.ConditionTypeReady)
        }, timeout, interval).Should(BeTrue())

        Expect(k8sClient.Delete(ctx, myapp)).Should(Succeed())
    })

    It("should set Degraded condition when image pull fails", func() {
        ctx := context.Background()

        myapp := &myappv1.MyApp{
            ObjectMeta: metav1.ObjectMeta{
                Name:      "test-degraded",
                Namespace: "default",
            },
            Spec: myappv1.MyAppSpec{
                Replicas: ptr.To(int32(1)),
                Image:    "nonexistent-registry.example.com/myapp:broken",
            },
        }

        Expect(k8sClient.Create(ctx, myapp)).Should(Succeed())

        key := types.NamespacedName{Name: "test-degraded", Namespace: "default"}

        // Simulate the Deployment reporting unavailability, which is what a
        // failed image pull eventually produces on a real cluster
        Eventually(func() error {
            d := &appsv1.Deployment{}
            if err := k8sClient.Get(ctx, key, d); err != nil {
                return err
            }
            d.Status.Conditions = []appsv1.DeploymentCondition{{
                Type:    appsv1.DeploymentAvailable,
                Status:  corev1.ConditionFalse,
                Reason:  "MinimumReplicasUnavailable",
                Message: "Deployment does not have minimum availability",
            }}
            return k8sClient.Status().Update(ctx, d)
        }, timeout, interval).Should(Succeed())

        Eventually(func() bool {
            app := &myappv1.MyApp{}
            if err := k8sClient.Get(ctx, key, app); err != nil {
                return false
            }
            cond := meta.FindStatusCondition(app.Status.Conditions, myappv1.ConditionTypeDegraded)
            return cond != nil && cond.Status == metav1.ConditionTrue
        }, timeout, interval).Should(BeTrue())

        Expect(k8sClient.Delete(ctx, myapp)).Should(Succeed())
    })
})

Testing with Fake Clients

For fast unit tests of individual reconcile logic, use fake clients instead of envtest:

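package controller

import (
    "context"
    "testing"

    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/client-go/tools/record"
    "k8s.io/utils/ptr"
    "sigs.k8s.io/controller-runtime/pkg/client/fake"
    "sigs.k8s.io/controller-runtime/pkg/reconcile"

    myappv1 "github.com/mycompany/myoperator/api/v1"
)

func TestReconcile_CreatesDeployment(t *testing.T) {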
    scheme := runtime.NewScheme()
    _ = myappv1.AddToScheme(scheme)
    _ = appsv1.AddToScheme(scheme)
    _ = corev1.AddToScheme(scheme)

    myapp := &myappv1.MyApp{
        ObjectMeta: metav1.ObjectMeta{
            Name:      "test-app",
            Namespace: "default",
        },
        Spec: myappv1.MyAppSpec{
            Replicas: ptr.To(int32(2)),
            Image:    "nginx:1.25",
        },
    }

    fakeClient := fake.NewClientBuilder().
        WithScheme(scheme).
        WithObjects(myapp).
        WithStatusSubresource(myapp).
        Build()

    recorder := record.NewFakeRecorder(10)

    reconciler := &MyAppReconciler{
        Client:   fakeClient,
        Scheme:   scheme,
        Recorder: recorder,
    }

    req := reconcile.Request{
        NamespacedName: types.NamespacedName{
            Name:      "test-app",
            Namespace: "default",
        },
    }

    result, err := reconciler.Reconcile(context.Background(), req)
    require.NoError(t, err)
    assert.Equal(t, reconcile.Result{}, result)

    // Verify Deployment was created
    deployment := &appsv1.Deployment{}
    err = fakeClient.Get(context.Background(), req.NamespacedName, deployment)
    require.NoError(t, err)
    assert.Equal(t, int32(2), *deployment.Spec.Replicas)
    assert.Equal(t, "nginx:1.25", deployment.Spec.Template.Spec.Containers[0].Image)

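    // Verify event was recorded. Reconcile has already returned, so any event
    // is buffered; a non-blocking receive avoids hanging when none was emitted.
    select {
    case event := <-recorder.Events:
        assert.Contains(t, event, "Created Deployment")
    default:
        t.Fatal("expected an event to be recorded, got none")
    }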
}

Fake clients are ideal for testing reconcile decisions, error handling paths, and status updates without the overhead of starting a real API server.

Testing Events

Operators record events to communicate what they're doing. Verify event content:

It("should record a warning event when quota is exceeded", func() {
    ctx := context.Background()
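    // The recorder only receives events if the reconciler under test was
    // constructed with this same FakeRecorder as its Recorder.
    recorder := record.NewFakeRecorder(100)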

    // ... create resources ...

    // Trigger the condition that should produce a warning
    Eventually(func() bool {
        select {
        case event := <-recorder.Events:
            return strings.Contains(event, "Warning") &&
                strings.Contains(event, "QuotaExceeded")
        default:
            return false
        }
    }, timeout, interval).Should(BeTrue())
})

Integration Testing with kind

For behaviors that require the full Kubernetes control plane and real nodes (admission webhooks served by the deployed operator, pod scheduling, network policies), use kind:

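// integration/operator_test.go
//go:build integration

package integration

import (
    "context"
    "testing"
    "time"

    "github.com/stretchr/testify/require"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    clientgoscheme "k8s.io/client-go/kubernetes/scheme"
    "k8s.io/utils/ptr"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/client/config"

    myappv1 "github.com/mycompany/myoperator/api/v1"
)

func TestOperatorIntegration(t *testing.T) {
    // Assumes kind cluster is already running and KUBECONFIG is set
    cfg, err := config.GetConfig()
    require.NoError(t, err)

    // Register the built-in types and the MyApp types with the client's scheme
    scheme := runtime.NewScheme()
    require.NoError(t, clientgoscheme.AddToScheme(scheme))
    require.NoError(t, myappv1.AddToScheme(scheme))

    k8sClient, err := client.New(cfg, client.Options{Scheme: scheme})
    require.NoError(t, err)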

    ctx := context.Background()
    ns := &corev1.Namespace{
        ObjectMeta: metav1.ObjectMeta{
            Name: "operator-integration-test",
        },
    }
    require.NoError(t, k8sClient.Create(ctx, ns))
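    // Clean up the namespace even if the test fails, and surface delete errors
    t.Cleanup(func() {
        if err := k8sClient.Delete(context.Background(), ns); err != nil {
            t.Logf("failed to delete namespace %s: %v", ns.Name, err)
        }
    })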

    myapp := &myappv1.MyApp{
        ObjectMeta: metav1.ObjectMeta{
            Name:      "integration-test-app",
            Namespace: ns.Name,
        },
        Spec: myappv1.MyAppSpec{
            Replicas: ptr.To(int32(1)),
            Image:    "nginx:1.25",
        },
    }

    require.NoError(t, k8sClient.Create(ctx, myapp))

    // Wait for pods to be running
    require.Eventually(t, func() bool {
        podList := &corev1.PodList{}
        if err := k8sClient.List(ctx, podList,
            client.InNamespace(ns.Name),
            client.MatchingLabels{"app": "integration-test-app"},
        ); err != nil {
            return false
        }
        for _, pod := range podList.Items {
            if pod.Status.Phase == corev1.PodRunning {
                return true
            }
        }
        return false
    }, 120*time.Second, time.Second, "pod never reached Running state")
}

Run integration tests separately:

# Run only envtest tests (default)
go test ./internal/controller/...

# Run integration tests (requires kind cluster)
go test -tags integration ./integration/...

GitHub Actions Pipeline

name: Operator Tests
on: [push, pull_request]

jobs:
  unit:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-go@v5
        with:
          go-version: '1.22'
      - name: Install setup-envtest
        run: go install sigs.k8s.io/controller-runtime/tools/setup-envtest@latest
      - name: Download envtest binaries
        # setup-envtest stores binaries in a versioned subdirectory, so export
        # the exact path it reports rather than hard-coding a directory
        run: echo "KUBEBUILDER_ASSETS=$(setup-envtest use 1.29.x -p path)" >> "$GITHUB_ENV"
      - name: Run envtest tests
        run: go test ./internal/... -v -count=1

  integration:
    runs-on: ubuntu-latest
    needs: unit
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-go@v5
        with:
          go-version: '1.22'
      - name: Create kind cluster
        uses: helm/kind-action@v1.10.0
        with:
          cluster_name: operator-test
      - name: Install CRDs
        run: kubectl apply -f config/crd/bases/
      - name: Deploy operator
        run: |
          docker build -t myoperator:test .
          kind load docker-image myoperator:test --name operator-test
          kubectl apply -f config/manager/
          kubectl rollout status deployment/myoperator-controller-manager \
            -n myoperator-system --timeout=60s
      - name: Run integration tests
        run: go test -tags integration ./integration/... -v -count=1 -timeout=10m

The split between envtest tests (fast, run on every commit) and kind integration tests (slower, run after envtest passes) keeps the feedback loop tight while still catching the full class of integration problems.

Operator testing is an investment that pays off every time a reconciliation bug would have taken down a production workload. The combination of fake clients for unit testing, envtest for controller testing, and kind for integration testing covers the full spectrum of failure modes without requiring a dedicated test cluster.
