From af0e2fdf4c1c6e557e84efb652558f15d83e0cb2 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Fri, 23 Feb 2024 17:46:37 -0800 Subject: [PATCH 1/3] fix(pubsub): fix out of ordering issue with exactly once --- pubsub/iterator.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/pubsub/iterator.go b/pubsub/iterator.go index f45f1b995a5..7ec59c086c6 100644 --- a/pubsub/iterator.go +++ b/pubsub/iterator.go @@ -260,6 +260,9 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) { it.eoMu.RUnlock() it.mu.Lock() + // TODO(hongalex): fix the out of order map when appending to pendingMessages + // This makes ordering keys logic wrong + // pendingMessages maps ackID -> message, and is used // only when exactly once delivery is enabled. // At first, all messages are pending, and they @@ -313,9 +316,13 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) { } } // Only return for processing messages that were successfully modack'ed. + // Iterate over the original messages slice for ordering. v := make([]*ipubsub.Message, 0, len(pendingMessages)) - for _, m := range pendingMessages { - v = append(v, m) + for _, m := range msgs { + ackID := msgAckID(m) + if _, ok := pendingMessages[ackID]; ok { + v = append(v, m) + } } return v, nil } From 74887f6412bdbc0cb6dfac52bf7244090269e749 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Mon, 26 Feb 2024 13:05:28 -0800 Subject: [PATCH 2/3] add ordering test and fix race condition with resource cleanup --- pubsub/integration_test.go | 124 +++++++++++++++++++++++++++++++------ 1 file changed, 106 insertions(+), 18 deletions(-) diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go index a0853e29fd1..8eff7912121 100644 --- a/pubsub/integration_test.go +++ b/pubsub/integration_test.go @@ -1177,29 +1177,32 @@ func TestIntegration_OrderedKeys_Basic(t *testing.T) { } received := make(chan string, numItems) + ctx, cancel := context.WithCancel(ctx) go func() { - if err := sub.Receive(ctx, func(ctx context.Context, msg *Message) { - defer msg.Ack() - if msg.OrderingKey != orderingKey { - t.Errorf("got ordering key %s, expected %s", msg.OrderingKey, orderingKey) - } - - received <- string(msg.Data) - }); err != nil { - if c := status.Code(err); c != codes.Canceled { - t.Error(err) + for i := 0; i < numItems; i++ { + select { + case r := <-received: + if got, want := r, fmt.Sprintf("item-%d", i); got != want { + t.Errorf("%d: got %s, want %s", i, got, want) + } + case <-time.After(30 * time.Second): + t.Errorf("timed out after 30s waiting for item %d", i) + cancel() } } + cancel() }() - for i := 0; i < numItems; i++ { - select { - case r := <-received: - if got, want := r, fmt.Sprintf("item-%d", i); got != want { - t.Fatalf("%d: got %s, want %s", i, got, want) - } - case <-time.After(30 * time.Second): - t.Fatalf("timed out after 30s waiting for item %d", i) + if err := sub.Receive(ctx, func(ctx context.Context, msg *Message) { + defer msg.Ack() + if msg.OrderingKey != orderingKey { + t.Errorf("got ordering key %s, expected %s", msg.OrderingKey, orderingKey) + } + + received <- string(msg.Data) + }); err != nil { + if c := status.Code(err); c != codes.Canceled { + t.Error(err) } } } @@ -1445,6 +1448,91 @@ func TestIntegration_OrderedKeys_SubscriptionOrdering(t *testing.T) { } } +func TestIntegration_OrderingWithExactlyOnce(t *testing.T) { + ctx := context.Background() + client := integrationTestClient(ctx, t, option.WithEndpoint("us-west1-pubsub.googleapis.com:443")) + defer client.Close() + + topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil) + if err != nil { + t.Fatal(err) + } + defer topic.Delete(ctx) + defer topic.Stop() + exists, err := topic.Exists(ctx) + if err != nil { + t.Fatal(err) + } + if !exists { + t.Fatalf("topic %v should exist, but it doesn't", topic) + } + var sub *Subscription + if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), SubscriptionConfig{ + Topic: topic, + EnableMessageOrdering: true, + EnableExactlyOnceDelivery: true, + }); err != nil { + t.Fatal(err) + } + defer sub.Delete(ctx) + exists, err = sub.Exists(ctx) + if err != nil { + t.Fatal(err) + } + if !exists { + t.Fatalf("subscription %s should exist, but it doesn't", sub.ID()) + } + + topic.PublishSettings.DelayThreshold = time.Second + topic.EnableMessageOrdering = true + + orderingKey := "some-ordering-key" + numItems := 10 + for i := 0; i < numItems; i++ { + r := topic.Publish(ctx, &Message{ + ID: fmt.Sprintf("id-%d", i), + Data: []byte(fmt.Sprintf("item-%d", i)), + OrderingKey: orderingKey, + }) + go func() { + if _, err := r.Get(ctx); err != nil { + t.Error(err) + } + }() + } + + received := make(chan string, numItems) + ctx, cancel := context.WithCancel(ctx) + go func() { + for i := 0; i < numItems; i++ { + select { + case r := <-received: + if got, want := r, fmt.Sprintf("item-%d", i); got != want { + t.Errorf("%d: got %s, want %s", i, got, want) + } + case <-time.After(30 * time.Second): + t.Errorf("timed out after 30s waiting for item %d", i) + cancel() + } + } + cancel() + }() + + if err := sub.Receive(ctx, func(ctx context.Context, msg *Message) { + defer msg.Ack() + if msg.OrderingKey != orderingKey { + t.Errorf("got ordering key %s, expected %s", msg.OrderingKey, orderingKey) + } + + received <- string(msg.Data) + }); err != nil { + if c := status.Code(err); c != codes.Canceled { + t.Error(err) + } + } + +} + func TestIntegration_CreateSubscription_DeadLetterPolicy(t *testing.T) { t.Parallel() ctx := context.Background() From 3fee94f75bee84c35c8ad7f5270fc7e7984ccb1d Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Mon, 26 Feb 2024 13:06:27 -0800 Subject: [PATCH 3/3] remove TODO comment --- pubsub/iterator.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/pubsub/iterator.go b/pubsub/iterator.go index 7ec59c086c6..ca3069a0cd8 100644 --- a/pubsub/iterator.go +++ b/pubsub/iterator.go @@ -260,9 +260,6 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) { it.eoMu.RUnlock() it.mu.Lock() - // TODO(hongalex): fix the out of order map when appending to pendingMessages - // This makes ordering keys logic wrong - // pendingMessages maps ackID -> message, and is used // only when exactly once delivery is enabled. // At first, all messages are pending, and they