Skip to content

Commit

Permalink
[fix] [issue 1064] Fix the panic when try to flush in DisableBatching…
Browse files Browse the repository at this point in the history
…=true (#1065)

Fixes #1064 

### Motivation

If we set producer `DisableBatching=true`, it will be panic when call `producer.Flush()`. More details in #1064 .

### Modifications

- Avoid panic in non-batching producer
- Add unit test to cover `Flush()` in non-batching producer.
  • Loading branch information
Gleiphir2769 committed Jul 25, 2023
1 parent 9867c29 commit 4bfd4aa
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 1 deletion.
4 changes: 3 additions & 1 deletion pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1061,7 +1061,9 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) {
}
}

p.internalFlushCurrentBatch()
if !p.options.DisableBatching {
p.internalFlushCurrentBatch()
}

pi, ok := p.pendingQueue.PeekLast().(*pendingItem)
if !ok {
Expand Down
44 changes: 44 additions & 0 deletions pulsar/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,50 @@ func TestProducerAsyncSend(t *testing.T) {
wg.Wait()
}

func TestProducerFlushDisableBatching(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: serviceURL,
})
assert.NoError(t, err)
defer client.Close()

producer, err := client.CreateProducer(ProducerOptions{
Topic: newTopicName(),
DisableBatching: true,
})

assert.NoError(t, err)
assert.NotNil(t, producer)
defer producer.Close()

wg := sync.WaitGroup{}
wg.Add(10)
errors := internal.NewBlockingQueue(10)

for i := 0; i < 10; i++ {
producer.SendAsync(context.Background(), &ProducerMessage{
Payload: []byte("hello"),
}, func(id MessageID, message *ProducerMessage, e error) {
if e != nil {
log.WithError(e).Error("Failed to publish")
errors.Put(e)
} else {
log.Info("Published message ", id)
}
wg.Done()
})

assert.NoError(t, err)
}

err = producer.Flush()
assert.Nil(t, err)

wg.Wait()

assert.Equal(t, 0, errors.Size())
}

func TestProducerCompression(t *testing.T) {
type testProvider struct {
name string
Expand Down

0 comments on commit 4bfd4aa

Please sign in to comment.