Skip to content

Commit

Permalink
Make sure to properly set first and match for consumers that match al…
Browse files Browse the repository at this point in the history
…l for filtered purge.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Feb 13, 2024
1 parent 29a35a3 commit 4a9d01c
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 3 deletions.
4 changes: 4 additions & 0 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3385,6 +3385,10 @@ func (o *consumer) isFilteredMatch(subj string) bool {
// of one of the filter subjects.
// Lock should be held.
func (o *consumer) isEqualOrSubsetMatch(subj string) bool {
// Empty means matches all or FWCS.
if len(o.subjf) == 0 {
return true
}
for _, filter := range o.subjf {
if !filter.hasWildcard && subj == filter.subject {
return true
Expand Down
2 changes: 1 addition & 1 deletion server/jetstream_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,7 +878,7 @@ func TestJetStreamConsumerIsEqualOrSubsetMatch(t *testing.T) {
subject string
result bool
}{
{"no filter", []string{}, "foo.bar", false},
{"no filter", []string{}, "foo.bar", true},
{"literal match", []string{"foo.baz", "foo.bar"}, "foo.bar", true},
{"literal mismatch", []string{"foo.baz", "foo.bar"}, "foo.ban", false},
{"literal match", []string{"bar.>", "foo.>"}, "foo.>", true},
Expand Down
66 changes: 66 additions & 0 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22325,3 +22325,69 @@ func TestJetStreamConsumerNakThenAckFloorMove(t *testing.T) {
require_Equal(t, ci.AckFloor.Stream, 11)
require_Equal(t, ci.NumAckPending, 0)
}

func TestJetStreamSubjectFilteredPurgeClearsPendingAcks(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo", "bar"},
})
require_NoError(t, err)

for i := 0; i < 5; i++ {
js.Publish("foo", []byte("OK"))
}
for i := 0; i < 5; i++ {
js.Publish("bar", []byte("OK"))
}

// Note that there are no subject filters here, this is deliberate
// as previously the purge with filter code was checking for them.
// We want to prove that unfiltered consumers also get purged.
ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{
Name: "my_consumer",
AckPolicy: nats.AckExplicitPolicy,
MaxAckPending: 10,
})
require_NoError(t, err)
require_Equal(t, ci.NumPending, 10)
require_Equal(t, ci.NumAckPending, 0)

sub, err := js.PullSubscribe(">", "", nats.Bind("TEST", "my_consumer"))
require_NoError(t, err)

msgs, err := sub.Fetch(10)
require_NoError(t, err)
require_Len(t, len(msgs), 10)

ci, err = js.ConsumerInfo("TEST", "my_consumer")
require_NoError(t, err)
require_Equal(t, ci.NumPending, 0)
require_Equal(t, ci.NumAckPending, 10)

require_NoError(t, js.PurgeStream("TEST", &nats.StreamPurgeRequest{
Subject: "foo",
}))

ci, err = js.ConsumerInfo("TEST", "my_consumer")
require_NoError(t, err)
require_Equal(t, ci.NumPending, 0)
require_Equal(t, ci.NumAckPending, 5)

for i := 0; i < 5; i++ {
js.Publish("foo", []byte("OK"))
}
msgs, err = sub.Fetch(5)
require_NoError(t, err)
require_Len(t, len(msgs), 5)

ci, err = js.ConsumerInfo("TEST", "my_consumer")
require_NoError(t, err)
require_Equal(t, ci.NumPending, 0)
require_Equal(t, ci.NumAckPending, 10)
}
5 changes: 3 additions & 2 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2003,8 +2003,9 @@ func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err err
// Purge consumers.
// Check for filtered purge.
if preq != nil && preq.Subject != _EMPTY_ {
ss := store.FilteredState(state.FirstSeq, preq.Subject)
fseq = ss.First
if ss := store.FilteredState(fseq, preq.Subject); ss.First > fseq {
fseq = ss.First
}
}

mset.clsMu.RLock()
Expand Down

0 comments on commit 4a9d01c

Please sign in to comment.