Testing gRPC Streaming: Server, Client, and Bidirectional Streams
gRPC streaming RPCs are among the most powerful features of the protocol — and among the hardest to test correctly. Unlike unary calls, streams introduce ordering, backpressure, partial failure, and cancellation scenarios that require different testing approaches.
This guide covers testing all three streaming types: server streaming, client streaming, and bidirectional streaming.
The Four gRPC Call Types
Before testing, understand what you're testing:
// DataService declares one RPC for each gRPC call type: unary, server
// streaming, client streaming, and bidirectional streaming.
service DataService {
  // Unary — one request, one response
  rpc GetItem (GetItemRequest) returns (Item);
  // Server streaming — one request, many responses
  rpc ListItems (ListRequest) returns (stream Item);
  // Client streaming — many requests, one response
  rpc UploadItems (stream Item) returns (UploadSummary);
  // Bidirectional streaming — many requests, many responses
  rpc SyncItems (stream SyncRequest) returns (stream SyncResponse);
}
Each type has different failure modes and test requirements.
Testing Server Streaming
Server streaming sends a single request and receives a stream of responses. Common use cases: paginated results, real-time feeds, log tailing.
Go Test Example
package service_test
import (
"context"
"io"
"testing"
"time"
pb "myapp/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func TestServerStreaming_ReceivesAllItems(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stream, err := client.ListItems(ctx, &pb.ListRequest{
Filter: "active",
Limit: 100,
})
if err != nil {
t.Fatalf("ListItems failed: %v", err)
}
var received []string
for {
item, err := stream.Recv()
if err == io.EOF {
break // Stream completed normally
}
if err != nil {
t.Fatalf("Recv failed: %v", err)
}
received = append(received, item.Id)
}
if len(received) == 0 {
t.Error("Expected at least one item, got none")
}
// Verify ordering
for i := 1; i < len(received); i++ {
if received[i] <= received[i-1] {
t.Errorf("Items out of order at index %d: %s <= %s",
i, received[i], received[i-1])
}
}
}Test: Context Cancellation Mid-Stream
func TestServerStreaming_CancellationStopsStream(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
stream, err := client.ListItems(ctx, &pb.ListRequest{Limit: 10000})
if err != nil {
t.Fatalf("ListItems failed: %v", err)
}
// Receive a few items then cancel
itemsReceived := 0
for itemsReceived < 3 {
_, err := stream.Recv()
if err != nil {
t.Fatalf("Failed before receiving 3 items: %v", err)
}
itemsReceived++
}
cancel() // Cancel the context
// Subsequent Recv should fail with Canceled
_, err = stream.Recv()
if err == nil {
t.Error("Expected error after context cancellation, got nil")
}
st, ok := status.FromError(err)
if !ok {
t.Fatalf("Expected gRPC status error, got: %v", err)
}
if st.Code() != codes.Canceled {
t.Errorf("Expected Canceled, got %v", st.Code())
}
}Test: Stream Error Partway Through
Verify the client handles server-side errors that occur after some items have been sent:
func TestServerStreaming_PartialStreamError(t *testing.T) {
// Inject a server error after 5 items (requires test server or mock)
stream, err := client.ListItems(ctx, &pb.ListRequest{
Filter: "trigger-error-after:5",
})
if err != nil {
t.Fatalf("ListItems failed: %v", err)
}
var items []*pb.Item
var streamErr error
for {
item, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
streamErr = err
break
}
items = append(items, item)
}
// Should have received the first 5 items
if len(items) != 5 {
t.Errorf("Expected 5 items before error, got %d", len(items))
}
// Error should be Internal or whatever the server set
if streamErr == nil {
t.Error("Expected stream error, got none")
}
st, _ := status.FromError(streamErr)
if st.Code() != codes.Internal {
t.Errorf("Expected Internal, got %v", st.Code())
}
}Testing Client Streaming
Client streaming sends multiple requests and receives a single summary response.
Node.js Test Example
const grpc = require('@grpc/grpc-js');
const { promisify } = require('util');
describe('Client Streaming: UploadItems', () => {
it('should upload all items and return summary', (done) => {
const call = client.uploadItems((error, response) => {
if (error) return done(error);
expect(response.itemsProcessed).toBe(3);
expect(response.failures).toBe(0);
done();
});
const items = [
{ id: 'item-1', name: 'Widget A', price: 9.99 },
{ id: 'item-2', name: 'Widget B', price: 14.99 },
{ id: 'item-3', name: 'Widget C', price: 4.99 },
];
for (const item of items) {
call.write(item);
}
call.end();
});
it('should handle empty upload gracefully', (done) => {
const call = client.uploadItems((error, response) => {
if (error) return done(error);
expect(response.itemsProcessed).toBe(0);
done();
});
call.end(); // Send no items before ending
});
it('should propagate error from invalid item', (done) => {
const call = client.uploadItems((error) => {
expect(error).toBeDefined();
expect(error.code).toBe(grpc.status.INVALID_ARGUMENT);
done();
});
call.write({ id: '', name: '', price: -1 }); // Invalid item
call.end();
});
});Test: Backpressure and Flow Control
it('should handle large upload with backpressure', (done) => {
const ITEM_COUNT = 10000;
let sent = 0;
let writeSucceeded = 0;
const call = client.uploadItems((error, response) => {
if (error) return done(error);
expect(response.itemsProcessed).toBe(ITEM_COUNT);
done();
});
function sendNext() {
if (sent >= ITEM_COUNT) {
call.end();
return;
}
const canContinue = call.write({ id: `item-${sent}`, name: `Item ${sent}`, price: 1.0 });
sent++;
writeSucceeded++;
if (canContinue) {
// Buffer not full, send immediately
setImmediate(sendNext);
} else {
// Wait for drain before sending more
call.once('drain', sendNext);
}
}
sendNext();
}, 30000); // Long timeout for large uploadTesting Bidirectional Streaming
Bidirectional streaming is the most complex — requests and responses flow independently.
Python Test Example
import asyncio
import pytest
import grpc
import myapp_pb2 as pb
import myapp_pb2_grpc as pb_grpc
@pytest.mark.asyncio
async def test_bidirectional_sync():
    """Send five UPDATE requests over SyncItems and expect five OK replies in request order."""
    expected_ids = [f'item-{index}' for index in range(5)]

    async def outgoing():
        # Yield one request per expected ID, pausing briefly between sends.
        for item_id in expected_ids:
            yield pb.SyncRequest(item_id=item_id, action='UPDATE')
            await asyncio.sleep(0.01)

    async with grpc.aio.insecure_channel('localhost:50051') as channel:
        stub = pb_grpc.DataServiceStub(channel)
        replies = [reply async for reply in stub.SyncItems(outgoing())]

        # Should receive one response per request
        assert len(replies) == 5
        for expected_id, reply in zip(expected_ids, replies):
            assert reply.item_id == expected_id
            assert reply.status == 'OK'
