Revert #2104, provide way to disable disconnects in read_response() #2506

Closed
27 commits
3351e42
Add regression tests and fixes for issue #1128
kristjanvalur Dec 12, 2022
023c645
Apply suggestions from code review
kristjanvalur Dec 13, 2022
f873523
Simplify test
kristjanvalur Dec 13, 2022
b0ff44b
Revert change to Redis() object
kristjanvalur Dec 15, 2022
6c268a1
Make disconnect_on_error optional for Connection.read_response()
kristjanvalur Dec 15, 2022
03171f5
Close a Connection whenever an exception is raised for send_command()
kristjanvalur Dec 15, 2022
4b6caaa
Add CHANGES
kristjanvalur Dec 15, 2022
76db492
New methods are keyword-only. Fix linting.
kristjanvalur Dec 15, 2022
04c5a97
isort
kristjanvalur Dec 15, 2022
6838db8
Clarify comment.
kristjanvalur Dec 15, 2022
b26fda3
fix typo and missing word.
kristjanvalur Dec 15, 2022
6345a69
more fixes to comment, and indentation
kristjanvalur Dec 15, 2022
524c454
Fix tests for resumable read_response to use "disconnect_on_error"
kristjanvalur Jan 5, 2023
1c32296
Fix unittest to not rely on read buffer behaviour
kristjanvalur Jan 5, 2023
c97d3c4
Merge branch 'master' into kristjan/interrupted-send-receive
kristjanvalur Jan 10, 2023
b9c8c69
Merge branch 'master' into kristjan/interrupted-send-receive
kristjanvalur Jan 12, 2023
6733632
Merge remote-tracking branch 'upstream/master' into kristjan/interrup…
kristjanvalur Jan 23, 2023
22f29cd
Merge branch 'master' into kristjan/interrupted-send-receive
kristjanvalur Feb 8, 2023
8df2c00
Fix CHANGES
kristjanvalur Feb 8, 2023
1bc5074
Merge branch 'master' into kristjan/interrupted-send-receive
kristjanvalur Feb 9, 2023
22a67d3
Merge branch 'master' into kristjan/interrupted-send-receive
kristjanvalur Feb 23, 2023
f42bbd7
Merge branch 'master' into kristjan/interrupted-send-receive
kristjanvalur Mar 16, 2023
794b285
Fixing unit test on python 3.11
kristjanvalur Mar 17, 2023
3ec9029
Merge remote-tracking branch 'upstream/master' into kristjan/interrup…
kristjanvalur Mar 17, 2023
0b599bd
Merge branch 'master' into kristjan/interrupted-send-receive
kristjanvalur Mar 20, 2023
284af1b
Merge branch 'master' into kristjan/interrupted-send-receive
kristjanvalur Apr 3, 2023
e34f456
Attempted deadlock fix
kristjanvalur Apr 4, 2023
1 change: 1 addition & 0 deletions CHANGES
@@ -1,3 +1,4 @@
* Revert #2104, add `disconnect_on_error` option to `read_response()` (#2506)
* Allow data to drain from async PythonParser when reading during a disconnect()
* Use asyncio.timeout() instead of async_timeout.timeout() for python >= 3.11 (#2602)
* Add test and fix async HiredisParser when reading during a disconnect() (#2349)
20 changes: 14 additions & 6 deletions redis/asyncio/client.py
@@ -512,8 +512,6 @@ async def _try_send_command_parse_response(self, conn, *args, **options):
await conn.disconnect(nowait=True)
raise
finally:
if self.single_connection_client:
self._single_conn_lock.release()
if not self.connection:
await self.connection_pool.release(conn)

@@ -525,12 +523,20 @@ async def execute_command(self, *args, **options):
command_name = args[0]
conn = self.connection or await pool.get_connection(command_name, **options)

# locking / unlocking must be handled in same task, otherwise we will deadlock
# if parent task is cancelled.
# Parent task is preferable because it always gets cancelled, child task won't
# get cancelled if parent is cancelled and the lock may be held indefinitely.
if self.single_connection_client:
await self._single_conn_lock.acquire()

return await asyncio.shield(
self._try_send_command_parse_response(conn, *args, **options)
)
try:
return await asyncio.shield(
self._try_send_command_parse_response(conn, *args, **options)
)
finally:
if self.single_connection_client:
self._single_conn_lock.release()

async def parse_response(
self, connection: Connection, command_name: Union[str, bytes], **options
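Note on the hunk above: the lock is now acquired and released in the calling task, around the shielded call, rather than inside the shielded coroutine. A minimal sketch of that pattern follows, using only `asyncio`. The names `guarded_call`, `slow_work` and `demo` are hypothetical and are not part of redis-py; this illustrates the idea, not the client's actual code.

```python
import asyncio


async def guarded_call(lock: asyncio.Lock, work):
    # Acquire and release in the *calling* task, so that cancelling the
    # caller always runs the finally block and frees the lock.
    await lock.acquire()
    try:
        # shield() lets the inner coroutine keep running even if the
        # caller is cancelled while waiting for it.
        return await asyncio.shield(work())
    finally:
        lock.release()


async def demo() -> bool:
    lock = asyncio.Lock()
    started = asyncio.Event()

    async def slow_work():
        started.set()
        await asyncio.sleep(0.05)
        return "done"

    task = asyncio.create_task(guarded_call(lock, slow_work))
    await started.wait()
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    # The caller's finally block has run, so the lock is no longer held.
    return lock.locked()
```

Running `asyncio.run(demo())` reports whether the lock is still held after the caller was cancelled. Note that the shielded work may still be running when the lock is released.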
@@ -816,7 +822,9 @@ async def parse_response(self, block: bool = True, timeout: float = 0):
await conn.connect()

read_timeout = None if block else timeout
response = await self._execute(conn, conn.read_response, timeout=read_timeout)
response = await self._execute(
conn, conn.read_response, timeout=read_timeout, disconnect_on_error=False
)

if conn.health_check_interval and response == self.health_check_response:
# ignore the health check message as user might not expect it
28 changes: 18 additions & 10 deletions redis/asyncio/connection.py
@@ -803,7 +803,11 @@ async def send_packed_command(
raise ConnectionError(
f"Error {err_no} while writing to socket. {errmsg}."
) from e
except Exception:
except BaseException:
# BaseExceptions can be raised when a socket send operation is not
# finished, e.g. due to a timeout. Ideally, a caller could then re-try
# to send un-sent data. However, the send_packed_command() API
# does not support it so there is no point in keeping the connection open.
await self.disconnect(nowait=True)
raise
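Note on the hunk above: widening `except Exception` to `except BaseException` matters because, on Python 3.8 and later, `asyncio.CancelledError` derives from `BaseException` and is not caught by an `except Exception` clause. The helper below is a hypothetical illustration (the name `caught_by` is not from redis-py) that checks which handler type catches which exception.

```python
import asyncio


def caught_by(exc_type, handler_type) -> bool:
    """Return True if `except handler_type` would catch `exc_type()`."""
    try:
        raise exc_type()
    except handler_type:
        return True
    except BaseException:
        # Fallback so the probe exception never escapes this helper.
        return False


# Assumes Python 3.8+, where CancelledError is a BaseException subclass.
cancel_caught_by_exception = caught_by(asyncio.CancelledError, Exception)
cancel_caught_by_base = caught_by(asyncio.CancelledError, BaseException)
```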

@@ -826,7 +830,9 @@ async def can_read_destructive(self):
async def read_response(
self,
disable_decoding: bool = False,
*,
timeout: Optional[float] = None,
disconnect_on_error: bool = True,
):
"""Read the response from a previously sent command"""
read_timeout = timeout if timeout is not None else self.socket_timeout
@@ -842,22 +848,24 @@ async def read_response(
)
except asyncio.TimeoutError:
if timeout is not None:
# user requested timeout, return None
# user requested timeout, return None. Operation can be retried
return None
# it was a self.socket_timeout error.
await self.disconnect(nowait=True)
if disconnect_on_error:
await self.disconnect(nowait=True)
raise TimeoutError(f"Timeout reading from {self.host}:{self.port}")
except OSError as e:
await self.disconnect(nowait=True)
if disconnect_on_error:
await self.disconnect(nowait=True)
raise ConnectionError(
f"Error while reading from {self.host}:{self.port} : {e.args}"
)
except asyncio.CancelledError:
# need this check for 3.7, where CancelledError
# is subclass of Exception, not BaseException
raise
except Exception:
await self.disconnect(nowait=True)
except BaseException:
# Also by default close in case of BaseException. A lot of code
# relies on this behaviour when doing Command/Response pairs.
# See #1128.
if disconnect_on_error:
await self.disconnect(nowait=True)
raise

if self.health_check_interval:
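Note on the hunk above: when the caller passes an explicit `timeout`, a timed-out read returns `None` and the read can simply be attempted again. A rough sketch of that "poll, then retry" behaviour follows, using an `asyncio.Queue` as a stand-in for the socket. `read_with_timeout` and `poll_twice` are hypothetical names, and `asyncio.wait_for` is used here for brevity; it is not necessarily the mechanism the library uses.

```python
import asyncio
from typing import Optional


async def read_with_timeout(queue: asyncio.Queue, timeout: Optional[float]):
    # A caller-requested timeout is not an error: report "nothing yet"
    # and leave the source untouched so the read can be retried.
    try:
        return await asyncio.wait_for(queue.get(), timeout)
    except asyncio.TimeoutError:
        return None


async def poll_twice():
    queue: asyncio.Queue = asyncio.Queue()
    first = await read_with_timeout(queue, 0.01)   # nothing queued yet
    queue.put_nowait(b"message")
    second = await read_with_timeout(queue, 0.01)  # retry succeeds
    return first, second
```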
2 changes: 1 addition & 1 deletion redis/client.py
@@ -1526,7 +1526,7 @@ def try_read():
return None
else:
conn.connect()
return conn.read_response()
return conn.read_response(disconnect_on_error=False)

response = self._execute(conn, try_read)

24 changes: 18 additions & 6 deletions redis/connection.py
@@ -831,7 +831,11 @@ def send_packed_command(self, command, check_health=True):
errno = e.args[0]
errmsg = e.args[1]
raise ConnectionError(f"Error {errno} while writing to socket. {errmsg}.")
except Exception:
except BaseException:
# BaseExceptions can be raised when a socket send operation is not
# finished, e.g. due to a timeout. Ideally, a caller could then re-try
# to send un-sent data. However, the send_packed_command() API
# does not support it so there is no point in keeping the connection open.
self.disconnect()
raise

@@ -856,23 +860,31 @@ def can_read(self, timeout=0):
self.disconnect()
raise ConnectionError(f"Error while reading from {host_error}: {e.args}")

def read_response(self, disable_decoding=False):
def read_response(
self, disable_decoding=False, *, disconnect_on_error: bool = True
):
"""Read the response from a previously sent command"""

host_error = self._host_error()

try:
response = self._parser.read_response(disable_decoding=disable_decoding)
except socket.timeout:
self.disconnect()
if disconnect_on_error:
self.disconnect()
raise TimeoutError(f"Timeout reading from {host_error}")
except OSError as e:
self.disconnect()
if disconnect_on_error:
self.disconnect()
raise ConnectionError(
f"Error while reading from {host_error}" f" : {e.args}"
)
except Exception:
self.disconnect()
except BaseException:
# Also by default close in case of BaseException. A lot of code
# relies on this behaviour when doing Command/Response pairs.
# See #1128.
if disconnect_on_error:
self.disconnect()
raise

if self.health_check_interval:
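Note on the hunk above: the keyword-only `disconnect_on_error` flag lets a caller choose between "close on any error" (the default, which command/response pairs rely on) and "leave the connection open so the read can be resumed". The toy class below sketches the two behaviours. `ToyConnection` and `flaky_reader` are hypothetical and only mimic the shape of the method; they are not redis-py's `Connection`.

```python
class ToyConnection:
    """Hypothetical stand-in for a connection; not redis-py's class."""

    def __init__(self, reader):
        self._reader = reader  # callable that returns a reply or raises
        self.connected = True

    def disconnect(self):
        self.connected = False

    def read_response(self, *, disconnect_on_error: bool = True):
        try:
            return self._reader()
        except BaseException:
            # Default: drop the connection so a stale reply is never
            # delivered to the next command. Opt out to allow a retry.
            if disconnect_on_error:
                self.disconnect()
            raise


def flaky_reader(failures: int):
    """Return a reader that raises `failures` times, then replies b"OK"."""
    remaining = [failures]

    def read():
        if remaining[0] > 0:
            remaining[0] -= 1
            raise InterruptedError("simulated interruption")
        return b"OK"

    return read
```

With `disconnect_on_error=False` the caller can loop and call `read_response()` again after an interruption, which is the pattern the resumable-read tests in this PR exercise.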
38 changes: 38 additions & 0 deletions tests/test_asyncio/test_commands.py
@@ -1,9 +1,11 @@
"""
Tests async overrides of commands from their mixins
"""
import asyncio
import binascii
import datetime
import re
import sys
from string import ascii_letters

import pytest
@@ -18,6 +20,11 @@
skip_unless_arch_bits,
)

if sys.version_info >= (3, 11):
from asyncio import timeout as async_timeout
else:
from async_timeout import timeout as async_timeout

REDIS_6_VERSION = "5.9.0"


@@ -2999,6 +3006,37 @@ async def test_module_list(self, r: redis.Redis):
for x in await r.module_list():
assert isinstance(x, dict)

@pytest.mark.onlynoncluster
async def test_interrupted_command(self, r: redis.Redis):
"""
Regression test for issue #1128: An Un-handled BaseException
will leave the socket with un-read response to a previous
command.
"""
ready = asyncio.Event()

async def helper():
with pytest.raises(asyncio.CancelledError):
# blocking pop
ready.set()
await r.brpop(["nonexist"])
# If the following is not done, further Timeout operations will fail,
# because the timeout won't catch its Cancelled Error if the task
# has a pending cancel. Python documentation probably should reflect this.
if sys.version_info >= (3, 11):
asyncio.current_task().uncancel()
# if all is well, we can continue. The following should not hang.
await r.set("status", "down")
Comment on lines +3028 to +3029

Still tbd to validate that not only you don't "hang", but also that the next read response matches the command.

Contributor Author

Yes, a separate test which continues an interrupted read seems proper.


task = asyncio.create_task(helper())
await ready.wait()
await asyncio.sleep(0.01)
# the task is now sleeping, let's send it an exception
task.cancel()
# If all is well, the task should finish right away, otherwise fail with Timeout
async with async_timeout(0.1):
await task


@pytest.mark.onlynoncluster
class TestBinarySave:
2 changes: 1 addition & 1 deletion tests/test_asyncio/test_connection.py
@@ -184,7 +184,7 @@ async def test_connection_parse_response_resume(r: redis.Redis):
conn._parser._stream = MockStream(message, interrupt_every=2)
for i in range(100):
try:
response = await conn.read_response()
response = await conn.read_response(disconnect_on_error=False)
break
except MockStream.TestError:
pass
35 changes: 35 additions & 0 deletions tests/test_commands.py
@@ -1,9 +1,12 @@
import binascii
import datetime
import re
import threading
import time
from asyncio import CancelledError
from string import ascii_letters
from unittest import mock
from unittest.mock import patch

import pytest

@@ -4726,6 +4729,38 @@ def test_psync(self, r):
res = r2.psync(r2.client_id(), 1)
assert b"FULLRESYNC" in res

@pytest.mark.onlynoncluster
def test_interrupted_command(self, r: redis.Redis):
"""
Regression test for issue #1128: An Un-handled BaseException
will leave the socket with un-read response to a previous
command.
"""

ok = False

def helper():
@ikonst, Dec 13, 2022

Suggested change
def helper():
def helper() -> None:

IIUC there's no way to get a retval from a thread, though you can pass a mutable object and use that to communicate.

In particular it might be that we need to marshal errors to avoid them being silently ignored? e.g. https://github.com/bjoluc/pytest-reraise

Contributor Author

The test is relying on the thread actually finishing. There is no way to interrupt a hanging thread in python. If these tests fail, the test suite hangs. I'm open to suggestions on how to avoid that. I suppose that an old-school timeout can be added.

Contributor Author

Also, there is no type hinting in redis-py, not going to start adding that to unit tests :)

Yeah, I mostly just meant for you to remove the retval. No need to type-annotate, though I do notice that some stuff is type-annotated.

Contributor Author

You're right, there is the odd type hinting, but it is not consistently applied. I guess people add it to help their dev tools with code completion.

with pytest.raises(CancelledError):
# blocking pop
with patch.object(
r.connection._parser, "read_response", side_effect=CancelledError
):
r.brpop(["nonexist"])
# if all is well, we can continue.
r.set("status", "down") # should not hang

Does this also exercise the "command expects int but got back str" scenario?

Contributor Author

No. It exercises the originally reported scenario: A blocking wait is initiated, and even though a new command is sent on the channel, the server is still in blocking wait mode.
It is harder to interrupt a half-read response, although not impossible by using mocking. I can look into that.

That scenario doesn't require half-read. It's an interrupt between send_command and parse_response:

send_command("EXISTS foo")
<interrupt>
send_command("GET foo")
parse_response() --> expected str, got int (from previous command)
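The interleaving described in this comment can be reproduced with a toy model in which replies are queued strictly in the order the commands were sent. Everything below is hypothetical (`ToyServer` is not a Redis client or server, and only two commands are modelled); it only shows why an unread reply is handed to the next command.

```python
from collections import deque


class ToyServer:
    """Hypothetical in-memory stand-in: replies come back in send order."""

    def __init__(self):
        self.data = {"foo": "bar"}
        self.replies = deque()

    def send_command(self, name: str, key: str) -> None:
        if name == "EXISTS":
            self.replies.append(int(key in self.data))
        elif name == "GET":
            self.replies.append(self.data.get(key))
        else:
            raise ValueError(f"unsupported toy command: {name}")

    def parse_response(self):
        # Always returns the oldest unread reply, whoever asks for it.
        return self.replies.popleft()


def interrupted_sequence():
    server = ToyServer()
    server.send_command("EXISTS", "foo")
    # <interrupt>: the reply to EXISTS is never read
    server.send_command("GET", "foo")
    # The caller expects the string for GET but receives the EXISTS reply.
    return server.parse_response()
```

Closing the connection on interruption, as the default `disconnect_on_error=True` does, avoids this mismatch because the stale reply is discarded along with the socket.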

Contributor Author

I'll add a half-read test case.

nonlocal ok
ok = True

thread = threading.Thread(target=helper)
thread.start()
thread.join(0.1)
try:
assert not thread.is_alive()
assert ok
finally:
# disconnect here so that fixture cleanup can proceed
r.connection.disconnect()
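Note on the test above and the review discussion about threads: a thread cannot return a value or be interrupted, so the test joins with a timeout and communicates through a shared variable. A slightly more general sketch of that approach follows, which also carries exceptions back to the caller. `run_in_thread` is a hypothetical helper, not part of redis-py or its test suite.

```python
import threading


def run_in_thread(func, timeout: float):
    """Run func() in a thread; re-raise its error or fail on timeout."""
    result = {}  # mutable object shared with the worker thread

    def target():
        try:
            result["value"] = func()
        except BaseException as exc:  # marshal errors back to the caller
            result["error"] = exc

    # daemon=True so a hung worker cannot block interpreter shutdown
    thread = threading.Thread(target=target, daemon=True)
    thread.start()
    thread.join(timeout)
    if thread.is_alive():
        raise TimeoutError(f"thread did not finish within {timeout} seconds")
    if "error" in result:
        raise result["error"]
    return result.get("value")
```

The hung thread itself keeps running after the timeout; the helper only keeps the test from waiting on it indefinitely.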


@pytest.mark.onlynoncluster
class TestBinarySave:
2 changes: 1 addition & 1 deletion tests/test_connection.py
@@ -160,7 +160,7 @@ def test_connection_parse_response_resume(r: redis.Redis, parser_class):
conn._parser._sock = mock_socket
for i in range(100):
try:
response = conn.read_response()
response = conn.read_response(disconnect_on_error=False)
break
except MockSocket.TestError:
pass