Issue #12456: fix a possible hang on shutdown of a concurrent.futures.ProcessPoolExecutor.

This commit is contained in:
Antoine Pitrou 2011-07-02 21:20:25 +02:00
parent aac0f75b3b
commit 020436b0d4
2 changed files with 28 additions and 9 deletions

View File

@ -50,7 +50,7 @@ import os
from concurrent.futures import _base from concurrent.futures import _base
import queue import queue
import multiprocessing import multiprocessing
from multiprocessing.queues import SimpleQueue, SentinelReady from multiprocessing.queues import SimpleQueue, SentinelReady, Full
import threading import threading
import weakref import weakref
@ -195,6 +195,10 @@ def _queue_management_worker(executor_reference,
result_queue: A multiprocessing.Queue of _ResultItems generated by the result_queue: A multiprocessing.Queue of _ResultItems generated by the
process workers. process workers.
""" """
executor = None
def shutting_down():
return _shutdown or executor is None or executor._shutdown_thread
def shutdown_worker(): def shutdown_worker():
# This is an upper bound # This is an upper bound
@ -202,8 +206,7 @@ def _queue_management_worker(executor_reference,
for i in range(0, nb_children_alive): for i in range(0, nb_children_alive):
call_queue.put(None) call_queue.put(None)
# If .join() is not called on the created processes then # If .join() is not called on the created processes then
# some multiprocessing.Queue methods may deadlock on Mac OS # some multiprocessing.Queue methods may deadlock on Mac OS X.
# X.
for p in processes.values(): for p in processes.values():
p.join() p.join()
@ -222,7 +225,7 @@ def _queue_management_worker(executor_reference,
if executor is not None: if executor is not None:
executor._broken = True executor._broken = True
executor._shutdown_thread = True executor._shutdown_thread = True
del executor executor = None
# All futures in flight must be marked failed # All futures in flight must be marked failed
for work_id, work_item in pending_work_items.items(): for work_id, work_item in pending_work_items.items():
work_item.future.set_exception( work_item.future.set_exception(
@ -242,7 +245,11 @@ def _queue_management_worker(executor_reference,
if isinstance(result_item, int): if isinstance(result_item, int):
# Clean shutdown of a worker using its PID # Clean shutdown of a worker using its PID
# (avoids marking the executor broken) # (avoids marking the executor broken)
assert shutting_down()
del processes[result_item] del processes[result_item]
if not processes:
shutdown_worker()
return
elif result_item is not None: elif result_item is not None:
work_item = pending_work_items.pop(result_item.work_id, None) work_item = pending_work_items.pop(result_item.work_id, None)
# work_item can be None if another process terminated (see above) # work_item can be None if another process terminated (see above)
@ -257,16 +264,21 @@ def _queue_management_worker(executor_reference,
# - The interpreter is shutting down OR # - The interpreter is shutting down OR
# - The executor that owns this worker has been collected OR # - The executor that owns this worker has been collected OR
# - The executor that owns this worker has been shutdown. # - The executor that owns this worker has been shutdown.
if _shutdown or executor is None or executor._shutdown_thread: if shutting_down():
# Since no new work items can be added, it is safe to shutdown # Since no new work items can be added, it is safe to shutdown
# this thread if there are no pending work items. # this thread if there are no pending work items.
if not pending_work_items: if not pending_work_items and call_queue.qsize() == 0:
shutdown_worker() shutdown_worker()
return return
else: try:
# Start shutting down by telling a process it can exit. # Start shutting down by telling a process it can exit.
call_queue.put(None) call_queue.put_nowait(None)
del executor except Full:
# This is not a problem: we will eventually be woken up (in
# result_queue.get()) and be able to send a sentinel again,
# if necessary.
pass
executor = None
_system_limits_checked = False _system_limits_checked = False
_system_limited = None _system_limited = None

View File

@ -367,6 +367,13 @@ class ExecutorTest(unittest.TestCase):
self.assertEqual([None, None], results) self.assertEqual([None, None], results)
def test_shutdown_race_issue12456(self):
# Issue #12456: race condition at shutdown where trying to post a
# sentinel in the call queue blocks (the queue is full while processes
# have exited).
self.executor.map(str, [2] * (self.worker_count + 1))
self.executor.shutdown()
class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest): class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest):
def test_map_submits_without_iteration(self): def test_map_submits_without_iteration(self):