Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xdsclient: send Node proto only on first discovery request on ADS stream #6078

Merged
merged 2 commits into from Mar 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 11 additions & 7 deletions xds/internal/testutils/fakeserver/server.go
Expand Up @@ -114,13 +114,17 @@ func (wl *wrappedListener) Accept() (net.Conn, error) {
return c, err
}

// StartServer makes a new Server and gets it to start listening on a local
// port for gRPC requests. The returned cancel function should be invoked by
// the caller upon completion of the test.
func StartServer() (*Server, func(), error) {
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
return nil, func() {}, fmt.Errorf("net.Listen() failed: %v", err)
// StartServer makes a new Server and gets it to start listening on the given
// net.Listener. If the given net.Listener is nil, a new one is created on a
// local port for gRPC requests. The returned cancel function should be invoked
// by the caller upon completion of the test.
func StartServer(lis net.Listener) (*Server, func(), error) {
if lis == nil {
var err error
lis, err = net.Listen("tcp", "localhost:0")
if err != nil {
arvindbr8 marked this conversation as resolved.
Show resolved Hide resolved
return nil, func() {}, fmt.Errorf("net.Listen() failed: %v", err)
}
}

s := &Server{
Expand Down
64 changes: 64 additions & 0 deletions xds/internal/testutils/resource_watcher.go
@@ -0,0 +1,64 @@
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package testutils

import "google.golang.org/grpc/xds/internal/xdsclient/xdsresource"

// TestResourceWatcher implements the xdsresource.ResourceWatcher interface,
// used to receive updates on watches registered with the xDS client, when using
// the resource-type agnostic WatchResource API.
//
// Tests can the channels provided by this tyep to get access to updates and
// errors sent by the xDS client.
type TestResourceWatcher struct {
// UpdateCh is the channel on which xDS client updates are delivered.
UpdateCh chan *xdsresource.ResourceData
// ErrorCh is the channel on which errors from the xDS client are delivered.
ErrorCh chan error
}

// OnUpdate is invoked by the xDS client to report an update on the resource
// being watched.
func (w *TestResourceWatcher) OnUpdate(data xdsresource.ResourceData) {
select {
case w.UpdateCh <- &data:
default:
}
}

// OnError is invoked by the xDS client to report errors.
func (w *TestResourceWatcher) OnError(err error) {
select {
case w.ErrorCh <- err:
default:
}
}

// OnResourceDoesNotExist is used by the xDS client to report that the resource
// being watched no longer exists.
func (w *TestResourceWatcher) OnResourceDoesNotExist() {}

// NewTestResourceWatcher returns a TestResourceWatcher to watch for resources
// via the xDS client.
func NewTestResourceWatcher() *TestResourceWatcher {
return &TestResourceWatcher{
UpdateCh: make(chan *xdsresource.ResourceData),
ErrorCh: make(chan error),
}
}
52 changes: 13 additions & 39 deletions xds/internal/xdsclient/authority_test.go
Expand Up @@ -15,6 +15,7 @@
* limitations under the License.
*
*/

package xdsclient

import (
Expand All @@ -32,41 +33,14 @@ import (
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/xds/internal"
_ "google.golang.org/grpc/xds/internal/httpfilter/router"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
)

var emptyServerOpts = e2e.ManagementServerOptions{}

type testResourceWatcher struct {
updateCh chan *xdsresource.ResourceData
errorCh chan error
}

func (w *testResourceWatcher) OnUpdate(data xdsresource.ResourceData) {
select {
case w.updateCh <- &data:
default:
}
}

func (w *testResourceWatcher) OnError(err error) {
select {
case w.errorCh <- err:
default:
}
}

func (w *testResourceWatcher) OnResourceDoesNotExist() {}

func newTestResourceWatcher() *testResourceWatcher {
return &testResourceWatcher{
updateCh: make(chan *xdsresource.ResourceData),
errorCh: make(chan error),
}
}

var (
// Listener resource type implementation retrieved from the resource type map
// in the internal package, which is initialized when the individual resource
Expand Down Expand Up @@ -131,7 +105,7 @@ func (s) TestTimerAndWatchStateOnSendCallback(t *testing.T) {
defer a.close()

rn := "xdsclient-test-lds-resource"
w := newTestResourceWatcher()
w := testutils.NewTestResourceWatcher()
cancelResource := a.watchResource(listenerResourceType, rn, w)
defer cancelResource()

Expand All @@ -155,9 +129,9 @@ func (s) TestTimerAndWatchStateOnSendCallback(t *testing.T) {
select {
case <-ctx.Done():
t.Fatal("Test timed out before watcher received an update from server.")
case err := <-w.errorCh:
case err := <-w.ErrorCh:
t.Fatalf("Watch got an unexpected error update: %q. Want valid updates.", err)
case <-w.updateCh:
case <-w.UpdateCh:
// This means the OnUpdate callback was invoked and the watcher was notified.
}
if err := compareWatchState(a, rn, watchStateReceived); err != nil {
Expand All @@ -176,7 +150,7 @@ func (s) TestTimerAndWatchStateOnErrorCallback(t *testing.T) {
defer a.close()

rn := "xdsclient-test-lds-resource"
w := newTestResourceWatcher()
w := testutils.NewTestResourceWatcher()
cancelResource := a.watchResource(listenerResourceType, rn, w)
defer cancelResource()

Expand All @@ -188,7 +162,7 @@ func (s) TestTimerAndWatchStateOnErrorCallback(t *testing.T) {
select {
case <-ctx.Done():
t.Fatal("Test timed out before verifying error propagation.")
case err := <-w.errorCh:
case err := <-w.ErrorCh:
if xdsresource.ErrType(err) != xdsresource.ErrorTypeConnection {
t.Fatal("Connection error not propagated to watchers.")
}
Expand Down Expand Up @@ -219,7 +193,7 @@ func (s) TestWatchResourceTimerCanRestartOnIgnoredADSRecvError(t *testing.T) {
defer a.close()

nameA := "xdsclient-test-lds-resourceA"
watcherA := newTestResourceWatcher()
watcherA := testutils.NewTestResourceWatcher()
cancelA := a.watchResource(listenerResourceType, nameA, watcherA)

if err := updateResourceInServer(ctx, ms, nameA, nodeID); err != nil {
Expand All @@ -231,13 +205,13 @@ func (s) TestWatchResourceTimerCanRestartOnIgnoredADSRecvError(t *testing.T) {
select {
case <-ctx.Done():
t.Fatal("Test timed out before watcher received the update.")
case err := <-watcherA.errorCh:
case err := <-watcherA.ErrorCh:
t.Fatalf("Watch got an unexpected error update: %q; want: valid update.", err)
case <-watcherA.updateCh:
case <-watcherA.UpdateCh:
}

nameB := "xdsclient-test-lds-resourceB"
watcherB := newTestResourceWatcher()
watcherB := testutils.NewTestResourceWatcher()
cancelB := a.watchResource(listenerResourceType, nameB, watcherB)
defer cancelB()

Expand All @@ -249,9 +223,9 @@ func (s) TestWatchResourceTimerCanRestartOnIgnoredADSRecvError(t *testing.T) {
select {
case <-ctx.Done():
t.Fatal("Test timed out before mgmt server got the request.")
case u := <-watcherB.updateCh:
case u := <-watcherB.UpdateCh:
t.Fatalf("Watch got an unexpected resource update: %v.", u)
case gotErr := <-watcherB.errorCh:
case gotErr := <-watcherB.ErrorCh:
wantErr := xdsresource.ErrorTypeConnection
if xdsresource.ErrType(gotErr) != wantErr {
t.Fatalf("Watch got an unexpected error:%q. Want: %q.", gotErr, wantErr)
Expand Down
4 changes: 2 additions & 2 deletions xds/internal/xdsclient/loadreport_test.go
Expand Up @@ -43,7 +43,7 @@ const (
)

func (s) TestLRSClient(t *testing.T) {
fs, sCleanup, err := fakeserver.StartServer()
fs, sCleanup, err := fakeserver.StartServer(nil)
if err != nil {
t.Fatalf("failed to start fake xDS server: %v", err)
}
Expand Down Expand Up @@ -83,7 +83,7 @@ func (s) TestLRSClient(t *testing.T) {
t.Errorf("unexpected NewConn: %v, %v, want channel recv timeout", u, err)
}

fs2, sCleanup2, err := fakeserver.StartServer()
fs2, sCleanup2, err := fakeserver.StartServer(nil)
if err != nil {
t.Fatalf("failed to start fake xDS server: %v", err)
}
Expand Down