Skip to content

Commit

Permalink
Fix tracking holes in the message block index when they are at the en…
Browse files Browse the repository at this point in the history
…d of the block

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed Jan 23, 2024
1 parent 5408900 commit 46fcc3a
Showing 1 changed file with 18 additions and 2 deletions.
20 changes: 18 additions & 2 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -5135,10 +5135,11 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
var index uint32

mbFirstSeq := atomic.LoadUint64(&mb.first.seq)
mbLastSeq := atomic.LoadUint64(&mb.last.seq)

// Capture beginning size of dmap.
dms := uint64(mb.dmap.Size())
idxSz := atomic.LoadUint64(&mb.last.seq) - mbFirstSeq + 1
idxSz := mbLastSeq - mbFirstSeq + 1

if mb.cache == nil {
// Approximation, may adjust below.
Expand All @@ -5163,12 +5164,14 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
}

lbuf := uint32(len(buf))
var seq uint64
for index < lbuf {
if index+msgHdrSize > lbuf {
return errCorruptState
}
hdr := buf[index : index+msgHdrSize]
rl, seq, slen := le.Uint32(hdr[0:]), le.Uint64(hdr[4:]), int(le.Uint16(hdr[20:]))
rl, slen := le.Uint32(hdr[0:]), int(le.Uint16(hdr[20:]))
seq = le.Uint64(hdr[4:])

// Clear any headers bit that could be set.
rl &^= hbit
Expand Down Expand Up @@ -5235,6 +5238,19 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
index += rl
}

// Track holes at the end of the block, these would be missed in the
// earlier loop if we've ran out of block file to look at, but should
// be easily noticed because the seq will be below the last seq from
// the index.
if seq > 0 && seq < mbLastSeq {
for dseq := seq; dseq < mbLastSeq; dseq++ {
idx = append(idx, dbit)
if dms == 0 {
mb.dmap.Insert(dseq)
}
}
}

mb.cache.buf = buf
mb.cache.idx = idx
mb.cache.fseq = fseq
Expand Down

0 comments on commit 46fcc3a

Please sign in to comment.