gh-77714: Provide an async iterator version of as_completed (GH-22491)

* as_completed returns an object that is both an iterator and an async iterator
* Existing tests adjusted to test both the old and new style
* New test to ensure iterator can be resumed
* New test to ensure async iterator yields any passed-in Futures as-is
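
The dual-protocol idea in the first bullet can be sketched in isolation. The following is a simplified illustration, not the code from this commit: the class name `CompletionIterator` and the helpers `echo` and `demo` are hypothetical, timeout handling is left out, and `asyncio.get_running_loop()` is assumed in place of the loop lookup the real implementation uses.

```python
import asyncio


class CompletionIterator:
    """Hypothetical, simplified stand-in for the commit's iterator class.

    Assumption: no timeout support, and it must be created inside a
    running event loop.
    """

    def __init__(self, aws):
        self._done = asyncio.Queue()
        loop = asyncio.get_running_loop()
        # Tasks and futures pass through ensure_future() unchanged;
        # coroutines get wrapped in new tasks.
        todo = {asyncio.ensure_future(aw, loop=loop) for aw in set(aws)}
        for fut in todo:
            # Each finished future is queued in completion order.
            fut.add_done_callback(self._done.put_nowait)
        self._todo_left = len(todo)

    def __aiter__(self):
        return self

    def __iter__(self):
        return self

    async def __anext__(self):
        if not self._todo_left:
            raise StopAsyncIteration
        self._todo_left -= 1
        # Asynchronous iteration yields the finished future itself.
        return await self._wait_for_one()

    def __next__(self):
        if not self._todo_left:
            raise StopIteration
        self._todo_left -= 1
        # Plain iteration yields a coroutine that resolves to the result.
        return self._wait_for_one(resolve=True)

    async def _wait_for_one(self, resolve=False):
        fut = await self._done.get()
        return fut.result() if resolve else fut


async def echo(value):
    await asyncio.sleep(0)
    return value


async def demo():
    tasks = [asyncio.ensure_future(echo(n)) for n in range(3)]
    # Asynchronous iteration hands back the task objects that were passed in.
    yielded = [fut async for fut in CompletionIterator(tasks)]
    same_objects = set(yielded) == set(tasks)

    # Plain iteration hands back new coroutines that must be awaited.
    results = []
    for coro in CompletionIterator([echo("a"), echo("b")]):
        results.append(await coro)
    return same_objects, sorted(results)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because both protocols draw from one queue and one shared counter, a loop that breaks early can pick up where it left off on the same object, which is the behavior the resumption test exercises.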

Co-authored-by: Serhiy Storchaka <storchaka@gmail.com>
Co-authored-by: Guido van Rossum <gvanrossum@gmail.com>
Justin Turner Arthur 2024-04-01 12:07:29 -05:00 committed by GitHub
parent ddf814db74
commit c741ad3537
5 changed files with 389 additions and 122 deletions

@ -867,19 +867,50 @@ Waiting Primitives
.. function:: as_completed(aws, *, timeout=None)
Run :ref:`awaitable objects <asyncio-awaitables>` in the *aws*
iterable concurrently. Return an iterator of coroutines.
Each coroutine returned can be awaited to get the earliest next
result from the iterable of the remaining awaitables.
Run :ref:`awaitable objects <asyncio-awaitables>` in the *aws* iterable
concurrently. The returned object can be iterated to obtain the results
of the awaitables as they finish.
Raises :exc:`TimeoutError` if the timeout occurs before
all Futures are done.
The object returned by ``as_completed()`` can be iterated as an
:term:`asynchronous iterator` or a plain :term:`iterator`. When asynchronous
iteration is used, the originally-supplied awaitables are yielded if they
are tasks or futures. This makes it easy to correlate previously-scheduled
tasks with their results. Example::
Example::
ipv4_connect = create_task(open_connection("127.0.0.1", 80))
ipv6_connect = create_task(open_connection("::1", 80))
tasks = [ipv4_connect, ipv6_connect]
for coro in as_completed(aws):
earliest_result = await coro
# ...
async for earliest_connect in as_completed(tasks):
# earliest_connect is done. The result can be obtained by
# awaiting it or calling earliest_connect.result()
reader, writer = await earliest_connect
if earliest_connect is ipv6_connect:
print("IPv6 connection established.")
else:
print("IPv4 connection established.")
During asynchronous iteration, implicitly-created tasks will be yielded for
supplied awaitables that aren't tasks or futures.
When used as a plain iterator, each iteration yields a new coroutine that
returns the result or raises the exception of the next completed awaitable.
This pattern is compatible with Python versions older than 3.13::
ipv4_connect = create_task(open_connection("127.0.0.1", 80))
ipv6_connect = create_task(open_connection("::1", 80))
tasks = [ipv4_connect, ipv6_connect]
for next_connect in as_completed(tasks):
# next_connect is not one of the original task objects. It must be
# awaited to obtain the result value or raise the exception of the
# awaitable that finishes next.
reader, writer = await next_connect
A :exc:`TimeoutError` is raised if the timeout occurs before all awaitables
are done. This is raised by the ``async for`` loop during asynchronous
iteration or by the coroutines yielded during plain iteration.
.. versionchanged:: 3.10
Removed the *loop* parameter.
@ -891,6 +922,10 @@ Waiting Primitives
.. versionchanged:: 3.12
Added support for generators yielding tasks.
.. versionchanged:: 3.13
The result can now be used as either an :term:`asynchronous iterator`
or as a plain :term:`iterator` (previously it was only a plain iterator).
Running in Threads
==================

@ -289,6 +289,13 @@ asyncio
forcefully close an asyncio server.
(Contributed by Pierre Ossman in :gh:`113538`.)
* :func:`asyncio.as_completed` now returns an object that is both an
:term:`asynchronous iterator` and a plain :term:`iterator` of awaitables.
The awaitables yielded by asynchronous iteration include original task or
future objects that were passed in, making it easier to associate results
with the tasks being completed.
(Contributed by Justin Arthur in :gh:`77714`.)
base64
------

@ -25,6 +25,7 @@ from . import coroutines
from . import events
from . import exceptions
from . import futures
from . import queues
from . import timeouts
# Helper to generate new task names
@ -564,62 +565,125 @@ async def _cancel_and_wait(fut):
fut.remove_done_callback(cb)
# This is *not* a @coroutine! It is just an iterator (yielding Futures).
def as_completed(fs, *, timeout=None):
"""Return an iterator whose values are coroutines.
class _AsCompletedIterator:
"""Iterator of awaitables representing tasks of asyncio.as_completed.
When waiting for the yielded coroutines you'll get the results (or
exceptions!) of the original Futures (or coroutines), in the order
in which and as soon as they complete.
This differs from PEP 3148; the proper way to use this is:
for f in as_completed(fs):
result = await f # The 'await' may raise.
# Use result.
If a timeout is specified, the 'await' will raise
TimeoutError when the timeout occurs before all Futures are done.
Note: The futures 'f' are not necessarily members of fs.
As an asynchronous iterator, iteration yields futures as they finish. As a
plain iterator, new coroutines are yielded that will return or raise the
result of the next underlying future to complete.
"""
if futures.isfuture(fs) or coroutines.iscoroutine(fs):
raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}")
def __init__(self, aws, timeout):
self._done = queues.Queue()
self._timeout_handle = None
from .queues import Queue # Import here to avoid circular import problem.
done = Queue()
loop = events.get_event_loop()
todo = {ensure_future(f, loop=loop) for f in set(fs)}
timeout_handle = None
def _on_timeout():
loop = events.get_event_loop()
todo = {ensure_future(aw, loop=loop) for aw in set(aws)}
for f in todo:
f.remove_done_callback(_on_completion)
done.put_nowait(None) # Queue a dummy value for _wait_for_one().
todo.clear() # Can't do todo.remove(f) in the loop.
f.add_done_callback(self._handle_completion)
if todo and timeout is not None:
self._timeout_handle = (
loop.call_later(timeout, self._handle_timeout)
)
self._todo = todo
self._todo_left = len(todo)
def _on_completion(f):
if not todo:
return # _on_timeout() was here first.
todo.remove(f)
done.put_nowait(f)
if not todo and timeout_handle is not None:
timeout_handle.cancel()
def __aiter__(self):
return self
async def _wait_for_one():
f = await done.get()
def __iter__(self):
return self
async def __anext__(self):
if not self._todo_left:
raise StopAsyncIteration
assert self._todo_left > 0
self._todo_left -= 1
return await self._wait_for_one()
def __next__(self):
if not self._todo_left:
raise StopIteration
assert self._todo_left > 0
self._todo_left -= 1
return self._wait_for_one(resolve=True)
def _handle_timeout(self):
for f in self._todo:
f.remove_done_callback(self._handle_completion)
self._done.put_nowait(None) # Sentinel for _wait_for_one().
self._todo.clear() # Can't do todo.remove(f) in the loop.
def _handle_completion(self, f):
if not self._todo:
return # _handle_timeout() was here first.
self._todo.remove(f)
self._done.put_nowait(f)
if not self._todo and self._timeout_handle is not None:
self._timeout_handle.cancel()
async def _wait_for_one(self, resolve=False):
# Wait for the next future to be done and return it unless resolve is
# set, in which case return either the result of the future or raise
# an exception.
f = await self._done.get()
if f is None:
# Dummy value from _on_timeout().
# Dummy value from _handle_timeout().
raise exceptions.TimeoutError
return f.result() # May raise f.exception().
return f.result() if resolve else f
for f in todo:
f.add_done_callback(_on_completion)
if todo and timeout is not None:
timeout_handle = loop.call_later(timeout, _on_timeout)
for _ in range(len(todo)):
yield _wait_for_one()
def as_completed(fs, *, timeout=None):
"""Create an iterator of awaitables or their results in completion order.
Run the supplied awaitables concurrently. The returned object can be
iterated to obtain the results of the awaitables as they finish.
The object returned can be iterated as an asynchronous iterator or a plain
iterator. When asynchronous iteration is used, the originally-supplied
awaitables are yielded if they are tasks or futures. This makes it easy to
correlate previously-scheduled tasks with their results:
ipv4_connect = create_task(open_connection("127.0.0.1", 80))
ipv6_connect = create_task(open_connection("::1", 80))
tasks = [ipv4_connect, ipv6_connect]
async for earliest_connect in as_completed(tasks):
# earliest_connect is done. The result can be obtained by
# awaiting it or calling earliest_connect.result()
reader, writer = await earliest_connect
if earliest_connect is ipv6_connect:
print("IPv6 connection established.")
else:
print("IPv4 connection established.")
During asynchronous iteration, implicitly-created tasks will be yielded for
supplied awaitables that aren't tasks or futures.
When used as a plain iterator, each iteration yields a new coroutine that
returns the result or raises the exception of the next completed awaitable.
This pattern is compatible with Python versions older than 3.13:
ipv4_connect = create_task(open_connection("127.0.0.1", 80))
ipv6_connect = create_task(open_connection("::1", 80))
tasks = [ipv4_connect, ipv6_connect]
for next_connect in as_completed(tasks):
# next_connect is not one of the original task objects. It must be
# awaited to obtain the result value or raise the exception of the
# awaitable that finishes next.
reader, writer = await next_connect
A TimeoutError is raised if the timeout occurs before all awaitables are
done. This is raised by the async for loop during asynchronous iteration or
by the coroutines yielded during plain iteration.
"""
if inspect.isawaitable(fs):
raise TypeError(
f"expects an iterable of awaitables, not {type(fs).__name__}"
)
return _AsCompletedIterator(fs, timeout)
@types.coroutine

@ -1,6 +1,7 @@
"""Tests for tasks.py."""
import collections
import contextlib
import contextvars
import gc
import io
@ -1409,12 +1410,6 @@ class BaseTaskTests:
yield 0.01
yield 0
loop = self.new_test_loop(gen)
# disable "slow callback" warning
loop.slow_callback_duration = 1.0
completed = set()
time_shifted = False
async def sleeper(dt, x):
nonlocal time_shifted
await asyncio.sleep(dt)
@ -1424,21 +1419,78 @@ class BaseTaskTests:
loop.advance_time(0.14)
return x
a = sleeper(0.01, 'a')
b = sleeper(0.01, 'b')
c = sleeper(0.15, 'c')
async def foo():
async def try_iterator(awaitables):
values = []
for f in asyncio.as_completed([b, c, a]):
for f in asyncio.as_completed(awaitables):
values.append(await f)
return values
res = loop.run_until_complete(self.new_task(loop, foo()))
self.assertAlmostEqual(0.15, loop.time())
self.assertTrue('a' in res[:2])
self.assertTrue('b' in res[:2])
self.assertEqual(res[2], 'c')
async def try_async_iterator(awaitables):
values = []
async for f in asyncio.as_completed(awaitables):
values.append(await f)
return values
for foo in try_iterator, try_async_iterator:
with self.subTest(method=foo.__name__):
loop = self.new_test_loop(gen)
# disable "slow callback" warning
loop.slow_callback_duration = 1.0
completed = set()
time_shifted = False
a = sleeper(0.01, 'a')
b = sleeper(0.01, 'b')
c = sleeper(0.15, 'c')
res = loop.run_until_complete(self.new_task(loop, foo([b, c, a])))
self.assertAlmostEqual(0.15, loop.time())
self.assertTrue('a' in res[:2])
self.assertTrue('b' in res[:2])
self.assertEqual(res[2], 'c')
def test_as_completed_same_tasks_in_as_out(self):
# Ensures that asynchronously iterating as_completed's iterator
# yields the same awaitables that were passed in when those
# awaitables are futures.
async def try_async_iterator(awaitables):
awaitables_out = set()
async for out_aw in asyncio.as_completed(awaitables):
awaitables_out.add(out_aw)
return awaitables_out
async def coro(i):
return i
with contextlib.closing(asyncio.new_event_loop()) as loop:
# Coroutines shouldn't be yielded back as finished coroutines
# can't be re-used.
awaitables_in = frozenset(
(coro(0), coro(1), coro(2), coro(3))
)
awaitables_out = loop.run_until_complete(
try_async_iterator(awaitables_in)
)
if awaitables_in - awaitables_out != awaitables_in:
raise self.failureException('Got original coroutines '
'out of as_completed iterator.')
# Tasks should be yielded back.
coro_obj_a = coro('a')
task_b = loop.create_task(coro('b'))
coro_obj_c = coro('c')
task_d = loop.create_task(coro('d'))
awaitables_in = frozenset(
(coro_obj_a, task_b, coro_obj_c, task_d)
)
awaitables_out = loop.run_until_complete(
try_async_iterator(awaitables_in)
)
if awaitables_in & awaitables_out != {task_b, task_d}:
raise self.failureException('Only tasks should be yielded '
'from as_completed iterator '
'as-is.')
def test_as_completed_with_timeout(self):
@ -1448,12 +1500,7 @@ class BaseTaskTests:
yield 0
yield 0.1
loop = self.new_test_loop(gen)
a = loop.create_task(asyncio.sleep(0.1, 'a'))
b = loop.create_task(asyncio.sleep(0.15, 'b'))
async def foo():
async def try_iterator():
values = []
for f in asyncio.as_completed([a, b], timeout=0.12):
if values:
@ -1465,16 +1512,33 @@ class BaseTaskTests:
values.append((2, exc))
return values
res = loop.run_until_complete(self.new_task(loop, foo()))
self.assertEqual(len(res), 2, res)
self.assertEqual(res[0], (1, 'a'))
self.assertEqual(res[1][0], 2)
self.assertIsInstance(res[1][1], asyncio.TimeoutError)
self.assertAlmostEqual(0.12, loop.time())
async def try_async_iterator():
values = []
try:
async for f in asyncio.as_completed([a, b], timeout=0.12):
v = await f
values.append((1, v))
loop.advance_time(0.02)
except asyncio.TimeoutError as exc:
values.append((2, exc))
return values
# move forward to close generator
loop.advance_time(10)
loop.run_until_complete(asyncio.wait([a, b]))
for foo in try_iterator, try_async_iterator:
with self.subTest(method=foo.__name__):
loop = self.new_test_loop(gen)
a = loop.create_task(asyncio.sleep(0.1, 'a'))
b = loop.create_task(asyncio.sleep(0.15, 'b'))
res = loop.run_until_complete(self.new_task(loop, foo()))
self.assertEqual(len(res), 2, res)
self.assertEqual(res[0], (1, 'a'))
self.assertEqual(res[1][0], 2)
self.assertIsInstance(res[1][1], asyncio.TimeoutError)
self.assertAlmostEqual(0.12, loop.time())
# move forward to close generator
loop.advance_time(10)
loop.run_until_complete(asyncio.wait([a, b]))
def test_as_completed_with_unused_timeout(self):
@ -1483,19 +1547,75 @@ class BaseTaskTests:
yield 0
yield 0.01
loop = self.new_test_loop(gen)
a = asyncio.sleep(0.01, 'a')
async def foo():
async def try_iterator():
for f in asyncio.as_completed([a], timeout=1):
v = await f
self.assertEqual(v, 'a')
loop.run_until_complete(self.new_task(loop, foo()))
async def try_async_iterator():
async for f in asyncio.as_completed([a], timeout=1):
v = await f
self.assertEqual(v, 'a')
for foo in try_iterator, try_async_iterator:
with self.subTest(method=foo.__name__):
a = asyncio.sleep(0.01, 'a')
loop = self.new_test_loop(gen)
loop.run_until_complete(self.new_task(loop, foo()))
loop.close()
def test_as_completed_resume_iterator(self):
# Test that as_completed returns an iterator that can be resumed
# the next time iteration is performed (i.e. if __iter__ is called
# again)
async def try_iterator(awaitables):
iterations = 0
iterator = asyncio.as_completed(awaitables)
collected = []
for f in iterator:
collected.append(await f)
iterations += 1
if iterations == 2:
break
self.assertEqual(len(collected), 2)
# Resume same iterator:
for f in iterator:
collected.append(await f)
return collected
async def try_async_iterator(awaitables):
iterations = 0
iterator = asyncio.as_completed(awaitables)
collected = []
async for f in iterator:
collected.append(await f)
iterations += 1
if iterations == 2:
break
self.assertEqual(len(collected), 2)
# Resume same iterator:
async for f in iterator:
collected.append(await f)
return collected
async def coro(i):
return i
with contextlib.closing(asyncio.new_event_loop()) as loop:
for foo in try_iterator, try_async_iterator:
with self.subTest(method=foo.__name__):
results = loop.run_until_complete(
foo((coro(0), coro(1), coro(2), coro(3)))
)
self.assertCountEqual(results, (0, 1, 2, 3))
def test_as_completed_reverse_wait(self):
# Tests the plain iterator style of as_completed iteration to
# ensure that the first future awaited resolves to the first
# completed awaitable from the set we passed in, even if it wasn't
# the first future generated by as_completed.
def gen():
yield 0
yield 0.05
@ -1522,7 +1642,8 @@ class BaseTaskTests:
loop.run_until_complete(test())
def test_as_completed_concurrent(self):
# Ensure that more than one future or coroutine yielded from
# as_completed can be awaited concurrently.
def gen():
when = yield
self.assertAlmostEqual(0.05, when)
@ -1530,38 +1651,55 @@ class BaseTaskTests:
self.assertAlmostEqual(0.05, when)
yield 0.05
a = asyncio.sleep(0.05, 'a')
b = asyncio.sleep(0.05, 'b')
fs = {a, b}
async def try_iterator(fs):
return list(asyncio.as_completed(fs))
async def test():
futs = list(asyncio.as_completed(fs))
self.assertEqual(len(futs), 2)
done, pending = await asyncio.wait(
[asyncio.ensure_future(fut) for fut in futs]
)
self.assertEqual(set(f.result() for f in done), {'a', 'b'})
async def try_async_iterator(fs):
return [f async for f in asyncio.as_completed(fs)]
loop = self.new_test_loop(gen)
loop.run_until_complete(test())
for runner in try_iterator, try_async_iterator:
with self.subTest(method=runner.__name__):
a = asyncio.sleep(0.05, 'a')
b = asyncio.sleep(0.05, 'b')
fs = {a, b}
async def test():
futs = await runner(fs)
self.assertEqual(len(futs), 2)
done, pending = await asyncio.wait(
[asyncio.ensure_future(fut) for fut in futs]
)
self.assertEqual(set(f.result() for f in done), {'a', 'b'})
loop = self.new_test_loop(gen)
loop.run_until_complete(test())
def test_as_completed_duplicate_coroutines(self):
async def coro(s):
return s
async def runner():
async def try_iterator():
result = []
c = coro('ham')
for f in asyncio.as_completed([c, c, coro('spam')]):
result.append(await f)
return result
fut = self.new_task(self.loop, runner())
self.loop.run_until_complete(fut)
result = fut.result()
self.assertEqual(set(result), {'ham', 'spam'})
self.assertEqual(len(result), 2)
async def try_async_iterator():
result = []
c = coro('ham')
async for f in asyncio.as_completed([c, c, coro('spam')]):
result.append(await f)
return result
for runner in try_iterator, try_async_iterator:
with self.subTest(method=runner.__name__):
fut = self.new_task(self.loop, runner())
self.loop.run_until_complete(fut)
result = fut.result()
self.assertEqual(set(result), {'ham', 'spam'})
self.assertEqual(len(result), 2)
def test_as_completed_coroutine_without_loop(self):
async def coro():
@ -1570,8 +1708,8 @@ class BaseTaskTests:
a = coro()
self.addCleanup(a.close)
futs = asyncio.as_completed([a])
with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
futs = asyncio.as_completed([a])
list(futs)
def test_as_completed_coroutine_use_running_loop(self):
@ -2044,14 +2182,32 @@ class BaseTaskTests:
self.assertEqual(res, 42)
def test_as_completed_invalid_args(self):
fut = self.new_future(self.loop)
# as_completed() expects a list of futures, not a future instance
self.assertRaises(TypeError, self.loop.run_until_complete,
asyncio.as_completed(fut))
# TypeError should be raised either on iterator construction or first
# iteration
# Plain iterator
fut = self.new_future(self.loop)
with self.assertRaises(TypeError):
iterator = asyncio.as_completed(fut)
next(iterator)
coro = coroutine_function()
self.assertRaises(TypeError, self.loop.run_until_complete,
asyncio.as_completed(coro))
with self.assertRaises(TypeError):
iterator = asyncio.as_completed(coro)
next(iterator)
coro.close()
# Async iterator
async def try_async_iterator(aw):
async for f in asyncio.as_completed(aw):
break
fut = self.new_future(self.loop)
with self.assertRaises(TypeError):
self.loop.run_until_complete(try_async_iterator(fut))
coro = coroutine_function()
with self.assertRaises(TypeError):
self.loop.run_until_complete(try_async_iterator(coro))
coro.close()
def test_wait_invalid_args(self):

@ -0,0 +1,5 @@
:func:`asyncio.as_completed` now returns an object that is both an asynchronous
iterator and a plain iterator. The new asynchronous iteration pattern allows for
easier correlation between prior tasks and their completed results. This is
a closer match to :func:`concurrent.futures.as_completed`'s iteration pattern.
Patch by Justin Arthur.
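
For comparison, here is a minimal sketch of the `concurrent.futures.as_completed` pattern that the entry above refers to. That function yields the original future objects, so a dict keyed by future can map each result back to its input. The names `square` and `run_squares` are hypothetical and exist only for this illustration.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def square(n):
    return n * n


def run_squares(numbers):
    """Map each input to its result using the futures that as_completed yields."""
    with ThreadPoolExecutor(max_workers=2) as pool:
        # Remember which input each submitted future belongs to.
        future_to_input = {pool.submit(square, n): n for n in numbers}
        results = {}
        for fut in as_completed(future_to_input):
            # fut is one of the futures submitted above, so the dict
            # lookup recovers the input that produced this result.
            results[future_to_input[fut]] = fut.result()
        return results


if __name__ == "__main__":
    print(run_squares([1, 2, 3]))
```

With the asynchronous iteration added in this change, the same lookup-by-identity approach works for asyncio tasks that were passed in to `asyncio.as_completed`.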