From 67214cc3eaa7890c87e45550b8320779f954094b Mon Sep 17 00:00:00 2001 From: Bar Shaul <88437685+barshaul@users.noreply.github.com> Date: Thu, 10 Nov 2022 13:16:49 +0200 Subject: [PATCH] Failover handling improvements for RedisCluster and Async RedisCluster (#2377) * Cluster&AsyncCluster: Removed handling of timeouts/connection errors within the cluster loop, fixed "cannot pickle '_thread.lock' object" bug, added client's side failover handling improvements * Fixed linters * Type fixes * Added to CHANGES * Added getter and setter for the client's retry object and added more tests * Fixed linters * Fixed test * Fixed test_client_kill test * Changed get_default_backoff to default_backoff, removed retry_on_error and connection_error_retry_attempts from RedisCluster, default retry changed to no retries * Fixing linters * Reverting deletion of connection_error_retry_attempts to maintain backward compatibility * Updating retry object for existing and new connections * Changed the default value of reinitialize_steps from 10 to 5 * fix review comments Co-authored-by: Chayim Co-authored-by: dvora-h Co-authored-by: dvora-h <67596500+dvora-h@users.noreply.github.com> --- CHANGES | 2 + redis/__init__.py | 2 + redis/asyncio/__init__.py | 2 + redis/asyncio/client.py | 7 ++ redis/asyncio/cluster.py | 121 ++++++++++--------- redis/asyncio/connection.py | 8 +- redis/backoff.py | 17 ++- redis/client.py | 8 ++ redis/cluster.py | 187 ++++++++++++++--------------- redis/connection.py | 9 +- tests/test_asyncio/test_cluster.py | 78 +++++++++++- tests/test_asyncio/test_retry.py | 22 +++- tests/test_cluster.py | 100 +++++++++++++++ tests/test_retry.py | 16 ++- 14 files changed, 413 insertions(+), 166 deletions(-) diff --git a/CHANGES b/CHANGES index 4945f61612..883c548f38 100644 --- a/CHANGES +++ b/CHANGES @@ -24,6 +24,8 @@ * ClusterPipeline Doesn't Handle ConnectionError for Dead Hosts (#2225) * Remove compatibility code for old versions of Hiredis, drop Packaging dependency * The `deprecated` library is no longer a dependency + * Failover handling improvements for RedisCluster and Async RedisCluster (#2377) + * Fixed "cannot pickle '_thread.lock' object" bug (#2354, #2297) * Added CredentialsProvider class to support password rotation * Enable Lock for asyncio cluster mode diff --git a/redis/__init__.py b/redis/__init__.py index 5201fe22d4..6503ac30b4 100644 --- a/redis/__init__.py +++ b/redis/__init__.py @@ -1,5 +1,6 @@ import sys +from redis.backoff import default_backoff from redis.client import Redis, StrictRedis from redis.cluster import RedisCluster from redis.connection import ( @@ -66,6 +67,7 @@ def int_or_str(value): "CredentialProvider", "DataError", "from_url", + "default_backoff", "InvalidResponse", "PubSubError", "ReadOnlyError", diff --git a/redis/asyncio/__init__.py b/redis/asyncio/__init__.py index 598791ac15..bf90dde555 100644 --- a/redis/asyncio/__init__.py +++ b/redis/asyncio/__init__.py @@ -15,6 +15,7 @@ SentinelManagedSSLConnection, ) from redis.asyncio.utils import from_url +from redis.backoff import default_backoff from redis.exceptions import ( AuthenticationError, AuthenticationWrongNumberOfArgsError, @@ -43,6 +44,7 @@ "ConnectionPool", "DataError", "from_url", + "default_backoff", "InvalidResponse", "PubSubError", "ReadOnlyError", diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index c0855717f2..e0ed85eb8f 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -276,6 +276,13 @@ def get_connection_kwargs(self): """Get the connection's key-word arguments""" 
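The newly exported default_backoff above can be combined with a Retry object when constructing a client. A minimal sketch, assuming a local server on the default port; the endpoint and retry count are placeholders, not values taken from this patch:

from redis import Redis, default_backoff
from redis.exceptions import ConnectionError, TimeoutError
from redis.retry import Retry

# default_backoff() returns an EqualJitterBackoff built from the module's
# DEFAULT_CAP/DEFAULT_BASE values, so callers get a sane jittered policy
# without choosing cap/base themselves.
retry = Retry(default_backoff(), 3)
r = Redis(
    host="localhost",
    port=6379,
    retry=retry,
    retry_on_error=[ConnectionError, TimeoutError],
)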
return self.connection_pool.connection_kwargs + def get_retry(self) -> Optional["Retry"]: + return self.get_connection_kwargs().get("retry") + + def set_retry(self, retry: "Retry") -> None: + self.get_connection_kwargs().update({"retry": retry}) + self.connection_pool.set_retry(retry) + def load_external_module(self, funcname, func): """ This function can be used to add externally defined redis modules, diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index 57aafbd69f..d5a38b2878 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -26,6 +26,8 @@ ) from redis.asyncio.lock import Lock from redis.asyncio.parser import CommandsParser +from redis.asyncio.retry import Retry +from redis.backoff import default_backoff from redis.client import EMPTY_RESPONSE, NEVER_DECODE, AbstractRedis from redis.cluster import ( PIPELINE_BLOCKED_COMMANDS, @@ -110,10 +112,10 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand :param startup_nodes: | :class:`~.ClusterNode` to used as a startup node :param require_full_coverage: - | When set to ``False``: the client will not require a full coverage of the - slots. However, if not all slots are covered, and at least one node has - ``cluster-require-full-coverage`` set to ``yes``, the server will throw a - :class:`~.ClusterDownError` for some key-based commands. + | When set to ``False``: the client will not require a full coverage of + the slots. However, if not all slots are covered, and at least one node + has ``cluster-require-full-coverage`` set to ``yes``, the server will throw + a :class:`~.ClusterDownError` for some key-based commands. | When set to ``True``: all slots must be covered to construct the cluster client. If not all slots are covered, :class:`~.RedisClusterException` will be thrown. @@ -136,7 +138,10 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand or :class:`~.ConnectionError` or :class:`~.ClusterDownError` are encountered :param connection_error_retry_attempts: | Number of times to retry before reinitializing when :class:`~.TimeoutError` - or :class:`~.ConnectionError` are encountered + or :class:`~.ConnectionError` are encountered. + The default backoff strategy will be set if Retry object is not passed (see + default_backoff in backoff.py). To change it, pass a custom Retry object + using the "retry" keyword. :param max_connections: | Maximum number of connections per node. 
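A sketch of the retry options documented above for the async cluster client, assuming a reachable cluster node at a placeholder address. When no Retry object is passed, the client builds Retry(default_backoff(), connection_error_retry_attempts) itself:

import asyncio

from redis.asyncio import RedisCluster
from redis.asyncio.retry import Retry
from redis.backoff import default_backoff
from redis.exceptions import ConnectionError, TimeoutError

async def main():
    rc = RedisCluster(
        host="localhost",
        port=16379,
        retry=Retry(default_backoff(), 3),
        # These are also the defaults applied when retry_on_error is omitted.
        retry_on_error=[ConnectionError, TimeoutError],
    )
    await rc.initialize()
    print(rc.get_retry())  # the Retry shared with every node's connections
    await rc.close()

asyncio.run(main())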
If there are no free connections & the maximum number of connections are already created, a @@ -214,9 +219,9 @@ def __init__( startup_nodes: Optional[List["ClusterNode"]] = None, require_full_coverage: bool = True, read_from_replicas: bool = False, - reinitialize_steps: int = 10, + reinitialize_steps: int = 5, cluster_error_retry_attempts: int = 3, - connection_error_retry_attempts: int = 5, + connection_error_retry_attempts: int = 3, max_connections: int = 2**31, # Client related kwargs db: Union[str, int] = 0, @@ -235,6 +240,8 @@ def __init__( socket_keepalive: bool = False, socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None, socket_timeout: Optional[float] = None, + retry: Optional["Retry"] = None, + retry_on_error: Optional[List[Exception]] = None, # SSL related kwargs ssl: bool = False, ssl_ca_certs: Optional[str] = None, @@ -282,6 +289,7 @@ def __init__( "socket_keepalive": socket_keepalive, "socket_keepalive_options": socket_keepalive_options, "socket_timeout": socket_timeout, + "retry": retry, } if ssl: @@ -302,6 +310,18 @@ def __init__( # Call our on_connect function to configure READONLY mode kwargs["redis_connect_func"] = self.on_connect + self.retry = retry + if retry or retry_on_error or connection_error_retry_attempts > 0: + # Set a retry object for all cluster nodes + self.retry = retry or Retry( + default_backoff(), connection_error_retry_attempts + ) + if not retry_on_error: + # Default errors for retrying + retry_on_error = [ConnectionError, TimeoutError] + self.retry.update_supported_errors(retry_on_error) + kwargs.update({"retry": self.retry}) + kwargs["response_callbacks"] = self.__class__.RESPONSE_CALLBACKS.copy() self.connection_kwargs = kwargs @@ -323,7 +343,6 @@ def __init__( self.reinitialize_steps = reinitialize_steps self.cluster_error_retry_attempts = cluster_error_retry_attempts self.connection_error_retry_attempts = connection_error_retry_attempts - self.reinitialize_counter = 0 self.commands_parser = CommandsParser() self.node_flags = self.__class__.NODE_FLAGS.copy() @@ -481,6 +500,16 @@ def get_connection_kwargs(self) -> Dict[str, Optional[Any]]: """Get the kwargs passed to :class:`~redis.asyncio.connection.Connection`.""" return self.connection_kwargs + def get_retry(self) -> Optional["Retry"]: + return self.retry + + def set_retry(self, retry: "Retry") -> None: + self.retry = retry + for node in self.get_nodes(): + node.connection_kwargs.update({"retry": retry}) + for conn in node._connections: + conn.retry = retry + def set_response_callback(self, command: str, callback: ResponseCallbackT) -> None: """Set a custom response callback.""" self.response_callbacks[command] = callback @@ -618,9 +647,11 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any: if passed_targets and not self._is_node_flag(passed_targets): target_nodes = self._parse_target_nodes(passed_targets) target_nodes_specified = True - retry_attempts = 1 + retry_attempts = 0 - for _ in range(retry_attempts): + # Add one for the first execution + execute_attempts = 1 + retry_attempts + for _ in range(execute_attempts): if self._initialize: await self.initialize() try: @@ -658,17 +689,14 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any: ) return dict(zip(keys, values)) except Exception as e: - if type(e) in self.__class__.ERRORS_ALLOW_RETRY: - # The nodes and slots cache were reinitialized. + if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY: + # The nodes and slots cache were should be reinitialized. 
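The loop above amounts to one initial execution plus cluster_error_retry_attempts retries, retrying only the error types in ERRORS_ALLOW_RETRY. A standalone sketch of that pattern, with illustrative names rather than library API:

def run_with_cluster_retries(do, retry_attempts, errors_allow_retry):
    # One attempt for the first execution, plus the configured retries.
    execute_attempts = 1 + retry_attempts
    for _ in range(execute_attempts):
        try:
            return do()
        except Exception as e:
            if retry_attempts > 0 and type(e) in errors_allow_retry:
                # Topology was refreshed; try again with the new setup.
                retry_attempts -= 1
                continue
            # All other errors, or an exhausted budget, propagate.
            raise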
# Try again with the new cluster setup. - exception = e + retry_attempts -= 1 + continue else: - # All other errors should be raised. - raise - - # If it fails the configured number of times then raise exception back - # to caller of this method - raise exception + # raise the exception + raise e async def _execute_command( self, target_node: "ClusterNode", *args: Union[KeyT, EncodableT], **kwargs: Any @@ -676,7 +704,6 @@ async def _execute_command( asking = moved = False redirect_addr = None ttl = self.RedisClusterRequestTTL - connection_error_retry_counter = 0 while ttl > 0: ttl -= 1 @@ -695,25 +722,18 @@ async def _execute_command( moved = False return await target_node.execute_command(*args, **kwargs) - except BusyLoadingError: + except (BusyLoadingError, MaxConnectionsError): + raise + except (ConnectionError, TimeoutError): + # Connection retries are being handled in the node's + # Retry object. + # Remove the failed node from the startup nodes before we try + # to reinitialize the cluster + self.nodes_manager.startup_nodes.pop(target_node.name, None) + # Hard force of reinitialize of the node/slots setup + # and try again with the new setup + await self.close() raise - except (ConnectionError, TimeoutError) as e: - # Give the node 0.25 seconds to get back up and retry again with the - # same node and configuration. After the defined number of attempts, try - # to reinitialize the cluster and try again. - connection_error_retry_counter += 1 - if ( - connection_error_retry_counter - < self.connection_error_retry_attempts - ): - await asyncio.sleep(0.25) - else: - if isinstance(e, MaxConnectionsError): - raise - # Hard force of reinitialize of the node/slots setup - # and try again with the new setup - await self.close() - raise except ClusterDownError: # ClusterDownError can occur during a failover and to get # self-healed, we will try to reinitialize the cluster layout @@ -1145,26 +1165,11 @@ async def initialize(self) -> None: ) cluster_slots = await startup_node.execute_command("CLUSTER SLOTS") startup_nodes_reachable = True - except (ConnectionError, TimeoutError) as e: + except Exception as e: + # Try the next startup node. + # The exception is saved and raised only if we have no more nodes. exception = e continue - except ResponseError as e: - # Isn't a cluster connection, so it won't parse these - # exceptions automatically - message = e.__str__() - if "CLUSTERDOWN" in message or "MASTERDOWN" in message: - continue - else: - raise RedisClusterException( - 'ERROR sending "cluster slots" command to redis ' - f"server: {startup_node}. error: {message}" - ) - except Exception as e: - message = e.__str__() - raise RedisClusterException( - 'ERROR sending "cluster slots" command to redis ' - f"server {startup_node.name}. error: {message}" - ) # CLUSTER SLOTS command results in the following output: # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]] @@ -1245,8 +1250,8 @@ async def initialize(self) -> None: if not startup_nodes_reachable: raise RedisClusterException( - "Redis Cluster cannot be connected. Please provide at least " - "one reachable node. " + f"Redis Cluster cannot be connected. 
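initialize() now walks the startup nodes with a single generic handler: any failure is remembered and the next node is tried, and only if every node fails is the saved error surfaced. A sketch of that shape, where first_reachable_slots and fetch_cluster_slots are illustrative stand-ins for the method and its CLUSTER SLOTS call:

def first_reachable_slots(startup_nodes, fetch_cluster_slots):
    exception = None
    for node in startup_nodes:
        try:
            return fetch_cluster_slots(node)
        except Exception as e:
            # Remember the most recent failure and try the next node.
            exception = e
            continue
    raise RuntimeError(
        "Redis Cluster cannot be connected. Please provide at least "
        f"one reachable node: {exception}"
    ) from exception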
Please provide at least " + f"one reachable node: {str(exception)}" ) from exception # Check if the slots are not fully covered diff --git a/redis/asyncio/connection.py b/redis/asyncio/connection.py index df066c4763..4f19153318 100644 --- a/redis/asyncio/connection.py +++ b/redis/asyncio/connection.py @@ -497,7 +497,7 @@ def __init__( retry_on_error.append(socket.timeout) retry_on_error.append(asyncio.TimeoutError) self.retry_on_error = retry_on_error - if retry_on_error: + if retry or retry_on_error: if not retry: self.retry = Retry(NoBackoff(), 1) else: @@ -1445,6 +1445,12 @@ async def disconnect(self, inuse_connections: bool = True): if exc: raise exc + def set_retry(self, retry: "Retry") -> None: + for conn in self._available_connections: + conn.retry = retry + for conn in self._in_use_connections: + conn.retry = retry + class BlockingConnectionPool(ConnectionPool): """ diff --git a/redis/backoff.py b/redis/backoff.py index 5ccdb919f3..c62e760bdc 100644 --- a/redis/backoff.py +++ b/redis/backoff.py @@ -1,6 +1,11 @@ import random from abc import ABC, abstractmethod +# Maximum backoff between each retry in seconds +DEFAULT_CAP = 0.512 +# Minimum backoff between each retry in seconds +DEFAULT_BASE = 0.008 + class AbstractBackoff(ABC): """Backoff interface""" @@ -40,7 +45,7 @@ def __init__(self): class ExponentialBackoff(AbstractBackoff): """Exponential backoff upon failure""" - def __init__(self, cap, base): + def __init__(self, cap=DEFAULT_CAP, base=DEFAULT_BASE): """ `cap`: maximum backoff time in seconds `base`: base backoff time in seconds @@ -55,7 +60,7 @@ def compute(self, failures): class FullJitterBackoff(AbstractBackoff): """Full jitter backoff upon failure""" - def __init__(self, cap, base): + def __init__(self, cap=DEFAULT_CAP, base=DEFAULT_BASE): """ `cap`: maximum backoff time in seconds `base`: base backoff time in seconds @@ -70,7 +75,7 @@ def compute(self, failures): class EqualJitterBackoff(AbstractBackoff): """Equal jitter backoff upon failure""" - def __init__(self, cap, base): + def __init__(self, cap=DEFAULT_CAP, base=DEFAULT_BASE): """ `cap`: maximum backoff time in seconds `base`: base backoff time in seconds @@ -86,7 +91,7 @@ def compute(self, failures): class DecorrelatedJitterBackoff(AbstractBackoff): """Decorrelated jitter backoff upon failure""" - def __init__(self, cap, base): + def __init__(self, cap=DEFAULT_CAP, base=DEFAULT_BASE): """ `cap`: maximum backoff time in seconds `base`: base backoff time in seconds @@ -103,3 +108,7 @@ def compute(self, failures): temp = random.uniform(self._base, max_backoff) self._previous_backoff = min(self._cap, temp) return self._previous_backoff + + +def default_backoff(): + return EqualJitterBackoff() diff --git a/redis/client.py b/redis/client.py index 8356ba7d07..ed857c8fba 100755 --- a/redis/client.py +++ b/redis/client.py @@ -26,6 +26,7 @@ WatchError, ) from redis.lock import Lock +from redis.retry import Retry from redis.utils import safe_str, str_if_bytes SYM_EMPTY = b"" @@ -1047,6 +1048,13 @@ def get_connection_kwargs(self): """Get the connection's key-word arguments""" return self.connection_pool.connection_kwargs + def get_retry(self) -> Optional["Retry"]: + return self.get_connection_kwargs().get("retry") + + def set_retry(self, retry: "Retry") -> None: + self.get_connection_kwargs().update({"retry": retry}) + self.connection_pool.set_retry(retry) + def set_response_callback(self, command, callback): """Set a custom Response Callback""" self.response_callbacks[command] = callback diff --git a/redis/cluster.py 
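With the defaults added above (cap 0.512s, base 0.008s), every backoff class can now be constructed without arguments, and default_backoff() returns an EqualJitterBackoff. A small sketch comparing the deterministic and jittered variants; the jittered values printed are random by design:

from redis.backoff import ExponentialBackoff, default_backoff

exp = ExponentialBackoff()  # min(cap, base * 2**failures), no jitter
eq = default_backoff()      # EqualJitterBackoff under the hood
for failures in range(1, 6):
    print(failures, exp.compute(failures), eq.compute(failures))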
b/redis/cluster.py index 027fe40747..91deaead59 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -1,12 +1,12 @@ -import copy import random import socket import sys import threading import time from collections import OrderedDict -from typing import Any, Callable, Dict, Tuple, Union +from typing import Any, Callable, Dict, List, Optional, Tuple, Union +from redis.backoff import default_backoff from redis.client import CaseInsensitiveDict, PubSub, Redis, parse_scan from redis.commands import READ_COMMANDS, CommandsParser, RedisClusterCommands from redis.connection import ConnectionPool, DefaultParser, Encoder, parse_url @@ -29,6 +29,7 @@ TryAgainError, ) from redis.lock import Lock +from redis.retry import Retry from redis.utils import ( dict_merge, list_keys_to_dict, @@ -426,27 +427,28 @@ class initializer. In the case of conflicting arguments, querystring def __init__( self, - host=None, - port=6379, - startup_nodes=None, - cluster_error_retry_attempts=3, - require_full_coverage=False, - reinitialize_steps=10, - read_from_replicas=False, - dynamic_startup_nodes=True, - url=None, + host: Optional[str] = None, + port: int = 6379, + startup_nodes: Optional[List["ClusterNode"]] = None, + cluster_error_retry_attempts: int = 3, + retry: Optional["Retry"] = None, + require_full_coverage: bool = False, + reinitialize_steps: int = 5, + read_from_replicas: bool = False, + dynamic_startup_nodes: bool = True, + url: Optional[str] = None, **kwargs, ): """ Initialize a new RedisCluster client. - :startup_nodes: 'list[ClusterNode]' + :param startup_nodes: List of nodes from which initial bootstrapping can be done - :host: 'str' + :param host: Can be used to point to a startup node - :port: 'int' + :param port: Can be used to point to a startup node - :require_full_coverage: 'bool' + :param require_full_coverage: When set to False (default value): the client will not require a full coverage of the slots. However, if not all slots are covered, and at least one node has 'cluster-require-full-coverage' set to @@ -456,12 +458,12 @@ def __init__( When set to True: all slots must be covered to construct the cluster client. If not all slots are covered, RedisClusterException will be thrown. - :read_from_replicas: 'bool' + :param read_from_replicas: Enable read from replicas in READONLY mode. You can read possibly stale data. When set to true, read commands will be assigned between the primary and its replications in a Round-Robin manner. - :dynamic_startup_nodes: 'bool' + :param dynamic_startup_nodes: Set the RedisCluster's startup nodes to all of the discovered nodes. If true (default value), the cluster's discovered nodes will be used to determine the cluster nodes-slots mapping in the next topology refresh. @@ -469,10 +471,11 @@ def __init__( listed in the CLUSTER SLOTS output. If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists specific IP addresses, it is best to set it to false. - :cluster_error_retry_attempts: 'int' - Retry command execution attempts when encountering ClusterDownError - or ConnectionError - :reinitialize_steps: 'int' + :param cluster_error_retry_attempts: + Number of times to retry before raising an error when + :class:`~.TimeoutError` or :class:`~.ConnectionError` or + :class:`~.ClusterDownError` are encountered + :param reinitialize_steps: Specifies the number of MOVED errors that need to occur before reinitializing the whole cluster topology. 
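A sketch of constructing the sync cluster client with the parameters documented above, against a placeholder endpoint. When no retry is passed, the constructor installs Retry(default_backoff(), 0), i.e. no connection-level retries:

from redis import RedisCluster
from redis.backoff import default_backoff
from redis.retry import Retry

rc = RedisCluster(
    host="localhost",
    port=16379,
    retry=Retry(default_backoff(), 3),  # connection-level retry policy
    cluster_error_retry_attempts=3,     # cluster-level retry budget
    reinitialize_steps=5,               # MOVED errors before a full refresh
)
print(rc.get_retry())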
If a MOVED error occurs and the cluster does not need to be reinitialized on this current @@ -540,6 +543,11 @@ def __init__( self.user_on_connect_func = kwargs.pop("redis_connect_func", None) kwargs.update({"redis_connect_func": self.on_connect}) kwargs = cleanup_kwargs(**kwargs) + if retry: + self.retry = retry + kwargs.update({"retry": self.retry}) + else: + kwargs.update({"retry": Retry(default_backoff(), 0)}) self.encoder = Encoder( kwargs.get("encoding", "utf-8"), @@ -666,6 +674,14 @@ def set_default_node(self, node): self.nodes_manager.default_node = node return True + def get_retry(self) -> Optional["Retry"]: + return self.retry + + def set_retry(self, retry: "Retry") -> None: + self.retry = retry + for node in self.get_nodes(): + node.redis_connection.set_retry(retry) + def monitor(self, target_node=None): """ Returns a Monitor object for the specified target node. @@ -986,12 +1002,13 @@ def execute_command(self, *args, **kwargs): # nodes were passed to this function, we cannot retry the command # execution since the nodes may not be valid anymore after the tables # were reinitialized. So in case of passed target nodes, - # retry_attempts will be set to 1. + # retry_attempts will be set to 0. retry_attempts = ( - 1 if target_nodes_specified else self.cluster_error_retry_attempts + 0 if target_nodes_specified else self.cluster_error_retry_attempts ) - exception = None - for _ in range(0, retry_attempts): + # Add one for the first execution + execute_attempts = 1 + retry_attempts + for _ in range(execute_attempts): try: res = {} if not target_nodes_specified: @@ -1008,18 +1025,15 @@ def execute_command(self, *args, **kwargs): # Return the processed result return self._process_result(args[0], res, **kwargs) except Exception as e: - if type(e) in self.__class__.ERRORS_ALLOW_RETRY: + if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY: # The nodes and slots cache were reinitialized. # Try again with the new cluster setup. - exception = e + retry_attempts -= 1 + continue else: - # All other errors should be raised. + # raise the exception raise e - # If it fails the configured number of times then raise exception back - # to caller of this method - raise exception - def _execute_command(self, target_node, *args, **kwargs): """ Send a command to a node in the cluster @@ -1031,7 +1045,6 @@ def _execute_command(self, target_node, *args, **kwargs): asking = False moved = False ttl = int(self.RedisClusterRequestTTL) - connection_error_retry_counter = 0 while ttl > 0: ttl -= 1 @@ -1064,25 +1077,21 @@ def _execute_command(self, target_node, *args, **kwargs): except AuthenticationError: raise except (ConnectionError, TimeoutError) as e: + # Connection retries are being handled in the node's + # Retry object. # ConnectionError can also be raised if we couldn't get a # connection from the pool before timing out, so check that # this is an actual connection before attempting to disconnect. if connection is not None: connection.disconnect() - connection_error_retry_counter += 1 - - # Give the node 0.25 seconds to get back up and retry again - # with same node and configuration. 
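The get_retry()/set_retry() pair added above lets callers swap the retry policy at runtime; set_retry() pushes the new object down to every node's redis_connection and, through each pool, to existing connections. A sketch against a placeholder endpoint:

from redis import RedisCluster
from redis.backoff import ExponentialBackoff
from redis.retry import Retry

rc = RedisCluster(host="localhost", port=16379)
rc.set_retry(Retry(ExponentialBackoff(), 3))
assert rc.get_retry()._retries == 3
for node in rc.get_nodes():
    # Every node's client now shares the same policy.
    assert node.redis_connection.get_retry()._retries == 3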
After 5 attempts then try - # to reinitialize the cluster and see if the nodes - # configuration has changed or not - if connection_error_retry_counter < 5: - time.sleep(0.25) - else: - # Hard force of reinitialize of the node/slots setup - # and try again with the new setup - target_node.redis_connection = None - self.nodes_manager.initialize() - raise e + + # Remove the failed node from the startup nodes before we try + # to reinitialize the cluster + self.nodes_manager.startup_nodes.pop(target_node.name, None) + # Reset the cluster node's connection + target_node.redis_connection = None + self.nodes_manager.initialize() + raise e except MovedError as e: # First, we will try to patch the slots/nodes cache with the # redirected node output and try again. If MovedError exceeds @@ -1406,17 +1415,15 @@ def initialize(self): startup_nodes_reachable = False fully_covered = False kwargs = self.connection_kwargs + exception = None for startup_node in self.startup_nodes.values(): try: if startup_node.redis_connection: r = startup_node.redis_connection else: - # Create a new Redis connection and let Redis decode the - # responses so we won't need to handle that - copy_kwargs = copy.deepcopy(kwargs) - copy_kwargs.update({"decode_responses": True, "encoding": "utf-8"}) + # Create a new Redis connection r = self.create_redis_node( - startup_node.host, startup_node.port, **copy_kwargs + startup_node.host, startup_node.port, **kwargs ) self.startup_nodes[startup_node.name].redis_connection = r # Make sure cluster mode is enabled on this node @@ -1426,25 +1433,11 @@ def initialize(self): ) cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS")) startup_nodes_reachable = True - except (ConnectionError, TimeoutError): - continue - except ResponseError as e: - # Isn't a cluster connection, so it won't parse these - # exceptions automatically - message = e.__str__() - if "CLUSTERDOWN" in message or "MASTERDOWN" in message: - continue - else: - raise RedisClusterException( - 'ERROR sending "cluster slots" command to redis ' - f"server: {startup_node}. error: {message}" - ) except Exception as e: - message = e.__str__() - raise RedisClusterException( - 'ERROR sending "cluster slots" command to redis ' - f"server {startup_node.name}. error: {message}" - ) + # Try the next startup node. + # The exception is saved and raised only if we have no more nodes. + exception = e + continue # CLUSTER SLOTS command results in the following output: # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]] @@ -1514,9 +1507,9 @@ def initialize(self): if not startup_nodes_reachable: raise RedisClusterException( - "Redis Cluster cannot be connected. Please provide at least " - "one reachable node. " - ) + f"Redis Cluster cannot be connected. 
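The comments above defer connection retries to the node's Retry object, which re-runs a callable and invokes a failure hook between attempts. A self-contained sketch of that mechanism; the simulated two-attempt outage is illustrative only:

from redis.backoff import default_backoff
from redis.exceptions import ConnectionError
from redis.retry import Retry

attempts = []

def do():
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("node still down")  # first two attempts fail
    return "PONG"

retry = Retry(default_backoff(), 3)
print(retry.call_with_retry(do, lambda error: None))  # "PONG" on attempt 3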
Please provide at least " + f"one reachable node: {str(exception)}" + ) from exception # Create Redis connections to all nodes self.create_redis_connections(list(tmp_nodes_cache.values())) @@ -1699,14 +1692,14 @@ class ClusterPipeline(RedisCluster): def __init__( self, - nodes_manager, - commands_parser, - result_callbacks=None, - cluster_response_callbacks=None, - startup_nodes=None, - read_from_replicas=False, - cluster_error_retry_attempts=5, - reinitialize_steps=10, + nodes_manager: "NodesManager", + commands_parser: "CommandsParser", + result_callbacks: Optional[Dict[str, Callable]] = None, + cluster_response_callbacks: Optional[Dict[str, Callable]] = None, + startup_nodes: Optional[List["ClusterNode"]] = None, + read_from_replicas: bool = False, + cluster_error_retry_attempts: int = 3, + reinitialize_steps: int = 5, lock=None, **kwargs, ): @@ -1858,22 +1851,22 @@ def send_cluster_commands( """ if not stack: return [] - - for _ in range(0, self.cluster_error_retry_attempts): + retry_attempts = self.cluster_error_retry_attempts + while True: try: return self._send_cluster_commands( stack, raise_on_error=raise_on_error, allow_redirections=allow_redirections, ) - except ClusterDownError: - # Try again with the new cluster setup. All other errors - # should be raised. - pass - - # If it fails the configured number of times then raise - # exception back to caller of this method - raise ClusterDownError("CLUSTERDOWN error. Unable to rebuild the cluster") + except (ClusterDownError, ConnectionError) as e: + if retry_attempts > 0: + # Try again with the new cluster setup. All other errors + # should be raised. + retry_attempts -= 1 + pass + else: + raise e def _send_cluster_commands( self, stack, raise_on_error=True, allow_redirections=True @@ -1898,7 +1891,6 @@ def _send_cluster_commands( # we figure out the slot number that command maps to, then from # the slot determine the node. for c in attempt: - connection_error_retry_counter = 0 while True: # refer to our internal node -> slot table that # tells us where a given command should route to. @@ -1931,13 +1923,10 @@ def _send_cluster_commands( try: connection = get_connection(redis_node, c.args) except ConnectionError: - connection_error_retry_counter += 1 - if connection_error_retry_counter < 5: - # reinitialize the node -> slot table - self.nodes_manager.initialize() - continue - else: - raise + # Connection retries are being handled in the node's + # Retry object. Reinitialize the node -> slot table. 
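send_cluster_commands() above now retries the whole stack on ClusterDownError or ConnectionError until its budget is spent, then re-raises. The same shape as a standalone sketch, with send standing in for _send_cluster_commands:

from redis.exceptions import ClusterDownError, ConnectionError

def send_with_retries(send, stack, retry_attempts):
    while True:
        try:
            return send(stack)
        except (ClusterDownError, ConnectionError):
            if retry_attempts > 0:
                # Try again with the refreshed cluster setup.
                retry_attempts -= 1
                continue
            raise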
+ self.nodes_manager.initialize() + raise nodes[node_name] = NodeCommands( redis_node.parse_response, redis_node.connection_pool, diff --git a/redis/connection.py b/redis/connection.py index a2b007406d..9c5b536f89 100755 --- a/redis/connection.py +++ b/redis/connection.py @@ -540,7 +540,7 @@ def __init__( # Add TimeoutError to the errors list to retry on retry_on_error.append(TimeoutError) self.retry_on_error = retry_on_error - if retry_on_error: + if retry or retry_on_error: if retry is None: self.retry = Retry(NoBackoff(), 1) else: @@ -1467,6 +1467,13 @@ def disconnect(self, inuse_connections=True): for connection in connections: connection.disconnect() + def set_retry(self, retry: "Retry") -> None: + self.connection_kwargs.update({"retry": retry}) + for conn in self._available_connections: + conn.retry = retry + for conn in self._in_use_connections: + conn.retry = retry + class BlockingConnectionPool(ConnectionPool): """ diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py index 27f11900c4..38bcaf6c00 100644 --- a/tests/test_asyncio/test_cluster.py +++ b/tests/test_asyncio/test_cluster.py @@ -13,6 +13,8 @@ from redis.asyncio.cluster import ClusterNode, NodesManager, RedisCluster from redis.asyncio.connection import Connection, SSLConnection from redis.asyncio.parser import CommandsParser +from redis.asyncio.retry import Retry +from redis.backoff import ExponentialBackoff, NoBackoff, default_backoff from redis.cluster import PIPELINE_BLOCKED_COMMANDS, PRIMARY, REPLICA, get_node_name from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot from redis.exceptions import ( @@ -247,6 +249,76 @@ async def test_startup_nodes(self) -> None: ] ) + async def test_cluster_set_get_retry_object(self, request: FixtureRequest): + retry = Retry(NoBackoff(), 2) + url = request.config.getoption("--redis-url") + async with RedisCluster.from_url(url, retry=retry) as r: + assert r.get_retry()._retries == retry._retries + assert isinstance(r.get_retry()._backoff, NoBackoff) + for node in r.get_nodes(): + n_retry = node.connection_kwargs.get("retry") + assert n_retry is not None + assert n_retry._retries == retry._retries + assert isinstance(n_retry._backoff, NoBackoff) + rand_cluster_node = r.get_random_node() + existing_conn = rand_cluster_node.acquire_connection() + # Change retry policy + new_retry = Retry(ExponentialBackoff(), 3) + r.set_retry(new_retry) + assert r.get_retry()._retries == new_retry._retries + assert isinstance(r.get_retry()._backoff, ExponentialBackoff) + for node in r.get_nodes(): + n_retry = node.connection_kwargs.get("retry") + assert n_retry is not None + assert n_retry._retries == new_retry._retries + assert isinstance(n_retry._backoff, ExponentialBackoff) + assert existing_conn.retry._retries == new_retry._retries + new_conn = rand_cluster_node.acquire_connection() + assert new_conn.retry._retries == new_retry._retries + + async def test_cluster_retry_object(self, request: FixtureRequest) -> None: + url = request.config.getoption("--redis-url") + async with RedisCluster.from_url(url) as rc_default: + # Test default retry + retry = rc_default.connection_kwargs.get("retry") + assert isinstance(retry, Retry) + assert retry._retries == 3 + assert isinstance(retry._backoff, type(default_backoff())) + assert rc_default.get_node("127.0.0.1", 16379).connection_kwargs.get( + "retry" + ) == rc_default.get_node("127.0.0.1", 16380).connection_kwargs.get("retry") + + retry = Retry(ExponentialBackoff(10, 5), 5) + async with RedisCluster.from_url(url, retry=retry) 
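ConnectionPool.set_retry() above updates connection_kwargs and rewires both idle and in-use connections, so the new policy applies to existing sockets as well as future ones. A sketch, assuming a local server behind a placeholder URL:

from redis import ConnectionPool
from redis.backoff import ExponentialBackoff
from redis.retry import Retry

pool = ConnectionPool.from_url("redis://localhost:6379/0")
conn = pool.get_connection("_")  # a currently in-use connection
pool.set_retry(Retry(ExponentialBackoff(), 3))
assert conn.retry._retries == 3  # existing connection was updated too
pool.release(conn)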
as rc_custom_retry: + # Test custom retry + assert ( + rc_custom_retry.get_node("127.0.0.1", 16379).connection_kwargs.get( + "retry" + ) + == retry + ) + + async with RedisCluster.from_url( + url, connection_error_retry_attempts=0 + ) as rc_no_retries: + # Test no connection retries + assert ( + rc_no_retries.get_node("127.0.0.1", 16379).connection_kwargs.get( + "retry" + ) + is None + ) + + async with RedisCluster.from_url( + url, retry=Retry(NoBackoff(), 0) + ) as rc_no_retries: + assert ( + rc_no_retries.get_node("127.0.0.1", 16379) + .connection_kwargs.get("retry") + ._retries + == 0 + ) + async def test_empty_startup_nodes(self) -> None: """ Test that exception is raised when empty providing empty startup_nodes @@ -1289,8 +1361,11 @@ async def test_client_info(self, r: RedisCluster) -> None: assert "addr" in info @skip_if_server_version_lt("2.6.9") - async def test_client_kill(self, r: RedisCluster, r2: RedisCluster) -> None: + async def test_client_kill( + self, r: RedisCluster, create_redis: Callable[..., RedisCluster] + ) -> None: node = r.get_primaries()[0] + r2 = await create_redis(cls=RedisCluster, flushdb=False) await r.client_setname("redis-py-c1", target_nodes="all") await r2.client_setname("redis-py-c2", target_nodes="all") clients = [ @@ -1311,6 +1386,7 @@ async def test_client_kill(self, r: RedisCluster, r2: RedisCluster) -> None: ] assert len(clients) == 1 assert clients[0].get("name") == "redis-py-c1" + await r2.close() @skip_if_server_version_lt("2.6.0") async def test_cluster_bitop_not_empty_string(self, r: RedisCluster) -> None: diff --git a/tests/test_asyncio/test_retry.py b/tests/test_asyncio/test_retry.py index 38e353bc36..86e6ddfa0d 100644 --- a/tests/test_asyncio/test_retry.py +++ b/tests/test_asyncio/test_retry.py @@ -1,8 +1,9 @@ import pytest +from redis.asyncio import Redis from redis.asyncio.connection import Connection, UnixDomainSocketConnection from redis.asyncio.retry import Retry -from redis.backoff import AbstractBackoff, NoBackoff +from redis.backoff import AbstractBackoff, ExponentialBackoff, NoBackoff from redis.exceptions import ConnectionError, TimeoutError @@ -114,3 +115,22 @@ async def test_infinite_retry(self): assert self.actual_attempts == 5 assert self.actual_failures == 5 + + +class TestRedisClientRetry: + "Test the Redis client behavior with retries" + + async def test_get_set_retry_object(self, request): + retry = Retry(NoBackoff(), 2) + url = request.config.getoption("--redis-url") + r = await Redis.from_url(url, retry_on_timeout=True, retry=retry) + assert r.get_retry()._retries == retry._retries + assert isinstance(r.get_retry()._backoff, NoBackoff) + new_retry_policy = Retry(ExponentialBackoff(), 3) + exiting_conn = await r.connection_pool.get_connection("_") + r.set_retry(new_retry_policy) + assert r.get_retry()._retries == new_retry_policy._retries + assert isinstance(r.get_retry()._backoff, ExponentialBackoff) + assert exiting_conn.retry._retries == new_retry_policy._retries + new_conn = await r.connection_pool.get_connection("_") + assert new_conn.retry._retries == new_retry_policy._retries diff --git a/tests/test_cluster.py b/tests/test_cluster.py index 5652673af9..d18fbbbb33 100644 --- a/tests/test_cluster.py +++ b/tests/test_cluster.py @@ -7,6 +7,7 @@ import pytest from redis import Redis +from redis.backoff import ExponentialBackoff, NoBackoff, default_backoff from redis.cluster import ( PRIMARY, REDIS_CLUSTER_HASH_SLOTS, @@ -31,6 +32,7 @@ ResponseError, TimeoutError, ) +from redis.retry import Retry from redis.utils import 
str_if_bytes from tests.test_pubsub import wait_for_message @@ -358,6 +360,60 @@ def ok_response(connection, *args, **options): assert r.execute_command("SET", "foo", "bar") == "MOCK_OK" + def test_handling_cluster_failover_to_a_replica(self, r): + # Set the key we'll test for + key = "key" + r.set("key", "value") + primary = r.get_node_from_key(key, replica=False) + assert str_if_bytes(r.get("key")) == "value" + # Get the current output of cluster slots + cluster_slots = primary.redis_connection.execute_command("CLUSTER SLOTS") + replica_host = "" + replica_port = 0 + # Replace one of the replicas to be the new primary based on the + # cluster slots output + for slot_range in cluster_slots: + primary_port = slot_range[2][1] + if primary_port == primary.port: + if len(slot_range) <= 3: + # cluster doesn't have a replica, return + return + replica_host = str_if_bytes(slot_range[3][0]) + replica_port = slot_range[3][1] + # replace replica and primary in the cluster slots output + tmp_node = slot_range[2] + slot_range[2] = slot_range[3] + slot_range[3] = tmp_node + break + + def raise_connection_error(): + raise ConnectionError("error") + + def mock_execute_command(*_args, **_kwargs): + if _args[0] == "CLUSTER SLOTS": + return cluster_slots + else: + raise Exception("Failed to mock cluster slots") + + # Mock connection error for the current primary + mock_node_resp_func(primary, raise_connection_error) + primary.redis_connection.set_retry(Retry(NoBackoff(), 1)) + + # Mock the cluster slots response for all other nodes + redis_mock_node = Mock() + redis_mock_node.execute_command.side_effect = mock_execute_command + # Mock response value for all other commands + redis_mock_node.parse_response.return_value = "MOCK_OK" + for node in r.get_nodes(): + if node.port != primary.port: + node.redis_connection = redis_mock_node + + assert r.get(key) == "MOCK_OK" + new_primary = r.get_node_from_key(key, replica=False) + assert new_primary.host == replica_host + assert new_primary.port == replica_port + assert r.get_node(primary.host, primary.port).server_type == REPLICA + def test_moved_redirection(self, request): """ Test that the client handles MOVED response. 
@@ -691,6 +747,50 @@ def moved_redirect_effect(connection, *args, **options): cur_node = r.get_node(node_name=node_name) assert conn == r.get_redis_connection(cur_node) + def test_cluster_get_set_retry_object(self, request): + retry = Retry(NoBackoff(), 2) + r = _get_client(RedisCluster, request, retry=retry) + assert r.get_retry()._retries == retry._retries + assert isinstance(r.get_retry()._backoff, NoBackoff) + for node in r.get_nodes(): + assert node.redis_connection.get_retry()._retries == retry._retries + assert isinstance(node.redis_connection.get_retry()._backoff, NoBackoff) + rand_node = r.get_random_node() + existing_conn = rand_node.redis_connection.connection_pool.get_connection("_") + # Change retry policy + new_retry = Retry(ExponentialBackoff(), 3) + r.set_retry(new_retry) + assert r.get_retry()._retries == new_retry._retries + assert isinstance(r.get_retry()._backoff, ExponentialBackoff) + for node in r.get_nodes(): + assert node.redis_connection.get_retry()._retries == new_retry._retries + assert isinstance( + node.redis_connection.get_retry()._backoff, ExponentialBackoff + ) + assert existing_conn.retry._retries == new_retry._retries + new_conn = rand_node.redis_connection.connection_pool.get_connection("_") + assert new_conn.retry._retries == new_retry._retries + + def test_cluster_retry_object(self, r) -> None: + # Test default retry + retry = r.get_connection_kwargs().get("retry") + assert isinstance(retry, Retry) + assert retry._retries == 0 + assert isinstance(retry._backoff, type(default_backoff())) + node1 = r.get_node("127.0.0.1", 16379).redis_connection + node2 = r.get_node("127.0.0.1", 16380).redis_connection + assert node1.get_retry()._retries == node2.get_retry()._retries + + # Test custom retry + retry = Retry(ExponentialBackoff(10, 5), 5) + rc_custom_retry = RedisCluster("127.0.0.1", 16379, retry=retry) + assert ( + rc_custom_retry.get_node("127.0.0.1", 16379) + .redis_connection.get_retry() + ._retries + == retry._retries + ) + @pytest.mark.onlycluster class TestClusterRedisCommands: diff --git a/tests/test_retry.py b/tests/test_retry.py index f844fd0a12..3cfea5c09e 100644 --- a/tests/test_retry.py +++ b/tests/test_retry.py @@ -2,7 +2,7 @@ import pytest -from redis.backoff import NoBackoff +from redis.backoff import ExponentialBackoff, NoBackoff from redis.client import Redis from redis.connection import Connection, UnixDomainSocketConnection from redis.exceptions import ( @@ -203,3 +203,17 @@ def test_client_retry_on_timeout(self, request): r.get("foo") finally: assert parse_response.call_count == retries + 1 + + def test_get_set_retry_object(self, request): + retry = Retry(NoBackoff(), 2) + r = _get_client(Redis, request, retry_on_timeout=True, retry=retry) + exist_conn = r.connection_pool.get_connection("_") + assert r.get_retry()._retries == retry._retries + assert isinstance(r.get_retry()._backoff, NoBackoff) + new_retry_policy = Retry(ExponentialBackoff(), 3) + r.set_retry(new_retry_policy) + assert r.get_retry()._retries == new_retry_policy._retries + assert isinstance(r.get_retry()._backoff, ExponentialBackoff) + assert exist_conn.retry._retries == new_retry_policy._retries + new_conn = r.connection_pool.get_connection("_") + assert new_conn.retry._retries == new_retry_policy._retries
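Taken together, the tests above pin down one contract for both client types: a Retry can be injected at construction time, read back with get_retry(), and replaced with set_retry(), which propagates to pools, nodes, and live connections. An end-to-end sketch against placeholder endpoints:

from redis import Redis, RedisCluster, default_backoff
from redis.backoff import ExponentialBackoff, NoBackoff
from redis.retry import Retry

r = Redis(host="localhost", port=6379, retry=Retry(NoBackoff(), 2),
          retry_on_timeout=True)
r.set_retry(Retry(ExponentialBackoff(), 3))  # propagates to the pool
assert isinstance(r.get_retry()._backoff, ExponentialBackoff)

rc = RedisCluster(host="localhost", port=16379,
                  retry=Retry(default_backoff(), 5))
assert rc.get_retry()._retries == 5          # shared by every node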