2015-05-13 15:10:38 -03:00
|
|
|
"""Tests support for new syntax introduced by PEP 492."""
|
|
|
|
|
2015-05-13 16:34:12 -03:00
|
|
|
import collections.abc
|
2015-06-24 12:44:51 -03:00
|
|
|
import types
|
2015-05-13 15:10:38 -03:00
|
|
|
import unittest
|
2015-05-13 16:34:12 -03:00
|
|
|
|
|
|
|
from test import support
|
2015-05-13 15:10:38 -03:00
|
|
|
from unittest import mock
|
|
|
|
|
|
|
|
import asyncio
|
|
|
|
from asyncio import test_utils
|
|
|
|
|
|
|
|
|
|
|
|
class BaseTest(test_utils.TestCase):
    """Shared fixture for the PEP 492 tests in this module.

    Every test gets its own ``asyncio.BaseEventLoop`` instance, stored on
    ``self.loop``, whose ``_process_events`` hook and ``_selector`` are
    replaced by mocks.
    """

    def setUp(self):
        """Build a fresh loop with mocked selector plumbing for one test."""
        # Chain to the parent fixture first so that any per-test setup the
        # base class performs is not silently skipped by this override.
        super().setUp()

        self.loop = asyncio.BaseEventLoop()
        # Swap the loop's event-processing hook and selector for mocks.
        self.loop._process_events = mock.Mock()
        self.loop._selector = mock.Mock()
        # Every select() call on the mocked selector returns an empty tuple.
        self.loop._selector.select.return_value = ()
        # set_event_loop() is not defined here; it is inherited from the
        # base test case.
        self.set_event_loop(self.loop)
|
|
|
|
|
|
|
|
|
|
|
|
class LockTests(BaseTest):
    """Check the asyncio locking primitives as context managers."""

    def _new_primitives(self):
        """Return a fresh Lock, Condition, Semaphore and BoundedSemaphore."""
        factories = (
            asyncio.Lock,
            asyncio.Condition,
            asyncio.Semaphore,
            asyncio.BoundedSemaphore,
        )
        return [factory(loop=self.loop) for factory in factories]

    def test_context_manager_async_with(self):
        # Each primitive must be held inside ``async with`` and free again
        # once the block is left.
        async def exercise(primitive):
            await asyncio.sleep(0.01, loop=self.loop)
            self.assertFalse(primitive.locked())
            async with primitive as bound:
                # The name bound by ``as`` is asserted to be None.
                self.assertIs(bound, None)
                self.assertTrue(primitive.locked())
                await asyncio.sleep(0.01, loop=self.loop)
                self.assertTrue(primitive.locked())
            self.assertFalse(primitive.locked())

        for candidate in self._new_primitives():
            self.loop.run_until_complete(exercise(candidate))
            self.assertFalse(candidate.locked())

    def test_context_manager_with_await(self):
        # Same scenario as above, using the ``with await`` spelling.
        async def exercise(primitive):
            await asyncio.sleep(0.01, loop=self.loop)
            self.assertFalse(primitive.locked())
            with await primitive as bound:
                # The name bound by ``as`` is asserted to be None.
                self.assertIs(bound, None)
                self.assertTrue(primitive.locked())
                await asyncio.sleep(0.01, loop=self.loop)
                self.assertTrue(primitive.locked())
            self.assertFalse(primitive.locked())

        for candidate in self._new_primitives():
            self.loop.run_until_complete(exercise(candidate))
            self.assertFalse(candidate.locked())
|
|
|
|
|
|
|
|
|
2015-05-13 15:23:29 -03:00
|
|
|
class StreamReaderTests(BaseTest):
    """Check that ``asyncio.StreamReader`` can be consumed with ``async for``."""

    def test_readline(self):
        # Feed three lines (the last one without a trailing newline), signal
        # EOF, then collect whatever ``async for`` yields from the reader.
        payload = b'line1\nline2\nline3'
        expected = [b'line1\n', b'line2\n', b'line3']

        source = asyncio.StreamReader(loop=self.loop)
        source.feed_data(payload)
        source.feed_eof()

        async def drain():
            collected = []
            async for chunk in source:
                collected.append(chunk)
            return collected

        received = self.loop.run_until_complete(drain())
        self.assertEqual(received, expected)
