diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst index 7e828b69b0f..3dfa2cc6caf 100644 --- a/Doc/library/multiprocessing.rst +++ b/Doc/library/multiprocessing.rst @@ -708,7 +708,8 @@ Connection objects usually created using :func:`Pipe` -- see also Send an object to the other end of the connection which should be read using :meth:`recv`. - The object must be picklable. + The object must be picklable. Very large pickles (approximately 32 MB+, + though it depends on the OS) may raise a ValueError exception. .. method:: recv() @@ -740,7 +741,9 @@ Connection objects usually created using :func:`Pipe` -- see also complete message. If *offset* is given then data is read from that position in *buffer*. If - *size* is given then that many bytes will be read from buffer. + *size* is given then that many bytes will be read from buffer. Very large + buffers (approximately 32 MB+, though it depends on the OS) may raise a + ValueError exception .. method:: recv_bytes([maxlength]) @@ -834,6 +837,12 @@ object -- see :ref:`multiprocessing-managers`. .. class:: Event() A clone of :class:`threading.Event`. + This method returns the state of the internal semaphore on exit, so it + will always return ``True`` except if a timeout is given and the operation + times out. + + .. versionchanged:: 2.7 + Previously, the method always returned ``None``. .. class:: Lock() diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py index 08d7c5deb5c..8e994df6296 100644 --- a/Lib/multiprocessing/synchronize.py +++ b/Lib/multiprocessing/synchronize.py @@ -301,5 +301,10 @@ class Event(object): self._flag.release() else: self._cond.wait(timeout) + + if self._flag.acquire(False): + self._flag.release() + return True + return False finally: self._cond.release() diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py index be108b1d30f..69309ba5c1d 100644 --- a/Lib/test/test_multiprocessing.py +++ b/Lib/test/test_multiprocessing.py @@ -750,20 +750,22 @@ class _TestEvent(BaseTestCase): # Removed temporaily, due to API shear, this does not # work with threading._Event objects. is_set == isSet - #self.assertEqual(event.is_set(), False) + self.assertEqual(event.is_set(), False) - self.assertEqual(wait(0.0), None) + # Removed, threading.Event.wait() will return the value of the __flag + # instead of None. API Shear with the semaphore backed mp.Event + self.assertEqual(wait(0.0), False) self.assertTimingAlmostEqual(wait.elapsed, 0.0) - self.assertEqual(wait(TIMEOUT1), None) + self.assertEqual(wait(TIMEOUT1), False) self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1) event.set() # See note above on the API differences - # self.assertEqual(event.is_set(), True) - self.assertEqual(wait(), None) + self.assertEqual(event.is_set(), True) + self.assertEqual(wait(), True) self.assertTimingAlmostEqual(wait.elapsed, 0.0) - self.assertEqual(wait(TIMEOUT1), None) + self.assertEqual(wait(TIMEOUT1), True) self.assertTimingAlmostEqual(wait.elapsed, 0.0) # self.assertEqual(event.is_set(), True) @@ -772,7 +774,7 @@ class _TestEvent(BaseTestCase): #self.assertEqual(event.is_set(), False) self.Process(target=self._test_event, args=(event,)).start() - self.assertEqual(wait(), None) + self.assertEqual(wait(), True) # # diff --git a/Modules/_multiprocessing/connection.h b/Modules/_multiprocessing/connection.h index 581beacf9f6..2ee5368ef37 100644 --- a/Modules/_multiprocessing/connection.h +++ b/Modules/_multiprocessing/connection.h @@ -139,8 +139,12 @@ connection_sendbytes(ConnectionObject *self, PyObject *args) res = conn_send_string(self, buffer + offset, size); PyBuffer_Release(&pbuffer); - if (res < 0) - return mp_SetError(PyExc_IOError, res); + if (res < 0) { + if (PyErr_Occurred()) + return NULL; + else + return mp_SetError(PyExc_IOError, res); + } Py_RETURN_NONE; } diff --git a/Modules/_multiprocessing/multiprocessing.c b/Modules/_multiprocessing/multiprocessing.c index b18f71499af..afa25994699 100644 --- a/Modules/_multiprocessing/multiprocessing.c +++ b/Modules/_multiprocessing/multiprocessing.c @@ -8,6 +8,12 @@ #include "multiprocessing.h" +#ifdef SCM_RIGHTS + #define HAVE_FD_TRANSFER 1 +#else + #define HAVE_FD_TRANSFER 0 +#endif + PyObject *create_win32_namespace(void); PyObject *pickle_dumps, *pickle_loads, *pickle_protocol; @@ -257,7 +263,7 @@ PyInit__multiprocessing(void) Py_INCREF(&ConnectionType); PyModule_AddObject(module, "Connection", (PyObject*)&ConnectionType); -#if defined(MS_WINDOWS) || HAVE_SEM_OPEN +#if defined(MS_WINDOWS) || defined(HAVE_SEM_OPEN) /* Add SemLock type to module */ if (PyType_Ready(&SemLockType) < 0) return NULL; diff --git a/Modules/_multiprocessing/multiprocessing.h b/Modules/_multiprocessing/multiprocessing.h index 4f4f9d735e7..f8750d21540 100644 --- a/Modules/_multiprocessing/multiprocessing.h +++ b/Modules/_multiprocessing/multiprocessing.h @@ -27,7 +27,7 @@ # include # include # include /* htonl() and ntohl() */ -# if HAVE_SEM_OPEN +# ifdef HAVE_SEM_OPEN # include typedef sem_t *SEM_HANDLE; # endif @@ -45,13 +45,18 @@ * Issue 3110 - Solaris does not define SEM_VALUE_MAX */ #ifndef SEM_VALUE_MAX -# ifdef _SEM_VALUE_MAX -# define SEM_VALUE_MAX _SEM_VALUE_MAX -# else -# define SEM_VALUE_MAX INT_MAX -# endif + #if defined(HAVE_SYSCONF) && defined(_SC_SEM_VALUE_MAX) + # define SEM_VALUE_MAX sysconf(_SC_SEM_VALUE_MAX) + #elif defined(_SEM_VALUE_MAX) + # define SEM_VALUE_MAX _SEM_VALUE_MAX + #elif definef(_POSIX_SEM_VALUE_MAX) + # define SEM_VALUE_MAX _POSIX_SEM_VALUE_MAX + #else + # define SEM_VALUE_MAX INT_MAX + #endif #endif + /* * Make sure Py_ssize_t available */ diff --git a/Modules/_multiprocessing/pipe_connection.c b/Modules/_multiprocessing/pipe_connection.c index 27e79dda7d4..66947c838f2 100644 --- a/Modules/_multiprocessing/pipe_connection.c +++ b/Modules/_multiprocessing/pipe_connection.c @@ -23,6 +23,12 @@ conn_send_string(ConnectionObject *conn, char *string, size_t length) Py_BEGIN_ALLOW_THREADS ret = WriteFile(conn->handle, string, length, &amount_written, NULL); Py_END_ALLOW_THREADS + + if (ret == 0 && GetLastError() == ERROR_NO_SYSTEM_RESOURCES) { + PyErr_Format(PyExc_ValueError, "Cannnot send %" PY_FORMAT_SIZE_T "d bytes over connection", length); + return MP_STANDARD_ERROR; + } + return ret ? MP_SUCCESS : MP_STANDARD_ERROR; } diff --git a/Modules/_multiprocessing/semaphore.c b/Modules/_multiprocessing/semaphore.c index 40bd7c3c88c..c0944c95c0e 100644 --- a/Modules/_multiprocessing/semaphore.c +++ b/Modules/_multiprocessing/semaphore.c @@ -197,11 +197,11 @@ semlock_release(SemLockObject *self, PyObject *args) #define SEM_GETVALUE(sem, pval) sem_getvalue(sem, pval) #define SEM_UNLINK(name) sem_unlink(name) -#if HAVE_BROKEN_SEM_UNLINK +#ifndef HAVE_SEM_UNLINK # define sem_unlink(name) 0 #endif -#if !HAVE_SEM_TIMEDWAIT +#ifndef HAVE_SEM_TIMEDWAIT # define sem_timedwait(sem,deadline) sem_timedwait_save(sem,deadline,_save) int @@ -348,7 +348,7 @@ semlock_release(SemLockObject *self, PyObject *args) } assert(self->count == 1); } else { -#if HAVE_BROKEN_SEM_GETVALUE +#ifdef HAVE_BROKEN_SEM_GETVALUE /* We will only check properly the maxvalue == 1 case */ if (self->maxvalue == 1) { /* make sure that already locked */ @@ -494,7 +494,7 @@ semlock_ismine(SemLockObject *self) static PyObject * semlock_getvalue(SemLockObject *self) { -#if HAVE_BROKEN_SEM_GETVALUE +#ifdef HAVE_BROKEN_SEM_GETVALUE PyErr_SetNone(PyExc_NotImplementedError); return NULL; #else @@ -512,7 +512,7 @@ semlock_getvalue(SemLockObject *self) static PyObject * semlock_iszero(SemLockObject *self) { -#if HAVE_BROKEN_SEM_GETVALUE +#ifdef HAVE_BROKEN_SEM_GETVALUE if (sem_trywait(self->handle) < 0) { if (errno == EAGAIN) Py_RETURN_TRUE; diff --git a/configure b/configure index 25242fed4f3..f5d7352bed0 100755 --- a/configure +++ b/configure @@ -1,5 +1,5 @@ #! /bin/sh -# From configure.in Revision: 70732 . +# From configure.in Revision: 71261 . # Guess values for system-dependent variables and create Makefiles. # Generated by GNU Autoconf 2.61 for python 3.1. # @@ -16259,6 +16259,10 @@ echo "${ECHO_T}MACHDEP_OBJS" >&6; } + + + + @@ -16271,7 +16275,8 @@ for ac_func in alarm setitimer getitimer bind_textdomain_codeset chown \ kill killpg lchmod lchown lstat mkfifo mknod mktime \ mremap nice pathconf pause plock poll pthread_init \ putenv readlink realpath \ - select setegid seteuid setgid \ + select sem_open sem_timedwait sem_getvalue sem_unlink setegid seteuid \ + setgid \ setlocale setregid setreuid setsid setpgid setpgrp setuid setvbuf snprintf \ sigaction siginterrupt sigrelse strftime strlcpy \ sysconf tcgetpgrp tcsetpgrp tempnam timegm times tmpfile tmpnam tmpnam_r \ @@ -21692,6 +21697,86 @@ _ACEOF fi +# Multiprocessing check for broken sem_getvalue +{ echo "$as_me:$LINENO: checking for broken sem_getvalue" >&5 +echo $ECHO_N "checking for broken sem_getvalue... $ECHO_C" >&6; } +if test "$cross_compiling" = yes; then + { { echo "$as_me:$LINENO: error: cannot run test program while cross compiling +See \`config.log' for more details." >&5 +echo "$as_me: error: cannot run test program while cross compiling +See \`config.log' for more details." >&2;} + { (exit 1); exit 1; }; } +else + cat >conftest.$ac_ext <<_ACEOF +/* confdefs.h. */ +_ACEOF +cat confdefs.h >>conftest.$ac_ext +cat >>conftest.$ac_ext <<_ACEOF +/* end confdefs.h. */ + +#include +#include +#include +#include +#include + +int main(void){ + sem_t *a = sem_open("/autoconf", O_CREAT, S_IRUSR|S_IWUSR, 0); + int count; + int res; + if(a==SEM_FAILED){ + perror("sem_open"); + return 1; + + } + res = sem_getvalue(a, &count); + sem_close(a); + return res==-1 ? 1 : 0; +} + + +_ACEOF +rm -f conftest$ac_exeext +if { (ac_try="$ac_link" +case "(($ac_try" in + *\"* | *\`* | *\\*) ac_try_echo=\$ac_try;; + *) ac_try_echo=$ac_try;; +esac +eval "echo \"\$as_me:$LINENO: $ac_try_echo\"") >&5 + (eval "$ac_link") 2>&5 + ac_status=$? + echo "$as_me:$LINENO: \$? = $ac_status" >&5 + (exit $ac_status); } && { ac_try='./conftest$ac_exeext' + { (case "(($ac_try" in + *\"* | *\`* | *\\*) ac_try_echo=\$ac_try;; + *) ac_try_echo=$ac_try;; +esac +eval "echo \"\$as_me:$LINENO: $ac_try_echo\"") >&5 + (eval "$ac_try") 2>&5 + ac_status=$? + echo "$as_me:$LINENO: \$? = $ac_status" >&5 + (exit $ac_status); }; }; then + { echo "$as_me:$LINENO: result: no" >&5 +echo "${ECHO_T}no" >&6; } +else + echo "$as_me: program exited with status $ac_status" >&5 +echo "$as_me: failed program was:" >&5 +sed 's/^/| /' conftest.$ac_ext >&5 + +( exit $ac_status ) +{ echo "$as_me:$LINENO: result: yes" >&5 +echo "${ECHO_T}yes" >&6; } + +cat >>confdefs.h <<\_ACEOF +#define HAVE_BROKEN_SEM_GETVALUE 1 +_ACEOF + + +fi +rm -f core *.core core.conftest.* gmon.out bb.out conftest$ac_exeext conftest.$ac_objext conftest.$ac_ext +fi + + # On FreeBSD 6.2, it appears that tanh(-0.) returns 0. instead of # -0. on some architectures. diff --git a/configure.in b/configure.in index aed3e144159..40fa044ba86 100644 --- a/configure.in +++ b/configure.in @@ -2388,7 +2388,8 @@ AC_CHECK_FUNCS(alarm setitimer getitimer bind_textdomain_codeset chown \ kill killpg lchmod lchown lstat mkfifo mknod mktime \ mremap nice pathconf pause plock poll pthread_init \ putenv readlink realpath \ - select setegid seteuid setgid \ + select sem_open sem_timedwait sem_getvalue sem_unlink setegid seteuid \ + setgid \ setlocale setregid setreuid setsid setpgid setpgrp setuid setvbuf snprintf \ sigaction siginterrupt sigrelse strftime strlcpy \ sysconf tcgetpgrp tcsetpgrp tempnam timegm times tmpfile tmpnam tmpnam_r \ @@ -3108,6 +3109,33 @@ then [Define if arithmetic is subject to x87-style double rounding issue]) fi +# Multiprocessing check for broken sem_getvalue +AC_MSG_CHECKING(for broken sem_getvalue) +AC_TRY_RUN([ +#include +#include +#include +#include +#include + +int main(void){ + sem_t *a = sem_open("/autoconf", O_CREAT, S_IRUSR|S_IWUSR, 0); + int count; + int res; + if(a==SEM_FAILED){ + perror("sem_open"); + return 1; + + } + res = sem_getvalue(a, &count); + sem_close(a); + return res==-1 ? 1 : 0; +} +] +,AC_MSG_RESULT(no), + AC_MSG_RESULT(yes) + AC_DEFINE(HAVE_BROKEN_SEM_GETVALUE, 1, define to 1 if your sem_getvalue is broken.) +) # On FreeBSD 6.2, it appears that tanh(-0.) returns 0. instead of # -0. on some architectures. diff --git a/pyconfig.h.in b/pyconfig.h.in index db7f9dd61c1..40067f6ef82 100644 --- a/pyconfig.h.in +++ b/pyconfig.h.in @@ -74,6 +74,9 @@ /* Define if pthread_sigmask() does not work on your system. */ #undef HAVE_BROKEN_PTHREAD_SIGMASK +/* define to 1 if your sem_getvalue is broken. */ +#undef HAVE_BROKEN_SEM_GETVALUE + /* Define this if you have the type _Bool. */ #undef HAVE_C99_BOOL @@ -505,6 +508,18 @@ /* Define to 1 if you have the `select' function. */ #undef HAVE_SELECT +/* Define to 1 if you have the `sem_getvalue' function. */ +#undef HAVE_SEM_GETVALUE + +/* Define to 1 if you have the `sem_open' function. */ +#undef HAVE_SEM_OPEN + +/* Define to 1 if you have the `sem_timedwait' function. */ +#undef HAVE_SEM_TIMEDWAIT + +/* Define to 1 if you have the `sem_unlink' function. */ +#undef HAVE_SEM_UNLINK + /* Define to 1 if you have the `setegid' function. */ #undef HAVE_SETEGID diff --git a/setup.py b/setup.py index 3c251724350..29dc59ee656 100644 --- a/setup.py +++ b/setup.py @@ -988,56 +988,29 @@ class PyBuildExt(build_ext): libraries = ['ws2_32'] elif platform == 'darwin': # Mac OSX - macros = dict( - HAVE_SEM_OPEN=1, - HAVE_SEM_TIMEDWAIT=0, - HAVE_FD_TRANSFER=1, - HAVE_BROKEN_SEM_GETVALUE=1 - ) + macros = dict() libraries = [] elif platform == 'cygwin': # Cygwin - macros = dict( - HAVE_SEM_OPEN=1, - HAVE_SEM_TIMEDWAIT=1, - HAVE_FD_TRANSFER=0, - HAVE_BROKEN_SEM_UNLINK=1 - ) + macros = dict() libraries = [] elif platform in ('freebsd4', 'freebsd5', 'freebsd6', 'freebsd7', 'freebsd8'): # FreeBSD's P1003.1b semaphore support is very experimental # and has many known problems. (as of June 2008) - macros = dict( # FreeBSD - HAVE_SEM_OPEN=0, - HAVE_SEM_TIMEDWAIT=0, - HAVE_FD_TRANSFER=1, - ) + macros = dict() libraries = [] elif platform.startswith('openbsd'): - macros = dict( # OpenBSD - HAVE_SEM_OPEN=0, # Not implemented - HAVE_SEM_TIMEDWAIT=0, - HAVE_FD_TRANSFER=1, - ) + macros = dict() libraries = [] elif platform.startswith('netbsd'): - macros = dict( # at least NetBSD 5 - HAVE_SEM_OPEN=1, - HAVE_SEM_TIMEDWAIT=0, - HAVE_FD_TRANSFER=1, - HAVE_BROKEN_SEM_GETVALUE=1 - ) + macros = dict() libraries = [] else: # Linux and other unices - macros = dict( - HAVE_SEM_OPEN=1, - HAVE_SEM_TIMEDWAIT=1, - HAVE_FD_TRANSFER=1 - ) + macros = dict() libraries = ['rt'] if platform == 'win32': @@ -1052,8 +1025,7 @@ class PyBuildExt(build_ext): multiprocessing_srcs = [ '_multiprocessing/multiprocessing.c', '_multiprocessing/socket_connection.c' ] - - if macros.get('HAVE_SEM_OPEN', False): + if sysconfig.get_config_var('HAVE_SEM_OPEN'): multiprocessing_srcs.append('_multiprocessing/semaphore.c') if sysconfig.get_config_var('WITH_THREAD'):