Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] Fix Infinite Loop in Reader's HasNext Function #1182

Merged
merged 8 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 13 additions & 11 deletions pulsar/client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,15 @@ const (
)

type client struct {
cnxPool internal.ConnectionPool
rpcClient internal.RPCClient
handlers internal.ClientHandlers
lookupService internal.LookupService
metrics *internal.Metrics
tcClient *transactionCoordinatorClient
memLimit internal.MemoryLimitController
closeOnce sync.Once
cnxPool internal.ConnectionPool
rpcClient internal.RPCClient
handlers internal.ClientHandlers
lookupService internal.LookupService
metrics *internal.Metrics
tcClient *transactionCoordinatorClient
memLimit internal.MemoryLimitController
closeOnce sync.Once
operationTimeout time.Duration

log log.Logger
}
Expand Down Expand Up @@ -161,9 +162,10 @@ func newClient(options ClientOptions) (Client, error) {
c := &client{
cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, connectionTimeout, keepAliveInterval,
maxConnectionsPerHost, logger, metrics, connectionMaxIdleTime),
log: logger,
metrics: metrics,
memLimit: internal.NewMemoryLimitController(memLimitBytes, defaultMemoryLimitTriggerThreshold),
log: logger,
metrics: metrics,
memLimit: internal.NewMemoryLimitController(memLimitBytes, defaultMemoryLimitTriggerThreshold),
operationTimeout: operationTimeout,
}
serviceNameResolver := internal.NewPulsarServiceNameResolver(url)

Expand Down
53 changes: 37 additions & 16 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,15 +570,41 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {

func (pc *partitionConsumer) getLastMessageID() (*trackingMessageID, error) {
if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
pc.log.WithField("state", state).Error("Failed to redeliver closing or closed consumer")
return nil, errors.New("failed to redeliver closing or closed consumer")
pc.log.WithField("state", state).Error("Failed to getLastMessageID for the closing or closed consumer")
return nil, errors.New("failed to getLastMessageID for the closing or closed consumer")
}
req := &getLastMsgIDRequest{doneCh: make(chan struct{})}
pc.eventsCh <- req
remainTime := pc.client.operationTimeout
var backoff internal.BackoffPolicy
if pc.options.backoffPolicy != nil {
backoff = pc.options.backoffPolicy
} else {
backoff = &internal.DefaultBackoff{}
}
request := func() (*trackingMessageID, error) {
req := &getLastMsgIDRequest{doneCh: make(chan struct{})}
pc.eventsCh <- req

// wait for the request to complete
<-req.doneCh
return req.msgID, req.err
// wait for the request to complete
<-req.doneCh
return req.msgID, req.err
}
for {
msgID, err := request()
if err == nil {
return msgID, nil
}
if remainTime <= 0 {
pc.log.WithError(err).Error("Failed to getLastMessageID")
return nil, fmt.Errorf("failed to getLastMessageID due to %w", err)
}
nextDelay := backoff.Next()
if nextDelay > remainTime {
nextDelay = remainTime
}
remainTime -= nextDelay
pc.log.WithError(err).Errorf("Failed to get last message id from broker, retrying in %v...", nextDelay)
time.Sleep(nextDelay)
}
}

func (pc *partitionConsumer) internalGetLastMessageID(req *getLastMsgIDRequest) {
Expand Down Expand Up @@ -1977,16 +2003,11 @@ func (pc *partitionConsumer) hasNext() bool {
return true
}

for {
lastMsgID, err := pc.getLastMessageID()
if err != nil {
pc.log.WithError(err).Error("Failed to get last message id from broker")
continue
} else {
pc.lastMessageInBroker = lastMsgID
break
}
lastMsgID, err := pc.getLastMessageID()
if err != nil {
return false
}
pc.lastMessageInBroker = lastMsgID

return pc.hasMoreMessages()
}
Expand Down
1 change: 1 addition & 0 deletions pulsar/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ type Reader interface {
Next(context.Context) (Message, error)

// HasNext checks if there is any message available to read from the current position
// If there is any errors, it will return false
HasNext() bool

// Close the reader and stop the broker to push more messages
Expand Down
64 changes: 64 additions & 0 deletions pulsar/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -1023,3 +1024,66 @@ func createPartitionedTopic(topic string, n int) error {
}
return nil
}

func TestReaderHasNextFailed(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: serviceURL,
})
assert.Nil(t, err)
topic := newTopicName()
r, err := client.CreateReader(ReaderOptions{
Topic: topic,
StartMessageID: EarliestMessageID(),
})
assert.Nil(t, err)
r.(*reader).c.consumers[0].state.Store(consumerClosing)
assert.False(t, r.HasNext())
}

func TestReaderHasNextRetryFailed(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: serviceURL,
OperationTimeout: 2 * time.Second,
})
assert.Nil(t, err)
topic := newTopicName()
r, err := client.CreateReader(ReaderOptions{
Topic: topic,
StartMessageID: EarliestMessageID(),
})
assert.Nil(t, err)

c := make(chan interface{})
defer close(c)

// Close the consumer events loop and assign a mock eventsCh
pc := r.(*reader).c.consumers[0]
pc.Close()
pc.state.Store(consumerReady)
pc.eventsCh = c

go func() {
for e := range c {
req, ok := e.(*getLastMsgIDRequest)
assert.True(t, ok, "unexpected event type")
req.err = errors.New("expected error")
close(req.doneCh)
}
}()
minTimer := time.NewTimer(1 * time.Second) // Timer to check if r.HasNext() blocked for at least 1s
maxTimer := time.NewTimer(3 * time.Second) // Timer to ensure r.HasNext() doesn't block for more than 3s
done := make(chan bool)
go func() {
assert.False(t, r.HasNext())
done <- true
}()

select {
case <-maxTimer.C:
t.Fatal("r.HasNext() blocked for more than 3s")
case <-done:
assert.False(t, minTimer.Stop(), "r.HasNext() did not block for at least 1s")
assert.True(t, maxTimer.Stop())
}

}