bpo-42392: Remove loop parameter form asyncio locks and Queue (#23420)

Co-authored-by: Andrew Svetlov <andrew.svetlov@gmail.com>
This commit is contained in:
Yurii Karabas 2020-11-24 20:08:54 +02:00 committed by GitHub
parent b0b428510c
commit 0ec34cab9d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 304 additions and 451 deletions

View File

@ -3,10 +3,9 @@
__all__ = ('Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore') __all__ = ('Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore')
import collections import collections
import warnings
from . import events
from . import exceptions from . import exceptions
from . import mixins
class _ContextManagerMixin: class _ContextManagerMixin:
@ -20,7 +19,7 @@ class _ContextManagerMixin:
self.release() self.release()
class Lock(_ContextManagerMixin): class Lock(_ContextManagerMixin, mixins._LoopBoundedMixin):
"""Primitive lock objects. """Primitive lock objects.
A primitive lock is a synchronization primitive that is not owned A primitive lock is a synchronization primitive that is not owned
@ -74,16 +73,9 @@ class Lock(_ContextManagerMixin):
""" """
def __init__(self, *, loop=None): def __init__(self):
self._waiters = None self._waiters = None
self._locked = False self._locked = False
if loop is None:
self._loop = events.get_event_loop()
else:
self._loop = loop
warnings.warn("The loop argument is deprecated since Python 3.8, "
"and scheduled for removal in Python 3.10.",
DeprecationWarning, stacklevel=2)
def __repr__(self): def __repr__(self):
res = super().__repr__() res = super().__repr__()
@ -109,7 +101,7 @@ class Lock(_ContextManagerMixin):
if self._waiters is None: if self._waiters is None:
self._waiters = collections.deque() self._waiters = collections.deque()
fut = self._loop.create_future() fut = self._get_loop().create_future()
self._waiters.append(fut) self._waiters.append(fut)
# Finally block should be called before the CancelledError # Finally block should be called before the CancelledError
@ -161,7 +153,7 @@ class Lock(_ContextManagerMixin):
fut.set_result(True) fut.set_result(True)
class Event: class Event(mixins._LoopBoundedMixin):
"""Asynchronous equivalent to threading.Event. """Asynchronous equivalent to threading.Event.
Class implementing event objects. An event manages a flag that can be set Class implementing event objects. An event manages a flag that can be set
@ -170,16 +162,9 @@ class Event:
false. false.
""" """
def __init__(self, *, loop=None): def __init__(self):
self._waiters = collections.deque() self._waiters = collections.deque()
self._value = False self._value = False
if loop is None:
self._loop = events.get_event_loop()
else:
self._loop = loop
warnings.warn("The loop argument is deprecated since Python 3.8, "
"and scheduled for removal in Python 3.10.",
DeprecationWarning, stacklevel=2)
def __repr__(self): def __repr__(self):
res = super().__repr__() res = super().__repr__()
@ -220,7 +205,7 @@ class Event:
if self._value: if self._value:
return True return True
fut = self._loop.create_future() fut = self._get_loop().create_future()
self._waiters.append(fut) self._waiters.append(fut)
try: try:
await fut await fut
@ -229,7 +214,7 @@ class Event:
self._waiters.remove(fut) self._waiters.remove(fut)
class Condition(_ContextManagerMixin): class Condition(_ContextManagerMixin, mixins._LoopBoundedMixin):
"""Asynchronous equivalent to threading.Condition. """Asynchronous equivalent to threading.Condition.
This class implements condition variable objects. A condition variable This class implements condition variable objects. A condition variable
@ -239,18 +224,10 @@ class Condition(_ContextManagerMixin):
A new Lock object is created and used as the underlying lock. A new Lock object is created and used as the underlying lock.
""" """
def __init__(self, lock=None, *, loop=None): def __init__(self, lock=None):
if loop is None:
self._loop = events.get_event_loop()
else:
self._loop = loop
warnings.warn("The loop argument is deprecated since Python 3.8, "
"and scheduled for removal in Python 3.10.",
DeprecationWarning, stacklevel=2)
if lock is None: if lock is None:
lock = Lock(loop=loop) lock = Lock()
elif lock._loop is not self._loop: elif lock._loop is not self._get_loop():
raise ValueError("loop argument must agree with lock") raise ValueError("loop argument must agree with lock")
self._lock = lock self._lock = lock
@ -284,7 +261,7 @@ class Condition(_ContextManagerMixin):
self.release() self.release()
try: try:
fut = self._loop.create_future() fut = self._get_loop().create_future()
self._waiters.append(fut) self._waiters.append(fut)
try: try:
await fut await fut
@ -351,7 +328,7 @@ class Condition(_ContextManagerMixin):
self.notify(len(self._waiters)) self.notify(len(self._waiters))
class Semaphore(_ContextManagerMixin): class Semaphore(_ContextManagerMixin, mixins._LoopBoundedMixin):
"""A Semaphore implementation. """A Semaphore implementation.
A semaphore manages an internal counter which is decremented by each A semaphore manages an internal counter which is decremented by each
@ -366,18 +343,11 @@ class Semaphore(_ContextManagerMixin):
ValueError is raised. ValueError is raised.
""" """
def __init__(self, value=1, *, loop=None): def __init__(self, value=1):
if value < 0: if value < 0:
raise ValueError("Semaphore initial value must be >= 0") raise ValueError("Semaphore initial value must be >= 0")
self._value = value self._value = value
self._waiters = collections.deque() self._waiters = collections.deque()
if loop is None:
self._loop = events.get_event_loop()
else:
self._loop = loop
warnings.warn("The loop argument is deprecated since Python 3.8, "
"and scheduled for removal in Python 3.10.",
DeprecationWarning, stacklevel=2)
def __repr__(self): def __repr__(self):
res = super().__repr__() res = super().__repr__()
@ -407,7 +377,7 @@ class Semaphore(_ContextManagerMixin):
True. True.
""" """
while self._value <= 0: while self._value <= 0:
fut = self._loop.create_future() fut = self._get_loop().create_future()
self._waiters.append(fut) self._waiters.append(fut)
try: try:
await fut await fut
@ -436,14 +406,9 @@ class BoundedSemaphore(Semaphore):
above the initial value. above the initial value.
""" """
def __init__(self, value=1, *, loop=None): def __init__(self, value=1):
if loop:
warnings.warn("The loop argument is deprecated since Python 3.8, "
"and scheduled for removal in Python 3.10.",
DeprecationWarning, stacklevel=2)
self._bound_value = value self._bound_value = value
super().__init__(value, loop=loop) super().__init__(value)
def release(self): def release(self):
if self._value >= self._bound_value: if self._value >= self._bound_value:

21
Lib/asyncio/mixins.py Normal file
View File

@ -0,0 +1,21 @@
"""Event loop mixins."""
import threading
from . import events
_global_lock = threading.Lock()
class _LoopBoundedMixin:
_loop = None
def _get_loop(self):
loop = events._get_running_loop()
if self._loop is None:
with _global_lock:
if self._loop is None:
self._loop = loop
if loop is not self._loop:
raise RuntimeError(f'{type(self).__name__} have already bounded to another loop')
return loop

View File

@ -2,10 +2,9 @@ __all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty')
import collections import collections
import heapq import heapq
import warnings
from . import events
from . import locks from . import locks
from . import mixins
class QueueEmpty(Exception): class QueueEmpty(Exception):
@ -18,7 +17,7 @@ class QueueFull(Exception):
pass pass
class Queue: class Queue(mixins._LoopBoundedMixin):
"""A queue, useful for coordinating producer and consumer coroutines. """A queue, useful for coordinating producer and consumer coroutines.
If maxsize is less than or equal to zero, the queue size is infinite. If it If maxsize is less than or equal to zero, the queue size is infinite. If it
@ -30,14 +29,7 @@ class Queue:
interrupted between calling qsize() and doing an operation on the Queue. interrupted between calling qsize() and doing an operation on the Queue.
""" """
def __init__(self, maxsize=0, *, loop=None): def __init__(self, maxsize=0):
if loop is None:
self._loop = events.get_event_loop()
else:
self._loop = loop
warnings.warn("The loop argument is deprecated since Python 3.8, "
"and scheduled for removal in Python 3.10.",
DeprecationWarning, stacklevel=2)
self._maxsize = maxsize self._maxsize = maxsize
# Futures. # Futures.
@ -45,7 +37,7 @@ class Queue:
# Futures. # Futures.
self._putters = collections.deque() self._putters = collections.deque()
self._unfinished_tasks = 0 self._unfinished_tasks = 0
self._finished = locks.Event(loop=loop) self._finished = locks.Event()
self._finished.set() self._finished.set()
self._init(maxsize) self._init(maxsize)
@ -122,7 +114,7 @@ class Queue:
slot is available before adding item. slot is available before adding item.
""" """
while self.full(): while self.full():
putter = self._loop.create_future() putter = self._get_loop().create_future()
self._putters.append(putter) self._putters.append(putter)
try: try:
await putter await putter
@ -160,7 +152,7 @@ class Queue:
If queue is empty, wait until an item is available. If queue is empty, wait until an item is available.
""" """
while self.empty(): while self.empty():
getter = self._loop.create_future() getter = self._get_loop().create_future()
self._getters.append(getter) self._getters.append(getter)
try: try:
await getter await getter

View File

@ -578,7 +578,7 @@ def as_completed(fs, *, loop=None, timeout=None):
raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}") raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}")
from .queues import Queue # Import here to avoid circular import problem. from .queues import Queue # Import here to avoid circular import problem.
done = Queue(loop=loop) done = Queue()
if loop is None: if loop is None:
loop = events.get_event_loop() loop = events.get_event_loop()

