Skip to content

Commit

Permalink
xdsclient: send Node proto only on first discovery request on ADS str…
Browse files Browse the repository at this point in the history
…eam (#6078)
  • Loading branch information
easwars committed Mar 3, 2023
1 parent ae4a231 commit b9e6d59
Show file tree
Hide file tree
Showing 8 changed files with 298 additions and 54 deletions.
18 changes: 11 additions & 7 deletions xds/internal/testutils/fakeserver/server.go
Expand Up @@ -114,13 +114,17 @@ func (wl *wrappedListener) Accept() (net.Conn, error) {
return c, err
}

// StartServer makes a new Server and gets it to start listening on a local
// port for gRPC requests. The returned cancel function should be invoked by
// the caller upon completion of the test.
func StartServer() (*Server, func(), error) {
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
return nil, func() {}, fmt.Errorf("net.Listen() failed: %v", err)
// StartServer makes a new Server and gets it to start listening on the given
// net.Listener. If the given net.Listener is nil, a new one is created on a
// local port for gRPC requests. The returned cancel function should be invoked
// by the caller upon completion of the test.
func StartServer(lis net.Listener) (*Server, func(), error) {
if lis == nil {
var err error
lis, err = net.Listen("tcp", "localhost:0")
if err != nil {
return nil, func() {}, fmt.Errorf("net.Listen() failed: %v", err)
}
}

s := &Server{
Expand Down
64 changes: 64 additions & 0 deletions xds/internal/testutils/resource_watcher.go
@@ -0,0 +1,64 @@
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package testutils

import "google.golang.org/grpc/xds/internal/xdsclient/xdsresource"

// TestResourceWatcher implements the xdsresource.ResourceWatcher interface,
// used to receive updates on watches registered with the xDS client, when using
// the resource-type agnostic WatchResource API.
//
// Tests can the channels provided by this tyep to get access to updates and
// errors sent by the xDS client.
type TestResourceWatcher struct {
// UpdateCh is the channel on which xDS client updates are delivered.
UpdateCh chan *xdsresource.ResourceData
// ErrorCh is the channel on which errors from the xDS client are delivered.
ErrorCh chan error
}

// OnUpdate is invoked by the xDS client to report an update on the resource
// being watched.
func (w *TestResourceWatcher) OnUpdate(data xdsresource.ResourceData) {
select {
case w.UpdateCh <- &data:
default:
}
}

// OnError is invoked by the xDS client to report errors.
func (w *TestResourceWatcher) OnError(err error) {
select {
case w.ErrorCh <- err:
default:
}
}

// OnResourceDoesNotExist is used by the xDS client to report that the resource
// being watched no longer exists.
func (w *TestResourceWatcher) OnResourceDoesNotExist() {}

// NewTestResourceWatcher returns a TestResourceWatcher to watch for resources
// via the xDS client.
func NewTestResourceWatcher() *TestResourceWatcher {
return &TestResourceWatcher{
UpdateCh: make(chan *xdsresource.ResourceData),
ErrorCh: make(chan error),
}
}
52 changes: 13 additions & 39 deletions xds/internal/xdsclient/authority_test.go
Expand Up @@ -15,6 +15,7 @@
* limitations under the License.
*
*/

package xdsclient

import (
Expand All @@ -32,41 +33,14 @@ import (
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/xds/internal"
_ "google.golang.org/grpc/xds/internal/httpfilter/router"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
)

var emptyServerOpts = e2e.ManagementServerOptions{}

type testResourceWatcher struct {
updateCh chan *xdsresource.ResourceData
errorCh chan error
}

func (w *testResourceWatcher) OnUpdate(data xdsresource.ResourceData) {
select {
case w.updateCh <- &data:
default:
}
}

func (w *testResourceWatcher) OnError(err error) {
select {
case w.errorCh <- err:
default:
}
}

func (w *testResourceWatcher) OnResourceDoesNotExist() {}

func newTestResourceWatcher() *testResourceWatcher {
return &testResourceWatcher{
updateCh: make(chan *xdsresource.ResourceData),
errorCh: make(chan error),
}
}

var (
// Listener resource type implementation retrieved from the resource type map
// in the internal package, which is initialized when the individual resource
Expand Down Expand Up @@ -131,7 +105,7 @@ func (s) TestTimerAndWatchStateOnSendCallback(t *testing.T) {
defer a.close()

rn := "xdsclient-test-lds-resource"
w := newTestResourceWatcher()
w := testutils.NewTestResourceWatcher()
cancelResource := a.watchResource(listenerResourceType, rn, w)
defer cancelResource()

Expand All @@ -155,9 +129,9 @@ func (s) TestTimerAndWatchStateOnSendCallback(t *testing.T) {
select {
case <-ctx.Done():
t.Fatal("Test timed out before watcher received an update from server.")
case err := <-w.errorCh:
case err := <-w.ErrorCh:
t.Fatalf("Watch got an unexpected error update: %q. Want valid updates.", err)
case <-w.updateCh:
case <-w.UpdateCh:
// This means the OnUpdate callback was invoked and the watcher was notified.
}
if err := compareWatchState(a, rn, watchStateReceived); err != nil {
Expand All @@ -176,7 +150,7 @@ func (s) TestTimerAndWatchStateOnErrorCallback(t *testing.T) {
defer a.close()

rn := "xdsclient-test-lds-resource"
w := newTestResourceWatcher()
w := testutils.NewTestResourceWatcher()
cancelResource := a.watchResource(listenerResourceType, rn, w)
defer cancelResource()

Expand All @@ -188,7 +162,7 @@ func (s) TestTimerAndWatchStateOnErrorCallback(t *testing.T) {
select {
case <-ctx.Done():
t.Fatal("Test timed out before verifying error propagation.")
case err := <-w.errorCh:
case err := <-w.ErrorCh:
if xdsresource.ErrType(err) != xdsresource.ErrorTypeConnection {
t.Fatal("Connection error not propagated to watchers.")
}
Expand Down Expand Up @@ -219,7 +193,7 @@ func (s) TestWatchResourceTimerCanRestartOnIgnoredADSRecvError(t *testing.T) {
defer a.close()

nameA := "xdsclient-test-lds-resourceA"
watcherA := newTestResourceWatcher()
watcherA := testutils.NewTestResourceWatcher()
cancelA := a.watchResource(listenerResourceType, nameA, watcherA)

if err := updateResourceInServer(ctx, ms, nameA, nodeID); err != nil {
Expand All @@ -231,13 +205,13 @@ func (s) TestWatchResourceTimerCanRestartOnIgnoredADSRecvError(t *testing.T) {
select {
case <-ctx.Done():
t.Fatal("Test timed out before watcher received the update.")
case err := <-watcherA.errorCh:
case err := <-watcherA.ErrorCh:
t.Fatalf("Watch got an unexpected error update: %q; want: valid update.", err)
case <-watcherA.updateCh:
case <-watcherA.UpdateCh:
}

nameB := "xdsclient-test-lds-resourceB"
watcherB := newTestResourceWatcher()
watcherB := testutils.NewTestResourceWatcher()
cancelB := a.watchResource(listenerResourceType, nameB, watcherB)
defer cancelB()

Expand All @@ -249,9 +223,9 @@ func (s) TestWatchResourceTimerCanRestartOnIgnoredADSRecvError(t *testing.T) {
select {
case <-ctx.Done():
t.Fatal("Test timed out before mgmt server got the request.")
case u := <-watcherB.updateCh:
case u := <-watcherB.UpdateCh:
t.Fatalf("Watch got an unexpected resource update: %v.", u)
case gotErr := <-watcherB.errorCh:
case gotErr := <-watcherB.ErrorCh:
wantErr := xdsresource.ErrorTypeConnection
if xdsresource.ErrType(gotErr) != wantErr {
t.Fatalf("Watch got an unexpected error:%q. Want: %q.", gotErr, wantErr)
Expand Down
4 changes: 2 additions & 2 deletions xds/internal/xdsclient/loadreport_test.go
Expand Up @@ -43,7 +43,7 @@ const (
)

func (s) TestLRSClient(t *testing.T) {
fs, sCleanup, err := fakeserver.StartServer()
fs, sCleanup, err := fakeserver.StartServer(nil)
if err != nil {
t.Fatalf("failed to start fake xDS server: %v", err)
}
Expand Down Expand Up @@ -83,7 +83,7 @@ func (s) TestLRSClient(t *testing.T) {
t.Errorf("unexpected NewConn: %v, %v, want channel recv timeout", u, err)
}

fs2, sCleanup2, err := fakeserver.StartServer()
fs2, sCleanup2, err := fakeserver.StartServer(nil)
if err != nil {
t.Fatalf("failed to start fake xDS server: %v", err)
}
Expand Down

0 comments on commit b9e6d59

Please sign in to comment.