[IMPROVED] Optimize catchups for replicas and mirrors with large interior deletes. #4929

Merged (2 commits) on Jan 9, 2024
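This change gives the filestore a batched skip path, so a replica or mirror catching up across a large run of interior deletes can record the whole gap with one SkipMsgs call instead of looping SkipMsg once per sequence. A minimal sketch of the calling pattern follows, assuming package-internal access (fileStore is unexported); the applyGap helper and its arguments are illustrative, not the server's actual catchup code:

// Hypothetical helper: record a contiguous run of skipped/deleted sequences
// [gapStart, gapEnd] at the tail of the store with a single batched call.
func applyGap(fs *fileStore, gapStart, gapEnd uint64) error {
	// Before this change the only option was one skip record per sequence:
	//   for i := gapStart; i <= gapEnd; i++ { fs.SkipMsg() }
	// SkipMsgs inserts the whole run into the block's dmap (or, if the block
	// is empty, just advances first/last) and writes a single placeholder record.
	return fs.SkipMsgs(gapStart, gapEnd-gapStart+1)
}

The starting sequence must be the store's last sequence plus one, otherwise SkipMsgs returns ErrSequenceMismatch; passing 0 lets it pick the next sequence itself.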
161 changes: 145 additions & 16 deletions server/filestore.go
@@ -290,7 +290,7 @@ const (
// Maximum size of a write buffer we may consider for re-use.
maxBufReuse = 2 * 1024 * 1024
// default cache buffer expiration
defaultCacheBufferExpiration = 5 * time.Second
defaultCacheBufferExpiration = 2 * time.Second
// default sync interval
defaultSyncInterval = 2 * time.Minute
// default idle timeout to close FDs.
@@ -3222,20 +3222,112 @@ func (fs *fileStore) SkipMsg() uint64 {
fs.mu.Lock()
defer fs.mu.Unlock()

// Grab our current last message block.
mb := fs.lmb
if mb == nil || mb.msgs > 0 && mb.blkSize()+emptyRecordLen > fs.fcfg.BlockSize {
if mb != nil && fs.fcfg.Compression != NoCompression {
// We've now reached the end of this message block. If we want
// to compress blocks, now is the time to do it.
go mb.recompressOnDiskIfNeeded()
}
var err error
if mb, err = fs.newMsgBlockForWrite(); err != nil {
return 0
}
}

// Grab time and last seq.
now, seq := time.Now().UTC(), fs.state.LastSeq+1

// Write skip msg.
mb.skipMsg(seq, now)

// Update fs state.
fs.state.LastSeq, fs.state.LastTime = seq, now
if fs.state.Msgs == 0 {
fs.state.FirstSeq, fs.state.FirstTime = seq, now
}
if seq == fs.state.FirstSeq {
fs.state.FirstSeq, fs.state.FirstTime = seq+1, now
}
fs.lmb.skipMsg(seq, now)
// Mark as dirty for stream state.
fs.dirty++

return seq
}

// Skip multiple msgs. We will determine whether we can fit into the current lmb or need to create a new block.
func (fs *fileStore) SkipMsgs(seq uint64, num uint64) error {
fs.mu.Lock()
defer fs.mu.Unlock()

// Check sequence matches our last sequence.
if seq != fs.state.LastSeq+1 {
if seq > 0 {
return ErrSequenceMismatch
}
seq = fs.state.LastSeq + 1
}

// Limit number of dmap entries
const maxDeletes = 64 * 1024
mb := fs.lmb

numDeletes := int(num)
if mb != nil {
numDeletes += mb.dmap.Size()
}
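// Start a new block if we have none, if these skips would push the dmap past
// maxDeletes for a block that already holds messages, or if the block is full.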
if mb == nil || numDeletes > maxDeletes && mb.msgs > 0 || mb.msgs > 0 && mb.blkSize()+emptyRecordLen > fs.fcfg.BlockSize {
if mb != nil && fs.fcfg.Compression != NoCompression {
// We've now reached the end of this message block. If we want
// to compress blocks, now is the time to do it.
go mb.recompressOnDiskIfNeeded()
}
var err error
if mb, err = fs.newMsgBlockForWrite(); err != nil {
return err
}
}

// Insert all entries into the dmap and place the last as a marker.
now := time.Now().UTC()
nowts := now.UnixNano()
lseq := seq + num - 1

mb.mu.Lock()
var needsRecord bool
// If we are empty update meta directly.
if mb.msgs == 0 {
atomic.StoreUint64(&mb.last.seq, lseq)
mb.last.ts = nowts
atomic.StoreUint64(&mb.first.seq, lseq+1)
mb.first.ts = nowts
} else {
needsRecord = true
for ; seq <= lseq; seq++ {
mb.dmap.Insert(seq)
}
}
mb.mu.Unlock()

// Write out our placeholder.
if needsRecord {
mb.writeMsgRecord(emptyRecordLen, lseq|ebit, _EMPTY_, nil, nil, nowts, true)
}

// Now update fs state and accounting.
fs.state.LastSeq, fs.state.LastTime = lseq, now
if fs.state.Msgs == 0 {
fs.state.FirstSeq, fs.state.FirstTime = lseq+1, now
}

// Mark as dirty for stream state.
fs.dirty++

return nil
}

// Lock should be held.
func (fs *fileStore) rebuildFirst() {
if len(fs.blks) == 0 {
@@ -4203,7 +4295,7 @@ func (fs *fileStore) selectNextFirst() {
// Lock should be held.
func (mb *msgBlock) resetCacheExpireTimer(td time.Duration) {
if td == 0 {
td = mb.cexp
td = mb.cexp + 100*time.Millisecond
}
if mb.ctmr == nil {
mb.ctmr = time.AfterFunc(td, mb.expireCache)
@@ -5066,24 +5158,30 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {

mbFirstSeq := atomic.LoadUint64(&mb.first.seq)

// Capture beginning size of dmap.
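// If the dmap already has entries it is treated as up to date, so the
// hole-filling and erase paths below skip re-inserting into it.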
dms := uint64(mb.dmap.Size())
idxSz := atomic.LoadUint64(&mb.last.seq) - mbFirstSeq + 1

if mb.cache == nil {
// Approximation, may adjust below.
fseq = mbFirstSeq
idx = make([]uint32, 0, mb.msgs)
idx = make([]uint32, 0, idxSz)
mb.cache = &cache{}
} else {
fseq = mb.cache.fseq
idx = mb.cache.idx
if len(idx) == 0 {
idx = make([]uint32, 0, mb.msgs)
idx = make([]uint32, 0, idxSz)
}
index = uint32(len(mb.cache.buf))
buf = append(mb.cache.buf, buf...)
}

// Create FSS if we should track.
if !mb.noTrack {
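// popFss tracks whether we created the fss map fresh here; only then do we
// populate per-subject state inline while indexing.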
var popFss bool
if mb.fssNotLoaded() {
mb.fss = make(map[string]*SimpleState)
popFss = true
}

lbuf := uint32(len(buf))
@@ -5123,7 +5221,9 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
// If we have a hole fill it.
for dseq := mbFirstSeq + uint64(len(idx)); dseq < seq; dseq++ {
idx = append(idx, dbit)
mb.dmap.Insert(dseq)
if dms == 0 {
mb.dmap.Insert(dseq)
}
}
}
// Add to our index.
@@ -5135,12 +5235,12 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
}

// Make sure our dmap has this entry if it was erased.
if erased {
if erased && dms == 0 {
mb.dmap.Insert(seq)
}

// Handle FSS inline here.
if slen > 0 && !mb.noTrack && !erased && !mb.dmap.Exists(seq) {
if popFss && slen > 0 && !mb.noTrack && !erased && !mb.dmap.Exists(seq) {
bsubj := buf[index+msgHdrSize : index+msgHdrSize+uint32(slen)]
if ss := mb.fss[string(bsubj)]; ss != nil {
ss.Msgs++
@@ -5345,14 +5445,26 @@ func (mb *msgBlock) fssLoaded() bool {
// Used to load in the block contents.
// Lock should be held and all conditionals satisfied prior.
func (mb *msgBlock) loadBlock(buf []byte) ([]byte, error) {
f, err := os.Open(mb.mfn)
if err != nil {
if os.IsNotExist(err) {
err = errNoBlkData
var f *os.File
// Re-use if we have mfd open.
if mb.mfd != nil {
f = mb.mfd
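// Rewind to the start of the file; if the seek fails, close our cached
// FDs and fall back to a fresh Open below.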
if n, err := f.Seek(0, 0); n != 0 || err != nil {
f = nil
mb.closeFDsLockedNoCheck()
}
return nil, err
}
defer f.Close()
if f == nil {
var err error
f, err = os.Open(mb.mfn)
if err != nil {
if os.IsNotExist(err) {
err = errNoBlkData
}
return nil, err
}
defer f.Close()
}

var sz int
if info, err := f.Stat(); err == nil {
@@ -5495,17 +5607,34 @@ func (mb *msgBlock) fetchMsg(seq uint64, sm *StoreMsg) (*StoreMsg, bool, error)
mb.mu.Lock()
defer mb.mu.Unlock()

fseq, lseq := atomic.LoadUint64(&mb.first.seq), atomic.LoadUint64(&mb.last.seq)
if seq < fseq || seq > lseq {
return nil, false, ErrStoreMsgNotFound
}

// See if we can short circuit if we already know msg deleted.
if mb.dmap.Exists(seq) {
// Update for scanning like cacheLookup would have.
llseq := mb.llseq
if mb.llseq == 0 || seq < mb.llseq || seq == mb.llseq+1 || seq == mb.llseq-1 {
mb.llseq = seq
}
expireOk := (seq == lseq && llseq == seq-1) || (seq == fseq && llseq == seq+1)
return nil, expireOk, errDeletedMsg
}

if mb.cacheNotLoaded() {
if err := mb.loadMsgsWithLock(); err != nil {
return nil, false, err
}
}
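// Remember the previous linear-walk position before cacheLookup updates it;
// it feeds the expireOk calculation below.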
llseq := mb.llseq

fsm, err := mb.cacheLookup(seq, sm)
if err != nil {
return nil, false, err
}
expireOk := (seq == atomic.LoadUint64(&mb.last.seq) && llseq == seq-1) || (seq == atomic.LoadUint64(&mb.first.seq) && llseq == seq+1)
expireOk := (seq == lseq && llseq == seq-1) || (seq == fseq && llseq == seq+1)
return fsm, expireOk, err
}

53 changes: 53 additions & 0 deletions server/filestore_test.go
@@ -6440,7 +6440,60 @@ func TestFileStoreExpireCacheOnLinearWalk(t *testing.T) {
}
}
checkNoCache()
}

func TestFileStoreSkipMsgs(t *testing.T) {
fs, err := newFileStore(
FileStoreConfig{StoreDir: t.TempDir(), BlockSize: 1024},
StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

// Test on empty FS first.
// Make sure wrong starting sequence fails.
err = fs.SkipMsgs(10, 100)
require_Error(t, err, ErrSequenceMismatch)

err = fs.SkipMsgs(1, 100)
require_NoError(t, err)

state := fs.State()
require_Equal(t, state.FirstSeq, 101)
require_Equal(t, state.LastSeq, 100)
require_Equal(t, fs.numMsgBlocks(), 1)

// Now add a lot.
err = fs.SkipMsgs(101, 100_000)
require_NoError(t, err)
state = fs.State()
require_Equal(t, state.FirstSeq, 100_101)
require_Equal(t, state.LastSeq, 100_100)
require_Equal(t, fs.numMsgBlocks(), 1)

// Now add in a message, and then skip to check dmap.
fs, err = newFileStore(
FileStoreConfig{StoreDir: t.TempDir(), BlockSize: 1024},
StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

fs.StoreMsg("foo", nil, nil)
err = fs.SkipMsgs(2, 10)
require_NoError(t, err)
state = fs.State()
require_Equal(t, state.FirstSeq, 1)
require_Equal(t, state.LastSeq, 11)
require_Equal(t, state.Msgs, 1)
require_Equal(t, state.NumDeleted, 10)
require_Equal(t, len(state.Deleted), 10)

// Check Fast State too.
state.Deleted = nil
fs.FastState(&state)
require_Equal(t, state.FirstSeq, 1)
require_Equal(t, state.LastSeq, 11)
require_Equal(t, state.Msgs, 1)
require_Equal(t, state.NumDeleted, 10)
}

///////////////////////////////////////////////////////////////////////////