mirror of https://github.com/python/cpython
892 lines
30 KiB
Python
892 lines
30 KiB
Python
import faulthandler
|
|
import locale
|
|
import os
|
|
import platform
|
|
import random
|
|
import re
|
|
import sys
|
|
import sysconfig
|
|
import tempfile
|
|
import time
|
|
import unittest
|
|
from test.libregrtest.cmdline import _parse_args
|
|
from test.libregrtest.runtest import (
|
|
findtests, split_test_packages, runtest, abs_module_name,
|
|
PROGRESS_MIN_TIME, State, MatchTestsDict, RunTests)
|
|
from test.libregrtest.setup import setup_tests
|
|
from test.libregrtest.pgo import setup_pgo_tests
|
|
from test.libregrtest.utils import (strip_py_suffix, count, format_duration,
|
|
printlist, get_build_info)
|
|
from test import support
|
|
from test.support import TestStats
|
|
from test.support import os_helper
|
|
from test.support import threading_helper
|
|
|
|
|
|
# bpo-38203: Maximum delay in seconds to exit Python (call Py_Finalize()).
# Used to protect against threading._shutdown() hang.
# Must be smaller than buildbot "1200 seconds without output" limit.
EXIT_TIMEOUT = 120.0

# Process exit codes reported by Regrtest.get_exitcode() (0 means success).
EXITCODE_BAD_TEST = 2         # at least one test failed
EXITCODE_ENV_CHANGED = 3      # a test altered the environment (--fail-env-changed)
EXITCODE_NO_TESTS_RAN = 4     # no test produced any outcome
EXITCODE_RERUN_FAIL = 5       # tests were re-run (--fail-rerun)
EXITCODE_INTERRUPTED = 130    # interrupted by SIGINT (128 + 2)
|
|
|
|
|
|
class Regrtest:
    """Execute a test suite.

    This also parses command-line options and modifies its behavior
    accordingly.

    tests -- a list of strings containing test names (optional)
    testdir -- the directory in which to look for tests (optional)

    Users other than the Python test suite will certainly want to
    specify testdir; if it's omitted, the directory containing the
    Python test suite is searched for.

    If the tests argument is omitted, the tests listed on the
    command-line will be used. If that's empty, too, then all *.py
    files beginning with test_ will be used.

    The other default arguments (verbose, quiet, exclude,
    single, randomize, use_resources, trace, coverdir,
    print_slow, and random_seed) allow programmers calling main()
    directly to set the values that would normally be set by flags
    on the command line.
    """
    def __init__(self):
        # Namespace of command line options
        self.ns = None

        # tests
        self.tests = []
        self.selected = []
        # One RunTests entry per run; the optional re-run of failed tests
        # appends a second entry.
        self.all_runtests: list[RunTests] = []

        # test results: one list of test names per final outcome
        self.good: list[str] = []
        self.bad: list[str] = []
        self.rerun_bad: list[str] = []
        self.skipped: list[str] = []
        self.resource_denied: list[str] = []
        self.environment_changed: list[str] = []
        self.run_no_tests: list[str] = []
        self.rerun: list[str] = []

        # Failed results queued for the optional --rerun pass.
        # NOTE(review): TestResult is not among the imports visible here --
        # confirm it is imported elsewhere in this module.
        self.need_rerun: list[TestResult] = []
        # State string of the first run, shown as "X then Y" after a re-run.
        self.first_state: str | None = None
        self.interrupted = False
        self.total_stats = TestStats()

        # used by --slow
        self.test_times = []

        # used by --coverage, trace.Trace instance
        self.tracer = None

        # used to display the progress bar "[ 3/100]"
        self.start_time = time.perf_counter()
        self.test_count_text = ''
        self.test_count_width = 1

        # used by --single
        self.next_single_test = None
        self.next_single_filename = None

        # used by --junit-xml
        self.testsuite_xml = None

        # misc
        self.win_load_tracker = None
        self.tmp_dir = None
