Skip to content

Commit

Permalink
[Fix] Fix available permits in MessageReceived (#1181)
Browse files Browse the repository at this point in the history
Fixes #1180

### Motivation
In the `MessageReceived`, the number of skipped messages should be increased to available permits to avoid skipped permits leading flow request not be sent.
---------

Co-authored-by: panjinjun <1619-panjinjun@users.noreply.git.sysop.bigo.sg>
(cherry picked from commit 5d25827)
  • Loading branch information
panszobe authored and RobertIndie committed Feb 29, 2024
1 parent 6ce5421 commit 9fdefe2
Showing 1 changed file with 11 additions and 1 deletion.
12 changes: 11 additions & 1 deletion pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1093,7 +1093,10 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
pc.metrics.MessagesReceived.Add(float64(numMsgs))
pc.metrics.PrefetchedMessages.Add(float64(numMsgs))

var bytesReceived int
var (
bytesReceived int
skippedMessages int32
)
for i := 0; i < numMsgs; i++ {
smm, payload, err := reader.ReadMessage()
if err != nil || payload == nil {
Expand All @@ -1102,6 +1105,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
}
if ackSet != nil && !ackSet.Test(uint(i)) {
pc.log.Debugf("Ignoring message from %vth message, which has been acknowledged", i)
skippedMessages++
continue
}

Expand All @@ -1120,6 +1124,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header

if pc.messageShouldBeDiscarded(trackingMsgID) {
pc.AckID(trackingMsgID)
skippedMessages++
continue
}

Expand All @@ -1144,6 +1149,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
}

if pc.ackGroupingTracker.isDuplicate(msgID) {
skippedMessages++
continue
}

Expand Down Expand Up @@ -1218,6 +1224,10 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
pc.markScaleIfNeed()
}

if skippedMessages > 0 {
pc.availablePermits.add(skippedMessages)
}

// send messages to the dispatcher
pc.queueCh <- messages
return nil
Expand Down

0 comments on commit 9fdefe2

Please sign in to comment.