Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds/outlierdetection: fix config handling #6361

Merged
merged 6 commits into from Jun 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
62 changes: 62 additions & 0 deletions internal/balancer/nop/nop.go
@@ -0,0 +1,62 @@
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Package nop implements a balancer with all of its balancer operations as
// no-ops, other than returning a Transient Failure Picker on a Client Conn
// update.
package nop

import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/connectivity"
)

// bal is a balancer with all of its balancer operations as no-ops, other than
// returning a Transient Failure Picker on a Client Conn update.
type bal struct {
cc balancer.ClientConn
err error
}

// NewBalancer returns a no-op balancer.
func NewBalancer(cc balancer.ClientConn, err error) balancer.Balancer {
return &bal{
cc: cc,
err: err,
}
}

// UpdateClientConnState updates the bal's Client Conn with an Error Picker
// and a Connectivity State of TRANSIENT_FAILURE.
func (b *bal) UpdateClientConnState(_ balancer.ClientConnState) error {
b.cc.UpdateState(balancer.State{
Picker: base.NewErrPicker(b.err),
ConnectivityState: connectivity.TransientFailure,
})
return nil
}

// ResolverError is a no-op.
func (b *bal) ResolverError(_ error) {}

// UpdateSubConnState is a no-op.
func (b *bal) UpdateSubConnState(_ balancer.SubConn, _ balancer.SubConnState) {}

// Close is a no-op.
func (b *bal) Close() {}
119 changes: 103 additions & 16 deletions test/xds/xds_client_outlier_detection_test.go
Expand Up @@ -90,33 +90,24 @@ func (s) TestOutlierDetection_NoopConfig(t *testing.T) {
// clientResourcesMultipleBackendsAndOD returns xDS resources which correspond
// to multiple upstreams, corresponding different backends listening on
// different localhost:port combinations. The resources also configure an
// Outlier Detection Balancer set up with Failure Percentage Algorithm, which
// ejects endpoints based on failure rate.
func clientResourcesMultipleBackendsAndOD(params e2e.ResourceParams, ports []uint32) e2e.UpdateOptions {
// Outlier Detection Balancer configured through the passed in Outlier Detection
// proto.
func clientResourcesMultipleBackendsAndOD(params e2e.ResourceParams, ports []uint32, od *v3clusterpb.OutlierDetection) e2e.UpdateOptions {
routeConfigName := "route-" + params.DialTarget
clusterName := "cluster-" + params.DialTarget
endpointsName := "endpoints-" + params.DialTarget
return e2e.UpdateOptions{
NodeID: params.NodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(params.DialTarget, routeConfigName)},
Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeConfigName, params.DialTarget, clusterName)},
Clusters: []*v3clusterpb.Cluster{clusterWithOutlierDetection(clusterName, endpointsName, params.SecLevel)},
Clusters: []*v3clusterpb.Cluster{clusterWithOutlierDetection(clusterName, endpointsName, params.SecLevel, od)},
Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(endpointsName, params.Host, ports)},
}
}

