Skip to content

Commit

Permalink
Use stree for per-subject tracking in memory store
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed Jan 22, 2024
1 parent d9235ab commit 69d3fbe
Showing 1 changed file with 68 additions and 61 deletions.
129 changes: 68 additions & 61 deletions server/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/nats-io/nats-server/v2/server/avl"
"github.com/nats-io/nats-server/v2/server/stree"
)

// TODO(dlc) - This is a fairly simplistic approach but should do for now.
Expand All @@ -30,7 +31,7 @@ type memStore struct {
cfg StreamConfig
state StreamState
msgs map[uint64]*StoreMsg
fss map[string]*SimpleState
fss *stree.SubjectTree[SimpleState]
dmap avl.SequenceSet
maxp int64
scb StorageUpdateHandler
Expand All @@ -48,7 +49,7 @@ func newMemStore(cfg *StreamConfig) (*memStore, error) {
}
ms := &memStore{
msgs: make(map[uint64]*StoreMsg),
fss: make(map[string]*SimpleState),
fss: stree.NewSubjectTree[SimpleState](),
maxp: cfg.MaxMsgsPer,
cfg: *cfg,
}
Expand Down Expand Up @@ -88,11 +89,12 @@ func (ms *memStore) UpdateConfig(cfg *StreamConfig) error {
// If the value is smaller we need to enforce that.
if ms.maxp != 0 && ms.maxp < maxp {
lm := uint64(ms.maxp)
for subj, ss := range ms.fss {
ms.fss.Iter(func(subj []byte, ss *SimpleState) bool {
if ss.Msgs > lm {
ms.enforcePerSubjectLimit(subj, ss)
ms.enforcePerSubjectLimit(bytesToString(subj), ss)
}
}
return true
})
}
ms.mu.Unlock()

Expand All @@ -113,7 +115,8 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts int
var ss *SimpleState
var asl bool
if len(subj) > 0 {
if ss = ms.fss[subj]; ss != nil {
var ok bool
if ss, ok = ms.fss.Find(stringToBytes(subj)); ok {
asl = ms.maxp > 0 && ss.Msgs >= uint64(ms.maxp)
}
}
Expand Down Expand Up @@ -191,7 +194,7 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts int
ms.enforcePerSubjectLimit(subj, ss)
}
} else {
ms.fss[subj] = &SimpleState{Msgs: 1, First: seq, Last: seq}
ms.fss.Insert([]byte(subj), SimpleState{Msgs: 1, First: seq, Last: seq})
}
}

