Skip to content

Commit

Permalink
[fix] Close consumer resources if creation fails (#1070)
Browse files Browse the repository at this point in the history
### Motivation

When a consumer fails to get created, we should close any resources that it created to prevent leaks of internal resources and leaks of the consumer on the broker side. The broker leak could happen if the connection was left open. These fixes are similar to #1061.

### Modifications

* Close `ackGroupingTracker` and `chunkedMsgCtxMap` if `grabConn` fails. We cannot call `Close` on the consumer because the state is not `Ready`. If we re-design the consumer, it could be nice to be able to call `Close` in this scenario.
* Call `Close` on the consumer in cases where we move it to `Ready` but determine it is not able to be created.
* Fix typo in comment

(cherry picked from commit a3fcc9a)
  • Loading branch information
michaeljmarshall authored and RobertIndie committed Sep 7, 2023
1 parent 1724dc9 commit 191685f
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 3 deletions.
6 changes: 4 additions & 2 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,8 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
if err != nil {
pc.log.WithError(err).Error("Failed to create consumer")
pc.nackTracker.Close()
pc.ackGroupingTracker.close()
pc.chunkedMsgCtxMap.Close()
return nil, err
}
pc.log.Info("Created consumer")
Expand All @@ -381,7 +383,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
if pc.options.startMessageIDInclusive && startingMessageID != nil && startingMessageID.equal(latestMessageID) {
msgID, err := pc.requestGetLastMessageID()
if err != nil {
pc.nackTracker.Close()
pc.Close()
return nil, err
}
if msgID.entryID != noMessageEntry {
Expand All @@ -390,7 +392,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
// use the WithoutClear version because the dispatcher is not started yet
err = pc.requestSeekWithoutClear(msgID.messageID)
if err != nil {
pc.nackTracker.Close()
pc.Close()
return nil, err
}
}
Expand Down
2 changes: 1 addition & 1 deletion pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1405,7 +1405,7 @@ func (p *partitionProducer) setProducerState(state producerState) {
p.state.Swap(int32(state))
}

// set a new consumerState and return the last state
// set a new producerState and return the last state
// returns bool if the new state has been set or not
func (p *partitionProducer) casProducerState(oldState, newState producerState) bool {
return p.state.CAS(int32(oldState), int32(newState))
Expand Down

0 comments on commit 191685f

Please sign in to comment.