Skip to content

Commit

Permalink
Test that reading a message of two chunks after a disconnect() works.
Browse files Browse the repository at this point in the history
  • Loading branch information
kristjanvalur committed Jan 20, 2023
1 parent 8f95a3e commit 3752994
Showing 1 changed file with 32 additions and 18 deletions.
50 changes: 32 additions & 18 deletions tests/test_asyncio/test_connection.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import socket
import types
from unittest.mock import AsyncMock, Mock, patch
from unittest.mock import patch

import pytest

Expand Down Expand Up @@ -151,7 +151,10 @@ async def test_connection_parse_response_resume(r: redis.Redis):


@pytest.mark.onlynoncluster
async def test_connection_hiredis_disconect_race():
@pytest.mark.parametrize(
"parser_class", [PythonParser, HiredisParser], ids=["PythonParser", "HiredisParser"]
)
async def test_connection_disconect_race(parser_class):
"""
This test reproduces the case in issue #2349
where a connection is closed while the parser is reading to feed the
Expand All @@ -163,12 +166,14 @@ async def test_connection_hiredis_disconect_race():
This test verifies that a read in progress can finish even
if the `disconnect()` method is called.
"""
if not HIREDIS_AVAILABLE:
pytest.skip("Hiredis not available)")
parser_class = HiredisParser
if parser_class == PythonParser:
pytest.xfail("doesn't work yet with PythonParser")
if parser_class == HiredisParser and not HIREDIS_AVAILABLE:
pytest.skip("Hiredis not available")

args = {}
args["parser_class"] = parser_class

conn = Connection(**args)

cond = asyncio.Condition()
Expand All @@ -177,15 +182,20 @@ async def test_connection_hiredis_disconect_race():
# 2 == closer has closed and is waiting for close to finish
state = 0

# mock read function, which wait for a close to happen before returning
async def read(_):
# Mock read function, which wait for a close to happen before returning
# Can either be invoked as two `read()` calls (HiredisParser)
# or as a `readline()` followed by `readexact()` (PythonParser)
chunks = [b"$13\r\n", b"Hello, World!\r\n"]

async def read(_=None):
nonlocal state
async with cond:
state = 1 # we are reading
cond.notify()
# wait until the closing task has done
await cond.wait_for(lambda: state == 2)
return b" "
if state == 0:
state = 1 # we are reading
cond.notify()
# wait until the closing task has done
await cond.wait_for(lambda: state == 2)
return chunks.pop(0)

# function closes the connection while reader is still blocked reading
async def do_close():
Expand All @@ -197,20 +207,24 @@ async def do_close():
await conn.disconnect()

async def do_read():
with pytest.raises(InvalidResponse):
await conn.read_response()
return await conn.read_response()

reader = AsyncMock()
writer = AsyncMock()
writer.transport = Mock()
reader = mock.AsyncMock()
writer = mock.AsyncMock()
writer.transport = mock.Mock()
writer.transport.get_extra_info.side_effect = None

# for HiredisParser
reader.read.side_effect = read
# for PythonParser
reader.readline.side_effect = read
reader.readexactly.side_effect = read

async def open_connection(*args, **kwargs):
return reader, writer

with patch.object(asyncio, "open_connection", open_connection):
await conn.connect()

await asyncio.gather(do_read(), do_close())
vals = await asyncio.gather(do_read(), do_close())
assert vals == [b"Hello, World!", None]

0 comments on commit 3752994

Please sign in to comment.