asyncio, Tulip issue 177: Rewite repr() of Future, Task, Handle and TimerHandle
- Uniformize repr() output to format "<Class ...>" - On Python 3.5+, repr(Task) uses the qualified name instead of the short name of the coroutine
This commit is contained in:
parent
65c623de74
commit
975735f729
|
@ -18,6 +18,7 @@ import sys
|
|||
|
||||
_PY34 = sys.version_info >= (3, 4)
|
||||
|
||||
|
||||
def _get_function_source(func):
|
||||
if _PY34:
|
||||
func = inspect.unwrap(func)
|
||||
|
@ -33,6 +34,35 @@ def _get_function_source(func):
|
|||
return None
|
||||
|
||||
|
||||
def _format_args(args):
|
||||
# function formatting ('hello',) as ('hello')
|
||||
args_repr = repr(args)
|
||||
if len(args) == 1 and args_repr.endswith(',)'):
|
||||
args_repr = args_repr[:-2] + ')'
|
||||
return args_repr
|
||||
|
||||
|
||||
def _format_callback(func, args, suffix=''):
|
||||
if isinstance(func, functools.partial):
|
||||
if args is not None:
|
||||
suffix = _format_args(args) + suffix
|
||||
return _format_callback(func.func, func.args, suffix)
|
||||
|
||||
func_repr = getattr(func, '__qualname__', None)
|
||||
if not func_repr:
|
||||
func_repr = repr(func)
|
||||
|
||||
if args is not None:
|
||||
func_repr += _format_args(args)
|
||||
if suffix:
|
||||
func_repr += suffix
|
||||
|
||||
source = _get_function_source(func)
|
||||
if source:
|
||||
func_repr += ' at %s:%s' % source
|
||||
return func_repr
|
||||
|
||||
|
||||
class Handle:
|
||||
"""Object returned by callback registration methods."""
|
||||
|
||||
|
@ -46,18 +76,11 @@ class Handle:
|
|||
self._cancelled = False
|
||||
|
||||
def __repr__(self):
|
||||
cb_repr = getattr(self._callback, '__qualname__', None)
|
||||
if not cb_repr:
|
||||
cb_repr = str(self._callback)
|
||||
|
||||
source = _get_function_source(self._callback)
|
||||
if source:
|
||||
cb_repr += ' at %s:%s' % source
|
||||
|
||||
res = 'Handle({}, {})'.format(cb_repr, self._args)
|
||||
info = []
|
||||
if self._cancelled:
|
||||
res += '<cancelled>'
|
||||
return res
|
||||
info.append('cancelled')
|
||||
info.append(_format_callback(self._callback, self._args))
|
||||
return '<%s %s>' % (self.__class__.__name__, ' '.join(info))
|
||||
|
||||
def cancel(self):
|
||||
self._cancelled = True
|
||||
|
@ -88,13 +111,12 @@ class TimerHandle(Handle):
|
|||
self._when = when
|
||||
|
||||
def __repr__(self):
|
||||
res = 'TimerHandle({}, {}, {})'.format(self._when,
|
||||
self._callback,
|
||||
self._args)
|
||||
info = []
|
||||
if self._cancelled:
|
||||
res += '<cancelled>'
|
||||
|
||||
return res
|
||||
info.append('cancelled')
|
||||
info.append('when=%s' % self._when)
|
||||
info.append(_format_callback(self._callback, self._args))
|
||||
return '<%s %s>' % (self.__class__.__name__, ' '.join(info))
|
||||
|
||||
def __hash__(self):
|
||||
return hash(self._when)
|
||||
|
|
|
@ -150,24 +150,40 @@ class Future:
|
|||
self._loop = loop
|
||||
self._callbacks = []
|
||||
|
||||
def _format_callbacks(self):
|
||||
cb = self._callbacks
|
||||
size = len(cb)
|
||||
if not size:
|
||||
cb = ''
|
||||
|
||||
def format_cb(callback):
|
||||
return events._format_callback(callback, ())
|
||||
|
||||
if size == 1:
|
||||
cb = format_cb(cb[0])
|
||||
elif size == 2:
|
||||
cb = '{}, {}'.format(format_cb(cb[0]), format_cb(cb[1]))
|
||||
elif size > 2:
|
||||
cb = '{}, <{} more>, {}'.format(format_cb(cb[0]),
|
||||
size-2,
|
||||
format_cb(cb[-1]))
|
||||
return 'cb=[%s]' % cb
|
||||
|
||||
def _format_result(self):
|
||||
if self._state != _FINISHED:
|
||||
return None
|
||||
elif self._exception is not None:
|
||||
return 'exception={!r}'.format(self._exception)
|
||||
else:
|
||||
return 'result={!r}'.format(self._result)
|
||||
|
||||
def __repr__(self):
|
||||
res = self.__class__.__name__
|
||||
info = [self._state.lower()]
|
||||
if self._state == _FINISHED:
|
||||
if self._exception is not None:
|
||||
res += '<exception={!r}>'.format(self._exception)
|
||||
else:
|
||||
res += '<result={!r}>'.format(self._result)
|
||||
elif self._callbacks:
|
||||
size = len(self._callbacks)
|
||||
if size > 2:
|
||||
res += '<{}, [{}, <{} more>, {}]>'.format(
|
||||
self._state, self._callbacks[0],
|
||||
size-2, self._callbacks[-1])
|
||||
else:
|
||||
res += '<{}, {}>'.format(self._state, self._callbacks)
|
||||
else:
|
||||
res += '<{}>'.format(self._state)
|
||||
return res
|
||||
info.append(self._format_result())
|
||||
if self._callbacks:
|
||||
info.append(self._format_callbacks())
|
||||
return '<%s %s>' % (self.__class__.__name__, ' '.join(info))
|
||||
|
||||
# On Python 3.3 or older, objects with a destructor part of a reference
|
||||
# cycle are never destroyed. It's not more the case on Python 3.4 thanks to
|
||||
|
|
|
@ -132,6 +132,22 @@ def iscoroutine(obj):
|
|||
return isinstance(obj, CoroWrapper) or inspect.isgenerator(obj)
|
||||
|
||||
|
||||
def _format_coroutine(coro):
|
||||
assert iscoroutine(coro)
|
||||
if _PY35:
|
||||
coro_name = coro.__qualname__
|
||||
else:
|
||||
coro_name = coro.__name__
|
||||
|
||||
filename = coro.gi_code.co_filename
|
||||
if coro.gi_frame is not None:
|
||||
lineno = coro.gi_frame.f_lineno
|
||||
return '%s() at %s:%s' % (coro_name, filename, lineno)
|
||||
else:
|
||||
lineno = coro.gi_code.co_firstlineno
|
||||
return '%s() done at %s:%s' % (coro_name, filename, lineno)
|
||||
|
||||
|
||||
class Task(futures.Future):
|
||||
"""A coroutine wrapped in a Future."""
|
||||
|
||||
|
@ -195,26 +211,21 @@ class Task(futures.Future):
|
|||
futures.Future.__del__(self)
|
||||
|
||||
def __repr__(self):
|
||||
res = super().__repr__()
|
||||
if (self._must_cancel and
|
||||
self._state == futures._PENDING and
|
||||
'<PENDING' in res):
|
||||
res = res.replace('<PENDING', '<CANCELLING', 1)
|
||||
i = res.find('<')
|
||||
if i < 0:
|
||||
i = len(res)
|
||||
text = self._coro.__name__
|
||||
coro = self._coro
|
||||
if iscoroutine(coro):
|
||||
filename = coro.gi_code.co_filename
|
||||
if coro.gi_frame is not None:
|
||||
lineno = coro.gi_frame.f_lineno
|
||||
text += ' at %s:%s' % (filename, lineno)
|
||||
info = []
|
||||
if self._must_cancel:
|
||||
info.append('cancelling')
|
||||
else:
|
||||
lineno = coro.gi_code.co_firstlineno
|
||||
text += ' done at %s:%s' % (filename, lineno)
|
||||
res = res[:i] + '(<{}>)'.format(text) + res[i:]
|
||||
return res
|
||||
info.append(self._state.lower())
|
||||
|
||||
info.append(_format_coroutine(self._coro))
|
||||
|
||||
if self._state == futures._FINISHED:
|
||||
info.append(self._format_result())
|
||||
|
||||
if self._callbacks:
|
||||
info.append(self._format_callbacks())
|
||||
|
||||
return '<%s %s>' % (self.__class__.__name__, ' '.join(info))
|
||||
|
||||
def get_stack(self, *, limit=None):
|
||||
"""Return the list of stack frames for this task's coroutine.
|
||||
|
|
|
@ -1016,14 +1016,14 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase):
|
|||
self.loop.run_forever()
|
||||
fmt, *args = m_logger.warning.call_args[0]
|
||||
self.assertRegex(fmt % tuple(args),
|
||||
"^Executing Handle.*stop_loop_cb.* took .* seconds$")
|
||||
"^Executing <Handle.*stop_loop_cb.*> took .* seconds$")
|
||||
|
||||
# slow task
|
||||
asyncio.async(stop_loop_coro(self.loop), loop=self.loop)
|
||||
self.loop.run_forever()
|
||||
fmt, *args = m_logger.warning.call_args[0]
|
||||
self.assertRegex(fmt % tuple(args),
|
||||
"^Executing Task.*stop_loop_coro.* took .* seconds$")
|
||||
"^Executing <Task.*stop_loop_coro.*> took .* seconds$")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
|
@ -1747,7 +1747,7 @@ else:
|
|||
return asyncio.SelectorEventLoop(selectors.SelectSelector())
|
||||
|
||||
|
||||
def noop():
|
||||
def noop(*args):
|
||||
pass
|
||||
|
||||
|
||||
|
@ -1797,50 +1797,52 @@ class HandleTests(unittest.TestCase):
|
|||
h = asyncio.Handle(lambda: None, (), self.loop)
|
||||
wd['h'] = h # Would fail without __weakref__ slot.
|
||||
|
||||
def test_repr(self):
|
||||
def test_handle_repr(self):
|
||||
# simple function
|
||||
h = asyncio.Handle(noop, (), self.loop)
|
||||
src = test_utils.get_function_source(noop)
|
||||
self.assertEqual(repr(h),
|
||||
'Handle(noop at %s:%s, ())' % src)
|
||||
'<Handle noop() at %s:%s>' % src)
|
||||
|
||||
# cancelled handle
|
||||
h.cancel()
|
||||
self.assertEqual(repr(h),
|
||||
'Handle(noop at %s:%s, ())<cancelled>' % src)
|
||||
'<Handle cancelled noop() at %s:%s>' % src)
|
||||
|
||||
# decorated function
|
||||
cb = asyncio.coroutine(noop)
|
||||
h = asyncio.Handle(cb, (), self.loop)
|
||||
self.assertEqual(repr(h),
|
||||
'Handle(noop at %s:%s, ())' % src)
|
||||
'<Handle noop() at %s:%s>' % src)
|
||||
|
||||
# partial function
|
||||
cb = functools.partial(noop)
|
||||
h = asyncio.Handle(cb, (), self.loop)
|
||||
cb = functools.partial(noop, 1, 2)
|
||||
h = asyncio.Handle(cb, (3,), self.loop)
|
||||
filename, lineno = src
|
||||
regex = (r'^Handle\(functools.partial\('
|
||||
r'<function noop .*>\) at %s:%s, '
|
||||
r'\(\)\)$' % (re.escape(filename), lineno))
|
||||
regex = (r'^<Handle noop\(1, 2\)\(3\) at %s:%s>$'
|
||||
% (re.escape(filename), lineno))
|
||||
self.assertRegex(repr(h), regex)
|
||||
|
||||
# partial method
|
||||
if sys.version_info >= (3, 4):
|
||||
method = HandleTests.test_repr
|
||||
method = HandleTests.test_handle_repr
|
||||
cb = functools.partialmethod(method)
|
||||
src = test_utils.get_function_source(method)
|
||||
h = asyncio.Handle(cb, (), self.loop)
|
||||
|
||||
filename, lineno = src
|
||||
regex = (r'^Handle\(functools.partialmethod\('
|
||||
r'<function HandleTests.test_repr .*>, , \) at %s:%s, '
|
||||
r'\(\)\)$' % (re.escape(filename), lineno))
|
||||
cb_regex = r'<function HandleTests.test_handle_repr .*>'
|
||||
cb_regex = (r'functools.partialmethod\(%s, , \)\(\)' % cb_regex)
|
||||
regex = (r'^<Handle %s at %s:%s>$'
|
||||
% (cb_regex, re.escape(filename), lineno))
|
||||
self.assertRegex(repr(h), regex)
|
||||
|
||||
|
||||
|
||||
class TimerTests(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.loop = mock.Mock()
|
||||
|
||||
def test_hash(self):
|
||||
when = time.monotonic()
|
||||
h = asyncio.TimerHandle(when, lambda: False, (),
|
||||
|
@ -1858,29 +1860,37 @@ class TimerTests(unittest.TestCase):
|
|||
self.assertIs(h._args, args)
|
||||
self.assertFalse(h._cancelled)
|
||||
|
||||
r = repr(h)
|
||||
self.assertTrue(r.endswith('())'))
|
||||
|
||||
# cancel
|
||||
h.cancel()
|
||||
self.assertTrue(h._cancelled)
|
||||
|
||||
r = repr(h)
|
||||
self.assertTrue(r.endswith('())<cancelled>'), r)
|
||||
|
||||
# when cannot be None
|
||||
self.assertRaises(AssertionError,
|
||||
asyncio.TimerHandle, None, callback, args,
|
||||
mock.Mock())
|
||||
self.loop)
|
||||
|
||||
def test_timer_repr(self):
|
||||
# simple function
|
||||
h = asyncio.TimerHandle(123, noop, (), self.loop)
|
||||
src = test_utils.get_function_source(noop)
|
||||
self.assertEqual(repr(h),
|
||||
'<TimerHandle when=123 noop() at %s:%s>' % src)
|
||||
|
||||
# cancelled handle
|
||||
h.cancel()
|
||||
self.assertEqual(repr(h),
|
||||
'<TimerHandle cancelled when=123 noop() at %s:%s>'
|
||||
% src)
|
||||
|
||||
def test_timer_comparison(self):
|
||||
loop = mock.Mock()
|
||||
|
||||
def callback(*args):
|
||||
return args
|
||||
|
||||
when = time.monotonic()
|
||||
|
||||
h1 = asyncio.TimerHandle(when, callback, (), loop)
|
||||
h2 = asyncio.TimerHandle(when, callback, (), loop)
|
||||
h1 = asyncio.TimerHandle(when, callback, (), self.loop)
|
||||
h2 = asyncio.TimerHandle(when, callback, (), self.loop)
|
||||
# TODO: Use assertLess etc.
|
||||
self.assertFalse(h1 < h2)
|
||||
self.assertFalse(h2 < h1)
|
||||
|
@ -1896,8 +1906,8 @@ class TimerTests(unittest.TestCase):
|
|||
h2.cancel()
|
||||
self.assertFalse(h1 == h2)
|
||||
|
||||
h1 = asyncio.TimerHandle(when, callback, (), loop)
|
||||
h2 = asyncio.TimerHandle(when + 10.0, callback, (), loop)
|
||||
h1 = asyncio.TimerHandle(when, callback, (), self.loop)
|
||||
h2 = asyncio.TimerHandle(when + 10.0, callback, (), self.loop)
|
||||
self.assertTrue(h1 < h2)
|
||||
self.assertFalse(h2 < h1)
|
||||
self.assertTrue(h1 <= h2)
|
||||
|
@ -1909,7 +1919,7 @@ class TimerTests(unittest.TestCase):
|
|||
self.assertFalse(h1 == h2)
|
||||
self.assertTrue(h1 != h2)
|
||||
|
||||
h3 = asyncio.Handle(callback, (), loop)
|
||||
h3 = asyncio.Handle(callback, (), self.loop)
|
||||
self.assertIs(NotImplemented, h1.__eq__(h3))
|
||||
self.assertIs(NotImplemented, h1.__ne__(h3))
|
||||
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
"""Tests for futures.py."""
|
||||
|
||||
import concurrent.futures
|
||||
import re
|
||||
import threading
|
||||
import unittest
|
||||
from unittest import mock
|
||||
|
@ -12,6 +13,12 @@ from asyncio import test_utils
|
|||
def _fakefunc(f):
|
||||
return f
|
||||
|
||||
def first_cb():
|
||||
pass
|
||||
|
||||
def last_cb():
|
||||
pass
|
||||
|
||||
|
||||
class FutureTests(test_utils.TestCase):
|
||||
|
||||
|
@ -95,39 +102,60 @@ class FutureTests(test_utils.TestCase):
|
|||
# The second "yield from f" does not yield f.
|
||||
self.assertEqual(next(g), ('C', 42)) # yield 'C', y.
|
||||
|
||||
def test_repr(self):
|
||||
def test_future_repr(self):
|
||||
f_pending = asyncio.Future(loop=self.loop)
|
||||
self.assertEqual(repr(f_pending), 'Future<PENDING>')
|
||||
self.assertEqual(repr(f_pending), '<Future pending>')
|
||||
f_pending.cancel()
|
||||
|
||||
f_cancelled = asyncio.Future(loop=self.loop)
|
||||
f_cancelled.cancel()
|
||||
self.assertEqual(repr(f_cancelled), 'Future<CANCELLED>')
|
||||
self.assertEqual(repr(f_cancelled), '<Future cancelled>')
|
||||
|
||||
f_result = asyncio.Future(loop=self.loop)
|
||||
f_result.set_result(4)
|
||||
self.assertEqual(repr(f_result), 'Future<result=4>')
|
||||
self.assertEqual(repr(f_result), '<Future finished result=4>')
|
||||
self.assertEqual(f_result.result(), 4)
|
||||
|
||||
exc = RuntimeError()
|
||||
f_exception = asyncio.Future(loop=self.loop)
|
||||
f_exception.set_exception(exc)
|
||||
self.assertEqual(repr(f_exception), 'Future<exception=RuntimeError()>')
|
||||
self.assertEqual(repr(f_exception), '<Future finished exception=RuntimeError()>')
|
||||
self.assertIs(f_exception.exception(), exc)
|
||||
|
||||
f_few_callbacks = asyncio.Future(loop=self.loop)
|
||||
f_few_callbacks.add_done_callback(_fakefunc)
|
||||
self.assertIn('Future<PENDING, [<function _fakefunc',
|
||||
repr(f_few_callbacks))
|
||||
f_few_callbacks.cancel()
|
||||
def func_repr(func):
|
||||
filename, lineno = test_utils.get_function_source(func)
|
||||
text = '%s() at %s:%s' % (func.__qualname__, filename, lineno)
|
||||
return re.escape(text)
|
||||
|
||||
f_one_callbacks = asyncio.Future(loop=self.loop)
|
||||
f_one_callbacks.add_done_callback(_fakefunc)
|
||||
fake_repr = func_repr(_fakefunc)
|
||||
self.assertRegex(repr(f_one_callbacks),
|
||||
r'<Future pending cb=\[%s\]>' % fake_repr)
|
||||
f_one_callbacks.cancel()
|
||||
self.assertEqual(repr(f_one_callbacks),
|
||||
'<Future cancelled>')
|
||||
|
||||
f_two_callbacks = asyncio.Future(loop=self.loop)
|
||||
f_two_callbacks.add_done_callback(first_cb)
|
||||
f_two_callbacks.add_done_callback(last_cb)
|
||||
first_repr = func_repr(first_cb)
|
||||
last_repr = func_repr(last_cb)
|
||||
self.assertRegex(repr(f_two_callbacks),
|
||||
r'<Future pending cb=\[%s, %s\]>'
|
||||
% (first_repr, last_repr))
|
||||
|
||||
f_many_callbacks = asyncio.Future(loop=self.loop)
|
||||
for i in range(20):
|
||||
f_many_callbacks.add_done_callback(first_cb)
|
||||
for i in range(8):
|
||||
f_many_callbacks.add_done_callback(_fakefunc)
|
||||
r = repr(f_many_callbacks)
|
||||
self.assertIn('Future<PENDING, [<function _fakefunc', r)
|
||||
self.assertIn('<18 more>', r)
|
||||
f_many_callbacks.add_done_callback(last_cb)
|
||||
cb_regex = r'%s, <8 more>, %s' % (first_repr, last_repr)
|
||||
self.assertRegex(repr(f_many_callbacks),
|
||||
r'<Future pending cb=\[%s\]>' % cb_regex)
|
||||
f_many_callbacks.cancel()
|
||||
self.assertEqual(repr(f_many_callbacks),
|
||||
'<Future cancelled>')
|
||||
|
||||
def test_copy_state(self):
|
||||
# Test the internal _copy_state method since it's being directly
|
||||
|
|
|
@ -26,7 +26,7 @@ def coroutine_function():
|
|||
class Dummy:
|
||||
|
||||
def __repr__(self):
|
||||
return 'Dummy()'
|
||||
return '<Dummy>'
|
||||
|
||||
def __call__(self, *args):
|
||||
pass
|
||||
|
@ -122,6 +122,7 @@ class TaskTests(test_utils.TestCase):
|
|||
yield from []
|
||||
return 'abc'
|
||||
|
||||
# test coroutine function
|
||||
self.assertEqual(notmuch.__name__, 'notmuch')
|
||||
if PY35:
|
||||
self.assertEqual(notmuch.__qualname__,
|
||||
|
@ -131,72 +132,87 @@ class TaskTests(test_utils.TestCase):
|
|||
filename, lineno = test_utils.get_function_source(notmuch)
|
||||
src = "%s:%s" % (filename, lineno)
|
||||
|
||||
# test coroutine object
|
||||
gen = notmuch()
|
||||
if PY35:
|
||||
coro_qualname = 'TaskTests.test_task_repr.<locals>.notmuch'
|
||||
else:
|
||||
coro_qualname = 'notmuch'
|
||||
self.assertEqual(gen.__name__, 'notmuch')
|
||||
if PY35:
|
||||
self.assertEqual(gen.__qualname__,
|
||||
'TaskTests.test_task_repr.<locals>.notmuch')
|
||||
coro_qualname)
|
||||
|
||||
# test pending Task
|
||||
t = asyncio.Task(gen, loop=self.loop)
|
||||
t.add_done_callback(Dummy())
|
||||
coro = '%s() at %s' % (coro_qualname, src)
|
||||
self.assertEqual(repr(t),
|
||||
'Task(<notmuch at %s>)<PENDING, [Dummy()]>' % src)
|
||||
'<Task pending %s cb=[<Dummy>()]>' % coro)
|
||||
|
||||
# test cancelling Task
|
||||
t.cancel() # Does not take immediate effect!
|
||||
self.assertEqual(repr(t),
|
||||
'Task(<notmuch at %s>)<CANCELLING, [Dummy()]>' % src)
|
||||
'<Task cancelling %s cb=[<Dummy>()]>' % coro)
|
||||
|
||||
# test cancelled Task
|
||||
self.assertRaises(asyncio.CancelledError,
|
||||
self.loop.run_until_complete, t)
|
||||
coro = '%s() done at %s' % (coro_qualname, src)
|
||||
self.assertEqual(repr(t),
|
||||
'Task(<notmuch done at %s:%s>)<CANCELLED>'
|
||||
% (filename, lineno))
|
||||
'<Task cancelled %s>' % coro)
|
||||
|
||||
# test finished Task
|
||||
t = asyncio.Task(notmuch(), loop=self.loop)
|
||||
self.loop.run_until_complete(t)
|
||||
self.assertEqual(repr(t),
|
||||
"Task(<notmuch done at %s:%s>)<result='abc'>"
|
||||
% (filename, lineno))
|
||||
"<Task finished %s result='abc'>" % coro)
|
||||
|
||||
def test_task_repr_custom(self):
|
||||
def test_task_repr_coro_decorator(self):
|
||||
@asyncio.coroutine
|
||||
def notmuch():
|
||||
pass
|
||||
# notmuch() function doesn't use yield from: it will be wrapped by
|
||||
# @coroutine decorator
|
||||
return 123
|
||||
|
||||
# test coroutine function
|
||||
self.assertEqual(notmuch.__name__, 'notmuch')
|
||||
self.assertEqual(notmuch.__module__, __name__)
|
||||
if PY35:
|
||||
self.assertEqual(notmuch.__qualname__,
|
||||
'TaskTests.test_task_repr_custom.<locals>.notmuch')
|
||||
|
||||
class T(asyncio.Future):
|
||||
def __repr__(self):
|
||||
return 'T[]'
|
||||
|
||||
class MyTask(asyncio.Task, T):
|
||||
def __repr__(self):
|
||||
return super().__repr__()
|
||||
'TaskTests.test_task_repr_coro_decorator.<locals>.notmuch')
|
||||
self.assertEqual(notmuch.__module__, __name__)
|
||||
|
||||
# test coroutine object
|
||||
gen = notmuch()
|
||||
if PY35 or tasks._DEBUG:
|
||||
if PY35:
|
||||
# On Python >= 3.5, generators now inherit the name of the
|
||||
# function, as expected, and have a qualified name (__qualname__
|
||||
# attribute). In debug mode, @coroutine decorator uses CoroWrapper
|
||||
# which gets its name (__name__ attribute) from the wrapped
|
||||
# coroutine function.
|
||||
# attribute).
|
||||
coro_name = 'notmuch'
|
||||
coro_qualname = 'TaskTests.test_task_repr_coro_decorator.<locals>.notmuch'
|
||||
elif tasks._DEBUG:
|
||||
# In debug mode, @coroutine decorator uses CoroWrapper which gets
|
||||
# its name (__name__ attribute) from the wrapped coroutine
|
||||
# function.
|
||||
coro_name = coro_qualname = 'notmuch'
|
||||
else:
|
||||
# On Python < 3.5, generators inherit the name of the code, not of
|
||||
# the function. See: http://bugs.python.org/issue21205
|
||||
coro_name = 'coro'
|
||||
coro_name = coro_qualname = 'coro'
|
||||
self.assertEqual(gen.__name__, coro_name)
|
||||
if PY35:
|
||||
self.assertEqual(gen.__qualname__,
|
||||
'TaskTests.test_task_repr_custom.<locals>.notmuch')
|
||||
self.assertEqual(gen.__qualname__, coro_qualname)
|
||||
|
||||
t = MyTask(gen, loop=self.loop)
|
||||
filename = gen.gi_code.co_filename
|
||||
lineno = gen.gi_frame.f_lineno
|
||||
self.assertEqual(repr(t), 'T[](<%s at %s:%s>)' % (coro_name, filename, lineno))
|
||||
# format the coroutine object
|
||||
code = gen.gi_code
|
||||
coro = ('%s() at %s:%s'
|
||||
% (coro_qualname, code.co_filename, code.co_firstlineno))
|
||||
|
||||
# test pending Task
|
||||
t = asyncio.Task(gen, loop=self.loop)
|
||||
t.add_done_callback(Dummy())
|
||||
self.assertEqual(repr(t),
|
||||
'<Task pending %s cb=[<Dummy>()]>' % coro)
|
||||
|
||||
def test_task_basics(self):
|
||||
@asyncio.coroutine
|
||||
|
|
Loading…
Reference in New Issue