Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsub): support exactly once delivery #6506

Merged
merged 31 commits into from Aug 22, 2022
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
bba894a
feat(pubsub): prepare iterator for exactly once (#6040)
hongalex May 17, 2022
4abfa9b
resolve merge conflict
hongalex Jun 7, 2022
e0c7032
Merge branch 'main' of ssh://github.com/googleapis/google-cloud-go in…
hongalex Jun 14, 2022
cef6e01
feat(pubsub): send stream ack deadline seconds on exactly once change…
hongalex Jun 14, 2022
c636f5f
feat(pubsub): add AckWithResult and NackWithResult to message (#6201)
hongalex Jun 29, 2022
aa6da2e
feat(pubsub): add helper method for parsing ErrorInfos (#6281)
hongalex Jul 8, 2022
c2de57c
feat(pubsub): complete AckResult for exactly once (#6387)
hongalex Aug 1, 2022
6eb2c9c
resolve merge with main
hongalex Aug 10, 2022
137bc7d
Merge branch 'main' of ssh://github.com/googleapis/google-cloud-go in…
hongalex Aug 11, 2022
4b80b1f
feat(pubsub): retry temporary failures for ack/modacks (#6485)
hongalex Aug 11, 2022
ee0bd5e
Merge branch 'pubsub-exactly-once' of ssh://github.com/googleapis/goo…
hongalex Aug 11, 2022
eedb852
Merge branch 'main' into pubsub-exactly-once
hongalex Aug 11, 2022
18aaabf
remove transient invalid ack id error string
hongalex Aug 12, 2022
346d154
reduce number of mutex locks
hongalex Aug 12, 2022
c30568b
Merge branch 'pubsub-exactly-once' of github.com:hongalex/google-clou…
hongalex Aug 12, 2022
c1e50f8
Merge branch 'main' into pubsub-exactly-once
hongalex Aug 12, 2022
e729543
pass in StreamAckDeadline seconds for streaming pull requests in fake…
hongalex Aug 13, 2022
c81c266
Merge branch 'pubsub-exactly-once' of github.com:hongalex/google-clou…
hongalex Aug 13, 2022
b637e09
fix lint issues
hongalex Aug 13, 2022
f474b62
add changes to internal/pubsub/message
hongalex Aug 13, 2022
a8feee0
implement default ack handler functions in lite
hongalex Aug 13, 2022
550c484
use pubsub package ack result
hongalex Aug 13, 2022
f2302aa
use pinned library for pubsublite
hongalex Aug 14, 2022
611e88a
Merge branch 'main' into pubsub-exactly-once
hongalex Aug 15, 2022
27590cb
Merge branch 'main' into pubsub-exactly-once
hongalex Aug 16, 2022
df69166
resolve all lite Ack/NackWithResult to success
hongalex Aug 16, 2022
04b9b16
Merge branch 'pubsub-exactly-once' of github.com:hongalex/google-clou…
hongalex Aug 16, 2022
0230bd5
Merge branch 'main' into pubsub-exactly-once
hongalex Aug 18, 2022
25131f0
Merge branch 'main' into pubsub-exactly-once
hongalex Aug 19, 2022
ad04389
Merge branch 'main' into pubsub-exactly-once
hongalex Aug 19, 2022
71c20a7
Merge branch 'main' into pubsub-exactly-once
hongalex Aug 22, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
124 changes: 124 additions & 0 deletions internal/pubsub/message.go
Expand Up @@ -14,6 +14,7 @@
package pubsub

import (
"context"
"time"
)

Expand All @@ -24,6 +25,14 @@ type AckHandler interface {

// OnNack processes a message nack.
OnNack()

// OnAckWithResult processes a message ack and returns
// a result that shows if it succeeded.
OnAckWithResult() *AckResult

// OnNackWithResult processes a message nack and returns
// a result that shows if it succeeded.
OnNackWithResult() *AckResult
}

// Message represents a Pub/Sub message.
Expand Down Expand Up @@ -85,6 +94,121 @@ func (m *Message) Nack() {
}
}

// AcknowledgeStatus represents the status of an Ack or Nack request.
type AcknowledgeStatus int

const (
// AcknowledgeStatusSuccess indicates the request was a success.
AcknowledgeStatusSuccess AcknowledgeStatus = iota
// AcknowledgeStatusPermissionDenied indicates the caller does not have sufficient permissions.
AcknowledgeStatusPermissionDenied
// AcknowledgeStatusFailedPrecondition indicates the request encountered a FailedPrecondition error.
AcknowledgeStatusFailedPrecondition
// AcknowledgeStatusInvalidAckID indicates one or more of the ack IDs sent were invalid.
AcknowledgeStatusInvalidAckID
// AcknowledgeStatusOther indicates another unknown error was returned.
AcknowledgeStatusOther
)

// AckResult holds the result from a call to Ack or Nack.
type AckResult struct {
ready chan struct{}
res AcknowledgeStatus
err error
}

// Ready returns a channel that is closed when the result is ready.
// When the Ready channel is closed, Get is guaranteed not to block.
func (r *AckResult) Ready() <-chan struct{} { return r.ready }

// Get returns the status and/or error result of a Ack, Nack, or Modack call.
// Get blocks until the Ack/Nack completes or the context is done.
func (r *AckResult) Get(ctx context.Context) (res AcknowledgeStatus, err error) {
// If the result is already ready, return it even if the context is done.
select {
case <-r.Ready():
return r.res, r.err
default:
}
select {
case <-ctx.Done():
// Explicitly return AcknowledgeStatusOther for context cancelled cases,
// since the default is success.
return AcknowledgeStatusOther, ctx.Err()
case <-r.Ready():
return r.res, r.err
}
}

// NewAckResult creates a AckResult.
func NewAckResult() *AckResult {
return &AckResult{
ready: make(chan struct{}),
}
}

// SetAckResult sets the ack response and error for a ack result and closes
// the Ready channel. Any call after the first for the same AckResult
// is a no-op.
func SetAckResult(r *AckResult, res AcknowledgeStatus, err error) {
select {
case <-r.Ready():
return
default:
r.res = res
r.err = err
close(r.ready)
}
}

// AckWithResult acknowledges a message in Pub/Sub and it will not be
// delivered to this subscription again.
//
// You should avoid acknowledging messages until you have
// *finished* processing them, so that in the event of a failure,
// you receive the message again.
//
// If exactly-once delivery is enabled on the subscription, the
// AckResult returned by this method tracks the state of acknowledgement
// operation. If the operation completes successfully, the message is
// guaranteed NOT to be re-delivered. Otherwise, the result will
// contain an error with more details about the failure and the
// message may be re-delivered.
//
// If exactly-once delivery is NOT enabled on the subscription, or
// if using Pub/Sub Lite, AckResult readies immediately with a AcknowledgeStatus.Success.
// Since acks in Cloud Pub/Sub are best effort when exactly-once
// delivery is disabled, the message may be re-delivered. Because
// re-deliveries are possible, you should ensure that your processing
// code is idempotent, as you may receive any given message more than
// once.
func (m *Message) AckWithResult() *AckResult {
if m.ackh != nil {
return m.ackh.OnAckWithResult()
}
return nil
}

// NackWithResult declines to acknowledge the message which indicates that
// the client will not or cannot process a Message. This will cause the message
// to be re-delivered to subscribers. Re-deliveries may take place immediately
// or after a delay.
//
// If exactly-once delivery is enabled on the subscription, the
// AckResult returned by this method tracks the state of nack
// operation. If the operation completes successfully, the result will
// contain AckResponse.Success. Otherwise, the result will contain an error
// with more details about the failure.
//
// If exactly-once delivery is NOT enabled on the subscription, or
// if using Pub/Sub Lite, AckResult readies immediately with a AcknowledgeStatus.Success.
func (m *Message) NackWithResult() *AckResult {
if m.ackh != nil {
return m.ackh.OnNackWithResult()
}
return nil
}

// NewMessage creates a message with an AckHandler implementation, which should
// not be nil.
func NewMessage(ackh AckHandler) *Message {
Expand Down
2 changes: 1 addition & 1 deletion pubsub/go.mod
Expand Up @@ -3,7 +3,7 @@ module cloud.google.com/go/pubsub
go 1.17

require (
cloud.google.com/go v0.102.1
cloud.google.com/go v0.102.1-0.20220708235547-f3d2cc2c987e
cloud.google.com/go/iam v0.3.0
cloud.google.com/go/kms v1.4.0
github.com/golang/protobuf v1.5.2
Expand Down
5 changes: 3 additions & 2 deletions pubsub/go.sum
Expand Up @@ -29,8 +29,8 @@ cloud.google.com/go v0.99.0/go.mod h1:w0Xx2nLzqWJPuozYQX+hFfCSI8WioryfRDzkoI/Y2Z
cloud.google.com/go v0.100.1/go.mod h1:fs4QogzfH5n2pBXBP9vRiU+eCny7lD2vmFZy79Iuw1U=
cloud.google.com/go v0.100.2/go.mod h1:4Xra9TjzAeYHrl5+oeLlzbM2k3mjVhZh4UqTZ//w99A=
cloud.google.com/go v0.102.0/go.mod h1:oWcCzKlqJ5zgHQt9YsaeTY9KzIvjyy0ArmiBUgpQ+nc=
cloud.google.com/go v0.102.1 h1:vpK6iQWv/2uUeFJth4/cBHsQAGjn1iIE6AAlxipRaA0=
cloud.google.com/go v0.102.1/go.mod h1:XZ77E9qnTEnrgEOvr4xzfdX5TRo7fB4T2F4O6+34hIU=
cloud.google.com/go v0.102.1-0.20220708235547-f3d2cc2c987e h1:GZ9rHNbN2TY+p6/dTeU0EADYrOc3BCqy/KwGPZHLsdA=
cloud.google.com/go v0.102.1-0.20220708235547-f3d2cc2c987e/go.mod h1:mqs3bFXrt/gPc6aOZpchX8DEdQhuJluA/7LZNutd2Nc=
cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o=
cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE=
cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvftPBK2Dvzc=
Expand Down Expand Up @@ -594,6 +594,7 @@ google.golang.org/genproto v0.0.0-20220505152158-f39f71e6c8f3/go.mod h1:RAyBrSAP
google.golang.org/genproto v0.0.0-20220518221133-4f43b3371335/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4=
google.golang.org/genproto v0.0.0-20220523171625-347a074981d8/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4=
google.golang.org/genproto v0.0.0-20220608133413-ed9918b62aac/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/genproto v0.0.0-20220615141314-f1464d18c36b/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/genproto v0.0.0-20220616135557-88e70c0c3a90/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/genproto v0.0.0-20220624142145-8cd45d7dbd1f/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
Expand Down