Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do more efficient matching of filter subjects in consumer #4864

Merged
merged 4 commits into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
32 changes: 19 additions & 13 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,12 +386,13 @@ type consumer struct {

// A single subject filter.
// One of these is created per filter subject configured on a consumer and
// stored in the consumer's subjf list.
type subjectFilter struct {
subject string
nextSeq uint64
currentSeq uint64
pmsg *jsPubMsg
err error
hasWildcard bool
subject string
nextSeq uint64
currentSeq uint64
pmsg *jsPubMsg
err error
// hasWildcard is set from subjectHasWildcard(subject) when the filter is created.
hasWildcard bool
// tokenizedSubject is set from tokenizeSubjectIntoSlice(nil, subject) when the
// filter is created, so matching can go through isSubsetMatchTokenized
// without tokenizing the filter subject again on every call.
tokenizedSubject []string
}

type subjectFilters []*subjectFilter
Expand Down Expand Up @@ -936,8 +937,9 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
subjects := gatherSubjectFilters(o.cfg.FilterSubject, o.cfg.FilterSubjects)
for _, filter := range subjects {
sub := &subjectFilter{
subject: filter,
hasWildcard: subjectHasWildcard(filter),
subject: filter,
hasWildcard: subjectHasWildcard(filter),
tokenizedSubject: tokenizeSubjectIntoSlice(nil, filter),
}
o.subjf = append(o.subjf, sub)
}
Expand Down Expand Up @@ -1858,8 +1860,9 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
newSubjf := make(subjectFilters, 0, len(newSubjects))
for _, newFilter := range newSubjects {
fs := &subjectFilter{
subject: newFilter,
hasWildcard: subjectHasWildcard(newFilter),
subject: newFilter,
hasWildcard: subjectHasWildcard(newFilter),
tokenizedSubject: tokenizeSubjectIntoSlice(nil, newFilter),
}
// If given subject was present, we will retain its fields values
// so `getNextMsg` can take advantage of already buffered `pmsgs`.
Expand Down Expand Up @@ -3361,9 +3364,10 @@ func (o *consumer) isFilteredMatch(subj string) bool {
}
// It's quicker to first check for non-wildcard filters, then
// iterate again to check for subset match.
// TODO(dlc) at speed might be better to just do a sublist with L2 and/or possibly L1.
tsa := [32]string{}
tts := tokenizeSubjectIntoSlice(tsa[:0], subj)
for _, filter := range o.subjf {
if subjectIsSubsetMatch(subj, filter.subject) {
if isSubsetMatchTokenized(tts, filter.tokenizedSubject) {
return true
}
}
Expand Down Expand Up @@ -3945,8 +3949,10 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
}
} else {
if o.subjf != nil {
tsa := [32]string{}
tts := tokenizeSubjectIntoSlice(tsa[:0], pmsg.subj)
for i, filter := range o.subjf {
if subjectIsSubsetMatch(pmsg.subj, filter.subject) {
if isSubsetMatchTokenized(tts, filter.tokenizedSubject) {
o.subjf[i].currentSeq--
o.subjf[i].nextSeq--
break
Expand Down
89 changes: 89 additions & 0 deletions server/jetstream_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ import (
"fmt"
"math/rand"
"sort"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/nats-io/nats.go"
"github.com/nats-io/nuid"
)

func TestJetStreamConsumerMultipleFiltersRemoveFilters(t *testing.T) {
Expand Down Expand Up @@ -792,3 +794,90 @@ func TestJetStreamConsumerMultipleFiltersLastPerSubject(t *testing.T) {
require_Equal(t, "two", msg.Subject)

}

// consumerWithFilterSubjects builds a bare consumer for tests whose subjf list
// contains one subjectFilter per entry of filterSubjects, in the given order.
// When filterSubjects is empty, subjf is left nil.
func consumerWithFilterSubjects(filterSubjects []string) *consumer {
	o := &consumer{}
	for i := range filterSubjects {
		subj := filterSubjects[i]
		o.subjf = append(o.subjf, &subjectFilter{
			subject:          subj,
			hasWildcard:      subjectHasWildcard(subj),
			tokenizedSubject: tokenizeSubjectIntoSlice(nil, subj),
		})
	}
	return o
}

// filterSubjects returns exactly n generated filter subjects for tests and
// benchmarks. Subjects are produced in groups of six: one five-token literal
// subject containing a fresh nuid as its third token, followed by five
// variants of it in which a single token is replaced by the '*' wildcard.
// Generation stops as soon as n subjects have been collected.
//
// For n <= 0 it returns nil; without this guard n == 0 would loop forever
// (the length is already 1 at the first check) and n < 0 would panic in make.
func filterSubjects(n int) []string {
	if n <= 0 {
		return nil
	}
	fs := make([]string, 0, n)
	for {
		literals := []string{"foo", "bar", nuid.Next(), "xyz", "abcdef"}
		fs = append(fs, strings.Join(literals, "."))
		if len(fs) == n {
			return fs
		}
		// Create more filterSubjects by going through the literals and replacing one with the '*' wildcard.
		for i := range literals {
			e := make([]string, len(literals))
			copy(e, literals)
			e[i] = "*"
			fs = append(fs, strings.Join(e, "."))
			if len(fs) == n {
				return fs
			}
		}
	}
}

// TestJetStreamConsumerIsFilteredMatch runs consumer.isFilteredMatch against a
// table of filter-subject sets and subjects, covering the no-filter case,
// literal filters, '>' and '*' wildcards, and a large generated filter list.
func TestJetStreamConsumerIsFilteredMatch(t *testing.T) {
	cases := []struct {
		name    string
		filters []string
		subject string
		want    bool
	}{
		{"no filter", []string{}, "foo.bar", true},
		{"literal match", []string{"foo.baz", "foo.bar"}, "foo.bar", true},
		{"literal mismatch", []string{"foo.baz", "foo.bar"}, "foo.ban", false},
		{"wildcard > match", []string{"bar.>", "foo.>"}, "foo.bar", true},
		{"wildcard > match", []string{"bar.>", "foo.>"}, "bar.foo", true},
		{"wildcard > mismatch", []string{"bar.>", "foo.>"}, "baz.foo", false},
		{"wildcard * match", []string{"bar.*", "foo.*"}, "foo.bar", true},
		{"wildcard * match", []string{"bar.*", "foo.*"}, "bar.foo", true},
		{"wildcard * mismatch", []string{"bar.*", "foo.*"}, "baz.foo", false},
		{"wildcard * match", []string{"foo.*.x", "foo.*.y"}, "foo.bar.x", true},
		{"wildcard * match", []string{"foo.*.x", "foo.*.y", "foo.*.z"}, "foo.bar.z", true},
		{"many mismatch", filterSubjects(100), "foo.bar.do.not.match.any.filter.subject", false},
		{"many match", filterSubjects(100), "foo.bar.12345.xyz.abcdef", true}, // will be matched by "foo.bar.*.xyz.abcdef"
	}

	for _, tc := range cases {
		// Per-iteration copy: the subtests run in parallel and capture tc.
		tc := tc

		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()

			o := consumerWithFilterSubjects(tc.filters)
			got := o.isFilteredMatch(tc.subject)
			if got == tc.want {
				return
			}
			t.Fatalf("Subject %q filtered match of %v, should be %v, got %v",
				tc.subject, tc.filters, tc.want, got)
		})
	}
}

// Benchmark____JetStreamConsumerIsFilteredMatch measures isFilteredMatch for a
// subject that matches none of the filters, with the number of filter
// subjects doubling from 1 up to 1024. The consumer is built outside of b.Run
// so that its setup is not part of the timed sub-benchmark.
func Benchmark____JetStreamConsumerIsFilteredMatch(b *testing.B) {
	subject := "foo.bar.do.not.match.any.filter.subject"
	for n := 1; n <= 1024; n *= 2 {
		name := fmt.Sprintf("%d filter subjects", n)
		c := consumerWithFilterSubjects(filterSubjects(n))
		b.Run(name, func(b *testing.B) {
			// The measured operation must run b.N times; a single call would
			// make the reported ns/op meaningless.
			for i := 0; i < b.N; i++ {
				c.isFilteredMatch(subject)
			}
		})
	}
}
6 changes: 4 additions & 2 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -5291,6 +5291,8 @@ func (mset *stream) Store() StreamStore {
// Lock should be held.
func (mset *stream) partitionUnique(name string, partitions []string) bool {
for _, partition := range partitions {
psa := [32]string{}
pts := tokenizeSubjectIntoSlice(psa[:0], partition)
for n, o := range mset.consumers {
// Skip the consumer being checked.
if n == name {
Expand All @@ -5300,8 +5302,8 @@ func (mset *stream) partitionUnique(name string, partitions []string) bool {
return false
}
for _, filter := range o.subjf {
if subjectIsSubsetMatch(partition, filter.subject) ||
subjectIsSubsetMatch(filter.subject, partition) {
if isSubsetMatchTokenized(pts, filter.tokenizedSubject) ||
isSubsetMatchTokenized(filter.tokenizedSubject, pts) {
return false
}
}
Expand Down