Testing gRPC Streaming: Server, Client, and Bidirectional Streams

Testing gRPC Streaming: Server, Client, and Bidirectional Streams

gRPC streaming RPCs are among the most powerful features of the protocol — and among the hardest to test correctly. Unlike unary calls, streams introduce ordering, backpressure, partial failure, and cancellation scenarios that require different testing approaches.

This guide covers testing all three streaming types: server streaming, client streaming, and bidirectional streaming.

The Four gRPC Call Types

Before testing, understand what you're testing:

service DataService {
  // Unary — one request, one response
  rpc GetItem (GetItemRequest) returns (Item);

  // Server streaming — one request, many responses
  rpc ListItems (ListRequest) returns (stream Item);

  // Client streaming — many requests, one response
  rpc UploadItems (stream Item) returns (UploadSummary);

  // Bidirectional streaming — many requests, many responses
  rpc SyncItems (stream SyncRequest) returns (stream SyncResponse);
}

Each type has different failure modes and test requirements.

Testing Server Streaming

Server streaming sends a single request and receives a stream of responses. Common use cases: paginated results, real-time feeds, log tailing.

Go Test Example

package service_test

import (
    "context"
    "io"
    "testing"
    "time"

    pb "myapp/proto"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

func TestServerStreaming_ReceivesAllItems(t *testing.T) {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    stream, err := client.ListItems(ctx, &pb.ListRequest{
        Filter: "active",
        Limit:  100,
    })
    if err != nil {
        t.Fatalf("ListItems failed: %v", err)
    }

    var received []string
    for {
        item, err := stream.Recv()
        if err == io.EOF {
            break // Stream completed normally
        }
        if err != nil {
            t.Fatalf("Recv failed: %v", err)
        }
        received = append(received, item.Id)
    }

    if len(received) == 0 {
        t.Error("Expected at least one item, got none")
    }
    
    // Verify ordering
    for i := 1; i < len(received); i++ {
        if received[i] <= received[i-1] {
            t.Errorf("Items out of order at index %d: %s <= %s",
                i, received[i], received[i-1])
        }
    }
}

Test: Context Cancellation Mid-Stream

func TestServerStreaming_CancellationStopsStream(t *testing.T) {
    ctx, cancel := context.WithCancel(context.Background())

    stream, err := client.ListItems(ctx, &pb.ListRequest{Limit: 10000})
    if err != nil {
        t.Fatalf("ListItems failed: %v", err)
    }

    // Receive a few items then cancel
    itemsReceived := 0
    for itemsReceived < 3 {
        _, err := stream.Recv()
        if err != nil {
            t.Fatalf("Failed before receiving 3 items: %v", err)
        }
        itemsReceived++
    }

    cancel() // Cancel the context

    // Subsequent Recv should fail with Canceled
    _, err = stream.Recv()
    if err == nil {
        t.Error("Expected error after context cancellation, got nil")
    }

    st, ok := status.FromError(err)
    if !ok {
        t.Fatalf("Expected gRPC status error, got: %v", err)
    }
    if st.Code() != codes.Canceled {
        t.Errorf("Expected Canceled, got %v", st.Code())
    }
}

Test: Stream Error Partway Through

Verify the client handles server-side errors that occur after some items have been sent:

func TestServerStreaming_PartialStreamError(t *testing.T) {
    // Inject a server error after 5 items (requires test server or mock)
    stream, err := client.ListItems(ctx, &pb.ListRequest{
        Filter: "trigger-error-after:5",
    })
    if err != nil {
        t.Fatalf("ListItems failed: %v", err)
    }

    var items []*pb.Item
    var streamErr error
    for {
        item, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            streamErr = err
            break
        }
        items = append(items, item)
    }

    // Should have received the first 5 items
    if len(items) != 5 {
        t.Errorf("Expected 5 items before error, got %d", len(items))
    }

    // Error should be Internal or whatever the server set
    if streamErr == nil {
        t.Error("Expected stream error, got none")
    }
    st, _ := status.FromError(streamErr)
    if st.Code() != codes.Internal {
        t.Errorf("Expected Internal, got %v", st.Code())
    }
}

Testing Client Streaming

Client streaming sends multiple requests and receives a single summary response.

Node.js Test Example

const grpc = require('@grpc/grpc-js');
const { promisify } = require('util');

