Issue #12981: rewrite multiprocessing_{sendfd,recvfd} in Python.

This commit is contained in:
Charles-François Natali 2011-09-24 20:04:29 +02:00
parent 57e683e53e
commit dc863ddf79
3 changed files with 19 additions and 140 deletions

View File

@ -39,6 +39,7 @@ import os
import sys import sys
import socket import socket
import threading import threading
import struct
import _multiprocessing import _multiprocessing
from multiprocessing import current_process from multiprocessing import current_process
@ -51,7 +52,8 @@ from multiprocessing.connection import Client, Listener, Connection
# #
# #
if not(sys.platform == 'win32' or hasattr(_multiprocessing, 'recvfd')): if not(sys.platform == 'win32' or (hasattr(socket, 'CMSG_LEN') and
hasattr(socket, 'SCM_RIGHTS'))):
raise ImportError('pickling of connections not supported') raise ImportError('pickling of connections not supported')
# #
@ -77,10 +79,23 @@ if sys.platform == 'win32':
else: else:
def send_handle(conn, handle, destination_pid): def send_handle(conn, handle, destination_pid):
_multiprocessing.sendfd(conn.fileno(), handle) with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
s.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS,
struct.pack("@i", handle))])
def recv_handle(conn): def recv_handle(conn):
return _multiprocessing.recvfd(conn.fileno()) size = struct.calcsize("@i")
with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
msg, ancdata, flags, addr = s.recvmsg(1, socket.CMSG_LEN(size))
try:
cmsg_level, cmsg_type, cmsg_data = ancdata[0]
if (cmsg_level == socket.SOL_SOCKET and
cmsg_type == socket.SCM_RIGHTS):
return struct.unpack("@i", cmsg_data[:size])[0]
except (ValueError, IndexError, struct.error):
pass
raise RuntimeError('Invalid data received')
# #
# Support for a per-process server thread which caches pickled handles # Support for a per-process server thread which caches pickled handles

View File

