Skip to content

Commit

Permalink
Fix retry logic for pubsub and pipeline (#3134)
Browse files Browse the repository at this point in the history
* Fix retry logic for pubsub and pipeline

Extend the fix from bea7299 to apply to
pipeline and pubsub as well.

Fixes #2973

* fix isort

---------

Co-authored-by: dvora-h <67596500+dvora-h@users.noreply.github.com>
  • Loading branch information
w-miller and dvora-h committed Feb 19, 2024
1 parent 2b2a2e0 commit ebb6171
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 29 deletions.
36 changes: 23 additions & 13 deletions redis/asyncio/client.py
Expand Up @@ -927,11 +927,15 @@ async def connect(self):
async def _disconnect_raise_connect(self, conn, error):
"""
Close the connection and raise an exception
if retry_on_timeout is not set or the error
is not a TimeoutError. Otherwise, try to reconnect
if retry_on_error is not set or the error is not one
of the specified error types. Otherwise, try to
reconnect
"""
await conn.disconnect()
if not (conn.retry_on_timeout and isinstance(error, TimeoutError)):
if (
conn.retry_on_error is None
or isinstance(error, tuple(conn.retry_on_error)) is False
):
raise error
await conn.connect()

Expand Down Expand Up @@ -1344,8 +1348,8 @@ async def _disconnect_reset_raise(self, conn, error):
"""
Close the connection, reset watching state and
raise an exception if we were watching,
retry_on_timeout is not set,
or the error is not a TimeoutError
if retry_on_error is not set or the error is not one
of the specified error types.
"""
await conn.disconnect()
# if we were already watching a variable, the watch is no longer
Expand All @@ -1356,9 +1360,12 @@ async def _disconnect_reset_raise(self, conn, error):
raise WatchError(
"A ConnectionError occurred on while watching one or more keys"
)
# if retry_on_timeout is not set, or the error is not
# a TimeoutError, raise it
if not (conn.retry_on_timeout and isinstance(error, TimeoutError)):
# if retry_on_error is not set or the error is not one
# of the specified error types, raise it
if (
conn.retry_on_error is None
or isinstance(error, tuple(conn.retry_on_error)) is False
):
await self.aclose()
raise

Expand Down Expand Up @@ -1533,8 +1540,8 @@ async def load_scripts(self):
async def _disconnect_raise_reset(self, conn: Connection, error: Exception):
"""
Close the connection, raise an exception if we were watching,
and raise an exception if retry_on_timeout is not set,
or the error is not a TimeoutError
and raise an exception if retry_on_error is not set or the
error is not one of the specified error types.
"""
await conn.disconnect()
# if we were watching a variable, the watch is no longer valid
Expand All @@ -1544,9 +1551,12 @@ async def _disconnect_raise_reset(self, conn: Connection, error: Exception):
raise WatchError(
"A ConnectionError occurred on while watching one or more keys"
)
# if retry_on_timeout is not set, or the error is not
# a TimeoutError, raise it
if not (conn.retry_on_timeout and isinstance(error, TimeoutError)):
# if retry_on_error is not set or the error is not one
# of the specified error types, raise it
if (
conn.retry_on_error is None
or isinstance(error, tuple(conn.retry_on_error)) is False
):
await self.reset()
raise

Expand Down
50 changes: 34 additions & 16 deletions redis/client.py
Expand Up @@ -25,7 +25,12 @@
SentinelCommands,
list_or_args,
)
from redis.connection import ConnectionPool, SSLConnection, UnixDomainSocketConnection
from redis.connection import (
AbstractConnection,
ConnectionPool,
SSLConnection,
UnixDomainSocketConnection,
)
from redis.credentials import CredentialProvider
from redis.exceptions import (
ConnectionError,
Expand Down Expand Up @@ -839,11 +844,15 @@ def clean_health_check_responses(self) -> None:
def _disconnect_raise_connect(self, conn, error) -> None:
"""
Close the connection and raise an exception
if retry_on_timeout is not set or the error
is not a TimeoutError. Otherwise, try to reconnect
if retry_on_error is not set or the error is not one
of the specified error types. Otherwise, try to
reconnect
"""
conn.disconnect()
if not (conn.retry_on_timeout and isinstance(error, TimeoutError)):
if (
conn.retry_on_error is None
or isinstance(error, tuple(conn.retry_on_error)) is False
):
raise error
conn.connect()

Expand Down Expand Up @@ -1320,8 +1329,8 @@ def _disconnect_reset_raise(self, conn, error) -> None:
"""
Close the connection, reset watching state and
raise an exception if we were watching,
retry_on_timeout is not set,
or the error is not a TimeoutError
if retry_on_error is not set or the error is not one
of the specified error types.
"""
conn.disconnect()
# if we were already watching a variable, the watch is no longer
Expand All @@ -1332,9 +1341,12 @@ def _disconnect_reset_raise(self, conn, error) -> None:
raise WatchError(
"A ConnectionError occurred on while watching one or more keys"
)
# if retry_on_timeout is not set, or the error is not
# a TimeoutError, raise it
if not (conn.retry_on_timeout and isinstance(error, TimeoutError)):
# if retry_on_error is not set or the error is not one
# of the specified error types, raise it
if (
conn.retry_on_error is None
or isinstance(error, tuple(conn.retry_on_error)) is False
):
self.reset()
raise

Expand Down Expand Up @@ -1492,11 +1504,15 @@ def load_scripts(self):
if not exist:
s.sha = immediate("SCRIPT LOAD", s.script)

def _disconnect_raise_reset(self, conn: Redis, error: Exception) -> None:
def _disconnect_raise_reset(
self,
conn: AbstractConnection,
error: Exception,
) -> None:
"""
Close the connection, raise an exception if we were watching,
and raise an exception if TimeoutError is not part of retry_on_error,
or the error is not a TimeoutError
and raise an exception if retry_on_error is not set or the
error is not one of the specified error types.
"""
conn.disconnect()
# if we were watching a variable, the watch is no longer valid
Expand All @@ -1506,11 +1522,13 @@ def _disconnect_raise_reset(self, conn: Redis, error: Exception) -> None:
raise WatchError(
"A ConnectionError occurred on while watching one or more keys"
)
# if TimeoutError is not part of retry_on_error, or the error
# is not a TimeoutError, raise it
if not (
TimeoutError in conn.retry_on_error and isinstance(error, TimeoutError)
# if retry_on_error is not set or the error is not one
# of the specified error types, raise it
if (
conn.retry_on_error is None
or isinstance(error, tuple(conn.retry_on_error)) is False
):

self.reset()
raise error

Expand Down

0 comments on commit ebb6171

Please sign in to comment.