Skip to content

Commit

Permalink
Make sure to properly handle filtered purge with a consumer with a wi…
Browse files Browse the repository at this point in the history
…der filtered subject.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Feb 13, 2024
1 parent 29a35a3 commit 32a96c7
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 7 deletions.
32 changes: 28 additions & 4 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4901,17 +4901,26 @@ func (o *consumer) hasNoLocalInterest() bool {

// This is when the underlying stream has been purged.
// sseq is the new first seq for the stream after purge.
// Lock should be held.
func (o *consumer) purge(sseq uint64, slseq uint64) {
// Lock should NOT be held.
func (o *consumer) purge(sseq uint64, slseq uint64, isWider bool) {
// Do not update our state unless we know we are the leader.
if !o.isLeader() {
return
}
// Signals all have been purged for this consumer.
if sseq == 0 {
if sseq == 0 && !isWider {
sseq = slseq + 1
}

var store StreamStore
if isWider {
o.mu.RLock()
if o.mset != nil {
store = o.mset.store
}
o.mu.RUnlock()
}

o.mu.Lock()
// Do not go backwards
if o.sseq < sseq {
Expand All @@ -4920,7 +4929,6 @@ func (o *consumer) purge(sseq uint64, slseq uint64) {

if o.asflr < sseq {
o.asflr = sseq - 1

// We need to remove those no longer relevant from pending.
for seq, p := range o.pending {
if seq <= o.asflr {
Expand All @@ -4934,8 +4942,24 @@ func (o *consumer) purge(sseq uint64, slseq uint64) {
delete(o.rdc, seq)
// rdq handled below.
}
if isWider && store != nil {
// Our filtered subject, which could be all, is wider than the underlying purge.
// We need to check if the pending items left are still valid.
var smv StoreMsg
if _, err := store.LoadMsg(seq, &smv); err == errDeletedMsg || err == ErrStoreMsgNotFound {
if p.Sequence > o.adflr {
o.adflr = p.Sequence
if o.adflr > o.dseq {
o.dseq = o.adflr
}
}
delete(o.pending, seq)
delete(o.rdc, seq)
}
}
}
}

// This means we can reset everything at this point.
if len(o.pending) == 0 {
o.pending, o.rdc = nil, nil
Expand Down
77 changes: 77 additions & 0 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13534,6 +13534,13 @@ func TestJetStreamPurgeAndFilteredConsumers(t *testing.T) {
t.Fatalf("Expected NumPending to be 10, got %d", ci.NumPending)
}

// Also check unfiltered with interleaving messages.
_, err = js.AddConsumer("S", &nats.ConsumerConfig{
Durable: "all",
AckPolicy: nats.AckExplicitPolicy,
})
require_NoError(t, err)

// Now purge only adam.
jr, _ := json.Marshal(&JSApiStreamPurgeRequest{Subject: "FOO.adam"})
_, err = nc.Request(fmt.Sprintf(JSApiStreamPurgeT, "S"), jr, time.Second)
Expand All @@ -13559,6 +13566,12 @@ func TestJetStreamPurgeAndFilteredConsumers(t *testing.T) {
if ci.AckFloor.Stream != 20 {
t.Fatalf("Expected AckFloor for stream to be 20, got %d", ci.AckFloor.Stream)
}

ci, err = js.ConsumerInfo("S", "all")
require_NoError(t, err)
if ci.NumPending != 10 {
t.Fatalf("Expected NumPending to be 10, got %d", ci.NumPending)
}
}

// Issue #2662
Expand Down Expand Up @@ -22325,3 +22338,67 @@ func TestJetStreamConsumerNakThenAckFloorMove(t *testing.T) {
require_Equal(t, ci.AckFloor.Stream, 11)
require_Equal(t, ci.NumAckPending, 0)
}

func TestJetStreamSubjectFilteredPurgeClearsPendingAcks(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo", "bar"},
})
require_NoError(t, err)

for i := 0; i < 5; i++ {
js.Publish("foo", []byte("OK"))
js.Publish("bar", []byte("OK"))
}

// Note that there are no subject filters here, this is deliberate
// as previously the purge with filter code was checking for them.
// We want to prove that unfiltered consumers also get purged.
ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{
Name: "my_consumer",
AckPolicy: nats.AckExplicitPolicy,
MaxAckPending: 10,
})
require_NoError(t, err)
require_Equal(t, ci.NumPending, 10)
require_Equal(t, ci.NumAckPending, 0)

sub, err := js.PullSubscribe(">", "", nats.Bind("TEST", "my_consumer"))
require_NoError(t, err)

msgs, err := sub.Fetch(10)
require_NoError(t, err)
require_Len(t, len(msgs), 10)

ci, err = js.ConsumerInfo("TEST", "my_consumer")
require_NoError(t, err)
require_Equal(t, ci.NumPending, 0)
require_Equal(t, ci.NumAckPending, 10)

require_NoError(t, js.PurgeStream("TEST", &nats.StreamPurgeRequest{
Subject: "foo",
}))

ci, err = js.ConsumerInfo("TEST", "my_consumer")
require_NoError(t, err)
require_Equal(t, ci.NumPending, 0)
require_Equal(t, ci.NumAckPending, 5)

for i := 0; i < 5; i++ {
js.Publish("foo", []byte("OK"))
}
msgs, err = sub.Fetch(5)
require_NoError(t, err)
require_Len(t, len(msgs), 5)

ci, err = js.ConsumerInfo("TEST", "my_consumer")
require_NoError(t, err)
require_Equal(t, ci.NumPending, 0)
require_Equal(t, ci.NumAckPending, 10)
}
12 changes: 9 additions & 3 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2003,12 +2003,13 @@ func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err err
// Purge consumers.
// Check for filtered purge.
if preq != nil && preq.Subject != _EMPTY_ {
ss := store.FilteredState(state.FirstSeq, preq.Subject)
ss := store.FilteredState(fseq, preq.Subject)
fseq = ss.First
}

mset.clsMu.RLock()
for _, o := range mset.cList {
start := fseq
o.mu.RLock()
// we update consumer sequences if:
// no subject was specified, we can purge all consumers sequences
Expand All @@ -2018,10 +2019,15 @@ func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err err
// or consumer filter subject is subset of purged subject,
// but not the other way around.
o.isEqualOrSubsetMatch(preq.Subject)
// Check if a consumer has a wider subject space then what we purged
var isWider bool
if !doPurge && preq != nil && o.isFilteredMatch(preq.Subject) {
doPurge, isWider = true, true
start = state.FirstSeq
}
o.mu.RUnlock()
if doPurge {
o.purge(fseq, lseq)

o.purge(start, lseq, isWider)
}
}
mset.clsMu.RUnlock()
Expand Down

0 comments on commit 32a96c7

Please sign in to comment.