From 77f726281c88d4f1e2c5784677ef6236c7b77df5 Mon Sep 17 00:00:00 2001 From: Anmol Sethi Date: Thu, 19 Oct 2023 03:19:53 -0700 Subject: [PATCH] wsjs: Ensure no goroutines leak after Close Closes #330 --- close.go | 4 ++-- conn.go | 7 +------ ws_js.go | 20 ++++++++++++++++++-- 3 files changed, 21 insertions(+), 10 deletions(-) diff --git a/close.go b/close.go index 1053751c..c3dee7e0 100644 --- a/close.go +++ b/close.go @@ -99,14 +99,14 @@ func CloseStatus(err error) StatusCode { // Close will unblock all goroutines interacting with the connection once // complete. func (c *Conn) Close(code StatusCode, reason string) error { - defer c.wgWait() + defer c.wg.Wait() return c.closeHandshake(code, reason) } // CloseNow closes the WebSocket connection without attempting a close handshake. // Use when you do not want the overhead of the close handshake. func (c *Conn) CloseNow() (err error) { - defer c.wgWait() + defer c.wg.Wait() defer errd.Wrap(&err, "failed to close WebSocket") if c.isClosed() { diff --git a/conn.go b/conn.go index 05531c3b..e3bf2e8c 100644 --- a/conn.go +++ b/conn.go @@ -45,8 +45,6 @@ const ( type Conn struct { noCopy - wg sync.WaitGroup - subprotocol string rwc io.ReadWriteCloser client bool @@ -72,6 +70,7 @@ type Conn struct { writeHeaderBuf [8]byte writeHeader header + wg sync.WaitGroup closed chan struct{} closeMu sync.Mutex closeErr error @@ -310,7 +309,3 @@ func (c *Conn) wgGo(fn func()) { fn() }() } - -func (c *Conn) wgWait() { - c.wg.Wait() -} diff --git a/ws_js.go b/ws_js.go index 180d0564..ecb077d4 100644 --- a/ws_js.go +++ b/ws_js.go @@ -47,6 +47,7 @@ type Conn struct { // read limit for a message in bytes. msgReadLimit xsync.Int64 + wg sync.WaitGroup closingMu sync.Mutex isReadClosed xsync.Int64 closeOnce sync.Once @@ -223,6 +224,7 @@ func (c *Conn) write(ctx context.Context, typ MessageType, p []byte) error { // or the connection is closed. // It thus performs the full WebSocket close handshake. func (c *Conn) Close(code StatusCode, reason string) error { + defer c.wg.Wait() err := c.exportedClose(code, reason) if err != nil { return fmt.Errorf("failed to close WebSocket: %w", err) @@ -236,6 +238,7 @@ func (c *Conn) Close(code StatusCode, reason string) error { // note: No different from Close(StatusGoingAway, "") in WASM as there is no way to close // a WebSocket without the close handshake. func (c *Conn) CloseNow() error { + defer c.wg.Wait() return c.Close(StatusGoingAway, "") } @@ -388,10 +391,15 @@ func (c *Conn) CloseRead(ctx context.Context) context.Context { c.isReadClosed.Store(1) ctx, cancel := context.WithCancel(ctx) + c.wg.Add(1) go func() { + defer c.CloseNow() + defer c.wg.Done() defer cancel() - c.read(ctx) - c.Close(StatusPolicyViolation, "unexpected data message") + _, _, err := c.read(ctx) + if err != nil { + c.Close(StatusPolicyViolation, "unexpected data message") + } }() return ctx } @@ -580,3 +588,11 @@ func (m *mu) unlock() { type noCopy struct{} func (*noCopy) Lock() {} + +func (c *Conn) wgGo(fn func()) { + c.wg.Add(1) + go func() { + defer c.wg.Done() + fn() + }() +}