Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Make sure to properly handle filtered purges with consumers that have a wider filtered subject. #5075

Merged
merged 1 commit into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
32 changes: 28 additions & 4 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4901,17 +4901,26 @@ func (o *consumer) hasNoLocalInterest() bool {

// This is when the underlying stream has been purged.
// sseq is the new first seq for the stream after purge.
// Lock should be held.
func (o *consumer) purge(sseq uint64, slseq uint64) {
// Lock should NOT be held.
func (o *consumer) purge(sseq uint64, slseq uint64, isWider bool) {
// Do not update our state unless we know we are the leader.
if !o.isLeader() {
return
}
// Signals all have been purged for this consumer.
if sseq == 0 {
if sseq == 0 && !isWider {
sseq = slseq + 1
}

var store StreamStore
if isWider {
o.mu.RLock()
if o.mset != nil {
store = o.mset.store
}
o.mu.RUnlock()
}

o.mu.Lock()
// Do not go backwards
if o.sseq < sseq {
Expand All @@ -4920,7 +4929,6 @@ func (o *consumer) purge(sseq uint64, slseq uint64) {

if o.asflr < sseq {
o.asflr = sseq - 1

// We need to remove those no longer relevant from pending.
for seq, p := range o.pending {
if seq <= o.asflr {
Expand All @@ -4934,8 +4942,24 @@ func (o *consumer) purge(sseq uint64, slseq uint64) {
delete(o.rdc, seq)
// rdq handled below.
}
if isWider && store != nil {
// Our filtered subject, which could be all, is wider than the underlying purge.
// We need to check if the pending items left are still valid.
var smv StoreMsg
if _, err := store.LoadMsg(seq, &smv); err == errDeletedMsg || err == ErrStoreMsgNotFound {
if p.Sequence > o.adflr {
o.adflr = p.Sequence
if o.adflr > o.dseq {
o.dseq = o.adflr
}
}
delete(o.pending, seq)
delete(o.rdc, seq)
}
}
}
}

// This means we can reset everything at this point.
if len(o.pending) == 0 {
o.pending, o.rdc = nil, nil
Expand Down
77 changes: 77 additions & 0 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13534,6 +13534,13 @@ func TestJetStreamPurgeAndFilteredConsumers(t *testing.T) {
t.Fatalf("Expected NumPending to be 10, got %d", ci.NumPending)
}

// Also check unfiltered with interleaving messages.
_, err = js.AddConsumer("S", &nats.ConsumerConfig{
Durable: "all",
AckPolicy: nats.AckExplicitPolicy,
})
require_NoError(t, err)

// Now purge only adam.
jr, _ := json.Marshal(&JSApiStreamPurgeRequest{Subject: "FOO.adam"})
_, err = nc.Request(fmt.Sprintf(JSApiStreamPurgeT, "S"), jr, time.Second)
Expand All @@ -13559,6 +13566,12 @@ func TestJetStreamPurgeAndFilteredConsumers(t *testing.T) {
if ci.AckFloor.Stream != 20 {
t.Fatalf("Expected AckFloor for stream to be 20, got %d", ci.AckFloor.Stream)
}

ci, err = js.ConsumerInfo("S", "all")
require_NoError(t, err)
if ci.NumPending != 10 {
t.Fatalf("Expected NumPending to be 10, got %d", ci.NumPending)
}
}

// Issue #2662
Expand Down Expand Up @@ -22325,3 +22338,67 @@ func TestJetStreamConsumerNakThenAckFloorMove(t *testing.T) {
require_Equal(t, ci.AckFloor.Stream, 11)
require_Equal(t, ci.NumAckPending, 0)
}

func TestJetStreamSubjectFilteredPurgeClearsPendingAcks(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo", "bar"},
})
require_NoError(t, err)

for i := 0; i < 5; i++ {
js.Publish("foo", []byte("OK"))
js.Publish("bar", []byte("OK"))
}

// Note that there are no subject filters here, this is deliberate
// as previously the purge with filter code was checking for them.
// We want to prove that unfiltered consumers also get purged.
ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{
Name: "my_consumer",
AckPolicy: nats.AckExplicitPolicy,
MaxAckPending: 10,
})
require_NoError(t, err)
require_Equal(t, ci.NumPending, 10)
require_Equal(t, ci.NumAckPending, 0)

sub, err := js.PullSubscribe(">", "", nats.Bind("TEST", "my_consumer"))
require_NoError(t, err)

msgs, err := sub.Fetch(10)
require_NoError(t, err)
require_Len(t, len(msgs), 10)

ci, err = js.ConsumerInfo("TEST", "my_consumer")
require_NoError(t, err)
require_Equal(t, ci.NumPending, 0)
require_Equal(t, ci.NumAckPending, 10)

require_NoError(t, js.PurgeStream("TEST", &nats.StreamPurgeRequest{
Subject: "foo",
}))

ci, err = js.ConsumerInfo("TEST", "my_consumer")
require_NoError(t, err)
require_Equal(t, ci.NumPending, 0)
require_Equal(t, ci.NumAckPending, 5)

for i := 0; i < 5; i++ {
js.Publish("foo", []byte("OK"))
}
msgs, err = sub.Fetch(5)
require_NoError(t, err)
require_Len(t, len(msgs), 5)

ci, err = js.ConsumerInfo("TEST", "my_consumer")
require_NoError(t, err)
require_Equal(t, ci.NumPending, 0)
require_Equal(t, ci.NumAckPending, 10)
}
12 changes: 9 additions & 3 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2003,12 +2003,13 @@ func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err err
// Purge consumers.
// Check for filtered purge.
if preq != nil && preq.Subject != _EMPTY_ {
ss := store.FilteredState(state.FirstSeq, preq.Subject)
ss := store.FilteredState(fseq, preq.Subject)
fseq = ss.First
}

mset.clsMu.RLock()
for _, o := range mset.cList {
start := fseq
o.mu.RLock()
// we update consumer sequences if:
// no subject was specified, we can purge all consumers sequences
Expand All @@ -2018,10 +2019,15 @@ func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err err
// or consumer filter subject is subset of purged subject,
// but not the other way around.
o.isEqualOrSubsetMatch(preq.Subject)
// Check if a consumer has a wider subject space then what we purged
var isWider bool
if !doPurge && preq != nil && o.isFilteredMatch(preq.Subject) {
doPurge, isWider = true, true
start = state.FirstSeq
}
o.mu.RUnlock()
if doPurge {
o.purge(fseq, lseq)

o.purge(start, lseq, isWider)
}
}
mset.clsMu.RUnlock()
Expand Down