mirror of https://github.com/python/cpython
bpo-31310: multiprocessing's semaphore tracker should be launched again if crashed (#3247)
* bpo-31310: multiprocessing's semaphore tracker should be launched again if crashed * Avoid mucking with process state in test. Add a warning if the semaphore process died, as semaphores may then be leaked. * Add NEWS entry
This commit is contained in:
parent
fc6b348b12
commit
cbe1756e3e
|
@ -29,6 +29,7 @@ class SemaphoreTracker(object):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self._lock = threading.Lock()
|
self._lock = threading.Lock()
|
||||||
self._fd = None
|
self._fd = None
|
||||||
|
self._pid = None
|
||||||
|
|
||||||
def getfd(self):
|
def getfd(self):
|
||||||
self.ensure_running()
|
self.ensure_running()
|
||||||
|
@ -40,8 +41,20 @@ class SemaphoreTracker(object):
|
||||||
This can be run from any process. Usually a child process will use
|
This can be run from any process. Usually a child process will use
|
||||||
the semaphore created by its parent.'''
|
the semaphore created by its parent.'''
|
||||||
with self._lock:
|
with self._lock:
|
||||||
if self._fd is not None:
|
if self._pid is not None:
|
||||||
return
|
# semaphore tracker was launched before, is it still running?
|
||||||
|
pid, status = os.waitpid(self._pid, os.WNOHANG)
|
||||||
|
if not pid:
|
||||||
|
# => still alive
|
||||||
|
return
|
||||||
|
# => dead, launch it again
|
||||||
|
os.close(self._fd)
|
||||||
|
self._fd = None
|
||||||
|
self._pid = None
|
||||||
|
|
||||||
|
warnings.warn('semaphore_tracker: process died unexpectedly, '
|
||||||
|
'relaunching. Some semaphores might leak.')
|
||||||
|
|
||||||
fds_to_pass = []
|
fds_to_pass = []
|
||||||
try:
|
try:
|
||||||
fds_to_pass.append(sys.stderr.fileno())
|
fds_to_pass.append(sys.stderr.fileno())
|
||||||
|
@ -55,12 +68,13 @@ class SemaphoreTracker(object):
|
||||||
exe = spawn.get_executable()
|
exe = spawn.get_executable()
|
||||||
args = [exe] + util._args_from_interpreter_flags()
|
args = [exe] + util._args_from_interpreter_flags()
|
||||||
args += ['-c', cmd % r]
|
args += ['-c', cmd % r]
|
||||||
util.spawnv_passfds(exe, args, fds_to_pass)
|
pid = util.spawnv_passfds(exe, args, fds_to_pass)
|
||||||
except:
|
except:
|
||||||
os.close(w)
|
os.close(w)
|
||||||
raise
|
raise
|
||||||
else:
|
else:
|
||||||
self._fd = w
|
self._fd = w
|
||||||
|
self._pid = pid
|
||||||
finally:
|
finally:
|
||||||
os.close(r)
|
os.close(r)
|
||||||
|
|
||||||
|
|
|
@ -4,6 +4,7 @@
|
||||||
|
|
||||||
import unittest
|
import unittest
|
||||||
import queue as pyqueue
|
import queue as pyqueue
|
||||||
|
import contextlib
|
||||||
import time
|
import time
|
||||||
import io
|
import io
|
||||||
import itertools
|
import itertools
|
||||||
|
@ -4344,14 +4345,14 @@ class TestStartMethod(unittest.TestCase):
|
||||||
self.fail("failed spawning forkserver or grandchild")
|
self.fail("failed spawning forkserver or grandchild")
|
||||||
|
|
||||||
|
|
||||||
#
|
|
||||||
# Check that killing process does not leak named semaphores
|
|
||||||
#
|
|
||||||
|
|
||||||
@unittest.skipIf(sys.platform == "win32",
|
@unittest.skipIf(sys.platform == "win32",
|
||||||
"test semantics don't make sense on Windows")
|
"test semantics don't make sense on Windows")
|
||||||
class TestSemaphoreTracker(unittest.TestCase):
|
class TestSemaphoreTracker(unittest.TestCase):
|
||||||
|
|
||||||
def test_semaphore_tracker(self):
|
def test_semaphore_tracker(self):
|
||||||
|
#
|
||||||
|
# Check that killing process does not leak named semaphores
|
||||||
|
#
|
||||||
import subprocess
|
import subprocess
|
||||||
cmd = '''if 1:
|
cmd = '''if 1:
|
||||||
import multiprocessing as mp, time, os
|
import multiprocessing as mp, time, os
|
||||||
|
@ -4385,6 +4386,40 @@ class TestSemaphoreTracker(unittest.TestCase):
|
||||||
self.assertRegex(err, expected)
|
self.assertRegex(err, expected)
|
||||||
self.assertRegex(err, r'semaphore_tracker: %r: \[Errno' % name1)
|
self.assertRegex(err, r'semaphore_tracker: %r: \[Errno' % name1)
|
||||||
|
|
||||||
|
def check_semaphore_tracker_death(self, signum, should_die):
|
||||||
|
# bpo-31310: if the semaphore tracker process has died, it should
|
||||||
|
# be restarted implicitly.
|
||||||
|
from multiprocessing.semaphore_tracker import _semaphore_tracker
|
||||||
|
_semaphore_tracker.ensure_running()
|
||||||
|
pid = _semaphore_tracker._pid
|
||||||
|
os.kill(pid, signum)
|
||||||
|
time.sleep(1.0) # give it time to die
|
||||||
|
|
||||||
|
ctx = multiprocessing.get_context("spawn")
|
||||||
|
with contextlib.ExitStack() as stack:
|
||||||
|
if should_die:
|
||||||
|
stack.enter_context(self.assertWarnsRegex(
|
||||||
|
UserWarning,
|
||||||
|
"semaphore_tracker: process died"))
|
||||||
|
sem = ctx.Semaphore()
|
||||||
|
sem.acquire()
|
||||||
|
sem.release()
|
||||||
|
wr = weakref.ref(sem)
|
||||||
|
# ensure `sem` gets collected, which triggers communication with
|
||||||
|
# the semaphore tracker
|
||||||
|
del sem
|
||||||
|
gc.collect()
|
||||||
|
self.assertIsNone(wr())
|
||||||
|
|
||||||
|
def test_semaphore_tracker_sigint(self):
|
||||||
|
# Catchable signal (ignored by semaphore tracker)
|
||||||
|
self.check_semaphore_tracker_death(signal.SIGINT, False)
|
||||||
|
|
||||||
|
def test_semaphore_tracker_sigkill(self):
|
||||||
|
# Uncatchable signal.
|
||||||
|
self.check_semaphore_tracker_death(signal.SIGKILL, True)
|
||||||
|
|
||||||
|
|
||||||
class TestSimpleQueue(unittest.TestCase):
|
class TestSimpleQueue(unittest.TestCase):
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
multiprocessing's semaphore tracker should be launched again if crashed.
|
Loading…
Reference in New Issue