Skip to content

Commit

Permalink
[Fix][Producer] Stop block request even if Value and Payload are both…
Browse files Browse the repository at this point in the history
… set (#1052)

### Motivation
Currently, if `!p.options.DisableBlockIfQueueFull` and `msg.Value != nil && msg.Payload != nil`, request will be blocked forever 'cause `defer request.stopBlock()` is set up after the verify logic. 
```go
if msg.Value != nil && msg.Payload != nil {
	p.log.Error("Can not set Value and Payload both")
	runCallback(request.callback, nil, request.msg, errors.New("can not set Value and Payload both"))
	return
}

// The block chan must be closed when returned with exception
defer request.stopBlock()
```
Here is the PR to stop block request even if Value and Payload are both set

### Modifications

- pulsar/producer_partition.go


---------

Co-authored-by: gunli <gunli@tencent.com>
  • Loading branch information
gunli and gunli committed Jul 13, 2023
1 parent be35740 commit e45122c
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 5 deletions.
11 changes: 6 additions & 5 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,11 +481,6 @@ func (p *partitionProducer) internalSend(request *sendRequest) {

var schemaPayload []byte
var err error
if msg.Value != nil && msg.Payload != nil {
p.log.Error("Can not set Value and Payload both")
runCallback(request.callback, nil, request.msg, errors.New("can not set Value and Payload both"))
return
}

// The block chan must be closed when returned with exception
defer request.stopBlock()
Expand Down Expand Up @@ -1117,6 +1112,12 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer
return
}

if msg.Value != nil && msg.Payload != nil {
p.log.Error("Can not set Value and Payload both")
runCallback(callback, nil, msg, newError(InvalidMessage, "Can not set Value and Payload both"))
return
}

// Register transaction operation to transaction and the transaction coordinator.
var newCallback func(MessageID, *ProducerMessage, error)
var txn *transaction
Expand Down
15 changes: 15 additions & 0 deletions pulsar/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ func TestSimpleProducer(t *testing.T) {

_, err = producer.Send(context.Background(), nil)
assert.NotNil(t, err)

_, err = producer.Send(context.Background(), &ProducerMessage{
Payload: []byte("hello"),
Value: []byte("hello"),
})
assert.NotNil(t, err)
}

func TestProducerAsyncSend(t *testing.T) {
Expand Down Expand Up @@ -163,6 +169,15 @@ func TestProducerAsyncSend(t *testing.T) {
wg.Done()
})
wg.Wait()

wg.Add(1)
producer.SendAsync(context.Background(), &ProducerMessage{Payload: []byte("hello"), Value: []byte("hello")},
func(id MessageID, m *ProducerMessage, e error) {
assert.NotNil(t, e)
assert.Nil(t, id)
wg.Done()
})
wg.Wait()
}

func TestProducerCompression(t *testing.T) {
Expand Down

0 comments on commit e45122c

Please sign in to comment.