[FIXED] Acking a redelivered msg with more pending outstanding should move the ack floor. #5008

Merged · 1 commit · Jan 27, 2024
6 changes: 5 additions & 1 deletion server/consumer.go
@@ -4384,8 +4384,9 @@ func (o *consumer) trackPending(sseq, dseq uint64) {
 		o.ptmr = time.AfterFunc(o.ackWait(0), o.checkPending)
 	}
 	if p, ok := o.pending[sseq]; ok {
+		// Update timestamp but keep original consumer delivery sequence.
+		// So do not update p.Sequence.
 		p.Timestamp = time.Now().UnixNano()
-		p.Sequence = dseq
 	} else {
 		o.pending[sseq] = &Pending{dseq, time.Now().UnixNano()}
 	}
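For context, a minimal stand-alone sketch of the bookkeeping this hunk preserves. It is not the server's real consumer type; pendingEntry and miniConsumer are hypothetical names for illustration. Each pending entry is keyed by stream sequence and keeps the consumer delivery sequence from the first delivery, so a redelivery only refreshes the timestamp.

package main

import (
	"fmt"
	"time"
)

// pendingEntry is a simplified stand-in for the server's Pending record:
// the consumer delivery sequence from the first delivery plus the time of
// the most recent (re)delivery.
type pendingEntry struct {
	Sequence  uint64
	Timestamp int64
}

type miniConsumer struct {
	pending map[uint64]*pendingEntry // keyed by stream sequence
}

// trackPending mirrors the patched behavior: a redelivery refreshes the
// timestamp but does not overwrite the original delivery sequence.
func (c *miniConsumer) trackPending(sseq, dseq uint64) {
	if p, ok := c.pending[sseq]; ok {
		p.Timestamp = time.Now().UnixNano()
		return
	}
	c.pending[sseq] = &pendingEntry{Sequence: dseq, Timestamp: time.Now().UnixNano()}
}

func main() {
	c := &miniConsumer{pending: map[uint64]*pendingEntry{}}
	c.trackPending(1, 1)  // stream seq 1 first delivered as consumer seq 1
	c.trackPending(1, 12) // redelivered as consumer seq 12 after a nak
	// The entry still reports consumer seq 1, so ack-floor accounting that
	// walks pending delivery sequences still sees the first delivery.
	fmt.Println(c.pending[1].Sequence) // 1
}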
@@ -4606,6 +4607,8 @@ func (o *consumer) checkPending() {
 		o.rdq = nil
 		o.rdqi.Empty()
 		o.pending = nil
+		// Mimic behavior in processAckMsg when pending is empty.
+		o.adflr, o.asflr = o.dseq-1, o.sseq-1
 	}

 	// Update our state if needed.
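The reset added here follows the invariant that, when nothing is pending, the ack floors sit at the last delivered sequences, i.e. one below the next consumer and stream sequences to be handed out. A small stand-alone illustration of that invariant, again with made-up names rather than the server's types:

package main

import "fmt"

// floorsWhenEmpty shows the reset used above, assuming dseq and sseq hold
// the *next* consumer and stream sequences to be assigned: with an empty
// pending map the ack floors land on the last delivered sequences.
func floorsWhenEmpty(dseq, sseq uint64) (adflr, asflr uint64) {
	return dseq - 1, sseq - 1
}

func main() {
	// If the consumer has handed out 12 deliveries over 11 stream messages,
	// the next sequences would be 13 and 12, so the floors collapse to 12
	// and 11 -- the same values the test below asserts at the end.
	adflr, asflr := floorsWhenEmpty(13, 12)
	fmt.Println(adflr, asflr) // 12 11
}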
@@ -4936,6 +4939,7 @@ func (o *consumer) purge(sseq uint64, slseq uint64) {
 	// This means we can reset everything at this point.
 	if len(o.pending) == 0 {
 		o.pending, o.rdc = nil, nil
+		o.adflr, o.asflr = o.dseq-1, o.sseq-1
 	}

 	// We need to remove all those being queued for redelivery under o.rdq
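The purge path applies the same reset sketched above: once a purge leaves nothing pending, the floors move straight to one below the next consumer and stream sequences, so this cleanup path reports the same floors as the ack-wait cleanup in checkPending.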
67 changes: 67 additions & 0 deletions server/jetstream_test.go
@@ -22247,3 +22247,70 @@ func TestJetStreamDirectGetBatchMaxBytes(t *testing.T) {
expected := (int(s.getOpts().MaxPending) / msgSize) + 1
sendRequestAndCheck(&JSApiMsgGetRequest{Seq: 1, Batch: 200}, expected+1)
}

func TestJetStreamConsumerNakThenAckFloorMove(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
})
require_NoError(t, err)

for i := 0; i < 11; i++ {
js.Publish("foo", []byte("OK"))
}

sub, err := js.PullSubscribe("foo", "dlc", nats.AckWait(100*time.Millisecond))
require_NoError(t, err)

msgs, err := sub.Fetch(11)
require_NoError(t, err)

// Nak first one
msgs[0].Nak()

// Ack 2-10
for i := 1; i < 10; i++ {
msgs[i].AckSync()
}
// Hold onto last.
lastMsg := msgs[10]

ci, err := sub.ConsumerInfo()
require_NoError(t, err)

require_Equal(t, ci.AckFloor.Consumer, 0)
require_Equal(t, ci.AckFloor.Stream, 0)
require_Equal(t, ci.NumAckPending, 2)

// Grab first message again and ack this time.
msgs, err = sub.Fetch(1)
require_NoError(t, err)
msgs[0].AckSync()

ci, err = sub.ConsumerInfo()
require_NoError(t, err)

require_Equal(t, ci.Delivered.Consumer, 12)
require_Equal(t, ci.Delivered.Stream, 11)
require_Equal(t, ci.AckFloor.Consumer, 10)
require_Equal(t, ci.AckFloor.Stream, 10)
require_Equal(t, ci.NumAckPending, 1)

// Make sure when we ack last one we collapse the AckFloor.Consumer
// with the higher delivered due to re-deliveries.
lastMsg.AckSync()
ci, err = sub.ConsumerInfo()
require_NoError(t, err)

require_Equal(t, ci.Delivered.Consumer, 12)
require_Equal(t, ci.Delivered.Stream, 11)
require_Equal(t, ci.AckFloor.Consumer, 12)
require_Equal(t, ci.AckFloor.Stream, 11)
require_Equal(t, ci.NumAckPending, 0)
}
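Reading the expected numbers in this test: 11 messages are published and delivered as consumer sequences 1-11. The nak'd first message comes back as consumer sequence 12, so Delivered.Consumer ends at 12 while Delivered.Stream stays at 11. While stream sequence 11 (delivered as consumer sequence 11) is still pending, acking the redelivery can only raise the floors to 10/10; once that last message is acked and nothing is pending, both floors collapse to the delivered values, 12 and 11.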