View File

@ -206,8 +206,8 @@ class MySubprocessProtocol(asyncio.SubprocessProtocol):
self.disconnects = {fd: loop.create_future() for fd in range(3)} self.disconnects = {fd: loop.create_future() for fd in range(3)}
self.data = {1: b'', 2: b''} self.data = {1: b'', 2: b''}
self.returncode = None self.returncode = None
self.got_data = {1: asyncio.Event(loop=loop), self.got_data = {1: asyncio.Event(),
2: asyncio.Event(loop=loop)} 2: asyncio.Event()}
def connection_made(self, transport): def connection_made(self, transport):
self.transport = transport self.transport = transport
@ -1740,20 +1740,20 @@ class SubprocessTestsMixin:
connect = self.loop.subprocess_exec( connect = self.loop.subprocess_exec(
functools.partial(MySubprocessProtocol, self.loop), functools.partial(MySubprocessProtocol, self.loop),
sys.executable, prog) sys.executable, prog)
with self.assertWarns(DeprecationWarning):
transp, proto = self.loop.run_until_complete(connect)
self.assertIsInstance(proto, MySubprocessProtocol)
self.loop.run_until_complete(proto.connected)
self.assertEqual('CONNECTED', proto.state)
stdin = transp.get_pipe_transport(0) transp, proto = self.loop.run_until_complete(connect)
stdin.write(b'Python The Winner') self.assertIsInstance(proto, MySubprocessProtocol)
self.loop.run_until_complete(proto.got_data[1].wait()) self.loop.run_until_complete(proto.connected)
with test_utils.disable_logger(): self.assertEqual('CONNECTED', proto.state)
transp.close()
self.loop.run_until_complete(proto.completed) stdin = transp.get_pipe_transport(0)
self.check_killed(proto.returncode) stdin.write(b'Python The Winner')
self.assertEqual(b'Python The Winner', proto.data[1]) self.loop.run_until_complete(proto.got_data[1].wait())
with test_utils.disable_logger():
transp.close()
self.loop.run_until_complete(proto.completed)
self.check_killed(proto.returncode)
self.assertEqual(b'Python The Winner', proto.data[1])
def test_subprocess_interactive(self): def test_subprocess_interactive(self):
prog = os.path.join(os.path.dirname(__file__), 'echo.py') prog = os.path.join(os.path.dirname(__file__), 'echo.py')
@ -1762,51 +1762,48 @@ class SubprocessTestsMixin:
functools.partial(MySubprocessProtocol, self.loop), functools.partial(MySubprocessProtocol, self.loop),
sys.executable, prog) sys.executable, prog)
with self.assertWarns(DeprecationWarning): transp, proto = self.loop.run_until_complete(connect)
transp, proto = self.loop.run_until_complete(connect) self.assertIsInstance(proto, MySubprocessProtocol)
self.assertIsInstance(proto, MySubprocessProtocol) self.loop.run_until_complete(proto.connected)
self.loop.run_until_complete(proto.connected) self.assertEqual('CONNECTED', proto.state)
self.assertEqual('CONNECTED', proto.state)
stdin = transp.get_pipe_transport(0) stdin = transp.get_pipe_transport(0)
stdin.write(b'Python ') stdin.write(b'Python ')
self.loop.run_until_complete(proto.got_data[1].wait()) self.loop.run_until_complete(proto.got_data[1].wait())
proto.got_data[1].clear() proto.got_data[1].clear()
self.assertEqual(b'Python ', proto.data[1]) self.assertEqual(b'Python ', proto.data[1])
stdin.write(b'The Winner') stdin.write(b'The Winner')
self.loop.run_until_complete(proto.got_data[1].wait()) self.loop.run_until_complete(proto.got_data[1].wait())
self.assertEqual(b'Python The Winner', proto.data[1]) self.assertEqual(b'Python The Winner', proto.data[1])
with test_utils.disable_logger(): with test_utils.disable_logger():
transp.close() transp.close()
self.loop.run_until_complete(proto.completed) self.loop.run_until_complete(proto.completed)
self.check_killed(proto.returncode) self.check_killed(proto.returncode)
def test_subprocess_shell(self): def test_subprocess_shell(self):
with self.assertWarns(DeprecationWarning): connect = self.loop.subprocess_shell(
connect = self.loop.subprocess_shell( functools.partial(MySubprocessProtocol, self.loop),
functools.partial(MySubprocessProtocol, self.loop), 'echo Python')
'echo Python') transp, proto = self.loop.run_until_complete(connect)
transp, proto = self.loop.run_until_complete(connect) self.assertIsInstance(proto, MySubprocessProtocol)
self.assertIsInstance(proto, MySubprocessProtocol) self.loop.run_until_complete(proto.connected)
self.loop.run_until_complete(proto.connected)
transp.get_pipe_transport(0).close() transp.get_pipe_transport(0).close()
self.loop.run_until_complete(proto.completed) self.loop.run_until_complete(proto.completed)
self.assertEqual(0, proto.returncode) self.assertEqual(0, proto.returncode)
self.assertTrue(all(f.done() for f in proto.disconnects.values())) self.assertTrue(all(f.done() for f in proto.disconnects.values()))
self.assertEqual(proto.data[1].rstrip(b'\r\n'), b'Python') self.assertEqual(proto.data[1].rstrip(b'\r\n'), b'Python')
self.assertEqual(proto.data[2], b'') self.assertEqual(proto.data[2], b'')
transp.close() transp.close()
def test_subprocess_exitcode(self): def test_subprocess_exitcode(self):
connect = self.loop.subprocess_shell( connect = self.loop.subprocess_shell(
functools.partial(MySubprocessProtocol, self.loop), functools.partial(MySubprocessProtocol, self.loop),
'exit 7', stdin=None, stdout=None, stderr=None) 'exit 7', stdin=None, stdout=None, stderr=None)
with self.assertWarns(DeprecationWarning): transp, proto = self.loop.run_until_complete(connect)
transp, proto = self.loop.run_until_complete(connect)
self.assertIsInstance(proto, MySubprocessProtocol) self.assertIsInstance(proto, MySubprocessProtocol)
self.loop.run_until_complete(proto.completed) self.loop.run_until_complete(proto.completed)
self.assertEqual(7, proto.returncode) self.assertEqual(7, proto.returncode)
@ -1816,8 +1813,8 @@ class SubprocessTestsMixin:
connect = self.loop.subprocess_shell( connect = self.loop.subprocess_shell(
functools.partial(MySubprocessProtocol, self.loop), functools.partial(MySubprocessProtocol, self.loop),
'exit 7', stdin=None, stdout=None, stderr=None) 'exit 7', stdin=None, stdout=None, stderr=None)
with self.assertWarns(DeprecationWarning):
transp, proto = self.loop.run_until_complete(connect) transp, proto = self.loop.run_until_complete(connect)
self.assertIsInstance(proto, MySubprocessProtocol) self.assertIsInstance(proto, MySubprocessProtocol)
self.assertIsNone(transp.get_pipe_transport(0)) self.assertIsNone(transp.get_pipe_transport(0))
self.assertIsNone(transp.get_pipe_transport(1)) self.assertIsNone(transp.get_pipe_transport(1))
@ -1833,15 +1830,14 @@ class SubprocessTestsMixin:
functools.partial(MySubprocessProtocol, self.loop), functools.partial(MySubprocessProtocol, self.loop),
sys.executable, prog) sys.executable, prog)
with self.assertWarns(DeprecationWarning): transp, proto = self.loop.run_until_complete(connect)
transp, proto = self.loop.run_until_complete(connect) self.assertIsInstance(proto, MySubprocessProtocol)
self.assertIsInstance(proto, MySubprocessProtocol) self.loop.run_until_complete(proto.connected)
self.loop.run_until_complete(proto.connected)
transp.kill() transp.kill()
self.loop.run_until_complete(proto.completed) self.loop.run_until_complete(proto.completed)
self.check_killed(proto.returncode) self.check_killed(proto.returncode)
transp.close() transp.close()
def test_subprocess_terminate(self): def test_subprocess_terminate(self):
prog = os.path.join(os.path.dirname(__file__), 'echo.py') prog = os.path.join(os.path.dirname(__file__), 'echo.py')
@ -1850,15 +1846,14 @@ class SubprocessTestsMixin:
functools.partial(MySubprocessProtocol, self.loop), functools.partial(MySubprocessProtocol, self.loop),
sys.executable, prog) sys.executable, prog)
with self.assertWarns(DeprecationWarning): transp, proto = self.loop.run_until_complete(connect)
transp, proto = self.loop.run_until_complete(connect) self.assertIsInstance(proto, MySubprocessProtocol)
self.assertIsInstance(proto, MySubprocessProtocol) self.loop.run_until_complete(proto.connected)
self.loop.run_until_complete(proto.connected)
transp.terminate() transp.terminate()
self.loop.run_until_complete(proto.completed) self.loop.run_until_complete(proto.completed)
self.check_terminated(proto.returncode) self.check_terminated(proto.returncode)
transp.close() transp.close()
@unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP") @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP")
def test_subprocess_send_signal(self): def test_subprocess_send_signal(self):
@ -1873,15 +1868,15 @@ class SubprocessTestsMixin:
functools.partial(MySubprocessProtocol, self.loop), functools.partial(MySubprocessProtocol, self.loop),
sys.executable, prog) sys.executable, prog)
with self.assertWarns(DeprecationWarning):
transp, proto = self.loop.run_until_complete(connect)
self.assertIsInstance(proto, MySubprocessProtocol)
self.loop.run_until_complete(proto.connected)
transp.send_signal(signal.SIGHUP) transp, proto = self.loop.run_until_complete(connect)
self.loop.run_until_complete(proto.completed) self.assertIsInstance(proto, MySubprocessProtocol)
self.assertEqual(-signal.SIGHUP, proto.returncode) self.loop.run_until_complete(proto.connected)
transp.close()
transp.send_signal(signal.SIGHUP)
self.loop.run_until_complete(proto.completed)
self.assertEqual(-signal.SIGHUP, proto.returncode)
transp.close()
finally: finally:
signal.signal(signal.SIGHUP, old_handler) signal.signal(signal.SIGHUP, old_handler)
@ -1892,20 +1887,19 @@ class SubprocessTestsMixin:
functools.partial(MySubprocessProtocol, self.loop), functools.partial(MySubprocessProtocol, self.loop),
sys.executable, prog) sys.executable, prog)
with self.assertWarns(DeprecationWarning): transp, proto = self.loop.run_until_complete(connect)
transp, proto = self.loop.run_until_complete(connect) self.assertIsInstance(proto, MySubprocessProtocol)
self.assertIsInstance(proto, MySubprocessProtocol) self.loop.run_until_complete(proto.connected)
self.loop.run_until_complete(proto.connected)
stdin = transp.get_pipe_transport(0) stdin = transp.get_pipe_transport(0)
stdin.write(b'test') stdin.write(b'test')
self.loop.run_until_complete(proto.completed) self.loop.run_until_complete(proto.completed)
transp.close() transp.close()
self.assertEqual(b'OUT:test', proto.data[1]) self.assertEqual(b'OUT:test', proto.data[1])
self.assertTrue(proto.data[2].startswith(b'ERR:test'), proto.data[2]) self.assertTrue(proto.data[2].startswith(b'ERR:test'), proto.data[2])
self.assertEqual(0, proto.returncode) self.assertEqual(0, proto.returncode)
def test_subprocess_stderr_redirect_to_stdout(self): def test_subprocess_stderr_redirect_to_stdout(self):
prog = os.path.join(os.path.dirname(__file__), 'echo2.py') prog = os.path.join(os.path.dirname(__file__), 'echo2.py')
@ -1914,23 +1908,23 @@ class SubprocessTestsMixin:
functools.partial(MySubprocessProtocol, self.loop), functools.partial(MySubprocessProtocol, self.loop),
sys.executable, prog, stderr=subprocess.STDOUT) sys.executable, prog, stderr=subprocess.STDOUT)
with self.assertWarns(DeprecationWarning):
transp, proto = self.loop.run_until_complete(connect)
self.assertIsInstance(proto, MySubprocessProtocol)
self.loop.run_until_complete(proto.connected)
stdin = transp.get_pipe_transport(0) transp, proto = self.loop.run_until_complete(connect)
self.assertIsNotNone(transp.get_pipe_transport(1)) self.assertIsInstance(proto, MySubprocessProtocol)
self.assertIsNone(transp.get_pipe_transport(2)) self.loop.run_until_complete(proto.connected)
stdin.write(b'test') stdin = transp.get_pipe_transport(0)
self.loop.run_until_complete(proto.completed) self.assertIsNotNone(transp.get_pipe_transport(1))
self.assertTrue(proto.data[1].startswith(b'OUT:testERR:test'), self.assertIsNone(transp.get_pipe_transport(2))
proto.data[1])
self.assertEqual(b'', proto.data[2])
transp.close() stdin.write(b'test')
self.assertEqual(0, proto.returncode) self.loop.run_until_complete(proto.completed)
self.assertTrue(proto.data[1].startswith(b'OUT:testERR:test'),
proto.data[1])
self.assertEqual(b'', proto.data[2])
transp.close()
self.assertEqual(0, proto.returncode)
def test_subprocess_close_client_stream(self): def test_subprocess_close_client_stream(self):
prog = os.path.join(os.path.dirname(__file__), 'echo3.py') prog = os.path.join(os.path.dirname(__file__), 'echo3.py')
@ -1938,33 +1932,33 @@ class SubprocessTestsMixin:
connect = self.loop.subprocess_exec( connect = self.loop.subprocess_exec(
functools.partial(MySubprocessProtocol, self.loop), functools.partial(MySubprocessProtocol, self.loop),
sys.executable, prog) sys.executable, prog)
with self.assertWarns(DeprecationWarning):
transp, proto = self.loop.run_until_complete(connect)
self.assertIsInstance(proto, MySubprocessProtocol)
self.loop.run_until_complete(proto.connected)
stdin = transp.get_pipe_transport(0) transp, proto = self.loop.run_until_complete(connect)
stdout = transp.get_pipe_transport(1) self.assertIsInstance(proto, MySubprocessProtocol)
stdin.write(b'test') self.loop.run_until_complete(proto.connected)
self.loop.run_until_complete(proto.got_data[1].wait())
self.assertEqual(b'OUT:test', proto.data[1])
stdout.close() stdin = transp.get_pipe_transport(0)
self.loop.run_until_complete(proto.disconnects[1]) stdout = transp.get_pipe_transport(1)
stdin.write(b'xxx') stdin.write(b'test')
self.loop.run_until_complete(proto.got_data[2].wait()) self.loop.run_until_complete(proto.got_data[1].wait())
if sys.platform != 'win32': self.assertEqual(b'OUT:test', proto.data[1])
self.assertEqual(b'ERR:BrokenPipeError', proto.data[2])
else: stdout.close()
# After closing the read-end of a pipe, writing to the self.loop.run_until_complete(proto.disconnects[1])
# write-end using os.write() fails with errno==EINVAL and stdin.write(b'xxx')
# GetLastError()==ERROR_INVALID_NAME on Windows!?! (Using self.loop.run_until_complete(proto.got_data[2].wait())
# WriteFile() we get ERROR_BROKEN_PIPE as expected.) if sys.platform != 'win32':
self.assertEqual(b'ERR:OSError', proto.data[2]) self.assertEqual(b'ERR:BrokenPipeError', proto.data[2])
with test_utils.disable_logger(): else:
transp.close() # After closing the read-end of a pipe, writing to the
self.loop.run_until_complete(proto.completed) # write-end using os.write() fails with errno==EINVAL and
self.check_killed(proto.returncode) # GetLastError()==ERROR_INVALID_NAME on Windows!?! (Using
# WriteFile() we get ERROR_BROKEN_PIPE as expected.)
self.assertEqual(b'ERR:OSError', proto.data[2])
with test_utils.disable_logger():
transp.close()
self.loop.run_until_complete(proto.completed)
self.check_killed(proto.returncode)
def test_subprocess_wait_no_same_group(self): def test_subprocess_wait_no_same_group(self):
# start the new process in a new session # start the new process in a new session