describe('Client Streaming: UploadItems', () => {
  it('should upload all items and return summary', (done) => {
    const call = client.uploadItems((error, response) => {
      if (error) return done(error);

      expect(response.itemsProcessed).toBe(3);
      expect(response.failures).toBe(0);
      done();
    });

    const items = [
      { id: 'item-1', name: 'Widget A', price: 9.99 },
      { id: 'item-2', name: 'Widget B', price: 14.99 },
      { id: 'item-3', name: 'Widget C', price: 4.99 },
    ];

    for (const item of items) {
      call.write(item);
    }
    call.end();
  });

  it('should handle empty upload gracefully', (done) => {
    const call = client.uploadItems((error, response) => {
      if (error) return done(error);
      expect(response.itemsProcessed).toBe(0);
      done();
    });

    call.end(); // Send no items before ending
  });

  it('should propagate error from invalid item', (done) => {
    const call = client.uploadItems((error) => {
      expect(error).toBeDefined();
      expect(error.code).toBe(grpc.status.INVALID_ARGUMENT);
      done();
    });

    call.write({ id: '', name: '', price: -1 }); // Invalid item
    call.end();
  });
});

Test: Backpressure and Flow Control

it('should handle large upload with backpressure', (done) => {
  const ITEM_COUNT = 10000;
  let sent = 0;
  let writeSucceeded = 0;

  const call = client.uploadItems((error, response) => {
    if (error) return done(error);
    expect(response.itemsProcessed).toBe(ITEM_COUNT);
    done();
  });

  function sendNext() {
    if (sent >= ITEM_COUNT) {
      call.end();
      return;
    }

    const canContinue = call.write({ id: `item-${sent}`, name: `Item ${sent}`, price: 1.0 });
    sent++;
    writeSucceeded++;

    if (canContinue) {
      // Buffer not full, send immediately
      setImmediate(sendNext);
    } else {
      // Wait for drain before sending more
      call.once('drain', sendNext);
    }
  }

  sendNext();
}, 30000); // Long timeout for large upload

Testing Bidirectional Streaming

Bidirectional streaming is the most complex — requests and responses flow independently.

Python Test Example

import asyncio
import pytest
import grpc
import myapp_pb2 as pb
import myapp_pb2_grpc as pb_grpc

@pytest.mark.asyncio
async def test_bidirectional_sync():
    """Test bidirectional SyncItems RPC"""
    async with grpc.aio.insecure_channel('localhost:50051') as channel:
        stub = pb_grpc.DataServiceStub(channel)

        # Prepare requests
        requests = [
            pb.SyncRequest(item_id=f'item-{i}', action='UPDATE')
            for i in range(5)
        ]

        async def request_generator():
            for req in requests:
                yield req
                await asyncio.sleep(0.01)

        responses = []
        async for response in stub.SyncItems(request_generator()):
            responses.append(response)

        # Should receive one response per request
        assert len(responses) == 5
        for i, resp in enumerate(responses):
            assert resp.item_id == f'item-{i}'
            assert resp.status == 'OK'


@pytest.mark.asyncio
async def test_bidirectional_out_of_order_responses():
    """Verify responses may arrive in different order than requests"""
    async with grpc.aio.insecure_channel('localhost:50051') as channel:
        stub = pb_grpc.DataServiceStub(channel)

        request_ids = set()
        response_ids = set()

        async def requests():
            for i in range(10):
                item_id = f'item-{i}'
                request_ids.add(item_id)
                yield pb.SyncRequest(item_id=item_id, action='GET')

        async for response in stub.SyncItems(requests()):
            response_ids.add(response.item_id)

        # All IDs should be accounted for, regardless of order
        assert request_ids == response_ids


@pytest.mark.asyncio
async def test_bidirectional_server_closes_early():
    """Server closes stream before client sends all requests"""
    async with grpc.aio.insecure_channel('localhost:50051') as channel:
        stub = pb_grpc.DataServiceStub(channel)

        async def many_requests():
            for i in range(1000):
                yield pb.SyncRequest(item_id=f'item-{i}')

        responses = []
        with pytest.raises(grpc.aio.AioRpcError) as exc_info:
            async for response in stub.SyncItems(many_requests()):
                responses.append(response)
                if len(responses) >= 3:
                    # Server sends error after 3 responses in test mode
                    pass

        assert exc_info.value.code() in (
            grpc.StatusCode.CANCELLED,
            grpc.StatusCode.INTERNAL,
        )

Mock gRPC Servers for Testing

Testing against a real server is slow and environment-dependent. Use mock servers for unit tests:

