Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor fixes for #2666 and enhanced async test #2673

Merged
merged 1 commit into from Mar 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 3 additions & 7 deletions redis/asyncio/client.py
Expand Up @@ -1227,13 +1227,9 @@ async def immediate_execute_command(self, *args, **options):
command_name, self.shard_hint
)
self.connection = conn
try:
return await asyncio.shield(
self._try_send_command_parse_response(conn, *args, **options)
)
except asyncio.CancelledError:
await conn.disconnect()
raise
return await asyncio.shield(
self._try_send_command_parse_response(conn, *args, **options)
)

def pipeline_execute_command(self, *args, **options):
"""
Expand Down
45 changes: 23 additions & 22 deletions tests/test_asyncio/test_cwe_404.py
Expand Up @@ -88,34 +88,35 @@ async def test_standalone_pipeline(delay):
addr=("localhost", 5380), redis_addr=("localhost", 6379), delay=delay * 2
)
await dp.start()
async with Redis(host="localhost", port=5380) as r:
await r.set("foo", "foo")
await r.set("bar", "bar")
for b in [True, False]:
async with Redis(host="localhost", port=5380, single_connection_client=b) as r:
await r.set("foo", "foo")
await r.set("bar", "bar")

pipe = r.pipeline()
pipe = r.pipeline()

pipe2 = r.pipeline()
pipe2.get("bar")
pipe2.ping()
pipe2.get("foo")
pipe2 = r.pipeline()
pipe2.get("bar")
pipe2.ping()
pipe2.get("foo")

t = asyncio.create_task(pipe.get("foo").execute())
await asyncio.sleep(delay)
t.cancel()
t = asyncio.create_task(pipe.get("foo").execute())
await asyncio.sleep(delay)
t.cancel()

pipe.get("bar")
pipe.ping()
pipe.get("foo")
pipe.reset()
pipe.get("bar")
pipe.ping()
pipe.get("foo")
pipe.reset()

assert await pipe.execute() is None
assert await pipe.execute() is None

# validating that the pipeline can be used as it could previously
pipe.get("bar")
pipe.ping()
pipe.get("foo")
assert await pipe.execute() == [b"bar", True, b"foo"]
assert await pipe2.execute() == [b"bar", True, b"foo"]
# validating that the pipeline can be used as it could previously
pipe.get("bar")
pipe.ping()
pipe.get("foo")
assert await pipe.execute() == [b"bar", True, b"foo"]
assert await pipe2.execute() == [b"bar", True, b"foo"]

await dp.stop()

Expand Down