gh-109413: Add more type hints to `libregrtest` (#126352)

commit bfc1d2504c (parent fe5a6ab7be)
sobolevn committed 2024-11-04 13:15:57 +03:00 via GitHub
GPG Key ID: B5690EEEBB952194
12 changed files with 71 additions and 63 deletions

Lib/test/libregrtest/findtests.py

@@ -1,6 +1,7 @@
 import os
 import sys
 import unittest
+from collections.abc import Container

 from test import support
@@ -34,7 +35,7 @@ def findtestdir(path: StrPath | None = None) -> StrPath:
     return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir


-def findtests(*, testdir: StrPath | None = None, exclude=(),
+def findtests(*, testdir: StrPath | None = None, exclude: Container[str] = (),
               split_test_dirs: set[TestName] = SPLITTESTDIRS,
               base_mod: str = "") -> TestList:
     """Return a list of all applicable test modules."""
@@ -60,8 +61,9 @@ def findtests(*, testdir: StrPath | None = None, exclude=(),
     return sorted(tests)


-def split_test_packages(tests, *, testdir: StrPath | None = None, exclude=(),
-                        split_test_dirs=SPLITTESTDIRS):
+def split_test_packages(tests, *, testdir: StrPath | None = None,
+                        exclude: Container[str] = (),
+                        split_test_dirs=SPLITTESTDIRS) -> list[TestName]:
     testdir = findtestdir(testdir)
     splitted = []
     for name in tests:
@@ -75,9 +77,9 @@ def split_test_packages(tests, *, testdir: StrPath | None = None, exclude=(),
     return splitted


-def _list_cases(suite):
+def _list_cases(suite: unittest.TestSuite) -> None:
     for test in suite:
-        if isinstance(test, unittest.loader._FailedTest):
+        if isinstance(test, unittest.loader._FailedTest):  # type: ignore[attr-defined]
             continue
         if isinstance(test, unittest.TestSuite):
             _list_cases(test)
@@ -87,7 +89,7 @@ def _list_cases(suite):
 def list_cases(tests: TestTuple, *,
                match_tests: TestFilter | None = None,
-               test_dir: StrPath | None = None):
+               test_dir: StrPath | None = None) -> None:
     support.verbose = False
     set_match_tests(match_tests)
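
A note on the new `exclude` annotation: `Container[str]` promises only membership testing (`in`), which is all `findtests()` does with the argument, so tuples, sets, frozensets, or dict keys all type-check as callers. A minimal sketch of the same pattern (function and names are illustrative, not from the commit):

    from collections.abc import Container

    def select(names: list[str], exclude: Container[str] = ()) -> list[str]:
        # Container[str] only guarantees __contains__, so the body may use
        # "in"/"not in" but not iteration or len(); the tuple default is fine.
        return [name for name in names if name not in exclude]

    print(select(["test_os", "test_sys"], exclude={"test_sys"}))  # ['test_os']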

Lib/test/libregrtest/main.py

@@ -6,6 +6,7 @@ import sys
 import sysconfig
 import time
 import trace
+from typing import NoReturn

 from test.support import os_helper, MS_WINDOWS, flush_std_streams
@@ -154,7 +155,7 @@ class Regrtest:
         self.next_single_test: TestName | None = None
         self.next_single_filename: StrPath | None = None

-    def log(self, line=''):
+    def log(self, line: str = '') -> None:
         self.logger.log(line)

     def find_tests(self, tests: TestList | None = None) -> tuple[TestTuple, TestList | None]:
@@ -230,11 +231,11 @@ class Regrtest:
         return (tuple(selected), tests)

     @staticmethod
-    def list_tests(tests: TestTuple):
+    def list_tests(tests: TestTuple) -> None:
         for name in tests:
             print(name)

-    def _rerun_failed_tests(self, runtests: RunTests):
+    def _rerun_failed_tests(self, runtests: RunTests) -> RunTests:
         # Configure the runner to re-run tests
         if self.num_workers == 0 and not self.single_process:
             # Always run tests in fresh processes to have more deterministic
@@ -266,7 +267,7 @@ class Regrtest:
         self.run_tests_sequentially(runtests)
         return runtests

-    def rerun_failed_tests(self, runtests: RunTests):
+    def rerun_failed_tests(self, runtests: RunTests) -> None:
         if self.python_cmd:
             # Temp patch for https://github.com/python/cpython/issues/94052
             self.log(
@@ -335,7 +336,7 @@ class Regrtest:
             if not self._run_bisect(runtests, name, progress):
                 return

-    def display_result(self, runtests):
+    def display_result(self, runtests: RunTests) -> None:
         # If running the test suite for PGO then no one cares about results.
         if runtests.pgo:
             return
@@ -365,7 +366,7 @@ class Regrtest:
         return result

-    def run_tests_sequentially(self, runtests) -> None:
+    def run_tests_sequentially(self, runtests: RunTests) -> None:
         if self.coverage:
             tracer = trace.Trace(trace=False, count=True)
         else:
@@ -422,7 +423,7 @@ class Regrtest:
         if previous_test:
             print(previous_test)

-    def get_state(self):
+    def get_state(self) -> str:
         state = self.results.get_state(self.fail_env_changed)
         if self.first_state:
             state = f'{self.first_state} then {state}'
@@ -452,7 +453,7 @@ class Regrtest:
         if self.junit_filename:
             self.results.write_junit(self.junit_filename)

-    def display_summary(self):
+    def display_summary(self) -> None:
         duration = time.perf_counter() - self.logger.start_time
         filtered = bool(self.match_tests)
@@ -466,7 +467,7 @@ class Regrtest:
         state = self.get_state()
         print(f"Result: {state}")

-    def create_run_tests(self, tests: TestTuple):
+    def create_run_tests(self, tests: TestTuple) -> RunTests:
         return RunTests(
             tests,
             fail_fast=self.fail_fast,
@@ -674,9 +675,9 @@ class Regrtest:
                 f"Command: {cmd_text}")
            # continue executing main()

-    def _add_python_opts(self):
-        python_opts = []
-        regrtest_opts = []
+    def _add_python_opts(self) -> None:
+        python_opts: list[str] = []
+        regrtest_opts: list[str] = []

         environ, keep_environ = self._add_cross_compile_opts(regrtest_opts)
         if self.ci_mode:
@@ -709,7 +710,7 @@ class Regrtest:
             self.tmp_dir = get_temp_dir(self.tmp_dir)

-    def main(self, tests: TestList | None = None):
+    def main(self, tests: TestList | None = None) -> NoReturn:
         if self.want_add_python_opts:
             self._add_python_opts()
@@ -738,7 +739,7 @@ class Regrtest:
         sys.exit(exitcode)


-def main(tests=None, _add_python_opts=False, **kwargs):
+def main(tests=None, _add_python_opts=False, **kwargs) -> NoReturn:
     """Run the Python suite."""
     ns = _parse_args(sys.argv[1:], **kwargs)
     Regrtest(ns, _add_python_opts=_add_python_opts).main(tests=tests)
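
Both `Regrtest.main()` and the module-level `main()` always end in `sys.exit()`, which is what the new `NoReturn` annotations encode. A hedged sketch of what the checker gains from this (illustrative names; `mypy --warn-unreachable` assumed):

    import sys
    from typing import NoReturn

    def run() -> NoReturn:
        # Every path ends in sys.exit(), which raises SystemExit,
        # so this function never returns normally.
        sys.exit(0)

    def caller() -> None:
        run()
        print("never reached")  # mypy --warn-unreachable reports this line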

Lib/test/libregrtest/pgo.py

@@ -50,7 +50,7 @@ PGO_TESTS = [
     'test_xml_etree_c',
 ]

-def setup_pgo_tests(cmdline_args, pgo_extended: bool):
+def setup_pgo_tests(cmdline_args, pgo_extended: bool) -> None:
     if not cmdline_args and not pgo_extended:
         # run default set of tests for PGO training
         cmdline_args[:] = PGO_TESTS[:]
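
`setup_pgo_tests()` mutates `cmdline_args` in place through slice assignment and returns nothing, which is exactly what the added `-> None` records: mypy will then reject any caller that tries to use its result. The pattern in isolation (names are illustrative, not from the commit):

    def fill_defaults(args: list[str], defaults: list[str]) -> None:
        # Slice assignment replaces the contents of the caller's list
        # object in place, so no return value is needed.
        if not args:
            args[:] = defaults[:]

    tests: list[str] = []
    fill_defaults(tests, ["test_os", "test_re"])
    print(tests)  # ['test_os', 'test_re']
    # selected = fill_defaults(tests, [])  # mypy: function does not return a value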

Lib/test/libregrtest/refleak.py

@@ -262,7 +262,7 @@ def dash_R_cleanup(fs, ps, pic, zdc, abcs):
     sys._clear_internal_caches()


-def warm_caches():
+def warm_caches() -> None:
     # char cache
     s = bytes(range(256))
     for i in range(256):

Lib/test/libregrtest/result.py

@@ -149,6 +149,7 @@ class TestResult:
             case State.DID_NOT_RUN:
                 return f"{self.test_name} ran no tests"
             case State.TIMEOUT:
+                assert self.duration is not None, "self.duration is None"
                 return f"{self.test_name} timed out ({format_duration(self.duration)})"
             case _:
                 raise ValueError("unknown result state: {state!r}")
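
The new assert does double duty: at runtime it checks the invariant that a timed-out test has a recorded duration, and for mypy it narrows `self.duration` from `float | None` to `float` before the `format_duration()` call. The same idiom in isolation (assuming a `float | None` field, as in `TestResult`):

    def describe_timeout(test_name: str, duration: float | None) -> str:
        # Without the assert, mypy reports the "float | None" argument as
        # incompatible with a parameter annotated as plain "float".
        assert duration is not None, "duration is None"
        return f"{test_name} timed out ({duration:.1f} sec)"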

Lib/test/libregrtest/results.py

@@ -71,7 +71,7 @@ class TestResults:
         return ', '.join(state)

-    def get_exitcode(self, fail_env_changed, fail_rerun):
+    def get_exitcode(self, fail_env_changed: bool, fail_rerun: bool) -> int:
         exitcode = 0
         if self.bad:
             exitcode = EXITCODE_BAD_TEST
@@ -87,7 +87,7 @@ class TestResults:
             exitcode = EXITCODE_BAD_TEST
         return exitcode

-    def accumulate_result(self, result: TestResult, runtests: RunTests):
+    def accumulate_result(self, result: TestResult, runtests: RunTests) -> None:
         test_name = result.test_name
         rerun = runtests.rerun
         fail_env_changed = runtests.fail_env_changed
@@ -135,7 +135,7 @@ class TestResults:
         counts = {loc: 1 for loc in self.covered_lines}
         return trace.CoverageResults(counts=counts)

-    def need_rerun(self):
+    def need_rerun(self) -> bool:
         return bool(self.rerun_results)

     def prepare_rerun(self, *, clear: bool = True) -> tuple[TestTuple, FilterDict]:
@@ -158,7 +158,7 @@ class TestResults:
         return (tuple(tests), match_tests_dict)

-    def add_junit(self, xml_data: list[str]):
+    def add_junit(self, xml_data: list[str]) -> None:
         import xml.etree.ElementTree as ET
         for e in xml_data:
             try:
@@ -167,7 +167,7 @@ class TestResults:
                 print(xml_data, file=sys.__stderr__)
                 raise

-    def write_junit(self, filename: StrPath):
+    def write_junit(self, filename: StrPath) -> None:
         if not self.testsuite_xml:
             # Don't create empty XML file
             return
@@ -192,7 +192,7 @@ class TestResults:
             for s in ET.tostringlist(root):
                 f.write(s)

-    def display_result(self, tests: TestTuple, quiet: bool, print_slowest: bool):
+    def display_result(self, tests: TestTuple, quiet: bool, print_slowest: bool) -> None:
         if print_slowest:
             self.test_times.sort(reverse=True)
             print()
@@ -234,7 +234,7 @@ class TestResults:
             print()
             print("Test suite interrupted by signal SIGINT.")

-    def display_summary(self, first_runtests: RunTests, filtered: bool):
+    def display_summary(self, first_runtests: RunTests, filtered: bool) -> None:
         # Total tests
         stats = self.stats
         text = f'run={stats.tests_run:,}'

Lib/test/libregrtest/runtests.py

@@ -5,12 +5,12 @@ import os
 import shlex
 import subprocess
 import sys
-from typing import Any
+from typing import Any, Iterator

 from test import support

 from .utils import (
-    StrPath, StrJSON, TestTuple, TestFilter, FilterTuple, FilterDict)
+    StrPath, StrJSON, TestTuple, TestName, TestFilter, FilterTuple, FilterDict)


 class JsonFileType:
@@ -41,8 +41,8 @@ class JsonFile:
             popen_kwargs['startupinfo'] = startupinfo

     @contextlib.contextmanager
-    def inherit_subprocess(self):
-        if self.file_type == JsonFileType.WINDOWS_HANDLE:
+    def inherit_subprocess(self) -> Iterator[None]:
+        if sys.platform == 'win32' and self.file_type == JsonFileType.WINDOWS_HANDLE:
             os.set_handle_inheritable(self.file, True)
             try:
                 yield
@@ -106,25 +106,25 @@ class RunTests:
         state.update(override)
         return RunTests(**state)

-    def create_worker_runtests(self, **override):
+    def create_worker_runtests(self, **override) -> WorkerRunTests:
         state = dataclasses.asdict(self)
         state.update(override)
         return WorkerRunTests(**state)

-    def get_match_tests(self, test_name) -> FilterTuple | None:
+    def get_match_tests(self, test_name: TestName) -> FilterTuple | None:
         if self.match_tests_dict is not None:
             return self.match_tests_dict.get(test_name, None)
         else:
             return None

-    def get_jobs(self):
+    def get_jobs(self) -> int | None:
         # Number of run_single_test() calls needed to run all tests.
         # None means that there is not bound limit (--forever option).
         if self.forever:
             return None
         return len(self.tests)

-    def iter_tests(self):
+    def iter_tests(self) -> Iterator[TestName]:
         if self.forever:
             while True:
                 yield from self.tests
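
Two idioms worth noting in `inherit_subprocess()`: a `@contextlib.contextmanager` generator is annotated with the type it yields (`Iterator[None]` here, since it yields nothing), and the added `sys.platform == 'win32'` comparison is a literal check that mypy uses to narrow the platform, so the Windows-only `os.set_handle_inheritable()` type-checks on other platforms too. A standalone sketch under those assumptions (names are illustrative):

    import contextlib
    import os
    import sys
    from typing import Iterator

    @contextlib.contextmanager
    def inheritable(handle: int) -> Iterator[None]:
        # Annotated with what the generator yields, not with a manager type.
        if sys.platform == 'win32':
            # mypy only analyzes this branch when checking the win32 platform.
            os.set_handle_inheritable(handle, True)
        try:
            yield
        finally:
            if sys.platform == 'win32':
                os.set_handle_inheritable(handle, False)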

Lib/test/libregrtest/setup.py

@@ -25,9 +25,10 @@ def setup_test_dir(testdir: str | None) -> None:
         sys.path.insert(0, os.path.abspath(testdir))


-def setup_process():
+def setup_process() -> None:
     fix_umask()

+    assert sys.__stderr__ is not None, "sys.__stderr__ is None"
     try:
         stderr_fd = sys.__stderr__.fileno()
     except (ValueError, AttributeError):
@@ -35,7 +36,7 @@ def setup_process():
         # and ValueError on a closed stream.
         #
         # Catch AttributeError for stderr being None.
-        stderr_fd = None
+        pass
     else:
         # Display the Python traceback on fatal errors (e.g. segfault)
         faulthandler.enable(all_threads=True, file=stderr_fd)
@@ -68,7 +69,7 @@ def setup_process():
         for index, path in enumerate(module.__path__):
             module.__path__[index] = os.path.abspath(path)
         if getattr(module, '__file__', None):
-            module.__file__ = os.path.abspath(module.__file__)
+            module.__file__ = os.path.abspath(module.__file__)  # type: ignore[type-var]

     if hasattr(sys, 'addaudithook'):
         # Add an auditing hook for all tests to ensure PySys_Audit is tested
@@ -87,7 +88,7 @@ def setup_process():
         os.environ.setdefault(UNICODE_GUARD_ENV, FS_NONASCII)


-def setup_tests(runtests: RunTests):
+def setup_tests(runtests: RunTests) -> None:
     support.verbose = runtests.verbose
     support.failfast = runtests.fail_fast
     support.PGO = runtests.pgo
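
On the targeted `# type: ignore[type-var]` above: typeshed annotates `ModuleType.__file__` as `str | None`, and mypy cannot narrow that from the `getattr(module, '__file__', None)` truthiness guard, so passing the attribute straight to `os.path.abspath()` is rejected without the ignore. An explicit `is not None` check is the narrowing-friendly alternative, sketched here (illustrative, not what the commit does):

    import os
    import types

    def absolutize(module: types.ModuleType) -> None:
        # The explicit None test narrows "str | None" to "str",
        # so no ignore comment is needed on the assignment.
        if module.__file__ is not None:
            module.__file__ = os.path.abspath(module.__file__)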

Lib/test/libregrtest/tsan.py

@@ -28,6 +28,6 @@ TSAN_TESTS = [
 ]


-def setup_tsan_tests(cmdline_args):
+def setup_tsan_tests(cmdline_args) -> None:
     if not cmdline_args:
         cmdline_args[:] = TSAN_TESTS[:]

Lib/test/libregrtest/utils.py

@@ -58,7 +58,7 @@ FilterTuple = tuple[TestName, ...]
 FilterDict = dict[TestName, FilterTuple]


-def format_duration(seconds):
+def format_duration(seconds: float) -> str:
     ms = math.ceil(seconds * 1e3)
     seconds, ms = divmod(ms, 1000)
     minutes, seconds = divmod(seconds, 60)
@@ -92,7 +92,7 @@ def strip_py_suffix(names: list[str] | None) -> None:
             names[idx] = basename


-def plural(n, singular, plural=None):
+def plural(n: int, singular: str, plural: str | None = None) -> str:
     if n == 1:
         return singular
     elif plural is not None:
@@ -101,7 +101,7 @@ def plural(n, singular, plural=None):
         return singular + 's'


-def count(n, word):
+def count(n: int, word: str) -> str:
     if n == 1:
         return f"{n} {word}"
     else:
@@ -123,14 +123,14 @@ def printlist(x, width=70, indent=4, file=None):
           file=file)


-def print_warning(msg):
+def print_warning(msg: str) -> None:
     support.print_warning(msg)


-orig_unraisablehook = None
+orig_unraisablehook: Callable[..., None] | None = None


-def regrtest_unraisable_hook(unraisable):
+def regrtest_unraisable_hook(unraisable) -> None:
     global orig_unraisablehook
     support.environment_altered = True
     support.print_warning("Unraisable exception")
@@ -138,22 +138,23 @@ def regrtest_unraisable_hook(unraisable):
     try:
         support.flush_std_streams()
         sys.stderr = support.print_warning.orig_stderr
+        assert orig_unraisablehook is not None, "orig_unraisablehook not set"
         orig_unraisablehook(unraisable)
         sys.stderr.flush()
     finally:
         sys.stderr = old_stderr


-def setup_unraisable_hook():
+def setup_unraisable_hook() -> None:
     global orig_unraisablehook
     orig_unraisablehook = sys.unraisablehook
     sys.unraisablehook = regrtest_unraisable_hook


-orig_threading_excepthook = None
+orig_threading_excepthook: Callable[..., None] | None = None


-def regrtest_threading_excepthook(args):
+def regrtest_threading_excepthook(args) -> None:
     global orig_threading_excepthook
     support.environment_altered = True
     support.print_warning(f"Uncaught thread exception: {args.exc_type.__name__}")
@@ -161,13 +162,14 @@ def regrtest_threading_excepthook(args):
     try:
         support.flush_std_streams()
         sys.stderr = support.print_warning.orig_stderr
+        assert orig_threading_excepthook is not None, "orig_threading_excepthook not set"
         orig_threading_excepthook(args)
         sys.stderr.flush()
     finally:
         sys.stderr = old_stderr


-def setup_threading_excepthook():
+def setup_threading_excepthook() -> None:
     global orig_threading_excepthook
     import threading
     orig_threading_excepthook = threading.excepthook
@@ -476,7 +478,7 @@ def get_temp_dir(tmp_dir: StrPath | None = None) -> StrPath:
     return os.path.abspath(tmp_dir)


-def fix_umask():
+def fix_umask() -> None:
     if support.is_emscripten:
         # Emscripten has default umask 0o777, which breaks some tests.
         # see https://github.com/emscripten-core/emscripten/issues/17269
@@ -572,7 +574,8 @@ _TEST_LIFECYCLE_HOOKS = frozenset((
     'setUpModule', 'tearDownModule',
 ))

-def normalize_test_name(test_full_name, *, is_error=False):
+def normalize_test_name(test_full_name: str, *,
+                        is_error: bool = False) -> str | None:
     short_name = test_full_name.split(" ")[0]
     if is_error and short_name in _TEST_LIFECYCLE_HOOKS:
         if test_full_name.startswith(('setUpModule (', 'tearDownModule (')):
@@ -593,7 +596,7 @@ def normalize_test_name(test_full_name, *, is_error=False):
     return short_name


-def adjust_rlimit_nofile():
+def adjust_rlimit_nofile() -> None:
     """
     On macOS the default fd limit (RLIMIT_NOFILE) is sometimes too low (256)
     for our test suite to succeed. Raise it to something more reasonable. 1024
@@ -619,17 +622,17 @@ def adjust_rlimit_nofile():
                           f"{new_fd_limit}: {err}.")


-def get_host_runner():
+def get_host_runner() -> str:
     if (hostrunner := os.environ.get("_PYTHON_HOSTRUNNER")) is None:
         hostrunner = sysconfig.get_config_var("HOSTRUNNER")
     return hostrunner


-def is_cross_compiled():
+def is_cross_compiled() -> bool:
     return ('_PYTHON_HOST_PLATFORM' in os.environ)


-def format_resources(use_resources: Iterable[str]):
+def format_resources(use_resources: Iterable[str]) -> str:
     use_resources = set(use_resources)
     all_resources = set(ALL_RESOURCES)
@@ -654,7 +657,7 @@ def format_resources(use_resources: Iterable[str]):
 def display_header(use_resources: tuple[str, ...],
-                   python_cmd: tuple[str, ...] | None):
+                   python_cmd: tuple[str, ...] | None) -> None:
     # Print basic platform information
     print("==", platform.python_implementation(), *sys.version.split())
     print("==", platform.platform(aliased=True),
@@ -732,7 +735,7 @@ def display_header(use_resources: tuple[str, ...],
     print(flush=True)


-def cleanup_temp_dir(tmp_dir: StrPath):
+def cleanup_temp_dir(tmp_dir: StrPath) -> None:
     import glob

     path = os.path.join(glob.escape(tmp_dir), TMP_PREFIX + '*')
@@ -763,5 +766,5 @@ def _sanitize_xml_replace(regs):
     return ''.join(f'\\x{ord(ch):02x}' if ch <= '\xff' else ascii(ch)[1:-1]
                    for ch in text)

-def sanitize_xml(text):
+def sanitize_xml(text: str) -> str:
     return ILLEGAL_XML_CHARS_RE.sub(_sanitize_xml_replace, text)
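
The two hook globals above start as `None` and only receive their real value in the `setup_*` functions. Without the `Callable[..., None] | None` annotation, mypy would infer the type `None` from the initializer and reject the later assignments; the added asserts then narrow away the `None` arm before each call. The pattern in miniature (illustrative names, a minimal sketch):

    import sys
    from collections.abc import Callable

    _orig_hook: Callable[..., None] | None = None

    def install() -> None:
        global _orig_hook
        # This assignment would be rejected if _orig_hook were inferred
        # as plain "None" from its initializer.
        _orig_hook = sys.unraisablehook

    def delegate(unraisable: object) -> None:
        # Narrow "Callable[..., None] | None" before calling.
        assert _orig_hook is not None, "_orig_hook not set"
        _orig_hook(unraisable)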

Lib/test/libregrtest/worker.py

@@ -104,7 +104,7 @@ def worker_process(worker_json: StrJSON) -> NoReturn:
     sys.exit(0)


-def main():
+def main() -> NoReturn:
     if len(sys.argv) != 2:
         print("usage: python -m test.libregrtest.worker JSON")
         sys.exit(1)

Tools/requirements-dev.txt

@@ -1,6 +1,6 @@
 # Requirements file for external linters and checks we run on
 # Tools/clinic, Tools/cases_generator/, and Tools/peg_generator/ in CI
-mypy==1.12
+mypy==1.13

 # needed for peg_generator:
 types-psutil==6.0.0.20240901