NATS JetStream Persistence Testing: Consumer Durability and Fault Tolerance

NATS JetStream persistence introduces durable consumers, sequence-based message tracking, and stream retention policies. Testing these means verifying three things. A consumer must resume exactly where it left off after a restart. Publish deduplication must store a retried message only once. Your retention policy must not silently discard messages your consumers haven't read yet. The NATS server is written in Go and can be embedded in a test process, so all of this is testable in-process without Docker.

Key Takeaways

Use the embedded NATS server for unit tests. natsserver.RunServer starts a real NATS server inside the test process in milliseconds. It needs no Docker and no external services: it listens on a random loopback port and supports JetStream fully.

Test durable consumer resume. Connect, consume some messages, disconnect, reconnect, and verify the consumer starts from where it left off — not from the beginning and not skipping messages.

Test sequence gaps. Streams can have MaxAge or MaxMsgs limits. If messages are removed before a slow consumer reads them, the consumer's next delivery jumps past the missing sequences. Test that your consumer handles the gap without panicking.

Test exactly-once deduplication. JetStream deduplicates on the Nats-Msg-Id header. Test that publishing the same message ID twice within the stream's duplicate window results in one stored message and one delivery.

Test consumer state after stream deletion and recreation. Deleting a stream also deletes its consumers and their positions, so a durable with the same name on the recreated stream starts from scratch. Test that your startup code handles this correctly.

JetStream Persistence: What Needs Testing

NATS core pub/sub has no persistence — messages are delivered to active subscribers or lost. JetStream adds a persistence layer with:

  • Streams: named, persistent message stores with configurable retention
  • Consumers: named cursors tracking progress through a stream
  • Durable consumers: survive reconnects; resume from last acknowledged sequence
  • Push vs. pull consumers: different delivery models with different failure modes

Most production issues come from consumer state management — consumers that restart from the wrong position, messages that expire before consumption, and duplicate processing. These need explicit tests.

Setup: Embedded NATS Server

// go.mod
require (
    github.com/nats-io/nats.go v1.35.0
    github.com/nats-io/nats-server/v2 v2.10.14
    github.com/stretchr/testify v1.9.0 // assert/require used in the tests below
)
// testhelpers/nats.go
package testhelpers

import (
    "testing"
    "time"

    "github.com/nats-io/nats-server/v2/server"
    natsserver "github.com/nats-io/nats-server/v2/test"
    "github.com/nats-io/nats.go"
)

func StartJetStreamServer(t *testing.T) (*server.Server, *nats.Conn, nats.JetStreamContext) {
    t.Helper()

    opts := natsserver.DefaultTestOptions
    opts.Port = -1  // random port
    opts.JetStream = true
    opts.StoreDir = t.TempDir()  // file-based store for persistence tests

    srv := natsserver.RunServer(&opts)
    t.Cleanup(func() { srv.Shutdown() })

    nc, err := nats.Connect(srv.ClientURL())
    if err != nil {
        t.Fatalf("connect: %v", err)
    }
    t.Cleanup(nc.Close)

    js, err := nc.JetStream()
    if err != nil {
        t.Fatalf("jetstream: %v", err)
    }

    return srv, nc, js
}

Testing Basic Durable Consumer Behavior

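// consumer_test.go: the tests in this article share this file and these imports
package consumer_test

import (
    "encoding/json"
    "fmt"
    "sort"
    "sync"
    "testing"
    "time"

    "github.com/nats-io/nats.go"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"

    "example.com/app/testhelpers" // hypothetical module path; use your own
)

func TestDurableConsumerBasic(t *testing.T) {
    _, _, js := testhelpers.StartJetStreamServer(t)

    // Create stream
    _, err := js.AddStream(&nats.StreamConfig{
        Name:     "ORDERS",
        Subjects: []string{"orders.*"},
    })
    require.NoError(t, err)

    // Publish messages
    for i := 1; i <= 5; i++ {
        _, err = js.Publish("orders.created", []byte(fmt.Sprintf(`{"id":"ORD-%03d"}`, i)))
        require.NoError(t, err)
    }

    // Create the durable consumer explicitly. If PullSubscribe created it,
    // nats.go would delete it again on Unsubscribe()/Drain(), and the
    // "restart" below would silently start over from sequence 1.
    _, err = js.AddConsumer("ORDERS", &nats.ConsumerConfig{
        Durable:       "order-processor",
        AckPolicy:     nats.AckExplicitPolicy,
        FilterSubject: "orders.*",
    })
    require.NoError(t, err)

    // Bind to the existing durable consumer
    sub, err := js.PullSubscribe("orders.*", "order-processor")
    require.NoError(t, err)

    // Fetch and process first 3
    msgs, err := sub.Fetch(3, nats.MaxWait(time.Second))
    require.NoError(t, err)
    require.Len(t, msgs, 3)

    for _, msg := range msgs {
        require.NoError(t, msg.AckSync()) // wait until the server confirms each ack
    }

    // Drop the subscription to simulate a consumer restart
    require.NoError(t, sub.Unsubscribe())

    // Resubscribe with the same durable name
    sub2, err := js.PullSubscribe("orders.*", "order-processor")
    require.NoError(t, err)
    defer sub2.Unsubscribe()

    // Should receive only messages 4 and 5 (where we left off)
    msgs2, err := sub2.Fetch(10, nats.MaxWait(time.Second))
    require.NoError(t, err)
    require.Len(t, msgs2, 2, "durable consumer should resume from sequence 4")

    var data map[string]string
    require.NoError(t, json.Unmarshal(msgs2[0].Data, &data))
    assert.Equal(t, "ORD-004", data["id"])
}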

Testing Consumer Resume After Crash

The critical invariant: a durable consumer that acknowledges messages up to sequence N must resume at N+1 after restart, not from the beginning.

func TestConsumerResumesAfterDisconnect(t *testing.T) {
    srv, nc, js := testhelpers.StartJetStreamServer(t)

    _, err := js.AddStream(&nats.StreamConfig{
        Name:     "EVENTS",
        Subjects: []string{"events.>"},
        Storage:  nats.FileStorage, // file-based for true persistence
    })
    require.NoError(t, err)

    // Publish 10 messages
    for i := 1; i <= 10; i++ {
        _, err = js.Publish("events.log", []byte(fmt.Sprintf(`{"seq":%d}`, i)))
        require.NoError(t, err)
    }

    // Consumer 1: process and ack messages 1-5
    sub1, err := js.PullSubscribe("events.>", "event-handler")
    require.NoError(t, err)
    msgs, err := sub1.Fetch(5, nats.MaxWait(time.Second))
    require.NoError(t, err)
    require.Len(t, msgs, 5)
    for _, m := range msgs {
        require.NoError(t, m.AckSync())
    }

    // Simulate a crash: close the connection without calling Unsubscribe().
    // Unsubscribe()/Drain() would delete the durable, because the library
    // created it in PullSubscribe above.
    nc.Close()

    // Reconnect (new connection to the same server)
    nc2, err := nats.Connect(srv.ClientURL())
    require.NoError(t, err)
    defer nc2.Close()

    js2, err := nc2.JetStream()
    require.NoError(t, err)

    // Consumer 2: same durable name, should pick up from message 6
    sub2, err := js2.PullSubscribe("events.>", "event-handler")
    require.NoError(t, err)
    defer sub2.Unsubscribe()

    msgs2, err := sub2.Fetch(10, nats.MaxWait(2*time.Second))
    require.NoError(t, err)

    require.Len(t, msgs2, 5, "should receive exactly the 5 remaining messages")

    var first map[string]int
    require.NoError(t, json.Unmarshal(msgs2[0].Data, &first))
    assert.Equal(t, 6, first["seq"], "first message after resume should be seq 6")
}
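The same invariant can be asserted on stream sequence numbers rather than payload fields. That keeps working when message bodies carry no counter. In nats.go each JetStream message exposes its sequences through `msg.Metadata()` (field `Sequence.Stream`). The helper below is a hypothetical sketch, assuming a stream with no retention limits, so delivered sequences after a resume should be contiguous:

```go
package main

import "fmt"

// checkResume is a hypothetical test helper. It verifies that a durable
// consumer which acknowledged everything up to ackFloor resumed at
// ackFloor+1 and then received consecutive stream sequences: no replay
// from the start and no skipped messages. It assumes nothing was removed
// by retention limits (MaxAge, MaxMsgs) in the meantime.
func checkResume(ackFloor uint64, streamSeqs []uint64) error {
	if len(streamSeqs) == 0 {
		return fmt.Errorf("no messages delivered after resume")
	}
	want := ackFloor + 1
	for i, got := range streamSeqs {
		if got != want {
			return fmt.Errorf("delivery %d: got stream seq %d, want %d", i, got, want)
		}
		want++
	}
	return nil
}
```

In the test above you would collect `meta.Sequence.Stream` from each fetched message (after `meta, err := m.Metadata()`) and call `require.NoError(t, checkResume(5, seqs))`.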

Testing Exactly-Once Deduplication

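JetStream supports deduplication via the Nats-Msg-Id header. A message published again with the same ID within the stream's Duplicates window is stored, and therefore delivered, only once. The second publish is still acknowledged, with its Duplicate flag set.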

func TestExactlyOnceDeduplication(t *testing.T) {
    _, _, js := testhelpers.StartJetStreamServer(t)

    _, err := js.AddStream(&nats.StreamConfig{
        Name:     "PAYMENTS",
        Subjects: []string{"payments.>"},
        Duplicates: 5 * time.Minute,  // deduplication window
    })
    require.NoError(t, err)

    // Publish the same message ID twice (simulating retry after network failure)
    pubAck1, err := js.Publish("payments.process",
        []byte(`{"paymentId":"PAY-001","amount":99.99}`),
        nats.MsgId("PAY-001"))
    require.NoError(t, err)
    assert.False(t, pubAck1.Duplicate, "first publish should not be a duplicate")

    pubAck2, err := js.Publish("payments.process",
        []byte(`{"paymentId":"PAY-001","amount":99.99}`),
        nats.MsgId("PAY-001"))
    require.NoError(t, err)
    assert.True(t, pubAck2.Duplicate, "second publish with same ID should be flagged as duplicate")

    // Stream should contain only 1 message
    streamInfo, err := js.StreamInfo("PAYMENTS")
    require.NoError(t, err)
    assert.Equal(t, uint64(1), streamInfo.State.Msgs)

    // Consumer should receive only 1 message
    sub, err := js.PullSubscribe("payments.>", "payment-processor")
    require.NoError(t, err)
    defer sub.Unsubscribe()

    msgs, err := sub.Fetch(10, nats.MaxWait(time.Second))
    require.NoError(t, err)
    assert.Len(t, msgs, 1, "exactly one message should be delivered despite two publishes")
}
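The Duplicates window only protects the publish side. A message can still reach a consumer twice, for example when an ack is lost and the server redelivers after AckWait. A common complement is to make the handler idempotent, keyed on the same ID the publisher sets with `nats.MsgId`, which arrives as the `Nats-Msg-Id` header.

Below is a minimal in-memory sketch. `IdempotentHandler` is a hypothetical name, and a real service would persist processed IDs together with its own state:

```go
package main

import "sync"

// IdempotentHandler runs process at most once per message ID.
// Hypothetical sketch: seen IDs live in memory, so they are lost on restart
// and grow without bound. Production code would store them durably and
// expire old ones.
type IdempotentHandler struct {
	mu      sync.Mutex
	seen    map[string]bool
	process func(id string, data []byte) error
}

func NewIdempotentHandler(process func(id string, data []byte) error) *IdempotentHandler {
	return &IdempotentHandler{seen: make(map[string]bool), process: process}
}

// Handle returns (true, nil) if the message was processed now, (false, nil)
// if its ID was already processed, and the process error otherwise. A failed
// attempt is not recorded, so a redelivery can retry it. Holding the lock
// while processing serializes handlers, which keeps the sketch simple.
func (h *IdempotentHandler) Handle(id string, data []byte) (bool, error) {
	h.mu.Lock()
	defer h.mu.Unlock()
	if h.seen[id] {
		return false, nil
	}
	if err := h.process(id, data); err != nil {
		return false, err
	}
	h.seen[id] = true
	return true, nil
}
```

In a message callback you would call `h.Handle(msg.Header.Get("Nats-Msg-Id"), msg.Data)` and ack the message whether it was processed now or had already been processed.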

Testing Stream Retention Policies

Streams can limit retention by age or count. Test that your consumer handles the resulting sequence gaps.

func TestConsumerHandlesMaxAgeRetention(t *testing.T) {
    _, _, js := testhelpers.StartJetStreamServer(t)

    _, err := js.AddStream(&nats.StreamConfig{
        Name:     "LOGS",
        Subjects: []string{"logs.>"},
        // Short for testing, but long enough that the second batch
        // survives the assertions below
        MaxAge: time.Second,
    })
    require.NoError(t, err)

    // Publish messages 1-5
    for i := 1; i <= 5; i++ {
        _, err = js.Publish("logs.app", []byte(fmt.Sprintf(`{"msg":"log-%d"}`, i)))
        require.NoError(t, err)
    }

    // Wait until the server has expired them, instead of sleeping a fixed time
    require.Eventually(t, func() bool {
        info, err := js.StreamInfo("LOGS")
        return err == nil && info.State.Msgs == 0
    }, 5*time.Second, 50*time.Millisecond)

    // Publish new messages after expiry
    for i := 6; i <= 10; i++ {
        _, err = js.Publish("logs.app", []byte(fmt.Sprintf(`{"msg":"log-%d"}`, i)))
        require.NoError(t, err)
    }

    streamInfo, err := js.StreamInfo("LOGS")
    require.NoError(t, err)
    // Only the 5 new messages should remain, starting at stream sequence 6
    assert.Equal(t, uint64(5), streamInfo.State.Msgs)
    assert.Equal(t, uint64(6), streamInfo.State.FirstSeq)

    // A consumer explicitly asking for sequence 1 gets only the surviving messages
    sub, err := js.PullSubscribe("logs.>", "log-reader", nats.StartSequence(1))
    require.NoError(t, err)
    defer sub.Unsubscribe()

    msgs, err := sub.Fetch(10, nats.MaxWait(2*time.Second))
    require.NoError(t, err)

    // Should receive messages 6-10 only (1-5 expired)
    require.Len(t, msgs, 5)

    meta, err := msgs[0].Metadata()
    require.NoError(t, err)
    assert.Equal(t, uint64(6), meta.Sequence.Stream, "first delivery should skip the expired range")

    var data map[string]string
    require.NoError(t, json.Unmarshal(msgs[0].Data, &data))
    // Compare exactly: a string comparison like "log-10" >= "log-6" is false
    assert.Equal(t, "log-6", data["msg"])
}

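func TestConsumerHandlesMaxMsgsRetention(t *testing.T) {
    _, _, js := testhelpers.StartJetStreamServer(t)

    _, err := js.AddStream(&nats.StreamConfig{
        Name:     "METRICS",
        Subjects: []string{"metrics.>"},
        MaxMsgs:  10, // keep only the last 10
    })
    require.NoError(t, err)

    // Publish 20 messages; with the default DiscardOld policy the first 10 are trimmed
    for i := 1; i <= 20; i++ {
        _, err = js.Publish("metrics.cpu", []byte(fmt.Sprintf(`{"value":%d}`, i)))
        require.NoError(t, err)
    }

    streamInfo, err := js.StreamInfo("METRICS")
    require.NoError(t, err)
    assert.Equal(t, uint64(10), streamInfo.State.Msgs)

    // First sequence should be 11 (not 1)
    assert.Equal(t, uint64(11), streamInfo.State.FirstSeq)
    assert.Equal(t, uint64(20), streamInfo.State.LastSeq)

    // A new consumer (default DeliverAll) starts at the first surviving sequence
    sub, err := js.PullSubscribe("metrics.>", "metrics-reader")
    require.NoError(t, err)
    defer sub.Unsubscribe()

    msgs, err := sub.Fetch(1, nats.MaxWait(time.Second))
    require.NoError(t, err)
    require.Len(t, msgs, 1)

    meta, err := msgs[0].Metadata()
    require.NoError(t, err)
    assert.Equal(t, uint64(11), meta.Sequence.Stream)
}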
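Those tests check retention from the stream's side. On the consumer side, a handler that remembers the last stream sequence it processed can notice when retention removed messages before it read them. The sequences come from `msg.Metadata()`.

The sketch below is hypothetical and assumes the consumer's filter matches every subject in the stream. With a narrower filter, jumps in stream sequence are normal and this check would misfire:

```go
package main

// GapTracker records the last stream sequence a consumer processed and
// reports how many sequences were skipped when the next one arrives.
// Hypothetical sketch: it assumes the consumer sees every subject in the
// stream, so any jump means messages were removed (MaxAge, MaxMsgs, ...).
type GapTracker struct {
	last uint64 // 0 means nothing seen yet
}

// Observe returns the number of stream sequences missing between the
// previous message and seq. On the first call it counts sequences below
// seq, which fits a consumer that expects to start at sequence 1.
// Redeliveries (seq <= last) report no gap and do not move the position.
func (g *GapTracker) Observe(seq uint64) uint64 {
	var missed uint64
	if seq > g.last+1 {
		missed = seq - g.last - 1
	}
	if seq > g.last {
		g.last = seq
	}
	return missed
}
```

For the MaxMsgs scenario above, the first message the consumer sees has stream sequence 11. `Observe(11)` therefore reports 10 missed messages, which the consumer can log or count instead of treating it as an error.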

Testing Push Consumer with Heartbeat

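Push consumers with idle heartbeats let you detect a dead subscription. While the stream is idle, the server sends a status-100 heartbeat to the consumer's deliver subject at the configured interval. A nats.go js.Subscribe callback never sees these heartbeats because the library consumes them internally. This test therefore listens on the deliver subject with a plain core subscription:

func TestPushConsumerWithHeartbeat(t *testing.T) {
    _, nc, js := testhelpers.StartJetStreamServer(t)

    _, err := js.AddStream(&nats.StreamConfig{
        Name:     "STATUS",
        Subjects: []string{"status.*"},
    })
    require.NoError(t, err)

    heartbeats := make(chan struct{}, 10)
    messages := make(chan *nats.Msg, 10)

    // Core subscription on the push consumer's deliver subject, so that
    // heartbeats reach this callback instead of being filtered by the library
    deliver := nats.NewInbox()
    sub, err := nc.Subscribe(deliver, func(msg *nats.Msg) {
        if msg.Header.Get("Status") == "100" {
            // Idle heartbeat from JetStream: status header, no body, nothing to ack.
            // Non-blocking send so a full channel never stalls message delivery.
            select {
            case heartbeats <- struct{}{}:
            default:
            }
            return
        }
        messages <- msg
        msg.Ack()
    })
    require.NoError(t, err)
    defer sub.Unsubscribe()
    require.NoError(t, nc.Flush()) // make sure the server has registered the subscription

    _, err = js.AddConsumer("STATUS", &nats.ConsumerConfig{
        Durable:        "status-watcher",
        DeliverSubject: deliver,
        AckPolicy:      nats.AckExplicitPolicy,
        Heartbeat:      100 * time.Millisecond,
    })
    require.NoError(t, err)

    // Wait for at least one heartbeat: confirms the consumer is alive while idle
    select {
    case <-heartbeats:
    case <-time.After(500 * time.Millisecond):
        t.Fatal("expected idle heartbeat within 500ms")
    }

    // Now publish a message
    _, err = js.Publish("status.update", []byte(`{"state":"running"}`))
    require.NoError(t, err)

    select {
    case msg := <-messages:
        assert.Equal(t, `{"state":"running"}`, string(msg.Data))
    case <-time.After(2 * time.Second):
        t.Fatal("expected message delivery")
    }
}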

Testing Consumer State Recovery After Stream Deletion

func TestConsumerHandlesStreamDeletion(t *testing.T) {
    _, _, js := testhelpers.StartJetStreamServer(t)

    _, err := js.AddStream(&nats.StreamConfig{
        Name:     "TEMP",
        Subjects: []string{"temp.*"},
    })
    require.NoError(t, err)

    _, err = js.Publish("temp.data", []byte("msg1"))
    require.NoError(t, err)

    // Leave this subscription open: calling Unsubscribe() would delete the
    // library-created consumer itself and hide what stream deletion does
    sub, err := js.PullSubscribe("temp.*", "temp-consumer")
    require.NoError(t, err)
    msgs, err := sub.Fetch(1, nats.MaxWait(time.Second))
    require.NoError(t, err)
    require.Len(t, msgs, 1)
    require.NoError(t, msgs[0].AckSync())

    // Delete the stream
    require.NoError(t, js.DeleteStream("TEMP"))

    // Recreate the stream
    _, err = js.AddStream(&nats.StreamConfig{
        Name:     "TEMP",
        Subjects: []string{"temp.*"},
    })
    require.NoError(t, err)

    // The durable did not survive: deleting a stream deletes its consumers
    _, err = js.ConsumerInfo("TEMP", "temp-consumer")
    assert.ErrorIs(t, err, nats.ErrConsumerNotFound)

    // Startup code that resubscribes with the same name gets a fresh consumer
    sub2, err := js.PullSubscribe("temp.*", "temp-consumer")
    require.NoError(t, err)
    defer sub2.Unsubscribe()

    _, err = js.Publish("temp.data", []byte("msg2"))
    require.NoError(t, err)

    msgs2, err := sub2.Fetch(1, nats.MaxWait(time.Second))
    require.NoError(t, err)
    require.Len(t, msgs2, 1)
    assert.Equal(t, "msg2", string(msgs2[0].Data))
}
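What "handles this correctly" means is application-specific. One reasonable startup policy works like this. Look the durable up and recreate it if it is missing. Then tell the caller that its position was reset, so it can alert or rewind its own checkpoint instead of silently reprocessing or skipping data.

The sketch below targets a hypothetical `ConsumerAdmin` interface with an in-memory fake. With nats.go you might back it with `js.ConsumerInfo`, which returns `nats.ErrConsumerNotFound` for a missing durable, and `js.AddConsumer`:

```go
package main

import (
	"errors"
	"fmt"
)

// ErrNotFound is what the hypothetical ConsumerAdmin returns for a missing durable.
var ErrNotFound = errors.New("consumer not found")

// ConsumerAdmin is a hypothetical abstraction over the JetStream management API.
type ConsumerAdmin interface {
	ConsumerExists(stream, durable string) error // nil, ErrNotFound, or another error
	CreateConsumer(stream, durable string) error
}

// EnsureConsumer makes sure the durable exists. reset is true when it had to
// be recreated, meaning any previous delivery position was lost. Other lookup
// errors (for example, the stream itself is missing) are returned unchanged.
func EnsureConsumer(admin ConsumerAdmin, stream, durable string) (reset bool, err error) {
	err = admin.ConsumerExists(stream, durable)
	switch {
	case err == nil:
		return false, nil
	case errors.Is(err, ErrNotFound):
		if cerr := admin.CreateConsumer(stream, durable); cerr != nil {
			return false, fmt.Errorf("recreate consumer %s/%s: %w", stream, durable, cerr)
		}
		return true, nil
	default:
		return false, fmt.Errorf("look up consumer %s/%s: %w", stream, durable, err)
	}
}

// fakeAdmin is an in-memory ConsumerAdmin for tests.
type fakeAdmin struct{ consumers map[string]bool }

func (f *fakeAdmin) ConsumerExists(stream, durable string) error {
	if f.consumers[stream+"/"+durable] {
		return nil
	}
	return ErrNotFound
}

func (f *fakeAdmin) CreateConsumer(stream, durable string) error {
	f.consumers[stream+"/"+durable] = true
	return nil
}
```

A test of the startup path then starts from a `fakeAdmin` with no consumers, which mimics a freshly recreated stream. It asserts that `EnsureConsumer` reports `reset == true` exactly once.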

Testing Concurrent Consumers in a Queue Group

JetStream queue subscriptions share one durable push consumer among competing subscribers, and each message is delivered to exactly one of them:

func TestQueueGroupDistribution(t *testing.T) {
    _, _, js := testhelpers.StartJetStreamServer(t)

    _, err := js.AddStream(&nats.StreamConfig{
        Name:     "WORK",
        Subjects: []string{"work.*"},
    })
    require.NoError(t, err)

    // Enough tasks that an all-to-one-worker split is vanishingly unlikely
    const numTasks = 50

    var (
        mu        sync.Mutex
        consumer1 []int
        consumer2 []int
    )

    handleMsg := func(bucket *[]int) nats.MsgHandler {
        return func(msg *nats.Msg) {
            var data map[string]int
            if err := json.Unmarshal(msg.Data, &data); err != nil {
                return // leave unacked; a real handler would log this
            }
            mu.Lock()
            *bucket = append(*bucket, data["task"])
            mu.Unlock()
            msg.Ack()
        }
    }

    // Subscribe both workers before publishing. If the messages are already in
    // the stream, the first worker can drain them all before the second joins.
    sub1, err := js.QueueSubscribe("work.*", "task-workers", handleMsg(&consumer1))
    require.NoError(t, err)
    defer sub1.Unsubscribe()
    sub2, err := js.QueueSubscribe("work.*", "task-workers", handleMsg(&consumer2))
    require.NoError(t, err)
    defer sub2.Unsubscribe()

    for i := 1; i <= numTasks; i++ {
        _, err = js.Publish("work.task", []byte(fmt.Sprintf(`{"task":%d}`, i)))
        require.NoError(t, err)
    }

    // Wait for all tasks to be processed
    require.Eventually(t, func() bool {
        mu.Lock()
        defer mu.Unlock()
        return len(consumer1)+len(consumer2) == numTasks
    }, 5*time.Second, 50*time.Millisecond)

    mu.Lock()
    defer mu.Unlock()

    // Both consumers got at least some work (distribution, not all-or-nothing)
    assert.NotEmpty(t, consumer1, "consumer 1 should have received some messages")
    assert.NotEmpty(t, consumer2, "consumer 2 should have received some messages")

    // Each task processed exactly once across both consumers. Copy into a
    // fresh slice so that sorting cannot alias consumer1's backing array.
    all := make([]int, 0, numTasks)
    all = append(all, consumer1...)
    all = append(all, consumer2...)
    sort.Ints(all)
    require.Len(t, all, numTasks)
    for i, v := range all {
        assert.Equal(t, i+1, v, "each task should appear exactly once")
    }
}
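When that final assertion fails, the index-by-index loop reports symptoms (the first position where the sorted slice diverges) rather than the cause. A small test helper that names the missing and duplicated task IDs makes queue-group failures easier to read. `deliveryReport` is a hypothetical helper, not part of testify:

```go
package main

import "sort"

// deliveryReport compares what a set of workers received against the
// expected task IDs 1..n and returns the IDs nobody received and the IDs
// received more than once, both in ascending order. Hypothetical helper.
func deliveryReport(n int, buckets ...[]int) (missing, duplicated []int) {
	counts := make(map[int]int, n)
	for _, b := range buckets {
		for _, id := range b {
			counts[id]++
		}
	}
	for id := 1; id <= n; id++ {
		if counts[id] == 0 {
			missing = append(missing, id)
		}
	}
	for id, c := range counts {
		if c > 1 {
			duplicated = append(duplicated, id)
		}
	}
	sort.Ints(duplicated) // map iteration order is random
	return missing, duplicated
}
```

In the test, call `missing, dup := deliveryReport(n, consumer1, consumer2)` with `n` set to the number of published tasks. Then `assert.Empty(t, missing)` and `assert.Empty(t, dup)` print the offending IDs directly.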

Key-Value Store Testing

JetStream's KV store (built on streams) has its own testing requirements:

func TestKeyValueStore(t *testing.T) {
    _, _, js := testhelpers.StartJetStreamServer(t)

    kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
        Bucket:  "config",
        History: 5,
        TTL:     time.Hour,
    })
    require.NoError(t, err)

    // Set and get
    rev, err := kv.Put("feature.dark-mode", []byte("enabled"))
    require.NoError(t, err)
    assert.Greater(t, rev, uint64(0))

    entry, err := kv.Get("feature.dark-mode")
    require.NoError(t, err)
    assert.Equal(t, "enabled", string(entry.Value()))
    assert.Equal(t, rev, entry.Revision())

    // Update with CAS (compare-and-swap)
    newRev, err := kv.Update("feature.dark-mode", []byte("disabled"), rev)
    require.NoError(t, err)
    assert.Greater(t, newRev, rev)

    // Stale update should fail
    _, err = kv.Update("feature.dark-mode", []byte("enabled"), rev)
    assert.Error(t, err, "update with old revision should fail")

    // Delete
    err = kv.Delete("feature.dark-mode")
    require.NoError(t, err)

    _, err = kv.Get("feature.dark-mode")
    assert.ErrorIs(t, err, nats.ErrKeyNotFound)

    // History preserved
    history, err := kv.History("feature.dark-mode")
    require.NoError(t, err)
    assert.Len(t, history, 3)  // set, update, delete
}
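The stale-update assertion covers the failure case. Production code usually wraps compare-and-swap in a read-modify-write retry loop: read the value and revision, compute the new value, write with that revision, and start over on a conflict.

The sketch below uses a hypothetical `revStore` interface shaped after the calls in the test, plus an in-memory fake. With nats.go you would adapt `kv.Get` (`entry.Value()`, `entry.Revision()`) and `kv.Update`, and map its wrong-revision error onto `ErrWrongRevision`:

```go
package main

import (
	"errors"
	"fmt"
)

// ErrWrongRevision is what the hypothetical revStore returns on a CAS conflict.
var ErrWrongRevision = errors.New("wrong last revision")

// revStore is a hypothetical key-value store with revision-checked updates.
type revStore interface {
	Get(key string) (value []byte, rev uint64, err error)
	Update(key string, value []byte, expectedRev uint64) (uint64, error)
}

// updateWithRetry applies modify as a read-modify-write, retrying when another
// writer bumped the revision in between. Other errors are returned at once.
func updateWithRetry(s revStore, key string, maxAttempts int, modify func(old []byte) []byte) (uint64, error) {
	for attempt := 0; attempt < maxAttempts; attempt++ {
		old, rev, err := s.Get(key)
		if err != nil {
			return 0, err
		}
		newRev, err := s.Update(key, modify(old), rev)
		if err == nil {
			return newRev, nil
		}
		if !errors.Is(err, ErrWrongRevision) {
			return 0, err
		}
	}
	return 0, fmt.Errorf("update %q: gave up after %d conflicting attempts", key, maxAttempts)
}

// memStore is a single-key, single-goroutine fake (the key is ignored).
// The next `conflicts` Update calls lose the race to a simulated writer.
type memStore struct {
	val       []byte
	rev       uint64
	conflicts int
}

func (m *memStore) Get(string) ([]byte, uint64, error) { return m.val, m.rev, nil }

func (m *memStore) Update(_ string, v []byte, expected uint64) (uint64, error) {
	if m.conflicts > 0 {
		m.conflicts--
		m.rev++ // another writer got there first
		return 0, ErrWrongRevision
	}
	if expected != m.rev {
		return 0, ErrWrongRevision
	}
	m.val, m.rev = v, m.rev+1
	return m.rev, nil
}
```

Bounding the attempts keeps a hot key from spinning forever. The fake's `conflicts` counter lets a test exercise the retry path deterministically without real concurrent writers.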

What to Test vs. What to Skip

Test:

  • Durable consumer resume position after disconnect
  • Sequence continuity after acknowledge
  • Deduplication with Nats-Msg-Id
  • Stream retention (MaxAge, MaxMsgs) and consumer gap handling
  • Queue group distribution
  • KV CAS (compare-and-swap) correctness

Skip:

  • NATS server clustering behavior (not your code)
  • Network-level partitioning (use chaos testing tools)
  • NATS server configuration tuning (benchmark concern, not correctness)
