Skip to content

Commit

Permalink
[Improve][Producer] simplify the flush logic
Browse files Browse the repository at this point in the history
  • Loading branch information
gunli committed Jul 4, 2023
1 parent 163cd7e commit 36fc804
Showing 1 changed file with 18 additions and 27 deletions.
45 changes: 18 additions & 27 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -851,13 +851,16 @@ func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata,
p._getConn().WriteData(buffer)
}

type pendingItemCallback func()

type pendingItem struct {
sync.Mutex
buffer internal.Buffer
sequenceID uint64
sentAt time.Time
sendRequests []interface{}
completed bool
callback pendingItemCallback
}

func (p *partitionProducer) internalFlushCurrentBatch() {
Expand Down Expand Up @@ -1067,15 +1070,10 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) {
return
}

sendReq := &sendRequest{
msg: nil,
callback: func(id MessageID, message *ProducerMessage, e error) {
fr.err = e
close(fr.doneCh)
},
pi.callback = func() {
fr.err = nil
close(fr.doneCh)
}

pi.sendRequests = append(pi.sendRequests, sendReq)
}

func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (MessageID, error) {
Expand Down Expand Up @@ -1196,27 +1194,17 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
pi.Lock()
defer pi.Unlock()
p.metrics.PublishRPCLatency.Observe(float64(now-pi.sentAt.UnixNano()) / 1.0e9)
batchSize := int32(0)
for _, i := range pi.sendRequests {
sr := i.(*sendRequest)
if sr.msg != nil {
batchSize = batchSize + 1
} else { // Flush request
break
}
}
batchSize := int32(len(pi.sendRequests))
for idx, i := range pi.sendRequests {
sr := i.(*sendRequest)
if sr.msg != nil {
atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID))
p.releaseSemaphoreAndMem(int64(len(sr.msg.Payload)))
p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9)
p.metrics.MessagesPublished.Inc()
p.metrics.MessagesPending.Dec()
payloadSize := float64(len(sr.msg.Payload))
p.metrics.BytesPublished.Add(payloadSize)
p.metrics.BytesPending.Sub(payloadSize)
}
atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID))
p.releaseSemaphoreAndMem(int64(len(sr.msg.Payload)))
p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9)
p.metrics.MessagesPublished.Inc()
p.metrics.MessagesPending.Dec()
payloadSize := float64(len(sr.msg.Payload))
p.metrics.BytesPublished.Add(payloadSize)
p.metrics.BytesPending.Sub(payloadSize)

if sr.callback != nil || len(p.options.Interceptors) > 0 {
msgID := newMessageID(
Expand Down Expand Up @@ -1378,6 +1366,9 @@ func (i *pendingItem) Complete() {
}
i.completed = true
buffersPool.Put(i.buffer)
if i.callback != nil {
i.callback()
}
}

// _setConn sets the internal connection field of this partition producer atomically.
Expand Down

0 comments on commit 36fc804

Please sign in to comment.