Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: add support for pickfirst address shuffling from gRFC A62 #6311

Merged
merged 3 commits into from May 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 5 additions & 1 deletion balancer_conn_wrappers.go
Expand Up @@ -127,7 +127,11 @@ func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnStat
// We get here only if the above call to Schedule succeeds, in which case it
// is guaranteed that the scheduled function will run. Therefore it is safe
// to block on this channel.
return <-errCh
err := <-errCh
if logger.V(2) && err != nil {
logger.Infof("error from balancer.UpdateClientConnState: %v", err)
}
return err
}

// updateSubConnState is invoked by grpc to push a subConn state update to the
Expand Down
7 changes: 7 additions & 0 deletions internal/grpcrand/grpcrand.go
Expand Up @@ -79,3 +79,10 @@ func Uint32() uint32 {
defer mu.Unlock()
return r.Uint32()
}

// Shuffle implements rand.Shuffle on the grpcrand global source.
var Shuffle = func(n int, f func(int, int)) {
mu.Lock()
defer mu.Unlock()
r.Shuffle(n, f)
}
39 changes: 36 additions & 3 deletions pickfirst.go
Expand Up @@ -19,11 +19,14 @@
package grpc

import (
"encoding/json"
"errors"
"fmt"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/serviceconfig"
)

// PickFirstBalancerName is the name of the pick_first balancer.
Expand All @@ -43,10 +46,28 @@ func (*pickfirstBuilder) Name() string {
return PickFirstBalancerName
}

type pfConfig struct {
serviceconfig.LoadBalancingConfig `json:"-"`

// If set to true, instructs the LB policy to shuffle the order of the list
// of addresses received from the name resolver before attempting to
// connect to them.
ShuffleAddressList bool `json:"shuffleAddressList"`
}

func (*pickfirstBuilder) ParseConfig(js json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
cfg := &pfConfig{}
if err := json.Unmarshal(js, cfg); err != nil {
return nil, fmt.Errorf("pickfirst: unable to unmarshal LB policy config: %s, error: %v", string(js), err)
}
return cfg, nil
}

type pickfirstBalancer struct {
state connectivity.State
cc balancer.ClientConn
subConn balancer.SubConn
cfg *pfConfig
}

func (b *pickfirstBalancer) ResolverError(err error) {
Expand All @@ -69,7 +90,8 @@ func (b *pickfirstBalancer) ResolverError(err error) {
}

func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
if len(state.ResolverState.Addresses) == 0 {
addrs := state.ResolverState.Addresses
if len(addrs) == 0 {
// The resolver reported an empty address list. Treat it like an error by
// calling b.ResolverError.
if b.subConn != nil {
Expand All @@ -82,12 +104,23 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
return balancer.ErrBadResolverState
}

if state.BalancerConfig != nil {
cfg, ok := state.BalancerConfig.(*pfConfig)
if !ok {
return fmt.Errorf("pickfirstBalancer: received nil or illegal BalancerConfig (type %T): %v", state.BalancerConfig, state.BalancerConfig)
}
b.cfg = cfg
}

if b.cfg != nil && b.cfg.ShuffleAddressList {
grpcrand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })
}
if b.subConn != nil {
b.cc.UpdateAddresses(b.subConn, state.ResolverState.Addresses)
b.cc.UpdateAddresses(b.subConn, addrs)
return nil
}

subConn, err := b.cc.NewSubConn(state.ResolverState.Addresses, balancer.NewSubConnOptions{})
subConn, err := b.cc.NewSubConn(addrs, balancer.NewSubConnOptions{})
if err != nil {
if logger.V(2) {
logger.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err)
Expand Down
52 changes: 52 additions & 0 deletions test/pickfirst_test.go
Expand Up @@ -30,6 +30,7 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/pickfirst"
Expand Down Expand Up @@ -379,3 +380,54 @@ func (s) TestPickFirst_StickyTransientFailure(t *testing.T) {
}()
wg.Wait()
}

func (s) TestPickFirst_ShuffleAddressList(t *testing.T) {
const serviceConfig = `{"loadBalancingConfig": [{"pick_first":{ "shuffleAddressList": true }}]}`

// Install a shuffler that always reverses two entries.
origShuf := grpcrand.Shuffle
defer func() { grpcrand.Shuffle = origShuf }()
grpcrand.Shuffle = func(n int, f func(int, int)) {
if n != 2 {
t.Errorf("Shuffle called with n=%v; want 2", n)
}
f(0, 1) // reverse the two addresses
}

// Set up our backends.
cc, r, backends := setupPickFirst(t, 2)
addrs := stubBackendsToResolverAddrs(backends)

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

// Push an update with both addresses and shuffling disabled. We should
// connect to backend 0.
r.UpdateState(resolver.State{Addresses: []resolver.Address{addrs[0], addrs[1]}})
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
t.Fatal(err)
}

// Send a config with shuffling enabled. This will reverse the addresses,
// but the channel should still be connected to backend 0.
shufState := resolver.State{
ServiceConfig: parseServiceConfig(t, r, serviceConfig),
Addresses: []resolver.Address{addrs[0], addrs[1]},
}
r.UpdateState(shufState)
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
t.Fatal(err)
}

// Send a resolver update with no addresses. This should push the channel
// into TransientFailure.
r.UpdateState(resolver.State{})
awaitState(ctx, t, cc, connectivity.TransientFailure)

// Send the same config as last time with shuffling enabled. Since we are
// not connected to backend 0, we should connect to backend 1.
r.UpdateState(shufState)
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
t.Fatal(err)
}
}
1 change: 1 addition & 0 deletions test/resolver_update_test.go
Expand Up @@ -174,6 +174,7 @@ func (s) TestResolverUpdate_InvalidServiceConfigAfterGoodUpdate(t *testing.T) {
}
bal := bd.Data.(balancer.Balancer)
ccUpdateCh.Send(ccs)
ccs.BalancerConfig = nil
return bal.UpdateClientConnState(ccs)
},
UpdateSubConnState: func(bd *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) {
Expand Down