[FIXED] Stream recovery with corrupt msg block with sequence gaps. (#4344)

This is a fix for a bad msg blk, detected in the field, that had sequence holes.

The stream had a max msgs per subject limit of one and only a single subject, but had lots of messages. The stream did not recover correctly, and upon further inspection we determined that a msg blk had holes, which should not be possible.

We now detect the holes and deal with the situation appropriately. This has been heavily tested on the data dump from the field.

Signed-off-by: Derek Collison <derek@nats.io>
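
For illustration, here is a minimal Go sketch of the hole-detection idea described above. The markSequenceHoles helper and its plain slice input are hypothetical stand-ins for the actual msgBlock scan in filestore.go: when consecutive sequences in a block are not contiguous, the missing sequences are recorded in a delete map so later lookups treat them as deleted rather than failing recovery.

package main

import "fmt"

// markSequenceHoles scans sequences recovered from a block in ascending order
// and records any gaps in a delete map, mirroring the fix's approach of
// treating missing sequences as deleted messages. Hypothetical helper, not
// the actual filestore implementation.
func markSequenceHoles(seqs []uint64) map[uint64]struct{} {
	dmap := make(map[uint64]struct{})
	var pseq uint64
	for _, seq := range seqs {
		if pseq > 0 && seq != pseq+1 {
			// Fill the hole between the previous and current sequence.
			for dseq := pseq + 1; dseq < seq; dseq++ {
				dmap[dseq] = struct{}{}
			}
		}
		pseq = seq
	}
	return dmap
}

func main() {
	// Sequences 3 and 4 are missing from the block.
	fmt.Println(markSequenceHoles([]uint64{1, 2, 5, 6})) // map[3:{} 4:{}]
}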
derekcollison committed Jul 28, 2023
2 parents 3a2835c + 9243051 commit c0fe497
Showing 1 changed file with 85 additions and 28 deletions.
113 changes: 85 additions & 28 deletions server/filestore.go
@@ -275,6 +275,8 @@ const (
wiThresh = int64(30 * time.Second)
// Time threshold to write index info for non FIFO cases
winfThresh = int64(2 * time.Second)
// Checksum size for hash for msg records.
recordHashSize = 8
)

func newFileStore(fcfg FileStoreConfig, cfg StreamConfig) (*fileStore, error) {
@@ -971,6 +973,10 @@ func (mb *msgBlock) rebuildState() (*LostStreamData, error) {
func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
startLastSeq := mb.last.seq

// Remove the .fss file and clear any cache we have set.
mb.clearCacheAndOffset()
mb.removePerSubjectInfoLocked()

buf, err := mb.loadBlock(nil)
if err != nil || len(buf) == 0 {
var ld *LostStreamData
@@ -996,9 +1002,6 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
mb.last.seq, mb.last.ts = 0, 0
firstNeedsSet := true

// Remove the .fss file from disk.
mb.removePerSubjectInfoLocked()

// Check if we need to decrypt.
if mb.bek != nil && len(buf) > 0 {
// Recreate to reset counter.
@@ -1070,12 +1073,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
rl &^= hbit
dlen := int(rl) - msgHdrSize
// Do some quick sanity checks here.
if dlen < 0 || int(slen) > (dlen-8) || dlen > int(rl) || rl > rlBadThresh {
truncate(index)
return gatherLost(lbuf - index), errBadMsg
}

if index+rl > lbuf {
if dlen < 0 || int(slen) > (dlen-recordHashSize) || dlen > int(rl) || index+rl > lbuf || rl > rlBadThresh {
truncate(index)
return gatherLost(lbuf - index), errBadMsg
}
@@ -1091,15 +1089,17 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
addToDmap(seq)
}
index += rl
mb.last.seq = seq
mb.last.ts = ts
if seq >= mb.first.seq {
mb.last.seq = seq
mb.last.ts = ts
}
continue
}

// This is for when we have index info that adjusts for deleted messages
// at the head. So the first.seq will be already set here. If this is larger
// replace what we have with this seq.
if firstNeedsSet && seq > mb.first.seq {
if firstNeedsSet && seq >= mb.first.seq {
firstNeedsSet, mb.first.seq, mb.first.ts = false, seq, ts
}

@@ -1119,12 +1119,12 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
hh.Write(hdr[4:20])
hh.Write(data[:slen])
if hasHeaders {
hh.Write(data[slen+4 : dlen-8])
hh.Write(data[slen+4 : dlen-recordHashSize])
} else {
hh.Write(data[slen : dlen-8])
hh.Write(data[slen : dlen-recordHashSize])
}
checksum := hh.Sum(nil)
if !bytes.Equal(checksum, data[len(data)-8:]) {
if !bytes.Equal(checksum, data[len(data)-recordHashSize:]) {
truncate(index)
return gatherLost(lbuf - index), errBadMsg
}
@@ -1165,6 +1165,11 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
mb.last.seq = mb.first.seq - 1
}

// Update our fss file if needed.
if len(mb.fss) > 0 {
mb.writePerSubjectInfo()
}

return nil, nil
}

@@ -2598,14 +2603,42 @@ func (fs *fileStore) enforceMsgPerSubjectLimit() {
fs.scb = nil
defer func() { fs.scb = cb }()

var numMsgs uint64

// collect all that are not correct.
needAttention := make(map[string]*psi)
for subj, psi := range fs.psim {
numMsgs += psi.total
if psi.total > maxMsgsPer {
needAttention[subj] = psi
}
}

// We had an issue with a use case where psim (and hence fss) were correct but idx was not and was not properly being caught.
// So do a quick sanity check here. If we detect a skew do a rebuild then re-check.
if numMsgs != fs.state.Msgs {
// Clear any global subject state.
fs.psim = make(map[string]*psi)
for _, mb := range fs.blks {
mb.removeIndexFile()
ld, err := mb.rebuildState()
mb.writeIndexInfo()
if err != nil && ld != nil {
fs.addLostData(ld)
}
fs.populateGlobalPerSubjectInfo(mb)
}
// Rebuild fs state too.
fs.rebuildStateLocked(nil)
// Need to redo blocks that need attention.
needAttention = make(map[string]*psi)
for subj, psi := range fs.psim {
if psi.total > maxMsgsPer {
needAttention[subj] = psi
}
}
}

// Collect all the msgBlks we alter.
blks := make(map[*msgBlock]struct{})

@@ -3050,8 +3083,7 @@ func (mb *msgBlock) compact() {
return
}

// Close cache and index file and wipe delete map, then rebuild.
mb.clearCacheAndOffset()
// Remove index file and wipe delete map, then rebuild.
mb.removeIndexFileLocked()
mb.deleteDmap()
mb.rebuildStateLocked()
@@ -3077,6 +3109,11 @@ func (mb *msgBlock) slotInfo(slot int) (uint32, uint32, bool, error) {
bi := mb.cache.idx[slot]
ri, hashChecked := (bi &^ hbit), (bi&hbit) != 0

// If this is a deleted slot return here.
if bi == dbit {
return 0, 0, false, errDeletedMsg
}

// Determine record length
var rl uint32
if len(mb.cache.idx) > slot+1 {
@@ -4022,7 +4059,7 @@ func (fs *fileStore) selectMsgBlockForStart(minTime time.Time) *msgBlock {
func (mb *msgBlock) indexCacheBuf(buf []byte) error {
var le = binary.LittleEndian

var fseq uint64
var fseq, pseq uint64
var idx []uint32
var index uint32

@@ -4055,23 +4092,39 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
dlen := int(rl) - msgHdrSize

// Do some quick sanity checks here.
if dlen < 0 || int(slen) > dlen || dlen > int(rl) || index+rl > lbuf || rl > 32*1024*1024 {
if dlen < 0 || int(slen) > (dlen-recordHashSize) || dlen > int(rl) || index+rl > lbuf || rl > rlBadThresh {
// This means something is off.
// TODO(dlc) - Add into bad list?
return errCorruptState
}

// Clear erase bit.
seq = seq &^ ebit
// Adjust if we guessed wrong.
if seq != 0 && seq < fseq {
fseq = seq
}

// We defer checksum checks to individual msg cache lookups to amortorize costs and
// not introduce latency for first message from a newly loaded block.
idx = append(idx, index)
mb.cache.lrl = uint32(rl)
index += mb.cache.lrl
if seq >= mb.first.seq {
// Track that we do not have holes.
// Not expected but did see it in the field.
if pseq > 0 && seq != pseq+1 {
if mb.dmap == nil {
mb.dmap = make(map[uint64]struct{})
}
for dseq := pseq + 1; dseq < seq; dseq++ {
idx = append(idx, dbit)
mb.dmap[dseq] = struct{}{}
}
}
pseq = seq

idx = append(idx, index)
mb.cache.lrl = uint32(rl)
// Adjust if we guessed wrong.
if seq != 0 && seq < fseq {
fseq = seq
}
}
index += rl
}
mb.cache.buf = buf
mb.cache.idx = idx
@@ -4407,6 +4460,9 @@ const hbit = 1 << 31
// Used for marking erased messages sequences.
const ebit = 1 << 63

// Used to mark a bad index as deleted.
const dbit = 1 << 30

// Will do a lookup from cache.
// Lock should be held.
func (mb *msgBlock) cacheLookup(seq uint64, sm *StoreMsg) (*StoreMsg, error) {
@@ -4417,6 +4473,7 @@ func (mb *msgBlock) cacheLookup(seq uint64, sm *StoreMsg) (*StoreMsg, error) {
// If we have a delete map check it.
if mb.dmap != nil {
if _, ok := mb.dmap[seq]; ok {
mb.llts = time.Now().UnixNano()
return nil, errDeletedMsg
}
}
@@ -4559,9 +4616,9 @@ func (mb *msgBlock) msgFromBuf(buf []byte, sm *StoreMsg, hh hash.Hash64) (*Store
hh.Write(hdr[4:20])
hh.Write(data[:slen])
if hasHeaders {
hh.Write(data[slen+4 : dlen-8])
hh.Write(data[slen+4 : dlen-recordHashSize])
} else {
hh.Write(data[slen : dlen-8])
hh.Write(data[slen : dlen-recordHashSize])
}
if !bytes.Equal(hh.Sum(nil), data[len(data)-8:]) {
return nil, errBadMsg
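
For context on the recordHashSize changes in the diff, here is a simplified sketch of the trailing-checksum pattern. FNV-1a stands in for the server's actual hash, and checkRecord is a hypothetical helper, not filestore API: every record ends with an 8-byte hash over the preceding bytes, so a record whose length cannot hold that hash, or whose hash does not match, is rejected as a bad message.

package main

import (
	"bytes"
	"errors"
	"fmt"
	"hash/fnv"
)

const recordHashSize = 8 // trailing checksum bytes per record

var errBadMsg = errors.New("bad message")

// checkRecord verifies that a record ends with an 8-byte hash of the
// preceding bytes. FNV-1a is used here only as a stand-in for the real hash.
func checkRecord(data []byte) error {
	if len(data) < recordHashSize {
		return errBadMsg
	}
	h := fnv.New64a()
	h.Write(data[:len(data)-recordHashSize])
	if !bytes.Equal(h.Sum(nil), data[len(data)-recordHashSize:]) {
		return errBadMsg
	}
	return nil
}

func main() {
	payload := []byte("hello")
	h := fnv.New64a()
	h.Write(payload)
	rec := append(payload, h.Sum(nil)...)
	fmt.Println(checkRecord(rec)) // <nil>
	rec[0] ^= 0xff                // corrupt the record
	fmt.Println(checkRecord(rec)) // bad message
}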
