Skip to content

Commit

Permalink
[IMPROVED] Do not load blocks when calling LoadNextMsg for the first …
Browse files Browse the repository at this point in the history
…time after restart or long inactivity (#4969)

If LoadNextMsg was called with start sequence == FirstSeq, we would walk
all blocks, which if fss was loaded we could skip fairly quickly.
However on a restart or no activity past a sync interval this could
cause memory blocks to be loaded in just to be skipped.

#4906

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Jan 18, 2024
2 parents 52b3944 + b4b1f91 commit 4340a34
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 0 deletions.
10 changes: 10 additions & 0 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -5960,6 +5960,16 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store
start = fs.state.FirstSeq
}

// If start is less than or equal to beginning of our stream, meaning our first call,
// let's check the psim to see if we can skip ahead.
if start <= fs.state.FirstSeq {
var ss SimpleState
fs.numFilteredPending(filter, &ss)
if ss.First > start {
start = ss.First
}
}

if bi, _ := fs.selectMsgBlockWithIndex(start); bi >= 0 {
for i := bi; i < len(fs.blks); i++ {
mb := fs.blks[i]
Expand Down
37 changes: 37 additions & 0 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6496,6 +6496,43 @@ func TestFileStoreSkipMsgs(t *testing.T) {
require_Equal(t, state.NumDeleted, 10)
}

func TestFileStoreOptimizeFirstLoadNextMsgWithSequenceZero(t *testing.T) {
sd := t.TempDir()
fs, err := newFileStore(
FileStoreConfig{StoreDir: sd, BlockSize: 4096},
StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

msg := bytes.Repeat([]byte("ZZZ"), 33) // ~100bytes

for i := 0; i < 5000; i++ {
fs.StoreMsg("foo.A", nil, msg)
}
// This will create alot of blocks, ~167.
// Just used to check that we do not load these in when searching.
// Now add in 10 for foo.bar at the end.
for i := 0; i < 10; i++ {
fs.StoreMsg("foo.B", nil, msg)
}
// The bug would not be visible on running server per se since we would have had fss loaded
// and that sticks around a bit longer, we would use that to skip over the early blocks. So stop
// and restart the filestore.
fs.Stop()
fs, err = newFileStore(
FileStoreConfig{StoreDir: sd, BlockSize: 4096},
StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

// Now fetch the next message for foo.B but set starting sequence to 0.
_, nseq, err := fs.LoadNextMsg("foo.B", false, 0, nil)
require_NoError(t, err)
require_Equal(t, nseq, 5001)
// Now check how many blks are loaded, should be only 1.
require_Equal(t, fs.cacheLoads(), 1)
}

///////////////////////////////////////////////////////////////////////////
// Benchmarks
///////////////////////////////////////////////////////////////////////////
Expand Down

0 comments on commit 4340a34

Please sign in to comment.