gh-109047: concurrent.futures catches PythonFinalizationError (#109810)

concurrent.futures: The *executor manager thread* now catches
exceptions when adding an item to the *call queue*. During Python
finalization, creating a new thread can now raise RuntimeError. Catch
the exception and call terminate_broken() in this case.

Add test_python_finalization_error() to test_concurrent_futures.

concurrent.futures._ExecutorManagerThread changes:

* terminate_broken() no longer calls shutdown_workers() since the
  call queue is no longer working anymore (read and write ends of
  the queue pipe are closed).
* terminate_broken() now terminates child processes, not only
  wait until they complete.
* _ExecutorManagerThread.terminate_broken() now holds shutdown_lock
  to prevent race conditons with ProcessPoolExecutor.submit().

multiprocessing.Queue changes:

* Add _terminate_broken() method.
* _start_thread() sets _thread to None on exception to prevent
  leaking "dangling threads" even if the thread was not started
  yet.
This commit is contained in:
Victor Stinner 2023-09-29 21:31:19 +02:00 committed by GitHub
parent f3df8fa669
commit 6351842121
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 88 additions and 22 deletions

View File

@ -341,7 +341,14 @@ class _ExecutorManagerThread(threading.Thread):
# Main loop for the executor manager thread.
while True:
self.add_call_item_to_queue()
# gh-109047: During Python finalization, self.call_queue.put()
# creation of a thread can fail with RuntimeError.
try:
self.add_call_item_to_queue()
except BaseException as exc:
cause = format_exception(exc)
self.terminate_broken(cause)
return
result_item, is_broken, cause = self.wait_result_broken_or_wakeup()
@ -425,8 +432,8 @@ class _ExecutorManagerThread(threading.Thread):
try:
result_item = result_reader.recv()
is_broken = False
except BaseException as e:
cause = format_exception(type(e), e, e.__traceback__)
except BaseException as exc:
cause = format_exception(exc)
elif wakeup_reader in ready:
is_broken = False
@ -463,7 +470,7 @@ class _ExecutorManagerThread(threading.Thread):
return (_global_shutdown or executor is None
or executor._shutdown_thread)
def terminate_broken(self, cause):
def _terminate_broken(self, cause):
# Terminate the executor because it is in a broken state. The cause
# argument can be used to display more information on the error that
# lead the executor into becoming broken.
@ -490,7 +497,7 @@ class _ExecutorManagerThread(threading.Thread):
for work_id, work_item in self.pending_work_items.items():
try:
work_item.future.set_exception(bpe)
except _base.InvalidStateError as exc:
except _base.InvalidStateError:
# set_exception() fails if the future is cancelled: ignore it.
# Trying to check if the future is cancelled before calling
# set_exception() would leave a race condition if the future is
@ -505,17 +512,14 @@ class _ExecutorManagerThread(threading.Thread):
for p in self.processes.values():
p.terminate()
# Prevent queue writing to a pipe which is no longer read.
# https://github.com/python/cpython/issues/94777
self.call_queue._reader.close()
# gh-107219: Close the connection writer which can unblock
# Queue._feed() if it was stuck in send_bytes().
if sys.platform == 'win32':
self.call_queue._writer.close()
self.call_queue._terminate_broken()
# clean up resources
self.join_executor_internals()
self._join_executor_internals(broken=True)
def terminate_broken(self, cause):
with self.shutdown_lock:
self._terminate_broken(cause)
def flag_executor_shutting_down(self):
# Flag the executor as shutting down and cancel remaining tasks if
@ -558,15 +562,24 @@ class _ExecutorManagerThread(threading.Thread):
break
def join_executor_internals(self):
self.shutdown_workers()
with self.shutdown_lock:
self._join_executor_internals()
def _join_executor_internals(self, broken=False):
# If broken, call_queue was closed and so can no longer be used.
if not broken:
self.shutdown_workers()
# Release the queue's resources as soon as possible.
self.call_queue.close()
self.call_queue.join_thread()
with self.shutdown_lock:
self.thread_wakeup.close()
self.thread_wakeup.close()
# If .join() is not called on the created processes then
# some ctx.Queue methods may deadlock on Mac OS X.
for p in self.processes.values():
if broken:
p.terminate()
p.join()
def get_n_children_alive(self):

View File

@ -158,6 +158,20 @@ class Queue(object):
except AttributeError:
pass
def _terminate_broken(self):
# Close a Queue on error.
# gh-94777: Prevent queue writing to a pipe which is no longer read.
self._reader.close()
# gh-107219: Close the connection writer which can unblock
# Queue._feed() if it was stuck in send_bytes().
if sys.platform == 'win32':
self._writer.close()
self.close()
self.join_thread()
def _start_thread(self):
debug('Queue._start_thread()')
@ -169,13 +183,19 @@ class Queue(object):
self._wlock, self._reader.close, self._writer.close,
self._ignore_epipe, self._on_queue_feeder_error,
self._sem),
name='QueueFeederThread'
name='QueueFeederThread',
daemon=True,
)
self._thread.daemon = True
debug('doing self._thread.start()')
self._thread.start()
debug('... done self._thread.start()')
try:
debug('doing self._thread.start()')
self._thread.start()
debug('... done self._thread.start()')
except:
# gh-109047: During Python finalization, creating a thread
# can fail with RuntimeError.
self._thread = None
raise
if not self._joincancelled:
self._jointhread = Finalize(

View File

@ -1,5 +1,6 @@
import os
import sys
import threading
import time
import unittest
from concurrent import futures
@ -187,6 +188,34 @@ class ProcessPoolExecutorTest(ExecutorTest):
for i, future in enumerate(futures):
self.assertEqual(future.result(), mul(i, i))
def test_python_finalization_error(self):
# gh-109047: Catch RuntimeError on thread creation
# during Python finalization.
context = self.get_context()
# gh-109047: Mock the threading.start_new_thread() function to inject
# RuntimeError: simulate the error raised during Python finalization.
# Block the second creation: create _ExecutorManagerThread, but block
# QueueFeederThread.
orig_start_new_thread = threading._start_new_thread
nthread = 0
def mock_start_new_thread(func, *args):
nonlocal nthread
if nthread >= 1:
raise RuntimeError("can't create new thread at "
"interpreter shutdown")
nthread += 1
return orig_start_new_thread(func, *args)
with support.swap_attr(threading, '_start_new_thread',
mock_start_new_thread):
executor = self.executor_type(max_workers=2, mp_context=context)
with executor:
with self.assertRaises(BrokenProcessPool):
list(executor.map(mul, [(2, 3)] * 10))
executor.shutdown()
create_executor_tests(globals(), ProcessPoolExecutorTest,
executor_mixins=(ProcessPoolForkMixin,

View File

@ -0,0 +1,4 @@
:mod:`concurrent.futures`: The *executor manager thread* now catches exceptions
when adding an item to the *call queue*. During Python finalization, creating a
new thread can now raise :exc:`RuntimeError`. Catch the exception and call
``terminate_broken()`` in this case. Patch by Victor Stinner.