Skip to content

Commit

Permalink
xds/internal/xdsclient: Custom LB xDS Client Changes (#6165)
Browse files Browse the repository at this point in the history
  • Loading branch information
zasweq committed Apr 26, 2023
1 parent 8628e07 commit eff0942
Show file tree
Hide file tree
Showing 13 changed files with 1,452 additions and 261 deletions.
13 changes: 8 additions & 5 deletions internal/envconfig/xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,10 @@ var (
// have a brand new API on the server-side and users explicitly need to use
// the new API to get security integration on the server.
XDSClientSideSecurity = boolFromEnv("GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT", true)
// XDSAggregateAndDNS indicates whether processing of aggregated cluster
// and DNS cluster is enabled, which can be enabled by setting the
// environment variable
// "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER" to
// "true".
// XDSAggregateAndDNS indicates whether processing of aggregated cluster and
// DNS cluster is enabled, which can be disabled by setting the environment
// variable "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"
// to "false".
XDSAggregateAndDNS = boolFromEnv("GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER", true)

// XDSRBAC indicates whether xDS configured RBAC HTTP Filter is enabled,
Expand All @@ -89,4 +88,8 @@ var (

// C2PResolverTestOnlyTrafficDirectorURI is the TD URI for testing.
C2PResolverTestOnlyTrafficDirectorURI = os.Getenv("GRPC_TEST_ONLY_GOOGLE_C2P_RESOLVER_TRAFFIC_DIRECTOR_URI")
// XDSCustomLBPolicy indicates whether Custom LB Policies are enabled, which
// can be enabled by setting the environment variable
// "GRPC_EXPERIMENTAL_XDS_CUSTOM_LB_CONFIG" to "true".
XDSCustomLBPolicy = boolFromEnv("GRPC_EXPERIMENTAL_XDS_CUSTOM_LB_CONFIG", false)
)
11 changes: 9 additions & 2 deletions xds/internal/balancer/ringhash/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,22 @@ type LBConfig struct {
}

const (
defaultMinSize = 1024
defaultMaxSize = 4096
defaultMinSize = 1024
defaultMaxSize = 4096
ringHashSizeUpperBound = 8 * 1024 * 1024 // 8M
)

func parseConfig(c json.RawMessage) (*LBConfig, error) {
var cfg LBConfig
if err := json.Unmarshal(c, &cfg); err != nil {
return nil, err
}
if cfg.MinRingSize > ringHashSizeUpperBound {
return nil, fmt.Errorf("min_ring_size value of %d is greater than max supported value %d for this field", cfg.MinRingSize, ringHashSizeUpperBound)
}
if cfg.MaxRingSize > ringHashSizeUpperBound {
return nil, fmt.Errorf("max_ring_size value of %d is greater than max supported value %d for this field", cfg.MaxRingSize, ringHashSizeUpperBound)
}
if cfg.MinRingSize == 0 {
cfg.MinRingSize = defaultMinSize
}
Expand Down
12 changes: 12 additions & 0 deletions xds/internal/balancer/ringhash/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,18 @@ func (s) TestParseConfig(t *testing.T) {
envConfigCap: 8000,
want: &LBConfig{MinRingSize: 8000, MaxRingSize: 8000},
},
{
name: "min greater than upper bound",
js: `{"minRingSize": 8388610, "maxRingSize": 10}`,
want: nil,
wantErr: true,
},
{
name: "max greater than upper bound",
js: `{"minRingSize": 10, "maxRingSize": 8388610}`,
want: nil,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
68 changes: 68 additions & 0 deletions xds/internal/balancer/wrrlocality/balancer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Package wrrlocality provides an implementation of the wrr locality LB policy,
// as defined in [A52 - xDS Custom LB Policies].
//
// [A52 - xDS Custom LB Policies]: https://github.com/grpc/proposal/blob/master/A52-xds-custom-lb-policies.md
package wrrlocality

import (
"encoding/json"
"errors"
"fmt"

"google.golang.org/grpc/balancer"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/serviceconfig"
)

// Name is the name of wrr_locality balancer.
const Name = "xds_wrr_locality_experimental"

func init() {
balancer.Register(bb{})
}

type bb struct{}

func (bb) Name() string {
return Name
}

func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
return nil
}

// LBConfig is the config for the wrr locality balancer.
type LBConfig struct {
serviceconfig.LoadBalancingConfig
// ChildPolicy is the config for the child policy.
ChildPolicy *internalserviceconfig.BalancerConfig `json:"childPolicy,omitempty"`
}

func (bb) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
var lbCfg *LBConfig
if err := json.Unmarshal(s, &lbCfg); err != nil {
return nil, fmt.Errorf("xds: invalid LBConfig for wrrlocality: %s, error: %v", string(s), err)
}
if lbCfg == nil || lbCfg.ChildPolicy == nil {
return nil, errors.New("xds: invalidw LBConfig for wrrlocality: child policy field must be set")
}
return lbCfg, nil
}
121 changes: 121 additions & 0 deletions xds/internal/balancer/wrrlocality/balancer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package wrrlocality

import (
"encoding/json"
"errors"
"strings"
"testing"

"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/grpctest"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/serviceconfig"
)

type s struct {
grpctest.Tester
}

func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}

func (s) TestParseConfig(t *testing.T) {
const errParseConfigName = "errParseConfigBalancer"
stub.Register(errParseConfigName, stub.BalancerFuncs{
ParseConfig: func(json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
return nil, errors.New("some error")
},
})

parser := bb{}
tests := []struct {
name string
input string
wantCfg serviceconfig.LoadBalancingConfig
wantErr string
}{
{
name: "happy-case-round robin-child",
input: `{"childPolicy": [{"round_robin": {}}]}`,
wantCfg: &LBConfig{
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: roundrobin.Name,
},
},
},
{
name: "invalid-json",
input: "{{invalidjson{{",
wantErr: "invalid character",
},

{
name: "child-policy-field-isn't-set",
input: `{}`,
wantErr: "child policy field must be set",
},
{
name: "child-policy-type-is-empty",
input: `{"childPolicy": []}`,
wantErr: "invalid loadBalancingConfig: no supported policies found in []",
},
{
name: "child-policy-empty-config",
input: `{"childPolicy": [{"": {}}]}`,
wantErr: "invalid loadBalancingConfig: no supported policies found in []",
},
{
name: "child-policy-type-isn't-registered",
input: `{"childPolicy": [{"doesNotExistBalancer": {"cluster": "test_cluster"}}]}`,
wantErr: "invalid loadBalancingConfig: no supported policies found in [doesNotExistBalancer]",
},
{
name: "child-policy-config-is-invalid",
input: `{"childPolicy": [{"errParseConfigBalancer": {"cluster": "test_cluster"}}]}`,
wantErr: "error parsing loadBalancingConfig for policy \"errParseConfigBalancer\"",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
gotCfg, gotErr := parser.ParseConfig(json.RawMessage(test.input))
// Substring match makes this very tightly coupled to the
// internalserviceconfig.BalancerConfig error strings. However, it
// is important to distinguish the different types of error messages
// possible as the parser has a few defined buckets of ways it can
// error out.
if (gotErr != nil) != (test.wantErr != "") {
t.Fatalf("ParseConfig(%v) = %v, wantErr %v", test.input, gotErr, test.wantErr)
}
if gotErr != nil && !strings.Contains(gotErr.Error(), test.wantErr) {
t.Fatalf("ParseConfig(%v) = %v, wantErr %v", test.input, gotErr, test.wantErr)
}
if test.wantErr != "" {
return
}
if diff := cmp.Diff(gotCfg, test.wantCfg); diff != "" {
t.Fatalf("ParseConfig(%v) got unexpected output, diff (-got +want): %v", test.input, diff)
}
})
}
}
2 changes: 1 addition & 1 deletion xds/internal/xdsclient/tests/cds_watchers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func verifyClusterUpdate(ctx context.Context, updateCh *testutils.Channel, wantU
return fmt.Errorf("received update with error type %v, want %v", gotType, wantType)
}
}
cmpOpts := []cmp.Option{cmpopts.EquateEmpty(), cmpopts.IgnoreFields(xdsresource.ClusterUpdate{}, "Raw")}
cmpOpts := []cmp.Option{cmpopts.EquateEmpty(), cmpopts.IgnoreFields(xdsresource.ClusterUpdate{}, "Raw", "LBPolicyJSON")}
if diff := cmp.Diff(wantUpdate.Update, got.Update, cmpOpts...); diff != "" {
return fmt.Errorf("received unepected diff in the cluster resource update: (-want, got):\n%s", diff)
}
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/xdsclient/tests/resource_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -802,7 +802,7 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) {
}
cmpOpts := []cmp.Option{
cmpopts.EquateEmpty(),
cmpopts.IgnoreFields(xdsresource.ClusterUpdate{}, "Raw"),
cmpopts.IgnoreFields(xdsresource.ClusterUpdate{}, "Raw", "LBPolicyJSON"),
}
if diff := cmp.Diff(test.wantUpdate, gotUpdate, cmpOpts...); diff != "" {
t.Fatalf("Unexpected diff in metadata, diff (-want +got):\n%s", diff)
Expand Down

0 comments on commit eff0942

Please sign in to comment.