diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index 81a125f44d7..b7ee758d640 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -463,7 +463,11 @@ def _wait(fs, timeout, return_when, loop): # This is *not* a @coroutine! It is just an iterator (yielding Futures). def as_completed(fs, *, loop=None, timeout=None): - """Return an iterator whose values, when waited for, are Futures. + """Return an iterator whose values are coroutines. + + When waiting for the yielded coroutines you'll get the results (or + exceptions!) of the original Futures (or coroutines), in the order + in which and as soon as they complete. This differs from PEP 3148; the proper way to use this is: @@ -471,8 +475,8 @@ def as_completed(fs, *, loop=None, timeout=None): result = yield from f # The 'yield from' may raise. # Use result. - Raises TimeoutError if the timeout occurs before all Futures are - done. + If a timeout is specified, the 'yield from' will raise + TimeoutError when the timeout occurs before all Futures are done. Note: The futures 'f' are not necessarily members of fs. """ @@ -481,27 +485,36 @@ def as_completed(fs, *, loop=None, timeout=None): loop = loop if loop is not None else events.get_event_loop() deadline = None if timeout is None else loop.time() + timeout todo = {async(f, loop=loop) for f in set(fs)} - completed = collections.deque() + from .queues import Queue # Import here to avoid circular import problem. + done = Queue(loop=loop) + timeout_handle = None + + def _on_timeout(): + for f in todo: + f.remove_done_callback(_on_completion) + done.put_nowait(None) # Queue a dummy value for _wait_for_one(). + todo.clear() # Can't do todo.remove(f) in the loop. + + def _on_completion(f): + if not todo: + return # _on_timeout() was here first. + todo.remove(f) + done.put_nowait(f) + if not todo and timeout_handle is not None: + timeout_handle.cancel() @coroutine def _wait_for_one(): - while not completed: - timeout = None - if deadline is not None: - timeout = deadline - loop.time() - if timeout < 0: - raise futures.TimeoutError() - done, pending = yield from _wait( - todo, timeout, FIRST_COMPLETED, loop) - # Multiple callers might be waiting for the same events - # and getting the same outcome. Dedupe by updating todo. - for f in done: - if f in todo: - todo.remove(f) - completed.append(f) - f = completed.popleft() - return f.result() # May raise. + f = yield from done.get() + if f is None: + # Dummy value from _on_timeout(). + raise futures.TimeoutError + return f.result() # May raise f.exception(). + for f in todo: + f.add_done_callback(_on_completion) + if todo and timeout is not None: + timeout_handle = loop.call_later(timeout, _on_timeout) for _ in range(len(todo)): yield _wait_for_one() diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py index 6847de04712..024dd2ead51 100644 --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -779,7 +779,6 @@ class TaskTests(unittest.TestCase): yield 0 yield 0 yield 0.1 - yield 0.02 loop = test_utils.TestLoop(gen) self.addCleanup(loop.close) @@ -791,6 +790,8 @@ class TaskTests(unittest.TestCase): def foo(): values = [] for f in asyncio.as_completed([a, b], timeout=0.12, loop=loop): + if values: + loop.advance_time(0.02) try: v = yield from f values.append((1, v)) @@ -809,6 +810,26 @@ class TaskTests(unittest.TestCase): loop.advance_time(10) loop.run_until_complete(asyncio.wait([a, b], loop=loop)) + def test_as_completed_with_unused_timeout(self): + + def gen(): + yield + yield 0 + yield 0.01 + + loop = test_utils.TestLoop(gen) + self.addCleanup(loop.close) + + a = asyncio.sleep(0.01, 'a', loop=loop) + + @asyncio.coroutine + def foo(): + for f in asyncio.as_completed([a], timeout=1, loop=loop): + v = yield from f + self.assertEqual(v, 'a') + + res = loop.run_until_complete(asyncio.Task(foo(), loop=loop)) + def test_as_completed_reverse_wait(self): def gen():