Skip to content

Commit

Permalink
[IMPROVED] NumPending calculations and subject index memory in filest…
Browse files Browse the repository at this point in the history
…ore (#4960)

Swapped out psim as a go hashmap for our stree implementation.

Stree is an adaptive radix tree implementation used for storing and
retrieving literal subjects. It also allows quick matching to wildcard
subjects, which is it's major design goal along with using less memory
in high subject cardinality situations.

This will be used in the filestore implementation to replace the PSIM
hash map which was fast at insert and lookup but suffered when trying to
filter based on wildcard subjects.

This is used specifically in calculations on NumPending with a wildcard,
and given we push folks to use larger muxed streams with down filtered
consumers and/or mirrors this was becoming a performance issue.

Signed-off-by: Derek Collison <derek@nats.io>

---------

Signed-off-by: Derek Collison <derek@nats.io>
Signed-off-by: Neil Twigg <neil@nats.io>
Co-authored-by: Neil Twigg <neil@nats.io>
  • Loading branch information
derekcollison and neilalexander committed Jan 20, 2024
1 parent 4340a34 commit d9235ab
Show file tree
Hide file tree
Showing 13 changed files with 1,831 additions and 136 deletions.
236 changes: 108 additions & 128 deletions server/filestore.go

Large diffs are not rendered by default.

18 changes: 10 additions & 8 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5316,7 +5316,8 @@ func TestFileStoreFullStateBasics(t *testing.T) {

// Make sure we are tracking subjects correctly.
fs.mu.RLock()
psi := *fs.psim[subj]
info, _ := fs.psim.Find(stringToBytes(subj))
psi := *info
fs.mu.RUnlock()

require_Equal(t, psi.total, 4)
Expand All @@ -5341,7 +5342,8 @@ func TestFileStoreFullStateBasics(t *testing.T) {
require_Equal(t, fs.numMsgBlocks(), 3)
// Make sure we are tracking subjects correctly.
fs.mu.RLock()
psi = *fs.psim[subj]
info, _ = fs.psim.Find(stringToBytes(subj))
psi = *info
fs.mu.RUnlock()
require_Equal(t, psi.total, 5)
require_Equal(t, psi.fblk, 1)
Expand Down Expand Up @@ -5971,7 +5973,8 @@ func TestFileStoreCompactAndPSIMWhenDeletingBlocks(t *testing.T) {
require_Equal(t, fs.numMsgBlocks(), 1)

fs.mu.RLock()
psi := fs.psim[subj]
info, _ := fs.psim.Find(stringToBytes(subj))
psi := *info
fs.mu.RUnlock()

require_Equal(t, psi.total, 1)
Expand Down Expand Up @@ -6014,7 +6017,6 @@ func TestFileStoreTrackSubjLenForPSIM(t *testing.T) {

check := func() {
t.Helper()

var total int
for _, slen := range smap {
total += slen
Expand Down Expand Up @@ -6197,7 +6199,7 @@ func TestFileStoreNumPendingLastBySubject(t *testing.T) {
sd, blkSize := t.TempDir(), uint64(1024)
fs, err := newFileStore(
FileStoreConfig{StoreDir: sd, BlockSize: blkSize},
StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage})
StreamConfig{Name: "zzz", Subjects: []string{"foo.*.*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

Expand Down Expand Up @@ -6263,11 +6265,11 @@ func TestFileStoreCorruptPSIMOnDisk(t *testing.T) {

// Force bad subject.
fs.mu.Lock()
psi := fs.psim["foo.bar"]
psi, _ := fs.psim.Find(stringToBytes("foo.bar"))
bad := make([]byte, 7)
crand.Read(bad)
fs.psim[string(bad)] = psi
delete(fs.psim, "foo.bar")
fs.psim.Insert(bad, *psi)
fs.psim.Delete(stringToBytes("foo.bar"))
fs.dirty++
fs.mu.Unlock()

Expand Down
68 changes: 68 additions & 0 deletions server/stree/dump.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package stree

import (
"fmt"
"io"
"strings"
)

// For dumping out a text representation of a tree.
func (t *SubjectTree[T]) Dump(w io.Writer) {
t.dump(w, t.root, 0)
fmt.Fprintln(w)
}

// Will dump out a node.
func (t *SubjectTree[T]) dump(w io.Writer, n node, depth int) {
if n == nil {
fmt.Fprintf(w, "EMPTY\n")
return
}
if n.isLeaf() {
leaf := n.(*leaf[T])
fmt.Fprintf(w, "%s LEAF: Suffix: %q Value: %+v\n", dumpPre(depth), leaf.suffix, leaf.value)
n = nil
} else {
// We are a node type here, grab meta portion.
bn := n.base()
fmt.Fprintf(w, "%s %s Prefix: %q\n", dumpPre(depth), n.kind(), bn.prefix[:bn.prefixLen])
depth++
n.iter(func(n node) bool {
t.dump(w, n, depth)
return true
})
}
}

// For individual node/leaf dumps.
func (n *leaf[T]) kind() string { return "LEAF" }
func (n *node4) kind() string { return "NODE4" }
func (n *node16) kind() string { return "NODE16" }
func (n *node256) kind() string { return "NODE256" }

// Calculates the indendation, etc.
func dumpPre(depth int) string {
if depth == 0 {
return "-- "
} else {
var b strings.Builder
for i := 0; i < depth; i++ {
b.WriteString(" ")
}
b.WriteString("|__ ")
return b.String()
}
}
37 changes: 37 additions & 0 deletions server/stree/helper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package stree

import "testing"

func require_True(t *testing.T, b bool) {
t.Helper()
if !b {
t.Fatalf("require true, but got false")
}
}

func require_False(t *testing.T, b bool) {
t.Helper()
if b {
t.Fatalf("require false, but got true")
}
}

func require_Equal[T comparable](t *testing.T, a, b T) {
t.Helper()
if a != b {
t.Fatalf("require %T equal, but got: %v != %v", a, a, b)
}
}
50 changes: 50 additions & 0 deletions server/stree/leaf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2023-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package stree

import (
"bytes"
)

// Leaf node
type leaf[T any] struct {
// This could be the whole subject, but most likely just the suffix portion.
// We will only store the suffix here and assume all prior prefix paths have
// been checked once we arrive at this leafnode.
suffix []byte
value T
}

func newLeaf[T any](suffix []byte, value T) *leaf[T] {
return &leaf[T]{copyBytes(suffix), value}
}

func (n *leaf[T]) isLeaf() bool { return true }
func (n *leaf[T]) base() *meta { return nil }
func (n *leaf[T]) match(subject []byte) bool { return bytes.Equal(subject, n.suffix) }
func (n *leaf[T]) setSuffix(suffix []byte) { n.suffix = copyBytes(suffix) }
func (n *leaf[T]) isFull() bool { return true }
func (n *leaf[T]) matchParts(parts [][]byte) ([][]byte, bool) { return matchParts(parts, n.suffix) }
func (n *leaf[T]) iter(f func(node) bool) {}
func (n *leaf[T]) children() []node { return nil }
func (n *leaf[T]) numChildren() uint16 { return 0 }
func (n *leaf[T]) path() []byte { return n.suffix }

// Not applicable to leafs and should not be called, so panic if we do.
func (n *leaf[T]) setPrefix(pre []byte) { panic("setPrefix called on leaf") }
func (n *leaf[T]) addChild(_ byte, _ node) { panic("addChild called on leaf") }
func (n *leaf[T]) findChild(_ byte) *node { panic("findChild called on leaf") }
func (n *leaf[T]) grow() node { panic("grow called on leaf") }
func (n *leaf[T]) deleteChild(_ byte) { panic("deleteChild called on leaf") }
func (n *leaf[T]) shrink() node { panic("shrink called on leaf") }
45 changes: 45 additions & 0 deletions server/stree/node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2023-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package stree

// Internal node interface.
type node interface {
isLeaf() bool
base() *meta
setPrefix(pre []byte)
addChild(c byte, n node)
findChild(c byte) *node
deleteChild(c byte)
isFull() bool
grow() node
shrink() node
matchParts(parts [][]byte) ([][]byte, bool)
kind() string
iter(f func(node) bool)
children() []node
numChildren() uint16
path() []byte
}

// Maximum prefix len
// We expect the most savings to come from long shared prefixes.
// This also makes the meta base layer exactly 64 bytes, a normal L1 cache line.
const maxPrefixLen = 60

// 64 bytes total - an L1 cache line.
type meta struct {
prefix [maxPrefixLen]byte
prefixLen uint16
size uint16
}
121 changes: 121 additions & 0 deletions server/stree/node16.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright 2023-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package stree

// Node with 16 children
type node16 struct {
meta
child [16]node
key [16]byte
}

func newNode16(prefix []byte) *node16 {
nn := &node16{}
nn.setPrefix(prefix)
return nn
}

func (n *node16) isLeaf() bool { return false }
func (n *node16) base() *meta { return &n.meta }

func (n *node16) setPrefix(pre []byte) {
n.prefixLen = uint16(min(len(pre), maxPrefixLen))
for i := uint16(0); i < n.prefixLen; i++ {
n.prefix[i] = pre[i]
}
}

// Currently we do not keep node16 sorted or use bitfields for traversal so just add to the end.
// TODO(dlc) - We should revisit here with more detailed benchmarks.
func (n *node16) addChild(c byte, nn node) {
if n.size >= 16 {
panic("node16 full!")
}
n.key[n.size] = c
n.child[n.size] = nn
n.size++
}

func (n *node16) numChildren() uint16 { return n.size }
func (n *node16) path() []byte { return n.prefix[:n.prefixLen] }

func (n *node16) findChild(c byte) *node {
for i := uint16(0); i < n.size; i++ {
if n.key[i] == c {
return &n.child[i]
}
}
return nil
}

func (n *node16) isFull() bool { return n.size >= 16 }

func (n *node16) grow() node {
nn := newNode256(n.prefix[:n.prefixLen])
for i := 0; i < 16; i++ {
nn.addChild(n.key[i], n.child[i])
}
return nn
}

// Deletes a child from the node.
func (n *node16) deleteChild(c byte) {
for i, last := uint16(0), n.size-1; i < n.size; i++ {
if n.key[i] == c {
// Unsorted so just swap in last one here, else nil if last.
if i < last {
n.key[i] = n.key[last]
n.child[i] = n.child[last]
n.key[last] = 0
n.child[last] = nil
} else {
n.key[i] = 0
n.child[i] = nil
}
n.size--
return
}
}
}

// Shrink if needed and return new node, otherwise return nil.
func (n *node16) shrink() node {
if n.size > 4 {
return nil
}
nn := newNode4(nil)
for i := uint16(0); i < n.size; i++ {
nn.addChild(n.key[i], n.child[i])
}
return nn
}

// Will match parts against our prefix.no
func (n *node16) matchParts(parts [][]byte) ([][]byte, bool) {
return matchParts(parts, n.prefix[:n.prefixLen])
}

// Iterate over all children calling func f.
func (n *node16) iter(f func(node) bool) {
for i := uint16(0); i < n.size; i++ {
if !f(n.child[i]) {
return
}
}
}

// Return our children as a slice.
func (n *node16) children() []node {
return n.child[:n.size]
}

0 comments on commit d9235ab

Please sign in to comment.