Skip to content

Commit

Permalink
Fix attribute get and call ParseConfig on child
Browse files Browse the repository at this point in the history
  • Loading branch information
zasweq committed May 4, 2023
1 parent fc6e6a8 commit 119986e
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 20 deletions.
52 changes: 32 additions & 20 deletions xds/internal/balancer/wrrlocality/balancer.go
Expand Up @@ -74,8 +74,14 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba
// shouldn't happen, defensive programming.
return nil
}
wtbCfgParser, ok := builder.(balancer.ConfigParser)
if !ok {
// Shouldn't happen, imported weighted target builder has this method.
return nil
}
wrrL := &wrrLocalityBalancer{
child: wtb,
child: wtb,
configParser: wtbCfgParser,
}

wrrL.logger = prefixLogger(wrrL)
Expand Down Expand Up @@ -119,19 +125,11 @@ func (a AddrInfo) String() string {
}

// getAddrInfo returns the AddrInfo stored in the BalancerAttributes field of
// addr. Returns an AddrInfo with LocalityWeight of 1 if no AddrInfo found.
func getAddrInfo(addr resolver.Address) AddrInfo {
// addr. Returns false if no AddrInfo found.
func getAddrInfo(addr resolver.Address) (AddrInfo, bool) {
v := addr.BalancerAttributes.Value(attributeKey{})
var ai AddrInfo
var ok bool
if ai, ok = v.(AddrInfo); !ok {
// Shouldn't happen, but to avoid undefiend behavior of 0 locality
// weight.
return AddrInfo{
LocalityWeight: 1,
}
}
return ai
ai, ok := v.(AddrInfo)
return ai, ok
}

// wrrLocalityBalancer wraps a weighted target balancer, and builds
Expand All @@ -145,6 +143,8 @@ type wrrLocalityBalancer struct {
// Other balancer operations you pass through.
child balancer.Balancer

configParser balancer.ConfigParser

logger *grpclog.PrefixLogger
}

Expand All @@ -157,24 +157,36 @@ func (b *wrrLocalityBalancer) UpdateClientConnState(s balancer.ClientConnState)

weightedTargets := make(map[string]weightedtarget.Target)
for _, addr := range s.ResolverState.Addresses {
// These gets of attributes could potentially return zero values. This
// shouldn't happen though, and thus don't error out, and just build a
// weighted target with undefined behavior. (For the attribute in this
// package, I defaulted the getter to return a value with defined
// behavior rather than zero value).
// This get of LocalityID could potentially return a zero value. This
// shouldn't happen though (this attribute that is set actually gets
// used to build localities in the first place), and thus don't error
// out, and just build a weighted target with undefined behavior.
locality := internal.GetLocalityID(addr)
localityString, err := locality.ToString()
if err != nil {
// Should never happen.
logger.Infof("failed to marshal LocalityID: %v, skipping this locality in weighted target")
}
ai := getAddrInfo(addr)
ai, ok := getAddrInfo(addr)
if !ok {
return fmt.Errorf("addr: %v is misisng locality weight information", addr)
}
weightedTargets[localityString] = weightedtarget.Target{Weight: ai.LocalityWeight, ChildPolicy: lbCfg.ChildPolicy}
}
wtCfg := &weightedtarget.LBConfig{Targets: weightedTargets}
wtCfgJSON, err := json.Marshal(wtCfg)
if err != nil {
// Shouldn't happen.
return fmt.Errorf("error marshalling prepared wtCfg: %v", wtCfg)
}
var scLBCfg serviceconfig.LoadBalancingConfig
if scLBCfg, err = b.configParser.ParseConfig(wtCfgJSON); err != nil {
return fmt.Errorf("config generated %v by wrr_locality_experimental is invalid: %v", wtCfgJSON, err)
}

return b.child.UpdateClientConnState(balancer.ClientConnState{
ResolverState: s.ResolverState,
BalancerConfig: wtCfg,
BalancerConfig: scLBCfg,
})
}

Expand Down
7 changes: 7 additions & 0 deletions xds/internal/balancer/wrrlocality/balancer_test.go
Expand Up @@ -158,6 +158,13 @@ func (s) TestUpdateClientConnState(t *testing.T) {
// child.
weightedTargetName = "mock_weighted_target"
stub.Register("mock_weighted_target", stub.BalancerFuncs{
ParseConfig: func(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
var cfg weightedtarget.LBConfig
if err := json.Unmarshal(c, &cfg); err != nil {
return nil, err
}
return &cfg, nil
},
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
wtCfg, ok := ccs.BalancerConfig.(*weightedtarget.LBConfig)
if !ok {
Expand Down

0 comments on commit 119986e

Please sign in to comment.