import contextlib
import queue
import signal
import sys
import time
import unittest
import unittest.mock
from pickle import PicklingError
from concurrent import futures
from concurrent.futures.process import BrokenProcessPool, _ThreadWakeup

from test import support

from .util import (
    create_executor_tests, setup_module,
    ProcessPoolForkMixin, ProcessPoolForkserverMixin, ProcessPoolSpawnMixin)


def _crash(delay=None):
    """Induces a segfault."""
    if delay:
        time.sleep(delay)
    import faulthandler
    faulthandler.disable()
    faulthandler._sigsegv()


def _crash_with_data(data):
    """Induces a segfault; the dummy input data is ignored."""
    _crash()


def _exit():
    """Induces a sys exit with exitcode 1."""
    sys.exit(1)


def _raise_error(Err):
    """Function that raises an Exception in process."""
    raise Err()


def _raise_error_ignore_stderr(Err):
    """Function that raises an Exception in process and ignores stderr."""
    import io
    sys.stderr = io.StringIO()
    raise Err()


def _return_instance(cls):
    """Function that returns an instance of cls."""
    return cls()


class CrashAtPickle(object):
    """Bad object that triggers a segfault at pickling time."""
    def __reduce__(self):
        _crash()


class CrashAtUnpickle(object):
    """Bad object that triggers a segfault at unpickling time."""
    def __reduce__(self):
        return _crash, ()


class ExitAtPickle(object):
    """Bad object that triggers a process exit at pickling time."""
    def __reduce__(self):
        _exit()


class ExitAtUnpickle(object):
    """Bad object that triggers a process exit at unpickling time."""
    def __reduce__(self):
        return _exit, ()


class ErrorAtPickle(object):
    """Bad object that triggers an error at pickling time."""
    def __reduce__(self):
        from pickle import PicklingError
        raise PicklingError("Error in pickle")


class ErrorAtUnpickle(object):
    """Bad object that triggers an error at unpickling time."""
    def __reduce__(self):
        from pickle import UnpicklingError
        return _raise_error_ignore_stderr, (UnpicklingError, )