View File

@ -26,24 +26,8 @@ class LockTests(test_utils.TestCase):
super().setUp() super().setUp()
self.loop = self.new_test_loop() self.loop = self.new_test_loop()
def test_ctor_loop(self):
loop = mock.Mock()
with self.assertWarns(DeprecationWarning):
lock = asyncio.Lock(loop=loop)
self.assertIs(lock._loop, loop)
with self.assertWarns(DeprecationWarning):
lock = asyncio.Lock(loop=self.loop)
self.assertIs(lock._loop, self.loop)
def test_ctor_noloop(self):
asyncio.set_event_loop(self.loop)
lock = asyncio.Lock()
self.assertIs(lock._loop, self.loop)
def test_repr(self): def test_repr(self):
with self.assertWarns(DeprecationWarning): lock = asyncio.Lock()
lock = asyncio.Lock(loop=self.loop)
self.assertTrue(repr(lock).endswith('[unlocked]>')) self.assertTrue(repr(lock).endswith('[unlocked]>'))
self.assertTrue(RGX_REPR.match(repr(lock))) self.assertTrue(RGX_REPR.match(repr(lock)))
@ -52,9 +36,9 @@ class LockTests(test_utils.TestCase):
self.assertTrue(RGX_REPR.match(repr(lock))) self.assertTrue(RGX_REPR.match(repr(lock)))
def test_lock(self): def test_lock(self):
with self.assertWarns(DeprecationWarning): lock = asyncio.Lock()
lock = asyncio.Lock(loop=self.loop)
with self.assertWarns(DeprecationWarning):
@asyncio.coroutine @asyncio.coroutine
def acquire_lock(): def acquire_lock():
return (yield from lock) return (yield from lock)
@ -70,14 +54,14 @@ class LockTests(test_utils.TestCase):
def test_lock_by_with_statement(self): def test_lock_by_with_statement(self):
loop = asyncio.new_event_loop() # don't use TestLoop quirks loop = asyncio.new_event_loop() # don't use TestLoop quirks
self.set_event_loop(loop) self.set_event_loop(loop)
with self.assertWarns(DeprecationWarning): primitives = [
primitives = [ asyncio.Lock(),
asyncio.Lock(loop=loop), asyncio.Condition(),
asyncio.Condition(loop=loop), asyncio.Semaphore(),
asyncio.Semaphore(loop=loop), asyncio.BoundedSemaphore(),
asyncio.BoundedSemaphore(loop=loop), ]
]
with self.assertWarns(DeprecationWarning):
@asyncio.coroutine @asyncio.coroutine
def test(lock): def test(lock):
yield from asyncio.sleep(0.01) yield from asyncio.sleep(0.01)
@ -95,8 +79,7 @@ class LockTests(test_utils.TestCase):
self.assertFalse(primitive.locked()) self.assertFalse(primitive.locked())
def test_acquire(self): def test_acquire(self):
with self.assertWarns(DeprecationWarning): lock = asyncio.Lock()
lock = asyncio.Lock(loop=self.loop)
result = [] result = []
self.assertTrue(self.loop.run_until_complete(lock.acquire())) self.assertTrue(self.loop.run_until_complete(lock.acquire()))
@ -147,8 +130,7 @@ class LockTests(test_utils.TestCase):
self.assertTrue(t3.result()) self.assertTrue(t3.result())
def test_acquire_cancel(self): def test_acquire_cancel(self):
with self.assertWarns(DeprecationWarning): lock = asyncio.Lock()
lock = asyncio.Lock(loop=self.loop)
self.assertTrue(self.loop.run_until_complete(lock.acquire())) self.assertTrue(self.loop.run_until_complete(lock.acquire()))
task = self.loop.create_task(lock.acquire()) task = self.loop.create_task(lock.acquire())
@ -173,8 +155,7 @@ class LockTests(test_utils.TestCase):
# B's waiter; instead, it should move on to C's waiter. # B's waiter; instead, it should move on to C's waiter.
# Setup: A has the lock, b and c are waiting. # Setup: A has the lock, b and c are waiting.
with self.assertWarns(DeprecationWarning): lock = asyncio.Lock()
lock = asyncio.Lock(loop=self.loop)
async def lockit(name, blocker): async def lockit(name, blocker):
await lock.acquire() await lock.acquire()
@ -210,8 +191,7 @@ class LockTests(test_utils.TestCase):
# Issue 32734 # Issue 32734
# Acquire 4 locks, cancel second, release first # Acquire 4 locks, cancel second, release first
# and 2 locks are taken at once. # and 2 locks are taken at once.
with self.assertWarns(DeprecationWarning): lock = asyncio.Lock()
lock = asyncio.Lock(loop=self.loop)
lock_count = 0 lock_count = 0
call_count = 0 call_count = 0
@ -256,8 +236,7 @@ class LockTests(test_utils.TestCase):
self.assertTrue(t3.cancelled()) self.assertTrue(t3.cancelled())
def test_finished_waiter_cancelled(self): def test_finished_waiter_cancelled(self):
with self.assertWarns(DeprecationWarning): lock = asyncio.Lock()
lock = asyncio.Lock(loop=self.loop)
ta = self.loop.create_task(lock.acquire()) ta = self.loop.create_task(lock.acquire())
test_utils.run_briefly(self.loop) test_utils.run_briefly(self.loop)
@ -279,14 +258,12 @@ class LockTests(test_utils.TestCase):
self.assertTrue(tb.cancelled()) self.assertTrue(tb.cancelled())
def test_release_not_acquired(self): def test_release_not_acquired(self):
with self.assertWarns(DeprecationWarning): lock = asyncio.Lock()
lock = asyncio.Lock(loop=self.loop)
self.assertRaises(RuntimeError, lock.release) self.assertRaises(RuntimeError, lock.release)
def test_release_no_waiters(self): def test_release_no_waiters(self):
with self.assertWarns(DeprecationWarning): lock = asyncio.Lock()
lock = asyncio.Lock(loop=self.loop)
self.loop.run_until_complete(lock.acquire()) self.loop.run_until_complete(lock.acquire())
self.assertTrue(lock.locked()) self.assertTrue(lock.locked())
@ -312,24 +289,8 @@ class EventTests(test_utils.TestCase):
super().setUp() super().setUp()
self.loop = self.new_test_loop() self.loop = self.new_test_loop()
def test_ctor_loop(self):
loop = mock.Mock()
with self.assertWarns(DeprecationWarning):
ev = asyncio.Event(loop=loop)
self.assertIs(ev._loop, loop)
with self.assertWarns(DeprecationWarning):
ev = asyncio.Event(loop=self.loop)
self.assertIs(ev._loop, self.loop)
def test_ctor_noloop(self):
asyncio.set_event_loop(self.loop)
ev = asyncio.Event()
self.assertIs(ev._loop, self.loop)
def test_repr(self): def test_repr(self):
with self.assertWarns(DeprecationWarning): ev = asyncio.Event()
ev = asyncio.Event(loop=self.loop)
self.assertTrue(repr(ev).endswith('[unset]>')) self.assertTrue(repr(ev).endswith('[unset]>'))
match = RGX_REPR.match(repr(ev)) match = RGX_REPR.match(repr(ev))
self.assertEqual(match.group('extras'), 'unset') self.assertEqual(match.group('extras'), 'unset')
@ -343,8 +304,7 @@ class EventTests(test_utils.TestCase):
self.assertTrue(RGX_REPR.match(repr(ev))) self.assertTrue(RGX_REPR.match(repr(ev)))
def test_wait(self): def test_wait(self):
with self.assertWarns(DeprecationWarning): ev = asyncio.Event()
ev = asyncio.Event(loop=self.loop)
self.assertFalse(ev.is_set()) self.assertFalse(ev.is_set())
result = [] result = []
@ -381,16 +341,14 @@ class EventTests(test_utils.TestCase):
self.assertIsNone(t3.result()) self.assertIsNone(t3.result())
def test_wait_on_set(self): def test_wait_on_set(self):
with self.assertWarns(DeprecationWarning): ev = asyncio.Event()
ev = asyncio.Event(loop=self.loop)
ev.set() ev.set()
res = self.loop.run_until_complete(ev.wait()) res = self.loop.run_until_complete(ev.wait())
self.assertTrue(res) self.assertTrue(res)
def test_wait_cancel(self): def test_wait_cancel(self):
with self.assertWarns(DeprecationWarning): ev = asyncio.Event()
ev = asyncio.Event(loop=self.loop)
wait = self.loop.create_task(ev.wait()) wait = self.loop.create_task(ev.wait())
self.loop.call_soon(wait.cancel) self.loop.call_soon(wait.cancel)
@ -400,8 +358,7 @@ class EventTests(test_utils.TestCase):
self.assertFalse(ev._waiters) self.assertFalse(ev._waiters)
def test_clear(self): def test_clear(self):
with self.assertWarns(DeprecationWarning): ev = asyncio.Event()
ev = asyncio.Event(loop=self.loop)
self.assertFalse(ev.is_set()) self.assertFalse(ev.is_set())
ev.set() ev.set()
@ -411,8 +368,7 @@ class EventTests(test_utils.TestCase):
self.assertFalse(ev.is_set()) self.assertFalse(ev.is_set())
def test_clear_with_waiters(self): def test_clear_with_waiters(self):
with self.assertWarns(DeprecationWarning): ev = asyncio.Event()
ev = asyncio.Event(loop=self.loop)
result = [] result = []
async def c1(result): async def c1(result):
@ -446,23 +402,8 @@ class ConditionTests(test_utils.TestCase):
super().setUp() super().setUp()
self.loop = self.new_test_loop() self.loop = self.new_test_loop()
def test_ctor_loop(self):
loop = mock.Mock()
with self.assertWarns(DeprecationWarning):
cond = asyncio.Condition(loop=loop)
self.assertIs(cond._loop, loop)
cond = asyncio.Condition(loop=self.loop)
self.assertIs(cond._loop, self.loop)
def test_ctor_noloop(self):
asyncio.set_event_loop(self.loop)
cond = asyncio.Condition()
self.assertIs(cond._loop, self.loop)
def test_wait(self): def test_wait(self):
with self.assertWarns(DeprecationWarning): cond = asyncio.Condition()
cond = asyncio.Condition(loop=self.loop)
result = [] result = []
async def c1(result): async def c1(result):
@ -525,8 +466,7 @@ class ConditionTests(test_utils.TestCase):
self.assertTrue(t3.result()) self.assertTrue(t3.result())
def test_wait_cancel(self): def test_wait_cancel(self):
with self.assertWarns(DeprecationWarning): cond = asyncio.Condition()
cond = asyncio.Condition(loop=self.loop)
self.loop.run_until_complete(cond.acquire()) self.loop.run_until_complete(cond.acquire())
wait = self.loop.create_task(cond.wait()) wait = self.loop.create_task(cond.wait())
@ -538,8 +478,7 @@ class ConditionTests(test_utils.TestCase):
self.assertTrue(cond.locked()) self.assertTrue(cond.locked())
def test_wait_cancel_contested(self): def test_wait_cancel_contested(self):
with self.assertWarns(DeprecationWarning): cond = asyncio.Condition()
cond = asyncio.Condition(loop=self.loop)
self.loop.run_until_complete(cond.acquire()) self.loop.run_until_complete(cond.acquire())
self.assertTrue(cond.locked()) self.assertTrue(cond.locked())
@ -565,10 +504,11 @@ class ConditionTests(test_utils.TestCase):
def test_wait_cancel_after_notify(self): def test_wait_cancel_after_notify(self):
# See bpo-32841 # See bpo-32841
with self.assertWarns(DeprecationWarning):
cond = asyncio.Condition(loop=self.loop)
waited = False waited = False
cond = asyncio.Condition()
cond._loop = self.loop
async def wait_on_cond(): async def wait_on_cond():
nonlocal waited nonlocal waited
async with cond: async with cond:
@ -590,15 +530,13 @@ class ConditionTests(test_utils.TestCase):
self.assertTrue(waited) self.assertTrue(waited)
def test_wait_unacquired(self): def test_wait_unacquired(self):
with self.assertWarns(DeprecationWarning): cond = asyncio.Condition()
cond = asyncio.Condition(loop=self.loop)
self.assertRaises( self.assertRaises(
RuntimeError, RuntimeError,
self.loop.run_until_complete, cond.wait()) self.loop.run_until_complete, cond.wait())
def test_wait_for(self): def test_wait_for(self):
with self.assertWarns(DeprecationWarning): cond = asyncio.Condition()
cond = asyncio.Condition(loop=self.loop)
presult = False presult = False
def predicate(): def predicate():
@ -635,8 +573,7 @@ class ConditionTests(test_utils.TestCase):
self.assertTrue(t.result()) self.assertTrue(t.result())
def test_wait_for_unacquired(self): def test_wait_for_unacquired(self):
with self.assertWarns(DeprecationWarning): cond = asyncio.Condition()
cond = asyncio.Condition(loop=self.loop)
# predicate can return true immediately # predicate can return true immediately
res = self.loop.run_until_complete(cond.wait_for(lambda: [1, 2, 3])) res = self.loop.run_until_complete(cond.wait_for(lambda: [1, 2, 3]))
@ -648,8 +585,7 @@ class ConditionTests(test_utils.TestCase):
cond.wait_for(lambda: False)) cond.wait_for(lambda: False))
def test_notify(self): def test_notify(self):
with self.assertWarns(DeprecationWarning): cond = asyncio.Condition()
cond = asyncio.Condition(loop=self.loop)
result = [] result = []
async def c1(result): async def c1(result):
@ -701,8 +637,7 @@ class ConditionTests(test_utils.TestCase):
self.assertTrue(t3.result()) self.assertTrue(t3.result())
def test_notify_all(self): def test_notify_all(self):
with self.assertWarns(DeprecationWarning): cond = asyncio.Condition()
cond = asyncio.Condition(loop=self.loop)
result = [] result = []
@ -738,18 +673,15 @@ class ConditionTests(test_utils.TestCase):
self.assertTrue(t2.result()) self.assertTrue(t2.result())
def test_notify_unacquired(self): def test_notify_unacquired(self):
with self.assertWarns(DeprecationWarning): cond = asyncio.Condition()
cond = asyncio.Condition(loop=self.loop)
self.assertRaises(RuntimeError, cond.notify) self.assertRaises(RuntimeError, cond.notify)
def test_notify_all_unacquired(self): def test_notify_all_unacquired(self):
with self.assertWarns(DeprecationWarning): cond = asyncio.Condition()
cond = asyncio.Condition(loop=self.loop)
self.assertRaises(RuntimeError, cond.notify_all) self.assertRaises(RuntimeError, cond.notify_all)
def test_repr(self): def test_repr(self):
with self.assertWarns(DeprecationWarning): cond = asyncio.Condition()
cond = asyncio.Condition(loop=self.loop)
self.assertTrue('unlocked' in repr(cond)) self.assertTrue('unlocked' in repr(cond))
self.assertTrue(RGX_REPR.match(repr(cond))) self.assertTrue(RGX_REPR.match(repr(cond)))
@ -775,9 +707,8 @@ class ConditionTests(test_utils.TestCase):
self.loop.run_until_complete(f()) self.loop.run_until_complete(f())
def test_explicit_lock(self): def test_explicit_lock(self):
with self.assertWarns(DeprecationWarning): lock = asyncio.Lock()
lock = asyncio.Lock(loop=self.loop) cond = asyncio.Condition(lock)
cond = asyncio.Condition(lock, loop=self.loop)
self.assertIs(cond._lock, lock) self.assertIs(cond._lock, lock)
self.assertIs(cond._loop, lock._loop) self.assertIs(cond._loop, lock._loop)
@ -785,23 +716,27 @@ class ConditionTests(test_utils.TestCase):
def test_ambiguous_loops(self): def test_ambiguous_loops(self):
loop = self.new_test_loop() loop = self.new_test_loop()
self.addCleanup(loop.close) self.addCleanup(loop.close)
with self.assertWarns(DeprecationWarning):
lock = asyncio.Lock(loop=self.loop) lock = asyncio.Lock()
lock._loop = loop
async def _create_condition():
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
asyncio.Condition(lock, loop=loop) asyncio.Condition(lock)
self.loop.run_until_complete(_create_condition())
def test_timeout_in_block(self): def test_timeout_in_block(self):
loop = asyncio.new_event_loop() loop = asyncio.new_event_loop()
self.addCleanup(loop.close) self.addCleanup(loop.close)
async def task_timeout(): async def task_timeout():
condition = asyncio.Condition(loop=loop) condition = asyncio.Condition()
async with condition: async with condition:
with self.assertRaises(asyncio.TimeoutError): with self.assertRaises(asyncio.TimeoutError):
await asyncio.wait_for(condition.wait(), timeout=0.5) await asyncio.wait_for(condition.wait(), timeout=0.5)
with self.assertWarns(DeprecationWarning): loop.run_until_complete(task_timeout())
loop.run_until_complete(task_timeout())
class SemaphoreTests(test_utils.TestCase): class SemaphoreTests(test_utils.TestCase):
@ -810,29 +745,12 @@ class SemaphoreTests(test_utils.TestCase):
super().setUp() super().setUp()
self.loop = self.new_test_loop() self.loop = self.new_test_loop()
def test_ctor_loop(self):
loop = mock.Mock()
with self.assertWarns(DeprecationWarning):
sem = asyncio.Semaphore(loop=loop)
self.assertIs(sem._loop, loop)
with self.assertWarns(DeprecationWarning):
sem = asyncio.Semaphore(loop=self.loop)
self.assertIs(sem._loop, self.loop)
def test_ctor_noloop(self):
asyncio.set_event_loop(self.loop)
sem = asyncio.Semaphore()
self.assertIs(sem._loop, self.loop)
def test_initial_value_zero(self): def test_initial_value_zero(self):
with self.assertWarns(DeprecationWarning): sem = asyncio.Semaphore(0)
sem = asyncio.Semaphore(0, loop=self.loop)
self.assertTrue(sem.locked()) self.assertTrue(sem.locked())
def test_repr(self): def test_repr(self):
with self.assertWarns(DeprecationWarning): sem = asyncio.Semaphore()
sem = asyncio.Semaphore(loop=self.loop)
self.assertTrue(repr(sem).endswith('[unlocked, value:1]>')) self.assertTrue(repr(sem).endswith('[unlocked, value:1]>'))
self.assertTrue(RGX_REPR.match(repr(sem))) self.assertTrue(RGX_REPR.match(repr(sem)))
@ -850,8 +768,7 @@ class SemaphoreTests(test_utils.TestCase):
self.assertTrue(RGX_REPR.match(repr(sem))) self.assertTrue(RGX_REPR.match(repr(sem)))
def test_semaphore(self): def test_semaphore(self):
with self.assertWarns(DeprecationWarning): sem = asyncio.Semaphore()
sem = asyncio.Semaphore(loop=self.loop)
self.assertEqual(1, sem._value) self.assertEqual(1, sem._value)
with self.assertWarns(DeprecationWarning): with self.assertWarns(DeprecationWarning):
@ -872,8 +789,7 @@ class SemaphoreTests(test_utils.TestCase):
self.assertRaises(ValueError, asyncio.Semaphore, -1) self.assertRaises(ValueError, asyncio.Semaphore, -1)
def test_acquire(self): def test_acquire(self):
with self.assertWarns(DeprecationWarning): sem = asyncio.Semaphore(3)
sem = asyncio.Semaphore(3, loop=self.loop)
result = [] result = []
self.assertTrue(self.loop.run_until_complete(sem.acquire())) self.assertTrue(self.loop.run_until_complete(sem.acquire()))
@ -934,8 +850,7 @@ class SemaphoreTests(test_utils.TestCase):
self.loop.run_until_complete(asyncio.gather(*race_tasks)) self.loop.run_until_complete(asyncio.gather(*race_tasks))
def test_acquire_cancel(self): def test_acquire_cancel(self):
with self.assertWarns(DeprecationWarning): sem = asyncio.Semaphore()
sem = asyncio.Semaphore(loop=self.loop)
self.loop.run_until_complete(sem.acquire()) self.loop.run_until_complete(sem.acquire())
acquire = self.loop.create_task(sem.acquire()) acquire = self.loop.create_task(sem.acquire())
@ -947,8 +862,7 @@ class SemaphoreTests(test_utils.TestCase):
all(waiter.done() for waiter in sem._waiters)) all(waiter.done() for waiter in sem._waiters))
def test_acquire_cancel_before_awoken(self): def test_acquire_cancel_before_awoken(self):
with self.assertWarns(DeprecationWarning): sem = asyncio.Semaphore(value=0)
sem = asyncio.Semaphore(value=0, loop=self.loop)
t1 = self.loop.create_task(sem.acquire()) t1 = self.loop.create_task(sem.acquire())
t2 = self.loop.create_task(sem.acquire()) t2 = self.loop.create_task(sem.acquire())
@ -970,8 +884,7 @@ class SemaphoreTests(test_utils.TestCase):
test_utils.run_briefly(self.loop) test_utils.run_briefly(self.loop)
def test_acquire_hang(self): def test_acquire_hang(self):
with self.assertWarns(DeprecationWarning): sem = asyncio.Semaphore(value=0)
sem = asyncio.Semaphore(value=0, loop=self.loop)
t1 = self.loop.create_task(sem.acquire()) t1 = self.loop.create_task(sem.acquire())
t2 = self.loop.create_task(sem.acquire()) t2 = self.loop.create_task(sem.acquire())
@ -985,14 +898,12 @@ class SemaphoreTests(test_utils.TestCase):
self.assertTrue(sem.locked()) self.assertTrue(sem.locked())
def test_release_not_acquired(self): def test_release_not_acquired(self):
with self.assertWarns(DeprecationWarning): sem = asyncio.BoundedSemaphore()
sem = asyncio.BoundedSemaphore(loop=self.loop)
self.assertRaises(ValueError, sem.release) self.assertRaises(ValueError, sem.release)
def test_release_no_waiters(self): def test_release_no_waiters(self):
with self.assertWarns(DeprecationWarning): sem = asyncio.Semaphore()
sem = asyncio.Semaphore(loop=self.loop)
self.loop.run_until_complete(sem.acquire()) self.loop.run_until_complete(sem.acquire())
self.assertTrue(sem.locked()) self.assertTrue(sem.locked())