|
|
|
|
|
|
|
|
|
2015-05-13 16:34:12 -03:00
|
|
|
class CoroutineTests(BaseTest):
    """Tests for how asyncio treats ``async def`` coroutines and awaitables.

    Several assertions below match on the inner function name ``foo`` (in
    reprs, warning messages and code-object names), so those names must not
    be changed.
    """

    def test_iscoroutine(self):
        # A coroutine object created by calling an ``async def`` function
        # must be recognised by asyncio.iscoroutine().
        async def foo(): pass

        f = foo()
        try:
            self.assertTrue(asyncio.iscoroutine(f))
        finally:
            f.close() # silence warning

        # Test that asyncio.iscoroutine() uses collections.abc.Coroutine
        # FakeCoro inherits from nothing; it only defines send(), throw(),
        # close() and __await__().
        class FakeCoro:
            def send(self, value): pass
            def throw(self, typ, val=None, tb=None): pass
            def close(self): pass
            def __await__(self): yield

        self.assertTrue(asyncio.iscoroutine(FakeCoro()))

    def test_iscoroutinefunction(self):
        # An ``async def`` function must be reported as a coroutine function.
        async def foo(): pass
        self.assertTrue(asyncio.iscoroutinefunction(foo))

    def test_function_returning_awaitable(self):
        # A plain function decorated with @asyncio.coroutine returns an
        # object whose __await__() returns a one-element tuple.
        class Awaitable:
            def __await__(self):
                return ('spam',)

        @asyncio.coroutine
        def func():
            return Awaitable()

        coro = func()
        # The first send() is expected to produce the tuple's only element.
        self.assertEqual(coro.send(None), 'spam')
        coro.close()

    def test_async_def_coroutines(self):
        # One ``async def`` coroutine awaiting another must return the inner
        # result, both before and after debug mode is enabled on the loop.
        async def bar():
            return 'spam'
        async def foo():
            return await bar()

        # production mode
        data = self.loop.run_until_complete(foo())
        self.assertEqual(data, 'spam')

        # debug mode
        self.loop.set_debug(True)
        data = self.loop.run_until_complete(foo())
        self.assertEqual(data, 'spam')

    # m_log is the mock that replaces asyncio.coroutines.logger for the
    # duration of this test.
    @mock.patch('asyncio.coroutines.logger')
    def test_async_def_wrapped(self, m_log):
        async def foo():
            pass
        async def start():
            foo_coro = foo()
            # With debug enabled on the loop (set below, before start() is
            # run), the repr is expected to be that of a CoroWrapper that
            # names ``foo`` and this file.
            self.assertRegex(
                repr(foo_coro),
                r'<CoroWrapper .*\.foo\(\) running at .*pep492.*>')

            with support.check_warnings((r'.*foo.*was never',
                                         RuntimeWarning)):
                # Drop the only reference held here and force a collection;
                # a RuntimeWarning and a logged error are both expected for
                # the coroutine that was never awaited.
                foo_coro = None
                support.gc_collect()
                self.assertTrue(m_log.error.called)
                # First positional argument of the most recent error() call.
                message = m_log.error.call_args[0][0]
                self.assertRegex(message,
                                 r'CoroWrapper.*foo.*was never')

        self.loop.set_debug(True)
        self.loop.run_until_complete(start())

        # ``start`` is deliberately redefined: the second scenario wraps the
        # coroutine in a task and checks the task's repr instead.
        async def start():
            foo_coro = foo()
            task = asyncio.ensure_future(foo_coro, loop=self.loop)
            self.assertRegex(repr(task), r'Task.*foo.*running')

        self.loop.run_until_complete(start())


    def test_types_coroutine(self):
        # A generator that yields nothing and returns 'spam'.
        def gen():
            yield from ()
            return 'spam'

        # func() is a regular function returning a generator object, wrapped
        # by @types.coroutine.
        @types.coroutine
        def func():
            return gen()

        async def coro():
            wrapper = func()
            # NOTE: this checks against a private name in the types module.
            self.assertIsInstance(wrapper, types._GeneratorWrapper)
            return await wrapper

        data = self.loop.run_until_complete(coro())
        self.assertEqual(data, 'spam')

    def test_task_print_stack(self):
        # Despite the name, this test calls Task.get_stack(), not
        # print_stack().
        # T is rebound by runner() through ``nonlocal`` so that foo() can
        # reach the task that is running it.
        T = None

        async def foo():
            f = T.get_stack(limit=1)
            try:
                # The single returned frame must belong to foo() itself.
                self.assertEqual(f[0].f_code.co_name, 'foo')
            finally:
                # Presumably drops the frame references to avoid a reference
                # cycle -- TODO confirm.
                f = None

        async def runner():
            nonlocal T
            T = asyncio.ensure_future(foo(), loop=self.loop)
            await T

        self.loop.run_until_complete(runner())
|
|
|
|
|
2015-06-24 12:44:51 -03:00
|
|
|
|
2015-05-13 15:10:38 -03:00
|
|
|
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
|