Expand Down Expand Up @@ -370,7 +373,7 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje
if isAll && sseq <= ms.state.FirstSeq {
total := ms.state.Msgs
if lastPerSubject {
total = uint64(len(ms.fss))
total = uint64(ms.fss.Size())
}
return SimpleState{
Msgs: total,
Expand Down Expand Up @@ -415,21 +418,19 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje

var havePartial bool
// We will track start and end sequences as we go.
for subj, fss := range ms.fss {
if isMatch(subj) {
if fss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, fss.First, fss)
}
if sseq <= fss.First {
update(fss)
} else if sseq <= fss.Last {
// We matched but its a partial.
havePartial = true
// Don't break here, we will update to keep tracking last.
update(fss)
}
ms.fss.Match(stringToBytes(filter), func(subj []byte, fss *SimpleState) {
if fss.firstNeedsUpdate {
ms.recalculateFirstForSubj(bytesToString(subj), fss.First, fss)
}
if sseq <= fss.First {
update(fss)
} else if sseq <= fss.Last {
// We matched but its a partial.
havePartial = true
// Don't break here, we will update to keep tracking last.
update(fss)
}
}
})

// If we did not encounter any partials we can return here.
if !havePartial {
Expand Down Expand Up @@ -476,7 +477,7 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje
for seq := ms.state.FirstSeq; seq < first; seq++ {
if sm, ok := ms.msgs[seq]; ok && !seen[sm.subj] && isMatch(sm.subj) {
if lastPerSubject {
tss = ms.fss[sm.subj]
tss, _ = ms.fss.Find(stringToBytes(sm.subj))
}
// If we are last per subject, make sure to only adjust if all messages are before our first.
if tss == nil || tss.Last < first {
Expand Down Expand Up @@ -515,26 +516,29 @@ func (ms *memStore) SubjectsState(subject string) map[string]SimpleState {
ms.mu.RLock()
defer ms.mu.RUnlock()

if len(ms.fss) == 0 {
if ms.fss.Size() == 0 {
return nil
}

if subject == _EMPTY_ {
subject = fwcs
}

fss := make(map[string]SimpleState)
for subj, ss := range ms.fss {
if subject == _EMPTY_ || subject == fwcs || subjectIsSubsetMatch(subj, subject) {
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, ss.First, ss)
}
oss := fss[subj]
if oss.First == 0 { // New
fss[subj] = *ss
} else {
// Merge here.
oss.Last, oss.Msgs = ss.Last, oss.Msgs+ss.Msgs
fss[subj] = oss
}
ms.fss.Match(stringToBytes(subject), func(subj []byte, ss *SimpleState) {
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(bytesToString(subj), ss.First, ss)
}
}
subjs := string(subj)
oss := fss[subjs]
if oss.First == 0 { // New
fss[subjs] = *ss
} else {
// Merge here.
oss.Last, oss.Msgs = ss.Last, oss.Msgs+ss.Msgs
fss[subjs] = oss
}
})
return fss
}

Expand All @@ -543,7 +547,7 @@ func (ms *memStore) SubjectsTotals(filterSubject string) map[string]uint64 {
ms.mu.RLock()
defer ms.mu.RUnlock()

if len(ms.fss) == 0 {
if ms.fss.Size() == 0 {
return nil
}

Expand All @@ -553,15 +557,16 @@ func (ms *memStore) SubjectsTotals(filterSubject string) map[string]uint64 {
isAll := filterSubject == _EMPTY_ || filterSubject == fwcs

fst := make(map[string]uint64)
for subj, ss := range ms.fss {
ms.fss.Match(stringToBytes(filterSubject), func(subj []byte, ss *SimpleState) {
subjs := string(subj)
if isAll {
fst[subj] = ss.Msgs
fst[subjs] = ss.Msgs
} else {
if tts := tokenizeSubjectIntoSlice(tsa[:0], subj); isSubsetMatchTokenized(tts, fts) {
fst[subj] = ss.Msgs
if tts := tokenizeSubjectIntoSlice(tsa[:0], subjs); isSubsetMatchTokenized(tts, fts) {
fst[subjs] = ss.Msgs
}
}
}
})
return fst
}

Expand Down Expand Up @@ -755,7 +760,7 @@ func (ms *memStore) purge(fseq uint64) (uint64, error) {
ms.state.Bytes = 0
ms.state.Msgs = 0
ms.msgs = make(map[uint64]*StoreMsg)
ms.fss = make(map[string]*SimpleState)
ms.fss = stree.NewSubjectTree[SimpleState]()
ms.mu.Unlock()

if cb != nil {
Expand Down Expand Up @@ -846,7 +851,7 @@ func (ms *memStore) reset() error {
ms.state.Bytes = 0
// Reset msgs and fss.
ms.msgs = make(map[uint64]*StoreMsg)
ms.fss = make(map[string]*SimpleState)
ms.fss = stree.NewSubjectTree[SimpleState]()

ms.mu.Unlock()

Expand Down Expand Up @@ -950,7 +955,8 @@ func (ms *memStore) LoadLastMsg(subject string, smp *StoreMsg) (*StoreMsg, error
if subject == _EMPTY_ || subject == fwcs {
sm, ok = ms.msgs[ms.state.LastSeq]
} else if subjectIsLiteral(subject) {
if ss := ms.fss[subject]; ss != nil && ss.Msgs > 0 {
var ss *SimpleState
if ss, ok = ms.fss.Find(stringToBytes(subject)); ok && ss.Msgs > 0 {
sm, ok = ms.msgs[ss.Last]
}
} else if ss := ms.filteredStateLocked(1, subject, true); ss.Msgs > 0 {
Expand Down Expand Up @@ -982,12 +988,15 @@ func (ms *memStore) LoadNextMsg(filter string, wc bool, start uint64, smp *Store
return nil, ms.state.LastSeq, ErrStoreEOF
}

isAll := filter == _EMPTY_ || filter == fwcs
if filter == _EMPTY_ {
filter = fwcs
}
isAll := filter == fwcs

// Skip scan of ms.fss is number of messages in the block are less than
// 1/2 the number of subjects in ms.fss. Or we have a wc and lots of fss entries.
const linearScanMaxFSS = 256
doLinearScan := isAll || 2*int(ms.state.LastSeq-start) < len(ms.fss) || (wc && len(ms.fss) > linearScanMaxFSS)
doLinearScan := isAll || 2*int(ms.state.LastSeq-start) < ms.fss.Size() || (wc && ms.fss.Size() > linearScanMaxFSS)

// Initial setup.
fseq, lseq := start, ms.state.LastSeq
Expand All @@ -996,16 +1005,14 @@ func (ms *memStore) LoadNextMsg(filter string, wc bool, start uint64, smp *Store
subs := []string{filter}
if wc || isAll {
subs = subs[:0]
for fsubj := range ms.fss {
if isAll || subjectIsSubsetMatch(fsubj, filter) {
subs = append(subs, fsubj)
}
}
ms.fss.Match(stringToBytes(filter), func(subj []byte, val *SimpleState) {
subs = append(subs, string(subj))
})
}
fseq, lseq = ms.state.LastSeq, uint64(0)
for _, subj := range subs {
ss := ms.fss[subj]
if ss == nil {
ss, ok := ms.fss.Find(stringToBytes(subj))
if !ok {
continue
}
if ss.firstNeedsUpdate {
Expand Down Expand Up @@ -1093,12 +1100,12 @@ func (ms *memStore) updateFirstSeq(seq uint64) {
// Remove a seq from the fss and select new first.
// Lock should be held.
func (ms *memStore) removeSeqPerSubject(subj string, seq uint64) {
ss := ms.fss[subj]
if ss == nil {
ss, ok := ms.fss.Find(stringToBytes(subj))
if !ok {
return
}
if ss.Msgs == 1 {
delete(ms.fss, subj)
ms.fss.Delete(stringToBytes(subj))
return
}
ss.Msgs--
Expand Down Expand Up @@ -1198,7 +1205,7 @@ func (ms *memStore) FastState(state *StreamState) {
}
}
state.Consumers = ms.consumers
state.NumSubjects = len(ms.fss)
state.NumSubjects = ms.fss.Size()
ms.mu.RUnlock()
}

Expand All @@ -1208,7 +1215,7 @@ func (ms *memStore) State() StreamState {

state := ms.state
state.Consumers = ms.consumers
state.NumSubjects = len(ms.fss)
state.NumSubjects = ms.fss.Size()
state.Deleted = nil

// Calculate interior delete details.
Expand Down

0 comments on commit 69d3fbe

Please sign in to comment.