From 0e89076247580ba0e570c4816f0e5628a7e36e83 Mon Sep 17 00:00:00 2001 From: Thomas Moreau Date: Sun, 1 Mar 2020 21:49:14 +0100 Subject: [PATCH] bpo-39678: refactor queue manager thread (GH-18551) --- Lib/concurrent/futures/process.py | 440 +++++++++--------- Lib/test/test_concurrent_futures.py | 16 +- .../2020-02-28-12-59-30.bpo-39678.3idfxM.rst | 2 + 3 files changed, 241 insertions(+), 217 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2020-02-28-12-59-30.bpo-39678.3idfxM.rst diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index d77322831a6..39fadcce027 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -49,7 +49,6 @@ import atexit import os from concurrent.futures import _base import queue -from queue import Full import multiprocessing as mp import multiprocessing.connection from multiprocessing.queues import Queue @@ -176,8 +175,9 @@ class _SafeQueue(Queue): e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb))) work_item = self.pending_work_items.pop(obj.work_id, None) self.thread_wakeup.wakeup() - # work_item can be None if another process terminated. In this case, - # the queue_manager_thread fails all work_items with BrokenProcessPool + # work_item can be None if another process terminated. In this + # case, the executor_manager_thread fails all work_items + # with BrokenProcessPool if work_item is not None: work_item.future.set_exception(e) else: @@ -193,6 +193,7 @@ def _get_chunks(*iterables, chunksize): return yield chunk + def _process_chunk(fn, chunk): """ Processes a chunk of an iterable passed to map. @@ -256,122 +257,123 @@ def _process_worker(call_queue, result_queue, initializer, initargs): del call_item -def _add_call_item_to_queue(pending_work_items, - work_ids, - call_queue): - """Fills call_queue with _WorkItems from pending_work_items. - - This function never blocks. - - Args: - pending_work_items: A dict mapping work ids to _WorkItems e.g. 
- {5: <_WorkItem...>, 6: <_WorkItem...>, ...} - work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids - are consumed and the corresponding _WorkItems from - pending_work_items are transformed into _CallItems and put in - call_queue. - call_queue: A multiprocessing.Queue that will be filled with _CallItems - derived from _WorkItems. - """ - while True: - if call_queue.full(): - return - try: - work_id = work_ids.get(block=False) - except queue.Empty: - return - else: - work_item = pending_work_items[work_id] - - if work_item.future.set_running_or_notify_cancel(): - call_queue.put(_CallItem(work_id, - work_item.fn, - work_item.args, - work_item.kwargs), - block=True) - else: - del pending_work_items[work_id] - continue - - -def _queue_management_worker(executor_reference, - processes, - pending_work_items, - work_ids_queue, - call_queue, - result_queue, - thread_wakeup): +class _ExecutorManagerThread(threading.Thread): """Manages the communication between this process and the worker processes. - This function is run in a local thread. + The manager is run in a local thread. Args: - executor_reference: A weakref.ref to the ProcessPoolExecutor that owns - this thread. Used to determine if the ProcessPoolExecutor has been - garbage collected and that this function can exit. - process: A list of the ctx.Process instances used as - workers. - pending_work_items: A dict mapping work ids to _WorkItems e.g. - {5: <_WorkItem...>, 6: <_WorkItem...>, ...} - work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]). - call_queue: A ctx.Queue that will be filled with _CallItems - derived from _WorkItems for processing by the process workers. - result_queue: A ctx.SimpleQueue of _ResultItems generated by the - process workers. - thread_wakeup: A _ThreadWakeup to allow waking up the - queue_manager_thread from the main Thread and avoid deadlocks - caused by permanently locked queues. + executor: A reference to the ProcessPoolExecutor that owns + this thread. 
A weakref will be owned by the manager as well as + references to internal objects used to introspect the state of + the executor. """ - executor = None - def shutting_down(): - return (_global_shutdown or executor is None - or executor._shutdown_thread) + def __init__(self, executor): + # Store references to necessary internals of the executor. - def shutdown_worker(): - # This is an upper bound on the number of children alive. - n_children_alive = sum(p.is_alive() for p in processes.values()) - n_children_to_stop = n_children_alive - n_sentinels_sent = 0 - # Send the right number of sentinels, to make sure all children are - # properly terminated. - while n_sentinels_sent < n_children_to_stop and n_children_alive > 0: - for i in range(n_children_to_stop - n_sentinels_sent): - try: - call_queue.put_nowait(None) - n_sentinels_sent += 1 - except Full: - break - n_children_alive = sum(p.is_alive() for p in processes.values()) + # A _ThreadWakeup to allow waking up the queue_manager_thread from the + # main Thread and avoid deadlocks caused by permanently locked queues. + self.thread_wakeup = executor._executor_manager_thread_wakeup - # Release the queue's resources as soon as possible. - call_queue.close() - call_queue.join_thread() - thread_wakeup.close() - # If .join() is not called on the created processes then - # some ctx.Queue methods may deadlock on Mac OS X. - for p in processes.values(): - p.join() + # A weakref.ref to the ProcessPoolExecutor that owns this thread. Used + # to determine if the ProcessPoolExecutor has been garbage collected + # and that the manager can exit. + # When the executor gets garbage collected, the weakref callback + # will wake up the queue management thread so that it can terminate + # if there is no pending work item. 
+ def weakref_cb(_, thread_wakeup=self.thread_wakeup): + mp.util.debug('Executor collected: triggering callback for' + ' QueueManager wakeup') + thread_wakeup.wakeup() - result_reader = result_queue._reader - wakeup_reader = thread_wakeup._reader - readers = [result_reader, wakeup_reader] + self.executor_reference = weakref.ref(executor, weakref_cb) - while True: - _add_call_item_to_queue(pending_work_items, - work_ids_queue, - call_queue) + # A list of the ctx.Process instances used as workers. + self.processes = executor._processes + # A ctx.Queue that will be filled with _CallItems derived from + # _WorkItems for processing by the process workers. + self.call_queue = executor._call_queue + + # A ctx.SimpleQueue of _ResultItems generated by the process workers. + self.result_queue = executor._result_queue + + # A queue.Queue of work ids e.g. Queue([5, 6, ...]). + self.work_ids_queue = executor._work_ids + + # A dict mapping work ids to _WorkItems e.g. + # {5: <_WorkItem...>, 6: <_WorkItem...>, ...} + self.pending_work_items = executor._pending_work_items + + # Set this thread to be daemonized + super().__init__() + self.daemon = True + + def run(self): + # Main loop for the executor manager thread. + + while True: + self.add_call_item_to_queue() + + result_item, is_broken, cause = self.wait_result_broken_or_wakeup() + + if is_broken: + self.terminate_broken(cause) + return + if result_item is not None: + self.process_result_item(result_item) + # Delete reference to result_item to avoid keeping references + # while waiting on new results. + del result_item + + if self.is_shutting_down(): + self.flag_executor_shutting_down() + + # Since no new work items can be added, it is safe to shutdown + # this thread if there are no pending work items. + if not self.pending_work_items: + self.join_executor_internals() + return + + def add_call_item_to_queue(self): + # Fills call_queue with _WorkItems from pending_work_items. + # This function never blocks. 
+ while True: + if self.call_queue.full(): + return + try: + work_id = self.work_ids_queue.get(block=False) + except queue.Empty: + return + else: + work_item = self.pending_work_items[work_id] + + if work_item.future.set_running_or_notify_cancel(): + self.call_queue.put(_CallItem(work_id, + work_item.fn, + work_item.args, + work_item.kwargs), + block=True) + else: + del self.pending_work_items[work_id] + continue + + def wait_result_broken_or_wakeup(self): # Wait for a result to be ready in the result_queue while checking # that all worker processes are still running, or for a wake up # signal send. The wake up signals come either from new tasks being # submitted, from the executor being shutdown/gc-ed, or from the # shutdown of the python interpreter. - worker_sentinels = [p.sentinel for p in processes.values()] + result_reader = self.result_queue._reader + wakeup_reader = self.thread_wakeup._reader + readers = [result_reader, wakeup_reader] + worker_sentinels = [p.sentinel for p in self.processes.values()] ready = mp.connection.wait(readers + worker_sentinels) cause = None is_broken = True + result_item = None if result_reader in ready: try: result_item = result_reader.recv() @@ -381,97 +383,135 @@ def _queue_management_worker(executor_reference, elif wakeup_reader in ready: is_broken = False - result_item = None - thread_wakeup.clear() - if is_broken: - # Mark the process pool broken so that submits fail right now. 
- executor = executor_reference() - if executor is not None: - executor._broken = ('A child process terminated ' - 'abruptly, the process pool is not ' - 'usable anymore') - executor._shutdown_thread = True - executor = None - bpe = BrokenProcessPool("A process in the process pool was " - "terminated abruptly while the future was " - "running or pending.") - if cause is not None: - bpe.__cause__ = _RemoteTraceback( - f"\n'''\n{''.join(cause)}'''") - # All futures in flight must be marked failed - for work_id, work_item in pending_work_items.items(): - work_item.future.set_exception(bpe) - # Delete references to object. See issue16284 - del work_item - pending_work_items.clear() - # Terminate remaining workers forcibly: the queues or their - # locks may be in a dirty state and block forever. - for p in processes.values(): - p.terminate() - shutdown_worker() - return + self.thread_wakeup.clear() + + return result_item, is_broken, cause + + def process_result_item(self, result_item): + # Process the received result_item. This can be either the PID of a + # worker that exited gracefully or a _ResultItem + if isinstance(result_item, int): # Clean shutdown of a worker using its PID # (avoids marking the executor broken) - assert shutting_down() - p = processes.pop(result_item) + assert self.is_shutting_down() + p = self.processes.pop(result_item) p.join() - if not processes: - shutdown_worker() + if not self.processes: + self.join_executor_internals() return - elif result_item is not None: - work_item = pending_work_items.pop(result_item.work_id, None) + else: + # Received a _ResultItem so mark the future as completed. + work_item = self.pending_work_items.pop(result_item.work_id, None) # work_item can be None if another process terminated (see above) if work_item is not None: if result_item.exception: work_item.future.set_exception(result_item.exception) else: work_item.future.set_result(result_item.result) - # Delete references to object. 
See issue16284 - del work_item - # Delete reference to result_item - del result_item - # Check whether we should start shutting down. - executor = executor_reference() + def is_shutting_down(self): + # Check whether we should start shutting down the executor. + executor = self.executor_reference() # No more work items can be added if: # - The interpreter is shutting down OR # - The executor that owns this worker has been collected OR # - The executor that owns this worker has been shutdown. - if shutting_down(): - try: - # Flag the executor as shutting down as early as possible if it - # is not gc-ed yet. - if executor is not None: - executor._shutdown_thread = True - # Unless there are pending work items, we have nothing to cancel. - if pending_work_items and executor._cancel_pending_futures: - # Cancel all pending futures and update pending_work_items - # to only have futures that are currently running. - new_pending_work_items = {} - for work_id, work_item in pending_work_items.items(): - if not work_item.future.cancel(): - new_pending_work_items[work_id] = work_item + return (_global_shutdown or executor is None + or executor._shutdown_thread) - pending_work_items = new_pending_work_items - # Drain work_ids_queue since we no longer need to - # add items to the call queue. - while True: - try: - work_ids_queue.get_nowait() - except queue.Empty: - break + def terminate_broken(self, cause): + # Terminate the executor because it is in a broken state. The cause + # argument can be used to display more information on the error that + # led the executor into becoming broken. - # Since no new work items can be added, it is safe to shutdown - # this thread if there are no pending work items. - if not pending_work_items: - shutdown_worker() - return - except Full: - # This is not a problem: we will eventually be woken up (in - # result_queue.get()) and be able to send a sentinel again. 
- pass - executor = None + # Mark the process pool broken so that submits fail right now. + executor = self.executor_reference() + if executor is not None: + executor._broken = ('A child process terminated ' + 'abruptly, the process pool is not ' + 'usable anymore') + executor._shutdown_thread = True + executor = None + + # All pending tasks are to be marked failed with the following + # BrokenProcessPool error + bpe = BrokenProcessPool("A process in the process pool was " + "terminated abruptly while the future was " + "running or pending.") + if cause is not None: + bpe.__cause__ = _RemoteTraceback( + f"\n'''\n{''.join(cause)}'''") + + # Mark pending tasks as failed. + for work_id, work_item in self.pending_work_items.items(): + work_item.future.set_exception(bpe) + # Delete references to object. See issue16284 + del work_item + self.pending_work_items.clear() + + # Terminate remaining workers forcibly: the queues or their + # locks may be in a dirty state and block forever. + for p in self.processes.values(): + p.terminate() + + # clean up resources + self.join_executor_internals() + + def flag_executor_shutting_down(self): + # Flag the executor as shutting down and cancel remaining tasks if + # requested as early as possible if it is not gc-ed yet. + executor = self.executor_reference() + if executor is not None: + executor._shutdown_thread = True + # Cancel pending work items if requested. + if executor._cancel_pending_futures: + # Cancel all pending futures and update pending_work_items + # to only have futures that are currently running. + new_pending_work_items = {} + for work_id, work_item in self.pending_work_items.items(): + if not work_item.future.cancel(): + new_pending_work_items[work_id] = work_item + self.pending_work_items = new_pending_work_items + # Drain work_ids_queue since we no longer need to + # add items to the call queue. 
+ while True: + try: + self.work_ids_queue.get_nowait() + except queue.Empty: + break + # Make sure we do this only once to not waste time looping + # on running processes over and over. + executor._cancel_pending_futures = False + + def shutdown_workers(self): + n_children_to_stop = self.get_n_children_alive() + n_sentinels_sent = 0 + # Send the right number of sentinels, to make sure all children are + # properly terminated. + while (n_sentinels_sent < n_children_to_stop + and self.get_n_children_alive() > 0): + for i in range(n_children_to_stop - n_sentinels_sent): + try: + self.call_queue.put_nowait(None) + n_sentinels_sent += 1 + except queue.Full: + break + + def join_executor_internals(self): + self.shutdown_workers() + # Release the queue's resources as soon as possible. + self.call_queue.close() + self.call_queue.join_thread() + self.thread_wakeup.close() + # If .join() is not called on the created processes then + # some ctx.Queue methods may deadlock on Mac OS X. + for p in self.processes.values(): + p.join() + + def get_n_children_alive(self): + # This is an upper bound on the number of children alive. + return sum(p.is_alive() for p in self.processes.values()) _system_limits_checked = False @@ -562,7 +602,7 @@ class ProcessPoolExecutor(_base.Executor): self._initargs = initargs # Management thread - self._queue_management_thread = None + self._executor_manager_thread = None # Map of pids to processes self._processes = {} @@ -576,12 +616,12 @@ class ProcessPoolExecutor(_base.Executor): self._cancel_pending_futures = False # _ThreadWakeup is a communication channel used to interrupt the wait - # of the main loop of queue_manager_thread from another thread (e.g. + # of the main loop of executor_manager_thread from another thread (e.g. # when calling executor.submit or executor.shutdown). 
We do not use the - # _result_queue to send the wakeup signal to the queue_manager_thread + # _result_queue to send wakeup signals to the executor_manager_thread # as it could result in a deadlock if a worker process dies with the # _result_queue write lock still acquired. - self._queue_management_thread_wakeup = _ThreadWakeup() + self._executor_manager_thread_wakeup = _ThreadWakeup() # Create communication channels for the executor # Make the call queue slightly larger than the number of processes to @@ -591,7 +631,7 @@ class ProcessPoolExecutor(_base.Executor): self._call_queue = _SafeQueue( max_size=queue_size, ctx=self._mp_context, pending_work_items=self._pending_work_items, - thread_wakeup=self._queue_management_thread_wakeup) + thread_wakeup=self._executor_manager_thread_wakeup) # Killed worker processes can produce spurious "broken pipe" # tracebacks in the queue's own worker thread. But we detect killed # processes anyway, so silence the tracebacks. @@ -599,32 +639,14 @@ class ProcessPoolExecutor(_base.Executor): self._result_queue = mp_context.SimpleQueue() self._work_ids = queue.Queue() - def _start_queue_management_thread(self): - if self._queue_management_thread is None: - # When the executor gets garbarge collected, the weakref callback - # will wake up the queue management thread so that it can terminate - # if there is no pending work item. - def weakref_cb(_, - thread_wakeup=self._queue_management_thread_wakeup): - mp.util.debug('Executor collected: triggering callback for' - ' QueueManager wakeup') - thread_wakeup.wakeup() + def _start_executor_manager_thread(self): + if self._executor_manager_thread is None: # Start the processes so that their sentinels are known. 
self._adjust_process_count() - self._queue_management_thread = threading.Thread( - target=_queue_management_worker, - args=(weakref.ref(self, weakref_cb), - self._processes, - self._pending_work_items, - self._work_ids, - self._call_queue, - self._result_queue, - self._queue_management_thread_wakeup), - name="QueueManagerThread") - self._queue_management_thread.daemon = True - self._queue_management_thread.start() - _threads_wakeups[self._queue_management_thread] = \ - self._queue_management_thread_wakeup + self._executor_manager_thread = _ExecutorManagerThread(self) + self._executor_manager_thread.start() + _threads_wakeups[self._executor_manager_thread] = \ + self._executor_manager_thread_wakeup def _adjust_process_count(self): for _ in range(len(self._processes), self._max_workers): @@ -654,9 +676,9 @@ class ProcessPoolExecutor(_base.Executor): self._work_ids.put(self._queue_count) self._queue_count += 1 # Wake up queue management thread - self._queue_management_thread_wakeup.wakeup() + self._executor_manager_thread_wakeup.wakeup() - self._start_queue_management_thread() + self._start_executor_manager_thread() return f submit.__doc__ = _base.Executor.submit.__doc__ @@ -694,20 +716,20 @@ class ProcessPoolExecutor(_base.Executor): self._cancel_pending_futures = cancel_futures self._shutdown_thread = True - if self._queue_management_thread: + if self._executor_manager_thread: # Wake up queue management thread - self._queue_management_thread_wakeup.wakeup() + self._executor_manager_thread_wakeup.wakeup() if wait: - self._queue_management_thread.join() + self._executor_manager_thread.join() # To reduce the risk of opening too many files, remove references to # objects that use file descriptors. 
- self._queue_management_thread = None + self._executor_manager_thread = None self._call_queue = None self._result_queue = None self._processes = None - if self._queue_management_thread_wakeup: - self._queue_management_thread_wakeup = None + if self._executor_manager_thread_wakeup: + self._executor_manager_thread_wakeup = None shutdown.__doc__ = _base.Executor.shutdown.__doc__ diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index a7381f9d13e..868415ab299 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -508,15 +508,15 @@ class ProcessPoolShutdownTest(ExecutorShutdownTest): def test_del_shutdown(self): executor = futures.ProcessPoolExecutor(max_workers=5) res = executor.map(abs, range(-5, 5)) - queue_management_thread = executor._queue_management_thread + executor_manager_thread = executor._executor_manager_thread processes = executor._processes call_queue = executor._call_queue - queue_management_thread = executor._queue_management_thread + executor_manager_thread = executor._executor_manager_thread del executor # Make sure that all the executor resources were properly cleaned by # the shutdown process - queue_management_thread.join() + executor_manager_thread.join() for p in processes.values(): p.join() call_queue.join_thread() @@ -532,12 +532,12 @@ class ProcessPoolShutdownTest(ExecutorShutdownTest): res = executor.map(abs, range(-5, 5)) processes = executor._processes call_queue = executor._call_queue - queue_management_thread = executor._queue_management_thread + executor_manager_thread = executor._executor_manager_thread executor.shutdown(wait=False) # Make sure that all the executor resources were properly cleaned by # the shutdown process - queue_management_thread.join() + executor_manager_thread.join() for p in processes.values(): p.join() call_queue.join_thread() @@ -1139,11 +1139,11 @@ class ExecutorDeadlockTest: mp_context=get_context(self.ctx)) as executor: self.executor 
= executor # Allow clean up in fail_on_deadlock - # Start the executor and get the queue_management_thread to collect + # Start the executor and get the executor_manager_thread to collect # the threads and avoid dangling thread that should be cleaned up # asynchronously. executor.submit(id, 42).result() - queue_manager = executor._queue_management_thread + executor_manager = executor._executor_manager_thread # Submit a task that fails at pickle and shutdown the executor # without waiting @@ -1154,7 +1154,7 @@ class ExecutorDeadlockTest: # Make sure the executor is eventually shutdown and do not leave # dangling threads - queue_manager.join() + executor_manager.join() create_executor_tests(ExecutorDeadlockTest, diff --git a/Misc/NEWS.d/next/Library/2020-02-28-12-59-30.bpo-39678.3idfxM.rst b/Misc/NEWS.d/next/Library/2020-02-28-12-59-30.bpo-39678.3idfxM.rst new file mode 100644 index 00000000000..8b18e2259c5 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2020-02-28-12-59-30.bpo-39678.3idfxM.rst @@ -0,0 +1,2 @@ +Refactor queue_manager in :class:`concurrent.futures.ProcessPoolExecutor` to +make it easier to maintain. \ No newline at end of file