diff --git a/.github/workflows/vet.sh b/.github/workflows/vet.sh index d317684d0979..362ec936642b 100755 --- a/.github/workflows/vet.sh +++ b/.github/workflows/vet.sh @@ -55,6 +55,7 @@ golint ./... 2>&1 | ( grep -vE " executeStreamingSql(Min|Rnd)Time" | grep -vE " executeSql(Min|Rnd)Time" | grep -vE "pubsub\/pstest\/fake\.go.+should have comment or be unexported" | + grep -vE "pubsub\/subscription\.go.+ type name will be used as pubsub.PubsubWrapper by other packages" | grep -v "ClusterId" | grep -v "InstanceId" | grep -v "firestore.arrayUnion" | diff --git a/pubsub/pstest/fake.go b/pubsub/pstest/fake.go index c8b6587dfbe6..ff5c2b84e8d0 100644 --- a/pubsub/pstest/fake.go +++ b/pubsub/pstest/fake.go @@ -498,6 +498,11 @@ func (s *GServer) CreateSubscription(_ context.Context, ps *pb.Subscription) (*p } if ps.PushConfig == nil { ps.PushConfig = &pb.PushConfig{} + } else if ps.PushConfig.Wrapper == nil { + // Wrapper should default to PubsubWrapper. + ps.PushConfig.Wrapper = &pb.PushConfig_PubsubWrapper_{ + PubsubWrapper: &pb.PushConfig_PubsubWrapper{}, + } } // Consider any table set to mean the config is active. // We don't convert nil config to empty like with PushConfig above diff --git a/pubsub/pstest/fake_test.go b/pubsub/pstest/fake_test.go index 1254ca0d14dd..27afb07b47cb 100644 --- a/pubsub/pstest/fake_test.go +++ b/pubsub/pstest/fake_test.go @@ -1543,6 +1543,9 @@ func TestSubscriptionPushPull(t *testing.T) { // Create a push subscription. pc := &pb.PushConfig{ PushEndpoint: "some-endpoint", + Wrapper: &pb.PushConfig_PubsubWrapper_{ + PubsubWrapper: &pb.PushConfig_PubsubWrapper{}, + }, } got := mustCreateSubscription(ctx, t, sclient, &pb.Subscription{ AckDeadlineSeconds: minAckDeadlineSecs, diff --git a/pubsub/subscription.go b/pubsub/subscription.go index 4a9f09bd83bf..b2cf82e00a6c 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -148,6 +148,10 @@ type PushConfig struct { // This field is optional and should be set only by users interested in // authenticated push. AuthenticationMethod AuthenticationMethod + + // The format of the delivered message to the push endpoint is defined by + // the chosen wrapper. When unset, `PubsubWrapper` is used. + Wrapper Wrapper } func (pc *PushConfig) toProto() *pb.PushConfig { @@ -165,12 +169,19 @@ func (pc *PushConfig) toProto() *pb.PushConfig { default: // TODO: add others here when GAIC adds more definitions. } } + if w := pc.Wrapper; w != nil { + switch wt := w.(type) { + case *PubsubWrapper: + pbCfg.Wrapper = wt.toProto() + case *NoWrapper: + pbCfg.Wrapper = wt.toProto() + default: + } + } return pbCfg } -// AuthenticationMethod is used by push points to verify the source of push requests. -// This interface defines fields that are part of a closed alpha that may not be accessible -// to all users. +// AuthenticationMethod is used by push subscriptions to verify the source of push requests. type AuthenticationMethod interface { isAuthMethod() bool } @@ -212,6 +223,49 @@ func (oidcToken *OIDCToken) toProto() *pb.PushConfig_OidcToken_ { } } +// Wrapper defines the format of message delivered to push endpoints. +type Wrapper interface { + isWrapper() bool +} + +// PubsubWrapper denotes sending the payload to the push endpoint in the form of the JSON +// representation of a PubsubMessage +// (https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#pubsubmessage). +type PubsubWrapper struct{} + +var _ Wrapper = (*PubsubWrapper)(nil) + +func (p *PubsubWrapper) isWrapper() bool { return true } + +func (p *PubsubWrapper) toProto() *pb.PushConfig_PubsubWrapper_ { + if p == nil { + return nil + } + return &pb.PushConfig_PubsubWrapper_{ + PubsubWrapper: &pb.PushConfig_PubsubWrapper{}, + } +} + +// NoWrapper denotes not wrapping the payload sent to the push endpoint. +type NoWrapper struct { + WriteMetadata bool +} + +var _ Wrapper = (*NoWrapper)(nil) + +func (n *NoWrapper) isWrapper() bool { return true } + +func (n *NoWrapper) toProto() *pb.PushConfig_NoWrapper_ { + if n == nil { + return nil + } + return &pb.PushConfig_NoWrapper_{ + NoWrapper: &pb.PushConfig_NoWrapper{ + WriteMetadata: n.WriteMetadata, + }, + } +} + // BigQueryConfigState denotes the possible states for a BigQuery Subscription. type BigQueryConfigState int @@ -648,6 +702,16 @@ func protoToPushConfig(pbPc *pb.PushConfig) *PushConfig { } } } + if w := pbPc.Wrapper; w != nil { + switch wt := w.(type) { + case *pb.PushConfig_PubsubWrapper_: + pc.Wrapper = &PubsubWrapper{} + case *pb.PushConfig_NoWrapper_: + pc.Wrapper = &NoWrapper{ + WriteMetadata: wt.NoWrapper.WriteMetadata, + } + } + } return pc } diff --git a/pubsub/subscription_test.go b/pubsub/subscription_test.go index f10996d28db9..5c615d504082 100644 --- a/pubsub/subscription_test.go +++ b/pubsub/subscription_test.go @@ -154,7 +154,7 @@ func TestListTopicSubscriptions(t *testing.T) { const defaultRetentionDuration = 168 * time.Hour -func TestUpdateSubscription(t *testing.T) { +func TestSubscriptionConfig(t *testing.T) { ctx := context.Background() client, srv := newFake(t) defer client.Close() @@ -191,13 +191,14 @@ func TestUpdateSubscription(t *testing.T) { ServiceAccountEmail: "foo@example.com", Audience: "client-12345", }, + Wrapper: &PubsubWrapper{}, }, EnableExactlyOnceDelivery: false, State: SubscriptionStateActive, } opt := cmpopts.IgnoreUnexported(SubscriptionConfig{}) - if !testutil.Equal(cfg, want, opt) { - t.Fatalf("\ngot %+v\nwant %+v", cfg, want) + if diff := testutil.Diff(cfg, want, opt); diff != "" { + t.Fatalf("compare subscription config mismatch, -got, +want\n%s", diff) } got, err := sub.Update(ctx, SubscriptionConfigToUpdate{ @@ -206,10 +207,13 @@ func TestUpdateSubscription(t *testing.T) { Labels: map[string]string{"label": "value"}, ExpirationPolicy: 72 * time.Hour, PushConfig: &PushConfig{ - Endpoint: "https://example.com/push", + Endpoint: "https://example2.com/push", AuthenticationMethod: &OIDCToken{ - ServiceAccountEmail: "foo@example.com", - Audience: "client-12345", + ServiceAccountEmail: "bar@example.com", + Audience: "client-98765", + }, + Wrapper: &NoWrapper{ + WriteMetadata: true, }, }, EnableExactlyOnceDelivery: true, @@ -225,17 +229,20 @@ func TestUpdateSubscription(t *testing.T) { Labels: map[string]string{"label": "value"}, ExpirationPolicy: 72 * time.Hour, PushConfig: PushConfig{ - Endpoint: "https://example.com/push", + Endpoint: "https://example2.com/push", AuthenticationMethod: &OIDCToken{ - ServiceAccountEmail: "foo@example.com", - Audience: "client-12345", + ServiceAccountEmail: "bar@example.com", + Audience: "client-98765", + }, + Wrapper: &NoWrapper{ + WriteMetadata: true, }, }, EnableExactlyOnceDelivery: true, State: SubscriptionStateActive, } - if !testutil.Equal(got, want, opt) { - t.Fatalf("\ngot %+v\nwant %+v", got, want) + if diff := testutil.Diff(got, want, opt); diff != "" { + t.Fatalf("compare subscription config mismatch, -got, +want\n%s", diff) } got, err = sub.Update(ctx, SubscriptionConfigToUpdate{ @@ -247,8 +254,8 @@ func TestUpdateSubscription(t *testing.T) { } want.RetentionDuration = 2 * time.Hour want.Labels = nil - if !testutil.Equal(got, want, opt) { - t.Fatalf("\ngot %+v\nwant %+v", got, want) + if diff := testutil.Diff(got, want, opt); diff != "" { + t.Fatalf("compare subscription config mismatch, -got, +want\n%s", diff) } _, err = sub.Update(ctx, SubscriptionConfigToUpdate{}) @@ -264,8 +271,8 @@ func TestUpdateSubscription(t *testing.T) { t.Fatal(err) } want.ExpirationPolicy = time.Duration(0) - if !testutil.Equal(got, want, opt) { - t.Fatalf("\ngot %+v\nwant %+v", got, want) + if diff := testutil.Diff(got, want, opt); diff != "" { + t.Fatalf("compare subscription config mismatch, -got, +want\n%s", diff) } }