Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: Add support for Custom LB Policies #6224

Merged
merged 11 commits into from
May 9, 2023
24 changes: 24 additions & 0 deletions attributes/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@
// later release.
package attributes

import (
"fmt"
"strings"
)

// Attributes is an immutable struct for storing and retrieving generic
// key/value pairs. Keys must be hashable, and users should define their own
// types for keys. Values should not be modified after they are added to an
Expand Down Expand Up @@ -99,3 +104,22 @@ func (a *Attributes) Equal(o *Attributes) bool {
}
return true
}

// String prints the attribute map. If any key or values throughout the map
// implement fmt.Stringer, it calls that method and appends.
func (a *Attributes) String() string {
var sb strings.Builder
sb.WriteString("{")
for k, v := range a.m {
var key, val string
if str, ok := k.(interface{ String() string }); ok {
key = str.String()
}
if str, ok := v.(interface{ String() string }); ok {
val = str.String()
}
sb.WriteString(fmt.Sprintf("\"%v\": \"%v\",", key, val))
easwars marked this conversation as resolved.
Show resolved Hide resolved
}
easwars marked this conversation as resolved.
Show resolved Hide resolved
sb.WriteString("}")
return sb.String()
}
6 changes: 6 additions & 0 deletions balancer/weightedroundrobin/weightedroundrobin.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package weightedroundrobin

import (
"fmt"

"google.golang.org/grpc/resolver"
)

Expand Down Expand Up @@ -66,3 +68,7 @@ func GetAddrInfo(addr resolver.Address) AddrInfo {
ai, _ := v.(AddrInfo)
return ai
}

func (a AddrInfo) String() string {
return fmt.Sprintf("Weight: %d", a.Weight)
}
8 changes: 8 additions & 0 deletions balancer/weightedtarget/weightedaggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,14 @@ func (wbsa *Aggregator) ResumeStateUpdates() {
}
}

// NeedUpdateStateOnResume sets the UpdateStateOnResume bool to true, letting a
// picker update be sent once ResumeStateUpdates is called.
func (wbsa *Aggregator) NeedUpdateStateOnResume() {
easwars marked this conversation as resolved.
Show resolved Hide resolved
wbsa.mu.Lock()
defer wbsa.mu.Unlock()
wbsa.needUpdateStateOnResume = true
}

// UpdateState is called to report a balancer state change from sub-balancer.
// It's usually called by the balancer group.
//
Expand Down
12 changes: 12 additions & 0 deletions balancer/weightedtarget/weightedtarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,18 @@ func (b *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnStat

b.targets = newConfig.Targets

// If the targets length is zero, it means we have removed all child
// policies from the balancer group and aggregator.
// At the start of this UpdateClientConnState() operation, a call to
// b.stateAggregator.ResumeStateUpdates() is deferred. Thus, setting the
// needUpdateStateOnResume bool to true here will ensure a new picker is
// built as part of that deferred function. Since there are now no child
// policies, the aggregated connectivity state reported form the Aggregator
// will be TRANSIENT_FAILURE.
if len(b.targets) == 0 {
b.stateAggregator.NeedUpdateStateOnResume()
}

easwars marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

Expand Down
21 changes: 20 additions & 1 deletion balancer/weightedtarget/weightedtarget_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ func init() {
// TestWeightedTarget covers the cases that a sub-balancer is added and a
// sub-balancer is removed. It verifies that the addresses and balancer configs
// are forwarded to the right sub-balancer. This test is intended to test the
// glue code in weighted_target.
// glue code in weighted_target. It also tests an empty target config update,
// which should trigger a transient failure state update.
func (s) TestWeightedTarget(t *testing.T) {
cc := testutils.NewTestClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
Expand Down Expand Up @@ -306,6 +307,24 @@ func (s) TestWeightedTarget(t *testing.T) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc3)
}
}
// Update the Weighted Target Balancer with an empty address list and no
// targets. This should cause a Transient Failure State update to the Client
// Conn.
config4, err := wtbParser.ParseConfig([]byte(`{}`))
easwars marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
easwars marked this conversation as resolved.
Show resolved Hide resolved
}
if err := wtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{},
BalancerConfig: config4,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}

state := <-cc.NewStateCh
if state != connectivity.TransientFailure {
t.Fatalf("empty target update should have triggered a TF state update, got: %v", state)
}
}

// TestWeightedTarget_OneSubBalancer_AddRemoveBackend tests the case where we
Expand Down
83 changes: 76 additions & 7 deletions internal/testutils/xds/e2e/clientresources.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,17 @@ type EndpointOptions struct {
// Ports is a set of ports on "localhost" where the endpoints corresponding
// to this resource reside.
Ports []uint32

// PortsInLocalities represent ports in different localities. The first
// dimension represents a locality, and the second represents the ports
// within that locality.
PortsInLocalities [][]uint32

// LocalityWeights are the weights of localities specified in the first
// dimension of PortsInLocalities. Must be the same length as the first
// dimension of PortsInLocalities.
LocalityWeights []uint32
easwars marked this conversation as resolved.
Show resolved Hide resolved

// DropPercents is a map from drop category to a drop percentage. If unset,
// no drops are configured.
DropPercents map[string]int
Expand All @@ -550,6 +561,62 @@ func DefaultEndpoint(clusterName string, host string, ports []uint32) *v3endpoin
})
}

