mirror of https://github.com/python/cpython
2221 lines
70 KiB
Python
2221 lines
70 KiB
Python
"""Tests for tasks.py."""
|
|
|
|
import contextlib
|
|
import functools
|
|
import io
|
|
import os
|
|
import re
|
|
import sys
|
|
import types
|
|
import unittest
|
|
import weakref
|
|
from unittest import mock
|
|
|
|
import asyncio
|
|
from asyncio import coroutines
|
|
from asyncio import test_utils
|
|
try:
|
|
from test import support
|
|
except ImportError:
|
|
from asyncio import test_support as support
|
|
try:
|
|
from test.support.script_helper import assert_python_ok
|
|
except ImportError:
|
|
try:
|
|
from test.script_helper import assert_python_ok
|
|
except ImportError:
|
|
from asyncio.test_support import assert_python_ok
|
|
|
|
|
|
PY34 = (sys.version_info >= (3, 4))
|
|
PY35 = (sys.version_info >= (3, 5))
|
|
|
|
|
|
@asyncio.coroutine
|
|
def coroutine_function():
|
|
pass
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def set_coroutine_debug(enabled):
|
|
coroutines = asyncio.coroutines
|
|
|
|
old_debug = coroutines._DEBUG
|
|
try:
|
|
coroutines._DEBUG = enabled
|
|
yield
|
|
finally:
|
|
coroutines._DEBUG = old_debug
|
|
|
|
|
|
|
|
def format_coroutine(qualname, state, src, source_traceback, generator=False):
|
|
if generator:
|
|
state = '%s' % state
|
|
else:
|
|
state = '%s, defined' % state
|
|
if source_traceback is not None:
|
|
frame = source_traceback[-1]
|
|
return ('coro=<%s() %s at %s> created at %s:%s'
|
|
% (qualname, state, src, frame[0], frame[1]))
|
|
else:
|
|
return 'coro=<%s() %s at %s>' % (qualname, state, src)
|
|
|
|
|
|
class Dummy:
|
|
|
|
def __repr__(self):
|
|
return '<Dummy>'
|
|
|
|
def __call__(self, *args):
|
|
pass
|
|
|
|
|
|
class TaskTests(test_utils.TestCase):
|
|
|
|
def setUp(self):
|
|
self.loop = self.new_test_loop()
|
|
|
|
def test_task_class(self):
|
|
@asyncio.coroutine
|
|
def notmuch():
|
|
return 'ok'
|
|
t = asyncio.Task(notmuch(), loop=self.loop)
|
|
self.loop.run_until_complete(t)
|
|
self.assertTrue(t.done())
|
|
self.assertEqual(t.result(), 'ok')
|
|
self.assertIs(t._loop, self.loop)
|
|
|
|
loop = asyncio.new_event_loop()
|
|
self.set_event_loop(loop)
|
|
t = asyncio.Task(notmuch(), loop=loop)
|
|
self.assertIs(t._loop, loop)
|
|
loop.run_until_complete(t)
|
|
loop.close()
|
|
|
|
def test_ensure_future_coroutine(self):
|
|
@asyncio.coroutine
|
|
def notmuch():
|
|
return 'ok'
|
|
t = asyncio.ensure_future(notmuch(), loop=self.loop)
|
|
self.loop.run_until_complete(t)
|
|
self.assertTrue(t.done())
|
|
self.assertEqual(t.result(), 'ok')
|
|
self.assertIs(t._loop, self.loop)
|
|
|
|
loop = asyncio.new_event_loop()
|
|
self.set_event_loop(loop)
|
|
t = asyncio.ensure_future(notmuch(), loop=loop)
|
|
self.assertIs(t._loop, loop)
|
|
loop.run_until_complete(t)
|
|
loop.close()
|
|
|
|
def test_ensure_future_future(self):
|
|
f_orig = asyncio.Future(loop=self.loop)
|
|
f_orig.set_result('ko')
|
|
|
|
f = asyncio.ensure_future(f_orig)
|
|
self.loop.run_until_complete(f)
|
|
self.assertTrue(f.done())
|
|
self.assertEqual(f.result(), 'ko')
|
|
self.assertIs(f, f_orig)
|
|
|
|
loop = asyncio.new_event_loop()
|
|
self.set_event_loop(loop)
|
|
|
|
with self.assertRaises(ValueError):
|
|
f = asyncio.ensure_future(f_orig, loop=loop)
|
|
|
|
loop.close()
|
|
|
|
f = asyncio.ensure_future(f_orig, loop=self.loop)
|
|
self.assertIs(f, f_orig)
|
|
|
|
def test_ensure_future_task(self):
|
|
@asyncio.coroutine
|
|
def notmuch():
|
|
return 'ok'
|
|
t_orig = asyncio.Task(notmuch(), loop=self.loop)
|
|
t = asyncio.ensure_future(t_orig)
|
|
self.loop.run_until_complete(t)
|
|
self.assertTrue(t.done())
|
|
self.assertEqual(t.result(), 'ok')
|
|
self.assertIs(t, t_orig)
|
|
|
|
loop = asyncio.new_event_loop()
|
|
self.set_event_loop(loop)
|
|
|
|
with self.assertRaises(ValueError):
|
|
t = asyncio.ensure_future(t_orig, loop=loop)
|
|
|
|
loop.close()
|
|
|
|
t = asyncio.ensure_future(t_orig, loop=self.loop)
|
|
self.assertIs(t, t_orig)
|
|
|
|
@unittest.skipUnless(PY35, 'need python 3.5 or later')
|
|
def test_ensure_future_awaitable(self):
|
|
class Aw:
|
|
def __init__(self, coro):
|
|
self.coro = coro
|
|
def __await__(self):
|
|
return (yield from self.coro)
|
|
|
|
@asyncio.coroutine
|
|
def coro():
|
|
return 'ok'
|
|
|
|
loop = asyncio.new_event_loop()
|
|
self.set_event_loop(loop)
|
|
fut = asyncio.ensure_future(Aw(coro()), loop=loop)
|
|
loop.run_until_complete(fut)
|
|
assert fut.result() == 'ok'
|
|
|
|
def test_ensure_future_neither(self):
|
|
with self.assertRaises(TypeError):
|
|
asyncio.ensure_future('ok')
|
|
|
|
def test_async_warning(self):
|
|
f = asyncio.Future(loop=self.loop)
|
|
with self.assertWarnsRegex(DeprecationWarning,
|
|
'function is deprecated, use ensure_'):
|
|
self.assertIs(f, asyncio.async(f))
|
|
|
|
def test_get_stack(self):
|
|
T = None
|
|
|
|
@asyncio.coroutine
|
|
def foo():
|
|
yield from bar()
|
|
|
|
@asyncio.coroutine
|
|
def bar():
|
|
# test get_stack()
|
|
f = T.get_stack(limit=1)
|
|
try:
|
|
self.assertEqual(f[0].f_code.co_name, 'foo')
|
|
finally:
|
|
f = None
|
|
|
|
# test print_stack()
|
|
file = io.StringIO()
|
|
T.print_stack(limit=1, file=file)
|
|
file.seek(0)
|
|
tb = file.read()
|
|
self.assertRegex(tb, r'foo\(\) running')
|
|
|
|
@asyncio.coroutine
|
|
def runner():
|
|
nonlocal T
|
|
T = asyncio.ensure_future(foo(), loop=self.loop)
|
|
yield from T
|
|
|
|
self.loop.run_until_complete(runner())
|
|
|
|
def test_task_repr(self):
|
|
self.loop.set_debug(False)
|
|
|
|
@asyncio.coroutine
|
|
def notmuch():
|
|
yield from []
|
|
return 'abc'
|
|
|
|
# test coroutine function
|
|
self.assertEqual(notmuch.__name__, 'notmuch')
|
|
if PY35:
|
|
self.assertEqual(notmuch.__qualname__,
|
|
'TaskTests.test_task_repr.<locals>.notmuch')
|
|
self.assertEqual(notmuch.__module__, __name__)
|
|
|
|
filename, lineno = test_utils.get_function_source(notmuch)
|
|
src = "%s:%s" % (filename, lineno)
|
|
|
|
# test coroutine object
|
|
gen = notmuch()
|
|
if coroutines._DEBUG or PY35:
|
|
coro_qualname = 'TaskTests.test_task_repr.<locals>.notmuch'
|
|
else:
|
|
coro_qualname = 'notmuch'
|
|
self.assertEqual(gen.__name__, 'notmuch')
|
|
if PY35:
|
|
self.assertEqual(gen.__qualname__,
|
|
coro_qualname)
|
|
|
|
# test pending Task
|
|
t = asyncio.Task(gen, loop=self.loop)
|
|
t.add_done_callback(Dummy())
|
|
|
|
coro = format_coroutine(coro_qualname, 'running', src,
|
|
t._source_traceback, generator=True)
|
|
self.assertEqual(repr(t),
|
|
'<Task pending %s cb=[<Dummy>()]>' % coro)
|
|
|
|
# test cancelling Task
|
|
t.cancel() # Does not take immediate effect!
|
|
self.assertEqual(repr(t),
|
|
'<Task cancelling %s cb=[<Dummy>()]>' % coro)
|
|
|
|
# test cancelled Task
|
|
self.assertRaises(asyncio.CancelledError,
|
|
self.loop.run_until_complete, t)
|
|
coro = format_coroutine(coro_qualname, 'done', src,
|
|
t._source_traceback)
|
|
self.assertEqual(repr(t),
|
|
'<Task cancelled %s>' % coro)
|
|
|
|
# test finished Task
|
|
t = asyncio.Task(notmuch(), loop=self.loop)
|
|
self.loop.run_until_complete(t)
|
|
coro = format_coroutine(coro_qualname, 'done', src,
|
|
t._source_traceback)
|
|
self.assertEqual(repr(t),
|
|
"<Task finished %s result='abc'>" % coro)
|
|
|
|
def test_task_repr_coro_decorator(self):
|
|
self.loop.set_debug(False)
|
|
|
|
@asyncio.coroutine
|
|
def notmuch():
|
|
# notmuch() function doesn't use yield from: it will be wrapped by
|
|
# @coroutine decorator
|
|
return 123
|
|
|
|
# test coroutine function
|
|
self.assertEqual(notmuch.__name__, 'notmuch')
|
|
if PY35:
|
|
self.assertEqual(notmuch.__qualname__,
|
|
'TaskTests.test_task_repr_coro_decorator'
|
|
'.<locals>.notmuch')
|
|
self.assertEqual(notmuch.__module__, __name__)
|
|
|
|
# test coroutine object
|
|
gen = notmuch()
|
|
if coroutines._DEBUG or PY35:
|
|
# On Python >= 3.5, generators now inherit the name of the
|
|
# function, as expected, and have a qualified name (__qualname__
|
|
# attribute).
|
|
coro_name = 'notmuch'
|
|
coro_qualname = ('TaskTests.test_task_repr_coro_decorator'
|
|
'.<locals>.notmuch')
|
|
else:
|
|
# On Python < 3.5, generators inherit the name of the code, not of
|
|
# the function. See: http://bugs.python.org/issue21205
|
|
coro_name = coro_qualname = 'coro'
|
|
self.assertEqual(gen.__name__, coro_name)
|
|
if PY35:
|
|
self.assertEqual(gen.__qualname__, coro_qualname)
|
|
|
|
# test repr(CoroWrapper)
|
|
if coroutines._DEBUG:
|
|
# format the coroutine object
|
|
if coroutines._DEBUG:
|
|
filename, lineno = test_utils.get_function_source(notmuch)
|
|
frame = gen._source_traceback[-1]
|
|
coro = ('%s() running, defined at %s:%s, created at %s:%s'
|
|
% (coro_qualname, filename, lineno,
|
|
frame[0], frame[1]))
|
|
else:
|
|
code = gen.gi_code
|
|
coro = ('%s() running at %s:%s'
|
|
% (coro_qualname, code.co_filename,
|
|
code.co_firstlineno))
|
|
|
|
self.assertEqual(repr(gen), '<CoroWrapper %s>' % coro)
|
|
|
|
# test pending Task
|
|
t = asyncio.Task(gen, loop=self.loop)
|
|
t.add_done_callback(Dummy())
|
|
|
|
# format the coroutine object
|
|
if coroutines._DEBUG:
|
|
src = '%s:%s' % test_utils.get_function_source(notmuch)
|
|
else:
|
|
code = gen.gi_code
|
|
src = '%s:%s' % (code.co_filename, code.co_firstlineno)
|
|
coro = format_coroutine(coro_qualname, 'running', src,
|
|
t._source_traceback,
|
|
generator=not coroutines._DEBUG)
|
|
self.assertEqual(repr(t),
|
|
'<Task pending %s cb=[<Dummy>()]>' % coro)
|
|
self.loop.run_until_complete(t)
|
|
|
|
def test_task_repr_wait_for(self):
|
|
self.loop.set_debug(False)
|
|
|
|
@asyncio.coroutine
|
|
def wait_for(fut):
|
|
return (yield from fut)
|
|
|
|
fut = asyncio.Future(loop=self.loop)
|
|
task = asyncio.Task(wait_for(fut), loop=self.loop)
|
|
test_utils.run_briefly(self.loop)
|
|
self.assertRegex(repr(task),
|
|
'<Task .* wait_for=%s>' % re.escape(repr(fut)))
|
|
|
|
fut.set_result(None)
|
|
self.loop.run_until_complete(task)
|
|
|
|
def test_task_repr_partial_corowrapper(self):
|
|
# Issue #222: repr(CoroWrapper) must not fail in debug mode if the
|
|
# coroutine is a partial function
|
|
with set_coroutine_debug(True):
|
|
self.loop.set_debug(True)
|
|
|
|
@asyncio.coroutine
|
|
def func(x, y):
|
|
yield from asyncio.sleep(0)
|
|
|
|
partial_func = asyncio.coroutine(functools.partial(func, 1))
|
|
task = self.loop.create_task(partial_func(2))
|
|
|
|
# make warnings quiet
|
|
task._log_destroy_pending = False
|
|
self.addCleanup(task._coro.close)
|
|
|
|
coro_repr = repr(task._coro)
|
|
expected = ('<CoroWrapper TaskTests.test_task_repr_partial_corowrapper'
|
|
'.<locals>.func(1)() running, ')
|
|
self.assertTrue(coro_repr.startswith(expected),
|
|
coro_repr)
|
|
|
|
def test_task_basics(self):
|
|
@asyncio.coroutine
|
|
def outer():
|
|
a = yield from inner1()
|
|
b = yield from inner2()
|
|
return a+b
|
|
|
|
@asyncio.coroutine
|
|
def inner1():
|
|
return 42
|
|
|
|
@asyncio.coroutine
|
|
def inner2():
|
|
return 1000
|
|
|
|
t = outer()
|
|
self.assertEqual(self.loop.run_until_complete(t), 1042)
|
|
|
|
def test_cancel(self):
|
|
|
|
def gen():
|
|
when = yield
|
|
self.assertAlmostEqual(10.0, when)
|
|
yield 0
|
|
|
|
loop = self.new_test_loop(gen)
|
|
|
|
@asyncio.coroutine
|
|
def task():
|
|
yield from asyncio.sleep(10.0, loop=loop)
|
|
return 12
|
|
|
|
t = asyncio.Task(task(), loop=loop)
|
|
loop.call_soon(t.cancel)
|
|
with self.assertRaises(asyncio.CancelledError):
|
|
loop.run_until_complete(t)
|
|
self.assertTrue(t.done())
|
|
self.assertTrue(t.cancelled())
|
|
self.assertFalse(t.cancel())
|
|
|
|
def test_cancel_yield(self):
|
|
@asyncio.coroutine
|
|
def task():
|
|
yield
|
|
yield
|
|
return 12
|
|
|
|
t = asyncio.Task(task(), loop=self.loop)
|
|
test_utils.run_briefly(self.loop) # start coro
|
|
t.cancel()
|
|
self.assertRaises(
|
|
asyncio.CancelledError, self.loop.run_until_complete, t)
|
|
self.assertTrue(t.done())
|
|
self.assertTrue(t.cancelled())
|
|
self.assertFalse(t.cancel())
|
|
|
|
def test_cancel_inner_future(self):
|
|
f = asyncio.Future(loop=self.loop)
|
|
|
|
@asyncio.coroutine
|
|
def task():
|
|
yield from f
|
|
return 12
|
|
|
|
t = asyncio.Task(task(), loop=self.loop)
|
|
test_utils.run_briefly(self.loop) # start task
|
|
f.cancel()
|
|
with self.assertRaises(asyncio.CancelledError):
|
|
self.loop.run_until_complete(t)
|
|
self.assertTrue(f.cancelled())
|
|
self.assertTrue(t.cancelled())
|
|
|
|
def test_cancel_both_task_and_inner_future(self):
|
|
f = asyncio.Future(loop=self.loop)
|
|
|
|
@asyncio.coroutine
|
|
def task():
|
|
yield from f
|
|
return 12
|
|
|
|
t = asyncio.Task(task(), loop=self.loop)
|
|
test_utils.run_briefly(self.loop)
|
|
|
|
f.cancel()
|
|
t.cancel()
|
|
|
|
with self.assertRaises(asyncio.CancelledError):
|
|
self.loop.run_until_complete(t)
|
|
|
|
self.assertTrue(t.done())
|
|
self.assertTrue(f.cancelled())
|
|
self.assertTrue(t.cancelled())
|
|
|
|
def test_cancel_task_catching(self):
|
|
fut1 = asyncio.Future(loop=self.loop)
|
|
fut2 = asyncio.Future(loop=self.loop)
|
|
|
|
@asyncio.coroutine
|
|
def task():
|
|
yield from fut1
|
|
try:
|
|
yield from fut2
|
|
except asyncio.CancelledError:
|
|
return 42
|
|
|
|
t = asyncio.Task(task(), loop=self.loop)
|
|
test_utils.run_briefly(self.loop)
|
|
self.assertIs(t._fut_waiter, fut1) # White-box test.
|
|
fut1.set_result(None)
|
|
test_utils.run_briefly(self.loop)
|
|
self.assertIs(t._fut_waiter, fut2) # White-box test.
|
|
t.cancel()
|
|
self.assertTrue(fut2.cancelled())
|
|
res = self.loop.run_until_complete(t)
|
|
self.assertEqual(res, 42)
|
|
self.assertFalse(t.cancelled())
|
|
|
|
def test_cancel_task_ignoring(self):
|
|
fut1 = asyncio.Future(loop=self.loop)
|
|
fut2 = asyncio.Future(loop=self.loop)
|
|
fut3 = asyncio.Future(loop=self.loop)
|
|
|
|
@asyncio.coroutine
|
|
def task():
|
|
yield from fut1
|
|
try:
|
|
yield from fut2
|
|
except asyncio.CancelledError:
|
|
pass
|
|
res = yield from fut3
|
|
return res
|
|
|
|
t = asyncio.Task(task(), loop=self.loop)
|
|
test_utils.run_briefly(self.loop)
|
|
self.assertIs(t._fut_waiter, fut1) # White-box test.
|
|
fut1.set_result(None)
|
|
test_utils.run_briefly(self.loop)
|
|
self.assertIs(t._fut_waiter, fut2) # White-box test.
|
|
t.cancel()
|
|
self.assertTrue(fut2.cancelled())
|
|
test_utils.run_briefly(self.loop)
|
|
self.assertIs(t._fut_waiter, fut3) # White-box test.
|
|
fut3.set_result(42)
|
|
res = self.loop.run_until_complete(t)
|
|
self.assertEqual(res, 42)
|
|
self.assertFalse(fut3.cancelled())
|
|
self.assertFalse(t.cancelled())
|
|
|
|
def test_cancel_current_task(self):
|
|
loop = asyncio.new_event_loop()
|
|
self.set_event_loop(loop)
|
|
|
|
@asyncio.coroutine
|
|
def task():
|
|
t.cancel()
|
|
self.assertTrue(t._must_cancel) # White-box test.
|
|
# The sleep should be cancelled immediately.
|
|
yield from asyncio.sleep(100, loop=loop)
|
|
return 12
|
|
|
|
t = asyncio.Task(task(), loop=loop)
|
|
self.assertRaises(
|
|
asyncio.CancelledError, loop.run_until_complete, t)
|
|
self.assertTrue(t.done())
|
|
self.assertFalse(t._must_cancel) # White-box test.
|
|
self.assertFalse(t.cancel())
|
|
|
|
def test_stop_while_run_in_complete(self):
|
|
|
|
def gen():
|
|
when = yield
|
|
self.assertAlmostEqual(0.1, when)
|
|
when = yield 0.1
|
|
self.assertAlmostEqual(0.2, when)
|
|
when = yield 0.1
|
|
self.assertAlmostEqual(0.3, when)
|
|
yield 0.1
|
|
|
|
loop = self.new_test_loop(gen)
|
|
|
|
x = 0
|
|
waiters = []
|
|
|
|
@asyncio.coroutine
|
|
def task():
|
|
nonlocal x
|
|
while x < 10:
|
|
waiters.append(asyncio.sleep(0.1, loop=loop))
|
|
yield from waiters[-1]
|
|
x += 1
|
|
if x == 2:
|
|
loop.stop()
|
|
|
|
t = asyncio.Task(task(), loop=loop)
|
|
with self.assertRaises(RuntimeError) as cm:
|
|
loop.run_until_complete(t)
|
|
self.assertEqual(str(cm.exception),
|
|
'Event loop stopped before Future completed.')
|
|
self.assertFalse(t.done())
|
|
self.assertEqual(x, 2)
|
|
self.assertAlmostEqual(0.3, loop.time())
|
|
|
|
# close generators
|
|
for w in waiters:
|
|
w.close()
|
|
t.cancel()
|
|
self.assertRaises(asyncio.CancelledError, loop.run_until_complete, t)
|
|
|
|
def test_wait_for(self):
|
|
|
|
def gen():
|
|
when = yield
|
|
self.assertAlmostEqual(0.2, when)
|
|
when = yield 0
|
|
self.assertAlmostEqual(0.1, when)
|
|
when = yield 0.1
|
|
|
|
loop = self.new_test_loop(gen)
|
|
|
|
foo_running = None
|
|
|
|
@asyncio.coroutine
|
|
def foo():
|
|
nonlocal foo_running
|
|
foo_running = True
|
|
try:
|
|
yield from asyncio.sleep(0.2, loop=loop)
|
|
finally:
|
|
foo_running = False
|
|
return 'done'
|
|
|
|
fut = asyncio.Task(foo(), loop=loop)
|
|
|
|
with self.assertRaises(asyncio.TimeoutError):
|
|
loop.run_until_complete(asyncio.wait_for(fut, 0.1, loop=loop))
|
|
self.assertTrue(fut.done())
|
|
# it should have been cancelled due to the timeout
|
|
self.assertTrue(fut.cancelled())
|
|
self.assertAlmostEqual(0.1, loop.time())
|
|
self.assertEqual(foo_running, False)
|
|
|
|
def test_wait_for_blocking(self):
|
|
loop = self.new_test_loop()
|
|
|
|
@asyncio.coroutine
|
|
def coro():
|
|
return 'done'
|
|
|
|
res = loop.run_until_complete(asyncio.wait_for(coro(),
|
|
timeout=None,
|
|
loop=loop))
|
|
self.assertEqual(res, 'done')
|
|
|
|
def test_wait_for_with_global_loop(self):
|
|
|
|
def gen():
|
|
when = yield
|
|
self.assertAlmostEqual(0.2, when)
|
|
when = yield 0
|
|
self.assertAlmostEqual(0.01, when)
|
|
yield 0.01
|
|
|
|
loop = self.new_test_loop(gen)
|
|
|
|
@asyncio.coroutine
|
|
def foo():
|
|
yield from asyncio.sleep(0.2, loop=loop)
|
|
return 'done'
|
|
|
|
asyncio.set_event_loop(loop)
|
|
try:
|
|
fut = asyncio.Task(foo(), loop=loop)
|
|
with self.assertRaises(asyncio.TimeoutError):
|
|
loop.run_until_complete(asyncio.wait_for(fut, 0.01))
|
|
finally:
|
|
asyncio.set_event_loop(None)
|
|
|
|
self.assertAlmostEqual(0.01, loop.time())
|
|
self.assertTrue(fut.done())
|
|
self.assertTrue(fut.cancelled())
|
|
|
|
def test_wait_for_race_condition(self):
|
|
|
|
def gen():
|
|
yield 0.1
|
|
yield 0.1
|
|
yield 0.1
|
|
|
|
loop = self.new_test_loop(gen)
|
|
|
|
fut = asyncio.Future(loop=loop)
|
|
task = asyncio.wait_for(fut, timeout=0.2, loop=loop)
|
|
loop.call_later(0.1, fut.set_result, "ok")
|
|
res = loop.run_until_complete(task)
|
|
self.assertEqual(res, "ok")
|
|
|
|
def test_wait(self):
|
|
|
|
def gen():
|
|
when = yield
|
|
self.assertAlmostEqual(0.1, when)
|
|
when = yield 0
|
|
self.assertAlmostEqual(0.15, when)
|
|
yield 0.15
|
|
|
|
loop = self.new_test_loop(gen)
|
|
|
|
a = asyncio.Task(asyncio.sleep(0.1, loop=loop), loop=loop)
|
|
b = asyncio.Task(asyncio.sleep(0.15, loop=loop), loop=loop)
|
|
|
|
@asyncio.coroutine
|
|
def foo():
|
|
done, pending = yield from asyncio.wait([b, a], loop=loop)
|
|
self.assertEqual(done, set([a, b]))
|
|
self.assertEqual(pending, set())
|
|
return 42
|
|
|
|
res = loop.run_until_complete(asyncio.Task(foo(), loop=loop))
|
|
self.assertEqual(res, 42)
|
|
self.assertAlmostEqual(0.15, loop.time())
|
|
|
|
# Doing it again should take no time and exercise a different path.
|
|
res = loop.run_until_complete(asyncio.Task(foo(), loop=loop))
|
|
self.assertAlmostEqual(0.15, loop.time())
|
|
self.assertEqual(res, 42)
|
|
|
|
def test_wait_with_global_loop(self):
|
|
|
|
def gen():
|
|
when = yield
|
|
self.assertAlmostEqual(0.01, when)
|
|
when = yield 0
|
|
self.assertAlmostEqual(0.015, when)
|
|
yield 0.015
|
|
|
|
loop = self.new_test_loop(gen)
|
|
|
|
a = asyncio.Task(asyncio.sleep(0.01, loop=loop), loop=loop)
|
|
b = asyncio.Task(asyncio.sleep(0.015, loop=loop), loop=loop)
|
|
|
|
@asyncio.coroutine
|
|
def foo():
|
|
done, pending = yield from asyncio.wait([b, a])
|
|
self.assertEqual(done, set([a, b]))
|
|
self.assertEqual(pending, set())
|
|
return 42
|
|
|
|
asyncio.set_event_loop(loop)
|
|
res = loop.run_until_complete(
|
|
asyncio.Task(foo(), loop=loop))
|
|
|
|
self.assertEqual(res, 42)
|
|
|
|
def test_wait_duplicate_coroutines(self):
|
|
@asyncio.coroutine
|
|
def coro(s):
|
|
return s
|
|
c = coro('test')
|
|
|
|
task = asyncio.Task(
|
|
asyncio.wait([c, c, coro('spam')], loop=self.loop),
|
|
loop=self.loop)
|
|
|
|
done, pending = self.loop.run_until_complete(task)
|
|
|
|
self.assertFalse(pending)
|
|
self.assertEqual(set(f.result() for f in done), {'test', 'spam'})
|
|
|
|
def test_wait_errors(self):
|
|
self.assertRaises(
|
|
ValueError, self.loop.run_until_complete,
|
|
asyncio.wait(set(), loop=self.loop))
|
|
|
|
# -1 is an invalid return_when value
|
|
sleep_coro = asyncio.sleep(10.0, loop=self.loop)
|
|
wait_coro = asyncio.wait([sleep_coro], return_when=-1, loop=self.loop)
|
|
self.assertRaises(ValueError,
|
|
self.loop.run_until_complete, wait_coro)
|
|
|
|
sleep_coro.close()
|
|
|
|
def test_wait_first_completed(self):
|
|
|
|
def gen():
|
|
when = yield
|
|
self.assertAlmostEqual(10.0, when)
|
|
when = yield 0
|
|
self.assertAlmostEqual(0.1, when)
|
|
yield 0.1
|
|
|
|
loop = self.new_test_loop(gen)
|
|
|
|
a = asyncio.Task(asyncio.sleep(10.0, loop=loop), loop=loop)
|
|
b = asyncio.Task(asyncio.sleep(0.1, loop=loop), loop=loop)
|
|
task = asyncio.Task(
|
|
asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED,
|
|
loop=loop),
|
|
loop=loop)
|
|
|
|
done, pending = loop.run_until_complete(task)
|
|
self.assertEqual({b}, done)
|
|
self.assertEqual({a}, pending)
|
|
self.assertFalse(a.done())
|
|
self.assertTrue(b.done())
|
|
self.assertIsNone(b.result())
|
|
self.assertAlmostEqual(0.1, loop.time())
|
|
|
|
# move forward to close generator
|
|
loop.advance_time(10)
|
|
loop.run_until_complete(asyncio.wait([a, b], loop=loop))
|
|
|
|
def test_wait_really_done(self):
|
|
# there is possibility that some tasks in the pending list
|
|
# became done but their callbacks haven't all been called yet
|
|
|
|
@asyncio.coroutine
|
|
def coro1():
|
|
yield
|
|
|
|
@asyncio.coroutine
|
|
def coro2():
|
|
yield
|
|
yield
|
|
|
|
a = asyncio.Task(coro1(), loop=self.loop)
|
|
b = asyncio.Task(coro2(), loop=self.loop)
|
|
task = asyncio.Task(
|
|
asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED,
|
|
loop=self.loop),
|
|
loop=self.loop)
|
|
|
|
done, pending = self.loop.run_until_complete(task)
|
|
self.assertEqual({a, b}, done)
|
|
self.assertTrue(a.done())
|
|
self.assertIsNone(a.result())
|
|
self.assertTrue(b.done())
|
|
self.assertIsNone(b.result())
|
|
|
|
def test_wait_first_exception(self):
|
|
|
|
def gen():
|
|
when = yield
|
|
self.assertAlmostEqual(10.0, when)
|
|
yield 0
|
|
|
|
loop = self.new_test_loop(gen)
|
|
|
|
# first_exception, task already has exception
|
|
a = asyncio.Task(asyncio.sleep(10.0, loop=loop), loop=loop)
|
|
|
|
@asyncio.coroutine
|
|
def exc():
|
|
raise ZeroDivisionError('err')
|
|
|
|
b = asyncio.Task(exc(), loop=loop)
|
|
task = asyncio.Task(
|
|
asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION,
|
|
loop=loop),
|
|
loop=loop)
|
|
|
|
done, pending = loop.run_until_complete(task)
|
|
self.assertEqual({b}, done)
|
|
self.assertEqual({a}, pending)
|
|
self.assertAlmostEqual(0, loop.time())
|
|
|
|
# move forward to close generator
|
|
loop.advance_time(10)
|
|
loop.run_until_complete(asyncio.wait([a, b], loop=loop))
|
|
|
|
def test_wait_first_exception_in_wait(self):
|
|
|
|
def gen():
|
|
when = yield
|
|
self.assertAlmostEqual(10.0, when)
|
|
when = yield 0
|
|
self.assertAlmostEqual(0.01, when)
|
|
yield 0.01
|
|
|
|
loop = self.new_test_loop(gen)
|
|
|
|
# first_exception, exception during waiting
|
|
a = asyncio.Task(asyncio.sleep(10.0, loop=loop), loop=loop)
|
|
|
|
@asyncio.coroutine
|
|
def exc():
|
|
yield from asyncio.sleep(0.01, loop=loop)
|
|
raise ZeroDivisionError('err')
|
|
|
|
b = asyncio.Task(exc(), loop=loop)
|
|
task = asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION,
|
|
loop=loop)
|
|
|
|
done, pending = loop.run_until_complete(task)
|
|
self.assertEqual({b}, done)
|
|
self.assertEqual({a}, pending)
|
|
self.assertAlmostEqual(0.01, loop.time())
|
|
|
|
# move forward to close generator
|
|
loop.advance_time(10)
|
|
loop.run_until_complete(asyncio.wait([a, b], loop=loop))
|
|
|
|
def test_wait_with_exception(self):
|
|
|
|
def gen():
|
|
when = yield
|
|
self.assertAlmostEqual(0.1, when)
|
|
when = yield 0
|
|
self.assertAlmostEqual(0.15, when)
|
|
yield 0.15
|
|
|
|
loop = self.new_test_loop(gen)
|
|
|
|
a = asyncio.Task(asyncio.sleep(0.1, loop=loop), loop=loop)
|
|
|
|
@asyncio.coroutine
|
|
def sleeper():
|
|
yield from asyncio.sleep(0.15, loop=loop)
|
|
raise ZeroDivisionError('really')
|
|
|
|
b = asyncio.Task(sleeper(), loop=loop)
|
|
|
|
@asyncio.coroutine
|
|
def foo():
|
|
done, pending = yield from asyncio.wait([b, a], loop=loop)
|
|
self.assertEqual(len(done), 2)
|
|
self.assertEqual(pending, set())
|
|
errors = set(f for f in done if f.exception() is not None)
|
|
self.assertEqual(len(errors), 1)
|
|
|
|
loop.run_until_complete(asyncio.Task(foo(), loop=loop))
|
|
self.assertAlmostEqual(0.15, loop.time())
|
|
|
|
loop.run_until_complete(asyncio.Task(foo(), loop=loop))
|
|
self.assertAlmostEqual(0.15, loop.time())
|
|
|
|
def test_wait_with_timeout(self):
|
|
|
|
def gen():
|
|
when = yield
|
|
self.assertAlmostEqual(0.1, when)
|
|
when = yield 0
|
|
self.assertAlmostEqual(0.15, when)
|
|
when = yield 0
|
|
self.assertAlmostEqual(0.11, when)
|
|
yield 0.11
|
|
|
|
loop = self.new_test_loop(gen)
|
|
|
|
a = asyncio.Task(asyncio.sleep(0.1, loop=loop), loop=loop)
|
|
b = asyncio.Task(asyncio.sleep(0.15, loop=loop), loop=loop)
|
|
|
|
@asyncio.coroutine
|
|
def foo():
|
|
done, pending = yield from asyncio.wait([b, a], timeout=0.11,
|
|
loop=loop)
|
|
self.assertEqual(done, set([a]))
|
|
self.assertEqual(pending, set([b]))
|
|
|
|
loop.run_until_complete(asyncio.Task(foo(), loop=loop))
|
|
self.assertAlmostEqual(0.11, loop.time())
|
|
|
|
# move forward to close generator
|
|
loop.advance_time(10)
|
|
loop.run_until_complete(asyncio.wait([a, b], loop=loop))
|
|
|
|
def test_wait_concurrent_complete(self):
|
|
|
|
def gen():
|
|
when = yield
|
|
self.assertAlmostEqual(0.1, when)
|
|
when = yield 0
|
|
self.assertAlmostEqual(0.15, when)
|
|
when = yield 0
|
|
self.assertAlmostEqual(0.1, when)
|
|
yield 0.1
|
|
|
|
loop = self.new_test_loop(gen)
|
|
|
|
a = asyncio.Task(asyncio.sleep(0.1, loop=loop), loop=loop)
|
|
b = asyncio.Task(asyncio.sleep(0.15, loop=loop), loop=loop)
|
|
|
|
done, pending = loop.run_until_complete(
|
|
asyncio.wait([b, a], timeout=0.1, loop=loop))
|
|
|
|
self.assertEqual(done, set([a]))
|
|
self.assertEqual(pending, set([b]))
|
|
self.assertAlmostEqual(0.1, loop.time())
|
|
|
|
# move forward to close generator
|
|
loop.advance_time(10)
|
|
loop.run_until_complete(asyncio.wait([a, b], loop=loop))
|
|
|
|
def test_as_completed(self):
|
|
|
|
def gen():
|
|
yield 0
|
|
yield 0
|
|
yield 0.01
|
|
yield 0
|
|
|
|
loop = self.new_test_loop(gen)
|
|
# disable "slow callback" warning
|
|
loop.slow_callback_duration = 1.0
|
|
completed = set()
|
|
time_shifted = False
|
|
|
|
@asyncio.coroutine
|
|
def sleeper(dt, x):
|
|
nonlocal time_shifted
|
|
yield from asyncio.sleep(dt, loop=loop)
|
|
completed.add(x)
|
|
if not time_shifted and 'a' in completed and 'b' in completed:
|
|
time_shifted = True
|
|
loop.advance_time(0.14)
|
|
return x
|
|
|
|
a = sleeper(0.01, 'a')
|
|
b = sleeper(0.01, 'b')
|
|
c = sleeper(0.15, 'c')
|
|
|
|
@asyncio.coroutine
|
|
def foo():
|
|
values = []
|
|
for f in asyncio.as_completed([b, c, a], loop=loop):
|
|
values.append((yield from f))
|
|
return values
|
|
|
|
res = loop.run_until_complete(asyncio.Task(foo(), loop=loop))
|
|
self.assertAlmostEqual(0.15, loop.time())
|
|
self.assertTrue('a' in res[:2])
|
|
self.assertTrue('b' in res[:2])
|
|
self.assertEqual(res[2], 'c')
|
|
|
|
# Doing it again should take no time and exercise a different path.
|
|
res = loop.run_until_complete(asyncio.Task(foo(), loop=loop))
|
|
self.assertAlmostEqual(0.15, loop.time())
|
|
|
|
def test_as_completed_with_timeout(self):
|
|
|
|
def gen():
|
|
yield
|
|
yield 0
|
|
yield 0
|
|
yield 0.1
|
|
|
|
loop = self.new_test_loop(gen)
|
|
|
|
a = asyncio.sleep(0.1, 'a', loop=loop)
|
|
b = asyncio.sleep(0.15, 'b', loop=loop)
|
|
|
|
@asyncio.coroutine
|
|
def foo():
|
|
values = []
|
|
for f in asyncio.as_completed([a, b], timeout=0.12, loop=loop):
|
|
if values:
|
|
loop.advance_time(0.02)
|
|
try:
|
|
v = yield from f
|
|
values.append((1, v))
|
|
except asyncio.TimeoutError as exc:
|
|
values.append((2, exc))
|
|
return values
|
|
|
|
res = loop.run_until_complete(asyncio.Task(foo(), loop=loop))
|
|
self.assertEqual(len(res), 2, res)
|
|
self.assertEqual(res[0], (1, 'a'))
|
|
self.assertEqual(res[1][0], 2)
|
|
self.assertIsInstance(res[1][1], asyncio.TimeoutError)
|
|
self.assertAlmostEqual(0.12, loop.time())
|
|
|
|
# move forward to close generator
|
|
loop.advance_time(10)
|
|
loop.run_until_complete(asyncio.wait([a, b], loop=loop))
|
|
|
|
def test_as_completed_with_unused_timeout(self):
|
|
|
|
def gen():
|
|
yield
|
|
yield 0
|
|
yield 0.01
|
|
|
|
loop = self.new_test_loop(gen)
|
|
|
|
a = asyncio.sleep(0.01, 'a', loop=loop)
|
|
|
|
@asyncio.coroutine
|
|
def foo():
|
|
for f in asyncio.as_completed([a], timeout=1, loop=loop):
|
|
v = yield from f
|
|
self.assertEqual(v, 'a')
|
|
|
|
loop.run_until_complete(asyncio.Task(foo(), loop=loop))
|
|
|
|
def test_as_completed_reverse_wait(self):
|
|
|
|
def gen():
|
|
yield 0
|
|
yield 0.05
|
|
yield 0
|
|
|
|
loop = self.new_test_loop(gen)
|
|
|
|
a = asyncio.sleep(0.05, 'a', loop=loop)
|
|
b = asyncio.sleep(0.10, 'b', loop=loop)
|
|
fs = {a, b}
|
|
futs = list(asyncio.as_completed(fs, loop=loop))
|
|
self.assertEqual(len(futs), 2)
|
|
|
|
x = loop.run_until_complete(futs[1])
|
|
self.assertEqual(x, 'a')
|
|
self.assertAlmostEqual(0.05, loop.time())
|
|
loop.advance_time(0.05)
|
|
y = loop.run_until_complete(futs[0])
|
|
self.assertEqual(y, 'b')
|
|
self.assertAlmostEqual(0.10, loop.time())
|
|
|
|
def test_as_completed_concurrent(self):
|
|
|
|
def gen():
|
|
when = yield
|
|
self.assertAlmostEqual(0.05, when)
|
|
when = yield 0
|
|
self.assertAlmostEqual(0.05, when)
|
|
yield 0.05
|
|
|
|
loop = self.new_test_loop(gen)
|
|
|
|
a = asyncio.sleep(0.05, 'a', loop=loop)
|
|
b = asyncio.sleep(0.05, 'b', loop=loop)
|
|
fs = {a, b}
|
|
futs = list(asyncio.as_completed(fs, loop=loop))
|
|
self.assertEqual(len(futs), 2)
|
|
waiter = asyncio.wait(futs, loop=loop)
|
|
done, pending = loop.run_until_complete(waiter)
|
|
self.assertEqual(set(f.result() for f in done), {'a', 'b'})
|
|
|
|
def test_as_completed_duplicate_coroutines(self):
|
|
|
|
@asyncio.coroutine
|
|
def coro(s):
|
|
return s
|
|
|
|
@asyncio.coroutine
|
|
def runner():
|
|
result = []
|
|
c = coro('ham')
|
|
for f in asyncio.as_completed([c, c, coro('spam')],
|
|
loop=self.loop):
|
|
result.append((yield from f))
|
|
return result
|
|
|
|
fut = asyncio.Task(runner(), loop=self.loop)
|
|
self.loop.run_until_complete(fut)
|
|
result = fut.result()
|
|
self.assertEqual(set(result), {'ham', 'spam'})
|
|
self.assertEqual(len(result), 2)
|
|
|
|
def test_sleep(self):
|
|
|
|
def gen():
|
|
when = yield
|
|
self.assertAlmostEqual(0.05, when)
|
|
when = yield 0.05
|
|
self.assertAlmostEqual(0.1, when)
|
|
yield 0.05
|
|
|
|
loop = self.new_test_loop(gen)
|
|
|
|
@asyncio.coroutine
|
|
def sleeper(dt, arg):
|
|
yield from asyncio.sleep(dt/2, loop=loop)
|
|
res = yield from asyncio.sleep(dt/2, arg, loop=loop)
|
|
return res
|
|
|
|
t = asyncio.Task(sleeper(0.1, 'yeah'), loop=loop)
|
|
loop.run_until_complete(t)
|
|
self.assertTrue(t.done())
|
|
self.assertEqual(t.result(), 'yeah')
|
|
self.assertAlmostEqual(0.1, loop.time())
|
|
|
|
def test_sleep_cancel(self):
|
|
|
|
def gen():
|
|
when = yield
|
|
self.assertAlmostEqual(10.0, when)
|
|
yield 0
|
|
|
|
loop = self.new_test_loop(gen)
|
|
|
|
t = asyncio.Task(asyncio.sleep(10.0, 'yeah', loop=loop),
|
|
loop=loop)
|
|
|
|
handle = None
|
|
orig_call_later = loop.call_later
|
|
|
|
def call_later(delay, callback, *args):
|
|
nonlocal handle
|
|
handle = orig_call_later(delay, callback, *args)
|
|
return handle
|
|
|
|
loop.call_later = call_later
|
|
test_utils.run_briefly(loop)
|
|
|
|
self.assertFalse(handle._cancelled)
|
|
|
|
t.cancel()
|
|
test_utils.run_briefly(loop)
|
|
self.assertTrue(handle._cancelled)
|
|
|
|
def test_task_cancel_sleeping_task(self):
|
|
|
|
def gen():
|
|
when = yield
|
|
self.assertAlmostEqual(0.1, when)
|
|
when = yield 0
|
|
self.assertAlmostEqual(5000, when)
|
|
yield 0.1
|
|
|
|
loop = self.new_test_loop(gen)
|
|
|
|
@asyncio.coroutine
|
|
def sleep(dt):
|
|
yield from asyncio.sleep(dt, loop=loop)
|
|
|
|
@asyncio.coroutine
|
|
def doit():
|
|
sleeper = asyncio.Task(sleep(5000), loop=loop)
|
|
loop.call_later(0.1, sleeper.cancel)
|
|
try:
|
|
yield from sleeper
|
|
except asyncio.CancelledError:
|
|
return 'cancelled'
|
|
else:
|
|
return 'slept in'
|
|
|
|
doer = doit()
|
|
self.assertEqual(loop.run_until_complete(doer), 'cancelled')
|
|
self.assertAlmostEqual(0.1, loop.time())
|
|
|
|
def test_task_cancel_waiter_future(self):
|
|
fut = asyncio.Future(loop=self.loop)
|
|
|
|
@asyncio.coroutine
|
|
def coro():
|
|
yield from fut
|
|
|
|
task = asyncio.Task(coro(), loop=self.loop)
|
|
test_utils.run_briefly(self.loop)
|
|
self.assertIs(task._fut_waiter, fut)
|
|
|
|
task.cancel()
|
|
test_utils.run_briefly(self.loop)
|
|
self.assertRaises(
|
|
asyncio.CancelledError, self.loop.run_until_complete, task)
|
|
self.assertIsNone(task._fut_waiter)
|
|
self.assertTrue(fut.cancelled())
|
|
|
|
def test_step_in_completed_task(self):
|
|
@asyncio.coroutine
|
|
def notmuch():
|
|
return 'ko'
|
|
|
|
gen = notmuch()
|
|
task = asyncio.Task(gen, loop=self.loop)
|
|
task.set_result('ok')
|
|
|
|
self.assertRaises(AssertionError, task._step)
|
|
gen.close()
|
|
|
|
def test_step_result(self):
|
|
@asyncio.coroutine
|
|
def notmuch():
|
|
yield None
|
|
yield 1
|
|
return 'ko'
|
|
|
|
self.assertRaises(
|
|
RuntimeError, self.loop.run_until_complete, notmuch())
|
|
|
|
def test_step_result_future(self):
|
|
# If coroutine returns future, task waits on this future.
|
|
|
|
class Fut(asyncio.Future):
|
|
def __init__(self, *args, **kwds):
|
|
self.cb_added = False
|
|
super().__init__(*args, **kwds)
|
|
|
|
def add_done_callback(self, fn):
|
|
self.cb_added = True
|
|
super().add_done_callback(fn)
|
|
|
|
fut = Fut(loop=self.loop)
|
|
result = None
|
|
|
|
@asyncio.coroutine
|
|
def wait_for_future():
|
|
nonlocal result
|
|
result = yield from fut
|
|
|
|
t = asyncio.Task(wait_for_future(), loop=self.loop)
|
|
test_utils.run_briefly(self.loop)
|
|
self.assertTrue(fut.cb_added)
|
|
|
|
res = object()
|
|
fut.set_result(res)
|
|
test_utils.run_briefly(self.loop)
|
|
self.assertIs(res, result)
|
|
self.assertTrue(t.done())
|
|
self.assertIsNone(t.result())
|
|
|
|
def test_step_with_baseexception(self):
|
|
@asyncio.coroutine
|
|
def notmutch():
|
|
raise BaseException()
|
|
|
|
task = asyncio.Task(notmutch(), loop=self.loop)
|
|
self.assertRaises(BaseException, task._step)
|
|
|
|
self.assertTrue(task.done())
|
|
self.assertIsInstance(task.exception(), BaseException)
|
|
|
|
def test_baseexception_during_cancel(self):
|
|
|
|
def gen():
|
|
when = yield
|
|
self.assertAlmostEqual(10.0, when)
|
|
yield 0
|
|
|
|
loop = self.new_test_loop(gen)
|
|
|
|
@asyncio.coroutine
|
|
def sleeper():
|
|
yield from asyncio.sleep(10, loop=loop)
|
|
|
|
base_exc = BaseException()
|
|
|
|
@asyncio.coroutine
|
|
def notmutch():
|
|
try:
|
|
yield from sleeper()
|
|
except asyncio.CancelledError:
|
|
raise base_exc
|
|
|
|
task = asyncio.Task(notmutch(), loop=loop)
|
|
test_utils.run_briefly(loop)
|
|
|
|
task.cancel()
|
|
self.assertFalse(task.done())
|
|
|
|
self.assertRaises(BaseException, test_utils.run_briefly, loop)
|
|
|
|
self.assertTrue(task.done())
|
|
self.assertFalse(task.cancelled())
|
|
self.assertIs(task.exception(), base_exc)
|
|
|
|
def test_iscoroutinefunction(self):
|
|
def fn():
|
|
pass
|
|
|
|
self.assertFalse(asyncio.iscoroutinefunction(fn))
|
|
|
|
def fn1():
|
|
yield
|
|
self.assertFalse(asyncio.iscoroutinefunction(fn1))
|
|
|
|
@asyncio.coroutine
|
|
def fn2():
|
|
yield
|
|
self.assertTrue(asyncio.iscoroutinefunction(fn2))
|
|
|
|
def test_yield_vs_yield_from(self):
|
|
fut = asyncio.Future(loop=self.loop)
|
|
|
|
@asyncio.coroutine
|
|
def wait_for_future():
|
|
yield fut
|
|
|
|
task = wait_for_future()
|
|
with self.assertRaises(RuntimeError):
|
|
self.loop.run_until_complete(task)
|
|
|
|
self.assertFalse(fut.done())
|
|
|
|
def test_yield_vs_yield_from_generator(self):
|
|
@asyncio.coroutine
|
|
def coro():
|
|
yield
|
|
|
|
@asyncio.coroutine
|
|
def wait_for_future():
|
|
gen = coro()
|
|
try:
|
|
yield gen
|
|
finally:
|
|
gen.close()
|
|
|
|
task = wait_for_future()
|
|
self.assertRaises(
|
|
RuntimeError,
|
|
self.loop.run_until_complete, task)
|
|
|
|
def test_coroutine_non_gen_function(self):
|
|
@asyncio.coroutine
|
|
def func():
|
|
return 'test'
|
|
|
|
self.assertTrue(asyncio.iscoroutinefunction(func))
|
|
|
|
coro = func()
|
|
self.assertTrue(asyncio.iscoroutine(coro))
|
|
|
|
res = self.loop.run_until_complete(coro)
|
|
self.assertEqual(res, 'test')
|
|
|
|
def test_coroutine_non_gen_function_return_future(self):
|
|
fut = asyncio.Future(loop=self.loop)
|
|
|
|
@asyncio.coroutine
|
|
def func():
|
|
return fut
|
|
|
|
@asyncio.coroutine
|
|
def coro():
|
|
fut.set_result('test')
|
|
|
|
t1 = asyncio.Task(func(), loop=self.loop)
|
|
t2 = asyncio.Task(coro(), loop=self.loop)
|
|
res = self.loop.run_until_complete(t1)
|
|
self.assertEqual(res, 'test')
|
|
self.assertIsNone(t2.result())
|
|
|
|
def test_current_task(self):
|
|
self.assertIsNone(asyncio.Task.current_task(loop=self.loop))
|
|
|
|
@asyncio.coroutine
|
|
def coro(loop):
|
|
self.assertTrue(asyncio.Task.current_task(loop=loop) is task)
|
|
|
|
task = asyncio.Task(coro(self.loop), loop=self.loop)
|
|
self.loop.run_until_complete(task)
|
|
self.assertIsNone(asyncio.Task.current_task(loop=self.loop))
|
|
|
|
def test_current_task_with_interleaving_tasks(self):
|
|
self.assertIsNone(asyncio.Task.current_task(loop=self.loop))
|
|
|
|
fut1 = asyncio.Future(loop=self.loop)
|
|
fut2 = asyncio.Future(loop=self.loop)
|
|
|
|
@asyncio.coroutine
|
|
def coro1(loop):
|
|
self.assertTrue(asyncio.Task.current_task(loop=loop) is task1)
|
|
yield from fut1
|
|
self.assertTrue(asyncio.Task.current_task(loop=loop) is task1)
|
|
fut2.set_result(True)
|
|
|
|
@asyncio.coroutine
|
|
def coro2(loop):
|
|
self.assertTrue(asyncio.Task.current_task(loop=loop) is task2)
|
|
fut1.set_result(True)
|
|
yield from fut2
|
|
self.assertTrue(asyncio.Task.current_task(loop=loop) is task2)
|
|
|
|
task1 = asyncio.Task(coro1(self.loop), loop=self.loop)
|
|
task2 = asyncio.Task(coro2(self.loop), loop=self.loop)
|
|
|
|
self.loop.run_until_complete(asyncio.wait((task1, task2),
|
|
loop=self.loop))
|
|
self.assertIsNone(asyncio.Task.current_task(loop=self.loop))
|
|
|
|
# Some thorough tests for cancellation propagation through
|
|
# coroutines, tasks and wait().
|
|
|
|
def test_yield_future_passes_cancel(self):
|
|
# Cancelling outer() cancels inner() cancels waiter.
|
|
proof = 0
|
|
waiter = asyncio.Future(loop=self.loop)
|
|
|
|
@asyncio.coroutine
|
|
def inner():
|
|
nonlocal proof
|
|
try:
|
|
yield from waiter
|
|
except asyncio.CancelledError:
|
|
proof += 1
|
|
raise
|
|
else:
|
|
self.fail('got past sleep() in inner()')
|
|
|
|
@asyncio.coroutine
|
|
def outer():
|
|
nonlocal proof
|
|
try:
|
|
yield from inner()
|
|
except asyncio.CancelledError:
|
|
proof += 100 # Expect this path.
|
|
else:
|
|
proof += 10
|
|
|
|
f = asyncio.ensure_future(outer(), loop=self.loop)
|
|
test_utils.run_briefly(self.loop)
|
|
f.cancel()
|
|
self.loop.run_until_complete(f)
|
|
self.assertEqual(proof, 101)
|
|
self.assertTrue(waiter.cancelled())
|
|
|
|
def test_yield_wait_does_not_shield_cancel(self):
|
|
# Cancelling outer() makes wait() return early, leaves inner()
|
|
# running.
|
|
proof = 0
|
|
waiter = asyncio.Future(loop=self.loop)
|
|
|
|
@asyncio.coroutine
|
|
def inner():
|
|
nonlocal proof
|
|
yield from waiter
|
|
proof += 1
|
|
|
|
@asyncio.coroutine
|
|
def outer():
|
|
nonlocal proof
|
|
d, p = yield from asyncio.wait([inner()], loop=self.loop)
|
|
proof += 100
|
|
|
|
f = asyncio.ensure_future(outer(), loop=self.loop)
|
|
test_utils.run_briefly(self.loop)
|
|
f.cancel()
|
|
self.assertRaises(
|
|
asyncio.CancelledError, self.loop.run_until_complete, f)
|
|
waiter.set_result(None)
|
|
test_utils.run_briefly(self.loop)
|
|
self.assertEqual(proof, 1)
|
|
|
|
def test_shield_result(self):
|
|
inner = asyncio.Future(loop=self.loop)
|
|
outer = asyncio.shield(inner)
|
|
inner.set_result(42)
|
|
res = self.loop.run_until_complete(outer)
|
|
self.assertEqual(res, 42)
|
|
|
|
def test_shield_exception(self):
|
|
inner = asyncio.Future(loop=self.loop)
|
|
outer = asyncio.shield(inner)
|
|
test_utils.run_briefly(self.loop)
|
|
exc = RuntimeError('expected')
|
|
inner.set_exception(exc)
|
|
test_utils.run_briefly(self.loop)
|
|
self.assertIs(outer.exception(), exc)
|
|
|
|
def test_shield_cancel(self):
|
|
inner = asyncio.Future(loop=self.loop)
|
|
outer = asyncio.shield(inner)
|
|
test_utils.run_briefly(self.loop)
|
|
inner.cancel()
|
|
test_utils.run_briefly(self.loop)
|
|
self.assertTrue(outer.cancelled())
|
|
|
|
def test_shield_shortcut(self):
|
|
fut = asyncio.Future(loop=self.loop)
|
|
fut.set_result(42)
|
|
res = self.loop.run_until_complete(asyncio.shield(fut))
|
|
self.assertEqual(res, 42)
|
|
|
|
def test_shield_effect(self):
|
|
# Cancelling outer() does not affect inner().
|
|
proof = 0
|
|
waiter = asyncio.Future(loop=self.loop)
|
|
|
|
@asyncio.coroutine
|
|
def inner():
|
|
nonlocal proof
|
|
yield from waiter
|
|
proof += 1
|
|
|
|
@asyncio.coroutine
|
|
def outer():
|
|
nonlocal proof
|
|
yield from asyncio.shield(inner(), loop=self.loop)
|
|
proof += 100
|
|
|
|
f = asyncio.ensure_future(outer(), loop=self.loop)
|
|
test_utils.run_briefly(self.loop)
|
|
f.cancel()
|
|
with self.assertRaises(asyncio.CancelledError):
|
|
self.loop.run_until_complete(f)
|
|
waiter.set_result(None)
|
|
test_utils.run_briefly(self.loop)
|
|
self.assertEqual(proof, 1)
|
|
|
|
def test_shield_gather(self):
|
|
child1 = asyncio.Future(loop=self.loop)
|
|
child2 = asyncio.Future(loop=self.loop)
|
|
parent = asyncio.gather(child1, child2, loop=self.loop)
|
|
outer = asyncio.shield(parent, loop=self.loop)
|
|
test_utils.run_briefly(self.loop)
|
|
outer.cancel()
|
|
test_utils.run_briefly(self.loop)
|
|
self.assertTrue(outer.cancelled())
|
|
child1.set_result(1)
|
|
child2.set_result(2)
|
|
test_utils.run_briefly(self.loop)
|
|
self.assertEqual(parent.result(), [1, 2])
|
|
|
|
def test_gather_shield(self):
|
|
child1 = asyncio.Future(loop=self.loop)
|
|
child2 = asyncio.Future(loop=self.loop)
|
|
inner1 = asyncio.shield(child1, loop=self.loop)
|
|
inner2 = asyncio.shield(child2, loop=self.loop)
|
|
parent = asyncio.gather(inner1, inner2, loop=self.loop)
|
|
test_utils.run_briefly(self.loop)
|
|
parent.cancel()
|
|
# This should cancel inner1 and inner2 but bot child1 and child2.
|
|
test_utils.run_briefly(self.loop)
|
|
self.assertIsInstance(parent.exception(), asyncio.CancelledError)
|
|
self.assertTrue(inner1.cancelled())
|
|
self.assertTrue(inner2.cancelled())
|
|
child1.set_result(1)
|
|
child2.set_result(2)
|
|
test_utils.run_briefly(self.loop)
|
|
|
|
def test_as_completed_invalid_args(self):
|
|
fut = asyncio.Future(loop=self.loop)
|
|
|
|
# as_completed() expects a list of futures, not a future instance
|
|
self.assertRaises(TypeError, self.loop.run_until_complete,
|
|
asyncio.as_completed(fut, loop=self.loop))
|
|
coro = coroutine_function()
|
|
self.assertRaises(TypeError, self.loop.run_until_complete,
|
|
asyncio.as_completed(coro, loop=self.loop))
|
|
coro.close()
|
|
|
|
def test_wait_invalid_args(self):
|
|
fut = asyncio.Future(loop=self.loop)
|
|
|
|
# wait() expects a list of futures, not a future instance
|
|
self.assertRaises(TypeError, self.loop.run_until_complete,
|
|
asyncio.wait(fut, loop=self.loop))
|
|
coro = coroutine_function()
|
|
self.assertRaises(TypeError, self.loop.run_until_complete,
|
|
asyncio.wait(coro, loop=self.loop))
|
|
coro.close()
|
|
|
|
# wait() expects at least a future
|
|
self.assertRaises(ValueError, self.loop.run_until_complete,
|
|
asyncio.wait([], loop=self.loop))
|
|
|
|
def test_corowrapper_mocks_generator(self):
|
|
|
|
def check():
|
|
# A function that asserts various things.
|
|
# Called twice, with different debug flag values.
|
|
|
|
@asyncio.coroutine
|
|
def coro():
|
|
# The actual coroutine.
|
|
self.assertTrue(gen.gi_running)
|
|
yield from fut
|
|
|
|
# A completed Future used to run the coroutine.
|
|
fut = asyncio.Future(loop=self.loop)
|
|
fut.set_result(None)
|
|
|
|
# Call the coroutine.
|
|
gen = coro()
|
|
|
|
# Check some properties.
|
|
self.assertTrue(asyncio.iscoroutine(gen))
|
|
self.assertIsInstance(gen.gi_frame, types.FrameType)
|
|
self.assertFalse(gen.gi_running)
|
|
self.assertIsInstance(gen.gi_code, types.CodeType)
|
|
|
|
# Run it.
|
|
self.loop.run_until_complete(gen)
|
|
|
|
# The frame should have changed.
|
|
self.assertIsNone(gen.gi_frame)
|
|
|
|
# Test with debug flag cleared.
|
|
with set_coroutine_debug(False):
|
|
check()
|
|
|
|
# Test with debug flag set.
|
|
with set_coroutine_debug(True):
|
|
check()
|
|
|
|
def test_yield_from_corowrapper(self):
|
|
with set_coroutine_debug(True):
|
|
@asyncio.coroutine
|
|
def t1():
|
|
return (yield from t2())
|
|
|
|
@asyncio.coroutine
|
|
def t2():
|
|
f = asyncio.Future(loop=self.loop)
|
|
asyncio.Task(t3(f), loop=self.loop)
|
|
return (yield from f)
|
|
|
|
@asyncio.coroutine
|
|
def t3(f):
|
|
f.set_result((1, 2, 3))
|
|
|
|
task = asyncio.Task(t1(), loop=self.loop)
|
|
val = self.loop.run_until_complete(task)
|
|
self.assertEqual(val, (1, 2, 3))
|
|
|
|
def test_yield_from_corowrapper_send(self):
|
|
def foo():
|
|
a = yield
|
|
return a
|
|
|
|
def call(arg):
|
|
cw = asyncio.coroutines.CoroWrapper(foo())
|
|
cw.send(None)
|
|
try:
|
|
cw.send(arg)
|
|
except StopIteration as ex:
|
|
return ex.args[0]
|
|
else:
|
|
raise AssertionError('StopIteration was expected')
|
|
|
|
self.assertEqual(call((1, 2)), (1, 2))
|
|
self.assertEqual(call('spam'), 'spam')
|
|
|
|
def test_corowrapper_weakref(self):
|
|
wd = weakref.WeakValueDictionary()
|
|
def foo(): yield from []
|
|
cw = asyncio.coroutines.CoroWrapper(foo())
|
|
wd['cw'] = cw # Would fail without __weakref__ slot.
|
|
cw.gen = None # Suppress warning from __del__.
|
|
|
|
@unittest.skipUnless(PY34,
|
|
'need python 3.4 or later')
|
|
def test_log_destroyed_pending_task(self):
|
|
@asyncio.coroutine
|
|
def kill_me(loop):
|
|
future = asyncio.Future(loop=loop)
|
|
yield from future
|
|
# at this point, the only reference to kill_me() task is
|
|
# the Task._wakeup() method in future._callbacks
|
|
raise Exception("code never reached")
|
|
|
|
mock_handler = mock.Mock()
|
|
self.loop.set_debug(True)
|
|
self.loop.set_exception_handler(mock_handler)
|
|
|
|
# schedule the task
|
|
coro = kill_me(self.loop)
|
|
task = asyncio.ensure_future(coro, loop=self.loop)
|
|
self.assertEqual(asyncio.Task.all_tasks(loop=self.loop), {task})
|
|
|
|
# execute the task so it waits for future
|
|
self.loop._run_once()
|
|
self.assertEqual(len(self.loop._ready), 0)
|
|
|
|
# remove the future used in kill_me(), and references to the task
|
|
del coro.gi_frame.f_locals['future']
|
|
coro = None
|
|
source_traceback = task._source_traceback
|
|
task = None
|
|
|
|
# no more reference to kill_me() task: the task is destroyed by the GC
|
|
support.gc_collect()
|
|
|
|
self.assertEqual(asyncio.Task.all_tasks(loop=self.loop), set())
|
|
|
|
mock_handler.assert_called_with(self.loop, {
|
|
'message': 'Task was destroyed but it is pending!',
|
|
'task': mock.ANY,
|
|
'source_traceback': source_traceback,
|
|
})
|
|
mock_handler.reset_mock()
|
|
|
|
@mock.patch('asyncio.coroutines.logger')
|
|
def test_coroutine_never_yielded(self, m_log):
|
|
with set_coroutine_debug(True):
|
|
@asyncio.coroutine
|
|
def coro_noop():
|
|
pass
|
|
|
|
tb_filename = __file__
|
|
tb_lineno = sys._getframe().f_lineno + 2
|
|
# create a coroutine object but don't use it
|
|
coro_noop()
|
|
support.gc_collect()
|
|
|
|
self.assertTrue(m_log.error.called)
|
|
message = m_log.error.call_args[0][0]
|
|
func_filename, func_lineno = test_utils.get_function_source(coro_noop)
|
|
|
|
regex = (r'^<CoroWrapper %s\(?\)? .* at %s:%s, .*> '
|
|
r'was never yielded from\n'
|
|
r'Coroutine object created at \(most recent call last\):\n'
|
|
r'.*\n'
|
|
r' File "%s", line %s, in test_coroutine_never_yielded\n'
|
|
r' coro_noop\(\)$'
|
|
% (re.escape(coro_noop.__qualname__),
|
|
re.escape(func_filename), func_lineno,
|
|
re.escape(tb_filename), tb_lineno))
|
|
|
|
self.assertRegex(message, re.compile(regex, re.DOTALL))
|
|
|
|
def test_task_source_traceback(self):
|
|
self.loop.set_debug(True)
|
|
|
|
task = asyncio.Task(coroutine_function(), loop=self.loop)
|
|
lineno = sys._getframe().f_lineno - 1
|
|
self.assertIsInstance(task._source_traceback, list)
|
|
self.assertEqual(task._source_traceback[-1][:3],
|
|
(__file__,
|
|
lineno,
|
|
'test_task_source_traceback'))
|
|
self.loop.run_until_complete(task)
|
|
|
|
def _test_cancel_wait_for(self, timeout):
|
|
loop = asyncio.new_event_loop()
|
|
self.addCleanup(loop.close)
|
|
|
|
@asyncio.coroutine
|
|
def blocking_coroutine():
|
|
fut = asyncio.Future(loop=loop)
|
|
# Block: fut result is never set
|
|
yield from fut
|
|
|
|
task = loop.create_task(blocking_coroutine())
|
|
|
|
wait = loop.create_task(asyncio.wait_for(task, timeout, loop=loop))
|
|
loop.call_soon(wait.cancel)
|
|
|
|
self.assertRaises(asyncio.CancelledError,
|
|
loop.run_until_complete, wait)
|
|
|
|
# Python issue #23219: cancelling the wait must also cancel the task
|
|
self.assertTrue(task.cancelled())
|
|
|
|
def test_cancel_blocking_wait_for(self):
|
|
self._test_cancel_wait_for(None)
|
|
|
|
def test_cancel_wait_for(self):
|
|
self._test_cancel_wait_for(60.0)
|
|
|
|
|
|
class GatherTestsBase:
|
|
|
|
def setUp(self):
|
|
self.one_loop = self.new_test_loop()
|
|
self.other_loop = self.new_test_loop()
|
|
self.set_event_loop(self.one_loop, cleanup=False)
|
|
|
|
def _run_loop(self, loop):
|
|
while loop._ready:
|
|
test_utils.run_briefly(loop)
|
|
|
|
def _check_success(self, **kwargs):
|
|
a, b, c = [asyncio.Future(loop=self.one_loop) for i in range(3)]
|
|
fut = asyncio.gather(*self.wrap_futures(a, b, c), **kwargs)
|
|
cb = test_utils.MockCallback()
|
|
fut.add_done_callback(cb)
|
|
b.set_result(1)
|
|
a.set_result(2)
|
|
self._run_loop(self.one_loop)
|
|
self.assertEqual(cb.called, False)
|
|
self.assertFalse(fut.done())
|
|
c.set_result(3)
|
|
self._run_loop(self.one_loop)
|
|
cb.assert_called_once_with(fut)
|
|
self.assertEqual(fut.result(), [2, 1, 3])
|
|
|
|
def test_success(self):
|
|
self._check_success()
|
|
self._check_success(return_exceptions=False)
|
|
|
|
def test_result_exception_success(self):
|
|
self._check_success(return_exceptions=True)
|
|
|
|
def test_one_exception(self):
|
|
a, b, c, d, e = [asyncio.Future(loop=self.one_loop) for i in range(5)]
|
|
fut = asyncio.gather(*self.wrap_futures(a, b, c, d, e))
|
|
cb = test_utils.MockCallback()
|
|
fut.add_done_callback(cb)
|
|
exc = ZeroDivisionError()
|
|
a.set_result(1)
|
|
b.set_exception(exc)
|
|
self._run_loop(self.one_loop)
|
|
self.assertTrue(fut.done())
|
|
cb.assert_called_once_with(fut)
|
|
self.assertIs(fut.exception(), exc)
|
|
# Does nothing
|
|
c.set_result(3)
|
|
d.cancel()
|
|
e.set_exception(RuntimeError())
|
|
e.exception()
|
|
|
|
def test_return_exceptions(self):
|
|
a, b, c, d = [asyncio.Future(loop=self.one_loop) for i in range(4)]
|
|
fut = asyncio.gather(*self.wrap_futures(a, b, c, d),
|
|
return_exceptions=True)
|
|
cb = test_utils.MockCallback()
|
|
fut.add_done_callback(cb)
|
|
exc = ZeroDivisionError()
|
|
exc2 = RuntimeError()
|
|
b.set_result(1)
|
|
c.set_exception(exc)
|
|
a.set_result(3)
|
|
self._run_loop(self.one_loop)
|
|
self.assertFalse(fut.done())
|
|
d.set_exception(exc2)
|
|
self._run_loop(self.one_loop)
|
|
self.assertTrue(fut.done())
|
|
cb.assert_called_once_with(fut)
|
|
self.assertEqual(fut.result(), [3, 1, exc, exc2])
|
|
|
|
def test_env_var_debug(self):
|
|
aio_path = os.path.dirname(os.path.dirname(asyncio.__file__))
|
|
|
|
code = '\n'.join((
|
|
'import asyncio.coroutines',
|
|
'print(asyncio.coroutines._DEBUG)'))
|
|
|
|
# Test with -E to not fail if the unit test was run with
|
|
# PYTHONASYNCIODEBUG set to a non-empty string
|
|
sts, stdout, stderr = assert_python_ok('-E', '-c', code,
|
|
PYTHONPATH=aio_path)
|
|
self.assertEqual(stdout.rstrip(), b'False')
|
|
|
|
sts, stdout, stderr = assert_python_ok('-c', code,
|
|
PYTHONASYNCIODEBUG='',
|
|
PYTHONPATH=aio_path)
|
|
self.assertEqual(stdout.rstrip(), b'False')
|
|
|
|
sts, stdout, stderr = assert_python_ok('-c', code,
|
|
PYTHONASYNCIODEBUG='1',
|
|
PYTHONPATH=aio_path)
|
|
self.assertEqual(stdout.rstrip(), b'True')
|
|
|
|
sts, stdout, stderr = assert_python_ok('-E', '-c', code,
|
|
PYTHONASYNCIODEBUG='1',
|
|
PYTHONPATH=aio_path)
|
|
self.assertEqual(stdout.rstrip(), b'False')
|
|
|
|
|
|
class FutureGatherTests(GatherTestsBase, test_utils.TestCase):
|
|
|
|
def wrap_futures(self, *futures):
|
|
return futures
|
|
|
|
def _check_empty_sequence(self, seq_or_iter):
|
|
asyncio.set_event_loop(self.one_loop)
|
|
self.addCleanup(asyncio.set_event_loop, None)
|
|
fut = asyncio.gather(*seq_or_iter)
|
|
self.assertIsInstance(fut, asyncio.Future)
|
|
self.assertIs(fut._loop, self.one_loop)
|
|
self._run_loop(self.one_loop)
|
|
self.assertTrue(fut.done())
|
|
self.assertEqual(fut.result(), [])
|
|
fut = asyncio.gather(*seq_or_iter, loop=self.other_loop)
|
|
self.assertIs(fut._loop, self.other_loop)
|
|
|
|
def test_constructor_empty_sequence(self):
|
|
self._check_empty_sequence([])
|
|
self._check_empty_sequence(())
|
|
self._check_empty_sequence(set())
|
|
self._check_empty_sequence(iter(""))
|
|
|
|
def test_constructor_heterogenous_futures(self):
|
|
fut1 = asyncio.Future(loop=self.one_loop)
|
|
fut2 = asyncio.Future(loop=self.other_loop)
|
|
with self.assertRaises(ValueError):
|
|
asyncio.gather(fut1, fut2)
|
|
with self.assertRaises(ValueError):
|
|
asyncio.gather(fut1, loop=self.other_loop)
|
|
|
|
def test_constructor_homogenous_futures(self):
|
|
children = [asyncio.Future(loop=self.other_loop) for i in range(3)]
|
|
fut = asyncio.gather(*children)
|
|
self.assertIs(fut._loop, self.other_loop)
|
|
self._run_loop(self.other_loop)
|
|
self.assertFalse(fut.done())
|
|
fut = asyncio.gather(*children, loop=self.other_loop)
|
|
self.assertIs(fut._loop, self.other_loop)
|
|
self._run_loop(self.other_loop)
|
|
self.assertFalse(fut.done())
|
|
|
|
def test_one_cancellation(self):
|
|
a, b, c, d, e = [asyncio.Future(loop=self.one_loop) for i in range(5)]
|
|
fut = asyncio.gather(a, b, c, d, e)
|
|
cb = test_utils.MockCallback()
|
|
fut.add_done_callback(cb)
|
|
a.set_result(1)
|
|
b.cancel()
|
|
self._run_loop(self.one_loop)
|
|
self.assertTrue(fut.done())
|
|
cb.assert_called_once_with(fut)
|
|
self.assertFalse(fut.cancelled())
|
|
self.assertIsInstance(fut.exception(), asyncio.CancelledError)
|
|
# Does nothing
|
|
c.set_result(3)
|
|
d.cancel()
|
|
e.set_exception(RuntimeError())
|
|
e.exception()
|
|
|
|
def test_result_exception_one_cancellation(self):
|
|
a, b, c, d, e, f = [asyncio.Future(loop=self.one_loop)
|
|
for i in range(6)]
|
|
fut = asyncio.gather(a, b, c, d, e, f, return_exceptions=True)
|
|
cb = test_utils.MockCallback()
|
|
fut.add_done_callback(cb)
|
|
a.set_result(1)
|
|
zde = ZeroDivisionError()
|
|
b.set_exception(zde)
|
|
c.cancel()
|
|
self._run_loop(self.one_loop)
|
|
self.assertFalse(fut.done())
|
|
d.set_result(3)
|
|
e.cancel()
|
|
rte = RuntimeError()
|
|
f.set_exception(rte)
|
|
res = self.one_loop.run_until_complete(fut)
|
|
self.assertIsInstance(res[2], asyncio.CancelledError)
|
|
self.assertIsInstance(res[4], asyncio.CancelledError)
|
|
res[2] = res[4] = None
|
|
self.assertEqual(res, [1, zde, None, 3, None, rte])
|
|
cb.assert_called_once_with(fut)
|
|
|
|
|
|
class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase):
|
|
|
|
def setUp(self):
|
|
super().setUp()
|
|
asyncio.set_event_loop(self.one_loop)
|
|
|
|
def wrap_futures(self, *futures):
|
|
coros = []
|
|
for fut in futures:
|
|
@asyncio.coroutine
|
|
def coro(fut=fut):
|
|
return (yield from fut)
|
|
coros.append(coro())
|
|
return coros
|
|
|
|
def test_constructor_loop_selection(self):
|
|
@asyncio.coroutine
|
|
def coro():
|
|
return 'abc'
|
|
gen1 = coro()
|
|
gen2 = coro()
|
|
fut = asyncio.gather(gen1, gen2)
|
|
self.assertIs(fut._loop, self.one_loop)
|
|
self.one_loop.run_until_complete(fut)
|
|
|
|
self.set_event_loop(self.other_loop, cleanup=False)
|
|
gen3 = coro()
|
|
gen4 = coro()
|
|
fut2 = asyncio.gather(gen3, gen4, loop=self.other_loop)
|
|
self.assertIs(fut2._loop, self.other_loop)
|
|
self.other_loop.run_until_complete(fut2)
|
|
|
|
def test_duplicate_coroutines(self):
|
|
@asyncio.coroutine
|
|
def coro(s):
|
|
return s
|
|
c = coro('abc')
|
|
fut = asyncio.gather(c, c, coro('def'), c, loop=self.one_loop)
|
|
self._run_loop(self.one_loop)
|
|
self.assertEqual(fut.result(), ['abc', 'abc', 'def', 'abc'])
|
|
|
|
def test_cancellation_broadcast(self):
|
|
# Cancelling outer() cancels all children.
|
|
proof = 0
|
|
waiter = asyncio.Future(loop=self.one_loop)
|
|
|
|
@asyncio.coroutine
|
|
def inner():
|
|
nonlocal proof
|
|
yield from waiter
|
|
proof += 1
|
|
|
|
child1 = asyncio.ensure_future(inner(), loop=self.one_loop)
|
|
child2 = asyncio.ensure_future(inner(), loop=self.one_loop)
|
|
gatherer = None
|
|
|
|
@asyncio.coroutine
|
|
def outer():
|
|
nonlocal proof, gatherer
|
|
gatherer = asyncio.gather(child1, child2, loop=self.one_loop)
|
|
yield from gatherer
|
|
proof += 100
|
|
|
|
f = asyncio.ensure_future(outer(), loop=self.one_loop)
|
|
test_utils.run_briefly(self.one_loop)
|
|
self.assertTrue(f.cancel())
|
|
with self.assertRaises(asyncio.CancelledError):
|
|
self.one_loop.run_until_complete(f)
|
|
self.assertFalse(gatherer.cancel())
|
|
self.assertTrue(waiter.cancelled())
|
|
self.assertTrue(child1.cancelled())
|
|
self.assertTrue(child2.cancelled())
|
|
test_utils.run_briefly(self.one_loop)
|
|
self.assertEqual(proof, 0)
|
|
|
|
def test_exception_marking(self):
|
|
# Test for the first line marked "Mark exception retrieved."
|
|
|
|
@asyncio.coroutine
|
|
def inner(f):
|
|
yield from f
|
|
raise RuntimeError('should not be ignored')
|
|
|
|
a = asyncio.Future(loop=self.one_loop)
|
|
b = asyncio.Future(loop=self.one_loop)
|
|
|
|
@asyncio.coroutine
|
|
def outer():
|
|
yield from asyncio.gather(inner(a), inner(b), loop=self.one_loop)
|
|
|
|
f = asyncio.ensure_future(outer(), loop=self.one_loop)
|
|
test_utils.run_briefly(self.one_loop)
|
|
a.set_result(None)
|
|
test_utils.run_briefly(self.one_loop)
|
|
b.set_result(None)
|
|
test_utils.run_briefly(self.one_loop)
|
|
self.assertIsInstance(f.exception(), RuntimeError)
|
|
|
|
|
|
class RunCoroutineThreadsafeTests(test_utils.TestCase):
|
|
"""Test case for asyncio.run_coroutine_threadsafe."""
|
|
|
|
def setUp(self):
|
|
self.loop = asyncio.new_event_loop()
|
|
self.set_event_loop(self.loop) # Will cleanup properly
|
|
|
|
@asyncio.coroutine
|
|
def add(self, a, b, fail=False, cancel=False):
|
|
"""Wait 0.05 second and return a + b."""
|
|
yield from asyncio.sleep(0.05, loop=self.loop)
|
|
if fail:
|
|
raise RuntimeError("Fail!")
|
|
if cancel:
|
|
asyncio.tasks.Task.current_task(self.loop).cancel()
|
|
yield
|
|
return a + b
|
|
|
|
def target(self, fail=False, cancel=False, timeout=None,
|
|
advance_coro=False):
|
|
"""Run add coroutine in the event loop."""
|
|
coro = self.add(1, 2, fail=fail, cancel=cancel)
|
|
future = asyncio.run_coroutine_threadsafe(coro, self.loop)
|
|
if advance_coro:
|
|
# this is for test_run_coroutine_threadsafe_task_factory_exception;
|
|
# otherwise it spills errors and breaks **other** unittests, since
|
|
# 'target' is interacting with threads.
|
|
|
|
# With this call, `coro` will be advanced, so that
|
|
# CoroWrapper.__del__ won't do anything when asyncio tests run
|
|
# in debug mode.
|
|
self.loop.call_soon_threadsafe(coro.send, None)
|
|
try:
|
|
return future.result(timeout)
|
|
finally:
|
|
future.done() or future.cancel()
|
|
|
|
def test_run_coroutine_threadsafe(self):
|
|
"""Test coroutine submission from a thread to an event loop."""
|
|
future = self.loop.run_in_executor(None, self.target)
|
|
result = self.loop.run_until_complete(future)
|
|
self.assertEqual(result, 3)
|
|
|
|
def test_run_coroutine_threadsafe_with_exception(self):
|
|
"""Test coroutine submission from a thread to an event loop
|
|
when an exception is raised."""
|
|
future = self.loop.run_in_executor(None, self.target, True)
|
|
with self.assertRaises(RuntimeError) as exc_context:
|
|
self.loop.run_until_complete(future)
|
|
self.assertIn("Fail!", exc_context.exception.args)
|
|
|
|
def test_run_coroutine_threadsafe_with_timeout(self):
|
|
"""Test coroutine submission from a thread to an event loop
|
|
when a timeout is raised."""
|
|
callback = lambda: self.target(timeout=0)
|
|
future = self.loop.run_in_executor(None, callback)
|
|
with self.assertRaises(asyncio.TimeoutError):
|
|
self.loop.run_until_complete(future)
|
|
test_utils.run_briefly(self.loop)
|
|
# Check that there's no pending task (add has been cancelled)
|
|
for task in asyncio.Task.all_tasks(self.loop):
|
|
self.assertTrue(task.done())
|
|
|
|
def test_run_coroutine_threadsafe_task_cancelled(self):
|
|
"""Test coroutine submission from a tread to an event loop
|
|
when the task is cancelled."""
|
|
callback = lambda: self.target(cancel=True)
|
|
future = self.loop.run_in_executor(None, callback)
|
|
with self.assertRaises(asyncio.CancelledError):
|
|
self.loop.run_until_complete(future)
|
|
|
|
def test_run_coroutine_threadsafe_task_factory_exception(self):
|
|
"""Test coroutine submission from a tread to an event loop
|
|
when the task factory raise an exception."""
|
|
# Schedule the target
|
|
future = self.loop.run_in_executor(
|
|
None, lambda: self.target(advance_coro=True))
|
|
# Set corrupted task factory
|
|
self.loop.set_task_factory(lambda loop, coro: wrong_name)
|
|
# Set exception handler
|
|
callback = test_utils.MockCallback()
|
|
self.loop.set_exception_handler(callback)
|
|
# Run event loop
|
|
with self.assertRaises(NameError) as exc_context:
|
|
self.loop.run_until_complete(future)
|
|
# Check exceptions
|
|
self.assertIn('wrong_name', exc_context.exception.args[0])
|
|
self.assertEqual(len(callback.call_args_list), 1)
|
|
(loop, context), kwargs = callback.call_args
|
|
self.assertEqual(context['exception'], exc_context.exception)
|
|
|
|
|
|
class SleepTests(test_utils.TestCase):
|
|
def setUp(self):
|
|
self.loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(None)
|
|
|
|
def test_sleep_zero(self):
|
|
result = 0
|
|
|
|
def inc_result(num):
|
|
nonlocal result
|
|
result += num
|
|
|
|
@asyncio.coroutine
|
|
def coro():
|
|
self.loop.call_soon(inc_result, 1)
|
|
self.assertEqual(result, 0)
|
|
num = yield from asyncio.sleep(0, loop=self.loop, result=10)
|
|
self.assertEqual(result, 1) # inc'ed by call_soon
|
|
inc_result(num) # num should be 11
|
|
|
|
self.loop.run_until_complete(coro())
|
|
self.assertEqual(result, 11)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|