From c769300d666404f3fa51fd7ec88047935416f289 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Tue, 7 Feb 2023 22:32:07 +0000 Subject: [PATCH 1/2] Allow data to drain from PythonParser after connection close. --- redis/asyncio/connection.py | 24 +++++++++++------------- tests/test_asyncio/test_connection.py | 2 -- 2 files changed, 11 insertions(+), 15 deletions(-) diff --git a/redis/asyncio/connection.py b/redis/asyncio/connection.py index e77fba30da..3126d94535 100644 --- a/redis/asyncio/connection.py +++ b/redis/asyncio/connection.py @@ -141,7 +141,7 @@ def decode(self, value: EncodableT, force=False) -> EncodableT: class BaseParser: """Plain Python parsing class""" - __slots__ = "_stream", "_read_size" + __slots__ = "_stream", "_read_size", "_connected" EXCEPTION_CLASSES: ExceptionMappingT = { "ERR": { @@ -172,6 +172,7 @@ class BaseParser: def __init__(self, socket_read_size: int): self._stream: Optional[asyncio.StreamReader] = None self._read_size = socket_read_size + self._connected = False def __del__(self): try: @@ -208,7 +209,7 @@ async def read_response( class PythonParser(BaseParser): """Plain Python parsing class""" - __slots__ = BaseParser.__slots__ + ("encoder", "_buffer", "_pos", "_chunks") + __slots__ = ("encoder", "_buffer", "_pos", "_chunks") def __init__(self, socket_read_size: int): super().__init__(socket_read_size) @@ -226,21 +227,19 @@ def on_connect(self, connection: "Connection"): self._stream = connection._reader if self._stream is None: raise RedisError("Buffer is closed.") - self.encoder = connection.encoder + self._clear() + self._connected = True def on_disconnect(self): """Called when the stream disconnects""" - if self._stream is not None: - self._stream = None - self.encoder = None - self._clear() + self._connected = False async def can_read_destructive(self) -> bool: + if not self._connected: + raise RedisError("Buffer is closed.") if self._buffer: return True - if self._stream is None: - raise RedisError("Buffer is closed.") try: async with async_timeout.timeout(0): return await self._stream.read(1) @@ -248,6 +247,8 @@ async def can_read_destructive(self) -> bool: return False async def read_response(self, disable_decoding: bool = False): + if not self._connected: + raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) if self._chunks: # augment parsing buffer with previously read data self._buffer += b"".join(self._chunks) @@ -261,8 +262,6 @@ async def read_response(self, disable_decoding: bool = False): async def _read_response( self, disable_decoding: bool = False ) -> Union[EncodableT, ResponseError, None]: - if not self._stream or not self.encoder: - raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) raw = await self._readline() response: Any byte, response = raw[:1], raw[1:] @@ -350,14 +349,13 @@ async def _readline(self) -> bytes: class HiredisParser(BaseParser): """Parser class for connections using Hiredis""" - __slots__ = BaseParser.__slots__ + ("_reader", "_connected") + __slots__ = ("_reader",) def __init__(self, socket_read_size: int): if not HIREDIS_AVAILABLE: raise RedisError("Hiredis is not available.") super().__init__(socket_read_size=socket_read_size) self._reader: Optional[hiredis.Reader] = None - self._connected: bool = False def on_connect(self, connection: "Connection"): self._stream = connection._reader diff --git a/tests/test_asyncio/test_connection.py b/tests/test_asyncio/test_connection.py index 1851ca9a76..e2d77fc1c3 100644 --- a/tests/test_asyncio/test_connection.py +++ b/tests/test_asyncio/test_connection.py @@ -211,8 +211,6 @@ async def test_connection_disconect_race(parser_class): This test verifies that a read in progress can finish even if the `disconnect()` method is called. """ - if parser_class == PythonParser: - pytest.xfail("doesn't work yet with PythonParser") if parser_class == HiredisParser and not HIREDIS_AVAILABLE: pytest.skip("Hiredis not available") From dff4f82633a3c1d995783a817c42338b92260882 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Tue, 7 Feb 2023 22:33:24 +0000 Subject: [PATCH 2/2] Add Changes --- CHANGES | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES b/CHANGES index e83660d6ac..a31a4bfc15 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,4 @@ + * Allow data to drain from async PythonParser when reading during a disconnect() * Add test and fix async HiredisParser when reading during a disconnect() (#2349) * Use hiredis-py pack_command if available. * Support `.unlink()` in ClusterPipeline