Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix websocket connection leak #7978

Merged
merged 13 commits into from
Dec 18, 2023
1 change: 1 addition & 0 deletions CHANGES/7978.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix websocket connection leak
93 changes: 53 additions & 40 deletions aiohttp/web_ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,8 @@
def _pong_not_received(self) -> None:
if self._req is not None and self._req.transport is not None:
self._closed = True
self._close_code = WSCloseCode.ABNORMAL_CLOSURE
self._set_code_close_transport(WSCloseCode.ABNORMAL_CLOSURE)

Check warning on line 165 in aiohttp/web_ws.py

View check run for this annotation

Codecov / codecov/patch

aiohttp/web_ws.py#L165

Added line #L165 was not covered by tests
bdraco marked this conversation as resolved.
Show resolved Hide resolved
self._exception = asyncio.TimeoutError()
self._req.transport.close()

async def prepare(self, request: BaseRequest) -> AbstractStreamWriter:
# make pre-check to don't hide it by do_handshake() exceptions
Expand Down Expand Up @@ -382,7 +381,10 @@
await self.close()
self._eof_sent = True

async def close(self, *, code: int = WSCloseCode.OK, message: bytes = b"") -> bool:
async def close(
self, *, code: int = WSCloseCode.OK, message: bytes = b"", drain: bool = True
) -> bool:
"""Close websocket connection."""
if self._writer is None:
raise RuntimeError("Call .prepare() first")

Expand All @@ -396,46 +398,53 @@
reader.feed_data(WS_CLOSING_MESSAGE, 0)
await self._waiting

if not self._closed:
self._closed = True
try:
await self._writer.close(code, message)
writer = self._payload_writer
assert writer is not None
await writer.drain()
except (asyncio.CancelledError, asyncio.TimeoutError):
self._close_code = WSCloseCode.ABNORMAL_CLOSURE
raise
except Exception as exc:
self._close_code = WSCloseCode.ABNORMAL_CLOSURE
self._exception = exc
return True
if self._closed:
return False

if self._closing:
return True
self._closed = True
try:
await self._writer.close(code, message)
writer = self._payload_writer
assert writer is not None
if drain:
await writer.drain()
except (asyncio.CancelledError, asyncio.TimeoutError):
self._set_code_close_transport(WSCloseCode.ABNORMAL_CLOSURE)
raise
except Exception as exc:
self._exception = exc
self._set_code_close_transport(WSCloseCode.ABNORMAL_CLOSURE)
return True

reader = self._reader
assert reader is not None
try:
async with async_timeout.timeout(self._timeout):
msg = await reader.read()
except asyncio.CancelledError:
self._close_code = WSCloseCode.ABNORMAL_CLOSURE
raise
except Exception as exc:
self._close_code = WSCloseCode.ABNORMAL_CLOSURE
self._exception = exc
return True
if self._closing:
return True

if msg.type == WSMsgType.CLOSE:
self._close_code = msg.data
return True
reader = self._reader
assert reader is not None
try:
async with async_timeout.timeout(self._timeout):
msg = await reader.read()
except asyncio.CancelledError:
self._set_code_close_transport(WSCloseCode.ABNORMAL_CLOSURE)
raise
except Exception as exc:
self._exception = exc
self._set_code_close_transport(WSCloseCode.ABNORMAL_CLOSURE)
return True

self._close_code = WSCloseCode.ABNORMAL_CLOSURE
self._exception = asyncio.TimeoutError()
if msg.type == WSMsgType.CLOSE:
self._set_code_close_transport(msg.data)
return True
else:
return False

self._set_code_close_transport(WSCloseCode.ABNORMAL_CLOSURE)
self._exception = asyncio.TimeoutError()
return True

def _set_code_close_transport(self, code: WSCloseCode) -> None:
"""Set the close code and close the transport."""
self._close_code = code
if self._req is not None and self._req.transport is not None:
self._req.transport.close()

async def receive(self, timeout: Optional[float] = None) -> WSMessage:
if self._reader is None:
Expand Down Expand Up @@ -466,7 +475,7 @@
set_result(waiter, True)
self._waiting = None
except (asyncio.CancelledError, asyncio.TimeoutError):
self._close_code = WSCloseCode.ABNORMAL_CLOSURE
self._set_code_close_transport(WSCloseCode.ABNORMAL_CLOSURE)
raise
except EofStream:
self._close_code = WSCloseCode.OK
Expand All @@ -488,7 +497,11 @@
self._close_code = msg.data
# Could be closed while awaiting reader.
if not self._closed and self._autoclose: # type: ignore[redundant-expr]
await self.close()
# The client is going to close the connection
# out from under us so we do not want to drain
# any pending writes as it will likely result
# writing to a broken pipe.
await self.close(drain=False)
elif msg.type == WSMsgType.CLOSING:
self._closing = True
elif msg.type == WSMsgType.PING and self._autoping:
Expand Down
8 changes: 8 additions & 0 deletions docs/web_reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,14 @@ and :ref:`aiohttp-web-signals` handlers::

.. versionadded:: 3.3

:param bool autoclose: Close connection when the client sends
a :const:`~aiohttp.WSMsgType.CLOSE` message,
``True`` by default. If set to ``False``,
the connection is not closed and the
caller is responsible for calling
``request.transport.close()`` to avoid
leaking resources.


The class supports ``async for`` statement for iterating over
incoming messages::
Expand Down