Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsub): support exactly once delivery #6506

Merged
merged 31 commits into from
Aug 22, 2022
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
bba894a
feat(pubsub): prepare iterator for exactly once (#6040)
hongalex May 17, 2022
4abfa9b
resolve merge conflict
hongalex Jun 7, 2022
e0c7032
Merge branch 'main' of ssh://github.com/googleapis/google-cloud-go in…
hongalex Jun 14, 2022
cef6e01
feat(pubsub): send stream ack deadline seconds on exactly once change…
hongalex Jun 14, 2022
c636f5f
feat(pubsub): add AckWithResult and NackWithResult to message (#6201)
hongalex Jun 29, 2022
aa6da2e
feat(pubsub): add helper method for parsing ErrorInfos (#6281)
hongalex Jul 8, 2022
c2de57c
feat(pubsub): complete AckResult for exactly once (#6387)
hongalex Aug 1, 2022
6eb2c9c
resolve merge with main
hongalex Aug 10, 2022
137bc7d
Merge branch 'main' of ssh://github.com/googleapis/google-cloud-go in…
hongalex Aug 11, 2022
4b80b1f
feat(pubsub): retry temporary failures for ack/modacks (#6485)
hongalex Aug 11, 2022
ee0bd5e
Merge branch 'pubsub-exactly-once' of ssh://github.com/googleapis/goo…
hongalex Aug 11, 2022
eedb852
Merge branch 'main' into pubsub-exactly-once
hongalex Aug 11, 2022
18aaabf
remove transient invalid ack id error string
hongalex Aug 12, 2022
346d154
reduce number of mutex locks
hongalex Aug 12, 2022
c30568b
Merge branch 'pubsub-exactly-once' of github.com:hongalex/google-clou…
hongalex Aug 12, 2022
c1e50f8
Merge branch 'main' into pubsub-exactly-once
hongalex Aug 12, 2022
e729543
pass in StreamAckDeadline seconds for streaming pull requests in fake…
hongalex Aug 13, 2022
c81c266
Merge branch 'pubsub-exactly-once' of github.com:hongalex/google-clou…
hongalex Aug 13, 2022
b637e09
fix lint issues
hongalex Aug 13, 2022
f474b62
add changes to internal/pubsub/message
hongalex Aug 13, 2022
a8feee0
implement default ack handler functions in lite
hongalex Aug 13, 2022
550c484
use pubsub package ack result
hongalex Aug 13, 2022
f2302aa
use pinned library for pubsublite
hongalex Aug 14, 2022
611e88a
Merge branch 'main' into pubsub-exactly-once
hongalex Aug 15, 2022
27590cb
Merge branch 'main' into pubsub-exactly-once
hongalex Aug 16, 2022
df69166
resolve all lite Ack/NackWithResult to success
hongalex Aug 16, 2022
04b9b16
Merge branch 'pubsub-exactly-once' of github.com:hongalex/google-clou…
hongalex Aug 16, 2022
0230bd5
Merge branch 'main' into pubsub-exactly-once
hongalex Aug 18, 2022
25131f0
Merge branch 'main' into pubsub-exactly-once
hongalex Aug 19, 2022
ad04389
Merge branch 'main' into pubsub-exactly-once
hongalex Aug 19, 2022
71c20a7
Merge branch 'main' into pubsub-exactly-once
hongalex Aug 22, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion pubsub/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module cloud.google.com/go/pubsub
go 1.17

require (
cloud.google.com/go v0.102.1
cloud.google.com/go v0.102.1-0.20220708235547-f3d2cc2c987e
cloud.google.com/go/iam v0.3.0
cloud.google.com/go/kms v1.4.0
github.com/golang/protobuf v1.5.2
Expand Down
5 changes: 3 additions & 2 deletions pubsub/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ cloud.google.com/go v0.99.0/go.mod h1:w0Xx2nLzqWJPuozYQX+hFfCSI8WioryfRDzkoI/Y2Z
cloud.google.com/go v0.100.1/go.mod h1:fs4QogzfH5n2pBXBP9vRiU+eCny7lD2vmFZy79Iuw1U=
cloud.google.com/go v0.100.2/go.mod h1:4Xra9TjzAeYHrl5+oeLlzbM2k3mjVhZh4UqTZ//w99A=
cloud.google.com/go v0.102.0/go.mod h1:oWcCzKlqJ5zgHQt9YsaeTY9KzIvjyy0ArmiBUgpQ+nc=
cloud.google.com/go v0.102.1 h1:vpK6iQWv/2uUeFJth4/cBHsQAGjn1iIE6AAlxipRaA0=
cloud.google.com/go v0.102.1/go.mod h1:XZ77E9qnTEnrgEOvr4xzfdX5TRo7fB4T2F4O6+34hIU=
cloud.google.com/go v0.102.1-0.20220708235547-f3d2cc2c987e h1:GZ9rHNbN2TY+p6/dTeU0EADYrOc3BCqy/KwGPZHLsdA=
cloud.google.com/go v0.102.1-0.20220708235547-f3d2cc2c987e/go.mod h1:mqs3bFXrt/gPc6aOZpchX8DEdQhuJluA/7LZNutd2Nc=
cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o=
cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE=
cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvftPBK2Dvzc=
Expand Down Expand Up @@ -593,6 +593,7 @@ google.golang.org/genproto v0.0.0-20220505152158-f39f71e6c8f3/go.mod h1:RAyBrSAP
google.golang.org/genproto v0.0.0-20220518221133-4f43b3371335/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4=
google.golang.org/genproto v0.0.0-20220523171625-347a074981d8/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4=
google.golang.org/genproto v0.0.0-20220608133413-ed9918b62aac/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/genproto v0.0.0-20220615141314-f1464d18c36b/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/genproto v0.0.0-20220616135557-88e70c0c3a90/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad h1:kqrS+lhvaMHCxul6sKQvKJ8nAAhlVItmZV822hYFH/U=
google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
Expand Down
201 changes: 112 additions & 89 deletions pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,6 @@ func TestIntegration_All(t *testing.T) {
client := integrationTestClient(ctx, t)
defer client.Close()

for _, sync := range []bool{false, true} {
for _, maxMsgs := range []int{0, 3, -1} { // MaxOutstandingMessages = default, 3, unlimited
testPublishAndReceive(t, client, maxMsgs, sync, 10, 0)
}

// Tests for large messages (larger than the 4MB gRPC limit).
testPublishAndReceive(t, client, 0, sync, 1, 5*1024*1024)
}

topic, err := client.CreateTopic(ctx, topicIDs.New())
if err != nil {
t.Errorf("CreateTopic error: %v", err)
Expand Down Expand Up @@ -228,6 +219,20 @@ func TestIntegration_All(t *testing.T) {
}
}

func TestPublishReceive(t *testing.T) {
ctx := context.Background()
client := integrationTestClient(ctx, t)

for _, sync := range []bool{false, true} {
for _, maxMsgs := range []int{0, 3, -1} { // MaxOutstandingMessages = default, 3, unlimited
testPublishAndReceive(t, client, maxMsgs, sync, false, 10, 0)
}

// Tests for large messages (larger than the 4MB gRPC limit).
testPublishAndReceive(t, client, 0, sync, false, 1, 5*1024*1024)
}
}

// withGoogleClientInfo sets the name and version of the application in
// the `x-goog-api-client` header passed on each request and returns the
// updated context.
Expand All @@ -246,94 +251,100 @@ func withGoogleClientInfo(ctx context.Context) context.Context {
return metadata.NewOutgoingContext(ctx, metadata.Join(allMDs...))
}

func testPublishAndReceive(t *testing.T, client *Client, maxMsgs int, synchronous bool, numMsgs, extraBytes int) {
ctx := context.Background()
topic, err := client.CreateTopic(ctx, topicIDs.New())
if err != nil {
t.Errorf("CreateTopic error: %v", err)
}
defer topic.Stop()
exists, err := topic.Exists(ctx)
if err != nil {
t.Fatalf("TopicExists error: %v", err)
}
if !exists {
t.Errorf("topic %v should exist, but it doesn't", topic)
}
func testPublishAndReceive(t *testing.T, client *Client, maxMsgs int, synchronous, exactlyOnceDelivery bool, numMsgs, extraBytes int) {
t.Run(fmt.Sprintf("maxMsgs:%d,synchronous:%t,exactlyOnceDelivery:%t,numMsgs:%d", maxMsgs, synchronous, exactlyOnceDelivery, numMsgs), func(t *testing.T) {
t.Parallel()
ctx := context.Background()
topic, err := client.CreateTopic(ctx, topicIDs.New())
if err != nil {
t.Errorf("CreateTopic error: %v", err)
}
defer topic.Stop()
exists, err := topic.Exists(ctx)
if err != nil {
t.Fatalf("TopicExists error: %v", err)
}
if !exists {
t.Errorf("topic %v should exist, but it doesn't", topic)
}

var sub *Subscription
if sub, err = client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{Topic: topic}); err != nil {
t.Errorf("CreateSub error: %v", err)
}
exists, err = sub.Exists(ctx)
if err != nil {
t.Fatalf("SubExists error: %v", err)
}
if !exists {
t.Errorf("subscription %s should exist, but it doesn't", sub.ID())
}
var msgs []*Message
for i := 0; i < numMsgs; i++ {
text := fmt.Sprintf("a message with an index %d - %s", i, strings.Repeat(".", extraBytes))
attrs := make(map[string]string)
attrs["foo"] = "bar"
msgs = append(msgs, &Message{
Data: []byte(text),
Attributes: attrs,
sub, err := client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{
Topic: topic,
EnableExactlyOnceDelivery: exactlyOnceDelivery,
})
}

// Publish some messages.
type pubResult struct {
m *Message
r *PublishResult
}
var rs []pubResult
for _, m := range msgs {
r := topic.Publish(ctx, m)
rs = append(rs, pubResult{m, r})
}
want := make(map[string]messageData)
for _, res := range rs {
id, err := res.r.Get(ctx)
if err != nil {
t.Fatal(err)
t.Errorf("CreateSub error: %v", err)
}
exists, err = sub.Exists(ctx)
if err != nil {
t.Fatalf("SubExists error: %v", err)
}
if !exists {
t.Errorf("subscription %s should exist, but it doesn't", sub.ID())
}
var msgs []*Message
for i := 0; i < numMsgs; i++ {
text := fmt.Sprintf("a message with an index %d - %s", i, strings.Repeat(".", extraBytes))
attrs := make(map[string]string)
attrs["foo"] = "bar"
msgs = append(msgs, &Message{
Data: []byte(text),
Attributes: attrs,
})
}
md := extractMessageData(res.m)
md.ID = id
want[md.ID] = md
}

sub.ReceiveSettings.MaxOutstandingMessages = maxMsgs
sub.ReceiveSettings.Synchronous = synchronous
// Publish some messages.
type pubResult struct {
m *Message
r *PublishResult
}
var rs []pubResult
for _, m := range msgs {
r := topic.Publish(ctx, m)
rs = append(rs, pubResult{m, r})
}
want := make(map[string]messageData)
for _, res := range rs {
id, err := res.r.Get(ctx)
if err != nil {
t.Fatal(err)
}
md := extractMessageData(res.m)
md.ID = id
want[md.ID] = md
}

// Use a timeout to ensure that Pull does not block indefinitely if there are
// unexpectedly few messages available.
now := time.Now()
timeout := 3 * time.Minute
timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
gotMsgs, err := pullN(timeoutCtx, sub, len(want), func(ctx context.Context, m *Message) {
m.Ack()
})
if err != nil {
if c := status.Convert(err); c.Code() == codes.Canceled {
if time.Since(now) >= timeout {
t.Fatal("pullN took too long")
sub.ReceiveSettings.MaxOutstandingMessages = maxMsgs
sub.ReceiveSettings.Synchronous = synchronous

// Use a timeout to ensure that Pull does not block indefinitely if there are
// unexpectedly few messages available.
now := time.Now()
timeout := 3 * time.Minute
timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
gotMsgs, err := pullN(timeoutCtx, sub, len(want), func(ctx context.Context, m *Message) {
m.Ack()
})
if err != nil {
if c := status.Convert(err); c.Code() == codes.Canceled {
if time.Since(now) >= timeout {
t.Fatal("pullN took too long")
}
} else {
t.Fatalf("Pull: %v", err)
}
} else {
t.Fatalf("Pull: %v", err)
}
}
got := make(map[string]messageData)
for _, m := range gotMsgs {
md := extractMessageData(m)
got[md.ID] = md
}
if !testutil.Equal(got, want) {
t.Fatalf("MaxOutstandingMessages=%d, Synchronous=%t, messages got: %+v, messages want: %+v",
maxMsgs, synchronous, got, want)
}
got := make(map[string]messageData)
for _, m := range gotMsgs {
md := extractMessageData(m)
got[md.ID] = md
}
if !testutil.Equal(got, want) {
t.Fatalf("MaxOutstandingMessages=%d, Synchronous=%t, messages got: %+v, messages want: %+v",
maxMsgs, synchronous, got, want)
}
})
}

// IAM tests.
Expand Down Expand Up @@ -1979,3 +1990,15 @@ func TestIntegration_TopicRetention(t *testing.T) {
t.Fatalf("expected cleared retention duration, got: %v", got)
}
}

func TestExactlyOnceDelivery_PublishReceive(t *testing.T) {
ctx := context.Background()
client := integrationTestClient(ctx, t)

for _, maxMsgs := range []int{0, 3, -1} { // MaxOutstandingMessages = default, 3, unlimited
testPublishAndReceive(t, client, maxMsgs, false, true, 10, 0)
}

// Tests for large messages (larger than the 4MB gRPC limit).
testPublishAndReceive(t, client, 0, false, true, 1, 5*1024*1024)
}