Skip to content

Commit

Permalink
[fix] [issue 1057]: Fix the producer flush opertion is not guarantee …
Browse files Browse the repository at this point in the history
…to flush all messages (#1058)

Fixes #1057

### Motivation

`dataChan` is introduced by #1029  to fix the problem of reconnectToBroker. But it missed that if a flush operation excuted, there may still be some messages in `dataChan`. And these messages can't be flushed.

### Modifications

- Fix the producer flush opertion is not guarantee to flush all messages

(cherry picked from commit 9867c29)
  • Loading branch information
Gleiphir2769 authored and RobertIndie committed Sep 7, 2023
1 parent e8a247f commit 5c5af6a
Showing 1 changed file with 9 additions and 0 deletions.
9 changes: 9 additions & 0 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1048,6 +1048,15 @@ func (p *partitionProducer) internalFlushCurrentBatches() {
}

func (p *partitionProducer) internalFlush(fr *flushRequest) {
// clear all the messages which have sent to dataChan before flush
if len(p.dataChan) != 0 {
oldDataChan := p.dataChan
p.dataChan = make(chan *sendRequest, p.options.MaxPendingMessages)
for len(oldDataChan) != 0 {
pendingData := <-oldDataChan
p.internalSend(pendingData)
}
}

p.internalFlushCurrentBatch()

Expand Down

0 comments on commit 5c5af6a

Please sign in to comment.