Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds/resolver: cleanup tests to use real xDS client 2/n #5952

Merged
merged 3 commits into from Jan 24, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
126 changes: 105 additions & 21 deletions xds/internal/resolver/xds_resolver_test.go
Expand Up @@ -447,34 +447,118 @@ func (s) TestResolverResourceName(t *testing.T) {
}
}

// TestXDSResolverWatchCallbackAfterClose tests the case where a service update
// from the underlying xdsClient is received after the resolver is closed.
func (s) TestXDSResolverWatchCallbackAfterClose(t *testing.T) {
xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target})
defer cancel()
// TestResolverWatchCallbackAfterClose tests the case where a service update
// from the underlying xDS client is received after the resolver is closed, and
// verifies that the update is not propagated to the ClientConn.
func (s) TestResolverWatchCallbackAfterClose(t *testing.T) {
// Setup the management server that synchronizes with the test goroutine
// using two channels. The management server signals the test goroutine when
// it receives a discovery request for a route configuration resource. And
// the test goroutine signals the management server when the resolver is
// closed.
waitForRouteConfigDiscoveryReqCh := make(chan struct{})
waitForResolverCloseCh := make(chan struct{})
mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{
OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
if req.GetTypeUrl() == version.V3RouteConfigURL {
close(waitForRouteConfigDiscoveryReqCh)
<-waitForResolverCloseCh
}
return nil
},
})
if err != nil {
t.Fatalf("Failed to start xDS management server: %v", err)
}
defer mgmtServer.Stop()

// Create a bootstrap configuration specifying the above management server.
nodeID := uuid.New().String()
cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{
NodeID: nodeID,
ServerURI: mgmtServer.Address,
Version: xdsbootstrap.TransportV3,
})
if err != nil {
t.Fatal(err)
}
defer cleanup()

// Configure listener and route configuration resources on the management
// server.
const serviceName = "my-service-client-side-xds"
rdsName := "route-" + serviceName
cdsName := "cluster-" + serviceName
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, rdsName)},
Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(rdsName, serviceName, cdsName)},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
waitForWatchListener(ctx, t, xdsC, targetStr)
xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil)
waitForWatchRouteConfig(ctx, t, xdsC, routeStr)
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

// Call the watchAPI callback after closing the resolver, and make sure no
// update is triggerred on the ClientConn.
xdsR.Close()
xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{
{
Domains: []string{targetStr},
Routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{cluster: {Weight: 1}}}},
},
},
}, nil)
tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)})
defer rClose()

// Wait for a discovery request for a route configuration resource.
select {
case <-waitForRouteConfigDiscoveryReqCh:
case <-ctx.Done():
t.Fatal("Timeout when waiting for a discovery request for a route configuration resource")
}

// Close the resolver and unblock the management server.
rClose()
close(waitForResolverCloseCh)

// Verify that the update from the management server is not propagated to
// the ClientConn. The xDS resolver, once closed, is expected to drop
// updates from the xDS client.
Copy link
Contributor

@zasweq zasweq Jan 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question for myself: so is this what happens here? The management server is running one line at a time not concurrently. So when you unblock the management server above, it is free to process/send back the update on line 500? Which is why you block, to make sure update happens after the resolver closure?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite understand what you mean by The management server is running one line at a time not concurrently?

Which is why you block, to make sure update happens after the resolver closure?

Yes. The point of this test is to ensure that any updates received from the management server after the resolver is closed, is not propagated up the channel.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Meaning it is only running one function at a time and not splicing between them (i.e. the function with the receive that is blocking), and actually sending the update you specified.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The management server can run more than one function at any point in time. Whenever it gets a discovery request, it will invoke this callback. But we know that our xDS client does not re-send discovery requests randomly (if that happens, more than one callback will get blocked on this channel, and a goroutine will leak and the test will fail), and therefore we can reliably block the callback until we close the resolver and unblock it afterwards.

sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if gotVal, gotErr := tcc.stateCh.Receive(sCtx); gotErr != context.DeadlineExceeded {
t.Fatalf("ClientConn.UpdateState called after xdsResolver is closed: %v", gotVal)
if _, err := tcc.stateCh.Receive(sCtx); err != context.DeadlineExceeded {
t.Fatalf("ClientConn received an update from the resolver that was closed: %v", err)
}
}

// TestResolverCloseClosesXDSClient tests that the xDS resolver's Close method
// closes the xDS client.
func (s) TestResolverCloseClosesXDSClient(t *testing.T) {
bootstrapCfg := &bootstrap.Config{
XDSServer: &bootstrap.ServerConfig{
ServerURI: "dummy-management-server-address",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So xDS Client creation doesn't fail with the NewConfigForTesting even with a non-existent management server? When would that fail, first request sent out?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The management server URI is used as the dial target when creating the ClientConn to the management server. And we don't perform a blocking dial there. So, if the address does not exist, we will get to know only at RPC time.

Creds: grpc.WithTransportCredentials(insecure.NewCredentials()),
TransportAPI: version.TransportV3,
},
}

// Override xDS client creation to use bootstrap configuration pointing to a
// dummy management server. Also close a channel when the returned xDS
// client is closed.
closeCh := make(chan struct{})
origNewClient := newXDSClient
newXDSClient = func() (xdsclient.XDSClient, func(), error) {
c, cancel, err := xdsclient.NewWithConfigForTesting(bootstrapCfg, defaultTestTimeout, defaultTestTimeout)
return c, func() {
close(closeCh)
cancel()
}, err
}
defer func() {
newXDSClient = origNewClient
}()

_, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///my-service-client-side-xds")})
rClose()

select {
case <-closeCh:
case <-time.After(defaultTestTimeout):
t.Fatal("Timeout when waiting for xDS client to be closed")
}
}

Expand Down