From 18a28dc5c28ae9a953f537486780159ddb768702 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Fri, 25 Jul 2014 13:05:20 +0200 Subject: [PATCH] asyncio: sync with Tulip * Fix _WaitHandleFuture.cancel(): return the result of the parent cancel() method. * _OverlappedFuture.cancel() now clears its reference to the overlapped object. Make also the _OverlappedFuture.ov attribute private. * Check if _WaitHandleFuture completed before unregistering it in the callback. Add also _WaitHandleFuture._poll() and repr(_WaitHandleFuture). * _WaitHandleFuture now unregisters its wait handler if WaitForSingleObject() raises an exception. * _OverlappedFuture.set_exception() now cancels the overlapped operation. --- Lib/asyncio/proactor_events.py | 8 +--- Lib/asyncio/windows_events.py | 71 ++++++++++++++++++++++++---------- 2 files changed, 52 insertions(+), 27 deletions(-) diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index c530687d6f9..ab566b32757 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -44,13 +44,9 @@ class _ProactorBasePipeTransport(transports._FlowControlMixin, def __repr__(self): info = [self.__class__.__name__, 'fd=%s' % self._sock.fileno()] if self._read_fut is not None: - ov = "pending" if self._read_fut.ov.pending else "completed" - info.append('read=%s' % ov) + info.append('read=%s' % self._read_fut) if self._write_fut is not None: - if self._write_fut.ov.pending: - info.append("write=pending=%s" % self._pending_write) - else: - info.append("write=completed") + info.append("write=%r" % self._write_fut) if self._buffer: bufsize = len(self._buffer) info.append('write_bufsize=%s' % bufsize) diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index af290b7e412..375003c4f1c 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -40,41 +40,69 @@ class _OverlappedFuture(futures.Future): super().__init__(loop=loop) if self._source_traceback: del self._source_traceback[-1] - self.ov = ov + self._ov = ov def __repr__(self): info = [self._state.lower()] - state = 'pending' if self.ov.pending else 'completed' - info.append('overlapped=<%s, %#x>' % (state, self.ov.address)) + if self._ov is not None: + state = 'pending' if self._ov.pending else 'completed' + info.append('overlapped=<%s, %#x>' % (state, self._ov.address)) if self._state == futures._FINISHED: info.append(self._format_result()) if self._callbacks: info.append(self._format_callbacks()) return '<%s %s>' % (self.__class__.__name__, ' '.join(info)) + def _cancel_overlapped(self): + if self._ov is None: + return + try: + self._ov.cancel() + except OSError as exc: + context = { + 'message': 'Cancelling an overlapped future failed', + 'exception': exc, + 'future': self, + } + if self._source_traceback: + context['source_traceback'] = self._source_traceback + self._loop.call_exception_handler(context) + self._ov = None + def cancel(self): - if not self.done(): - try: - self.ov.cancel() - except OSError as exc: - context = { - 'message': 'Cancelling an overlapped future failed', - 'exception': exc, - 'future': self, - } - if self._source_traceback: - context['source_traceback'] = self._source_traceback - self._loop.call_exception_handler(context) + self._cancel_overlapped() return super().cancel() + def set_exception(self, exception): + super().set_exception(exception) + self._cancel_overlapped() + class _WaitHandleFuture(futures.Future): """Subclass of Future which represents a wait handle.""" - def __init__(self, wait_handle, *, loop=None): + def __init__(self, handle, wait_handle, *, loop=None): super().__init__(loop=loop) + self._handle = handle self._wait_handle = wait_handle + def _poll(self): + # non-blocking wait: use a timeout of 0 millisecond + return (_winapi.WaitForSingleObject(self._handle, 0) == + _winapi.WAIT_OBJECT_0) + + def __repr__(self): + info = [self._state.lower()] + if self._wait_handle: + state = 'pending' if self._poll() else 'completed' + info.append('wait_handle=<%s, %#x>' % (state, self._wait_handle)) + info.append('handle=<%#x>' % self._handle) + if self._state == futures._FINISHED: + info.append(self._format_result()) + if self._callbacks: + info.append(self._format_callbacks()) + return '<%s %s>' % (self.__class__.__name__, ' '.join(info)) + def _unregister(self): if self._wait_handle is None: return @@ -88,7 +116,7 @@ class _WaitHandleFuture(futures.Future): def cancel(self): self._unregister() - super().cancel() + return super().cancel() class PipeServer(object): @@ -370,18 +398,19 @@ class IocpProactor: ov = _overlapped.Overlapped(NULL) wh = _overlapped.RegisterWaitWithQueue( handle, self._iocp, ov.address, ms) - f = _WaitHandleFuture(wh, loop=self._loop) + f = _WaitHandleFuture(handle, wh, loop=self._loop) def finish_wait_for_handle(trans, key, ov): - f._unregister() # Note that this second wait means that we should only use # this with handles types where a successful wait has no # effect. So events or processes are all right, but locks # or semaphores are not. Also note if the handle is # signalled and then quickly reset, then we may return # False even though we have not timed out. - return (_winapi.WaitForSingleObject(handle, 0) == - _winapi.WAIT_OBJECT_0) + try: + return f._poll() + finally: + f._unregister() self._cache[ov.address] = (f, ov, None, finish_wait_for_handle) return f