Skip to content

Commit

Permalink
[Fix][Producer] check if message is nil
Browse files Browse the repository at this point in the history
  • Loading branch information
gunli committed Jul 4, 2023
1 parent 163cd7e commit 5d3da50
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 0 deletions.
5 changes: 5 additions & 0 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1107,6 +1107,11 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,

func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
if msg == nil {
runCallback(callback, nil, msg, newError(InvalidMessage, "Message is nil"))
return
}

// Register transaction operation to transaction and the transaction coordinator.
var newCallback func(MessageID, *ProducerMessage, error)
var txn *transaction
Expand Down
11 changes: 11 additions & 0 deletions pulsar/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ func TestSimpleProducer(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, ID)
}

_, err = producer.Send(context.Background(), nil)
assert.NotNil(t, err)
}

func TestProducerAsyncSend(t *testing.T) {
Expand Down Expand Up @@ -152,6 +155,14 @@ func TestProducerAsyncSend(t *testing.T) {
wg.Wait()

assert.Equal(t, 0, errors.Size())

wg.Add(1)
producer.SendAsync(context.Background(), nil, func(id MessageID, m *ProducerMessage, e error) {
assert.NotNil(t, e)
assert.Nil(t, id)
wg.Done()
})
wg.Wait()
}

func TestProducerCompression(t *testing.T) {
Expand Down

0 comments on commit 5d3da50

Please sign in to comment.