Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] KeyValue not found after server restarts #5054

Merged
merged 2 commits into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 20 additions & 5 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -8578,10 +8578,13 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {

// Reset notion of first if this request wants sequences before our starting sequence
// and we would have nothing to send. If we have partial messages still need to send skips for those.
// We will keep sreq's first sequence to not create sequence mismatches on the follower, but we extend the last to our current state.
if sreq.FirstSeq < state.FirstSeq && state.FirstSeq > sreq.LastSeq {
s.Debugf("Catchup for stream '%s > %s' resetting request first sequence from %d to %d",
mset.account(), mset.name(), sreq.FirstSeq, state.FirstSeq)
sreq.FirstSeq = state.FirstSeq
if state.LastSeq > sreq.LastSeq {
sreq.LastSeq = state.LastSeq
}
}

// Setup sequences to walk through.
Expand Down Expand Up @@ -8717,10 +8720,22 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
if drOk && dr.First > 0 {
sendDR()
}
s.Noticef("Catchup for stream '%s > %s' complete", mset.account(), mset.name())
// EOF
s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil)
return false
// Check for a condition where our state's first is now past the last that we could have sent.
// If so reset last and continue sending.
var state StreamState
mset.mu.RLock()
mset.store.FastState(&state)
mset.mu.RUnlock()
if last < state.FirstSeq {
last = state.LastSeq
}
// Recheck our exit condition.
if seq == last {
s.Noticef("Catchup for stream '%s > %s' complete", mset.account(), mset.name())
// EOF
s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil)
return false
}
}
select {
case <-remoteQuitCh:
Expand Down
13 changes: 7 additions & 6 deletions server/jetstream_cluster_2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6586,14 +6586,14 @@ func TestJetStreamClusterSnapshotBeforePurgeAndCatchup(t *testing.T) {
}
}

// Send first 100 to everyone.
// Send first 1000 to everyone.
send1k()

// Now shutdown a non-leader.
c.waitOnStreamCurrent(nl, "$G", "TEST")
nl.Shutdown()

// Send another 100.
// Send another 1000.
send1k()

// Force snapshot on the leader.
Expand All @@ -6606,7 +6606,7 @@ func TestJetStreamClusterSnapshotBeforePurgeAndCatchup(t *testing.T) {
err = js.PurgeStream("TEST")
require_NoError(t, err)

// Send another 100.
// Send another 1000.
send1k()

// We want to make sure we do not send unnecessary skip msgs when we know we do not have all of these messages.
Expand All @@ -6630,10 +6630,11 @@ func TestJetStreamClusterSnapshotBeforePurgeAndCatchup(t *testing.T) {
return nil
})

// Make sure we only sent 1 sync catchup msg.
// Make sure we only sent 1002 sync catchup msgs.
// This is for the new messages, the delete range, and the EOF.
nmsgs, _, _ := sub.Pending()
if nmsgs != 1 {
t.Fatalf("Expected only 1 sync catchup msg to be sent signaling eof, but got %d", nmsgs)
if nmsgs != 1002 {
t.Fatalf("Expected only 1002 sync catchup msgs to be sent signaling eof, but got %d", nmsgs)
}
}

Expand Down
85 changes: 85 additions & 0 deletions server/norace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9514,3 +9514,88 @@ func TestNoRaceJetStreamClusterBadRestartsWithHealthzPolling(t *testing.T) {
return nil
})
}

// TestNoRaceJetStreamKVReplaceWithServerRestart continuously reads and
// CAS-updates a single KV key from a background goroutine while every server
// in an R3 cluster is shut down, left down, and restarted in turn. It guards
// against the regression where a key could be reported as not found (or show
// stale data at an unchanged revision) after server restarts.
func TestNoRaceJetStreamKVReplaceWithServerRestart(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, _ := jsClientConnect(t, c.randomServer())
	defer nc.Close()
	// Shorten wait time for disconnects.
	js, err := nc.JetStream(nats.MaxWait(time.Second))
	require_NoError(t, err)

	kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
		Bucket:   "TEST",
		Replicas: 3,
	})
	require_NoError(t, err)

	// createData returns n random alphanumeric bytes.
	createData := func(n int) []byte {
		const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
		b := make([]byte, n)
		for i := range b {
			b[i] = letterBytes[rand.Intn(len(letterBytes))]
		}
		return b
	}

	_, err = kv.Create("foo", createData(160))
	require_NoError(t, err)

	ch := make(chan struct{})
	wg := sync.WaitGroup{}

	// For counting errors that should not happen.
	errCh := make(chan error, 1024)

	wg.Add(1)
	go func() {
		defer wg.Done()

		var lastData []byte
		var revision uint64

		for {
			select {
			case <-ch:
				return
			default:
				k, err := kv.Get("foo")
				if err == nats.ErrKeyNotFound {
					// The key is created before this goroutine starts and is
					// never deleted, so it should always be found.
					errCh <- err
				} else if k != nil {
					// Same revision as our last successful update but different
					// data means the server lost our write.
					if lastData != nil && k.Revision() == revision && !bytes.Equal(lastData, k.Value()) {
						errCh <- fmt.Errorf("data loss [%s][rev:%d] expected:[%q] is:[%q]", "foo", revision, lastData, k.Value())
					}
					newData := createData(160)
					// Only remember the data if the CAS update succeeded.
					if revision, err = kv.Update("foo", newData, k.Revision()); err == nil {
						lastData = newData
					}
				}
			}
		}
	}()

	// Wait a short bit.
	time.Sleep(2 * time.Second)
	for _, s := range c.servers {
		s.Shutdown()
		// Need to leave servers down for a while to trigger the bug properly.
		time.Sleep(5 * time.Second)
		s = c.restartServer(s)
		c.waitOnServerHealthz(s)
	}

	// Shut down the goroutine above.
	close(ch)
	// Wait for it to finish.
	wg.Wait()

	// The producing goroutine has exited, so it is now safe to close errCh.
	// Without the close, the range below would block forever once the buffered
	// errors were drained, hanging the test instead of failing it.
	close(errCh)
	if len(errCh) != 0 {
		for err := range errCh {
			t.Logf("Received err %v during test", err)
		}
		t.Fatalf("Encountered errors")
	}
}