Testing NestJS Microservices: TCP, RabbitMQ, and Kafka Transport Layers
Testing NestJS microservices is harder than testing monolithic HTTP apps because you now have distributed components, message brokers, and transport-specific behavior to deal with. The test strategy depends on what you're testing: the message handler logic itself, the transport serialization, or the end-to-end flow across services.
This guide covers all three layers, with practical examples for TCP, RabbitMQ, and Kafka transports.
The Core Challenge
A NestJS microservice controller looks like this:
// orders/orders.controller.ts
import { Controller } from '@nestjs/common';
import { MessagePattern, Payload, EventPattern } from '@nestjs/microservices';
import { OrdersService } from './orders.service';
import { CreateOrderDto } from './dto/create-order.dto';
@Controller()
export class OrdersController {
constructor(private readonly ordersService: OrdersService) {}
@MessagePattern('create_order')
async createOrder(@Payload() createOrderDto: CreateOrderDto) {
return this.ordersService.create(createOrderDto);
}
@EventPattern('order_shipped')
async handleOrderShipped(@Payload() data: { orderId: string; trackingNumber: string }) {
await this.ordersService.markAsShipped(data.orderId, data.trackingNumber);
}
@MessagePattern('get_order')
async getOrder(@Payload() id: string) {
return this.ordersService.findOne(id);
}
}The handler methods are plain TypeScript. They receive a payload and return a value (for @MessagePattern) or nothing (for @EventPattern). This is the key insight: you can unit test handlers directly without any transport.
Unit Testing Microservice Controllers
Handler logic can be tested by calling methods directly, bypassing the transport entirely:
// orders/orders.controller.spec.ts
import { Test, TestingModule } from '@nestjs/testing';
import { OrdersController } from './orders.controller';
import { OrdersService } from './orders.service';
describe('OrdersController', () => {
let controller: OrdersController;
let mockOrdersService: {
create: jest.Mock;
findOne: jest.Mock;
markAsShipped: jest.Mock;
};
beforeEach(async () => {
mockOrdersService = {
create: jest.fn(),
findOne: jest.fn(),
markAsShipped: jest.fn(),
};
const module: TestingModule = await Test.createTestingModule({
controllers: [OrdersController],
providers: [
{
provide: OrdersService,
useValue: mockOrdersService,
},
],
}).compile();
controller = module.get<OrdersController>(OrdersController);
});
describe('createOrder', () => {
it('creates an order and returns it', async () => {
const dto = { userId: 'user-1', items: [{ productId: 'prod-1', quantity: 2 }] };
const created = { id: 'order-1', ...dto, status: 'pending' };
mockOrdersService.create.mockResolvedValue(created);
const result = await controller.createOrder(dto as any);
expect(result).toEqual(created);
expect(mockOrdersService.create).toHaveBeenCalledWith(dto);
});
it('propagates service errors', async () => {
mockOrdersService.create.mockRejectedValue(new Error('Out of stock'));
await expect(controller.createOrder({ userId: 'u', items: [] } as any))
.rejects.toThrow('Out of stock');
});
});
describe('handleOrderShipped', () => {
it('marks order as shipped', async () => {
mockOrdersService.markAsShipped.mockResolvedValue(undefined);
await controller.handleOrderShipped({ orderId: 'order-1', trackingNumber: 'TRK123' });
expect(mockOrdersService.markAsShipped).toHaveBeenCalledWith('order-1', 'TRK123');
});
});
describe('getOrder', () => {
it('returns order by id', async () => {
const order = { id: 'order-1', status: 'pending' };
mockOrdersService.findOne.mockResolvedValue(order);
const result = await controller.getOrder('order-1');
expect(result).toEqual(order);
});
it('propagates not found errors', async () => {
mockOrdersService.findOne.mockRejectedValue(new Error('Order not found'));
await expect(controller.getOrder('nonexistent')).rejects.toThrow('Order not found');
});
});
});These tests run fast and cover 100% of the handler logic without any broker running.
Testing the Microservice Client Side
The other half of the equation is testing services that call microservices via ClientProxy. These are typically HTTP services that delegate to microservices:
// api-gateway/orders-client.service.ts
import { Injectable, Inject } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { firstValueFrom, timeout } from 'rxjs';
@Injectable()
export class OrdersClientService {
constructor(
@Inject('ORDERS_SERVICE') private readonly client: ClientProxy,
) {}
async createOrder(createOrderDto: any) {
return firstValueFrom(
this.client.send('create_order', createOrderDto).pipe(timeout(5000))
);
}
emitOrderShipped(orderId: string, trackingNumber: string) {
this.client.emit('order_shipped', { orderId, trackingNumber });
}
}Mock the ClientProxy to test the client service:
// api-gateway/orders-client.service.spec.ts
import { Test, TestingModule } from '@nestjs/testing';
import { OrdersClientService } from './orders-client.service';
import { of } from 'rxjs';
describe('OrdersClientService', () => {
let service: OrdersClientService;
let mockClientProxy: { send: jest.Mock; emit: jest.Mock };
beforeEach(async () => {
mockClientProxy = {
send: jest.fn(),
emit: jest.fn(),
};
const module: TestingModule = await Test.createTestingModule({
providers: [
OrdersClientService,
{
provide: 'ORDERS_SERVICE',
useValue: mockClientProxy,
},
],
}).compile();
service = module.get<OrdersClientService>(OrdersClientService);
});
describe('createOrder', () => {
it('sends create_order message and returns response', async () => {
const dto = { userId: 'u1', items: [] };
const created = { id: 'order-1' };
mockClientProxy.send.mockReturnValue(of(created));
const result = await service.createOrder(dto);
expect(result).toEqual(created);
expect(mockClientProxy.send).toHaveBeenCalledWith('create_order', dto);
});
it('propagates errors from the microservice', async () => {
const { throwError } = await import('rxjs');
mockClientProxy.send.mockReturnValue(throwError(() => new Error('Service down')));
await expect(service.createOrder({})).rejects.toThrow('Service down');
});
});
describe('emitOrderShipped', () => {
it('emits order_shipped event', () => {
service.emitOrderShipped('order-1', 'TRK123');
expect(mockClientProxy.emit).toHaveBeenCalledWith('order_shipped', {
orderId: 'order-1',
trackingNumber: 'TRK123',
});
});
});
});Integration Testing with TCP Transport
TCP transport is the simplest to spin up in integration tests — no external broker needed:
// test/orders.integration.spec.ts
import { Test, TestingModule } from '@nestjs/testing';
import { INestMicroservice } from '@nestjs/common';
import { Transport, ClientTCP, ClientProxy } from '@nestjs/microservices';
import { firstValueFrom } from 'rxjs';
import { OrdersModule } from '../src/orders/orders.module';
describe('Orders Microservice (TCP integration)', () => {
let microservice: INestMicroservice;
let client: ClientProxy;
beforeAll(async () => {
const moduleFixture: TestingModule = await Test.createTestingModule({
imports: [OrdersModule],
}).compile();
microservice = moduleFixture.createNestMicroservice({
transport: Transport.TCP,
options: {
host: 'localhost',
port: 4001, // use a port not used by anything else in tests
},
});
await microservice.listen();
client = new ClientTCP({ host: 'localhost', port: 4001 });
await client.connect();
});
afterAll(async () => {
await client.close();
await microservice.close();
});
it('creates an order via TCP', async () => {
const result = await firstValueFrom(
client.send('create_order', {
userId: 'user-1',
items: [{ productId: 'prod-1', quantity: 1 }],
})
);
expect(result).toMatchObject({
id: expect.any(String),
status: 'pending',
});
});
it('retrieves an order via TCP', async () => {
// First create one
const created = await firstValueFrom(
client.send('create_order', { userId: 'user-1', items: [] })
);
const retrieved = await firstValueFrom(
client.send('get_order', created.id)
);
expect(retrieved).toMatchObject({ id: created.id });
});
});TCP integration tests work without any infrastructure dependencies. They're fast enough to run in CI on every push.
Testing with RabbitMQ Transport
For RabbitMQ, you have two options: mock the transport or use a real RabbitMQ instance. For CI, testcontainers makes the latter practical:
// test/orders-rabbitmq.integration.spec.ts
import { Test, TestingModule } from '@nestjs/testing';
import { INestMicroservice } from '@nestjs/common';
import { Transport, ClientRMQ, ClientProxy } from '@nestjs/microservices';
import { firstValueFrom } from 'rxjs';
import { GenericContainer, StartedTestContainer } from 'testcontainers';
import { OrdersModule } from '../src/orders/orders.module';
describe('Orders Microservice (RabbitMQ integration)', () => {
let container: StartedTestContainer;
let microservice: INestMicroservice;
let client: ClientProxy;
let amqpUrl: string;
beforeAll(async () => {
// Start RabbitMQ container
container = await new GenericContainer('rabbitmq:3-management')
.withExposedPorts(5672)
.start();
const port = container.getMappedPort(5672);
amqpUrl = `amqp://localhost:${port}`;
const moduleFixture: TestingModule = await Test.createTestingModule({
imports: [OrdersModule],
}).compile();
microservice = moduleFixture.createNestMicroservice({
transport: Transport.RMQ,
options: {
urls: [amqpUrl],
queue: 'orders_queue_test',
queueOptions: { durable: false },
},
});
await microservice.listen();
client = new ClientRMQ({
urls: [amqpUrl],
queue: 'orders_queue_test',
queueOptions: { durable: false },
});
await client.connect();
}, 60000); // Container startup can take 30-60 seconds
afterAll(async () => {
await client.close();
await microservice.close();
await container.stop();
});
it('sends message to RabbitMQ and receives response', async () => {
const result = await firstValueFrom(
client.send('create_order', { userId: 'user-1', items: [] })
);
expect(result).toMatchObject({ id: expect.any(String) });
});
});For unit tests of RabbitMQ-specific handler logic (like handling dead letter queues or specific AMQP message properties), you can test at the service layer without the transport:
// When handling retry logic or message headers in your service
describe('OrdersService retry handling', () => {
it('marks order as failed after max retries', async () => {
mockRepository.findOne.mockResolvedValue({ id: 'order-1', retryCount: 5 });
mockRepository.save.mockResolvedValue({ id: 'order-1', status: 'failed' });
const result = await service.processWithRetry('order-1');
expect(result.status).toBe('failed');
expect(mockRepository.save).toHaveBeenCalledWith(
expect.objectContaining({ status: 'failed' })
);
});
});Testing Kafka Transport
Kafka is the heaviest transport to test. Unit tests of handler logic are identical to TCP — just call the methods. For integration tests, use testcontainers with kafka-js:
// test/notifications-kafka.integration.spec.ts
import { GenericContainer, StartedTestContainer, Network } from 'testcontainers';
import { Transport, ClientKafka } from '@nestjs/microservices';
import { Test, TestingModule } from '@nestjs/testing';
import { INestMicroservice } from '@nestjs/common';
import { NotificationsModule } from '../src/notifications/notifications.module';
describe('Notifications Microservice (Kafka)', () => {
let zookeeperContainer: StartedTestContainer;
let kafkaContainer: StartedTestContainer;
let network: Network;
let microservice: INestMicroservice;
let client: ClientKafka;
beforeAll(async () => {
network = await new Network().start();
zookeeperContainer = await new GenericContainer('confluentinc/cp-zookeeper:7.3.0')
.withNetwork(network)
.withNetworkAliases('zookeeper')
.withEnvironment({
ZOOKEEPER_CLIENT_PORT: '2181',
ZOOKEEPER_TICK_TIME: '2000',
})
.start();
kafkaContainer = await new GenericContainer('confluentinc/cp-kafka:7.3.0')
.withNetwork(network)
.withExposedPorts(9092)
.withEnvironment({
KAFKA_BROKER_ID: '1',
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181',
KAFKA_ADVERTISED_LISTENERS: `PLAINTEXT://localhost:${9092}`,
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: '1',
})
.start();
const kafkaPort = kafkaContainer.getMappedPort(9092);
const brokers = [`localhost:${kafkaPort}`];
const moduleFixture: TestingModule = await Test.createTestingModule({
imports: [NotificationsModule],
}).compile();
microservice = moduleFixture.createNestMicroservice({
transport: Transport.KAFKA,
options: {
client: { brokers },
consumer: { groupId: 'notifications-test' },
},
});
await microservice.listen();
client = new ClientKafka({
client: { brokers },
consumer: { groupId: 'test-producer' },
});
await client.connect();
}, 120000);
afterAll(async () => {
await client.close();
await microservice.close();
await kafkaContainer.stop();
await zookeeperContainer.stop();
await network.stop();
});
it('processes notification event from Kafka', async () => {
// For event patterns (fire-and-forget), just emit and verify side effects
client.emit('user.registered', { userId: 'user-1', email: 'test@example.com' });
// Give the consumer time to process
await new Promise(resolve => setTimeout(resolve, 1000));
// Verify the side effect — e.g., notification stored in DB
const notifications = await notificationsService.findByUserId('user-1');
expect(notifications).toHaveLength(1);
expect(notifications[0].type).toBe('welcome');
});
});Kafka integration tests are slow (2-3 minutes for container startup). Keep them in a separate test suite run only in CI, not on every file save.
Testing Error Scenarios and Dead Letters
A critical area often skipped: what happens when a handler throws?
// testing error handling in message handlers
describe('Error handling', () => {
it('returns RpcException for invalid payload', async () => {
const { RpcException } = await import('@nestjs/microservices');
mockOrdersService.create.mockRejectedValue(new RpcException('Invalid payload'));
await expect(controller.createOrder({} as any))
.rejects.toThrow(RpcException);
});
it('wraps unexpected errors', async () => {
mockOrdersService.create.mockRejectedValue(new Error('Database connection failed'));
// Verify your error handling middleware/filter catches this
await expect(controller.createOrder({} as any))
.rejects.toThrow();
});
});Always test RpcException handling separately from HttpException — the filter that catches HTTP exceptions does not automatically catch RPC exceptions.
Structuring Microservice Tests
Keep tests at three levels:
- Unit (
*.spec.tsnext to source): handler logic, service methods, validators. No broker, no network. Run in milliseconds. - Integration (
test/*.integration.spec.ts): TCP integration or testcontainers. Tests transport + serialization. Run in seconds to minutes. - Contract tests: Verify that message schemas match between producer and consumer. Tools like Pact or Avro schema registries work here.
The rule: push everything you possibly can into unit tests. Each level up adds orders of magnitude to run time.
In production, microservices fail in ways integration tests don't catch — broker connectivity issues, serialization mismatches with real message sizes, consumer lag. HelpMeTest runs scheduled end-to-end scenarios against your live microservice infrastructure, so you know when something degrades before it cascades into a customer-facing incident.