Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pubsub): fix out of order issue when exactly once is enabled #9472

Merged
merged 5 commits into from Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
124 changes: 106 additions & 18 deletions pubsub/integration_test.go
Expand Up @@ -1177,29 +1177,32 @@ func TestIntegration_OrderedKeys_Basic(t *testing.T) {
}

received := make(chan string, numItems)
ctx, cancel := context.WithCancel(ctx)
go func() {
if err := sub.Receive(ctx, func(ctx context.Context, msg *Message) {
defer msg.Ack()
if msg.OrderingKey != orderingKey {
t.Errorf("got ordering key %s, expected %s", msg.OrderingKey, orderingKey)
}

received <- string(msg.Data)
}); err != nil {
if c := status.Code(err); c != codes.Canceled {
t.Error(err)
for i := 0; i < numItems; i++ {
select {
case r := <-received:
if got, want := r, fmt.Sprintf("item-%d", i); got != want {
t.Errorf("%d: got %s, want %s", i, got, want)
}
case <-time.After(30 * time.Second):
t.Errorf("timed out after 30s waiting for item %d", i)
cancel()
}
}
cancel()
}()

for i := 0; i < numItems; i++ {
select {
case r := <-received:
if got, want := r, fmt.Sprintf("item-%d", i); got != want {
t.Fatalf("%d: got %s, want %s", i, got, want)
}
case <-time.After(30 * time.Second):
t.Fatalf("timed out after 30s waiting for item %d", i)
if err := sub.Receive(ctx, func(ctx context.Context, msg *Message) {
defer msg.Ack()
if msg.OrderingKey != orderingKey {
t.Errorf("got ordering key %s, expected %s", msg.OrderingKey, orderingKey)
}

received <- string(msg.Data)
}); err != nil {
if c := status.Code(err); c != codes.Canceled {
t.Error(err)
}
}
}
Expand Down Expand Up @@ -1445,6 +1448,91 @@ func TestIntegration_OrderedKeys_SubscriptionOrdering(t *testing.T) {
}
}

func TestIntegration_OrderingWithExactlyOnce(t *testing.T) {
ctx := context.Background()
client := integrationTestClient(ctx, t, option.WithEndpoint("us-west1-pubsub.googleapis.com:443"))
defer client.Close()

topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
if err != nil {
t.Fatal(err)
}
defer topic.Delete(ctx)
defer topic.Stop()
exists, err := topic.Exists(ctx)
if err != nil {
t.Fatal(err)
}
if !exists {
t.Fatalf("topic %v should exist, but it doesn't", topic)
}
var sub *Subscription
if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), SubscriptionConfig{
Topic: topic,
EnableMessageOrdering: true,
EnableExactlyOnceDelivery: true,
}); err != nil {
t.Fatal(err)
}
defer sub.Delete(ctx)
exists, err = sub.Exists(ctx)
if err != nil {
t.Fatal(err)
}
if !exists {
t.Fatalf("subscription %s should exist, but it doesn't", sub.ID())
}

topic.PublishSettings.DelayThreshold = time.Second
topic.EnableMessageOrdering = true

orderingKey := "some-ordering-key"
numItems := 10
for i := 0; i < numItems; i++ {
r := topic.Publish(ctx, &Message{
ID: fmt.Sprintf("id-%d", i),
Data: []byte(fmt.Sprintf("item-%d", i)),
OrderingKey: orderingKey,
})
go func() {
if _, err := r.Get(ctx); err != nil {
t.Error(err)
}
}()
}

received := make(chan string, numItems)
ctx, cancel := context.WithCancel(ctx)
go func() {
for i := 0; i < numItems; i++ {
select {
case r := <-received:
if got, want := r, fmt.Sprintf("item-%d", i); got != want {
t.Errorf("%d: got %s, want %s", i, got, want)
}
case <-time.After(30 * time.Second):
t.Errorf("timed out after 30s waiting for item %d", i)
cancel()
}
}
cancel()
}()

if err := sub.Receive(ctx, func(ctx context.Context, msg *Message) {
defer msg.Ack()
if msg.OrderingKey != orderingKey {
t.Errorf("got ordering key %s, expected %s", msg.OrderingKey, orderingKey)
}

received <- string(msg.Data)
}); err != nil {
if c := status.Code(err); c != codes.Canceled {
t.Error(err)
}
}

}

func TestIntegration_CreateSubscription_DeadLetterPolicy(t *testing.T) {
t.Parallel()
ctx := context.Background()
Expand Down
8 changes: 6 additions & 2 deletions pubsub/iterator.go
Expand Up @@ -313,9 +313,13 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) {
}
}
// Only return for processing messages that were successfully modack'ed.
// Iterate over the original messages slice for ordering.
v := make([]*ipubsub.Message, 0, len(pendingMessages))
for _, m := range pendingMessages {
v = append(v, m)
for _, m := range msgs {
ackID := msgAckID(m)
if _, ok := pendingMessages[ackID]; ok {
v = append(v, m)
}
}
return v, nil
}
Expand Down