|
|
|
|
def get_executed(self):
|
|
return (set(self.good) | set(self.bad) | set(self.skipped)
|
|
| set(self.resource_denied) | set(self.environment_changed)
|
|
| set(self.run_no_tests))
|
|
|
|
    def accumulate_result(self, result, rerun=False):
        """Record *result* into the per-outcome lists and counters.

        result -- test result object (test_name, state, duration, stats,
                  xml_data)
        rerun -- True when the result comes from re-running a failed test
        """
        fail_env_changed = self.ns.fail_env_changed
        test_name = result.test_name

        # File the test under exactly one outcome.
        match result.state:
            case State.PASSED:
                self.good.append(test_name)
            case State.ENV_CHANGED:
                self.environment_changed.append(test_name)
            case State.SKIPPED:
                self.skipped.append(test_name)
            case State.RESOURCE_DENIED:
                self.resource_denied.append(test_name)
            case State.INTERRUPTED:
                self.interrupted = True
            case State.DID_NOT_RUN:
                self.run_no_tests.append(test_name)
            case _:
                # Any other state must be a failure; queue it for --rerun.
                if result.is_failed(fail_env_changed):
                    self.bad.append(test_name)
                    self.need_rerun.append(result)
                else:
                    raise ValueError(f"invalid test state: {result.state!r}")

        # Only first runs count towards the --slow timing report.
        if result.has_meaningful_duration() and not rerun:
            self.test_times.append((result.duration, test_name))
        if result.stats is not None:
            self.total_stats.accumulate(result.stats)
        if rerun:
            self.rerun.append(test_name)

        # Collect JUnit XML fragments produced by the test (--junit-xml).
        xml_data = result.xml_data
        if xml_data:
            import xml.etree.ElementTree as ET
            for e in xml_data:
                try:
                    self.testsuite_xml.append(ET.fromstring(e))
                except ET.ParseError:
                    # Dump the raw data to help diagnose malformed XML,
                    # then propagate.
                    print(xml_data, file=sys.__stderr__)
                    raise
|
|
|
|
def log(self, line=''):
|
|
empty = not line
|
|
|
|
# add the system load prefix: "load avg: 1.80 "
|
|
load_avg = self.getloadavg()
|
|
if load_avg is not None:
|
|
line = f"load avg: {load_avg:.2f} {line}"
|
|
|
|
# add the timestamp prefix: "0:01:05 "
|
|
test_time = time.perf_counter() - self.start_time
|
|
|
|
mins, secs = divmod(int(test_time), 60)
|
|
hours, mins = divmod(mins, 60)
|
|
test_time = "%d:%02d:%02d" % (hours, mins, secs)
|
|
|
|
line = f"{test_time} {line}"
|
|
if empty:
|
|
line = line[:-1]
|
|
|
|
print(line, flush=True)
|
|
|
|
def display_progress(self, test_index, text):
|
|
quiet = self.ns.quiet
|
|
pgo = self.ns.pgo
|
|
if quiet:
|
|
return
|
|
|
|
# "[ 51/405/1] test_tcl passed"
|
|
line = f"{test_index:{self.test_count_width}}{self.test_count_text}"
|
|
fails = len(self.bad) + len(self.environment_changed)
|
|
if fails and not pgo:
|
|
line = f"{line}/{fails}"
|
|
self.log(f"[{line}] {text}")
|
|
|
|
    def parse_args(self, kwargs):
        """Parse sys.argv and store the resulting namespace in self.ns.

        kwargs are forwarded to _parse_args() as default option values.
        Exits with code 2 on invalid --huntrleaks values.
        """
        ns = _parse_args(sys.argv[1:], **kwargs)

        if ns.xmlpath:
            # Collect JUnit XML elements; the list is shared with
            # test.support so tests can append their XML data to it.
            support.junit_xml_list = self.testsuite_xml = []

        # Accept test names given as filenames ("test_os.py" -> "test_os").
        strip_py_suffix(ns.args)

        if ns.huntrleaks:
            warmup, repetitions, _ = ns.huntrleaks
            if warmup < 1 or repetitions < 1:
                msg = ("Invalid values for the --huntrleaks/-R parameters. The "
                       "number of warmups and repetitions must be at least 1 "
                       "each (1:1).")
                print(msg, file=sys.stderr, flush=True)
                sys.exit(2)

        if ns.tempdir:
            # Allow "~" in --tempdir.
            ns.tempdir = os.path.expanduser(ns.tempdir)

        self.ns = ns
