Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMPROVED] NumPending calculations and subject index memory in filestore #4960

Merged
merged 12 commits into from
Jan 20, 2024
Merged
236 changes: 108 additions & 128 deletions server/filestore.go

Large diffs are not rendered by default.

18 changes: 10 additions & 8 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5316,7 +5316,8 @@ func TestFileStoreFullStateBasics(t *testing.T) {

// Make sure we are tracking subjects correctly.
fs.mu.RLock()
psi := *fs.psim[subj]
info, _ := fs.psim.Find(stringToBytes(subj))
psi := *info
fs.mu.RUnlock()

require_Equal(t, psi.total, 4)
Expand All @@ -5341,7 +5342,8 @@ func TestFileStoreFullStateBasics(t *testing.T) {
require_Equal(t, fs.numMsgBlocks(), 3)
// Make sure we are tracking subjects correctly.
fs.mu.RLock()
psi = *fs.psim[subj]
info, _ = fs.psim.Find(stringToBytes(subj))
psi = *info
fs.mu.RUnlock()
require_Equal(t, psi.total, 5)
require_Equal(t, psi.fblk, 1)
Expand Down Expand Up @@ -5971,7 +5973,8 @@ func TestFileStoreCompactAndPSIMWhenDeletingBlocks(t *testing.T) {
require_Equal(t, fs.numMsgBlocks(), 1)

fs.mu.RLock()
psi := fs.psim[subj]
info, _ := fs.psim.Find(stringToBytes(subj))
psi := *info
fs.mu.RUnlock()

require_Equal(t, psi.total, 1)
Expand Down Expand Up @@ -6014,7 +6017,6 @@ func TestFileStoreTrackSubjLenForPSIM(t *testing.T) {

check := func() {
t.Helper()

var total int
for _, slen := range smap {
total += slen
Expand Down Expand Up @@ -6197,7 +6199,7 @@ func TestFileStoreNumPendingLastBySubject(t *testing.T) {
sd, blkSize := t.TempDir(), uint64(1024)
fs, err := newFileStore(
FileStoreConfig{StoreDir: sd, BlockSize: blkSize},
StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage})
StreamConfig{Name: "zzz", Subjects: []string{"foo.*.*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

Expand Down Expand Up @@ -6263,11 +6265,11 @@ func TestFileStoreCorruptPSIMOnDisk(t *testing.T) {

// Force bad subject.
fs.mu.Lock()
psi := fs.psim["foo.bar"]
psi, _ := fs.psim.Find(stringToBytes("foo.bar"))
bad := make([]byte, 7)
crand.Read(bad)
fs.psim[string(bad)] = psi
delete(fs.psim, "foo.bar")
fs.psim.Insert(bad, *psi)
fs.psim.Delete(stringToBytes("foo.bar"))
fs.dirty++
fs.mu.Unlock()

Expand Down
70 changes: 70 additions & 0 deletions server/stree/dump.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package stree

import (
"fmt"
"io"
"strings"
)

// TODO(dlc) - This I used alot for debugging and testing. Probably should be made to return a string vs using fmt.Printf.
derekcollison marked this conversation as resolved.
Show resolved Hide resolved

// For dumping out a text representation of a tree.
func (t *SubjectTree[T]) Dump(w io.Writer) {
t.dump(w, t.root, 0)
fmt.Fprintln(w)
}

// Will dump out a node.
func (t *SubjectTree[T]) dump(w io.Writer, n node, depth int) {
if n == nil {
fmt.Fprintf(w, "EMPTY\n")
return
}
if n.isLeaf() {
leaf := n.(*leaf[T])
fmt.Fprintf(w, "%s LEAF: Suffix: %q Value: %+v\n", dumpPre(depth), leaf.suffix, leaf.value)
n = nil
} else {
// We are a node type here, grab meta portion.
bn := n.base()
fmt.Fprintf(w, "%s %s Prefix: %q\n", dumpPre(depth), n.kind(), bn.prefix[:bn.prefixLen])
depth++
n.iter(func(n node) bool {
t.dump(w, n, depth)
return true
})
}
}

// For individual node/leaf dumps.
func (n *leaf[T]) kind() string { return "LEAF" }
func (n *node4) kind() string { return "NODE4" }
func (n *node16) kind() string { return "NODE16" }
func (n *node256) kind() string { return "NODE256" }

// Calculates the indendation, etc.
func dumpPre(depth int) string {
if depth == 0 {
return "-- "
} else {
var b strings.Builder
for i := 0; i < depth; i++ {
b.WriteString(" ")
}
b.WriteString("|__ ")
return b.String()
}
}
37 changes: 37 additions & 0 deletions server/stree/helper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package stree

import "testing"

func require_True(t *testing.T, b bool) {
t.Helper()
if !b {
t.Fatalf("require true, but got false")
}
}

func require_False(t *testing.T, b bool) {
t.Helper()
if b {
t.Fatalf("require false, but got true")
}
}

func require_Equal[T comparable](t *testing.T, a, b T) {
t.Helper()
if a != b {
t.Fatalf("require %T equal, but got: %v != %v", a, a, b)
}
}
49 changes: 49 additions & 0 deletions server/stree/leaf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2023-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package stree

import (
"bytes"
)

// Leaf node
type leaf[T any] struct {
// This could be the whole subject, but most likely just the suffix portion.
// We will only store the suffix here and assume all prior prefix paths have
// been checked once we arrive at this leafnode.
suffix []byte
value T
}

func newLeaf[T any](suffix []byte, value T) *leaf[T] {
return &leaf[T]{copyBytes(suffix), value}
}

func (n *leaf[T]) isLeaf() bool { return true }
func (n *leaf[T]) base() *meta { return nil }
func (n *leaf[T]) match(subject []byte) bool { return bytes.Equal(subject, n.suffix) }
func (n *leaf[T]) setSuffix(suffix []byte) { n.suffix = copyBytes(suffix) }
func (n *leaf[T]) isFull() bool { return true }
func (n *leaf[T]) matchParts(parts [][]byte) ([][]byte, bool) { return matchParts(parts, n.suffix) }
func (n *leaf[T]) iter(f func(node) bool) {}
func (n *leaf[T]) numChildren() uint16 { return 0 }
func (n *leaf[T]) path() []byte { return n.suffix }

// Not applicable to leafs.
func (n *leaf[T]) setPrefix(pre []byte) { panic("setPrefix called on leaf") }
func (n *leaf[T]) addChild(_ byte, _ node) { panic("addChild called on leaf") }
func (n *leaf[T]) findChild(_ byte) *node { panic("findChild called on leaf") }
func (n *leaf[T]) grow() node { panic("grow called on leaf") }
func (n *leaf[T]) deleteChild(_ byte) { panic("deleteChild called on leaf") }
func (n *leaf[T]) shrink() node { panic("shrink called on leaf") }
44 changes: 44 additions & 0 deletions server/stree/node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2023-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package stree

// Internal node interface.
type node interface {
isLeaf() bool
base() *meta
setPrefix(pre []byte)
addChild(c byte, n node)
findChild(c byte) *node
deleteChild(c byte)
isFull() bool
grow() node
shrink() node
matchParts(parts [][]byte) ([][]byte, bool)
kind() string
iter(f func(node) bool)
numChildren() uint16
path() []byte
}

// Maximum prefix len
// We expect the most savings to come from long shared prefixes.
// This also makes the meta base layer exactly 64 bytes, a normal L1 cache line.
const maxPrefixLen = 60

// 64 bytes total - an L1 cache line.
type meta struct {
prefix [maxPrefixLen]byte
prefixLen uint16
size uint16
}
114 changes: 114 additions & 0 deletions server/stree/node16.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright 2023-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package stree

// Node with 16 children
type node16 struct {
meta
children [16]node
keys [16]byte
}

func newNode16(prefix []byte) *node16 {
nn := &node16{}
nn.setPrefix(prefix)
return nn
}

func (n *node16) isLeaf() bool { return false }
func (n *node16) base() *meta { return &n.meta }

func (n *node16) setPrefix(pre []byte) {
n.prefixLen = uint16(min(len(pre), maxPrefixLen))
for i := uint16(0); i < n.prefixLen; i++ {
n.prefix[i] = pre[i]
}
}

// Currently we do not keep node16 sorted or use bitfields for traversal so just add to the end.
// TODO(dlc) - We should revisit here with more detailed benchmarks.
func (n *node16) addChild(c byte, nn node) {
if n.size >= 16 {
panic("node16 full!")
}
n.keys[n.size] = c
n.children[n.size] = nn
n.size++
}

func (n *node16) numChildren() uint16 { return n.size }
func (n *node16) path() []byte { return n.prefix[:n.prefixLen] }

func (n *node16) findChild(c byte) *node {
for i := uint16(0); i < n.size; i++ {
if n.keys[i] == c {
return &n.children[i]
}
}
return nil
}

func (n *node16) isFull() bool { return n.size >= 16 }

func (n *node16) grow() node {
nn := newNode256(n.prefix[:n.prefixLen])
for i := 0; i < 16; i++ {
nn.addChild(n.keys[i], n.children[i])
}
return nn
}

// Deletes a child from the node.
func (n *node16) deleteChild(c byte) {
for i, last := uint16(0), n.size-1; i < n.size; i++ {
if n.keys[i] == c {
// Unsorted so just swap in last one here, else nil if last.
if i < last {
n.keys[i] = n.keys[last]
n.children[i] = n.children[last]
neilalexander marked this conversation as resolved.
Show resolved Hide resolved
} else {
n.keys[i] = 0
n.children[i] = nil
}
n.size--
return
}
}
}

// Shrink if needed and return new node, otherwise return nil.
func (n *node16) shrink() node {
if n.size > 4 {
return nil
}
nn := newNode4(nil)
for i := uint16(0); i < n.size; i++ {
nn.addChild(n.keys[i], n.children[i])
neilalexander marked this conversation as resolved.
Show resolved Hide resolved
}
return nn
}

// Will match parts against our prefix.no
func (n *node16) matchParts(parts [][]byte) ([][]byte, bool) {
return matchParts(parts, n.prefix[:n.prefixLen])
}

// Iterate over all children calling func f.
func (n *node16) iter(f func(node) bool) {
for i := uint16(0); i < n.size; i++ {
if !f(n.children[i]) {
neilalexander marked this conversation as resolved.
Show resolved Hide resolved
return
}
}
}