diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index a8c5bb6aa1a..40597ffee73 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -1006,11 +1006,6 @@ create_executor_tests(ProcessPoolExecutorTest, ProcessPoolForkserverMixin, ProcessPoolSpawnMixin)) -def hide_process_stderr(): - import io - sys.stderr = io.StringIO() - - def _crash(delay=None): """Induces a segfault.""" if delay: @@ -1027,13 +1022,18 @@ def _exit(): def _raise_error(Err): """Function that raises an Exception in process.""" - hide_process_stderr() + raise Err() + + +def _raise_error_ignore_stderr(Err): + """Function that raises an Exception in process and ignores stderr.""" + import io + sys.stderr = io.StringIO() raise Err() def _return_instance(cls): """Function that returns a instance of cls.""" - hide_process_stderr() return cls() @@ -1072,17 +1072,12 @@ class ErrorAtUnpickle(object): """Bad object that triggers an error at unpickling time.""" def __reduce__(self): from pickle import UnpicklingError - return _raise_error, (UnpicklingError, ) + return _raise_error_ignore_stderr, (UnpicklingError, ) class ExecutorDeadlockTest: TIMEOUT = support.SHORT_TIMEOUT - @classmethod - def _sleep_id(cls, x, delay): - time.sleep(delay) - return x - def _fail_on_deadlock(self, executor): # If we did not recover before TIMEOUT seconds, consider that the # executor is in a deadlock state and forcefully clean all its @@ -1102,57 +1097,84 @@ class ExecutorDeadlockTest: self.fail(f"Executor deadlock:\n\n{tb}") - def test_crash(self): - # extensive testing for deadlock caused by crashes in a pool. + def _check_crash(self, error, func, *args, ignore_stderr=False): + # test for deadlock caused by crashes in a pool self.executor.shutdown(wait=True) - crash_cases = [ - # Check problem occurring while pickling a task in - # the task_handler thread - (id, (ErrorAtPickle(),), PicklingError, "error at task pickle"), - # Check problem occurring while unpickling a task on workers - (id, (ExitAtUnpickle(),), BrokenProcessPool, - "exit at task unpickle"), - (id, (ErrorAtUnpickle(),), BrokenProcessPool, - "error at task unpickle"), - (id, (CrashAtUnpickle(),), BrokenProcessPool, - "crash at task unpickle"), - # Check problem occurring during func execution on workers - (_crash, (), BrokenProcessPool, - "crash during func execution on worker"), - (_exit, (), SystemExit, - "exit during func execution on worker"), - (_raise_error, (RuntimeError, ), RuntimeError, - "error during func execution on worker"), - # Check problem occurring while pickling a task result - # on workers - (_return_instance, (CrashAtPickle,), BrokenProcessPool, - "crash during result pickle on worker"), - (_return_instance, (ExitAtPickle,), SystemExit, - "exit during result pickle on worker"), - (_return_instance, (ErrorAtPickle,), PicklingError, - "error during result pickle on worker"), - # Check problem occurring while unpickling a task in - # the result_handler thread - (_return_instance, (ErrorAtUnpickle,), BrokenProcessPool, - "error during result unpickle in result_handler"), - (_return_instance, (ExitAtUnpickle,), BrokenProcessPool, - "exit during result unpickle in result_handler") - ] - for func, args, error, name in crash_cases: - with self.subTest(name): - # The captured_stderr reduces the noise in the test report - with support.captured_stderr(): - executor = self.executor_type( - max_workers=2, mp_context=get_context(self.ctx)) - res = executor.submit(func, *args) - with self.assertRaises(error): - try: - res.result(timeout=self.TIMEOUT) - except futures.TimeoutError: - # If we did not recover before TIMEOUT seconds, - # consider that the executor is in a deadlock state - self._fail_on_deadlock(executor) - executor.shutdown(wait=True) + + executor = self.executor_type( + max_workers=2, mp_context=get_context(self.ctx)) + res = executor.submit(func, *args) + + if ignore_stderr: + cm = support.captured_stderr() + else: + cm = contextlib.nullcontext() + + try: + with self.assertRaises(error): + with cm: + res.result(timeout=self.TIMEOUT) + except futures.TimeoutError: + # If we did not recover before TIMEOUT seconds, + # consider that the executor is in a deadlock state + self._fail_on_deadlock(executor) + executor.shutdown(wait=True) + + def test_error_at_task_pickle(self): + # Check problem occurring while pickling a task in + # the task_handler thread + self._check_crash(PicklingError, id, ErrorAtPickle()) + + def test_exit_at_task_unpickle(self): + # Check problem occurring while unpickling a task on workers + self._check_crash(BrokenProcessPool, id, ExitAtUnpickle()) + + def test_error_at_task_unpickle(self): + # Check problem occurring while unpickling a task on workers + self._check_crash(BrokenProcessPool, id, ErrorAtUnpickle()) + + def test_crash_at_task_unpickle(self): + # Check problem occurring while unpickling a task on workers + self._check_crash(BrokenProcessPool, id, CrashAtUnpickle()) + + def test_crash_during_func_exec_on_worker(self): + # Check problem occurring during func execution on workers + self._check_crash(BrokenProcessPool, _crash) + + def test_exit_during_func_exec_on_worker(self): + # Check problem occurring during func execution on workers + self._check_crash(SystemExit, _exit) + + def test_error_during_func_exec_on_worker(self): + # Check problem occurring during func execution on workers + self._check_crash(RuntimeError, _raise_error, RuntimeError) + + def test_crash_during_result_pickle_on_worker(self): + # Check problem occurring while pickling a task result + # on workers + self._check_crash(BrokenProcessPool, _return_instance, CrashAtPickle) + + def test_exit_during_result_pickle_on_worker(self): + # Check problem occurring while pickling a task result + # on workers + self._check_crash(SystemExit, _return_instance, ExitAtPickle) + + def test_error_during_result_pickle_on_worker(self): + # Check problem occurring while pickling a task result + # on workers + self._check_crash(PicklingError, _return_instance, ErrorAtPickle) + + def test_error_during_result_unpickle_in_result_handler(self): + # Check problem occurring while unpickling a task in + # the result_handler thread + self._check_crash(BrokenProcessPool, + _return_instance, ErrorAtUnpickle, + ignore_stderr=True) + + def test_exit_during_result_unpickle_in_result_handler(self): + # Check problem occurring while unpickling a task in + # the result_handler thread + self._check_crash(BrokenProcessPool, _return_instance, ExitAtUnpickle) def test_shutdown_deadlock(self): # Test that the pool calling shutdown do not cause deadlock