clusterresolver: comply with A37 for handling errors from discovery mechanisms #6461

Merged
merged 5 commits into from Jul 24, 2023
Changes from 3 commits
13 changes: 12 additions & 1 deletion xds/internal/balancer/clusterresolver/configbuilder.go
@@ -190,7 +190,18 @@ func buildClusterImplConfigForEDS(g *nameGenerator, edsResp xdsresource.Endpoint
})
}

priorities := groupLocalitiesByPriority(edsResp.Localities)
var priorities [][]xdsresource.Locality
// Triggered by a NACK or resource-not-found error before any update, or an
// empty localities list in an update. In either case we want to create a
// priority and send down an empty address list, causing TF for that priority.
if len(edsResp.Localities) == 0 {
// "If any discovery mechanism instance experiences an error retrieving
// data, and it has not previously reported any results, it should
// report a result that is a single priority with no endpoints." - A37
priorities = [][]xdsresource.Locality{{}}
} else {
priorities = groupLocalitiesByPriority(edsResp.Localities)
}
Contributor

Minor nit. You can shorten this even further:

priorities := [][]xdsresource.Locality{{}}
if len(edsResp.Localities) != 0 {
	priorities = groupLocalitiesByPriority(edsResp.Localities)
}

Contributor Author

Thanks for suggestion. Switched.

retNames := g.generate(priorities)
retConfigs := make(map[string]*clusterimpl.LBConfig, len(retNames))
var retAddrs []resolver.Address
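An aside to make the A37 behavior above concrete: a single empty priority means the generated config carries no addresses, so RPCs fail as UNAVAILABLE with an error mentioning "produced zero addresses", which is what the updated e2e tests below assert. A minimal sketch of that assertion (reusing the client, context, and imports already present in those test files):

// Sketch only: mirrors the assertions added in aggregate_cluster_test.go and
// eds_impl_test.go further down in this PR.
_, err := client.EmptyCall(ctx, &testpb.Empty{})
if code := status.Code(err); code != codes.Unavailable {
	t.Fatalf("EmptyCall() failed with code %s, want %s", code, codes.Unavailable)
}
if err == nil || !strings.Contains(err.Error(), "produced zero addresses") {
	t.Fatalf("EmptyCall() failed with error: %v, want: produced zero addresses", err)
}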
184 changes: 173 additions & 11 deletions xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go
@@ -18,6 +18,7 @@ package e2e_test

import (
"context"
"errors"
"fmt"
"sort"
"strings"
@@ -596,8 +597,8 @@ func (s) TestAggregateCluster_SwitchEDSAndDNS(t *testing.T) {
// DNS resolver yet. Once the DNS resolver pushes an update, the test verifies
// that we switch to the DNS cluster and can make a successful RPC. At this
// point when the DNS cluster returns an error, the test verifies that RPCs are
// still successful. This is the expected behavior because pick_first (the leaf
// policy) ignores resolver errors when it is not in TransientFailure.
// still successful. This is the expected behavior because the cluster resolver
// policy suppresses errors from the DNS resolver once it has received its first
// update, whether that was a good address list or an error.
func (s) TestAggregateCluster_BadEDS_GoodToBadDNS(t *testing.T) {
dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
defer cleanup1()
@@ -612,8 +613,8 @@ func (s) TestAggregateCluster_BadEDS_GoodToBadDNS(t *testing.T) {
addrs, _ := backendAddressesAndPorts(t, servers)

// Configure an aggregate cluster pointing to an EDS and LOGICAL_DNS
// cluster. Also configure an empty endpoints resource for the EDS cluster
// that contains no endpoints.
// cluster. Also configure an endpoints resource for the EDS cluster which
// triggers an NACK.
Contributor

Sorry: s/an NACK/a NACK/

Contributor Author

Hahaha, switched.

const (
edsClusterName = clusterName + "-eds"
dnsClusterName = clusterName + "-dns"
@@ -698,13 +699,174 @@ func (s) TestAggregateCluster_BadEDS_GoodToBadDNS(t *testing.T) {
}
}

// TestAggregateCluster_BadEDSFromError_GoodToBadDNS tests the case where the
// top-level cluster is an aggregate cluster that resolves to an EDS and
// LOGICAL_DNS cluster. The test first sends an EDS response which triggers a
// NACK. Once the DNS resolver pushes an update, the test verifies that we
// switch to the DNS cluster and can make a successful RPC.
func (s) TestAggregateCluster_BadEDSFromError_GoodToBadDNS(t *testing.T) {
dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
defer cleanup1()

// Start an xDS management server.
managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
defer cleanup2()

// Start two test backends.
servers, cleanup3 := startTestServiceBackends(t, 2)
defer cleanup3()
addrs, _ := backendAddressesAndPorts(t, servers)

// Configure an aggregate cluster pointing to an EDS and LOGICAL_DNS
// cluster. Also configure an endpoints resource for the EDS cluster that is
// expected to be NACKed, since it contains a locality with a load balancing
// weight of 0.
const (
edsClusterName = clusterName + "-eds"
dnsClusterName = clusterName + "-dns"
dnsHostName = "dns_host"
dnsPort = uint32(8080)
)
nackEndpointResource := e2e.DefaultEndpoint(edsServiceName, "localhost", nil)
nackEndpointResource.Endpoints = []*v3endpointpb.LocalityLbEndpoints{
{
LoadBalancingWeight: &wrapperspb.UInt32Value{
Value: 0, // causes a NACK
},
},
}
resources := e2e.UpdateOptions{
NodeID: nodeID,
Clusters: []*v3clusterpb.Cluster{
makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}),
e2e.DefaultCluster(edsClusterName, edsServiceName, e2e.SecurityLevelNone),
makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
},
Endpoints: []*v3endpointpb.ClusterLoadAssignment{nackEndpointResource},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

// Create xDS client, configure cds_experimental LB policy with a manual
// resolver, and dial the test backends.
cc, cleanup := setupAndDial(t, bootstrapContents)
defer cleanup()

client := testgrpc.NewTestServiceClient(cc)

// Ensure that the DNS resolver is started for the expected target.
select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for DNS resolver to be started")
case target := <-dnsTargetCh:
got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
if got != want {
t.Fatalf("DNS resolution started for target %q, want %q", got, want)
}
}

// Update DNS resolver with test backend addresses.
dnsR.UpdateState(resolver.State{Addresses: addrs})

// Ensure that RPCs start getting routed to the first backend since the
// child policy for a LOGICAL_DNS cluster is pick_first by default.
for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
peer := &peer.Peer{}
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer)); err != nil {
t.Logf("EmptyCall() failed: %v", err)
continue
}
if peer.Addr.String() == addrs[0].Addr {
break
}
}
if ctx.Err() != nil {
t.Fatalf("Timeout when waiting for RPCs to be routed to backend %q in the DNS cluster", addrs[0].Addr)
}
Contributor

Contributor Author

Yup, that worked :). Note that the other tests in this file don't use this helper (since you always rewrite these haha :D).

}

// TestAggregateCluster_BadDNS_GoodEDS tests the case where the top-level
// cluster is an aggregate cluster that resolves to a LOGICAL_DNS and an EDS
// cluster. When the DNS resolver returns an error and the EDS cluster returns
// a good update, this test verifies that the cluster_resolver balancer
// correctly falls back from the LOGICAL_DNS cluster to the EDS cluster.
func (s) TestAggregateCluster_BadDNS_GoodEDS(t *testing.T) {
dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
defer cleanup1()

// Start an xDS management server.
managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
defer cleanup2()

// Start two test backends.
servers, cleanup3 := startTestServiceBackends(t, 2)
defer cleanup3()
addrs, ports := backendAddressesAndPorts(t, servers)

// Configure an aggregate cluster pointing to a LOGICAL_DNS and an EDS
// cluster. Also configure an endpoints resource for the EDS cluster.
const (
edsClusterName = clusterName + "-eds"
dnsClusterName = clusterName + "-dns"
dnsHostName = "dns_host"
dnsPort = uint32(8080)
)
resources := e2e.UpdateOptions{
NodeID: nodeID,
Clusters: []*v3clusterpb.Cluster{
makeAggregateClusterResource(clusterName, []string{dnsClusterName, edsClusterName}),
makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
e2e.DefaultCluster(edsClusterName, edsServiceName, e2e.SecurityLevelNone),
},
Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsServiceName, "localhost", []uint32{uint32(ports[0])})},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

// Create xDS client, configure cds_experimental LB policy with a manual
// resolver, and dial the test backends.
cc, cleanup := setupAndDial(t, bootstrapContents)
defer cleanup()

// Ensure that the DNS resolver is started for the expected target.
select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for DNS resolver to be started")
case target := <-dnsTargetCh:
got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
if got != want {
t.Fatalf("DNS resolution started for target %q, want %q", got, want)
}
}

// Push an error through the DNS resolver.
dnsR.ReportError(errors.New("some error"))

// RPCs should still work: the higher-priority LOGICAL_DNS cluster reports an
// error, so the policy should fall back to the EDS cluster.
client := testgrpc.NewTestServiceClient(cc)
peer := &peer.Peer{}
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
t.Fatalf("EmptyCall() failed: %v", err)
}
if peer.Addr.String() != addrs[0].Addr {
t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[0].Addr)
}
}

// TestAggregateCluster_BadEDS_BadDNS tests the case where the top-level cluster
// is an aggregate cluster that resolves to an EDS and LOGICAL_DNS cluster. When
// the EDS request returns a resource that contains no endpoints, the test
// verifies that we switch to the DNS cluster. When the DNS cluster returns an
// error, the test verifies that RPCs fail with the error returned by the DNS
// resolver, and thus, ensures that pick_first (the leaf policy) does not ignore
// resolver errors.
// error, the test verifies that RPCs fail with the error generated by the DNS
// discovery mechanism (as a result of it sending an empty address list down).
func (s) TestAggregateCluster_BadEDS_BadDNS(t *testing.T) {
dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
defer cleanup1()
@@ -769,14 +931,14 @@ func (s) TestAggregateCluster_BadEDS_BadDNS(t *testing.T) {
dnsErr := fmt.Errorf("DNS error")
dnsR.ReportError(dnsErr)

// Ensure that the error returned from the DNS resolver is reported to the
// caller of the RPC.
// Ensure that the error from the DNS resolver results in both priorities
// having an empty address list, and hence in RPCs failing with a "produced
// zero addresses" error.
_, err := client.EmptyCall(ctx, &testpb.Empty{})
if code := status.Code(err); code != codes.Unavailable {
t.Fatalf("EmptyCall() failed with code %s, want %s", code, codes.Unavailable)
}
if err == nil || !strings.Contains(err.Error(), dnsErr.Error()) {
t.Fatalf("EmptyCall() failed with error %v, want %v", err, dnsErr)
if err == nil || !strings.Contains(err.Error(), "produced zero addresses") {
t.Fatalf("EmptyCall() failed with error: %v, want: produced zero addresses", err)
}
}

25 changes: 12 additions & 13 deletions xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go
@@ -41,7 +41,6 @@ import (
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal/balancer/priority"
xdstestutils "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
@@ -538,7 +537,7 @@ func (s) TestEDS_EmptyUpdate(t *testing.T) {
}
defer cc.Close()
testClient := testgrpc.NewTestServiceClient(cc)
if err := waitForAllPrioritiesRemovedError(ctx, t, testClient); err != nil {
if err := waitForProducedZeroAddressesError(ctx, t, testClient); err != nil {
t.Fatal(err)
}

@@ -561,7 +560,7 @@ func (s) TestEDS_EmptyUpdate(t *testing.T) {
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
if err := waitForAllPrioritiesRemovedError(ctx, t, testClient); err != nil {
if err := waitForProducedZeroAddressesError(ctx, t, testClient); err != nil {
t.Fatal(err)
}
}
@@ -921,7 +920,7 @@ func (s) TestEDS_BadUpdateWithoutPreviousGoodUpdate(t *testing.T) {
}
defer cc.Close()
client := testgrpc.NewTestServiceClient(cc)
if err := waitForAllPrioritiesRemovedError(ctx, t, client); err != nil {
if err := waitForProducedZeroAddressesError(ctx, t, client); err != nil {
t.Fatal(err)
}
}
@@ -1070,31 +1069,31 @@ func (s) TestEDS_ResourceNotFound(t *testing.T) {
}
defer cc.Close()
client := testgrpc.NewTestServiceClient(cc)
if err := waitForAllPrioritiesRemovedError(ctx, t, client); err != nil {
if err := waitForProducedZeroAddressesError(ctx, t, client); err != nil {
t.Fatal(err)
}
}

// waitForAllPrioritiesRemovedError repeatedly makes RPCs using the
// TestServiceClient until they fail with an error which indicates that all
// priorities have been removed. A non-nil error is returned if the context
// expires before RPCs fail with the expected error.
func waitForAllPrioritiesRemovedError(ctx context.Context, t *testing.T, client testgrpc.TestServiceClient) error {
// TestServiceClient until they fail with an error which indicates that no
// resolver addresses have been produced. A non-nil error is returned if the
// context expires before RPCs fail with the expected error.
func waitForProducedZeroAddressesError(ctx context.Context, t *testing.T, client testgrpc.TestServiceClient) error {
for ; ctx.Err() == nil; <-time.After(time.Millisecond) {
_, err := client.EmptyCall(ctx, &testpb.Empty{})
if err == nil {
t.Log("EmptyCall() succeeded after EDS update with no localities")
t.Log("EmptyCall() succeeded after error in Discovery Mechanism")
continue
}
if code := status.Code(err); code != codes.Unavailable {
t.Logf("EmptyCall() returned code: %v, want: %v", code, codes.Unavailable)
continue
}
if !strings.Contains(err.Error(), priority.ErrAllPrioritiesRemoved.Error()) {
t.Logf("EmptyCall() = %v, want %v", err, priority.ErrAllPrioritiesRemoved)
if !strings.Contains(err.Error(), "produced zero addresses") {
t.Logf("EmptyCall() = %v, want %v", err, "produced zero addresses")
continue
}
return nil
}
return errors.New("timeout when waiting for RPCs to fail with UNAVAILABLE status and a \"produced zero addresses\" error")
return errors.New("timeout when waiting for RPCs to fail with UNAVAILABLE status and produced zero addresses")
}
9 changes: 0 additions & 9 deletions xds/internal/balancer/clusterresolver/resource_resolver.go
@@ -38,7 +38,6 @@ type resourceUpdate struct {
// from underlying concrete resolvers.
type topLevelResolver interface {
onUpdate()
onError(error)
}

// endpointsResolver wraps the functionality to resolve a given resource name to
@@ -277,11 +276,3 @@ func (rr *resourceResolver) onUpdate() {
rr.generateLocked()
rr.mu.Unlock()
}

func (rr *resourceResolver) onError(err error) {
select {
case <-rr.updateChannel:
default:
}
rr.updateChannel <- &resourceUpdate{err: err}
}
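With onError removed, the topLevelResolver interface in the hunk above is left with a single method; both good updates and errors from the concrete discovery mechanisms now flow through onUpdate. The resulting shape, for orientation:

// topLevelResolver is implemented by resourceResolver. After this change,
// discovery mechanisms record error state locally (e.g. by clearing addresses
// and marking an update as received) and signal it via onUpdate, rather than
// through a separate onError method.
type topLevelResolver interface {
	onUpdate()
}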
18 changes: 15 additions & 3 deletions xds/internal/balancer/clusterresolver/resource_resolver_dns.go
@@ -75,13 +75,15 @@ func newDNSResolver(target string, topLevelResolver topLevelResolver, logger *gr
}
u, err := url.Parse("dns:///" + target)
if err != nil {
topLevelResolver.onError(fmt.Errorf("failed to parse dns hostname %q in clusterresolver LB policy", target))
ret.updateReceived = true
Contributor

We should probably throw a warning log here and in the next error case. I would even be OK with an error log.

Contributor Author

To keep it consistent with the error logs already in codebase (from ReportError), switched to Info log.

Contributor

Nit: Log lines should be capitalized, unlike error strings.

Contributor Author

Oh interesting, noted. Switched.

ret.topLevelResolver.onUpdate()
return ret
}

r, err := newDNS(resolver.Target{URL: *u}, ret, resolver.BuildOptions{})
if err != nil {
topLevelResolver.onError(fmt.Errorf("failed to build DNS resolver for target %q: %v", target, err))
ret.updateReceived = true
ret.topLevelResolver.onUpdate()
return ret
}
ret.dnsR = r
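Per the logging discussion in the threads above, both constructor error paths presumably end up logging before reporting the erroring update; the exact guard and wording come from a later commit and are not part of this diff. A rough sketch for the hostname-parse case, assuming the constructor's logger parameter is used directly:

u, err := url.Parse("dns:///" + target)
if err != nil {
	// Assumed shape: capitalized info log (per the review), then report an
	// address-less update via onUpdate instead of the removed onError.
	logger.Infof("Failed to parse dns hostname %q in clusterresolver LB policy", target)
	ret.updateReceived = true
	ret.topLevelResolver.onUpdate()
	return ret
}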
@@ -142,7 +144,17 @@ func (dr *dnsDiscoveryMechanism) ReportError(err error) {
dr.logger.Infof("DNS discovery mechanism for resource %q reported error: %v", dr.target, err)
}

dr.topLevelResolver.onError(err)
dr.mu.Lock()
// Don't report any errors after first error or good address update.
Contributor

Why is it important to suppress subsequent errors? It probably doesn't have any downstream effects today, but are there any other good reasons?

Contributor Author

Hmmm btw I talked to Doug yesterday about this in our 1:1. The channel eats errors from the resolver if it's already received a good address update, however he mentioned it's also fine to add an extra eat for that scenario at this level. My logic was to suppress since no downstream effects, and also to keep it similar to how you handled subsequent errors in eds resolver (write an empty update, next error returns early).

Contributor

Maybe you can be a little more verbose in this comment. Maybe this is too verbose, but something to indicate why suppressing the errors is beneficial here:

// If a previous good update was received, suppress the error and continue using the
// previous update. If RPCs were succeeding prior to this, they will continue to do so.
// Also suppress errors if we previously received an error, since there will be no
// downstream effects of propagating this error.

Contributor Author

Thanks for suggestion. Used your comment.

if dr.updateReceived {
dr.mu.Unlock()
return
}
dr.addrs = nil
dr.updateReceived = true
dr.mu.Unlock()

dr.topLevelResolver.onUpdate()
Contributor

Since we are calling onUpdate for both good updates and errors, we no longer need the onError method on the topLevelResolver. EDS discovery mechanism does not use it either. So, you can delete the implementation of that method in resourceResolver and remove that method from the topLevelResolver interface.

Contributor Author

Hmmm ok. Will delete.

Contributor Author

It's actually being called in DNS Discovery Mechanism Construction errors, but as per the back and forth in chat I need to switch this to do same thing DNS errors before update do - create the priority and send TF down.

Contributor Author

I did this in Discovery Mechanism Constructor same way as OnError - set updateReceived to true and then call onUpdate on the top level resolver. Let me know if you want to change the bool (I need to set it for lastUpdate() to return true).

Contributor

Let me know if you want to change the bool

What do you mean by this? I'm good with how it is looking now.

Contributor Author

I changed the construction to also set this bool and trigger TF. Thus, I don't know if the naming is appropriate. Has it received an update, or is it just triggering an error? I'm fine leaving as is also though.

}
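Folding the comment suggested in the thread above into this method, the post-lock portion of ReportError would read roughly as follows (a sketch; the logging block at the top of the method is unchanged and elided here):

dr.mu.Lock()
// If a previous good update was received, suppress the error and continue
// using the previous update. If RPCs were succeeding prior to this, they
// will continue to do so. Also suppress errors if we previously received
// an error, since there will be no downstream effects of propagating this
// error.
if dr.updateReceived {
	dr.mu.Unlock()
	return
}
dr.addrs = nil
dr.updateReceived = true
dr.mu.Unlock()

dr.topLevelResolver.onUpdate()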

func (dr *dnsDiscoveryMechanism) NewAddress(addresses []resolver.Address) {