|
|
|
|
    def find_tests(self, tests):
        """Compute self.tests and self.selected from options and arguments.

        tests -- optional list of test names passed to main(); combined with
        --single, --fromfile, --pgo, --exclude, --start and --randomize.
        """
        ns = self.ns
        single = ns.single
        fromfile = ns.fromfile
        pgo = ns.pgo
        exclude = ns.exclude
        test_dir = ns.testdir
        starting_test = ns.start
        randomize = ns.randomize

        self.tests = tests

        if single:
            # --single: run the test recorded in tmp_dir/pynexttest.
            self.next_single_filename = os.path.join(self.tmp_dir, 'pynexttest')
            try:
                with open(self.next_single_filename, 'r') as fp:
                    next_test = fp.read().strip()
                    self.tests = [next_test]
            except OSError:
                # No state file yet: fall back to the regular selection.
                pass

        if fromfile:
            # --fromfile: extract test names from each line of a file.
            self.tests = []
            # regex to match 'test_builtin' in line:
            # '0:00:00 [ 4/400] test_builtin -- test_dict took 1 sec'
            regex = re.compile(r'\btest_[a-zA-Z0-9_]+\b')
            with open(os.path.join(os_helper.SAVEDCWD, fromfile)) as fp:
                for line in fp:
                    # Strip trailing "#" comments before matching.
                    line = line.split('#', 1)[0]
                    line = line.strip()
                    match = regex.search(line)
                    if match is not None:
                        self.tests.append(match.group())

        strip_py_suffix(self.tests)

        if pgo:
            # add default PGO tests if no tests are specified
            setup_pgo_tests(ns)

        exclude_tests = set()
        if exclude:
            # -x: positional arguments name tests to skip, not to run.
            for arg in ns.args:
                exclude_tests.add(arg)
            ns.args = []

        alltests = findtests(testdir=test_dir, exclude=exclude_tests)

        if not fromfile:
            self.selected = self.tests or ns.args
            if self.selected:
                self.selected = split_test_packages(self.selected)
            else:
                self.selected = alltests
        else:
            self.selected = self.tests

        if single:
            # Keep a single test and remember which one should run next time.
            self.selected = self.selected[:1]
            try:
                pos = alltests.index(self.selected[0])
                self.next_single_test = alltests[pos + 1]
            except IndexError:
                # Last test of the suite: nothing to run next time.
                # NOTE(review): alltests.index() raises ValueError (not
                # IndexError) for an unknown test name, which would
                # propagate here -- confirm this is intended.
                pass

        # Remove all the selected tests that precede start if it's set.
        if starting_test:
            try:
                del self.selected[:self.selected.index(starting_test)]
            except ValueError:
                print(f"Cannot find starting test: {starting_test}")
                sys.exit(1)

        if randomize:
            # -r/--randomize: shuffle with a reproducible (printable) seed.
            if ns.random_seed is None:
                ns.random_seed = random.randrange(10000000)
            random.seed(ns.random_seed)
            random.shuffle(self.selected)
|
|
|
|
def list_tests(self):
|
|
for name in self.selected:
|
|
print(name)
|
|
|
|
    def _list_cases(self, suite):
        """Recursively print the id of each matching test case in *suite*."""
        for test in suite:
            # Skip the placeholder tests the loader creates for modules
            # that failed to load; check this before the TestCase branch.
            if isinstance(test, unittest.loader._FailedTest):
                continue
            if isinstance(test, unittest.TestSuite):
                # Suites nest: recurse into them.
                self._list_cases(test)
            elif isinstance(test, unittest.TestCase):
                # Honor --match/--ignore filters.
                if support.match_test(test):
                    print(test.id())
|
|
|
|
    def list_cases(self):
        """Implement --list-cases: print the id of every selected test case."""
        ns = self.ns
        test_dir = ns.testdir
        support.verbose = False
        support.set_match_tests(ns.match_tests, ns.ignore_tests)

        skipped = []
        for test_name in self.selected:
            module_name = abs_module_name(test_name, test_dir)
            try:
                suite = unittest.defaultTestLoader.loadTestsFromName(module_name)
                self._list_cases(suite)
            except unittest.SkipTest:
                skipped.append(test_name)

        if skipped:
            # Report skipped test modules on stderr.
            sys.stdout.flush()
            stderr = sys.stderr
            print(file=stderr)
            print(count(len(skipped), "test"), "skipped:", file=stderr)
            printlist(skipped, file=stderr)
