mirror of https://github.com/python/cpython
asyncio: sync with Tulip
* Fix _WaitHandleFuture.cancel(): return the result of the parent cancel() method. * _OverlappedFuture.cancel() now clears its reference to the overlapped object. Make also the _OverlappedFuture.ov attribute private. * Check if _WaitHandleFuture completed before unregistering it in the callback. Add also _WaitHandleFuture._poll() and repr(_WaitHandleFuture). * _WaitHandleFuture now unregisters its wait handler if WaitForSingleObject() raises an exception. * _OverlappedFuture.set_exception() now cancels the overlapped operation.
This commit is contained in:
parent
1a901cc952
commit
18a28dc5c2
|
@ -44,13 +44,9 @@ class _ProactorBasePipeTransport(transports._FlowControlMixin,
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
info = [self.__class__.__name__, 'fd=%s' % self._sock.fileno()]
|
info = [self.__class__.__name__, 'fd=%s' % self._sock.fileno()]
|
||||||
if self._read_fut is not None:
|
if self._read_fut is not None:
|
||||||
ov = "pending" if self._read_fut.ov.pending else "completed"
|
info.append('read=%s' % self._read_fut)
|
||||||
info.append('read=%s' % ov)
|
|
||||||
if self._write_fut is not None:
|
if self._write_fut is not None:
|
||||||
if self._write_fut.ov.pending:
|
info.append("write=%r" % self._write_fut)
|
||||||
info.append("write=pending=%s" % self._pending_write)
|
|
||||||
else:
|
|
||||||
info.append("write=completed")
|
|
||||||
if self._buffer:
|
if self._buffer:
|
||||||
bufsize = len(self._buffer)
|
bufsize = len(self._buffer)
|
||||||
info.append('write_bufsize=%s' % bufsize)
|
info.append('write_bufsize=%s' % bufsize)
|
||||||
|
|
|
@ -40,41 +40,69 @@ class _OverlappedFuture(futures.Future):
|
||||||
super().__init__(loop=loop)
|
super().__init__(loop=loop)
|
||||||
if self._source_traceback:
|
if self._source_traceback:
|
||||||
del self._source_traceback[-1]
|
del self._source_traceback[-1]
|
||||||
self.ov = ov
|
self._ov = ov
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
info = [self._state.lower()]
|
info = [self._state.lower()]
|
||||||
state = 'pending' if self.ov.pending else 'completed'
|
if self._ov is not None:
|
||||||
info.append('overlapped=<%s, %#x>' % (state, self.ov.address))
|
state = 'pending' if self._ov.pending else 'completed'
|
||||||
|
info.append('overlapped=<%s, %#x>' % (state, self._ov.address))
|
||||||
if self._state == futures._FINISHED:
|
if self._state == futures._FINISHED:
|
||||||
info.append(self._format_result())
|
info.append(self._format_result())
|
||||||
if self._callbacks:
|
if self._callbacks:
|
||||||
info.append(self._format_callbacks())
|
info.append(self._format_callbacks())
|
||||||
return '<%s %s>' % (self.__class__.__name__, ' '.join(info))
|
return '<%s %s>' % (self.__class__.__name__, ' '.join(info))
|
||||||
|
|
||||||
|
def _cancel_overlapped(self):
|
||||||
|
if self._ov is None:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
self._ov.cancel()
|
||||||
|
except OSError as exc:
|
||||||
|
context = {
|
||||||
|
'message': 'Cancelling an overlapped future failed',
|
||||||
|
'exception': exc,
|
||||||
|
'future': self,
|
||||||
|
}
|
||||||
|
if self._source_traceback:
|
||||||
|
context['source_traceback'] = self._source_traceback
|
||||||
|
self._loop.call_exception_handler(context)
|
||||||
|
self._ov = None
|
||||||
|
|
||||||
def cancel(self):
|
def cancel(self):
|
||||||
if not self.done():
|
self._cancel_overlapped()
|
||||||
try:
|
|
||||||
self.ov.cancel()
|
|
||||||
except OSError as exc:
|
|
||||||
context = {
|
|
||||||
'message': 'Cancelling an overlapped future failed',
|
|
||||||
'exception': exc,
|
|
||||||
'future': self,
|
|
||||||
}
|
|
||||||
if self._source_traceback:
|
|
||||||
context['source_traceback'] = self._source_traceback
|
|
||||||
self._loop.call_exception_handler(context)
|
|
||||||
return super().cancel()
|
return super().cancel()
|
||||||
|
|
||||||
|
def set_exception(self, exception):
|
||||||
|
super().set_exception(exception)
|
||||||
|
self._cancel_overlapped()
|
||||||
|
|
||||||
|
|
||||||
class _WaitHandleFuture(futures.Future):
|
class _WaitHandleFuture(futures.Future):
|
||||||
"""Subclass of Future which represents a wait handle."""
|
"""Subclass of Future which represents a wait handle."""
|
||||||
|
|
||||||
def __init__(self, wait_handle, *, loop=None):
|
def __init__(self, handle, wait_handle, *, loop=None):
|
||||||
super().__init__(loop=loop)
|
super().__init__(loop=loop)
|
||||||
|
self._handle = handle
|
||||||
self._wait_handle = wait_handle
|
self._wait_handle = wait_handle
|
||||||
|
|
||||||
|
def _poll(self):
|
||||||
|
# non-blocking wait: use a timeout of 0 millisecond
|
||||||
|
return (_winapi.WaitForSingleObject(self._handle, 0) ==
|
||||||
|
_winapi.WAIT_OBJECT_0)
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
info = [self._state.lower()]
|
||||||
|
if self._wait_handle:
|
||||||
|
state = 'pending' if self._poll() else 'completed'
|
||||||
|
info.append('wait_handle=<%s, %#x>' % (state, self._wait_handle))
|
||||||
|
info.append('handle=<%#x>' % self._handle)
|
||||||
|
if self._state == futures._FINISHED:
|
||||||
|
info.append(self._format_result())
|
||||||
|
if self._callbacks:
|
||||||
|
info.append(self._format_callbacks())
|
||||||
|
return '<%s %s>' % (self.__class__.__name__, ' '.join(info))
|
||||||
|
|
||||||
def _unregister(self):
|
def _unregister(self):
|
||||||
if self._wait_handle is None:
|
if self._wait_handle is None:
|
||||||
return
|
return
|
||||||
|
@ -88,7 +116,7 @@ class _WaitHandleFuture(futures.Future):
|
||||||
|
|
||||||
def cancel(self):
|
def cancel(self):
|
||||||
self._unregister()
|
self._unregister()
|
||||||
super().cancel()
|
return super().cancel()
|
||||||
|
|
||||||
|
|
||||||
class PipeServer(object):
|
class PipeServer(object):
|
||||||
|
@ -370,18 +398,19 @@ class IocpProactor:
|
||||||
ov = _overlapped.Overlapped(NULL)
|
ov = _overlapped.Overlapped(NULL)
|
||||||
wh = _overlapped.RegisterWaitWithQueue(
|
wh = _overlapped.RegisterWaitWithQueue(
|
||||||
handle, self._iocp, ov.address, ms)
|
handle, self._iocp, ov.address, ms)
|
||||||
f = _WaitHandleFuture(wh, loop=self._loop)
|
f = _WaitHandleFuture(handle, wh, loop=self._loop)
|
||||||
|
|
||||||
def finish_wait_for_handle(trans, key, ov):
|
def finish_wait_for_handle(trans, key, ov):
|
||||||
f._unregister()
|
|
||||||
# Note that this second wait means that we should only use
|
# Note that this second wait means that we should only use
|
||||||
# this with handles types where a successful wait has no
|
# this with handles types where a successful wait has no
|
||||||
# effect. So events or processes are all right, but locks
|
# effect. So events or processes are all right, but locks
|
||||||
# or semaphores are not. Also note if the handle is
|
# or semaphores are not. Also note if the handle is
|
||||||
# signalled and then quickly reset, then we may return
|
# signalled and then quickly reset, then we may return
|
||||||
# False even though we have not timed out.
|
# False even though we have not timed out.
|
||||||
return (_winapi.WaitForSingleObject(handle, 0) ==
|
try:
|
||||||
_winapi.WAIT_OBJECT_0)
|
return f._poll()
|
||||||
|
finally:
|
||||||
|
f._unregister()
|
||||||
|
|
||||||
self._cache[ov.address] = (f, ov, None, finish_wait_for_handle)
|
self._cache[ov.address] = (f, ov, None, finish_wait_for_handle)
|
||||||
return f
|
return f
|
||||||
|
|
Loading…
Reference in New Issue