Skip to content

Commit

Permalink
xds: Add support for Custom LB Policies (#6224)
Browse files Browse the repository at this point in the history
  • Loading branch information
zasweq committed May 9, 2023
1 parent 5c4bee5 commit 5e58734
Show file tree
Hide file tree
Showing 32 changed files with 955 additions and 725 deletions.
29 changes: 29 additions & 0 deletions attributes/attributes.go
Expand Up @@ -25,6 +25,11 @@
// later release.
package attributes

import (
"fmt"
"strings"
)

// Attributes is an immutable struct for storing and retrieving generic
// key/value pairs. Keys must be hashable, and users should define their own
// types for keys. Values should not be modified after they are added to an
Expand Down Expand Up @@ -99,3 +104,27 @@ func (a *Attributes) Equal(o *Attributes) bool {
}
return true
}

// String prints the attribute map. If any key or values throughout the map
// implement fmt.Stringer, it calls that method and appends.
func (a *Attributes) String() string {
var sb strings.Builder
sb.WriteString("{")
first := true
for k, v := range a.m {
var key, val string
if str, ok := k.(interface{ String() string }); ok {
key = str.String()
}
if str, ok := v.(interface{ String() string }); ok {
val = str.String()
}
if !first {
sb.WriteString(", ")
}
sb.WriteString(fmt.Sprintf("%q: %q, ", key, val))
first = false
}
sb.WriteString("}")
return sb.String()
}
6 changes: 6 additions & 0 deletions balancer/weightedroundrobin/weightedroundrobin.go
Expand Up @@ -28,6 +28,8 @@
package weightedroundrobin

import (
"fmt"

"google.golang.org/grpc/resolver"
)

Expand Down Expand Up @@ -61,3 +63,7 @@ func GetAddrInfo(addr resolver.Address) AddrInfo {
ai, _ := v.(AddrInfo)
return ai
}

func (a AddrInfo) String() string {
return fmt.Sprintf("Weight: %d", a.Weight)
}
8 changes: 8 additions & 0 deletions balancer/weightedtarget/weightedaggregator/aggregator.go
Expand Up @@ -178,6 +178,14 @@ func (wbsa *Aggregator) ResumeStateUpdates() {
}
}

// NeedUpdateStateOnResume sets the UpdateStateOnResume bool to true, letting a
// picker update be sent once ResumeStateUpdates is called.
func (wbsa *Aggregator) NeedUpdateStateOnResume() {
wbsa.mu.Lock()
defer wbsa.mu.Unlock()
wbsa.needUpdateStateOnResume = true
}

// UpdateState is called to report a balancer state change from sub-balancer.
// It's usually called by the balancer group.
//
Expand Down
12 changes: 12 additions & 0 deletions balancer/weightedtarget/weightedtarget.go
Expand Up @@ -143,6 +143,18 @@ func (b *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnStat

b.targets = newConfig.Targets

// If the targets length is zero, it means we have removed all child
// policies from the balancer group and aggregator.
// At the start of this UpdateClientConnState() operation, a call to
// b.stateAggregator.ResumeStateUpdates() is deferred. Thus, setting the
// needUpdateStateOnResume bool to true here will ensure a new picker is
// built as part of that deferred function. Since there are now no child
// policies, the aggregated connectivity state reported form the Aggregator
// will be TRANSIENT_FAILURE.
if len(b.targets) == 0 {
b.stateAggregator.NeedUpdateStateOnResume()
}

return nil
}

Expand Down
21 changes: 20 additions & 1 deletion balancer/weightedtarget/weightedtarget_test.go
Expand Up @@ -166,7 +166,8 @@ func init() {
// TestWeightedTarget covers the cases that a sub-balancer is added and a
// sub-balancer is removed. It verifies that the addresses and balancer configs
// are forwarded to the right sub-balancer. This test is intended to test the
// glue code in weighted_target.
// glue code in weighted_target. It also tests an empty target config update,
// which should trigger a transient failure state update.
func (s) TestWeightedTarget(t *testing.T) {
cc := testutils.NewTestClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
Expand Down Expand Up @@ -306,6 +307,24 @@ func (s) TestWeightedTarget(t *testing.T) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc3)
}
}
// Update the Weighted Target Balancer with an empty address list and no
// targets. This should cause a Transient Failure State update to the Client
// Conn.
emptyConfig, err := wtbParser.ParseConfig([]byte(`{}`))
if err != nil {
t.Fatalf("Failed to parse balancer config: %v", err)
}
if err := wtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{},
BalancerConfig: emptyConfig,
}); err != nil {
t.Fatalf("Failed to update ClientConn state: %v", err)
}

state := <-cc.NewStateCh
if state != connectivity.TransientFailure {
t.Fatalf("Empty target update should have triggered a TF state update, got: %v", state)
}
}

// TestWeightedTarget_OneSubBalancer_AddRemoveBackend tests the case where we
Expand Down
63 changes: 43 additions & 20 deletions internal/testutils/xds/e2e/clientresources.go
Expand Up @@ -524,6 +524,14 @@ func ClusterResourceWithOptions(opts ClusterOptions) *v3clusterpb.Cluster {
return cluster
}

