gh-109276: libregrtest: WASM use filename for JSON (#109340)

On Emscripten and WASI platforms, or if --python command line option
is used, libregrtest now uses a filename for the JSON file.

Emscripten and WASI buildbot workers run the main test process with a
different Python (Linux) which spawns Emscripten/WASI processes using
the command specified in --python command line option. Passing a file
descriptor from the parent process to the child process doesn't work
in this case.

* Add JsonFile and JsonFileType classes
* Add RunTests.json_file_use_filename() method.
* Add a test in test_regrtest on the --python command line option.
* test_regrtest: add parallel=False parameter.
* Split long RunWorkers._runtest() function into sub-functions.
This commit is contained in:
Victor Stinner 2023-09-13 00:41:25 +02:00 committed by GitHub
parent 5dcbbd8861
commit 75cdd9a904
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 270 additions and 137 deletions

View File

@ -406,7 +406,7 @@ class Regrtest:
python_cmd=self.python_cmd,
randomize=self.randomize,
random_seed=self.random_seed,
json_fd=None,
json_file=None,
)
def _run_tests(self, selected: TestTuple, tests: TestList | None) -> int:

View File

@ -1,3 +1,4 @@
import contextlib
import dataclasses
import faulthandler
import os.path
@ -9,7 +10,7 @@ import tempfile
import threading
import time
import traceback
from typing import Literal
from typing import Literal, TextIO
from test import support
from test.support import os_helper
@ -17,10 +18,10 @@ from test.support import os_helper
from .logger import Logger
from .result import TestResult, State
from .results import TestResults
from .runtests import RunTests
from .runtests import RunTests, JsonFile, JsonFileType
from .single import PROGRESS_MIN_TIME
from .utils import (
StrPath, StrJSON, TestName, MS_WINDOWS,
StrPath, StrJSON, TestName, MS_WINDOWS, TMP_PREFIX,
format_duration, print_warning, count, plural)
from .worker import create_worker_process, USE_PROCESS_GROUP
@ -83,6 +84,17 @@ class ExitThread(Exception):
pass
class WorkerError(Exception):
def __init__(self,
test_name: TestName,
err_msg: str | None,
stdout: str | None,
state: str = State.MULTIPROCESSING_ERROR):
result = TestResult(test_name, state=state)
self.mp_result = MultiprocessResult(result, stdout, err_msg)
super().__init__()
class WorkerThread(threading.Thread):
def __init__(self, worker_id: int, runner: "RunWorkers") -> None:
super().__init__()
@ -92,7 +104,7 @@ class WorkerThread(threading.Thread):
self.output = runner.output
self.timeout = runner.worker_timeout
self.log = runner.log
self.current_test_name = None
self.test_name = None
self.start_time = None
self._popen = None
self._killed = False
@ -104,7 +116,7 @@ class WorkerThread(threading.Thread):
info.append("running")
else:
info.append('stopped')
test = self.current_test_name
test = self.test_name
if test:
info.append(f'test={test}')
popen = self._popen
@ -147,25 +159,11 @@ class WorkerThread(threading.Thread):
self._stopped = True
self._kill()
def mp_result_error(
self,
test_result: TestResult,
stdout: str | None = None,
err_msg=None
) -> MultiprocessResult:
return MultiprocessResult(test_result, stdout, err_msg)
def _run_process(self, runtests: RunTests, output_fd: int, json_fd: int,
def _run_process(self, runtests: RunTests, output_fd: int,
tmp_dir: StrPath | None = None) -> int:
try:
popen = create_worker_process(runtests, output_fd, json_fd,
tmp_dir)
self._killed = False
self._popen = popen
except:
self.current_test_name = None
raise
popen = create_worker_process(runtests, output_fd, tmp_dir)
self._popen = popen
self._killed = False
try:
if self._stopped:
@ -206,10 +204,9 @@ class WorkerThread(threading.Thread):
finally:
self._wait_completed()
self._popen = None
self.current_test_name = None
def _runtest(self, test_name: TestName) -> MultiprocessResult:
self.current_test_name = test_name
def create_stdout(self, stack: contextlib.ExitStack) -> TextIO:
"""Create stdout temporay file (file descriptor)."""
if MS_WINDOWS:
# gh-95027: When stdout is not a TTY, Python uses the ANSI code
@ -219,85 +216,135 @@ class WorkerThread(threading.Thread):
else:
encoding = sys.stdout.encoding
# gh-94026: Write stdout+stderr to a tempfile as workaround for
# non-blocking pipes on Emscripten with NodeJS.
stdout_file = tempfile.TemporaryFile('w+', encoding=encoding)
stack.enter_context(stdout_file)
return stdout_file
def create_json_file(self, stack: contextlib.ExitStack) -> tuple[JsonFile, TextIO | None]:
"""Create JSON file."""
json_file_use_filename = self.runtests.json_file_use_filename()
if json_file_use_filename:
# create an empty file to make the creation atomic
# (to prevent races with other worker threads)
prefix = TMP_PREFIX + 'json_'
json_fd, json_filename = tempfile.mkstemp(prefix=prefix)
os.close(json_fd)
stack.callback(os_helper.unlink, json_filename)
json_file = JsonFile(json_filename, JsonFileType.FILENAME)
json_tmpfile = None
else:
json_tmpfile = tempfile.TemporaryFile('w+', encoding='utf8')
stack.enter_context(json_tmpfile)
json_fd = json_tmpfile.fileno()
if MS_WINDOWS:
json_handle = msvcrt.get_osfhandle(json_fd)
json_file = JsonFile(json_handle,
JsonFileType.WINDOWS_HANDLE)
else:
json_file = JsonFile(json_fd, JsonFileType.UNIX_FD)
return (json_file, json_tmpfile)
def create_worker_runtests(self, test_name: TestName, json_file: JsonFile) -> RunTests:
"""Create the worker RunTests."""
tests = (test_name,)
if self.runtests.rerun:
match_tests = self.runtests.get_match_tests(test_name)
else:
match_tests = None
err_msg = None
# gh-94026: Write stdout+stderr to a tempfile as workaround for
# non-blocking pipes on Emscripten with NodeJS.
with (tempfile.TemporaryFile('w+', encoding=encoding) as stdout_file,
tempfile.TemporaryFile('w+', encoding='utf8') as json_file):
stdout_fd = stdout_file.fileno()
json_fd = json_file.fileno()
if MS_WINDOWS:
json_fd = msvcrt.get_osfhandle(json_fd)
kwargs = {}
if match_tests:
kwargs['match_tests'] = match_tests
return self.runtests.copy(
tests=tests,
json_file=json_file,
**kwargs)
kwargs = {}
if match_tests:
kwargs['match_tests'] = match_tests
worker_runtests = self.runtests.copy(
tests=tests,
json_fd=json_fd,
**kwargs)
# gh-93353: Check for leaked temporary files in the parent process,
# since the deletion of temporary files can happen late during
# Python finalization: too late for libregrtest.
if not support.is_wasi:
# Don't check for leaked temporary files and directories if Python is
# run on WASI. WASI don't pass environment variables like TMPDIR to
# worker processes.
tmp_dir = tempfile.mkdtemp(prefix="test_python_")
tmp_dir = os.path.abspath(tmp_dir)
try:
retcode = self._run_process(worker_runtests,
stdout_fd, json_fd, tmp_dir)
finally:
tmp_files = os.listdir(tmp_dir)
os_helper.rmtree(tmp_dir)
else:
def run_tmp_files(self, worker_runtests: RunTests,
stdout_fd: int) -> (int, list[StrPath]):
# gh-93353: Check for leaked temporary files in the parent process,
# since the deletion of temporary files can happen late during
# Python finalization: too late for libregrtest.
if not support.is_wasi:
# Don't check for leaked temporary files and directories if Python is
# run on WASI. WASI don't pass environment variables like TMPDIR to
# worker processes.
tmp_dir = tempfile.mkdtemp(prefix="test_python_")
tmp_dir = os.path.abspath(tmp_dir)
try:
retcode = self._run_process(worker_runtests,
stdout_fd, json_fd)
tmp_files = ()
stdout_file.seek(0)
stdout_fd, tmp_dir)
finally:
tmp_files = os.listdir(tmp_dir)
os_helper.rmtree(tmp_dir)
else:
retcode = self._run_process(worker_runtests, stdout_fd)
tmp_files = []
try:
stdout = stdout_file.read().strip()
except Exception as exc:
# gh-101634: Catch UnicodeDecodeError if stdout cannot be
# decoded from encoding
err_msg = f"Cannot read process stdout: {exc}"
result = TestResult(test_name, state=State.MULTIPROCESSING_ERROR)
return self.mp_result_error(result, err_msg=err_msg)
return (retcode, tmp_files)
try:
# deserialize run_tests_worker() output
json_file.seek(0)
worker_json: StrJSON = json_file.read()
if worker_json:
result = TestResult.from_json(worker_json)
else:
err_msg = "empty JSON"
except Exception as exc:
# gh-101634: Catch UnicodeDecodeError if stdout cannot be
# decoded from encoding
err_msg = f"Fail to read or parser worker process JSON: {exc}"
result = TestResult(test_name, state=State.MULTIPROCESSING_ERROR)
return self.mp_result_error(result, stdout, err_msg=err_msg)
def read_stdout(self, stdout_file: TextIO) -> str:
stdout_file.seek(0)
try:
return stdout_file.read().strip()
except Exception as exc:
# gh-101634: Catch UnicodeDecodeError if stdout cannot be
# decoded from encoding
raise WorkerError(self.test_name,
f"Cannot read process stdout: {exc}", None)
if retcode is None:
result = TestResult(test_name, state=State.TIMEOUT)
return self.mp_result_error(result, stdout)
def read_json(self, json_file: JsonFile, json_tmpfile: TextIO | None,
stdout: str) -> TestResult:
try:
if json_tmpfile is not None:
json_tmpfile.seek(0)
worker_json: StrJSON = json_tmpfile.read()
else:
with json_file.open(encoding='utf8') as json_fp:
worker_json: StrJSON = json_fp.read()
except Exception as exc:
# gh-101634: Catch UnicodeDecodeError if stdout cannot be
# decoded from encoding
err_msg = f"Failed to read worker process JSON: {exc}"
raise WorkerError(self.test_name, err_msg, stdout,
state=State.MULTIPROCESSING_ERROR)
if not worker_json:
raise WorkerError(self.test_name, "empty JSON", stdout)
try:
return TestResult.from_json(worker_json)
except Exception as exc:
# gh-101634: Catch UnicodeDecodeError if stdout cannot be
# decoded from encoding
err_msg = f"Failed to parse worker process JSON: {exc}"
raise WorkerError(self.test_name, err_msg, stdout,
state=State.MULTIPROCESSING_ERROR)
def _runtest(self, test_name: TestName) -> MultiprocessResult:
with contextlib.ExitStack() as stack:
stdout_file = self.create_stdout(stack)
json_file, json_tmpfile = self.create_json_file(stack)
worker_runtests = self.create_worker_runtests(test_name, json_file)
retcode, tmp_files = self.run_tmp_files(worker_runtests,
stdout_file.fileno())
stdout = self.read_stdout(stdout_file)
if retcode is None:
raise WorkerError(self.test_name, None, stdout, state=State.TIMEOUT)
result = self.read_json(json_file, json_tmpfile, stdout)
if retcode != 0:
err_msg = "Exit code %s" % retcode
if err_msg:
result = TestResult(test_name, state=State.MULTIPROCESSING_ERROR)
return self.mp_result_error(result, stdout, err_msg)
raise WorkerError(self.test_name, f"Exit code {retcode}", stdout)
if tmp_files:
msg = (f'\n\n'
@ -319,7 +366,13 @@ class WorkerThread(threading.Thread):
break
self.start_time = time.monotonic()
mp_result = self._runtest(test_name)
self.test_name = test_name
try:
mp_result = self._runtest(test_name)
except WorkerError as exc:
mp_result = exc.mp_result
finally:
self.test_name = None
mp_result.result.duration = time.monotonic() - self.start_time
self.output.put((False, mp_result))
@ -367,12 +420,12 @@ class WorkerThread(threading.Thread):
def get_running(workers: list[WorkerThread]) -> list[str]:
running = []
for worker in workers:
current_test_name = worker.current_test_name
if not current_test_name:
test_name = worker.test_name
if not test_name:
continue
dt = time.monotonic() - worker.start_time
if dt >= PROGRESS_MIN_TIME:
text = '%s (%s)' % (current_test_name, format_duration(dt))
text = f'{test_name} ({format_duration(dt)})'
running.append(text)
if not running:
return None

View File

@ -1,11 +1,62 @@
import contextlib
import dataclasses
import json
import os
import subprocess
from typing import Any
from test import support
from .utils import (
StrPath, StrJSON, TestTuple, FilterTuple, FilterDict)
class JsonFileType:
UNIX_FD = "UNIX_FD"
WINDOWS_HANDLE = "WINDOWS_HANDLE"
FILENAME = "FILENAME"
@dataclasses.dataclass(slots=True, frozen=True)
class JsonFile:
# See RunTests.json_file_use_filename()
file: int | StrPath
file_type: str
def configure_subprocess(self, popen_kwargs: dict) -> None:
match self.file_type:
case JsonFileType.UNIX_FD:
# Unix file descriptor
popen_kwargs['pass_fds'] = [self.file]
case JsonFileType.WINDOWS_HANDLE:
# Windows handle
startupinfo = subprocess.STARTUPINFO()
startupinfo.lpAttributeList = {"handle_list": [self.file]}
popen_kwargs['startupinfo'] = startupinfo
case JsonFileType.FILENAME:
# Filename: nothing to do to
pass
@contextlib.contextmanager
def inherit_subprocess(self):
if self.file_type == JsonFileType.WINDOWS_HANDLE:
os.set_handle_inheritable(self.file, True)
try:
yield
finally:
os.set_handle_inheritable(self.file, False)
else:
yield
def open(self, mode='r', *, encoding):
file = self.file
if self.file_type == JsonFileType.WINDOWS_HANDLE:
import msvcrt
# Create a file descriptor from the handle
file = msvcrt.open_osfhandle(file, os.O_WRONLY)
return open(file, mode, encoding=encoding)
@dataclasses.dataclass(slots=True, frozen=True)
class HuntRefleak:
warmups: int
@ -38,9 +89,7 @@ class RunTests:
python_cmd: tuple[str] | None
randomize: bool
random_seed: int | None
# On Unix, it's a file descriptor.
# On Windows, it's a handle.
json_fd: int | None
json_file: JsonFile | None
def copy(self, **override):
state = dataclasses.asdict(self)
@ -74,6 +123,17 @@ class RunTests:
def from_json(worker_json: StrJSON) -> 'RunTests':
return json.loads(worker_json, object_hook=_decode_runtests)
def json_file_use_filename(self) -> bool:
# json_file type depends on the platform:
# - Unix: file descriptor (int)
# - Windows: handle (int)
# - Emscripten/WASI or if --python is used: filename (str)
return (
bool(self.python_cmd)
or support.is_emscripten
or support.is_wasi
)
class _EncodeRunTests(json.JSONEncoder):
def default(self, o: Any) -> dict[str, Any]:
@ -90,6 +150,8 @@ def _decode_runtests(data: dict[str, Any]) -> RunTests | dict[str, Any]:
data.pop('__runtests__')
if data['hunt_refleak']:
data['hunt_refleak'] = HuntRefleak(**data['hunt_refleak'])
if data['json_file']:
data['json_file'] = JsonFile(**data['json_file'])
return RunTests(**data)
else:
return data

View File

@ -17,8 +17,12 @@ from test.support import threading_helper
MS_WINDOWS = (sys.platform == 'win32')
WORK_DIR_PREFIX = 'test_python_'
WORKER_WORK_DIR_PREFIX = f'{WORK_DIR_PREFIX}worker_'
# All temporary files and temporary directories created by libregrtest should
# use TMP_PREFIX so cleanup_temp_dir() can remove them all.
TMP_PREFIX = 'test_python_'
WORK_DIR_PREFIX = TMP_PREFIX
WORKER_WORK_DIR_PREFIX = WORK_DIR_PREFIX + 'worker_'
# bpo-38203: Maximum delay in seconds to exit Python (call Py_Finalize()).
# Used to protect against threading._shutdown() hang.
@ -387,7 +391,7 @@ def get_work_dir(parent_dir: StrPath, worker: bool = False) -> StrPath:
# testing (see the -j option).
# Emscripten and WASI have stubbed getpid(), Emscripten has only
# milisecond clock resolution. Use randint() instead.
if sys.platform in {"emscripten", "wasi"}:
if support.is_emscripten or support.is_wasi:
nounce = random.randint(0, 1_000_000)
else:
nounce = os.getpid()
@ -580,7 +584,7 @@ def display_header():
def cleanup_temp_dir(tmp_dir: StrPath):
import glob
path = os.path.join(glob.escape(tmp_dir), WORK_DIR_PREFIX + '*')
path = os.path.join(glob.escape(tmp_dir), TMP_PREFIX + '*')
print("Cleanup %s directory" % tmp_dir)
for name in glob.glob(path):
if os.path.isdir(name):

View File

@ -7,18 +7,17 @@ from test import support
from test.support import os_helper
from .setup import setup_process, setup_test_dir
from .runtests import RunTests
from .runtests import RunTests, JsonFile
from .single import run_single_test
from .utils import (
StrPath, StrJSON, FilterTuple, MS_WINDOWS,
StrPath, StrJSON, FilterTuple,
get_temp_dir, get_work_dir, exit_timeout)
USE_PROCESS_GROUP = (hasattr(os, "setsid") and hasattr(os, "killpg"))
def create_worker_process(runtests: RunTests,
output_fd: int, json_fd: int,
def create_worker_process(runtests: RunTests, output_fd: int,
tmp_dir: StrPath | None = None) -> subprocess.Popen:
python_cmd = runtests.python_cmd
worker_json = runtests.as_json()
@ -55,34 +54,24 @@ def create_worker_process(runtests: RunTests,
close_fds=True,
cwd=work_dir,
)
if not MS_WINDOWS:
kwargs['pass_fds'] = [json_fd]
else:
startupinfo = subprocess.STARTUPINFO()
startupinfo.lpAttributeList = {"handle_list": [json_fd]}
kwargs['startupinfo'] = startupinfo
if USE_PROCESS_GROUP:
kwargs['start_new_session'] = True
if MS_WINDOWS:
os.set_handle_inheritable(json_fd, True)
try:
# Pass json_file to the worker process
json_file = runtests.json_file
json_file.configure_subprocess(kwargs)
with json_file.inherit_subprocess():
return subprocess.Popen(cmd, **kwargs)
finally:
if MS_WINDOWS:
os.set_handle_inheritable(json_fd, False)
def worker_process(worker_json: StrJSON) -> NoReturn:
runtests = RunTests.from_json(worker_json)
test_name = runtests.tests[0]
match_tests: FilterTuple | None = runtests.match_tests
json_fd: int = runtests.json_fd
if MS_WINDOWS:
import msvcrt
json_fd = msvcrt.open_osfhandle(json_fd, os.O_WRONLY)
# json_file type depends on the platform:
# - Unix: file descriptor (int)
# - Windows: handle (int)
# - Emscripten/WASI or if --python is used: filename (str)
json_file: JsonFile = runtests.json_file
setup_test_dir(runtests.test_dir)
setup_process()
@ -96,8 +85,8 @@ def worker_process(worker_json: StrJSON) -> NoReturn:
result = run_single_test(test_name, runtests)
with open(json_fd, 'w', encoding='utf-8') as json_file:
result.write_json_into(json_file)
with json_file.open('w', encoding='utf-8') as json_fp:
result.write_json_into(json_fp)
sys.exit(0)

View File

@ -13,6 +13,7 @@ import os.path
import platform
import random
import re
import shlex
import subprocess
import sys
import sysconfig
@ -432,13 +433,14 @@ class BaseTestCase(unittest.TestCase):
parser = re.finditer(regex, output, re.MULTILINE)
return list(match.group(1) for match in parser)
def check_executed_tests(self, output, tests, skipped=(), failed=(),
def check_executed_tests(self, output, tests, *, stats,
skipped=(), failed=(),
env_changed=(), omitted=(),
rerun=None, run_no_tests=(),
resource_denied=(),
randomize=False, interrupted=False,
randomize=False, parallel=False, interrupted=False,
fail_env_changed=False,
*, stats, forever=False, filtered=False):
forever=False, filtered=False):
if isinstance(tests, str):
tests = [tests]
if isinstance(skipped, str):
@ -455,6 +457,8 @@ class BaseTestCase(unittest.TestCase):
run_no_tests = [run_no_tests]
if isinstance(stats, int):
stats = TestStats(stats)
if parallel:
randomize = True
rerun_failed = []
if rerun is not None:
@ -1120,7 +1124,7 @@ class ArgsTestCase(BaseTestCase):
tests = [crash_test]
output = self.run_tests("-j2", *tests, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, tests, failed=crash_test,
randomize=True, stats=0)
parallel=True, stats=0)
def parse_methods(self, output):
regex = re.compile("^(test[^ ]+).*ok$", flags=re.MULTILINE)
@ -1744,7 +1748,7 @@ class ArgsTestCase(BaseTestCase):
self.check_executed_tests(output, testnames,
env_changed=testnames,
fail_env_changed=True,
randomize=True,
parallel=True,
stats=len(testnames))
for testname in testnames:
self.assertIn(f"Warning -- {testname} leaked temporary "
@ -1784,7 +1788,7 @@ class ArgsTestCase(BaseTestCase):
exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, [testname],
failed=[testname],
randomize=True,
parallel=True,
stats=0)
def test_doctest(self):
@ -1823,7 +1827,7 @@ class ArgsTestCase(BaseTestCase):
exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, [testname],
failed=[testname],
randomize=True,
parallel=True,
stats=TestStats(1, 1, 0))
def _check_random_seed(self, run_workers: bool):
@ -1866,6 +1870,27 @@ class ArgsTestCase(BaseTestCase):
def test_random_seed_workers(self):
self._check_random_seed(run_workers=True)
def test_python_command(self):
code = textwrap.dedent(r"""
import sys
import unittest
class WorkerTests(unittest.TestCase):
def test_dev_mode(self):
self.assertTrue(sys.flags.dev_mode)
""")
tests = [self.create_test(code=code) for _ in range(3)]
# Custom Python command: "python -X dev"
python_cmd = [sys.executable, '-X', 'dev']
# test.libregrtest.cmdline uses shlex.split() to parse the Python
# command line string
python_cmd = shlex.join(python_cmd)
output = self.run_tests("--python", python_cmd, "-j0", *tests)
self.check_executed_tests(output, tests,
stats=len(tests), parallel=True)
class TestUtils(unittest.TestCase):
def test_format_duration(self):