Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xdsclient: correct logic used to suppress empty ADS requests on new streams #7026

Merged
merged 2 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
36 changes: 16 additions & 20 deletions xds/internal/xdsclient/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,29 +363,21 @@ func (t *Transport) send(ctx context.Context) {
// The xDS protocol only requires that we send the node proto in the first
// discovery request on every stream. Sending the node proto in every
// request message wastes CPU resources on the client and the server.
sendNodeProto := true
sentNodeProto := false
for {
select {
case <-ctx.Done():
return
case stream = <-t.adsStreamCh:
// We have a new stream and we have to ensure that the node proto gets
// sent out in the first request on the stream. At this point, we
// might not have any registered watches. Setting this field to true
// here will ensure that the node proto gets sent out along with the
// discovery request when the first watch is registered.
if len(t.resources) == 0 {
sendNodeProto = true
continue
}

if !t.sendExisting(stream) {
// sent out in the first request on the stream.
var err error
if sentNodeProto, err = t.sendExisting(stream); err != nil {
// Send failed, clear the current stream. Attempt to resend will
// only be made after a new stream is created.
stream = nil
continue
}
sendNodeProto = false
case u, ok := <-t.adsRequestCh.Get():
if !ok {
// No requests will be sent after the adsRequestCh buffer is closed.
Expand Down Expand Up @@ -416,12 +408,12 @@ func (t *Transport) send(ctx context.Context) {
// sending response back).
continue
}
if err := t.sendAggregatedDiscoveryServiceRequest(stream, sendNodeProto, resources, url, version, nonce, nackErr); err != nil {
if err := t.sendAggregatedDiscoveryServiceRequest(stream, !sentNodeProto, resources, url, version, nonce, nackErr); err != nil {
t.logger.Warningf("Sending ADS request for resources: %q, url: %q, version: %q, nonce: %q failed: %v", resources, url, version, nonce, err)
// Send failed, clear the current stream.
stream = nil
}
sendNodeProto = false
sentNodeProto = true
}
}
}
Expand All @@ -433,7 +425,9 @@ func (t *Transport) send(ctx context.Context) {
// that here because the stream has just started and Send() usually returns
// quickly (once it pushes the message onto the transport layer) and is only
// ever blocked if we don't have enough flow control quota.
func (t *Transport) sendExisting(stream adsStream) bool {
//
// Returns true if the node proto was sent.
func (t *Transport) sendExisting(stream adsStream) (sentNodeProto bool, err error) {
t.mu.Lock()
defer t.mu.Unlock()

Expand All @@ -450,16 +444,18 @@ func (t *Transport) sendExisting(stream adsStream) bool {
t.nonces = make(map[string]string)

// Send node proto only in the first request on the stream.
sendNodeProto := true
for url, resources := range t.resources {
if err := t.sendAggregatedDiscoveryServiceRequest(stream, sendNodeProto, mapToSlice(resources), url, t.versions[url], "", nil); err != nil {
if len(resources) == 0 {
continue
}
if err := t.sendAggregatedDiscoveryServiceRequest(stream, !sentNodeProto, mapToSlice(resources), url, t.versions[url], "", nil); err != nil {
t.logger.Warningf("Sending ADS request for resources: %q, url: %q, version: %q, nonce: %q failed: %v", resources, url, t.versions[url], "", err)
return false
return false, err
}
sendNodeProto = false
sentNodeProto = true
}

return true
return sentNodeProto, nil
}

// recv receives xDS responses on the provided ADS stream and branches out to
Expand Down
191 changes: 191 additions & 0 deletions xds/internal/xdsclient/transport/transport_resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package transport_test

import (
"context"
"errors"
"testing"
"time"

Expand Down Expand Up @@ -217,3 +218,193 @@ func (s) TestHandleResponseFromManagementServer(t *testing.T) {
})
}
}

// TestEmptyListenerResourceOnStreamRestart exercises the transport when the
// only listener subscription has been removed before the ADS stream is
// restarted. It checks that no discovery request shows up at the management
// server after the restart while nothing is subscribed, and that the next
// listener request carries the node proto.
func (s) TestEmptyListenerResourceOnStreamRestart(t *testing.T) {
	const (
		resource        = "some-resource"
		listenerTypeURL = "type.googleapis.com/envoy.config.listener.v3.Listener"
	)

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()

	mgmtServer, cleanup := startFakeManagementServer(t)
	defer cleanup()
	t.Logf("Started xDS management server on %s", mgmtServer.Address)

	nodeProto := &v3corepb.Node{Id: uuid.New().String()}
	opts := transport.Options{
		ServerCfg:      *xdstestutils.ServerConfigForAddress(t, mgmtServer.Address),
		OnRecvHandler:  func(transport.ResourceUpdate) error { return nil },
		OnSendHandler:  func(*transport.ResourceSendInfo) {},  // No onSend handling.
		OnErrorHandler: func(error) {},                        // No stream error handling.
		Backoff:        func(int) time.Duration { return 0 }, // No backoff.
		NodeProto:      nodeProto,
	}
	tr, err := transport.New(opts)
	if err != nil {
		t.Fatalf("Failed to create xDS transport: %v", err)
	}
	defer tr.Close()

	// verifyNextRequest waits for the next discovery request to arrive at the
	// management server and fails the test unless it matches want.
	verifyNextRequest := func(want *v3discoverypb.DiscoveryRequest) {
		t.Helper()
		val, err := mgmtServer.XDSRequestChan.Receive(ctx)
		if err != nil {
			t.Fatalf("Error waiting for mgmt server response: %v", err)
		}
		gotReq := val.(*fakeserver.Request)
		wantReq := &fakeserver.Request{Req: want}
		if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
			t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq)
		}
	}

	// Subscribe to a listener resource; the first request on the stream is
	// expected to include the node proto.
	tr.SendRequest(version.V3ListenerURL, []string{resource})
	verifyNextRequest(&v3discoverypb.DiscoveryRequest{
		Node:          nodeProto,
		ResourceNames: []string{resource},
		TypeUrl:       listenerTypeURL,
	})

	// Drop the subscription by requesting an empty list; this request is
	// expected without the node proto.
	tr.SendRequest(version.V3ListenerURL, []string{})
	verifyNextRequest(&v3discoverypb.DiscoveryRequest{
		ResourceNames: []string{},
		TypeUrl:       listenerTypeURL,
	})

	// Push an error to the client to make the stream restart.
	mgmtServer.XDSResponseChan <- &fakeserver.Response{Err: errors.New("go away")}

	// With nothing subscribed, no request should arrive within the short
	// timeout.
	shortCtx, shortCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
	defer shortCancel()
	got, err := mgmtServer.XDSRequestChan.Receive(shortCtx)
	if !errors.Is(err, context.DeadlineExceeded) {
		t.Fatalf("mgmt server received request: %v; wanted DeadlineExceeded error", got)
	}

	// Subscribe again; the request must once more include the node proto.
	tr.SendRequest(version.V3ListenerURL, []string{resource})
	verifyNextRequest(&v3discoverypb.DiscoveryRequest{
		Node:          nodeProto,
		ResourceNames: []string{resource},
		TypeUrl:       listenerTypeURL,
	})
}

// TestEmptyClusterResourceOnStreamRestartWithListener exercises the transport
// when a listener subscription is still active but the cluster subscription
// has been removed before the ADS stream is restarted. It checks that after
// the restart the listener request is sent with the node proto, and that no
// further request (i.e. no cluster request) arrives within the short timeout.
func (s) TestEmptyClusterResourceOnStreamRestartWithListener(t *testing.T) {
	const (
		resource        = "some-resource"
		listenerTypeURL = "type.googleapis.com/envoy.config.listener.v3.Listener"
		clusterTypeURL  = "type.googleapis.com/envoy.config.cluster.v3.Cluster"
	)

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()

	mgmtServer, cleanup := startFakeManagementServer(t)
	defer cleanup()
	t.Logf("Started xDS management server on %s", mgmtServer.Address)

	nodeProto := &v3corepb.Node{Id: uuid.New().String()}
	opts := transport.Options{
		ServerCfg:      *xdstestutils.ServerConfigForAddress(t, mgmtServer.Address),
		OnRecvHandler:  func(transport.ResourceUpdate) error { return nil },
		OnSendHandler:  func(*transport.ResourceSendInfo) {},  // No onSend handling.
		OnErrorHandler: func(error) {},                        // No stream error handling.
		Backoff:        func(int) time.Duration { return 0 }, // No backoff.
		NodeProto:      nodeProto,
	}
	tr, err := transport.New(opts)
	if err != nil {
		t.Fatalf("Failed to create xDS transport: %v", err)
	}
	defer tr.Close()

	// verifyNextRequest waits for the next discovery request to arrive at the
	// management server and fails the test unless it matches want.
	verifyNextRequest := func(want *v3discoverypb.DiscoveryRequest) {
		t.Helper()
		val, err := mgmtServer.XDSRequestChan.Receive(ctx)
		if err != nil {
			t.Fatalf("Error waiting for mgmt server response: %v", err)
		}
		gotReq := val.(*fakeserver.Request)
		wantReq := &fakeserver.Request{Req: want}
		if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
			t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq)
		}
	}

	// Subscribe to a listener resource; the first request on the stream is
	// expected to include the node proto.
	tr.SendRequest(version.V3ListenerURL, []string{resource})
	verifyNextRequest(&v3discoverypb.DiscoveryRequest{
		Node:          nodeProto,
		ResourceNames: []string{resource},
		TypeUrl:       listenerTypeURL,
	})

	// Subscribe to a cluster resource; this later request on the same stream
	// is expected without the node proto.
	tr.SendRequest(version.V3ClusterURL, []string{resource})
	verifyNextRequest(&v3discoverypb.DiscoveryRequest{
		ResourceNames: []string{resource},
		TypeUrl:       clusterTypeURL,
	})

	// Drop the cluster subscription by requesting an empty list.
	tr.SendRequest(version.V3ClusterURL, []string{})
	verifyNextRequest(&v3discoverypb.DiscoveryRequest{
		ResourceNames: []string{},
		TypeUrl:       clusterTypeURL,
	})

	// Push an error to the client to make the stream restart.
	mgmtServer.XDSResponseChan <- &fakeserver.Response{Err: errors.New("go away")}

	// The listener subscription is still active, so it must be re-requested
	// after the restart, with the node proto.
	verifyNextRequest(&v3discoverypb.DiscoveryRequest{
		Node:          nodeProto,
		ResourceNames: []string{resource},
		TypeUrl:       listenerTypeURL,
	})

	// No cluster resources are subscribed, so nothing else should arrive
	// within the short timeout.
	shortCtx, shortCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
	defer shortCancel()
	got, err := mgmtServer.XDSRequestChan.Receive(shortCtx)
	if !errors.Is(err, context.DeadlineExceeded) {
		t.Fatalf("mgmt server received request: %v; wanted DeadlineExceeded error", got)
	}
}