|
|
|
|
def get_rerun_match(self, rerun_list) -> MatchTestsDict:
|
|
rerun_match_tests = {}
|
|
for result in rerun_list:
|
|
match_tests = result.get_rerun_match_tests()
|
|
# ignore empty match list
|
|
if match_tests:
|
|
rerun_match_tests[result.test_name] = match_tests
|
|
return rerun_match_tests
|
|
|
|
    def _rerun_failed_tests(self, need_rerun):
        """Re-run the failed tests of *need_rerun* verbosely in subprocesses."""
        # Configure the runner to re-run tests
        ns = self.ns
        ns.verbose = True
        ns.failfast = False
        ns.verbose3 = False
        ns.forever = False
        if ns.use_mp is None:
            # Force subprocess execution for the re-run.
            ns.use_mp = 1

        # Get tests to re-run
        tests = [result.test_name for result in need_rerun]
        match_tests = self.get_rerun_match(need_rerun)
        self.set_tests(tests)

        # Clear previously failed tests
        self.rerun_bad.extend(self.bad)
        self.bad.clear()
        self.need_rerun.clear()

        # Re-run failed tests
        self.log(f"Re-running {len(tests)} failed tests in verbose mode in subprocesses")
        runtests = RunTests(tests, match_tests=match_tests, rerun=True)
        self.all_runtests.append(runtests)
        self._run_tests_mp(runtests)
|
|
|
|
    def rerun_failed_tests(self, need_rerun):
        """Re-run failed tests and display the updated result."""
        if self.ns.python:
            # Temp patch for https://github.com/python/cpython/issues/94052
            self.log(
                "Re-running failed tests is not supported with --python "
                "host runner option."
            )
            return

        # Remember the state of the first run for the final "X then Y" summary.
        self.first_state = self.get_tests_state()

        print()
        self._rerun_failed_tests(need_rerun)

        if self.bad:
            print(count(len(self.bad), 'test'), "failed again:")
            printlist(self.bad)

        self.display_result()
|
|
|
|
    def display_result(self):
        """Print a categorized report of the test results to stdout."""
        pgo = self.ns.pgo
        quiet = self.ns.quiet
        print_slow = self.ns.print_slow

        # If running the test suite for PGO then no one cares about results.
        if pgo:
            return

        print()
        print("== Tests result: %s ==" % self.get_tests_state())

        if self.interrupted:
            print("Test suite interrupted by signal SIGINT.")

        # Tests that were selected but produced no recorded outcome.
        omitted = set(self.selected) - self.get_executed()
        if omitted:
            print()
            print(count(len(omitted), "test"), "omitted:")
            printlist(omitted)

        if self.good and not quiet:
            print()
            # "All ... OK." only for a clean run of more than one test.
            if (not self.bad
                and not self.skipped
                and not self.interrupted
                and len(self.good) > 1):
                print("All", end=' ')
            print(count(len(self.good), "test"), "OK.")

        if print_slow:
            # --slow: report the slowest tests first.
            self.test_times.sort(reverse=True)
            print()
            print("10 slowest tests:")
            for test_time, test in self.test_times[:10]:
                print("- %s: %s" % (test, format_duration(test_time)))

        if self.bad:
            print()
            print(count(len(self.bad), "test"), "failed:")
            printlist(self.bad)

        if self.environment_changed:
            print()
            print("{} altered the execution environment:".format(
                     count(len(self.environment_changed), "test")))
            printlist(self.environment_changed)

        if self.skipped and not quiet:
            print()
            print(count(len(self.skipped), "test"), "skipped:")
            printlist(self.skipped)

        if self.resource_denied and not quiet:
            print()
            print(count(len(self.resource_denied), "test"), "skipped (resource denied):")
            printlist(self.resource_denied)

        if self.rerun:
            print()
            print("%s:" % count(len(self.rerun), "re-run test"))
            printlist(self.rerun)

        if self.run_no_tests:
            print()
            print(count(len(self.run_no_tests), "test"), "run no tests:")
            printlist(self.run_no_tests)
