(Merge 3.4) asyncio, tulip issue 196: ProactorIocp._register() now registers
the overlapped in the _cache dictionary, even if we already got the result. We need to keep a reference to the overlapped object, otherwise the memory may be reused and GetQueuedCompletionStatus() may use random bytes and behaves badly. There is still a hack for ConnectNamedPipe(): the overlapped object is not register into _cache if the overlapped object completed directly. Log also an error in debug mode in ProactorIocp._loop() if we get an unexpected event. Add a protection in ProactorIocp.close() to avoid blocking, even if it should not happen. I still don't understand exactly why some the completion of some overlapped objects are not notified.
This commit is contained in:
commit
314397aeef
|
@ -369,7 +369,10 @@ class IocpProactor:
|
||||||
ov.getresult()
|
ov.getresult()
|
||||||
return pipe
|
return pipe
|
||||||
|
|
||||||
return self._register(ov, pipe, finish_accept_pipe)
|
# FIXME: Tulip issue 196: why to we neeed register=False?
|
||||||
|
# See also the comment in the _register() method
|
||||||
|
return self._register(ov, pipe, finish_accept_pipe,
|
||||||
|
register=False)
|
||||||
|
|
||||||
def connect_pipe(self, address):
|
def connect_pipe(self, address):
|
||||||
ov = _overlapped.Overlapped(NULL)
|
ov = _overlapped.Overlapped(NULL)
|
||||||
|
@ -429,17 +432,13 @@ class IocpProactor:
|
||||||
# to avoid sending notifications to completion port of ops
|
# to avoid sending notifications to completion port of ops
|
||||||
# that succeed immediately.
|
# that succeed immediately.
|
||||||
|
|
||||||
def _register(self, ov, obj, callback, wait_for_post=False):
|
def _register(self, ov, obj, callback,
|
||||||
|
wait_for_post=False, register=True):
|
||||||
# Return a future which will be set with the result of the
|
# Return a future which will be set with the result of the
|
||||||
# operation when it completes. The future's value is actually
|
# operation when it completes. The future's value is actually
|
||||||
# the value returned by callback().
|
# the value returned by callback().
|
||||||
f = _OverlappedFuture(ov, loop=self._loop)
|
f = _OverlappedFuture(ov, loop=self._loop)
|
||||||
if ov.pending or wait_for_post:
|
if not ov.pending and not wait_for_post:
|
||||||
# Register the overlapped operation for later. Note that
|
|
||||||
# we only store obj to prevent it from being garbage
|
|
||||||
# collected too early.
|
|
||||||
self._cache[ov.address] = (f, ov, obj, callback)
|
|
||||||
else:
|
|
||||||
# The operation has completed, so no need to postpone the
|
# The operation has completed, so no need to postpone the
|
||||||
# work. We cannot take this short cut if we need the
|
# work. We cannot take this short cut if we need the
|
||||||
# NumberOfBytes, CompletionKey values returned by
|
# NumberOfBytes, CompletionKey values returned by
|
||||||
|
@ -450,6 +449,23 @@ class IocpProactor:
|
||||||
f.set_exception(e)
|
f.set_exception(e)
|
||||||
else:
|
else:
|
||||||
f.set_result(value)
|
f.set_result(value)
|
||||||
|
# Even if GetOverlappedResult() was called, we have to wait for the
|
||||||
|
# notification of the completion in GetQueuedCompletionStatus().
|
||||||
|
# Register the overlapped operation to keep a reference to the
|
||||||
|
# OVERLAPPED object, otherwise the memory is freed and Windows may
|
||||||
|
# read uninitialized memory.
|
||||||
|
#
|
||||||
|
# For an unknown reason, ConnectNamedPipe() behaves differently:
|
||||||
|
# the completion is not notified by GetOverlappedResult() if we
|
||||||
|
# already called GetOverlappedResult(). For this specific case, we
|
||||||
|
# don't expect notification (register is set to False).
|
||||||
|
else:
|
||||||
|
register = True
|
||||||
|
if register:
|
||||||
|
# Register the overlapped operation for later. Note that
|
||||||
|
# we only store obj to prevent it from being garbage
|
||||||
|
# collected too early.
|
||||||
|
self._cache[ov.address] = (f, ov, obj, callback)
|
||||||
return f
|
return f
|
||||||
|
|
||||||
def _get_accept_socket(self, family):
|
def _get_accept_socket(self, family):
|
||||||
|
@ -476,6 +492,14 @@ class IocpProactor:
|
||||||
try:
|
try:
|
||||||
f, ov, obj, callback = self._cache.pop(address)
|
f, ov, obj, callback = self._cache.pop(address)
|
||||||
except KeyError:
|
except KeyError:
|
||||||
|
if self._loop.get_debug():
|
||||||
|
self._loop.call_exception_handler({
|
||||||
|
'message': ('GetQueuedCompletionStatus() returned an '
|
||||||
|
'unexpected event'),
|
||||||
|
'status': ('err=%s transferred=%s key=%#x address=%#x'
|
||||||
|
% (err, transferred, key, address)),
|
||||||
|
})
|
||||||
|
|
||||||
# key is either zero, or it is used to return a pipe
|
# key is either zero, or it is used to return a pipe
|
||||||
# handle which should be closed to avoid a leak.
|
# handle which should be closed to avoid a leak.
|
||||||
if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
|
if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
|
||||||
|
@ -483,15 +507,11 @@ class IocpProactor:
|
||||||
ms = 0
|
ms = 0
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if ov.pending:
|
|
||||||
# False alarm: the overlapped operation is not completed.
|
|
||||||
# FIXME: why do we get false alarms?
|
|
||||||
self._cache[address] = (f, ov, obj, callback)
|
|
||||||
continue
|
|
||||||
|
|
||||||
if obj in self._stopped_serving:
|
if obj in self._stopped_serving:
|
||||||
f.cancel()
|
f.cancel()
|
||||||
elif not f.cancelled():
|
# Don't call the callback if _register() already read the result or
|
||||||
|
# if the overlapped has been cancelled
|
||||||
|
elif not f.done():
|
||||||
try:
|
try:
|
||||||
value = callback(transferred, key, ov)
|
value = callback(transferred, key, ov)
|
||||||
except OSError as e:
|
except OSError as e:
|
||||||
|
@ -516,6 +536,9 @@ class IocpProactor:
|
||||||
# queues a task to Windows' thread pool. This cannot
|
# queues a task to Windows' thread pool. This cannot
|
||||||
# be cancelled, so just forget it.
|
# be cancelled, so just forget it.
|
||||||
del self._cache[address]
|
del self._cache[address]
|
||||||
|
# FIXME: Tulip issue 196: remove this case, it should not happen
|
||||||
|
elif fut.done() and not fut.cancelled():
|
||||||
|
del self._cache[address]
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
fut.cancel()
|
fut.cancel()
|
||||||
|
|
Loading…
Reference in New Issue