bpo-39678: refactor queue manager thread (GH-18551)

This commit is contained in:
Thomas Moreau 2020-03-01 21:49:14 +01:00 committed by GitHub
parent 397b96f6d7
commit 0e89076247
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 241 additions and 217 deletions

View File

@ -49,7 +49,6 @@ import atexit
import os
from concurrent.futures import _base
import queue
from queue import Full
import multiprocessing as mp
import multiprocessing.connection
from multiprocessing.queues import Queue
@ -176,8 +175,9 @@ class _SafeQueue(Queue):
e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
work_item = self.pending_work_items.pop(obj.work_id, None)
self.thread_wakeup.wakeup()
# work_item can be None if another process terminated. In this case,
# the queue_manager_thread fails all work_items with BrokenProcessPool
# work_item can be None if another process terminated. In this
# case, the executor_manager_thread fails all work_items
# with BrokenProcessPool
if work_item is not None:
work_item.future.set_exception(e)
else:
@ -193,6 +193,7 @@ def _get_chunks(*iterables, chunksize):
return
yield chunk
def _process_chunk(fn, chunk):
""" Processes a chunk of an iterable passed to map.
@ -256,122 +257,123 @@ def _process_worker(call_queue, result_queue, initializer, initargs):
del call_item
def _add_call_item_to_queue(pending_work_items,
work_ids,
call_queue):
"""Fills call_queue with _WorkItems from pending_work_items.
This function never blocks.
Args:
pending_work_items: A dict mapping work ids to _WorkItems e.g.
{5: <_WorkItem...>, 6: <_WorkItem...>, ...}
work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
are consumed and the corresponding _WorkItems from
pending_work_items are transformed into _CallItems and put in
call_queue.
call_queue: A multiprocessing.Queue that will be filled with _CallItems
derived from _WorkItems.
"""
while True:
if call_queue.full():
return
try:
work_id = work_ids.get(block=False)
except queue.Empty:
return
else:
work_item = pending_work_items[work_id]
if work_item.future.set_running_or_notify_cancel():
call_queue.put(_CallItem(work_id,
work_item.fn,
work_item.args,
work_item.kwargs),
block=True)
else:
del pending_work_items[work_id]
continue
def _queue_management_worker(executor_reference,
processes,
pending_work_items,
work_ids_queue,
call_queue,
result_queue,
thread_wakeup):
class _ExecutorManagerThread(threading.Thread):
"""Manages the communication between this process and the worker processes.
This function is run in a local thread.
The manager is run in a local thread.
Args:
executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
this thread. Used to determine if the ProcessPoolExecutor has been
garbage collected and that this function can exit.
process: A list of the ctx.Process instances used as
workers.
pending_work_items: A dict mapping work ids to _WorkItems e.g.
{5: <_WorkItem...>, 6: <_WorkItem...>, ...}
work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
call_queue: A ctx.Queue that will be filled with _CallItems
derived from _WorkItems for processing by the process workers.
result_queue: A ctx.SimpleQueue of _ResultItems generated by the
process workers.
thread_wakeup: A _ThreadWakeup to allow waking up the
queue_manager_thread from the main Thread and avoid deadlocks
caused by permanently locked queues.
executor: A reference to the ProcessPoolExecutor that owns
this thread. A weakref will be own by the manager as well as
references to internal objects used to introspect the state of
the executor.
"""
executor = None
def shutting_down():
return (_global_shutdown or executor is None
or executor._shutdown_thread)
def __init__(self, executor):
# Store references to necessary internals of the executor.
def shutdown_worker():
# This is an upper bound on the number of children alive.
n_children_alive = sum(p.is_alive() for p in processes.values())
n_children_to_stop = n_children_alive
n_sentinels_sent = 0
# Send the right number of sentinels, to make sure all children are
# properly terminated.
while n_sentinels_sent < n_children_to_stop and n_children_alive > 0:
for i in range(n_children_to_stop - n_sentinels_sent):
try:
call_queue.put_nowait(None)
n_sentinels_sent += 1
except Full:
break
n_children_alive = sum(p.is_alive() for p in processes.values())
# A _ThreadWakeup to allow waking up the queue_manager_thread from the
# main Thread and avoid deadlocks caused by permanently locked queues.
self.thread_wakeup = executor._executor_manager_thread_wakeup
# Release the queue's resources as soon as possible.
call_queue.close()
call_queue.join_thread()
thread_wakeup.close()
# If .join() is not called on the created processes then
# some ctx.Queue methods may deadlock on Mac OS X.
for p in processes.values():
p.join()
# A weakref.ref to the ProcessPoolExecutor that owns this thread. Used
# to determine if the ProcessPoolExecutor has been garbage collected
# and that the manager can exit.
# When the executor gets garbage collected, the weakref callback
# will wake up the queue management thread so that it can terminate
# if there is no pending work item.
def weakref_cb(_, thread_wakeup=self.thread_wakeup):
mp.util.debug('Executor collected: triggering callback for'
' QueueManager wakeup')
thread_wakeup.wakeup()
result_reader = result_queue._reader
wakeup_reader = thread_wakeup._reader
readers = [result_reader, wakeup_reader]
self.executor_reference = weakref.ref(executor, weakref_cb)
while True:
_add_call_item_to_queue(pending_work_items,
work_ids_queue,
call_queue)
# A list of the ctx.Process instances used as workers.
self.processes = executor._processes
# A ctx.Queue that will be filled with _CallItems derived from
# _WorkItems for processing by the process workers.
self.call_queue = executor._call_queue
# A ctx.SimpleQueue of _ResultItems generated by the process workers.
self.result_queue = executor._result_queue
# A queue.Queue of work ids e.g. Queue([5, 6, ...]).
self.work_ids_queue = executor._work_ids
# A dict mapping work ids to _WorkItems e.g.
# {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
self.pending_work_items = executor._pending_work_items
# Set this thread to be daemonized
super().__init__()
self.daemon = True
def run(self):
# Main loop for the executor manager thread.
while True:
self.add_call_item_to_queue()
result_item, is_broken, cause = self.wait_result_broken_or_wakeup()
if is_broken:
self.terminate_broken(cause)
return
if result_item is not None:
self.process_result_item(result_item)
# Delete reference to result_item to avoid keeping references
# while waiting on new results.
del result_item
if self.is_shutting_down():
self.flag_executor_shutting_down()
# Since no new work items can be added, it is safe to shutdown
# this thread if there are no pending work items.
if not self.pending_work_items:
self.join_executor_internals()
return
def add_call_item_to_queue(self):
# Fills call_queue with _WorkItems from pending_work_items.
# This function never blocks.
while True:
if self.call_queue.full():
return
try:
work_id = self.work_ids_queue.get(block=False)
except queue.Empty:
return
else:
work_item = self.pending_work_items[work_id]
if work_item.future.set_running_or_notify_cancel():
self.call_queue.put(_CallItem(work_id,
work_item.fn,
work_item.args,
work_item.kwargs),
block=True)
else:
del self.pending_work_items[work_id]
continue
def wait_result_broken_or_wakeup(self):
# Wait for a result to be ready in the result_queue while checking
# that all worker processes are still running, or for a wake up
# signal send. The wake up signals come either from new tasks being
# submitted, from the executor being shutdown/gc-ed, or from the
# shutdown of the python interpreter.
worker_sentinels = [p.sentinel for p in processes.values()]
result_reader = self.result_queue._reader
wakeup_reader = self.thread_wakeup._reader
readers = [result_reader, wakeup_reader]
worker_sentinels = [p.sentinel for p in self.processes.values()]
ready = mp.connection.wait(readers + worker_sentinels)
cause = None
is_broken = True
result_item = None
if result_reader in ready:
try:
result_item = result_reader.recv()
@ -381,97 +383,135 @@ def _queue_management_worker(executor_reference,
elif wakeup_reader in ready:
is_broken = False
result_item = None
thread_wakeup.clear()
if is_broken:
# Mark the process pool broken so that submits fail right now.
executor = executor_reference()
if executor is not None:
executor._broken = ('A child process terminated '
'abruptly, the process pool is not '
'usable anymore')
executor._shutdown_thread = True
executor = None
bpe = BrokenProcessPool("A process in the process pool was "
"terminated abruptly while the future was "
"running or pending.")
if cause is not None:
bpe.__cause__ = _RemoteTraceback(
f"\n'''\n{''.join(cause)}'''")
# All futures in flight must be marked failed
for work_id, work_item in pending_work_items.items():
work_item.future.set_exception(bpe)
# Delete references to object. See issue16284
del work_item
pending_work_items.clear()
# Terminate remaining workers forcibly: the queues or their
# locks may be in a dirty state and block forever.
for p in processes.values():
p.terminate()
shutdown_worker()
return
self.thread_wakeup.clear()
return result_item, is_broken, cause
def process_result_item(self, result_item):
# Process the received a result_item. This can be either the PID of a
# worker that exited gracefully or a _ResultItem
if isinstance(result_item, int):
# Clean shutdown of a worker using its PID
# (avoids marking the executor broken)
assert shutting_down()
p = processes.pop(result_item)
assert self.is_shutting_down()
p = self.processes.pop(result_item)
p.join()
if not processes:
shutdown_worker()
if not self.processes:
self.join_executor_internals()
return
elif result_item is not None:
work_item = pending_work_items.pop(result_item.work_id, None)
else:
# Received a _ResultItem so mark the future as completed.
work_item = self.pending_work_items.pop(result_item.work_id, None)
# work_item can be None if another process terminated (see above)
if work_item is not None:
if result_item.exception:
work_item.future.set_exception(result_item.exception)
else:
work_item.future.set_result(result_item.result)
# Delete references to object. See issue16284
del work_item
# Delete reference to result_item
del result_item
# Check whether we should start shutting down.
executor = executor_reference()
def is_shutting_down(self):
# Check whether we should start shutting down the executor.
executor = self.executor_reference()
# No more work items can be added if:
# - The interpreter is shutting down OR
# - The executor that owns this worker has been collected OR
# - The executor that owns this worker has been shutdown.
if shutting_down():
try:
# Flag the executor as shutting down as early as possible if it
# is not gc-ed yet.
if executor is not None:
executor._shutdown_thread = True
# Unless there are pending work items, we have nothing to cancel.
if pending_work_items and executor._cancel_pending_futures:
# Cancel all pending futures and update pending_work_items
# to only have futures that are currently running.
new_pending_work_items = {}
for work_id, work_item in pending_work_items.items():
if not work_item.future.cancel():
new_pending_work_items[work_id] = work_item
return (_global_shutdown or executor is None
or executor._shutdown_thread)
pending_work_items = new_pending_work_items
# Drain work_ids_queue since we no longer need to
# add items to the call queue.
while True:
try:
work_ids_queue.get_nowait()
except queue.Empty:
break
def terminate_broken(self, cause):
# Terminate the executor because it is in a broken state. The cause
# argument can be used to display more information on the error that
# lead the executor into becoming broken.
# Since no new work items can be added, it is safe to shutdown
# this thread if there are no pending work items.
if not pending_work_items:
shutdown_worker()
return
except Full:
# This is not a problem: we will eventually be woken up (in
# result_queue.get()) and be able to send a sentinel again.
pass
executor = None
# Mark the process pool broken so that submits fail right now.
executor = self.executor_reference()
if executor is not None:
executor._broken = ('A child process terminated '
'abruptly, the process pool is not '
'usable anymore')
executor._shutdown_thread = True
executor = None
# All pending tasks are to be marked failed with the following
# BrokenProcessPool error
bpe = BrokenProcessPool("A process in the process pool was "
"terminated abruptly while the future was "
"running or pending.")
if cause is not None:
bpe.__cause__ = _RemoteTraceback(
f"\n'''\n{''.join(cause)}'''")
# Mark pending tasks as failed.
for work_id, work_item in self.pending_work_items.items():
work_item.future.set_exception(bpe)
# Delete references to object. See issue16284
del work_item
self.pending_work_items.clear()
# Terminate remaining workers forcibly: the queues or their
# locks may be in a dirty state and block forever.
for p in self.processes.values():
p.terminate()
# clean up resources
self.join_executor_internals()
def flag_executor_shutting_down(self):
# Flag the executor as shutting down and cancel remaining tasks if
# requested as early as possible if it is not gc-ed yet.
executor = self.executor_reference()
if executor is not None:
executor._shutdown_thread = True
# Cancel pending work items if requested.
if executor._cancel_pending_futures:
# Cancel all pending futures and update pending_work_items
# to only have futures that are currently running.
new_pending_work_items = {}
for work_id, work_item in self.pending_work_items.items():
if not work_item.future.cancel():
new_pending_work_items[work_id] = work_item
self.pending_work_items = new_pending_work_items
# Drain work_ids_queue since we no longer need to
# add items to the call queue.
while True:
try:
self.work_ids_queue.get_nowait()
except queue.Empty:
break
# Make sure we do this only once to not waste time looping
# on running processes over and over.
executor._cancel_pending_futures = False
def shutdown_workers(self):
n_children_to_stop = self.get_n_children_alive()
n_sentinels_sent = 0
# Send the right number of sentinels, to make sure all children are
# properly terminated.
while (n_sentinels_sent < n_children_to_stop
and self.get_n_children_alive() > 0):
for i in range(n_children_to_stop - n_sentinels_sent):
try:
self.call_queue.put_nowait(None)
n_sentinels_sent += 1
except queue.Full:
break
def join_executor_internals(self):
self.shutdown_workers()
# Release the queue's resources as soon as possible.
self.call_queue.close()
self.call_queue.join_thread()
self.thread_wakeup.close()
# If .join() is not called on the created processes then
# some ctx.Queue methods may deadlock on Mac OS X.
for p in self.processes.values():
p.join()
def get_n_children_alive(self):
# This is an upper bound on the number of children alive.
return sum(p.is_alive() for p in self.processes.values())
_system_limits_checked = False
@ -562,7 +602,7 @@ class ProcessPoolExecutor(_base.Executor):
self._initargs = initargs
# Management thread
self._queue_management_thread = None
self._executor_manager_thread = None
# Map of pids to processes
self._processes = {}
@ -576,12 +616,12 @@ class ProcessPoolExecutor(_base.Executor):
self._cancel_pending_futures = False
# _ThreadWakeup is a communication channel used to interrupt the wait
# of the main loop of queue_manager_thread from another thread (e.g.
# of the main loop of executor_manager_thread from another thread (e.g.
# when calling executor.submit or executor.shutdown). We do not use the
# _result_queue to send the wakeup signal to the queue_manager_thread
# _result_queue to send wakeup signals to the executor_manager_thread
# as it could result in a deadlock if a worker process dies with the
# _result_queue write lock still acquired.
self._queue_management_thread_wakeup = _ThreadWakeup()
self._executor_manager_thread_wakeup = _ThreadWakeup()
# Create communication channels for the executor
# Make the call queue slightly larger than the number of processes to
@ -591,7 +631,7 @@ class ProcessPoolExecutor(_base.Executor):
self._call_queue = _SafeQueue(
max_size=queue_size, ctx=self._mp_context,
pending_work_items=self._pending_work_items,
thread_wakeup=self._queue_management_thread_wakeup)
thread_wakeup=self._executor_manager_thread_wakeup)
# Killed worker processes can produce spurious "broken pipe"
# tracebacks in the queue's own worker thread. But we detect killed
# processes anyway, so silence the tracebacks.
@ -599,32 +639,14 @@ class ProcessPoolExecutor(_base.Executor):
self._result_queue = mp_context.SimpleQueue()
self._work_ids = queue.Queue()
def _start_queue_management_thread(self):
if self._queue_management_thread is None:
# When the executor gets garbarge collected, the weakref callback
# will wake up the queue management thread so that it can terminate
# if there is no pending work item.
def weakref_cb(_,
thread_wakeup=self._queue_management_thread_wakeup):
mp.util.debug('Executor collected: triggering callback for'
' QueueManager wakeup')
thread_wakeup.wakeup()
def _start_executor_manager_thread(self):
if self._executor_manager_thread is None:
# Start the processes so that their sentinels are known.
self._adjust_process_count()
self._queue_management_thread = threading.Thread(
target=_queue_management_worker,
args=(weakref.ref(self, weakref_cb),
self._processes,
self._pending_work_items,
self._work_ids,
self._call_queue,
self._result_queue,
self._queue_management_thread_wakeup),
name="QueueManagerThread")
self._queue_management_thread.daemon = True
self._queue_management_thread.start()
_threads_wakeups[self._queue_management_thread] = \
self._queue_management_thread_wakeup
self._executor_manager_thread = _ExecutorManagerThread(self)
self._executor_manager_thread.start()
_threads_wakeups[self._executor_manager_thread] = \
self._executor_manager_thread_wakeup
def _adjust_process_count(self):
for _ in range(len(self._processes), self._max_workers):
@ -654,9 +676,9 @@ class ProcessPoolExecutor(_base.Executor):
self._work_ids.put(self._queue_count)
self._queue_count += 1
# Wake up queue management thread
self._queue_management_thread_wakeup.wakeup()
self._executor_manager_thread_wakeup.wakeup()
self._start_queue_management_thread()
self._start_executor_manager_thread()
return f
submit.__doc__ = _base.Executor.submit.__doc__
@ -694,20 +716,20 @@ class ProcessPoolExecutor(_base.Executor):
self._cancel_pending_futures = cancel_futures
self._shutdown_thread = True
if self._queue_management_thread:
if self._executor_manager_thread:
# Wake up queue management thread
self._queue_management_thread_wakeup.wakeup()
self._executor_manager_thread_wakeup.wakeup()
if wait:
self._queue_management_thread.join()
self._executor_manager_thread.join()
# To reduce the risk of opening too many files, remove references to
# objects that use file descriptors.
self._queue_management_thread = None
self._executor_manager_thread = None
self._call_queue = None
self._result_queue = None
self._processes = None
if self._queue_management_thread_wakeup:
self._queue_management_thread_wakeup = None
if self._executor_manager_thread_wakeup:
self._executor_manager_thread_wakeup = None
shutdown.__doc__ = _base.Executor.shutdown.__doc__

View File

@ -508,15 +508,15 @@ class ProcessPoolShutdownTest(ExecutorShutdownTest):
def test_del_shutdown(self):
executor = futures.ProcessPoolExecutor(max_workers=5)
res = executor.map(abs, range(-5, 5))
queue_management_thread = executor._queue_management_thread
executor_manager_thread = executor._executor_manager_thread
processes = executor._processes
call_queue = executor._call_queue
queue_management_thread = executor._queue_management_thread
executor_manager_thread = executor._executor_manager_thread
del executor
# Make sure that all the executor resources were properly cleaned by
# the shutdown process
queue_management_thread.join()
executor_manager_thread.join()
for p in processes.values():
p.join()
call_queue.join_thread()
@ -532,12 +532,12 @@ class ProcessPoolShutdownTest(ExecutorShutdownTest):
res = executor.map(abs, range(-5, 5))
processes = executor._processes
call_queue = executor._call_queue
queue_management_thread = executor._queue_management_thread
executor_manager_thread = executor._executor_manager_thread
executor.shutdown(wait=False)
# Make sure that all the executor resources were properly cleaned by
# the shutdown process
queue_management_thread.join()
executor_manager_thread.join()
for p in processes.values():
p.join()
call_queue.join_thread()
@ -1139,11 +1139,11 @@ class ExecutorDeadlockTest:
mp_context=get_context(self.ctx)) as executor:
self.executor = executor # Allow clean up in fail_on_deadlock
# Start the executor and get the queue_management_thread to collect
# Start the executor and get the executor_manager_thread to collect
# the threads and avoid dangling thread that should be cleaned up
# asynchronously.
executor.submit(id, 42).result()
queue_manager = executor._queue_management_thread
executor_manager = executor._executor_manager_thread
# Submit a task that fails at pickle and shutdown the executor
# without waiting
@ -1154,7 +1154,7 @@ class ExecutorDeadlockTest:
# Make sure the executor is eventually shutdown and do not leave
# dangling threads
queue_manager.join()
executor_manager.join()
create_executor_tests(ExecutorDeadlockTest,

View File

@ -0,0 +1,2 @@
Refactor queue_manager in :class:`concurrent.futures.ProcessPoolExecutor` to
make it easier to maintain.