// EndpointResourceWithOptionsMultipleLocalities returns an xDS Endpoint
// resource which specifies multiple localities, with the ports specified per
// locality placed into each localities endpoints specification.
func EndpointResourceWithOptionsMultipleLocalities(opts EndpointOptions) *v3endpointpb.ClusterLoadAssignment {
easwars marked this conversation as resolved.
Show resolved Hide resolved
var endpoints []*v3endpointpb.LocalityLbEndpoints
for i, portsInLocality := range opts.PortsInLocalities {
var lbEndpoints []*v3endpointpb.LbEndpoint
for _, port := range portsInLocality {
lbEndpoints = append(lbEndpoints, &v3endpointpb.LbEndpoint{
HostIdentifier: &v3endpointpb.LbEndpoint_Endpoint{Endpoint: &v3endpointpb.Endpoint{
Address: &v3corepb.Address{Address: &v3corepb.Address_SocketAddress{
SocketAddress: &v3corepb.SocketAddress{
Protocol: v3corepb.SocketAddress_TCP,
Address: opts.Host,
PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: port}},
}},
}},
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: 1},
})
}

endpoints = append(endpoints, &v3endpointpb.LocalityLbEndpoints{
Locality: &v3corepb.Locality{
Region: fmt.Sprintf("region%d", i),
Zone: fmt.Sprintf("zone%d", i),
SubZone: fmt.Sprintf("subzone%d", i),
},
LbEndpoints: lbEndpoints,
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: opts.LocalityWeights[i]},
Priority: 0,
})
}

cla := &v3endpointpb.ClusterLoadAssignment{
ClusterName: opts.ClusterName,
Endpoints: endpoints,
}

var drops []*v3endpointpb.ClusterLoadAssignment_Policy_DropOverload
for category, val := range opts.DropPercents {
drops = append(drops, &v3endpointpb.ClusterLoadAssignment_Policy_DropOverload{
Category: category,
DropPercentage: &v3typepb.FractionalPercent{
Numerator: uint32(val),
Denominator: v3typepb.FractionalPercent_HUNDRED,
},
})
}
if len(drops) != 0 {
cla.Policy = &v3endpointpb.ClusterLoadAssignment_Policy{
DropOverloads: drops,
}
}
return cla
}

// EndpointResourceWithOptions returns an xds Endpoint resource configured with
// the provided options.
func EndpointResourceWithOptions(opts EndpointOptions) *v3endpointpb.ClusterLoadAssignment {
Expand All @@ -564,18 +631,20 @@ func EndpointResourceWithOptions(opts EndpointOptions) *v3endpointpb.ClusterLoad
PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: port}},
}},
}},
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: 1},
})
}
cla := &v3endpointpb.ClusterLoadAssignment{
ClusterName: opts.ClusterName,
Endpoints: []*v3endpointpb.LocalityLbEndpoints{{
Locality: &v3corepb.Locality{SubZone: "subzone"},
LbEndpoints: lbEndpoints,
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: 1},
Priority: 0,
}},
Endpoints: []*v3endpointpb.LocalityLbEndpoints{
{
Locality: &v3corepb.Locality{SubZone: "subzone"},
LbEndpoints: lbEndpoints,
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: 1},
Priority: 0,
},
},
}

var drops []*v3endpointpb.ClusterLoadAssignment_Policy_DropOverload
for category, val := range opts.DropPercents {
drops = append(drops, &v3endpointpb.ClusterLoadAssignment_Policy_DropOverload{
Expand Down
19 changes: 16 additions & 3 deletions resolver/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ package resolver

import (
"context"
"fmt"
"net"
"net/url"
"strings"

"google.golang.org/grpc/attributes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/serviceconfig"
)

Expand Down Expand Up @@ -124,7 +124,7 @@ type Address struct {
Attributes *attributes.Attributes

// BalancerAttributes contains arbitrary data about this address intended
// for consumption by the LB policy. These attribes do not affect SubConn
// for consumption by the LB policy. These attributes do not affect SubConn
// creation, connection establishment, handshaking, etc.
BalancerAttributes *attributes.Attributes

Expand All @@ -151,7 +151,20 @@ func (a Address) Equal(o Address) bool {

// String returns JSON formatted string representation of the address.
func (a Address) String() string {
return pretty.ToJSON(a)
var sb strings.Builder
sb.WriteString(fmt.Sprintf("{Addr: %v, ", a.Addr))
sb.WriteString(fmt.Sprintf("ServerName: %v, ", a.ServerName))
easwars marked this conversation as resolved.
Show resolved Hide resolved
var atrStr string
if a.Attributes != nil {
atrStr = a.Attributes.String()
}
sb.WriteString(fmt.Sprintf("Attributes: %v, ", atrStr))
easwars marked this conversation as resolved.
Show resolved Hide resolved
var balAtrStr string
if a.BalancerAttributes != nil {
balAtrStr = a.BalancerAttributes.String()
}
sb.WriteString(fmt.Sprintf("BalancerAttributes: %v}", balAtrStr))
easwars marked this conversation as resolved.
Show resolved Hide resolved
return sb.String()
}

// BuildOptions includes additional information for the builder to create
Expand Down