@ -8,11 +8,6 @@
#include "multiprocessing.h" #include "multiprocessing.h"
#ifdef SCM_RIGHTS
#define HAVE_FD_TRANSFER 1
#else
#define HAVE_FD_TRANSFER 0
#endif
PyObject *create_win32_namespace(void); PyObject *create_win32_namespace(void);
@ -75,115 +70,7 @@ ProcessingCtrlHandler(DWORD dwCtrlType)
return FALSE; return FALSE;
} }
/* #endif /* MS_WINDOWS */
* Unix only
*/
#else /* !MS_WINDOWS */
#if HAVE_FD_TRANSFER
/* Functions for transferring file descriptors between processes.
Reimplements some of the functionality of the fdcred
module at http://www.mca-ltd.com/resources/fdcred_1.tgz. */
/* Based in http://resin.csoft.net/cgi-bin/man.cgi?section=3&topic=CMSG_DATA */
static PyObject *
multiprocessing_sendfd(PyObject *self, PyObject *args)
{
int conn, fd, res;
struct iovec dummy_iov;
char dummy_char;
struct msghdr msg;
struct cmsghdr *cmsg;
union {
struct cmsghdr hdr;
unsigned char buf[CMSG_SPACE(sizeof(int))];
} cmsgbuf;
if (!PyArg_ParseTuple(args, "ii", &conn, &fd))
return NULL;
dummy_iov.iov_base = &dummy_char;
dummy_iov.iov_len = 1;
memset(&msg, 0, sizeof(msg));
msg.msg_control = &cmsgbuf.buf;
msg.msg_controllen = sizeof(cmsgbuf.buf);
msg.msg_iov = &dummy_iov;
msg.msg_iovlen = 1;
cmsg = CMSG_FIRSTHDR(&msg);
cmsg->cmsg_len = CMSG_LEN(sizeof(int));
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS;
* (int *) CMSG_DATA(cmsg) = fd;
Py_BEGIN_ALLOW_THREADS
res = sendmsg(conn, &msg, 0);
Py_END_ALLOW_THREADS
if (res < 0)
return PyErr_SetFromErrno(PyExc_OSError);
Py_RETURN_NONE;
}
static PyObject *
multiprocessing_recvfd(PyObject *self, PyObject *args)
{
int conn, fd, res;
char dummy_char;
struct iovec dummy_iov;
struct msghdr msg = {0};
struct cmsghdr *cmsg;
union {
struct cmsghdr hdr;
unsigned char buf[CMSG_SPACE(sizeof(int))];
} cmsgbuf;
if (!PyArg_ParseTuple(args, "i", &conn))
return NULL;
dummy_iov.iov_base = &dummy_char;
dummy_iov.iov_len = 1;
memset(&msg, 0, sizeof(msg));
msg.msg_control = &cmsgbuf.buf;
msg.msg_controllen = sizeof(cmsgbuf.buf);
msg.msg_iov = &dummy_iov;
msg.msg_iovlen = 1;
cmsg = CMSG_FIRSTHDR(&msg);
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS;
cmsg->cmsg_len = CMSG_LEN(sizeof(int));
msg.msg_controllen = cmsg->cmsg_len;
Py_BEGIN_ALLOW_THREADS
res = recvmsg(conn, &msg, 0);
Py_END_ALLOW_THREADS
if (res < 0)
return PyErr_SetFromErrno(PyExc_OSError);
if (msg.msg_controllen < CMSG_LEN(sizeof(int)) ||
(cmsg = CMSG_FIRSTHDR(&msg)) == NULL ||
cmsg->cmsg_level != SOL_SOCKET ||
cmsg->cmsg_type != SCM_RIGHTS ||
cmsg->cmsg_len < CMSG_LEN(sizeof(int))) {
/* If at least one control message is present, there should be
no room for any further data in the buffer. */
PyErr_SetString(PyExc_RuntimeError, "No file descriptor received");
return NULL;
}
fd = * (int *) CMSG_DATA(cmsg);
return Py_BuildValue("i", fd);
}
#endif /* HAVE_FD_TRANSFER */
#endif /* !MS_WINDOWS */
/* /*
@ -212,16 +99,6 @@ static PyMethodDef module_methods[] = {
{"address_of_buffer", multiprocessing_address_of_buffer, METH_O, {"address_of_buffer", multiprocessing_address_of_buffer, METH_O,
"address_of_buffer(obj) -> int\n" "address_of_buffer(obj) -> int\n"
"Return address of obj assuming obj supports buffer inteface"}, "Return address of obj assuming obj supports buffer inteface"},
#if HAVE_FD_TRANSFER
{"sendfd", multiprocessing_sendfd, METH_VARARGS,
"sendfd(sockfd, fd) -> None\n"
"Send file descriptor given by fd over the unix domain socket\n"
"whose file decriptor is sockfd"},
{"recvfd", multiprocessing_recvfd, METH_VARARGS,
"recvfd(sockfd) -> fd\n"
"Receive a file descriptor over a unix domain socket\n"
"whose file decriptor is sockfd"},
#endif
{NULL} {NULL}
}; };
@ -319,9 +196,6 @@ PyInit__multiprocessing(void)
#ifdef HAVE_SEM_TIMEDWAIT #ifdef HAVE_SEM_TIMEDWAIT
ADD_FLAG(HAVE_SEM_TIMEDWAIT); ADD_FLAG(HAVE_SEM_TIMEDWAIT);
#endif #endif
#ifdef HAVE_FD_TRANSFER
ADD_FLAG(HAVE_FD_TRANSFER);
#endif
#ifdef HAVE_BROKEN_SEM_GETVALUE #ifdef HAVE_BROKEN_SEM_GETVALUE
ADD_FLAG(HAVE_BROKEN_SEM_GETVALUE); ADD_FLAG(HAVE_BROKEN_SEM_GETVALUE);
#endif #endif

View File

@ -3,12 +3,6 @@
#define PY_SSIZE_T_CLEAN #define PY_SSIZE_T_CLEAN
#ifdef __sun
/* The control message API is only available on Solaris
if XPG 4.2 or later is requested. */
#define _XOPEN_SOURCE 500
#endif
#include "Python.h" #include "Python.h"
#include "structmember.h" #include "structmember.h"
#include "pythread.h" #include "pythread.h"
@ -29,10 +23,6 @@
# define SEM_VALUE_MAX LONG_MAX # define SEM_VALUE_MAX LONG_MAX
#else #else
# include <fcntl.h> /* O_CREAT and O_EXCL */ # include <fcntl.h> /* O_CREAT and O_EXCL */
# include <netinet/in.h>
# include <sys/socket.h>
# include <sys/uio.h>
# include <arpa/inet.h> /* htonl() and ntohl() */
# if defined(HAVE_SEM_OPEN) && !defined(POSIX_SEMAPHORES_NOT_ENABLED) # if defined(HAVE_SEM_OPEN) && !defined(POSIX_SEMAPHORES_NOT_ENABLED)
# include <semaphore.h> # include <semaphore.h>
typedef sem_t *SEM_HANDLE; typedef sem_t *SEM_HANDLE;