Skip to content

Commit

Permalink
Add support for Custom LB Policies
Browse files Browse the repository at this point in the history
  • Loading branch information
zasweq committed Apr 26, 2023
1 parent eff0942 commit 7decbac
Show file tree
Hide file tree
Showing 25 changed files with 880 additions and 667 deletions.
21 changes: 21 additions & 0 deletions attributes/attributes.go
Expand Up @@ -25,6 +25,8 @@
// later release.
package attributes

import "strings"

// Attributes is an immutable struct for storing and retrieving generic
// key/value pairs. Keys must be hashable, and users should define their own
// types for keys. Values should not be modified after they are added to an
Expand Down Expand Up @@ -99,3 +101,22 @@ func (a *Attributes) Equal(o *Attributes) bool {
}
return true
}

// String prints the attribute map. If any key or values throughout the map
// implement fmt.Stringer, it calls that method and appends.
func (a *Attributes) String() string {
var sb strings.Builder
sb.WriteString("{")
for k, v := range a.m {
if str, ok := k.(interface{ String() string }); ok {
sb.WriteString(str.String())
sb.WriteString(", ")
}
if str, ok := v.(interface{ String() string }); ok {
sb.WriteString(str.String())
sb.WriteString(", ")
}
}
sb.WriteString("}")
return sb.String()
}
6 changes: 6 additions & 0 deletions balancer/weightedroundrobin/weightedroundrobin.go
Expand Up @@ -20,6 +20,8 @@
package weightedroundrobin

import (
"fmt"

"google.golang.org/grpc/resolver"
)

Expand Down Expand Up @@ -66,3 +68,7 @@ func GetAddrInfo(addr resolver.Address) AddrInfo {
ai, _ := v.(AddrInfo)
return ai
}

func (a AddrInfo) String() string {
return fmt.Sprintf("Weight: %d", a.Weight)
}
83 changes: 76 additions & 7 deletions internal/testutils/xds/e2e/clientresources.go
Expand Up @@ -536,6 +536,17 @@ type EndpointOptions struct {
// Ports is a set of ports on "localhost" where the endpoints corresponding
// to this resource reside.
Ports []uint32

// PortsInLocalities represent ports in different localities. The first
// dimension represents a locality, and the second represents the ports
// within that locality.
PortsInLocalities [][]uint32

// LocalityWeights are the weights of localities specified in the first
// dimension of PortsInLocalities. Must be the same length as the first
// dimension of PortsInLocalities.
LocalityWeights []uint32

// DropPercents is a map from drop category to a drop percentage. If unset,
// no drops are configured.
DropPercents map[string]int
Expand All @@ -550,6 +561,62 @@ func DefaultEndpoint(clusterName string, host string, ports []uint32) *v3endpoin
})
}

// EndpointResourceWithOptionsMultipleLocalities returns an xDS Endpoint
// resource which specifies multiple localities, with the ports specified per
// locality placed into each localities endpoints specification.
func EndpointResourceWithOptionsMultipleLocalities(opts EndpointOptions) *v3endpointpb.ClusterLoadAssignment {
var endpoints []*v3endpointpb.LocalityLbEndpoints
for i, portsInLocality := range opts.PortsInLocalities {
var lbEndpoints []*v3endpointpb.LbEndpoint
for _, port := range portsInLocality {
lbEndpoints = append(lbEndpoints, &v3endpointpb.LbEndpoint{
HostIdentifier: &v3endpointpb.LbEndpoint_Endpoint{Endpoint: &v3endpointpb.Endpoint{
Address: &v3corepb.Address{Address: &v3corepb.Address_SocketAddress{
SocketAddress: &v3corepb.SocketAddress{
Protocol: v3corepb.SocketAddress_TCP,
Address: opts.Host,
PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: port}},
}},
}},
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: 1},
})
}

