NATS.io Testing Guide: Testing JetStream, KV, and Pub/Sub Patterns

NATS.io Testing Guide: Testing JetStream, KV, and Pub/Sub Patterns

NATS testing is unusually clean compared to other message brokers: you can run an embedded NATS server in the same process as your tests with zero Docker setup. This guide covers testing core pub/sub, JetStream persistent streams, and the key-value store using Go's testing package and the nats-server/v2 library.

Key Takeaways

Use the embedded NATS server for unit and integration tests. natsserver.RunServer() starts a real NATS server in-process. No Docker, no ports to manage, tests run fast.

JetStream requires explicit stream creation in tests. Unlike core NATS, JetStream streams don't auto-create. Each test that uses JetStream must create its stream or use a shared fixture.

Test message delivery guarantees explicitly. Core NATS is at-most-once (fire and forget). JetStream is at-least-once. Test both paths — a subscriber missing a core NATS message is not a bug; a JetStream consumer missing a message is.

Use unique subjects per test. Prefix subjects with the test name to prevent bleed between concurrent tests.

Test consumer restart scenarios. JetStream's durable consumers replay from the last ack. Simulate a consumer restart and verify it picks up unprocessed messages.

NATS Testing Fundamentals

NATS offers three messaging models with different delivery guarantees:

  • Core NATS pub/sub: at-most-once, no persistence
  • JetStream: at-least-once persistent streams, consumers, replay
  • Key-Value (KV): persistent key-value store backed by JetStream

Each model needs different test strategies. The embedded server approach handles all three.

Setup

go get github.com/nats-io/nats.go@latest
go get github.com/nats-io/nats-server/v2@latest

Embedded Server Fixture

Start a real NATS server per test or per suite:

// testing/nats_helpers.go
package testhelpers

import (
    "testing"
    "time"

    "github.com/nats-io/nats-server/v2/server"
    natsserver "github.com/nats-io/nats-server/v2/test"
    "github.com/nats-io/nats.go"
)

func StartNATSServer(t *testing.T) *server.Server {
    t.Helper()
    opts := &server.Options{
        Port:      -1, // Random port
        JetStream: true,
        StoreDir:  t.TempDir(),
    }
    s := natsserver.RunServer(opts)
    t.Cleanup(func() { s.Shutdown() })
    return s
}

func ConnectToServer(t *testing.T, s *server.Server) *nats.Conn {
    t.Helper()
    nc, err := nats.Connect(s.ClientURL())
    if err != nil {
        t.Fatalf("failed to connect: %v", err)
    }
    t.Cleanup(nc.Close)
    return nc
}

Testing Core Pub/Sub

Core NATS is fire-and-forget. Test that a subscriber receives a published message:

func TestCorePubSub(t *testing.T) {
    s := StartNATSServer(t)
    nc := ConnectToServer(t, s)

    received := make(chan *nats.Msg, 1)
    _, err := nc.Subscribe("orders.new", func(msg *nats.Msg) {
        received <- msg
    })
    if err != nil {
        t.Fatal(err)
    }

    err = nc.Publish("orders.new", []byte(`{"order_id":"123"}`))
    if err != nil {
        t.Fatal(err)
    }

    select {
    case msg := <-received:
        if string(msg.Data) != `{"order_id":"123"}` {
            t.Errorf("unexpected message: %s", msg.Data)
        }
    case <-time.After(1 * time.Second):
        t.Fatal("timed out waiting for message")
    }
}

func TestCorePublishNoSubscriberIsNotError(t *testing.T) {
    // Core NATS: publishing to a subject with no subscribers is valid
    s := StartNATSServer(t)
    nc := ConnectToServer(t, s)

    err := nc.Publish("events.void", []byte("hello"))
    if err != nil {
        t.Errorf("expected no error publishing to unsubscribed subject, got: %v", err)
    }
}

Testing JetStream Streams

JetStream requires creating streams before publishing:

func TestJetStreamPublishAndConsume(t *testing.T) {
    s := StartNATSServer(t)
    nc := ConnectToServer(t, s)
    js, err := nc.JetStream()
    if err != nil {
        t.Fatal(err)
    }

    // Create stream
    _, err = js.AddStream(&nats.StreamConfig{
        Name:     "ORDERS",
        Subjects: []string{"orders.>"},
    })
    if err != nil {
        t.Fatal(err)
    }

    // Publish
    ack, err := js.Publish("orders.new", []byte(`{"order_id":"456"}`))
    if err != nil {
        t.Fatal(err)
    }
    if ack.Sequence != 1 {
        t.Errorf("expected sequence 1, got %d", ack.Sequence)
    }

    // Subscribe and receive
    sub, err := js.SubscribeSync("orders.new")
    if err != nil {
        t.Fatal(err)
    }
    defer sub.Unsubscribe()

    msg, err := sub.NextMsg(2 * time.Second)
    if err != nil {
        t.Fatal(err)
    }
    msg.Ack()

    if string(msg.Data) != `{"order_id":"456"}` {
        t.Errorf("unexpected message data: %s", msg.Data)
    }
}

Testing Durable Consumers and Replay

JetStream durable consumers track ack positions — test that an unacked message is redelivered:

func TestDurableConsumerReplaysUnackedMessages(t *testing.T) {
    s := StartNATSServer(t)
    nc := ConnectToServer(t, s)
    js, _ := nc.JetStream()

    js.AddStream(&nats.StreamConfig{
        Name:     "TASKS",
        Subjects: []string{"tasks.>"},
    })

    // Publish a message
    js.Publish("tasks.process", []byte(`{"task_id":"t1"}`))

    // First consumer: receive but don't ack
    sub1, _ := js.PullSubscribe("tasks.process", "worker")
    msgs, _ := sub1.Fetch(1, nats.MaxWait(2*time.Second))
    if len(msgs) != 1 {
        t.Fatal("expected 1 message")
    }
    sub1.Unsubscribe() // Close without acking

    // Second consumer with same durable name: should redeliver
    sub2, _ := js.PullSubscribe("tasks.process", "worker")
    msgs2, _ := sub2.Fetch(1, nats.MaxWait(5*time.Second))
    if len(msgs2) != 1 {
        t.Fatal("expected redelivered message")
    }
    if string(msgs2[0].Data) != `{"task_id":"t1"}` {
        t.Errorf("unexpected data: %s", msgs2[0].Data)
    }
    msgs2[0].Ack()
}

Testing the Key-Value Store

NATS KV is backed by JetStream. Test put, get, and watch operations:

func TestKeyValueStore(t *testing.T) {
    s := StartNATSServer(t)
    nc := ConnectToServer(t, s)
    js, _ := nc.JetStream()

    kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
        Bucket: "config",
    })
    if err != nil {
        t.Fatal(err)
    }

    // Put and get
    _, err = kv.Put("feature.dark_mode", []byte("true"))
    if err != nil {
        t.Fatal(err)
    }

    entry, err := kv.Get("feature.dark_mode")
    if err != nil {
        t.Fatal(err)
    }
    if string(entry.Value()) != "true" {
        t.Errorf("expected 'true', got %s", entry.Value())
    }

    // Delete
    err = kv.Delete("feature.dark_mode")
    if err != nil {
        t.Fatal(err)
    }

    _, err = kv.Get("feature.dark_mode")
    if err != nats.ErrKeyNotFound {
        t.Errorf("expected ErrKeyNotFound after delete, got: %v", err)
    }
}

func TestKeyValueWatch(t *testing.T) {
    s := StartNATSServer(t)
    nc := ConnectToServer(t, s)
    js, _ := nc.JetStream()

    kv, _ := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "flags"})

    watcher, err := kv.Watch("flags.>")
    if err != nil {
        t.Fatal(err)
    }
    defer watcher.Stop()

    // Skip initial empty entry
    <-watcher.Updates()

    kv.Put("flags.rollout", []byte("50"))

    select {
    case update := <-watcher.Updates():
        if update == nil {
            t.Fatal("expected update, got nil")
        }
        if string(update.Value()) != "50" {
            t.Errorf("unexpected value: %s", update.Value())
        }
    case <-time.After(2 * time.Second):
        t.Fatal("timed out waiting for KV watch update")
    }
}

Testing Request-Reply

NATS request-reply is synchronous messaging. Test that the responder receives and responds correctly:

func TestRequestReply(t *testing.T) {
    s := StartNATSServer(t)
    nc := ConnectToServer(t, s)

    // Responder
    nc.Subscribe("rpc.user.get", func(msg *nats.Msg) {
        msg.Respond([]byte(`{"user_id":"u1","name":"Alice"}`))
    })

    // Caller
    reply, err := nc.Request("rpc.user.get", []byte(`{"user_id":"u1"}`), 2*time.Second)
    if err != nil {
        t.Fatal(err)
    }

    if string(reply.Data) != `{"user_id":"u1","name":"Alice"}` {
        t.Errorf("unexpected reply: %s", reply.Data)
    }
}

func TestRequestReplyTimeout(t *testing.T) {
    s := StartNATSServer(t)
    nc := ConnectToServer(t, s)

    // No responder registered
    _, err := nc.Request("rpc.missing", []byte("hello"), 100*time.Millisecond)
    if err == nil {
        t.Fatal("expected timeout error, got nil")
    }
}

Testing with Subjects and Wildcards

NATS supports * (single token) and > (all remaining tokens) wildcards. Test routing correctness:

func TestWildcardSubscription(t *testing.T) {
    s := StartNATSServer(t)
    nc := ConnectToServer(t, s)

    orderMessages := make(chan string, 10)
    nc.Subscribe("orders.*", func(msg *nats.Msg) {
        orderMessages <- msg.Subject
    })

    nc.Publish("orders.created", nil)
    nc.Publish("orders.shipped", nil)
    nc.Publish("payments.done", nil) // Should NOT match

    nc.Flush()
    time.Sleep(50 * time.Millisecond)

    close(orderMessages)
    var received []string
    for s := range orderMessages {
        received = append(received, s)
    }

    if len(received) != 2 {
        t.Errorf("expected 2 messages, got %d: %v", len(received), received)
    }
}

CI Configuration

# .github/workflows/test.yml
- name: Run NATS tests
  run: go test ./... -v -timeout 60s

No Docker needed — the embedded server runs in-process.

Summary

NATS makes testing remarkably clean:

  • Embedded server means zero external dependencies in CI
  • Core pub/sub tests verify routing and wildcard behavior
  • JetStream tests verify persistence, consumer replay, and ack semantics
  • KV tests verify watch and consistency

The biggest testing gap teams leave is durable consumer replay — that unacked message redelivery path is what makes JetStream useful for reliability, and it's exactly what you should test first.

Read more