func clusterWithOutlierDetection(clusterName, edsServiceName string, secLevel e2e.SecurityLevel) *v3clusterpb.Cluster {
func clusterWithOutlierDetection(clusterName, edsServiceName string, secLevel e2e.SecurityLevel, od *v3clusterpb.OutlierDetection) *v3clusterpb.Cluster {
cluster := e2e.DefaultCluster(clusterName, edsServiceName, secLevel)
cluster.OutlierDetection = &v3clusterpb.OutlierDetection{
Interval: &durationpb.Duration{Nanos: 50000000}, // .5 seconds
BaseEjectionTime: &durationpb.Duration{Seconds: 30},
MaxEjectionTime: &durationpb.Duration{Seconds: 300},
MaxEjectionPercent: &wrapperspb.UInt32Value{Value: 1},
FailurePercentageThreshold: &wrapperspb.UInt32Value{Value: 50},
EnforcingFailurePercentage: &wrapperspb.UInt32Value{Value: 100},
FailurePercentageRequestVolume: &wrapperspb.UInt32Value{Value: 8},
FailurePercentageMinimumHosts: &wrapperspb.UInt32Value{Value: 3},
}
cluster.OutlierDetection = od
return cluster
}

Expand Down Expand Up @@ -197,7 +188,103 @@ func (s) TestOutlierDetectionWithOutlier(t *testing.T) {
NodeID: nodeID,
Host: "localhost",
SecLevel: e2e.SecurityLevelNone,
}, []uint32{port1, port2, port3})
}, []uint32{port1, port2, port3}, &v3clusterpb.OutlierDetection{
Interval: &durationpb.Duration{Nanos: 50000000}, // .5 seconds
BaseEjectionTime: &durationpb.Duration{Seconds: 30},
MaxEjectionTime: &durationpb.Duration{Seconds: 300},
MaxEjectionPercent: &wrapperspb.UInt32Value{Value: 1},
FailurePercentageThreshold: &wrapperspb.UInt32Value{Value: 50},
EnforcingFailurePercentage: &wrapperspb.UInt32Value{Value: 100},
FailurePercentageRequestVolume: &wrapperspb.UInt32Value{Value: 8},
FailurePercentageMinimumHosts: &wrapperspb.UInt32Value{Value: 3},
})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

cc, err := grpc.Dial(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc.Close()

client := testgrpc.NewTestServiceClient(cc)

fullAddresses := []resolver.Address{
{Addr: backend1.Address},
{Addr: backend2.Address},
{Addr: backend3.Address},
}
// At first, due to no statistics on each of the backends, the 3
// upstreams should all be round robined across.
if err = checkRoundRobinRPCs(ctx, client, fullAddresses); err != nil {
t.Fatalf("error in expected round robin: %v", err)
}

// The addresses which don't return errors.
okAddresses := []resolver.Address{
{Addr: backend1.Address},
{Addr: backend2.Address},
}
// After calling the three upstreams, one of them constantly error
// and should eventually be ejected for a period of time. This
// period of time should cause the RPC's to be round robined only
// across the two that are healthy.
if err = checkRoundRobinRPCs(ctx, client, okAddresses); err != nil {
t.Fatalf("error in expected round robin: %v", err)
}
}

// TestOutlierDetectionXDSDefaultOn tests that Outlier Detection is by default
// configured on in the xDS Flow. If the Outlier Detection proto message is
// present with SuccessRateEjection unset, then Outlier Detection should be
// turned on. The test setups and xDS system with xDS resources with Outlier
// Detection present in the CDS update, but with SuccessRateEjection unset, and
// asserts that Outlier Detection is turned on and ejects upstreams.
func (s) TestOutlierDetectionXDSDefaultOn(t *testing.T) {
managementServer, nodeID, _, r, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
defer cleanup()

// Working backend 1.
backend1 := stubserver.StartTestService(t, nil)
port1 := testutils.ParsePort(t, backend1.Address)
defer backend1.Stop()

// Working backend 2.
backend2 := stubserver.StartTestService(t, nil)
port2 := testutils.ParsePort(t, backend2.Address)
defer backend2.Stop()

// Backend 3 that will always return an error and eventually ejected.
backend3 := stubserver.StartTestService(t, &stubserver.StubServer{
EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return nil, errors.New("some error") },
})
port3 := testutils.ParsePort(t, backend3.Address)
defer backend3.Stop()