View File

@ -43,13 +43,12 @@ class BaseTest(test_utils.TestCase):
class LockTests(BaseTest): class LockTests(BaseTest):
def test_context_manager_async_with(self): def test_context_manager_async_with(self):
with self.assertWarns(DeprecationWarning): primitives = [
primitives = [ asyncio.Lock(),
asyncio.Lock(loop=self.loop), asyncio.Condition(),
asyncio.Condition(loop=self.loop), asyncio.Semaphore(),
asyncio.Semaphore(loop=self.loop), asyncio.BoundedSemaphore(),
asyncio.BoundedSemaphore(loop=self.loop), ]
]
async def test(lock): async def test(lock):
await asyncio.sleep(0.01) await asyncio.sleep(0.01)
@ -66,13 +65,12 @@ class LockTests(BaseTest):
self.assertFalse(primitive.locked()) self.assertFalse(primitive.locked())
def test_context_manager_with_await(self): def test_context_manager_with_await(self):
with self.assertWarns(DeprecationWarning): primitives = [
primitives = [ asyncio.Lock(),
asyncio.Lock(loop=self.loop), asyncio.Condition(),
asyncio.Condition(loop=self.loop), asyncio.Semaphore(),
asyncio.Semaphore(loop=self.loop), asyncio.BoundedSemaphore(),
asyncio.BoundedSemaphore(loop=self.loop), ]
]
async def test(lock): async def test(lock):
await asyncio.sleep(0.01) await asyncio.sleep(0.01)