@pytest.mark.asyncio
async def test_bidirectional_out_of_order_responses():
    """Check that every requested ID is answered, whatever order the replies arrive in."""
    sent_ids = set()

    async def outgoing():
        # Record each ID as it is sent so the final comparison covers
        # exactly the requests that went out.
        for number in range(10):
            item_id = f'item-{number}'
            sent_ids.add(item_id)
            yield pb.SyncRequest(item_id=item_id, action='GET')

    async with grpc.aio.insecure_channel('localhost:50051') as channel:
        stub = pb_grpc.DataServiceStub(channel)
        answered_ids = {reply.item_id async for reply in stub.SyncItems(outgoing())}

        # All IDs should be accounted for, regardless of order
        assert sent_ids == answered_ids
@pytest.mark.asyncio
async def test_bidirectional_server_closes_early():
"""Server closes stream before client sends all requests"""
async with grpc.aio.insecure_channel('localhost:50051') as channel:
stub = pb_grpc.DataServiceStub(channel)
async def many_requests():
for i in range(1000):
yield pb.SyncRequest(item_id=f'item-{i}')
responses = []
with pytest.raises(grpc.aio.AioRpcError) as exc_info:
async for response in stub.SyncItems(many_requests()):
responses.append(response)
if len(responses) >= 3:
# Server sends error after 3 responses in test mode
pass
assert exc_info.value.code() in (
grpc.StatusCode.CANCELLED,
grpc.StatusCode.INTERNAL,
)Mock gRPC Servers for Testing
Testing against a real server is slow and environment-dependent. Use mock servers for unit tests:
Go: In-Process Test Server
// startTestServer registers svc on a gRPC server listening on an ephemeral
// loopback port and returns a client connection to it. Both the server and
// the connection are torn down through t.Cleanup.
func startTestServer(t *testing.T, svc pb.DataServiceServer) *grpc.ClientConn {
	t.Helper()

	// Port 0 lets the OS pick a free port.
	lis, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatalf("Failed to listen: %v", err)
	}

	s := grpc.NewServer()
	pb.RegisterDataServiceServer(s, svc)
	go func() {
		// Surface an unexpected server exit instead of discarding it. A nil
		// error or ErrServerStopped is the normal result of the Stop call
		// registered below.
		if err := s.Serve(lis); err != nil && err != grpc.ErrServerStopped {
			t.Errorf("Test server exited unexpectedly: %v", err)
		}
	}()
	t.Cleanup(s.Stop)

	// NOTE(review): grpc.WithInsecure is marked deprecated in recent grpc-go
	// releases in favour of grpc.WithTransportCredentials with
	// insecure.NewCredentials(); switching requires importing
	// google.golang.org/grpc/credentials/insecure.
	conn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure())
	if err != nil {
		t.Fatalf("Failed to connect: %v", err)
	}
	// Cleanups run last-in, first-out, so the connection closes before the
	// server stops.
	t.Cleanup(func() { conn.Close() })

	return conn
}
// mockDataService is a mock DataService implementation for tests. It embeds
// pb.UnimplementedDataServiceServer and holds the items that ListItems sends.
type mockDataService struct {
	pb.UnimplementedDataServiceServer

	items []*pb.Item // sent by ListItems in slice order
}

