gh-109162: Refactor libregrtest WorkerJob (#109171)

* Rename --worker-args command line option to --worker-json.
* Rename _parse_worker_args() to _parse_worker_json().
* WorkerJob:

  * Add runtests attribute
  * Remove test_name and rerun attribute

* Rename run_test_in_subprocess() to create_worker_process().
* Rename run_tests_worker() to worker_process().
* create_worker_process() uses json.dump(): write directly JSON to
  stdout.
* Convert MultiprocessResult to a frozen dataclass.
* Rename RunTests.match_tests to RunTests.match_tests_dict.
This commit is contained in:
Victor Stinner 2023-09-09 03:03:39 +02:00 committed by GitHub
parent 489ca0acf0
commit a56c928756
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 48 additions and 41 deletions

View File

@ -170,6 +170,7 @@ class Namespace(argparse.Namespace):
self.ignore_tests = None
self.pgo = False
self.pgo_extended = False
self.worker_json = None
super().__init__(**kwargs)
@ -205,7 +206,7 @@ def _create_parser():
group.add_argument('--wait', action='store_true',
help='wait for user input, e.g., allow a debugger '
'to be attached')
group.add_argument('--worker-args', metavar='ARGS')
group.add_argument('--worker-json', metavar='ARGS')
group.add_argument('-S', '--start', metavar='START',
help='the name of the test at which to start.' +
more_details)

View File

@ -336,7 +336,7 @@ class Regrtest:
# Get tests to re-run
tests = [result.test_name for result in need_rerun]
match_tests = self.get_rerun_match(need_rerun)
match_tests_dict = self.get_rerun_match(need_rerun)
# Clear previously failed tests
self.rerun_bad.extend(self.bad)
@ -346,7 +346,7 @@ class Regrtest:
# Re-run failed tests
self.log(f"Re-running {len(tests)} failed tests in verbose mode in subprocesses")
runtests = runtests.copy(tests=tuple(tests),
match_tests=match_tests,
match_tests_dict=match_tests_dict,
rerun=True,
forever=False)
self.set_tests(runtests)
@ -742,7 +742,7 @@ class Regrtest:
self.tmp_dir = os.path.abspath(self.tmp_dir)
def is_worker(self):
return (self.ns.worker_args is not None)
return (self.ns.worker_json is not None)
def create_temp_dir(self):
os.makedirs(self.tmp_dir, exist_ok=True)
@ -870,8 +870,8 @@ class Regrtest:
def _main(self):
if self.is_worker():
from test.libregrtest.runtest_mp import run_tests_worker
run_tests_worker(self.ns.worker_args)
from test.libregrtest.runtest_mp import worker_process
worker_process(self.ns.worker_json)
return
if self.want_wait:

View File

@ -208,7 +208,7 @@ class TestResult:
@dataclasses.dataclass(slots=True, frozen=True)
class RunTests:
tests: TestTuple
match_tests: FilterDict | None = None
match_tests_dict: FilterDict | None = None
rerun: bool = False
forever: bool = False
@ -218,8 +218,8 @@ class RunTests:
return RunTests(**state)
def get_match_tests(self, test_name) -> FilterTuple | None:
if self.match_tests is not None:
return self.match_tests.get(test_name, None)
if self.match_tests_dict is not None:
return self.match_tests_dict.get(test_name, None)
else:
return None

View File

@ -46,9 +46,8 @@ USE_PROCESS_GROUP = (hasattr(os, "setsid") and hasattr(os, "killpg"))
@dataclasses.dataclass(slots=True)
class WorkerJob:
test_name: str
runtests: RunTests
namespace: Namespace
rerun: bool = False
match_tests: FilterTuple | None = None
@ -70,6 +69,7 @@ class _EncodeWorkerJob(json.JSONEncoder):
def _decode_worker_job(d: dict[str, Any]) -> WorkerJob | dict[str, Any]:
if "__worker_job__" in d:
d.pop('__worker_job__')
d['runtests'] = RunTests(**d['runtests'])
return WorkerJob(**d)
if "__namespace__" in d:
d.pop('__namespace__')
@ -78,17 +78,16 @@ def _decode_worker_job(d: dict[str, Any]) -> WorkerJob | dict[str, Any]:
return d
def _parse_worker_args(worker_json: str) -> tuple[Namespace, str]:
return json.loads(worker_json,
object_hook=_decode_worker_job)
def _parse_worker_json(worker_json: str) -> tuple[Namespace, str]:
return json.loads(worker_json, object_hook=_decode_worker_job)
def run_test_in_subprocess(worker_job: WorkerJob,
output_file: TextIO,
tmp_dir: str | None = None) -> subprocess.Popen:
def create_worker_process(worker_job: WorkerJob,
output_file: TextIO,
tmp_dir: str | None = None) -> subprocess.Popen:
ns = worker_job.namespace
python = ns.python
worker_args = json.dumps(worker_job, cls=_EncodeWorkerJob)
worker_json = json.dumps(worker_job, cls=_EncodeWorkerJob)
if python is not None:
executable = python
@ -97,7 +96,7 @@ def run_test_in_subprocess(worker_job: WorkerJob,
cmd = [*executable, *support.args_from_interpreter_flags(),
'-u', # Unbuffered stdout and stderr
'-m', 'test.regrtest',
'--worker-args', worker_args]
'--worker-json', worker_json]
env = dict(os.environ)
if tmp_dir is not None:
@ -122,16 +121,16 @@ def run_test_in_subprocess(worker_job: WorkerJob,
return subprocess.Popen(cmd, **kw)
def run_tests_worker(worker_json: str) -> NoReturn:
worker_job = _parse_worker_args(worker_json)
def worker_process(worker_json: str) -> NoReturn:
worker_job = _parse_worker_json(worker_json)
runtests = worker_job.runtests
ns = worker_job.namespace
test_name = worker_job.test_name
rerun = worker_job.rerun
match_tests = worker_job.match_tests
test_name = runtests.tests[0]
match_tests: FilterTuple | None = worker_job.match_tests
setup_tests(ns)
if rerun:
if runtests.rerun:
if match_tests:
matching = "matching: " + ", ".join(match_tests)
print(f"Re-running {test_name} in verbose mode ({matching})", flush=True)
@ -139,14 +138,15 @@ def run_tests_worker(worker_json: str) -> NoReturn:
print(f"Re-running {test_name} in verbose mode", flush=True)
ns.verbose = True
if match_tests is not None:
ns.match_tests = match_tests
if match_tests is not None:
ns.match_tests = match_tests
result = runtest(ns, test_name)
print() # Force a newline (just in case)
# Serialize TestResult as dict in JSON
print(json.dumps(result, cls=EncodeTestResult), flush=True)
json.dump(result, sys.stdout, cls=EncodeTestResult)
sys.stdout.flush()
sys.exit(0)
@ -173,7 +173,8 @@ class MultiprocessIterator:
self.tests_iter = None
class MultiprocessResult(NamedTuple):
@dataclasses.dataclass(slots=True, frozen=True)
class MultiprocessResult:
result: TestResult
# bpo-45410: stderr is written into stdout to keep messages order
worker_stdout: str | None = None
@ -198,7 +199,6 @@ class TestWorkerProcess(threading.Thread):
self.ns = runner.ns
self.timeout = runner.worker_timeout
self.regrtest = runner.regrtest
self.rerun = runner.rerun
self.current_test_name = None
self.start_time = None
self._popen = None
@ -264,9 +264,8 @@ class TestWorkerProcess(threading.Thread):
def _run_process(self, worker_job, output_file: TextIO,
tmp_dir: str | None = None) -> int:
self.current_test_name = worker_job.test_name
try:
popen = run_test_in_subprocess(worker_job, output_file, tmp_dir)
popen = create_worker_process(worker_job, output_file, tmp_dir)
self._killed = False
self._popen = popen
@ -316,6 +315,8 @@ class TestWorkerProcess(threading.Thread):
self.current_test_name = None
def _runtest(self, test_name: str) -> MultiprocessResult:
self.current_test_name = test_name
if sys.platform == 'win32':
# gh-95027: When stdout is not a TTY, Python uses the ANSI code
# page for the sys.stdout encoding. If the main process runs in a
@ -324,15 +325,20 @@ class TestWorkerProcess(threading.Thread):
else:
encoding = sys.stdout.encoding
match_tests = self.runtests.get_match_tests(test_name)
tests = (test_name,)
if self.runtests.rerun:
match_tests = self.runtests.get_match_tests(test_name)
else:
match_tests = None
worker_runtests = self.runtests.copy(tests=tests)
worker_job = WorkerJob(
worker_runtests,
namespace=self.ns,
match_tests=match_tests)
# gh-94026: Write stdout+stderr to a tempfile as workaround for
# non-blocking pipes on Emscripten with NodeJS.
with tempfile.TemporaryFile('w+', encoding=encoding) as stdout_file:
worker_job = WorkerJob(test_name,
namespace=self.ns,
rerun=self.rerun,
match_tests=match_tests)
# gh-93353: Check for leaked temporary files in the parent process,
# since the deletion of temporary files can happen late during
# Python finalization: too late for libregrtest.

View File

@ -75,10 +75,10 @@ class ParseArgsTestCase(unittest.TestCase):
ns = libregrtest._parse_args(['--wait'])
self.assertTrue(ns.wait)
def test_worker_args(self):
ns = libregrtest._parse_args(['--worker-args', '[[], {}]'])
self.assertEqual(ns.worker_args, '[[], {}]')
self.checkError(['--worker-args'], 'expected one argument')
def test_worker_json(self):
ns = libregrtest._parse_args(['--worker-json', '[[], {}]'])
self.assertEqual(ns.worker_json, '[[], {}]')
self.checkError(['--worker-json'], 'expected one argument')
def test_start(self):
for opt in '-S', '--start':