[FIXED] Memory growth on interest stream reset with high sequence numbers. #5011

Merged 2 commits on Jan 29, 2024
24 changes: 17 additions & 7 deletions server/consumer.go
@@ -5334,33 +5334,42 @@ func (o *consumer) isMonitorRunning() bool {
 	return o.inMonitor
 }
 
+// If we detect that our ackfloor is higher than the stream's last sequence, return this error.
+var errAckFloorHigherThanLastSeq = errors.New("consumer ack floor is higher than streams last sequence")
+
 // If we are a consumer of an interest or workqueue policy stream, process that state and make sure consistent.
-func (o *consumer) checkStateForInterestStream() {
-	o.mu.Lock()
+func (o *consumer) checkStateForInterestStream() error {
+	o.mu.RLock()
 	// See if we need to process this update if our parent stream is not a limits policy stream.
 	mset := o.mset
 	shouldProcessState := mset != nil && o.retention != LimitsPolicy
 	if o.closed || !shouldProcessState {
-		o.mu.Unlock()
-		return
+		o.mu.RUnlock()
+		return nil
 	}
 	state, err := o.store.State()
-	o.mu.Unlock()
+	o.mu.RUnlock()
 
 	if err != nil {
-		return
+		return err
 	}
 
 	asflr := state.AckFloor.Stream
 	// Protect ourselves against rolling backwards.
 	if asflr&(1<<63) != 0 {
-		return
+		return nil
 	}
 
 	// We should make sure to update the acks.
 	var ss StreamState
 	mset.store.FastState(&ss)
 
+	// Check if the underlying stream's last sequence is less than our floor.
+	// This can happen if the stream has been reset and has not caught up yet.
+	if asflr > ss.LastSeq {
+		return errAckFloorHigherThanLastSeq
+	}
+
 	for seq := ss.FirstSeq; seq <= asflr; seq++ {
 		mset.ackMsg(o, seq)
 	}
@@ -5378,4 +5387,5 @@ func (o *consumer) checkStateForInterestStream() {
 			}
 		}
 	}
+	return nil
 }
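
For context on the failure mode the new check guards against: on an interest or workqueue retention stream, checkStateForInterestStream replays acks from the stream's first sequence up to the consumer's ack floor. If the stream has been reset to a very high sequence (for example purged up to 100,000,000) while a replica's store still reports a much lower last sequence, the ack floor sits far above ss.LastSeq and the replay loop would record an enormous number of pending pre-acks, which is the memory growth named in the title. Below is a minimal, self-contained sketch of that bounding check, not the server code; the preAcks map and the sequence numbers are illustrative stand-ins for the server's internal ack tracking.

package main

import (
	"errors"
	"fmt"
)

var errAckFloorHigherThanLastSeq = errors.New("consumer ack floor is higher than streams last sequence")

// replayAcks mirrors the shape of the fixed loop: it refuses to walk past the
// stream's last sequence, so a reset stream paired with a huge ack floor can
// no longer inflate the pre-ack tracking map. preAcks is an illustrative
// stand-in for the server's internal tracking, not a real server type.
func replayAcks(firstSeq, lastSeq, ackFloor uint64, preAcks map[uint64]struct{}) error {
	// Stream has been reset and has not caught up yet; bail out instead of
	// recording roughly ackFloor entries.
	if ackFloor > lastSeq {
		return errAckFloorHigherThanLastSeq
	}
	for seq := firstSeq; seq <= ackFloor; seq++ {
		preAcks[seq] = struct{}{} // stand-in for mset.ackMsg(o, seq)
	}
	return nil
}

func main() {
	preAcks := make(map[uint64]struct{})
	// Stream purged up to 100,000,000, but this replica was reset and only reports 20 msgs.
	err := replayAcks(1, 20, 100_000_010, preAcks)
	fmt.Println(err, len(preAcks)) // the guard trips and the map stays empty
}

The actual caller in server/stream.go further down treats this error only as a signal to log a rate-limited warning rather than to keep acking.
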
3 changes: 3 additions & 0 deletions server/filestore.go
@@ -2415,6 +2415,9 @@ func (fs *fileStore) FilteredState(sseq uint64, subj string) SimpleState {
 
 	// If past the end no results.
 	if sseq > lseq {
+		// Make sure we track sequences
+		ss.First = fs.state.FirstSeq
+		ss.Last = fs.state.LastSeq
 		return ss
 	}
 
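
The FilteredState tweak supports the same reset scenario: when the requested start sequence is already past the end of the store (for instance right after a purge up to a high sequence), the returned state now reports the store's first and last sequences instead of zeros, which is what the test below asserts with checkFilteredState(0, 0, 101, 100). A rough sketch of that behavior, using an illustrative pastEndState helper and a trimmed-down SimpleState rather than the real fileStore types:

package main

import "fmt"

// SimpleState here only carries the three fields the change populates; the
// real server type has more.
type SimpleState struct {
	Msgs, First, Last uint64
}

// pastEndState shows the fixed behavior for a filtered query that starts past
// the end of the store: no messages match, but First/Last still track the
// store so callers can tell where it stands.
func pastEndState(sseq, storeFirst, storeLast uint64) SimpleState {
	var ss SimpleState
	if sseq > storeLast {
		// Make sure we track sequences even though there are no results.
		ss.First = storeFirst
		ss.Last = storeLast
	}
	return ss
}

func main() {
	// After purging a store whose last sequence was 100, the first sequence
	// becomes 101 and any query from there starts past the end.
	fmt.Printf("%+v\n", pastEndState(101, 101, 100)) // {Msgs:0 First:101 Last:100}
}
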
4 changes: 4 additions & 0 deletions server/filestore_test.go
@@ -4642,6 +4642,10 @@ func TestFileStoreAllFilteredStateWithDeleted(t *testing.T) {
 		checkFilteredState(6, 95, 6, 100)
 		remove(8, 10, 12, 14, 16, 18)
 		checkFilteredState(7, 88, 7, 100)
+
+		// Now check when purged that we return first and last sequences properly.
+		fs.Purge()
+		checkFilteredState(0, 0, 101, 100)
 	})
 }
 
33 changes: 25 additions & 8 deletions server/jetstream_cluster.go
@@ -2235,7 +2235,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 
 		if err := n.InstallSnapshot(mset.stateSnapshot()); err == nil {
 			lastState, lastSnapTime = curState, time.Now()
-		} else if err != errNoSnapAvailable && err != errNodeClosed {
+		} else if err != errNoSnapAvailable && err != errNodeClosed && err != errCatchupsRunning {
 			s.RateLimitWarnf("Failed to install snapshot for '%s > %s' [%s]: %v", mset.acc.Name, mset.name(), n.Group(), err)
 		}
 	}
@@ -2300,7 +2300,11 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 				if mset.numConsumers() >= numExpectedConsumers {
 					break
 				}
-				time.Sleep(sleepTime)
+				select {
+				case <-s.quitCh:
+					return
+				case <-time.After(sleepTime):
+				}
 			}
 			if actual := mset.numConsumers(); actual < numExpectedConsumers {
 				s.Warnf("All consumers not online for '%s > %s': expected %d but only have %d", accName, mset.name(), numExpectedConsumers, actual)
@@ -2333,10 +2337,9 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 		// No special processing needed for when we are caught up on restart.
 		if ce == nil {
 			isRecovering = false
-			// Check on startup if we should snapshot/compact.
-			if _, b := n.Size(); b > compactSizeMin || n.NeedSnapshot() {
-				doSnapshot()
-			}
+			// Make sure we create a new snapshot in case things have changed such that any existing
+			// snapshot may no longer be valid.
+			doSnapshot()
 			// If we became leader during this time and we need to send a snapshot to our
 			// followers, i.e. as a result of a scale-up from R1, do it now.
 			if sendSnapshot && isLeader && mset != nil && n != nil {
@@ -2851,6 +2854,19 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
 
 			// Process the actual message here.
 			if err := mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts); err != nil {
+				if err == errLastSeqMismatch {
+					var state StreamState
+					mset.store.FastState(&state)
+					// If we have no msgs and the other side is delivering us a sequence past where we
+					// should be reset. This is possible if the other side has a stale snapshot and no longer
+					// has those messages. So compact and retry to reset.
+					if state.Msgs == 0 {
+						mset.store.Compact(lseq + 1)
+						// Retry
+						err = mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts)
+					}
+				}
+
 				// Only return in place if we are going to reset our stream or we are out of space, or we are closed.
 				if isClusterResetErr(err) || isOutOfSpaceErr(err) || err == errStreamClosed {
 					return err
@@ -4605,8 +4621,8 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
 		if !bytes.Equal(hash[:], lastSnap) || ne >= compactNumMin || nb >= compactSizeMin {
 			if err := n.InstallSnapshot(snap); err == nil {
 				lastSnap, lastSnapTime = hash[:], time.Now()
-			} else if err != errNoSnapAvailable && err != errNodeClosed {
-				s.Warnf("Failed to install snapshot for '%s > %s > %s' [%s]: %v", o.acc.Name, ca.Stream, ca.Name, n.Group(), err)
+			} else if err != errNoSnapAvailable && err != errNodeClosed && err != errCatchupsRunning {
+				s.RateLimitWarnf("Failed to install snapshot for '%s > %s > %s' [%s]: %v", o.acc.Name, ca.Stream, ca.Name, n.Group(), err)
 			}
 		}
 	}
@@ -7789,6 +7805,7 @@ func (mset *stream) processSnapshot(snap *StreamReplicatedState) (e error) {
 	mset.store.FastState(&state)
 	mset.setCLFS(snap.Failed)
 	sreq := mset.calculateSyncRequest(&state, snap)
+
 	s, js, subject, n := mset.srv, mset.js, mset.sa.Sync, mset.node
 	qname := fmt.Sprintf("[ACC:%s] stream '%s' snapshot", mset.acc.Name, mset.cfg.Name)
 	mset.mu.Unlock()
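
The errLastSeqMismatch handling in applyStreamEntries above is a compact-and-retry pattern: if this replica holds no messages and the incoming entry does not line up with the empty store (which can happen when the other side worked from a stale snapshot and no longer has the intermediate messages), the store is compacted to lseq + 1 and the apply is retried once instead of failing repeatedly. A minimal sketch of that pattern under illustrative types; memStore and its methods are stand-ins, not the server's store or processJetStreamMsg.

package main

import (
	"errors"
	"fmt"
)

var errLastSeqMismatch = errors.New("last sequence mismatch")

// memStore is an illustrative stand-in for the replica's message store.
type memStore struct {
	lseq uint64 // last stored sequence
	msgs uint64 // number of stored messages
}

// apply stores the next message only when the store's last sequence matches
// the expected previous sequence lseq.
func (s *memStore) apply(lseq uint64) error {
	if s.lseq != lseq {
		return errLastSeqMismatch
	}
	s.lseq = lseq + 1
	s.msgs++
	return nil
}

// compact drops everything below newFirst; on an empty store this simply
// fast-forwards the last sequence to newFirst-1.
func (s *memStore) compact(newFirst uint64) { s.lseq = newFirst - 1 }

// applyWithReset mirrors the shape of the fix: on a mismatch against an empty
// store, compact past the gap and retry the apply once.
func applyWithReset(s *memStore, lseq uint64) error {
	err := s.apply(lseq)
	if errors.Is(err, errLastSeqMismatch) && s.msgs == 0 {
		s.compact(lseq + 1) // analogous to mset.store.Compact(lseq + 1) in the diff
		err = s.apply(lseq)
	}
	return err
}

func main() {
	s := &memStore{} // an empty replica after its stream data was removed
	// The other side delivers a message whose previous sequence is 100,000,019.
	fmt.Println(applyWithReset(s, 100_000_019), s.lseq) // <nil> 100000020
}

This is the same situation the new cluster test below reproduces by deleting a follower's message directory and expecting it to land back at sequence 100,000,020 with zero messages.
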
76 changes: 75 additions & 1 deletion server/jetstream_cluster_3_test.go
@@ -1,4 +1,4 @@
-// Copyright 2022-2023 The NATS Authors
+// Copyright 2022-2024 The NATS Authors
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
@@ -25,6 +25,7 @@ import (
 	"math/rand"
 	"net"
 	"os"
+	"path/filepath"
 	"reflect"
 	"strings"
 	"sync"
@@ -5921,3 +5922,76 @@ func TestJetStreamClusterAPIAccessViaSystemAccount(t *testing.T) {
 	_, err = js.AddStream(&nats.StreamConfig{Name: "TEST"})
 	require_Error(t, err, NewJSNotEnabledForAccountError())
 }
+
+func TestJetStreamClusterStreamResetPreacks(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:      "TEST",
+		Subjects:  []string{"foo"},
+		Retention: nats.InterestPolicy,
+		Replicas:  3,
+	})
+	require_NoError(t, err)
+
+	err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Sequence: 100_000_000})
+	require_NoError(t, err)
+
+	sub, err := js.PullSubscribe("foo", "dlc")
+	require_NoError(t, err)
+
+	// Put 20 msgs in.
+	for i := 0; i < 20; i++ {
+		_, err := js.Publish("foo", nil)
+		require_NoError(t, err)
+	}
+
+	// Consume and ack 10.
+	msgs, err := sub.Fetch(10, nats.MaxWait(time.Second))
+	require_NoError(t, err)
+	require_Equal(t, len(msgs), 10)
+
+	for _, msg := range msgs {
+		msg.AckSync()
+	}
+
+	// Now grab a non-leader server.
+	// We will shut it down and remove the stream data.
+	nl := c.randomNonStreamLeader(globalAccountName, "TEST")
+	mset, err := nl.GlobalAccount().lookupStream("TEST")
+	require_NoError(t, err)
+	fs := mset.store.(*fileStore)
+	mdir := filepath.Join(fs.fcfg.StoreDir, msgDir)
+	nl.Shutdown()
+	// In case that was the consumer leader.
+	c.waitOnConsumerLeader(globalAccountName, "TEST", "dlc")
+
+	// Now consume the remaining 10 and ack.
+	msgs, err = sub.Fetch(10, nats.MaxWait(time.Second))
+	require_NoError(t, err)
+	require_Equal(t, len(msgs), 10)
+
+	for _, msg := range msgs {
+		msg.AckSync()
+	}
+
+	// Now remove the stream manually.
+	require_NoError(t, os.RemoveAll(mdir))
+	nl = c.restartServer(nl)
+	c.waitOnServerCurrent(nl)
+
+	mset, err = nl.GlobalAccount().lookupStream("TEST")
+	require_NoError(t, err)
+
+	checkFor(t, 10*time.Second, 200*time.Millisecond, func() error {
+		state := mset.state()
+		if state.Msgs != 0 || state.FirstSeq != 100_000_020 {
+			return fmt.Errorf("Not correct state yet: %+v", state)
+		}
+		return nil
+	})
+}
22 changes: 18 additions & 4 deletions server/stream.go
@@ -638,9 +638,9 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
 	mset.store.FastState(&state)
 
 	// Possible race with consumer.setLeader during recovery.
-	mset.mu.Lock()
+	mset.mu.RLock()
 	mset.lseq = state.LastSeq
-	mset.mu.Unlock()
+	mset.mu.RUnlock()
 
 	// If no msgs (new stream), set dedupe state loaded to true.
 	if state.Msgs == 0 {
@@ -5807,17 +5807,31 @@ func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error
 	return mset, nil
 }
 
-// This is to check for dangling messages on interest retention streams.
+// This is to check for dangling messages on interest retention streams. Only called on account enable.
 // Issue https://github.com/nats-io/nats-server/issues/3612
 func (mset *stream) checkForOrphanMsgs() {
 	mset.mu.RLock()
 	consumers := make([]*consumer, 0, len(mset.consumers))
 	for _, o := range mset.consumers {
 		consumers = append(consumers, o)
 	}
+	accName, stream := mset.acc.Name, mset.cfg.Name
+
+	var ss StreamState
+	mset.store.FastState(&ss)
 	mset.mu.RUnlock()
+
 	for _, o := range consumers {
-		o.checkStateForInterestStream()
+		if err := o.checkStateForInterestStream(); err == errAckFloorHigherThanLastSeq {
+			o.mu.RLock()
+			s, consumer := o.srv, o.name
+			state, _ := o.store.State()
+			asflr := state.AckFloor.Stream
+			o.mu.RUnlock()
+			// Warn about stream state vs our ack floor.
+			s.RateLimitWarnf("Detected consumer '%s > %s > %s' ack floor %d is ahead of stream's last sequence %d",
+				accName, stream, consumer, asflr, ss.LastSeq)
+		}
 	}
 }
 