Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support using asyncio coroutines from inside greenlets #877

57 changes: 57 additions & 0 deletions eventlet/asyncio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""
Asyncio compatibility functions.
"""
import asyncio

from greenlet import GreenletExit

from .greenthread import spawn, getcurrent
from .event import Event
from .hubs import get_hub
from .hubs.asyncio import Hub as AsyncioHub

__all__ = ["spawn_for_awaitable"]


def spawn_for_awaitable(coroutine):
"""
Take a coroutine or some other object that can be awaited
(``asyncio.Future``, ``asyncio.Task``), and turn it into a ``GreenThread``.

Known limitations:

* The coroutine/future/etc. don't run in their own
greenlet/``GreenThread``.
* As a result, things like ``eventlet.Lock``
won't work correctly inside ``async`` functions, thread ids aren't
meaningful, and so on.
"""
if not isinstance(get_hub(), AsyncioHub):
raise RuntimeError(
"This API only works with eventlet's asyncio hub. "
+ "To use it, set an EVENTLET_HUB=asyncio environment variable."
)

def _run():
# Convert the coroutine/Future/Task we're wrapping into a Future.
future = asyncio.ensure_future(coroutine, loop=asyncio.get_running_loop())

# Ensure killing the GreenThread cancels the Future:
def _got_result(gthread):
try:
gthread.wait()
except GreenletExit:
future.cancel()

getcurrent().link(_got_result)

# Wait until the Future has a result.
has_result = Event()
future.add_done_callback(lambda _: has_result.send(True))
has_result.wait()
# Return the result of the Future (or raise an exception if it had an
# exception).
return future.result()

Check warning on line 54 in eventlet/asyncio.py

View check run for this annotation

Codecov / codecov/patch

eventlet/asyncio.py#L54

Added line #L54 was not covered by tests

# Start a GreenThread:
return spawn(_run)
136 changes: 136 additions & 0 deletions tests/asyncio_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
"""Tests for asyncio integration."""

import asyncio
from time import time

import pytest

from greenlet import GreenletExit

import eventlet
from eventlet.hubs import get_hub
from eventlet.hubs.asyncio import Hub as AsyncioHub
from eventlet.asyncio import spawn_for_awaitable
from eventlet.greenthread import getcurrent
from .wsgi_test import _TestBase, Site

if not isinstance(get_hub(), AsyncioHub):
pytest.skip("Only works on asyncio hub", allow_module_level=True)


class CallingAsyncFunctionsFromGreenletsHighLevelTests(_TestBase):
"""
High-level tests for using ``asyncio``-based code inside greenlets.

For this functionality to be useful, users need to be able to use 3rd party
libraries that use sockets etc.. Merely hooking up futures to greenlets
doesn't help if you can't use the asyncio library ecosystem. So this set
of tests does more integration-y tests showing that functionality works.
"""

def set_site(self):
self.site = Site()

def test_aiohttp_client(self):
"""
The ``aiohttp`` HTTP client works correctly on top of eventlet.
"""
import aiohttp

async def request():
host, port = self.server_addr
async with aiohttp.ClientSession() as session:
url = "http://{}:{}/".format(host, port)
async with session.get(url) as response:
html = await response.text()
return html

gthread = spawn_for_awaitable(request())
assert gthread.wait() == "hello world"


def test_result():
"""
The result of the coroutine is returned by the ``GreenThread`` created by
``spawn_for_awaitable``.
"""

async def go():
await asyncio.sleep(0.0001)
return 13

assert spawn_for_awaitable(go()).wait() == 13


def test_exception():
"""
An exception raised by the coroutine is raised by ``GreenThread.wait()``
for the green thread created by ``spawn_for_awaitable()``.
"""

async def go():
await asyncio.sleep(0.0001)
raise ZeroDivisionError()

with pytest.raises(ZeroDivisionError):
assert spawn_for_awaitable(go()).wait()


def test_future_and_task():
"""
``spawn_for_awaitable()`` can take an ``asyncio.Future`` or an
``asyncio.Task``.
"""

async def go(value):
return value * 2

assert spawn_for_awaitable(asyncio.ensure_future(go(8))).wait() == 16
assert spawn_for_awaitable(asyncio.create_task(go(6))).wait() == 12


def test_asyncio_sleep():
"""
``asyncio`` scheduled events work on eventlet.
"""

async def go():
start = time()
await asyncio.sleep(0.07)
return time() - start

elapsed = spawn_for_awaitable(go()).wait()
assert 0.05 < elapsed < 0.09


def test_kill_greenthread():
"""
If a ``GreenThread`` wrapping an ``asyncio.Future``/coroutine is killed,
the ``asyncio.Future`` is cancelled.
"""

the_greenthread = []
progress = []

async def go():
await asyncio.sleep(0.1)
progress.append(1)
while not the_greenthread:
await asyncio.sleep(0.001)
# Kill the green thread.
progress.append(2)
the_greenthread[0].kill()
progress.append(3)
await asyncio.sleep(1)
# This should never be reached:
progress.append(4)

future = asyncio.ensure_future(go())
the_greenthread.append(spawn_for_awaitable(future))
with pytest.raises(GreenletExit):
the_greenthread[0].wait()
assert progress == [1, 2, 3]
# Cancellation may not be immediate.
eventlet.sleep(0.01)
assert future.cancelled()
assert progress == [1, 2, 3]
2 changes: 1 addition & 1 deletion tests/mysqldb_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def mysql_requirement(_f):


class TestMySQLdb(tests.LimitedTestCase):
TEST_TIMEOUT = 5
TEST_TIMEOUT = 50

def setUp(self):
self._auth = tests.get_database_auth()['MySQLdb']
Expand Down
23 changes: 23 additions & 0 deletions tests/not_asyncio_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""Tests for hubs that are not asyncio-based."""

import pytest

from eventlet.hubs import get_hub
from eventlet.hubs.asyncio import Hub as AsyncioHub
from eventlet.asyncio import spawn_for_awaitable

if isinstance(get_hub(), AsyncioHub):
pytest.skip("Only works on non-asyncio hub", allow_module_level=True)


def test_spawn_from_coroutine_errors():
"""
If ``spawn_for_awaitable()`` is called in a non-asyncio hub it will raise a
``RuntimeError``.
"""

async def go():
return 13

with pytest.raises(RuntimeError):
spawn_for_awaitable(go())
1 change: 1 addition & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ deps =
py{310,311}: psycopg2-binary==2.9.5
py{310,311}: pyzmq==25.0.0
dnspython1: dnspython<2
asyncio: aiohttp
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is aiohttp is only required by tests? I suppose that yes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's just a testing thing.

usedevelop = True
commands =
pip install -e .
Expand Down