Testing gRPC TLS, mTLS, and Authentication: A Practical Guide
gRPC security testing is often skipped because certificates are annoying to set up in test environments. That's exactly why certificate misconfigurations, expired credentials, and bypass vulnerabilities make it to production. This guide covers practical approaches to testing gRPC TLS and authentication without making your CI pipeline miserable.
What to Test in gRPC Security
- TLS channel security — connections are encrypted, certificates are verified
- Mutual TLS (mTLS) — both client and server verify each other's certificates
- Token-based auth — JWT, API keys, OAuth tokens in metadata
- Per-call credentials — credentials can be set at the call level, not just channel
- Interceptor logic — auth interceptors reject unauthenticated calls correctly
- Certificate expiry — expired certs fail gracefully with clear errors
- Insecure channel rejection — plaintext connections refused when TLS required
Generating Test Certificates
Before you can write tests, you need test certificates. Use a simple shell script to generate them:
#!/bin/bash
# generate-test-certs.sh
set -e

CERTS_DIR="./test-certs"
mkdir -p "$CERTS_DIR"

# CA key and certificate
openssl genrsa -out "$CERTS_DIR/ca.key" 4096
openssl req -new -x509 -key "$CERTS_DIR/ca.key" \
  -out "$CERTS_DIR/ca.crt" -days 3650 \
  -subj "/CN=Test CA/O=TestOrg"

# Server key and CSR
openssl genrsa -out "$CERTS_DIR/server.key" 2048
openssl req -new -key "$CERTS_DIR/server.key" \
  -out "$CERTS_DIR/server.csr" \
  -subj "/CN=localhost/O=TestOrg"

# Sign server certificate with CA
openssl x509 -req -in "$CERTS_DIR/server.csr" \
  -CA "$CERTS_DIR/ca.crt" -CAkey "$CERTS_DIR/ca.key" \
  -CAcreateserial -out "$CERTS_DIR/server.crt" \
  -days 365 -extensions v3_req \
  -extfile <(echo '[v3_req]
subjectAltName = DNS:localhost,IP:127.0.0.1')

# Client key and certificate (for mTLS)
openssl genrsa -out "$CERTS_DIR/client.key" 2048
openssl req -new -key "$CERTS_DIR/client.key" \
  -out "$CERTS_DIR/client.csr" \
  -subj "/CN=test-client/O=TestOrg"
openssl x509 -req -in "$CERTS_DIR/client.csr" \
  -CA "$CERTS_DIR/ca.crt" -CAkey "$CERTS_DIR/ca.key" \
  -CAcreateserial -out "$CERTS_DIR/client.crt" -days 365

echo "Test certificates generated in $CERTS_DIR"

Run once and commit the test certs (they're not secrets; they're only for testing):

./generate-test-certs.sh
git add test-certs/

Testing TLS in Go
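The shell script above does not produce the wrong-ca.crt or expired-server.crt files that later tests look for, and its 365-day leaf certificates will eventually expire in the repository. One alternative is to generate certificates in memory at test time. The following is a minimal sketch using only the Go standard library; the names testCA, newTestCA, issueServerCert, and verify are hypothetical helpers, not part of gRPC. It returns certificates without their private keys because it only demonstrates verification; a real TLS test would also keep each leaf key.

```go
package main

import (
	"crypto/ecdsa"
	"crypto/elliptic"
	"crypto/rand"
	"crypto/x509"
	"crypto/x509/pkix"
	"fmt"
	"math/big"
	"time"
)

// check panics on error to keep the sketch short; a test helper would call t.Fatal.
func check(err error) {
	if err != nil {
		panic(err)
	}
}

// testCA pairs a CA certificate with its signing key (hypothetical helper type).
type testCA struct {
	cert *x509.Certificate
	key  *ecdsa.PrivateKey
}

// newTestCA creates a self-signed CA that is valid for roughly the next hour.
func newTestCA(commonName string) *testCA {
	key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	check(err)
	tmpl := &x509.Certificate{
		SerialNumber:          big.NewInt(1),
		Subject:               pkix.Name{CommonName: commonName},
		NotBefore:             time.Now().Add(-24 * time.Hour),
		NotAfter:              time.Now().Add(time.Hour),
		IsCA:                  true,
		BasicConstraintsValid: true,
		KeyUsage:              x509.KeyUsageCertSign,
	}
	der, err := x509.CreateCertificate(rand.Reader, tmpl, tmpl, &key.PublicKey, key)
	check(err)
	cert, err := x509.ParseCertificate(der)
	check(err)
	return &testCA{cert: cert, key: key}
}

// issueServerCert signs a certificate for localhost. When expired is true,
// the validity window ended one hour ago.
func (ca *testCA) issueServerCert(expired bool) *x509.Certificate {
	key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	check(err)
	notBefore, notAfter := time.Now().Add(-time.Minute), time.Now().Add(30*time.Minute)
	if expired {
		notBefore, notAfter = time.Now().Add(-2*time.Hour), time.Now().Add(-time.Hour)
	}
	tmpl := &x509.Certificate{
		SerialNumber: big.NewInt(time.Now().UnixNano()),
		Subject:      pkix.Name{CommonName: "localhost"},
		DNSNames:     []string{"localhost"},
		NotBefore:    notBefore,
		NotAfter:     notAfter,
		KeyUsage:     x509.KeyUsageDigitalSignature,
		ExtKeyUsage:  []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
	}
	der, err := x509.CreateCertificate(rand.Reader, tmpl, ca.cert, &key.PublicKey, ca.key)
	check(err)
	cert, err := x509.ParseCertificate(der)
	check(err)
	return cert
}

// verify checks leaf for the name "localhost" against a pool holding only trusted.
func verify(leaf *x509.Certificate, trusted *testCA) error {
	roots := x509.NewCertPool()
	roots.AddCert(trusted.cert)
	_, err := leaf.Verify(x509.VerifyOptions{Roots: roots, DNSName: "localhost"})
	return err
}

func main() {
	ca := newTestCA("Test CA")
	wrongCA := newTestCA("Wrong CA")
	valid := ca.issueServerCert(false)
	expired := ca.issueServerCert(true)

	fmt.Println("valid cert, right CA verifies:", verify(valid, ca) == nil)
	fmt.Println("expired cert rejected:", verify(expired, ca) != nil)
	fmt.Println("valid cert, wrong CA rejected:", verify(valid, wrongCA) != nil)
}
```

Because every certificate is created at test time, nothing in the repository can silently expire, at the cost of a few milliseconds of key generation per test run.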
Test Server with TLS
// testserver/tls.go
package testserver

import (
	"crypto/tls"
	"crypto/x509"
	"net"
	"os"
	"testing"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"

	pb "myapp/proto"
)

// StartTLSServer starts a TLS-enabled gRPC server on a random local port
// and returns its address. The server is stopped when the test ends.
func StartTLSServer(t *testing.T, svc pb.DataServiceServer) string {
	t.Helper()
	cert, err := tls.LoadX509KeyPair("../test-certs/server.crt", "../test-certs/server.key")
	if err != nil {
		t.Fatalf("Failed to load server certs: %v", err)
	}
	creds := credentials.NewTLS(&tls.Config{
		Certificates: []tls.Certificate{cert},
	})
	lis, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatalf("Failed to listen: %v", err)
	}
	s := grpc.NewServer(grpc.Creds(creds))
	pb.RegisterDataServiceServer(s, svc)
	go s.Serve(lis)
	t.Cleanup(s.Stop)
	return lis.Addr().String()
}

// TLSClientConn returns a client connection that trusts only the test CA.
func TLSClientConn(t *testing.T, addr string) *grpc.ClientConn {
	t.Helper()
	caCert, err := os.ReadFile("../test-certs/ca.crt")
	if err != nil {
		t.Fatalf("Failed to read CA cert: %v", err)
	}
	pool := x509.NewCertPool()
	// AppendCertsFromPEM reports false when no certificate could be parsed;
	// ignoring it would leave an empty pool and a confusing failure later.
	if !pool.AppendCertsFromPEM(caCert) {
		t.Fatal("Failed to parse CA cert")
	}
	creds := credentials.NewTLS(&tls.Config{
		RootCAs:    pool,
		ServerName: "localhost",
	})
	conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(creds))
	if err != nil {
		t.Fatalf("Failed to connect: %v", err)
	}
	t.Cleanup(func() { conn.Close() })
	return conn
}

Test: TLS Connection Succeeds
func TestTLS_ConnectionEstablished(t *testing.T) {
	addr := StartTLSServer(t, &mockService{})
	conn := TLSClientConn(t, addr)
	client := pb.NewDataServiceClient(conn)

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	resp, err := client.GetItem(ctx, &pb.GetItemRequest{Id: "1"})
	if err != nil {
		t.Fatalf("TLS call failed: %v", err)
	}
	if resp.Id != "1" {
		t.Errorf("Expected id=1, got %s", resp.Id)
	}
}

Test: Insecure Client Rejected
func TestTLS_InsecureClientRejected(t *testing.T) {
	addr := StartTLSServer(t, &mockService{})

	// Try to connect without TLS. insecure is the package
	// google.golang.org/grpc/credentials/insecure, which replaces the
	// deprecated grpc.WithInsecure() option.
	conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		// Connection may fail at dial time; that's acceptable
		return
	}
	defer conn.Close()

	client := pb.NewDataServiceClient(conn)
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	_, err = client.GetItem(ctx, &pb.GetItemRequest{Id: "1"})
	if err == nil {
		t.Fatal("Expected error when using insecure channel against TLS server")
	}
	// Any error is acceptable; the call must not succeed
	t.Logf("Got expected error: %v", err)
}

Test: Wrong CA Certificate Fails
func TestTLS_WrongCARejected(t *testing.T) {
	addr := StartTLSServer(t, &mockService{})

	// Use a different CA that didn't sign the server cert
	wrongCA, err := os.ReadFile("../test-certs/wrong-ca.crt")
	if err != nil {
		t.Skip("wrong-ca.crt not found, skipping test")
	}
	pool := x509.NewCertPool()
	pool.AppendCertsFromPEM(wrongCA)
	creds := credentials.NewTLS(&tls.Config{
		RootCAs:    pool,
		ServerName: "localhost",
	})

	// Check the dial error: deferring Close on a nil conn would panic.
	conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(creds))
	if err != nil {
		t.Fatalf("Failed to create client: %v", err)
	}
	defer conn.Close()

	client := pb.NewDataServiceClient(conn)
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	_, err = client.GetItem(ctx, &pb.GetItemRequest{Id: "1"})
	if err == nil {
		t.Fatal("Expected certificate verification failure")
	}
	// Should be a certificate-related error
	if !strings.Contains(err.Error(), "certificate") &&
		!strings.Contains(err.Error(), "x509") {
		t.Errorf("Unexpected error type: %v", err)
	}
}

Testing Mutual TLS (mTLS)
mTLS requires the client to present a certificate that the server verifies.
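The server-side rejection can be observed with crypto/tls alone, before gRPC enters the picture. The sketch below assumes loopback networking is available and uses hypothetical helpers (identity, newIdentity, serverHandshake) that are not part of any library. It runs three handshakes against a server configured with RequireAndVerifyClientCert: no client certificate, a certificate from an unrelated CA, and a certificate from the trusted CA.

```go
package main

import (
	"crypto/ecdsa"
	"crypto/elliptic"
	"crypto/rand"
	"crypto/tls"
	"crypto/x509"
	"crypto/x509/pkix"
	"fmt"
	"math/big"
	"net"
	"time"
)

// check panics on error to keep the sketch short.
func check(err error) {
	if err != nil {
		panic(err)
	}
}

// identity is a certificate plus its private key (hypothetical helper type).
type identity struct {
	cert *x509.Certificate
	key  *ecdsa.PrivateKey
}

// newIdentity creates a self-signed CA when issuer is nil, otherwise a leaf
// signed by issuer that is usable for both server and client authentication.
func newIdentity(cn string, issuer *identity) *identity {
	key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	check(err)
	tmpl := &x509.Certificate{
		SerialNumber: big.NewInt(time.Now().UnixNano()),
		Subject:      pkix.Name{CommonName: cn},
		DNSNames:     []string{"localhost"},
		NotBefore:    time.Now().Add(-time.Hour),
		NotAfter:     time.Now().Add(time.Hour),
		KeyUsage:     x509.KeyUsageDigitalSignature,
		ExtKeyUsage:  []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth},
	}
	parent, signer := tmpl, key // self-signed unless an issuer is given
	if issuer == nil {
		tmpl.IsCA, tmpl.BasicConstraintsValid, tmpl.KeyUsage = true, true, x509.KeyUsageCertSign
	} else {
		parent, signer = issuer.cert, issuer.key
	}
	der, err := x509.CreateCertificate(rand.Reader, tmpl, parent, &key.PublicKey, signer)
	check(err)
	cert, err := x509.ParseCertificate(der)
	check(err)
	return &identity{cert: cert, key: key}
}

// tlsCert converts the identity into the form tls.Config expects.
func (id *identity) tlsCert() tls.Certificate {
	return tls.Certificate{Certificate: [][]byte{id.cert.Raw}, PrivateKey: id.key}
}

// serverHandshake runs one handshake over loopback and returns the
// server-side error. The client trusts ca and presents clientCerts, if any.
func serverHandshake(ca, server *identity, clientCerts ...tls.Certificate) error {
	roots := x509.NewCertPool()
	roots.AddCert(ca.cert)
	serverCfg := &tls.Config{
		Certificates: []tls.Certificate{server.tlsCert()},
		ClientCAs:    roots,
		ClientAuth:   tls.RequireAndVerifyClientCert, // the mTLS switch
	}
	clientCfg := &tls.Config{RootCAs: roots, ServerName: "localhost", Certificates: clientCerts}

	lis, err := net.Listen("tcp", "127.0.0.1:0")
	check(err)
	defer lis.Close()
	result := make(chan error, 1)
	go func() {
		raw, err := lis.Accept()
		if err != nil {
			result <- err
			return
		}
		defer raw.Close()
		result <- tls.Server(raw, serverCfg).Handshake()
	}()
	raw, err := net.Dial("tcp", lis.Addr().String())
	check(err)
	defer raw.Close()
	// Under TLS 1.3 the client can finish its handshake before the server
	// rejects it, so the client result is ignored and the server's is returned.
	_ = tls.Client(raw, clientCfg).Handshake()
	return <-result
}

func main() {
	ca := newIdentity("Test CA", nil)
	rogueCA := newIdentity("Rogue CA", nil)
	server := newIdentity("localhost", ca)
	fmt.Println("no client cert rejected:", serverHandshake(ca, server) != nil)
	fmt.Println("rogue client cert rejected:",
		serverHandshake(ca, server, newIdentity("intruder", rogueCA).tlsCert()) != nil)
	fmt.Println("valid client cert accepted:",
		serverHandshake(ca, server, newIdentity("test-client", ca).tlsCert()) == nil)
}
```

The same ClientCAs and ClientAuth fields carry over unchanged to the gRPC server setup that follows, since credentials.NewTLS wraps a plain tls.Config.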
func StartMTLSServer(t *testing.T, svc pb.DataServiceServer) string {
	t.Helper()
	cert, err := tls.LoadX509KeyPair("../test-certs/server.crt", "../test-certs/server.key")
	if err != nil {
		t.Fatalf("Failed to load server certs: %v", err)
	}
	caCert, err := os.ReadFile("../test-certs/ca.crt")
	if err != nil {
		t.Fatalf("Failed to read CA cert: %v", err)
	}
	pool := x509.NewCertPool()
	pool.AppendCertsFromPEM(caCert)
	creds := credentials.NewTLS(&tls.Config{
		Certificates: []tls.Certificate{cert},
		ClientCAs:    pool,
		ClientAuth:   tls.RequireAndVerifyClientCert, // mTLS
	})
	lis, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatalf("Failed to listen: %v", err)
	}
	s := grpc.NewServer(grpc.Creds(creds))
	pb.RegisterDataServiceServer(s, svc)
	go s.Serve(lis)
	t.Cleanup(s.Stop)
	return lis.Addr().String()
}

func TestMTLS_ClientCertRequired(t *testing.T) {
	addr := StartMTLSServer(t, &mockService{})

	caCert, err := os.ReadFile("../test-certs/ca.crt")
	if err != nil {
		t.Fatalf("Failed to read CA cert: %v", err)
	}
	pool := x509.NewCertPool()
	pool.AppendCertsFromPEM(caCert)

	// TLS without client cert: should fail
	credsWithoutClientCert := credentials.NewTLS(&tls.Config{
		RootCAs:    pool,
		ServerName: "localhost",
	})
	conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(credsWithoutClientCert))
	if err != nil {
		t.Fatalf("Failed to create client: %v", err)
	}
	defer conn.Close()

	client := pb.NewDataServiceClient(conn)
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	_, err = client.GetItem(ctx, &pb.GetItemRequest{Id: "1"})
	if err == nil {
		t.Fatal("Expected mTLS error when client cert not provided")
	}
}

func TestMTLS_ValidClientCertSucceeds(t *testing.T) {
	addr := StartMTLSServer(t, &mockService{})

	clientCert, err := tls.LoadX509KeyPair("../test-certs/client.crt", "../test-certs/client.key")
	if err != nil {
		t.Fatalf("Failed to load client certs: %v", err)
	}
	caCert, err := os.ReadFile("../test-certs/ca.crt")
	if err != nil {
		t.Fatalf("Failed to read CA cert: %v", err)
	}
	pool := x509.NewCertPool()
	pool.AppendCertsFromPEM(caCert)
	creds := credentials.NewTLS(&tls.Config{
		Certificates: []tls.Certificate{clientCert},
		RootCAs:      pool,
		ServerName:   "localhost",
	})
	conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(creds))
	if err != nil {
		t.Fatalf("Failed to create client: %v", err)
	}
	defer conn.Close()

	client := pb.NewDataServiceClient(conn)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	resp, err := client.GetItem(ctx, &pb.GetItemRequest{Id: "1"})
	if err != nil {
		t.Fatalf("mTLS call failed: %v", err)
	}
	if resp == nil {
		t.Error("Expected response, got nil")
	}
}

Testing JWT Authentication Interceptors
Server-Side Auth Interceptor
// interceptor/auth.go
func AuthInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
	handler grpc.UnaryHandler) (interface{}, error) {

	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		return nil, status.Error(codes.Unauthenticated, "no metadata")
	}
	tokens := md.Get("authorization")
	if len(tokens) == 0 {
		return nil, status.Error(codes.Unauthenticated, "no token")
	}
	token := strings.TrimPrefix(tokens[0], "Bearer ")
	claims, err := validateJWT(token)
	if err != nil {
		return nil, status.Errorf(codes.Unauthenticated, "invalid token: %v", err)
	}
	ctx = context.WithValue(ctx, userClaimsKey, claims)
	return handler(ctx, req)
}

Test: Auth Interceptor Unit Tests
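The interceptor calls validateJWT, and the table test in this section calls validTestJWT and expiredTestJWT; none of the three is defined in this guide. The following is one possible sketch, assuming HS256 tokens signed with a hard-coded test secret and using only the standard library. The claims type, testSecret, and tamperedTestJWT are hypothetical additions. A production service would more likely rely on a maintained JWT library than on hand-rolled validation.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"strings"
	"time"
)

// testSecret is a signing key for tests only (assumption: HS256 tokens).
var testSecret = []byte("test-only-secret")

var b64 = base64.RawURLEncoding

// claims holds the two fields this sketch cares about.
type claims struct {
	Sub string `json:"sub"`
	Exp int64  `json:"exp"`
}

// sign returns the base64url HMAC-SHA256 signature of the signing input.
func sign(signingInput string) string {
	mac := hmac.New(sha256.New, testSecret)
	mac.Write([]byte(signingInput))
	return b64.EncodeToString(mac.Sum(nil))
}

// makeTestJWT builds an HS256 token for subject "test-user" expiring at exp.
func makeTestJWT(exp time.Time) string {
	header := b64.EncodeToString([]byte(`{"alg":"HS256","typ":"JWT"}`))
	body, _ := json.Marshal(claims{Sub: "test-user", Exp: exp.Unix()})
	signingInput := header + "." + b64.EncodeToString(body)
	return signingInput + "." + sign(signingInput)
}

func validTestJWT() string   { return makeTestJWT(time.Now().Add(time.Hour)) }
func expiredTestJWT() string { return makeTestJWT(time.Now().Add(-time.Hour)) }

// tamperedTestJWT swaps the payload of a valid token but keeps the old
// signature, which is what a real "wrong signature" case looks like.
func tamperedTestJWT() string {
	parts := strings.Split(validTestJWT(), ".")
	parts[1] = b64.EncodeToString([]byte(`{"sub":"admin","exp":9999999999}`))
	return strings.Join(parts, ".")
}

// validateJWT checks structure, algorithm, signature, and expiry, in that order.
func validateJWT(token string) (*claims, error) {
	parts := strings.Split(token, ".")
	if len(parts) != 3 {
		return nil, errors.New("malformed token")
	}
	headerJSON, err := b64.DecodeString(parts[0])
	if err != nil {
		return nil, errors.New("malformed header")
	}
	var header struct {
		Alg string `json:"alg"`
	}
	// Pin the algorithm so a token cannot downgrade itself to "none".
	if err := json.Unmarshal(headerJSON, &header); err != nil || header.Alg != "HS256" {
		return nil, errors.New("unsupported algorithm")
	}
	if !hmac.Equal([]byte(parts[2]), []byte(sign(parts[0]+"."+parts[1]))) {
		return nil, errors.New("bad signature")
	}
	payload, err := b64.DecodeString(parts[1])
	if err != nil {
		return nil, errors.New("malformed payload")
	}
	var c claims
	if err := json.Unmarshal(payload, &c); err != nil {
		return nil, errors.New("malformed claims")
	}
	if time.Now().Unix() >= c.Exp {
		return nil, errors.New("token expired")
	}
	return &c, nil
}

func main() {
	for name, tok := range map[string]string{
		"valid":    validTestJWT(),
		"expired":  expiredTestJWT(),
		"tampered": tamperedTestJWT(),
	} {
		_, err := validateJWT(tok)
		fmt.Printf("%s: accepted=%v\n", name, err == nil)
	}
}
```

The literal "tampered.token.here" used in the table test is rejected as malformed before any signature check runs; a token like the one from tamperedTestJWT exercises the signature comparison itself.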
func TestAuthInterceptor(t *testing.T) {
	mockHandler := func(ctx context.Context, req interface{}) (interface{}, error) {
		return "ok", nil
	}
	info := &grpc.UnaryServerInfo{FullMethod: "/DataService/GetItem"}

	tests := []struct {
		name     string
		metadata map[string]string
		wantCode codes.Code
	}{
		{
			name:     "valid JWT",
			metadata: map[string]string{"authorization": "Bearer " + validTestJWT()},
			wantCode: codes.OK,
		},
		{
			name:     "no authorization header",
			metadata: map[string]string{},
			wantCode: codes.Unauthenticated,
		},
		{
			name:     "expired token",
			metadata: map[string]string{"authorization": "Bearer " + expiredTestJWT()},
			wantCode: codes.Unauthenticated,
		},
		{
			name:     "wrong signature",
			metadata: map[string]string{"authorization": "Bearer tampered.token.here"},
			wantCode: codes.Unauthenticated,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			md := metadata.New(tt.metadata)
			ctx := metadata.NewIncomingContext(context.Background(), md)
			_, err := AuthInterceptor(ctx, nil, info, mockHandler)
			if tt.wantCode == codes.OK {
				if err != nil {
					t.Errorf("Expected success, got: %v", err)
				}
			} else {
				st, ok := status.FromError(err)
				if !ok || st.Code() != tt.wantCode {
					t.Errorf("Expected %v, got: %v", tt.wantCode, err)
				}
			}
		})
	}
}

Python: Testing with pytest and grpcio

# These tests use the synchronous grpc API and assume a TLS-enabled server
# is already listening on localhost:50051. get_test_jwt() is a
# project-specific helper that is not shown here.
import pytest
import grpc

import myapp_pb2 as pb
import myapp_pb2_grpc as pb_grpc


@pytest.fixture
def tls_channel():
    with open('test-certs/ca.crt', 'rb') as f:
        trusted_certs = f.read()
    credentials = grpc.ssl_channel_credentials(root_certificates=trusted_certs)
    channel = grpc.secure_channel('localhost:50051', credentials)
    yield channel
    channel.close()


def test_tls_connection_works(tls_channel):
    stub = pb_grpc.DataServiceStub(tls_channel)
    response = stub.GetItem(pb.GetItemRequest(id='1'))
    assert response.id == '1'


def test_insecure_connection_fails():
    channel = grpc.insecure_channel('localhost:50051')
    stub = pb_grpc.DataServiceStub(channel)
    with pytest.raises(grpc.RpcError) as exc_info:
        stub.GetItem(pb.GetItemRequest(id='1'))
    # The call should fail; exact code varies by server config
    assert exc_info.value.code() in (
        grpc.StatusCode.UNAVAILABLE,
        grpc.StatusCode.UNKNOWN,
    )


def test_jwt_auth_required(tls_channel):
    stub = pb_grpc.DataServiceStub(tls_channel)
    with pytest.raises(grpc.RpcError) as exc_info:
        stub.GetProtectedItem(pb.GetItemRequest(id='1'))  # No auth header
    assert exc_info.value.code() == grpc.StatusCode.UNAUTHENTICATED


def test_jwt_auth_succeeds(tls_channel):
    stub = pb_grpc.DataServiceStub(tls_channel)
    metadata = [('authorization', f'Bearer {get_test_jwt()}')]
    response = stub.GetProtectedItem(
        pb.GetItemRequest(id='1'),
        metadata=metadata
    )
    assert response is not None

Testing Certificate Expiry
func TestExpiredServerCertificate(t *testing.T) {
	// Use the expired cert (pre-generated for testing)
	cert, err := tls.LoadX509KeyPair("../test-certs/expired-server.crt", "../test-certs/server.key")
	if err != nil {
		t.Skip("expired-server.crt not available")
	}
	creds := credentials.NewTLS(&tls.Config{
		Certificates: []tls.Certificate{cert},
	})
	lis, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatalf("Failed to listen: %v", err)
	}
	s := grpc.NewServer(grpc.Creds(creds))
	pb.RegisterDataServiceServer(s, &mockService{})
	go s.Serve(lis)
	defer s.Stop()

	// Connect as client; should see certificate expired error
	caCert, err := os.ReadFile("../test-certs/ca.crt")
	if err != nil {
		t.Fatalf("Failed to read CA cert: %v", err)
	}
	pool := x509.NewCertPool()
	pool.AppendCertsFromPEM(caCert)
	clientCreds := credentials.NewTLS(&tls.Config{
		RootCAs:    pool,
		ServerName: "localhost",
	})
	conn, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(clientCreds))
	if err != nil {
		t.Fatalf("Failed to create client: %v", err)
	}
	defer conn.Close()

	client := pb.NewDataServiceClient(conn)
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	_, err = client.GetItem(ctx, &pb.GetItemRequest{Id: "1"})
	if err == nil {
		t.Fatal("Expected certificate expiry error")
	}
	if !strings.Contains(err.Error(), "expired") && !strings.Contains(err.Error(), "certificate") {
		t.Errorf("Expected expiry error, got: %v", err)
	}
}

CI Configuration
Add cert generation to your CI setup:
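# .github/workflows/test.yml
- name: Generate test certificates
  run: |
    chmod +x ./scripts/generate-test-certs.sh
    ./scripts/generate-test-certs.sh
- name: Run gRPC security tests
  # -run takes a regular expression; "TestTLS" alone would skip the mTLS,
  # interceptor, and expiry tests defined above.
  run: go test ./... -run 'TestTLS|TestMTLS|TestAuthInterceptor|TestExpiredServerCertificate' -v -timeout 60s

Or use testdata/ with pre-committed test certificates to avoid regeneration: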
testdata/
  certs/
    ca.crt
    server.crt
    server.key
    client.crt
    client.key
    expired-server.crt  # Pre-expired for expiry tests

Security Testing Checklist
- TLS connection succeeds with valid certificate chain
- Insecure (plaintext) connections are rejected
- Wrong CA certificate causes connection failure
- Expired server certificate causes connection failure
- mTLS rejects clients without certificates
- mTLS rejects clients with self-signed certificates not in CA chain
- JWT auth interceptor rejects missing tokens
- JWT auth interceptor rejects expired tokens
- JWT auth interceptor rejects tampered tokens
- Auth interceptor runs before business logic (no data leaked on auth failure)
- Per-call credentials override channel credentials correctly
- Certificate error messages don't leak sensitive path information
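The per-call credentials item has no example in this guide. As a starting point, here is a sketch of a type whose method set matches grpc-go's credentials.PerRPCCredentials interface (GetRequestMetadata and RequireTransportSecurity). The grpc import is left out so the snippet compiles on its own; bearerToken and authHeader are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// bearerToken attaches a static token to every call it is used with.
// Its two methods match grpc-go's credentials.PerRPCCredentials interface.
type bearerToken struct {
	token string
}

// GetRequestMetadata returns the metadata to send with each RPC.
func (b bearerToken) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
	return map[string]string{"authorization": "Bearer " + b.token}, nil
}

// RequireTransportSecurity reports that the token must only travel over TLS.
func (b bearerToken) RequireTransportSecurity() bool {
	return true
}

// authHeader is a test-side helper (an assumption of this sketch) that
// extracts the authorization value the credentials would attach.
func authHeader(b bearerToken) (string, error) {
	md, err := b.GetRequestMetadata(context.Background())
	if err != nil {
		return "", err
	}
	return md["authorization"], nil
}

func main() {
	header, err := authHeader(bearerToken{token: "abc"})
	if err != nil {
		panic(err)
	}
	fmt.Println(header)
	fmt.Println("requires TLS:", bearerToken{}.RequireTransportSecurity())
}
```

In grpc-go, a value like this can be passed for a whole connection with grpc.WithPerRPCCredentials or for a single call with the grpc.PerRPCCredentials call option. A test for the checklist item could set one token at the channel level and another at the call level, then have a test interceptor record which authorization value arrives.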
gRPC security configuration is one of those areas where a small misconfiguration — like InsecureSkipVerify: true left over from development — silently breaks your security posture. Tests make these misconfigurations catch-on-commit rather than catch-in-production.