Skip to content

Commit

Permalink
[FIXED] Fix for a panic calculating record length for slotInfo when f…
Browse files Browse the repository at this point in the history
…ollowed by dbit markers in idx. (#5009)

This was reported by @piotrpio when trying to do a secure erase of a
message the was followed by dbit markers which occur after compaction
removes records from the raw msg block. The idx then expands via dbit
markers for the idx slice.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Jan 27, 2024
2 parents 5abe4cc + cc82760 commit 0e68006
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 5 deletions.
23 changes: 18 additions & 5 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -3921,11 +3921,24 @@ func (mb *msgBlock) slotInfo(slot int) (uint32, uint32, bool, error) {

// Determine record length
var rl uint32
if len(mb.cache.idx) > slot+1 {
ni := mb.cache.idx[slot+1] &^ hbit
rl = ni - ri
} else {
if slot >= len(mb.cache.idx) {
rl = mb.cache.lrl
} else {
// Need to account for dbit markers in idx.
// So we will walk until we find valid idx slot to calculate rl.
for i := 1; slot+i < len(mb.cache.idx); i++ {
ni := mb.cache.idx[slot+i] &^ hbit
if ni == dbit {
continue
}
rl = ni - ri
break
}
// check if we had all trailing dbits.
// If so use len of cache buf minus ri.
if rl == 0 {
rl = uint32(len(mb.cache.buf)) - ri
}
}
if rl < msgHdrSize {
return 0, 0, false, errBadMsg
Expand Down Expand Up @@ -5659,7 +5672,7 @@ const (
ebit = 1 << 63
// Used for marking tombstone sequences.
tbit = 1 << 62
// Used to mark a bad index as deleted.
// Used to mark an index as deleted and non-existent.
dbit = 1 << 30
)

Expand Down
55 changes: 55 additions & 0 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6558,6 +6558,61 @@ func TestFileStoreWriteFullStateHighSubjectCardinality(t *testing.T) {
t.Logf("Took %s to writeFullState", time.Since(start))
}

func TestFileStoreEraseMsgWithDbitSlots(t *testing.T) {
fs, err := newFileStore(
FileStoreConfig{StoreDir: t.TempDir()},
StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

fs.StoreMsg("foo", nil, []byte("abd"))
for i := 0; i < 10; i++ {
fs.SkipMsg()
}
fs.StoreMsg("foo", nil, []byte("abd"))
// Now grab that first block and compact away the skips which will
// introduce dbits into our idx.
fs.mu.RLock()
mb := fs.blks[0]
fs.mu.RUnlock()
// Compact.
mb.mu.Lock()
mb.compact()
mb.mu.Unlock()

removed, err := fs.EraseMsg(1)
require_NoError(t, err)
require_True(t, removed)
}

func TestFileStoreEraseMsgWithAllTrailingDbitSlots(t *testing.T) {
fs, err := newFileStore(
FileStoreConfig{StoreDir: t.TempDir()},
StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

fs.StoreMsg("foo", nil, []byte("abc"))
fs.StoreMsg("foo", nil, []byte("abcdefg"))

for i := 0; i < 10; i++ {
fs.SkipMsg()
}
// Now grab that first block and compact away the skips which will
// introduce dbits into our idx.
fs.mu.RLock()
mb := fs.blks[0]
fs.mu.RUnlock()
// Compact.
mb.mu.Lock()
mb.compact()
mb.mu.Unlock()

removed, err := fs.EraseMsg(2)
require_NoError(t, err)
require_True(t, removed)
}

///////////////////////////////////////////////////////////////////////////
// Benchmarks
///////////////////////////////////////////////////////////////////////////
Expand Down

0 comments on commit 0e68006

Please sign in to comment.