Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix retry logic for pubsub and pipeline #3134

Merged
merged 4 commits into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
36 changes: 23 additions & 13 deletions redis/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -927,11 +927,15 @@ async def connect(self):
async def _disconnect_raise_connect(self, conn, error):
"""
Close the connection and raise an exception
if retry_on_timeout is not set or the error
is not a TimeoutError. Otherwise, try to reconnect
if retry_on_error is not set or the error is not one
of the specified error types. Otherwise, try to
reconnect
"""
await conn.disconnect()
if not (conn.retry_on_timeout and isinstance(error, TimeoutError)):
if (
conn.retry_on_error is None
or isinstance(error, tuple(conn.retry_on_error)) is False
):
raise error
await conn.connect()

Expand Down Expand Up @@ -1344,8 +1348,8 @@ async def _disconnect_reset_raise(self, conn, error):
"""
Close the connection, reset watching state and
raise an exception if we were watching,
retry_on_timeout is not set,
or the error is not a TimeoutError
if retry_on_error is not set or the error is not one
of the specified error types.
"""
await conn.disconnect()
# if we were already watching a variable, the watch is no longer
Expand All @@ -1356,9 +1360,12 @@ async def _disconnect_reset_raise(self, conn, error):
raise WatchError(
"A ConnectionError occurred on while watching one or more keys"
)
# if retry_on_timeout is not set, or the error is not
# a TimeoutError, raise it
if not (conn.retry_on_timeout and isinstance(error, TimeoutError)):
# if retry_on_error is not set or the error is not one
# of the specified error types, raise it
if (
conn.retry_on_error is None
or isinstance(error, tuple(conn.retry_on_error)) is False
):
await self.aclose()
raise

Expand Down Expand Up @@ -1533,8 +1540,8 @@ async def load_scripts(self):
async def _disconnect_raise_reset(self, conn: Connection, error: Exception):
"""
Close the connection, raise an exception if we were watching,
and raise an exception if retry_on_timeout is not set,
or the error is not a TimeoutError
and raise an exception if retry_on_error is not set or the
error is not one of the specified error types.
"""
await conn.disconnect()
# if we were watching a variable, the watch is no longer valid
Expand All @@ -1544,9 +1551,12 @@ async def _disconnect_raise_reset(self, conn: Connection, error: Exception):
raise WatchError(
"A ConnectionError occurred on while watching one or more keys"
)
# if retry_on_timeout is not set, or the error is not
# a TimeoutError, raise it
if not (conn.retry_on_timeout and isinstance(error, TimeoutError)):
# if retry_on_error is not set or the error is not one
# of the specified error types, raise it
if (
conn.retry_on_error is None
or isinstance(error, tuple(conn.retry_on_error)) is False
):
await self.reset()
raise

Expand Down
50 changes: 34 additions & 16 deletions redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@
SentinelCommands,
list_or_args,
)
from redis.connection import ConnectionPool, SSLConnection, UnixDomainSocketConnection
from redis.connection import (
AbstractConnection,
ConnectionPool,
SSLConnection,
UnixDomainSocketConnection,
)
from redis.credentials import CredentialProvider
from redis.exceptions import (
ConnectionError,
Expand Down Expand Up @@ -839,11 +844,15 @@ def clean_health_check_responses(self) -> None:
def _disconnect_raise_connect(self, conn, error) -> None:
"""
Close the connection and raise an exception
if retry_on_timeout is not set or the error
is not a TimeoutError. Otherwise, try to reconnect
if retry_on_error is not set or the error is not one
of the specified error types. Otherwise, try to
reconnect
"""
conn.disconnect()
if not (conn.retry_on_timeout and isinstance(error, TimeoutError)):
if (
conn.retry_on_error is None
or isinstance(error, tuple(conn.retry_on_error)) is False
):
raise error
conn.connect()

Expand Down Expand Up @@ -1320,8 +1329,8 @@ def _disconnect_reset_raise(self, conn, error) -> None:
"""
Close the connection, reset watching state and
raise an exception if we were watching,
retry_on_timeout is not set,
or the error is not a TimeoutError
if retry_on_error is not set or the error is not one
of the specified error types.
"""
conn.disconnect()
# if we were already watching a variable, the watch is no longer
Expand All @@ -1332,9 +1341,12 @@ def _disconnect_reset_raise(self, conn, error) -> None:
raise WatchError(
"A ConnectionError occurred on while watching one or more keys"
)
# if retry_on_timeout is not set, or the error is not
# a TimeoutError, raise it
if not (conn.retry_on_timeout and isinstance(error, TimeoutError)):
# if retry_on_error is not set or the error is not one
# of the specified error types, raise it
if (
conn.retry_on_error is None
or isinstance(error, tuple(conn.retry_on_error)) is False
):
self.reset()
raise

Expand Down Expand Up @@ -1492,11 +1504,15 @@ def load_scripts(self):
if not exist:
s.sha = immediate("SCRIPT LOAD", s.script)

def _disconnect_raise_reset(self, conn: Redis, error: Exception) -> None:
def _disconnect_raise_reset(
self,
conn: AbstractConnection,
error: Exception,
) -> None:
"""
Close the connection, raise an exception if we were watching,
and raise an exception if TimeoutError is not part of retry_on_error,
or the error is not a TimeoutError
and raise an exception if retry_on_error is not set or the
error is not one of the specified error types.
"""
conn.disconnect()
# if we were watching a variable, the watch is no longer valid
Expand All @@ -1506,11 +1522,13 @@ def _disconnect_raise_reset(self, conn: Redis, error: Exception) -> None:
raise WatchError(
"A ConnectionError occurred on while watching one or more keys"
)
# if TimeoutError is not part of retry_on_error, or the error
# is not a TimeoutError, raise it
if not (
TimeoutError in conn.retry_on_error and isinstance(error, TimeoutError)
# if retry_on_error is not set or the error is not one
# of the specified error types, raise it
if (
conn.retry_on_error is None
or isinstance(error, tuple(conn.retry_on_error)) is False
):

self.reset()
raise error

Expand Down