[FIXED] KeyValue not found after server restarts (#5054)
Do not force sequence mismatches and possible data loss when the first
sequence has moved past a catchup request.

When a server restarts and does a stream catchup where the snapshot from
the leader is for messages it no longer has, we would create a sequence
mismatch reset on the follower.
    
Also, on a catchup, if the stream state has moved past the updated request
range from a follower, meaning the first sequence of the new state is past
the last sequence of the request, extend the range and continue the catchup.

Signed-off-by: Derek Collison <derek@nats.io>

---------

Signed-off-by: Derek Collison <derek@nats.io>
derekcollison committed Feb 9, 2024
1 parent 25ac92a commit eadab32
Showing 3 changed files with 112 additions and 11 deletions.
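
In rough terms, the first part of the fix changes how the leader handles a catchup request whose range now lies entirely before the stream's first sequence. Below is a minimal, self-contained sketch of that adjustment, not the server's actual code: streamState, syncRequest, and adjustCatchupRange are illustrative stand-in names, and the real logic lives in runCatchup in server/jetstream_cluster.go.

package main

import "fmt"

// Simplified stand-ins for the server's stream state and sync request types.
type streamState struct {
	FirstSeq, LastSeq uint64
}

type syncRequest struct {
	FirstSeq, LastSeq uint64
}

// adjustCatchupRange sketches the fixed behavior: when the stream's first
// sequence has moved past the entire requested range, keep the follower's
// requested first sequence (so no sequence mismatch is forced on it) and only
// extend the requested last sequence up to the stream's current last.
func adjustCatchupRange(sreq *syncRequest, state streamState) {
	if sreq.FirstSeq < state.FirstSeq && state.FirstSeq > sreq.LastSeq {
		// The old code reset sreq.FirstSeq to state.FirstSeq here, which could
		// force a sequence mismatch reset and possible data loss on the follower.
		if state.LastSeq > sreq.LastSeq {
			sreq.LastSeq = state.LastSeq
		}
	}
}

func main() {
	// Follower asks for 1..100, but the leader has purged up to 5000 and is at 6000.
	sreq := &syncRequest{FirstSeq: 1, LastSeq: 100}
	state := streamState{FirstSeq: 5001, LastSeq: 6000}
	adjustCatchupRange(sreq, state)
	fmt.Println(sreq.FirstSeq, sreq.LastSeq) // 1 6000: first kept, last extended
}
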
25 changes: 20 additions & 5 deletions server/jetstream_cluster.go
@@ -8578,10 +8578,13 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {

 	// Reset notion of first if this request wants sequences before our starting sequence
 	// and we would have nothing to send. If we have partial messages still need to send skips for those.
+	// We will keep sreq's first sequence to not create sequence mismatches on the follower, but we extend the last to our current state.
 	if sreq.FirstSeq < state.FirstSeq && state.FirstSeq > sreq.LastSeq {
 		s.Debugf("Catchup for stream '%s > %s' resetting request first sequence from %d to %d",
 			mset.account(), mset.name(), sreq.FirstSeq, state.FirstSeq)
-		sreq.FirstSeq = state.FirstSeq
+		if state.LastSeq > sreq.LastSeq {
+			sreq.LastSeq = state.LastSeq
+		}
 	}

 	// Setup sequences to walk through.
@@ -8717,10 +8720,22 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
 			if drOk && dr.First > 0 {
 				sendDR()
 			}
-			s.Noticef("Catchup for stream '%s > %s' complete", mset.account(), mset.name())
-			// EOF
-			s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil)
-			return false
+			// Check for a condition where our state's first is now past the last that we could have sent.
+			// If so reset last and continue sending.
+			var state StreamState
+			mset.mu.RLock()
+			mset.store.FastState(&state)
+			mset.mu.RUnlock()
+			if last < state.FirstSeq {
+				last = state.LastSeq
+			}
+			// Recheck our exit condition.
+			if seq == last {
+				s.Noticef("Catchup for stream '%s > %s' complete", mset.account(), mset.name())
+				// EOF
+				s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil)
+				return false
+			}
 		}
 		select {
 		case <-remoteQuitCh:
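
For the second change, the catchup sender no longer signals EOF the moment it reaches the last sequence it planned to send; it first re-reads the stream state and, if the stream's first sequence has already moved past that planned last, extends the range and keeps going. A minimal sketch of just that recheck, with stand-in parameters rather than the server's StreamState and locking:

package main

import "fmt"

// recheckLast sketches the new exit check in the catchup loop. stateFirst and
// stateLast stand in for a fresh FastState read of the stream under its lock.
// If the stream's first sequence has moved past the last we planned to send,
// extend last to the current last and keep sending; only report done (and send
// the EOF marker) when seq still equals last after the recheck.
func recheckLast(seq, last, stateFirst, stateLast uint64) (newLast uint64, done bool) {
	if last < stateFirst {
		last = stateLast
	}
	return last, seq == last
}

func main() {
	// We were about to stop at sequence 100, but the stream now starts at 5001
	// and ends at 6000, so the catchup extends instead of ending early.
	last, done := recheckLast(100, 100, 5001, 6000)
	fmt.Println(last, done) // 6000 false
}
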
13 changes: 7 additions & 6 deletions server/jetstream_cluster_2_test.go
@@ -6586,14 +6586,14 @@ func TestJetStreamClusterSnapshotBeforePurgeAndCatchup(t *testing.T) {
 		}
 	}

-	// Send first 100 to everyone.
+	// Send first 1000 to everyone.
 	send1k()

 	// Now shutdown a non-leader.
 	c.waitOnStreamCurrent(nl, "$G", "TEST")
 	nl.Shutdown()

-	// Send another 100.
+	// Send another 1000.
 	send1k()

 	// Force snapshot on the leader.
@@ -6606,7 +6606,7 @@ func TestJetStreamClusterSnapshotBeforePurgeAndCatchup(t *testing.T) {
 	err = js.PurgeStream("TEST")
 	require_NoError(t, err)

-	// Send another 100.
+	// Send another 1000.
 	send1k()

 	// We want to make sure we do not send unnecessary skip msgs when we know we do not have all of these messages.
@@ -6630,10 +6630,11 @@ func TestJetStreamClusterSnapshotBeforePurgeAndCatchup(t *testing.T) {
 		return nil
 	})

-	// Make sure we only sent 1 sync catchup msg.
+	// Make sure we only sent 1002 sync catchup msgs.
+	// This is for the new messages, the delete range, and the EOF.
 	nmsgs, _, _ := sub.Pending()
-	if nmsgs != 1 {
-		t.Fatalf("Expected only 1 sync catchup msg to be sent signaling eof, but got %d", nmsgs)
+	if nmsgs != 1002 {
+		t.Fatalf("Expected only 1002 sync catchup msgs to be sent signaling eof, but got %d", nmsgs)
 	}
 }

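
As a quick sanity check on the updated expectation in the test above, the 1002 pending messages are just the sum the comment describes; an illustrative breakdown, not part of the test code:

package main

import "fmt"

func main() {
	// Breakdown of the expected pending sync catchup messages in the test above.
	const (
		newMsgs     = 1000 // messages published after the purge that must be caught up
		deleteRange = 1    // a single delete range message covering the purged sequences
		eof         = 1    // the final EOF message ending the catchup
	)
	fmt.Println(newMsgs + deleteRange + eof) // 1002
}
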
85 changes: 85 additions & 0 deletions server/norace_test.go
@@ -9514,3 +9514,88 @@ func TestNoRaceJetStreamClusterBadRestartsWithHealthzPolling(t *testing.T) {
 		return nil
 	})
 }
+
+func TestNoRaceJetStreamKVReplaceWithServerRestart(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	nc, _ := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+	// Shorten wait time for disconnects.
+	js, err := nc.JetStream(nats.MaxWait(time.Second))
+	require_NoError(t, err)
+
+	kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
+		Bucket:   "TEST",
+		Replicas: 3,
+	})
+	require_NoError(t, err)
+
+	createData := func(n int) []byte {
+		const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
+		b := make([]byte, n)
+		for i := range b {
+			b[i] = letterBytes[rand.Intn(len(letterBytes))]
+		}
+		return b
+	}
+
+	_, err = kv.Create("foo", createData(160))
+	require_NoError(t, err)
+
+	ch := make(chan struct{})
+	wg := sync.WaitGroup{}
+
+	// For counting errors that should not happen.
+	errCh := make(chan error, 1024)
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+
+		var lastData []byte
+		var revision uint64
+
+		for {
+			select {
+			case <-ch:
+				return
+			default:
+				k, err := kv.Get("foo")
+				if err == nats.ErrKeyNotFound {
+					errCh <- err
+				} else if k != nil {
+					if lastData != nil && k.Revision() == revision && !bytes.Equal(lastData, k.Value()) {
+						errCh <- fmt.Errorf("data loss [%s][rev:%d] expected:[%q] is:[%q]\n", "foo", revision, lastData, k.Value())
+					}
+					newData := createData(160)
+					if revision, err = kv.Update("foo", newData, k.Revision()); err == nil {
+						lastData = newData
+					}
+				}
+			}
+		}
+	}()
+
+	// Wait a short bit.
+	time.Sleep(2 * time.Second)
+	for _, s := range c.servers {
+		s.Shutdown()
+		// Need to leave servers down for awhile to trigger bug properly.
+		time.Sleep(5 * time.Second)
+		s = c.restartServer(s)
+		c.waitOnServerHealthz(s)
+	}
+
+	// Shutdown the go routine above.
+	close(ch)
+	// Wait for it to finish.
+	wg.Wait()
+
+	if len(errCh) != 0 {
+		for err := range errCh {
+			t.Logf("Received err %v during test", err)
+		}
+		t.Fatalf("Encountered errors")
+	}
+}
