bpo-39995: Fix concurrent.futures _ThreadWakeup (GH-19760)

Fix a race condition in concurrent.futures._ThreadWakeup: access to
_ThreadWakeup is now protected with the shutdown lock.
This commit is contained in:
Victor Stinner 2020-04-29 03:32:06 +02:00 committed by GitHub
parent 7036477323
commit a4dfe8ede5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 29 additions and 14 deletions

View File

@ -90,6 +90,7 @@ def _python_exit():
_global_shutdown = True
items = list(_threads_wakeups.items())
for _, thread_wakeup in items:
# call not protected by ProcessPoolExecutor._shutdown_lock
thread_wakeup.wakeup()
for t, _ in items:
t.join()
@ -157,8 +158,10 @@ class _CallItem(object):
class _SafeQueue(Queue):
"""Safe Queue set exception to the future object linked to a job"""
def __init__(self, max_size=0, *, ctx, pending_work_items, thread_wakeup):
def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock,
thread_wakeup):
self.pending_work_items = pending_work_items
self.shutdown_lock = shutdown_lock
self.thread_wakeup = thread_wakeup
super().__init__(max_size, ctx=ctx)
@ -167,7 +170,8 @@ class _SafeQueue(Queue):
tb = traceback.format_exception(type(e), e, e.__traceback__)
e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
work_item = self.pending_work_items.pop(obj.work_id, None)
self.thread_wakeup.wakeup()
with self.shutdown_lock:
self.thread_wakeup.wakeup()
# work_item can be None if another process terminated. In this
# case, the executor_manager_thread fails all work_items
# with BrokenProcessPool
@ -268,6 +272,7 @@ class _ExecutorManagerThread(threading.Thread):
# A _ThreadWakeup to allow waking up the queue_manager_thread from the
# main Thread and avoid deadlocks caused by permanently locked queues.
self.thread_wakeup = executor._executor_manager_thread_wakeup
self.shutdown_lock = executor._shutdown_lock
# A weakref.ref to the ProcessPoolExecutor that owns this thread. Used
# to determine if the ProcessPoolExecutor has been garbage collected
@ -275,10 +280,13 @@ class _ExecutorManagerThread(threading.Thread):
# When the executor gets garbage collected, the weakref callback
# will wake up the queue management thread so that it can terminate
# if there is no pending work item.
def weakref_cb(_, thread_wakeup=self.thread_wakeup):
def weakref_cb(_,
thread_wakeup=self.thread_wakeup,
shutdown_lock=self.shutdown_lock):
mp.util.debug('Executor collected: triggering callback for'
' QueueManager wakeup')
thread_wakeup.wakeup()
with shutdown_lock:
thread_wakeup.wakeup()
self.executor_reference = weakref.ref(executor, weakref_cb)
@ -363,6 +371,7 @@ class _ExecutorManagerThread(threading.Thread):
# submitted, from the executor being shutdown/gc-ed, or from the
# shutdown of the python interpreter.
result_reader = self.result_queue._reader
assert not self.thread_wakeup._closed
wakeup_reader = self.thread_wakeup._reader
readers = [result_reader, wakeup_reader]
worker_sentinels = [p.sentinel for p in self.processes.values()]
@ -380,7 +389,9 @@ class _ExecutorManagerThread(threading.Thread):
elif wakeup_reader in ready:
is_broken = False
self.thread_wakeup.clear()
with self.shutdown_lock:
self.thread_wakeup.clear()
return result_item, is_broken, cause
@ -500,7 +511,8 @@ class _ExecutorManagerThread(threading.Thread):
# Release the queue's resources as soon as possible.
self.call_queue.close()
self.call_queue.join_thread()
self.thread_wakeup.close()
with self.shutdown_lock:
self.thread_wakeup.close()
# If .join() is not called on the created processes then
# some ctx.Queue methods may deadlock on Mac OS X.
for p in self.processes.values():
@ -619,6 +631,8 @@ class ProcessPoolExecutor(_base.Executor):
# _result_queue to send wakeup signals to the executor_manager_thread
# as it could result in a deadlock if a worker process dies with the
# _result_queue write lock still acquired.
#
# _shutdown_lock must be locked to access _ThreadWakeup.
self._executor_manager_thread_wakeup = _ThreadWakeup()
# Create communication channels for the executor
@ -629,6 +643,7 @@ class ProcessPoolExecutor(_base.Executor):
self._call_queue = _SafeQueue(
max_size=queue_size, ctx=self._mp_context,
pending_work_items=self._pending_work_items,
shutdown_lock=self._shutdown_lock,
thread_wakeup=self._executor_manager_thread_wakeup)
# Killed worker processes can produce spurious "broken pipe"
# tracebacks in the queue's own worker thread. But we detect killed
@ -718,12 +733,12 @@ class ProcessPoolExecutor(_base.Executor):
with self._shutdown_lock:
self._cancel_pending_futures = cancel_futures
self._shutdown_thread = True
if self._executor_manager_thread_wakeup is not None:
# Wake up queue management thread
self._executor_manager_thread_wakeup.wakeup()
if self._executor_manager_thread:
# Wake up queue management thread
self._executor_manager_thread_wakeup.wakeup()
if wait:
self._executor_manager_thread.join()
if self._executor_manager_thread is not None and wait:
self._executor_manager_thread.join()
# To reduce the risk of opening too many files, remove references to
# objects that use file descriptors.
self._executor_manager_thread = None
@ -732,8 +747,6 @@ class ProcessPoolExecutor(_base.Executor):
self._result_queue.close()
self._result_queue = None
self._processes = None
if self._executor_manager_thread_wakeup:
self._executor_manager_thread_wakeup = None
self._executor_manager_thread_wakeup = None
shutdown.__doc__ = _base.Executor.shutdown.__doc__

View File

@ -0,0 +1,2 @@
Fix a race condition in concurrent.futures._ThreadWakeup: access to
_ThreadWakeup is now protected with the shutdown lock.