(Merge 3.4) asyncio: sync with Tulip

* Fix _WaitHandleFuture.cancel(): return the result of the parent cancel()
  method.
* _OverlappedFuture.cancel() now clears its reference to the overlapped object.
  Make also the _OverlappedFuture.ov attribute private.
* Check if _WaitHandleFuture completed before unregistering it in the callback.
  Add also _WaitHandleFuture._poll() and repr(_WaitHandleFuture).
* _WaitHandleFuture now unregisters its wait handler if WaitForSingleObject()
  raises an exception.
* _OverlappedFuture.set_exception() now cancels the overlapped operation.
This commit is contained in:
Victor Stinner 2014-07-25 13:05:43 +02:00
commit d6766ae434
2 changed files with 52 additions and 27 deletions

View File

@ -44,13 +44,9 @@ class _ProactorBasePipeTransport(transports._FlowControlMixin,
def __repr__(self): def __repr__(self):
info = [self.__class__.__name__, 'fd=%s' % self._sock.fileno()] info = [self.__class__.__name__, 'fd=%s' % self._sock.fileno()]
if self._read_fut is not None: if self._read_fut is not None:
ov = "pending" if self._read_fut.ov.pending else "completed" info.append('read=%s' % self._read_fut)
info.append('read=%s' % ov)
if self._write_fut is not None: if self._write_fut is not None:
if self._write_fut.ov.pending: info.append("write=%r" % self._write_fut)
info.append("write=pending=%s" % self._pending_write)
else:
info.append("write=completed")
if self._buffer: if self._buffer:
bufsize = len(self._buffer) bufsize = len(self._buffer)
info.append('write_bufsize=%s' % bufsize) info.append('write_bufsize=%s' % bufsize)

View File

@ -40,22 +40,24 @@ class _OverlappedFuture(futures.Future):
super().__init__(loop=loop) super().__init__(loop=loop)
if self._source_traceback: if self._source_traceback:
del self._source_traceback[-1] del self._source_traceback[-1]
self.ov = ov self._ov = ov
def __repr__(self): def __repr__(self):
info = [self._state.lower()] info = [self._state.lower()]
state = 'pending' if self.ov.pending else 'completed' if self._ov is not None:
info.append('overlapped=<%s, %#x>' % (state, self.ov.address)) state = 'pending' if self._ov.pending else 'completed'
info.append('overlapped=<%s, %#x>' % (state, self._ov.address))
if self._state == futures._FINISHED: if self._state == futures._FINISHED:
info.append(self._format_result()) info.append(self._format_result())
if self._callbacks: if self._callbacks:
info.append(self._format_callbacks()) info.append(self._format_callbacks())
return '<%s %s>' % (self.__class__.__name__, ' '.join(info)) return '<%s %s>' % (self.__class__.__name__, ' '.join(info))
def cancel(self): def _cancel_overlapped(self):
if not self.done(): if self._ov is None:
return
try: try:
self.ov.cancel() self._ov.cancel()
except OSError as exc: except OSError as exc:
context = { context = {
'message': 'Cancelling an overlapped future failed', 'message': 'Cancelling an overlapped future failed',
@ -65,16 +67,42 @@ class _OverlappedFuture(futures.Future):
if self._source_traceback: if self._source_traceback:
context['source_traceback'] = self._source_traceback context['source_traceback'] = self._source_traceback
self._loop.call_exception_handler(context) self._loop.call_exception_handler(context)
self._ov = None
def cancel(self):
self._cancel_overlapped()
return super().cancel() return super().cancel()
def set_exception(self, exception):
super().set_exception(exception)
self._cancel_overlapped()
class _WaitHandleFuture(futures.Future): class _WaitHandleFuture(futures.Future):
"""Subclass of Future which represents a wait handle.""" """Subclass of Future which represents a wait handle."""
def __init__(self, wait_handle, *, loop=None): def __init__(self, handle, wait_handle, *, loop=None):
super().__init__(loop=loop) super().__init__(loop=loop)
self._handle = handle
self._wait_handle = wait_handle self._wait_handle = wait_handle
def _poll(self):
# non-blocking wait: use a timeout of 0 millisecond
return (_winapi.WaitForSingleObject(self._handle, 0) ==
_winapi.WAIT_OBJECT_0)
def __repr__(self):
info = [self._state.lower()]
if self._wait_handle:
state = 'pending' if self._poll() else 'completed'
info.append('wait_handle=<%s, %#x>' % (state, self._wait_handle))
info.append('handle=<%#x>' % self._handle)
if self._state == futures._FINISHED:
info.append(self._format_result())
if self._callbacks:
info.append(self._format_callbacks())
return '<%s %s>' % (self.__class__.__name__, ' '.join(info))
def _unregister(self): def _unregister(self):
if self._wait_handle is None: if self._wait_handle is None:
return return
@ -88,7 +116,7 @@ class _WaitHandleFuture(futures.Future):
def cancel(self): def cancel(self):
self._unregister() self._unregister()
super().cancel() return super().cancel()
class PipeServer(object): class PipeServer(object):
@ -370,18 +398,19 @@ class IocpProactor:
ov = _overlapped.Overlapped(NULL) ov = _overlapped.Overlapped(NULL)
wh = _overlapped.RegisterWaitWithQueue( wh = _overlapped.RegisterWaitWithQueue(
handle, self._iocp, ov.address, ms) handle, self._iocp, ov.address, ms)
f = _WaitHandleFuture(wh, loop=self._loop) f = _WaitHandleFuture(handle, wh, loop=self._loop)
def finish_wait_for_handle(trans, key, ov): def finish_wait_for_handle(trans, key, ov):
f._unregister()
# Note that this second wait means that we should only use # Note that this second wait means that we should only use
# this with handles types where a successful wait has no # this with handles types where a successful wait has no
# effect. So events or processes are all right, but locks # effect. So events or processes are all right, but locks
# or semaphores are not. Also note if the handle is # or semaphores are not. Also note if the handle is
# signalled and then quickly reset, then we may return # signalled and then quickly reset, then we may return
# False even though we have not timed out. # False even though we have not timed out.
return (_winapi.WaitForSingleObject(handle, 0) == try:
_winapi.WAIT_OBJECT_0) return f._poll()
finally:
f._unregister()
self._cache[ov.address] = (f, ov, None, finish_wait_for_handle) self._cache[ov.address] = (f, ov, None, finish_wait_for_handle)
return f return f