// Configure CDS resources with Outlier Detection set but
// EnforcingSuccessRate unset. This should cause Outlier Detection to be
// configured with SuccessRateEjection present in configuration, which will
// eventually be populated with its default values along with the knobs set
// as SuccessRate fields in the proto, and thus Outlier Detection should be
// on and actively eject upstreams.
const serviceName = "my-service-client-side-xds"
resources := clientResourcesMultipleBackendsAndOD(e2e.ResourceParams{
DialTarget: serviceName,
NodeID: nodeID,
Host: "localhost",
SecLevel: e2e.SecurityLevelNone,
}, []uint32{port1, port2, port3}, &v3clusterpb.OutlierDetection{
// Need to set knobs to trigger ejection within the test time frame.
Interval: &durationpb.Duration{Nanos: 50000000},
// EnforcingSuccessRateSet to nil, causes success rate algorithm to be
// turned on.
SuccessRateMinimumHosts: &wrapperspb.UInt32Value{Value: 1},
SuccessRateRequestVolume: &wrapperspb.UInt32Value{Value: 8},
SuccessRateStdevFactor: &wrapperspb.UInt32Value{Value: 1},
})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := managementServer.Update(ctx, resources); err != nil {
Expand Down
101 changes: 42 additions & 59 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
Expand Up @@ -27,17 +27,16 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/tls/certprovider"
"google.golang.org/grpc/internal/balancer/nop"
"google.golang.org/grpc/internal/buffer"
xdsinternal "google.golang.org/grpc/internal/credentials/xds"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/pretty"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/clusterresolver"
"google.golang.org/grpc/xds/internal/balancer/outlierdetection"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)
Expand Down Expand Up @@ -75,11 +74,25 @@ type bb struct{}