class ExecutorDeadlockTest:
    TIMEOUT = support.LONG_TIMEOUT

    def _fail_on_deadlock(self, executor):
        # If we did not recover before TIMEOUT seconds, consider that the
        # executor is in a deadlock state and forcefully clean up all its
        # components.
        import faulthandler
        from tempfile import TemporaryFile
        with TemporaryFile(mode="w+") as f:
            faulthandler.dump_traceback(file=f)
            f.seek(0)
            tb = f.read()
        for p in executor._processes.values():
            p.terminate()
        # It should be safe to call executor.shutdown here as all possible
        # deadlocks should have been broken.
        executor.shutdown(wait=True)
        print(f"\nTraceback:\n {tb}", file=sys.__stderr__)
        self.fail(f"Executor deadlock:\n\n{tb}")

    def _check_crash(self, error, func, *args, ignore_stderr=False):
        # Test for a deadlock caused by crashes in a pool.
        self.executor.shutdown(wait=True)

        executor = self.executor_type(
            max_workers=2, mp_context=self.get_context())
        res = executor.submit(func, *args)

        if ignore_stderr:
            cm = support.captured_stderr()
        else:
            cm = contextlib.nullcontext()

        try:
            with self.assertRaises(error):
                with cm:
                    res.result(timeout=self.TIMEOUT)
        except futures.TimeoutError:
            # If we did not recover before TIMEOUT seconds,
            # consider that the executor is in a deadlock state.
            self._fail_on_deadlock(executor)
        executor.shutdown(wait=True)

    def test_error_at_task_pickle(self):
        # Check problem occurring while pickling a task in
        # the task_handler thread.
        self._check_crash(PicklingError, id, ErrorAtPickle())

    def test_exit_at_task_unpickle(self):
        # Check problem occurring while unpickling a task on workers.
        self._check_crash(BrokenProcessPool, id, ExitAtUnpickle())

    def test_error_at_task_unpickle(self):
        # gh-109832: Restore stderr overridden by _raise_error_ignore_stderr()
        self.addCleanup(setattr, sys, 'stderr', sys.stderr)

        # Check problem occurring while unpickling a task on workers.
        self._check_crash(BrokenProcessPool, id, ErrorAtUnpickle())

    def test_crash_at_task_unpickle(self):
        # Check problem occurring while unpickling a task on workers.
        self._check_crash(BrokenProcessPool, id, CrashAtUnpickle())

    def test_crash_during_func_exec_on_worker(self):
        # Check problem occurring during func execution on workers.
        self._check_crash(BrokenProcessPool, _crash)

    def test_exit_during_func_exec_on_worker(self):
        # Check problem occurring during func execution on workers.
        self._check_crash(SystemExit, _exit)

    def test_error_during_func_exec_on_worker(self):
        # Check problem occurring during func execution on workers.
        self._check_crash(RuntimeError, _raise_error, RuntimeError)

    def test_crash_during_result_pickle_on_worker(self):
        # Check problem occurring while pickling a task result
        # on workers.
        self._check_crash(BrokenProcessPool, _return_instance, CrashAtPickle)

    def test_exit_during_result_pickle_on_worker(self):
        # Check problem occurring while pickling a task result
        # on workers.
        self._check_crash(SystemExit, _return_instance, ExitAtPickle)

    def test_error_during_result_pickle_on_worker(self):
        # Check problem occurring while pickling a task result
        # on workers.
        self._check_crash(PicklingError, _return_instance, ErrorAtPickle)

    def test_error_during_result_unpickle_in_result_handler(self):
        # gh-109832: Restore stderr overridden by _raise_error_ignore_stderr()
        self.addCleanup(setattr, sys, 'stderr', sys.stderr)

        # Check problem occurring while unpickling a task in
        # the result_handler thread.
        self._check_crash(BrokenProcessPool,
                          _return_instance, ErrorAtUnpickle,
                          ignore_stderr=True)

    def test_exit_during_result_unpickle_in_result_handler(self):
        # Check problem occurring while unpickling a task in
        # the result_handler thread.
        self._check_crash(BrokenProcessPool, _return_instance, ExitAtUnpickle)

    def test_shutdown_deadlock(self):
        # Test that calling shutdown on the pool does not cause a deadlock
        # if a worker fails after the shutdown call.
        self.executor.shutdown(wait=True)
        with self.executor_type(max_workers=2,
                                mp_context=self.get_context()) as executor:
            self.executor = executor  # Allow clean up in fail_on_deadlock
            f = executor.submit(_crash, delay=.1)
            executor.shutdown(wait=True)
            with self.assertRaises(BrokenProcessPool):
                f.result()

    def test_shutdown_deadlock_pickle(self):
        # Test that calling shutdown on the pool with wait=False does not
        # cause a deadlock if a task fails at pickle after the shutdown call.
        # Reported in bpo-39104.
        self.executor.shutdown(wait=True)
        with self.executor_type(max_workers=2,
                                mp_context=self.get_context()) as executor:
            self.executor = executor  # Allow clean up in fail_on_deadlock

            # Start the executor and get the executor_manager_thread to collect
            # the threads and avoid a dangling thread that should be cleaned up
            # asynchronously.
            executor.submit(id, 42).result()
            executor_manager = executor._executor_manager_thread

            # Submit a task that fails at pickle and shut down the executor
            # without waiting.
            f = executor.submit(id, ErrorAtPickle())
            executor.shutdown(wait=False)
            with self.assertRaises(PicklingError):
                f.result()

        # Make sure the executor eventually shuts down and does not leave
        # dangling threads.
        executor_manager.join()

    def test_crash_big_data(self):
        # Test that there is a clean exception instead of a deadlock when a
        # child process crashes while some data is being written into the
        # queue.
        # https://github.com/python/cpython/issues/94777
        self.executor.shutdown(wait=True)
        data = "a" * support.PIPE_MAX_SIZE
        with self.executor_type(max_workers=2,
                                mp_context=self.get_context()) as executor:
            self.executor = executor  # Allow clean up in fail_on_deadlock
            with self.assertRaises(BrokenProcessPool):
                list(executor.map(_crash_with_data, [data] * 10))

        executor.shutdown(wait=True)

    def test_gh105829_should_not_deadlock_if_wakeup_pipe_full(self):
        # Issue #105829: The _ExecutorManagerThread wakeup pipe could
        # fill up and block. See: https://github.com/python/cpython/issues/105829
        # Lots of cargo culting while writing this test, apologies if
        # something is really stupid...

        self.executor.shutdown(wait=True)

        if not hasattr(signal, 'alarm'):
            raise unittest.SkipTest(
                "Tested platform does not support the alarm signal")

        def timeout(_signum, _frame):
            import faulthandler
            faulthandler.dump_traceback()

            raise RuntimeError("timed out while submitting jobs?")

        thread_run = futures.process._ExecutorManagerThread.run
        def mock_run(self):
            # Delay thread startup so the wakeup pipe can fill up and block.
            time.sleep(3)
            thread_run(self)

        class MockWakeup(_ThreadWakeup):
            """Mock wakeup object to force the wakeup to block"""
            def __init__(self):
                super().__init__()
                self._dummy_queue = queue.Queue(maxsize=1)

            def wakeup(self):
                self._dummy_queue.put(None, block=True)
                super().wakeup()

            def clear(self):
                super().clear()
                try:
                    while True:
                        self._dummy_queue.get_nowait()
                except queue.Empty:
                    pass

        with (unittest.mock.patch.object(futures.process._ExecutorManagerThread,
                                         'run', mock_run),
              unittest.mock.patch('concurrent.futures.process._ThreadWakeup',
                                  MockWakeup)):
            with self.executor_type(max_workers=2,
                                    mp_context=self.get_context()) as executor:
                self.executor = executor  # Allow clean up in fail_on_deadlock
                job_num = 100
                job_data = range(job_num)

                # Need to use sigalarm for timeout detection because
                # Executor.submit is not guarded by any timeout (both
                # self._work_ids.put(self._queue_count) and
                # self._executor_manager_thread_wakeup.wakeup() might
                # time out, maybe more?). In this specific case it was
                # the wakeup call that deadlocked on a blocking pipe.
                old_handler = signal.signal(signal.SIGALRM, timeout)
                try:
                    signal.alarm(int(self.TIMEOUT))
                    self.assertEqual(job_num,
                                     len(list(executor.map(int, job_data))))
                finally:
                    signal.alarm(0)
                    signal.signal(signal.SIGALRM, old_handler)


create_executor_tests(globals(), ExecutorDeadlockTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))


def setUpModule():
    setup_module()


if __name__ == "__main__":
    unittest.main()