NATS JetStream Persistence Testing: Consumer Durability and Fault Tolerance
NATS JetStream persistence introduces durable consumers, sequence-based message tracking, and stream retention policies. Testing these means verifying that a consumer resumes exactly where it left off after restart, that exactly-once deduplication works, and that your stream retention policy doesn't silently discard messages your consumers haven't read yet. Because the NATS server is written in Go and can be embedded in a test binary, all of this is testable in-process without Docker.
Key Takeaways
Use the embedded NATS server for unit tests. natsserver.RunServer starts a real NATS server in-process in milliseconds. No Docker and no external services: the client connects over a loopback port, with full JetStream support.
Test durable consumer resume. Connect, consume some messages, disconnect, reconnect, and verify the consumer starts from where it left off — not from the beginning and not skipping messages.
Test sequence gaps. Streams can have MaxAge or MaxMsgs limits. If messages are trimmed before a slow consumer reads them, the consumer's next delivery skips ahead in the stream sequence. Test that your consumer handles the gap without panicking.
Test exactly-once deduplication. JetStream supports Nats-Msg-Id header deduplication. Test that publishing the same message ID twice within the stream's duplicate window results in one stored message and one delivery.
Test consumer state after stream deletion and recreation. Deleting a stream also deletes its consumers and their state, so test that your startup code copes with a durable consumer that no longer exists.
JetStream Persistence: What Needs Testing
NATS core pub/sub has no persistence — messages are delivered to active subscribers or lost. JetStream adds a persistence layer with:
- Streams: named, persistent message stores with configurable retention
- Consumers: named cursors tracking progress through a stream
- Durable consumers: survive reconnects; resume from last acknowledged sequence
- Push vs. pull consumers: different delivery models with different failure modes
Most production issues come from consumer state management — consumers that restart from the wrong position, messages that expire before consumption, and duplicate processing. These need explicit tests.
Setup: Embedded NATS Server
```
// go.mod
require (
	github.com/nats-io/nats.go v1.35.0
	github.com/nats-io/nats-server/v2 v2.10.14
	github.com/stretchr/testify v1.9.0 // require/assert used in the tests below
)
```

```go
// testhelpers/nats.go
package testhelpers

import (
	"testing"

	"github.com/nats-io/nats-server/v2/server"
	natsserver "github.com/nats-io/nats-server/v2/test"
	"github.com/nats-io/nats.go"
)

// StartJetStreamServer starts an in-process NATS server with JetStream enabled
// and returns the server, a connected client, and a JetStream context.
// Everything is shut down automatically when the test finishes.
func StartJetStreamServer(t *testing.T) (*server.Server, *nats.Conn, nats.JetStreamContext) {
	t.Helper()

	opts := natsserver.DefaultTestOptions // copy of the package defaults
	opts.Port = -1                        // random port
	opts.JetStream = true
	opts.StoreDir = t.TempDir() // file-based store for persistence tests

	srv := natsserver.RunServer(&opts)
	t.Cleanup(func() { srv.Shutdown() })

	nc, err := nats.Connect(srv.ClientURL())
	if err != nil {
		t.Fatalf("connect: %v", err)
	}
	t.Cleanup(nc.Close)

	js, err := nc.JetStream()
	if err != nil {
		t.Fatalf("jetstream: %v", err)
	}
	return srv, nc, js
}
```

Testing Basic Durable Consumer Behavior
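```go
// jetstream_test.go: imports used by the tests in this article.
// "example.com/yourapp/testhelpers" is a placeholder for your own module path.
package jetstream_test

import (
	"encoding/json"
	"fmt"
	"sort"
	"sync"
	"testing"
	"time"

	"github.com/nats-io/nats.go"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"example.com/yourapp/testhelpers"
)

func TestDurableConsumerBasic(t *testing.T) {
	_, _, js := testhelpers.StartJetStreamServer(t)

	// Create stream
	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "ORDERS",
		Subjects: []string{"orders.*"},
	})
	require.NoError(t, err)

	// Publish messages
	for i := 1; i <= 5; i++ {
		_, err = js.Publish("orders.created", []byte(fmt.Sprintf(`{"id":"ORD-%03d"}`, i)))
		require.NoError(t, err)
	}

	// Create the durable consumer explicitly and bind to it. When PullSubscribe
	// creates the consumer itself, Unsubscribe deletes it, and the "restart"
	// below would silently start from the beginning.
	_, err = js.AddConsumer("ORDERS", &nats.ConsumerConfig{
		Durable:       "order-processor",
		FilterSubject: "orders.*",
		AckPolicy:     nats.AckExplicitPolicy,
	})
	require.NoError(t, err)
	bind := nats.Bind("ORDERS", "order-processor")

	sub, err := js.PullSubscribe("orders.*", "order-processor", bind)
	require.NoError(t, err)

	// Fetch and process first 3; AckSync waits until the server records each ack
	msgs, err := sub.Fetch(3, nats.MaxWait(time.Second))
	require.NoError(t, err)
	require.Len(t, msgs, 3)
	for _, msg := range msgs {
		require.NoError(t, msg.AckSync())
	}

	// Drop the subscription to simulate a consumer restart; the bound consumer survives
	require.NoError(t, sub.Unsubscribe())

	// Resubscribe with the same durable name
	sub2, err := js.PullSubscribe("orders.*", "order-processor", bind)
	require.NoError(t, err)
	defer sub2.Unsubscribe()

	// Should receive only messages 4 and 5 (where we left off)
	msgs2, err := sub2.Fetch(10, nats.MaxWait(time.Second))
	require.NoError(t, err)
	require.Len(t, msgs2, 2, "durable consumer should resume from sequence 4")

	var data map[string]string
	require.NoError(t, json.Unmarshal(msgs2[0].Data, &data))
	assert.Equal(t, "ORD-004", data["id"])
}
```

Testing Consumer Resume After Crash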
The critical invariant: a durable consumer that acknowledges messages up to sequence N must resume at N+1 after restart, not from the beginning.
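Counting messages after the restart catches a consumer that starts over, but not one that skips or repeats in the middle. A stricter check compares stream sequences. A minimal sketch; checkResume is a hypothetical helper, not part of nats.go, and it assumes you have collected the stream sequence of each message received after the restart.

```go
package main

import "fmt"

// checkResume is a hypothetical helper: given the last stream sequence that was
// acknowledged before the restart and the stream sequences received after it,
// it verifies the consumer resumed at lastAcked+1 with no gaps or repeats.
func checkResume(lastAcked uint64, got []uint64) error {
	if len(got) == 0 {
		return fmt.Errorf("no messages received after resume")
	}
	want := lastAcked + 1
	for i, seq := range got {
		if seq != want {
			return fmt.Errorf("message %d: got stream seq %d, want %d", i, seq, want)
		}
		want++
	}
	return nil
}

func main() {
	fmt.Println(checkResume(5, []uint64{6, 7, 8, 9, 10})) // resumed correctly
	fmt.Println(checkResume(5, []uint64{1, 2, 3}))        // restarted from the beginning
	fmt.Println(checkResume(5, []uint64{6, 8}))           // skipped a message
}
```

With nats.go, each message's stream sequence is available through m.Metadata(), whose Sequence.Stream field holds it. Strict contiguity only holds when retention limits trimmed nothing and the consumer's filter covers every subject in the stream.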
```go
func TestConsumerResumesAfterDisconnect(t *testing.T) {
	srv, nc, js := testhelpers.StartJetStreamServer(t)

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "EVENTS",
		Subjects: []string{"events.>"},
		Storage:  nats.FileStorage, // file-based for true persistence
	})
	require.NoError(t, err)

	// Publish 10 messages
	for i := 1; i <= 10; i++ {
		_, err := js.Publish("events.log", []byte(fmt.Sprintf(`{"seq":%d}`, i)))
		require.NoError(t, err)
	}

	// Pre-create the durable so no client-side Unsubscribe can delete it
	_, err = js.AddConsumer("EVENTS", &nats.ConsumerConfig{
		Durable:       "event-handler",
		FilterSubject: "events.>",
		AckPolicy:     nats.AckExplicitPolicy,
	})
	require.NoError(t, err)

	// Consumer 1: process and ack messages 1-5, then drop the whole connection
	sub1, err := js.PullSubscribe("events.>", "event-handler", nats.Bind("EVENTS", "event-handler"))
	require.NoError(t, err)
	msgs, err := sub1.Fetch(5, nats.MaxWait(time.Second))
	require.NoError(t, err)
	require.Len(t, msgs, 5)
	for _, m := range msgs {
		require.NoError(t, m.AckSync())
	}
	nc.Close()

	// Simulate reconnect (new connection to same server)
	nc2, err := nats.Connect(srv.ClientURL())
	require.NoError(t, err)
	defer nc2.Close()
	js2, err := nc2.JetStream()
	require.NoError(t, err)

	// Consumer 2: same durable name, should pick up from message 6
	sub2, err := js2.PullSubscribe("events.>", "event-handler", nats.Bind("EVENTS", "event-handler"))
	require.NoError(t, err)
	defer sub2.Unsubscribe()

	msgs2, err := sub2.Fetch(10, nats.MaxWait(2*time.Second))
	require.NoError(t, err)
	require.Len(t, msgs2, 5, "should receive exactly the 5 remaining messages")

	var first map[string]int
	require.NoError(t, json.Unmarshal(msgs2[0].Data, &first))
	assert.Equal(t, 6, first["seq"], "first message after resume should be seq 6")
}
```

Testing Exactly-Once Deduplication
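JetStream deduplicates on the Nats-Msg-Id header. If a publish reuses a message ID that the stream already accepted within its Duplicates window, the server acknowledges it with Duplicate set but does not store it again, so consumers receive the message only once.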
```go
func TestExactlyOnceDeduplication(t *testing.T) {
	_, _, js := testhelpers.StartJetStreamServer(t)

	_, err := js.AddStream(&nats.StreamConfig{
		Name:       "PAYMENTS",
		Subjects:   []string{"payments.>"},
		Duplicates: 5 * time.Minute, // deduplication window
	})
	require.NoError(t, err)

	// Publish the same message ID twice (simulating retry after network failure)
	pubAck1, err := js.Publish("payments.process",
		[]byte(`{"paymentId":"PAY-001","amount":99.99}`),
		nats.MsgId("PAY-001"))
	require.NoError(t, err)
	assert.False(t, pubAck1.Duplicate, "first publish should not be a duplicate")

	pubAck2, err := js.Publish("payments.process",
		[]byte(`{"paymentId":"PAY-001","amount":99.99}`),
		nats.MsgId("PAY-001"))
	require.NoError(t, err)
	assert.True(t, pubAck2.Duplicate, "second publish with same ID should be flagged as duplicate")

	// Stream should contain only 1 message
	streamInfo, err := js.StreamInfo("PAYMENTS")
	require.NoError(t, err)
	assert.Equal(t, uint64(1), streamInfo.State.Msgs)

	// Consumer should receive only 1 message
	sub, err := js.PullSubscribe("payments.>", "payment-processor")
	require.NoError(t, err)
	defer sub.Unsubscribe()

	msgs, err := sub.Fetch(10, nats.MaxWait(time.Second))
	require.NoError(t, err)
	assert.Len(t, msgs, 1, "exactly one message should be delivered despite two publishes")
}
```

Testing Stream Retention Policies
Streams can limit retention by age or count. Test that your consumer handles the resulting sequence gaps.
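On the consumer side, handling a gap usually means noticing and recording it rather than failing. A minimal sketch, assuming the handler reads each message's stream sequence (in nats.go, from msg.Metadata()); gapTracker and newGapTracker are hypothetical names. Stream sequences also jump when a consumer filters on a subset of subjects, so treat a gap as a signal to log or count, not as proof of data loss.

```go
package main

import "fmt"

// gapTracker is a hypothetical helper that counts stream sequences that were
// never delivered, e.g. because MaxAge or MaxMsgs trimmed them first.
type gapTracker struct {
	next    uint64 // next stream sequence we expect to see
	skipped uint64 // total sequences skipped so far
}

// newGapTracker starts expecting firstExpected, e.g. the consumer's start sequence.
func newGapTracker(firstExpected uint64) *gapTracker {
	return &gapTracker{next: firstExpected}
}

// observe records seq and returns how many sequences were skipped before it.
// Redeliveries (seq below the expected next) are ignored instead of panicking
// on a "negative" gap.
func (g *gapTracker) observe(seq uint64) uint64 {
	if seq < g.next {
		return 0
	}
	gap := seq - g.next
	g.skipped += gap
	g.next = seq + 1
	return gap
}

func main() {
	// A consumer that asked to start at sequence 1 on a stream whose first five
	// messages already expired, plus one redelivery of sequence 7.
	g := newGapTracker(1)
	for _, seq := range []uint64{6, 7, 7, 8, 9, 10} {
		if gap := g.observe(seq); gap > 0 {
			fmt.Printf("seq %d: %d message(s) skipped\n", seq, gap)
		}
	}
	fmt.Println("total skipped:", g.skipped)
}
```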
```go
func TestConsumerHandlesMaxAgeRetention(t *testing.T) {
	_, _, js := testhelpers.StartJetStreamServer(t)

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "LOGS",
		Subjects: []string{"logs.>"},
		// Short for testing. The surviving messages expire after this too,
		// so the reads below must finish well within it.
		MaxAge: time.Second,
	})
	require.NoError(t, err)

	// Publish messages 1-5
	for i := 1; i <= 5; i++ {
		_, err := js.Publish("logs.app", []byte(fmt.Sprintf(`{"msg":"log-%d"}`, i)))
		require.NoError(t, err)
	}

	// Wait until the server has actually expired them (poll instead of a fixed sleep)
	require.Eventually(t, func() bool {
		info, err := js.StreamInfo("LOGS")
		return err == nil && info.State.Msgs == 0
	}, 5*time.Second, 50*time.Millisecond)

	// Publish new messages after expiry
	for i := 6; i <= 10; i++ {
		_, err := js.Publish("logs.app", []byte(fmt.Sprintf(`{"msg":"log-%d"}`, i)))
		require.NoError(t, err)
	}

	// Only the 5 new messages should remain, starting at stream sequence 6
	streamInfo, err := js.StreamInfo("LOGS")
	require.NoError(t, err)
	assert.Equal(t, uint64(5), streamInfo.State.Msgs)
	assert.Equal(t, uint64(6), streamInfo.State.FirstSeq)

	// A consumer that asks for sequence 1 starts at the first surviving message
	sub, err := js.PullSubscribe("logs.>", "log-reader", nats.StartSequence(1))
	require.NoError(t, err)
	defer sub.Unsubscribe()

	msgs, err := sub.Fetch(10, nats.MaxWait(2*time.Second))
	require.NoError(t, err)
	// Should receive messages 6-10 only (1-5 were trimmed)
	require.Len(t, msgs, 5)

	meta, err := msgs[0].Metadata()
	require.NoError(t, err)
	assert.Equal(t, uint64(6), meta.Sequence.Stream, "first delivery should jump past the expired sequences")

	var data map[string]string
	require.NoError(t, json.Unmarshal(msgs[0].Data, &data))
	assert.Equal(t, "log-6", data["msg"], "first message should be from after expiry")
}
```
```go
func TestConsumerHandlesMaxMsgsRetention(t *testing.T) {
	_, _, js := testhelpers.StartJetStreamServer(t)

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "METRICS",
		Subjects: []string{"metrics.>"},
		MaxMsgs:  10, // keep only last 10
	})
	require.NoError(t, err)

	// Publish 20 messages; the default DiscardOld policy trims the first 10
	for i := 1; i <= 20; i++ {
		_, err := js.Publish("metrics.cpu", []byte(fmt.Sprintf(`{"value":%d}`, i)))
		require.NoError(t, err)
	}

	streamInfo, err := js.StreamInfo("METRICS")
	require.NoError(t, err)
	assert.Equal(t, uint64(10), streamInfo.State.Msgs)
	// First sequence should be 11 (not 1)
	assert.Equal(t, uint64(11), streamInfo.State.FirstSeq)
	assert.Equal(t, uint64(20), streamInfo.State.LastSeq)
}
```

Testing Push Consumer with Heartbeat
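Push consumers configured with an idle heartbeat receive a status message from the server (Status header 100) whenever nothing has been delivered for the heartbeat interval. That lets a subscriber tell an idle stream apart from a dead subscription.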
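```go
func TestPushConsumerWithHeartbeat(t *testing.T) {
	_, nc, js := testhelpers.StartJetStreamServer(t)

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "STATUS",
		Subjects: []string{"status.*"},
	})
	require.NoError(t, err)

	heartbeats := make(chan struct{}, 10)
	messages := make(chan *nats.Msg, 10)

	// nats.go handles idle heartbeats internally for js.Subscribe callbacks, so
	// they never reach the handler. Subscribe to the consumer's deliver subject
	// with a core subscription instead, where heartbeats arrive as messages.
	deliver := nats.NewInbox()
	sub, err := nc.Subscribe(deliver, func(msg *nats.Msg) {
		if msg.Header.Get("Status") == "100" {
			// Idle heartbeat (flow control also uses 100, but it is not enabled here)
			select {
			case heartbeats <- struct{}{}:
			default: // never block the callback on extra heartbeats
			}
			return
		}
		messages <- msg
		msg.Ack()
	})
	require.NoError(t, err)
	defer sub.Unsubscribe()

	// Push consumer that sends an idle heartbeat every 100ms
	_, err = js.AddConsumer("STATUS", &nats.ConsumerConfig{
		Durable:        "status-watcher",
		DeliverSubject: deliver,
		AckPolicy:      nats.AckExplicitPolicy,
		Heartbeat:      100 * time.Millisecond,
	})
	require.NoError(t, err)

	// Wait for at least one heartbeat, which confirms the subscription is alive
	select {
	case <-heartbeats:
	case <-time.After(500 * time.Millisecond):
		t.Fatal("expected idle heartbeat within 500ms")
	}

	// Now publish a message
	_, err = js.Publish("status.update", []byte(`{"state":"running"}`))
	require.NoError(t, err)

	select {
	case msg := <-messages:
		assert.Equal(t, `{"state":"running"}`, string(msg.Data))
	case <-time.After(2 * time.Second):
		t.Fatal("expected message delivery")
	}
}
```

Testing Consumer State Recovery After Stream Deletion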
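```go
func TestConsumerHandlesStreamDeletion(t *testing.T) {
	_, _, js := testhelpers.StartJetStreamServer(t)

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEMP",
		Subjects: []string{"temp.*"},
	})
	require.NoError(t, err)
	_, err = js.Publish("temp.data", []byte("msg1"))
	require.NoError(t, err)

	// Pre-create the durable so Unsubscribe leaves it in place; only the
	// stream deletion below should remove its state.
	_, err = js.AddConsumer("TEMP", &nats.ConsumerConfig{
		Durable:       "temp-consumer",
		FilterSubject: "temp.*",
		AckPolicy:     nats.AckExplicitPolicy,
	})
	require.NoError(t, err)
	sub, err := js.PullSubscribe("temp.*", "temp-consumer", nats.Bind("TEMP", "temp-consumer"))
	require.NoError(t, err)
	msgs, err := sub.Fetch(1, nats.MaxWait(time.Second))
	require.NoError(t, err)
	require.Len(t, msgs, 1)
	require.NoError(t, msgs[0].AckSync())
	require.NoError(t, sub.Unsubscribe())

	// Delete and recreate the stream
	require.NoError(t, js.DeleteStream("TEMP"))
	_, err = js.AddStream(&nats.StreamConfig{
		Name:     "TEMP",
		Subjects: []string{"temp.*"},
	})
	require.NoError(t, err)

	// The old durable state is gone with the stream
	_, err = js.ConsumerInfo("TEMP", "temp-consumer")
	assert.ErrorIs(t, err, nats.ErrConsumerNotFound)

	// A consumer with the same name on the recreated stream starts fresh
	sub2, err := js.PullSubscribe("temp.*", "temp-consumer")
	require.NoError(t, err)
	defer sub2.Unsubscribe()

	_, err = js.Publish("temp.data", []byte("msg2"))
	require.NoError(t, err)
	msgs2, err := sub2.Fetch(1, nats.MaxWait(time.Second))
	require.NoError(t, err)
	require.Len(t, msgs2, 1)
	assert.Equal(t, "msg2", string(msgs2[0].Data))
}
```

Testing Concurrent Consumers in a Queue Group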
JetStream queue groups distribute messages across competing subscribers that share one durable consumer, so each message is delivered to exactly one of them (barring redelivery after a missed ack).
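The core property in a queue-group test is that the workers' buckets together form a partition of the published task IDs. A small hypothetical helper, checkPartition, states that directly and names the task that was lost or duplicated, which makes failures easier to read than an index-by-index comparison.

```go
package main

import "fmt"

// checkPartition is a hypothetical helper: it verifies that the buckets
// collected by competing consumers together contain every task ID in 1..n
// exactly once, and names the first task that was lost or duplicated.
func checkPartition(n int, buckets ...[]int) error {
	counts := make(map[int]int, n)
	for _, b := range buckets {
		for _, id := range b {
			counts[id]++
		}
	}
	for id := 1; id <= n; id++ {
		switch c := counts[id]; {
		case c == 0:
			return fmt.Errorf("task %d was never processed", id)
		case c > 1:
			return fmt.Errorf("task %d was processed %d times", id, c)
		}
	}
	// Any ID outside 1..n means something unexpected was delivered.
	for id := range counts {
		if id < 1 || id > n {
			return fmt.Errorf("unexpected task %d", id)
		}
	}
	return nil
}

func main() {
	fmt.Println(checkPartition(4, []int{1, 3}, []int{2, 4}))
	fmt.Println(checkPartition(4, []int{1, 3}, []int{3, 4}))
}
```

Under real failures a task can legitimately be processed twice after a missed ack, so this check suits a test in which every handler acks promptly.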
```go
func TestQueueGroupDistribution(t *testing.T) {
	_, _, js := testhelpers.StartJetStreamServer(t)

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "WORK",
		Subjects: []string{"work.*"},
	})
	require.NoError(t, err)

	var (
		mu        sync.Mutex
		consumer1 []int
		consumer2 []int
	)
	handleMsg := func(bucket *[]int) nats.MsgHandler {
		return func(msg *nats.Msg) {
			var data map[string]int
			if err := json.Unmarshal(msg.Data, &data); err != nil {
				return
			}
			mu.Lock()
			*bucket = append(*bucket, data["task"])
			mu.Unlock()
			msg.Ack()
		}
	}

	// Join both workers before publishing; otherwise the first subscriber can
	// drain the whole backlog before the second one exists.
	sub1, err := js.QueueSubscribe("work.*", "task-workers", handleMsg(&consumer1))
	require.NoError(t, err)
	defer sub1.Unsubscribe()
	sub2, err := js.QueueSubscribe("work.*", "task-workers", handleMsg(&consumer2))
	require.NoError(t, err)
	defer sub2.Unsubscribe()

	// The server picks a queue member at random for each message, so publishing
	// more messages makes an all-to-one split (and a flaky test) less likely.
	const n = 20
	for i := 1; i <= n; i++ {
		_, err := js.Publish("work.task", []byte(fmt.Sprintf(`{"task":%d}`, i)))
		require.NoError(t, err)
	}

	// Wait for all messages to be processed
	require.Eventually(t, func() bool {
		mu.Lock()
		defer mu.Unlock()
		return len(consumer1)+len(consumer2) == n
	}, 5*time.Second, 50*time.Millisecond)

	mu.Lock()
	defer mu.Unlock()

	// Both consumers got at least some work (distribution, not all-or-nothing)
	assert.Greater(t, len(consumer1), 0, "consumer 1 should have received some messages")
	assert.Greater(t, len(consumer2), 0, "consumer 2 should have received some messages")

	// Each message processed exactly once across both consumers: no duplicates, no gaps
	all := append(append([]int(nil), consumer1...), consumer2...)
	sort.Ints(all)
	require.Len(t, all, n)
	for i, v := range all {
		assert.Equal(t, i+1, v, "each task should appear exactly once")
	}
}
```

Key-Value Store Testing
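JetStream's KV store is built on a stream (one per bucket), and each entry's revision is its sequence number in that stream. CAS updates, deletes, and history therefore each need their own tests.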
```go
func TestKeyValueStore(t *testing.T) {
	_, _, js := testhelpers.StartJetStreamServer(t)

	kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
		Bucket:  "config",
		History: 5,
		TTL:     time.Hour,
	})
	require.NoError(t, err)

	// Set and get
	rev, err := kv.Put("feature.dark-mode", []byte("enabled"))
	require.NoError(t, err)
	assert.Greater(t, rev, uint64(0))

	entry, err := kv.Get("feature.dark-mode")
	require.NoError(t, err)
	assert.Equal(t, "enabled", string(entry.Value()))
	assert.Equal(t, rev, entry.Revision())

	// Update with CAS (compare-and-swap)
	newRev, err := kv.Update("feature.dark-mode", []byte("disabled"), rev)
	require.NoError(t, err)
	assert.Greater(t, newRev, rev)

	// Stale update should fail
	_, err = kv.Update("feature.dark-mode", []byte("enabled"), rev)
	assert.Error(t, err, "update with old revision should fail")

	// Delete
	err = kv.Delete("feature.dark-mode")
	require.NoError(t, err)
	_, err = kv.Get("feature.dark-mode")
	assert.ErrorIs(t, err, nats.ErrKeyNotFound)

	// History preserved
	history, err := kv.History("feature.dark-mode")
	require.NoError(t, err)
	assert.Len(t, history, 3) // set, update, delete
}
```

What to Test vs. What to Skip
Test:
- Durable consumer resume position after disconnect
- Sequence continuity after acknowledge
- Deduplication with Nats-Msg-Id
- Stream retention (MaxAge, MaxMsgs) and consumer gap handling
- Queue group distribution
- KV CAS (compare-and-swap) correctness
Skip:
- NATS server clustering behavior (not your code)
- Network-level partitioning (use chaos testing tools)
- NATS server configuration tuning (benchmark concern, not correctness)