2015-05-13 15:10:38 -03:00
|
|
|
"""Tests support for new syntax introduced by PEP 492."""
|
|
|
|
|
2015-05-13 16:34:12 -03:00
|
|
|
import collections.abc
|
2015-05-13 15:10:38 -03:00
|
|
|
import unittest
|
2015-05-13 16:34:12 -03:00
|
|
|
|
|
|
|
from test import support
|
2015-05-13 15:10:38 -03:00
|
|
|
from unittest import mock
|
|
|
|
|
|
|
|
import asyncio
|
|
|
|
from asyncio import test_utils
|
|
|
|
|
|
|
|
|
|
|
|
class BaseTest(test_utils.TestCase):
    """Shared fixture for the PEP 492 tests.

    Every test gets ``self.loop``: a plain ``asyncio.BaseEventLoop`` whose
    ``_process_events`` hook and ``_selector`` attribute are replaced by
    mocks.
    """

    def setUp(self):
        # Cooperate with the base class: run whatever fixture setup
        # test_utils.TestCase (or unittest.TestCase) defines before
        # building our own state on top of it.
        super().setUp()

        self.loop = asyncio.BaseEventLoop()

        # Stub out the event-processing hook and the selector; the mocked
        # select() always reports an empty tuple of events.
        self.loop._process_events = mock.Mock()
        self.loop._selector = mock.Mock()
        self.loop._selector.select.return_value = ()

        # Register the loop through the helper inherited from the base
        # class (presumably it also arranges cleanup -- defined outside
        # this file, confirm there).
        self.set_event_loop(self.loop)
|
|
|
|
|
|
|
|
|
|
|
|
class LockTests(BaseTest):
    """Check the synchronization primitives as (async) context managers."""

    def _make_primitives(self):
        # One fresh instance of each primitive under test, all bound to
        # the fixture loop.
        return (
            asyncio.Lock(loop=self.loop),
            asyncio.Condition(loop=self.loop),
            asyncio.Semaphore(loop=self.loop),
            asyncio.BoundedSemaphore(loop=self.loop),
        )

    def test_context_manager_async_with(self):
        # ``async with primitive`` must hold the primitive for the whole
        # body (including across a suspension point) and release it on exit.
        async def exercise(primitive):
            await asyncio.sleep(0.01, loop=self.loop)
            self.assertFalse(primitive.locked())

            async with primitive as bound:
                # The ``as`` target is expected to be None.
                self.assertIs(bound, None)
                self.assertTrue(primitive.locked())
                await asyncio.sleep(0.01, loop=self.loop)
                self.assertTrue(primitive.locked())

            self.assertFalse(primitive.locked())

        for candidate in self._make_primitives():
            self.loop.run_until_complete(exercise(candidate))
            self.assertFalse(candidate.locked())

    def test_context_manager_with_await(self):
        # Same scenario as above, but through the ``with await primitive``
        # spelling.
        async def exercise(primitive):
            await asyncio.sleep(0.01, loop=self.loop)
            self.assertFalse(primitive.locked())

            with await primitive as bound:
                # The ``as`` target is expected to be None.
                self.assertIs(bound, None)
                self.assertTrue(primitive.locked())
                await asyncio.sleep(0.01, loop=self.loop)
                self.assertTrue(primitive.locked())

            self.assertFalse(primitive.locked())

        for candidate in self._make_primitives():
            self.loop.run_until_complete(exercise(candidate))
            self.assertFalse(candidate.locked())
|
|
|
|
|
|
|
|
|
2015-05-13 15:23:29 -03:00
|
|
|
class StreamReaderTests(BaseTest):
    """Check ``async for`` iteration over ``asyncio.StreamReader``."""

    def test_readline(self):
        # Feed three lines (the last one without a trailing newline),
        # signal EOF, then drain the reader with ``async for``.
        payload = b'line1\nline2\nline3'
        expected = [b'line1\n', b'line2\n', b'line3']

        source = asyncio.StreamReader(loop=self.loop)
        source.feed_data(payload)
        source.feed_eof()

        async def collect_lines():
            collected = []
            async for chunk in source:
                collected.append(chunk)
            return collected

        received = self.loop.run_until_complete(collect_lines())
        self.assertEqual(received, expected)
|
|
|
|
|
|
|
|
|
2015-05-13 16:34:12 -03:00
|
|
|
class CoroutineTests(BaseTest):
    """Check asyncio's coroutine helpers against PEP 492 objects."""

    def test_iscoroutine(self):
        async def foo():
            pass

        native = foo()
        try:
            self.assertTrue(asyncio.iscoroutine(native))
        finally:
            # Close the never-awaited coroutine object to silence the
            # warning it would otherwise trigger.
            native.close()

        # Test that asyncio.iscoroutine() uses collections.abc.Coroutine:
        # FakeCoro has no base class and only defines send/throw/close
        # and __await__.
        class FakeCoro:
            def send(self, value):
                pass

            def throw(self, typ, val=None, tb=None):
                pass

            def close(self):
                pass

            def __await__(self):
                yield

        self.assertTrue(asyncio.iscoroutine(FakeCoro()))

    def test_function_returning_awaitable(self):
        # A plain function decorated with @asyncio.coroutine that returns
        # an object with __await__: the first send() must produce the
        # first item of what __await__ returned.
        class Awaitable:
            def __await__(self):
                return ('spam',)

        @asyncio.coroutine
        def func():
            return Awaitable()

        wrapped = func()
        first = wrapped.send(None)
        self.assertEqual(first, 'spam')
        wrapped.close()
|
|
|
|
|
2015-05-13 16:34:12 -03:00
|
|
|
|
2015-05-13 15:10:38 -03:00
|
|
|
# Run the tests in this module when it is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
|