|
|
|
|
def run_test(self, test_index, test_name, previous_test, save_modules):
|
|
text = test_name
|
|
if previous_test:
|
|
text = '%s -- %s' % (text, previous_test)
|
|
self.display_progress(test_index, text)
|
|
|
|
if self.tracer:
|
|
# If we're tracing code coverage, then we don't exit with status
|
|
# if on a false return value from main.
|
|
cmd = ('result = runtest(self.ns, test_name); '
|
|
'self.accumulate_result(result)')
|
|
ns = dict(locals())
|
|
self.tracer.runctx(cmd, globals=globals(), locals=ns)
|
|
result = ns['result']
|
|
else:
|
|
result = runtest(self.ns, test_name)
|
|
self.accumulate_result(result)
|
|
|
|
# Unload the newly imported modules (best effort finalization)
|
|
for module in sys.modules.keys():
|
|
if module not in save_modules and module.startswith("test."):
|
|
support.unload(module)
|
|
|
|
return result
|
|
|
|
    def run_tests_sequentially(self, runtests):
        """Run the tests of *runtests* one after the other in this process."""
        ns = self.ns
        coverage = ns.trace
        fail_fast = ns.failfast
        fail_env_changed = ns.fail_env_changed
        timeout = ns.timeout

        if coverage:
            # --coverage: trace the tests with the stdlib trace module.
            import trace
            self.tracer = trace.Trace(trace=False, count=True)

        # Baseline of imported modules, used by run_test() to unload
        # test modules imported during a test.
        # NOTE(review): sys.modules.keys() is a live dictionary view, not a
        # snapshot -- modules imported later still test as "in save_modules".
        # Confirm whether a snapshot (set(sys.modules)) was intended.
        save_modules = sys.modules.keys()

        msg = "Run tests sequentially"
        if timeout:
            msg += " (timeout: %s)" % format_duration(timeout)
        self.log(msg)

        previous_test = None
        tests_iter = runtests.iter_tests()
        for test_index, test_name in enumerate(tests_iter, 1):
            start_time = time.perf_counter()

            result = self.run_test(test_index, test_name,
                                   previous_test, save_modules)

            # Stop on first failure with --failfast (or env change with
            # --fail-env-changed).
            if result.must_stop(fail_fast, fail_env_changed):
                break

            previous_test = str(result)
            test_time = time.perf_counter() - start_time
            if test_time >= PROGRESS_MIN_TIME:
                previous_test = "%s in %s" % (previous_test, format_duration(test_time))
            elif result.state == State.PASSED:
                # be quiet: say nothing if the test passed shortly
                previous_test = None

        # Print the result of the last test, not covered by the loop above.
        if previous_test:
            print(previous_test)
|
|
|
|
    def display_header(self):
        """Print basic platform, build and encoding information."""
        # Print basic platform information
        print("==", platform.python_implementation(), *sys.version.split())
        print("==", platform.platform(aliased=True),
                    "%s-endian" % sys.byteorder)
        print("== Python build:", ' '.join(get_build_info()))
        print("== cwd:", os.getcwd())
        cpu_count = os.cpu_count()
        if cpu_count:
            print("== CPU count:", cpu_count)
        print("== encodings: locale=%s, FS=%s"
              % (locale.getencoding(), sys.getfilesystemencoding()))
        self.display_sanitizers()
|
|
|
|
def display_sanitizers(self):
|
|
# This makes it easier to remember what to set in your local
|
|
# environment when trying to reproduce a sanitizer failure.
|
|
asan = support.check_sanitizer(address=True)
|
|
msan = support.check_sanitizer(memory=True)
|
|
ubsan = support.check_sanitizer(ub=True)
|
|
sanitizers = []
|
|
if asan:
|
|
sanitizers.append("address")
|
|
if msan:
|
|
sanitizers.append("memory")
|
|
if ubsan:
|
|
sanitizers.append("undefined behavior")
|
|
if not sanitizers:
|
|
return
|
|
|
|
print(f"== sanitizers: {', '.join(sanitizers)}")
|
|
for sanitizer, env_var in (
|
|
(asan, "ASAN_OPTIONS"),
|
|
(msan, "MSAN_OPTIONS"),
|
|
(ubsan, "UBSAN_OPTIONS"),
|
|
):
|
|
options= os.environ.get(env_var)
|
|
if sanitizer and options is not None:
|
|
print(f"== {env_var}={options!r}")
|
|
|
|
def no_tests_run(self):
|
|
return not any((self.good, self.bad, self.skipped, self.interrupted,
|
|
self.environment_changed))
|
|
|
|
def get_tests_state(self):
|
|
fail_env_changed = self.ns.fail_env_changed
|
|
|
|
result = []
|
|
if self.bad:
|
|
result.append("FAILURE")
|
|
elif fail_env_changed and self.environment_changed:
|
|
result.append("ENV CHANGED")
|
|
elif self.no_tests_run():
|
|
result.append("NO TESTS RAN")
|
|
|
|
if self.interrupted:
|
|
result.append("INTERRUPTED")
|
|
|
|
if not result:
|
|
result.append("SUCCESS")
|
|
|
|
result = ', '.join(result)
|
|
if self.first_state:
|
|
result = '%s then %s' % (self.first_state, result)
|
|
return result
|
|
|
|
    def _run_tests_mp(self, runtests: RunTests) -> None:
        """Run *runtests* in worker subprocesses (-j/--multiprocess)."""
        from test.libregrtest.runtest_mp import run_tests_multiprocess
        # If we're on windows and this is the parent runner (not a worker),
        # track the load average.
        if sys.platform == 'win32':
            from test.libregrtest.win_utils import WindowsLoadTracker

            try:
                self.win_load_tracker = WindowsLoadTracker()
            except PermissionError as error:
                # Standard accounts may not have access to the performance
                # counters.
                print(f'Failed to create WindowsLoadTracker: {error}')

        try:
            run_tests_multiprocess(self, runtests)
        finally:
            # Always release the load tracker, even when the run raised.
            if self.win_load_tracker is not None:
                self.win_load_tracker.close()
                self.win_load_tracker = None
