mirror of https://github.com/python/cpython
Move get_signal_name() to test.support (#121251)
* Move get_signal_name() from test.libregrtest to test.support. * Use get_signal_name() in support.script_helper. * support.script_helper now decodes stdout and stderr from UTF-8, instead of ASCII, if a command failed.
This commit is contained in:
parent
bfe0e4d769
commit
7435f053b4
|
@ -22,7 +22,7 @@ from .runtests import RunTests, WorkerRunTests, JsonFile, JsonFileType
|
||||||
from .single import PROGRESS_MIN_TIME
|
from .single import PROGRESS_MIN_TIME
|
||||||
from .utils import (
|
from .utils import (
|
||||||
StrPath, TestName,
|
StrPath, TestName,
|
||||||
format_duration, print_warning, count, plural, get_signal_name)
|
format_duration, print_warning, count, plural)
|
||||||
from .worker import create_worker_process, USE_PROCESS_GROUP
|
from .worker import create_worker_process, USE_PROCESS_GROUP
|
||||||
|
|
||||||
if MS_WINDOWS:
|
if MS_WINDOWS:
|
||||||
|
@ -366,7 +366,7 @@ class WorkerThread(threading.Thread):
|
||||||
err_msg=None,
|
err_msg=None,
|
||||||
state=State.TIMEOUT)
|
state=State.TIMEOUT)
|
||||||
if retcode != 0:
|
if retcode != 0:
|
||||||
name = get_signal_name(retcode)
|
name = support.get_signal_name(retcode)
|
||||||
if name:
|
if name:
|
||||||
retcode = f"{retcode} ({name})"
|
retcode = f"{retcode} ({name})"
|
||||||
raise WorkerError(self.test_name, f"Exit code {retcode}", stdout,
|
raise WorkerError(self.test_name, f"Exit code {retcode}", stdout,
|
||||||
|
|
|
@ -685,35 +685,6 @@ def cleanup_temp_dir(tmp_dir: StrPath):
|
||||||
print("Remove file: %s" % name)
|
print("Remove file: %s" % name)
|
||||||
os_helper.unlink(name)
|
os_helper.unlink(name)
|
||||||
|
|
||||||
WINDOWS_STATUS = {
|
|
||||||
0xC0000005: "STATUS_ACCESS_VIOLATION",
|
|
||||||
0xC00000FD: "STATUS_STACK_OVERFLOW",
|
|
||||||
0xC000013A: "STATUS_CONTROL_C_EXIT",
|
|
||||||
}
|
|
||||||
|
|
||||||
def get_signal_name(exitcode):
|
|
||||||
if exitcode < 0:
|
|
||||||
signum = -exitcode
|
|
||||||
try:
|
|
||||||
return signal.Signals(signum).name
|
|
||||||
except ValueError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
# Shell exit code (ex: WASI build)
|
|
||||||
if 128 < exitcode < 256:
|
|
||||||
signum = exitcode - 128
|
|
||||||
try:
|
|
||||||
return signal.Signals(signum).name
|
|
||||||
except ValueError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
try:
|
|
||||||
return WINDOWS_STATUS[exitcode]
|
|
||||||
except KeyError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
ILLEGAL_XML_CHARS_RE = re.compile(
|
ILLEGAL_XML_CHARS_RE = re.compile(
|
||||||
'['
|
'['
|
||||||
|
|
|
@ -2632,3 +2632,35 @@ def initialized_with_pyrepl():
|
||||||
"""Detect whether PyREPL was used during Python initialization."""
|
"""Detect whether PyREPL was used during Python initialization."""
|
||||||
# If the main module has a __file__ attribute it's a Python module, which means PyREPL.
|
# If the main module has a __file__ attribute it's a Python module, which means PyREPL.
|
||||||
return hasattr(sys.modules["__main__"], "__file__")
|
return hasattr(sys.modules["__main__"], "__file__")
|
||||||
|
|
||||||
|
|
||||||
|
WINDOWS_STATUS = {
|
||||||
|
0xC0000005: "STATUS_ACCESS_VIOLATION",
|
||||||
|
0xC00000FD: "STATUS_STACK_OVERFLOW",
|
||||||
|
0xC000013A: "STATUS_CONTROL_C_EXIT",
|
||||||
|
}
|
||||||
|
|
||||||
|
def get_signal_name(exitcode):
|
||||||
|
import signal
|
||||||
|
|
||||||
|
if exitcode < 0:
|
||||||
|
signum = -exitcode
|
||||||
|
try:
|
||||||
|
return signal.Signals(signum).name
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Shell exit code (ex: WASI build)
|
||||||
|
if 128 < exitcode < 256:
|
||||||
|
signum = exitcode - 128
|
||||||
|
try:
|
||||||
|
return signal.Signals(signum).name
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
try:
|
||||||
|
return WINDOWS_STATUS[exitcode]
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
|
@ -70,23 +70,25 @@ class _PythonRunResult(collections.namedtuple("_PythonRunResult",
|
||||||
out = b'(... truncated stdout ...)' + out[-maxlen:]
|
out = b'(... truncated stdout ...)' + out[-maxlen:]
|
||||||
if len(err) > maxlen:
|
if len(err) > maxlen:
|
||||||
err = b'(... truncated stderr ...)' + err[-maxlen:]
|
err = b'(... truncated stderr ...)' + err[-maxlen:]
|
||||||
out = out.decode('ascii', 'replace').rstrip()
|
out = out.decode('utf8', 'replace').rstrip()
|
||||||
err = err.decode('ascii', 'replace').rstrip()
|
err = err.decode('utf8', 'replace').rstrip()
|
||||||
raise AssertionError("Process return code is %d\n"
|
|
||||||
"command line: %r\n"
|
exitcode = self.rc
|
||||||
"\n"
|
signame = support.get_signal_name(exitcode)
|
||||||
"stdout:\n"
|
if signame:
|
||||||
"---\n"
|
exitcode = f"{exitcode} ({signame})"
|
||||||
"%s\n"
|
raise AssertionError(f"Process return code is {exitcode}\n"
|
||||||
"---\n"
|
f"command line: {cmd_line!r}\n"
|
||||||
"\n"
|
f"\n"
|
||||||
"stderr:\n"
|
f"stdout:\n"
|
||||||
"---\n"
|
f"---\n"
|
||||||
"%s\n"
|
f"{out}\n"
|
||||||
"---"
|
f"---\n"
|
||||||
% (self.rc, cmd_line,
|
f"\n"
|
||||||
out,
|
f"stderr:\n"
|
||||||
err))
|
f"---\n"
|
||||||
|
f"{err}\n"
|
||||||
|
f"---")
|
||||||
|
|
||||||
|
|
||||||
# Executing the interpreter in a subprocess
|
# Executing the interpreter in a subprocess
|
||||||
|
|
|
@ -2329,16 +2329,6 @@ class TestUtils(unittest.TestCase):
|
||||||
self.assertIsNone(normalize('setUpModule (test.test_x)', is_error=True))
|
self.assertIsNone(normalize('setUpModule (test.test_x)', is_error=True))
|
||||||
self.assertIsNone(normalize('tearDownModule (test.test_module)', is_error=True))
|
self.assertIsNone(normalize('tearDownModule (test.test_module)', is_error=True))
|
||||||
|
|
||||||
def test_get_signal_name(self):
|
|
||||||
for exitcode, expected in (
|
|
||||||
(-int(signal.SIGINT), 'SIGINT'),
|
|
||||||
(-int(signal.SIGSEGV), 'SIGSEGV'),
|
|
||||||
(128 + int(signal.SIGABRT), 'SIGABRT'),
|
|
||||||
(3221225477, "STATUS_ACCESS_VIOLATION"),
|
|
||||||
(0xC00000FD, "STATUS_STACK_OVERFLOW"),
|
|
||||||
):
|
|
||||||
self.assertEqual(utils.get_signal_name(exitcode), expected, exitcode)
|
|
||||||
|
|
||||||
def test_format_resources(self):
|
def test_format_resources(self):
|
||||||
format_resources = utils.format_resources
|
format_resources = utils.format_resources
|
||||||
ALL_RESOURCES = utils.ALL_RESOURCES
|
ALL_RESOURCES = utils.ALL_RESOURCES
|
||||||
|
|
|
@ -3,6 +3,7 @@ import importlib
|
||||||
import io
|
import io
|
||||||
import os
|
import os
|
||||||
import shutil
|
import shutil
|
||||||
|
import signal
|
||||||
import socket
|
import socket
|
||||||
import stat
|
import stat
|
||||||
import subprocess
|
import subprocess
|
||||||
|
@ -732,6 +733,17 @@ class TestSupport(unittest.TestCase):
|
||||||
self.assertEqual(support.copy_python_src_ignore(path, os.listdir(path)),
|
self.assertEqual(support.copy_python_src_ignore(path, os.listdir(path)),
|
||||||
ignored)
|
ignored)
|
||||||
|
|
||||||
|
def test_get_signal_name(self):
|
||||||
|
for exitcode, expected in (
|
||||||
|
(-int(signal.SIGINT), 'SIGINT'),
|
||||||
|
(-int(signal.SIGSEGV), 'SIGSEGV'),
|
||||||
|
(128 + int(signal.SIGABRT), 'SIGABRT'),
|
||||||
|
(3221225477, "STATUS_ACCESS_VIOLATION"),
|
||||||
|
(0xC00000FD, "STATUS_STACK_OVERFLOW"),
|
||||||
|
):
|
||||||
|
self.assertEqual(support.get_signal_name(exitcode), expected,
|
||||||
|
exitcode)
|
||||||
|
|
||||||
# XXX -follows a list of untested API
|
# XXX -follows a list of untested API
|
||||||
# make_legacy_pyc
|
# make_legacy_pyc
|
||||||
# is_resource_enabled
|
# is_resource_enabled
|
||||||
|
|
Loading…
Reference in New Issue