Commit
Use context manager to manage DelayProxy
kristjanvalur committed May 3, 2023
1 parent c5f842a commit 0c5f0aa
Showing 1 changed file with 121 additions and 113 deletions.
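The diff below replaces the explicit `await dp.start()` / `await dp.stop()` pairs in the tests with the async context manager protocol, so the proxy is torn down even if a test body raises or is cancelled. A minimal sketch of the pattern, where the `Resource` class and `main` coroutine are hypothetical stand-ins for `DelayProxy` and a test:

    import asyncio


    class Resource:
        """Stand-in for DelayProxy: anything with async start()/stop()."""

        async def start(self):
            print("started")

        async def stop(self):
            print("stopped")

        async def __aenter__(self):
            await self.start()
            return self

        async def __aexit__(self, *exc_info):
            # Runs on normal exit and when the body raises or is cancelled.
            await self.stop()


    async def main():
        # Old style: stop() is skipped if the body raises, unless the
        # caller remembers to wrap it in try/finally by hand.
        res = Resource()
        await res.start()
        try:
            pass  # ... use res ...
        finally:
            await res.stop()

        # New style: __aexit__ guarantees stop() is awaited.
        async with Resource() as res:
            pass  # ... use res ...


    asyncio.run(main())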
234 changes: 121 additions & 113 deletions tests/test_asyncio/test_cwe_404.py
@@ -27,12 +27,21 @@ def __init__(self, addr, redis_addr, delay: float = 0.0):
self.delay = delay
self.send_event = asyncio.Event()

async def __aenter__(self):
await self.start()
return self

async def __aexit__(self, *args):
await self.stop()

async def start(self):
# test that we can connect to redis
async with async_timeout(2):
_, redis_writer = await asyncio.open_connection(*self.redis_addr)
redis_writer.close()
self.server = await asyncio.start_server(self.handle, *self.addr)
self.server = await asyncio.start_server(
self.handle, *self.addr, reuse_address=True
)
self.ROUTINE = asyncio.create_task(self.server.serve_forever())

@contextlib.contextmanager
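Defining `__aenter__`/`__aexit__` on the class keeps the call sites short (`async with DelayProxy(...) as dp:`); the added `reuse_address=True` presumably lets repeated tests rebind port 5380 without hitting "address already in use". An equivalent formulation, shown only as a sketch and not part of this commit, wraps the existing start/stop pair with `contextlib.asynccontextmanager`:

    import contextlib


    @contextlib.asynccontextmanager
    async def delay_proxy(addr, redis_addr):
        # Hypothetical helper: equivalent to the __aenter__/__aexit__ pair above.
        dp = DelayProxy(addr=addr, redis_addr=redis_addr)
        await dp.start()
        try:
            yield dp
        finally:
            await dp.stop()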
@@ -95,91 +104,89 @@ async def test_standalone(delay, redis_addr):

# create a tcp socket proxy that relays data to Redis and back,
# inserting 0.1 seconds of delay
dp = DelayProxy(addr=("127.0.0.1", 5380), redis_addr=redis_addr)
await dp.start()

for b in [True, False]:
# note that we connect to proxy, rather than to Redis directly
async with Redis(host="127.0.0.1", port=5380, single_connection_client=b) as r:

await r.set("foo", "foo")
await r.set("bar", "bar")

async def op(r):
with dp.set_delay(delay * 2):
return await r.get(
"foo"
) # <-- this is the operation we want to cancel

dp.send_event.clear()
t = asyncio.create_task(op(r))
# Wait until the task has sent, and then some, to make sure it has
# settled on the read.
await dp.send_event.wait()
await asyncio.sleep(0.01) # a little extra time for prudence
t.cancel()
with pytest.raises(asyncio.CancelledError):
await t

# make sure that our previous request, cancelled while waiting for
# a response, didn't leave the connection open and in a bad state
assert await r.get("bar") == b"bar"
assert await r.ping()
assert await r.get("foo") == b"foo"

await dp.stop()
async with DelayProxy(addr=("127.0.0.1", 5380), redis_addr=redis_addr) as dp:

for b in [True, False]:
# note that we connect to proxy, rather than to Redis directly
async with Redis(
host="127.0.0.1", port=5380, single_connection_client=b
) as r:

await r.set("foo", "foo")
await r.set("bar", "bar")

async def op(r):
with dp.set_delay(delay * 2):
return await r.get(
"foo"
) # <-- this is the operation we want to cancel

dp.send_event.clear()
t = asyncio.create_task(op(r))
# Wait until the task has sent, and then some, to make sure it has
# settled on the read.
await dp.send_event.wait()
await asyncio.sleep(0.01) # a little extra time for prudence
t.cancel()
with pytest.raises(asyncio.CancelledError):
await t

# make sure that our previous request, cancelled while waiting for
# a response, didn't leave the connection open and in a bad state
assert await r.get("bar") == b"bar"
assert await r.ping()
assert await r.get("foo") == b"foo"
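Both standalone tests use the same handshake to cancel a command while it is blocked on the read: clear the proxy's `send_event`, start the command as a task, wait for the proxy to signal that the request has been forwarded, then cancel. A condensed sketch of that timing pattern as a hypothetical helper (not in the commit), reusing the `dp`, `r`, and `delay` names from the test above:

    import asyncio

    import pytest


    async def cancel_mid_read(r, dp, delay):
        async def op():
            # Slow down the proxy so the GET is guaranteed to be in flight.
            with dp.set_delay(delay * 2):
                return await r.get("foo")

        dp.send_event.clear()
        t = asyncio.create_task(op())
        await dp.send_event.wait()  # proxy has relayed the request to Redis
        await asyncio.sleep(0.01)   # give the client time to settle on the read
        t.cancel()
        with pytest.raises(asyncio.CancelledError):
            await t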


@pytest.mark.onlynoncluster
@pytest.mark.parametrize("delay", argvalues=[0.05, 0.5, 1, 2])
async def test_standalone_pipeline(delay, redis_addr):
dp = DelayProxy(addr=("127.0.0.1", 5380), redis_addr=redis_addr)
await dp.start()
for b in [True, False]:
async with Redis(host="127.0.0.1", port=5380, single_connection_client=b) as r:
await r.set("foo", "foo")
await r.set("bar", "bar")

pipe = r.pipeline()

pipe2 = r.pipeline()
pipe2.get("bar")
pipe2.ping()
pipe2.get("foo")

async def op(pipe):
with dp.set_delay(delay * 2):
return await pipe.get(
"foo"
).execute() # <-- this is the operation we want to cancel

dp.send_event.clear()
t = asyncio.create_task(op(pipe))
# wait until task has settled on the read
await dp.send_event.wait()
await asyncio.sleep(0.01)
t.cancel()
with pytest.raises(asyncio.CancelledError):
await t

# we have now cancelled the pipeline in the middle of a request, make sure
# that the connection is still usable
pipe.get("bar")
pipe.ping()
pipe.get("foo")
await pipe.reset()

# check that the pipeline is empty after reset
assert await pipe.execute() == []

# validating that the pipeline can be used as it could previously
pipe.get("bar")
pipe.ping()
pipe.get("foo")
assert await pipe.execute() == [b"bar", True, b"foo"]
assert await pipe2.execute() == [b"bar", True, b"foo"]

await dp.stop()
async with DelayProxy(addr=("127.0.0.1", 5380), redis_addr=redis_addr) as dp:
for b in [True, False]:
async with Redis(
host="127.0.0.1", port=5380, single_connection_client=b
) as r:
await r.set("foo", "foo")
await r.set("bar", "bar")

pipe = r.pipeline()

pipe2 = r.pipeline()
pipe2.get("bar")
pipe2.ping()
pipe2.get("foo")

async def op(pipe):
with dp.set_delay(delay * 2):
return await pipe.get(
"foo"
).execute() # <-- this is the operation we want to cancel

dp.send_event.clear()
t = asyncio.create_task(op(pipe))
# wait until task has settled on the read
await dp.send_event.wait()
await asyncio.sleep(0.01)
t.cancel()
with pytest.raises(asyncio.CancelledError):
await t

# we have now cancelled the pipeline in the middle of a request, make sure
# that the connection is still usable
pipe.get("bar")
pipe.ping()
pipe.get("foo")
await pipe.reset()

# check that the pipeline is empty after reset
assert await pipe.execute() == []

# validating that the pipeline can be used as it could previously
pipe.get("bar")
pipe.ping()
pipe.get("foo")
assert await pipe.execute() == [b"bar", True, b"foo"]
assert await pipe2.execute() == [b"bar", True, b"foo"]


@pytest.mark.onlycluster
@@ -202,9 +209,6 @@ def remap(address):
proxy = DelayProxy(addr=("127.0.0.1", remapped), redis_addr=forward_addr)
proxies.append(proxy)

# start proxies
await asyncio.gather(*[p.start() for p in proxies])

def all_clear():
for p in proxies:
p.send_event.clear()
Expand All @@ -221,32 +225,36 @@ def set_delay(delay: float):
stack.enter_context(p.set_delay(delay))
yield

with contextlib.closing(
RedisCluster.from_url(f"redis://127.0.0.1:{remap_base}", address_remap=remap)
) as r:
await r.initialize()
await r.set("foo", "foo")
await r.set("bar", "bar")

async def op(r):
with set_delay(delay):
return await r.get("foo")

all_clear()
t = asyncio.create_task(op(r))
# Wait for whichever DelayProxy gets the request first
await wait_for_send()
await asyncio.sleep(0.01)
t.cancel()
with pytest.raises(asyncio.CancelledError):
await t

# try a number of requests to exercise all the connections
async def doit():
assert await r.get("bar") == b"bar"
assert await r.ping()
assert await r.get("foo") == b"foo"

await asyncio.gather(*[doit() for _ in range(10)])

await asyncio.gather(*(p.stop() for p in proxies))
async with contextlib.AsyncExitStack() as stack:
for p in proxies:
await stack.enter_async_context(p)

with contextlib.closing(
RedisCluster.from_url(
f"redis://127.0.0.1:{remap_base}", address_remap=remap
)
) as r:
await r.initialize()
await r.set("foo", "foo")
await r.set("bar", "bar")

async def op(r):
with set_delay(delay):
return await r.get("foo")

all_clear()
t = asyncio.create_task(op(r))
# Wait for whichever DelayProxy gets the request first
await wait_for_send()
await asyncio.sleep(0.01)
t.cancel()
with pytest.raises(asyncio.CancelledError):
await t

# try a number of requests to exercise all the connections
async def doit():
assert await r.get("bar") == b"bar"
assert await r.ping()
assert await r.get("foo") == b"foo"

await asyncio.gather(*[doit() for _ in range(10)])
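`contextlib.AsyncExitStack` lets the cluster test enter a variable number of proxies and guarantees they are all stopped, in reverse order, when the block exits, including on failure. A minimal standalone sketch of the idiom (the `make_proxy` factory is hypothetical; any async context manager works):

    import contextlib


    async def run_with_proxies(addrs, make_proxy):
        async with contextlib.AsyncExitStack() as stack:
            # enter_async_context awaits each proxy's __aenter__ and registers
            # its __aexit__ on the stack.
            proxies = [await stack.enter_async_context(make_proxy(a)) for a in addrs]
            # ... use proxies ...
        # Leaving the block awaits every registered __aexit__, newest first.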
