diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index 3cb5690f8e4..7d0dbe9d01d 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -29,6 +29,12 @@ INFINITE = 0xffffffff ERROR_CONNECTION_REFUSED = 1225 ERROR_CONNECTION_ABORTED = 1236 +# Initial delay in seconds for connect_pipe() before retrying to connect +CONNECT_PIPE_INIT_DELAY = 0.001 + +# Maximum delay in seconds for connect_pipe() before retrying to connect +CONNECT_PIPE_MAX_DELAY = 0.100 + class _OverlappedFuture(futures.Future): """Subclass of Future which represents an overlapped operation. @@ -495,25 +501,28 @@ class IocpProactor: return self._register(ov, pipe, finish_accept_pipe, register=False) - def connect_pipe(self, address): - ov = _overlapped.Overlapped(NULL) - ov.WaitNamedPipeAndConnect(address, self._iocp, ov.address) - - def finish_connect_pipe(err, handle, ov): - # err, handle were arguments passed to PostQueuedCompletionStatus() - # in a function run in a thread pool. - if err == _overlapped.ERROR_SEM_TIMEOUT: - # Connection did not succeed within time limit. - msg = _overlapped.FormatMessage(err) - raise ConnectionRefusedError(0, msg, None, err) - elif err != 0: - msg = _overlapped.FormatMessage(err) - raise OSError(0, msg, None, err) + def _connect_pipe(self, fut, address, delay): + # Unfortunately there is no way to do an overlapped connect to a pipe. + # Call CreateFile() in a loop until it doesn't fail with + # ERROR_PIPE_BUSY + try: + handle = _overlapped.ConnectPipe(address) + except OSError as exc: + if exc.winerror == _overlapped.ERROR_PIPE_BUSY: + # Polling: retry later + delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY) + self._loop.call_later(delay, + self._connect_pipe, fut, address, delay) else: - return windows_utils.PipeHandle(handle) + fut.set_exception(exc) + else: + pipe = windows_utils.PipeHandle(handle) + fut.set_result(pipe) - return self._register(ov, None, finish_connect_pipe, - wait_for_post=True) + def connect_pipe(self, address): + fut = futures.Future(loop=self._loop) + self._connect_pipe(fut, address, CONNECT_PIPE_INIT_DELAY) + return fut def wait_for_handle(self, handle, timeout=None): """Wait for a handle. @@ -693,12 +702,16 @@ class IocpProactor: # queues a task to Windows' thread pool. This cannot # be cancelled, so just forget it. del self._cache[address] - # FIXME: Tulip issue 196: remove this case, it should not happen - elif fut.done() and not fut.cancelled(): - del self._cache[address] + elif fut.cancelled(): + # Nothing to do with cancelled futures + pass elif isinstance(fut, _WaitCancelFuture): # _WaitCancelFuture must not be cancelled pass + elif fut.done(): + # FIXME: Tulip issue 196: remove this case, it should not + # happen + del self._cache[address] else: try: fut.cancel() diff --git a/Modules/overlapped.c b/Modules/overlapped.c index d22c626ecf8..8fe2e247bc1 100644 --- a/Modules/overlapped.c +++ b/Modules/overlapped.c @@ -52,12 +52,6 @@ typedef struct { }; } OverlappedObject; -typedef struct { - OVERLAPPED *Overlapped; - HANDLE IocpHandle; - char Address[1]; -} WaitNamedPipeAndConnectContext; - /* * Map Windows error codes to subclasses of OSError */ @@ -1133,99 +1127,33 @@ Overlapped_ConnectNamedPipe(OverlappedObject *self, PyObject *args) } } -/* Unfortunately there is no way to do an overlapped connect to a - pipe. We instead use WaitNamedPipe() and CreateFile() in a thread - pool thread. If a connection succeeds within a time limit (10 - seconds) then PostQueuedCompletionStatus() is used to return the - pipe handle to the completion port. */ - -static DWORD WINAPI -WaitNamedPipeAndConnectInThread(WaitNamedPipeAndConnectContext *ctx) -{ - HANDLE PipeHandle = INVALID_HANDLE_VALUE; - DWORD Start = GetTickCount(); - DWORD Deadline = Start + 10*1000; - DWORD Error = 0; - DWORD Timeout; - BOOL Success; - - for ( ; ; ) { - Timeout = Deadline - GetTickCount(); - if ((int)Timeout < 0) - break; - Success = WaitNamedPipe(ctx->Address, Timeout); - Error = Success ? ERROR_SUCCESS : GetLastError(); - switch (Error) { - case ERROR_SUCCESS: - PipeHandle = CreateFile(ctx->Address, - GENERIC_READ | GENERIC_WRITE, - 0, NULL, OPEN_EXISTING, - FILE_FLAG_OVERLAPPED, NULL); - if (PipeHandle == INVALID_HANDLE_VALUE) - continue; - break; - case ERROR_SEM_TIMEOUT: - continue; - } - break; - } - if (!PostQueuedCompletionStatus(ctx->IocpHandle, Error, - (ULONG_PTR)PipeHandle, ctx->Overlapped)) - CloseHandle(PipeHandle); - free(ctx); - return 0; -} - PyDoc_STRVAR( - Overlapped_WaitNamedPipeAndConnect_doc, - "WaitNamedPipeAndConnect(addr, iocp_handle) -> Overlapped[pipe_handle]\n\n" - "Start overlapped connection to address, notifying iocp_handle when\n" - "finished"); + ConnectPipe_doc, + "ConnectPipe(addr) -> pipe_handle\n\n" + "Connect to the pipe for asynchronous I/O (overlapped)."); static PyObject * -Overlapped_WaitNamedPipeAndConnect(OverlappedObject *self, PyObject *args) +ConnectPipe(OverlappedObject *self, PyObject *args) { - char *Address; - Py_ssize_t AddressLength; - HANDLE IocpHandle; - OVERLAPPED Overlapped; - BOOL ret; - DWORD err; - WaitNamedPipeAndConnectContext *ctx; - Py_ssize_t ContextLength; + PyObject *AddressObj; + wchar_t *Address; + HANDLE PipeHandle; - if (!PyArg_ParseTuple(args, "s#" F_HANDLE F_POINTER, - &Address, &AddressLength, &IocpHandle, &Overlapped)) + if (!PyArg_ParseTuple(args, "U", &AddressObj)) return NULL; - if (self->type != TYPE_NONE) { - PyErr_SetString(PyExc_ValueError, "operation already attempted"); + Address = PyUnicode_AsWideCharString(AddressObj, NULL); + if (Address == NULL) return NULL; - } - ContextLength = (AddressLength + - offsetof(WaitNamedPipeAndConnectContext, Address)); - ctx = calloc(1, ContextLength + 1); - if (ctx == NULL) - return PyErr_NoMemory(); - memcpy(ctx->Address, Address, AddressLength + 1); - ctx->Overlapped = &self->overlapped; - ctx->IocpHandle = IocpHandle; - - self->type = TYPE_WAIT_NAMED_PIPE_AND_CONNECT; - self->handle = NULL; - - Py_BEGIN_ALLOW_THREADS - ret = QueueUserWorkItem(WaitNamedPipeAndConnectInThread, ctx, - WT_EXECUTELONGFUNCTION); - Py_END_ALLOW_THREADS - - mark_as_completed(&self->overlapped); - - self->error = err = ret ? ERROR_SUCCESS : GetLastError(); - if (!ret) - return SetFromWindowsErr(err); - Py_RETURN_NONE; + PipeHandle = CreateFileW(Address, + GENERIC_READ | GENERIC_WRITE, + 0, NULL, OPEN_EXISTING, + FILE_FLAG_OVERLAPPED, NULL); + PyMem_Free(Address); + if (PipeHandle == INVALID_HANDLE_VALUE) + return SetFromWindowsErr(0); + return Py_BuildValue(F_HANDLE, PipeHandle); } static PyObject* @@ -1262,9 +1190,6 @@ static PyMethodDef Overlapped_methods[] = { METH_VARARGS, Overlapped_DisconnectEx_doc}, {"ConnectNamedPipe", (PyCFunction) Overlapped_ConnectNamedPipe, METH_VARARGS, Overlapped_ConnectNamedPipe_doc}, - {"WaitNamedPipeAndConnect", - (PyCFunction) Overlapped_WaitNamedPipeAndConnect, - METH_VARARGS, Overlapped_WaitNamedPipeAndConnect_doc}, {NULL} }; @@ -1350,6 +1275,9 @@ static PyMethodDef overlapped_functions[] = { METH_VARARGS, SetEvent_doc}, {"ResetEvent", overlapped_ResetEvent, METH_VARARGS, ResetEvent_doc}, + {"ConnectPipe", + (PyCFunction) ConnectPipe, + METH_VARARGS, ConnectPipe_doc}, {NULL} }; @@ -1394,6 +1322,7 @@ PyInit__overlapped(void) WINAPI_CONSTANT(F_DWORD, ERROR_IO_PENDING); WINAPI_CONSTANT(F_DWORD, ERROR_NETNAME_DELETED); WINAPI_CONSTANT(F_DWORD, ERROR_SEM_TIMEOUT); + WINAPI_CONSTANT(F_DWORD, ERROR_PIPE_BUSY); WINAPI_CONSTANT(F_DWORD, INFINITE); WINAPI_CONSTANT(F_HANDLE, INVALID_HANDLE_VALUE); WINAPI_CONSTANT(F_HANDLE, NULL);