Skip to content

Commit

Permalink
[fix] Send Close Command on Producer/Consumer create timeout (#1061)
Browse files Browse the repository at this point in the history
### Motivation

This change is the same as apache/pulsar#13161 and apache/pulsar#16616, and is justified by these lines of our binary protocol spec:

* https://github.com/apache/pulsar-site/blob/9b4b3d39014bd47c0bb9f66742b89bcb40ed7f07/docs/developing-binary-protocol.md?plain=1#L301-L304
* https://github.com/apache/pulsar-site/blob/9b4b3d39014bd47c0bb9f66742b89bcb40ed7f07/docs/developing-binary-protocol.md?plain=1#L468-L471

### Modifications

* When a producer or a consumer times out during creation, make an attempt to close the producer or consumer by sending the appropriate close command. Failures can safely be ignored because the only time that the close will actually matter is when the TCP connection is open for other protocol messages. The one nuance is that we send the close command to the same address pair that we send the create command.
  • Loading branch information
michaeljmarshall committed Jul 20, 2023
1 parent 28f61d2 commit d4e08c6
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 0 deletions.
9 changes: 9 additions & 0 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1734,6 +1734,15 @@ func (pc *partitionConsumer) grabConn() error {

if err != nil {
pc.log.WithError(err).Error("Failed to create consumer")
if err == internal.ErrRequestTimeOut {
requestID := pc.client.rpcClient.NewRequestID()
cmdClose := &pb.CommandCloseConsumer{
ConsumerId: proto.Uint64(pc.consumerID),
RequestId: proto.Uint64(requestID),
}
_, _ = pc.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, requestID,
pb.BaseCommand_CLOSE_CONSUMER, cmdClose)
}
return err
}

Expand Down
8 changes: 8 additions & 0 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,14 @@ func (p *partitionProducer) grabCnx() error {
res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, cmdProducer)
if err != nil {
p.log.WithError(err).Error("Failed to create producer at send PRODUCER request")
if err == internal.ErrRequestTimeOut {
id := p.client.rpcClient.NewRequestID()
_, _ = p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_CLOSE_PRODUCER,
&pb.CommandCloseProducer{
ProducerId: &p.producerID,
RequestId: &id,
})
}
return err
}

Expand Down

0 comments on commit d4e08c6

Please sign in to comment.