Issue #23293, asyncio: Rewrite IocpProactor.connect_pipe()

Add _overlapped.ConnectPipe() which tries to connect to the pipe for
asynchronous I/O (overlapped): call CreateFile() in a loop until it doesn't
fail with ERROR_PIPE_BUSY. Use an increasing delay between 1 ms and 100 ms.

Remove Overlapped.WaitNamedPipeAndConnect() which is no more used.
This commit is contained in:
Victor Stinner 2015-01-22 22:55:08 +01:00
parent 752aba7f99
commit 7ffa2c5fdd
2 changed files with 48 additions and 110 deletions

View File

@ -29,6 +29,12 @@ INFINITE = 0xffffffff
ERROR_CONNECTION_REFUSED = 1225 ERROR_CONNECTION_REFUSED = 1225
ERROR_CONNECTION_ABORTED = 1236 ERROR_CONNECTION_ABORTED = 1236
# Initial delay in seconds for connect_pipe() before retrying to connect
CONNECT_PIPE_INIT_DELAY = 0.001
# Maximum delay in seconds for connect_pipe() before retrying to connect
CONNECT_PIPE_MAX_DELAY = 0.100
class _OverlappedFuture(futures.Future): class _OverlappedFuture(futures.Future):
"""Subclass of Future which represents an overlapped operation. """Subclass of Future which represents an overlapped operation.
@ -495,25 +501,28 @@ class IocpProactor:
return self._register(ov, pipe, finish_accept_pipe, return self._register(ov, pipe, finish_accept_pipe,
register=False) register=False)
def connect_pipe(self, address): def _connect_pipe(self, fut, address, delay):
ov = _overlapped.Overlapped(NULL) # Unfortunately there is no way to do an overlapped connect to a pipe.
ov.WaitNamedPipeAndConnect(address, self._iocp, ov.address) # Call CreateFile() in a loop until it doesn't fail with
# ERROR_PIPE_BUSY
def finish_connect_pipe(err, handle, ov): try:
# err, handle were arguments passed to PostQueuedCompletionStatus() handle = _overlapped.ConnectPipe(address)
# in a function run in a thread pool. except OSError as exc:
if err == _overlapped.ERROR_SEM_TIMEOUT: if exc.winerror == _overlapped.ERROR_PIPE_BUSY:
# Connection did not succeed within time limit. # Polling: retry later
msg = _overlapped.FormatMessage(err) delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
raise ConnectionRefusedError(0, msg, None, err) self._loop.call_later(delay,
elif err != 0: self._connect_pipe, fut, address, delay)
msg = _overlapped.FormatMessage(err)
raise OSError(0, msg, None, err)
else: else:
return windows_utils.PipeHandle(handle) fut.set_exception(exc)
else:
pipe = windows_utils.PipeHandle(handle)
fut.set_result(pipe)
return self._register(ov, None, finish_connect_pipe, def connect_pipe(self, address):
wait_for_post=True) fut = futures.Future(loop=self._loop)
self._connect_pipe(fut, address, CONNECT_PIPE_INIT_DELAY)
return fut
def wait_for_handle(self, handle, timeout=None): def wait_for_handle(self, handle, timeout=None):
"""Wait for a handle. """Wait for a handle.

View File

