Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds/internal/xdsclient: Add least request support in xDS #6517

Merged
merged 5 commits into from Aug 16, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion balancer/leastrequest/leastrequest.go
Expand Up @@ -31,7 +31,7 @@ import (
"google.golang.org/grpc/serviceconfig"
)

// Global to stub out in tests.
// grpcranduint32 is a global to stub out in tests.
var grpcranduint32 = grpcrand.Uint32

// Name is the name of the least request balancer.
Expand Down
4 changes: 4 additions & 0 deletions internal/envconfig/envconfig.go
Expand Up @@ -39,6 +39,10 @@ var (
// PickFirstLBConfig is set if we should support configuration of the
// pick_first LB policy.
PickFirstLBConfig = boolFromEnv("GRPC_EXPERIMENTAL_PICKFIRST_LB_CONFIG", true)
// LeastRequestLB is set if we should support the least_request_experimental
// LB policy, which can be enabled by setting the environment variable
// "GRPC_EXPERIMENTAL_ENABLE_LEAST_REQUEST" to "true".
LeastRequestLB = boolFromEnv("GRPC_EXPERIMENTAL_ENABLE_LEAST_REQUEST", false)
// ALTSMaxConcurrentHandshakes is the maximum number of concurrent ALTS
// handshakes that can be performed.
ALTSMaxConcurrentHandshakes = uint64FromEnv("GRPC_ALTS_MAX_CONCURRENT_HANDSHAKES", 100, 1, 100)
Expand Down
28 changes: 27 additions & 1 deletion test/xds/xds_client_custom_lb_test.go
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"google.golang.org/grpc"
_ "google.golang.org/grpc/balancer/leastrequest" // To register least_request
_ "google.golang.org/grpc/balancer/weightedroundrobin" // To register weighted_round_robin
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/envconfig"
Expand All @@ -41,6 +42,7 @@ import (
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
v3clientsideweightedroundrobinpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/client_side_weighted_round_robin/v3"
v3leastrequestpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/least_request/v3"
v3roundrobinpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/round_robin/v3"
v3wrrlocalitypb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/wrr_locality/v3"
"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -96,6 +98,11 @@ func (s) TestWrrLocality(t *testing.T) {
defer func() {
envconfig.XDSCustomLBPolicy = oldCustomLBSupport
}()
oldLeastRequestLBSupport := envconfig.LeastRequestLB
envconfig.LeastRequestLB = true
defer func() {
envconfig.LeastRequestLB = oldLeastRequestLBSupport
}()

backend1 := stubserver.StartTestService(t, nil)
port1 := testutils.ParsePort(t, backend1.Address)
Expand Down Expand Up @@ -194,12 +201,31 @@ func (s) TestWrrLocality(t *testing.T) {
{addr: backend5.Address, count: 8},
},
},
{
name: "custom_lb_least_request",
wrrLocalityConfiguration: wrrLocality(&v3leastrequestpb.LeastRequest{
ChoiceCount: wrapperspb.UInt32(2),
}),
// Least request's randomness of indexes to sample (unary RPCs which
// don't persist so never any actual RPC counts over iterations)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something is incomplete in this sentence:

unary RPCs which don't persist so never any actual RPC counts over iterations

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to "The test performs a Unary RPC, and blocks until the RPC returns, and then makes the next RPC. Thus, over iterations, no RPC counts are present. This causes Least request's randomness of indexes to sample to converge onto a round robin distribution per locality. Thus, expect the same distribution as round robin above."

// converges onto a round robin distribution per locality. Thus,
// expect the same distribution as round robin above.
addressDistributionWant: []struct {
addr string
count int
}{
{addr: backend1.Address, count: 6},
{addr: backend2.Address, count: 6},
{addr: backend3.Address, count: 8},
{addr: backend4.Address, count: 8},
{addr: backend5.Address, count: 8},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
managementServer, nodeID, _, r, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
defer cleanup()

routeConfigName := "route-" + serviceName
clusterName := "cluster-" + serviceName
endpointsName := "endpoints-" + serviceName
Expand Down
1 change: 1 addition & 0 deletions xds/internal/balancer/balancer.go
Expand Up @@ -20,6 +20,7 @@
package balancer

import (
_ "google.golang.org/grpc/balancer/leastrequest" // Register the least_request_experimental balancer
_ "google.golang.org/grpc/balancer/weightedtarget" // Register the weighted_target balancer
_ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer" // Register the CDS balancer
_ "google.golang.org/grpc/xds/internal/balancer/clusterimpl" // Register the xds_cluster_impl balancer
Expand Down
31 changes: 29 additions & 2 deletions xds/internal/xdsclient/xdslbregistry/converter/converter.go
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/golang/protobuf/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/leastrequest"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/balancer/weightedroundrobin"
"google.golang.org/grpc/internal/envconfig"
Expand All @@ -41,6 +42,7 @@ import (
v1xdsudpatypepb "github.com/cncf/xds/go/udpa/type/v1"
v3xdsxdstypepb "github.com/cncf/xds/go/xds/type/v3"
v3clientsideweightedroundrobinpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/client_side_weighted_round_robin/v3"
v3leastrequestpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/least_request/v3"
v3pickfirstpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/pick_first/v3"
v3ringhashpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/ring_hash/v3"
v3wrrlocalitypb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/wrr_locality/v3"
Expand All @@ -53,13 +55,15 @@ func init() {
xdslbregistry.Register("type.googleapis.com/envoy.extensions.load_balancing_policies.pick_first.v3.PickFirst", convertPickFirstProtoToServiceConfig)
xdslbregistry.Register("type.googleapis.com/envoy.extensions.load_balancing_policies.round_robin.v3.RoundRobin", convertRoundRobinProtoToServiceConfig)
xdslbregistry.Register("type.googleapis.com/envoy.extensions.load_balancing_policies.wrr_locality.v3.WrrLocality", convertWRRLocalityProtoToServiceConfig)
xdslbregistry.Register("type.googleapis.com/envoy.extensions.load_balancing_policies.least_request.v3.LeastRequest", convertLeastRequestProtoToServiceConfig)
xdslbregistry.Register("type.googleapis.com/udpa.type.v1.TypedStruct", convertV1TypedStructToServiceConfig)
xdslbregistry.Register("type.googleapis.com/xds.type.v3.TypedStruct", convertV3TypedStructToServiceConfig)
}

const (
defaultRingHashMinSize = 1024
defaultRingHashMaxSize = 8 * 1024 * 1024 // 8M
defaultRingHashMinSize = 1024
defaultRingHashMaxSize = 8 * 1024 * 1024 // 8M
defaultLeastRequestChoiceCount = 2
)

func convertRingHashProtoToServiceConfig(rawProto []byte, _ int) (json.RawMessage, error) {
Expand Down Expand Up @@ -177,6 +181,29 @@ func convertWeightedRoundRobinProtoToServiceConfig(rawProto []byte, _ int) (json
return makeBalancerConfigJSON(weightedroundrobin.Name, lbCfgJSON), nil
}

func convertLeastRequestProtoToServiceConfig(rawProto []byte, _ int) (json.RawMessage, error) {
easwars marked this conversation as resolved.
Show resolved Hide resolved
if !envconfig.LeastRequestLB {
return nil, nil
}
lrProto := &v3leastrequestpb.LeastRequest{}
if err := proto.Unmarshal(rawProto, lrProto); err != nil {
return nil, fmt.Errorf("failed to unmarshal resource: %v", err)
}
// "The configuration for the Least Request LB policy is the
// least_request_lb_config field. The field is optional; if not present,
// defaults will be assumed for all of its values." - A48
var choiceCount uint32 = defaultLeastRequestChoiceCount
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not asking you to change, but just FYI choiceCount := uint32(defaultLeastRequestChoiceCount) is shorter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switched. I feel like throughout the codebase and my years on the team, x := y vs. var x type = y has always been preferred for local vars declaration to a non zero value. Noted for future.

if cc := lrProto.GetChoiceCount(); cc != nil {
choiceCount = cc.GetValue()
}
lrCfg := &leastrequest.LBConfig{ChoiceCount: choiceCount}
js, err := json.Marshal(lrCfg)
if err != nil {
return nil, fmt.Errorf("error marshaling JSON for type %T: %v", lrCfg, err)
}
return makeBalancerConfigJSON(leastrequest.Name, js), nil
}

func convertV1TypedStructToServiceConfig(rawProto []byte, _ int) (json.RawMessage, error) {
tsProto := &v1xdsudpatypepb.TypedStruct{}
if err := proto.Unmarshal(rawProto, tsProto); err != nil {
Expand Down
37 changes: 30 additions & 7 deletions xds/internal/xdsclient/xdslbregistry/xdslbregistry_test.go
Expand Up @@ -69,6 +69,9 @@ func wrrLocalityBalancerConfig(childPolicy *internalserviceconfig.BalancerConfig
}

func (s) TestConvertToServiceConfigSuccess(t *testing.T) {
defer func(old bool) { envconfig.LeastRequestLB = old }(envconfig.LeastRequestLB)
envconfig.LeastRequestLB = true
easwars marked this conversation as resolved.
Show resolved Hide resolved

const customLBPolicyName = "myorg.MyCustomLeastRequestPolicy"
stub.Register(customLBPolicyName, stub.BalancerFuncs{})

Expand All @@ -78,6 +81,7 @@ func (s) TestConvertToServiceConfigSuccess(t *testing.T) {
wantConfig string // JSON config
rhDisabled bool
pfDisabled bool
lrDisabled bool
}{
{
name: "ring_hash",
Expand All @@ -96,6 +100,21 @@ func (s) TestConvertToServiceConfigSuccess(t *testing.T) {
},
wantConfig: `[{"ring_hash_experimental": { "minRingSize": 10, "maxRingSize": 100 }}]`,
},
{
name: "least_request",
policy: &v3clusterpb.LoadBalancingPolicy{
Policies: []*v3clusterpb.LoadBalancingPolicy_Policy{
{
TypedExtensionConfig: &v3corepb.TypedExtensionConfig{
TypedConfig: testutils.MarshalAny(&v3leastrequestpb.LeastRequest{
ChoiceCount: wrapperspb.UInt32(3),
}),
},
},
},
},
wantConfig: `[{"least_request_experimental": { "choiceCount": 3 }}]`,
},
{
name: "pick_first_shuffle",
policy: &v3clusterpb.LoadBalancingPolicy{
Expand Down Expand Up @@ -183,7 +202,7 @@ func (s) TestConvertToServiceConfigSuccess(t *testing.T) {
rhDisabled: true,
},
{
name: "pick_first_disabled_pf_rr_use_first_supported",
name: "pick_first_enabled_pf_rr_use_pick_first",
policy: &v3clusterpb.LoadBalancingPolicy{
Policies: []*v3clusterpb.LoadBalancingPolicy_Policy{
{
Expand All @@ -200,17 +219,16 @@ func (s) TestConvertToServiceConfigSuccess(t *testing.T) {
},
},
},
wantConfig: `[{"round_robin": {}}]`,
pfDisabled: true,
wantConfig: `[{"pick_first": { "shuffleAddressList": true }}]`,
},
{
name: "pick_first_enabled_pf_rr_use_pick_first",
name: "least_request_disabled_pf_rr_use_first_supported",
policy: &v3clusterpb.LoadBalancingPolicy{
Policies: []*v3clusterpb.LoadBalancingPolicy_Policy{
{
TypedExtensionConfig: &v3corepb.TypedExtensionConfig{
TypedConfig: testutils.MarshalAny(&v3pickfirstpb.PickFirst{
ShuffleAddressList: true,
TypedConfig: testutils.MarshalAny(&v3leastrequestpb.LeastRequest{
ChoiceCount: wrapperspb.UInt32(32),
}),
},
},
Expand All @@ -221,7 +239,8 @@ func (s) TestConvertToServiceConfigSuccess(t *testing.T) {
},
},
},
wantConfig: `[{"pick_first": { "shuffleAddressList": true }}]`,
wantConfig: `[{"round_robin": {}}]`,
lrDisabled: true,
},
{
name: "custom_lb_type_v3_struct",
Expand Down Expand Up @@ -317,6 +336,10 @@ func (s) TestConvertToServiceConfigSuccess(t *testing.T) {
defer func(old bool) { envconfig.XDSRingHash = old }(envconfig.XDSRingHash)
envconfig.XDSRingHash = false
}
if test.lrDisabled {
defer func(old bool) { envconfig.LeastRequestLB = old }(envconfig.LeastRequestLB)
envconfig.LeastRequestLB = false
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Default value of envconfig.LeastRequestLB is already false. You are setting it to true for the duration of the test at the top, and then setting it to false here again.

Why not have a lrEnabled field which is set to true only for tests that require it, and here, you can check for if test.lrEnabled { // set the env var to true }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done :). Good suggestion.

if test.pfDisabled {
defer func(old bool) { envconfig.PickFirstLBConfig = old }(envconfig.PickFirstLBConfig)
envconfig.PickFirstLBConfig = false
Expand Down
58 changes: 58 additions & 0 deletions xds/internal/xdsclient/xdsresource/tests/unmarshal_cds_test.go
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/balancer/leastrequest"
_ "google.golang.org/grpc/balancer/roundrobin" // To register round_robin load balancer.
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/envconfig"
Expand Down Expand Up @@ -103,6 +104,8 @@ func (s) TestValidateCluster_Success(t *testing.T) {
defer func() {
envconfig.XDSCustomLBPolicy = origCustomLBSupport
}()
defer func(old bool) { envconfig.LeastRequestLB = old }(envconfig.LeastRequestLB)
envconfig.LeastRequestLB = true
easwars marked this conversation as resolved.
Show resolved Hide resolved
tests := []struct {
name string
cluster *v3clusterpb.Cluster
Expand Down Expand Up @@ -330,6 +333,31 @@ func (s) TestValidateCluster_Success(t *testing.T) {
},
},
},
{
name: "happiest-case-with-least-request-lb-policy-with-default-config",
cluster: &v3clusterpb.Cluster{
Name: clusterName,
ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_EDS},
EdsClusterConfig: &v3clusterpb.Cluster_EdsClusterConfig{
EdsConfig: &v3corepb.ConfigSource{
ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{
Ads: &v3corepb.AggregatedConfigSource{},
},
},
ServiceName: serviceName,
},
LbPolicy: v3clusterpb.Cluster_LEAST_REQUEST,
},
wantUpdate: xdsresource.ClusterUpdate{
ClusterName: clusterName, EDSServiceName: serviceName,
},
wantLBConfig: &iserviceconfig.BalancerConfig{
Name: "least_request_experimental",
Config: &leastrequest.LBConfig{
ChoiceCount: 2,
},
},
},
{
name: "happiest-case-with-ring-hash-lb-policy-with-none-default-config",
cluster: &v3clusterpb.Cluster{
Expand Down Expand Up @@ -367,6 +395,36 @@ func (s) TestValidateCluster_Success(t *testing.T) {
},
},
},
{
name: "happiest-case-with-least-request-lb-policy-with-none-default-config",
cluster: &v3clusterpb.Cluster{
Name: clusterName,
ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_EDS},
EdsClusterConfig: &v3clusterpb.Cluster_EdsClusterConfig{
EdsConfig: &v3corepb.ConfigSource{
ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{
Ads: &v3corepb.AggregatedConfigSource{},
},
},
ServiceName: serviceName,
},
LbPolicy: v3clusterpb.Cluster_LEAST_REQUEST,
LbConfig: &v3clusterpb.Cluster_LeastRequestLbConfig_{
LeastRequestLbConfig: &v3clusterpb.Cluster_LeastRequestLbConfig{
ChoiceCount: wrapperspb.UInt32(3),
},
},
},
wantUpdate: xdsresource.ClusterUpdate{
ClusterName: clusterName, EDSServiceName: serviceName,
},
wantLBConfig: &iserviceconfig.BalancerConfig{
Name: "least_request_experimental",
Config: &leastrequest.LBConfig{
ChoiceCount: 3,
},
},
},
{
name: "happiest-case-with-ring-hash-lb-policy-configured-through-LoadBalancingPolicy",
cluster: &v3clusterpb.Cluster{
Expand Down
22 changes: 22 additions & 0 deletions xds/internal/xdsclient/xdsresource/unmarshal_cds.go
Expand Up @@ -76,6 +76,8 @@ const (
defaultRingHashMinSize = 1024
defaultRingHashMaxSize = 8 * 1024 * 1024 // 8M
ringHashSizeUpperBound = 8 * 1024 * 1024 // 8M

defaultLeastRequestChoiceCount = 2
easwars marked this conversation as resolved.
Show resolved Hide resolved
)

func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (ClusterUpdate, error) {
Expand Down Expand Up @@ -104,6 +106,26 @@ func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (Clu

rhLBCfg := []byte(fmt.Sprintf("{\"minRingSize\": %d, \"maxRingSize\": %d}", minSize, maxSize))
lbPolicy = []byte(fmt.Sprintf(`[{"ring_hash_experimental": %s}]`, rhLBCfg))
case v3clusterpb.Cluster_LEAST_REQUEST:
if !envconfig.LeastRequestLB {
return ClusterUpdate{}, fmt.Errorf("unexpected lbPolicy %v in response: %+v", cluster.GetLbPolicy(), cluster)
}

// "The configuration for the Least Request LB policy is the
// least_request_lb_config field. The field is optional; if not present,
// defaults will be assumed for all of its values." - A48
lr := cluster.GetLeastRequestLbConfig()
var choiceCount uint32 = defaultLeastRequestChoiceCount
if cc := lr.GetChoiceCount(); cc != nil {
choiceCount = cc.GetValue()
}
// nack if it's < 2, and also add test for it
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove the also add test for it part of it.

And probably quote the relevant language from the gRFC if that makes sense.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switched comment to: // "If choice_count < 2, the config will be rejected." - A48

if choiceCount < 2 {
return ClusterUpdate{}, fmt.Errorf("Cluster_LeastRequestLbConfig.ChoiceCount must be >= 2, got: %v", choiceCount)
}

lrLBCfg := []byte(fmt.Sprintf("{\"choiceCount\": %d}", choiceCount))
lbPolicy = []byte(fmt.Sprintf(`[{"least_request_experimental": %s}]`, lrLBCfg))
default:
return ClusterUpdate{}, fmt.Errorf("unexpected lbPolicy %v in response: %+v", cluster.GetLbPolicy(), cluster)
}
Expand Down
15 changes: 14 additions & 1 deletion xds/internal/xdsclient/xdsresource/unmarshal_cds_test.go
Expand Up @@ -161,6 +161,19 @@ func (s) TestValidateCluster_Failure(t *testing.T) {
wantUpdate: emptyUpdate,
wantErr: true,
},
{
name: "least-request-choice-count-less-than-two",
cluster: &v3clusterpb.Cluster{
LbPolicy: v3clusterpb.Cluster_RING_HASH,
LbConfig: &v3clusterpb.Cluster_LeastRequestLbConfig_{
LeastRequestLbConfig: &v3clusterpb.Cluster_LeastRequestLbConfig{
ChoiceCount: wrapperspb.UInt32(1),
},
},
},
wantUpdate: emptyUpdate,
wantErr: true,
},
{
name: "ring-hash-max-bound-greater-than-upper-bound",
cluster: &v3clusterpb.Cluster{
Expand Down Expand Up @@ -205,7 +218,7 @@ func (s) TestValidateCluster_Failure(t *testing.T) {
wantErr: true,
},
{
name: "least-request-unsupported-in-converter",
name: "least-request-unsupported-in-converter-since-env-var-unset",
cluster: &v3clusterpb.Cluster{
Name: clusterName,
ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_EDS},
Expand Down