From 313a9809043ed2ed1ad25282af7169e08cdc92a3 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Tue, 29 Jul 2014 12:58:23 +0200 Subject: [PATCH] asyncio: sync with Tulip * _WaitHandleFuture.cancel() now notify IocpProactor through the overlapped object that the wait was cancelled. * Optimize IocpProactor.wait_for_handle() gets the result if the wait is signaled immediatly. * Enhance representation of Future and Future subclasses - Add "created at filename:lineno" in the representation - Add Future._repr_info() method which can be more easily overriden than Future.__repr__(). It should now be more easy to enhance Future representation without having to modify each subclass. For example, _OverlappedFuture and _WaitHandleFuture get the new "created at" information. - Use reprlib to format Future result, and function arguments when formatting a callback, to limit the length of the representation. * Fix repr(_WaitHandleFuture) * _WaitHandleFuture and _OverlappedFuture: hide frames of internal calls in the source traceback. * Cleanup ProactorIocp._poll(): set the timeout to 0 after the first call to GetQueuedCompletionStatus() * test_locks: close the temporary event loop and check the condition lock * Remove workaround in test_futures, no more needed --- Lib/asyncio/events.py | 15 +++-- Lib/asyncio/futures.py | 26 +++++---- Lib/asyncio/tasks.py | 27 +++------ Lib/asyncio/windows_events.py | 84 +++++++++++++++++---------- Lib/test/test_asyncio/test_futures.py | 17 +++--- Lib/test/test_asyncio/test_locks.py | 5 +- Lib/test/test_asyncio/test_tasks.py | 6 ++ 7 files changed, 108 insertions(+), 72 deletions(-) diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py index bddd7e3649c..3c7a36d0763 100644 --- a/Lib/asyncio/events.py +++ b/Lib/asyncio/events.py @@ -10,11 +10,12 @@ __all__ = ['AbstractEventLoopPolicy', import functools import inspect -import subprocess -import traceback -import threading +import reprlib import socket +import subprocess import sys +import threading +import traceback _PY34 = sys.version_info >= (3, 4) @@ -36,8 +37,12 @@ def _get_function_source(func): def _format_args(args): - # function formatting ('hello',) as ('hello') - args_repr = repr(args) + """Format function arguments. + + Special case for a single parameter: ('hello',) is formatted as ('hello'). + """ + # use reprlib to limit the length of the output + args_repr = reprlib.repr(args) if len(args) == 1 and args_repr.endswith(',)'): args_repr = args_repr[:-2] + ')' return args_repr diff --git a/Lib/asyncio/futures.py b/Lib/asyncio/futures.py index 022fef76efe..7998fbbcfbf 100644 --- a/Lib/asyncio/futures.py +++ b/Lib/asyncio/futures.py @@ -7,6 +7,7 @@ __all__ = ['CancelledError', 'TimeoutError', import concurrent.futures._base import logging +import reprlib import sys import traceback @@ -175,20 +176,25 @@ class Future: format_cb(cb[-1])) return 'cb=[%s]' % cb - def _format_result(self): - if self._state != _FINISHED: - return None - elif self._exception is not None: - return 'exception={!r}'.format(self._exception) - else: - return 'result={!r}'.format(self._result) - - def __repr__(self): + def _repr_info(self): info = [self._state.lower()] if self._state == _FINISHED: - info.append(self._format_result()) + if self._exception is not None: + info.append('exception={!r}'.format(self._exception)) + else: + # use reprlib to limit the length of the output, especially + # for very long strings + result = reprlib.repr(self._result) + info.append('result={}'.format(result)) if self._callbacks: info.append(self._format_callbacks()) + if self._source_traceback: + frame = self._source_traceback[-1] + info.append('created at %s:%s' % (frame[0], frame[1])) + return info + + def __repr__(self): + info = self._repr_info() return '<%s %s>' % (self.__class__.__name__, ' '.join(info)) # On Python 3.3 or older, objects with a destructor part of a reference diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index 07952c9a64f..92070162a79 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -92,30 +92,19 @@ class Task(futures.Future): self._loop.call_exception_handler(context) futures.Future.__del__(self) - def __repr__(self): - info = [] + def _repr_info(self): + info = super()._repr_info() + if self._must_cancel: - info.append('cancelling') - else: - info.append(self._state.lower()) + # replace status + info[0] = 'cancelling' coro = coroutines._format_coroutine(self._coro) - info.append('coro=<%s>' % coro) - - if self._source_traceback: - frame = self._source_traceback[-1] - info.append('created at %s:%s' % (frame[0], frame[1])) - - if self._state == futures._FINISHED: - info.append(self._format_result()) - - if self._callbacks: - info.append(self._format_callbacks()) + info.insert(1, 'coro=<%s>' % coro) if self._fut_waiter is not None: - info.append('wait_for=%r' % self._fut_waiter) - - return '<%s %s>' % (self.__class__.__name__, ' '.join(info)) + info.insert(2, 'wait_for=%r' % self._fut_waiter) + return info def get_stack(self, *, limit=None): """Return the list of stack frames for this task's coroutine. diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index 41be8da2a04..ec427d5c705 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -42,16 +42,12 @@ class _OverlappedFuture(futures.Future): del self._source_traceback[-1] self._ov = ov - def __repr__(self): - info = [self._state.lower()] + def _repr_info(self): + info = super()._repr_info() if self._ov is not None: state = 'pending' if self._ov.pending else 'completed' - info.append('overlapped=<%s, %#x>' % (state, self._ov.address)) - if self._state == futures._FINISHED: - info.append(self._format_result()) - if self._callbacks: - info.append(self._format_callbacks()) - return '<%s %s>' % (self.__class__.__name__, ' '.join(info)) + info.insert(1, 'overlapped=<%s, %#x>' % (state, self._ov.address)) + return info def _cancel_overlapped(self): if self._ov is None: @@ -85,8 +81,14 @@ class _OverlappedFuture(futures.Future): class _WaitHandleFuture(futures.Future): """Subclass of Future which represents a wait handle.""" - def __init__(self, handle, wait_handle, *, loop=None): + def __init__(self, iocp, ov, handle, wait_handle, *, loop=None): super().__init__(loop=loop) + if self._source_traceback: + del self._source_traceback[-1] + # iocp and ov are only used by cancel() to notify IocpProactor + # that the wait was cancelled + self._iocp = iocp + self._ov = ov self._handle = handle self._wait_handle = wait_handle @@ -95,19 +97,16 @@ class _WaitHandleFuture(futures.Future): return (_winapi.WaitForSingleObject(self._handle, 0) == _winapi.WAIT_OBJECT_0) - def __repr__(self): - info = [self._state.lower()] + def _repr_info(self): + info = super()._repr_info() + info.insert(1, 'handle=%#x' % self._handle) if self._wait_handle: - state = 'pending' if self._poll() else 'completed' - info.append('wait_handle=<%s, %#x>' % (state, self._wait_handle)) - info.append('handle=<%#x>' % self._handle) - if self._state == futures._FINISHED: - info.append(self._format_result()) - if self._callbacks: - info.append(self._format_callbacks()) - return '<%s %s>' % (self.__class__.__name__, ' '.join(info)) + state = 'signaled' if self._poll() else 'waiting' + info.insert(1, 'wait_handle=<%s, %#x>' + % (state, self._wait_handle)) + return info - def _unregister(self): + def _unregister_wait(self): if self._wait_handle is None: return try: @@ -117,10 +116,25 @@ class _WaitHandleFuture(futures.Future): raise # ERROR_IO_PENDING is not an error, the wait was unregistered self._wait_handle = None + self._iocp = None + self._ov = None def cancel(self): - self._unregister() - return super().cancel() + result = super().cancel() + if self._ov is not None: + # signal the cancellation to the overlapped object + _overlapped.PostQueuedCompletionStatus(self._iocp, True, + 0, self._ov.address) + self._unregister_wait() + return result + + def set_exception(self, exception): + super().set_exception(exception) + self._unregister_wait() + + def set_result(self, result): + super().set_result(result) + self._unregister_wait() class PipeServer(object): @@ -405,7 +419,9 @@ class IocpProactor: ov = _overlapped.Overlapped(NULL) wh = _overlapped.RegisterWaitWithQueue( handle, self._iocp, ov.address, ms) - f = _WaitHandleFuture(handle, wh, loop=self._loop) + f = _WaitHandleFuture(self._iocp, ov, handle, wh, loop=self._loop) + if f._source_traceback: + del f._source_traceback[-1] def finish_wait_for_handle(trans, key, ov): # Note that this second wait means that we should only use @@ -414,12 +430,17 @@ class IocpProactor: # or semaphores are not. Also note if the handle is # signalled and then quickly reset, then we may return # False even though we have not timed out. - try: - return f._poll() - finally: - f._unregister() + return f._poll() - self._cache[ov.address] = (f, ov, None, finish_wait_for_handle) + if f._poll(): + try: + result = f._poll() + except OSError as exc: + f.set_exception(exc) + else: + f.set_result(result) + + self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle) return f def _register_with_iocp(self, obj): @@ -438,6 +459,8 @@ class IocpProactor: # operation when it completes. The future's value is actually # the value returned by callback(). f = _OverlappedFuture(ov, loop=self._loop) + if f._source_traceback: + del f._source_traceback[-1] if not ov.pending and not wait_for_post: # The operation has completed, so no need to postpone the # work. We cannot take this short cut if we need the @@ -484,10 +507,13 @@ class IocpProactor: ms = math.ceil(timeout * 1e3) if ms >= INFINITE: raise ValueError("timeout too big") + while True: status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms) if status is None: return + ms = 0 + err, transferred, key, address = status try: f, ov, obj, callback = self._cache.pop(address) @@ -504,7 +530,6 @@ class IocpProactor: # handle which should be closed to avoid a leak. if key not in (0, _overlapped.INVALID_HANDLE_VALUE): _winapi.CloseHandle(key) - ms = 0 continue if obj in self._stopped_serving: @@ -520,7 +545,6 @@ class IocpProactor: else: f.set_result(value) self._results.append(f) - ms = 0 def _stop_serving(self, obj): # obj is a socket or pipe handle. It will be closed in diff --git a/Lib/test/test_asyncio/test_futures.py b/Lib/test/test_asyncio/test_futures.py index 50e9414ab8f..e5002bc825d 100644 --- a/Lib/test/test_asyncio/test_futures.py +++ b/Lib/test/test_asyncio/test_futures.py @@ -105,6 +105,15 @@ class FutureTests(test_utils.TestCase): self.assertEqual(next(g), ('C', 42)) # yield 'C', y. def test_future_repr(self): + self.loop.set_debug(True) + f_pending_debug = asyncio.Future(loop=self.loop) + frame = f_pending_debug._source_traceback[-1] + self.assertEqual(repr(f_pending_debug), + '' + % (frame[0], frame[1])) + f_pending_debug.cancel() + + self.loop.set_debug(False) f_pending = asyncio.Future(loop=self.loop) self.assertEqual(repr(f_pending), '') f_pending.cancel() @@ -299,12 +308,6 @@ class FutureTests(test_utils.TestCase): @mock.patch('asyncio.base_events.logger') def test_future_exception_never_retrieved(self, m_log): - # FIXME: Python issue #21163, other tests may "leak" pending task which - # emit a warning when they are destroyed by the GC - support.gc_collect() - m_log.error.reset_mock() - # --- - self.loop.set_debug(True) def memory_error(): @@ -324,7 +327,7 @@ class FutureTests(test_utils.TestCase): if sys.version_info >= (3, 4): frame = source_traceback[-1] regex = (r'^Future exception was never retrieved\n' - r'future: \n' + r'future: \n' r'source_traceback: Object created at \(most recent call last\):\n' r' File' r'.*\n' diff --git a/Lib/test/test_asyncio/test_locks.py b/Lib/test/test_asyncio/test_locks.py index c4e74e33303..dda4577aedd 100644 --- a/Lib/test/test_asyncio/test_locks.py +++ b/Lib/test/test_asyncio/test_locks.py @@ -660,10 +660,13 @@ class ConditionTests(test_utils.TestCase): lock = asyncio.Lock(loop=self.loop) cond = asyncio.Condition(lock, loop=self.loop) - self.assertIs(lock._loop, cond._loop) + self.assertIs(cond._lock, lock) + self.assertIs(cond._loop, lock._loop) def test_ambiguous_loops(self): loop = self.new_test_loop() + self.addCleanup(loop.close) + lock = asyncio.Lock(loop=self.loop) with self.assertRaises(ValueError): asyncio.Condition(lock, loop=loop) diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py index 7b93a0e2109..95cba542410 100644 --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -132,6 +132,8 @@ class TaskTests(test_utils.TestCase): asyncio.async('ok') def test_task_repr(self): + self.loop.set_debug(False) + @asyncio.coroutine def notmuch(): yield from [] @@ -189,6 +191,8 @@ class TaskTests(test_utils.TestCase): "" % coro) def test_task_repr_coro_decorator(self): + self.loop.set_debug(False) + @asyncio.coroutine def notmuch(): # notmuch() function doesn't use yield from: it will be wrapped by @@ -252,6 +256,8 @@ class TaskTests(test_utils.TestCase): self.loop.run_until_complete(t) def test_task_repr_wait_for(self): + self.loop.set_debug(False) + @asyncio.coroutine def wait_for(fut): return (yield from fut)