[FIXED] Memory growth on interest stream reset with high sequence numbers. (#5011)

If an interest stream has been reset and its consumers have very high
ack floors, the pre-ack state could grow unbounded.

The pre-ack state should stay minimal, so in this case, when the stream's
last sequence is below a consumer's ack floor, we should not process
pre-acks during startup.

Also fixed a bug where snapshots were not being taken properly when the
quick-check state was a purged state.

Signed-off-by: Derek Collison <derek@nats.io>
derekcollison committed Jan 29, 2024
2 parents 5ecf769 + 1fbd614 commit 02d0d83
Showing 6 changed files with 142 additions and 20 deletions.
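
As a concrete illustration of the scenario in the commit message, here is a small, self-contained Go sketch that uses illustrative stand-ins rather than the server's actual types: if a consumer's ack floor sits beyond the stream's last sequence, the startup pre-ack walk bails out instead of iterating all the way up to the ack floor.

// Illustrative sketch only: simplified stand-ins, not the server's real types.
package main

import (
	"errors"
	"fmt"
)

// Mirrors the error introduced in server/consumer.go.
var errAckFloorHigherThanLastSeq = errors.New("consumer ack floor is higher than streams last sequence")

// checkInterestState is a stand-in for consumer.checkStateForInterestStream:
// it walks the stream from its first sequence up to the consumer's ack floor.
func checkInterestState(firstSeq, lastSeq, ackFloor uint64, ack func(seq uint64)) error {
	// The new guard: a reset stream may not have caught up to the consumer yet,
	// and walking an ack floor beyond the last sequence would only pile up
	// pre-ack state, so bail out instead.
	if ackFloor > lastSeq {
		return errAckFloorHigherThanLastSeq
	}
	for seq := firstSeq; seq <= ackFloor; seq++ {
		ack(seq)
	}
	return nil
}

func main() {
	// A stream purged up to sequence 100,000,000 and then reset: the replica's
	// last sequence (20 here) is far below the consumer's ack floor. Without
	// the guard, the loop above would run roughly 100 million times.
	err := checkInterestState(1, 20, 100_000_000, func(uint64) {})
	fmt.Println(err)
}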
24 changes: 17 additions & 7 deletions server/consumer.go
@@ -5334,33 +5334,42 @@ func (o *consumer) isMonitorRunning() bool {
return o.inMonitor
}

// If we detect that our ackfloor is higher than the stream's last sequence, return this error.
var errAckFloorHigherThanLastSeq = errors.New("consumer ack floor is higher than streams last sequence")

// If we are a consumer of an interest or workqueue policy stream, process that state and make sure consistent.
func (o *consumer) checkStateForInterestStream() {
o.mu.Lock()
func (o *consumer) checkStateForInterestStream() error {
o.mu.RLock()
// See if we need to process this update if our parent stream is not a limits policy stream.
mset := o.mset
shouldProcessState := mset != nil && o.retention != LimitsPolicy
if o.closed || !shouldProcessState {
o.mu.Unlock()
return
o.mu.RUnlock()
return nil
}
state, err := o.store.State()
o.mu.Unlock()
o.mu.RUnlock()

if err != nil {
return
return err
}

asflr := state.AckFloor.Stream
// Protect ourselves against rolling backwards.
if asflr&(1<<63) != 0 {
return
return nil
}

// We should make sure to update the acks.
var ss StreamState
mset.store.FastState(&ss)

// Check if the underlying stream's last sequence is less than our floor.
// This can happen if the stream has been reset and has not caught up yet.
if asflr > ss.LastSeq {
return errAckFloorHigherThanLastSeq
}

for seq := ss.FirstSeq; seq <= asflr; seq++ {
mset.ackMsg(o, seq)
}
@@ -5378,4 +5387,5 @@ func (o *consumer) checkStateForInterestStream() {
}
}
}
return nil
}
3 changes: 3 additions & 0 deletions server/filestore.go
@@ -2415,6 +2415,9 @@ func (fs *fileStore) FilteredState(sseq uint64, subj string) SimpleState {

// If past the end no results.
if sseq > lseq {
// Make sure we track sequences
ss.First = fs.state.FirstSeq
ss.Last = fs.state.LastSeq
return ss
}
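
For the purged case exercised by the test below, where the store's first sequence has moved past its last sequence, the early-return branch above now reports those sequences instead of leaving them zeroed. A minimal illustrative sketch with simplified stand-in types, not the real fileStore:

// Illustrative sketch only: simplified stand-ins for the filestore types.
package main

import "fmt"

type simpleState struct {
	Msgs, First, Last uint64
}

// filteredState sketches the fixed early-return branch: even when the requested
// start sequence is past the end, the otherwise empty result still carries the
// store's first and last sequences.
func filteredState(firstSeq, lastSeq, sseq uint64) simpleState {
	var ss simpleState
	if sseq < firstSeq {
		sseq = firstSeq
	}
	if sseq > lastSeq {
		// Make sure we track sequences even though there are no results.
		ss.First, ss.Last = firstSeq, lastSeq
		return ss
	}
	// The real implementation walks message blocks and counts matches here.
	return ss
}

func main() {
	// After purging a store that held sequences 1..100, the first sequence is
	// 101 and the last stays 100; the empty result should reflect that, as the
	// added test expects after fs.Purge().
	fmt.Printf("%+v\n", filteredState(101, 100, 0))
}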

4 changes: 4 additions & 0 deletions server/filestore_test.go
@@ -4642,6 +4642,10 @@ func TestFileStoreAllFilteredStateWithDeleted(t *testing.T) {
checkFilteredState(6, 95, 6, 100)
remove(8, 10, 12, 14, 16, 18)
checkFilteredState(7, 88, 7, 100)

// Now check when purged that we return first and last sequences properly.
fs.Purge()
checkFilteredState(0, 0, 101, 100)
})
}

33 changes: 25 additions & 8 deletions server/jetstream_cluster.go
@@ -2235,7 +2235,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps

if err := n.InstallSnapshot(mset.stateSnapshot()); err == nil {
lastState, lastSnapTime = curState, time.Now()
} else if err != errNoSnapAvailable && err != errNodeClosed {
} else if err != errNoSnapAvailable && err != errNodeClosed && err != errCatchupsRunning {
s.RateLimitWarnf("Failed to install snapshot for '%s > %s' [%s]: %v", mset.acc.Name, mset.name(), n.Group(), err)
}
}
@@ -2300,7 +2300,11 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
if mset.numConsumers() >= numExpectedConsumers {
break
}
time.Sleep(sleepTime)
select {
case <-s.quitCh:
return
case <-time.After(sleepTime):
}
}
if actual := mset.numConsumers(); actual < numExpectedConsumers {
s.Warnf("All consumers not online for '%s > %s': expected %d but only have %d", accName, mset.name(), numExpectedConsumers, actual)
@@ -2333,10 +2337,9 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// No special processing needed for when we are caught up on restart.
if ce == nil {
isRecovering = false
// Check on startup if we should snapshot/compact.
if _, b := n.Size(); b > compactSizeMin || n.NeedSnapshot() {
doSnapshot()
}
// Make sure we create a new snapshot in case things have changed such that any existing
// snapshot may no longer be valid.
doSnapshot()
// If we became leader during this time and we need to send a snapshot to our
// followers, i.e. as a result of a scale-up from R1, do it now.
if sendSnapshot && isLeader && mset != nil && n != nil {
@@ -2851,6 +2854,19 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco

// Process the actual message here.
if err := mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts); err != nil {
if err == errLastSeqMismatch {
var state StreamState
mset.store.FastState(&state)
// If we have no msgs and the other side is delivering us a sequence past where we
// should be reset. This is possible if the other side has a stale snapshot and no longer
// has those messages. So compact and retry to reset.
if state.Msgs == 0 {
mset.store.Compact(lseq + 1)
// Retry
err = mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts)
}
}

// Only return in place if we are going to reset our stream or we are out of space, or we are closed.
if isClusterResetErr(err) || isOutOfSpaceErr(err) || err == errStreamClosed {
return err
@@ -4605,8 +4621,8 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
if !bytes.Equal(hash[:], lastSnap) || ne >= compactNumMin || nb >= compactSizeMin {
if err := n.InstallSnapshot(snap); err == nil {
lastSnap, lastSnapTime = hash[:], time.Now()
} else if err != errNoSnapAvailable && err != errNodeClosed {
s.Warnf("Failed to install snapshot for '%s > %s > %s' [%s]: %v", o.acc.Name, ca.Stream, ca.Name, n.Group(), err)
} else if err != errNoSnapAvailable && err != errNodeClosed && err != errCatchupsRunning {
s.RateLimitWarnf("Failed to install snapshot for '%s > %s > %s' [%s]: %v", o.acc.Name, ca.Stream, ca.Name, n.Group(), err)
}
}
}
@@ -7789,6 +7805,7 @@ func (mset *stream) processSnapshot(snap *StreamReplicatedState) (e error) {
mset.store.FastState(&state)
mset.setCLFS(snap.Failed)
sreq := mset.calculateSyncRequest(&state, snap)

s, js, subject, n := mset.srv, mset.js, mset.sa.Sync, mset.node
qname := fmt.Sprintf("[ACC:%s] stream '%s' snapshot", mset.acc.Name, mset.cfg.Name)
mset.mu.Unlock()
76 changes: 75 additions & 1 deletion server/jetstream_cluster_3_test.go
@@ -1,4 +1,4 @@
// Copyright 2022-2023 The NATS Authors
// Copyright 2022-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -25,6 +25,7 @@ import (
"math/rand"
"net"
"os"
"path/filepath"
"reflect"
"strings"
"sync"
@@ -5921,3 +5922,76 @@ func TestJetStreamClusterAPIAccessViaSystemAccount(t *testing.T) {
_, err = js.AddStream(&nats.StreamConfig{Name: "TEST"})
require_Error(t, err, NewJSNotEnabledForAccountError())
}

func TestJetStreamClusterStreamResetPreacks(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Retention: nats.InterestPolicy,
Replicas: 3,
})
require_NoError(t, err)

err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Sequence: 100_000_000})
require_NoError(t, err)

sub, err := js.PullSubscribe("foo", "dlc")
require_NoError(t, err)

// Put 20 msgs in.
for i := 0; i < 20; i++ {
_, err := js.Publish("foo", nil)
require_NoError(t, err)
}

// Consume and ack 10.
msgs, err := sub.Fetch(10, nats.MaxWait(time.Second))
require_NoError(t, err)
require_Equal(t, len(msgs), 10)

for _, msg := range msgs {
msg.AckSync()
}

// Now grab a non-leader server.
// We will shut it down and remove the stream data.
nl := c.randomNonStreamLeader(globalAccountName, "TEST")
mset, err := nl.GlobalAccount().lookupStream("TEST")
require_NoError(t, err)
fs := mset.store.(*fileStore)
mdir := filepath.Join(fs.fcfg.StoreDir, msgDir)
nl.Shutdown()
// In case that was the consumer leader.
c.waitOnConsumerLeader(globalAccountName, "TEST", "dlc")

// Now consume the remaining 10 and ack.
msgs, err = sub.Fetch(10, nats.MaxWait(time.Second))
require_NoError(t, err)
require_Equal(t, len(msgs), 10)

for _, msg := range msgs {
msg.AckSync()
}

// Now remove the stream manually.
require_NoError(t, os.RemoveAll(mdir))
nl = c.restartServer(nl)
c.waitOnServerCurrent(nl)

mset, err = nl.GlobalAccount().lookupStream("TEST")
require_NoError(t, err)

checkFor(t, 10*time.Second, 200*time.Millisecond, func() error {
state := mset.state()
if state.Msgs != 0 || state.FirstSeq != 100_000_020 {
return fmt.Errorf("Not correct state yet: %+v", state)
}
return nil
})
}
22 changes: 18 additions & 4 deletions server/stream.go
@@ -638,9 +638,9 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
mset.store.FastState(&state)

// Possible race with consumer.setLeader during recovery.
mset.mu.Lock()
mset.mu.RLock()
mset.lseq = state.LastSeq
mset.mu.Unlock()
mset.mu.RUnlock()

// If no msgs (new stream), set dedupe state loaded to true.
if state.Msgs == 0 {
@@ -5807,17 +5807,31 @@ func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error
return mset, nil
}

// This is to check for dangling messages on interest retention streams.
// This is to check for dangling messages on interest retention streams. Only called on account enable.
// Issue https://github.com/nats-io/nats-server/issues/3612
func (mset *stream) checkForOrphanMsgs() {
mset.mu.RLock()
consumers := make([]*consumer, 0, len(mset.consumers))
for _, o := range mset.consumers {
consumers = append(consumers, o)
}
accName, stream := mset.acc.Name, mset.cfg.Name

var ss StreamState
mset.store.FastState(&ss)
mset.mu.RUnlock()

for _, o := range consumers {
o.checkStateForInterestStream()
if err := o.checkStateForInterestStream(); err == errAckFloorHigherThanLastSeq {
o.mu.RLock()
s, consumer := o.srv, o.name
state, _ := o.store.State()
asflr := state.AckFloor.Stream
o.mu.RUnlock()
// Warn about stream state vs our ack floor.
s.RateLimitWarnf("Detected consumer '%s > %s > %s' ack floor %d is ahead of stream's last sequence %d",
accName, stream, consumer, asflr, ss.LastSeq)
}
}
}
