Testing Kubernetes Operators and Custom Controllers
Kubernetes operators are some of the most complex software in a typical infrastructure stack — they implement reconciliation loops that must handle partial failures, retries, status conditions, and concurrent updates correctly. Testing them requires more than unit tests with mock functions. You need to exercise the full controller machinery: the informer cache, the work queue, the event recorder, and the API server interaction.
Controller-runtime provides envtest for this purpose. Combined with kind for integration tests, you can achieve thorough test coverage without deploying to a real cluster.
The Testing Pyramid for Operators
Operator testing has three layers:
Unit tests with fake clients test individual reconcile logic in isolation. They're fast and precise but miss the full controller machinery.
envtest tests run your controller against a real API server (without the rest of Kubernetes) — etcd and the API server start as local processes. This is where most operator testing should happen.
Integration tests with kind verify the full end-to-end behavior including admission webhooks, node selection, and interactions with other controllers.
envtest: Testing Against a Real API Server
envtest from controller-runtime/pkg/envtest starts a real Kubernetes API server and etcd process locally. Your tests register CRDs, start controllers, and interact with the API server exactly as a real operator would.
Setup
Install the test binaries:
# Using setup-envtest (recommended)
go install sigs.k8s.io/controller-runtime/tools/setup-envtest@latest
# Download the API server and etcd binaries for a specific Kubernetes version
setup-envtest use 1.29.x --bin-dir /usr/local/kubebuilder/bin
export KUBEBUILDER_ASSETS="$(setup-envtest use 1.29.x -p path)"
Test Suite Bootstrap
// internal/controller/suite_test.go
package controller_test
import (
"context"
"path/filepath"
"testing"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
myappv1 "github.com/mycompany/myoperator/api/v1"
)
// Suite-wide state shared by every spec in this package. The values are
// assigned in BeforeSuite and released in AfterSuite.
var (
// ctx is the cancellable context handed to the controller manager's Start.
ctx context.Context
// cancel cancels ctx; AfterSuite calls it before stopping the test environment.
cancel context.CancelFunc
// k8sClient is the client built from the envtest REST config; specs use it
// to create, read, update and delete objects.
k8sClient client.Client
// testEnv is the envtest environment started in BeforeSuite.
testEnv *envtest.Environment
)
// TestControllers is the `go test` entry point for the Ginkgo suite. It hooks
// Gomega up to Ginkgo's failure reporting and then runs every registered spec
// under the suite name "Controller Suite".
func TestControllers(t *testing.T) {
	// Gomega assertion failures are reported through Ginkgo's Fail.
	RegisterFailHandler(Fail)

	RunSpecs(t, "Controller Suite")
}
// BeforeSuite starts the envtest API server, registers the MyApp types,
// builds the client used by the specs, and runs the MyApp controller inside a
// manager for the lifetime of the suite.
//
// NOTE(review): MyAppReconciler is referenced without a package qualifier,
// which only compiles when this file shares the reconciler's package; with
// `package controller_test` an import and qualifier would be needed — confirm.
var _ = BeforeSuite(func() {
	logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))

	ctx, cancel = context.WithCancel(context.TODO())

	testEnv = &envtest.Environment{
		CRDDirectoryPaths: []string{
			filepath.Join("..", "..", "config", "crd", "bases"),
		},
		// Fail at startup rather than run specs against a server without the CRDs.
		ErrorIfCRDPathMissing: true,
	}

	cfg, err := testEnv.Start()
	Expect(err).NotTo(HaveOccurred())
	Expect(cfg).NotTo(BeNil())

	err = myappv1.AddToScheme(scheme.Scheme)
	Expect(err).NotTo(HaveOccurred())

	k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
	Expect(err).NotTo(HaveOccurred())

	mgr, err := ctrl.NewManager(cfg, ctrl.Options{Scheme: scheme.Scheme})
	Expect(err).NotTo(HaveOccurred())

	err = (&MyAppReconciler{
		Client: mgr.GetClient(),
		Scheme: mgr.GetScheme(),
		// The reconciler emits events (the fake-client test asserts on them),
		// so it must be given a recorder here too; a nil Recorder would panic
		// on the first recorded event.
		Recorder: mgr.GetEventRecorderFor("myapp-controller"),
	}).SetupWithManager(mgr)
	Expect(err).NotTo(HaveOccurred())

	go func() {
		defer GinkgoRecover()
		// Use a goroutine-local error: writing to the enclosing err from this
		// goroutine would share a variable with the BeforeSuite body.
		startErr := mgr.Start(ctx)
		Expect(startErr).NotTo(HaveOccurred(), "failed to run manager")
	}()
})
var _ = AfterSuite(func() {
cancel()
err := testEnv.Stop()
Expect(err).NotTo(HaveOccurred())
})Testing Reconciliation Loops
// internal/controller/myapp_controller_test.go
var _ = Describe("MyApp Controller", func() {
const (
timeout = time.Second * 30
interval = time.Millisecond * 250
)
Context("when creating a MyApp resource", func() {
It("should create a Deployment and Service", func() {
ctx := context.Background()
namespace := "default"
myapp := &myappv1.MyApp{
ObjectMeta: metav1.ObjectMeta{
Name: "test-myapp",
Namespace: namespace,
},
Spec: myappv1.MyAppSpec{
Replicas: ptr.To(int32(2)),
Image: "nginx:1.25",
},
}
Expect(k8sClient.Create(ctx, myapp)).Should(Succeed())
// Verify Deployment is created
deploymentKey := types.NamespacedName{Name: "test-myapp", Namespace: namespace}
createdDeployment := &appsv1.Deployment{}
Eventually(func() error {
return k8sClient.Get(ctx, deploymentKey, createdDeployment)
}, timeout, interval).Should(Succeed())
Expect(createdDeployment.Spec.Replicas).Should(Equal(ptr.To(int32(2))))
Expect(createdDeployment.Spec.Template.Spec.Containers[0].Image).Should(Equal("nginx:1.25"))
// Verify Service is created
serviceKey := types.NamespacedName{Name: "test-myapp", Namespace: namespace}
createdService := &corev1.Service{}
Eventually(func() error {
return k8sClient.Get(ctx, serviceKey, createdService)
}, timeout, interval).Should(Succeed())
Expect(createdService.Spec.Selector).Should(HaveKeyWithValue("app", "test-myapp"))
// Cleanup
Expect(k8sClient.Delete(ctx, myapp)).Should(Succeed())
})
})
Context("when updating replicas", func() {
It("should update the Deployment replica count", func() {
ctx := context.Background()
myapp := &myappv1.MyApp{
ObjectMeta: metav1.ObjectMeta{
Name: "test-scale",
Namespace: "default",
},
Spec: myappv1.MyAppSpec{
Replicas: ptr.To(int32(1)),
Image: "nginx:1.25",
},
}
Expect(k8sClient.Create(ctx, myapp)).Should(Succeed())
// Wait for initial Deployment
deploymentKey := types.NamespacedName{Name: "test-scale", Namespace: "default"}
Eventually(func() error {
d := &appsv1.Deployment{}
return k8sClient.Get(ctx, deploymentKey, d)
}, timeout, interval).Should(Succeed())
// Scale up
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: "test-scale", Namespace: "default"}, myapp)).Should(Succeed())
myapp.Spec.Replicas = ptr.To(int32(5))
Expect(k8sClient.Update(ctx, myapp)).Should(Succeed())
// Verify Deployment is updated
Eventually(func() int32 {
d := &appsv1.Deployment{}
if err := k8sClient.Get(ctx, deploymentKey, d); err != nil {
return 0
}
if d.Spec.Replicas == nil {
return 0
}
return *d.Spec.Replicas
}, timeout, interval).Should(Equal(int32(5)))
Expect(k8sClient.Delete(ctx, myapp)).Should(Succeed())
})
})
})Testing Status Conditions
Status conditions are the operator's way of communicating state to users. Test them explicitly:
// Specs for the status conditions the controller publishes on MyApp.
// The Deployment status is written by hand in each spec to simulate the state
// the controller is expected to react to.
Context("status conditions", func() {
	It("should set Ready condition to True when deployment is available", func() {
		ctx := context.Background()

		myapp := &myappv1.MyApp{
			ObjectMeta: metav1.ObjectMeta{
				Name:      "test-conditions",
				Namespace: "default",
			},
			Spec: myappv1.MyAppSpec{
				Replicas: ptr.To(int32(1)),
				Image:    "nginx:1.25",
			},
		}
		Expect(k8sClient.Create(ctx, myapp)).Should(Succeed())
		// Register cleanup immediately so the object is removed even when a
		// later assertion fails and aborts the spec.
		DeferCleanup(func() {
			Expect(k8sClient.Delete(ctx, myapp)).Should(Succeed())
		})

		key := types.NamespacedName{Name: "test-conditions", Namespace: "default"}

		// Simulate deployment becoming available. Get and status update are
		// retried together so a not-yet-created Deployment or a write
		// conflict just triggers another poll.
		Eventually(func() error {
			d := &appsv1.Deployment{}
			if err := k8sClient.Get(ctx, key, d); err != nil {
				return err
			}
			d.Status.AvailableReplicas = 1
			d.Status.ReadyReplicas = 1
			d.Status.Replicas = 1
			return k8sClient.Status().Update(ctx, d)
		}, timeout, interval).Should(Succeed())

		// Verify the operator sets the Ready condition
		Eventually(func() bool {
			app := &myappv1.MyApp{}
			if err := k8sClient.Get(ctx, key, app); err != nil {
				return false
			}
			return meta.IsStatusConditionTrue(app.Status.Conditions, myappv1.ConditionTypeReady)
		}, timeout, interval).Should(BeTrue())
	})

	It("should set Degraded condition when image pull fails", func() {
		ctx := context.Background()

		myapp := &myappv1.MyApp{
			ObjectMeta: metav1.ObjectMeta{
				Name:      "test-degraded",
				Namespace: "default",
			},
			Spec: myappv1.MyAppSpec{
				Replicas: ptr.To(int32(1)),
				Image:    "nonexistent-registry.example.com/myapp:broken",
			},
		}
		Expect(k8sClient.Create(ctx, myapp)).Should(Succeed())
		// Register cleanup immediately so a failed assertion cannot leave the
		// object behind.
		DeferCleanup(func() {
			Expect(k8sClient.Delete(ctx, myapp)).Should(Succeed())
		})

		key := types.NamespacedName{Name: "test-degraded", Namespace: "default"}

		// Simulate ImagePullBackOff by marking the Deployment as unavailable.
		Eventually(func() error {
			d := &appsv1.Deployment{}
			if err := k8sClient.Get(ctx, key, d); err != nil {
				return err
			}
			d.Status.Conditions = []appsv1.DeploymentCondition{{
				Type:    appsv1.DeploymentAvailable,
				Status:  corev1.ConditionFalse,
				Reason:  "MinimumReplicasUnavailable",
				Message: "Deployment does not have minimum availability",
			}}
			return k8sClient.Status().Update(ctx, d)
		}, timeout, interval).Should(Succeed())

		Eventually(func() bool {
			app := &myappv1.MyApp{}
			if err := k8sClient.Get(ctx, key, app); err != nil {
				return false
			}
			cond := meta.FindStatusCondition(app.Status.Conditions, myappv1.ConditionTypeDegraded)
			return cond != nil && cond.Status == metav1.ConditionTrue
		}, timeout, interval).Should(BeTrue())
	})
})
Testing with Fake Clients
For fast unit tests of individual reconcile logic, use fake clients instead of envtest:
func TestReconcile_CreatesDeployment(t *testing.T) {
scheme := runtime.NewScheme()
_ = myappv1.AddToScheme(scheme)
_ = appsv1.AddToScheme(scheme)
_ = corev1.AddToScheme(scheme)
myapp := &myappv1.MyApp{
ObjectMeta: metav1.ObjectMeta{
Name: "test-app",
Namespace: "default",
},
Spec: myappv1.MyAppSpec{
Replicas: ptr.To(int32(2)),
Image: "nginx:1.25",
},
}
fakeClient := fake.NewClientBuilder().
WithScheme(scheme).
WithObjects(myapp).
WithStatusSubresource(myapp).
Build()
recorder := record.NewFakeRecorder(10)
reconciler := &MyAppReconciler{
Client: fakeClient,
Scheme: scheme,
Recorder: recorder,
}
req := reconcile.Request{
NamespacedName: types.NamespacedName{
Name: "test-app",
Namespace: "default",
},
}
result, err := reconciler.Reconcile(context.Background(), req)
require.NoError(t, err)
assert.Equal(t, reconcile.Result{}, result)
// Verify Deployment was created
deployment := &appsv1.Deployment{}
err = fakeClient.Get(context.Background(), req.NamespacedName, deployment)
require.NoError(t, err)
assert.Equal(t, int32(2), *deployment.Spec.Replicas)
assert.Equal(t, "nginx:1.25", deployment.Spec.Template.Spec.Containers[0].Image)
// Verify event was recorded
event := <-recorder.Events
assert.Contains(t, event, "Created Deployment")
}Fake clients are ideal for testing reconcile decisions, error handling paths, and status updates without the overhead of starting a real API server.
Testing Events
Operators record events to communicate what they're doing. Verify event content:
// Polls a fake event recorder until a warning event mentioning QuotaExceeded
// is observed or the timeout expires.
//
// NOTE(review): the recorder created here is not handed to any reconciler in
// the visible code; the elided setup presumably injects it — confirm.
It("should record a warning event when quota is exceeded", func() {
	ctx := context.Background()
	recorder := record.NewFakeRecorder(100)
	// ... create resources ...
	// Trigger the condition that should produce a warning

	// Each poll takes at most one buffered event and reports whether it is
	// the expected warning; events that do not match are consumed and dropped.
	quotaWarningSeen := func() bool {
		select {
		case msg := <-recorder.Events:
			if !strings.Contains(msg, "Warning") {
				return false
			}
			return strings.Contains(msg, "QuotaExceeded")
		default:
			// Nothing buffered yet; let Eventually poll again.
			return false
		}
	}

	Eventually(quotaWarningSeen, timeout, interval).Should(BeTrue())
})
Integration Testing with kind
For behaviors that require the full Kubernetes control plane — admission webhooks, pod scheduling, network policies — use kind:
// integration/operator_test.go
//go:build integration
func TestOperatorIntegration(t *testing.T) {
// Assumes kind cluster is already running and KUBECONFIG is set
cfg, err := config.GetConfig()
require.NoError(t, err)
k8sClient, err := client.New(cfg, client.Options{Scheme: scheme})
require.NoError(t, err)
ctx := context.Background()
ns := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: "operator-integration-test",
},
}
require.NoError(t, k8sClient.Create(ctx, ns))
defer k8sClient.Delete(ctx, ns)
myapp := &myappv1.MyApp{
ObjectMeta: metav1.ObjectMeta{
Name: "integration-test-app",
Namespace: ns.Name,
},
Spec: myappv1.MyAppSpec{
Replicas: ptr.To(int32(1)),
Image: "nginx:1.25",
},
}
require.NoError(t, k8sClient.Create(ctx, myapp))
// Wait for pods to be running
require.Eventually(t, func() bool {
podList := &corev1.PodList{}
if err := k8sClient.List(ctx, podList,
client.InNamespace(ns.Name),
client.MatchingLabels{"app": "integration-test-app"},
); err != nil {
return false
}
for _, pod := range podList.Items {
if pod.Status.Phase == corev1.PodRunning {
return true
}
}
return false
}, 120*time.Second, time.Second, "pod never reached Running state")
}Run integration tests separately:
# Run only envtest tests (default)
go test ./internal/controller/...
# Run integration tests (requires kind cluster)
go test -tags integration ./integration/...
GitHub Actions Pipeline
# CI pipeline: envtest-based tests run first; the kind-based integration job
# only starts once they pass.
name: Operator Tests
on: [push, pull_request]
jobs:
  unit:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-go@v5
        with:
          go-version: '1.22'
      - name: Install setup-envtest
        run: go install sigs.k8s.io/controller-runtime/tools/setup-envtest@latest
      # setup-envtest unpacks the binaries into a versioned subdirectory of its
      # store, so ask the tool for the resolved path instead of hard-coding the
      # store root, and export it for the following steps.
      - name: Download envtest binaries
        run: echo "KUBEBUILDER_ASSETS=$(setup-envtest use 1.29.x -p path)" >> "$GITHUB_ENV"
      - name: Run envtest tests
        run: go test ./internal/... -v -count=1
  integration:
    runs-on: ubuntu-latest
    needs: unit
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-go@v5
        with:
          go-version: '1.22'
      - name: Create kind cluster
        uses: helm/kind-action@v1.10.0
        with:
          cluster_name: operator-test
      - name: Install CRDs
        run: kubectl apply -f config/crd/bases/
      - name: Deploy operator
        run: |
          docker build -t myoperator:test .
          kind load docker-image myoperator:test --name operator-test
          kubectl apply -f config/manager/
          kubectl rollout status deployment/myoperator-controller-manager \
            -n myoperator-system --timeout=60s
      - name: Run integration tests
        run: go test -tags integration ./integration/... -v -count=1 -timeout=10m
The split between envtest tests (fast, run on every commit) and kind integration tests (slower, run after envtest passes) keeps the feedback loop tight while still catching the full class of integration problems.
Operator testing is an investment that pays off every time a reconciliation bug would have taken down a production workload. The combination of fake clients for unit testing, envtest for controller testing, and kind for integration testing covers the full spectrum of failure modes without requiring a dedicated test cluster.