Skip to content

Commit

Permalink
client: add support for pickfirst address shuffling from gRFC A62 (#6311
Browse files Browse the repository at this point in the history
)
  • Loading branch information
dfawley committed May 24, 2023
1 parent a6e1acf commit 59134c3
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 4 deletions.
6 changes: 5 additions & 1 deletion balancer_conn_wrappers.go
Expand Up @@ -127,7 +127,11 @@ func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnStat
// We get here only if the above call to Schedule succeeds, in which case it
// is guaranteed that the scheduled function will run. Therefore it is safe
// to block on this channel.
return <-errCh
err := <-errCh
if logger.V(2) && err != nil {
logger.Infof("error from balancer.UpdateClientConnState: %v", err)
}
return err
}

// updateSubConnState is invoked by grpc to push a subConn state update to the
Expand Down
7 changes: 7 additions & 0 deletions internal/grpcrand/grpcrand.go
Expand Up @@ -79,3 +79,10 @@ func Uint32() uint32 {
defer mu.Unlock()
return r.Uint32()
}

// Shuffle implements rand.Shuffle on the grpcrand global source.
var Shuffle = func(n int, f func(int, int)) {
mu.Lock()
defer mu.Unlock()
r.Shuffle(n, f)
}
39 changes: 36 additions & 3 deletions pickfirst.go
Expand Up @@ -19,11 +19,14 @@
package grpc

import (
"encoding/json"
"errors"
"fmt"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/serviceconfig"
)

// PickFirstBalancerName is the name of the pick_first balancer.
Expand All @@ -43,10 +46,28 @@ func (*pickfirstBuilder) Name() string {
return PickFirstBalancerName
}

type pfConfig struct {
serviceconfig.LoadBalancingConfig `json:"-"`

// If set to true, instructs the LB policy to shuffle the order of the list
// of addresses received from the name resolver before attempting to
// connect to them.
ShuffleAddressList bool `json:"shuffleAddressList"`
}

func (*pickfirstBuilder) ParseConfig(js json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
cfg := &pfConfig{}
if err := json.Unmarshal(js, cfg); err != nil {
return nil, fmt.Errorf("pickfirst: unable to unmarshal LB policy config: %s, error: %v", string(js), err)
}
return cfg, nil
}

type pickfirstBalancer struct {
state connectivity.State
cc balancer.ClientConn
subConn balancer.SubConn
cfg *pfConfig
}

func (b *pickfirstBalancer) ResolverError(err error) {
Expand All @@ -69,7 +90,8 @@ func (b *pickfirstBalancer) ResolverError(err error) {
}

func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
if len(state.ResolverState.Addresses) == 0 {
addrs := state.ResolverState.Addresses
if len(addrs) == 0 {
// The resolver reported an empty address list. Treat it like an error by
// calling b.ResolverError.
if b.subConn != nil {
Expand All @@ -82,12 +104,23 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
return balancer.ErrBadResolverState
}

if state.BalancerConfig != nil {
cfg, ok := state.BalancerConfig.(*pfConfig)
if !ok {
return fmt.Errorf("pickfirstBalancer: received nil or illegal BalancerConfig (type %T): %v", state.BalancerConfig, state.BalancerConfig)
}
b.cfg = cfg
}

if b.cfg != nil && b.cfg.ShuffleAddressList {
grpcrand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })
}
if b.subConn != nil {
b.cc.UpdateAddresses(b.subConn, state.ResolverState.Addresses)
b.cc.UpdateAddresses(b.subConn, addrs)
return nil
}

subConn, err := b.cc.NewSubConn(state.ResolverState.Addresses, balancer.NewSubConnOptions{})
subConn, err := b.cc.NewSubConn(addrs, balancer.NewSubConnOptions{})
if err != nil {
if logger.V(2) {
logger.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err)
Expand Down
52 changes: 52 additions & 0 deletions test/pickfirst_test.go
Expand Up @@ -30,6 +30,7 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/pickfirst"
Expand Down Expand Up @@ -379,3 +380,54 @@ func (s) TestPickFirst_StickyTransientFailure(t *testing.T) {
}()
wg.Wait()
}

func (s) TestPickFirst_ShuffleAddressList(t *testing.T) {
const serviceConfig = `{"loadBalancingConfig": [{"pick_first":{ "shuffleAddressList": true }}]}`

// Install a shuffler that always reverses two entries.
origShuf := grpcrand.Shuffle
defer func() { grpcrand.Shuffle = origShuf }()
grpcrand.Shuffle = func(n int, f func(int, int)) {
if n != 2 {
t.Errorf("Shuffle called with n=%v; want 2", n)
}
f(0, 1) // reverse the two addresses
}

// Set up our backends.
cc, r, backends := setupPickFirst(t, 2)
addrs := stubBackendsToResolverAddrs(backends)

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

// Push an update with both addresses and shuffling disabled. We should
// connect to backend 0.
r.UpdateState(resolver.State{Addresses: []resolver.Address{addrs[0], addrs[1]}})
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
t.Fatal(err)
}

// Send a config with shuffling enabled. This will reverse the addresses,
// but the channel should still be connected to backend 0.
shufState := resolver.State{
ServiceConfig: parseServiceConfig(t, r, serviceConfig),
Addresses: []resolver.Address{addrs[0], addrs[1]},
}
r.UpdateState(shufState)
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
t.Fatal(err)
}

// Send a resolver update with no addresses. This should push the channel
// into TransientFailure.
r.UpdateState(resolver.State{})
awaitState(ctx, t, cc, connectivity.TransientFailure)

// Send the same config as last time with shuffling enabled. Since we are
// not connected to backend 0, we should connect to backend 1.
r.UpdateState(shufState)
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
t.Fatal(err)
}
}
1 change: 1 addition & 0 deletions test/resolver_update_test.go
Expand Up @@ -174,6 +174,7 @@ func (s) TestResolverUpdate_InvalidServiceConfigAfterGoodUpdate(t *testing.T) {
}
bal := bd.Data.(balancer.Balancer)
ccUpdateCh.Send(ccs)
ccs.BalancerConfig = nil
return bal.UpdateClientConnState(ccs)
},
UpdateSubConnState: func(bd *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) {
Expand Down

0 comments on commit 59134c3

Please sign in to comment.