Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix tracking holes in the message block index when they are at the end of the block #4988

Merged
merged 1 commit into from
Jan 24, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 18 additions & 2 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -5135,10 +5135,11 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
var index uint32

mbFirstSeq := atomic.LoadUint64(&mb.first.seq)
mbLastSeq := atomic.LoadUint64(&mb.last.seq)

// Capture beginning size of dmap.
dms := uint64(mb.dmap.Size())
idxSz := atomic.LoadUint64(&mb.last.seq) - mbFirstSeq + 1
idxSz := mbLastSeq - mbFirstSeq + 1

if mb.cache == nil {
// Approximation, may adjust below.
Expand All @@ -5163,12 +5164,14 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
}

lbuf := uint32(len(buf))
var seq uint64
for index < lbuf {
if index+msgHdrSize > lbuf {
return errCorruptState
}
hdr := buf[index : index+msgHdrSize]
rl, seq, slen := le.Uint32(hdr[0:]), le.Uint64(hdr[4:]), int(le.Uint16(hdr[20:]))
rl, slen := le.Uint32(hdr[0:]), int(le.Uint16(hdr[20:]))
seq = le.Uint64(hdr[4:])

// Clear any headers bit that could be set.
rl &^= hbit
Expand Down Expand Up @@ -5235,6 +5238,19 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
index += rl
}

// Track holes at the end of the block, these would be missed in the
// earlier loop if we've ran out of block file to look at, but should
// be easily noticed because the seq will be below the last seq from
// the index.
if seq > 0 && seq < mbLastSeq {
for dseq := seq; dseq < mbLastSeq; dseq++ {
idx = append(idx, dbit)
if dms == 0 {
mb.dmap.Insert(dseq)
}
}
}

mb.cache.buf = buf
mb.cache.idx = idx
mb.cache.fseq = fseq
Expand Down