Skip to content

Commit

Permalink
[IMPROVED] When taking over make sure to sync and reset clfs for clus…
Browse files Browse the repository at this point in the history
…tered streams. (#4365)

If the failed state of clfs drifts between leaders and followers,
replicas could discard and skip messages possibly incorrectly. This will
force sync if we have a non-zero clfs state when a leader takes over.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Aug 3, 2023
2 parents 66a8e81 + 081140e commit 34199ab
Show file tree
Hide file tree
Showing 3 changed files with 218 additions and 7 deletions.
17 changes: 10 additions & 7 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2291,9 +2291,13 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps

case isLeader = <-lch:
if isLeader {
if sendSnapshot && mset != nil && n != nil {
n.SendSnapshot(mset.stateSnapshot())
sendSnapshot = false
if mset != nil && n != nil {
// Send a snapshot if being asked or if we are tracking
// a failed state so that followers sync.
if clfs := mset.clearCLFS(); clfs > 0 || sendSnapshot {
n.SendSnapshot(mset.stateSnapshot())
sendSnapshot = false
}
}
if isRestore {
acc, _ := s.LookupAccount(sa.Client.serviceAccount())
Expand Down Expand Up @@ -2714,15 +2718,14 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco

// Grab last sequence and CLFS.
last, clfs := mset.lastSeqAndCLFS()

// We can skip if we know this is less than what we already have.
if lseq-clfs < last {
s.Debugf("Apply stream entries for '%s > %s' skipping message with sequence %d with last of %d",
mset.account(), mset.name(), lseq+1-clfs, last)
// Check for any preAcks in case we are interest based.

mset.mu.Lock()
seq := lseq + 1 - mset.clfs
mset.clearAllPreAcks(seq)
// Check for any preAcks in case we are interest based.
mset.clearAllPreAcks(lseq + 1 - mset.clfs)
mset.mu.Unlock()
continue
}
Expand Down
200 changes: 200 additions & 0 deletions server/jetstream_cluster_3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4785,3 +4785,203 @@ func TestJetStreamAccountUsageDrifts(t *testing.T) {
checkAccount(sir1.State.Bytes, sir3.State.Bytes)
}
}

func TestJetStreamClusterStreamFailTracking(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)

m := nats.NewMsg("foo")
m.Data = []byte("OK")

b, bsz := 0, 5
sendBatch := func() {
for i := b * bsz; i < b*bsz+bsz; i++ {
msgId := fmt.Sprintf("ID:%d", i)
m.Header.Set(JSMsgId, msgId)
// Send it twice on purpose.
js.PublishMsg(m)
js.PublishMsg(m)
}
b++
}

sendBatch()

_, err = nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "TEST"), nil, time.Second)
require_NoError(t, err)
c.waitOnStreamLeader(globalAccountName, "TEST")

sendBatch()

// Now stop one and restart.
nl := c.randomNonStreamLeader(globalAccountName, "TEST")
mset, err := nl.GlobalAccount().lookupStream("TEST")
require_NoError(t, err)
// Reset raft
mset.resetClusteredState(nil)
time.Sleep(100 * time.Millisecond)

nl.Shutdown()
nl.WaitForShutdown()

sendBatch()

nl = c.restartServer(nl)

sendBatch()

for {
_, err = nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "TEST"), nil, time.Second)
require_NoError(t, err)
c.waitOnStreamLeader(globalAccountName, "TEST")
if nl == c.streamLeader(globalAccountName, "TEST") {
break
}
}

sendBatch()

_, err = js.UpdateStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 1,
})
require_NoError(t, err)

// Make sure all in order.
errCh := make(chan error, 100)
var wg sync.WaitGroup
wg.Add(1)

expected, seen := b*bsz, 0

sub, err := js.Subscribe("foo", func(msg *nats.Msg) {
expectedID := fmt.Sprintf("ID:%d", seen)
if v := msg.Header.Get(JSMsgId); v != expectedID {
errCh <- err
wg.Done()
msg.Sub.Unsubscribe()
return
}
seen++
if seen >= expected {
wg.Done()
msg.Sub.Unsubscribe()
}
})
require_NoError(t, err)
defer sub.Unsubscribe()

wg.Wait()
if len(errCh) > 0 {
t.Fatalf("Expected no errors, got %d", len(errCh))
}
}

func TestJetStreamClusterStreamFailTrackingSnapshots(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)

m := nats.NewMsg("foo")
m.Data = []byte("OK")

// Send 1000 a dupe every msgID.
for i := 0; i < 1000; i++ {
msgId := fmt.Sprintf("ID:%d", i)
m.Header.Set(JSMsgId, msgId)
// Send it twice on purpose.
js.PublishMsg(m)
js.PublishMsg(m)
}

// Now stop one.
nl := c.randomNonStreamLeader(globalAccountName, "TEST")
nl.Shutdown()
nl.WaitForShutdown()

// Now send more and make sure leader snapshots.
for i := 1000; i < 2000; i++ {
msgId := fmt.Sprintf("ID:%d", i)
m.Header.Set(JSMsgId, msgId)
// Send it twice on purpose.
js.PublishMsg(m)
js.PublishMsg(m)
}

sl := c.streamLeader(globalAccountName, "TEST")
mset, err := sl.GlobalAccount().lookupStream("TEST")
require_NoError(t, err)
node := mset.raftNode()
require_NotNil(t, node)
node.InstallSnapshot(mset.stateSnapshot())

// Now restart nl
nl = c.restartServer(nl)
c.waitOnServerCurrent(nl)

// Move leader to NL
for {
_, err = nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "TEST"), nil, time.Second)
require_NoError(t, err)
c.waitOnStreamLeader(globalAccountName, "TEST")
if nl == c.streamLeader(globalAccountName, "TEST") {
break
}
}

_, err = js.UpdateStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 1,
})
require_NoError(t, err)

// Make sure all in order.
errCh := make(chan error, 100)
var wg sync.WaitGroup
wg.Add(1)

expected, seen := 2000, 0

sub, err := js.Subscribe("foo", func(msg *nats.Msg) {
expectedID := fmt.Sprintf("ID:%d", seen)
if v := msg.Header.Get(JSMsgId); v != expectedID {
errCh <- err
wg.Done()
msg.Sub.Unsubscribe()
return
}
seen++
if seen >= expected {
wg.Done()
msg.Sub.Unsubscribe()
}
})
require_NoError(t, err)
defer sub.Unsubscribe()

wg.Wait()
if len(errCh) > 0 {
t.Fatalf("Expected no errors, got %d", len(errCh))
}
}
8 changes: 8 additions & 0 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,6 +838,14 @@ func (mset *stream) lastSeqAndCLFS() (uint64, uint64) {
return mset.lseq, mset.clfs
}

func (mset *stream) clearCLFS() uint64 {
mset.mu.Lock()
defer mset.mu.Unlock()
clfs := mset.clfs
mset.clfs = 0
return clfs
}

func (mset *stream) lastSeq() uint64 {
mset.mu.RLock()
lseq := mset.lseq
Expand Down

0 comments on commit 34199ab

Please sign in to comment.