Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: Add support for Custom LB Policies #6224

Merged
merged 11 commits into from May 9, 2023
37 changes: 37 additions & 0 deletions attributes/attributes.go
Expand Up @@ -25,6 +25,11 @@
// later release.
package attributes

import (
"fmt"
"strings"
)

// Attributes is an immutable struct for storing and retrieving generic
// key/value pairs. Keys must be hashable, and users should define their own
// types for keys. Values should not be modified after they are added to an
Expand Down Expand Up @@ -99,3 +104,35 @@ func (a *Attributes) Equal(o *Attributes) bool {
}
return true
}

// String prints the attribute map. If any key or values throughout the map
// implement fmt.Stringer, it calls that method and appends.
func (a *Attributes) String() string {
var sb strings.Builder
sb.WriteString("{")
var firstKey, firstVal string
var firstDone bool
for k, v := range a.m {
if !firstDone {
firstDone = true
if str, ok := k.(interface{ String() string }); ok {
firstKey = str.String()
}
if str, ok := v.(interface{ String() string }); ok {
firstVal = str.String()
}
continue
}
var key, val string
if str, ok := k.(interface{ String() string }); ok {
key = str.String()
}
if str, ok := v.(interface{ String() string }); ok {
val = str.String()
}
sb.WriteString(fmt.Sprintf("%q: %q, ", key, val))
}
easwars marked this conversation as resolved.
Show resolved Hide resolved
sb.WriteString(fmt.Sprintf("%q: %q", firstKey, firstVal))
sb.WriteString("}")
return sb.String()
}
easwars marked this conversation as resolved.
Show resolved Hide resolved
6 changes: 6 additions & 0 deletions balancer/weightedroundrobin/weightedroundrobin.go
Expand Up @@ -20,6 +20,8 @@
package weightedroundrobin

import (
"fmt"

"google.golang.org/grpc/resolver"
)

Expand Down Expand Up @@ -66,3 +68,7 @@ func GetAddrInfo(addr resolver.Address) AddrInfo {
ai, _ := v.(AddrInfo)
return ai
}

func (a AddrInfo) String() string {
return fmt.Sprintf("Weight: %d", a.Weight)
}
8 changes: 8 additions & 0 deletions balancer/weightedtarget/weightedaggregator/aggregator.go
Expand Up @@ -178,6 +178,14 @@ func (wbsa *Aggregator) ResumeStateUpdates() {
}
}

// NeedUpdateStateOnResume sets the UpdateStateOnResume bool to true, letting a
// picker update be sent once ResumeStateUpdates is called.
func (wbsa *Aggregator) NeedUpdateStateOnResume() {
easwars marked this conversation as resolved.
Show resolved Hide resolved
wbsa.mu.Lock()
defer wbsa.mu.Unlock()
wbsa.needUpdateStateOnResume = true
}

// UpdateState is called to report a balancer state change from sub-balancer.
// It's usually called by the balancer group.
//
Expand Down
12 changes: 12 additions & 0 deletions balancer/weightedtarget/weightedtarget.go
Expand Up @@ -143,6 +143,18 @@ func (b *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnStat

b.targets = newConfig.Targets

// If the targets length is zero, it means we have removed all child
// policies from the balancer group and aggregator.
// At the start of this UpdateClientConnState() operation, a call to
// b.stateAggregator.ResumeStateUpdates() is deferred. Thus, setting the
// needUpdateStateOnResume bool to true here will ensure a new picker is
// built as part of that deferred function. Since there are now no child
// policies, the aggregated connectivity state reported form the Aggregator
// will be TRANSIENT_FAILURE.
if len(b.targets) == 0 {
b.stateAggregator.NeedUpdateStateOnResume()
}

easwars marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

Expand Down
21 changes: 20 additions & 1 deletion balancer/weightedtarget/weightedtarget_test.go
Expand Up @@ -166,7 +166,8 @@ func init() {
// TestWeightedTarget covers the cases that a sub-balancer is added and a
// sub-balancer is removed. It verifies that the addresses and balancer configs
// are forwarded to the right sub-balancer. This test is intended to test the
// glue code in weighted_target.
// glue code in weighted_target. It also tests an empty target config update,
// which should trigger a transient failure state update.
func (s) TestWeightedTarget(t *testing.T) {
cc := testutils.NewTestClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
Expand Down Expand Up @@ -306,6 +307,24 @@ func (s) TestWeightedTarget(t *testing.T) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc3)
}
}
// Update the Weighted Target Balancer with an empty address list and no
// targets. This should cause a Transient Failure State update to the Client
// Conn.
emptyConfig, err := wtbParser.ParseConfig([]byte(`{}`))
if err != nil {
t.Fatalf("Failed to parse balancer config: %v", err)
}
if err := wtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{},
BalancerConfig: emptyConfig,
}); err != nil {
t.Fatalf("Failed to update ClientConn state: %v", err)
}

state := <-cc.NewStateCh
if state != connectivity.TransientFailure {
t.Fatalf("Empty target update should have triggered a TF state update, got: %v", state)
}
}

// TestWeightedTarget_OneSubBalancer_AddRemoveBackend tests the case where we
Expand Down
83 changes: 76 additions & 7 deletions internal/testutils/xds/e2e/clientresources.go
Expand Up @@ -536,6 +536,17 @@ type EndpointOptions struct {
// Ports is a set of ports on "localhost" where the endpoints corresponding
// to this resource reside.
Ports []uint32

// PortsInLocalities represent ports in different localities. The first
// dimension represents a locality, and the second represents the ports
// within that locality.
PortsInLocalities [][]uint32

// LocalityWeights are the weights of localities specified in the first
// dimension of PortsInLocalities. Must be the same length as the first
// dimension of PortsInLocalities.
LocalityWeights []uint32
easwars marked this conversation as resolved.
Show resolved Hide resolved

// DropPercents is a map from drop category to a drop percentage. If unset,
// no drops are configured.
DropPercents map[string]int
Expand All @@ -550,6 +561,62 @@ func DefaultEndpoint(clusterName string, host string, ports []uint32) *v3endpoin
})
}