// LocalityOptions contains options to configure a Locality.
type LocalityOptions struct {
// Ports is a set of ports on "localhost" belonging to this locality.
Ports []uint32
// Weight is the weight of the locality, used for load balancing.
Weight uint32
}

// EndpointOptions contains options to configure an Endpoint (or
// ClusterLoadAssignment) resource.
type EndpointOptions struct {
Expand All @@ -533,9 +541,8 @@ type EndpointOptions struct {
// Host is the hostname of the endpoints. In our e2e tests, hostname must
// always be "localhost".
Host string
// Ports is a set of ports on "localhost" where the endpoints corresponding
// to this resource reside.
Ports []uint32
// Localities is a set of localities belonging to this resource.
Localities []LocalityOptions
// DropPercents is a map from drop category to a drop percentage. If unset,
// no drops are configured.
DropPercents map[string]int
Expand All @@ -546,34 +553,50 @@ func DefaultEndpoint(clusterName string, host string, ports []uint32) *v3endpoin
return EndpointResourceWithOptions(EndpointOptions{
ClusterName: clusterName,
Host: host,
Ports: ports,
Localities: []LocalityOptions{
{
Ports: ports,
Weight: 1,
},
},
})
}

// EndpointResourceWithOptions returns an xds Endpoint resource configured with
// the provided options.
func EndpointResourceWithOptions(opts EndpointOptions) *v3endpointpb.ClusterLoadAssignment {
var lbEndpoints []*v3endpointpb.LbEndpoint
for _, port := range opts.Ports {
lbEndpoints = append(lbEndpoints, &v3endpointpb.LbEndpoint{
HostIdentifier: &v3endpointpb.LbEndpoint_Endpoint{Endpoint: &v3endpointpb.Endpoint{
Address: &v3corepb.Address{Address: &v3corepb.Address_SocketAddress{
SocketAddress: &v3corepb.SocketAddress{
Protocol: v3corepb.SocketAddress_TCP,
Address: opts.Host,
PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: port}},
var endpoints []*v3endpointpb.LocalityLbEndpoints
for i, locality := range opts.Localities {
var lbEndpoints []*v3endpointpb.LbEndpoint
for _, port := range locality.Ports {
lbEndpoints = append(lbEndpoints, &v3endpointpb.LbEndpoint{
HostIdentifier: &v3endpointpb.LbEndpoint_Endpoint{Endpoint: &v3endpointpb.Endpoint{
Address: &v3corepb.Address{Address: &v3corepb.Address_SocketAddress{
SocketAddress: &v3corepb.SocketAddress{
Protocol: v3corepb.SocketAddress_TCP,
Address: opts.Host,
PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: port}},
}},
}},
}},
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: 1},
})
}

endpoints = append(endpoints, &v3endpointpb.LocalityLbEndpoints{
Locality: &v3corepb.Locality{
Region: fmt.Sprintf("region-%d", i+1),
Zone: fmt.Sprintf("zone-%d", i+1),
SubZone: fmt.Sprintf("subzone-%d", i+1),
},
LbEndpoints: lbEndpoints,
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: locality.Weight},
Priority: 0,
})
}

cla := &v3endpointpb.ClusterLoadAssignment{
ClusterName: opts.ClusterName,
Endpoints: []*v3endpointpb.LocalityLbEndpoints{{
Locality: &v3corepb.Locality{SubZone: "subzone"},
LbEndpoints: lbEndpoints,
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: 1},
Priority: 0,
}},
Endpoints: endpoints,
}

var drops []*v3endpointpb.ClusterLoadAssignment_Policy_DropOverload
Expand Down
16 changes: 13 additions & 3 deletions resolver/resolver.go
Expand Up @@ -22,13 +22,13 @@ package resolver

import (
"context"
"fmt"
"net"
"net/url"
"strings"

"google.golang.org/grpc/attributes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/serviceconfig"
)

Expand Down Expand Up @@ -124,7 +124,7 @@ type Address struct {
Attributes *attributes.Attributes

// BalancerAttributes contains arbitrary data about this address intended
// for consumption by the LB policy. These attribes do not affect SubConn
// for consumption by the LB policy. These attributes do not affect SubConn
// creation, connection establishment, handshaking, etc.
BalancerAttributes *attributes.Attributes

Expand All @@ -151,7 +151,17 @@ func (a Address) Equal(o Address) bool {

// String returns JSON formatted string representation of the address.
func (a Address) String() string {
return pretty.ToJSON(a)
var sb strings.Builder
sb.WriteString(fmt.Sprintf("{Addr: %q, ", a.Addr))
sb.WriteString(fmt.Sprintf("ServerName: %q, ", a.ServerName))
if a.Attributes != nil {
sb.WriteString(fmt.Sprintf("Attributes: %v, ", a.Attributes.String()))
}
if a.BalancerAttributes != nil {
sb.WriteString(fmt.Sprintf("BalancerAttributes: %v", a.BalancerAttributes.String()))
}
sb.WriteString("}")
return sb.String()
}

// BuildOptions includes additional information for the builder to create
Expand Down

0 comments on commit 5e58734

Please sign in to comment.