Go: In-Process Test Server

func startTestServer(t *testing.T, svc pb.DataServiceServer) *grpc.ClientConn {
    t.Helper()

    lis, err := net.Listen("tcp", "127.0.0.1:0")
    if err != nil {
        t.Fatalf("Failed to listen: %v", err)
    }

    s := grpc.NewServer()
    pb.RegisterDataServiceServer(s, svc)

    go s.Serve(lis)
    t.Cleanup(s.Stop)

    conn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure())
    if err != nil {
        t.Fatalf("Failed to connect: %v", err)
    }
    t.Cleanup(func() { conn.Close() })

    return conn
}

// Mock service implementation
type mockDataService struct {
    pb.UnimplementedDataServiceServer
    items []*pb.Item
}

func (m *mockDataService) ListItems(req *pb.ListRequest, stream pb.DataService_ListItemsServer) error {
    for _, item := range m.items {
        if err := stream.Send(item); err != nil {
            return err
        }
    }
    return nil
}

func TestWithMockServer(t *testing.T) {
    mock := &mockDataService{
        items: []*pb.Item{
            {Id: "1", Name: "Item 1"},
            {Id: "2", Name: "Item 2"},
        },
    }
    conn := startTestServer(t, mock)
    client := pb.NewDataServiceClient(conn)

    // Test against mock server
    stream, err := client.ListItems(context.Background(), &pb.ListRequest{})
    // ...
}

Node.js: grpc-mock

const GrpcMock = require('grpc-mock');

const mockServer = GrpcMock.createMockServer({
  protoPath: './proto/data_service.proto',
  packageName: 'dataservice',
  serviceName: 'DataService',
  options: {
    keepCase: true,
  },
});

describe('DataService streaming tests', () => {
  before(() => mockServer.listen('0.0.0.0:50051'));
  after(() => mockServer.close());

  it('should stream items', (done) => {
    mockServer.addMockFor('ListItems', {
      stream: true,
      response: [
        { id: '1', name: 'Widget A' },
        { id: '2', name: 'Widget B' },
      ],
    });

    // Run test against mock...
  });
});

Testing Streaming Edge Cases

Deadline Exceeded

func TestServerStreaming_DeadlineExceeded(t *testing.T) {
    // Set a very short deadline
    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
    defer cancel()

    stream, err := client.ListItems(ctx, &pb.ListRequest{Limit: 10000})
    if err != nil {
        // Deadline hit before stream even started — also acceptable
        st, _ := status.FromError(err)
        if st.Code() == codes.DeadlineExceeded {
            return
        }
        t.Fatalf("Unexpected error: %v", err)
    }

    var gotDeadline bool
    for {
        _, err := stream.Recv()
        if err != nil {
            st, _ := status.FromError(err)
            if st.Code() == codes.DeadlineExceeded {
                gotDeadline = true
            }
            break
        }
    }

    if !gotDeadline {
        t.Error("Expected DeadlineExceeded error")
    }
}

Stream Metadata and Trailers

func TestServerStreaming_TrailersReceived(t *testing.T) {
    var trailers metadata.MD

    stream, err := client.ListItems(
        context.Background(),
        &pb.ListRequest{},
        grpc.Trailer(&trailers),
    )
    if err != nil {
        t.Fatalf("Failed: %v", err)
    }

    // Drain the stream
    for {
        _, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            t.Fatalf("Recv error: %v", err)
        }
    }

    // Check trailers were received
    if len(trailers["x-total-count"]) == 0 {
        t.Error("Expected x-total-count trailer to be set")
    }
}

Checklist for gRPC Streaming Tests

Server streaming:

  • Stream completes with io.EOF when all items sent
  • Correct number of items received
  • Items arrive in expected order
  • Context cancellation stops the stream
  • Deadline exceeded during stream propagates correctly
  • Server error mid-stream is received as gRPC status error

Client streaming:

  • Server receives all items sent before call.end()
  • Summary response is correct
  • Empty stream (no items before end()) handled gracefully
  • Invalid item causes appropriate error code
  • Backpressure / flow control doesn't deadlock

Bidirectional streaming:

  • All requests get corresponding responses
  • Out-of-order responses are handled
  • One side can close before the other
  • Server error mid-stream propagates
  • Large volumes don't deadlock

Testing gRPC streams is more involved than unary RPCs, but the failure modes — dropped messages, ordering bugs, deadlocks under backpressure — are the exact issues that show up in production if you skip them.

Read more