Use CPU-based limit and Blocking Connection Pool in redis-py by default #3200

Draft · wants to merge 2 commits into base: master
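In short, this draft swaps the default pool class from ConnectionPool to BlockingConnectionPool and replaces the effectively unbounded 2**31 connection cap with os.cpu_count() * 10. The sketch below is illustration only, not part of the diff; it assumes redis-py is installed and a Redis server is reachable on localhost:6379. It shows the behavioral difference the change implies: the old default fails fast when the cap is hit, while the proposed default makes callers wait for a free connection.

# Illustration only (not part of the diff). Assumes redis-py is installed and a
# Redis server is reachable at localhost:6379.
import os

import redis

# Old default: a plain ConnectionPool fails fast once the limit is reached.
strict_pool = redis.ConnectionPool(max_connections=1)
first = strict_pool.get_connection("PING")
try:
    strict_pool.get_connection("PING")
except redis.ConnectionError as exc:
    print("ConnectionPool:", exc)  # "Too many connections"
finally:
    strict_pool.release(first)

# Proposed default: a BlockingConnectionPool makes the caller wait up to
# `timeout` seconds for a connection to be released instead of raising.
blocking_pool = redis.BlockingConnectionPool(max_connections=os.cpu_count() * 10, timeout=20)
client = redis.Redis(connection_pool=blocking_pool)
print(client.ping())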
7 changes: 4 additions & 3 deletions redis/asyncio/client.py
@@ -39,6 +39,7 @@
bool_ok,
)
from redis.asyncio.connection import (
BlockingConnectionPool,
Connection,
ConnectionPool,
SSLConnection,
@@ -160,12 +161,12 @@ def from_url(
Boolean arguments can be specified with string values "True"/"False"
or "Yes"/"No". Values that cannot be properly cast cause a
``ValueError`` to be raised. Once parsed, the querystring arguments
and keyword arguments are passed to the ``ConnectionPool``'s
and keyword arguments are passed to the ``BlockingConnectionPool``'s
class initializer. In the case of conflicting arguments, querystring
arguments always win.

"""
connection_pool = ConnectionPool.from_url(url, **kwargs)
connection_pool = BlockingConnectionPool.from_url(url, **kwargs)
client = cls(
connection_pool=connection_pool,
single_connection_client=single_connection_client,
@@ -337,7 +338,7 @@ def __init__(
)
# This arg only used if no pool is passed in
self.auto_close_connection_pool = auto_close_connection_pool
connection_pool = ConnectionPool(**kwargs)
connection_pool = BlockingConnectionPool(**kwargs)
else:
# If a pool is passed in, do not close it
self.auto_close_connection_pool = False
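With the async client change above, querystring arguments parsed by from_url now feed the BlockingConnectionPool initializer. A minimal sketch, not part of the diff; the URL, database, and max_connections value are assumptions:

import asyncio

import redis.asyncio as redis


async def main() -> None:
    # max_connections from the query string is forwarded to BlockingConnectionPool.
    client = redis.Redis.from_url("redis://localhost:6379/0?max_connections=20")
    print(type(client.connection_pool).__name__)  # BlockingConnectionPool
    print(await client.ping())
    await client.aclose()


asyncio.run(main())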
5 changes: 3 additions & 2 deletions redis/asyncio/cluster.py
@@ -1,5 +1,6 @@
import asyncio
import collections
import os
import random
import socket
import ssl
@@ -242,7 +243,7 @@ def __init__(
reinitialize_steps: int = 5,
cluster_error_retry_attempts: int = 3,
connection_error_retry_attempts: int = 3,
max_connections: int = 2**31,
max_connections: int = os.cpu_count() * 10,
# Client related kwargs
db: Union[str, int] = 0,
path: Optional[str] = None,
@@ -964,7 +965,7 @@ def __init__(
port: Union[str, int],
server_type: Optional[str] = None,
*,
max_connections: int = 2**31,
max_connections: int = os.cpu_count() * 10,
connection_class: Type[Connection] = Connection,
**connection_kwargs: Any,
) -> None:
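As a quick sanity check on the new cluster default (illustration only): the per-node cap becomes a function of the host's CPU count rather than the previous 2**31 sentinel.

import os

# On an 8-core machine this evaluates to 80 connections per node.
# Note for the sketch only: os.cpu_count() can return None, hence the guard.
print((os.cpu_count() or 1) * 10)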
5 changes: 3 additions & 2 deletions redis/asyncio/connection.py
@@ -2,6 +2,7 @@
import copy
import enum
import inspect
import os
import socket
import ssl
import sys
@@ -1125,7 +1126,7 @@ def __init__(
max_connections: Optional[int] = None,
**connection_kwargs,
):
max_connections = max_connections or 2**31
max_connections = max_connections or os.cpu_count() * 10
if not isinstance(max_connections, int) or max_connections < 0:
raise ValueError('"max_connections" must be a positive integer')

@@ -1317,7 +1318,7 @@ class BlockingConnectionPool(ConnectionPool):

def __init__(
self,
max_connections: int = 50,
max_connections: int = os.cpu_count() * 10,
timeout: Optional[int] = 20,
connection_class: Type[AbstractConnection] = Connection,
queue_class: Type[asyncio.Queue] = asyncio.LifoQueue, # deprecated
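The async BlockingConnectionPool keeps its 20-second timeout while gaining the CPU-based cap. A rough sketch of what the blocking behavior buys under contention, not part of the diff and assuming a local server:

import asyncio

import redis.asyncio as redis


async def main() -> None:
    # A deliberately tiny cap: the 20 concurrent PINGs below queue up for the
    # two connections instead of raising "Too many connections".
    pool = redis.BlockingConnectionPool(max_connections=2, timeout=20)
    client = redis.Redis(connection_pool=pool)
    print(await asyncio.gather(*(client.ping() for _ in range(20))))
    await client.aclose()
    await pool.disconnect()


asyncio.run(main())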
7 changes: 4 additions & 3 deletions redis/client.py
@@ -27,6 +27,7 @@
)
from redis.connection import (
AbstractConnection,
BlockingConnectionPool,
ConnectionPool,
SSLConnection,
UnixDomainSocketConnection,
@@ -141,13 +142,13 @@ def from_url(cls, url: str, **kwargs) -> "Redis":
Boolean arguments can be specified with string values "True"/"False"
or "Yes"/"No". Values that cannot be properly cast cause a
``ValueError`` to be raised. Once parsed, the querystring arguments
and keyword arguments are passed to the ``ConnectionPool``'s
and keyword arguments are passed to the ``BlockingConnectionPool``'s
class initializer. In the case of conflicting arguments, querystring
arguments always win.

"""
single_connection_client = kwargs.pop("single_connection_client", False)
connection_pool = ConnectionPool.from_url(url, **kwargs)
connection_pool = BlockingConnectionPool.from_url(url, **kwargs)
client = cls(
connection_pool=connection_pool,
single_connection_client=single_connection_client,
@@ -320,7 +321,7 @@ def __init__(
"ssl_min_version": ssl_min_version,
}
)
connection_pool = ConnectionPool(**kwargs)
connection_pool = BlockingConnectionPool(**kwargs)
self.auto_close_connection_pool = True
else:
self.auto_close_connection_pool = False
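Correspondingly for the sync client, a plain constructor call now ends up with a blocking pool. A quick check, not part of the diff; the ping assumes a server on localhost:6379:

import redis

r = redis.Redis(host="localhost", port=6379)
print(type(r.connection_pool).__name__)  # BlockingConnectionPool after this change
print(r.ping())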
6 changes: 3 additions & 3 deletions redis/cluster.py
@@ -12,7 +12,7 @@
from redis.client import CaseInsensitiveDict, PubSub, Redis
from redis.commands import READ_COMMANDS, RedisClusterCommands
from redis.commands.helpers import list_or_args
from redis.connection import ConnectionPool, DefaultParser, parse_url
from redis.connection import BlockingConnectionPool, DefaultParser, parse_url
from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
from redis.exceptions import (
AskError,
@@ -487,7 +487,7 @@ def from_url(cls, url, **kwargs):
Boolean arguments can be specified with string values "True"/"False"
or "Yes"/"No". Values that cannot be properly cast cause a
``ValueError`` to be raised. Once parsed, the querystring arguments
and keyword arguments are passed to the ``ConnectionPool``'s
and keyword arguments are passed to the ``BlockingConnectionPool``'s
class initializer. In the case of conflicting arguments, querystring
arguments always win.

@@ -1321,7 +1321,7 @@ def __init__(
require_full_coverage=False,
lock=None,
dynamic_startup_nodes=True,
connection_pool_class=ConnectionPool,
connection_pool_class=BlockingConnectionPool,
address_remap: Optional[Callable[[str, int], Tuple[str, int]]] = None,
**kwargs,
):
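On the cluster side, each node's pool is now created through connection_pool_class=BlockingConnectionPool. A rough way to inspect that, not part of the diff; the port is an assumption and a running cluster is required:

from redis.cluster import RedisCluster

rc = RedisCluster(host="localhost", port=16379)
node = rc.get_default_node()
print(type(node.redis_connection.connection_pool).__name__)  # BlockingConnectionPool
rc.close()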
53 changes: 47 additions & 6 deletions redis/connection.py
@@ -1079,7 +1079,7 @@ def __init__(
max_connections: Optional[int] = None,
**connection_kwargs,
):
max_connections = max_connections or 2**31
max_connections = max_connections or os.cpu_count() * 10
if not isinstance(max_connections, int) or max_connections < 0:
raise ValueError('"max_connections" must be a positive integer')

@@ -1348,7 +1348,7 @@ class BlockingConnectionPool(ConnectionPool):

def __init__(
self,
max_connections=50,
max_connections=os.cpu_count() * 10,
timeout=20,
connection_class=Connection,
queue_class=LifoQueue,
@@ -1425,12 +1425,15 @@ def get_connection(self, command_name, *keys, **options):
try:
# ensure this connection is connected to Redis
connection.connect()
# connections that the pool provides should be ready to send
# a command. if not, the connection was either returned to the
# if client caching is not enabled, connections that the pool
# provides should be ready to send a command.
# if not, the connection was either returned to the
# pool before all data has been read or the socket has been
# closed. either way, reconnect and verify everything is good.
# (if caching is enabled, the connection will not always be ready
# to send a command because it may contain invalidation messages)
try:
if connection.can_read():
if connection.can_read() and connection.client_cache is None:
raise ConnectionError("Connection has data")
except (ConnectionError, OSError):
connection.disconnect()
@@ -1454,7 +1457,10 @@ def release(self, connection):
# that will cause the pool to recreate the connection if
# its needed.
connection.disconnect()
self.pool.put_nowait(None)
try:
self.pool.put_nowait(None)
except Full:
pass
return

# Put the connection back into the pool.
@@ -1470,3 +1476,38 @@ def disconnect(self):
self._checkpid()
for connection in self._connections:
connection.disconnect()

def set_retry(self, retry: "Retry") -> None:
self.connection_kwargs.update({"retry": retry})
for conn in self._connections:
conn.retry = retry

def flush_cache(self):
self._checkpid()
with self._lock:
for connection in self._connections:
try:
connection.client_cache.flush()
except AttributeError:
# cache is not enabled
pass

def delete_command_from_cache(self, command: str):
self._checkpid()
with self._lock:
for connection in self._connections:
try:
connection.client_cache.delete_command(command)
except AttributeError:
# cache is not enabled
pass

def invalidate_key_from_cache(self, key: str):
self._checkpid()
with self._lock:
for connection in self._connections:
try:
connection.client_cache.invalidate_key(key)
except AttributeError:
# cache is not enabled
pass
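The helpers added above bring set_retry and the cache-management methods to BlockingConnectionPool, and the release() change tolerates a full queue when discarding a dead connection. Per the except AttributeError guards, the cache methods are no-ops when client-side caching is not enabled on the connections. A usage sketch, not part of the diff:

import redis

pool = redis.BlockingConnectionPool(max_connections=20, timeout=20)
client = redis.Redis(connection_pool=pool)

client.set("greeting", "hello")
client.get("greeting")

pool.flush_cache()                          # drop every cached reply
pool.delete_command_from_cache("GET")       # drop cached replies for one command
pool.invalidate_key_from_cache("greeting")  # drop cached replies that touch this key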
2 changes: 2 additions & 0 deletions tests/test_asyncio/test_cluster.py
@@ -2859,11 +2859,13 @@ async def test_readonly_pipeline_from_readonly_client(

async def test_can_run_concurrent_pipelines(self, r: RedisCluster) -> None:
Contributor (review comment): Remove the fixture from here, if initialized inside the test?

"""Test that the pipeline can be used concurrently."""
r = RedisCluster(host="localhost", port=16379, max_connections=100)
await asyncio.gather(
*(self.test_redis_cluster_pipeline(r) for i in range(100)),
*(self.test_multi_key_operation_with_a_single_slot(r) for i in range(100)),
*(self.test_multi_key_operation_with_multi_slots(r) for i in range(100)),
)
await r.flushdb()

@pytest.mark.onlycluster
async def test_pipeline_with_default_node_error_command(self, create_redis):
5 changes: 2 additions & 3 deletions tests/test_asyncio/test_connection_pool.py
@@ -647,11 +647,10 @@ def test_connect_from_url_tcp(self):
connection = redis.Redis.from_url("redis://localhost")
pool = connection.connection_pool

print(repr(pool))
assert re.match(
r"< .*?([^\.]+) \( < .*?([^\.]+) \( (.+) \) > \) >", repr(pool), re.VERBOSE
).groups() == (
"ConnectionPool",
"BlockingConnectionPool",
"Connection",
"host=localhost,port=6379,db=0",
)
@@ -663,7 +662,7 @@ def test_connect_from_url_unix(self):
assert re.match(
r"< .*?([^\.]+) \( < .*?([^\.]+) \( (.+) \) > \) >", repr(pool), re.VERBOSE
).groups() == (
"ConnectionPool",
"BlockingConnectionPool",
"UnixDomainSocketConnection",
"path=/path/to/socket,db=0",
)
3 changes: 1 addition & 2 deletions tests/test_cluster.py
@@ -3281,8 +3281,7 @@ def raise_error(target_node, *args, **kwargs):
assert get_connection.call_count == 4
for cluster_node in r.nodes_manager.nodes_cache.values():
connection_pool = cluster_node.redis_connection.connection_pool
num_of_conns = len(connection_pool._available_connections)
assert num_of_conns == connection_pool._created_connections
assert len(connection_pool._connections) < 2

def test_empty_stack(self, r):
"""
10 changes: 6 additions & 4 deletions tests/test_connection_pool.py
@@ -501,8 +501,10 @@ def test_on_connect_error(self):
with pytest.raises(redis.RedisError):
bad_connection.info()
pool = bad_connection.connection_pool
assert len(pool._available_connections) == 1
assert not pool._available_connections[0]._sock
con = pool.pool.get()
assert con is not None
assert pool.pool.get() is None
assert not con._sock

@pytest.mark.onlynoncluster
@skip_if_server_version_lt("2.8.8")
@@ -570,7 +572,7 @@ def test_connect_from_url_tcp(self):
assert re.match(
r"< .*?([^\.]+) \( < .*?([^\.]+) \( (.+) \) > \) >", repr(pool), re.VERBOSE
).groups() == (
"ConnectionPool",
"BlockingConnectionPool",
"Connection",
"host=localhost,port=6379,db=0",
)
@@ -582,7 +584,7 @@ def test_connect_from_url_unix(self):
assert re.match(
r"< .*?([^\.]+) \( < .*?([^\.]+) \( (.+) \) > \) >", repr(pool), re.VERBOSE
).groups() == (
"ConnectionPool",
"BlockingConnectionPool",
"UnixDomainSocketConnection",
"path=/path/to/socket,db=0",
)