
Add asyncio.Queue.__aiter__ #119154

Open
Zac-HD opened this issue May 18, 2024 · 9 comments
Labels
topic-asyncio type-feature A feature request or enhancement

Comments

Zac-HD (Contributor) commented May 18, 2024

Feature or enhancement

Proposal:

Over the last few years, Trio and AnyIO users have proven out several design patterns using channels as async iterables. For example, having a context manager yield an async iterable avoids the motivating problems of both PEP-533 and PEP-789.

An asyncio.Queue is almost identical to a channel pair, especially with the .shutdown() method added in Python 3.13. I therefore propose that we add an .__aiter__ method, to better support such design patterns without subclassing or a generator helper function, with an implementation as described in #119154 (comment)
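For illustration, this is roughly the generator-helper pattern the proposal would make unnecessary. Queue.shutdown() and asyncio.QueueShutDown only exist on Python 3.13+, so this sketch emulates shutdown with a sentinel object instead; the helper name iter_queue is hypothetical, not part of the proposal.

```python
import asyncio

# Hypothetical sentinel standing in for Queue.shutdown() on pre-3.13 Pythons.
_DONE = object()

async def iter_queue(queue: asyncio.Queue):
    """Yield items from `queue` until the _DONE sentinel is seen."""
    while True:
        item = await queue.get()
        try:
            if item is _DONE:
                return
            yield item
        finally:
            queue.task_done()

async def main() -> list[int]:
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(3):
        queue.put_nowait(i)
    queue.put_nowait(_DONE)  # producer signals completion
    return [item async for item in iter_queue(queue)]

print(asyncio.run(main()))  # → [0, 1, 2]
```

With the proposed __aiter__, the consumer side would shrink to `async for item in queue: ...`.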

Links to previous discussion of this feature:

python/peps#3782 (review) suggested that queue.Queue could also be iterable. If we're extending this to synchronous classes I'd also include multiprocessing.Queue and multiprocessing.SimpleQueue. I'd omit multiprocessing.connection.Connection, due to the byte-level send/recv methods, and queue.SimpleQueue because without a .close() or .shutdown() method there's no clean way to shut down.
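The synchronous analogue would look much the same. queue.ShutDown and queue.Queue.shutdown() are likewise Python 3.13+, so this sketch again emulates shutdown with a sentinel; iter_sync_queue is an illustrative name only.

```python
import queue
import threading

# Hypothetical sentinel emulating Queue.shutdown() for pre-3.13 Pythons.
_DONE = object()

def iter_sync_queue(q: queue.Queue):
    """Yield items from `q` until the _DONE sentinel is seen."""
    while True:
        item = q.get()
        try:
            if item is _DONE:
                return
            yield item
        finally:
            q.task_done()

q: queue.Queue = queue.Queue()

def producer():
    for i in range(3):
        q.put(i)
    q.put(_DONE)  # signal completion

t = threading.Thread(target=producer)
t.start()
result = list(iter_sync_queue(q))
t.join()
print(result)  # → [0, 1, 2]
```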

Limitations

Making Queue aiterable reaches API parity for single-producer, single-consumer patterns. In multi-producer and/or multi-consumer patterns, without a .clone() method it is the user's responsibility to shut down the queue when the last task is done. I do not propose to add .clone(), but we could include that link in the docs as an option for multi-producer patterns if desired.

@Zac-HD Zac-HD added the type-feature A feature request or enhancement label May 18, 2024
graingert (Contributor) commented May 19, 2024

It should be:

    async def __aiter__(self):
        try:
            while True:
                yield await self.get()
        except asyncio.QueueShutDown:
            return

Or it might be even better to implement __anext__ and do:

    def __aiter__(self):
        return self

gvanrossum (Member) commented

Probably should also have a task_done call.

Zac-HD (Contributor, author) commented May 20, 2024

I've included those suggestions in the top comment, sticking with __aiter__ because I don't see a clean way to support task_done() from __anext__ - and it feels more reasonable to describe queues as aiterables than aiterators.

gvanrossum (Member) commented

I suspect the real problem is that the scope of the try/except is too large, and should only go around the await queue.get() part? (And you need s/queue/self/.)

gvanrossum (Member) commented

Maybe this?

    async def __aiter__(self):
        while True:
            try:
                item = await self.get()
            except asyncio.QueueShutDown:
                return
            yield item
            self.task_done()

A philosophical question is, if yield item raises (i.e., the caller throws an exception into the generator), should task_done() be called or not?

Also, does it look like you're proposing a legit use case for async generators?

Zac-HD (Contributor, author) commented May 20, 2024

Also, does it look like you're proposing a legit use case for async generators?

Yes! Generators (sync or async) are a really elegant syntax for defining iterables, and the problems motivating PEP-789 only occur if you yield (and thus suspend execution) within certain context managers. If you don't have a context manager in the generator function, and you're careful about finalization timing for PEP-533 reasons, I think they're great.

serhiy-storchaka (Member) commented

In general, you should always close an asynchronous generator. So the correct use of the proposed feature would be:

    async with contextlib.aclosing(aiter(queue)) as it:
        async for item in it:
            # process item

instead of simply:

    async for item in queue:
        # process item

We should also consider adding a method or a global function that returns an iterator, instead of making Queue an iterable. That would allow emitting a warning if the iterator was not closed.
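That suggestion could be sketched roughly as follows. All names here (QueueIterator, queue_iterator) are hypothetical, and the sentinel stands in for QueueShutDown so the sketch runs on pre-3.13 Pythons; the point is only the ResourceWarning on an unclosed iterator.

```python
import asyncio
import warnings

class QueueIterator:
    """Hypothetical dedicated iterator that warns if never closed."""

    def __init__(self, queue, sentinel):
        self._queue = queue
        self._sentinel = sentinel  # stands in for QueueShutDown on < 3.13
        self._closed = False

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self._closed:
            raise StopAsyncIteration
        item = await self._queue.get()
        if item is self._sentinel:
            self._closed = True
            raise StopAsyncIteration
        return item

    def close(self):
        self._closed = True

    def __del__(self):
        if not self._closed:
            warnings.warn(f"unclosed queue iterator {self!r}", ResourceWarning)

def queue_iterator(queue, sentinel):
    """Hypothetical global function returning an iterator over `queue`."""
    return QueueIterator(queue, sentinel)

async def demo():
    done = object()
    q: asyncio.Queue = asyncio.Queue()
    for x in ("a", "b"):
        q.put_nowait(x)
    q.put_nowait(done)
    it = queue_iterator(q, done)
    try:
        return [item async for item in it]
    finally:
        it.close()

print(asyncio.run(demo()))  # → ['a', 'b']
```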

It is not clear what to do with task_done() if the iteration was stopped (exception or break/return) before exhausting the iterator.

graingert (Contributor) commented

Rather than using an async generator, you could use a class with an __anext__; then you don't need to worry about exceptions being thrown in, or about the async generator not being closed.

gvanrossum (Member) commented

Like this:

    class AsyncQueueIterator:
        def __init__(self, queue):
            self.queue = queue

        def __aiter__(self):
            return self

        async def __anext__(self):
            try:
                item = await self.queue.get()
            except asyncio.QueueShutDown:
                raise StopAsyncIteration
            else:
                return item


    class Queue:
        ...
        def __aiter__(self):
            return AsyncQueueIterator(self)
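An end-to-end usage sketch of a class-based iterator like this, with a producer task and shutdown-driven termination. Queue.shutdown() and asyncio.QueueShutDown are Python 3.13+, so this falls back to a small illustrative shim on older versions; the shim is not part of the proposal.

```python
import asyncio

if hasattr(asyncio, "QueueShutDown"):
    QueueShutDown = asyncio.QueueShutDown
    make_queue = asyncio.Queue
else:
    # Pre-3.13 emulation of shutdown() via a re-queued sentinel (illustrative).
    class QueueShutDown(Exception):
        pass

    _SHUTDOWN = object()

    class _ShutdownQueue(asyncio.Queue):
        def shutdown(self):
            self.put_nowait(_SHUTDOWN)

        async def get(self):
            item = await super().get()
            if item is _SHUTDOWN:
                self.put_nowait(_SHUTDOWN)  # let other consumers see it too
                raise QueueShutDown
            return item

    make_queue = _ShutdownQueue

class AsyncQueueIterator:
    """Iterator wrapper ending iteration when the queue is shut down."""

    def __init__(self, queue):
        self.queue = queue

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            return await self.queue.get()
        except QueueShutDown:
            raise StopAsyncIteration

async def main():
    q = make_queue()

    async def producer():
        for i in range(3):
            await q.put(i)
        q.shutdown()  # queued items remain retrievable, then iteration stops

    task = asyncio.create_task(producer())
    items = [item async for item in AsyncQueueIterator(q)]
    await task
    return items

print(asyncio.run(main()))  # → [0, 1, 2]
```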
