bpo-39360: Ensure all workers exit when finalizing a multiprocessing Pool (GH-19009)
When the pull is not used via the context manager or terminate() is called, there is a system in multiprocessing.util that handles finalization of all pools via an atexit handler (the Finalize) class. This class registers the _terminate_pool handler in the registry of finalizers of the module, and that registry is called on interpreter exit via _exit_function. The problem is that the "happy" path with the context manager or manual call to finalize() does some extra steps that _terminate_pool does not. The step that is not executed when the atexit() handler calls _terminate_pool is pinging the _change_notifier queue to unblock the maintenance threads. This commit moves the notification to the _terminate_pool function so is called from both code paths. Co-authored-by: Pablo Galindo <Pablogsal@gmail.com>
This commit is contained in:
parent
c81609e44e
commit
ac10e0c932
|
@ -651,8 +651,6 @@ class Pool(object):
|
|||
def terminate(self):
|
||||
util.debug('terminating pool')
|
||||
self._state = TERMINATE
|
||||
self._worker_handler._state = TERMINATE
|
||||
self._change_notifier.put(None)
|
||||
self._terminate()
|
||||
|
||||
def join(self):
|
||||
|
@ -682,7 +680,12 @@ class Pool(object):
|
|||
# this is guaranteed to only be called once
|
||||
util.debug('finalizing pool')
|
||||
|
||||
# Notify that the worker_handler state has been changed so the
|
||||
# _handle_workers loop can be unblocked (and exited) in order to
|
||||
# send the finalization sentinel all the workers.
|
||||
worker_handler._state = TERMINATE
|
||||
change_notifier.put(None)
|
||||
|
||||
task_handler._state = TERMINATE
|
||||
|
||||
util.debug('helping task handler/workers to finish')
|
||||
|
|
|
@ -2780,6 +2780,24 @@ class _TestPoolWorkerLifetime(BaseTestCase):
|
|||
for (j, res) in enumerate(results):
|
||||
self.assertEqual(res.get(), sqr(j))
|
||||
|
||||
def test_worker_finalization_via_atexit_handler_of_multiprocessing(self):
|
||||
# tests cases against bpo-38744 and bpo-39360
|
||||
cmd = '''if 1:
|
||||
from multiprocessing import Pool
|
||||
problem = None
|
||||
class A:
|
||||
def __init__(self):
|
||||
self.pool = Pool(processes=1)
|
||||
def test():
|
||||
global problem
|
||||
problem = A()
|
||||
problem.pool.map(float, tuple(range(10)))
|
||||
if __name__ == "__main__":
|
||||
test()
|
||||
'''
|
||||
rc, out, err = test.support.script_helper.assert_python_ok('-c', cmd)
|
||||
self.assertEqual(rc, 0)
|
||||
|
||||
#
|
||||
# Test of creating a customized manager class
|
||||
#
|
||||
|
|
|
@ -0,0 +1,4 @@
|
|||
Ensure all workers exit when finalizing a :class:`multiprocessing.Pool` implicitly via the module finalization
|
||||
handlers of multiprocessing. This fixes a deadlock situation that can be experienced when the Pool is not
|
||||
properly finalized via the context manager or a call to ``multiprocessing.Pool.terminate``. Patch by Batuhan Taskaya
|
||||
and Pablo Galindo.
|
Loading…
Reference in New Issue