|
|
|
|
def set_tests(self, tests):
|
|
self.tests = tests
|
|
if self.ns.forever:
|
|
self.test_count_text = ''
|
|
self.test_count_width = 3
|
|
else:
|
|
self.test_count_text = '/{}'.format(len(self.tests))
|
|
self.test_count_width = len(self.test_count_text) - 1
|
|
|
|
    def run_tests(self):
        """Run the selected tests, sequentially or with subprocesses."""
        # For a partial run, we do not need to clutter the output.
        if (self.ns.header
            or not(self.ns.pgo or self.ns.quiet or self.ns.single
                   or self.tests or self.ns.args)):
            self.display_header()

        if self.ns.huntrleaks:
            warmup, repetitions, _ = self.ns.huntrleaks
            if warmup < 3:
                msg = ("WARNING: Running tests with --huntrleaks/-R and less than "
                        "3 warmup repetitions can give false positives!")
                print(msg, file=sys.stdout, flush=True)

        if self.ns.randomize:
            # Print the seed so a shuffled run can be reproduced.
            print("Using random seed", self.ns.random_seed)

        tests = self.selected
        self.set_tests(tests)
        runtests = RunTests(tests, forever=self.ns.forever)
        self.all_runtests.append(runtests)
        if self.ns.use_mp:
            self._run_tests_mp(runtests)
        else:
            self.run_tests_sequentially(runtests)
|
|
|
|
    def finalize(self):
        """Final cleanup: --single bookkeeping, coverage, leaks, JUnit XML."""
        if self.next_single_filename:
            if self.next_single_test:
                # Record which test the next --single invocation should run.
                with open(self.next_single_filename, 'w') as fp:
                    fp.write(self.next_single_test + '\n')
            else:
                # The whole suite completed: remove the state file.
                os.unlink(self.next_single_filename)

        if self.tracer:
            # --coverage: write the coverage results.
            r = self.tracer.results()
            r.write_results(show_missing=True, summary=True,
                            coverdir=self.ns.coverdir)

        if self.ns.runleaks:
            # --runleaks: run the "leaks" command against this process.
            os.system("leaks %d" % os.getpid())

        self.save_xml_result()
|
|
|
|
    def display_summary(self):
        """Print the final summary: duration, test totals and overall result."""
        duration = time.perf_counter() - self.start_time
        first_runtests = self.all_runtests[0]
        # the second runtests (re-run failed tests) disables forever,
        # use the first runtests
        forever = first_runtests.forever
        filtered = bool(self.ns.match_tests) or bool(self.ns.ignore_tests)

        # Total duration
        print()
        print("Total duration: %s" % format_duration(duration))

        # Total tests (individual test cases, from accumulated stats)
        total = self.total_stats
        text = f'run={total.tests_run:,}'
        if filtered:
            text = f"{text} (filtered)"
        stats = [text]
        if total.failures:
            stats.append(f'failures={total.failures:,}')
        if total.skipped:
            stats.append(f'skipped={total.skipped:,}')
        print(f"Total tests: {' '.join(stats)}")

        # Total test files
        all_tests = [self.good, self.bad, self.rerun,
                     self.skipped,
                     self.environment_changed, self.run_no_tests]
        run = sum(map(len, all_tests))
        text = f'run={run}'
        if not forever:
            ntest = len(first_runtests.tests)
            text = f"{text}/{ntest}"
        if filtered:
            text = f"{text} (filtered)"
        report = [text]
        # Append one "name=count" item per non-empty category.
        for name, tests in (
            ('failed', self.bad),
            ('env_changed', self.environment_changed),
            ('skipped', self.skipped),
            ('resource_denied', self.resource_denied),
            ('rerun', self.rerun),
            ('run_no_tests', self.run_no_tests),
        ):
            if tests:
                report.append(f'{name}={len(tests)}')
        print(f"Total test files: {' '.join(report)}")

        # Result
        result = self.get_tests_state()
        print(f"Result: {result}")
