bpo-39104: Fix hanging ProcessPoolExecutor on shutdown nowait with pickling failure (GH-17670)
As reported initially by @rad-pat in #6084, the following script causes a deadlock:

```
from concurrent.futures import ProcessPoolExecutor


class ObjectWithPickleError():
    """Triggers a RuntimeError when sending job to the workers"""
    def __reduce__(self):
        raise RuntimeError()


if __name__ == "__main__":
    e = ProcessPoolExecutor()
    f = e.submit(id, ObjectWithPickleError())
    e.shutdown(wait=False)
    f.result()  # Deadlock on get
```

This is caused by the fact that the main process closes communication channels that the `queue_management_thread` might still need later. To avoid this, this PR lets the `queue_management_thread` manage all the closing.

https://bugs.python.org/issue39104

Automerge-Triggered-By: @pitrou
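For context on the mechanism (an editor's sketch, not code from the patch): the `queue_management_thread` blocks in `multiprocessing.connection.wait()` on the result pipe and on the `_ThreadWakeup` pipe, and other threads unblock it by sending a zero-length message. The toy example below shows only that wakeup mechanism:

```
import threading
import multiprocessing as mp
from multiprocessing.connection import wait

reader, writer = mp.Pipe(duplex=False)

def manager():
    # Stand-in for the management thread's main loop: block until a pipe
    # becomes readable, either with a real result or an explicit wakeup.
    wait([reader])
    reader.recv_bytes()  # drain the zero-length wakeup token
    print("management loop unblocked")

t = threading.Thread(target=manager)
t.start()
writer.send_bytes(b"")  # this is how wakeup() interrupts the wait
t.join()
```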
parent 1ed61617a4 · commit a5cbab552d
Lib/concurrent/futures/process.py

```diff
@@ -80,18 +80,23 @@ _global_shutdown = False

 class _ThreadWakeup:
     def __init__(self):
+        self._closed = False
         self._reader, self._writer = mp.Pipe(duplex=False)

     def close(self):
-        self._writer.close()
-        self._reader.close()
+        if not self._closed:
+            self._closed = True
+            self._writer.close()
+            self._reader.close()

     def wakeup(self):
-        self._writer.send_bytes(b"")
+        if not self._closed:
+            self._writer.send_bytes(b"")

     def clear(self):
-        while self._reader.poll():
-            self._reader.recv_bytes()
+        if not self._closed:
+            while self._reader.poll():
+                self._reader.recv_bytes()


 def _python_exit():
```
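The `_closed` flag makes every operation idempotent once the pipe has been torn down; without it, a late `wakeup()` (from the queue feeder thread) or a second `close()` operates on a closed `Connection` and raises `OSError`. A standalone demonstration of that failure mode (not code from the patch):

```
import multiprocessing as mp

reader, writer = mp.Pipe(duplex=False)
writer.close()
try:
    writer.send_bytes(b"")  # what an unguarded wakeup() would attempt
except OSError as exc:
    print("send after close raises:", exc)  # "handle is closed"
```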
```diff
@@ -160,8 +165,9 @@ class _CallItem(object):

 class _SafeQueue(Queue):
     """Safe Queue set exception to the future object linked to a job"""
-    def __init__(self, max_size=0, *, ctx, pending_work_items):
+    def __init__(self, max_size=0, *, ctx, pending_work_items, thread_wakeup):
         self.pending_work_items = pending_work_items
+        self.thread_wakeup = thread_wakeup
         super().__init__(max_size, ctx=ctx)

     def _on_queue_feeder_error(self, e, obj):
@@ -169,6 +175,7 @@ class _SafeQueue(Queue):
             tb = traceback.format_exception(type(e), e, e.__traceback__)
             e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
             work_item = self.pending_work_items.pop(obj.work_id, None)
+            self.thread_wakeup.wakeup()
             # work_item can be None if another process terminated. In this case,
             # the queue_manager_thread fails all work_items with BrokenProcessPool
             if work_item is not None:
```
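The added `self.thread_wakeup.wakeup()` makes the management thread notice a job that failed to pickle even though no result will ever arrive for it; the `if work_item is not None:` branch then attaches the error to the pending future. Setting an exception is what releases a caller blocked in `result()`, as this bare-`Future` illustration (illustrative only) shows:

```
from concurrent.futures import Future

f = Future()
f.set_exception(RuntimeError("pickling failed"))
try:
    f.result()  # re-raises immediately instead of blocking
except RuntimeError as exc:
    print("caller sees:", exc)
```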
```diff
@@ -339,6 +346,8 @@ def _queue_management_worker(executor_reference,

         # Release the queue's resources as soon as possible.
         call_queue.close()
+        call_queue.join_thread()
+        thread_wakeup.close()
         # If .join() is not called on the created processes then
         # some ctx.Queue methods may deadlock on Mac OS X.
         for p in processes.values():
```
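This is the ownership transfer described in the commit message: on its way out, the `queue_management_thread` now joins the call queue's feeder thread and closes the wakeup pipe itself, so the main process no longer closes channels that the worker thread may still need.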
```diff
@@ -566,21 +575,6 @@ class ProcessPoolExecutor(_base.Executor):
         self._pending_work_items = {}
         self._cancel_pending_futures = False

-        # Create communication channels for the executor
-        # Make the call queue slightly larger than the number of processes to
-        # prevent the worker processes from idling. But don't make it too big
-        # because futures in the call queue cannot be cancelled.
-        queue_size = self._max_workers + EXTRA_QUEUED_CALLS
-        self._call_queue = _SafeQueue(
-            max_size=queue_size, ctx=self._mp_context,
-            pending_work_items=self._pending_work_items)
-        # Killed worker processes can produce spurious "broken pipe"
-        # tracebacks in the queue's own worker thread. But we detect killed
-        # processes anyway, so silence the tracebacks.
-        self._call_queue._ignore_epipe = True
-        self._result_queue = mp_context.SimpleQueue()
-        self._work_ids = queue.Queue()
-
         # _ThreadWakeup is a communication channel used to interrupt the wait
         # of the main loop of queue_manager_thread from another thread (e.g.
         # when calling executor.submit or executor.shutdown). We do not use the
```
```diff
@@ -589,6 +583,22 @@ class ProcessPoolExecutor(_base.Executor):
         # _result_queue write lock still acquired.
         self._queue_management_thread_wakeup = _ThreadWakeup()

+        # Create communication channels for the executor
+        # Make the call queue slightly larger than the number of processes to
+        # prevent the worker processes from idling. But don't make it too big
+        # because futures in the call queue cannot be cancelled.
+        queue_size = self._max_workers + EXTRA_QUEUED_CALLS
+        self._call_queue = _SafeQueue(
+            max_size=queue_size, ctx=self._mp_context,
+            pending_work_items=self._pending_work_items,
+            thread_wakeup=self._queue_management_thread_wakeup)
+        # Killed worker processes can produce spurious "broken pipe"
+        # tracebacks in the queue's own worker thread. But we detect killed
+        # processes anyway, so silence the tracebacks.
+        self._call_queue._ignore_epipe = True
+        self._result_queue = mp_context.SimpleQueue()
+        self._work_ids = queue.Queue()
+
     def _start_queue_management_thread(self):
         if self._queue_management_thread is None:
             # When the executor gets garbarge collected, the weakref callback
```
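The two hunks above only move the queue-creation block: `_SafeQueue` now takes a `thread_wakeup` argument, so the block must run after `self._queue_management_thread_wakeup` is created. The sole substantive change is the new `thread_wakeup=` argument.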
|
@ -692,16 +702,11 @@ class ProcessPoolExecutor(_base.Executor):
|
||||||
# To reduce the risk of opening too many files, remove references to
|
# To reduce the risk of opening too many files, remove references to
|
||||||
# objects that use file descriptors.
|
# objects that use file descriptors.
|
||||||
self._queue_management_thread = None
|
self._queue_management_thread = None
|
||||||
if self._call_queue is not None:
|
self._call_queue = None
|
||||||
self._call_queue.close()
|
|
||||||
if wait:
|
|
||||||
self._call_queue.join_thread()
|
|
||||||
self._call_queue = None
|
|
||||||
self._result_queue = None
|
self._result_queue = None
|
||||||
self._processes = None
|
self._processes = None
|
||||||
|
|
||||||
if self._queue_management_thread_wakeup:
|
if self._queue_management_thread_wakeup:
|
||||||
self._queue_management_thread_wakeup.close()
|
|
||||||
self._queue_management_thread_wakeup = None
|
self._queue_management_thread_wakeup = None
|
||||||
|
|
||||||
shutdown.__doc__ = _base.Executor.shutdown.__doc__
|
shutdown.__doc__ = _base.Executor.shutdown.__doc__
|
||||||
|
|
|
Lib/test/test_concurrent_futures.py

```diff
@@ -415,13 +415,32 @@ class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase

     def test_del_shutdown(self):
         executor = futures.ThreadPoolExecutor(max_workers=5)
-        executor.map(abs, range(-5, 5))
+        res = executor.map(abs, range(-5, 5))
         threads = executor._threads
         del executor

         for t in threads:
             t.join()
+
+        # Make sure the results were all computed before the
+        # executor got shutdown.
+        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
+
+    def test_shutdown_no_wait(self):
+        # Ensure that the executor cleans up the threads when calling
+        # shutdown with wait=False
+        executor = futures.ThreadPoolExecutor(max_workers=5)
+        res = executor.map(abs, range(-5, 5))
+        threads = executor._threads
+        executor.shutdown(wait=False)
+        for t in threads:
+            t.join()
+
+        # Make sure the results were all computed before the
+        # executor got shutdown.
+        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
+

     def test_thread_names_assigned(self):
         executor = futures.ThreadPoolExecutor(
             max_workers=5, thread_name_prefix='SpecialPool')
```
```diff
@@ -488,7 +507,7 @@ class ProcessPoolShutdownTest(ExecutorShutdownTest):

     def test_del_shutdown(self):
         executor = futures.ProcessPoolExecutor(max_workers=5)
-        list(executor.map(abs, range(-5, 5)))
+        res = executor.map(abs, range(-5, 5))
         queue_management_thread = executor._queue_management_thread
         processes = executor._processes
         call_queue = executor._call_queue
```
```diff
@@ -502,6 +521,31 @@ class ProcessPoolShutdownTest(ExecutorShutdownTest):
             p.join()
         call_queue.join_thread()

+        # Make sure the results were all computed before the
+        # executor got shutdown.
+        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
+
+    def test_shutdown_no_wait(self):
+        # Ensure that the executor cleans up the processes when calling
+        # shutdown with wait=False
+        executor = futures.ProcessPoolExecutor(max_workers=5)
+        res = executor.map(abs, range(-5, 5))
+        processes = executor._processes
+        call_queue = executor._call_queue
+        queue_management_thread = executor._queue_management_thread
+        executor.shutdown(wait=False)
+
+        # Make sure that all the executor resources were properly cleaned by
+        # the shutdown process
+        queue_management_thread.join()
+        for p in processes.values():
+            p.join()
+        call_queue.join_thread()
+
+        # Make sure the results were all computed before the executor got
+        # shutdown.
+        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
+

 create_executor_tests(ProcessPoolShutdownTest,
                       executor_mixins=(ProcessPoolForkMixin,
```
|
@ -1086,6 +1130,32 @@ class ExecutorDeadlockTest:
|
||||||
with self.assertRaises(BrokenProcessPool):
|
with self.assertRaises(BrokenProcessPool):
|
||||||
f.result()
|
f.result()
|
||||||
|
|
||||||
|
def test_shutdown_deadlock_pickle(self):
|
||||||
|
# Test that the pool calling shutdown with wait=False does not cause
|
||||||
|
# a deadlock if a task fails at pickle after the shutdown call.
|
||||||
|
# Reported in bpo-39104.
|
||||||
|
self.executor.shutdown(wait=True)
|
||||||
|
with self.executor_type(max_workers=2,
|
||||||
|
mp_context=get_context(self.ctx)) as executor:
|
||||||
|
self.executor = executor # Allow clean up in fail_on_deadlock
|
||||||
|
|
||||||
|
# Start the executor and get the queue_management_thread to collect
|
||||||
|
# the threads and avoid dangling thread that should be cleaned up
|
||||||
|
# asynchronously.
|
||||||
|
executor.submit(id, 42).result()
|
||||||
|
queue_manager = executor._queue_management_thread
|
||||||
|
|
||||||
|
# Submit a task that fails at pickle and shutdown the executor
|
||||||
|
# without waiting
|
||||||
|
f = executor.submit(id, ErrorAtPickle())
|
||||||
|
executor.shutdown(wait=False)
|
||||||
|
with self.assertRaises(PicklingError):
|
||||||
|
f.result()
|
||||||
|
|
||||||
|
# Make sure the executor is eventually shutdown and do not leave
|
||||||
|
# dangling threads
|
||||||
|
queue_manager.join()
|
||||||
|
|
||||||
|
|
||||||
create_executor_tests(ExecutorDeadlockTest,
|
create_executor_tests(ExecutorDeadlockTest,
|
||||||
executor_mixins=(ProcessPoolForkMixin,
|
executor_mixins=(ProcessPoolForkMixin,
|
||||||
|
|
Misc/NEWS.d/next/Library/ (new NEWS entry)

```diff
@@ -0,0 +1,2 @@
+Fix hanging ProcessPoolExecutor on ``shutdown(wait=False)`` when a task has
+failed pickling.
```