Issue #12040: Expose a new attribute `sentinel` on instances of

:class:`multiprocessing.Process`.  Also, fix Process.join() to not use
polling anymore, when given a timeout.
This commit is contained in:
Antoine Pitrou 2011-06-06 19:35:31 +02:00
parent f068ab8304
commit 176f07dadf
6 changed files with 108 additions and 14 deletions

View File

@ -411,6 +411,20 @@ The :mod:`multiprocessing` package mostly replicates the API of the
See :ref:`multiprocessing-auth-keys`.
.. attribute:: sentinel
A numeric handle of a system object which will become "ready" when
the process ends.
On Windows, this is an OS handle usable with the ``WaitForSingleObject``
and ``WaitForMultipleObjects`` family of API calls. On Unix, this is
a file descriptor usable with primitives from the :mod:`select` module.
You can use this value if you want to wait on several events at once.
Otherwise calling :meth:`join()` is simpler.
.. versionadded:: 3.3
.. method:: terminate()
Terminate the process. On Unix this is done using the ``SIGTERM`` signal;

View File

@ -101,10 +101,12 @@ else:
if sys.platform != 'win32':
import time
import select
exit = os._exit
duplicate = os.dup
close = os.close
_select = util._eintr_retry(select.select)
#
# We define a Popen class similar to the one from subprocess, but
@ -118,8 +120,12 @@ if sys.platform != 'win32':
sys.stderr.flush()
self.returncode = None
r, w = os.pipe()
self.sentinel = r
self.pid = os.fork()
if self.pid == 0:
os.close(r)
if 'random' in sys.modules:
import random
random.seed()
@ -128,6 +134,11 @@ if sys.platform != 'win32':
sys.stderr.flush()
os._exit(code)
# `w` will be closed when the child exits, at which point `r`
# will become ready for reading (using e.g. select()).
os.close(w)
util.Finalize(self, os.close, (r,))
def poll(self, flag=os.WNOHANG):
if self.returncode is None:
try:
@ -145,20 +156,14 @@ if sys.platform != 'win32':
return self.returncode
def wait(self, timeout=None):
if timeout is None:
return self.poll(0)
deadline = time.time() + timeout
delay = 0.0005
while 1:
res = self.poll()
if res is not None:
break
remaining = deadline - time.time()
if remaining <= 0:
break
delay = min(delay * 2, remaining, 0.05)
time.sleep(delay)
return res
if self.returncode is None:
if timeout is not None:
r = _select([self.sentinel], [], [], timeout)[0]
if not r:
return None
# This shouldn't block if select() returned successfully.
return self.poll(os.WNOHANG if timeout == 0.0 else 0)
return self.returncode
def terminate(self):
if self.returncode is None:
@ -258,6 +263,7 @@ else:
self.pid = pid
self.returncode = None
self._handle = hp
self.sentinel = int(hp)
# send information to child
prep_data = get_preparation_data(process_obj._name)

View File

@ -132,6 +132,7 @@ class Process(object):
else:
from .forking import Popen
self._popen = Popen(self)
self._sentinel = self._popen.sentinel
_current_process._children.add(self)
def terminate(self):
@ -218,6 +219,17 @@ class Process(object):
pid = ident
@property
def sentinel(self):
'''
Return a file descriptor (Unix) or handle (Windows) suitable for
waiting for process termination.
'''
try:
return self._sentinel
except AttributeError:
raise ValueError("process not started")
def __repr__(self):
if self is _current_process:
status = 'started'

View File

@ -32,9 +32,11 @@
# SUCH DAMAGE.
#
import functools
import itertools
import weakref
import atexit
import select
import threading # we want threading to install it's
# cleanup function before multiprocessing does
@ -315,3 +317,22 @@ class ForkAwareLocal(threading.local):
register_after_fork(self, lambda obj : obj.__dict__.clear())
def __reduce__(self):
return type(self), ()
#
# Automatic retry after EINTR
#
def _eintr_retry(func, _errors=(EnvironmentError, select.error)):
@functools.wraps(func)
def wrapped(*args, **kwargs):
while True:
try:
return func(*args, **kwargs)
except _errors as e:
# select.error has no `errno` attribute
if e.args[0] == errno.EINTR:
continue
raise
return wrapped

View File

@ -71,6 +71,23 @@ HAVE_GETVALUE = not getattr(_multiprocessing,
'HAVE_BROKEN_SEM_GETVALUE', False)
WIN32 = (sys.platform == "win32")
if WIN32:
from _subprocess import WaitForSingleObject, INFINITE, WAIT_OBJECT_0
def wait_for_handle(handle, timeout):
if timeout is None or timeout < 0.0:
timeout = INFINITE
else:
timeout = int(1000 * timeout)
return WaitForSingleObject(handle, timeout) == WAIT_OBJECT_0
else:
from select import select
_select = util._eintr_retry(select)
def wait_for_handle(handle, timeout):
if timeout is not None and timeout < 0.0:
timeout = None
return handle in _select([handle], [], [], timeout)[0]
#
# Some tests require ctypes
@ -307,6 +324,26 @@ class _TestProcess(BaseTestCase):
]
self.assertEqual(result, expected)
@classmethod
def _test_sentinel(cls, event):
event.wait(10.0)
def test_sentinel(self):
if self.TYPE == "threads":
return
event = self.Event()
p = self.Process(target=self._test_sentinel, args=(event,))
with self.assertRaises(ValueError):
p.sentinel
p.start()
self.addCleanup(p.join)
sentinel = p.sentinel
self.assertIsInstance(sentinel, int)
self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
event.set()
p.join()
self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
#
#
#

View File

@ -187,6 +187,10 @@ Core and Builtins
Library
-------
- Issue #12040: Expose a new attribute ``sentinel`` on instances of
:class:`multiprocessing.Process`. Also, fix Process.join() to not use
polling anymore, when given a timeout.
- Issue #11893: Remove obsolete internal wrapper class ``SSLFakeFile`` in the
smtplib module. Patch by Catalin Iacob.