Skip to content

Commit

Permalink
[FIX] PurgeEx with keep and deleted bug (#4431)
Browse files Browse the repository at this point in the history
Fix for purge with keep bug with user deletes and improved search for
large number of blocks.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Aug 25, 2023
2 parents e637f37 + 22ed97c commit e19f883
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 26 deletions.
45 changes: 19 additions & 26 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -4008,28 +4008,30 @@ func (fs *fileStore) selectMsgBlockWithIndex(seq uint64) (int, *msgBlock) {
return -1, nil
}

// Starting index, defaults to beginning.
si := 0

// TODO(dlc) - Use new AVL and make this real for anything beyond certain size.
// Max threshold before we probe for a starting block to start our linear search.
const maxl = 256
if nb := len(fs.blks); nb > maxl {
d := nb / 8
for _, i := range []int{d, 2 * d, 3 * d, 4 * d, 5 * d, 6 * d, 7 * d} {
mb := fs.blks[i]
const linearThresh = 32
nb := len(fs.blks) - 1

if nb < linearThresh {
for i, mb := range fs.blks {
if seq <= atomic.LoadUint64(&mb.last.seq) {
break
return i, mb
}
si = i
}
return -1, nil
}

// blks are sorted in ascending order.
for i := si; i < len(fs.blks); i++ {
mb := fs.blks[i]
if seq <= atomic.LoadUint64(&mb.last.seq) {
return i, mb
// Do traditional binary search here since we know the blocks are sorted by sequence first and last.
for low, high, mid := 0, nb, nb/2; low <= high; mid = (low + high) / 2 {
mb := fs.blks[mid]
// Right now these atomic loads do not factor in, so fine to leave. Was considering
// uplifting these to fs scope to avoid atomic load but not needed.
first, last := atomic.LoadUint64(&mb.first.seq), atomic.LoadUint64(&mb.last.seq)
if seq > last {
low = mid + 1
} else if seq < first {
high = mid - 1
} else {
return mid, mb
}
}

Expand Down Expand Up @@ -5205,16 +5207,7 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
}
if sequence > 1 {
return fs.Compact(sequence)
} else if keep > 0 {
fs.mu.RLock()
msgs, lseq := fs.state.Msgs, fs.state.LastSeq
fs.mu.RUnlock()
if keep >= msgs {
return 0, nil
}
return fs.Compact(lseq - keep + 1)
}
return 0, nil
}

eq, wc := compareFn(subject), subjectHasWildcard(subject)
Expand Down
20 changes: 20 additions & 0 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5569,3 +5569,23 @@ func TestFileStoreRecaluclateFirstForSubjBug(t *testing.T) {
// Make sure it was update properly.
require_True(t, *ss == SimpleState{Msgs: 1, First: 3, Last: 3, firstNeedsUpdate: false})
}

func TestFileStoreKeepWithDeletedMsgsBug(t *testing.T) {
fs, err := newFileStore(FileStoreConfig{StoreDir: t.TempDir()}, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage})
require_NoError(t, err)

msg := bytes.Repeat([]byte("A"), 19)
for i := 0; i < 5; i++ {
fs.StoreMsg("A", nil, msg)
fs.StoreMsg("B", nil, msg)
}

n, err := fs.PurgeEx("A", 0, 0)
require_NoError(t, err)
require_True(t, n == 5)

// Purge with keep.
n, err = fs.PurgeEx(_EMPTY_, 0, 2)
require_NoError(t, err)
require_True(t, n == 3)
}

0 comments on commit e19f883

Please sign in to comment.