Skip to content

Commit

Permalink
envconfig knob
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed May 24, 2023
1 parent e13e8c5 commit 1f3f8d6
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 5 deletions.
4 changes: 4 additions & 0 deletions internal/envconfig/envconfig.go
Expand Up @@ -36,6 +36,10 @@ var (
// "GRPC_RING_HASH_CAP". This does not override the default bounds
// checking which NACKs configs specifying ring sizes > 8*1024*1024 (~8M).
RingHashCap = uint64FromEnv("GRPC_RING_HASH_CAP", 4096, 1, 8*1024*1024)
// PickFirstLBConfig is set if we should support configuration of the
// pick_first LB policy, which can be enabled by setting the environment
// variable "GRPC_EXPERIMENTAL_PICKFIRST_LB_CONFIG" to "true".
PickFirstLBConfig = boolFromEnv("GRPC_EXPERIMENTAL_PICKFIRST_LB_CONFIG", false)
)

func boolFromEnv(envVar string, def bool) bool {
Expand Down
3 changes: 2 additions & 1 deletion pickfirst.go
Expand Up @@ -25,6 +25,7 @@ import (

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/serviceconfig"
)
Expand Down Expand Up @@ -112,7 +113,7 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
b.cfg = cfg
}

if b.cfg != nil && b.cfg.ShuffleAddressList {
if envconfig.PickFirstLBConfig && b.cfg != nil && b.cfg.ShuffleAddressList {
grpcrand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })
}
if b.subConn != nil {
Expand Down
58 changes: 58 additions & 0 deletions test/pickfirst_test.go
Expand Up @@ -30,6 +30,7 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
Expand Down Expand Up @@ -382,6 +383,8 @@ func (s) TestPickFirst_StickyTransientFailure(t *testing.T) {
}

func (s) TestPickFirst_ShuffleAddressList(t *testing.T) {
defer func(old bool) { envconfig.PickFirstLBConfig = old }(envconfig.PickFirstLBConfig)
envconfig.PickFirstLBConfig = true
const serviceConfig = `{"loadBalancingConfig": [{"pick_first":{ "shuffleAddressList": true }}]}`

// Install a shuffler that always reverses two entries.
Expand Down Expand Up @@ -431,3 +434,58 @@ func (s) TestPickFirst_ShuffleAddressList(t *testing.T) {
t.Fatal(err)
}
}

func (s) TestPickFirst_ShuffleAddressListDisabled(t *testing.T) {
defer func(old bool) { envconfig.PickFirstLBConfig = old }(envconfig.PickFirstLBConfig)
envconfig.PickFirstLBConfig = false
const serviceConfig = `{"loadBalancingConfig": [{"pick_first":{ "shuffleAddressList": true }}]}`

// Install a shuffler that always reverses two entries.
origShuf := grpcrand.Shuffle
defer func() { grpcrand.Shuffle = origShuf }()
grpcrand.Shuffle = func(n int, f func(int, int)) {
if n != 2 {
t.Errorf("Shuffle called with n=%v; want 2", n)
}
f(0, 1) // reverse the two addresses
}

// Set up our backends.
cc, r, backends := setupPickFirst(t, 2)
addrs := stubBackendsToResolverAddrs(backends)

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

// Push an update with both addresses and shuffling disabled. We should
// connect to backend 0.
r.UpdateState(resolver.State{Addresses: []resolver.Address{addrs[0], addrs[1]}})
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
t.Fatal(err)
}

// Send a config with shuffling enabled. This will reverse the addresses,
// but the channel should still be connected to backend 0.
shufState := resolver.State{
ServiceConfig: parseServiceConfig(t, r, serviceConfig),
Addresses: []resolver.Address{addrs[0], addrs[1]},
}
r.UpdateState(shufState)
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
t.Fatal(err)
}

// Send a resolver update with no addresses. This should push the channel
// into TransientFailure.
r.UpdateState(resolver.State{})
awaitState(ctx, t, cc, connectivity.TransientFailure)

// Send the same config as last time with shuffling enabled. Since we are
// not connected to backend 0, we should connect to backend 1 if shuffling
// is supported. However with it disabled at the start of the test, we
// will connect to backend 0 instead.
r.UpdateState(shufState)
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
t.Fatal(err)
}
}
3 changes: 3 additions & 0 deletions xds/internal/xdsclient/xdslbregistry/converter/converter.go
Expand Up @@ -97,6 +97,9 @@ type pfConfig struct {
}

func convertPickFirstProtoToServiceConfig(rawProto []byte, _ int) (json.RawMessage, error) {
if !envconfig.PickFirstLBConfig {
return nil, nil
}
pfProto := &v3pickfirstpb.PickFirst{}
if err := proto.Unmarshal(rawProto, pfProto); err != nil {
return nil, fmt.Errorf("failed to unmarshal resource: %v", err)
Expand Down
31 changes: 27 additions & 4 deletions xds/internal/xdsclient/xdslbregistry/xdslbregistry_test.go
Expand Up @@ -86,6 +86,7 @@ func (s) TestConvertToServiceConfigSuccess(t *testing.T) {
policy *v3clusterpb.LoadBalancingPolicy
wantConfig string // JSON config
rhDisabled bool
pfDisabled bool
}{
{
name: "ring_hash",
Expand Down Expand Up @@ -177,6 +178,27 @@ func (s) TestConvertToServiceConfigSuccess(t *testing.T) {
wantConfig: `[{"round_robin": {}}]`,
rhDisabled: true,
},
{
name: "pick_first_disabled_pf_rr_use_first_supported",
policy: &v3clusterpb.LoadBalancingPolicy{
Policies: []*v3clusterpb.LoadBalancingPolicy_Policy{
{
TypedExtensionConfig: &v3corepb.TypedExtensionConfig{
TypedConfig: testutils.MarshalAny(&v3pickfirstpb.PickFirst{
ShuffleAddressList: true,
}),
},
},
{
TypedExtensionConfig: &v3corepb.TypedExtensionConfig{
TypedConfig: testutils.MarshalAny(&v3roundrobinpb.RoundRobin{}),
},
},
},
},
wantConfig: `[{"round_robin": {}}]`,
pfDisabled: true,
},
{
name: "custom_lb_type_v3_struct",
policy: &v3clusterpb.LoadBalancingPolicy{
Expand Down Expand Up @@ -268,11 +290,12 @@ func (s) TestConvertToServiceConfigSuccess(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
if test.rhDisabled {
oldRingHashSupport := envconfig.XDSRingHash
defer func(old bool) { envconfig.XDSRingHash = old }(envconfig.XDSRingHash)
envconfig.XDSRingHash = false
defer func() {
envconfig.XDSRingHash = oldRingHashSupport
}()
}
if !test.pfDisabled {
defer func(old bool) { envconfig.PickFirstLBConfig = old }(envconfig.PickFirstLBConfig)
envconfig.PickFirstLBConfig = true
}
rawJSON, err := xdslbregistry.ConvertToServiceConfig(test.policy, 0)
if err != nil {
Expand Down

0 comments on commit 1f3f8d6

Please sign in to comment.