Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: Add support for Custom LB Policies #6224

Merged
merged 11 commits into from
May 9, 2023
21 changes: 21 additions & 0 deletions attributes/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
// later release.
package attributes

import "strings"

// Attributes is an immutable struct for storing and retrieving generic
// key/value pairs. Keys must be hashable, and users should define their own
// types for keys. Values should not be modified after they are added to an
Expand Down Expand Up @@ -99,3 +101,22 @@ func (a *Attributes) Equal(o *Attributes) bool {
}
return true
}

// String prints the attribute map. If any key or values throughout the map
// implement fmt.Stringer, it calls that method and appends.
func (a *Attributes) String() string {
var sb strings.Builder
sb.WriteString("{")
for k, v := range a.m {
if str, ok := k.(interface{ String() string }); ok {
sb.WriteString(str.String())
sb.WriteString(", ")
}
if str, ok := v.(interface{ String() string }); ok {
sb.WriteString(str.String())
sb.WriteString(", ")
}
}
easwars marked this conversation as resolved.
Show resolved Hide resolved
sb.WriteString("}")
return sb.String()
}
6 changes: 6 additions & 0 deletions balancer/weightedroundrobin/weightedroundrobin.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package weightedroundrobin

import (
"fmt"

"google.golang.org/grpc/resolver"
)

Expand Down Expand Up @@ -66,3 +68,7 @@ func GetAddrInfo(addr resolver.Address) AddrInfo {
ai, _ := v.(AddrInfo)
return ai
}

func (a AddrInfo) String() string {
return fmt.Sprintf("Weight: %d", a.Weight)
}
6 changes: 6 additions & 0 deletions balancer/weightedtarget/weightedaggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,12 @@ func (wbsa *Aggregator) ResumeStateUpdates() {
}
}

// NeedUpdateStateOnResume sets the UpdateStateOnResume bool to true, letting a
// picker update be sent once ResumeStateUpdates is called.
func (wbsa *Aggregator) NeedUpdateStateOnResume() {
easwars marked this conversation as resolved.
Show resolved Hide resolved
wbsa.needUpdateStateOnResume = true
}

// UpdateState is called to report a balancer state change from sub-balancer.
// It's usually called by the balancer group.
//
Expand Down
6 changes: 6 additions & 0 deletions balancer/weightedtarget/weightedtarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,12 @@ func (b *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnStat

b.targets = newConfig.Targets

// To send a TF state update in the case of a configuration with no targets
easwars marked this conversation as resolved.
Show resolved Hide resolved
// passed in.
if len(b.targets) == 0 {
b.stateAggregator.NeedUpdateStateOnResume()
}

easwars marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

Expand Down
83 changes: 76 additions & 7 deletions internal/testutils/xds/e2e/clientresources.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,17 @@ type EndpointOptions struct {
// Ports is a set of ports on "localhost" where the endpoints corresponding
// to this resource reside.
Ports []uint32

// PortsInLocalities represent ports in different localities. The first
// dimension represents a locality, and the second represents the ports
// within that locality.
PortsInLocalities [][]uint32

// LocalityWeights are the weights of localities specified in the first
// dimension of PortsInLocalities. Must be the same length as the first
// dimension of PortsInLocalities.
LocalityWeights []uint32
easwars marked this conversation as resolved.
Show resolved Hide resolved

// DropPercents is a map from drop category to a drop percentage. If unset,
// no drops are configured.
DropPercents map[string]int
Expand All @@ -550,6 +561,62 @@ func DefaultEndpoint(clusterName string, host string, ports []uint32) *v3endpoin
})
}

// EndpointResourceWithOptionsMultipleLocalities returns an xDS Endpoint
// resource which specifies multiple localities, with the ports specified per
// locality placed into each localities endpoints specification.
func EndpointResourceWithOptionsMultipleLocalities(opts EndpointOptions) *v3endpointpb.ClusterLoadAssignment {
easwars marked this conversation as resolved.
Show resolved Hide resolved
var endpoints []*v3endpointpb.LocalityLbEndpoints
for i, portsInLocality := range opts.PortsInLocalities {
var lbEndpoints []*v3endpointpb.LbEndpoint
for _, port := range portsInLocality {
lbEndpoints = append(lbEndpoints, &v3endpointpb.LbEndpoint{
HostIdentifier: &v3endpointpb.LbEndpoint_Endpoint{Endpoint: &v3endpointpb.Endpoint{
Address: &v3corepb.Address{Address: &v3corepb.Address_SocketAddress{
SocketAddress: &v3corepb.SocketAddress{
Protocol: v3corepb.SocketAddress_TCP,
Address: opts.Host,
PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: port}},
}},
}},
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: 1},
})
}

