Skip to content

Commit

Permalink
resolver: remove deprecated AddressType (#6451)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed Jul 17, 2023
1 parent 919fe35 commit 02946a3
Show file tree
Hide file tree
Showing 10 changed files with 55 additions and 303 deletions.
14 changes: 3 additions & 11 deletions balancer/grpclb/grpclb.go
Expand Up @@ -448,17 +448,9 @@ func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
gc, _ := ccs.BalancerConfig.(*grpclbServiceConfig)
lb.handleServiceConfig(gc)

addrs := ccs.ResolverState.Addresses

var remoteBalancerAddrs, backendAddrs []resolver.Address
for _, a := range addrs {
if a.Type == resolver.GRPCLB {
a.Type = resolver.Backend
remoteBalancerAddrs = append(remoteBalancerAddrs, a)
} else {
backendAddrs = append(backendAddrs, a)
}
}
backendAddrs := ccs.ResolverState.Addresses

var remoteBalancerAddrs []resolver.Address
if sd := grpclbstate.Get(ccs.ResolverState); sd != nil {
// Override any balancer addresses provided via
// ccs.ResolverState.Addresses.
Expand Down
6 changes: 3 additions & 3 deletions balancer/grpclb/grpclb_test.go
Expand Up @@ -1430,11 +1430,11 @@ func runAndCheckStats(t *testing.T, drop bool, statsChan chan *lbpb.ClientStats,
}
defer cc.Close()

r.UpdateState(resolver.State{Addresses: []resolver.Address{{
rstate := resolver.State{ServiceConfig: r.CC.ParseServiceConfig(grpclbConfig)}
r.UpdateState(grpclbstate.Set(rstate, &grpclbstate.State{BalancerAddresses: []resolver.Address{{
Addr: tss.lbAddr,
Type: resolver.GRPCLB,
ServerName: lbServerName,
}}})
}}}))

runRPCs(cc)
end := time.Now().Add(time.Second)
Expand Down
14 changes: 0 additions & 14 deletions balancer_conn_wrappers.go
Expand Up @@ -99,20 +99,6 @@ func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnStat
// lock held. But the lock guards only the scheduling part. The actual
// callback is called asynchronously without the lock being held.
ok := ccb.serializer.Schedule(func(_ context.Context) {
// If the addresses specified in the update contain addresses of type
// "grpclb" and the selected LB policy is not "grpclb", these addresses
// will be filtered out and ccs will be modified with the updated
// address list.
if ccb.curBalancerName != grpclbName {
var addrs []resolver.Address
for _, addr := range ccs.ResolverState.Addresses {
if addr.Type == resolver.GRPCLB {
continue
}
addrs = append(addrs, addr)
}
ccs.ResolverState.Addresses = addrs
}
errCh <- ccb.balancer.UpdateClientConnState(*ccs)
})
if !ok {
Expand Down
24 changes: 6 additions & 18 deletions clientconn.go
Expand Up @@ -54,8 +54,6 @@ import (
const (
// minimum time to give a connection to complete
minConnectTimeout = 20 * time.Second
// must match grpclbName in grpclb/grpclb.go
grpclbName = "grpclb"
)

var (
Expand Down Expand Up @@ -1153,23 +1151,13 @@ func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSel
}

var newBalancerName string
if cc.sc != nil && cc.sc.lbConfig != nil {
if cc.sc == nil || (cc.sc.lbConfig == nil && cc.sc.LB == nil) {
// No service config or no LB policy specified in config.
newBalancerName = PickFirstBalancerName
} else if cc.sc.lbConfig != nil {
newBalancerName = cc.sc.lbConfig.name
} else {
var isGRPCLB bool
for _, a := range addrs {
if a.Type == resolver.GRPCLB {
isGRPCLB = true
break
}
}
if isGRPCLB {
newBalancerName = grpclbName
} else if cc.sc != nil && cc.sc.LB != nil {
newBalancerName = *cc.sc.LB
} else {
newBalancerName = PickFirstBalancerName
}
} else { // cc.sc.LB != nil
newBalancerName = *cc.sc.LB
}
cc.balancerWrapper.switchTo(newBalancerName)
}
Expand Down
2 changes: 2 additions & 0 deletions pickfirst.go
Expand Up @@ -27,6 +27,7 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)

Expand Down Expand Up @@ -114,6 +115,7 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
}

if envconfig.PickFirstLBConfig && b.cfg != nil && b.cfg.ShuffleAddressList {
addrs = append([]resolver.Address{}, addrs...)
grpcrand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })
}
if b.subConn != nil {
Expand Down
29 changes: 1 addition & 28 deletions resolver/resolver.go
Expand Up @@ -77,25 +77,6 @@ func GetDefaultScheme() string {
return defaultScheme
}

// AddressType indicates the address type returned by name resolution.
//
// Deprecated: use Attributes in Address instead.
type AddressType uint8

const (
// Backend indicates the address is for a backend server.
//
// Deprecated: use Attributes in Address instead.
Backend AddressType = iota
// GRPCLB indicates the address is for a grpclb load balancer.
//
// Deprecated: to select the GRPCLB load balancing policy, use a service
// config with a corresponding loadBalancingConfig. To supply balancer
// addresses to the GRPCLB load balancing policy, set State.Attributes
// using balancer/grpclb/state.Set.
GRPCLB
)

// Address represents a server the client connects to.
//
// # Experimental
Expand All @@ -111,9 +92,6 @@ type Address struct {
// the address, instead of the hostname from the Dial target string. In most cases,
// this should not be set.
//
// If Type is GRPCLB, ServerName should be the name of the remote load
// balancer, not the name of the backend.
//
// WARNING: ServerName must only be populated with trusted values. It
// is insecure to populate it with data from untrusted inputs since untrusted
// values could be used to bypass the authority checks performed by TLS.
Expand All @@ -128,11 +106,6 @@ type Address struct {
// creation, connection establishment, handshaking, etc.
BalancerAttributes *attributes.Attributes

// Type is the type of this address.
//
// Deprecated: use Attributes instead.
Type AddressType

// Metadata is the information associated with Addr, which may be used
// to make load balancing decision.
//
Expand All @@ -150,7 +123,7 @@ func (a Address) Equal(o Address) bool {
return a.Addr == o.Addr && a.ServerName == o.ServerName &&
a.Attributes.Equal(o.Attributes) &&
a.BalancerAttributes.Equal(o.BalancerAttributes) &&
a.Type == o.Type && a.Metadata == o.Metadata
a.Metadata == o.Metadata
}

// String returns JSON formatted string representation of the address.
Expand Down
160 changes: 30 additions & 130 deletions test/balancer_switching_test.go
Expand Up @@ -25,6 +25,7 @@ import (

"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
grpclbstate "google.golang.org/grpc/balancer/grpclb/state"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/stub"
Expand Down Expand Up @@ -168,10 +169,13 @@ func (s) TestBalancerSwitch_grpclbToPickFirst(t *testing.T) {
}
defer cc.Close()

// Push a resolver update with no service config and a single address pointing
// to the grpclb server we created above. This will cause the channel to
// switch to the "grpclb" balancer, which returns a single backend address.
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lbServer.Address(), Type: resolver.GRPCLB}}})
// Push a resolver update with a GRPCLB service config and a single address
// pointing to the grpclb server we created above. This will cause the
// channel to switch to the "grpclb" balancer, which returns a single
// backend address.
grpclbConfig := parseServiceConfig(t, r, `{"loadBalancingPolicy": "grpclb"}`)
state := resolver.State{ServiceConfig: grpclbConfig}
r.UpdateState(grpclbstate.Set(state, &grpclbstate.State{BalancerAddresses: []resolver.Address{{Addr: lbServer.Address()}}}))
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
client := testgrpc.NewTestServiceClient(cc)
Expand All @@ -182,7 +186,7 @@ func (s) TestBalancerSwitch_grpclbToPickFirst(t *testing.T) {
// Push a resolver update containing a non-existent grpclb server address.
// This should not lead to a balancer switch.
const nonExistentServer = "non-existent-grpclb-server-address"
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: nonExistentServer, Type: resolver.GRPCLB}}})
r.UpdateState(grpclbstate.Set(state, &grpclbstate.State{BalancerAddresses: []resolver.Address{{Addr: nonExistentServer}}}))
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[:1]); err != nil {
t.Fatal(err)
}
Expand All @@ -192,7 +196,8 @@ func (s) TestBalancerSwitch_grpclbToPickFirst(t *testing.T) {
// list of addresses pushed as part of this update is different from the one
// returned by the "grpclb" balancer. So, we should see RPCs going to the
// newly configured backends, as part of the balancer switch.
r.UpdateState(resolver.State{Addresses: addrs[1:]})
emptyConfig := parseServiceConfig(t, r, `{}`)
r.UpdateState(resolver.State{Addresses: addrs[1:], ServiceConfig: emptyConfig})
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -225,21 +230,24 @@ func (s) TestBalancerSwitch_pickFirstToGRPCLB(t *testing.T) {
// Push a resolver update with no service config and a single address pointing
// to the grpclb server we created above. This will cause the channel to
// switch to the "grpclb" balancer, which returns a single backend address.
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lbServer.Address(), Type: resolver.GRPCLB}}})
grpclbConfig := parseServiceConfig(t, r, `{"loadBalancingPolicy": "grpclb"}`)
state := resolver.State{ServiceConfig: grpclbConfig}
r.UpdateState(grpclbstate.Set(state, &grpclbstate.State{BalancerAddresses: []resolver.Address{{Addr: lbServer.Address()}}}))
client := testgrpc.NewTestServiceClient(cc)
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[:1]); err != nil {
t.Fatal(err)
}

