Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use stree for per-subject tracking in memory store #4983

Merged
merged 1 commit into from
Jan 22, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
135 changes: 73 additions & 62 deletions server/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/nats-io/nats-server/v2/server/avl"
"github.com/nats-io/nats-server/v2/server/stree"
)

// TODO(dlc) - This is a fairly simplistic approach but should do for now.
Expand All @@ -30,7 +31,7 @@ type memStore struct {
cfg StreamConfig
state StreamState
msgs map[uint64]*StoreMsg
fss map[string]*SimpleState
fss *stree.SubjectTree[SimpleState]
dmap avl.SequenceSet
maxp int64
scb StorageUpdateHandler
Expand All @@ -48,7 +49,7 @@ func newMemStore(cfg *StreamConfig) (*memStore, error) {
}
ms := &memStore{
msgs: make(map[uint64]*StoreMsg),
fss: make(map[string]*SimpleState),
fss: stree.NewSubjectTree[SimpleState](),
maxp: cfg.MaxMsgsPer,
cfg: *cfg,
}
Expand Down Expand Up @@ -88,11 +89,12 @@ func (ms *memStore) UpdateConfig(cfg *StreamConfig) error {
// If the value is smaller we need to enforce that.
if ms.maxp != 0 && ms.maxp < maxp {
lm := uint64(ms.maxp)
for subj, ss := range ms.fss {
ms.fss.Iter(func(subj []byte, ss *SimpleState) bool {
if ss.Msgs > lm {
ms.enforcePerSubjectLimit(subj, ss)
ms.enforcePerSubjectLimit(bytesToString(subj), ss)
}
}
return true
})
}
ms.mu.Unlock()

Expand All @@ -113,7 +115,8 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts int
var ss *SimpleState
var asl bool
if len(subj) > 0 {
if ss = ms.fss[subj]; ss != nil {
var ok bool
if ss, ok = ms.fss.Find(stringToBytes(subj)); ok {
asl = ms.maxp > 0 && ss.Msgs >= uint64(ms.maxp)
}
}
Expand Down Expand Up @@ -191,7 +194,7 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts int
ms.enforcePerSubjectLimit(subj, ss)
}
} else {
ms.fss[subj] = &SimpleState{Msgs: 1, First: seq, Last: seq}
ms.fss.Insert([]byte(subj), SimpleState{Msgs: 1, First: seq, Last: seq})
}
}

Expand Down Expand Up @@ -363,14 +366,17 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje
return ss
}

isAll := filter == _EMPTY_ || filter == fwcs
if filter == _EMPTY_ {
filter = fwcs
}
isAll := filter == fwcs

// First check if we can optimize this part.
// This means we want all and the starting sequence was before this block.
if isAll && sseq <= ms.state.FirstSeq {
total := ms.state.Msgs
if lastPerSubject {
total = uint64(len(ms.fss))
total = uint64(ms.fss.Size())
}
return SimpleState{
Msgs: total,
Expand Down Expand Up @@ -415,21 +421,20 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje

var havePartial bool
// We will track start and end sequences as we go.
for subj, fss := range ms.fss {
if isMatch(subj) {
if fss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, fss.First, fss)
}
if sseq <= fss.First {
update(fss)
} else if sseq <= fss.Last {
// We matched but its a partial.
havePartial = true
// Don't break here, we will update to keep tracking last.
update(fss)
}
ms.fss.Match(stringToBytes(filter), func(subj []byte, fss *SimpleState) {
subjs := bytesToString(subj)
if fss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subjs, fss.First, fss)
}
if sseq <= fss.First {
update(fss)
} else if sseq <= fss.Last {
// We matched but its a partial.
havePartial = true
// Don't break here, we will update to keep tracking last.
update(fss)
}
}
})