View File

@ -35,14 +35,13 @@ class QueueBasicTests(_QueueTestBase):
loop = self.new_test_loop(gen) loop = self.new_test_loop(gen)
with self.assertWarns(DeprecationWarning): q = asyncio.Queue()
q = asyncio.Queue(loop=loop)
self.assertTrue(fn(q).startswith('<Queue'), fn(q)) self.assertTrue(fn(q).startswith('<Queue'), fn(q))
id_is_present = hex(id(q)) in fn(q) id_is_present = hex(id(q)) in fn(q)
self.assertEqual(expect_id, id_is_present) self.assertEqual(expect_id, id_is_present)
async def add_getter(): async def add_getter():
q = asyncio.Queue(loop=loop) q = asyncio.Queue()
# Start a task that waits to get. # Start a task that waits to get.
loop.create_task(q.get()) loop.create_task(q.get())
# Let it start waiting. # Let it start waiting.
@ -51,11 +50,10 @@ class QueueBasicTests(_QueueTestBase):
# resume q.get coroutine to finish generator # resume q.get coroutine to finish generator
q.put_nowait(0) q.put_nowait(0)
with self.assertWarns(DeprecationWarning): loop.run_until_complete(add_getter())
loop.run_until_complete(add_getter())
async def add_putter(): async def add_putter():
q = asyncio.Queue(maxsize=1, loop=loop) q = asyncio.Queue(maxsize=1)
q.put_nowait(1) q.put_nowait(1)
# Start a task that waits to put. # Start a task that waits to put.
loop.create_task(q.put(2)) loop.create_task(q.put(2))
@ -65,27 +63,11 @@ class QueueBasicTests(_QueueTestBase):
# resume q.put coroutine to finish generator # resume q.put coroutine to finish generator
q.get_nowait() q.get_nowait()
with self.assertWarns(DeprecationWarning): loop.run_until_complete(add_putter())
loop.run_until_complete(add_putter()) q = asyncio.Queue()
q = asyncio.Queue(loop=loop)
q.put_nowait(1) q.put_nowait(1)
self.assertTrue('_queue=[1]' in fn(q)) self.assertTrue('_queue=[1]' in fn(q))
def test_ctor_loop(self):
loop = mock.Mock()
with self.assertWarns(DeprecationWarning):
q = asyncio.Queue(loop=loop)
self.assertIs(q._loop, loop)
with self.assertWarns(DeprecationWarning):
q = asyncio.Queue(loop=self.loop)
self.assertIs(q._loop, self.loop)
def test_ctor_noloop(self):
asyncio.set_event_loop(self.loop)
q = asyncio.Queue()
self.assertIs(q._loop, self.loop)
def test_repr(self): def test_repr(self):
self._test_repr_or_str(repr, True) self._test_repr_or_str(repr, True)
@ -93,8 +75,7 @@ class QueueBasicTests(_QueueTestBase):
self._test_repr_or_str(str, False) self._test_repr_or_str(str, False)
def test_empty(self): def test_empty(self):
with self.assertWarns(DeprecationWarning): q = asyncio.Queue()
q = asyncio.Queue(loop=self.loop)
self.assertTrue(q.empty()) self.assertTrue(q.empty())
q.put_nowait(1) q.put_nowait(1)
self.assertFalse(q.empty()) self.assertFalse(q.empty())
@ -102,18 +83,15 @@ class QueueBasicTests(_QueueTestBase):
self.assertTrue(q.empty()) self.assertTrue(q.empty())
def test_full(self): def test_full(self):
with self.assertWarns(DeprecationWarning): q = asyncio.Queue()
q = asyncio.Queue(loop=self.loop)
self.assertFalse(q.full()) self.assertFalse(q.full())
with self.assertWarns(DeprecationWarning): q = asyncio.Queue(maxsize=1)
q = asyncio.Queue(maxsize=1, loop=self.loop)
q.put_nowait(1) q.put_nowait(1)
self.assertTrue(q.full()) self.assertTrue(q.full())
def test_order(self): def test_order(self):
with self.assertWarns(DeprecationWarning): q = asyncio.Queue()
q = asyncio.Queue(loop=self.loop)
for i in [1, 3, 2]: for i in [1, 3, 2]:
q.put_nowait(i) q.put_nowait(i)
@ -131,8 +109,7 @@ class QueueBasicTests(_QueueTestBase):
loop = self.new_test_loop(gen) loop = self.new_test_loop(gen)
with self.assertWarns(DeprecationWarning): q = asyncio.Queue(maxsize=2)
q = asyncio.Queue(maxsize=2, loop=loop)
self.assertEqual(2, q.maxsize) self.assertEqual(2, q.maxsize)
have_been_put = [] have_been_put = []
@ -166,8 +143,7 @@ class QueueBasicTests(_QueueTestBase):
class QueueGetTests(_QueueTestBase): class QueueGetTests(_QueueTestBase):
def test_blocking_get(self): def test_blocking_get(self):
with self.assertWarns(DeprecationWarning): q = asyncio.Queue()
q = asyncio.Queue(loop=self.loop)
q.put_nowait(1) q.put_nowait(1)
async def queue_get(): async def queue_get():
@ -177,8 +153,7 @@ class QueueGetTests(_QueueTestBase):
self.assertEqual(1, res) self.assertEqual(1, res)
def test_get_with_putters(self): def test_get_with_putters(self):
with self.assertWarns(DeprecationWarning): q = asyncio.Queue(1)
q = asyncio.Queue(1, loop=self.loop)
q.put_nowait(1) q.put_nowait(1)
waiter = self.loop.create_future() waiter = self.loop.create_future()
@ -198,9 +173,8 @@ class QueueGetTests(_QueueTestBase):
loop = self.new_test_loop(gen) loop = self.new_test_loop(gen)
with self.assertWarns(DeprecationWarning): q = asyncio.Queue()
q = asyncio.Queue(loop=loop) started = asyncio.Event()
started = asyncio.Event(loop=loop)
finished = False finished = False
async def queue_get(): async def queue_get():
@ -224,14 +198,12 @@ class QueueGetTests(_QueueTestBase):
self.assertAlmostEqual(0.01, loop.time()) self.assertAlmostEqual(0.01, loop.time())
def test_nonblocking_get(self): def test_nonblocking_get(self):
with self.assertWarns(DeprecationWarning): q = asyncio.Queue()
q = asyncio.Queue(loop=self.loop)
q.put_nowait(1) q.put_nowait(1)
self.assertEqual(1, q.get_nowait()) self.assertEqual(1, q.get_nowait())
def test_nonblocking_get_exception(self): def test_nonblocking_get_exception(self):
with self.assertWarns(DeprecationWarning): q = asyncio.Queue()
q = asyncio.Queue(loop=self.loop)
self.assertRaises(asyncio.QueueEmpty, q.get_nowait) self.assertRaises(asyncio.QueueEmpty, q.get_nowait)
def test_get_cancelled(self): def test_get_cancelled(self):
@ -245,8 +217,7 @@ class QueueGetTests(_QueueTestBase):
loop = self.new_test_loop(gen) loop = self.new_test_loop(gen)
with self.assertWarns(DeprecationWarning): q = asyncio.Queue()
q = asyncio.Queue(loop=loop)
async def queue_get(): async def queue_get():
return await asyncio.wait_for(q.get(), 0.051) return await asyncio.wait_for(q.get(), 0.051)
@ -261,8 +232,7 @@ class QueueGetTests(_QueueTestBase):
self.assertAlmostEqual(0.06, loop.time()) self.assertAlmostEqual(0.06, loop.time())
def test_get_cancelled_race(self): def test_get_cancelled_race(self):
with self.assertWarns(DeprecationWarning): q = asyncio.Queue()
q = asyncio.Queue(loop=self.loop)
t1 = self.loop.create_task(q.get()) t1 = self.loop.create_task(q.get())
t2 = self.loop.create_task(q.get()) t2 = self.loop.create_task(q.get())
@ -276,8 +246,7 @@ class QueueGetTests(_QueueTestBase):
self.assertEqual(t2.result(), 'a') self.assertEqual(t2.result(), 'a')
def test_get_with_waiting_putters(self): def test_get_with_waiting_putters(self):
with self.assertWarns(DeprecationWarning): q = asyncio.Queue(maxsize=1)
q = asyncio.Queue(loop=self.loop, maxsize=1)
self.loop.create_task(q.put('a')) self.loop.create_task(q.put('a'))
self.loop.create_task(q.put('b')) self.loop.create_task(q.put('b'))
test_utils.run_briefly(self.loop) test_utils.run_briefly(self.loop)
@ -298,8 +267,12 @@ class QueueGetTests(_QueueTestBase):
queue_size = 1 queue_size = 1
producer_num_items = 5 producer_num_items = 5
with self.assertWarns(DeprecationWarning): async def create_queue():
q = asyncio.Queue(queue_size, loop=self.loop) queue = asyncio.Queue(queue_size)
queue._get_loop()
return queue
q = self.loop.run_until_complete(create_queue())
self.loop.run_until_complete( self.loop.run_until_complete(
asyncio.gather(producer(q, producer_num_items), asyncio.gather(producer(q, producer_num_items),
@ -320,8 +293,7 @@ class QueueGetTests(_QueueTestBase):
except asyncio.TimeoutError: except asyncio.TimeoutError:
pass pass
with self.assertWarns(DeprecationWarning): queue = asyncio.Queue(maxsize=5)
queue = asyncio.Queue(loop=self.loop, maxsize=5)
self.loop.run_until_complete(self.loop.create_task(consumer(queue))) self.loop.run_until_complete(self.loop.create_task(consumer(queue)))
self.assertEqual(len(queue._getters), 0) self.assertEqual(len(queue._getters), 0)
@ -329,8 +301,7 @@ class QueueGetTests(_QueueTestBase):
class QueuePutTests(_QueueTestBase): class QueuePutTests(_QueueTestBase):
def test_blocking_put(self): def test_blocking_put(self):
with self.assertWarns(DeprecationWarning): q = asyncio.Queue()
q = asyncio.Queue(loop=self.loop)
async def queue_put(): async def queue_put():
# No maxsize, won't block. # No maxsize, won't block.
@ -347,9 +318,8 @@ class QueuePutTests(_QueueTestBase):
loop = self.new_test_loop(gen) loop = self.new_test_loop(gen)
with self.assertWarns(DeprecationWarning): q = asyncio.Queue(maxsize=1)
q = asyncio.Queue(maxsize=1, loop=loop) started = asyncio.Event()
started = asyncio.Event(loop=loop)
finished = False finished = False
async def queue_put(): async def queue_put():
@ -371,8 +341,7 @@ class QueuePutTests(_QueueTestBase):
self.assertAlmostEqual(0.01, loop.time()) self.assertAlmostEqual(0.01, loop.time())
def test_nonblocking_put(self): def test_nonblocking_put(self):
with self.assertWarns(DeprecationWarning): q = asyncio.Queue()
q = asyncio.Queue(loop=self.loop)
q.put_nowait(1) q.put_nowait(1)
self.assertEqual(1, q.get_nowait()) self.assertEqual(1, q.get_nowait())
@ -383,8 +352,7 @@ class QueuePutTests(_QueueTestBase):
loop = self.new_test_loop(gen) loop = self.new_test_loop(gen)
with self.assertWarns(DeprecationWarning): q = asyncio.Queue()
q = asyncio.Queue(loop=loop)
reader = loop.create_task(q.get()) reader = loop.create_task(q.get())
@ -413,8 +381,7 @@ class QueuePutTests(_QueueTestBase):
loop = self.new_test_loop(gen) loop = self.new_test_loop(gen)
loop.set_debug(True) loop.set_debug(True)
with self.assertWarns(DeprecationWarning): q = asyncio.Queue()
q = asyncio.Queue(loop=loop)
reader1 = loop.create_task(q.get()) reader1 = loop.create_task(q.get())
reader2 = loop.create_task(q.get()) reader2 = loop.create_task(q.get())
@ -444,8 +411,7 @@ class QueuePutTests(_QueueTestBase):
loop = self.new_test_loop(gen) loop = self.new_test_loop(gen)
with self.assertWarns(DeprecationWarning): q = asyncio.Queue(1)
q = asyncio.Queue(1, loop=loop)
q.put_nowait(1) q.put_nowait(1)
@ -469,21 +435,18 @@ class QueuePutTests(_QueueTestBase):
self.assertEqual(q.qsize(), 0) self.assertEqual(q.qsize(), 0)
def test_nonblocking_put_exception(self): def test_nonblocking_put_exception(self):
with self.assertWarns(DeprecationWarning): q = asyncio.Queue(maxsize=1, )
q = asyncio.Queue(maxsize=1, loop=self.loop)
q.put_nowait(1) q.put_nowait(1)
self.assertRaises(asyncio.QueueFull, q.put_nowait, 2) self.assertRaises(asyncio.QueueFull, q.put_nowait, 2)
def test_float_maxsize(self): def test_float_maxsize(self):
with self.assertWarns(DeprecationWarning): q = asyncio.Queue(maxsize=1.3, )
q = asyncio.Queue(maxsize=1.3, loop=self.loop)
q.put_nowait(1) q.put_nowait(1)
q.put_nowait(2) q.put_nowait(2)
self.assertTrue(q.full()) self.assertTrue(q.full())
self.assertRaises(asyncio.QueueFull, q.put_nowait, 3) self.assertRaises(asyncio.QueueFull, q.put_nowait, 3)
with self.assertWarns(DeprecationWarning): q = asyncio.Queue(maxsize=1.3, )
q = asyncio.Queue(maxsize=1.3, loop=self.loop)
async def queue_put(): async def queue_put():
await q.put(1) await q.put(1)
@ -492,8 +455,7 @@ class QueuePutTests(_QueueTestBase):
self.loop.run_until_complete(queue_put()) self.loop.run_until_complete(queue_put())
def test_put_cancelled(self): def test_put_cancelled(self):
with self.assertWarns(DeprecationWarning): q = asyncio.Queue()
q = asyncio.Queue(loop=self.loop)
async def queue_put(): async def queue_put():
await q.put(1) await q.put(1)
@ -508,8 +470,7 @@ class QueuePutTests(_QueueTestBase):
self.assertTrue(t.result()) self.assertTrue(t.result())
def test_put_cancelled_race(self): def test_put_cancelled_race(self):
with self.assertWarns(DeprecationWarning): q = asyncio.Queue(maxsize=1)
q = asyncio.Queue(loop=self.loop, maxsize=1)
put_a = self.loop.create_task(q.put('a')) put_a = self.loop.create_task(q.put('a'))
put_b = self.loop.create_task(q.put('b')) put_b = self.loop.create_task(q.put('b'))
@ -529,8 +490,7 @@ class QueuePutTests(_QueueTestBase):
self.loop.run_until_complete(put_b) self.loop.run_until_complete(put_b)
def test_put_with_waiting_getters(self): def test_put_with_waiting_getters(self):
with self.assertWarns(DeprecationWarning): q = asyncio.Queue()
q = asyncio.Queue(loop=self.loop)
t = self.loop.create_task(q.get()) t = self.loop.create_task(q.get())
test_utils.run_briefly(self.loop) test_utils.run_briefly(self.loop)
self.loop.run_until_complete(q.put('a')) self.loop.run_until_complete(q.put('a'))
@ -539,8 +499,12 @@ class QueuePutTests(_QueueTestBase):
def test_why_are_putters_waiting(self): def test_why_are_putters_waiting(self):
# From issue #265. # From issue #265.
with self.assertWarns(DeprecationWarning): async def create_queue():
queue = asyncio.Queue(2, loop=self.loop) q = asyncio.Queue(2)
q._get_loop()
return q
queue = self.loop.run_until_complete(create_queue())
async def putter(item): async def putter(item):
await queue.put(item) await queue.put(item)
@ -566,8 +530,7 @@ class QueuePutTests(_QueueTestBase):
loop = self.new_test_loop(a_generator) loop = self.new_test_loop(a_generator)
# Full queue. # Full queue.
with self.assertWarns(DeprecationWarning): queue = asyncio.Queue(maxsize=1)
queue = asyncio.Queue(loop=loop, maxsize=1)
queue.put_nowait(1) queue.put_nowait(1)
# Task waiting for space to put an item in the queue. # Task waiting for space to put an item in the queue.
@ -590,8 +553,7 @@ class QueuePutTests(_QueueTestBase):
loop = self.new_test_loop(gen) loop = self.new_test_loop(gen)
# Full Queue. # Full Queue.
with self.assertWarns(DeprecationWarning): queue = asyncio.Queue(1)
queue = asyncio.Queue(1, loop=loop)
queue.put_nowait(1) queue.put_nowait(1)
# Task waiting for space to put a item in the queue. # Task waiting for space to put a item in the queue.
@ -614,8 +576,7 @@ class QueuePutTests(_QueueTestBase):
class LifoQueueTests(_QueueTestBase): class LifoQueueTests(_QueueTestBase):
def test_order(self): def test_order(self):
with self.assertWarns(DeprecationWarning): q = asyncio.LifoQueue()
q = asyncio.LifoQueue(loop=self.loop)
for i in [1, 3, 2]: for i in [1, 3, 2]:
q.put_nowait(i) q.put_nowait(i)
@ -626,8 +587,7 @@ class LifoQueueTests(_QueueTestBase):
class PriorityQueueTests(_QueueTestBase): class PriorityQueueTests(_QueueTestBase):
def test_order(self): def test_order(self):
with self.assertWarns(DeprecationWarning): q = asyncio.PriorityQueue()
q = asyncio.PriorityQueue(loop=self.loop)
for i in [1, 3, 2]: for i in [1, 3, 2]:
q.put_nowait(i) q.put_nowait(i)
@ -640,13 +600,11 @@ class _QueueJoinTestMixin:
q_class = None q_class = None
def test_task_done_underflow(self): def test_task_done_underflow(self):
with self.assertWarns(DeprecationWarning): q = self.q_class()
q = self.q_class(loop=self.loop)
self.assertRaises(ValueError, q.task_done) self.assertRaises(ValueError, q.task_done)
def test_task_done(self): def test_task_done(self):
with self.assertWarns(DeprecationWarning): q = self.q_class()
q = self.q_class(loop=self.loop)
for i in range(100): for i in range(100):
q.put_nowait(i) q.put_nowait(i)
@ -681,8 +639,7 @@ class _QueueJoinTestMixin:
self.loop.run_until_complete(asyncio.wait(tasks)) self.loop.run_until_complete(asyncio.wait(tasks))
def test_join_empty_queue(self): def test_join_empty_queue(self):
with self.assertWarns(DeprecationWarning): q = self.q_class()
q = self.q_class(loop=self.loop)
# Test that a queue join()s successfully, and before anything else # Test that a queue join()s successfully, and before anything else
# (done twice for insurance). # (done twice for insurance).
@ -694,8 +651,7 @@ class _QueueJoinTestMixin:
self.loop.run_until_complete(join()) self.loop.run_until_complete(join())
def test_format(self): def test_format(self):
with self.assertWarns(DeprecationWarning): q = self.q_class()
q = self.q_class(loop=self.loop)
self.assertEqual(q._format(), 'maxsize=0') self.assertEqual(q._format(), 'maxsize=0')
q._unfinished_tasks = 2 q._unfinished_tasks = 2

View File

@ -546,7 +546,21 @@ class TestCase(unittest.TestCase):
def setUp(self): def setUp(self):
self._get_running_loop = events._get_running_loop self._get_running_loop = events._get_running_loop
events._get_running_loop = lambda: None
def _get_running_loop():
frame = sys._getframe(1)
if frame.f_globals['__name__'] == 'asyncio.mixins':
# When we called from LoopBoundedMixin we should
# fallback to default implementation of get_running_loop
try:
return events.get_running_loop()
except RuntimeError:
return None
return None
events._get_running_loop = _get_running_loop
self._thread_cleanup = threading_helper.threading_setup() self._thread_cleanup = threading_helper.threading_setup()
def tearDown(self): def tearDown(self):

View File

@ -0,0 +1,2 @@
Remove loop parameter from ``__init__`` in all ``asyncio.locks`` and
``asyncio.Queue`` classes. Patch provided by Yurii Karabas.