Skip to content

Commit

Permalink
xds/internal/balancer/outlierdetection: Add Channelz Logger to Outlie…
Browse files Browse the repository at this point in the history
…r Detection LB (#6145)
  • Loading branch information
s-matyukevich committed Apr 25, 2023
1 parent 83c460b commit 8628e07
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 13 deletions.
46 changes: 34 additions & 12 deletions xds/internal/balancer/outlierdetection/balancer.go
Expand Up @@ -26,6 +26,7 @@ import (
"errors"
"fmt"
"math"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -35,6 +36,7 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/balancer/gracefulswitch"
"google.golang.org/grpc/internal/buffer"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcrand"
Expand Down Expand Up @@ -62,13 +64,14 @@ type bb struct{}

func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
b := &outlierDetectionBalancer{
cc: cc,
closed: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
addrs: make(map[string]*addressInfo),
scWrappers: make(map[balancer.SubConn]*subConnWrapper),
scUpdateCh: buffer.NewUnbounded(),
pickerUpdateCh: buffer.NewUnbounded(),
cc: cc,
closed: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
addrs: make(map[string]*addressInfo),
scWrappers: make(map[balancer.SubConn]*subConnWrapper),
scUpdateCh: buffer.NewUnbounded(),
pickerUpdateCh: buffer.NewUnbounded(),
channelzParentID: bOpts.ChannelzParentID,
}
b.logger = prefixLogger(b)
b.logger.Infof("Created")
Expand Down Expand Up @@ -159,10 +162,11 @@ type outlierDetectionBalancer struct {
// to suppress redundant picker updates.
recentPickerNoop bool

closed *grpcsync.Event
done *grpcsync.Event
cc balancer.ClientConn
logger *grpclog.PrefixLogger
closed *grpcsync.Event
done *grpcsync.Event
cc balancer.ClientConn
logger *grpclog.PrefixLogger
channelzParentID *channelz.Identifier

// childMu guards calls into child (to uphold the balancer.Balancer API
// guarantee of synchronous calls).
Expand Down Expand Up @@ -822,7 +826,10 @@ func (b *outlierDetectionBalancer) successRateAlgorithm() {
return
}
successRate := float64(bucket.numSuccesses) / float64(bucket.numSuccesses+bucket.numFailures)
if successRate < (mean - stddev*(float64(ejectionCfg.StdevFactor)/1000)) {
requiredSuccessRate := mean - stddev*(float64(ejectionCfg.StdevFactor)/1000)
if successRate < requiredSuccessRate {
channelz.Infof(logger, b.channelzParentID, "SuccessRate algorithm detected outlier: %s. Parameters: successRate=%f, mean=%f, stddev=%f, requiredSuccessRate=%f",
addrInfo.string(), successRate, mean, stddev, requiredSuccessRate)
if uint32(grpcrand.Int31n(100)) < ejectionCfg.EnforcementPercentage {
b.ejectAddress(addrInfo)
}
Expand All @@ -849,6 +856,8 @@ func (b *outlierDetectionBalancer) failurePercentageAlgorithm() {
}
failurePercentage := (float64(bucket.numFailures) / float64(bucket.numSuccesses+bucket.numFailures)) * 100
if failurePercentage > float64(b.cfg.FailurePercentageEjection.Threshold) {
channelz.Infof(logger, b.channelzParentID, "FailurePercentage algorithm detected outlier: %s, failurePercentage=%f",
addrInfo.string(), failurePercentage)
if uint32(grpcrand.Int31n(100)) < ejectionCfg.EnforcementPercentage {
b.ejectAddress(addrInfo)
}
Expand All @@ -863,7 +872,9 @@ func (b *outlierDetectionBalancer) ejectAddress(addrInfo *addressInfo) {
addrInfo.ejectionTimeMultiplier++
for _, sbw := range addrInfo.sws {
sbw.eject()
channelz.Infof(logger, b.channelzParentID, "Subchannel ejected: %s", sbw.string())
}

}

// Caller must hold b.mu.
Expand All @@ -872,6 +883,7 @@ func (b *outlierDetectionBalancer) unejectAddress(addrInfo *addressInfo) {
addrInfo.latestEjectionTimestamp = time.Time{}
for _, sbw := range addrInfo.sws {
sbw.uneject()
channelz.Infof(logger, b.channelzParentID, "Subchannel unejected: %s", sbw.string())
}
}

Expand All @@ -896,6 +908,16 @@ type addressInfo struct {
sws []*subConnWrapper
}

func (a *addressInfo) string() string {
var res strings.Builder
res.WriteString("[")
for _, sw := range a.sws {
res.WriteString(sw.string())
}
res.WriteString("]")
return res.String()
}

func newAddressInfo() *addressInfo {
return &addressInfo{
callCounter: newCallCounter(),
Expand Down
3 changes: 2 additions & 1 deletion xds/internal/balancer/outlierdetection/balancer_test.go
Expand Up @@ -34,6 +34,7 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/grpctest"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
Expand Down Expand Up @@ -304,7 +305,7 @@ func setup(t *testing.T) (*outlierDetectionBalancer, *testutils.TestClientConn,
t.Fatalf("balancer.Get(%q) returned nil", Name)
}
tcc := testutils.NewTestClientConn(t)
odB := builder.Build(tcc, balancer.BuildOptions{})
odB := builder.Build(tcc, balancer.BuildOptions{ChannelzParentID: channelz.NewIdentifierForTesting(channelz.RefChannel, time.Now().Unix(), nil)})
return odB.(*outlierDetectionBalancer), tcc, odB.Close
}

Expand Down
5 changes: 5 additions & 0 deletions xds/internal/balancer/outlierdetection/subconn_wrapper.go
Expand Up @@ -18,6 +18,7 @@
package outlierdetection

import (
"fmt"
"unsafe"

"google.golang.org/grpc/balancer"
Expand Down Expand Up @@ -66,3 +67,7 @@ func (scw *subConnWrapper) uneject() {
isEjected: false,
})
}

func (scw *subConnWrapper) string() string {
return fmt.Sprintf("%+v", scw.addresses)
}

0 comments on commit 8628e07

Please sign in to comment.