// ListItems sends every configured item on stream in slice order. It returns
// the first Send error, or nil once all items have been sent. req is not
// consulted.
func (m *mockDataService) ListItems(req *pb.ListRequest, stream pb.DataService_ListItemsServer) error {
	for i := 0; i < len(m.items); i++ {
		sendErr := stream.Send(m.items[i])
		if sendErr != nil {
			return sendErr
		}
	}
	return nil
}
func TestWithMockServer(t *testing.T) {
mock := &mockDataService{
items: []*pb.Item{
{Id: "1", Name: "Item 1"},
{Id: "2", Name: "Item 2"},
},
}
conn := startTestServer(t, mock)
client := pb.NewDataServiceClient(conn)
// Test against mock server
stream, err := client.ListItems(context.Background(), &pb.ListRequest{})
// ...
}Node.js: grpc-mock
const GrpcMock = require('grpc-mock');
// Mock server for DataService, created from the service's .proto file.
// NOTE(review): the option names are passed straight to
// GrpcMock.createMockServer — confirm them against the installed grpc-mock
// version.
const mockServer = GrpcMock.createMockServer({
  protoPath: './proto/data_service.proto',
  packageName: 'dataservice',
  serviceName: 'DataService',
  options: {
    keepCase: true,
  },
});
describe('DataService streaming tests', () => {
before(() => mockServer.listen('0.0.0.0:50051'));
after(() => mockServer.close());
it('should stream items', (done) => {
mockServer.addMockFor('ListItems', {
stream: true,
response: [
{ id: '1', name: 'Widget A' },
{ id: '2', name: 'Widget B' },
],
});
// Run test against mock...
});
});Testing Streaming Edge Cases
Deadline Exceeded
func TestServerStreaming_DeadlineExceeded(t *testing.T) {
// Set a very short deadline
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
defer cancel()
stream, err := client.ListItems(ctx, &pb.ListRequest{Limit: 10000})
if err != nil {
// Deadline hit before stream even started — also acceptable
st, _ := status.FromError(err)
if st.Code() == codes.DeadlineExceeded {
return
}
t.Fatalf("Unexpected error: %v", err)
}
var gotDeadline bool
for {
_, err := stream.Recv()
if err != nil {
st, _ := status.FromError(err)
if st.Code() == codes.DeadlineExceeded {
gotDeadline = true
}
break
}
}
if !gotDeadline {
t.Error("Expected DeadlineExceeded error")
}
}Stream Metadata and Trailers
func TestServerStreaming_TrailersReceived(t *testing.T) {
var trailers metadata.MD
stream, err := client.ListItems(
context.Background(),
&pb.ListRequest{},
grpc.Trailer(&trailers),
)
if err != nil {
t.Fatalf("Failed: %v", err)
}
// Drain the stream
for {
_, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
t.Fatalf("Recv error: %v", err)
}
}
// Check trailers were received
if len(trailers["x-total-count"]) == 0 {
t.Error("Expected x-total-count trailer to be set")
}
}Checklist for gRPC Streaming Tests
Server streaming:
- Stream completes with `io.EOF` when all items are sent
- Correct number of items received
- Items arrive in expected order
- Context cancellation stops the stream
- Deadline exceeded during stream propagates correctly
- Server error mid-stream is received as gRPC status error
Client streaming:
- Server receives all items sent before `call.end()`
- Summary response is correct
- Empty stream (no items before `end()`) handled gracefully
- Invalid item causes appropriate error code
- Backpressure / flow control doesn't deadlock
Bidirectional streaming:
- All requests get corresponding responses
- Out-of-order responses are handled
- One side can close before the other
- Server error mid-stream propagates
- Large volumes don't deadlock
Testing gRPC streams is more involved than unary RPCs, but the failure modes — dropped messages, ordering bugs, deadlocks under backpressure — are the exact issues that show up in production if you skip them.