From eac9255449f8ece6e677dbd5b7b3e213dbb42b9d Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Wed, 18 Jan 2023 14:57:50 -0800 Subject: [PATCH 1/3] xds/resolver: cleanup tests to use real xDS client --- xds/internal/resolver/xds_resolver_test.go | 151 ++++++++++++++++++--- 1 file changed, 130 insertions(+), 21 deletions(-) diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go index e35b4b6ecec..19bafd3b1af 100644 --- a/xds/internal/resolver/xds_resolver_test.go +++ b/xds/internal/resolver/xds_resolver_test.go @@ -447,34 +447,143 @@ func (s) TestResolverResourceName(t *testing.T) { } } -// TestXDSResolverWatchCallbackAfterClose tests the case where a service update -// from the underlying xdsClient is received after the resolver is closed. -func (s) TestXDSResolverWatchCallbackAfterClose(t *testing.T) { - xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target}) - defer cancel() +// TestResolverWatchCallbackAfterClose tests the case where a service update +// from the underlying xDS client is received after the resolver is closed, and +// verifies that the update is not propagated to the ClientConn. +func (s) TestResolverWatchCallbackAfterClose(t *testing.T) { + // Setup the management server that synchronizes with the test goroutine + // using two channels. The management server signals the test goroutine when + // it receives a discovery request for a route configuration resource. And + // the test goroutine signals the management server when the resolver is + // closed. + waitForRouteConfigCh := make(chan struct{}) + waitForCloseCh := make(chan struct{}) + mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{ + OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { + if req.GetTypeUrl() == version.V3RouteConfigURL { + close(waitForRouteConfigCh) + <-waitForCloseCh + } + return nil + }, + }) + if err != nil { + t.Fatalf("Failed to start xDS management server: %v", err) + } + defer mgmtServer.Stop() + + // Create a bootstrap configuration specifying the above management server. + nodeID := uuid.New().String() + cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{ + NodeID: nodeID, + ServerURI: mgmtServer.Address, + Version: xdsbootstrap.TransportV3, + }) + if err != nil { + t.Fatal(err) + } + defer cleanup() + // Configure listener and route configuration resources on the management + // server. + const serviceName = "my-service-client-side-xds" + rdsName := "route-" + serviceName + cdsName := "cluster-" + serviceName + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, rdsName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(rdsName, serviceName, cdsName)}, + SkipValidation: true, + } ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - waitForWatchListener(ctx, t, xdsC, targetStr) - xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil) - waitForWatchRouteConfig(ctx, t, xdsC, routeStr) + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } - // Call the watchAPI callback after closing the resolver, and make sure no - // update is triggerred on the ClientConn. - xdsR.Close() - xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { - Domains: []string{targetStr}, - Routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{cluster: {Weight: 1}}}}, - }, - }, - }, nil) + // Build an xDS resolver that uses the above bootstrap configuration + // Creating the xDS resolver should result in creation of the xDS client. + builder := resolver.Get(xdsScheme) + if builder == nil { + t.Fatalf("resolver.Get(%v) returned nil", xdsScheme) + } + tcc := newTestClientConn() + u, err := url.Parse("xds:///" + serviceName) + if err != nil { + t.Fatal(err) + } + r, err := builder.Build(resolver.Target{URL: *u}, tcc, resolver.BuildOptions{}) + if err != nil { + t.Fatalf("builder.Build(%v) returned err: %v", target, err) + } + + // Wait for a discovery request for a route configuration resource. + select { + case <-waitForRouteConfigCh: + case <-ctx.Done(): + t.Fatal("Timeout when waiting for a discovery request with a route configuration resource") + } + // Close the resolver and unblock the management server. + r.Close() + close(waitForCloseCh) + + // Verify that the update from the management server is not propagated to + // the ClientConn. The xDS resolver, once closed, is expected to drop + // updates from the xDS client. sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) defer sCancel() - if gotVal, gotErr := tcc.stateCh.Receive(sCtx); gotErr != context.DeadlineExceeded { - t.Fatalf("ClientConn.UpdateState called after xdsResolver is closed: %v", gotVal) + if _, err := tcc.stateCh.Receive(sCtx); err != context.DeadlineExceeded { + t.Fatalf("ClientConn received an update from the resolver that was closed: %v", err) + } +} + +// TestResolverCloseClosesXDSClient tests that the xDS resolver's Close method +// closes the xDS client. +func (s) TestResolverCloseClosesXDSClient(t *testing.T) { + // Override xDS client creation to use bootstrap configuration pointing to a + // dummy management server. Also close a channel when the returned xDS + // client is closed. + bootstrapCfg := &bootstrap.Config{ + XDSServer: &bootstrap.ServerConfig{ + ServerURI: "dummy-management-server-address", + Creds: grpc.WithTransportCredentials(insecure.NewCredentials()), + TransportAPI: version.TransportV3, + }, + } + + closeCh := make(chan struct{}) + origNewClient := newXDSClient + newXDSClient = func() (xdsclient.XDSClient, func(), error) { + c, cancel, err := xdsclient.NewWithConfigForTesting(bootstrapCfg, defaultTestTimeout, defaultTestTimeout) + return c, func() { + close(closeCh) + cancel() + }, err + } + defer func() { + newXDSClient = origNewClient + }() + + builder := resolver.Get(xdsScheme) + if builder == nil { + t.Fatalf("resolver.Get(%v) returned nil", xdsScheme) + } + + u, err := url.Parse("xds:///" + "dummy") + if err != nil { + t.Fatal(err) + } + r, err := builder.Build(resolver.Target{URL: *u}, newTestClientConn(), resolver.BuildOptions{}) + if err != nil { + t.Fatalf("builder.Build(%v): %v", u, err) + } + r.Close() + + select { + case <-closeCh: + case <-time.After(defaultTestTimeout): + t.Fatal("Timeout when waiting for xDS client to be closed") } } From b319ee472707b0fa093df2870581d1b0c783459c Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 19 Jan 2023 16:26:22 -0800 Subject: [PATCH 2/3] use helper func buildResolverForTarget --- xds/internal/resolver/xds_resolver_test.go | 35 ++++------------------ 1 file changed, 5 insertions(+), 30 deletions(-) diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go index 19bafd3b1af..962c8a69901 100644 --- a/xds/internal/resolver/xds_resolver_test.go +++ b/xds/internal/resolver/xds_resolver_test.go @@ -501,21 +501,8 @@ func (s) TestResolverWatchCallbackAfterClose(t *testing.T) { t.Fatal(err) } - // Build an xDS resolver that uses the above bootstrap configuration - // Creating the xDS resolver should result in creation of the xDS client. - builder := resolver.Get(xdsScheme) - if builder == nil { - t.Fatalf("resolver.Get(%v) returned nil", xdsScheme) - } - tcc := newTestClientConn() - u, err := url.Parse("xds:///" + serviceName) - if err != nil { - t.Fatal(err) - } - r, err := builder.Build(resolver.Target{URL: *u}, tcc, resolver.BuildOptions{}) - if err != nil { - t.Fatalf("builder.Build(%v) returned err: %v", target, err) - } + tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)}) + defer rClose() // Wait for a discovery request for a route configuration resource. select { @@ -525,7 +512,7 @@ func (s) TestResolverWatchCallbackAfterClose(t *testing.T) { } // Close the resolver and unblock the management server. - r.Close() + rClose() close(waitForCloseCh) // Verify that the update from the management server is not propagated to @@ -565,20 +552,8 @@ func (s) TestResolverCloseClosesXDSClient(t *testing.T) { newXDSClient = origNewClient }() - builder := resolver.Get(xdsScheme) - if builder == nil { - t.Fatalf("resolver.Get(%v) returned nil", xdsScheme) - } - - u, err := url.Parse("xds:///" + "dummy") - if err != nil { - t.Fatal(err) - } - r, err := builder.Build(resolver.Target{URL: *u}, newTestClientConn(), resolver.BuildOptions{}) - if err != nil { - t.Fatalf("builder.Build(%v): %v", u, err) - } - r.Close() + _, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///my-service-client-side-xds")}) + rClose() select { case <-closeCh: From fca68dc5da272f4595fe6c12c87594002d4dde7e Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Mon, 23 Jan 2023 14:02:43 -0800 Subject: [PATCH 3/3] review comments #1 --- xds/internal/resolver/xds_resolver_test.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go index 962c8a69901..af772939066 100644 --- a/xds/internal/resolver/xds_resolver_test.go +++ b/xds/internal/resolver/xds_resolver_test.go @@ -456,13 +456,13 @@ func (s) TestResolverWatchCallbackAfterClose(t *testing.T) { // it receives a discovery request for a route configuration resource. And // the test goroutine signals the management server when the resolver is // closed. - waitForRouteConfigCh := make(chan struct{}) - waitForCloseCh := make(chan struct{}) + waitForRouteConfigDiscoveryReqCh := make(chan struct{}) + waitForResolverCloseCh := make(chan struct{}) mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{ OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { if req.GetTypeUrl() == version.V3RouteConfigURL { - close(waitForRouteConfigCh) - <-waitForCloseCh + close(waitForRouteConfigDiscoveryReqCh) + <-waitForResolverCloseCh } return nil }, @@ -506,14 +506,14 @@ func (s) TestResolverWatchCallbackAfterClose(t *testing.T) { // Wait for a discovery request for a route configuration resource. select { - case <-waitForRouteConfigCh: + case <-waitForRouteConfigDiscoveryReqCh: case <-ctx.Done(): - t.Fatal("Timeout when waiting for a discovery request with a route configuration resource") + t.Fatal("Timeout when waiting for a discovery request for a route configuration resource") } // Close the resolver and unblock the management server. rClose() - close(waitForCloseCh) + close(waitForResolverCloseCh) // Verify that the update from the management server is not propagated to // the ClientConn. The xDS resolver, once closed, is expected to drop @@ -528,9 +528,6 @@ func (s) TestResolverWatchCallbackAfterClose(t *testing.T) { // TestResolverCloseClosesXDSClient tests that the xDS resolver's Close method // closes the xDS client. func (s) TestResolverCloseClosesXDSClient(t *testing.T) { - // Override xDS client creation to use bootstrap configuration pointing to a - // dummy management server. Also close a channel when the returned xDS - // client is closed. bootstrapCfg := &bootstrap.Config{ XDSServer: &bootstrap.ServerConfig{ ServerURI: "dummy-management-server-address", @@ -539,6 +536,9 @@ func (s) TestResolverCloseClosesXDSClient(t *testing.T) { }, } + // Override xDS client creation to use bootstrap configuration pointing to a + // dummy management server. Also close a channel when the returned xDS + // client is closed. closeCh := make(chan struct{}) origNewClient := newXDSClient newXDSClient = func() (xdsclient.XDSClient, func(), error) {