Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsub): add support for cloud storage subscriptions #7977

Merged
merged 16 commits into from Jun 23, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 11 additions & 1 deletion pubsub/pstest/fake.go
Expand Up @@ -505,6 +505,9 @@ func (s *GServer) CreateSubscription(_ context.Context, ps *pb.Subscription) (*p
if ps.GetBigqueryConfig() != nil && ps.GetBigqueryConfig().GetTable() != "" {
ps.BigqueryConfig.State = pb.BigQueryConfig_ACTIVE
}
if ps.CloudStorageConfig != nil && ps.CloudStorageConfig.Bucket != "" {
ps.CloudStorageConfig.State = pb.CloudStorageConfig_ACTIVE
}
ps.TopicMessageRetentionDuration = top.proto.MessageRetentionDuration
var deadLetterTopic *topic
if ps.DeadLetterPolicy != nil {
Expand Down Expand Up @@ -608,7 +611,6 @@ func (s *GServer) UpdateSubscription(_ context.Context, req *pb.UpdateSubscripti
case "bigquery_config":
// If bq config is nil here, it will be cleared.
// Otherwise, we'll consider the subscription active if any table is set.
sub.proto.BigqueryConfig = req.GetSubscription().GetBigqueryConfig()
lahuang4 marked this conversation as resolved.
Show resolved Hide resolved
if sub.proto.GetBigqueryConfig() != nil {
if sub.proto.GetBigqueryConfig().GetTable() != "" {
sub.proto.BigqueryConfig.State = pb.BigQueryConfig_ACTIVE
Expand All @@ -617,6 +619,14 @@ func (s *GServer) UpdateSubscription(_ context.Context, req *pb.UpdateSubscripti
}
}

case "cloud_storage_config":
sub.proto.CloudStorageConfig = req.GetSubscription().GetCloudStorageConfig()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's some invariant here that ensures req.GetSubscription() doesn't return nil?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, it's checked in the first lines above

// As long as the storage config is not nil, we assume it's valid
// without additional checks.
if sub.proto.GetCloudStorageConfig() != nil {
sub.proto.CloudStorageConfig.State = pb.CloudStorageConfig_ACTIVE
}

case "ack_deadline_seconds":
a := req.Subscription.AckDeadlineSeconds
if err := checkAckDeadline(a); err != nil {
Expand Down
16 changes: 16 additions & 0 deletions pubsub/pstest/fake_test.go
Expand Up @@ -1530,6 +1530,7 @@ func TestStreaming_SubscriptionProperties(t *testing.T) {
}
}

// Test switching between the various subscription types: push to endpoint, bigquery, cloud storage, and pull.
func TestSubscriptionPushPull(t *testing.T) {
ctx := context.Background()
pclient, sclient, _, cleanup := newFake(ctx, t)
Expand Down Expand Up @@ -1586,6 +1587,21 @@ func TestSubscriptionPushPull(t *testing.T) {
if got.BigqueryConfig != nil {
t.Errorf("sub.BigqueryConfig should be nil, got %s", got.BigqueryConfig)
}

// Update the subscription to write to Cloud Storage.
csc := &pb.CloudStorageConfig{
Bucket: "fake-bucket",
}
updateSub.CloudStorageConfig = csc
got = mustUpdateSubscription(ctx, t, sclient, &pb.UpdateSubscriptionRequest{
Subscription: updateSub,
UpdateMask: &field_mask.FieldMask{Paths: []string{"cloud_storage_config"}},
})
want2 := csc
want2.State = pb.CloudStorageConfig_ACTIVE
if diff := testutil.Diff(got.CloudStorageConfig, want2); diff != "" {
t.Errorf("sub.CloudStorageConfig mismatch: %s", diff)
}
}

func TestSubscriptionMessageOrdering(t *testing.T) {
Expand Down
183 changes: 174 additions & 9 deletions pubsub/subscription.go
Expand Up @@ -32,6 +32,7 @@ import (
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
durpb "google.golang.org/protobuf/types/known/durationpb"
fmpb "google.golang.org/protobuf/types/known/fieldmaskpb"

Expand Down Expand Up @@ -280,6 +281,112 @@ func (bc *BigQueryConfig) toProto() *pb.BigQueryConfig {
return pbCfg
}

// CloudStorageConfigState denotes the possible states for a Cloud Storage Subscription.
//
// Values are converted to and from pb.CloudStorageConfig_State with a direct
// numeric cast (see CloudStorageConfig.toProto and protoToStorageConfig), so
// the declaration order below must stay in step with the proto enum.
type CloudStorageConfigState int

// NOTE(review): the first constant is declared with a bare iota and no explicit
// type, so these are untyped integer constants rather than
// CloudStorageConfigState values. Confirm whether that is intentional before
// relying on the constants' type.
const (
// CloudStorageConfigStateUnspecified is the default value. This value is unused.
CloudStorageConfigStateUnspecified = iota

// CloudStorageConfigActive means the subscription can actively send messages to Cloud Storage.
CloudStorageConfigActive

// CloudStorageConfigPermissionDenied means the subscription cannot write to the Cloud Storage bucket because of permission denied errors.
CloudStorageConfigPermissionDenied

// CloudStorageConfigNotFound means the subscription cannot write to the Cloud Storage bucket because it does not exist.
CloudStorageConfigNotFound
)

// isCloudStorageOutputFormat is the set of configuration options for how to
// write the message data to Cloud Storage. It is implemented by
// *CloudStorageOutputFormatTextConfig and *CloudStorageOutputFormatAvroConfig.
// The single method is an unexported marker with no behavior.
type isCloudStorageOutputFormat interface {
isCloudStorageOutputFormat()
}

// CloudStorageOutputFormatTextConfig is the configuration for writing
// message data in text format. Message payloads will be written to files
// as raw text, separated by a newline.
//
// The type has no options. Assign a pointer to it
// (&CloudStorageOutputFormatTextConfig{}) to CloudStorageConfig.OutputFormat;
// only the pointer type implements the output format interface.
type CloudStorageOutputFormatTextConfig struct{}

// CloudStorageOutputFormatAvroConfig is the configuration for writing
// message data in Avro format. Message payloads and metadata will be written
// to the files as an Avro binary.
//
// Assign a pointer to it (&CloudStorageOutputFormatAvroConfig{...}) to
// CloudStorageConfig.OutputFormat; only the pointer type implements the
// output format interface.
type CloudStorageOutputFormatAvroConfig struct {
// When true, write the subscription name, message_id, publish_time,
// attributes, and ordering_key as additional fields in the output.
WriteMetadata bool
}

// isCloudStorageOutputFormat marks *CloudStorageOutputFormatTextConfig as a
// valid value for CloudStorageConfig.OutputFormat. It does nothing.
func (*CloudStorageOutputFormatTextConfig) isCloudStorageOutputFormat() {}

// isCloudStorageOutputFormat marks *CloudStorageOutputFormatAvroConfig as a
// valid value for CloudStorageConfig.OutputFormat. It does nothing.
func (*CloudStorageOutputFormatAvroConfig) isCloudStorageOutputFormat() {}

// CloudStorageConfig configures the subscription to deliver to Cloud Storage.
type CloudStorageConfig struct {
// User-provided name for the Cloud Storage bucket.
// The bucket must be created by the user. The bucket name must be without
// any prefix like "gs://". See the [bucket naming
// requirements](https://cloud.google.com/storage/docs/buckets#naming).
//
// An empty Bucket is used as a sentinel: when creating or updating a
// subscription, a config with a blank bucket is sent as no Cloud Storage
// config at all.
Bucket string

// User-provided prefix for Cloud Storage filename. See the [object naming
// requirements](https://cloud.google.com/storage/docs/objects#naming).
FilenamePrefix string

// User-provided suffix for Cloud Storage filename. See the [object naming
// requirements](https://cloud.google.com/storage/docs/objects#naming).
FilenameSuffix string

// Configuration for how to write message data. Options are
// *CloudStorageOutputFormatTextConfig and *CloudStorageOutputFormatAvroConfig.
// Defaults to text format.
OutputFormat isCloudStorageOutputFormat

// The maximum duration that can elapse before a new Cloud Storage file is
// created. Min 1 minute, max 10 minutes, default 5 minutes. May not exceed
// the subscription's acknowledgement deadline.
//
// When nil, no max duration is included in the request.
MaxDuration optional.Duration

// The maximum bytes that can be written to a Cloud Storage file before a new
// file is created. Min 1 KB, max 10 GiB. The max_bytes limit may be exceeded
// in cases where messages are larger than the limit.
MaxBytes int64

// Output only. An output-only field that indicates whether or not the
// subscription can receive messages.
State CloudStorageConfigState
}

// toProto converts the config into its protobuf form. A nil receiver yields a
// nil message. MaxDuration is only set when the optional value is non-nil, and
// OutputFormat is only set when it holds one of the known output format types.
func (csc *CloudStorageConfig) toProto() *pb.CloudStorageConfig {
	if csc == nil {
		return nil
	}

	msg := &pb.CloudStorageConfig{
		Bucket:         csc.Bucket,
		FilenamePrefix: csc.FilenamePrefix,
		FilenameSuffix: csc.FilenameSuffix,
		MaxBytes:       csc.MaxBytes,
		State:          pb.CloudStorageConfig_State(csc.State),
	}
	if csc.MaxDuration != nil {
		msg.MaxDuration = durationpb.New(optional.ToDuration(csc.MaxDuration))
	}

	// A nil OutputFormat matches no case and leaves the oneof unset.
	switch format := csc.OutputFormat.(type) {
	case *CloudStorageOutputFormatTextConfig:
		msg.OutputFormat = &pb.CloudStorageConfig_TextConfig_{}
	case *CloudStorageOutputFormatAvroConfig:
		msg.OutputFormat = &pb.CloudStorageConfig_AvroConfig_{
			AvroConfig: &pb.CloudStorageConfig_AvroConfig{
				WriteMetadata: format.WriteMetadata,
			},
		}
	}
	return msg
}

// SubscriptionState denotes the possible states for a Subscription.
type SubscriptionState int

Expand All @@ -296,7 +403,9 @@ const (
SubscriptionStateResourceError
)

// SubscriptionConfig describes the configuration of a subscription.
// SubscriptionConfig describes the configuration of a subscription. If none of
// PushConfig, BigQueryConfig, or CloudStorageConfig is set, then the subscriber will
// pull and ack messages using API methods. At most one of these fields may be set.
type SubscriptionConfig struct {
// The fully qualified identifier for the subscription, in the format "projects/<projid>/subscriptions/<name>"
name string
Expand All @@ -305,17 +414,23 @@ type SubscriptionConfig struct {
Topic *Topic

// If push delivery is used with this subscription, this field is
// used to configure it. Either `PushConfig` or `BigQueryConfig` can be set,
// but not both. If both are empty, then the subscriber will pull and ack
// messages using API methods.
// used to configure it. At most one of `PushConfig`, `BigQueryConfig`,
// and `CloudStorageConfig` can be set. If all are empty, then the
hongalex marked this conversation as resolved.
Show resolved Hide resolved
// subscriber will pull and ack messages using API methods.
PushConfig PushConfig

// If delivery to BigQuery is used with this subscription, this field is
// used to configure it. Either `PushConfig` or `BigQueryConfig` can be set,
// but not both. If both are empty, then the subscriber will pull and ack
// messages using API methods.
// used to configure it. At most one of `PushConfig`, `BigQueryConfig`,
// and `CloudStorageConfig` can be set. If all are empty, then the
// subscriber will pull and ack messages using API methods.
BigQueryConfig BigQueryConfig

// If delivery to Cloud Storage is used with this subscription, this field is
// used to configure it. At most one of `PushConfig`, `BigQueryConfig`,
// and `CloudStorageConfig` can be set. If all are empty, then the
// subscriber will pull and ack messages using API methods.
CloudStorageConfig CloudStorageConfig

// The default maximum time after a subscriber receives a message before
// the subscriber should acknowledge the message. Note: messages which are
// obtained via Subscription.Receive need not be acknowledged within this
Expand Down Expand Up @@ -442,6 +557,12 @@ func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription {
if cfg.BigQueryConfig.Table != "" {
pbBigQueryConfig = cfg.BigQueryConfig.toProto()
}
var pbCloudStorageConfig *pb.CloudStorageConfig
// Use the bucket as sentinel value here. If it is blank,
// that's equivalent to clearing the config and reverting to pull.
if cfg.CloudStorageConfig.Bucket != "" {
pbCloudStorageConfig = cfg.CloudStorageConfig.toProto()
}
var retentionDuration *durpb.Duration
if cfg.RetentionDuration != 0 {
retentionDuration = durpb.New(cfg.RetentionDuration)
Expand All @@ -459,6 +580,7 @@ func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription {
Topic: cfg.Topic.name,
PushConfig: pbPushConfig,
BigqueryConfig: pbBigQueryConfig,
CloudStorageConfig: pbCloudStorageConfig,
AckDeadlineSeconds: trunc32(int64(cfg.AckDeadline.Seconds())),
RetainAckedMessages: cfg.RetainAckedMessages,
MessageRetentionDuration: retentionDuration,
Expand Down Expand Up @@ -507,6 +629,9 @@ func protoToSubscriptionConfig(pbSub *pb.Subscription, c *Client) (SubscriptionC
if bq := protoToBQConfig(pbSub.GetBigqueryConfig()); bq != nil {
subC.BigQueryConfig = *bq
}
if cs := protoToStorageConfig(pbSub.GetCloudStorageConfig()); cs != nil {
subC.CloudStorageConfig = *cs
}
return subC, nil
}

Expand Down Expand Up @@ -543,6 +668,31 @@ func protoToBQConfig(pbBQ *pb.BigQueryConfig) *BigQueryConfig {
return bq
}

// protoToStorageConfig converts a protobuf CloudStorageConfig into the
// package's CloudStorageConfig. It returns nil for a nil input. MaxDuration is
// left unset when the proto duration is zero, and OutputFormat is left unset
// when the proto oneof is empty or of an unrecognized type.
func protoToStorageConfig(pbCSC *pb.CloudStorageConfig) *CloudStorageConfig {
	if pbCSC == nil {
		return nil
	}

	result := &CloudStorageConfig{
		Bucket:         pbCSC.GetBucket(),
		FilenamePrefix: pbCSC.GetFilenamePrefix(),
		FilenameSuffix: pbCSC.GetFilenameSuffix(),
		MaxBytes:       pbCSC.GetMaxBytes(),
		State:          CloudStorageConfigState(pbCSC.GetState()),
	}

	if maxDur := pbCSC.GetMaxDuration().AsDuration(); maxDur != 0 {
		result.MaxDuration = maxDur
	}

	switch format := pbCSC.OutputFormat.(type) {
	case *pb.CloudStorageConfig_TextConfig_:
		result.OutputFormat = &CloudStorageOutputFormatTextConfig{}
	case *pb.CloudStorageConfig_AvroConfig_:
		result.OutputFormat = &CloudStorageOutputFormatAvroConfig{
			WriteMetadata: format.AvroConfig.GetWriteMetadata(),
		}
	}
	return result
}

// DeadLetterPolicy specifies the conditions for dead lettering messages in
// a subscription.
type DeadLetterPolicy struct {
Expand Down Expand Up @@ -786,14 +936,21 @@ func (s *Subscription) Config(ctx context.Context) (SubscriptionConfig, error) {

// SubscriptionConfigToUpdate describes how to update a subscription.
type SubscriptionConfigToUpdate struct {
// If non-nil, the push config is changed. Cannot be set at the same time as BigQueryConfig.
// If non-nil, the push config is changed. At most one of PushConfig, BigQueryConfig, or CloudStorageConfig
// can be set.
// If currently in push mode, set this value to the zero value to revert to a Pull based subscription.
PushConfig *PushConfig

// If non-nil, the bigquery config is changed. Cannot be set at the same time as PushConfig.
// If non-nil, the bigquery config is changed. At most one of PushConfig, BigQueryConfig, or CloudStorageConfig
// can be set.
// If currently in bigquery mode, set this value to the zero value to revert to a Pull based subscription.
BigQueryConfig *BigQueryConfig

// If non-nil, the Cloud Storage config is changed. At most one of PushConfig, BigQueryConfig, or CloudStorageConfig
// can be set.
// If currently in CloudStorage mode, set this value to the zero value to revert to a Pull based subscription.
lahuang4 marked this conversation as resolved.
Show resolved Hide resolved
CloudStorageConfig *CloudStorageConfig

// If non-zero, the ack deadline is changed.
AckDeadline time.Duration

Expand Down Expand Up @@ -855,6 +1012,14 @@ func (s *Subscription) updateRequest(cfg *SubscriptionConfigToUpdate) *pb.Update
psub.BigqueryConfig = cfg.BigQueryConfig.toProto()
lahuang4 marked this conversation as resolved.
Show resolved Hide resolved
paths = append(paths, "bigquery_config")
}
if cfg.CloudStorageConfig != nil {
if cfg.CloudStorageConfig.Bucket == "" {
psub.CloudStorageConfig = nil
} else {
psub.CloudStorageConfig = cfg.CloudStorageConfig.toProto()
}
paths = append(paths, "cloud_storage_config")
}
if cfg.AckDeadline != 0 {
psub.AckDeadlineSeconds = trunc32(int64(cfg.AckDeadline.Seconds()))
paths = append(paths, "ack_deadline_seconds")
Expand Down
54 changes: 54 additions & 0 deletions pubsub/subscription_test.go
Expand Up @@ -497,6 +497,60 @@ func TestBigQuerySubscription(t *testing.T) {
}
}

func TestCloudStorageSubscription(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
client, srv := newFake(t)
defer client.Close()
defer srv.Close()

topic := mustCreateTopic(t, client, "t")
bucket := "fake-bucket"
csCfg := CloudStorageConfig{
Bucket: bucket,
FilenamePrefix: "some-prefix",
FilenameSuffix: "some-suffix",
OutputFormat: &CloudStorageOutputFormatAvroConfig{
lahuang4 marked this conversation as resolved.
Show resolved Hide resolved
WriteMetadata: true,
},
MaxDuration: 10 * time.Minute,
MaxBytes: 10e5,
}

subConfig := SubscriptionConfig{
Topic: topic,
CloudStorageConfig: csCfg,
}
csSub, err := client.CreateSubscription(ctx, "s", subConfig)
if err != nil {
t.Fatal(err)
}
cfg, err := csSub.Config(ctx)
if err != nil {
t.Fatal(err)
}

want := csCfg
want.State = CloudStorageConfigActive
if diff := testutil.Diff(cfg.CloudStorageConfig, want); diff != "" {
t.Fatalf("create cloud storage subscription mismatch: \n%s", diff)
}

// Test resetting to a pull based subscription.
cfg, err = csSub.Update(ctx, SubscriptionConfigToUpdate{
CloudStorageConfig: &CloudStorageConfig{},
})
if err != nil {
t.Fatal(err)
}
got := cfg.CloudStorageConfig
want = CloudStorageConfig{}
if diff := testutil.Diff(got, want); diff != "" {
t.Fatalf("remove cloud storage subscription mismatch: \n%s", diff)
}

hongalex marked this conversation as resolved.
Show resolved Hide resolved
}

func TestExactlyOnceDelivery_AckSuccess(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
Expand Down