Skip to content

Commit

Permalink
[FIXED] Acking a redelivered msg with more pending outstanding that s…
Browse files Browse the repository at this point in the history
…hould move the ack floor. (#5008)

A prior fix to deal with larger consumer delivery numbers due to
redeliveries caused the ack floor calculations to not properly work when
a gap was filled by an ack of a redelivered msg.

The ack floor would move and fix itself when the pending went to zero.
This fixes the bug introduced and goes back to the pending sequence
always being the original consumer delivery sequence.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Jan 27, 2024
2 parents 98b4e22 + 70196bd commit 5abe4cc
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 1 deletion.
6 changes: 5 additions & 1 deletion server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4384,8 +4384,9 @@ func (o *consumer) trackPending(sseq, dseq uint64) {
o.ptmr = time.AfterFunc(o.ackWait(0), o.checkPending)
}
if p, ok := o.pending[sseq]; ok {
// Update timestamp but keep original consumer delivery sequence.
// So do not update p.Sequence.
p.Timestamp = time.Now().UnixNano()
p.Sequence = dseq
} else {
o.pending[sseq] = &Pending{dseq, time.Now().UnixNano()}
}
Expand Down Expand Up @@ -4606,6 +4607,8 @@ func (o *consumer) checkPending() {
o.rdq = nil
o.rdqi.Empty()
o.pending = nil
// Mimic behavior in processAckMsg when pending is empty.
o.adflr, o.asflr = o.dseq-1, o.sseq-1
}

// Update our state if needed.
Expand Down Expand Up @@ -4936,6 +4939,7 @@ func (o *consumer) purge(sseq uint64, slseq uint64) {
// This means we can reset everything at this point.
if len(o.pending) == 0 {
o.pending, o.rdc = nil, nil
o.adflr, o.asflr = o.dseq-1, o.sseq-1
}

// We need to remove all those being queued for redelivery under o.rdq
Expand Down
67 changes: 67 additions & 0 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22247,3 +22247,70 @@ func TestJetStreamDirectGetBatchMaxBytes(t *testing.T) {
expected := (int(s.getOpts().MaxPending) / msgSize) + 1
sendRequestAndCheck(&JSApiMsgGetRequest{Seq: 1, Batch: 200}, expected+1)
}

func TestJetStreamConsumerNakThenAckFloorMove(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
})
require_NoError(t, err)

for i := 0; i < 11; i++ {
js.Publish("foo", []byte("OK"))
}

sub, err := js.PullSubscribe("foo", "dlc", nats.AckWait(100*time.Millisecond))
require_NoError(t, err)

msgs, err := sub.Fetch(11)
require_NoError(t, err)

// Nak first one
msgs[0].Nak()

// Ack 2-10
for i := 1; i < 10; i++ {
msgs[i].AckSync()
}
// Hold onto last.
lastMsg := msgs[10]

ci, err := sub.ConsumerInfo()
require_NoError(t, err)

require_Equal(t, ci.AckFloor.Consumer, 0)
require_Equal(t, ci.AckFloor.Stream, 0)
require_Equal(t, ci.NumAckPending, 2)

// Grab first messsage again and ack this time.
msgs, err = sub.Fetch(1)
require_NoError(t, err)
msgs[0].AckSync()

ci, err = sub.ConsumerInfo()
require_NoError(t, err)

require_Equal(t, ci.Delivered.Consumer, 12)
require_Equal(t, ci.Delivered.Stream, 11)
require_Equal(t, ci.AckFloor.Consumer, 10)
require_Equal(t, ci.AckFloor.Stream, 10)
require_Equal(t, ci.NumAckPending, 1)

// Make sure when we ack last one we collapse the AckFloor.Consumer
// with the higher delivered due to re-deliveries.
lastMsg.AckSync()
ci, err = sub.ConsumerInfo()
require_NoError(t, err)

require_Equal(t, ci.Delivered.Consumer, 12)
require_Equal(t, ci.Delivered.Stream, 11)
require_Equal(t, ci.AckFloor.Consumer, 12)
require_Equal(t, ci.AckFloor.Stream, 11)
require_Equal(t, ci.NumAckPending, 0)
}

0 comments on commit 5abe4cc

Please sign in to comment.