Skip to content

Commit

Permalink
clusterresolver: push empty config to child policy upon removal of cl…
Browse files Browse the repository at this point in the history
…uster resource
  • Loading branch information
easwars committed Mar 16, 2023
1 parent 6f44ae8 commit b286c3c
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 0 deletions.
161 changes: 161 additions & 0 deletions xds/internal/balancer/cdsbalancer/tests/balancer_test.go
@@ -0,0 +1,161 @@
/*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package tests

import (
"context"
"fmt"
"net"
"strconv"
"strings"
"testing"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal/xdsclient"

v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
testgrpc "google.golang.org/grpc/test/grpc_testing"
testpb "google.golang.org/grpc/test/grpc_testing"

_ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer" // Register the "cds_experimental" LB policy.
)

// Names and timeouts shared by the tests in this file.
const (
// clusterName is the name of the Cluster resource configured on the
// management server; the same name is placed in the cds LB policy config.
clusterName = "cluster-my-service-client-side-xds"
// edsServiceName is the EDS service name passed when building both the
// default Cluster resource and the matching endpoints resource.
edsServiceName = "endpoints-my-service-client-side-xds"
// defaultTestTimeout is the deadline of the context that bounds a whole test.
defaultTestTimeout = 5 * time.Second
// defaultTestShortTimeout is used both as the per-RPC deadline and as the
// pause between attempts when polling for an expected RPC outcome.
defaultTestShortTimeout = 10 * time.Millisecond
)

// s is the receiver type on which the tests in this file are declared. It
// embeds grpctest.Tester and is handed to grpctest.RunSubTests by Test.
type s struct {
grpctest.Tester
}

// Test is the single `go test` entry point of this package. It hands an s
// value to grpctest.RunSubTests, which presumably runs every test method
// declared on s as a subtest (confirm against the grpctest package docs).
func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}

// TestClusterResourceRemoved tests the case where the cds LB policy is
// configured as the top-level LB policy. The test verifies that removal of the
// associated cluster resource results in RPCs failing with Unavailable and the
// channel moving to TransientFailure.
//
// The test proceeds in four steps:
//  1. Configure a management server with a cluster and an endpoints resource
//     that point at a local stub backend.
//  2. Dial through a manual resolver whose service config selects the
//     "cds_experimental" LB policy, and verify that an RPC succeeds.
//  3. Remove the cluster resource from the management server.
//  4. Poll until RPCs fail with the expected error, then check the channel
//     connectivity state.
func (s) TestClusterResourceRemoved(t *testing.T) {
	// Start an xDS management server.
	managementServer, nodeID, bootstrapContents, _, cleanup1 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
	defer cleanup1()

	// Start a test backend and extract its port.
	backend := &stubserver.StubServer{
		EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil },
	}
	backend.StartServer()
	defer backend.Stop()
	_, p, err := net.SplitHostPort(backend.Address)
	if err != nil {
		t.Fatalf("Failed to split test backend address %q: %v", backend.Address, err)
	}
	port, err := strconv.ParseUint(p, 10, 32)
	if err != nil {
		// Report the port string that was actually being parsed.
		t.Fatalf("Failed to parse test backend port %q: %v", p, err)
	}

	// Configure cluster and endpoints resources in the management server.
	resources := e2e.UpdateOptions{
		NodeID:         nodeID,
		Clusters:       []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, edsServiceName, e2e.SecurityLevelNone)},
		Endpoints:      []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsServiceName, "localhost", []uint32{uint32(port)})},
		SkipValidation: true,
	}
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	if err := managementServer.Update(ctx, resources); err != nil {
		t.Fatal(err)
	}

	// Create an xDS client; it is attached to the resolver state below so that
	// the LB policy configured by this test can use it. The cleanup function
	// is deliberately not named `close`, to avoid shadowing the builtin.
	xdsClient, closeXDSClient, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
	if err != nil {
		t.Fatalf("Failed to create xDS client: %v", err)
	}
	defer closeXDSClient()

	// Create a manual resolver and push a service config specifying the use of
	// the cds LB policy as the top-level LB policy, and a corresponding config
	// with a single cluster.
	r := manual.NewBuilderWithScheme("whatever")
	jsonSC := fmt.Sprintf(`{
"loadBalancingConfig":[{
"cds_experimental":{
"cluster": "%s"
}
}]
}`, clusterName)
	scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
	r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsClient))

	// Create a ClientConn and make a successful RPC.
	cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
	if err != nil {
		t.Fatalf("failed to dial local test server: %v", err)
	}
	defer cc.Close()

	client := testgrpc.NewTestServiceClient(cc)
	if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
		t.Fatalf("rpc EmptyCall() failed: %v", err)
	}

	// Delete the cluster resource from the management server.
	resources.Clusters = nil
	if err := managementServer.Update(ctx, resources); err != nil {
		t.Fatal(err)
	}

	// Ensure that RPCs start to fail with expected error. Each attempt gets a
	// short deadline, and the loop pauses for the same short interval between
	// attempts until the overall test context expires.
	for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
		sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
		_, err := client.EmptyCall(sCtx, &testpb.Empty{})
		// Release the per-attempt context right away. A `defer` here would
		// only run when the test function returns, piling up one pending
		// cancel (and its timer) per loop iteration.
		sCancel()
		if status.Code(err) == codes.Unavailable && strings.Contains(err.Error(), "all priorities are removed") {
			break
		}
		if err != nil {
			t.Logf("EmptyCall RPC failed: %v", err)
		}
	}
	if ctx.Err() != nil {
		t.Fatalf("RPCs did not fail after removal of Cluster resource")
	}

	// Ensure that the ClientConn is in TransientFailure.
	if state := cc.GetState(); state != connectivity.TransientFailure {
		t.Fatalf("Unexpected connectivity state for ClientConn, got: %s, want %s", state, connectivity.TransientFailure)
	}
}
6 changes: 6 additions & 0 deletions xds/internal/balancer/clusterresolver/resource_resolver.go
Expand Up @@ -224,6 +224,12 @@ func (rr *resourceResolver) stop() {
for _, r := range cm {
r.r.stop()
}

// stop() is called when the LB policy is closed or when the underlying
// cluster resource is removed by the management server. In the latter case,
// an empty config update needs to be pushed to the child policy to ensure
// that a picker that fails RPCs is sent up to the channel.
rr.updateChannel <- &resourceUpdate{}
}

// generateLocked collects updates from all resolvers. It pushes the combined
Expand Down
1 change: 1 addition & 0 deletions xds/internal/xdsclient/authority.go
Expand Up @@ -483,6 +483,7 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w
// There are no more watchers for this resource, delete the state
// associated with it, and instruct the transport to send a request
// which does not include this resource name.
a.logger.Debugf("Removing last watch for type %q, resource name %q", rType.TypeEnum(), resourceName)
delete(resources, resourceName)
a.sendDiscoveryRequestLocked(rType, resources)
}
Expand Down

0 comments on commit b286c3c

Please sign in to comment.