// Push a resolver update containing a non-existent grpclb server address.
// This should not lead to a balancer switch.
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "nonExistentServer", Type: resolver.GRPCLB}}})
r.UpdateState(grpclbstate.Set(state, &grpclbstate.State{BalancerAddresses: []resolver.Address{{Addr: "nonExistentServer"}}}))
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[:1]); err != nil {
t.Fatal(err)
}

// Switch to "pick_first" again by sending no grpclb server addresses.
r.UpdateState(resolver.State{Addresses: addrs[1:]})
emptyConfig := parseServiceConfig(t, r, `{}`)
r.UpdateState(resolver.State{Addresses: addrs[1:], ServiceConfig: emptyConfig})
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -284,13 +292,13 @@ func (s) TestBalancerSwitch_RoundRobinToGRPCLB(t *testing.T) {
t.Fatal(err)
}

// Push a resolver update with no service config and a single address pointing
// to the grpclb server we created above. This will cause the channel to
// switch to the "grpclb" balancer, which returns a single backend address.
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lbServer.Address(), Type: resolver.GRPCLB}},
ServiceConfig: scpr,
})
// Push a resolver update with grpclb and a single balancer address
// pointing to the grpclb server we created above. This will cause the
// channel to switch to the "grpclb" balancer, which returns a single
// backend address.
grpclbConfig := parseServiceConfig(t, r, `{"loadBalancingPolicy": "grpclb"}`)
state := resolver.State{ServiceConfig: grpclbConfig}
r.UpdateState(grpclbstate.Set(state, &grpclbstate.State{BalancerAddresses: []resolver.Address{{Addr: lbServer.Address()}}}))
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[:1]); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -329,12 +337,13 @@ func (s) TestBalancerSwitch_grpclbNotRegistered(t *testing.T) {
// fallback to the default LB policy which is pick_first. The ClientConn is
// also expected to filter out the grpclb address when sending the addresses
// list fo pick_first.
grpclbAddr := []resolver.Address{{Addr: "non-existent-grpclb-server-address", Type: resolver.GRPCLB}}
addrs = append(grpclbAddr, addrs...)
r.UpdateState(resolver.State{Addresses: addrs})
grpclbAddr := []resolver.Address{{Addr: "non-existent-grpclb-server-address"}}
grpclbConfig := parseServiceConfig(t, r, `{"loadBalancingPolicy": "grpclb"}`)
state := resolver.State{ServiceConfig: grpclbConfig, Addresses: addrs}
r.UpdateState(grpclbstate.Set(state, &grpclbstate.State{BalancerAddresses: grpclbAddr}))
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
t.Fatal(err)
}