endpoints = append(endpoints, &v3endpointpb.LocalityLbEndpoints{
Locality: &v3corepb.Locality{
Region: fmt.Sprintf("region%d", i),
Zone: fmt.Sprintf("zone%d", i),
SubZone: fmt.Sprintf("subzone%d", i),
},
LbEndpoints: lbEndpoints,
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: opts.LocalityWeights[i]},
Priority: 0,
})
}

cla := &v3endpointpb.ClusterLoadAssignment{
ClusterName: opts.ClusterName,
Endpoints: endpoints,
}

var drops []*v3endpointpb.ClusterLoadAssignment_Policy_DropOverload
for category, val := range opts.DropPercents {
drops = append(drops, &v3endpointpb.ClusterLoadAssignment_Policy_DropOverload{
Category: category,
DropPercentage: &v3typepb.FractionalPercent{
Numerator: uint32(val),
Denominator: v3typepb.FractionalPercent_HUNDRED,
},
})
}
if len(drops) != 0 {
cla.Policy = &v3endpointpb.ClusterLoadAssignment_Policy{
DropOverloads: drops,
}
}
return cla
}

// EndpointResourceWithOptions returns an xds Endpoint resource configured with
// the provided options.
func EndpointResourceWithOptions(opts EndpointOptions) *v3endpointpb.ClusterLoadAssignment {
Expand All @@ -564,18 +631,20 @@ func EndpointResourceWithOptions(opts EndpointOptions) *v3endpointpb.ClusterLoad
PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: port}},
}},
}},
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: 1},
})
}
cla := &v3endpointpb.ClusterLoadAssignment{
ClusterName: opts.ClusterName,
Endpoints: []*v3endpointpb.LocalityLbEndpoints{{
Locality: &v3corepb.Locality{SubZone: "subzone"},
LbEndpoints: lbEndpoints,
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: 1},
Priority: 0,
}},
Endpoints: []*v3endpointpb.LocalityLbEndpoints{
{
Locality: &v3corepb.Locality{SubZone: "subzone"},
LbEndpoints: lbEndpoints,
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: 1},
Priority: 0,
},
},
}

var drops []*v3endpointpb.ClusterLoadAssignment_Policy_DropOverload
for category, val := range opts.DropPercents {
drops = append(drops, &v3endpointpb.ClusterLoadAssignment_Policy_DropOverload{
Expand Down
19 changes: 16 additions & 3 deletions resolver/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ package resolver

import (
"context"
"fmt"
"net"
"net/url"
"strings"

"google.golang.org/grpc/attributes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/serviceconfig"
)

Expand Down Expand Up @@ -124,7 +124,7 @@ type Address struct {
Attributes *attributes.Attributes

// BalancerAttributes contains arbitrary data about this address intended
// for consumption by the LB policy. These attribes do not affect SubConn
// for consumption by the LB policy. These attributes do not affect SubConn
// creation, connection establishment, handshaking, etc.
BalancerAttributes *attributes.Attributes

Expand All @@ -151,7 +151,20 @@ func (a Address) Equal(o Address) bool {

// String returns JSON formatted string representation of the address.
func (a Address) String() string {
return pretty.ToJSON(a)
var sb strings.Builder
sb.WriteString(fmt.Sprintf("{Addr: %v, ", a.Addr))
sb.WriteString(fmt.Sprintf("ServerName: %v, ", a.ServerName))
easwars marked this conversation as resolved.
Show resolved Hide resolved
var atrStr string
if a.Attributes != nil {
atrStr = a.Attributes.String()
}
sb.WriteString(fmt.Sprintf("Attributes: %v, ", atrStr))
easwars marked this conversation as resolved.
Show resolved Hide resolved
var balAtrStr string
if a.BalancerAttributes != nil {
balAtrStr = a.BalancerAttributes.String()
}
sb.WriteString(fmt.Sprintf("BalancerAttributes: %v}", balAtrStr))
easwars marked this conversation as resolved.
Show resolved Hide resolved
return sb.String()
}

// BuildOptions includes additional information for the builder to create
Expand Down