bpo-39995: Split test_concurrent_futures.test_crash() into sub-tests (GH-19739)

Now only test_error_during_result_unpickle_in_result_handler()
captures and ignores sys.stderr in the test process.

Tools like test.bisect_cmd don't support subTest() but only
work with the granularity of one method.

Remove unused ExecutorDeadlockTest._sleep_id() method.
This commit is contained in:
Victor Stinner 2020-04-27 21:36:51 +02:00 committed by GitHub
parent 1a275013d1
commit 5d1f32d33b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 85 additions and 63 deletions

View File

@ -1006,11 +1006,6 @@ create_executor_tests(ProcessPoolExecutorTest,
ProcessPoolForkserverMixin, ProcessPoolForkserverMixin,
ProcessPoolSpawnMixin)) ProcessPoolSpawnMixin))
def hide_process_stderr():
import io
sys.stderr = io.StringIO()
def _crash(delay=None): def _crash(delay=None):
"""Induces a segfault.""" """Induces a segfault."""
if delay: if delay:
@ -1027,13 +1022,18 @@ def _exit():
def _raise_error(Err): def _raise_error(Err):
"""Function that raises an Exception in process.""" """Function that raises an Exception in process."""
hide_process_stderr() raise Err()
def _raise_error_ignore_stderr(Err):
"""Function that raises an Exception in process and ignores stderr."""
import io
sys.stderr = io.StringIO()
raise Err() raise Err()
def _return_instance(cls): def _return_instance(cls):
"""Function that returns a instance of cls.""" """Function that returns a instance of cls."""
hide_process_stderr()
return cls() return cls()
@ -1072,17 +1072,12 @@ class ErrorAtUnpickle(object):
"""Bad object that triggers an error at unpickling time.""" """Bad object that triggers an error at unpickling time."""
def __reduce__(self): def __reduce__(self):
from pickle import UnpicklingError from pickle import UnpicklingError
return _raise_error, (UnpicklingError, ) return _raise_error_ignore_stderr, (UnpicklingError, )
class ExecutorDeadlockTest: class ExecutorDeadlockTest:
TIMEOUT = support.SHORT_TIMEOUT TIMEOUT = support.SHORT_TIMEOUT
@classmethod
def _sleep_id(cls, x, delay):
time.sleep(delay)
return x
def _fail_on_deadlock(self, executor): def _fail_on_deadlock(self, executor):
# If we did not recover before TIMEOUT seconds, consider that the # If we did not recover before TIMEOUT seconds, consider that the
# executor is in a deadlock state and forcefully clean all its # executor is in a deadlock state and forcefully clean all its
@ -1102,57 +1097,84 @@ class ExecutorDeadlockTest:
self.fail(f"Executor deadlock:\n\n{tb}") self.fail(f"Executor deadlock:\n\n{tb}")
def test_crash(self): def _check_crash(self, error, func, *args, ignore_stderr=False):
# extensive testing for deadlock caused by crashes in a pool. # test for deadlock caused by crashes in a pool
self.executor.shutdown(wait=True) self.executor.shutdown(wait=True)
crash_cases = [
# Check problem occurring while pickling a task in executor = self.executor_type(
# the task_handler thread max_workers=2, mp_context=get_context(self.ctx))
(id, (ErrorAtPickle(),), PicklingError, "error at task pickle"), res = executor.submit(func, *args)
# Check problem occurring while unpickling a task on workers
(id, (ExitAtUnpickle(),), BrokenProcessPool, if ignore_stderr:
"exit at task unpickle"), cm = support.captured_stderr()
(id, (ErrorAtUnpickle(),), BrokenProcessPool, else:
"error at task unpickle"), cm = contextlib.nullcontext()
(id, (CrashAtUnpickle(),), BrokenProcessPool,
"crash at task unpickle"), try:
# Check problem occurring during func execution on workers with self.assertRaises(error):
(_crash, (), BrokenProcessPool, with cm:
"crash during func execution on worker"), res.result(timeout=self.TIMEOUT)
(_exit, (), SystemExit, except futures.TimeoutError:
"exit during func execution on worker"), # If we did not recover before TIMEOUT seconds,
(_raise_error, (RuntimeError, ), RuntimeError, # consider that the executor is in a deadlock state
"error during func execution on worker"), self._fail_on_deadlock(executor)
# Check problem occurring while pickling a task result executor.shutdown(wait=True)
# on workers
(_return_instance, (CrashAtPickle,), BrokenProcessPool, def test_error_at_task_pickle(self):
"crash during result pickle on worker"), # Check problem occurring while pickling a task in
(_return_instance, (ExitAtPickle,), SystemExit, # the task_handler thread
"exit during result pickle on worker"), self._check_crash(PicklingError, id, ErrorAtPickle())
(_return_instance, (ErrorAtPickle,), PicklingError,
"error during result pickle on worker"), def test_exit_at_task_unpickle(self):
# Check problem occurring while unpickling a task in # Check problem occurring while unpickling a task on workers
# the result_handler thread self._check_crash(BrokenProcessPool, id, ExitAtUnpickle())
(_return_instance, (ErrorAtUnpickle,), BrokenProcessPool,
"error during result unpickle in result_handler"), def test_error_at_task_unpickle(self):
(_return_instance, (ExitAtUnpickle,), BrokenProcessPool, # Check problem occurring while unpickling a task on workers
"exit during result unpickle in result_handler") self._check_crash(BrokenProcessPool, id, ErrorAtUnpickle())
]
for func, args, error, name in crash_cases: def test_crash_at_task_unpickle(self):
with self.subTest(name): # Check problem occurring while unpickling a task on workers
# The captured_stderr reduces the noise in the test report self._check_crash(BrokenProcessPool, id, CrashAtUnpickle())
with support.captured_stderr():
executor = self.executor_type( def test_crash_during_func_exec_on_worker(self):
max_workers=2, mp_context=get_context(self.ctx)) # Check problem occurring during func execution on workers
res = executor.submit(func, *args) self._check_crash(BrokenProcessPool, _crash)
with self.assertRaises(error):
try: def test_exit_during_func_exec_on_worker(self):
res.result(timeout=self.TIMEOUT) # Check problem occurring during func execution on workers
except futures.TimeoutError: self._check_crash(SystemExit, _exit)
# If we did not recover before TIMEOUT seconds,
# consider that the executor is in a deadlock state def test_error_during_func_exec_on_worker(self):
self._fail_on_deadlock(executor) # Check problem occurring during func execution on workers
executor.shutdown(wait=True) self._check_crash(RuntimeError, _raise_error, RuntimeError)
def test_crash_during_result_pickle_on_worker(self):
# Check problem occurring while pickling a task result
# on workers
self._check_crash(BrokenProcessPool, _return_instance, CrashAtPickle)
def test_exit_during_result_pickle_on_worker(self):
# Check problem occurring while pickling a task result
# on workers
self._check_crash(SystemExit, _return_instance, ExitAtPickle)
def test_error_during_result_pickle_on_worker(self):
# Check problem occurring while pickling a task result
# on workers
self._check_crash(PicklingError, _return_instance, ErrorAtPickle)
def test_error_during_result_unpickle_in_result_handler(self):
# Check problem occurring while unpickling a task in
# the result_handler thread
self._check_crash(BrokenProcessPool,
_return_instance, ErrorAtUnpickle,
ignore_stderr=True)
def test_exit_during_result_unpickle_in_result_handler(self):
# Check problem occurring while unpickling a task in
# the result_handler thread
self._check_crash(BrokenProcessPool, _return_instance, ExitAtUnpickle)
def test_shutdown_deadlock(self): def test_shutdown_deadlock(self):
# Test that the pool calling shutdown do not cause deadlock # Test that the pool calling shutdown do not cause deadlock