From 020436b0d4ae271638ed5d0881c1fa7f7c0a1b09 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Sat, 2 Jul 2011 21:20:25 +0200 Subject: [PATCH] Issue #12456: fix a possible hang on shutdown of a concurrent.futures.ProcessPoolExecutor. --- Lib/concurrent/futures/process.py | 30 ++++++++++++++++++++--------- Lib/test/test_concurrent_futures.py | 7 +++++++ 2 files changed, 28 insertions(+), 9 deletions(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index c2331e7da97..689f9ba99c5 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -50,7 +50,7 @@ import os from concurrent.futures import _base import queue import multiprocessing -from multiprocessing.queues import SimpleQueue, SentinelReady +from multiprocessing.queues import SimpleQueue, SentinelReady, Full import threading import weakref @@ -195,6 +195,10 @@ def _queue_management_worker(executor_reference, result_queue: A multiprocessing.Queue of _ResultItems generated by the process workers. """ + executor = None + + def shutting_down(): + return _shutdown or executor is None or executor._shutdown_thread def shutdown_worker(): # This is an upper bound @@ -202,8 +206,7 @@ def _queue_management_worker(executor_reference, for i in range(0, nb_children_alive): call_queue.put(None) # If .join() is not called on the created processes then - # some multiprocessing.Queue methods may deadlock on Mac OS - # X. + # some multiprocessing.Queue methods may deadlock on Mac OS X. for p in processes.values(): p.join() @@ -222,7 +225,7 @@ def _queue_management_worker(executor_reference, if executor is not None: executor._broken = True executor._shutdown_thread = True - del executor + executor = None # All futures in flight must be marked failed for work_id, work_item in pending_work_items.items(): work_item.future.set_exception( @@ -242,7 +245,11 @@ def _queue_management_worker(executor_reference, if isinstance(result_item, int): # Clean shutdown of a worker using its PID # (avoids marking the executor broken) + assert shutting_down() del processes[result_item] + if not processes: + shutdown_worker() + return elif result_item is not None: work_item = pending_work_items.pop(result_item.work_id, None) # work_item can be None if another process terminated (see above) @@ -257,16 +264,21 @@ def _queue_management_worker(executor_reference, # - The interpreter is shutting down OR # - The executor that owns this worker has been collected OR # - The executor that owns this worker has been shutdown. - if _shutdown or executor is None or executor._shutdown_thread: + if shutting_down(): # Since no new work items can be added, it is safe to shutdown # this thread if there are no pending work items. - if not pending_work_items: + if not pending_work_items and call_queue.qsize() == 0: shutdown_worker() return - else: + try: # Start shutting down by telling a process it can exit. - call_queue.put(None) - del executor + call_queue.put_nowait(None) + except Full: + # This is not a problem: we will eventually be woken up (in + # result_queue.get()) and be able to send a sentinel again, + # if necessary. + pass + executor = None _system_limits_checked = False _system_limited = None diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 5968980ff1e..fda6f5b807e 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -367,6 +367,13 @@ class ExecutorTest(unittest.TestCase): self.assertEqual([None, None], results) + def test_shutdown_race_issue12456(self): + # Issue #12456: race condition at shutdown where trying to post a + # sentinel in the call queue blocks (the queue is full while processes + # have exited). + self.executor.map(str, [2] * (self.worker_count + 1)) + self.executor.shutdown() + class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest): def test_map_submits_without_iteration(self):