import faulthandler import gc import importlib import io import sys import time import traceback import unittest from test import support from test.support import threading_helper from .filter import match_test from .result import State, TestResult, TestStats from .runtests import RunTests from .save_env import saved_test_environment from .setup import setup_tests from .testresult import get_test_runner from .utils import ( TestName, clear_caches, remove_testfn, abs_module_name, print_warning) # Minimum duration of a test to display its duration or to mention that # the test is running in background PROGRESS_MIN_TIME = 30.0 # seconds def run_unittest(test_mod): loader = unittest.TestLoader() tests = loader.loadTestsFromModule(test_mod) for error in loader.errors: print(error, file=sys.stderr) if loader.errors: raise Exception("errors while loading tests") _filter_suite(tests, match_test) return _run_suite(tests) def _filter_suite(suite, pred): """Recursively filter test cases in a suite based on a predicate.""" newtests = [] for test in suite._tests: if isinstance(test, unittest.TestSuite): _filter_suite(test, pred) newtests.append(test) else: if pred(test): newtests.append(test) suite._tests = newtests def _run_suite(suite): """Run tests from a unittest.TestSuite-derived class.""" runner = get_test_runner(sys.stdout, verbosity=support.verbose, capture_output=(support.junit_xml_list is not None)) result = runner.run(suite) if support.junit_xml_list is not None: support.junit_xml_list.append(result.get_xml_element()) if not result.testsRun and not result.skipped and not result.errors: raise support.TestDidNotRun if not result.wasSuccessful(): stats = TestStats.from_unittest(result) if len(result.errors) == 1 and not result.failures: err = result.errors[0][1] elif len(result.failures) == 1 and not result.errors: err = result.failures[0][1] else: err = "multiple errors occurred" if not support.verbose: err += "; run in verbose mode for details" errors = [(str(tc), exc_str) for tc, exc_str in result.errors] failures = [(str(tc), exc_str) for tc, exc_str in result.failures] raise support.TestFailedWithDetails(err, errors, failures, stats=stats) return result def regrtest_runner(result: TestResult, test_func, runtests: RunTests) -> None: # Run test_func(), collect statistics, and detect reference and memory # leaks. if runtests.hunt_refleak: from .refleak import runtest_refleak refleak, test_result = runtest_refleak(result.test_name, test_func, runtests.hunt_refleak, runtests.quiet) else: test_result = test_func() refleak = False if refleak: result.state = State.REFLEAK stats: TestStats | None match test_result: case TestStats(): stats = test_result case unittest.TestResult(): stats = TestStats.from_unittest(test_result) case None: print_warning(f"{result.test_name} test runner returned None: {test_func}") stats = None case _: # Don't import doctest at top level since only few tests return # a doctest.TestResult instance. import doctest if isinstance(test_result, doctest.TestResults): stats = TestStats.from_doctest(test_result) else: print_warning(f"Unknown test result type: {type(test_result)}") stats = None result.stats = stats # Storage of uncollectable GC objects (gc.garbage) GC_GARBAGE = [] def _load_run_test(result: TestResult, runtests: RunTests) -> None: # Load the test module and run the tests. test_name = result.test_name module_name = abs_module_name(test_name, runtests.test_dir) test_mod = importlib.import_module(module_name) if hasattr(test_mod, "test_main"): # https://github.com/python/cpython/issues/89392 raise Exception(f"Module {test_name} defines test_main() which " f"is no longer supported by regrtest") def test_func(): return run_unittest(test_mod) try: regrtest_runner(result, test_func, runtests) finally: # First kill any dangling references to open files etc. # This can also issue some ResourceWarnings which would otherwise get # triggered during the following test run, and possibly produce # failures. support.gc_collect() remove_testfn(test_name, runtests.verbose) if gc.garbage: support.environment_altered = True print_warning(f"{test_name} created {len(gc.garbage)} " f"uncollectable object(s)") # move the uncollectable objects somewhere, # so we don't see them again GC_GARBAGE.extend(gc.garbage) gc.garbage.clear() support.reap_children() def _runtest_env_changed_exc(result: TestResult, runtests: RunTests, display_failure: bool = True) -> None: # Handle exceptions, detect environment changes. # Reset the environment_altered flag to detect if a test altered # the environment support.environment_altered = False pgo = runtests.pgo if pgo: display_failure = False quiet = runtests.quiet test_name = result.test_name try: clear_caches() support.gc_collect() with saved_test_environment(test_name, runtests.verbose, quiet, pgo=pgo): _load_run_test(result, runtests) except support.ResourceDenied as exc: if not quiet and not pgo: print(f"{test_name} skipped -- {exc}", flush=True) result.state = State.RESOURCE_DENIED return except unittest.SkipTest as exc: if not quiet and not pgo: print(f"{test_name} skipped -- {exc}", flush=True) result.state = State.SKIPPED return except support.TestFailedWithDetails as exc: msg = f"test {test_name} failed" if display_failure: msg = f"{msg} -- {exc}" print(msg, file=sys.stderr, flush=True) result.state = State.FAILED result.errors = exc.errors result.failures = exc.failures result.stats = exc.stats return except support.TestFailed as exc: msg = f"test {test_name} failed" if display_failure: msg = f"{msg} -- {exc}" print(msg, file=sys.stderr, flush=True) result.state = State.FAILED result.stats = exc.stats return except support.TestDidNotRun: result.state = State.DID_NOT_RUN return except KeyboardInterrupt: print() result.state = State.INTERRUPTED return except: if not pgo: msg = traceback.format_exc() print(f"test {test_name} crashed -- {msg}", file=sys.stderr, flush=True) result.state = State.UNCAUGHT_EXC return if support.environment_altered: result.set_env_changed() # Don't override the state if it was already set (REFLEAK or ENV_CHANGED) if result.state is None: result.state = State.PASSED def _runtest(result: TestResult, runtests: RunTests) -> None: # Capture stdout and stderr, set faulthandler timeout, # and create JUnit XML report. verbose = runtests.verbose output_on_failure = runtests.output_on_failure timeout = runtests.timeout if timeout is not None and threading_helper.can_start_thread: use_timeout = True faulthandler.dump_traceback_later(timeout, exit=True) else: use_timeout = False try: setup_tests(runtests) if output_on_failure: support.verbose = True stream = io.StringIO() orig_stdout = sys.stdout orig_stderr = sys.stderr print_warning = support.print_warning orig_print_warnings_stderr = print_warning.orig_stderr output = None try: sys.stdout = stream sys.stderr = stream # print_warning() writes into the temporary stream to preserve # messages order. If support.environment_altered becomes true, # warnings will be written to sys.stderr below. print_warning.orig_stderr = stream _runtest_env_changed_exc(result, runtests, display_failure=False) # Ignore output if the test passed successfully if result.state != State.PASSED: output = stream.getvalue() finally: sys.stdout = orig_stdout sys.stderr = orig_stderr print_warning.orig_stderr = orig_print_warnings_stderr if output is not None: sys.stderr.write(output) sys.stderr.flush() else: # Tell tests to be moderately quiet support.verbose = verbose _runtest_env_changed_exc(result, runtests, display_failure=not verbose) xml_list = support.junit_xml_list if xml_list: import xml.etree.ElementTree as ET result.xml_data = [ET.tostring(x).decode('us-ascii') for x in xml_list] finally: if use_timeout: faulthandler.cancel_dump_traceback_later() support.junit_xml_list = None def run_single_test(test_name: TestName, runtests: RunTests) -> TestResult: """Run a single test. test_name -- the name of the test Returns a TestResult. If runtests.use_junit, xml_data is a list containing each generated testsuite element. """ start_time = time.perf_counter() result = TestResult(test_name) pgo = runtests.pgo try: _runtest(result, runtests) except: if not pgo: msg = traceback.format_exc() print(f"test {test_name} crashed -- {msg}", file=sys.stderr, flush=True) result.state = State.UNCAUGHT_EXC sys.stdout.flush() sys.stderr.flush() result.duration = time.perf_counter() - start_time return result