From 16e0fb84932a3999d523abe7babe46e792ab8728 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Mon, 23 Jan 2023 13:30:00 -0800 Subject: [PATCH 1/2] xds: fix panic involving double close of channel in xDS transport --- xds/internal/xdsclient/transport/loadreport.go | 9 +++++++++ .../xdsclient/transport/loadreport_test.go | 14 +++++++++++++- xds/internal/xdsclient/transport/transport.go | 1 - 3 files changed, 22 insertions(+), 2 deletions(-) diff --git a/xds/internal/xdsclient/transport/loadreport.go b/xds/internal/xdsclient/transport/loadreport.go index a683afd5793..d6ecd954d3a 100644 --- a/xds/internal/xdsclient/transport/loadreport.go +++ b/xds/internal/xdsclient/transport/loadreport.go @@ -62,6 +62,11 @@ func (t *Transport) lrsStartStream() { ctx, cancel := context.WithCancel(context.Background()) t.lrsCancelStream = cancel + + // Create a new done channel every time a new stream is created. This ensures + // that we don't close the same channel multiple times (from lrsRunner() + // goroutine) when multiple streams are created and closed. + t.lrsRunnerDoneCh = make(chan struct{}) go t.lrsRunner(ctx) } @@ -78,7 +83,11 @@ func (t *Transport) lrsStopStream() { t.lrsCancelStream() t.logger.Infof("Stopping LRS stream") + + // Wait for the runner goroutine to exit, and set the done channel to nil. + // The done channel will be recreated when a new stream is created. <-t.lrsRunnerDoneCh + t.lrsRunnerDoneCh = nil } // lrsRunner starts an LRS stream to report load data to the management server. diff --git a/xds/internal/xdsclient/transport/loadreport_test.go b/xds/internal/xdsclient/transport/loadreport_test.go index f6203c9b442..815ca25b27b 100644 --- a/xds/internal/xdsclient/transport/loadreport_test.go +++ b/xds/internal/xdsclient/transport/loadreport_test.go @@ -54,7 +54,7 @@ func (s) TestReportLoad(t *testing.T) { NodeProto: nodeProto, } - // Create a transport to the fake server. + // Create a transport to the fake management server. 
tr, err := transport.New(transport.Options{ ServerCfg: serverCfg, UpdateHandler: func(transport.ResourceUpdate) error { return nil }, // No ADS validation. @@ -190,4 +190,16 @@ func (s) TestReportLoad(t *testing.T) { if _, err := mgmtServer.LRSStreamCloseChan.Receive(ctx); err != nil { t.Fatal("Timeout waiting for LRS stream to close") } + + // Calling the load reporting API again should result in the creation of a + // new LRS stream. This ensures that creating and closing multiple streams + // works smoothly. + _, cancelLRS3 := tr.ReportLoad() + if err != nil { + t.Fatalf("Failed to start LRS load reporting: %v", err) + } + if _, err := mgmtServer.LRSStreamOpenChan.Receive(ctx); err != nil { + t.Fatalf("Timeout when waiting for LRS stream to be created: %v", err) + } + cancelLRS3() } diff --git a/xds/internal/xdsclient/transport/transport.go b/xds/internal/xdsclient/transport/transport.go index e0b9807c164..814ca5f8726 100644 --- a/xds/internal/xdsclient/transport/transport.go +++ b/xds/internal/xdsclient/transport/transport.go @@ -202,7 +202,6 @@ func New(opts Options) (*Transport, error) { versions: make(map[string]string), nonces: make(map[string]string), adsRunnerDoneCh: make(chan struct{}), - lrsRunnerDoneCh: make(chan struct{}), } // This context is used for sending and receiving RPC requests and From d06759d8320fc916b736be06cef89a42eafe8f7c Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Mon, 23 Jan 2023 14:15:37 -0800 Subject: [PATCH 2/2] eliminate paranoia --- xds/internal/xdsclient/transport/loadreport.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/xds/internal/xdsclient/transport/loadreport.go b/xds/internal/xdsclient/transport/loadreport.go index d6ecd954d3a..58a2e5dedb6 100644 --- a/xds/internal/xdsclient/transport/loadreport.go +++ b/xds/internal/xdsclient/transport/loadreport.go @@ -84,10 +84,9 @@ func (t *Transport) lrsStopStream() { t.lrsCancelStream() t.logger.Infof("Stopping LRS stream") - // Wait for the 
runner goroutine to exit, and set the done channel to nil. - // The done channel will be recreated when a new stream is created. + // Wait for the runner goroutine to exit. The done channel will be + // recreated when a new stream is created. <-t.lrsRunnerDoneCh - t.lrsRunnerDoneCh = nil } // lrsRunner starts an LRS stream to report load data to the management server.