Skip to content

Commit

Permalink
Fix issue 2540: Synchronise concurrent command calls to single-client…
Browse files Browse the repository at this point in the history
… mode. (#2568)

Co-authored-by: Viktor Ivanov <viktor@infogrid.io>
  • Loading branch information
Vivanov98 and Viktor Ivanov committed Jan 29, 2023
1 parent 9e6a9b5 commit 428d609
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 2 deletions.
15 changes: 13 additions & 2 deletions redis/asyncio/client.py
Expand Up @@ -253,15 +253,22 @@ def __init__(

self.response_callbacks = CaseInsensitiveDict(self.__class__.RESPONSE_CALLBACKS)

# If using a single connection client, we need to lock creation-of and use-of
# the client in order to avoid race conditions such as using asyncio.gather
# on a set of redis commands
self._single_conn_lock = asyncio.Lock()

def __repr__(self):
return f"{self.__class__.__name__}<{self.connection_pool!r}>"

def __await__(self):
return self.initialize().__await__()

async def initialize(self: _RedisT) -> _RedisT:
if self.single_connection_client and self.connection is None:
self.connection = await self.connection_pool.get_connection("_")
if self.single_connection_client:
async with self._single_conn_lock:
if self.connection is None:
self.connection = await self.connection_pool.get_connection("_")
return self

def set_response_callback(self, command: str, callback: ResponseCallbackT):
Expand Down Expand Up @@ -501,6 +508,8 @@ async def execute_command(self, *args, **options):
command_name = args[0]
conn = self.connection or await pool.get_connection(command_name, **options)

if self.single_connection_client:
await self._single_conn_lock.acquire()
try:
return await conn.retry.call_with_retry(
lambda: self._send_command_parse_response(
Expand All @@ -509,6 +518,8 @@ async def execute_command(self, *args, **options):
lambda error: self._disconnect_raise(conn, error),
)
finally:
if self.single_connection_client:
self._single_conn_lock.release()
if not self.connection:
await pool.release(conn)

Expand Down
45 changes: 45 additions & 0 deletions tests/test_asyncio/test_connection.py
Expand Up @@ -6,6 +6,7 @@
import pytest

import redis
from redis.asyncio import Redis
from redis.asyncio.connection import (
BaseParser,
Connection,
Expand Down Expand Up @@ -41,6 +42,50 @@ async def test_invalid_response(create_redis):
await r.connection.disconnect()


@pytest.mark.onlynoncluster
async def test_single_connection():
"""Test that concurrent requests on a single client are synchronised."""
r = Redis(single_connection_client=True)

init_call_count = 0
command_call_count = 0
in_use = False

class Retry_:
async def call_with_retry(self, _, __):
# If we remove the single-client lock, this error gets raised as two
# coroutines will be vying for the `in_use` flag due to the two
# asymmetric sleep calls
nonlocal command_call_count
nonlocal in_use
if in_use is True:
raise ValueError("Commands should be executed one at a time.")
in_use = True
await asyncio.sleep(0.01)
command_call_count += 1
await asyncio.sleep(0.03)
in_use = False
return "foo"

mock_conn = mock.MagicMock()
mock_conn.retry = Retry_()

async def get_conn(_):
# Validate only one client is created in single-client mode when
# concurrent requests are made
nonlocal init_call_count
await asyncio.sleep(0.01)
init_call_count += 1
return mock_conn

with mock.patch.object(r.connection_pool, "get_connection", get_conn):
with mock.patch.object(r.connection_pool, "release"):
await asyncio.gather(r.set("a", "b"), r.set("c", "d"))

assert init_call_count == 1
assert command_call_count == 2


@skip_if_server_version_lt("4.0.0")
@pytest.mark.redismod
@pytest.mark.onlynoncluster
Expand Down

0 comments on commit 428d609

Please sign in to comment.