Failover handling improvements for RedisCluster and Async RedisCluster (#2377)

* Cluster&AsyncCluster: Removed handling of timeouts/connection errors within the cluster loop, fixed "cannot pickle '_thread.lock' object" bug, added client-side failover handling improvements

* Fixed linters

* Type fixes

* Added to CHANGES

* Added getter and setter for the client's retry object and added more tests (see the usage sketch after this list)

* Fixed linters

* Fixed test

* Fixed test_client_kill test

* Changed get_default_backoff to default_backoff, removed retry_on_error and connection_error_retry_attempts from RedisCluster, default retry changed to no retries

* Fixing linters

* Reverting deletion of connection_error_retry_attempts to maintain backward compatibility

* Updating retry object for existing and new connections

* Changed the default value of reinitialize_steps from 10 to 5

* fix review comments
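
For context, a minimal usage sketch of the new retry getter/setter on a cluster client. This assumes the synchronous RedisCluster gained the same get_retry/set_retry helpers as the async client changed in this commit; the endpoint values are illustrative, not part of the change:

```python
from redis.backoff import ExponentialBackoff
from redis.cluster import RedisCluster
from redis.retry import Retry

# Illustrative cluster endpoint, not taken from this commit.
rc = RedisCluster(host="localhost", port=16379)
print(rc.get_retry())                          # retry policy currently applied to node connections
rc.set_retry(Retry(ExponentialBackoff(), 3))   # swap the policy on existing and future connections
```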

Co-authored-by: Chayim <chayim@users.noreply.github.com>
Co-authored-by: dvora-h <dvora.heller@redis.com>
Co-authored-by: dvora-h <67596500+dvora-h@users.noreply.github.com>
4 people committed Nov 10, 2022
1 parent bb06ccd commit 67214cc
Showing 14 changed files with 413 additions and 166 deletions.
2 changes: 2 additions & 0 deletions CHANGES
@@ -24,6 +24,8 @@
* ClusterPipeline Doesn't Handle ConnectionError for Dead Hosts (#2225)
* Remove compatibility code for old versions of Hiredis, drop Packaging dependency
* The `deprecated` library is no longer a dependency
* Failover handling improvements for RedisCluster and Async RedisCluster (#2377)
* Fixed "cannot pickle '_thread.lock' object" bug (#2354, #2297)
* Added CredentialsProvider class to support password rotation
* Enable Lock for asyncio cluster mode

2 changes: 2 additions & 0 deletions redis/__init__.py
@@ -1,5 +1,6 @@
import sys

from redis.backoff import default_backoff
from redis.client import Redis, StrictRedis
from redis.cluster import RedisCluster
from redis.connection import (
@@ -66,6 +67,7 @@ def int_or_str(value):
"CredentialProvider",
"DataError",
"from_url",
"default_backoff",
"InvalidResponse",
"PubSubError",
"ReadOnlyError",
2 changes: 2 additions & 0 deletions redis/asyncio/__init__.py
@@ -15,6 +15,7 @@
SentinelManagedSSLConnection,
)
from redis.asyncio.utils import from_url
from redis.backoff import default_backoff
from redis.exceptions import (
AuthenticationError,
AuthenticationWrongNumberOfArgsError,
@@ -43,6 +44,7 @@
"ConnectionPool",
"DataError",
"from_url",
"default_backoff",
"InvalidResponse",
"PubSubError",
"ReadOnlyError",
7 changes: 7 additions & 0 deletions redis/asyncio/client.py
@@ -276,6 +276,13 @@ def get_connection_kwargs(self):
"""Get the connection's key-word arguments"""
return self.connection_pool.connection_kwargs

def get_retry(self) -> Optional["Retry"]:
return self.get_connection_kwargs().get("retry")

def set_retry(self, retry: "Retry") -> None:
self.get_connection_kwargs().update({"retry": retry})
self.connection_pool.set_retry(retry)

def load_external_module(self, funcname, func):
"""
This function can be used to add externally defined redis modules,
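A short usage sketch for the new asyncio getter/setter (the server address is illustrative; Retry and ExponentialBackoff are existing redis-py classes):

```python
import asyncio

from redis.asyncio import Redis
from redis.asyncio.retry import Retry
from redis.backoff import ExponentialBackoff


async def main() -> None:
    r = Redis(host="localhost", port=6379)        # illustrative standalone server
    r.set_retry(Retry(ExponentialBackoff(), 3))   # recorded in connection_kwargs and pushed to the pool
    print(r.get_retry())                          # -> the Retry object just installed
    await r.close()


asyncio.run(main())
```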
121 changes: 63 additions & 58 deletions redis/asyncio/cluster.py
@@ -26,6 +26,8 @@
)
from redis.asyncio.lock import Lock
from redis.asyncio.parser import CommandsParser
from redis.asyncio.retry import Retry
from redis.backoff import default_backoff
from redis.client import EMPTY_RESPONSE, NEVER_DECODE, AbstractRedis
from redis.cluster import (
PIPELINE_BLOCKED_COMMANDS,
@@ -110,10 +112,10 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
:param startup_nodes:
| :class:`~.ClusterNode` to be used as a startup node
:param require_full_coverage:
| When set to ``False``: the client will not require a full coverage of the
slots. However, if not all slots are covered, and at least one node has
``cluster-require-full-coverage`` set to ``yes``, the server will throw a
:class:`~.ClusterDownError` for some key-based commands.
| When set to ``False``: the client will not require a full coverage of
the slots. However, if not all slots are covered, and at least one node
has ``cluster-require-full-coverage`` set to ``yes``, the server will throw
a :class:`~.ClusterDownError` for some key-based commands.
| When set to ``True``: all slots must be covered to construct the cluster
client. If not all slots are covered, :class:`~.RedisClusterException` will be
thrown.
@@ -136,7 +138,10 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
or :class:`~.ConnectionError` or :class:`~.ClusterDownError` are encountered
:param connection_error_retry_attempts:
| Number of times to retry before reinitializing when :class:`~.TimeoutError`
or :class:`~.ConnectionError` are encountered
or :class:`~.ConnectionError` are encountered.
The default backoff strategy will be set if a Retry object is not passed (see
default_backoff in backoff.py). To change it, pass a custom Retry object
using the "retry" keyword.
:param max_connections:
| Maximum number of connections per node. If there are no free connections & the
maximum number of connections are already created, a
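As the docstring above notes, passing a Retry object via the `retry` keyword replaces the default backoff strategy. A sketch with illustrative values (FullJitterBackoff is one of the existing backoff classes):

```python
from redis.asyncio.cluster import RedisCluster
from redis.asyncio.retry import Retry
from redis.backoff import FullJitterBackoff

# Use full jitter instead of the default equal-jitter backoff and retry
# connection/timeout errors twice per node before reinitializing the cluster.
rc = RedisCluster(
    host="localhost",   # illustrative endpoint
    port=16379,
    retry=Retry(FullJitterBackoff(cap=1.0, base=0.01), 2),
)
```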
@@ -214,9 +219,9 @@ def __init__(
startup_nodes: Optional[List["ClusterNode"]] = None,
require_full_coverage: bool = True,
read_from_replicas: bool = False,
reinitialize_steps: int = 10,
reinitialize_steps: int = 5,
cluster_error_retry_attempts: int = 3,
connection_error_retry_attempts: int = 5,
connection_error_retry_attempts: int = 3,
max_connections: int = 2**31,
# Client related kwargs
db: Union[str, int] = 0,
@@ -235,6 +240,8 @@ def __init__(
socket_keepalive: bool = False,
socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,
socket_timeout: Optional[float] = None,
retry: Optional["Retry"] = None,
retry_on_error: Optional[List[Exception]] = None,
# SSL related kwargs
ssl: bool = False,
ssl_ca_certs: Optional[str] = None,
@@ -282,6 +289,7 @@ def __init__(
"socket_keepalive": socket_keepalive,
"socket_keepalive_options": socket_keepalive_options,
"socket_timeout": socket_timeout,
"retry": retry,
}

if ssl:
@@ -302,6 +310,18 @@ def __init__(
# Call our on_connect function to configure READONLY mode
kwargs["redis_connect_func"] = self.on_connect

self.retry = retry
if retry or retry_on_error or connection_error_retry_attempts > 0:
# Set a retry object for all cluster nodes
self.retry = retry or Retry(
default_backoff(), connection_error_retry_attempts
)
if not retry_on_error:
# Default errors for retrying
retry_on_error = [ConnectionError, TimeoutError]
self.retry.update_supported_errors(retry_on_error)
kwargs.update({"retry": self.retry})

kwargs["response_callbacks"] = self.__class__.RESPONSE_CALLBACKS.copy()
self.connection_kwargs = kwargs

@@ -323,7 +343,6 @@ def __init__(
self.reinitialize_steps = reinitialize_steps
self.cluster_error_retry_attempts = cluster_error_retry_attempts
self.connection_error_retry_attempts = connection_error_retry_attempts

self.reinitialize_counter = 0
self.commands_parser = CommandsParser()
self.node_flags = self.__class__.NODE_FLAGS.copy()
@@ -481,6 +500,16 @@ def get_connection_kwargs(self) -> Dict[str, Optional[Any]]:
"""Get the kwargs passed to :class:`~redis.asyncio.connection.Connection`."""
return self.connection_kwargs

def get_retry(self) -> Optional["Retry"]:
return self.retry

def set_retry(self, retry: "Retry") -> None:
self.retry = retry
for node in self.get_nodes():
node.connection_kwargs.update({"retry": retry})
for conn in node._connections:
conn.retry = retry

def set_response_callback(self, command: str, callback: ResponseCallbackT) -> None:
"""Set a custom response callback."""
self.response_callbacks[command] = callback
@@ -618,9 +647,11 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any:
if passed_targets and not self._is_node_flag(passed_targets):
target_nodes = self._parse_target_nodes(passed_targets)
target_nodes_specified = True
retry_attempts = 1
retry_attempts = 0

for _ in range(retry_attempts):
# Add one for the first execution
execute_attempts = 1 + retry_attempts
for _ in range(execute_attempts):
if self._initialize:
await self.initialize()
try:
@@ -658,25 +689,21 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any:
)
return dict(zip(keys, values))
except Exception as e:
if type(e) in self.__class__.ERRORS_ALLOW_RETRY:
# The nodes and slots cache were reinitialized.
if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY:
# The nodes and slots cache should be reinitialized.
# Try again with the new cluster setup.
exception = e
retry_attempts -= 1
continue
else:
# All other errors should be raised.
raise

# If it fails the configured number of times then raise exception back
# to caller of this method
raise exception
# raise the exception
raise e

async def _execute_command(
self, target_node: "ClusterNode", *args: Union[KeyT, EncodableT], **kwargs: Any
) -> Any:
asking = moved = False
redirect_addr = None
ttl = self.RedisClusterRequestTTL
connection_error_retry_counter = 0

while ttl > 0:
ttl -= 1
@@ -695,25 +722,18 @@ async def _execute_command(
moved = False

return await target_node.execute_command(*args, **kwargs)
except BusyLoadingError:
except (BusyLoadingError, MaxConnectionsError):
raise
except (ConnectionError, TimeoutError):
# Connection retries are being handled in the node's
# Retry object.
# Remove the failed node from the startup nodes before we try
# to reinitialize the cluster
self.nodes_manager.startup_nodes.pop(target_node.name, None)
# Hard force of reinitialize of the node/slots setup
# and try again with the new setup
await self.close()
raise
except (ConnectionError, TimeoutError) as e:
# Give the node 0.25 seconds to get back up and retry again with the
# same node and configuration. After the defined number of attempts, try
# to reinitialize the cluster and try again.
connection_error_retry_counter += 1
if (
connection_error_retry_counter
< self.connection_error_retry_attempts
):
await asyncio.sleep(0.25)
else:
if isinstance(e, MaxConnectionsError):
raise
# Hard force of reinitialize of the node/slots setup
# and try again with the new setup
await self.close()
raise
except ClusterDownError:
# ClusterDownError can occur during a failover and to get
# self-healed, we will try to reinitialize the cluster layout
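The fixed 0.25-second sleep removed above can still be approximated from the caller's side, if that behaviour is wanted, by supplying a constant-backoff retry policy. A sketch using the existing ConstantBackoff class; the endpoint and retry count are illustrative:

```python
from redis.asyncio.cluster import RedisCluster
from redis.asyncio.retry import Retry
from redis.backoff import ConstantBackoff

# Retry each failed connection up to 3 times with a 0.25s pause between
# attempts; only then is the topology reinitialized and the error raised.
rc = RedisCluster(
    host="localhost",
    port=16379,
    retry=Retry(ConstantBackoff(0.25), 3),
)
```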
@@ -1145,26 +1165,11 @@ async def initialize(self) -> None:
)
cluster_slots = await startup_node.execute_command("CLUSTER SLOTS")
startup_nodes_reachable = True
except (ConnectionError, TimeoutError) as e:
except Exception as e:
# Try the next startup node.
# The exception is saved and raised only if we have no more nodes.
exception = e
continue
except ResponseError as e:
# Isn't a cluster connection, so it won't parse these
# exceptions automatically
message = e.__str__()
if "CLUSTERDOWN" in message or "MASTERDOWN" in message:
continue
else:
raise RedisClusterException(
'ERROR sending "cluster slots" command to redis '
f"server: {startup_node}. error: {message}"
)
except Exception as e:
message = e.__str__()
raise RedisClusterException(
'ERROR sending "cluster slots" command to redis '
f"server {startup_node.name}. error: {message}"
)

# CLUSTER SLOTS command results in the following output:
# [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]]
@@ -1245,8 +1250,8 @@ async def initialize(self) -> None:

if not startup_nodes_reachable:
raise RedisClusterException(
"Redis Cluster cannot be connected. Please provide at least "
"one reachable node. "
f"Redis Cluster cannot be connected. Please provide at least "
f"one reachable node: {str(exception)}"
) from exception

# Check if the slots are not fully covered
8 changes: 7 additions & 1 deletion redis/asyncio/connection.py
@@ -497,7 +497,7 @@ def __init__(
retry_on_error.append(socket.timeout)
retry_on_error.append(asyncio.TimeoutError)
self.retry_on_error = retry_on_error
if retry_on_error:
if retry or retry_on_error:
if not retry:
self.retry = Retry(NoBackoff(), 1)
else:
@@ -1445,6 +1445,12 @@ async def disconnect(self, inuse_connections: bool = True):
if exc:
raise exc

def set_retry(self, retry: "Retry") -> None:
for conn in self._available_connections:
conn.retry = retry
for conn in self._in_use_connections:
conn.retry = retry


class BlockingConnectionPool(ConnectionPool):
"""
17 changes: 13 additions & 4 deletions redis/backoff.py
@@ -1,6 +1,11 @@
import random
from abc import ABC, abstractmethod

# Maximum backoff between each retry in seconds
DEFAULT_CAP = 0.512
# Minimum backoff between each retry in seconds
DEFAULT_BASE = 0.008


class AbstractBackoff(ABC):
"""Backoff interface"""
@@ -40,7 +45,7 @@ def __init__(self):
class ExponentialBackoff(AbstractBackoff):
"""Exponential backoff upon failure"""

def __init__(self, cap, base):
def __init__(self, cap=DEFAULT_CAP, base=DEFAULT_BASE):
"""
`cap`: maximum backoff time in seconds
`base`: base backoff time in seconds
@@ -55,7 +60,7 @@ def compute(self, failures):
class FullJitterBackoff(AbstractBackoff):
"""Full jitter backoff upon failure"""

def __init__(self, cap, base):
def __init__(self, cap=DEFAULT_CAP, base=DEFAULT_BASE):
"""
`cap`: maximum backoff time in seconds
`base`: base backoff time in seconds
@@ -70,7 +75,7 @@ def compute(self, failures):
class EqualJitterBackoff(AbstractBackoff):
"""Equal jitter backoff upon failure"""

def __init__(self, cap, base):
def __init__(self, cap=DEFAULT_CAP, base=DEFAULT_BASE):
"""
`cap`: maximum backoff time in seconds
`base`: base backoff time in seconds
@@ -86,7 +91,7 @@ def compute(self, failures):
class DecorrelatedJitterBackoff(AbstractBackoff):
"""Decorrelated jitter backoff upon failure"""

def __init__(self, cap, base):
def __init__(self, cap=DEFAULT_CAP, base=DEFAULT_BASE):
"""
`cap`: maximum backoff time in seconds
`base`: base backoff time in seconds
@@ -103,3 +108,7 @@ def compute(self, failures):
temp = random.uniform(self._base, max_backoff)
self._previous_backoff = min(self._cap, temp)
return self._previous_backoff


def default_backoff():
return EqualJitterBackoff()
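A quick sketch of the new defaults in action. The jittered values vary per call, and the compute() formulas (whose bodies are elided above) are assumed to be the usual exponential/jitter variants:

```python
from redis.backoff import DEFAULT_BASE, DEFAULT_CAP, ExponentialBackoff, default_backoff

print(DEFAULT_BASE, DEFAULT_CAP)   # 0.008 0.512 (seconds)

backoff = default_backoff()        # an EqualJitterBackoff built from the defaults
print(backoff.compute(0))          # a few milliseconds after the first failure
print(backoff.compute(10))         # jittered, but never more than DEFAULT_CAP

expo = ExponentialBackoff()        # cap/base now default to the same constants
print(expo.compute(3))             # min(0.512, 0.008 * 2**3) = 0.064
```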
8 changes: 8 additions & 0 deletions redis/client.py
@@ -26,6 +26,7 @@
WatchError,
)
from redis.lock import Lock
from redis.retry import Retry
from redis.utils import safe_str, str_if_bytes

SYM_EMPTY = b""
@@ -1047,6 +1048,13 @@ def get_connection_kwargs(self):
"""Get the connection's key-word arguments"""
return self.connection_pool.connection_kwargs

def get_retry(self) -> Optional["Retry"]:
return self.get_connection_kwargs().get("retry")

def set_retry(self, retry: "Retry") -> None:
self.get_connection_kwargs().update({"retry": retry})
self.connection_pool.set_retry(retry)

def set_response_callback(self, command, callback):
"""Set a custom Response Callback"""
self.response_callbacks[command] = callback
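A sync-side sketch of the same pattern; set_retry records the policy in connection_kwargs for future connections and, via ConnectionPool.set_retry, updates the pool's existing ones (the endpoint is illustrative):

```python
import redis
from redis.backoff import ExponentialBackoff
from redis.retry import Retry

r = redis.Redis(host="localhost", port=6379)   # illustrative standalone server
r.ping()                                       # ensure the pool holds at least one live connection
r.set_retry(Retry(ExponentialBackoff(), 3))    # applied to existing and future pooled connections
print(r.get_retry())                           # -> the Retry object just installed
```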
