mirror of https://github.com/python/cpython
gh-90622: Do not spawn ProcessPool workers on demand via fork method. (GH-91598) (#92495)
Do not spawn ProcessPool workers on demand when the pool uses the fork start method.
This avoids potential deadlocks in the child processes caused by forking from
a multithreaded process.
(cherry picked from commit ebb37fc3fd)
Co-authored-by: Gregory P. Smith <greg@krypto.org>
Parent: 5917e71017
Commit: 4270b7927d
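
To make the behavioural change concrete, here is a small illustrative script (not part of the commit; it inspects the private _processes attribute purely for demonstration). With this patch applied, a pool whose mp_context uses the "fork" start method pre-spawns all of its workers when the first task is submitted, instead of adding them on demand:

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

def square(x):
    return x * x

if __name__ == "__main__":
    ctx = mp.get_context("fork")  # POSIX-only start method
    with ProcessPoolExecutor(max_workers=4, mp_context=ctx) as pool:
        pool.submit(square, 2).result()
        # All four fork()ed workers already exist after a single submit(),
        # because they are launched before the manager thread starts.
        print(len(pool._processes))  # expected: 4 under the patched behaviour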
Lib/concurrent/futures/process.py

@@ -652,6 +652,10 @@ class ProcessPoolExecutor(_base.Executor):
             mp_context = mp.get_context()
         self._mp_context = mp_context
 
+        # https://github.com/python/cpython/issues/90622
+        self._safe_to_dynamically_spawn_children = (
+            self._mp_context.get_start_method(allow_none=False) != "fork")
+
         if initializer is not None and not callable(initializer):
             raise TypeError("initializer must be a callable")
         self._initializer = initializer
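
The new flag is derived purely from the context's start method, so any context other than "fork" keeps dynamic spawning. A minimal sketch of the same check, assuming only that the listed start methods may or may not be available on the current platform:

import multiprocessing as mp

for method in ("fork", "forkserver", "spawn"):
    try:
        ctx = mp.get_context(method)
    except ValueError:
        continue  # start method not supported on this platform
    safe = ctx.get_start_method(allow_none=False) != "fork"
    print(method, "-> safe to dynamically spawn children:", safe)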
@@ -714,6 +718,8 @@ class ProcessPoolExecutor(_base.Executor):
     def _start_executor_manager_thread(self):
         if self._executor_manager_thread is None:
             # Start the processes so that their sentinels are known.
+            if not self._safe_to_dynamically_spawn_children:  # ie, using fork.
+                self._launch_processes()
             self._executor_manager_thread = _ExecutorManagerThread(self)
             self._executor_manager_thread.start()
             _threads_wakeups[self._executor_manager_thread] = \
@@ -726,6 +732,23 @@ class ProcessPoolExecutor(_base.Executor):
 
         process_count = len(self._processes)
         if process_count < self._max_workers:
+            # Assertion disabled as this codepath is also used to replace a
+            # worker that unexpectedly dies, even when using the 'fork' start
+            # method. That means there is still a potential deadlock bug. If a
+            # 'fork' mp_context worker dies, we'll be forking a new one when
+            # we know a thread is running (self._executor_manager_thread).
+            #assert self._safe_to_dynamically_spawn_children or not self._executor_manager_thread, 'https://github.com/python/cpython/issues/90622'
+            self._spawn_process()
+
+    def _launch_processes(self):
+        # https://github.com/python/cpython/issues/90622
+        assert not self._executor_manager_thread, (
+                'Processes cannot be fork()ed after the thread has started, '
+                'deadlock in the child processes could result.')
+        for _ in range(len(self._processes), self._max_workers):
+            self._spawn_process()
+
+    def _spawn_process(self):
         p = self._mp_context.Process(
             target=_process_worker,
             args=(self._call_queue,
@@ -755,6 +778,7 @@ class ProcessPoolExecutor(_base.Executor):
             # Wake up queue management thread
             self._executor_manager_thread_wakeup.wakeup()
 
+            if self._safe_to_dynamically_spawn_children:
                 self._adjust_process_count()
             self._start_executor_manager_thread()
             return f
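
For contrast, a hedged counterpart to the earlier sketch: with a non-"fork" context, submit() still grows the pool on demand, so a single task typically starts only one worker (again inspecting the private _processes attribute for illustration only):

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

def square(x):
    return x * x

if __name__ == "__main__":
    ctx = mp.get_context("spawn")
    with ProcessPoolExecutor(max_workers=4, mp_context=ctx) as pool:
        pool.submit(square, 3).result()
        print(len(pool._processes))  # typically 1: workers are added as needed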
Lib/test/test_concurrent_futures.py

@@ -497,10 +497,16 @@ class ProcessPoolShutdownTest(ExecutorShutdownTest):
             lock.acquire()
 
         mp_context = self.get_context()
+        if mp_context.get_start_method(allow_none=False) == "fork":
+            # fork pre-spawns, not on demand.
+            expected_num_processes = self.worker_count
+        else:
+            expected_num_processes = 3
+
         sem = mp_context.Semaphore(0)
         for _ in range(3):
             self.executor.submit(acquire_lock, sem)
-        self.assertEqual(len(self.executor._processes), 3)
+        self.assertEqual(len(self.executor._processes), expected_num_processes)
         for _ in range(3):
             sem.release()
         processes = self.executor._processes
@@ -1021,6 +1027,8 @@ class ProcessPoolExecutorTest(ExecutorTest):
     def test_idle_process_reuse_one(self):
         executor = self.executor
         assert executor._max_workers >= 4
+        if self.get_context().get_start_method(allow_none=False) == "fork":
+            raise unittest.SkipTest("Incompatible with the fork start method.")
         executor.submit(mul, 21, 2).result()
         executor.submit(mul, 6, 7).result()
         executor.submit(mul, 3, 14).result()
@@ -1029,6 +1037,8 @@ class ProcessPoolExecutorTest(ExecutorTest):
     def test_idle_process_reuse_multiple(self):
         executor = self.executor
         assert executor._max_workers <= 5
+        if self.get_context().get_start_method(allow_none=False) == "fork":
+            raise unittest.SkipTest("Incompatible with the fork start method.")
         executor.submit(mul, 12, 7).result()
         executor.submit(mul, 33, 25)
         executor.submit(mul, 25, 26).result()
Misc/NEWS.d (new entry)

@@ -0,0 +1,4 @@
+Worker processes for :class:`concurrent.futures.ProcessPoolExecutor` are no
+longer spawned on demand (a feature added in 3.9) when the multiprocessing
+context start method is ``"fork"`` as that can lead to deadlocks in the
+child processes due to a fork happening while threads are running.
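
Users who rely on on-demand worker start-up can keep it by passing a non-"fork" context explicitly; a minimal sketch, assuming "forkserver" (or "spawn") is available on the platform:

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

if __name__ == "__main__":
    ctx = mp.get_context("forkserver")  # or "spawn"
    with ProcessPoolExecutor(max_workers=8, mp_context=ctx) as pool:
        print(pool.submit(pow, 2, 10).result())  # 1024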