NATS.io Testing Guide: Testing JetStream, KV, and Pub/Sub Patterns
NATS testing is unusually clean compared to other message brokers: you can run an embedded NATS server in the same process as your tests with zero Docker setup. This guide covers testing core pub/sub, JetStream persistent streams, and the key-value store using Go's testing package and the nats-server/v2 library.
Key Takeaways
Use the embedded NATS server for unit and integration tests. natsserver.RunServer() starts a real NATS server in-process. No Docker, no ports to manage, tests run fast.
JetStream requires explicit stream creation in tests. Unlike core NATS, JetStream streams don't auto-create. Each test that uses JetStream must create its stream or use a shared fixture.
Test message delivery guarantees explicitly. Core NATS is at-most-once (fire and forget). JetStream is at-least-once. Test both paths: a subscriber that was not listening when a core NATS message was published will never see it, and that is expected behavior; a JetStream consumer that misses a message is a bug.
Use unique subjects per test. Prefix subjects with the test name to prevent bleed between concurrent tests.
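Test consumer restart scenarios. A JetStream durable consumer keeps its ack state on the server, so messages that were delivered but never acked are redelivered once their ack wait expires. Simulate a consumer restart and verify it picks up unprocessed messages.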
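The "unique subjects per test" takeaway can be reduced to a small helper. This is a minimal sketch using only the standard library; `uniqueSubject` and the `test.` prefix are hypothetical names chosen for illustration, not part of nats.go. It replaces characters that are special in NATS subjects, plus the `/` that Go puts into subtest names, so the test name becomes a single subject token.

```go
package main

import (
	"fmt"
	"strings"
)

// uniqueSubject (hypothetical helper) namespaces a subject by test name.
// ".", "*", ">", "/" and whitespace in the test name are replaced with "_"
// so the name occupies exactly one subject token.
func uniqueSubject(testName, suffix string) string {
	safe := strings.Map(func(r rune) rune {
		switch r {
		case '.', '*', '>', '/', ' ', '\t':
			return '_'
		}
		return r
	}, testName)
	return "test." + safe + "." + suffix
}

func main() {
	// In a real test, pass t.Name() as the first argument.
	fmt.Println(uniqueSubject("TestCorePubSub", "orders.new"))
	fmt.Println(uniqueSubject("TestOrders/retry", "orders.new"))
}
```

In a test, `nc.Subscribe(uniqueSubject(t.Name(), "orders.new"), ...)` would then keep tests that share a server from seeing each other's messages.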
NATS Testing Fundamentals
NATS offers three messaging models with different delivery guarantees:
- Core NATS pub/sub: at-most-once, no persistence
- JetStream: at-least-once persistent streams, consumers, replay
- Key-Value (KV): persistent key-value store backed by JetStream
Each model needs different test strategies. The embedded server approach handles all three.
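One consequence of at-least-once delivery is that the same JetStream message can legitimately arrive more than once, so the handler under test usually needs to tolerate duplicates. The sketch below shows one way to do that with the standard library only; `Deduper`, `NewDeduper`, and `Handle` are hypothetical names, and keying on a message ID is an assumption about how your messages are identified.

```go
package main

import (
	"fmt"
	"sync"
)

// Deduper (hypothetical) remembers which message IDs were already processed
// so that a redelivered message does not trigger its side effect twice.
type Deduper struct {
	mu   sync.Mutex
	seen map[string]bool
}

func NewDeduper() *Deduper {
	return &Deduper{seen: make(map[string]bool)}
}

// Handle runs process at most once per id and reports whether it ran.
func (d *Deduper) Handle(id string, process func()) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.seen[id] {
		return false
	}
	d.seen[id] = true
	process()
	return true
}

func main() {
	d := NewDeduper()
	processed := 0
	// "t1" appears twice, as it would after a redelivery.
	for _, id := range []string{"t1", "t1", "t2"} {
		d.Handle(id, func() { processed++ })
	}
	fmt.Println(processed)
}
```

A JetStream test can then deliver the same message twice on purpose and assert that the side effect happened once.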
Setup
go get github.com/nats-io/nats.go@latest
go get github.com/nats-io/nats-server/v2@latest
Embedded Server Fixture
Start a real NATS server per test or per suite:
// testing/nats_helpers.go
package testhelpers

import (
	"testing"

	"github.com/nats-io/nats-server/v2/server"
	natsserver "github.com/nats-io/nats-server/v2/test"
	"github.com/nats-io/nats.go"
)

// StartNATSServer runs an in-process NATS server with JetStream enabled
// and shuts it down when the test finishes.
func StartNATSServer(t *testing.T) *server.Server {
	t.Helper()
	opts := &server.Options{
		Port:      -1, // Random port
		JetStream: true,
		StoreDir:  t.TempDir(),
	}
	s := natsserver.RunServer(opts)
	t.Cleanup(func() { s.Shutdown() })
	return s
}

// ConnectToServer opens a client connection that is closed on test cleanup.
func ConnectToServer(t *testing.T, s *server.Server) *nats.Conn {
	t.Helper()
	nc, err := nats.Connect(s.ClientURL())
	if err != nil {
		t.Fatalf("failed to connect: %v", err)
	}
	t.Cleanup(nc.Close)
	return nc
}
Testing Core Pub/Sub
Core NATS is fire-and-forget. Test that a subscriber receives a published message:
func TestCorePubSub(t *testing.T) {
	s := StartNATSServer(t)
	nc := ConnectToServer(t, s)

	received := make(chan *nats.Msg, 1)
	_, err := nc.Subscribe("orders.new", func(msg *nats.Msg) {
		received <- msg
	})
	if err != nil {
		t.Fatal(err)
	}

	err = nc.Publish("orders.new", []byte(`{"order_id":"123"}`))
	if err != nil {
		t.Fatal(err)
	}

	select {
	case msg := <-received:
		if string(msg.Data) != `{"order_id":"123"}` {
			t.Errorf("unexpected message: %s", msg.Data)
		}
	case <-time.After(1 * time.Second):
		t.Fatal("timed out waiting for message")
	}
}
func TestCorePublishNoSubscriberIsNotError(t *testing.T) {
	// Core NATS: publishing to a subject with no subscribers is valid
	s := StartNATSServer(t)
	nc := ConnectToServer(t, s)

	err := nc.Publish("events.void", []byte("hello"))
	if err != nil {
		t.Errorf("expected no error publishing to unsubscribed subject, got: %v", err)
	}
	// Publish only buffers locally; Flush confirms the server accepted it.
	if err := nc.Flush(); err != nil {
		t.Errorf("expected flush to succeed, got: %v", err)
	}
}
Testing JetStream Streams
JetStream requires creating streams before publishing:
func TestJetStreamPublishAndConsume(t *testing.T) {
	s := StartNATSServer(t)
	nc := ConnectToServer(t, s)
	js, err := nc.JetStream()
	if err != nil {
		t.Fatal(err)
	}

	// Create stream
	_, err = js.AddStream(&nats.StreamConfig{
		Name:     "ORDERS",
		Subjects: []string{"orders.>"},
	})
	if err != nil {
		t.Fatal(err)
	}

	// Publish
	ack, err := js.Publish("orders.new", []byte(`{"order_id":"456"}`))
	if err != nil {
		t.Fatal(err)
	}
	if ack.Sequence != 1 {
		t.Errorf("expected sequence 1, got %d", ack.Sequence)
	}

	// Subscribe and receive
	sub, err := js.SubscribeSync("orders.new")
	if err != nil {
		t.Fatal(err)
	}
	defer sub.Unsubscribe()

	msg, err := sub.NextMsg(2 * time.Second)
	if err != nil {
		t.Fatal(err)
	}
	msg.Ack()
	if string(msg.Data) != `{"order_id":"456"}` {
		t.Errorf("unexpected message data: %s", msg.Data)
	}
}
Testing Durable Consumers and Replay
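JetStream durable consumers track ack state on the server. Test that an unacked message is redelivered once its ack wait expires:
func TestDurableConsumerReplaysUnackedMessages(t *testing.T) {
	s := StartNATSServer(t)
	nc := ConnectToServer(t, s)
	js, err := nc.JetStream()
	if err != nil {
		t.Fatal(err)
	}
	if _, err = js.AddStream(&nats.StreamConfig{
		Name:     "TASKS",
		Subjects: []string{"tasks.>"},
	}); err != nil {
		t.Fatal(err)
	}
	// Create the durable explicitly with a short AckWait so redelivery
	// happens quickly (the default ack wait is 30 seconds).
	if _, err = js.AddConsumer("TASKS", &nats.ConsumerConfig{
		Durable:       "worker",
		AckPolicy:     nats.AckExplicitPolicy,
		AckWait:       500 * time.Millisecond,
		FilterSubject: "tasks.process",
	}); err != nil {
		t.Fatal(err)
	}
	// Publish a message
	if _, err = js.Publish("tasks.process", []byte(`{"task_id":"t1"}`)); err != nil {
		t.Fatal(err)
	}

	// First consumer: receive but don't ack. Binding to the pre-created
	// durable keeps Unsubscribe from deleting it (nats.go deletes consumers
	// that the subscribe call itself created).
	sub1, err := js.PullSubscribe("tasks.process", "worker", nats.Bind("TASKS", "worker"))
	if err != nil {
		t.Fatal(err)
	}
	msgs, err := sub1.Fetch(1, nats.MaxWait(2*time.Second))
	if err != nil || len(msgs) != 1 {
		t.Fatalf("expected 1 message, got %d (err: %v)", len(msgs), err)
	}
	sub1.Unsubscribe() // Simulate a crash: stop without acking

	// Second consumer bound to the same durable: should get the redelivery
	sub2, err := js.PullSubscribe("tasks.process", "worker", nats.Bind("TASKS", "worker"))
	if err != nil {
		t.Fatal(err)
	}
	msgs2, err := sub2.Fetch(1, nats.MaxWait(5*time.Second))
	if err != nil || len(msgs2) != 1 {
		t.Fatalf("expected redelivered message, got %d (err: %v)", len(msgs2), err)
	}
	if string(msgs2[0].Data) != `{"task_id":"t1"}` {
		t.Errorf("unexpected data: %s", msgs2[0].Data)
	}
	if md, err := msgs2[0].Metadata(); err == nil && md.NumDelivered < 2 {
		t.Errorf("expected a redelivery, got delivery count %d", md.NumDelivered)
	}
	msgs2[0].Ack()
}
Testing the Key-Value Store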
NATS KV is backed by JetStream. Test put, get, and watch operations:
func TestKeyValueStore(t *testing.T) {
	s := StartNATSServer(t)
	nc := ConnectToServer(t, s)
	js, _ := nc.JetStream()

	kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
		Bucket: "config",
	})
	if err != nil {
		t.Fatal(err)
	}

	// Put and get
	_, err = kv.Put("feature.dark_mode", []byte("true"))
	if err != nil {
		t.Fatal(err)
	}
	entry, err := kv.Get("feature.dark_mode")
	if err != nil {
		t.Fatal(err)
	}
	if string(entry.Value()) != "true" {
		t.Errorf("expected 'true', got %s", entry.Value())
	}

	// Delete
	err = kv.Delete("feature.dark_mode")
	if err != nil {
		t.Fatal(err)
	}
	_, err = kv.Get("feature.dark_mode")
	if err != nats.ErrKeyNotFound {
		t.Errorf("expected ErrKeyNotFound after delete, got: %v", err)
	}
}
func TestKeyValueWatch(t *testing.T) {
	s := StartNATSServer(t)
	nc := ConnectToServer(t, s)
	js, _ := nc.JetStream()

	kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "flags"})
	if err != nil {
		t.Fatal(err)
	}
	watcher, err := kv.Watch("flags.>")
	if err != nil {
		t.Fatal(err)
	}
	defer watcher.Stop()

	// The watcher sends a nil entry once all existing values have been
	// delivered. The bucket is empty, so that marker arrives first.
	<-watcher.Updates()

	if _, err := kv.Put("flags.rollout", []byte("50")); err != nil {
		t.Fatal(err)
	}

	select {
	case update := <-watcher.Updates():
		if update == nil {
			t.Fatal("expected update, got nil")
		}
		if string(update.Value()) != "50" {
			t.Errorf("unexpected value: %s", update.Value())
		}
	case <-time.After(2 * time.Second):
		t.Fatal("timed out waiting for KV watch update")
	}
}
Testing Request-Reply
NATS request-reply is synchronous messaging. Test that the responder receives and responds correctly:
func TestRequestReply(t *testing.T) {
	s := StartNATSServer(t)
	nc := ConnectToServer(t, s)

	// Responder
	_, err := nc.Subscribe("rpc.user.get", func(msg *nats.Msg) {
		msg.Respond([]byte(`{"user_id":"u1","name":"Alice"}`))
	})
	if err != nil {
		t.Fatal(err)
	}

	// Caller
	reply, err := nc.Request("rpc.user.get", []byte(`{"user_id":"u1"}`), 2*time.Second)
	if err != nil {
		t.Fatal(err)
	}
	if string(reply.Data) != `{"user_id":"u1","name":"Alice"}` {
		t.Errorf("unexpected reply: %s", reply.Data)
	}
}

func TestRequestReplyTimeout(t *testing.T) {
	s := StartNATSServer(t)
	nc := ConnectToServer(t, s)

	// No responder registered. With current client and server versions the
	// request typically fails fast with nats.ErrNoResponders rather than
	// waiting out the timeout, so the test only asserts that an error occurs.
	_, err := nc.Request("rpc.missing", []byte("hello"), 100*time.Millisecond)
	if err == nil {
		t.Fatal("expected an error (no responders or timeout), got nil")
	}
}
Testing with Subjects and Wildcards
NATS supports * (single token) and > (all remaining tokens) wildcards. Test routing correctness:
func TestWildcardSubscription(t *testing.T) {
	s := StartNATSServer(t)
	nc := ConnectToServer(t, s)

	orderMessages := make(chan string, 10)
	_, err := nc.Subscribe("orders.*", func(msg *nats.Msg) {
		orderMessages <- msg.Subject
	})
	if err != nil {
		t.Fatal(err)
	}

	nc.Publish("orders.created", nil)
	nc.Publish("orders.shipped", nil)
	nc.Publish("payments.done", nil) // Should NOT match
	nc.Flush()

	// Collect until the channel stays quiet. The channel is never closed,
	// so a late callback cannot panic by sending on a closed channel.
	var received []string
collect:
	for {
		select {
		case subj := <-orderMessages:
			received = append(received, subj)
		case <-time.After(200 * time.Millisecond):
			break collect
		}
	}
	if len(received) != 2 {
		t.Errorf("expected 2 messages, got %d: %v", len(received), received)
	}
}
CI Configuration
# .github/workflows/test.yml
- name: Run NATS tests
  run: go test ./... -v -timeout 60s
No Docker needed: the embedded server runs in-process.
Summary
NATS makes testing remarkably clean:
- Embedded server means zero external dependencies in CI
- Core pub/sub tests verify routing and wildcard behavior
- JetStream tests verify persistence, consumer replay, and ack semantics
- KV tests verify watch and consistency
The biggest testing gap teams leave is durable consumer replay — that unacked message redelivery path is what makes JetStream useful for reliability, and it's exactly what you should test first.