xds/outlierdetection: fix config handling (grpc#6361)
zasweq committed Jun 9, 2023
1 parent a5ae5c6 commit 118e316
Showing 20 changed files with 1,119 additions and 605 deletions.
62 changes: 62 additions & 0 deletions internal/balancer/nop/nop.go
@@ -0,0 +1,62 @@
/*
 *
 * Copyright 2023 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

// Package nop implements a balancer with all of its balancer operations as
// no-ops, other than returning a Transient Failure Picker on a Client Conn
// update.
package nop

import (
	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/balancer/base"
	"google.golang.org/grpc/connectivity"
)

// bal is a balancer with all of its balancer operations as no-ops, other than
// returning a Transient Failure Picker on a Client Conn update.
type bal struct {
	cc  balancer.ClientConn
	err error
}

// NewBalancer returns a no-op balancer.
func NewBalancer(cc balancer.ClientConn, err error) balancer.Balancer {
	return &bal{
		cc:  cc,
		err: err,
	}
}

// UpdateClientConnState updates the bal's Client Conn with an Error Picker
// and a Connectivity State of TRANSIENT_FAILURE.
func (b *bal) UpdateClientConnState(_ balancer.ClientConnState) error {
	b.cc.UpdateState(balancer.State{
		Picker:            base.NewErrPicker(b.err),
		ConnectivityState: connectivity.TransientFailure,
	})
	return nil
}

// ResolverError is a no-op.
func (b *bal) ResolverError(_ error) {}

// UpdateSubConnState is a no-op.
func (b *bal) UpdateSubConnState(_ balancer.SubConn, _ balancer.SubConnState) {}

// Close is a no-op.
func (b *bal) Close() {}
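
A usage note (editorial, not part of the commit's diff): the sketch below shows
how a balancer builder can fall back to this no-op balancer when a required
dependency is missing at Build time, mirroring the cdsbalancer change further
down. The "example_policy" builder and its "round_robin" dependency are
hypothetical, and since the nop package lives under internal/, it is only
importable from within the grpc module itself.

package example

import (
	"fmt"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/internal/balancer/nop"
)

type exampleBuilder struct{}

func (exampleBuilder) Name() string { return "example_policy" }

func (exampleBuilder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
	// "round_robin" stands in for whatever LB policy this builder depends on.
	if balancer.Get("round_robin") == nil {
		// Fail every RPC with a descriptive error instead of returning nil.
		return nop.NewBalancer(cc, fmt.Errorf("%q LB policy is needed but not registered", "round_robin"))
	}
	// ... construct and return the real balancer here; elided in this sketch.
	return nop.NewBalancer(cc, fmt.Errorf("example sketch only"))
}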
119 changes: 103 additions & 16 deletions test/xds/xds_client_outlier_detection_test.go
@@ -90,33 +90,24 @@ func (s) TestOutlierDetection_NoopConfig(t *testing.T) {
// clientResourcesMultipleBackendsAndOD returns xDS resources which correspond
// to multiple upstreams, corresponding to different backends listening on
// different localhost:port combinations. The resources also configure an
// Outlier Detection Balancer set up with Failure Percentage Algorithm, which
// ejects endpoints based on failure rate.
func clientResourcesMultipleBackendsAndOD(params e2e.ResourceParams, ports []uint32) e2e.UpdateOptions {
// Outlier Detection Balancer configured through the passed-in Outlier
// Detection proto.
func clientResourcesMultipleBackendsAndOD(params e2e.ResourceParams, ports []uint32, od *v3clusterpb.OutlierDetection) e2e.UpdateOptions {
	routeConfigName := "route-" + params.DialTarget
	clusterName := "cluster-" + params.DialTarget
	endpointsName := "endpoints-" + params.DialTarget
	return e2e.UpdateOptions{
		NodeID:    params.NodeID,
		Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(params.DialTarget, routeConfigName)},
		Routes:    []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeConfigName, params.DialTarget, clusterName)},
		Clusters:  []*v3clusterpb.Cluster{clusterWithOutlierDetection(clusterName, endpointsName, params.SecLevel)},
		Clusters:  []*v3clusterpb.Cluster{clusterWithOutlierDetection(clusterName, endpointsName, params.SecLevel, od)},
		Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(endpointsName, params.Host, ports)},
	}
}

func clusterWithOutlierDetection(clusterName, edsServiceName string, secLevel e2e.SecurityLevel) *v3clusterpb.Cluster {
func clusterWithOutlierDetection(clusterName, edsServiceName string, secLevel e2e.SecurityLevel, od *v3clusterpb.OutlierDetection) *v3clusterpb.Cluster {
	cluster := e2e.DefaultCluster(clusterName, edsServiceName, secLevel)
	cluster.OutlierDetection = &v3clusterpb.OutlierDetection{
		Interval:                       &durationpb.Duration{Nanos: 50000000}, // .5 seconds
		BaseEjectionTime:               &durationpb.Duration{Seconds: 30},
		MaxEjectionTime:                &durationpb.Duration{Seconds: 300},
		MaxEjectionPercent:             &wrapperspb.UInt32Value{Value: 1},
		FailurePercentageThreshold:     &wrapperspb.UInt32Value{Value: 50},
		EnforcingFailurePercentage:     &wrapperspb.UInt32Value{Value: 100},
		FailurePercentageRequestVolume: &wrapperspb.UInt32Value{Value: 8},
		FailurePercentageMinimumHosts:  &wrapperspb.UInt32Value{Value: 3},
	}
	cluster.OutlierDetection = od
	return cluster
}

@@ -197,7 +188,103 @@ func (s) TestOutlierDetectionWithOutlier(t *testing.T) {
		NodeID:   nodeID,
		Host:     "localhost",
		SecLevel: e2e.SecurityLevelNone,
	}, []uint32{port1, port2, port3})
	}, []uint32{port1, port2, port3}, &v3clusterpb.OutlierDetection{
		Interval:                       &durationpb.Duration{Nanos: 50000000}, // .5 seconds
		BaseEjectionTime:               &durationpb.Duration{Seconds: 30},
		MaxEjectionTime:                &durationpb.Duration{Seconds: 300},
		MaxEjectionPercent:             &wrapperspb.UInt32Value{Value: 1},
		FailurePercentageThreshold:     &wrapperspb.UInt32Value{Value: 50},
		EnforcingFailurePercentage:     &wrapperspb.UInt32Value{Value: 100},
		FailurePercentageRequestVolume: &wrapperspb.UInt32Value{Value: 8},
		FailurePercentageMinimumHosts:  &wrapperspb.UInt32Value{Value: 3},
	})
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	if err := managementServer.Update(ctx, resources); err != nil {
		t.Fatal(err)
	}

	cc, err := grpc.Dial(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
	if err != nil {
		t.Fatalf("failed to dial local test server: %v", err)
	}
	defer cc.Close()

	client := testgrpc.NewTestServiceClient(cc)

	fullAddresses := []resolver.Address{
		{Addr: backend1.Address},
		{Addr: backend2.Address},
		{Addr: backend3.Address},
	}
	// At first, since there are no statistics for any of the backends, RPCs
	// should be round robined across all three upstreams.
	if err = checkRoundRobinRPCs(ctx, client, fullAddresses); err != nil {
		t.Fatalf("error in expected round robin: %v", err)
	}

	// The addresses which don't return errors.
	okAddresses := []resolver.Address{
		{Addr: backend1.Address},
		{Addr: backend2.Address},
	}
	// After calling the three upstreams, one of them constantly errors and
	// should eventually be ejected for a period of time. During that period,
	// RPCs should be round robined across only the two healthy upstreams.
	if err = checkRoundRobinRPCs(ctx, client, okAddresses); err != nil {
		t.Fatalf("error in expected round robin: %v", err)
	}
}

// TestOutlierDetectionXDSDefaultOn tests that Outlier Detection is on by
// default in the xDS flow. If the Outlier Detection proto message is present
// with EnforcingSuccessRate unset, then Outlier Detection should be turned
// on. The test sets up an xDS system whose resources carry Outlier Detection
// in the CDS update, but with EnforcingSuccessRate unset, and asserts that
// Outlier Detection is turned on and ejects upstreams.
func (s) TestOutlierDetectionXDSDefaultOn(t *testing.T) {
	managementServer, nodeID, _, r, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
	defer cleanup()

	// Working backend 1.
	backend1 := stubserver.StartTestService(t, nil)
	port1 := testutils.ParsePort(t, backend1.Address)
	defer backend1.Stop()

	// Working backend 2.
	backend2 := stubserver.StartTestService(t, nil)
	port2 := testutils.ParsePort(t, backend2.Address)
	defer backend2.Stop()

	// Backend 3 will always return an error and should eventually be ejected.
	backend3 := stubserver.StartTestService(t, &stubserver.StubServer{
		EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return nil, errors.New("some error") },
	})
	port3 := testutils.ParsePort(t, backend3.Address)
	defer backend3.Stop()

	// Configure CDS resources with Outlier Detection set but
	// EnforcingSuccessRate unset. This should produce an Outlier Detection
	// configuration with SuccessRateEjection present, which will eventually
	// be populated with its default values along with the SuccessRate knobs
	// set in the proto, and thus Outlier Detection should be on and should
	// actively eject upstreams.
	const serviceName = "my-service-client-side-xds"
	resources := clientResourcesMultipleBackendsAndOD(e2e.ResourceParams{
		DialTarget: serviceName,
		NodeID:     nodeID,
		Host:       "localhost",
		SecLevel:   e2e.SecurityLevelNone,
	}, []uint32{port1, port2, port3}, &v3clusterpb.OutlierDetection{
		// Need to set knobs to trigger ejection within the test time frame.
		Interval: &durationpb.Duration{Nanos: 50000000},
		// EnforcingSuccessRate is left unset (nil), which causes the success
		// rate algorithm to be turned on.
		SuccessRateMinimumHosts:  &wrapperspb.UInt32Value{Value: 1},
		SuccessRateRequestVolume: &wrapperspb.UInt32Value{Value: 8},
		SuccessRateStdevFactor:   &wrapperspb.UInt32Value{Value: 1},
	})
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	if err := managementServer.Update(ctx, resources); err != nil {
101 changes: 42 additions & 59 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
@@ -27,17 +27,16 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/tls/certprovider"
"google.golang.org/grpc/internal/balancer/nop"
"google.golang.org/grpc/internal/buffer"
xdsinternal "google.golang.org/grpc/internal/credentials/xds"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/pretty"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/clusterresolver"
"google.golang.org/grpc/xds/internal/balancer/outlierdetection"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)
@@ -75,11 +74,25 @@ type bb struct{}

// Build creates a new CDS balancer with the ClientConn.
func (bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
	builder := balancer.Get(clusterresolver.Name)
	if builder == nil {
		// Shouldn't happen, since the Cluster Resolver is registered through
		// an import; this check is defensive programming.
		logger.Errorf("%q LB policy is needed but not registered", clusterresolver.Name)
		return nop.NewBalancer(cc, fmt.Errorf("%q LB policy is needed but not registered", clusterresolver.Name))
	}
	crParser, ok := builder.(balancer.ConfigParser)
	if !ok {
		// Shouldn't happen, since the imported Cluster Resolver builder
		// implements this method.
		logger.Errorf("%q LB policy does not implement a config parser", clusterresolver.Name)
		return nop.NewBalancer(cc, fmt.Errorf("%q LB policy does not implement a config parser", clusterresolver.Name))
	}
	b := &cdsBalancer{
		bOpts:    opts,
		updateCh: buffer.NewUnbounded(),
		closed:   grpcsync.NewEvent(),
		done:     grpcsync.NewEvent(),
		crParser: crParser,
		xdsHI:    xdsinternal.NewHandshakeInfo(nil, nil),
	}
	b.logger = prefixLogger(b)
@@ -160,6 +173,7 @@ type cdsBalancer struct {
	logger   *grpclog.PrefixLogger
	closed   *grpcsync.Event
	done     *grpcsync.Event
	crParser balancer.ConfigParser

	// The certificate providers are cached here so that they can be closed
	// when a new provider is to be created.
@@ -271,52 +285,6 @@ func buildProviderFunc(configs map[string]*certprovider.BuildableConfig, instanc
	return provider, nil
}

func outlierDetectionToConfig(od *xdsresource.OutlierDetection) outlierdetection.LBConfig { // Already validated - no need to return error
	if od == nil {
		// "If the outlier_detection field is not set in the Cluster message, a
		// "no-op" outlier_detection config will be generated, with interval set
		// to the maximum possible value and all other fields unset." - A50
		return outlierdetection.LBConfig{
			Interval: 1<<63 - 1,
		}
	}

	// "if the enforcing_success_rate field is set to 0, the config
	// success_rate_ejection field will be null and all success_rate_* fields
	// will be ignored." - A50
	var sre *outlierdetection.SuccessRateEjection
	if od.EnforcingSuccessRate != 0 {
		sre = &outlierdetection.SuccessRateEjection{
			StdevFactor:           od.SuccessRateStdevFactor,
			EnforcementPercentage: od.EnforcingSuccessRate,
			MinimumHosts:          od.SuccessRateMinimumHosts,
			RequestVolume:         od.SuccessRateRequestVolume,
		}
	}

	// "If the enforcing_failure_percent field is set to 0 or null, the config
	// failure_percent_ejection field will be null and all failure_percent_*
	// fields will be ignored." - A50
	var fpe *outlierdetection.FailurePercentageEjection
	if od.EnforcingFailurePercentage != 0 {
		fpe = &outlierdetection.FailurePercentageEjection{
			Threshold:             od.FailurePercentageThreshold,
			EnforcementPercentage: od.EnforcingFailurePercentage,
			MinimumHosts:          od.FailurePercentageMinimumHosts,
			RequestVolume:         od.FailurePercentageRequestVolume,
		}
	}

	return outlierdetection.LBConfig{
		Interval:                  internalserviceconfig.Duration(od.Interval),
		BaseEjectionTime:          internalserviceconfig.Duration(od.BaseEjectionTime),
		MaxEjectionTime:           internalserviceconfig.Duration(od.MaxEjectionTime),
		MaxEjectionPercent:        od.MaxEjectionPercent,
		SuccessRateEjection:       sre,
		FailurePercentageEjection: fpe,
	}
}

// handleWatchUpdate handles a watch update from the xDS Client. Good updates
// lead to clientConn updates being invoked on the underlying cluster_resolver balancer.
func (b *cdsBalancer) handleWatchUpdate(update clusterHandlerUpdate) {
@@ -390,28 +358,43 @@ func (b *cdsBalancer) handleWatchUpdate(update clusterHandlerUpdate) {
b.logger.Infof("Unexpected cluster type %v when handling update from cluster handler", cu.ClusterType)
}
if envconfig.XDSOutlierDetection {
dms[i].OutlierDetection = outlierDetectionToConfig(cu.OutlierDetection)
odJSON := cu.OutlierDetection
// "In the cds LB policy, if the outlier_detection field is not set in
// the Cluster resource, a "no-op" outlier_detection config will be
// generated in the corresponding DiscoveryMechanism config, with all
// fields unset." - A50
if odJSON == nil {
// This will pick up top level defaults in Cluster Resolver
// ParseConfig, but sre and fpe will be nil still so still a
// "no-op" config.
odJSON = json.RawMessage(`{}`)
}
dms[i].OutlierDetection = odJSON
}
}

	// Prepare Cluster Resolver config, marshal into JSON, and then Parse it to
	// get the configuration to send downward to the Cluster Resolver.
	lbCfg := &clusterresolver.LBConfig{
		DiscoveryMechanisms: dms,
		XDSLBPolicy:         update.lbPolicy,
	}
	crLBCfgJSON, err := json.Marshal(lbCfg)
	if err != nil {
		// Shouldn't happen, since we just prepared the struct.
		b.logger.Errorf("cds_balancer: error marshalling prepared config: %v", lbCfg)
		return
	}

	bc := &internalserviceconfig.BalancerConfig{}
	if err := json.Unmarshal(update.lbPolicy, bc); err != nil {
		// This will never occur, since valid configuration is emitted from
		// the xDS Client. Validity is already checked in the xDS Client;
		// however, this double validation is present because Unmarshalling
		// and Validating are coupled into one json.Unmarshal operation. We
		// will switch this in the future to two separate operations.
		b.logger.Errorf("Emitted lbPolicy %s from xDS Client is invalid: %v", update.lbPolicy, err)
	var sc serviceconfig.LoadBalancingConfig
	if sc, err = b.crParser.ParseConfig(crLBCfgJSON); err != nil {
		b.logger.Errorf("cds_balancer: cluster_resolver config generated %v is invalid: %v", crLBCfgJSON, err)
		return
	}
	lbCfg.XDSLBPolicy = bc

	ccState := balancer.ClientConnState{
		ResolverState:  xdsclient.SetClient(resolver.State{}, b.xdsClient),
		BalancerConfig: lbCfg,
		BalancerConfig: sc,
	}
	if err := b.childLB.UpdateClientConnState(ccState); err != nil {
		b.logger.Errorf("Encountered error when sending config {%+v} to child policy: %v", ccState, err)
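An editorial aside on the odJSON handling above, not code from this commit:
per gRFC A50, a parsed Outlier Detection config is a "no-op" exactly when
both ejection sections are nil, regardless of the top-level defaults that the
cluster_resolver's ParseConfig fills in, which is why an unset Cluster
outlier_detection can safely be serialized as {}. The field names below
follow the outlierdetection.LBConfig used by the removed
outlierDetectionToConfig; the helper itself is hypothetical.

package example

import "google.golang.org/grpc/xds/internal/balancer/outlierdetection"

// isNoopConfig (hypothetical) captures the rule the new odJSON handling
// relies on: with SuccessRateEjection and FailurePercentageEjection both
// nil, no ejection algorithm ever runs, so an unset Cluster
// outlier_detection (serialized as `{}`) remains a no-op even after
// top-level defaults are applied.
func isNoopConfig(cfg *outlierdetection.LBConfig) bool {
	return cfg.SuccessRateEjection == nil && cfg.FailurePercentageEjection == nil
}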