// EndpointResourceWithOptionsMultipleLocalities returns an xDS Endpoint
// resource which specifies multiple localities, with the ports specified per
// locality placed into each localities endpoints specification.
func EndpointResourceWithOptionsMultipleLocalities(opts EndpointOptions) *v3endpointpb.ClusterLoadAssignment {
easwars marked this conversation as resolved.
Show resolved Hide resolved
var endpoints []*v3endpointpb.LocalityLbEndpoints
for i, portsInLocality := range opts.PortsInLocalities {
var lbEndpoints []*v3endpointpb.LbEndpoint
for _, port := range portsInLocality {
lbEndpoints = append(lbEndpoints, &v3endpointpb.LbEndpoint{
HostIdentifier: &v3endpointpb.LbEndpoint_Endpoint{Endpoint: &v3endpointpb.Endpoint{
Address: &v3corepb.Address{Address: &v3corepb.Address_SocketAddress{
SocketAddress: &v3corepb.SocketAddress{
Protocol: v3corepb.SocketAddress_TCP,
Address: opts.Host,
PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: port}},
}},
}},
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: 1},
})
}

endpoints = append(endpoints, &v3endpointpb.LocalityLbEndpoints{
Locality: &v3corepb.Locality{
Region: fmt.Sprintf("region%d", i),
Zone: fmt.Sprintf("zone%d", i),
SubZone: fmt.Sprintf("subzone%d", i),
},
LbEndpoints: lbEndpoints,
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: opts.LocalityWeights[i]},
Priority: 0,
})
}

cla := &v3endpointpb.ClusterLoadAssignment{
ClusterName: opts.ClusterName,
Endpoints: endpoints,
}

var drops []*v3endpointpb.ClusterLoadAssignment_Policy_DropOverload
for category, val := range opts.DropPercents {
drops = append(drops, &v3endpointpb.ClusterLoadAssignment_Policy_DropOverload{
Category: category,
DropPercentage: &v3typepb.FractionalPercent{
Numerator: uint32(val),
Denominator: v3typepb.FractionalPercent_HUNDRED,
},
})
}
if len(drops) != 0 {
cla.Policy = &v3endpointpb.ClusterLoadAssignment_Policy{
DropOverloads: drops,
}
}
return cla
}

// EndpointResourceWithOptions returns an xds Endpoint resource configured with
// the provided options.
func EndpointResourceWithOptions(opts EndpointOptions) *v3endpointpb.ClusterLoadAssignment {
Expand All @@ -564,18 +631,20 @@ func EndpointResourceWithOptions(opts EndpointOptions) *v3endpointpb.ClusterLoad
PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: port}},
}},
}},
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: 1},
})
}
cla := &v3endpointpb.ClusterLoadAssignment{
ClusterName: opts.ClusterName,
Endpoints: []*v3endpointpb.LocalityLbEndpoints{{
Locality: &v3corepb.Locality{SubZone: "subzone"},
LbEndpoints: lbEndpoints,
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: 1},
Priority: 0,
}},
Endpoints: []*v3endpointpb.LocalityLbEndpoints{
{
Locality: &v3corepb.Locality{SubZone: "subzone"},
LbEndpoints: lbEndpoints,
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: 1},
Priority: 0,
},
},
}

var drops []*v3endpointpb.ClusterLoadAssignment_Policy_DropOverload
for category, val := range opts.DropPercents {
drops = append(drops, &v3endpointpb.ClusterLoadAssignment_Policy_DropOverload{
Expand Down
20 changes: 17 additions & 3 deletions resolver/resolver.go
Expand Up @@ -22,13 +22,13 @@ package resolver

import (
"context"
"fmt"
"net"
"net/url"
"strings"

"google.golang.org/grpc/attributes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/serviceconfig"
)

Expand Down Expand Up @@ -124,7 +124,7 @@ type Address struct {
Attributes *attributes.Attributes

// BalancerAttributes contains arbitrary data about this address intended
// for consumption by the LB policy. These attribes do not affect SubConn
// for consumption by the LB policy. These attributes do not affect SubConn
// creation, connection establishment, handshaking, etc.
BalancerAttributes *attributes.Attributes

Expand All @@ -151,7 +151,21 @@ func (a Address) Equal(o Address) bool {

// String returns JSON formatted string representation of the address.
func (a Address) String() string {
return pretty.ToJSON(a)
var sb strings.Builder
sb.WriteString(fmt.Sprintf("{Addr: %q, ", a.Addr))
sb.WriteString(fmt.Sprintf("ServerName: %q, ", a.ServerName))
var atrStr string
if a.Attributes != nil {
atrStr = a.Attributes.String()
sb.WriteString(fmt.Sprintf("Attributes: %v, ", atrStr))
}
var balAtrStr string
if a.BalancerAttributes != nil {
balAtrStr = a.BalancerAttributes.String()
sb.WriteString(fmt.Sprintf("BalancerAttributes: %v", balAtrStr))
}
easwars marked this conversation as resolved.
Show resolved Hide resolved
sb.WriteString("}")
return sb.String()
}

// BuildOptions includes additional information for the builder to create
Expand Down