Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds/internal/xdsclient: Add least request support in xDS #6517

Merged
merged 5 commits into from
Aug 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion balancer/leastrequest/leastrequest.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"google.golang.org/grpc/serviceconfig"
)

// Global to stub out in tests.
// grpcranduint32 is a global to stub out in tests.
var grpcranduint32 = grpcrand.Uint32

// Name is the name of the least request balancer.
Expand Down
4 changes: 4 additions & 0 deletions internal/envconfig/envconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ var (
// PickFirstLBConfig is set if we should support configuration of the
// pick_first LB policy.
PickFirstLBConfig = boolFromEnv("GRPC_EXPERIMENTAL_PICKFIRST_LB_CONFIG", true)
// LeastRequestLB is set if we should support the least_request_experimental
// LB policy, which can be enabled by setting the environment variable
// "GRPC_EXPERIMENTAL_ENABLE_LEAST_REQUEST" to "true".
LeastRequestLB = boolFromEnv("GRPC_EXPERIMENTAL_ENABLE_LEAST_REQUEST", false)
// ALTSMaxConcurrentHandshakes is the maximum number of concurrent ALTS
// handshakes that can be performed.
ALTSMaxConcurrentHandshakes = uint64FromEnv("GRPC_ALTS_MAX_CONCURRENT_HANDSHAKES", 100, 1, 100)
Expand Down
30 changes: 29 additions & 1 deletion test/xds/xds_client_custom_lb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"google.golang.org/grpc"
_ "google.golang.org/grpc/balancer/leastrequest" // To register least_request
_ "google.golang.org/grpc/balancer/weightedroundrobin" // To register weighted_round_robin
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/envconfig"
Expand All @@ -41,6 +42,7 @@ import (
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
v3clientsideweightedroundrobinpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/client_side_weighted_round_robin/v3"
v3leastrequestpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/least_request/v3"
v3roundrobinpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/round_robin/v3"
v3wrrlocalitypb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/wrr_locality/v3"
"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -96,6 +98,11 @@ func (s) TestWrrLocality(t *testing.T) {
defer func() {
envconfig.XDSCustomLBPolicy = oldCustomLBSupport
}()
oldLeastRequestLBSupport := envconfig.LeastRequestLB
envconfig.LeastRequestLB = true
defer func() {
envconfig.LeastRequestLB = oldLeastRequestLBSupport
}()

backend1 := stubserver.StartTestService(t, nil)
port1 := testutils.ParsePort(t, backend1.Address)
Expand Down Expand Up @@ -194,12 +201,33 @@ func (s) TestWrrLocality(t *testing.T) {
{addr: backend5.Address, count: 8},
},
},
{
name: "custom_lb_least_request",
wrrLocalityConfiguration: wrrLocality(&v3leastrequestpb.LeastRequest{
ChoiceCount: wrapperspb.UInt32(2),
}),
// The test performs a Unary RPC, and blocks until the RPC returns,
// and then makes the next Unary RPC. Thus, over iterations, no RPC
// counts are present. This causes least request's randomness of
// indexes to sample to converge onto a round robin distribution per
// locality. Thus, expect the same distribution as round robin
// above.
addressDistributionWant: []struct {
addr string
count int
}{
{addr: backend1.Address, count: 6},
{addr: backend2.Address, count: 6},
{addr: backend3.Address, count: 8},
{addr: backend4.Address, count: 8},
{addr: backend5.Address, count: 8},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
managementServer, nodeID, _, r, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
defer cleanup()

routeConfigName := "route-" + serviceName
clusterName := "cluster-" + serviceName
endpointsName := "endpoints-" + serviceName
Expand Down
1 change: 1 addition & 0 deletions xds/internal/balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package balancer

import (
_ "google.golang.org/grpc/balancer/leastrequest" // Register the least_request_experimental balancer
_ "google.golang.org/grpc/balancer/weightedtarget" // Register the weighted_target balancer
_ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer" // Register the CDS balancer
_ "google.golang.org/grpc/xds/internal/balancer/clusterimpl" // Register the xds_cluster_impl balancer
Expand Down
31 changes: 29 additions & 2 deletions xds/internal/xdsclient/xdslbregistry/converter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/golang/protobuf/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/leastrequest"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/balancer/weightedroundrobin"
"google.golang.org/grpc/internal/envconfig"
Expand All @@ -41,6 +42,7 @@ import (
v1xdsudpatypepb "github.com/cncf/xds/go/udpa/type/v1"
v3xdsxdstypepb "github.com/cncf/xds/go/xds/type/v3"
v3clientsideweightedroundrobinpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/client_side_weighted_round_robin/v3"
v3leastrequestpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/least_request/v3"
v3pickfirstpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/pick_first/v3"
v3ringhashpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/ring_hash/v3"
v3wrrlocalitypb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/wrr_locality/v3"
Expand All @@ -53,13 +55,15 @@ func init() {
xdslbregistry.Register("type.googleapis.com/envoy.extensions.load_balancing_policies.pick_first.v3.PickFirst", convertPickFirstProtoToServiceConfig)
xdslbregistry.Register("type.googleapis.com/envoy.extensions.load_balancing_policies.round_robin.v3.RoundRobin", convertRoundRobinProtoToServiceConfig)
xdslbregistry.Register("type.googleapis.com/envoy.extensions.load_balancing_policies.wrr_locality.v3.WrrLocality", convertWRRLocalityProtoToServiceConfig)
xdslbregistry.Register("type.googleapis.com/envoy.extensions.load_balancing_policies.least_request.v3.LeastRequest", convertLeastRequestProtoToServiceConfig)
xdslbregistry.Register("type.googleapis.com/udpa.type.v1.TypedStruct", convertV1TypedStructToServiceConfig)
xdslbregistry.Register("type.googleapis.com/xds.type.v3.TypedStruct", convertV3TypedStructToServiceConfig)
}

const (
defaultRingHashMinSize = 1024
defaultRingHashMaxSize = 8 * 1024 * 1024 // 8M
defaultRingHashMinSize = 1024
defaultRingHashMaxSize = 8 * 1024 * 1024 // 8M
defaultLeastRequestChoiceCount = 2
)

func convertRingHashProtoToServiceConfig(rawProto []byte, _ int) (json.RawMessage, error) {
Expand Down Expand Up @@ -177,6 +181,29 @@ func convertWeightedRoundRobinProtoToServiceConfig(rawProto []byte, _ int) (json
return makeBalancerConfigJSON(weightedroundrobin.Name, lbCfgJSON), nil
}

func convertLeastRequestProtoToServiceConfig(rawProto []byte, _ int) (json.RawMessage, error) {
easwars marked this conversation as resolved.
Show resolved Hide resolved
if !envconfig.LeastRequestLB {
return nil, nil
}
lrProto := &v3leastrequestpb.LeastRequest{}
if err := proto.Unmarshal(rawProto, lrProto); err != nil {
return nil, fmt.Errorf("failed to unmarshal resource: %v", err)
}
// "The configuration for the Least Request LB policy is the
// least_request_lb_config field. The field is optional; if not present,
// defaults will be assumed for all of its values." - A48
choiceCount := uint32(defaultLeastRequestChoiceCount)
if cc := lrProto.GetChoiceCount(); cc != nil {
choiceCount = cc.GetValue()
}
lrCfg := &leastrequest.LBConfig{ChoiceCount: choiceCount}
js, err := json.Marshal(lrCfg)
if err != nil {
return nil, fmt.Errorf("error marshaling JSON for type %T: %v", lrCfg, err)
}
return makeBalancerConfigJSON(leastrequest.Name, js), nil
}

func convertV1TypedStructToServiceConfig(rawProto []byte, _ int) (json.RawMessage, error) {
tsProto := &v1xdsudpatypepb.TypedStruct{}
if err := proto.Unmarshal(rawProto, tsProto); err != nil {
Expand Down
37 changes: 30 additions & 7 deletions xds/internal/xdsclient/xdslbregistry/xdslbregistry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ func wrrLocalityBalancerConfig(childPolicy *internalserviceconfig.BalancerConfig
}

func (s) TestConvertToServiceConfigSuccess(t *testing.T) {
defer func(old bool) { envconfig.LeastRequestLB = old }(envconfig.LeastRequestLB)
envconfig.LeastRequestLB = false

const customLBPolicyName = "myorg.MyCustomLeastRequestPolicy"
stub.Register(customLBPolicyName, stub.BalancerFuncs{})

Expand All @@ -78,6 +81,7 @@ func (s) TestConvertToServiceConfigSuccess(t *testing.T) {
wantConfig string // JSON config
rhDisabled bool
pfDisabled bool
lrEnabled bool
}{
{
name: "ring_hash",
Expand All @@ -96,6 +100,22 @@ func (s) TestConvertToServiceConfigSuccess(t *testing.T) {
},
wantConfig: `[{"ring_hash_experimental": { "minRingSize": 10, "maxRingSize": 100 }}]`,
},
{
name: "least_request",
policy: &v3clusterpb.LoadBalancingPolicy{
Policies: []*v3clusterpb.LoadBalancingPolicy_Policy{
{
TypedExtensionConfig: &v3corepb.TypedExtensionConfig{
TypedConfig: testutils.MarshalAny(&v3leastrequestpb.LeastRequest{
ChoiceCount: wrapperspb.UInt32(3),
}),
},
},
},
},
wantConfig: `[{"least_request_experimental": { "choiceCount": 3 }}]`,
lrEnabled: true,
},
{
name: "pick_first_shuffle",
policy: &v3clusterpb.LoadBalancingPolicy{
Expand Down Expand Up @@ -183,7 +203,7 @@ func (s) TestConvertToServiceConfigSuccess(t *testing.T) {
rhDisabled: true,
},
{
name: "pick_first_disabled_pf_rr_use_first_supported",
name: "pick_first_enabled_pf_rr_use_pick_first",
policy: &v3clusterpb.LoadBalancingPolicy{
Policies: []*v3clusterpb.LoadBalancingPolicy_Policy{
{
Expand All @@ -200,17 +220,16 @@ func (s) TestConvertToServiceConfigSuccess(t *testing.T) {
},
},
},
wantConfig: `[{"round_robin": {}}]`,
pfDisabled: true,
wantConfig: `[{"pick_first": { "shuffleAddressList": true }}]`,
},
{
name: "pick_first_enabled_pf_rr_use_pick_first",
name: "least_request_disabled_pf_rr_use_first_supported",
policy: &v3clusterpb.LoadBalancingPolicy{
Policies: []*v3clusterpb.LoadBalancingPolicy_Policy{
{
TypedExtensionConfig: &v3corepb.TypedExtensionConfig{
TypedConfig: testutils.MarshalAny(&v3pickfirstpb.PickFirst{
ShuffleAddressList: true,
TypedConfig: testutils.MarshalAny(&v3leastrequestpb.LeastRequest{
ChoiceCount: wrapperspb.UInt32(32),
}),
},
},
Expand All @@ -221,7 +240,7 @@ func (s) TestConvertToServiceConfigSuccess(t *testing.T) {
},
},
},
wantConfig: `[{"pick_first": { "shuffleAddressList": true }}]`,
wantConfig: `[{"round_robin": {}}]`,
},
{
name: "custom_lb_type_v3_struct",
Expand Down Expand Up @@ -317,6 +336,10 @@ func (s) TestConvertToServiceConfigSuccess(t *testing.T) {
defer func(old bool) { envconfig.XDSRingHash = old }(envconfig.XDSRingHash)
envconfig.XDSRingHash = false
}
if test.lrEnabled {
defer func(old bool) { envconfig.LeastRequestLB = old }(envconfig.LeastRequestLB)
envconfig.LeastRequestLB = true
}
if test.pfDisabled {
defer func(old bool) { envconfig.PickFirstLBConfig = old }(envconfig.PickFirstLBConfig)
envconfig.PickFirstLBConfig = false
Expand Down
58 changes: 58 additions & 0 deletions xds/internal/xdsclient/xdsresource/tests/unmarshal_cds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/balancer/leastrequest"
_ "google.golang.org/grpc/balancer/roundrobin" // To register round_robin load balancer.
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/envconfig"
Expand Down Expand Up @@ -103,6 +104,8 @@ func (s) TestValidateCluster_Success(t *testing.T) {
defer func() {
envconfig.XDSCustomLBPolicy = origCustomLBSupport
}()
defer func(old bool) { envconfig.LeastRequestLB = old }(envconfig.LeastRequestLB)
envconfig.LeastRequestLB = true
easwars marked this conversation as resolved.
Show resolved Hide resolved
tests := []struct {
name string
cluster *v3clusterpb.Cluster
Expand Down Expand Up @@ -330,6 +333,31 @@ func (s) TestValidateCluster_Success(t *testing.T) {
},
},
},
{
name: "happiest-case-with-least-request-lb-policy-with-default-config",
cluster: &v3clusterpb.Cluster{
Name: clusterName,
ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_EDS},
EdsClusterConfig: &v3clusterpb.Cluster_EdsClusterConfig{
EdsConfig: &v3corepb.ConfigSource{
ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{
Ads: &v3corepb.AggregatedConfigSource{},
},
},
ServiceName: serviceName,
},
LbPolicy: v3clusterpb.Cluster_LEAST_REQUEST,
},
wantUpdate: xdsresource.ClusterUpdate{
ClusterName: clusterName, EDSServiceName: serviceName,
},
wantLBConfig: &iserviceconfig.BalancerConfig{
Name: "least_request_experimental",
Config: &leastrequest.LBConfig{
ChoiceCount: 2,
},
},
},
{
name: "happiest-case-with-ring-hash-lb-policy-with-none-default-config",
cluster: &v3clusterpb.Cluster{
Expand Down Expand Up @@ -367,6 +395,36 @@ func (s) TestValidateCluster_Success(t *testing.T) {
},
},
},
{
name: "happiest-case-with-least-request-lb-policy-with-none-default-config",
cluster: &v3clusterpb.Cluster{
Name: clusterName,
ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_EDS},
EdsClusterConfig: &v3clusterpb.Cluster_EdsClusterConfig{
EdsConfig: &v3corepb.ConfigSource{
ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{
Ads: &v3corepb.AggregatedConfigSource{},
},
},
ServiceName: serviceName,
},
LbPolicy: v3clusterpb.Cluster_LEAST_REQUEST,
LbConfig: &v3clusterpb.Cluster_LeastRequestLbConfig_{
LeastRequestLbConfig: &v3clusterpb.Cluster_LeastRequestLbConfig{
ChoiceCount: wrapperspb.UInt32(3),
},
},
},
wantUpdate: xdsresource.ClusterUpdate{
ClusterName: clusterName, EDSServiceName: serviceName,
},
wantLBConfig: &iserviceconfig.BalancerConfig{
Name: "least_request_experimental",
Config: &leastrequest.LBConfig{
ChoiceCount: 3,
},
},
},
{
name: "happiest-case-with-ring-hash-lb-policy-configured-through-LoadBalancingPolicy",
cluster: &v3clusterpb.Cluster{
Expand Down
22 changes: 22 additions & 0 deletions xds/internal/xdsclient/xdsresource/unmarshal_cds.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ const (
defaultRingHashMinSize = 1024
defaultRingHashMaxSize = 8 * 1024 * 1024 // 8M
ringHashSizeUpperBound = 8 * 1024 * 1024 // 8M

defaultLeastRequestChoiceCount = 2
easwars marked this conversation as resolved.
Show resolved Hide resolved
)

func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (ClusterUpdate, error) {
Expand Down Expand Up @@ -104,6 +106,26 @@ func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (Clu

rhLBCfg := []byte(fmt.Sprintf("{\"minRingSize\": %d, \"maxRingSize\": %d}", minSize, maxSize))
lbPolicy = []byte(fmt.Sprintf(`[{"ring_hash_experimental": %s}]`, rhLBCfg))
case v3clusterpb.Cluster_LEAST_REQUEST:
if !envconfig.LeastRequestLB {
return ClusterUpdate{}, fmt.Errorf("unexpected lbPolicy %v in response: %+v", cluster.GetLbPolicy(), cluster)
}

// "The configuration for the Least Request LB policy is the
// least_request_lb_config field. The field is optional; if not present,
// defaults will be assumed for all of its values." - A48
lr := cluster.GetLeastRequestLbConfig()
var choiceCount uint32 = defaultLeastRequestChoiceCount
if cc := lr.GetChoiceCount(); cc != nil {
choiceCount = cc.GetValue()
}
// "If choice_count < 2, the config will be rejected." - A48
if choiceCount < 2 {
return ClusterUpdate{}, fmt.Errorf("Cluster_LeastRequestLbConfig.ChoiceCount must be >= 2, got: %v", choiceCount)
}

lrLBCfg := []byte(fmt.Sprintf("{\"choiceCount\": %d}", choiceCount))
lbPolicy = []byte(fmt.Sprintf(`[{"least_request_experimental": %s}]`, lrLBCfg))
default:
return ClusterUpdate{}, fmt.Errorf("unexpected lbPolicy %v in response: %+v", cluster.GetLbPolicy(), cluster)
}
Expand Down
15 changes: 14 additions & 1 deletion xds/internal/xdsclient/xdsresource/unmarshal_cds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,19 @@ func (s) TestValidateCluster_Failure(t *testing.T) {
wantUpdate: emptyUpdate,
wantErr: true,
},
{
name: "least-request-choice-count-less-than-two",
cluster: &v3clusterpb.Cluster{
LbPolicy: v3clusterpb.Cluster_RING_HASH,
LbConfig: &v3clusterpb.Cluster_LeastRequestLbConfig_{
LeastRequestLbConfig: &v3clusterpb.Cluster_LeastRequestLbConfig{
ChoiceCount: wrapperspb.UInt32(1),
},
},
},
wantUpdate: emptyUpdate,
wantErr: true,
},
{
name: "ring-hash-max-bound-greater-than-upper-bound",
cluster: &v3clusterpb.Cluster{
Expand Down Expand Up @@ -205,7 +218,7 @@ func (s) TestValidateCluster_Failure(t *testing.T) {
wantErr: true,
},
{
name: "least-request-unsupported-in-converter",
name: "least-request-unsupported-in-converter-since-env-var-unset",
cluster: &v3clusterpb.Cluster{
Name: clusterName,
ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_EDS},
Expand Down