endpoints = append(endpoints, &v3endpointpb.LocalityLbEndpoints{
Locality: &v3corepb.Locality{
Region: fmt.Sprintf("region%d", i),
Zone: fmt.Sprintf("zone%d", i),
SubZone: fmt.Sprintf("subzone%d", i),
},
LbEndpoints: lbEndpoints,
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: opts.LocalityWeights[i]},
Priority: 0,
})
}

cla := &v3endpointpb.ClusterLoadAssignment{
ClusterName: opts.ClusterName,
Endpoints: endpoints,
}

var drops []*v3endpointpb.ClusterLoadAssignment_Policy_DropOverload
for category, val := range opts.DropPercents {
drops = append(drops, &v3endpointpb.ClusterLoadAssignment_Policy_DropOverload{
Category: category,
DropPercentage: &v3typepb.FractionalPercent{
Numerator: uint32(val),
Denominator: v3typepb.FractionalPercent_HUNDRED,
},
})
}
if len(drops) != 0 {
cla.Policy = &v3endpointpb.ClusterLoadAssignment_Policy{
DropOverloads: drops,
}
}
return cla
}

// EndpointResourceWithOptions returns an xds Endpoint resource configured with
// the provided options.
func EndpointResourceWithOptions(opts EndpointOptions) *v3endpointpb.ClusterLoadAssignment {
Expand All @@ -564,18 +631,20 @@ func EndpointResourceWithOptions(opts EndpointOptions) *v3endpointpb.ClusterLoad
PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: port}},
}},
}},
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: 1},
})
}
cla := &v3endpointpb.ClusterLoadAssignment{
ClusterName: opts.ClusterName,
Endpoints: []*v3endpointpb.LocalityLbEndpoints{{
Locality: &v3corepb.Locality{SubZone: "subzone"},
LbEndpoints: lbEndpoints,
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: 1},
Priority: 0,
}},
Endpoints: []*v3endpointpb.LocalityLbEndpoints{
{
Locality: &v3corepb.Locality{SubZone: "subzone"},
LbEndpoints: lbEndpoints,
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: 1},
Priority: 0,
},
},
}

var drops []*v3endpointpb.ClusterLoadAssignment_Policy_DropOverload
for category, val := range opts.DropPercents {
drops = append(drops, &v3endpointpb.ClusterLoadAssignment_Policy_DropOverload{
Expand Down
19 changes: 16 additions & 3 deletions resolver/resolver.go
Expand Up @@ -22,13 +22,13 @@ package resolver

import (
"context"
"fmt"
"net"
"net/url"
"strings"

"google.golang.org/grpc/attributes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/serviceconfig"
)

Expand Down Expand Up @@ -124,7 +124,7 @@ type Address struct {
Attributes *attributes.Attributes

// BalancerAttributes contains arbitrary data about this address intended
// for consumption by the LB policy. These attribes do not affect SubConn
// for consumption by the LB policy. These attributes do not affect SubConn
// creation, connection establishment, handshaking, etc.
BalancerAttributes *attributes.Attributes

Expand All @@ -151,7 +151,20 @@ func (a Address) Equal(o Address) bool {

// String returns JSON formatted string representation of the address.
func (a Address) String() string {
return pretty.ToJSON(a)
var sb strings.Builder
sb.WriteString(fmt.Sprintf("{Addr: %v, ", a.Addr))
sb.WriteString(fmt.Sprintf("ServerName: %v, ", a.ServerName))
var atrStr string
if a.Attributes != nil {
atrStr = a.Attributes.String()
}
sb.WriteString(fmt.Sprintf("Attributes: %v, ", atrStr))
var balAtrStr string
if a.BalancerAttributes != nil {
balAtrStr = a.BalancerAttributes.String()
}
sb.WriteString(fmt.Sprintf("BalancerAttributes: %v}", balAtrStr))
return sb.String()
}

// BuildOptions includes additional information for the builder to create
Expand Down

0 comments on commit 7decbac

Please sign in to comment.