bpo-33613, test_semaphore_tracker_sigint: fix race condition (#7850)
Fail `test_semaphore_tracker_sigint` if no warnings are expected and one is received. Fix race condition when the child receives SIGINT before it can register signal handlers for it. The race condition occurs when the parent calls `_semaphore_tracker.ensure_running()` (which in turn spawns the semaphore_tracker using `_posixsubprocess.fork_exec`), the child registers the signal handlers and the parent tries to kill the child. What seem to happen is that in some slow systems, the parent sends the signal to kill the child before the child protects against the signal.
This commit is contained in:
parent
e9ba3705de
commit
ec74d187f5
|
@ -23,6 +23,9 @@ from . import util
|
||||||
|
|
||||||
__all__ = ['ensure_running', 'register', 'unregister']
|
__all__ = ['ensure_running', 'register', 'unregister']
|
||||||
|
|
||||||
|
_HAVE_SIGMASK = hasattr(signal, 'pthread_sigmask')
|
||||||
|
_IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM)
|
||||||
|
|
||||||
|
|
||||||
class SemaphoreTracker(object):
|
class SemaphoreTracker(object):
|
||||||
|
|
||||||
|
@ -43,10 +46,16 @@ class SemaphoreTracker(object):
|
||||||
with self._lock:
|
with self._lock:
|
||||||
if self._pid is not None:
|
if self._pid is not None:
|
||||||
# semaphore tracker was launched before, is it still running?
|
# semaphore tracker was launched before, is it still running?
|
||||||
pid, status = os.waitpid(self._pid, os.WNOHANG)
|
try:
|
||||||
|
pid, _ = os.waitpid(self._pid, os.WNOHANG)
|
||||||
|
except ChildProcessError:
|
||||||
|
# The process terminated
|
||||||
|
pass
|
||||||
|
else:
|
||||||
if not pid:
|
if not pid:
|
||||||
# => still alive
|
# => still alive
|
||||||
return
|
return
|
||||||
|
|
||||||
# => dead, launch it again
|
# => dead, launch it again
|
||||||
os.close(self._fd)
|
os.close(self._fd)
|
||||||
self._fd = None
|
self._fd = None
|
||||||
|
@ -68,7 +77,19 @@ class SemaphoreTracker(object):
|
||||||
exe = spawn.get_executable()
|
exe = spawn.get_executable()
|
||||||
args = [exe] + util._args_from_interpreter_flags()
|
args = [exe] + util._args_from_interpreter_flags()
|
||||||
args += ['-c', cmd % r]
|
args += ['-c', cmd % r]
|
||||||
|
# bpo-33613: Register a signal mask that will block the signals.
|
||||||
|
# This signal mask will be inherited by the child that is going
|
||||||
|
# to be spawned and will protect the child from a race condition
|
||||||
|
# that can make the child die before it registers signal handlers
|
||||||
|
# for SIGINT and SIGTERM. The mask is unregistered after spawning
|
||||||
|
# the child.
|
||||||
|
try:
|
||||||
|
if _HAVE_SIGMASK:
|
||||||
|
signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS)
|
||||||
pid = util.spawnv_passfds(exe, args, fds_to_pass)
|
pid = util.spawnv_passfds(exe, args, fds_to_pass)
|
||||||
|
finally:
|
||||||
|
if _HAVE_SIGMASK:
|
||||||
|
signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS)
|
||||||
except:
|
except:
|
||||||
os.close(w)
|
os.close(w)
|
||||||
raise
|
raise
|
||||||
|
@ -104,12 +125,13 @@ register = _semaphore_tracker.register
|
||||||
unregister = _semaphore_tracker.unregister
|
unregister = _semaphore_tracker.unregister
|
||||||
getfd = _semaphore_tracker.getfd
|
getfd = _semaphore_tracker.getfd
|
||||||
|
|
||||||
|
|
||||||
def main(fd):
|
def main(fd):
|
||||||
'''Run semaphore tracker.'''
|
'''Run semaphore tracker.'''
|
||||||
# protect the process from ^C and "killall python" etc
|
# protect the process from ^C and "killall python" etc
|
||||||
signal.signal(signal.SIGINT, signal.SIG_IGN)
|
signal.signal(signal.SIGINT, signal.SIG_IGN)
|
||||||
signal.signal(signal.SIGTERM, signal.SIG_IGN)
|
signal.signal(signal.SIGTERM, signal.SIG_IGN)
|
||||||
|
if _HAVE_SIGMASK:
|
||||||
|
signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS)
|
||||||
|
|
||||||
for f in (sys.stdin, sys.stdout):
|
for f in (sys.stdin, sys.stdout):
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -20,6 +20,7 @@ import logging
|
||||||
import struct
|
import struct
|
||||||
import operator
|
import operator
|
||||||
import weakref
|
import weakref
|
||||||
|
import warnings
|
||||||
import test.support
|
import test.support
|
||||||
import test.support.script_helper
|
import test.support.script_helper
|
||||||
from test import support
|
from test import support
|
||||||
|
@ -4517,17 +4518,19 @@ class TestSemaphoreTracker(unittest.TestCase):
|
||||||
# bpo-31310: if the semaphore tracker process has died, it should
|
# bpo-31310: if the semaphore tracker process has died, it should
|
||||||
# be restarted implicitly.
|
# be restarted implicitly.
|
||||||
from multiprocessing.semaphore_tracker import _semaphore_tracker
|
from multiprocessing.semaphore_tracker import _semaphore_tracker
|
||||||
|
pid = _semaphore_tracker._pid
|
||||||
|
if pid is not None:
|
||||||
|
os.kill(pid, signal.SIGKILL)
|
||||||
|
os.waitpid(pid, 0)
|
||||||
|
with warnings.catch_warnings(record=True) as all_warn:
|
||||||
_semaphore_tracker.ensure_running()
|
_semaphore_tracker.ensure_running()
|
||||||
pid = _semaphore_tracker._pid
|
pid = _semaphore_tracker._pid
|
||||||
|
|
||||||
os.kill(pid, signum)
|
os.kill(pid, signum)
|
||||||
time.sleep(1.0) # give it time to die
|
time.sleep(1.0) # give it time to die
|
||||||
|
|
||||||
ctx = multiprocessing.get_context("spawn")
|
ctx = multiprocessing.get_context("spawn")
|
||||||
with contextlib.ExitStack() as stack:
|
with warnings.catch_warnings(record=True) as all_warn:
|
||||||
if should_die:
|
|
||||||
stack.enter_context(self.assertWarnsRegex(
|
|
||||||
UserWarning,
|
|
||||||
"semaphore_tracker: process died"))
|
|
||||||
sem = ctx.Semaphore()
|
sem = ctx.Semaphore()
|
||||||
sem.acquire()
|
sem.acquire()
|
||||||
sem.release()
|
sem.release()
|
||||||
|
@ -4537,11 +4540,23 @@ class TestSemaphoreTracker(unittest.TestCase):
|
||||||
del sem
|
del sem
|
||||||
gc.collect()
|
gc.collect()
|
||||||
self.assertIsNone(wr())
|
self.assertIsNone(wr())
|
||||||
|
if should_die:
|
||||||
|
self.assertEqual(len(all_warn), 1)
|
||||||
|
the_warn = all_warn[0]
|
||||||
|
issubclass(the_warn.category, UserWarning)
|
||||||
|
self.assertTrue("semaphore_tracker: process died"
|
||||||
|
in str(the_warn.message))
|
||||||
|
else:
|
||||||
|
self.assertEqual(len(all_warn), 0)
|
||||||
|
|
||||||
def test_semaphore_tracker_sigint(self):
|
def test_semaphore_tracker_sigint(self):
|
||||||
# Catchable signal (ignored by semaphore tracker)
|
# Catchable signal (ignored by semaphore tracker)
|
||||||
self.check_semaphore_tracker_death(signal.SIGINT, False)
|
self.check_semaphore_tracker_death(signal.SIGINT, False)
|
||||||
|
|
||||||
|
def test_semaphore_tracker_sigterm(self):
|
||||||
|
# Catchable signal (ignored by semaphore tracker)
|
||||||
|
self.check_semaphore_tracker_death(signal.SIGTERM, False)
|
||||||
|
|
||||||
def test_semaphore_tracker_sigkill(self):
|
def test_semaphore_tracker_sigkill(self):
|
||||||
# Uncatchable signal.
|
# Uncatchable signal.
|
||||||
self.check_semaphore_tracker_death(signal.SIGKILL, True)
|
self.check_semaphore_tracker_death(signal.SIGKILL, True)
|
||||||
|
|
|
@ -0,0 +1,3 @@
|
||||||
|
Fix a race condition in ``multiprocessing.semaphore_tracker`` when the
|
||||||
|
tracker receives SIGINT before it can register signal handlers for ignoring
|
||||||
|
it.
|
Loading…
Reference in New Issue