|
|
|
|
    def save_xml_result(self):
        """Write the collected JUnit XML data to the --junit-xml path."""
        # testsuite_xml is only populated when --junit-xml was given, so
        # both conditions normally hold (or fail) together.
        if not self.ns.xmlpath and not self.testsuite_xml:
            return

        import xml.etree.ElementTree as ET
        root = ET.Element("testsuites")

        # Manually count the totals for the overall summary
        totals = {'tests': 0, 'errors': 0, 'failures': 0}
        for suite in self.testsuite_xml:
            root.append(suite)
            for k in totals:
                try:
                    totals[k] += int(suite.get(k, 0))
                except ValueError:
                    # Ignore non-integer counters on a testsuite element.
                    pass

        for k, v in totals.items():
            root.set(k, str(v))

        # Resolve the path relative to the original working directory.
        xmlpath = os.path.join(os_helper.SAVEDCWD, self.ns.xmlpath)
        with open(xmlpath, 'wb') as f:
            for s in ET.tostringlist(root):
                f.write(s)
|
|
|
|
    def fix_umask(self):
        """Replace Emscripten's broken default umask; keep custom ones."""
        if support.is_emscripten:
            # Emscripten has default umask 0o777, which breaks some tests.
            # see https://github.com/emscripten-core/emscripten/issues/17269
            # os.umask(0) reads the current umask (and temporarily clears it).
            old_mask = os.umask(0)
            if old_mask == 0o777:
                # The broken default: use a sane umask instead.
                os.umask(0o027)
            else:
                # The user set a custom umask: restore it.
                os.umask(old_mask)
|
|
|
|
def set_temp_dir(self):
|
|
if self.ns.tempdir:
|
|
self.tmp_dir = self.ns.tempdir
|
|
|
|
if not self.tmp_dir:
|
|
# When tests are run from the Python build directory, it is best practice
|
|
# to keep the test files in a subfolder. This eases the cleanup of leftover
|
|
# files using the "make distclean" command.
|
|
if sysconfig.is_python_build():
|
|
self.tmp_dir = sysconfig.get_config_var('abs_builddir')
|
|
if self.tmp_dir is None:
|
|
# bpo-30284: On Windows, only srcdir is available. Using
|
|
# abs_builddir mostly matters on UNIX when building Python
|
|
# out of the source tree, especially when the source tree
|
|
# is read only.
|
|
self.tmp_dir = sysconfig.get_config_var('srcdir')
|
|
self.tmp_dir = os.path.join(self.tmp_dir, 'build')
|
|
else:
|
|
self.tmp_dir = tempfile.gettempdir()
|
|
|
|
self.tmp_dir = os.path.abspath(self.tmp_dir)
|
|
|
|
def is_worker(self):
|
|
return (self.ns.worker_args is not None)
|
|
|
|
    def create_temp_dir(self):
        """Create self.tmp_dir and return a per-process working dir inside it."""
        os.makedirs(self.tmp_dir, exist_ok=True)

        # Define a writable temp dir that will be used as cwd while running
        # the tests. The name of the dir includes the pid to allow parallel
        # testing (see the -j option).
        # Emscripten and WASI have stubbed getpid(), Emscripten has only
        # millisecond clock resolution. Use randint() instead.
        if sys.platform in {"emscripten", "wasi"}:
            nounce = random.randint(0, 1_000_000)
        else:
            nounce = os.getpid()

        # Workers get a distinct prefix so the parent's cleanup can tell
        # them apart.
        if self.is_worker():
            test_cwd = 'test_python_worker_{}'.format(nounce)
        else:
            test_cwd = 'test_python_{}'.format(nounce)
        # Append a non-ASCII character to exercise non-ASCII path handling.
        test_cwd += os_helper.FS_NONASCII
        test_cwd = os.path.join(self.tmp_dir, test_cwd)
        return test_cwd
