Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clusterresolver: push empty config to child policy upon removal of cluster resource #6125

Merged
merged 5 commits into from Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
188 changes: 0 additions & 188 deletions xds/internal/balancer/clusterresolver/clusterresolver_test.go
Expand Up @@ -147,22 +147,6 @@ func (f *fakeChildBalancer) waitForClientConnStateChangeVerifyBalancerConfig(ctx
return nil
}

func (f *fakeChildBalancer) waitForClientConnStateChange(ctx context.Context) error {
_, err := f.clientConnState.Receive(ctx)
if err != nil {
return err
}
return nil
}

func (f *fakeChildBalancer) waitForResolverError(ctx context.Context) error {
_, err := f.resolverError.Receive(ctx)
if err != nil {
return err
}
return nil
}

func (f *fakeChildBalancer) waitForSubConnStateChange(ctx context.Context, wantState *scStateChange) error {
val, err := f.subConnState.Receive(ctx)
if err != nil {
Expand Down Expand Up @@ -258,178 +242,6 @@ func (s) TestSubConnStateChange(t *testing.T) {
}
}

// TestErrorFromXDSClientUpdate verifies that an error from xdsClient update is
// handled correctly.
//
// If it's resource-not-found, watch will NOT be canceled, the EDS impl will
// receive an empty EDS update, and new RPCs will fail.
//
// If it's connection error, nothing will happen. This will need to change to
// handle fallback.
func (s) TestErrorFromXDSClientUpdate(t *testing.T) {
edsLBCh := testutils.NewChannel()
xdsC, cleanup := setup(edsLBCh)
defer cleanup()

builder := balancer.Get(Name)
edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{})
if edsB == nil {
t.Fatalf("builder.Build(%s) failed and returned nil", Name)
}
defer edsB.Close()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: newLBConfigWithOneEDS(testEDSService),
}); err != nil {
t.Fatal(err)
}
if _, err := xdsC.WaitForWatchEDS(ctx); err != nil {
t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
}
xdsC.InvokeWatchEDSCallback("", xdsresource.EndpointsUpdate{}, nil)
edsLB, err := waitForNewChildLB(ctx, edsLBCh)
if err != nil {
t.Fatal(err)
}
if err := edsLB.waitForClientConnStateChange(ctx); err != nil {
t.Fatalf("EDS impl got unexpected update: %v", err)
}

connectionErr := xdsresource.NewErrorf(xdsresource.ErrorTypeConnection, "connection error")
xdsC.InvokeWatchEDSCallback("", xdsresource.EndpointsUpdate{}, connectionErr)

sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer sCancel()
if _, err := xdsC.WaitForCancelEDSWatch(sCtx); err != context.DeadlineExceeded {
t.Fatal("watch was canceled, want not canceled (timeout error)")
}

sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer sCancel()
if err := edsLB.waitForClientConnStateChange(sCtx); err != context.DeadlineExceeded {
t.Fatal(err)
}
if err := edsLB.waitForResolverError(ctx); err != nil {
t.Fatalf("want resolver error, got %v", err)
}

resourceErr := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "clusterResolverBalancer resource not found error")
xdsC.InvokeWatchEDSCallback("", xdsresource.EndpointsUpdate{}, resourceErr)
// Even if error is resource not found, watch shouldn't be canceled, because
// this is an EDS resource removed (and xds client actually never sends this
// error, but we still handles it).
sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer sCancel()
if _, err := xdsC.WaitForCancelEDSWatch(sCtx); err != context.DeadlineExceeded {
t.Fatal("watch was canceled, want not canceled (timeout error)")
}
if err := edsLB.waitForClientConnStateChange(sCtx); err != context.DeadlineExceeded {
t.Fatal(err)
}
if err := edsLB.waitForResolverError(ctx); err != nil {
t.Fatalf("want resolver error, got %v", err)
}

// An update with the same service name should not trigger a new watch.
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: newLBConfigWithOneEDS(testEDSService),
}); err != nil {
t.Fatal(err)
}
sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer sCancel()
if _, err := xdsC.WaitForWatchEDS(sCtx); err != context.DeadlineExceeded {
t.Fatal("got unexpected new EDS watch")
}
}

// TestErrorFromResolver verifies that resolver errors are handled correctly.
//
// If it's resource-not-found, watch will be canceled, the EDS impl will receive
// an empty EDS update, and new RPCs will fail.
//
// If it's connection error, nothing will happen. This will need to change to
// handle fallback.
func (s) TestErrorFromResolver(t *testing.T) {
edsLBCh := testutils.NewChannel()
xdsC, cleanup := setup(edsLBCh)
defer cleanup()

builder := balancer.Get(Name)
edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{})
if edsB == nil {
t.Fatalf("builder.Build(%s) failed and returned nil", Name)
}
defer edsB.Close()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: newLBConfigWithOneEDS(testEDSService),
}); err != nil {
t.Fatal(err)
}

if _, err := xdsC.WaitForWatchEDS(ctx); err != nil {
t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
}
xdsC.InvokeWatchEDSCallback("", xdsresource.EndpointsUpdate{}, nil)
edsLB, err := waitForNewChildLB(ctx, edsLBCh)
if err != nil {
t.Fatal(err)
}
if err := edsLB.waitForClientConnStateChange(ctx); err != nil {
t.Fatalf("EDS impl got unexpected update: %v", err)
}

connectionErr := xdsresource.NewErrorf(xdsresource.ErrorTypeConnection, "connection error")
edsB.ResolverError(connectionErr)

sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer sCancel()
if _, err := xdsC.WaitForCancelEDSWatch(sCtx); err != context.DeadlineExceeded {
t.Fatal("watch was canceled, want not canceled (timeout error)")
}

sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer sCancel()
if err := edsLB.waitForClientConnStateChange(sCtx); err != context.DeadlineExceeded {
t.Fatal("eds impl got EDS resp, want timeout error")
}
if err := edsLB.waitForResolverError(ctx); err != nil {
t.Fatalf("want resolver error, got %v", err)
}

resourceErr := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "clusterResolverBalancer resource not found error")
edsB.ResolverError(resourceErr)
if _, err := xdsC.WaitForCancelEDSWatch(ctx); err != nil {
t.Fatalf("want watch to be canceled, waitForCancel failed: %v", err)
}
if err := edsLB.waitForClientConnStateChange(sCtx); err != context.DeadlineExceeded {
t.Fatal(err)
}
if err := edsLB.waitForResolverError(ctx); err != nil {
t.Fatalf("want resolver error, got %v", err)
}

// An update with the same service name should trigger a new watch, because
// the previous watch was canceled.
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: newLBConfigWithOneEDS(testEDSService),
}); err != nil {
t.Fatal(err)
}
if _, err := xdsC.WaitForWatchEDS(ctx); err != nil {
t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
}
}

// Given a list of resource names, verifies that EDS requests for the same are
// sent by the EDS balancer, through the fake xDS client.
func verifyExpectedRequests(ctx context.Context, fc *fakeclient.Client, resourceNames ...string) error {
Expand Down