import dataclasses
import json
from typing import Any

from test.support import TestStats

from .utils import (
    StrJSON, TestName, FilterTuple,
    format_duration, normalize_test_name, print_warning)


# Avoid enum.Enum to reduce the number of imports when tests are run
class State:
    PASSED = "PASSED"
    FAILED = "FAILED"
    SKIPPED = "SKIPPED"
    UNCAUGHT_EXC = "UNCAUGHT_EXC"
    REFLEAK = "REFLEAK"
    ENV_CHANGED = "ENV_CHANGED"
    RESOURCE_DENIED = "RESOURCE_DENIED"
    INTERRUPTED = "INTERRUPTED"
    MULTIPROCESSING_ERROR = "MULTIPROCESSING_ERROR"
    DID_NOT_RUN = "DID_NOT_RUN"
    TIMEOUT = "TIMEOUT"

    @staticmethod
    def is_failed(state):
        return state in {
            State.FAILED,
            State.UNCAUGHT_EXC,
            State.REFLEAK,
            State.MULTIPROCESSING_ERROR,
            State.TIMEOUT}

    @staticmethod
    def has_meaningful_duration(state):
        # The duration is meaningless for these states. For example, if a
        # whole test file is skipped, its recorded duration is not the time
        # spent running its tests, only the time spent executing the code
        # that skips them.
        return state not in {
            State.SKIPPED,
            State.RESOURCE_DENIED,
            State.INTERRUPTED,
            State.MULTIPROCESSING_ERROR,
            State.DID_NOT_RUN}

    @staticmethod
    def must_stop(state):
        return state in {
            State.INTERRUPTED,
            State.MULTIPROCESSING_ERROR}


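# Illustrative note (added for exposition; not part of the upstream module):
# the State predicates classify the raw state strings defined above, e.g.:
#   State.is_failed(State.TIMEOUT)               -> True
#   State.has_meaningful_duration(State.SKIPPED) -> False
#   State.must_stop(State.INTERRUPTED)           -> True

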
@dataclasses.dataclass(slots=True)
class TestResult:
    test_name: TestName
    state: str | None = None
    # Test duration in seconds
    duration: float | None = None
    xml_data: list[str] | None = None
    stats: TestStats | None = None

    # errors and failures copied from support.TestFailedWithDetails
    errors: list[tuple[str, str]] | None = None
    failures: list[tuple[str, str]] | None = None

    def is_failed(self, fail_env_changed: bool) -> bool:
        if self.state == State.ENV_CHANGED:
            return fail_env_changed
        return State.is_failed(self.state)

    def _format_failed(self):
        if self.errors and self.failures:
            le = len(self.errors)
            lf = len(self.failures)
            error_s = "error" + ("s" if le > 1 else "")
            failure_s = "failure" + ("s" if lf > 1 else "")
            return f"{self.test_name} failed ({le} {error_s}, {lf} {failure_s})"

        if self.errors:
            le = len(self.errors)
            error_s = "error" + ("s" if le > 1 else "")
            return f"{self.test_name} failed ({le} {error_s})"

        if self.failures:
            lf = len(self.failures)
            failure_s = "failure" + ("s" if lf > 1 else "")
            return f"{self.test_name} failed ({lf} {failure_s})"

        return f"{self.test_name} failed"

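    # Illustrative examples (added for exposition): with two errors and one
    # failure recorded, _format_failed() yields
    #   "test_spam failed (2 errors, 1 failure)"
    # and with neither list populated it falls back to "test_spam failed".
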
    def __str__(self) -> str:
        match self.state:
            case State.PASSED:
                return f"{self.test_name} passed"
            case State.FAILED:
                return self._format_failed()
            case State.SKIPPED:
                return f"{self.test_name} skipped"
            case State.UNCAUGHT_EXC:
                return f"{self.test_name} failed (uncaught exception)"
            case State.REFLEAK:
                return f"{self.test_name} failed (reference leak)"
            case State.ENV_CHANGED:
                return f"{self.test_name} failed (env changed)"
            case State.RESOURCE_DENIED:
                return f"{self.test_name} skipped (resource denied)"
            case State.INTERRUPTED:
                return f"{self.test_name} interrupted"
            case State.MULTIPROCESSING_ERROR:
                return f"{self.test_name} process crashed"
            case State.DID_NOT_RUN:
                return f"{self.test_name} ran no tests"
            case State.TIMEOUT:
                return f"{self.test_name} timed out ({format_duration(self.duration)})"
            case _:
                raise ValueError(f"unknown result state: {self.state!r}")

    def has_meaningful_duration(self):
        return State.has_meaningful_duration(self.state)

    def set_env_changed(self):
        if self.state is None or self.state == State.PASSED:
            self.state = State.ENV_CHANGED

    def must_stop(self, fail_fast: bool, fail_env_changed: bool) -> bool:
        if State.must_stop(self.state):
            return True
        if fail_fast and self.is_failed(fail_env_changed):
            return True
        return False

    def get_rerun_match_tests(self) -> FilterTuple | None:
        match_tests = []

        errors = self.errors or []
        failures = self.failures or []
        for error_list, is_error in (
            (errors, True),
            (failures, False),
        ):
            for full_name, *_ in error_list:
                match_name = normalize_test_name(full_name, is_error=is_error)
                if match_name is None:
                    # e.g. 'setUpModule (test.test_sys)': a failure in a
                    # module- or class-level fixture cannot be mapped to a
                    # single test method, so don't filter tests
                    return None
                if not match_name:
                    error_type = "ERROR" if is_error else "FAIL"
                    print_warning(f"rerun failed to parse {error_type} test name: "
                                  f"{full_name!r}: don't filter tests")
                    return None
                match_tests.append(match_name)

        if not match_tests:
            return None
        return tuple(match_tests)

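    # Illustrative example (added for exposition; the exact parsing lives in
    # normalize_test_name(), so the output below is an assumption about its
    # typical behavior): given
    #   failures = [("test_eggs (test.test_spam.SpamTests.test_eggs)", "tb")]
    # get_rerun_match_tests() would return ("test_eggs",), which regrtest can
    # use to re-run only the failing test method.
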
    def write_json_into(self, file) -> None:
        json.dump(self, file, cls=_EncodeTestResult)

    @staticmethod
    def from_json(worker_json: StrJSON) -> 'TestResult':
        return json.loads(worker_json, object_hook=_decode_test_result)


class _EncodeTestResult(json.JSONEncoder):
    def default(self, o: Any) -> dict[str, Any]:
        if isinstance(o, TestResult):
            result = dataclasses.asdict(o)
            result["__test_result__"] = o.__class__.__name__
            return result
        else:
            return super().default(o)


def _decode_test_result(data: dict[str, Any]) -> TestResult | dict[str, Any]:
    if "__test_result__" in data:
        data.pop('__test_result__')
        if data['stats'] is not None:
            data['stats'] = TestStats(**data['stats'])
        return TestResult(**data)
    else:
        return data
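

# Illustrative round-trip sketch (added for exposition, not part of the
# upstream module): a worker process serializes its result with
# write_json_into() and the coordinator rebuilds it with from_json(), e.g.:
#
#   import io
#   result = TestResult(test_name="test_os", state=State.PASSED, duration=1.5)
#   buf = io.StringIO()
#   result.write_json_into(buf)
#   decoded = TestResult.from_json(buf.getvalue())
#   assert decoded.state == State.PASSED
#   print(decoded)  # -> "test_os passed"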