Expand All @@ -346,116 +355,7 @@ func (s) TestBalancerSwitch_grpclbNotRegistered(t *testing.T) {
ServiceConfig: parseServiceConfig(t, r, rrServiceConfig),
})
client := testgrpc.NewTestServiceClient(cc)
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[1:]); err != nil {
t.Fatal(err)
}
}

// TestBalancerSwitch_grpclbAddressOverridesLoadBalancingPolicy verifies that
// if the resolver update contains any addresses of type "grpclb", it overrides
// the LB policy specifies in the deprecated `loadBalancingPolicy` field of the
// service config.
func (s) TestBalancerSwitch_grpclbAddressOverridesLoadBalancingPolicy(t *testing.T) {
backends, lbServer, cleanup := setupBackendsAndFakeGRPCLB(t)
defer cleanup()

addrs := stubBackendsToResolverAddrs(backends)
r := manual.NewBuilderWithScheme("whatever")
target := fmt.Sprintf("%s:///%s", r.Scheme(), loadBalancedServiceName)
cc, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
defer cc.Close()

// Push a resolver update containing no grpclb server address. This should
// lead to the channel using the default LB policy which is pick_first.
r.UpdateState(resolver.State{Addresses: addrs[1:]})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
t.Fatal(err)
}

// Push a resolver update with no service config. The addresses list contains
// the stub backend addresses and a single address pointing to the grpclb
// server we created above. This will cause the channel to switch to the
// "grpclb" balancer, which returns a single backend address.
r.UpdateState(resolver.State{
Addresses: append(addrs[1:], resolver.Address{Addr: lbServer.Address(), Type: resolver.GRPCLB}),
})
client := testgrpc.NewTestServiceClient(cc)
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[:1]); err != nil {
t.Fatal(err)
}

