Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] Fix Infinite Loop in Reader's HasNext Function #1182

Merged
merged 8 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
42 changes: 26 additions & 16 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,15 +570,30 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {

func (pc *partitionConsumer) getLastMessageID() (*trackingMessageID, error) {
if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
pc.log.WithField("state", state).Error("Failed to redeliver closing or closed consumer")
return nil, errors.New("failed to redeliver closing or closed consumer")
pc.log.WithField("state", state).Error("Failed to getLastMessageID for the closing or closed consumer")
return nil, errors.New("failed to getLastMessageID for the closing or closed consumer")
}
req := &getLastMsgIDRequest{doneCh: make(chan struct{})}
pc.eventsCh <- req
backoff := &internal.DefaultBackoff{}
RobertIndie marked this conversation as resolved.
Show resolved Hide resolved
request := func() (*trackingMessageID, error) {
req := &getLastMsgIDRequest{doneCh: make(chan struct{})}
pc.eventsCh <- req

// wait for the request to complete
<-req.doneCh
return req.msgID, req.err
// wait for the request to complete
<-req.doneCh
return req.msgID, req.err
}
for {
msgID, err := request()
if err == nil {
return msgID, nil
}
nextDelay := backoff.Next()
if backoff.IsMaxBackoffReached() {
return nil, fmt.Errorf("failed to getLastMessageID due to %w", err)
}
RobertIndie marked this conversation as resolved.
Show resolved Hide resolved
pc.log.WithError(err).Errorf("Failed to get last message id from broker, retrying in %v...", nextDelay)
time.Sleep(nextDelay)
}
}

func (pc *partitionConsumer) internalGetLastMessageID(req *getLastMsgIDRequest) {
Expand Down Expand Up @@ -1977,16 +1992,11 @@ func (pc *partitionConsumer) hasNext() bool {
return true
}

for {
lastMsgID, err := pc.getLastMessageID()
if err != nil {
pc.log.WithError(err).Error("Failed to get last message id from broker")
continue
} else {
pc.lastMessageInBroker = lastMsgID
break
}
lastMsgID, err := pc.getLastMessageID()
if err != nil {
return false
}
pc.lastMessageInBroker = lastMsgID

return pc.hasMoreMessages()
}
Expand Down
1 change: 1 addition & 0 deletions pulsar/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ type Reader interface {
Next(context.Context) (Message, error)

// HasNext checks if there is any message available to read from the current position
// If there is any errors, it will return false
HasNext() bool

// Close the reader and stop the broker to push more messages
Expand Down
56 changes: 56 additions & 0 deletions pulsar/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/apache/pulsar-client-go/pulsar/crypto"
"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsaradmin"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
Expand Down Expand Up @@ -1023,3 +1024,58 @@ func createPartitionedTopic(topic string, n int) error {
}
return nil
}

func TestReaderHasNextFailed(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: serviceURL,
})
assert.Nil(t, err)
topic := newTopicName()
r, err := client.CreateReader(ReaderOptions{
Topic: topic,
StartMessageID: EarliestMessageID(),
})
assert.Nil(t, err)
r.(*reader).c.consumers[0].state.Store(consumerClosing)
assert.False(t, r.HasNext())
}

func TestReaderHasNextRetryFailed(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: serviceURL,
})
assert.Nil(t, err)
topic := newTopicName()
r, err := client.CreateReader(ReaderOptions{
Topic: topic,
StartMessageID: EarliestMessageID(),
// Retry the connection 10 seconds after it's disconnected.
BackoffPolicy: newTestBackoffPolicy(10*time.Second, 10*time.Second),
RobertIndie marked this conversation as resolved.
Show resolved Hide resolved
})
assert.Nil(t, err)

// Disconnect the connection
r.(*reader).c.consumers[0].conn.Load().(internal.Connection).Close()
minTimer := time.NewTimer(10 * time.Second) // Timer to check if r.HasNext() blocked for at least 10s
maxTimer := time.NewTimer(20 * time.Second) // Timer to ensure r.HasNext() doesn't block for more than 20s

done := make(chan bool)
go func() {
assert.False(t, r.HasNext())
done <- true
}()

select {
case <-maxTimer.C:
t.Fatal("r.HasNext() blocked for more than 20s")
RobertIndie marked this conversation as resolved.
Show resolved Hide resolved
case <-done:

if minTimer.Stop() {
t.Fatal("r.HasNext() did not block for at least 10s")
RobertIndie marked this conversation as resolved.
Show resolved Hide resolved
}
if !maxTimer.Stop() {
t.Fatal("r.HasNext() blocked for more than 20s")
RobertIndie marked this conversation as resolved.
Show resolved Hide resolved
}
}

}