|
|
|
|
    def cleanup(self):
        """Implement --cleanup: remove leftover test_python_* entries."""
        import glob

        # Only match the entries created by create_temp_dir(); escape
        # tmp_dir in case it contains glob metacharacters.
        path = os.path.join(glob.escape(self.tmp_dir), 'test_python_*')
        print("Cleanup %s directory" % self.tmp_dir)
        for name in glob.glob(path):
            if os.path.isdir(name):
                print("Remove directory: %s" % name)
                os_helper.rmtree(name)
            else:
                print("Remove file: %s" % name)
                os_helper.unlink(name)
|
|
|
|
    def main(self, tests=None, **kwargs):
        """Main entry point: parse options, set up directories, run _main().

        tests -- optional list of test names
        kwargs -- default option values forwarded to parse_args()
        Always exits via sys.exit().
        """
        self.parse_args(kwargs)

        self.set_temp_dir()

        self.fix_umask()

        if self.ns.cleanup:
            # --cleanup: remove leftover files and exit immediately.
            self.cleanup()
            sys.exit(0)

        test_cwd = self.create_temp_dir()

        try:
            # Run the tests in a context manager that temporarily changes the CWD
            # to a temporary and writable directory. If it's not possible to
            # create or change the CWD, the original CWD will be used.
            # The original CWD is available from os_helper.SAVEDCWD.
            with os_helper.temp_cwd(test_cwd, quiet=True):
                # When using multiprocessing, worker processes will use test_cwd
                # as their parent temporary directory. So when the main process
                # exit, it removes also subdirectories of worker processes.
                self.ns.tempdir = test_cwd

                self._main(tests, kwargs)
        except SystemExit as exc:
            # bpo-38203: Python can hang at exit in Py_Finalize(), especially
            # on threading._shutdown() call: put a timeout
            if threading_helper.can_start_thread:
                faulthandler.dump_traceback_later(EXIT_TIMEOUT, exit=True)

            sys.exit(exc.code)
|
|
|
|
def getloadavg(self):
|
|
if self.win_load_tracker is not None:
|
|
return self.win_load_tracker.getloadavg()
|
|
|
|
if hasattr(os, 'getloadavg'):
|
|
return os.getloadavg()[0]
|
|
|
|
return None
|
|
|
|
def get_exitcode(self):
|
|
exitcode = 0
|
|
if self.bad:
|
|
exitcode = EXITCODE_BAD_TEST
|
|
elif self.interrupted:
|
|
exitcode = EXITCODE_INTERRUPTED
|
|
elif self.ns.fail_env_changed and self.environment_changed:
|
|
exitcode = EXITCODE_ENV_CHANGED
|
|
elif self.no_tests_run():
|
|
exitcode = EXITCODE_NO_TESTS_RAN
|
|
elif self.rerun and self.ns.fail_rerun:
|
|
exitcode = EXITCODE_RERUN_FAIL
|
|
return exitcode
|
|
|
|
    def action_run_tests(self):
        """Run the tests, optionally re-run failures, then report and finalize."""
        self.run_tests()
        self.display_result()

        need_rerun = self.need_rerun
        if self.ns.rerun and need_rerun:
            # --rerun: give failed tests a second chance in verbose mode.
            self.rerun_failed_tests(need_rerun)

        self.display_summary()
        self.finalize()
|
|
|
|
    def _main(self, tests, kwargs):
        """Dispatch to worker mode, test listing, or a regular test run."""
        if self.is_worker():
            # This process was spawned as a multiprocessing worker: run the
            # tests it was assigned and return to the caller (main()).
            from test.libregrtest.runtest_mp import run_tests_worker
            run_tests_worker(self.ns.worker_args)
            return

        if self.ns.wait:
            # --wait: let the user attach a debugger before the run starts.
            input("Press any key to continue...")

        setup_tests(self.ns)
        self.find_tests(tests)

        exitcode = 0
        if self.ns.list_tests:
            self.list_tests()
        elif self.ns.list_cases:
            self.list_cases()
        else:
            self.action_run_tests()
            exitcode = self.get_exitcode()

        sys.exit(exitcode)
|
|
|
|
|
|
def main(tests=None, **kwargs):
    """Run the Python suite.

    tests -- optional list of test names; kwargs are forwarded to
    Regrtest.main() as default option values.
    """
    Regrtest().main(tests=tests, **kwargs)
|