(Merge 3.4) asyncio: sync with Tulip
* _WaitHandleFuture.cancel() now notify IocpProactor through the overlapped object that the wait was cancelled. * Optimize IocpProactor.wait_for_handle() gets the result if the wait is signaled immediatly. * Enhance representation of Future and Future subclasses - Add "created at filename:lineno" in the representation - Add Future._repr_info() method which can be more easily overriden than Future.__repr__(). It should now be more easy to enhance Future representation without having to modify each subclass. For example, _OverlappedFuture and _WaitHandleFuture get the new "created at" information. - Use reprlib to format Future result, and function arguments when formatting a callback, to limit the length of the representation. * Fix repr(_WaitHandleFuture) * _WaitHandleFuture and _OverlappedFuture: hide frames of internal calls in the source traceback. * Cleanup ProactorIocp._poll(): set the timeout to 0 after the first call to GetQueuedCompletionStatus() * test_locks: close the temporary event loop and check the condition lock * Remove workaround in test_futures, no more needed
This commit is contained in:
commit
00b39ffbd3
|
@ -10,11 +10,12 @@ __all__ = ['AbstractEventLoopPolicy',
|
||||||
|
|
||||||
import functools
|
import functools
|
||||||
import inspect
|
import inspect
|
||||||
import subprocess
|
import reprlib
|
||||||
import traceback
|
|
||||||
import threading
|
|
||||||
import socket
|
import socket
|
||||||
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
|
import threading
|
||||||
|
import traceback
|
||||||
|
|
||||||
|
|
||||||
_PY34 = sys.version_info >= (3, 4)
|
_PY34 = sys.version_info >= (3, 4)
|
||||||
|
@ -36,8 +37,12 @@ def _get_function_source(func):
|
||||||
|
|
||||||
|
|
||||||
def _format_args(args):
|
def _format_args(args):
|
||||||
# function formatting ('hello',) as ('hello')
|
"""Format function arguments.
|
||||||
args_repr = repr(args)
|
|
||||||
|
Special case for a single parameter: ('hello',) is formatted as ('hello').
|
||||||
|
"""
|
||||||
|
# use reprlib to limit the length of the output
|
||||||
|
args_repr = reprlib.repr(args)
|
||||||
if len(args) == 1 and args_repr.endswith(',)'):
|
if len(args) == 1 and args_repr.endswith(',)'):
|
||||||
args_repr = args_repr[:-2] + ')'
|
args_repr = args_repr[:-2] + ')'
|
||||||
return args_repr
|
return args_repr
|
||||||
|
|
|
@ -7,6 +7,7 @@ __all__ = ['CancelledError', 'TimeoutError',
|
||||||
|
|
||||||
import concurrent.futures._base
|
import concurrent.futures._base
|
||||||
import logging
|
import logging
|
||||||
|
import reprlib
|
||||||
import sys
|
import sys
|
||||||
import traceback
|
import traceback
|
||||||
|
|
||||||
|
@ -175,20 +176,25 @@ class Future:
|
||||||
format_cb(cb[-1]))
|
format_cb(cb[-1]))
|
||||||
return 'cb=[%s]' % cb
|
return 'cb=[%s]' % cb
|
||||||
|
|
||||||
def _format_result(self):
|
def _repr_info(self):
|
||||||
if self._state != _FINISHED:
|
|
||||||
return None
|
|
||||||
elif self._exception is not None:
|
|
||||||
return 'exception={!r}'.format(self._exception)
|
|
||||||
else:
|
|
||||||
return 'result={!r}'.format(self._result)
|
|
||||||
|
|
||||||
def __repr__(self):
|
|
||||||
info = [self._state.lower()]
|
info = [self._state.lower()]
|
||||||
if self._state == _FINISHED:
|
if self._state == _FINISHED:
|
||||||
info.append(self._format_result())
|
if self._exception is not None:
|
||||||
|
info.append('exception={!r}'.format(self._exception))
|
||||||
|
else:
|
||||||
|
# use reprlib to limit the length of the output, especially
|
||||||
|
# for very long strings
|
||||||
|
result = reprlib.repr(self._result)
|
||||||
|
info.append('result={}'.format(result))
|
||||||
if self._callbacks:
|
if self._callbacks:
|
||||||
info.append(self._format_callbacks())
|
info.append(self._format_callbacks())
|
||||||
|
if self._source_traceback:
|
||||||
|
frame = self._source_traceback[-1]
|
||||||
|
info.append('created at %s:%s' % (frame[0], frame[1]))
|
||||||
|
return info
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
info = self._repr_info()
|
||||||
return '<%s %s>' % (self.__class__.__name__, ' '.join(info))
|
return '<%s %s>' % (self.__class__.__name__, ' '.join(info))
|
||||||
|
|
||||||
# On Python 3.3 or older, objects with a destructor part of a reference
|
# On Python 3.3 or older, objects with a destructor part of a reference
|
||||||
|
|
|
@ -92,30 +92,19 @@ class Task(futures.Future):
|
||||||
self._loop.call_exception_handler(context)
|
self._loop.call_exception_handler(context)
|
||||||
futures.Future.__del__(self)
|
futures.Future.__del__(self)
|
||||||
|
|
||||||
def __repr__(self):
|
def _repr_info(self):
|
||||||
info = []
|
info = super()._repr_info()
|
||||||
|
|
||||||
if self._must_cancel:
|
if self._must_cancel:
|
||||||
info.append('cancelling')
|
# replace status
|
||||||
else:
|
info[0] = 'cancelling'
|
||||||
info.append(self._state.lower())
|
|
||||||
|
|
||||||
coro = coroutines._format_coroutine(self._coro)
|
coro = coroutines._format_coroutine(self._coro)
|
||||||
info.append('coro=<%s>' % coro)
|
info.insert(1, 'coro=<%s>' % coro)
|
||||||
|
|
||||||
if self._source_traceback:
|
|
||||||
frame = self._source_traceback[-1]
|
|
||||||
info.append('created at %s:%s' % (frame[0], frame[1]))
|
|
||||||
|
|
||||||
if self._state == futures._FINISHED:
|
|
||||||
info.append(self._format_result())
|
|
||||||
|
|
||||||
if self._callbacks:
|
|
||||||
info.append(self._format_callbacks())
|
|
||||||
|
|
||||||
if self._fut_waiter is not None:
|
if self._fut_waiter is not None:
|
||||||
info.append('wait_for=%r' % self._fut_waiter)
|
info.insert(2, 'wait_for=%r' % self._fut_waiter)
|
||||||
|
return info
|
||||||
return '<%s %s>' % (self.__class__.__name__, ' '.join(info))
|
|
||||||
|
|
||||||
def get_stack(self, *, limit=None):
|
def get_stack(self, *, limit=None):
|
||||||
"""Return the list of stack frames for this task's coroutine.
|
"""Return the list of stack frames for this task's coroutine.
|
||||||
|
|
|
@ -42,16 +42,12 @@ class _OverlappedFuture(futures.Future):
|
||||||
del self._source_traceback[-1]
|
del self._source_traceback[-1]
|
||||||
self._ov = ov
|
self._ov = ov
|
||||||
|
|
||||||
def __repr__(self):
|
def _repr_info(self):
|
||||||
info = [self._state.lower()]
|
info = super()._repr_info()
|
||||||
if self._ov is not None:
|
if self._ov is not None:
|
||||||
state = 'pending' if self._ov.pending else 'completed'
|
state = 'pending' if self._ov.pending else 'completed'
|
||||||
info.append('overlapped=<%s, %#x>' % (state, self._ov.address))
|
info.insert(1, 'overlapped=<%s, %#x>' % (state, self._ov.address))
|
||||||
if self._state == futures._FINISHED:
|
return info
|
||||||
info.append(self._format_result())
|
|
||||||
if self._callbacks:
|
|
||||||
info.append(self._format_callbacks())
|
|
||||||
return '<%s %s>' % (self.__class__.__name__, ' '.join(info))
|
|
||||||
|
|
||||||
def _cancel_overlapped(self):
|
def _cancel_overlapped(self):
|
||||||
if self._ov is None:
|
if self._ov is None:
|
||||||
|
@ -85,8 +81,14 @@ class _OverlappedFuture(futures.Future):
|
||||||
class _WaitHandleFuture(futures.Future):
|
class _WaitHandleFuture(futures.Future):
|
||||||
"""Subclass of Future which represents a wait handle."""
|
"""Subclass of Future which represents a wait handle."""
|
||||||
|
|
||||||
def __init__(self, handle, wait_handle, *, loop=None):
|
def __init__(self, iocp, ov, handle, wait_handle, *, loop=None):
|
||||||
super().__init__(loop=loop)
|
super().__init__(loop=loop)
|
||||||
|
if self._source_traceback:
|
||||||
|
del self._source_traceback[-1]
|
||||||
|
# iocp and ov are only used by cancel() to notify IocpProactor
|
||||||
|
# that the wait was cancelled
|
||||||
|
self._iocp = iocp
|
||||||
|
self._ov = ov
|
||||||
self._handle = handle
|
self._handle = handle
|
||||||
self._wait_handle = wait_handle
|
self._wait_handle = wait_handle
|
||||||
|
|
||||||
|
@ -95,19 +97,16 @@ class _WaitHandleFuture(futures.Future):
|
||||||
return (_winapi.WaitForSingleObject(self._handle, 0) ==
|
return (_winapi.WaitForSingleObject(self._handle, 0) ==
|
||||||
_winapi.WAIT_OBJECT_0)
|
_winapi.WAIT_OBJECT_0)
|
||||||
|
|
||||||
def __repr__(self):
|
def _repr_info(self):
|
||||||
info = [self._state.lower()]
|
info = super()._repr_info()
|
||||||
|
info.insert(1, 'handle=%#x' % self._handle)
|
||||||
if self._wait_handle:
|
if self._wait_handle:
|
||||||
state = 'pending' if self._poll() else 'completed'
|
state = 'signaled' if self._poll() else 'waiting'
|
||||||
info.append('wait_handle=<%s, %#x>' % (state, self._wait_handle))
|
info.insert(1, 'wait_handle=<%s, %#x>'
|
||||||
info.append('handle=<%#x>' % self._handle)
|
% (state, self._wait_handle))
|
||||||
if self._state == futures._FINISHED:
|
return info
|
||||||
info.append(self._format_result())
|
|
||||||
if self._callbacks:
|
|
||||||
info.append(self._format_callbacks())
|
|
||||||
return '<%s %s>' % (self.__class__.__name__, ' '.join(info))
|
|
||||||
|
|
||||||
def _unregister(self):
|
def _unregister_wait(self):
|
||||||
if self._wait_handle is None:
|
if self._wait_handle is None:
|
||||||
return
|
return
|
||||||
try:
|
try:
|
||||||
|
@ -117,10 +116,25 @@ class _WaitHandleFuture(futures.Future):
|
||||||
raise
|
raise
|
||||||
# ERROR_IO_PENDING is not an error, the wait was unregistered
|
# ERROR_IO_PENDING is not an error, the wait was unregistered
|
||||||
self._wait_handle = None
|
self._wait_handle = None
|
||||||
|
self._iocp = None
|
||||||
|
self._ov = None
|
||||||
|
|
||||||
def cancel(self):
|
def cancel(self):
|
||||||
self._unregister()
|
result = super().cancel()
|
||||||
return super().cancel()
|
if self._ov is not None:
|
||||||
|
# signal the cancellation to the overlapped object
|
||||||
|
_overlapped.PostQueuedCompletionStatus(self._iocp, True,
|
||||||
|
0, self._ov.address)
|
||||||
|
self._unregister_wait()
|
||||||
|
return result
|
||||||
|
|
||||||
|
def set_exception(self, exception):
|
||||||
|
super().set_exception(exception)
|
||||||
|
self._unregister_wait()
|
||||||
|
|
||||||
|
def set_result(self, result):
|
||||||
|
super().set_result(result)
|
||||||
|
self._unregister_wait()
|
||||||
|
|
||||||
|
|
||||||
class PipeServer(object):
|
class PipeServer(object):
|
||||||
|
@ -405,7 +419,9 @@ class IocpProactor:
|
||||||
ov = _overlapped.Overlapped(NULL)
|
ov = _overlapped.Overlapped(NULL)
|
||||||
wh = _overlapped.RegisterWaitWithQueue(
|
wh = _overlapped.RegisterWaitWithQueue(
|
||||||
handle, self._iocp, ov.address, ms)
|
handle, self._iocp, ov.address, ms)
|
||||||
f = _WaitHandleFuture(handle, wh, loop=self._loop)
|
f = _WaitHandleFuture(self._iocp, ov, handle, wh, loop=self._loop)
|
||||||
|
if f._source_traceback:
|
||||||
|
del f._source_traceback[-1]
|
||||||
|
|
||||||
def finish_wait_for_handle(trans, key, ov):
|
def finish_wait_for_handle(trans, key, ov):
|
||||||
# Note that this second wait means that we should only use
|
# Note that this second wait means that we should only use
|
||||||
|
@ -414,12 +430,17 @@ class IocpProactor:
|
||||||
# or semaphores are not. Also note if the handle is
|
# or semaphores are not. Also note if the handle is
|
||||||
# signalled and then quickly reset, then we may return
|
# signalled and then quickly reset, then we may return
|
||||||
# False even though we have not timed out.
|
# False even though we have not timed out.
|
||||||
try:
|
|
||||||
return f._poll()
|
return f._poll()
|
||||||
finally:
|
|
||||||
f._unregister()
|
|
||||||
|
|
||||||
self._cache[ov.address] = (f, ov, None, finish_wait_for_handle)
|
if f._poll():
|
||||||
|
try:
|
||||||
|
result = f._poll()
|
||||||
|
except OSError as exc:
|
||||||
|
f.set_exception(exc)
|
||||||
|
else:
|
||||||
|
f.set_result(result)
|
||||||
|
|
||||||
|
self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
|
||||||
return f
|
return f
|
||||||
|
|
||||||
def _register_with_iocp(self, obj):
|
def _register_with_iocp(self, obj):
|
||||||
|
@ -438,6 +459,8 @@ class IocpProactor:
|
||||||
# operation when it completes. The future's value is actually
|
# operation when it completes. The future's value is actually
|
||||||
# the value returned by callback().
|
# the value returned by callback().
|
||||||
f = _OverlappedFuture(ov, loop=self._loop)
|
f = _OverlappedFuture(ov, loop=self._loop)
|
||||||
|
if f._source_traceback:
|
||||||
|
del f._source_traceback[-1]
|
||||||
if not ov.pending and not wait_for_post:
|
if not ov.pending and not wait_for_post:
|
||||||
# The operation has completed, so no need to postpone the
|
# The operation has completed, so no need to postpone the
|
||||||
# work. We cannot take this short cut if we need the
|
# work. We cannot take this short cut if we need the
|
||||||
|
@ -484,10 +507,13 @@ class IocpProactor:
|
||||||
ms = math.ceil(timeout * 1e3)
|
ms = math.ceil(timeout * 1e3)
|
||||||
if ms >= INFINITE:
|
if ms >= INFINITE:
|
||||||
raise ValueError("timeout too big")
|
raise ValueError("timeout too big")
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
|
status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
|
||||||
if status is None:
|
if status is None:
|
||||||
return
|
return
|
||||||
|
ms = 0
|
||||||
|
|
||||||
err, transferred, key, address = status
|
err, transferred, key, address = status
|
||||||
try:
|
try:
|
||||||
f, ov, obj, callback = self._cache.pop(address)
|
f, ov, obj, callback = self._cache.pop(address)
|
||||||
|
@ -504,7 +530,6 @@ class IocpProactor:
|
||||||
# handle which should be closed to avoid a leak.
|
# handle which should be closed to avoid a leak.
|
||||||
if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
|
if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
|
||||||
_winapi.CloseHandle(key)
|
_winapi.CloseHandle(key)
|
||||||
ms = 0
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if obj in self._stopped_serving:
|
if obj in self._stopped_serving:
|
||||||
|
@ -520,7 +545,6 @@ class IocpProactor:
|
||||||
else:
|
else:
|
||||||
f.set_result(value)
|
f.set_result(value)
|
||||||
self._results.append(f)
|
self._results.append(f)
|
||||||
ms = 0
|
|
||||||
|
|
||||||
def _stop_serving(self, obj):
|
def _stop_serving(self, obj):
|
||||||
# obj is a socket or pipe handle. It will be closed in
|
# obj is a socket or pipe handle. It will be closed in
|
||||||
|
|
|
@ -105,6 +105,15 @@ class FutureTests(test_utils.TestCase):
|
||||||
self.assertEqual(next(g), ('C', 42)) # yield 'C', y.
|
self.assertEqual(next(g), ('C', 42)) # yield 'C', y.
|
||||||
|
|
||||||
def test_future_repr(self):
|
def test_future_repr(self):
|
||||||
|
self.loop.set_debug(True)
|
||||||
|
f_pending_debug = asyncio.Future(loop=self.loop)
|
||||||
|
frame = f_pending_debug._source_traceback[-1]
|
||||||
|
self.assertEqual(repr(f_pending_debug),
|
||||||
|
'<Future pending created at %s:%s>'
|
||||||
|
% (frame[0], frame[1]))
|
||||||
|
f_pending_debug.cancel()
|
||||||
|
|
||||||
|
self.loop.set_debug(False)
|
||||||
f_pending = asyncio.Future(loop=self.loop)
|
f_pending = asyncio.Future(loop=self.loop)
|
||||||
self.assertEqual(repr(f_pending), '<Future pending>')
|
self.assertEqual(repr(f_pending), '<Future pending>')
|
||||||
f_pending.cancel()
|
f_pending.cancel()
|
||||||
|
@ -299,12 +308,6 @@ class FutureTests(test_utils.TestCase):
|
||||||
|
|
||||||
@mock.patch('asyncio.base_events.logger')
|
@mock.patch('asyncio.base_events.logger')
|
||||||
def test_future_exception_never_retrieved(self, m_log):
|
def test_future_exception_never_retrieved(self, m_log):
|
||||||
# FIXME: Python issue #21163, other tests may "leak" pending task which
|
|
||||||
# emit a warning when they are destroyed by the GC
|
|
||||||
support.gc_collect()
|
|
||||||
m_log.error.reset_mock()
|
|
||||||
# ---
|
|
||||||
|
|
||||||
self.loop.set_debug(True)
|
self.loop.set_debug(True)
|
||||||
|
|
||||||
def memory_error():
|
def memory_error():
|
||||||
|
@ -324,7 +327,7 @@ class FutureTests(test_utils.TestCase):
|
||||||
if sys.version_info >= (3, 4):
|
if sys.version_info >= (3, 4):
|
||||||
frame = source_traceback[-1]
|
frame = source_traceback[-1]
|
||||||
regex = (r'^Future exception was never retrieved\n'
|
regex = (r'^Future exception was never retrieved\n'
|
||||||
r'future: <Future finished exception=MemoryError\(\)>\n'
|
r'future: <Future finished exception=MemoryError\(\) created at {filename}:{lineno}>\n'
|
||||||
r'source_traceback: Object created at \(most recent call last\):\n'
|
r'source_traceback: Object created at \(most recent call last\):\n'
|
||||||
r' File'
|
r' File'
|
||||||
r'.*\n'
|
r'.*\n'
|
||||||
|
|
|
@ -660,10 +660,13 @@ class ConditionTests(test_utils.TestCase):
|
||||||
lock = asyncio.Lock(loop=self.loop)
|
lock = asyncio.Lock(loop=self.loop)
|
||||||
cond = asyncio.Condition(lock, loop=self.loop)
|
cond = asyncio.Condition(lock, loop=self.loop)
|
||||||
|
|
||||||
self.assertIs(lock._loop, cond._loop)
|
self.assertIs(cond._lock, lock)
|
||||||
|
self.assertIs(cond._loop, lock._loop)
|
||||||
|
|
||||||
def test_ambiguous_loops(self):
|
def test_ambiguous_loops(self):
|
||||||
loop = self.new_test_loop()
|
loop = self.new_test_loop()
|
||||||
|
self.addCleanup(loop.close)
|
||||||
|
|
||||||
lock = asyncio.Lock(loop=self.loop)
|
lock = asyncio.Lock(loop=self.loop)
|
||||||
with self.assertRaises(ValueError):
|
with self.assertRaises(ValueError):
|
||||||
asyncio.Condition(lock, loop=loop)
|
asyncio.Condition(lock, loop=loop)
|
||||||
|
|
|
@ -132,6 +132,8 @@ class TaskTests(test_utils.TestCase):
|
||||||
asyncio.async('ok')
|
asyncio.async('ok')
|
||||||
|
|
||||||
def test_task_repr(self):
|
def test_task_repr(self):
|
||||||
|
self.loop.set_debug(False)
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def notmuch():
|
def notmuch():
|
||||||
yield from []
|
yield from []
|
||||||
|
@ -189,6 +191,8 @@ class TaskTests(test_utils.TestCase):
|
||||||
"<Task finished %s result='abc'>" % coro)
|
"<Task finished %s result='abc'>" % coro)
|
||||||
|
|
||||||
def test_task_repr_coro_decorator(self):
|
def test_task_repr_coro_decorator(self):
|
||||||
|
self.loop.set_debug(False)
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def notmuch():
|
def notmuch():
|
||||||
# notmuch() function doesn't use yield from: it will be wrapped by
|
# notmuch() function doesn't use yield from: it will be wrapped by
|
||||||
|
@ -252,6 +256,8 @@ class TaskTests(test_utils.TestCase):
|
||||||
self.loop.run_until_complete(t)
|
self.loop.run_until_complete(t)
|
||||||
|
|
||||||
def test_task_repr_wait_for(self):
|
def test_task_repr_wait_for(self):
|
||||||
|
self.loop.set_debug(False)
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def wait_for(fut):
|
def wait_for(fut):
|
||||||
return (yield from fut)
|
return (yield from fut)
|
||||||
|
|
Loading…
Reference in New Issue