diff --git a/attributes/attributes.go b/attributes/attributes.go index 02f5dc53189..3efca459149 100644 --- a/attributes/attributes.go +++ b/attributes/attributes.go @@ -25,6 +25,11 @@ // later release. package attributes +import ( + "fmt" + "strings" +) + // Attributes is an immutable struct for storing and retrieving generic // key/value pairs. Keys must be hashable, and users should define their own // types for keys. Values should not be modified after they are added to an @@ -99,3 +104,27 @@ func (a *Attributes) Equal(o *Attributes) bool { } return true } + +// String prints the attribute map. If any key or values throughout the map +// implement fmt.Stringer, it calls that method and appends. +func (a *Attributes) String() string { + var sb strings.Builder + sb.WriteString("{") + first := true + for k, v := range a.m { + var key, val string + if str, ok := k.(interface{ String() string }); ok { + key = str.String() + } + if str, ok := v.(interface{ String() string }); ok { + val = str.String() + } + if !first { + sb.WriteString(", ") + } + sb.WriteString(fmt.Sprintf("%q: %q, ", key, val)) + first = false + } + sb.WriteString("}") + return sb.String() +} diff --git a/balancer/weightedroundrobin/weightedroundrobin.go b/balancer/weightedroundrobin/weightedroundrobin.go index 6fc4d1910e6..bb8de05d530 100644 --- a/balancer/weightedroundrobin/weightedroundrobin.go +++ b/balancer/weightedroundrobin/weightedroundrobin.go @@ -20,6 +20,8 @@ package weightedroundrobin import ( + "fmt" + "google.golang.org/grpc/resolver" ) @@ -66,3 +68,7 @@ func GetAddrInfo(addr resolver.Address) AddrInfo { ai, _ := v.(AddrInfo) return ai } + +func (a AddrInfo) String() string { + return fmt.Sprintf("Weight: %d", a.Weight) +} diff --git a/balancer/weightedtarget/weightedaggregator/aggregator.go b/balancer/weightedtarget/weightedaggregator/aggregator.go index 37fc41c1688..27279257ed1 100644 --- a/balancer/weightedtarget/weightedaggregator/aggregator.go +++ b/balancer/weightedtarget/weightedaggregator/aggregator.go @@ -178,6 +178,14 @@ func (wbsa *Aggregator) ResumeStateUpdates() { } } +// NeedUpdateStateOnResume sets the UpdateStateOnResume bool to true, letting a +// picker update be sent once ResumeStateUpdates is called. +func (wbsa *Aggregator) NeedUpdateStateOnResume() { + wbsa.mu.Lock() + defer wbsa.mu.Unlock() + wbsa.needUpdateStateOnResume = true +} + // UpdateState is called to report a balancer state change from sub-balancer. // It's usually called by the balancer group. // diff --git a/balancer/weightedtarget/weightedtarget.go b/balancer/weightedtarget/weightedtarget.go index 83bb7d701f1..3d5acdab6af 100644 --- a/balancer/weightedtarget/weightedtarget.go +++ b/balancer/weightedtarget/weightedtarget.go @@ -143,6 +143,18 @@ func (b *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnStat b.targets = newConfig.Targets + // If the targets length is zero, it means we have removed all child + // policies from the balancer group and aggregator. + // At the start of this UpdateClientConnState() operation, a call to + // b.stateAggregator.ResumeStateUpdates() is deferred. Thus, setting the + // needUpdateStateOnResume bool to true here will ensure a new picker is + // built as part of that deferred function. Since there are now no child + // policies, the aggregated connectivity state reported form the Aggregator + // will be TRANSIENT_FAILURE. + if len(b.targets) == 0 { + b.stateAggregator.NeedUpdateStateOnResume() + } + return nil } diff --git a/balancer/weightedtarget/weightedtarget_test.go b/balancer/weightedtarget/weightedtarget_test.go index a20cb0dc1ce..5658f302a49 100644 --- a/balancer/weightedtarget/weightedtarget_test.go +++ b/balancer/weightedtarget/weightedtarget_test.go @@ -166,7 +166,8 @@ func init() { // TestWeightedTarget covers the cases that a sub-balancer is added and a // sub-balancer is removed. It verifies that the addresses and balancer configs // are forwarded to the right sub-balancer. This test is intended to test the -// glue code in weighted_target. +// glue code in weighted_target. It also tests an empty target config update, +// which should trigger a transient failure state update. func (s) TestWeightedTarget(t *testing.T) { cc := testutils.NewTestClientConn(t) wtb := wtbBuilder.Build(cc, balancer.BuildOptions{}) @@ -306,6 +307,24 @@ func (s) TestWeightedTarget(t *testing.T) { t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc3) } } + // Update the Weighted Target Balancer with an empty address list and no + // targets. This should cause a Transient Failure State update to the Client + // Conn. + emptyConfig, err := wtbParser.ParseConfig([]byte(`{}`)) + if err != nil { + t.Fatalf("Failed to parse balancer config: %v", err) + } + if err := wtb.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: resolver.State{}, + BalancerConfig: emptyConfig, + }); err != nil { + t.Fatalf("Failed to update ClientConn state: %v", err) + } + + state := <-cc.NewStateCh + if state != connectivity.TransientFailure { + t.Fatalf("Empty target update should have triggered a TF state update, got: %v", state) + } } // TestWeightedTarget_OneSubBalancer_AddRemoveBackend tests the case where we diff --git a/internal/testutils/xds/e2e/clientresources.go b/internal/testutils/xds/e2e/clientresources.go index b38d27b2496..ff2a5d43398 100644 --- a/internal/testutils/xds/e2e/clientresources.go +++ b/internal/testutils/xds/e2e/clientresources.go @@ -524,6 +524,14 @@ func ClusterResourceWithOptions(opts ClusterOptions) *v3clusterpb.Cluster { return cluster } +// LocalityOptions contains options to configure a Locality. +type LocalityOptions struct { + // Ports is a set of ports on "localhost" belonging to this locality. + Ports []uint32 + // Weight is the weight of the locality, used for load balancing. + Weight uint32 +} + // EndpointOptions contains options to configure an Endpoint (or // ClusterLoadAssignment) resource. type EndpointOptions struct { @@ -533,9 +541,8 @@ type EndpointOptions struct { // Host is the hostname of the endpoints. In our e2e tests, hostname must // always be "localhost". Host string - // Ports is a set of ports on "localhost" where the endpoints corresponding - // to this resource reside. - Ports []uint32 + // Localities is a set of localities belonging to this resource. + Localities []LocalityOptions // DropPercents is a map from drop category to a drop percentage. If unset, // no drops are configured. DropPercents map[string]int @@ -546,34 +553,50 @@ func DefaultEndpoint(clusterName string, host string, ports []uint32) *v3endpoin return EndpointResourceWithOptions(EndpointOptions{ ClusterName: clusterName, Host: host, - Ports: ports, + Localities: []LocalityOptions{ + { + Ports: ports, + Weight: 1, + }, + }, }) } // EndpointResourceWithOptions returns an xds Endpoint resource configured with // the provided options. func EndpointResourceWithOptions(opts EndpointOptions) *v3endpointpb.ClusterLoadAssignment { - var lbEndpoints []*v3endpointpb.LbEndpoint - for _, port := range opts.Ports { - lbEndpoints = append(lbEndpoints, &v3endpointpb.LbEndpoint{ - HostIdentifier: &v3endpointpb.LbEndpoint_Endpoint{Endpoint: &v3endpointpb.Endpoint{ - Address: &v3corepb.Address{Address: &v3corepb.Address_SocketAddress{ - SocketAddress: &v3corepb.SocketAddress{ - Protocol: v3corepb.SocketAddress_TCP, - Address: opts.Host, - PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: port}}, + var endpoints []*v3endpointpb.LocalityLbEndpoints + for i, locality := range opts.Localities { + var lbEndpoints []*v3endpointpb.LbEndpoint + for _, port := range locality.Ports { + lbEndpoints = append(lbEndpoints, &v3endpointpb.LbEndpoint{ + HostIdentifier: &v3endpointpb.LbEndpoint_Endpoint{Endpoint: &v3endpointpb.Endpoint{ + Address: &v3corepb.Address{Address: &v3corepb.Address_SocketAddress{ + SocketAddress: &v3corepb.SocketAddress{ + Protocol: v3corepb.SocketAddress_TCP, + Address: opts.Host, + PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: port}}, + }}, }}, - }}, + LoadBalancingWeight: &wrapperspb.UInt32Value{Value: 1}, + }) + } + + endpoints = append(endpoints, &v3endpointpb.LocalityLbEndpoints{ + Locality: &v3corepb.Locality{ + Region: fmt.Sprintf("region-%d", i+1), + Zone: fmt.Sprintf("zone-%d", i+1), + SubZone: fmt.Sprintf("subzone-%d", i+1), + }, + LbEndpoints: lbEndpoints, + LoadBalancingWeight: &wrapperspb.UInt32Value{Value: locality.Weight}, + Priority: 0, }) } + cla := &v3endpointpb.ClusterLoadAssignment{ ClusterName: opts.ClusterName, - Endpoints: []*v3endpointpb.LocalityLbEndpoints{{ - Locality: &v3corepb.Locality{SubZone: "subzone"}, - LbEndpoints: lbEndpoints, - LoadBalancingWeight: &wrapperspb.UInt32Value{Value: 1}, - Priority: 0, - }}, + Endpoints: endpoints, } var drops []*v3endpointpb.ClusterLoadAssignment_Policy_DropOverload diff --git a/resolver/resolver.go b/resolver/resolver.go index 6215e5ef2b0..353c10b69a5 100644 --- a/resolver/resolver.go +++ b/resolver/resolver.go @@ -22,13 +22,13 @@ package resolver import ( "context" + "fmt" "net" "net/url" "strings" "google.golang.org/grpc/attributes" "google.golang.org/grpc/credentials" - "google.golang.org/grpc/internal/pretty" "google.golang.org/grpc/serviceconfig" ) @@ -124,7 +124,7 @@ type Address struct { Attributes *attributes.Attributes // BalancerAttributes contains arbitrary data about this address intended - // for consumption by the LB policy. These attribes do not affect SubConn + // for consumption by the LB policy. These attributes do not affect SubConn // creation, connection establishment, handshaking, etc. BalancerAttributes *attributes.Attributes @@ -151,7 +151,17 @@ func (a Address) Equal(o Address) bool { // String returns JSON formatted string representation of the address. func (a Address) String() string { - return pretty.ToJSON(a) + var sb strings.Builder + sb.WriteString(fmt.Sprintf("{Addr: %q, ", a.Addr)) + sb.WriteString(fmt.Sprintf("ServerName: %q, ", a.ServerName)) + if a.Attributes != nil { + sb.WriteString(fmt.Sprintf("Attributes: %v, ", a.Attributes.String())) + } + if a.BalancerAttributes != nil { + sb.WriteString(fmt.Sprintf("BalancerAttributes: %v", a.BalancerAttributes.String())) + } + sb.WriteString("}") + return sb.String() } // BuildOptions includes additional information for the builder to create diff --git a/test/xds/xds_client_custom_lb_test.go b/test/xds/xds_client_custom_lb_test.go new file mode 100644 index 00000000000..91ec874c64a --- /dev/null +++ b/test/xds/xds_client_custom_lb_test.go @@ -0,0 +1,231 @@ +/* + * + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xds_test + +import ( + "context" + "fmt" + "testing" + + v3xdsxdstypepb "github.com/cncf/xds/go/xds/type/v3" + v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" + v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + v3roundrobinpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/round_robin/v3" + v3wrrlocalitypb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/wrr_locality/v3" + "github.com/golang/protobuf/proto" + structpb "github.com/golang/protobuf/ptypes/struct" + testgrpc "google.golang.org/grpc/interop/grpc_testing" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal/envconfig" + "google.golang.org/grpc/internal/stubserver" + "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/testutils/roundrobin" + "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/resolver" +) + +// wrrLocality is a helper that takes a proto message and returns a +// WrrLocalityProto with the proto message marshaled into a proto.Any as a +// child. +func wrrLocality(m proto.Message) *v3wrrlocalitypb.WrrLocality { + return &v3wrrlocalitypb.WrrLocality{ + EndpointPickingPolicy: &v3clusterpb.LoadBalancingPolicy{ + Policies: []*v3clusterpb.LoadBalancingPolicy_Policy{ + { + TypedExtensionConfig: &v3corepb.TypedExtensionConfig{ + TypedConfig: testutils.MarshalAny(m), + }, + }, + }, + }, + } +} + +// clusterWithLBConfiguration returns a cluster resource with the proto message +// passed Marshaled to an any and specified through the load_balancing_policy +// field. +func clusterWithLBConfiguration(clusterName, edsServiceName string, secLevel e2e.SecurityLevel, m proto.Message) *v3clusterpb.Cluster { + cluster := e2e.DefaultCluster(clusterName, edsServiceName, secLevel) + cluster.LoadBalancingPolicy = &v3clusterpb.LoadBalancingPolicy{ + Policies: []*v3clusterpb.LoadBalancingPolicy_Policy{ + { + TypedExtensionConfig: &v3corepb.TypedExtensionConfig{ + TypedConfig: testutils.MarshalAny(m), + }, + }, + }, + } + return cluster +} + +// TestWRRLocality tests RPC distribution across a scenario with 5 backends, +// with 2 backends in a locality with weight 1, and 3 backends in a second +// locality with weight 2. Through xDS, the test configures a +// wrr_locality_balancer with either a round robin or custom (specifying pick +// first) child load balancing policy, and asserts the correct distribution +// based on the locality weights and the endpoint picking policy specified. +func (s) TestWrrLocality(t *testing.T) { + oldCustomLBSupport := envconfig.XDSCustomLBPolicy + envconfig.XDSCustomLBPolicy = true + defer func() { + envconfig.XDSCustomLBPolicy = oldCustomLBSupport + }() + + backend1 := stubserver.StartTestService(t, nil) + port1 := testutils.ParsePort(t, backend1.Address) + defer backend1.Stop() + backend2 := stubserver.StartTestService(t, nil) + port2 := testutils.ParsePort(t, backend2.Address) + defer backend2.Stop() + backend3 := stubserver.StartTestService(t, nil) + port3 := testutils.ParsePort(t, backend3.Address) + defer backend3.Stop() + backend4 := stubserver.StartTestService(t, nil) + port4 := testutils.ParsePort(t, backend4.Address) + defer backend4.Stop() + backend5 := stubserver.StartTestService(t, nil) + port5 := testutils.ParsePort(t, backend5.Address) + defer backend5.Stop() + const serviceName = "my-service-client-side-xds" + + tests := []struct { + name string + // Configuration will be specified through load_balancing_policy field. + wrrLocalityConfiguration *v3wrrlocalitypb.WrrLocality + addressDistributionWant []resolver.Address + }{ + { + name: "rr_child", + wrrLocalityConfiguration: wrrLocality(&v3roundrobinpb.RoundRobin{}), + // Each addresses expected probability is locality weight of + // locality / total locality weights multiplied by 1 / number of + // endpoints in each locality (due to round robin across endpoints + // in a locality). Thus, address 1 and address 2 have 1/3 * 1/2 + // probability, and addresses 3 4 5 have 2/3 * 1/3 probability of + // being routed to. + addressDistributionWant: []resolver.Address{ + {Addr: backend1.Address}, + {Addr: backend1.Address}, + {Addr: backend1.Address}, + {Addr: backend1.Address}, + {Addr: backend1.Address}, + {Addr: backend1.Address}, + {Addr: backend2.Address}, + {Addr: backend2.Address}, + {Addr: backend2.Address}, + {Addr: backend2.Address}, + {Addr: backend2.Address}, + {Addr: backend2.Address}, + {Addr: backend3.Address}, + {Addr: backend3.Address}, + {Addr: backend3.Address}, + {Addr: backend3.Address}, + {Addr: backend3.Address}, + {Addr: backend3.Address}, + {Addr: backend3.Address}, + {Addr: backend3.Address}, + {Addr: backend4.Address}, + {Addr: backend4.Address}, + {Addr: backend4.Address}, + {Addr: backend4.Address}, + {Addr: backend4.Address}, + {Addr: backend4.Address}, + {Addr: backend4.Address}, + {Addr: backend4.Address}, + {Addr: backend5.Address}, + {Addr: backend5.Address}, + {Addr: backend5.Address}, + {Addr: backend5.Address}, + {Addr: backend5.Address}, + {Addr: backend5.Address}, + {Addr: backend5.Address}, + {Addr: backend5.Address}, + }, + }, + // This configures custom lb as the child of wrr_locality, which points + // to our pick_first implementation. Thus, the expected distribution of + // addresses is locality weight of locality / total locality weights as + // the probability of picking the first backend within the locality + // (e.g. Address 1 for locality 1, and Address 3 for locality 2). + { + name: "custom_lb_child_pick_first", + wrrLocalityConfiguration: wrrLocality(&v3xdsxdstypepb.TypedStruct{ + TypeUrl: "type.googleapis.com/pick_first", + Value: &structpb.Struct{}, + }), + addressDistributionWant: []resolver.Address{ + {Addr: backend1.Address}, + {Addr: backend3.Address}, + {Addr: backend3.Address}, + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + managementServer, nodeID, _, r, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{}) + defer cleanup() + + routeConfigName := "route-" + serviceName + clusterName := "cluster-" + serviceName + endpointsName := "endpoints-" + serviceName + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, routeConfigName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeConfigName, serviceName, clusterName)}, + Clusters: []*v3clusterpb.Cluster{clusterWithLBConfiguration(clusterName, endpointsName, e2e.SecurityLevelNone, test.wrrLocalityConfiguration)}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ + ClusterName: endpointsName, + Host: "localhost", + Localities: []e2e.LocalityOptions{ + { + Ports: []uint32{port1, port2}, + Weight: 1, + }, + { + Ports: []uint32{port3, port4, port5}, + Weight: 2, + }, + }, + })}, + } + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + cc, err := grpc.Dial(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) + if err != nil { + t.Fatalf("Failed to dial local test server: %v", err) + } + defer cc.Close() + + client := testgrpc.NewTestServiceClient(cc) + if err := roundrobin.CheckWeightedRoundRobinRPCs(ctx, client, test.addressDistributionWant); err != nil { + t.Fatalf("Error in expected round robin: %v", err) + } + }) + } +} diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer.go b/xds/internal/balancer/cdsbalancer/cdsbalancer.go index 1e3fb4d1286..91d4a6aa866 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer.go @@ -38,7 +38,6 @@ import ( "google.golang.org/grpc/serviceconfig" "google.golang.org/grpc/xds/internal/balancer/clusterresolver" "google.golang.org/grpc/xds/internal/balancer/outlierdetection" - "google.golang.org/grpc/xds/internal/balancer/ringhash" "google.golang.org/grpc/xds/internal/xdsclient" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" ) @@ -394,23 +393,22 @@ func (b *cdsBalancer) handleWatchUpdate(update clusterHandlerUpdate) { dms[i].OutlierDetection = outlierDetectionToConfig(cu.OutlierDetection) } } + lbCfg := &clusterresolver.LBConfig{ DiscoveryMechanisms: dms, } - // lbPolicy is set only when the policy is ringhash. The default (when it's - // not set) is roundrobin. And similarly, we only need to set XDSLBPolicy - // for ringhash (it also defaults to roundrobin). - if lbp := update.lbPolicy; lbp != nil { - lbCfg.XDSLBPolicy = &internalserviceconfig.BalancerConfig{ - Name: ringhash.Name, - Config: &ringhash.LBConfig{ - MinRingSize: lbp.MinimumRingSize, - MaxRingSize: lbp.MaximumRingSize, - }, - } + bc := &internalserviceconfig.BalancerConfig{} + if err := json.Unmarshal(update.lbPolicy, bc); err != nil { + // This will never occur, valid configuration is emitted from the xDS + // Client. Validity is already checked in the xDS Client, however, this + // double validation is present because Unmarshalling and Validating are + // coupled into one json.Unmarshal operation). We will switch this in + // the future to two separate operations. + b.logger.Errorf("Emitted lbPolicy %s from xDS Client is invalid: %v", update.lbPolicy, err) + return } - + lbCfg.XDSLBPolicy = bc ccState := balancer.ClientConnState{ ResolverState: xdsclient.SetClient(resolver.State{}, b.xdsClient), BalancerConfig: lbCfg, diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go index 8d7face5e0a..eb687aa70f7 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go @@ -63,6 +63,7 @@ var ( IdentityInstanceName: "default2", SubjectAltNameMatchers: testSANMatchers, }, + LBPolicy: wrrLocalityLBConfigJSON, } cdsUpdateWithMissingSecurityCfg = xdsresource.ClusterUpdate{ ClusterName: serviceName, @@ -248,8 +249,11 @@ func (s) TestSecurityConfigWithoutXDSCreds(t *testing.T) { // create a new EDS balancer. The fake EDS balancer created above will be // returned to the CDS balancer, because we have overridden the // newChildBalancer function as part of test setup. - cdsUpdate := xdsresource.ClusterUpdate{ClusterName: serviceName} - wantCCS := edsCCS(serviceName, nil, false, nil, noopODLBCfg) + cdsUpdate := xdsresource.ClusterUpdate{ + ClusterName: serviceName, + LBPolicy: wrrLocalityLBConfigJSON, + } + wantCCS := edsCCS(serviceName, nil, false, wrrLocalityLBConfig, noopODLBCfg) ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer ctxCancel() if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil { @@ -304,8 +308,11 @@ func (s) TestNoSecurityConfigWithXDSCreds(t *testing.T) { // returned to the CDS balancer, because we have overridden the // newChildBalancer function as part of test setup. No security config is // passed to the CDS balancer as part of this update. - cdsUpdate := xdsresource.ClusterUpdate{ClusterName: serviceName} - wantCCS := edsCCS(serviceName, nil, false, nil, noopODLBCfg) + cdsUpdate := xdsresource.ClusterUpdate{ + ClusterName: serviceName, + LBPolicy: wrrLocalityLBConfigJSON, + } + wantCCS := edsCCS(serviceName, nil, false, wrrLocalityLBConfig, noopODLBCfg) ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer ctxCancel() if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil { @@ -461,7 +468,7 @@ func (s) TestSecurityConfigUpdate_BadToGood(t *testing.T) { // create a new EDS balancer. The fake EDS balancer created above will be // returned to the CDS balancer, because we have overridden the // newChildBalancer function as part of test setup. - wantCCS := edsCCS(serviceName, nil, false, nil, noopODLBCfg) + wantCCS := edsCCS(serviceName, nil, false, wrrLocalityLBConfig, noopODLBCfg) if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdateWithGoodSecurityCfg, nil}, wantCCS, edsB); err != nil { t.Fatal(err) } @@ -495,7 +502,7 @@ func (s) TestGoodSecurityConfig(t *testing.T) { // create a new EDS balancer. The fake EDS balancer created above will be // returned to the CDS balancer, because we have overridden the // newChildBalancer function as part of test setup. - wantCCS := edsCCS(serviceName, nil, false, nil, noopODLBCfg) + wantCCS := edsCCS(serviceName, nil, false, wrrLocalityLBConfig, noopODLBCfg) ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer ctxCancel() if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdateWithGoodSecurityCfg, nil}, wantCCS, edsB); err != nil { @@ -548,7 +555,7 @@ func (s) TestSecurityConfigUpdate_GoodToFallback(t *testing.T) { // create a new EDS balancer. The fake EDS balancer created above will be // returned to the CDS balancer, because we have overridden the // newChildBalancer function as part of test setup. - wantCCS := edsCCS(serviceName, nil, false, nil, noopODLBCfg) + wantCCS := edsCCS(serviceName, nil, false, wrrLocalityLBConfig, noopODLBCfg) ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer ctxCancel() if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdateWithGoodSecurityCfg, nil}, wantCCS, edsB); err != nil { @@ -564,7 +571,10 @@ func (s) TestSecurityConfigUpdate_GoodToFallback(t *testing.T) { // an update which contains bad security config. So, we expect the CDS // balancer to forward this error to the EDS balancer and eventually the // channel needs to be put in a bad state. - cdsUpdate := xdsresource.ClusterUpdate{ClusterName: serviceName} + cdsUpdate := xdsresource.ClusterUpdate{ + ClusterName: serviceName, + LBPolicy: wrrLocalityLBConfigJSON, + } if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil { t.Fatal(err) } @@ -598,7 +608,7 @@ func (s) TestSecurityConfigUpdate_GoodToBad(t *testing.T) { // create a new EDS balancer. The fake EDS balancer created above will be // returned to the CDS balancer, because we have overridden the // newChildBalancer function as part of test setup. - wantCCS := edsCCS(serviceName, nil, false, nil, noopODLBCfg) + wantCCS := edsCCS(serviceName, nil, false, wrrLocalityLBConfig, noopODLBCfg) ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer ctxCancel() if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdateWithGoodSecurityCfg, nil}, wantCCS, edsB); err != nil { @@ -675,8 +685,9 @@ func (s) TestSecurityConfigUpdate_GoodToGood(t *testing.T) { RootInstanceName: "default1", SubjectAltNameMatchers: testSANMatchers, }, + LBPolicy: wrrLocalityLBConfigJSON, } - wantCCS := edsCCS(serviceName, nil, false, nil, noopODLBCfg) + wantCCS := edsCCS(serviceName, nil, false, wrrLocalityLBConfig, noopODLBCfg) ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer ctxCancel() if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil { @@ -700,6 +711,7 @@ func (s) TestSecurityConfigUpdate_GoodToGood(t *testing.T) { RootInstanceName: "default2", SubjectAltNameMatchers: testSANMatchers, }, + LBPolicy: wrrLocalityLBConfigJSON, } if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil { t.Fatal(err) diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go index 27b2f15b465..d69465a9627 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go @@ -37,6 +37,7 @@ import ( "google.golang.org/grpc/xds/internal/balancer/clusterresolver" "google.golang.org/grpc/xds/internal/balancer/outlierdetection" "google.golang.org/grpc/xds/internal/balancer/ringhash" + "google.golang.org/grpc/xds/internal/balancer/wrrlocality" "google.golang.org/grpc/xds/internal/testutils/fakeclient" "google.golang.org/grpc/xds/internal/xdsclient" "google.golang.org/grpc/xds/internal/xdsclient/bootstrap" @@ -60,6 +61,20 @@ var ( noopODLBCfg = outlierdetection.LBConfig{ Interval: 1<<63 - 1, } + wrrLocalityLBConfig = &internalserviceconfig.BalancerConfig{ + Name: wrrlocality.Name, + Config: &wrrlocality.LBConfig{ + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: "round_robin", + }, + }, + } + wrrLocalityLBConfigJSON, _ = json.Marshal(wrrLocalityLBConfig) + ringHashLBConfig = &internalserviceconfig.BalancerConfig{ + Name: ringhash.Name, + Config: &ringhash.LBConfig{MinRingSize: 10, MaxRingSize: 100}, + } + ringHashLBConfigJSON, _ = json.Marshal(ringHashLBConfig) ) type s struct { @@ -381,20 +396,27 @@ func (s) TestHandleClusterUpdate(t *testing.T) { wantCCS balancer.ClientConnState }{ { - name: "happy-case-with-lrs", - cdsUpdate: xdsresource.ClusterUpdate{ClusterName: serviceName, LRSServerConfig: xdsresource.ClusterLRSServerSelf}, - wantCCS: edsCCS(serviceName, nil, true, nil, noopODLBCfg), + name: "happy-case-with-lrs", + cdsUpdate: xdsresource.ClusterUpdate{ + ClusterName: serviceName, + LRSServerConfig: xdsresource.ClusterLRSServerSelf, + LBPolicy: wrrLocalityLBConfigJSON, + }, + wantCCS: edsCCS(serviceName, nil, true, wrrLocalityLBConfig, noopODLBCfg), }, { - name: "happy-case-without-lrs", - cdsUpdate: xdsresource.ClusterUpdate{ClusterName: serviceName}, - wantCCS: edsCCS(serviceName, nil, false, nil, noopODLBCfg), + name: "happy-case-without-lrs", + cdsUpdate: xdsresource.ClusterUpdate{ + ClusterName: serviceName, + LBPolicy: wrrLocalityLBConfigJSON, + }, + wantCCS: edsCCS(serviceName, nil, false, wrrLocalityLBConfig, noopODLBCfg), }, { name: "happy-case-with-ring-hash-lb-policy", cdsUpdate: xdsresource.ClusterUpdate{ ClusterName: serviceName, - LBPolicy: &xdsresource.ClusterLBPolicyRingHash{MinimumRingSize: 10, MaximumRingSize: 100}, + LBPolicy: ringHashLBConfigJSON, }, wantCCS: edsCCS(serviceName, nil, false, &internalserviceconfig.BalancerConfig{ Name: ringhash.Name, @@ -403,21 +425,25 @@ func (s) TestHandleClusterUpdate(t *testing.T) { }, { name: "happy-case-outlier-detection", - cdsUpdate: xdsresource.ClusterUpdate{ClusterName: serviceName, OutlierDetection: &xdsresource.OutlierDetection{ - Interval: 10 * time.Second, - BaseEjectionTime: 30 * time.Second, - MaxEjectionTime: 300 * time.Second, - MaxEjectionPercent: 10, - SuccessRateStdevFactor: 1900, - EnforcingSuccessRate: 100, - SuccessRateMinimumHosts: 5, - SuccessRateRequestVolume: 100, - FailurePercentageThreshold: 85, - EnforcingFailurePercentage: 5, - FailurePercentageMinimumHosts: 5, - FailurePercentageRequestVolume: 50, - }}, - wantCCS: edsCCS(serviceName, nil, false, nil, outlierdetection.LBConfig{ + cdsUpdate: xdsresource.ClusterUpdate{ + ClusterName: serviceName, + OutlierDetection: &xdsresource.OutlierDetection{ + Interval: 10 * time.Second, + BaseEjectionTime: 30 * time.Second, + MaxEjectionTime: 300 * time.Second, + MaxEjectionPercent: 10, + SuccessRateStdevFactor: 1900, + EnforcingSuccessRate: 100, + SuccessRateMinimumHosts: 5, + SuccessRateRequestVolume: 100, + FailurePercentageThreshold: 85, + EnforcingFailurePercentage: 5, + FailurePercentageMinimumHosts: 5, + FailurePercentageRequestVolume: 50, + }, + LBPolicy: wrrLocalityLBConfigJSON, + }, + wantCCS: edsCCS(serviceName, nil, false, wrrLocalityLBConfig, outlierdetection.LBConfig{ Interval: 10 * time.Second, BaseEjectionTime: 30 * time.Second, MaxEjectionTime: 300 * time.Second, @@ -501,8 +527,11 @@ func (s) TestHandleClusterUpdateError(t *testing.T) { // create a new EDS balancer. The fake EDS balancer created above will be // returned to the CDS balancer, because we have overridden the // newChildBalancer function as part of test setup. - cdsUpdate := xdsresource.ClusterUpdate{ClusterName: serviceName} - wantCCS := edsCCS(serviceName, nil, false, nil, noopODLBCfg) + cdsUpdate := xdsresource.ClusterUpdate{ + ClusterName: serviceName, + LBPolicy: wrrLocalityLBConfigJSON, + } + wantCCS := edsCCS(serviceName, nil, false, wrrLocalityLBConfig, noopODLBCfg) if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil { t.Fatal(err) } @@ -586,8 +615,11 @@ func (s) TestResolverError(t *testing.T) { // create a new EDS balancer. The fake EDS balancer created above will be // returned to the CDS balancer, because we have overridden the // newChildBalancer function as part of test setup. - cdsUpdate := xdsresource.ClusterUpdate{ClusterName: serviceName} - wantCCS := edsCCS(serviceName, nil, false, nil, noopODLBCfg) + cdsUpdate := xdsresource.ClusterUpdate{ + ClusterName: serviceName, + LBPolicy: wrrLocalityLBConfigJSON, + } + wantCCS := edsCCS(serviceName, nil, false, wrrLocalityLBConfig, noopODLBCfg) if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil { t.Fatal(err) } @@ -635,8 +667,11 @@ func (s) TestUpdateSubConnState(t *testing.T) { // create a new EDS balancer. The fake EDS balancer created above will be // returned to the CDS balancer, because we have overridden the // newChildBalancer function as part of test setup. - cdsUpdate := xdsresource.ClusterUpdate{ClusterName: serviceName} - wantCCS := edsCCS(serviceName, nil, false, nil, noopODLBCfg) + cdsUpdate := xdsresource.ClusterUpdate{ + ClusterName: serviceName, + LBPolicy: wrrLocalityLBConfigJSON, + } + wantCCS := edsCCS(serviceName, nil, false, wrrLocalityLBConfig, noopODLBCfg) ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer ctxCancel() if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil { @@ -665,13 +700,16 @@ func (s) TestCircuitBreaking(t *testing.T) { cancel() cdsB.Close() }() - // Here we invoke the watch callback registered on the fake xdsClient. This // will trigger the watch handler on the CDS balancer, which will update // the service's counter with the new max requests. var maxRequests uint32 = 1 - cdsUpdate := xdsresource.ClusterUpdate{ClusterName: clusterName, MaxRequests: &maxRequests} - wantCCS := edsCCS(clusterName, &maxRequests, false, nil, noopODLBCfg) + cdsUpdate := xdsresource.ClusterUpdate{ + ClusterName: clusterName, + MaxRequests: &maxRequests, + LBPolicy: wrrLocalityLBConfigJSON, + } + wantCCS := edsCCS(clusterName, &maxRequests, false, wrrLocalityLBConfig, noopODLBCfg) ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer ctxCancel() if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil { @@ -699,14 +737,16 @@ func (s) TestClose(t *testing.T) { // provided xdsClient. xdsC, cdsB, edsB, _, cancel := setupWithWatch(t) defer cancel() - // Here we invoke the watch callback registered on the fake xdsClient. This // will trigger the watch handler on the CDS balancer, which will attempt to // create a new EDS balancer. The fake EDS balancer created above will be // returned to the CDS balancer, because we have overridden the // newChildBalancer function as part of test setup. - cdsUpdate := xdsresource.ClusterUpdate{ClusterName: serviceName} - wantCCS := edsCCS(serviceName, nil, false, nil, noopODLBCfg) + cdsUpdate := xdsresource.ClusterUpdate{ + ClusterName: serviceName, + LBPolicy: wrrLocalityLBConfigJSON, + } + wantCCS := edsCCS(serviceName, nil, false, wrrLocalityLBConfig, noopODLBCfg) ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer ctxCancel() if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil { @@ -776,8 +816,11 @@ func (s) TestExitIdle(t *testing.T) { // create a new EDS balancer. The fake EDS balancer created above will be // returned to the CDS balancer, because we have overridden the // newChildBalancer function as part of test setup. - cdsUpdate := xdsresource.ClusterUpdate{ClusterName: serviceName} - wantCCS := edsCCS(serviceName, nil, false, nil, noopODLBCfg) + cdsUpdate := xdsresource.ClusterUpdate{ + ClusterName: serviceName, + LBPolicy: wrrLocalityLBConfigJSON, + } + wantCCS := edsCCS(serviceName, nil, false, wrrLocalityLBConfig, noopODLBCfg) ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer ctxCancel() if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil { diff --git a/xds/internal/balancer/cdsbalancer/cluster_handler.go b/xds/internal/balancer/cdsbalancer/cluster_handler.go index 234511a45dc..aa2d9674a79 100644 --- a/xds/internal/balancer/cdsbalancer/cluster_handler.go +++ b/xds/internal/balancer/cdsbalancer/cluster_handler.go @@ -17,6 +17,7 @@ package cdsbalancer import ( + "encoding/json" "errors" "sync" @@ -38,13 +39,9 @@ var ( type clusterHandlerUpdate struct { // securityCfg is the Security Config from the top (root) cluster. securityCfg *xdsresource.SecurityConfig - // lbPolicy is the lb policy from the top (root) cluster. - // - // Currently, we only support roundrobin or ringhash, and since roundrobin - // does need configs, this is only set to the ringhash config, if the policy - // is ringhash. In the future, if we support more policies, we can make this - // an interface, and set it to config of the other policies. - lbPolicy *xdsresource.ClusterLBPolicyRingHash + + // lbPolicy is the the child of the cluster_impl policy, for all priorities. + lbPolicy json.RawMessage // updates is a list of ClusterUpdates from all the leaf clusters. updates []xdsresource.ClusterUpdate @@ -123,6 +120,7 @@ func (ch *clusterHandler) constructClusterUpdate() { case <-ch.updateChannel: default: } + ch.updateChannel <- clusterHandlerUpdate{ securityCfg: ch.createdClusters[ch.rootClusterName].clusterUpdate.SecurityCfg, lbPolicy: ch.createdClusters[ch.rootClusterName].clusterUpdate.LBPolicy, diff --git a/xds/internal/balancer/cdsbalancer/cluster_handler_test.go b/xds/internal/balancer/cdsbalancer/cluster_handler_test.go index caf10955014..ee989ec3ef7 100644 --- a/xds/internal/balancer/cdsbalancer/cluster_handler_test.go +++ b/xds/internal/balancer/cdsbalancer/cluster_handler_test.go @@ -52,7 +52,6 @@ func (s) TestSuccessCaseLeafNode(t *testing.T) { name string clusterName string clusterUpdate xdsresource.ClusterUpdate - lbPolicy *xdsresource.ClusterLBPolicyRingHash }{ { name: "test-update-root-cluster-EDS-success", @@ -62,16 +61,6 @@ func (s) TestSuccessCaseLeafNode(t *testing.T) { ClusterName: edsService, }, }, - { - name: "test-update-root-cluster-EDS-with-ring-hash", - clusterName: logicalDNSService, - clusterUpdate: xdsresource.ClusterUpdate{ - ClusterType: xdsresource.ClusterTypeLogicalDNS, - ClusterName: logicalDNSService, - LBPolicy: &xdsresource.ClusterLBPolicyRingHash{MinimumRingSize: 10, MaximumRingSize: 100}, - }, - lbPolicy: &xdsresource.ClusterLBPolicyRingHash{MinimumRingSize: 10, MaximumRingSize: 100}, - }, { name: "test-update-root-cluster-Logical-DNS-success", clusterName: logicalDNSService, @@ -111,9 +100,6 @@ func (s) TestSuccessCaseLeafNode(t *testing.T) { if diff := cmp.Diff(chu.updates, []xdsresource.ClusterUpdate{test.clusterUpdate}); diff != "" { t.Fatalf("got unexpected cluster update, diff (-got, +want): %v", diff) } - if diff := cmp.Diff(chu.lbPolicy, test.lbPolicy); diff != "" { - t.Fatalf("got unexpected lb policy in cluster update, diff (-got, +want): %v", diff) - } case <-ctx.Done(): t.Fatal("Timed out waiting for update from update channel.") } diff --git a/xds/internal/balancer/clusterimpl/tests/balancer_test.go b/xds/internal/balancer/clusterimpl/tests/balancer_test.go index cf0e7b0ce84..d335ecd7e84 100644 --- a/xds/internal/balancer/clusterimpl/tests/balancer_test.go +++ b/xds/internal/balancer/clusterimpl/tests/balancer_test.go @@ -112,9 +112,14 @@ func (s) TestConfigUpdateWithSameLoadReportingServerConfig(t *testing.T) { // drops all RPCs, but with no change in the load reporting server config. resources.Endpoints = []*v3endpointpb.ClusterLoadAssignment{ e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ - ClusterName: "endpoints-" + serviceName, - Host: "localhost", - Ports: []uint32{testutils.ParsePort(t, server.Address)}, + ClusterName: "endpoints-" + serviceName, + Host: "localhost", + Localities: []e2e.LocalityOptions{ + { + Ports: []uint32{testutils.ParsePort(t, server.Address)}, + Weight: 1, + }, + }, DropPercents: map[string]int{"test-drop-everything": 100}, }), } diff --git a/xds/internal/balancer/clusterresolver/clusterresolver_test.go b/xds/internal/balancer/clusterresolver/clusterresolver_test.go index f327c8cf5fc..65cb7a9bf98 100644 --- a/xds/internal/balancer/clusterresolver/clusterresolver_test.go +++ b/xds/internal/balancer/clusterresolver/clusterresolver_test.go @@ -27,8 +27,6 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "google.golang.org/grpc/balancer" - "google.golang.org/grpc/balancer/roundrobin" - "google.golang.org/grpc/balancer/weightedtarget" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/internal/grpctest" internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" @@ -378,7 +376,6 @@ func (s) TestOutlierDetection(t *testing.T) { t.Fatal(err) } - localityID := xdsinternal.LocalityID{Zone: "zone"} // The priority configuration generated should have Outlier Detection as a // direct child due to Outlier Detection being turned on. pCfgWant := &priority.LBConfig{ @@ -393,17 +390,6 @@ func (s) TestOutlierDetection(t *testing.T) { Config: &clusterimpl.LBConfig{ Cluster: testClusterName, EDSServiceName: "test-eds-service-name", - ChildPolicy: &internalserviceconfig.BalancerConfig{ - Name: weightedtarget.Name, - Config: &weightedtarget.LBConfig{ - Targets: map[string]weightedtarget.Target{ - assertString(localityID.ToString): { - Weight: 100, - ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, - }, - }, - }, - }, }, }, }, diff --git a/xds/internal/balancer/clusterresolver/config_test.go b/xds/internal/balancer/clusterresolver/config_test.go index 2455b88d807..fd17f3ede6d 100644 --- a/xds/internal/balancer/clusterresolver/config_test.go +++ b/xds/internal/balancer/clusterresolver/config_test.go @@ -286,10 +286,6 @@ func TestParseConfig(t *testing.T) { } } -func newString(s string) *string { - return &s -} - func newUint32(i uint32) *uint32 { return &i } diff --git a/xds/internal/balancer/clusterresolver/configbuilder.go b/xds/internal/balancer/clusterresolver/configbuilder.go index b76a40355cc..06b0aec2f31 100644 --- a/xds/internal/balancer/clusterresolver/configbuilder.go +++ b/xds/internal/balancer/clusterresolver/configbuilder.go @@ -23,9 +23,7 @@ import ( "fmt" "sort" - "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/balancer/weightedroundrobin" - "google.golang.org/grpc/balancer/weightedtarget" "google.golang.org/grpc/internal/envconfig" "google.golang.org/grpc/internal/hierarchy" internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" @@ -34,7 +32,7 @@ import ( "google.golang.org/grpc/xds/internal/balancer/clusterimpl" "google.golang.org/grpc/xds/internal/balancer/outlierdetection" "google.golang.org/grpc/xds/internal/balancer/priority" - "google.golang.org/grpc/xds/internal/balancer/ringhash" + "google.golang.org/grpc/xds/internal/balancer/wrrlocality" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" ) @@ -63,33 +61,6 @@ type priorityConfig struct { // // The built tree of balancers (see test for the output struct). // -// If xds lb policy is ROUND_ROBIN, the children will be weighted_target for -// locality picking, and round_robin for endpoint picking. -// -// ┌────────┐ -// │priority│ -// └┬──────┬┘ -// │ │ -// ┌───────────▼┐ ┌▼───────────┐ -// │cluster_impl│ │cluster_impl│ -// └─┬──────────┘ └──────────┬─┘ -// │ │ -// ┌──────────────▼─┐ ┌─▼──────────────┐ -// │locality_picking│ │locality_picking│ -// └┬──────────────┬┘ └┬──────────────┬┘ -// │ │ │ │ -// ┌─▼─┐ ┌─▼─┐ ┌─▼─┐ ┌─▼─┐ -// │LRS│ │LRS│ │LRS│ │LRS│ -// └─┬─┘ └─┬─┘ └─┬─┘ └─┬─┘ -// │ │ │ │ -// ┌──────────▼─────┐ ┌─────▼──────────┐ ┌──────────▼─────┐ ┌─────▼──────────┐ -// │endpoint_picking│ │endpoint_picking│ │endpoint_picking│ │endpoint_picking│ -// └────────────────┘ └────────────────┘ └────────────────┘ └────────────────┘ -// -// If xds lb policy is RING_HASH, the children will be just a ring_hash policy. -// The endpoints from all localities will be flattened to one addresses list, -// and the ring_hash policy will pick endpoints from it. -// // ┌────────┐ // │priority│ // └┬──────┬┘ @@ -99,13 +70,8 @@ type priorityConfig struct { // └──────┬─────┘ └─────┬──────┘ // │ │ // ┌──────▼─────┐ ┌─────▼──────┐ -// │ ring_hash │ │ ring_hash │ +// │xDSLBPolicy │ │xDSLBPolicy │ (Locality and Endpoint picking layer) // └────────────┘ └────────────┘ -// -// If endpointPickingPolicy is nil, roundrobin will be used. -// -// Custom locality picking policy isn't support, and weighted_target is always -// used. func buildPriorityConfigJSON(priorities []priorityConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) ([]byte, []resolver.Address, error) { pc, addrs, err := buildPriorityConfig(priorities, xdsLBPolicy) if err != nil { @@ -284,55 +250,11 @@ func dedupSortedIntSlice(a []int) []int { return a[:i+1] } -// rrBalancerConfig is a const roundrobin config, used as child of -// weighted-roundrobin. To avoid allocating memory everytime. -var rrBalancerConfig = &internalserviceconfig.BalancerConfig{Name: roundrobin.Name} - // priorityLocalitiesToClusterImpl takes a list of localities (with the same // priority), and generates a cluster impl policy config, and a list of -// addresses. +// addresses with their path hierarchy set to [priority-name, locality-name], so +// priority and the xDS LB Policy know which child policy each address is for. func priorityLocalitiesToClusterImpl(localities []xdsresource.Locality, priorityName string, mechanism DiscoveryMechanism, drops []clusterimpl.DropConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) (*clusterimpl.LBConfig, []resolver.Address, error) { - clusterImplCfg := &clusterimpl.LBConfig{ - Cluster: mechanism.Cluster, - EDSServiceName: mechanism.EDSServiceName, - LoadReportingServer: mechanism.LoadReportingServer, - MaxConcurrentRequests: mechanism.MaxConcurrentRequests, - DropCategories: drops, - // ChildPolicy is not set. Will be set based on xdsLBPolicy - } - - if xdsLBPolicy == nil || xdsLBPolicy.Name == roundrobin.Name { - // If lb policy is ROUND_ROBIN: - // - locality-picking policy is weighted_target - // - endpoint-picking policy is round_robin - logger.Infof("xds lb policy is %q, building config with weighted_target + round_robin", roundrobin.Name) - // Child of weighted_target is hardcoded to round_robin. - wtConfig, addrs := localitiesToWeightedTarget(localities, priorityName, rrBalancerConfig) - clusterImplCfg.ChildPolicy = &internalserviceconfig.BalancerConfig{Name: weightedtarget.Name, Config: wtConfig} - return clusterImplCfg, addrs, nil - } - - if xdsLBPolicy.Name == ringhash.Name { - // If lb policy is RIHG_HASH, will build one ring_hash policy as child. - // The endpoints from all localities will be flattened to one addresses - // list, and the ring_hash policy will pick endpoints from it. - logger.Infof("xds lb policy is %q, building config with ring_hash", ringhash.Name) - addrs := localitiesToRingHash(localities, priorityName) - // Set child to ring_hash, note that the ring_hash config is from - // xdsLBPolicy. - clusterImplCfg.ChildPolicy = &internalserviceconfig.BalancerConfig{Name: ringhash.Name, Config: xdsLBPolicy.Config} - return clusterImplCfg, addrs, nil - } - - return nil, nil, fmt.Errorf("unsupported xds LB policy %q, not one of {%q,%q}", xdsLBPolicy.Name, roundrobin.Name, ringhash.Name) -} - -// localitiesToRingHash takes a list of localities (with the same priority), and -// generates a list of addresses. -// -// The addresses have path hierarchy set to [priority-name], so priority knows -// which child policy they are for. -func localitiesToRingHash(localities []xdsresource.Locality, priorityName string) []resolver.Address { var addrs []resolver.Address for _, locality := range localities { var lw uint32 = 1 @@ -350,54 +272,29 @@ func localitiesToRingHash(localities []xdsresource.Locality, priorityName string if endpoint.HealthStatus != xdsresource.EndpointHealthStatusHealthy && endpoint.HealthStatus != xdsresource.EndpointHealthStatusUnknown { continue } - + addr := resolver.Address{Addr: endpoint.Address} + addr = hierarchy.Set(addr, []string{priorityName, localityStr}) + addr = internal.SetLocalityID(addr, locality.ID) + // "To provide the xds_wrr_locality load balancer information about + // locality weights received from EDS, the cluster resolver will + // populate a new locality weight attribute for each address The + // attribute will have the weight (as an integer) of the locality + // the address is part of." - A52 + addr = wrrlocality.SetAddrInfo(addr, wrrlocality.AddrInfo{LocalityWeight: lw}) var ew uint32 = 1 if endpoint.Weight != 0 { ew = endpoint.Weight } - - // The weight of each endpoint is locality_weight * endpoint_weight. - ai := weightedroundrobin.AddrInfo{Weight: lw * ew} - addr := weightedroundrobin.SetAddrInfo(resolver.Address{Addr: endpoint.Address}, ai) - addr = hierarchy.Set(addr, []string{priorityName, localityStr}) - addr = internal.SetLocalityID(addr, locality.ID) - addrs = append(addrs, addr) - } - } - return addrs -} - -// localitiesToWeightedTarget takes a list of localities (with the same -// priority), and generates a weighted target config, and list of addresses. -// -// The addresses have path hierarchy set to [priority-name, locality-name], so -// priority and weighted target know which child policy they are for. -func localitiesToWeightedTarget(localities []xdsresource.Locality, priorityName string, childPolicy *internalserviceconfig.BalancerConfig) (*weightedtarget.LBConfig, []resolver.Address) { - weightedTargets := make(map[string]weightedtarget.Target) - var addrs []resolver.Address - for _, locality := range localities { - localityStr, err := locality.ID.ToString() - if err != nil { - localityStr = fmt.Sprintf("%+v", locality.ID) - } - weightedTargets[localityStr] = weightedtarget.Target{Weight: locality.Weight, ChildPolicy: childPolicy} - for _, endpoint := range locality.Endpoints { - // Filter out all "unhealthy" endpoints (unknown and healthy are - // both considered to be healthy: - // https://www.envoyproxy.io/docs/envoy/latest/api-v2/api/v2/core/health_check.proto#envoy-api-enum-core-healthstatus). - if endpoint.HealthStatus != xdsresource.EndpointHealthStatusHealthy && endpoint.HealthStatus != xdsresource.EndpointHealthStatusUnknown { - continue - } - - addr := resolver.Address{Addr: endpoint.Address} - if childPolicy.Name == weightedroundrobin.Name && endpoint.Weight != 0 { - ai := weightedroundrobin.AddrInfo{Weight: endpoint.Weight} - addr = weightedroundrobin.SetAddrInfo(addr, ai) - } - addr = hierarchy.Set(addr, []string{priorityName, localityStr}) - addr = internal.SetLocalityID(addr, locality.ID) + addr = weightedroundrobin.SetAddrInfo(addr, weightedroundrobin.AddrInfo{Weight: lw * ew}) addrs = append(addrs, addr) } } - return &weightedtarget.LBConfig{Targets: weightedTargets}, addrs + return &clusterimpl.LBConfig{ + Cluster: mechanism.Cluster, + EDSServiceName: mechanism.EDSServiceName, + LoadReportingServer: mechanism.LoadReportingServer, + MaxConcurrentRequests: mechanism.MaxConcurrentRequests, + DropCategories: drops, + ChildPolicy: xdsLBPolicy, + }, addrs, nil } diff --git a/xds/internal/balancer/clusterresolver/configbuilder_test.go b/xds/internal/balancer/clusterresolver/configbuilder_test.go index 5fbb0b95e33..6c94cae9ed4 100644 --- a/xds/internal/balancer/clusterresolver/configbuilder_test.go +++ b/xds/internal/balancer/clusterresolver/configbuilder_test.go @@ -30,7 +30,6 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/balancer/weightedroundrobin" - "google.golang.org/grpc/balancer/weightedtarget" "google.golang.org/grpc/internal/hierarchy" internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" "google.golang.org/grpc/resolver" @@ -39,6 +38,7 @@ import ( "google.golang.org/grpc/xds/internal/balancer/outlierdetection" "google.golang.org/grpc/xds/internal/balancer/priority" "google.golang.org/grpc/xds/internal/balancer/ringhash" + "google.golang.org/grpc/xds/internal/balancer/wrrlocality" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" ) @@ -68,7 +68,8 @@ var ( return out[i].Addr < out[j].Addr }) return out - })} + }), + } noopODCfg = outlierdetection.LBConfig{ Interval: 1<<63 - 1, @@ -230,21 +231,6 @@ func TestBuildPriorityConfig(t *testing.T) { Cluster: testClusterName, EDSServiceName: testEDSServiceName, DropCategories: []clusterimpl.DropConfig{}, - ChildPolicy: &internalserviceconfig.BalancerConfig{ - Name: weightedtarget.Name, - Config: &weightedtarget.LBConfig{ - Targets: map[string]weightedtarget.Target{ - assertString(testLocalityIDs[0].ToString): { - Weight: 20, - ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, - }, - assertString(testLocalityIDs[1].ToString): { - Weight: 80, - ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, - }, - }, - }, - }, }, }, }, @@ -262,21 +248,6 @@ func TestBuildPriorityConfig(t *testing.T) { Cluster: testClusterName, EDSServiceName: testEDSServiceName, DropCategories: []clusterimpl.DropConfig{}, - ChildPolicy: &internalserviceconfig.BalancerConfig{ - Name: weightedtarget.Name, - Config: &weightedtarget.LBConfig{ - Targets: map[string]weightedtarget.Target{ - assertString(testLocalityIDs[2].ToString): { - Weight: 20, - ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, - }, - assertString(testLocalityIDs[3].ToString): { - Weight: 80, - ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, - }, - }, - }, - }, }, }, }, @@ -393,21 +364,6 @@ func TestBuildClusterImplConfigForEDS(t *testing.T) { RequestsPerMillion: testDropOverMillion, }, }, - ChildPolicy: &internalserviceconfig.BalancerConfig{ - Name: weightedtarget.Name, - Config: &weightedtarget.LBConfig{ - Targets: map[string]weightedtarget.Target{ - assertString(testLocalityIDs[0].ToString): { - Weight: 20, - ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, - }, - assertString(testLocalityIDs[1].ToString): { - Weight: 80, - ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, - }, - }, - }, - }, }, "priority-2-1": { Cluster: testClusterName, @@ -420,32 +376,17 @@ func TestBuildClusterImplConfigForEDS(t *testing.T) { RequestsPerMillion: testDropOverMillion, }, }, - ChildPolicy: &internalserviceconfig.BalancerConfig{ - Name: weightedtarget.Name, - Config: &weightedtarget.LBConfig{ - Targets: map[string]weightedtarget.Target{ - assertString(testLocalityIDs[2].ToString): { - Weight: 20, - ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, - }, - assertString(testLocalityIDs[3].ToString): { - Weight: 80, - ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, - }, - }, - }, - }, }, } wantAddrs := []resolver.Address{ - testAddrWithAttrs(testAddressStrs[0][0], nil, "priority-2-0", &testLocalityIDs[0]), - testAddrWithAttrs(testAddressStrs[0][1], nil, "priority-2-0", &testLocalityIDs[0]), - testAddrWithAttrs(testAddressStrs[1][0], nil, "priority-2-0", &testLocalityIDs[1]), - testAddrWithAttrs(testAddressStrs[1][1], nil, "priority-2-0", &testLocalityIDs[1]), - testAddrWithAttrs(testAddressStrs[2][0], nil, "priority-2-1", &testLocalityIDs[2]), - testAddrWithAttrs(testAddressStrs[2][1], nil, "priority-2-1", &testLocalityIDs[2]), - testAddrWithAttrs(testAddressStrs[3][0], nil, "priority-2-1", &testLocalityIDs[3]), - testAddrWithAttrs(testAddressStrs[3][1], nil, "priority-2-1", &testLocalityIDs[3]), + testAddrWithAttrs(testAddressStrs[0][0], 20, 1, "priority-2-0", &testLocalityIDs[0]), + testAddrWithAttrs(testAddressStrs[0][1], 20, 1, "priority-2-0", &testLocalityIDs[0]), + testAddrWithAttrs(testAddressStrs[1][0], 80, 1, "priority-2-0", &testLocalityIDs[1]), + testAddrWithAttrs(testAddressStrs[1][1], 80, 1, "priority-2-0", &testLocalityIDs[1]), + testAddrWithAttrs(testAddressStrs[2][0], 20, 1, "priority-2-1", &testLocalityIDs[2]), + testAddrWithAttrs(testAddressStrs[2][1], 20, 1, "priority-2-1", &testLocalityIDs[2]), + testAddrWithAttrs(testAddressStrs[3][0], 80, 1, "priority-2-1", &testLocalityIDs[3]), + testAddrWithAttrs(testAddressStrs[3][1], 80, 1, "priority-2-1", &testLocalityIDs[3]), } if diff := cmp.Diff(gotNames, wantNames); diff != "" { @@ -594,31 +535,13 @@ func TestPriorityLocalitiesToClusterImpl(t *testing.T) { wantConfig: &clusterimpl.LBConfig{ Cluster: testClusterName, EDSServiceName: testEDSService, - ChildPolicy: &internalserviceconfig.BalancerConfig{ - Name: weightedtarget.Name, - Config: &weightedtarget.LBConfig{ - Targets: map[string]weightedtarget.Target{ - assertString(internal.LocalityID{Zone: "test-zone-1"}.ToString): { - Weight: 20, - ChildPolicy: &internalserviceconfig.BalancerConfig{ - Name: roundrobin.Name, - }, - }, - assertString(internal.LocalityID{Zone: "test-zone-2"}.ToString): { - Weight: 80, - ChildPolicy: &internalserviceconfig.BalancerConfig{ - Name: roundrobin.Name, - }, - }, - }, - }, - }, + ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, }, wantAddrs: []resolver.Address{ - testAddrWithAttrs("addr-1-1", nil, "test-priority", &internal.LocalityID{Zone: "test-zone-1"}), - testAddrWithAttrs("addr-1-2", nil, "test-priority", &internal.LocalityID{Zone: "test-zone-1"}), - testAddrWithAttrs("addr-2-1", nil, "test-priority", &internal.LocalityID{Zone: "test-zone-2"}), - testAddrWithAttrs("addr-2-2", nil, "test-priority", &internal.LocalityID{Zone: "test-zone-2"}), + testAddrWithAttrs("addr-1-1", 20, 90, "test-priority", &internal.LocalityID{Zone: "test-zone-1"}), + testAddrWithAttrs("addr-1-2", 20, 10, "test-priority", &internal.LocalityID{Zone: "test-zone-1"}), + testAddrWithAttrs("addr-2-1", 80, 90, "test-priority", &internal.LocalityID{Zone: "test-zone-2"}), + testAddrWithAttrs("addr-2-2", 80, 10, "test-priority", &internal.LocalityID{Zone: "test-zone-2"}), }, }, { @@ -651,26 +574,12 @@ func TestPriorityLocalitiesToClusterImpl(t *testing.T) { }, }, wantAddrs: []resolver.Address{ - testAddrWithAttrs("addr-1-1", newUint32(1800), "test-priority", &internal.LocalityID{Zone: "test-zone-1"}), - testAddrWithAttrs("addr-1-2", newUint32(200), "test-priority", &internal.LocalityID{Zone: "test-zone-1"}), - testAddrWithAttrs("addr-2-1", newUint32(7200), "test-priority", &internal.LocalityID{Zone: "test-zone-2"}), - testAddrWithAttrs("addr-2-2", newUint32(800), "test-priority", &internal.LocalityID{Zone: "test-zone-2"}), + testAddrWithAttrs("addr-1-1", 20, 90, "test-priority", &internal.LocalityID{Zone: "test-zone-1"}), + testAddrWithAttrs("addr-1-2", 20, 10, "test-priority", &internal.LocalityID{Zone: "test-zone-1"}), + testAddrWithAttrs("addr-2-1", 80, 90, "test-priority", &internal.LocalityID{Zone: "test-zone-2"}), + testAddrWithAttrs("addr-2-2", 80, 10, "test-priority", &internal.LocalityID{Zone: "test-zone-2"}), }, }, - { - name: "unsupported child", - localities: []xdsresource.Locality{{ - Endpoints: []xdsresource.Endpoint{ - {Address: "addr-1-1", HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 90}, - {Address: "addr-1-2", HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 10}, - }, - ID: internal.LocalityID{Zone: "test-zone-1"}, - Weight: 20, - }}, - priorityName: "test-priority", - childPolicy: &internalserviceconfig.BalancerConfig{Name: "some-child"}, - wantErr: true, - }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -688,267 +597,6 @@ func TestPriorityLocalitiesToClusterImpl(t *testing.T) { } } -func TestLocalitiesToWeightedTarget(t *testing.T) { - tests := []struct { - name string - localities []xdsresource.Locality - priorityName string - childPolicy *internalserviceconfig.BalancerConfig - lrsServer *string - wantConfig *weightedtarget.LBConfig - wantAddrs []resolver.Address - }{ - { - name: "roundrobin as child, with LRS", - localities: []xdsresource.Locality{ - { - Endpoints: []xdsresource.Endpoint{ - {Address: "addr-1-1", HealthStatus: xdsresource.EndpointHealthStatusHealthy}, - {Address: "addr-1-2", HealthStatus: xdsresource.EndpointHealthStatusHealthy}, - }, - ID: internal.LocalityID{Zone: "test-zone-1"}, - Weight: 20, - }, - { - Endpoints: []xdsresource.Endpoint{ - {Address: "addr-2-1", HealthStatus: xdsresource.EndpointHealthStatusHealthy}, - {Address: "addr-2-2", HealthStatus: xdsresource.EndpointHealthStatusHealthy}, - }, - ID: internal.LocalityID{Zone: "test-zone-2"}, - Weight: 80, - }, - }, - priorityName: "test-priority", - childPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, - lrsServer: newString("test-lrs-server"), - wantConfig: &weightedtarget.LBConfig{ - Targets: map[string]weightedtarget.Target{ - assertString(internal.LocalityID{Zone: "test-zone-1"}.ToString): { - Weight: 20, - ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, - }, - assertString(internal.LocalityID{Zone: "test-zone-2"}.ToString): { - Weight: 80, - ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, - }, - }, - }, - wantAddrs: []resolver.Address{ - testAddrWithAttrs("addr-1-1", nil, "test-priority", &internal.LocalityID{Zone: "test-zone-1"}), - testAddrWithAttrs("addr-1-2", nil, "test-priority", &internal.LocalityID{Zone: "test-zone-1"}), - testAddrWithAttrs("addr-2-1", nil, "test-priority", &internal.LocalityID{Zone: "test-zone-2"}), - testAddrWithAttrs("addr-2-2", nil, "test-priority", &internal.LocalityID{Zone: "test-zone-2"}), - }, - }, - { - name: "roundrobin as child, no LRS", - localities: []xdsresource.Locality{ - { - Endpoints: []xdsresource.Endpoint{ - {Address: "addr-1-1", HealthStatus: xdsresource.EndpointHealthStatusHealthy}, - {Address: "addr-1-2", HealthStatus: xdsresource.EndpointHealthStatusHealthy}, - }, - ID: internal.LocalityID{Zone: "test-zone-1"}, - Weight: 20, - }, - { - Endpoints: []xdsresource.Endpoint{ - {Address: "addr-2-1", HealthStatus: xdsresource.EndpointHealthStatusHealthy}, - {Address: "addr-2-2", HealthStatus: xdsresource.EndpointHealthStatusHealthy}, - }, - ID: internal.LocalityID{Zone: "test-zone-2"}, - Weight: 80, - }, - }, - priorityName: "test-priority", - childPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, - // lrsServer is nil, so LRS policy will not be used. - wantConfig: &weightedtarget.LBConfig{ - Targets: map[string]weightedtarget.Target{ - assertString(internal.LocalityID{Zone: "test-zone-1"}.ToString): { - Weight: 20, - ChildPolicy: &internalserviceconfig.BalancerConfig{ - Name: roundrobin.Name, - }, - }, - assertString(internal.LocalityID{Zone: "test-zone-2"}.ToString): { - Weight: 80, - ChildPolicy: &internalserviceconfig.BalancerConfig{ - Name: roundrobin.Name, - }, - }, - }, - }, - wantAddrs: []resolver.Address{ - testAddrWithAttrs("addr-1-1", nil, "test-priority", &internal.LocalityID{Zone: "test-zone-1"}), - testAddrWithAttrs("addr-1-2", nil, "test-priority", &internal.LocalityID{Zone: "test-zone-1"}), - testAddrWithAttrs("addr-2-1", nil, "test-priority", &internal.LocalityID{Zone: "test-zone-2"}), - testAddrWithAttrs("addr-2-2", nil, "test-priority", &internal.LocalityID{Zone: "test-zone-2"}), - }, - }, - { - name: "weighted round robin as child, no LRS", - localities: []xdsresource.Locality{ - { - Endpoints: []xdsresource.Endpoint{ - {Address: "addr-1-1", HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 90}, - {Address: "addr-1-2", HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 10}, - }, - ID: internal.LocalityID{Zone: "test-zone-1"}, - Weight: 20, - }, - { - Endpoints: []xdsresource.Endpoint{ - {Address: "addr-2-1", HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 90}, - {Address: "addr-2-2", HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 10}, - }, - ID: internal.LocalityID{Zone: "test-zone-2"}, - Weight: 80, - }, - }, - priorityName: "test-priority", - childPolicy: &internalserviceconfig.BalancerConfig{Name: weightedroundrobin.Name}, - // lrsServer is nil, so LRS policy will not be used. - wantConfig: &weightedtarget.LBConfig{ - Targets: map[string]weightedtarget.Target{ - assertString(internal.LocalityID{Zone: "test-zone-1"}.ToString): { - Weight: 20, - ChildPolicy: &internalserviceconfig.BalancerConfig{ - Name: weightedroundrobin.Name, - }, - }, - assertString(internal.LocalityID{Zone: "test-zone-2"}.ToString): { - Weight: 80, - ChildPolicy: &internalserviceconfig.BalancerConfig{ - Name: weightedroundrobin.Name, - }, - }, - }, - }, - wantAddrs: []resolver.Address{ - testAddrWithAttrs("addr-1-1", newUint32(90), "test-priority", &internal.LocalityID{Zone: "test-zone-1"}), - testAddrWithAttrs("addr-1-2", newUint32(10), "test-priority", &internal.LocalityID{Zone: "test-zone-1"}), - testAddrWithAttrs("addr-2-1", newUint32(90), "test-priority", &internal.LocalityID{Zone: "test-zone-2"}), - testAddrWithAttrs("addr-2-2", newUint32(10), "test-priority", &internal.LocalityID{Zone: "test-zone-2"}), - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, got1 := localitiesToWeightedTarget(tt.localities, tt.priorityName, tt.childPolicy) - if diff := cmp.Diff(got, tt.wantConfig); diff != "" { - t.Errorf("localitiesToWeightedTarget() diff (-got +want) %v", diff) - } - if diff := cmp.Diff(got1, tt.wantAddrs, cmp.AllowUnexported(attributes.Attributes{})); diff != "" { - t.Errorf("localitiesToWeightedTarget() diff (-got +want) %v", diff) - } - }) - } -} - -func TestLocalitiesToRingHash(t *testing.T) { - tests := []struct { - name string - localities []xdsresource.Locality - priorityName string - wantAddrs []resolver.Address - }{ - { - // Check that address weights are locality_weight * endpoint_weight. - name: "with locality and endpoint weight", - localities: []xdsresource.Locality{ - { - Endpoints: []xdsresource.Endpoint{ - {Address: "addr-1-1", HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 90}, - {Address: "addr-1-2", HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 10}, - }, - ID: internal.LocalityID{Zone: "test-zone-1"}, - Weight: 20, - }, - { - Endpoints: []xdsresource.Endpoint{ - {Address: "addr-2-1", HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 90}, - {Address: "addr-2-2", HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 10}, - }, - ID: internal.LocalityID{Zone: "test-zone-2"}, - Weight: 80, - }, - }, - priorityName: "test-priority", - wantAddrs: []resolver.Address{ - testAddrWithAttrs("addr-1-1", newUint32(1800), "test-priority", &internal.LocalityID{Zone: "test-zone-1"}), - testAddrWithAttrs("addr-1-2", newUint32(200), "test-priority", &internal.LocalityID{Zone: "test-zone-1"}), - testAddrWithAttrs("addr-2-1", newUint32(7200), "test-priority", &internal.LocalityID{Zone: "test-zone-2"}), - testAddrWithAttrs("addr-2-2", newUint32(800), "test-priority", &internal.LocalityID{Zone: "test-zone-2"}), - }, - }, - { - // Check that endpoint_weight is 0, weight is the locality weight. - name: "locality weight only", - localities: []xdsresource.Locality{ - { - Endpoints: []xdsresource.Endpoint{ - {Address: "addr-1-1", HealthStatus: xdsresource.EndpointHealthStatusHealthy}, - {Address: "addr-1-2", HealthStatus: xdsresource.EndpointHealthStatusHealthy}, - }, - ID: internal.LocalityID{Zone: "test-zone-1"}, - Weight: 20, - }, - { - Endpoints: []xdsresource.Endpoint{ - {Address: "addr-2-1", HealthStatus: xdsresource.EndpointHealthStatusHealthy}, - {Address: "addr-2-2", HealthStatus: xdsresource.EndpointHealthStatusHealthy}, - }, - ID: internal.LocalityID{Zone: "test-zone-2"}, - Weight: 80, - }, - }, - priorityName: "test-priority", - wantAddrs: []resolver.Address{ - testAddrWithAttrs("addr-1-1", newUint32(20), "test-priority", &internal.LocalityID{Zone: "test-zone-1"}), - testAddrWithAttrs("addr-1-2", newUint32(20), "test-priority", &internal.LocalityID{Zone: "test-zone-1"}), - testAddrWithAttrs("addr-2-1", newUint32(80), "test-priority", &internal.LocalityID{Zone: "test-zone-2"}), - testAddrWithAttrs("addr-2-2", newUint32(80), "test-priority", &internal.LocalityID{Zone: "test-zone-2"}), - }, - }, - { - // Check that locality_weight is 0, weight is the endpoint weight. - name: "endpoint weight only", - localities: []xdsresource.Locality{ - { - Endpoints: []xdsresource.Endpoint{ - {Address: "addr-1-1", HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 90}, - {Address: "addr-1-2", HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 10}, - }, - ID: internal.LocalityID{Zone: "test-zone-1"}, - }, - { - Endpoints: []xdsresource.Endpoint{ - {Address: "addr-2-1", HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 90}, - {Address: "addr-2-2", HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 10}, - }, - ID: internal.LocalityID{Zone: "test-zone-2"}, - }, - }, - priorityName: "test-priority", - wantAddrs: []resolver.Address{ - testAddrWithAttrs("addr-1-1", newUint32(90), "test-priority", &internal.LocalityID{Zone: "test-zone-1"}), - testAddrWithAttrs("addr-1-2", newUint32(10), "test-priority", &internal.LocalityID{Zone: "test-zone-1"}), - testAddrWithAttrs("addr-2-1", newUint32(90), "test-priority", &internal.LocalityID{Zone: "test-zone-2"}), - testAddrWithAttrs("addr-2-2", newUint32(10), "test-priority", &internal.LocalityID{Zone: "test-zone-2"}), - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got := localitiesToRingHash(tt.localities, tt.priorityName) - if diff := cmp.Diff(got, tt.wantAddrs, cmp.AllowUnexported(attributes.Attributes{})); diff != "" { - t.Errorf("localitiesToWeightedTarget() diff (-got +want) %v", diff) - } - }) - } -} - func assertString(f func() (string, error)) string { s, err := f() if err != nil { @@ -957,17 +605,16 @@ func assertString(f func() (string, error)) string { return s } -func testAddrWithAttrs(addrStr string, weight *uint32, priority string, lID *internal.LocalityID) resolver.Address { +func testAddrWithAttrs(addrStr string, localityWeight, endpointWeight uint32, priority string, lID *internal.LocalityID) resolver.Address { addr := resolver.Address{Addr: addrStr} - if weight != nil { - addr = weightedroundrobin.SetAddrInfo(addr, weightedroundrobin.AddrInfo{Weight: *weight}) - } path := []string{priority} if lID != nil { path = append(path, assertString(lID.ToString)) addr = internal.SetLocalityID(addr, *lID) } addr = hierarchy.Set(addr, path) + addr = wrrlocality.SetAddrInfo(addr, wrrlocality.AddrInfo{LocalityWeight: localityWeight}) + addr = weightedroundrobin.SetAddrInfo(addr, weightedroundrobin.AddrInfo{Weight: localityWeight * endpointWeight}) return addr } diff --git a/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go b/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go index 053b56f0dc8..c7c2ab9945f 100644 --- a/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go +++ b/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go @@ -364,23 +364,6 @@ func (s) TestEDS_MultipleLocalities(t *testing.T) { if err := rrutil.CheckWeightedRoundRobinRPCs(ctx, testClient, wantAddrs); err != nil { t.Fatal(err) } - - // Change the weight of locality2 and ensure weighted roundrobin. Since - // locality2 has twice the weight of locality3, it will be picked twice as - // frequently as locality3 for RPCs. And since locality2 has a single - // backend and locality3 has two backends, the backend in locality2 will - // receive four times the traffic of each of locality3's backends. - resources = clientEndpointsResource(nodeID, edsServiceName, []localityInfo{ - {name: localityName2, weight: 2, ports: ports[1:2]}, - {name: localityName3, weight: 1, ports: ports[2:4]}, - }) - if err := managementServer.Update(ctx, resources); err != nil { - t.Fatal(err) - } - wantAddrs = []resolver.Address{addrs[1], addrs[1], addrs[1], addrs[1], addrs[2], addrs[3]} - if err := rrutil.CheckWeightedRoundRobinRPCs(ctx, testClient, wantAddrs); err != nil { - t.Fatal(err) - } } // TestEDS_EndpointsHealth tests the cluster_resolver LB policy using an EDS diff --git a/xds/internal/balancer/clusterresolver/priority_test.go b/xds/internal/balancer/clusterresolver/priority_test.go index fdcef37f2d8..68325a31c17 100644 --- a/xds/internal/balancer/clusterresolver/priority_test.go +++ b/xds/internal/balancer/clusterresolver/priority_test.go @@ -26,6 +26,7 @@ import ( corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" "github.com/google/go-cmp/cmp" + "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/weightedtarget" "google.golang.org/grpc/connectivity" @@ -35,15 +36,24 @@ import ( "google.golang.org/grpc/resolver" "google.golang.org/grpc/xds/internal/balancer/clusterimpl" "google.golang.org/grpc/xds/internal/balancer/priority" + "google.golang.org/grpc/xds/internal/balancer/wrrlocality" xdstestutils "google.golang.org/grpc/xds/internal/testutils" "google.golang.org/grpc/xds/internal/testutils/fakeclient" "google.golang.org/grpc/xds/internal/xdsclient" ) var ( - testClusterNames = []string{"test-cluster-1", "test-cluster-2"} - testSubZones = []string{"I", "II", "III", "IV"} - testEndpointAddrs []string + testClusterNames = []string{"test-cluster-1", "test-cluster-2"} + testSubZones = []string{"I", "II", "III", "IV"} + testEndpointAddrs []string + wrrLocalityLBConfig = &internalserviceconfig.BalancerConfig{ + Name: wrrlocality.Name, + Config: &wrrlocality.LBConfig{ + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: "round_robin", + }, + }, + } ) const testBackendAddrsCount = 12 @@ -75,6 +85,7 @@ func setupTestEDS(t *testing.T, initChild *internalserviceconfig.BalancerConfig) Cluster: testClusterName, Type: DiscoveryMechanismTypeEDS, }}, + XDSLBPolicy: wrrLocalityLBConfig, }, }); err != nil { edsb.Close() @@ -844,6 +855,7 @@ func (s) TestFallbackToDNS(t *testing.T) { DNSHostname: testDNSTarget, }, }, + XDSLBPolicy: wrrLocalityLBConfig, }, }); err != nil { t.Fatal(err) diff --git a/xds/internal/balancer/wrrlocality/balancer.go b/xds/internal/balancer/wrrlocality/balancer.go index 2ff6fccf89b..ac63e84e62f 100644 --- a/xds/internal/balancer/wrrlocality/balancer.go +++ b/xds/internal/balancer/wrrlocality/balancer.go @@ -28,8 +28,12 @@ import ( "fmt" "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/weightedtarget" + "google.golang.org/grpc/internal/grpclog" internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" + "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" + "google.golang.org/grpc/xds/internal" ) // Name is the name of wrr_locality balancer. @@ -45,10 +49,6 @@ func (bb) Name() string { return Name } -func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer { - return nil -} - // LBConfig is the config for the wrr locality balancer. type LBConfig struct { serviceconfig.LoadBalancingConfig @@ -56,13 +56,146 @@ type LBConfig struct { ChildPolicy *internalserviceconfig.BalancerConfig `json:"childPolicy,omitempty"` } +// To plumb in a different child in tests. +var weightedTargetName = weightedtarget.Name + +func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer { + builder := balancer.Get(weightedTargetName) + if builder == nil { + // Shouldn't happen, registered through imported weighted target, + // defensive programming. + return nil + } + + // Doesn't need to intercept any balancer.ClientConn operations; pass + // through by just giving cc to child balancer. + wtb := builder.Build(cc, bOpts) + if wtb == nil { + // shouldn't happen, defensive programming. + return nil + } + wtbCfgParser, ok := builder.(balancer.ConfigParser) + if !ok { + // Shouldn't happen, imported weighted target builder has this method. + return nil + } + wrrL := &wrrLocalityBalancer{ + child: wtb, + childParser: wtbCfgParser, + } + + wrrL.logger = prefixLogger(wrrL) + wrrL.logger.Infof("Created") + return wrrL +} + func (bb) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { var lbCfg *LBConfig if err := json.Unmarshal(s, &lbCfg); err != nil { - return nil, fmt.Errorf("xds: invalid LBConfig for wrrlocality: %s, error: %v", string(s), err) + return nil, fmt.Errorf("xds_wrr_locality: invalid LBConfig: %s, error: %v", string(s), err) } if lbCfg == nil || lbCfg.ChildPolicy == nil { - return nil, errors.New("xds: invalidw LBConfig for wrrlocality: child policy field must be set") + return nil, errors.New("xds_wrr_locality: invalid LBConfig: child policy field must be set") } return lbCfg, nil } + +type attributeKey struct{} + +// Equal allows the values to be compared by Attributes.Equal. +func (a AddrInfo) Equal(o interface{}) bool { + oa, ok := o.(AddrInfo) + return ok && oa.LocalityWeight == a.LocalityWeight +} + +// AddrInfo is the locality weight of the locality an address is a part of. +type AddrInfo struct { + LocalityWeight uint32 +} + +// SetAddrInfo returns a copy of addr in which the BalancerAttributes field is +// updated with AddrInfo. +func SetAddrInfo(addr resolver.Address, addrInfo AddrInfo) resolver.Address { + addr.BalancerAttributes = addr.BalancerAttributes.WithValue(attributeKey{}, addrInfo) + return addr +} + +func (a AddrInfo) String() string { + return fmt.Sprintf("Locality Weight: %d", a.LocalityWeight) +} + +// getAddrInfo returns the AddrInfo stored in the BalancerAttributes field of +// addr. Returns false if no AddrInfo found. +func getAddrInfo(addr resolver.Address) (AddrInfo, bool) { + v := addr.BalancerAttributes.Value(attributeKey{}) + ai, ok := v.(AddrInfo) + return ai, ok +} + +// wrrLocalityBalancer wraps a weighted target balancer, and builds +// configuration for the weighted target once it receives configuration +// specifying the weighted target child balancer and locality weight +// information. +type wrrLocalityBalancer struct { + // child will be a weighted target balancer, and will be built it at + // wrrLocalityBalancer build time. Other than preparing configuration, other + // balancer operations are simply pass through. + child balancer.Balancer + + childParser balancer.ConfigParser + + logger *grpclog.PrefixLogger +} + +func (b *wrrLocalityBalancer) UpdateClientConnState(s balancer.ClientConnState) error { + lbCfg, ok := s.BalancerConfig.(*LBConfig) + if !ok { + b.logger.Errorf("Received config with unexpected type %T: %v", s.BalancerConfig, s.BalancerConfig) + return balancer.ErrBadResolverState + } + + weightedTargets := make(map[string]weightedtarget.Target) + for _, addr := range s.ResolverState.Addresses { + // This get of LocalityID could potentially return a zero value. This + // shouldn't happen though (this attribute that is set actually gets + // used to build localities in the first place), and thus don't error + // out, and just build a weighted target with undefined behavior. + locality, err := internal.GetLocalityID(addr).ToString() + if err != nil { + // Should never happen. + logger.Errorf("Failed to marshal LocalityID: %v, skipping this locality in weighted target") + } + ai, ok := getAddrInfo(addr) + if !ok { + return fmt.Errorf("xds_wrr_locality: missing locality weight information in address %q", addr) + } + weightedTargets[locality] = weightedtarget.Target{Weight: ai.LocalityWeight, ChildPolicy: lbCfg.ChildPolicy} + } + wtCfg := &weightedtarget.LBConfig{Targets: weightedTargets} + wtCfgJSON, err := json.Marshal(wtCfg) + if err != nil { + // Shouldn't happen. + return fmt.Errorf("xds_wrr_locality: error marshalling prepared config: %v", wtCfg) + } + var sc serviceconfig.LoadBalancingConfig + if sc, err = b.childParser.ParseConfig(wtCfgJSON); err != nil { + return fmt.Errorf("xds_wrr_locality: config generated %v is invalid: %v", wtCfgJSON, err) + } + + return b.child.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: s.ResolverState, + BalancerConfig: sc, + }) +} + +func (b *wrrLocalityBalancer) ResolverError(err error) { + b.child.ResolverError(err) +} + +func (b *wrrLocalityBalancer) UpdateSubConnState(sc balancer.SubConn, scState balancer.SubConnState) { + b.child.UpdateSubConnState(sc, scState) +} + +func (b *wrrLocalityBalancer) Close() { + b.child.Close() +} diff --git a/xds/internal/balancer/wrrlocality/balancer_test.go b/xds/internal/balancer/wrrlocality/balancer_test.go index 9283b02f14b..f0da7413bdb 100644 --- a/xds/internal/balancer/wrrlocality/balancer_test.go +++ b/xds/internal/balancer/wrrlocality/balancer_test.go @@ -19,17 +19,28 @@ package wrrlocality import ( + "context" "encoding/json" "errors" "strings" "testing" + "time" "github.com/google/go-cmp/cmp" + "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/roundrobin" + "google.golang.org/grpc/balancer/weightedtarget" "google.golang.org/grpc/internal/balancer/stub" "google.golang.org/grpc/internal/grpctest" internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" + "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" + "google.golang.org/grpc/xds/internal" +) + +const ( + defaultTestTimeout = 5 * time.Second ) type s struct { @@ -119,3 +130,123 @@ func (s) TestParseConfig(t *testing.T) { }) } } + +// TestUpdateClientConnState tests the UpdateClientConnState method of the +// wrr_locality_experimental balancer. This UpdateClientConn operation should +// take the localities and their weights in the addresses passed in, alongside +// the endpoint picking policy defined in the Balancer Config and construct a +// weighted target configuration corresponding to these inputs. +func (s) TestUpdateClientConnState(t *testing.T) { + // Configure the stub balancer defined below as the child policy of + // wrrLocalityBalancer. + cfgCh := testutils.NewChannel() + oldWeightedTargetName := weightedTargetName + defer func() { + weightedTargetName = oldWeightedTargetName + }() + weightedTargetName = "fake_weighted_target" + stub.Register("fake_weighted_target", stub.BalancerFuncs{ + ParseConfig: func(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { + var cfg weightedtarget.LBConfig + if err := json.Unmarshal(c, &cfg); err != nil { + return nil, err + } + return &cfg, nil + }, + UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { + wtCfg, ok := ccs.BalancerConfig.(*weightedtarget.LBConfig) + if !ok { + return errors.New("child received config that was not a weighted target config") + } + defer cfgCh.Send(wtCfg) + return nil + }, + }) + + builder := balancer.Get(Name) + if builder == nil { + t.Fatalf("balancer.Get(%q) returned nil", Name) + } + tcc := testutils.NewTestClientConn(t) + bal := builder.Build(tcc, balancer.BuildOptions{}) + defer bal.Close() + wrrL := bal.(*wrrLocalityBalancer) + + // Create the addresses with two localities with certain locality weights. + // This represents what addresses the wrr_locality balancer will receive in + // UpdateClientConnState. + addr1 := resolver.Address{ + Addr: "locality-1", + } + addr1 = internal.SetLocalityID(addr1, internal.LocalityID{ + Region: "region-1", + Zone: "zone-1", + SubZone: "subzone-1", + }) + addr1 = SetAddrInfo(addr1, AddrInfo{LocalityWeight: 2}) + + addr2 := resolver.Address{ + Addr: "locality-2", + } + addr2 = internal.SetLocalityID(addr2, internal.LocalityID{ + Region: "region-2", + Zone: "zone-2", + SubZone: "subzone-2", + }) + addr2 = SetAddrInfo(addr2, AddrInfo{LocalityWeight: 1}) + addrs := []resolver.Address{addr1, addr2} + + err := wrrL.UpdateClientConnState(balancer.ClientConnState{ + BalancerConfig: &LBConfig{ + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: "round_robin", + }, + }, + ResolverState: resolver.State{ + Addresses: addrs, + }, + }) + if err != nil { + t.Fatalf("Unexpected error from UpdateClientConnState: %v", err) + } + + // Note that these inline strings declared as the key in Targets built from + // Locality ID are not exactly what is shown in the example in the gRFC. + // However, this is an implementation detail that does not affect + // correctness (confirmed with Java team). The important thing is to get + // those three pieces of information region, zone, and subzone down to the + // child layer. + wantWtCfg := &weightedtarget.LBConfig{ + Targets: map[string]weightedtarget.Target{ + "{\"region\":\"region-1\",\"zone\":\"zone-1\",\"subZone\":\"subzone-1\"}": { + Weight: 2, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: "round_robin", + }, + }, + "{\"region\":\"region-2\",\"zone\":\"zone-2\",\"subZone\":\"subzone-2\"}": { + Weight: 1, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: "round_robin", + }, + }, + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + cfg, err := cfgCh.Receive(ctx) + if err != nil { + t.Fatalf("No signal received from UpdateClientConnState() on the child: %v", err) + } + + gotWtCfg, ok := cfg.(*weightedtarget.LBConfig) + if !ok { + // Shouldn't happen - only sends a config on this channel. + t.Fatalf("Unexpected config type: %T", gotWtCfg) + } + + if diff := cmp.Diff(gotWtCfg, wantWtCfg); diff != "" { + t.Fatalf("Child received unexpected config, diff (-got, +want): %v", diff) + } +} diff --git a/xds/internal/balancer/wrrlocality/logging.go b/xds/internal/balancer/wrrlocality/logging.go new file mode 100644 index 00000000000..42ccea0a92b --- /dev/null +++ b/xds/internal/balancer/wrrlocality/logging.go @@ -0,0 +1,34 @@ +/* + * + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package wrrlocality + +import ( + "fmt" + + "google.golang.org/grpc/grpclog" + internalgrpclog "google.golang.org/grpc/internal/grpclog" +) + +const prefix = "[wrrlocality-lb %p] " + +var logger = grpclog.Component("xds") + +func prefixLogger(p *wrrLocalityBalancer) *internalgrpclog.PrefixLogger { + return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(prefix, p)) +} diff --git a/xds/internal/xdsclient/tests/cds_watchers_test.go b/xds/internal/xdsclient/tests/cds_watchers_test.go index 3583fa929d9..9670caaca0a 100644 --- a/xds/internal/xdsclient/tests/cds_watchers_test.go +++ b/xds/internal/xdsclient/tests/cds_watchers_test.go @@ -70,7 +70,7 @@ func verifyClusterUpdate(ctx context.Context, updateCh *testutils.Channel, wantU return fmt.Errorf("received update with error type %v, want %v", gotType, wantType) } } - cmpOpts := []cmp.Option{cmpopts.EquateEmpty(), cmpopts.IgnoreFields(xdsresource.ClusterUpdate{}, "Raw", "LBPolicyJSON")} + cmpOpts := []cmp.Option{cmpopts.EquateEmpty(), cmpopts.IgnoreFields(xdsresource.ClusterUpdate{}, "Raw", "LBPolicy")} if diff := cmp.Diff(wantUpdate.Update, got.Update, cmpOpts...); diff != "" { return fmt.Errorf("received unepected diff in the cluster resource update: (-want, got):\n%s", diff) } diff --git a/xds/internal/xdsclient/tests/eds_watchers_test.go b/xds/internal/xdsclient/tests/eds_watchers_test.go index 9b220fc59f2..4cc365e70ea 100644 --- a/xds/internal/xdsclient/tests/eds_watchers_test.go +++ b/xds/internal/xdsclient/tests/eds_watchers_test.go @@ -134,9 +134,13 @@ func (s) TestEDSWatch(t *testing.T) { Localities: []xdsresource.Locality{ { Endpoints: []xdsresource.Endpoint{{Address: fmt.Sprintf("%s:%d", edsHost1, edsPort1), Weight: 1}}, - ID: internal.LocalityID{SubZone: "subzone"}, - Priority: 0, - Weight: 1, + ID: internal.LocalityID{ + Region: "region-1", + Zone: "zone-1", + SubZone: "subzone-1", + }, + Priority: 0, + Weight: 1, }, }, }, @@ -153,9 +157,13 @@ func (s) TestEDSWatch(t *testing.T) { Localities: []xdsresource.Locality{ { Endpoints: []xdsresource.Endpoint{{Address: fmt.Sprintf("%s:%d", edsHost1, edsPort1), Weight: 1}}, - ID: internal.LocalityID{SubZone: "subzone"}, - Priority: 0, - Weight: 1, + ID: internal.LocalityID{ + Region: "region-1", + Zone: "zone-1", + SubZone: "subzone-1", + }, + Priority: 0, + Weight: 1, }, }, }, @@ -265,9 +273,13 @@ func (s) TestEDSWatch_TwoWatchesForSameResourceName(t *testing.T) { Localities: []xdsresource.Locality{ { Endpoints: []xdsresource.Endpoint{{Address: fmt.Sprintf("%s:%d", edsHost1, edsPort1), Weight: 1}}, - ID: internal.LocalityID{SubZone: "subzone"}, - Priority: 0, - Weight: 1, + ID: internal.LocalityID{ + Region: "region-1", + Zone: "zone-1", + SubZone: "subzone-1", + }, + Priority: 0, + Weight: 1, }, }, }, @@ -277,9 +289,13 @@ func (s) TestEDSWatch_TwoWatchesForSameResourceName(t *testing.T) { Localities: []xdsresource.Locality{ { Endpoints: []xdsresource.Endpoint{{Address: fmt.Sprintf("%s:%d", edsHost2, edsPort2), Weight: 1}}, - ID: internal.LocalityID{SubZone: "subzone"}, - Priority: 0, - Weight: 1, + ID: internal.LocalityID{ + Region: "region-1", + Zone: "zone-1", + SubZone: "subzone-1", + }, + Priority: 0, + Weight: 1, }, }, }, @@ -295,9 +311,13 @@ func (s) TestEDSWatch_TwoWatchesForSameResourceName(t *testing.T) { Localities: []xdsresource.Locality{ { Endpoints: []xdsresource.Endpoint{{Address: fmt.Sprintf("%s:%d", edsHost1, edsPort1), Weight: 1}}, - ID: internal.LocalityID{SubZone: "subzone"}, - Priority: 0, - Weight: 1, + ID: internal.LocalityID{ + Region: "region-1", + Zone: "zone-1", + SubZone: "subzone-1", + }, + Priority: 0, + Weight: 1, }, }, }, @@ -307,9 +327,13 @@ func (s) TestEDSWatch_TwoWatchesForSameResourceName(t *testing.T) { Localities: []xdsresource.Locality{ { Endpoints: []xdsresource.Endpoint{{Address: fmt.Sprintf("%s:%d", edsHost2, edsPort2), Weight: 1}}, - ID: internal.LocalityID{SubZone: "subzone"}, - Priority: 0, - Weight: 1, + ID: internal.LocalityID{ + Region: "region-1", + Zone: "zone-1", + SubZone: "subzone-1", + }, + Priority: 0, + Weight: 1, }, }, }, @@ -460,9 +484,13 @@ func (s) TestEDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) { Localities: []xdsresource.Locality{ { Endpoints: []xdsresource.Endpoint{{Address: fmt.Sprintf("%s:%d", edsHost1, edsPort1), Weight: 1}}, - ID: internal.LocalityID{SubZone: "subzone"}, - Priority: 0, - Weight: 1, + ID: internal.LocalityID{ + Region: "region-1", + Zone: "zone-1", + SubZone: "subzone-1", + }, + Priority: 0, + Weight: 1, }, }, }, @@ -541,9 +569,13 @@ func (s) TestEDSWatch_ResourceCaching(t *testing.T) { Localities: []xdsresource.Locality{ { Endpoints: []xdsresource.Endpoint{{Address: fmt.Sprintf("%s:%d", edsHost1, edsPort1), Weight: 1}}, - ID: internal.LocalityID{SubZone: "subzone"}, - Priority: 0, - Weight: 1, + ID: internal.LocalityID{ + Region: "region-1", + Zone: "zone-1", + SubZone: "subzone-1", + }, + Priority: 0, + Weight: 1, }, }, }, @@ -669,9 +701,13 @@ func (s) TestEDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) { Localities: []xdsresource.Locality{ { Endpoints: []xdsresource.Endpoint{{Address: fmt.Sprintf("%s:%d", edsHost1, edsPort1), Weight: 1}}, - ID: internal.LocalityID{SubZone: "subzone"}, - Priority: 0, - Weight: 1, + ID: internal.LocalityID{ + Region: "region-1", + Zone: "zone-1", + SubZone: "subzone-1", + }, + Priority: 0, + Weight: 1, }, }, }, @@ -801,9 +837,13 @@ func (s) TestEDSWatch_PartialValid(t *testing.T) { Localities: []xdsresource.Locality{ { Endpoints: []xdsresource.Endpoint{{Address: fmt.Sprintf("%s:%d", edsHost1, edsPort1), Weight: 1}}, - ID: internal.LocalityID{SubZone: "subzone"}, - Priority: 0, - Weight: 1, + ID: internal.LocalityID{ + Region: "region-1", + Zone: "zone-1", + SubZone: "subzone-1", + }, + Priority: 0, + Weight: 1, }, }, }, diff --git a/xds/internal/xdsclient/tests/federation_watchers_test.go b/xds/internal/xdsclient/tests/federation_watchers_test.go index 974e6221aab..4298ce6c088 100644 --- a/xds/internal/xdsclient/tests/federation_watchers_test.go +++ b/xds/internal/xdsclient/tests/federation_watchers_test.go @@ -305,7 +305,11 @@ func (s) TestFederation_EndpointsResourceContextParamOrder(t *testing.T) { { Endpoints: []xdsresource.Endpoint{{Address: "localhost:666", Weight: 1}}, Weight: 1, - ID: internal.LocalityID{SubZone: "subzone"}, + ID: internal.LocalityID{ + Region: "region-1", + Zone: "zone-1", + SubZone: "subzone-1", + }, }, }, }, diff --git a/xds/internal/xdsclient/tests/resource_update_test.go b/xds/internal/xdsclient/tests/resource_update_test.go index ff6cf7c756a..7dd368aa5e2 100644 --- a/xds/internal/xdsclient/tests/resource_update_test.go +++ b/xds/internal/xdsclient/tests/resource_update_test.go @@ -802,7 +802,7 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) { } cmpOpts := []cmp.Option{ cmpopts.EquateEmpty(), - cmpopts.IgnoreFields(xdsresource.ClusterUpdate{}, "Raw", "LBPolicyJSON"), + cmpopts.IgnoreFields(xdsresource.ClusterUpdate{}, "Raw", "LBPolicy"), } if diff := cmp.Diff(test.wantUpdate, gotUpdate, cmpOpts...); diff != "" { t.Fatalf("Unexpected diff in metadata, diff (-want +got):\n%s", diff) diff --git a/xds/internal/xdsclient/xdsresource/tests/unmarshal_cds_test.go b/xds/internal/xdsclient/xdsresource/tests/unmarshal_cds_test.go index 7d20b1ff61e..96ad204ad4b 100644 --- a/xds/internal/xdsclient/xdsresource/tests/unmarshal_cds_test.go +++ b/xds/internal/xdsclient/xdsresource/tests/unmarshal_cds_test.go @@ -357,7 +357,6 @@ func (s) TestValidateCluster_Success(t *testing.T) { }, wantUpdate: xdsresource.ClusterUpdate{ ClusterName: clusterName, EDSServiceName: serviceName, LRSServerConfig: xdsresource.ClusterLRSServerSelf, - LBPolicy: &xdsresource.ClusterLBPolicyRingHash{MinimumRingSize: 10, MaximumRingSize: 100}, }, wantLBConfig: &internalserviceconfig.BalancerConfig{ Name: "ring_hash_experimental", @@ -589,11 +588,11 @@ func (s) TestValidateCluster_Success(t *testing.T) { // compare JSON bytes in a test. Thus, marshal into a Balancer // Config struct and compare on that. Only need to test this JSON // emission here, as this covers the possible output space. - if diff := cmp.Diff(update, test.wantUpdate, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(xdsresource.ClusterUpdate{}, "LBPolicy", "LBPolicyJSON")); diff != "" { + if diff := cmp.Diff(update, test.wantUpdate, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(xdsresource.ClusterUpdate{}, "LBPolicy")); diff != "" { t.Errorf("validateClusterAndConstructClusterUpdate(%+v) got diff: %v (-got, +want)", test.cluster, diff) } bc := &internalserviceconfig.BalancerConfig{} - if err := json.Unmarshal(update.LBPolicyJSON, bc); err != nil { + if err := json.Unmarshal(update.LBPolicy, bc); err != nil { t.Fatalf("failed to unmarshal JSON: %v", err) } if diff := cmp.Diff(bc, test.wantLBConfig); diff != "" { diff --git a/xds/internal/xdsclient/xdsresource/type_cds.go b/xds/internal/xdsclient/xdsresource/type_cds.go index cd49852d8fc..8ea9608dc9b 100644 --- a/xds/internal/xdsclient/xdsresource/type_cds.go +++ b/xds/internal/xdsclient/xdsresource/type_cds.go @@ -52,13 +52,6 @@ const ( ClusterLRSServerSelf ) -// ClusterLBPolicyRingHash represents ring_hash lb policy, and also contains its -// config. -type ClusterLBPolicyRingHash struct { - MinimumRingSize uint64 - MaximumRingSize uint64 -} - // OutlierDetection is the outlier detection configuration for a cluster. type OutlierDetection struct { // Interval is the time interval between ejection analysis sweeps. This can @@ -148,21 +141,9 @@ type ClusterUpdate struct { // a prioritized list of cluster names. PrioritizedClusterNames []string - // LBPolicy is the lb policy for this cluster. - // - // This only support round_robin and ring_hash. - // - if it's nil, the lb policy is round_robin - // - if it's not nil, the lb policy is ring_hash, the this field has the config. - // - // When we add more support policies, this can be made an interface, and - // will be set to different types based on the policy type. - LBPolicy *ClusterLBPolicyRingHash - // LBPolicyJSON represents the locality and endpoint picking policy in JSON, - // which will be the child policy of xds_cluster_impl. Once full support for - // this field across the system, the LBPolicy field will switch to this - // field. Right now we keep both to keep the system working even though - // downstream has not added support for this JSON field. - LBPolicyJSON json.RawMessage + // LBPolicy represents the locality and endpoint picking policy in JSON, + // which will be the child policy of xds_cluster_impl. + LBPolicy json.RawMessage // OutlierDetection is the outlier detection configuration for this cluster. // If nil, it means this cluster does not use the outlier detection feature. diff --git a/xds/internal/xdsclient/xdsresource/unmarshal_cds.go b/xds/internal/xdsclient/xdsresource/unmarshal_cds.go index 96684488135..c117ce6e7b5 100644 --- a/xds/internal/xdsclient/xdsresource/unmarshal_cds.go +++ b/xds/internal/xdsclient/xdsresource/unmarshal_cds.go @@ -77,13 +77,11 @@ const ( ) func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (ClusterUpdate, error) { - var lbPolicy *ClusterLBPolicyRingHash - var lbCfgJSON json.RawMessage + var lbPolicy json.RawMessage var err error switch cluster.GetLbPolicy() { case v3clusterpb.Cluster_ROUND_ROBIN: - lbPolicy = nil // The default is round_robin, and there's no config to set. - lbCfgJSON = []byte(fmt.Sprintf(`[{%q: {"childPolicy": [{"round_robin": {}}]}}]`, "xds_wrr_locality_experimental")) + lbPolicy = []byte(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`) case v3clusterpb.Cluster_RING_HASH: if !envconfig.XDSRingHash { return ClusterUpdate{}, fmt.Errorf("unexpected lbPolicy %v in response: %+v", cluster.GetLbPolicy(), cluster) @@ -101,10 +99,9 @@ func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (Clu if max := rhc.GetMaximumRingSize(); max != nil { maxSize = max.GetValue() } - lbPolicy = &ClusterLBPolicyRingHash{MinimumRingSize: minSize, MaximumRingSize: maxSize} - rhLBCfgJSON := []byte(fmt.Sprintf("{\"minRingSize\": %d, \"maxRingSize\": %d}", minSize, maxSize)) - lbCfgJSON = []byte(fmt.Sprintf(`[{%q: %s}]`, "ring_hash_experimental", rhLBCfgJSON)) + rhLBCfg := []byte(fmt.Sprintf("{\"minRingSize\": %d, \"maxRingSize\": %d}", minSize, maxSize)) + lbPolicy = []byte(fmt.Sprintf(`[{"ring_hash_experimental": %s}]`, rhLBCfg)) default: return ClusterUpdate{}, fmt.Errorf("unexpected lbPolicy %v in response: %+v", cluster.GetLbPolicy(), cluster) } @@ -129,7 +126,7 @@ func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (Clu } if cluster.GetLoadBalancingPolicy() != nil && envconfig.XDSCustomLBPolicy { - lbCfgJSON, err = xdslbregistry.ConvertToServiceConfig(cluster.GetLoadBalancingPolicy()) + lbPolicy, err = xdslbregistry.ConvertToServiceConfig(cluster.GetLoadBalancingPolicy()) if err != nil { return ClusterUpdate{}, fmt.Errorf("error converting LoadBalancingPolicy %v in response: %+v: %v", cluster.GetLoadBalancingPolicy(), cluster, err) } @@ -137,8 +134,8 @@ func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (Clu // converted configuration. It will do this by having the gRPC LB policy // registry parse the configuration." - A52 bc := &internalserviceconfig.BalancerConfig{} - if err := json.Unmarshal(lbCfgJSON, bc); err != nil { - return ClusterUpdate{}, fmt.Errorf("JSON generated from xDS LB policy registry: %s is invalid: %v", pretty.FormatJSON(lbCfgJSON), err) + if err := json.Unmarshal(lbPolicy, bc); err != nil { + return ClusterUpdate{}, fmt.Errorf("JSON generated from xDS LB policy registry: %s is invalid: %v", pretty.FormatJSON(lbPolicy), err) } } @@ -147,7 +144,6 @@ func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (Clu SecurityCfg: sc, MaxRequests: circuitBreakersFromCluster(cluster), LBPolicy: lbPolicy, - LBPolicyJSON: lbCfgJSON, OutlierDetection: od, } diff --git a/xds/internal/xdsclient/xdsresource/unmarshal_cds_test.go b/xds/internal/xdsclient/xdsresource/unmarshal_cds_test.go index 3b47ae697a9..0c69d27ad42 100644 --- a/xds/internal/xdsclient/xdsresource/unmarshal_cds_test.go +++ b/xds/internal/xdsclient/xdsresource/unmarshal_cds_test.go @@ -322,7 +322,7 @@ func (s) TestValidateClusterWithSecurityConfig_EnvVarOff(t *testing.T) { if err != nil { t.Errorf("validateClusterAndConstructClusterUpdate() failed: %v", err) } - if diff := cmp.Diff(wantUpdate, gotUpdate, cmpopts.IgnoreFields(ClusterUpdate{}, "LBPolicyJSON")); diff != "" { + if diff := cmp.Diff(wantUpdate, gotUpdate, cmpopts.IgnoreFields(ClusterUpdate{}, "LBPolicy")); diff != "" { t.Errorf("validateClusterAndConstructClusterUpdate() returned unexpected diff (-want, got):\n%s", diff) } } @@ -1215,7 +1215,7 @@ func (s) TestValidateClusterWithSecurityConfig(t *testing.T) { if (err != nil) != test.wantErr { t.Errorf("validateClusterAndConstructClusterUpdate() returned err %v wantErr %v)", err, test.wantErr) } - if diff := cmp.Diff(test.wantUpdate, update, cmpopts.EquateEmpty(), cmp.AllowUnexported(regexp.Regexp{}), cmpopts.IgnoreFields(ClusterUpdate{}, "LBPolicyJSON")); diff != "" { + if diff := cmp.Diff(test.wantUpdate, update, cmpopts.EquateEmpty(), cmp.AllowUnexported(regexp.Regexp{}), cmpopts.IgnoreFields(ClusterUpdate{}, "LBPolicy")); diff != "" { t.Errorf("validateClusterAndConstructClusterUpdate() returned unexpected diff (-want, +got):\n%s", diff) } }) @@ -1357,7 +1357,7 @@ func (s) TestUnmarshalCluster(t *testing.T) { if name != test.wantName { t.Errorf("unmarshalClusterResource(%s), got name: %s, want: %s", pretty.ToJSON(test.resource), name, test.wantName) } - if diff := cmp.Diff(update, test.wantUpdate, cmpOpts, cmpopts.IgnoreFields(ClusterUpdate{}, "LBPolicyJSON")); diff != "" { + if diff := cmp.Diff(update, test.wantUpdate, cmpOpts, cmpopts.IgnoreFields(ClusterUpdate{}, "LBPolicy")); diff != "" { t.Errorf("unmarshalClusterResource(%s), got unexpected update, diff (-got +want): %v", pretty.ToJSON(test.resource), diff) } }) @@ -1507,7 +1507,7 @@ func (s) TestValidateClusterWithOutlierDetection(t *testing.T) { if (err != nil) != test.wantErr { t.Errorf("validateClusterAndConstructClusterUpdate() returned err %v wantErr %v)", err, test.wantErr) } - if diff := cmp.Diff(test.wantUpdate, update, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(ClusterUpdate{}, "LBPolicyJSON")); diff != "" { + if diff := cmp.Diff(test.wantUpdate, update, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(ClusterUpdate{}, "LBPolicy")); diff != "" { t.Errorf("validateClusterAndConstructClusterUpdate() returned unexpected diff (-want, +got):\n%s", diff) } }) diff --git a/xds/internal/xdsclient/xdsresource/unmarshal_eds.go b/xds/internal/xdsclient/xdsresource/unmarshal_eds.go index a3202f8c810..95333aaf61d 100644 --- a/xds/internal/xdsclient/xdsresource/unmarshal_eds.go +++ b/xds/internal/xdsclient/xdsresource/unmarshal_eds.go @@ -141,6 +141,17 @@ func parseEDSRespProto(m *v3endpointpb.ClusterLoadAssignment) (EndpointsUpdate, SubZone: l.SubZone, } lidStr, _ := lid.ToString() + + // "Since an xDS configuration can place a given locality under multiple + // priorities, it is possible to see locality weight attributes with + // different values for the same locality." - A52 + // + // This is handled in the client by emitting the locality weight + // specified for the priority it is specified in. If the same locality + // has a different weight in two priorities, each priority will specify + // a locality with the locality weight specified for that priority, and + // thus the subsequent tree of balancers linked to that priority will + // use that locality weight as well. if localitiesWithPriority[lidStr] { return EndpointsUpdate{}, fmt.Errorf("duplicate locality %s with the same priority %v", lidStr, priority) }