// Push a resolver update with a service config using the deprecated
// `loadBalancingPolicy` field pointing to round_robin. The addresses list
// contains an address of type "grpclb". This should be preferred and hence
// there should be no balancer switch.
scpr := parseServiceConfig(t, r, `{"loadBalancingPolicy": "round_robin"}`)
r.UpdateState(resolver.State{
Addresses: append(addrs[1:], resolver.Address{Addr: lbServer.Address(), Type: resolver.GRPCLB}),
ServiceConfig: scpr,
})
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[:1]); err != nil {
t.Fatal(err)
}

// Switch to "round_robin" by removing the address of type "grpclb".
r.UpdateState(resolver.State{Addresses: addrs[1:]})
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[1:]); err != nil {
t.Fatal(err)
}
}

// TestBalancerSwitch_LoadBalancingConfigTrumps verifies that the
// `loadBalancingConfig` field in the service config trumps over addresses of
// type "grpclb" when it comes to deciding which LB policy is applied on the
// channel.
func (s) TestBalancerSwitch_LoadBalancingConfigTrumps(t *testing.T) {
backends, lbServer, cleanup := setupBackendsAndFakeGRPCLB(t)
defer cleanup()

addrs := stubBackendsToResolverAddrs(backends)
r := manual.NewBuilderWithScheme("whatever")
target := fmt.Sprintf("%s:///%s", r.Scheme(), loadBalancedServiceName)
cc, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
defer cc.Close()

// Push a resolver update with no service config and a single address pointing
// to the grpclb server we created above. This will cause the channel to
// switch to the "grpclb" balancer, which returns a single backend address.
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lbServer.Address(), Type: resolver.GRPCLB}}})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
client := testgrpc.NewTestServiceClient(cc)
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[:1]); err != nil {
t.Fatal(err)
}

// Push a resolver update with the service config specifying "round_robin"
// through the recommended `loadBalancingConfig` field.
r.UpdateState(resolver.State{
Addresses: addrs[1:],
ServiceConfig: parseServiceConfig(t, r, rrServiceConfig),
})
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[1:]); err != nil {
t.Fatal(err)
}

// Push a resolver update with no service config and an address of type
// "grpclb". The ClientConn should continue to use the service config received
// earlier, which specified the use of "round_robin" through the
// `loadBalancingConfig` field, and therefore the balancer should not be
// switched. And because the `loadBalancingConfig` field trumps everything
// else, the address of type "grpclb" should be ignored.
grpclbAddr := resolver.Address{Addr: "non-existent-grpclb-server-address", Type: resolver.GRPCLB}
r.UpdateState(resolver.State{Addresses: append(addrs[1:], grpclbAddr)})
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[1:]); err != nil {
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs); err != nil {
t.Fatal(err)
}
}
Expand Down

0 comments on commit 02946a3

Please sign in to comment.