// If we did not encounter any partials we can return here.
if !havePartial {
Expand Down Expand Up @@ -476,7 +481,7 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje
for seq := ms.state.FirstSeq; seq < first; seq++ {
if sm, ok := ms.msgs[seq]; ok && !seen[sm.subj] && isMatch(sm.subj) {
if lastPerSubject {
tss = ms.fss[sm.subj]
tss, _ = ms.fss.Find(stringToBytes(sm.subj))
}
// If we are last per subject, make sure to only adjust if all messages are before our first.
if tss == nil || tss.Last < first {
Expand Down Expand Up @@ -515,26 +520,29 @@ func (ms *memStore) SubjectsState(subject string) map[string]SimpleState {
ms.mu.RLock()
defer ms.mu.RUnlock()

if len(ms.fss) == 0 {
if ms.fss.Size() == 0 {
return nil
}

if subject == _EMPTY_ {
subject = fwcs
}

fss := make(map[string]SimpleState)
for subj, ss := range ms.fss {
if subject == _EMPTY_ || subject == fwcs || subjectIsSubsetMatch(subj, subject) {
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, ss.First, ss)
}
oss := fss[subj]
if oss.First == 0 { // New
fss[subj] = *ss
} else {
// Merge here.
oss.Last, oss.Msgs = ss.Last, oss.Msgs+ss.Msgs
fss[subj] = oss
}
ms.fss.Match(stringToBytes(subject), func(subj []byte, ss *SimpleState) {
subjs := string(subj)
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subjs, ss.First, ss)
}
}
oss := fss[subjs]
if oss.First == 0 { // New
fss[subjs] = *ss
} else {
// Merge here.
oss.Last, oss.Msgs = ss.Last, oss.Msgs+ss.Msgs
fss[subjs] = oss
}
})
return fss
}

Expand All @@ -543,7 +551,7 @@ func (ms *memStore) SubjectsTotals(filterSubject string) map[string]uint64 {
ms.mu.RLock()
defer ms.mu.RUnlock()

if len(ms.fss) == 0 {
if ms.fss.Size() == 0 {
return nil
}

Expand All @@ -553,15 +561,16 @@ func (ms *memStore) SubjectsTotals(filterSubject string) map[string]uint64 {
isAll := filterSubject == _EMPTY_ || filterSubject == fwcs

fst := make(map[string]uint64)
for subj, ss := range ms.fss {
ms.fss.Match(stringToBytes(filterSubject), func(subj []byte, ss *SimpleState) {
subjs := string(subj)
if isAll {
fst[subj] = ss.Msgs
fst[subjs] = ss.Msgs
} else {
if tts := tokenizeSubjectIntoSlice(tsa[:0], subj); isSubsetMatchTokenized(tts, fts) {
fst[subj] = ss.Msgs
if tts := tokenizeSubjectIntoSlice(tsa[:0], subjs); isSubsetMatchTokenized(tts, fts) {
fst[subjs] = ss.Msgs
}
}
}
})
return fst
}

Expand Down Expand Up @@ -755,7 +764,7 @@ func (ms *memStore) purge(fseq uint64) (uint64, error) {
ms.state.Bytes = 0
ms.state.Msgs = 0
ms.msgs = make(map[uint64]*StoreMsg)
ms.fss = make(map[string]*SimpleState)
ms.fss = stree.NewSubjectTree[SimpleState]()
ms.mu.Unlock()

if cb != nil {
Expand Down Expand Up @@ -846,7 +855,7 @@ func (ms *memStore) reset() error {
ms.state.Bytes = 0
// Reset msgs and fss.
ms.msgs = make(map[uint64]*StoreMsg)
ms.fss = make(map[string]*SimpleState)
ms.fss = stree.NewSubjectTree[SimpleState]()

ms.mu.Unlock()

Expand Down Expand Up @@ -950,7 +959,8 @@ func (ms *memStore) LoadLastMsg(subject string, smp *StoreMsg) (*StoreMsg, error
if subject == _EMPTY_ || subject == fwcs {
sm, ok = ms.msgs[ms.state.LastSeq]
} else if subjectIsLiteral(subject) {
if ss := ms.fss[subject]; ss != nil && ss.Msgs > 0 {
var ss *SimpleState
if ss, ok = ms.fss.Find(stringToBytes(subject)); ok && ss.Msgs > 0 {
sm, ok = ms.msgs[ss.Last]
}
} else if ss := ms.filteredStateLocked(1, subject, true); ss.Msgs > 0 {
Expand Down Expand Up @@ -982,12 +992,15 @@ func (ms *memStore) LoadNextMsg(filter string, wc bool, start uint64, smp *Store
return nil, ms.state.LastSeq, ErrStoreEOF
}

isAll := filter == _EMPTY_ || filter == fwcs
if filter == _EMPTY_ {
filter = fwcs
}
isAll := filter == fwcs

// Skip scan of ms.fss is number of messages in the block are less than
// 1/2 the number of subjects in ms.fss. Or we have a wc and lots of fss entries.
const linearScanMaxFSS = 256
doLinearScan := isAll || 2*int(ms.state.LastSeq-start) < len(ms.fss) || (wc && len(ms.fss) > linearScanMaxFSS)
doLinearScan := isAll || 2*int(ms.state.LastSeq-start) < ms.fss.Size() || (wc && ms.fss.Size() > linearScanMaxFSS)

// Initial setup.
fseq, lseq := start, ms.state.LastSeq
Expand All @@ -996,16 +1009,14 @@ func (ms *memStore) LoadNextMsg(filter string, wc bool, start uint64, smp *Store
subs := []string{filter}
if wc || isAll {
subs = subs[:0]
for fsubj := range ms.fss {
if isAll || subjectIsSubsetMatch(fsubj, filter) {
subs = append(subs, fsubj)
}
}
ms.fss.Match(stringToBytes(filter), func(subj []byte, val *SimpleState) {
subs = append(subs, string(subj))
})
}
fseq, lseq = ms.state.LastSeq, uint64(0)
for _, subj := range subs {
ss := ms.fss[subj]
if ss == nil {
ss, ok := ms.fss.Find(stringToBytes(subj))
if !ok {
continue
}
if ss.firstNeedsUpdate {
Expand Down Expand Up @@ -1093,12 +1104,12 @@ func (ms *memStore) updateFirstSeq(seq uint64) {
// Remove a seq from the fss and select new first.
// Lock should be held.
func (ms *memStore) removeSeqPerSubject(subj string, seq uint64) {
ss := ms.fss[subj]
if ss == nil {
ss, ok := ms.fss.Find(stringToBytes(subj))
if !ok {
return
}
if ss.Msgs == 1 {
delete(ms.fss, subj)
ms.fss.Delete(stringToBytes(subj))
return
}
ss.Msgs--
Expand Down Expand Up @@ -1198,7 +1209,7 @@ func (ms *memStore) FastState(state *StreamState) {
}
}
state.Consumers = ms.consumers
state.NumSubjects = len(ms.fss)
state.NumSubjects = ms.fss.Size()
ms.mu.RUnlock()
}

Expand All @@ -1208,7 +1219,7 @@ func (ms *memStore) State() StreamState {

state := ms.state
state.Consumers = ms.consumers
state.NumSubjects = len(ms.fss)
state.NumSubjects = ms.fss.Size()
state.Deleted = nil

// Calculate interior delete details.
Expand Down