asyncio: Change as_completed() to use a Queue, to avoid O(N**2) behavior. Fixes issue #20566.

This commit is contained in:
Guido van Rossum 2014-02-12 17:58:19 -08:00
parent b13177885f
commit 2303fecedc
2 changed files with 55 additions and 21 deletions

View File

@ -463,7 +463,11 @@ def _wait(fs, timeout, return_when, loop):
# This is *not* a @coroutine! It is just an iterator (yielding Futures). # This is *not* a @coroutine! It is just an iterator (yielding Futures).
def as_completed(fs, *, loop=None, timeout=None): def as_completed(fs, *, loop=None, timeout=None):
"""Return an iterator whose values, when waited for, are Futures. """Return an iterator whose values are coroutines.
When waiting for the yielded coroutines you'll get the results (or
exceptions!) of the original Futures (or coroutines), in the order
in which and as soon as they complete.
This differs from PEP 3148; the proper way to use this is: This differs from PEP 3148; the proper way to use this is:
@ -471,8 +475,8 @@ def as_completed(fs, *, loop=None, timeout=None):
result = yield from f # The 'yield from' may raise. result = yield from f # The 'yield from' may raise.
# Use result. # Use result.
Raises TimeoutError if the timeout occurs before all Futures are If a timeout is specified, the 'yield from' will raise
done. TimeoutError when the timeout occurs before all Futures are done.
Note: The futures 'f' are not necessarily members of fs. Note: The futures 'f' are not necessarily members of fs.
""" """
@ -481,27 +485,36 @@ def as_completed(fs, *, loop=None, timeout=None):
loop = loop if loop is not None else events.get_event_loop() loop = loop if loop is not None else events.get_event_loop()
deadline = None if timeout is None else loop.time() + timeout deadline = None if timeout is None else loop.time() + timeout
todo = {async(f, loop=loop) for f in set(fs)} todo = {async(f, loop=loop) for f in set(fs)}
completed = collections.deque() from .queues import Queue # Import here to avoid circular import problem.
done = Queue(loop=loop)
timeout_handle = None
def _on_timeout():
for f in todo:
f.remove_done_callback(_on_completion)
done.put_nowait(None) # Queue a dummy value for _wait_for_one().
todo.clear() # Can't do todo.remove(f) in the loop.
def _on_completion(f):
if not todo:
return # _on_timeout() was here first.
todo.remove(f)
done.put_nowait(f)
if not todo and timeout_handle is not None:
timeout_handle.cancel()
@coroutine @coroutine
def _wait_for_one(): def _wait_for_one():
while not completed: f = yield from done.get()
timeout = None if f is None:
if deadline is not None: # Dummy value from _on_timeout().
timeout = deadline - loop.time() raise futures.TimeoutError
if timeout < 0: return f.result() # May raise f.exception().
raise futures.TimeoutError()
done, pending = yield from _wait(
todo, timeout, FIRST_COMPLETED, loop)
# Multiple callers might be waiting for the same events
# and getting the same outcome. Dedupe by updating todo.
for f in done:
if f in todo:
todo.remove(f)
completed.append(f)
f = completed.popleft()
return f.result() # May raise.
for f in todo:
f.add_done_callback(_on_completion)
if todo and timeout is not None:
timeout_handle = loop.call_later(timeout, _on_timeout)
for _ in range(len(todo)): for _ in range(len(todo)):
yield _wait_for_one() yield _wait_for_one()

View File

@ -779,7 +779,6 @@ class TaskTests(unittest.TestCase):
yield 0 yield 0
yield 0 yield 0
yield 0.1 yield 0.1
yield 0.02
loop = test_utils.TestLoop(gen) loop = test_utils.TestLoop(gen)
self.addCleanup(loop.close) self.addCleanup(loop.close)
@ -791,6 +790,8 @@ class TaskTests(unittest.TestCase):
def foo(): def foo():
values = [] values = []
for f in asyncio.as_completed([a, b], timeout=0.12, loop=loop): for f in asyncio.as_completed([a, b], timeout=0.12, loop=loop):
if values:
loop.advance_time(0.02)
try: try:
v = yield from f v = yield from f
values.append((1, v)) values.append((1, v))
@ -809,6 +810,26 @@ class TaskTests(unittest.TestCase):
loop.advance_time(10) loop.advance_time(10)
loop.run_until_complete(asyncio.wait([a, b], loop=loop)) loop.run_until_complete(asyncio.wait([a, b], loop=loop))
def test_as_completed_with_unused_timeout(self):
def gen():
yield
yield 0
yield 0.01
loop = test_utils.TestLoop(gen)
self.addCleanup(loop.close)
a = asyncio.sleep(0.01, 'a', loop=loop)
@asyncio.coroutine
def foo():
for f in asyncio.as_completed([a], timeout=1, loop=loop):
v = yield from f
self.assertEqual(v, 'a')
res = loop.run_until_complete(asyncio.Task(foo(), loop=loop))
def test_as_completed_reverse_wait(self): def test_as_completed_reverse_wait(self):
def gen(): def gen():