diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index e153f0cd3..b2de46f3d 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -39,6 +39,7 @@ bool_ok, ) from redis.asyncio.connection import ( + BlockingConnectionPool, Connection, ConnectionPool, SSLConnection, @@ -160,12 +161,12 @@ def from_url( Boolean arguments can be specified with string values "True"/"False" or "Yes"/"No". Values that cannot be properly cast cause a ``ValueError`` to be raised. Once parsed, the querystring arguments - and keyword arguments are passed to the ``ConnectionPool``'s + and keyword arguments are passed to the ``BlockingConnectionPool``'s class initializer. In the case of conflicting arguments, querystring arguments always win. """ - connection_pool = ConnectionPool.from_url(url, **kwargs) + connection_pool = BlockingConnectionPool.from_url(url, **kwargs) client = cls( connection_pool=connection_pool, single_connection_client=single_connection_client, @@ -337,7 +338,7 @@ def __init__( ) # This arg only used if no pool is passed in self.auto_close_connection_pool = auto_close_connection_pool - connection_pool = ConnectionPool(**kwargs) + connection_pool = BlockingConnectionPool(**kwargs) else: # If a pool is passed in, do not close it self.auto_close_connection_pool = False diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index ff2bd10c9..e6a32f405 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -1,5 +1,6 @@ import asyncio import collections +import os import random import socket import ssl @@ -242,7 +243,7 @@ def __init__( reinitialize_steps: int = 5, cluster_error_retry_attempts: int = 3, connection_error_retry_attempts: int = 3, - max_connections: int = 2**31, + max_connections: int = os.cpu_count() * 10, # Client related kwargs db: Union[str, int] = 0, path: Optional[str] = None, @@ -964,7 +965,7 @@ def __init__( port: Union[str, int], server_type: Optional[str] = None, *, - max_connections: int = 2**31, + max_connections: int = os.cpu_count() * 10, connection_class: Type[Connection] = Connection, **connection_kwargs: Any, ) -> None: diff --git a/redis/asyncio/connection.py b/redis/asyncio/connection.py index 2e470bfcf..66404c0f5 100644 --- a/redis/asyncio/connection.py +++ b/redis/asyncio/connection.py @@ -2,6 +2,7 @@ import copy import enum import inspect +import os import socket import ssl import sys @@ -1125,7 +1126,7 @@ def __init__( max_connections: Optional[int] = None, **connection_kwargs, ): - max_connections = max_connections or 2**31 + max_connections = max_connections or os.cpu_count() * 10 if not isinstance(max_connections, int) or max_connections < 0: raise ValueError('"max_connections" must be a positive integer') @@ -1317,7 +1318,7 @@ class BlockingConnectionPool(ConnectionPool): def __init__( self, - max_connections: int = 50, + max_connections: int = os.cpu_count() * 10, timeout: Optional[int] = 20, connection_class: Type[AbstractConnection] = Connection, queue_class: Type[asyncio.Queue] = asyncio.LifoQueue, # deprecated diff --git a/redis/client.py b/redis/client.py index 79f52cc98..f4a2b0aa8 100755 --- a/redis/client.py +++ b/redis/client.py @@ -27,6 +27,7 @@ ) from redis.connection import ( AbstractConnection, + BlockingConnectionPool, ConnectionPool, SSLConnection, UnixDomainSocketConnection, @@ -141,13 +142,13 @@ def from_url(cls, url: str, **kwargs) -> "Redis": Boolean arguments can be specified with string values "True"/"False" or "Yes"/"No". Values that cannot be properly cast cause a ``ValueError`` to be raised. Once parsed, the querystring arguments - and keyword arguments are passed to the ``ConnectionPool``'s + and keyword arguments are passed to the ``BlockingConnectionPool``'s class initializer. In the case of conflicting arguments, querystring arguments always win. """ single_connection_client = kwargs.pop("single_connection_client", False) - connection_pool = ConnectionPool.from_url(url, **kwargs) + connection_pool = BlockingConnectionPool.from_url(url, **kwargs) client = cls( connection_pool=connection_pool, single_connection_client=single_connection_client, @@ -320,7 +321,7 @@ def __init__( "ssl_min_version": ssl_min_version, } ) - connection_pool = ConnectionPool(**kwargs) + connection_pool = BlockingConnectionPool(**kwargs) self.auto_close_connection_pool = True else: self.auto_close_connection_pool = False diff --git a/redis/cluster.py b/redis/cluster.py index cfe902115..108273f7f 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -12,7 +12,7 @@ from redis.client import CaseInsensitiveDict, PubSub, Redis from redis.commands import READ_COMMANDS, RedisClusterCommands from redis.commands.helpers import list_or_args -from redis.connection import ConnectionPool, DefaultParser, parse_url +from redis.connection import BlockingConnectionPool, DefaultParser, parse_url from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot from redis.exceptions import ( AskError, @@ -487,7 +487,7 @@ def from_url(cls, url, **kwargs): Boolean arguments can be specified with string values "True"/"False" or "Yes"/"No". Values that cannot be properly cast cause a ``ValueError`` to be raised. Once parsed, the querystring arguments - and keyword arguments are passed to the ``ConnectionPool``'s + and keyword arguments are passed to the ``BlockingConnectionPool``'s class initializer. In the case of conflicting arguments, querystring arguments always win. @@ -1321,7 +1321,7 @@ def __init__( require_full_coverage=False, lock=None, dynamic_startup_nodes=True, - connection_pool_class=ConnectionPool, + connection_pool_class=BlockingConnectionPool, address_remap: Optional[Callable[[str, int], Tuple[str, int]]] = None, **kwargs, ): diff --git a/redis/connection.py b/redis/connection.py index b89ce0e94..494ad028a 100644 --- a/redis/connection.py +++ b/redis/connection.py @@ -1079,7 +1079,7 @@ def __init__( max_connections: Optional[int] = None, **connection_kwargs, ): - max_connections = max_connections or 2**31 + max_connections = max_connections or os.cpu_count() * 10 if not isinstance(max_connections, int) or max_connections < 0: raise ValueError('"max_connections" must be a positive integer') @@ -1348,7 +1348,7 @@ class BlockingConnectionPool(ConnectionPool): def __init__( self, - max_connections=50, + max_connections=os.cpu_count() * 10, timeout=20, connection_class=Connection, queue_class=LifoQueue, @@ -1425,12 +1425,15 @@ def get_connection(self, command_name, *keys, **options): try: # ensure this connection is connected to Redis connection.connect() - # connections that the pool provides should be ready to send - # a command. if not, the connection was either returned to the + # if client caching is not enabled connections that the pool + # provides should be ready to send a command. + # if not, the connection was either returned to the # pool before all data has been read or the socket has been # closed. either way, reconnect and verify everything is good. + # (if caching enabled the connection will not always be ready + # to send a command because it may contain invalidation messages) try: - if connection.can_read(): + if connection.can_read() and connection.client_cache is None: raise ConnectionError("Connection has data") except (ConnectionError, OSError): connection.disconnect() @@ -1454,7 +1457,10 @@ def release(self, connection): # that will cause the pool to recreate the connection if # its needed. connection.disconnect() - self.pool.put_nowait(None) + try: + self.pool.put_nowait(None) + except Full: + pass return # Put the connection back into the pool. @@ -1470,3 +1476,38 @@ def disconnect(self): self._checkpid() for connection in self._connections: connection.disconnect() + + def set_retry(self, retry: "Retry") -> None: + self.connection_kwargs.update({"retry": retry}) + for conn in self._connections: + conn.retry = retry + + def flush_cache(self): + self._checkpid() + with self._lock: + for connection in self._connections: + try: + connection.client_cache.flush() + except AttributeError: + # cache is not enabled + pass + + def delete_command_from_cache(self, command: str): + self._checkpid() + with self._lock: + for connection in self._connections: + try: + connection.client_cache.delete_command(command) + except AttributeError: + # cache is not enabled + pass + + def invalidate_key_from_cache(self, key: str): + self._checkpid() + with self._lock: + for connection in self._connections: + try: + connection.client_cache.invalidate_key(key) + except AttributeError: + # cache is not enabled + pass diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py index d7554b12a..863842c9c 100644 --- a/tests/test_asyncio/test_cluster.py +++ b/tests/test_asyncio/test_cluster.py @@ -2859,11 +2859,13 @@ async def test_readonly_pipeline_from_readonly_client( async def test_can_run_concurrent_pipelines(self, r: RedisCluster) -> None: """Test that the pipeline can be used concurrently.""" + r = RedisCluster(host="localhost", port=16379, max_connections=100) await asyncio.gather( *(self.test_redis_cluster_pipeline(r) for i in range(100)), *(self.test_multi_key_operation_with_a_single_slot(r) for i in range(100)), *(self.test_multi_key_operation_with_multi_slots(r) for i in range(100)), ) + r.flushdb() @pytest.mark.onlycluster async def test_pipeline_with_default_node_error_command(self, create_redis): diff --git a/tests/test_asyncio/test_connection_pool.py b/tests/test_asyncio/test_connection_pool.py index 5e4d3f206..22d3ef696 100644 --- a/tests/test_asyncio/test_connection_pool.py +++ b/tests/test_asyncio/test_connection_pool.py @@ -647,11 +647,10 @@ def test_connect_from_url_tcp(self): connection = redis.Redis.from_url("redis://localhost") pool = connection.connection_pool - print(repr(pool)) assert re.match( r"< .*?([^\.]+) \( < .*?([^\.]+) \( (.+) \) > \) >", repr(pool), re.VERBOSE ).groups() == ( - "ConnectionPool", + "BlockingConnectionPool", "Connection", "host=localhost,port=6379,db=0", ) @@ -663,7 +662,7 @@ def test_connect_from_url_unix(self): assert re.match( r"< .*?([^\.]+) \( < .*?([^\.]+) \( (.+) \) > \) >", repr(pool), re.VERBOSE ).groups() == ( - "ConnectionPool", + "BlockingConnectionPool", "UnixDomainSocketConnection", "path=/path/to/socket,db=0", ) diff --git a/tests/test_cluster.py b/tests/test_cluster.py index 1f505b816..089277f97 100644 --- a/tests/test_cluster.py +++ b/tests/test_cluster.py @@ -3281,8 +3281,7 @@ def raise_error(target_node, *args, **kwargs): assert get_connection.call_count == 4 for cluster_node in r.nodes_manager.nodes_cache.values(): connection_pool = cluster_node.redis_connection.connection_pool - num_of_conns = len(connection_pool._available_connections) - assert num_of_conns == connection_pool._created_connections + assert len(connection_pool._connections) < 2 def test_empty_stack(self, r): """ diff --git a/tests/test_connection_pool.py b/tests/test_connection_pool.py index dee7c554d..1a448bb78 100644 --- a/tests/test_connection_pool.py +++ b/tests/test_connection_pool.py @@ -501,8 +501,10 @@ def test_on_connect_error(self): with pytest.raises(redis.RedisError): bad_connection.info() pool = bad_connection.connection_pool - assert len(pool._available_connections) == 1 - assert not pool._available_connections[0]._sock + con = pool.pool.get() + assert con is not None + assert pool.pool.get() is None + assert not con._sock @pytest.mark.onlynoncluster @skip_if_server_version_lt("2.8.8") @@ -570,7 +572,7 @@ def test_connect_from_url_tcp(self): assert re.match( r"< .*?([^\.]+) \( < .*?([^\.]+) \( (.+) \) > \) >", repr(pool), re.VERBOSE ).groups() == ( - "ConnectionPool", + "BlockingConnectionPool", "Connection", "host=localhost,port=6379,db=0", ) @@ -582,7 +584,7 @@ def test_connect_from_url_unix(self): assert re.match( r"< .*?([^\.]+) \( < .*?([^\.]+) \( (.+) \) > \) >", repr(pool), re.VERBOSE ).groups() == ( - "ConnectionPool", + "BlockingConnectionPool", "UnixDomainSocketConnection", "path=/path/to/socket,db=0", )