// Build creates a new CDS balancer with the ClientConn.
func (bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
builder := balancer.Get(clusterresolver.Name)
if builder == nil {
// Shouldn't happen, registered through imported Cluster Resolver,
// defensive programming.
logger.Errorf("%q LB policy is needed but not registered", clusterresolver.Name)
return nop.NewBalancer(cc, fmt.Errorf("%q LB policy is needed but not registered", clusterresolver.Name))
}
crParser, ok := builder.(balancer.ConfigParser)
if !ok {
// Shouldn't happen, imported Cluster Resolver builder has this method.
logger.Errorf("%q LB policy does not implement a config parser", clusterresolver.Name)
return nop.NewBalancer(cc, fmt.Errorf("%q LB policy does not implement a config parser", clusterresolver.Name))
}
b := &cdsBalancer{
bOpts: opts,
updateCh: buffer.NewUnbounded(),
closed: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
crParser: crParser,
xdsHI: xdsinternal.NewHandshakeInfo(nil, nil),
}
b.logger = prefixLogger((b))
Expand Down Expand Up @@ -160,6 +173,7 @@ type cdsBalancer struct {
logger *grpclog.PrefixLogger
closed *grpcsync.Event
done *grpcsync.Event
crParser balancer.ConfigParser

// The certificate providers are cached here to that they can be closed when
// a new provider is to be created.
Expand Down Expand Up @@ -271,52 +285,6 @@ func buildProviderFunc(configs map[string]*certprovider.BuildableConfig, instanc
return provider, nil
}

func outlierDetectionToConfig(od *xdsresource.OutlierDetection) outlierdetection.LBConfig { // Already validated - no need to return error
if od == nil {
// "If the outlier_detection field is not set in the Cluster message, a
// "no-op" outlier_detection config will be generated, with interval set
// to the maximum possible value and all other fields unset." - A50
return outlierdetection.LBConfig{
Interval: 1<<63 - 1,
}
}

// "if the enforcing_success_rate field is set to 0, the config
// success_rate_ejection field will be null and all success_rate_* fields
// will be ignored." - A50
var sre *outlierdetection.SuccessRateEjection
if od.EnforcingSuccessRate != 0 {
sre = &outlierdetection.SuccessRateEjection{
StdevFactor: od.SuccessRateStdevFactor,
EnforcementPercentage: od.EnforcingSuccessRate,
MinimumHosts: od.SuccessRateMinimumHosts,
RequestVolume: od.SuccessRateRequestVolume,
}
}

// "If the enforcing_failure_percent field is set to 0 or null, the config
// failure_percent_ejection field will be null and all failure_percent_*
// fields will be ignored." - A50
var fpe *outlierdetection.FailurePercentageEjection
if od.EnforcingFailurePercentage != 0 {
fpe = &outlierdetection.FailurePercentageEjection{
Threshold: od.FailurePercentageThreshold,
EnforcementPercentage: od.EnforcingFailurePercentage,
MinimumHosts: od.FailurePercentageMinimumHosts,
RequestVolume: od.FailurePercentageRequestVolume,
}
}

return outlierdetection.LBConfig{
Interval: internalserviceconfig.Duration(od.Interval),
BaseEjectionTime: internalserviceconfig.Duration(od.BaseEjectionTime),
MaxEjectionTime: internalserviceconfig.Duration(od.MaxEjectionTime),
MaxEjectionPercent: od.MaxEjectionPercent,
SuccessRateEjection: sre,
FailurePercentageEjection: fpe,
}
}

// handleWatchUpdate handles a watch update from the xDS Client. Good updates
// lead to clientConn updates being invoked on the underlying cluster_resolver balancer.
func (b *cdsBalancer) handleWatchUpdate(update clusterHandlerUpdate) {
Expand Down Expand Up @@ -390,28 +358,43 @@ func (b *cdsBalancer) handleWatchUpdate(update clusterHandlerUpdate) {
b.logger.Infof("Unexpected cluster type %v when handling update from cluster handler", cu.ClusterType)
}
if envconfig.XDSOutlierDetection {
dms[i].OutlierDetection = outlierDetectionToConfig(cu.OutlierDetection)
odJSON := cu.OutlierDetection
// "In the cds LB policy, if the outlier_detection field is not set in
// the Cluster resource, a "no-op" outlier_detection config will be
// generated in the corresponding DiscoveryMechanism config, with all
// fields unset." - A50
if odJSON == nil {
// This will pick up top level defaults in Cluster Resolver
// ParseConfig, but sre and fpe will be nil still so still a
// "no-op" config.
odJSON = json.RawMessage(`{}`)
}
Comment on lines +362 to +371
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this logic be moved to where we produce the JSON OD config from the proto instead? This is part of converting from xds OD config to OD's JSON config.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, unfortunately not, because the language in the gRFC explicitly states "in the cds lb policy". The issue was I mapped that language to the paragraph following to. We triaged this, and this was the only behavior scoped to the cds lb policy. @murgatroid99

dms[i].OutlierDetection = odJSON
}
}

// Prepare Cluster Resolver config, marshal into JSON, and then Parse it to
// get configuration to send downward to Cluster Resolver.
lbCfg := &clusterresolver.LBConfig{
DiscoveryMechanisms: dms,
XDSLBPolicy: update.lbPolicy,
}
crLBCfgJSON, err := json.Marshal(lbCfg)
if err != nil {
// Shouldn't happen, since we just prepared struct.
b.logger.Errorf("cds_balancer: error marshalling prepared config: %v", lbCfg)
return
}

bc := &internalserviceconfig.BalancerConfig{}
if err := json.Unmarshal(update.lbPolicy, bc); err != nil {
// This will never occur, valid configuration is emitted from the xDS
// Client. Validity is already checked in the xDS Client, however, this
// double validation is present because Unmarshalling and Validating are
// coupled into one json.Unmarshal operation). We will switch this in
// the future to two separate operations.
b.logger.Errorf("Emitted lbPolicy %s from xDS Client is invalid: %v", update.lbPolicy, err)
var sc serviceconfig.LoadBalancingConfig
if sc, err = b.crParser.ParseConfig(crLBCfgJSON); err != nil {
b.logger.Errorf("cds_balancer: cluster_resolver config generated %v is invalid: %v", crLBCfgJSON, err)
return
}
lbCfg.XDSLBPolicy = bc

ccState := balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, b.xdsClient),
BalancerConfig: lbCfg,
BalancerConfig: sc,
}
if err := b.childLB.UpdateClientConnState(ccState); err != nil {
b.logger.Errorf("Encountered error when sending config {%+v} to child policy: %v", ccState, err)
Expand Down