@ -52,12 +52,6 @@ typedef struct {
}; };
} OverlappedObject; } OverlappedObject;
typedef struct {
OVERLAPPED *Overlapped;
HANDLE IocpHandle;
char Address[1];
} WaitNamedPipeAndConnectContext;
/* /*
* Map Windows error codes to subclasses of OSError * Map Windows error codes to subclasses of OSError
*/ */
@ -1133,99 +1127,33 @@ Overlapped_ConnectNamedPipe(OverlappedObject *self, PyObject *args)
} }
} }
/* Unfortunately there is no way to do an overlapped connect to a
pipe. We instead use WaitNamedPipe() and CreateFile() in a thread
pool thread. If a connection succeeds within a time limit (10
seconds) then PostQueuedCompletionStatus() is used to return the
pipe handle to the completion port. */
static DWORD WINAPI
WaitNamedPipeAndConnectInThread(WaitNamedPipeAndConnectContext *ctx)
{
HANDLE PipeHandle = INVALID_HANDLE_VALUE;
DWORD Start = GetTickCount();
DWORD Deadline = Start + 10*1000;
DWORD Error = 0;
DWORD Timeout;
BOOL Success;
for ( ; ; ) {
Timeout = Deadline - GetTickCount();
if ((int)Timeout < 0)
break;
Success = WaitNamedPipe(ctx->Address, Timeout);
Error = Success ? ERROR_SUCCESS : GetLastError();
switch (Error) {
case ERROR_SUCCESS:
PipeHandle = CreateFile(ctx->Address,
GENERIC_READ | GENERIC_WRITE,
0, NULL, OPEN_EXISTING,
FILE_FLAG_OVERLAPPED, NULL);
if (PipeHandle == INVALID_HANDLE_VALUE)
continue;
break;
case ERROR_SEM_TIMEOUT:
continue;
}
break;
}
if (!PostQueuedCompletionStatus(ctx->IocpHandle, Error,
(ULONG_PTR)PipeHandle, ctx->Overlapped))
CloseHandle(PipeHandle);
free(ctx);
return 0;
}
PyDoc_STRVAR( PyDoc_STRVAR(
Overlapped_WaitNamedPipeAndConnect_doc, ConnectPipe_doc,
"WaitNamedPipeAndConnect(addr, iocp_handle) -> Overlapped[pipe_handle]\n\n" "ConnectPipe(addr) -> pipe_handle\n\n"
"Start overlapped connection to address, notifying iocp_handle when\n" "Connect to the pipe for asynchronous I/O (overlapped).");
"finished");
static PyObject * static PyObject *
Overlapped_WaitNamedPipeAndConnect(OverlappedObject *self, PyObject *args) ConnectPipe(OverlappedObject *self, PyObject *args)
{ {
char *Address; PyObject *AddressObj;
Py_ssize_t AddressLength; wchar_t *Address;
HANDLE IocpHandle; HANDLE PipeHandle;
OVERLAPPED Overlapped;
BOOL ret;
DWORD err;
WaitNamedPipeAndConnectContext *ctx;
Py_ssize_t ContextLength;
if (!PyArg_ParseTuple(args, "s#" F_HANDLE F_POINTER, if (!PyArg_ParseTuple(args, "U", &AddressObj))
&Address, &AddressLength, &IocpHandle, &Overlapped))
return NULL; return NULL;
if (self->type != TYPE_NONE) { Address = PyUnicode_AsWideCharString(AddressObj, NULL);
PyErr_SetString(PyExc_ValueError, "operation already attempted"); if (Address == NULL)
return NULL; return NULL;
}
ContextLength = (AddressLength + PipeHandle = CreateFileW(Address,
offsetof(WaitNamedPipeAndConnectContext, Address)); GENERIC_READ | GENERIC_WRITE,
ctx = calloc(1, ContextLength + 1); 0, NULL, OPEN_EXISTING,
if (ctx == NULL) FILE_FLAG_OVERLAPPED, NULL);
return PyErr_NoMemory(); PyMem_Free(Address);
memcpy(ctx->Address, Address, AddressLength + 1); if (PipeHandle == INVALID_HANDLE_VALUE)
ctx->Overlapped = &self->overlapped; return SetFromWindowsErr(0);
ctx->IocpHandle = IocpHandle; return Py_BuildValue(F_HANDLE, PipeHandle);
self->type = TYPE_WAIT_NAMED_PIPE_AND_CONNECT;
self->handle = NULL;
Py_BEGIN_ALLOW_THREADS
ret = QueueUserWorkItem(WaitNamedPipeAndConnectInThread, ctx,
WT_EXECUTELONGFUNCTION);
Py_END_ALLOW_THREADS
mark_as_completed(&self->overlapped);
self->error = err = ret ? ERROR_SUCCESS : GetLastError();
if (!ret)
return SetFromWindowsErr(err);
Py_RETURN_NONE;
} }
static PyObject* static PyObject*
@ -1262,9 +1190,6 @@ static PyMethodDef Overlapped_methods[] = {
METH_VARARGS, Overlapped_DisconnectEx_doc}, METH_VARARGS, Overlapped_DisconnectEx_doc},
{"ConnectNamedPipe", (PyCFunction) Overlapped_ConnectNamedPipe, {"ConnectNamedPipe", (PyCFunction) Overlapped_ConnectNamedPipe,
METH_VARARGS, Overlapped_ConnectNamedPipe_doc}, METH_VARARGS, Overlapped_ConnectNamedPipe_doc},
{"WaitNamedPipeAndConnect",
(PyCFunction) Overlapped_WaitNamedPipeAndConnect,
METH_VARARGS, Overlapped_WaitNamedPipeAndConnect_doc},
{NULL} {NULL}
}; };
@ -1350,6 +1275,9 @@ static PyMethodDef overlapped_functions[] = {
METH_VARARGS, SetEvent_doc}, METH_VARARGS, SetEvent_doc},
{"ResetEvent", overlapped_ResetEvent, {"ResetEvent", overlapped_ResetEvent,
METH_VARARGS, ResetEvent_doc}, METH_VARARGS, ResetEvent_doc},
{"ConnectPipe",
(PyCFunction) ConnectPipe,
METH_VARARGS, ConnectPipe_doc},
{NULL} {NULL}
}; };
@ -1394,6 +1322,7 @@ PyInit__overlapped(void)
WINAPI_CONSTANT(F_DWORD, ERROR_IO_PENDING); WINAPI_CONSTANT(F_DWORD, ERROR_IO_PENDING);
WINAPI_CONSTANT(F_DWORD, ERROR_NETNAME_DELETED); WINAPI_CONSTANT(F_DWORD, ERROR_NETNAME_DELETED);
WINAPI_CONSTANT(F_DWORD, ERROR_SEM_TIMEOUT); WINAPI_CONSTANT(F_DWORD, ERROR_SEM_TIMEOUT);
WINAPI_CONSTANT(F_DWORD, ERROR_PIPE_BUSY);
WINAPI_CONSTANT(F_DWORD, INFINITE); WINAPI_CONSTANT(F_DWORD, INFINITE);
WINAPI_CONSTANT(F_HANDLE, INVALID_HANDLE_VALUE); WINAPI_CONSTANT(F_HANDLE, INVALID_HANDLE_VALUE);
WINAPI_CONSTANT(F_HANDLE, NULL); WINAPI_CONSTANT(F_HANDLE, NULL);