From 42d3bdeed6e34117b787d61a471563a0dba6a894 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Mon, 28 Jul 2014 00:18:43 +0200 Subject: [PATCH] asyncio, tulip issue 196: ProactorIocp._register() now registers the overlapped in the _cache dictionary, even if we already got the result. We need to keep a reference to the overlapped object, otherwise the memory may be reused and GetQueuedCompletionStatus() may use random bytes and behaves badly. There is still a hack for ConnectNamedPipe(): the overlapped object is not register into _cache if the overlapped object completed directly. Log also an error in debug mode in ProactorIocp._loop() if we get an unexpected event. Add a protection in ProactorIocp.close() to avoid blocking, even if it should not happen. I still don't understand exactly why some the completion of some overlapped objects are not notified. --- Lib/asyncio/windows_events.py | 53 +++++++++++++++++++++++++---------- 1 file changed, 38 insertions(+), 15 deletions(-) diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index 3aa142c4fcf..41be8da2a04 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -369,7 +369,10 @@ class IocpProactor: ov.getresult() return pipe - return self._register(ov, pipe, finish_accept_pipe) + # FIXME: Tulip issue 196: why to we neeed register=False? + # See also the comment in the _register() method + return self._register(ov, pipe, finish_accept_pipe, + register=False) def connect_pipe(self, address): ov = _overlapped.Overlapped(NULL) @@ -429,17 +432,13 @@ class IocpProactor: # to avoid sending notifications to completion port of ops # that succeed immediately. - def _register(self, ov, obj, callback, wait_for_post=False): + def _register(self, ov, obj, callback, + wait_for_post=False, register=True): # Return a future which will be set with the result of the # operation when it completes. The future's value is actually # the value returned by callback(). f = _OverlappedFuture(ov, loop=self._loop) - if ov.pending or wait_for_post: - # Register the overlapped operation for later. Note that - # we only store obj to prevent it from being garbage - # collected too early. - self._cache[ov.address] = (f, ov, obj, callback) - else: + if not ov.pending and not wait_for_post: # The operation has completed, so no need to postpone the # work. We cannot take this short cut if we need the # NumberOfBytes, CompletionKey values returned by @@ -450,6 +449,23 @@ class IocpProactor: f.set_exception(e) else: f.set_result(value) + # Even if GetOverlappedResult() was called, we have to wait for the + # notification of the completion in GetQueuedCompletionStatus(). + # Register the overlapped operation to keep a reference to the + # OVERLAPPED object, otherwise the memory is freed and Windows may + # read uninitialized memory. + # + # For an unknown reason, ConnectNamedPipe() behaves differently: + # the completion is not notified by GetOverlappedResult() if we + # already called GetOverlappedResult(). For this specific case, we + # don't expect notification (register is set to False). + else: + register = True + if register: + # Register the overlapped operation for later. Note that + # we only store obj to prevent it from being garbage + # collected too early. + self._cache[ov.address] = (f, ov, obj, callback) return f def _get_accept_socket(self, family): @@ -476,6 +492,14 @@ class IocpProactor: try: f, ov, obj, callback = self._cache.pop(address) except KeyError: + if self._loop.get_debug(): + self._loop.call_exception_handler({ + 'message': ('GetQueuedCompletionStatus() returned an ' + 'unexpected event'), + 'status': ('err=%s transferred=%s key=%#x address=%#x' + % (err, transferred, key, address)), + }) + # key is either zero, or it is used to return a pipe # handle which should be closed to avoid a leak. if key not in (0, _overlapped.INVALID_HANDLE_VALUE): @@ -483,15 +507,11 @@ class IocpProactor: ms = 0 continue - if ov.pending: - # False alarm: the overlapped operation is not completed. - # FIXME: why do we get false alarms? - self._cache[address] = (f, ov, obj, callback) - continue - if obj in self._stopped_serving: f.cancel() - elif not f.cancelled(): + # Don't call the callback if _register() already read the result or + # if the overlapped has been cancelled + elif not f.done(): try: value = callback(transferred, key, ov) except OSError as e: @@ -516,6 +536,9 @@ class IocpProactor: # queues a task to Windows' thread pool. This cannot # be cancelled, so just forget it. del self._cache[address] + # FIXME: Tulip issue 196: remove this case, it should not happen + elif fut.done() and not fut.cancelled(): + del self._cache[address] else: try: fut.cancel()