Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: make comparison of server configs in bootstrap more reliable #6112

Merged
merged 2 commits into from
Mar 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 10 additions & 5 deletions xds/googledirectpath/googlec2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,19 @@ import (
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/google"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/googlecloud"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/resolver"
_ "google.golang.org/grpc/xds" // To register xds resolvers and balancers.
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/protobuf/types/known/structpb"

v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"

_ "google.golang.org/grpc/xds" // To register xds resolvers and balancers.
)

const (
Expand Down Expand Up @@ -116,9 +116,14 @@ func (c2pResolverBuilder) Build(t resolver.Target, cc resolver.ClientConn, opts
if balancerName == "" {
balancerName = tdURL
}
serverConfig := &bootstrap.ServerConfig{
ServerURI: balancerName,
Creds: grpc.WithCredentialsBundle(google.NewDefaultCredentials()),
serverConfig, err := bootstrap.ServerConfigFromJSON([]byte(fmt.Sprintf(`
{
"server_uri": "%s",
"channel_creds": [{"type": "google_default"}],
"server_features": ["xds_v3"]
}`, balancerName)))
if err != nil {
return nil, fmt.Errorf("failed to build bootstrap configuration: %v", err)
}
config := &bootstrap.Config{
XDSServer: serverConfig,
Expand Down
20 changes: 13 additions & 7 deletions xds/googledirectpath/googlec2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package googledirectpath

import (
"fmt"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -212,15 +213,20 @@ func TestBuildXDS(t *testing.T) {
},
}
}
serverConfig := &bootstrap.ServerConfig{
ServerURI: tdURL,
wantServerConfig, err := bootstrap.ServerConfigFromJSON([]byte(fmt.Sprintf(`{
"server_uri": "%s",
"channel_creds": [{"type": "google_default"}],
"server_features": ["xds_v3"]
}`, tdURL)))
if err != nil {
t.Fatalf("Failed to build server bootstrap config: %v", err)
}
wantConfig := &bootstrap.Config{
XDSServer: serverConfig,
XDSServer: wantServerConfig,
ClientDefaultListenerResourceNameTemplate: "%s",
Authorities: map[string]*bootstrap.Authority{
"traffic-director-c2p.xds.googleapis.com": {
XDSServer: serverConfig,
XDSServer: wantServerConfig,
},
},
NodeProto: wantNode,
Expand All @@ -234,9 +240,9 @@ func TestBuildXDS(t *testing.T) {
protocmp.Transform(),
}
select {
case c := <-configCh:
if diff := cmp.Diff(c, wantConfig, cmpOpts); diff != "" {
t.Fatalf("%v", diff)
case gotConfig := <-configCh:
if diff := cmp.Diff(wantConfig, gotConfig, cmpOpts); diff != "" {
t.Fatalf("Unexpected diff in bootstrap config (-want +got):\n%s", diff)
}
case <-time.After(time.Second):
t.Fatalf("timeout waiting for client config")
Expand Down
4 changes: 3 additions & 1 deletion xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ const (
var (
defaultTestAuthorityServerConfig = &bootstrap.ServerConfig{
ServerURI: "self_server",
CredsType: "self_creds",
Creds: bootstrap.ChannelCreds{
Type: "insecure",
},
}
noopODLBCfg = outlierdetection.LBConfig{
Interval: 1<<63 - 1,
Expand Down
8 changes: 6 additions & 2 deletions xds/internal/balancer/clusterimpl/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ var (
}
testLRSServerConfig = &bootstrap.ServerConfig{
ServerURI: "trafficdirector.googleapis.com:443",
CredsType: "google_default",
Creds: bootstrap.ChannelCreds{
Type: "google_default",
},
}

cmpOpts = cmp.Options{
Expand Down Expand Up @@ -720,7 +722,9 @@ func (s) TestUpdateLRSServer(t *testing.T) {

testLRSServerConfig2 := &bootstrap.ServerConfig{
ServerURI: "trafficdirector-another.googleapis.com:443",
CredsType: "google_default",
Creds: bootstrap.ChannelCreds{
Type: "google_default",
},
}
// Update LRS server to a different name.
if err := b.UpdateClientConnState(balancer.ClientConnState{
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/balancer/clusterimpl/clusterimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (b *clusterImplBalancer) updateLoadStore(newConfig *LBConfig) error {
} else {
// Old is not nil, new is not nil, compare string values, if
// different, stop old and start new.
if *b.lrsServer != *newConfig.LoadReportingServer {
if !b.lrsServer.Equal(newConfig.LoadReportingServer) {
b.lrsServer = newConfig.LoadReportingServer
stopOldLoadReport = true
startNewLoadReport = true
Expand Down
166 changes: 166 additions & 0 deletions xds/internal/balancer/clusterimpl/tests/balancer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package clusterimpl_test

import (
"context"
"fmt"
"net"
"strconv"
"strings"
"testing"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/status"

v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
testgrpc "google.golang.org/grpc/test/grpc_testing"
testpb "google.golang.org/grpc/test/grpc_testing"

_ "google.golang.org/grpc/xds"
)

const (
defaultTestTimeout = 5 * time.Second
defaultTestShortTimeout = 100 * time.Millisecond
)

type s struct {
grpctest.Tester
}

func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}

// TestConfigUpdateWithSameLoadReportingServerConfig tests the scenario where
// the clusterimpl LB policy receives a config update with no change in the load
// reporting server configuration. The test verifies that the existing load
// repoting stream is not terminated and that a new load reporting stream is not
// created.
func (s) TestConfigUpdateWithSameLoadReportingServerConfig(t *testing.T) {
// Create an xDS management server that serves ADS and LRS requests.
opts := e2e.ManagementServerOptions{SupportLoadReportingService: true}
mgmtServer, nodeID, _, resolver, mgmtServerCleanup := e2e.SetupManagementServer(t, opts)
defer mgmtServerCleanup()

// Start a server backend exposing the test service.
backend := &stubserver.StubServer{
EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil },
}
backend.StartServer()
defer backend.Stop()

// Extract the host and port where the server backend is running.
_, p, err := net.SplitHostPort(backend.Address)
if err != nil {
t.Fatalf("Invalid serving address for server backend: %v", err)
}
port, err := strconv.ParseUint(p, 10, 32)
if err != nil {
t.Fatalf("Invalid serving port for server backend: %v", err)
}
t.Logf("Started server backend at %q", backend.Address)

// Configure the xDS management server with default resources. Override the
// default cluster to include an LRS server config pointing to self.
const serviceName = "my-test-xds-service"
resources := e2e.DefaultClientResources(e2e.ResourceParams{
DialTarget: serviceName,
NodeID: nodeID,
Host: "localhost",
Port: uint32(port),
SecLevel: e2e.SecurityLevelNone,
})
resources.Clusters[0].LrsServer = &v3corepb.ConfigSource{
ConfigSourceSpecifier: &v3corepb.ConfigSource_Self{
Self: &v3corepb.SelfConfigSource{},
},
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

// Create a ClientConn and make a successful RPC.
cc, err := grpc.Dial(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc.Close()

client := testgrpc.NewTestServiceClient(cc)
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Fatalf("rpc EmptyCall() failed: %v", err)
}

// Ensure that an LRS stream is created.
if _, err := mgmtServer.LRSServer.LRSStreamOpenChan.Receive(ctx); err != nil {
t.Fatalf("Failure when waiting for an LRS stream to be opened: %v", err)
}

// Configure a new resource on the management server with drop config that
// drops all RPCs, but with no change in the load reporting server config.
resources.Endpoints = []*v3endpointpb.ClusterLoadAssignment{
e2e.EndpointResourceWithOptions(e2e.EndpointOptions{
ClusterName: "endpoints-" + serviceName,
Host: "localhost",
Ports: []uint32{uint32(port)},
DropPercents: map[string]int{"test-drop-everything": 100},
}),
}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

// Repeatedly send RPCs until we sees that they are getting dropped, or the
// test context deadline expires. The former indicates that new config with
// drops has been applied.
for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
_, err := client.EmptyCall(ctx, &testpb.Empty{})
if err != nil && status.Code(err) == codes.Unavailable && strings.Contains(err.Error(), "RPC is dropped") {
break
}
}
if ctx.Err() != nil {
t.Fatalf("Timeout when waiting for RPCs to be dropped after config update")
}

// Ensure that the old LRS stream is not closed.
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if _, err := mgmtServer.LRSServer.LRSStreamCloseChan.Receive(sCtx); err == nil {
t.Fatal("LRS stream closed when expected not to")
}

// Also ensure that a new LRS stream is not created.
sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if _, err := mgmtServer.LRSServer.LRSStreamOpenChan.Receive(sCtx); err == nil {
t.Fatal("New LRS stream created when expected not to")
}
}
4 changes: 3 additions & 1 deletion xds/internal/balancer/clusterresolver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,9 @@ const (

var testLRSServerConfig = &bootstrap.ServerConfig{
ServerURI: "trafficdirector.googleapis.com:443",
CredsType: "google_default",
Creds: bootstrap.ChannelCreds{
Type: "google_default",
},
}

func TestParseConfig(t *testing.T) {
Expand Down
12 changes: 3 additions & 9 deletions xds/internal/resolver/xds_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/envoyproxy/go-control-plane/pkg/wellknown"
"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
Expand All @@ -53,6 +52,7 @@ import (
"google.golang.org/grpc/xds/internal/balancer/ringhash"
"google.golang.org/grpc/xds/internal/httpfilter"
"google.golang.org/grpc/xds/internal/httpfilter/router"
xdstestutils "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
Expand Down Expand Up @@ -213,10 +213,7 @@ func (s) TestResolverBuilder_DifferentBootstrapConfigs(t *testing.T) {

// Add top-level xDS server config corresponding to the above
// management server.
test.bootstrapCfg.XDSServer = &bootstrap.ServerConfig{
ServerURI: mgmtServer.Address,
Creds: grpc.WithTransportCredentials(insecure.NewCredentials()),
}
test.bootstrapCfg.XDSServer = xdstestutils.ServerConfigForAddress(t, mgmtServer.Address)

// Override xDS client creation to use bootstrap configuration
// specified by the test.
Expand Down Expand Up @@ -538,10 +535,7 @@ func (s) TestResolverWatchCallbackAfterClose(t *testing.T) {
// closes the xDS client.
func (s) TestResolverCloseClosesXDSClient(t *testing.T) {
bootstrapCfg := &bootstrap.Config{
XDSServer: &bootstrap.ServerConfig{
ServerURI: "dummy-management-server-address",
Creds: grpc.WithTransportCredentials(insecure.NewCredentials()),
},
XDSServer: xdstestutils.ServerConfigForAddress(t, "dummy-management-server-address"),
}

// Override xDS client creation to use bootstrap configuration pointing to a
Expand Down
21 changes: 21 additions & 0 deletions xds/internal/testutils/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
package testutils

import (
"fmt"
"testing"

"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
)
Expand All @@ -45,3 +49,20 @@ func BuildResourceName(typ xdsresource.ResourceType, auth, id string, ctxParams
ContextParams: ctxParams,
}).String()
}

// ServerConfigForAddress returns a bootstrap.ServerConfig for the given address
// with default values of insecure channel_creds and v3 server_features.
func ServerConfigForAddress(t *testing.T, addr string) *bootstrap.ServerConfig {
t.Helper()

jsonCfg := fmt.Sprintf(`{
"server_uri": "%s",
"channel_creds": [{"type": "insecure"}],
"server_features": ["xds_v3"]
}`, addr)
sc, err := bootstrap.ServerConfigFromJSON([]byte(jsonCfg))
if err != nil {
t.Fatalf("Failed to create server config from JSON %s: %v", jsonCfg, err)
}
return sc
}