[3.8] bpo-36670, regrtest: Fix WindowsLoadTracker() for partial line (GH-16550) (GH-16560)
* bpo-36670, regrtest: Fix WindowsLoadTracker() for partial line (GH-16550)

  WindowsLoadTracker.read_output() now uses a short buffer for an incomplete line.

  (cherry picked from commit 3e04cd268e)

* bpo-36670: Enhance regrtest WindowsLoadTracker (GH-16553)

  The last line is now passed to the parser even if it does not end with a
  newline, but only if it holds a valid value.

  (cherry picked from commit c65119d5bf)

* bpo-36670: Enhance regrtest (GH-16556)

  * Add a log() method: prefix the main messages with a timestamp and the
    load average.
  * WindowsLoadTracker:
    * LOAD_FACTOR_1 is now computed from SAMPLING_INTERVAL.
    * The load is initialized to the arithmetic mean of the first 5 values
      of the Processor Queue Length counter (so over 5 seconds), rather
      than to 0.0.
    * Handle BrokenPipeError and the case where typeperf exits.
  * format_duration(1.5) now returns '1.5 sec', rather than '1 sec 500 ms'.

  (cherry picked from commit 098e25672f)

(cherry picked from commit de3195c937)

Co-authored-by: Victor Stinner <vstinner@python.org>
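To make the partial-line handling concrete, here is a small self-contained sketch of the buffering idea (split_complete_lines and parse_counter are illustrative names, not the regrtest API): each chunk read from the pipe is prepended with whatever was buffered from the previous read, and a trailing line that does not parse as a complete counter value is held back until more output arrives.

def split_complete_lines(chunk, buffer, parse):
    # Prepend whatever was left over from the previous read.
    text = buffer + chunk
    lines = text.splitlines(True)   # keep line endings
    if not lines:
        return [], ''
    try:
        parse(lines[-1])            # is the last line a complete, valid value?
    except ValueError:
        buffer = lines.pop()        # hold the partial line back for the next read
    else:
        buffer = ''
    return lines, buffer

def parse_counter(line):
    # '"07/19/2018 01:32:26.605","3.000000"' -> 3.0
    date, _, value = line.rstrip().partition(',')
    if not (value.startswith('"') and value.endswith('"')):
        raise ValueError(line)
    return float(value[1:-1])

# The second read completes the line started by the first one.
lines, buf = split_complete_lines('"07/19/2018 01:32:26.605",', '', parse_counter)
assert lines == [] and buf == '"07/19/2018 01:32:26.605",'
lines, buf = split_complete_lines('"3.000000"\n', buf, parse_counter)
assert lines == ['"07/19/2018 01:32:26.605","3.000000"\n'] and buf == ''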
parent ab98cd8aee
commit f9016e5fc9
@@ -138,16 +138,8 @@ class Regrtest:
                     print(xml_data, file=sys.__stderr__)
                     raise

-    def display_progress(self, test_index, text):
-        if self.ns.quiet:
-            return
-
-        # "[ 51/405/1] test_tcl passed"
-        line = f"{test_index:{self.test_count_width}}{self.test_count}"
-        fails = len(self.bad) + len(self.environment_changed)
-        if fails and not self.ns.pgo:
-            line = f"{line}/{fails}"
-        line = f"[{line}] {text}"
+    def log(self, line=''):
+        empty = not line

         # add the system load prefix: "load avg: 1.80 "
         load_avg = self.getloadavg()
@@ -158,8 +150,23 @@ class Regrtest:
         test_time = time.monotonic() - self.start_time
         test_time = datetime.timedelta(seconds=int(test_time))
         line = f"{test_time} {line}"
+
+        if empty:
+            line = line[:-1]
+
         print(line, flush=True)

+    def display_progress(self, test_index, text):
+        if self.ns.quiet:
+            return
+
+        # "[ 51/405/1] test_tcl passed"
+        line = f"{test_index:{self.test_count_width}}{self.test_count}"
+        fails = len(self.bad) + len(self.environment_changed)
+        if fails and not self.ns.pgo:
+            line = f"{line}/{fails}"
+        self.log(f"[{line}] {text}")
+
     def parse_args(self, kwargs):
         ns = _parse_args(sys.argv[1:], **kwargs)
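For illustration only (a sketch, not the exact code in main.py), the prefixes added by the new log() method combine the elapsed time and the system load, so a progress line ends up looking like '0:01:05 load avg: 1.80 [ 51/405/1] test_tcl passed':

import datetime
import time

def format_log_line(line, start_time, load_avg=None):
    # Sketch of the prefixes added by Regrtest.log(); values are illustrative.
    if load_avg is not None:
        line = f"load avg: {load_avg:.2f} {line}"
    test_time = datetime.timedelta(seconds=int(time.monotonic() - start_time))
    return f"{test_time} {line}"

start_time = time.monotonic() - 65
print(format_log_line("[ 51/405/1] test_tcl passed", start_time, load_avg=1.80))
# 0:01:05 load avg: 1.80 [ 51/405/1] test_tcl passed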
@@ -297,11 +304,11 @@ class Regrtest:

         self.first_result = self.get_tests_result()

-        print()
-        print("Re-running failed tests in verbose mode")
+        self.log()
+        self.log("Re-running failed tests in verbose mode")
         self.rerun = self.bad[:]
         for test_name in self.rerun:
-            print(f"Re-running {test_name} in verbose mode", flush=True)
+            self.log(f"Re-running {test_name} in verbose mode")
             self.ns.verbose = True
             result = runtest(self.ns, test_name)
@@ -382,7 +389,7 @@ class Regrtest:

         save_modules = sys.modules.keys()

-        print("Run tests sequentially")
+        self.log("Run tests sequentially")

         previous_test = None
         for test_index, test_name in enumerate(self.tests, 1):
@@ -104,13 +104,14 @@ class ExitThread(Exception):


 class TestWorkerProcess(threading.Thread):
-    def __init__(self, worker_id, pending, output, ns, timeout):
+    def __init__(self, worker_id, runner):
         super().__init__()
         self.worker_id = worker_id
-        self.pending = pending
-        self.output = output
-        self.ns = ns
-        self.timeout = timeout
+        self.pending = runner.pending
+        self.output = runner.output
+        self.ns = runner.ns
+        self.timeout = runner.worker_timeout
+        self.regrtest = runner.regrtest
         self.current_test_name = None
         self.start_time = None
         self._popen = None
@@ -292,7 +293,8 @@ class TestWorkerProcess(threading.Thread):
             if not self.is_alive():
                 break
             dt = time.monotonic() - start_time
-            print(f"Waiting for {self} thread for {format_duration(dt)}", flush=True)
+            self.regrtest.log(f"Waiting for {self} thread "
+                              f"for {format_duration(dt)}")
             if dt > JOIN_TIMEOUT:
                 print_warning(f"Failed to join {self} in {format_duration(dt)}")
                 break
@@ -314,6 +316,7 @@ def get_running(workers):
 class MultiprocessTestRunner:
     def __init__(self, regrtest):
         self.regrtest = regrtest
+        self.log = self.regrtest.log
         self.ns = regrtest.ns
         self.output = queue.Queue()
         self.pending = MultiprocessIterator(self.regrtest.tests)
@@ -324,11 +327,10 @@ class MultiprocessTestRunner:
         self.workers = None

     def start_workers(self):
-        self.workers = [TestWorkerProcess(index, self.pending, self.output,
-                                          self.ns, self.worker_timeout)
+        self.workers = [TestWorkerProcess(index, self)
                        for index in range(1, self.ns.use_mp + 1)]
-        print("Run tests in parallel using %s child processes"
-              % len(self.workers))
+        self.log("Run tests in parallel using %s child processes"
+                 % len(self.workers))
         for worker in self.workers:
             worker.start()
@@ -362,7 +364,7 @@ class MultiprocessTestRunner:
         # display progress
         running = get_running(self.workers)
         if running and not self.ns.pgo:
-            print('running: %s' % ', '.join(running), flush=True)
+            self.log('running: %s' % ', '.join(running))

     def display_result(self, mp_result):
         result = mp_result.result
@@ -382,8 +384,7 @@ class MultiprocessTestRunner:
         if item[0]:
             # Thread got an exception
             format_exc = item[1]
-            print(f"regrtest worker thread failed: {format_exc}",
-                  file=sys.stderr, flush=True)
+            print_warning(f"regrtest worker thread failed: {format_exc}")
             return True

         self.test_index += 1
@@ -16,11 +16,14 @@ def format_duration(seconds):
     if minutes:
         parts.append('%s min' % minutes)
     if seconds:
-        parts.append('%s sec' % seconds)
-    if ms:
-        parts.append('%s ms' % ms)
+        if parts:
+            # 2 min 1 sec
+            parts.append('%s sec' % seconds)
+        else:
+            # 1.0 sec
+            parts.append('%.1f sec' % (seconds + ms / 1000))
     if not parts:
-        return '0 ms'
+        return '%s ms' % ms

     parts = parts[:2]
     return ' '.join(parts)
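A quick usage sketch of the new behaviour; the expected strings come from the updated tests at the end of this diff, except the last one, which is computed from the new code:

from test.libregrtest.utils import format_duration

print(format_duration(10e-3))       # 10 ms
print(format_duration(1.5))         # 1.5 sec  (previously: 1 sec 500 ms)
print(format_duration(1))           # 1.0 sec  (previously: 1 sec)
print(format_duration(2 * 60))      # 2 min
print(format_duration(2 * 60 + 1))  # 2 min 1 sec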
@@ -1,4 +1,5 @@
 import _winapi
+import math
 import msvcrt
 import os
 import subprocess
@@ -10,11 +11,14 @@ from test.libregrtest.utils import print_warning

 # Max size of asynchronous reads
 BUFSIZE = 8192
-# Exponential damping factor (see below)
-LOAD_FACTOR_1 = 0.9200444146293232478931553241
-
 # Seconds per measurement
-SAMPLING_INTERVAL = 5
+SAMPLING_INTERVAL = 1
+# Exponential damping factor to compute exponentially weighted moving average
+# on 1 minute (60 seconds)
+LOAD_FACTOR_1 = 1 / math.exp(SAMPLING_INTERVAL / 60)
+# Initialize the load using the arithmetic mean of the first NVALUE values
+# of the Processor Queue Length
+NVALUE = 5
 # Windows registry subkey of HKEY_LOCAL_MACHINE where the counter names
 # of typeperf are registered
 COUNTER_REGISTRY_KEY = (r"SOFTWARE\Microsoft\Windows NT\CurrentVersion"
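A short worked check of the new damping factor (not part of the patch): with one sample per second, LOAD_FACTOR_1 = 1/exp(1/60) ≈ 0.9835, so a sample's weight decays by a factor of e after 60 updates, which is what makes the tracker behave like a 1-minute load average:

import math

SAMPLING_INTERVAL = 1
LOAD_FACTOR_1 = 1 / math.exp(SAMPLING_INTERVAL / 60)

print(LOAD_FACTOR_1)        # ~0.98347
print(LOAD_FACTOR_1 ** 60)  # ~0.36788 == 1/e: a minute-old sample keeps 1/e of its weight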
@@ -30,9 +34,10 @@ class WindowsLoadTracker():
     """

     def __init__(self):
-        self.load = 0.0
-        self.counter_name = ''
-        self.popen = None
+        self._values = []
+        self._load = None
+        self._buffer = ''
+        self._popen = None
         self.start()

     def start(self):
@@ -64,7 +69,7 @@ class WindowsLoadTracker():
         # Spawn off the load monitor
         counter_name = self._get_counter_name()
         command = ['typeperf', counter_name, '-si', str(SAMPLING_INTERVAL)]
-        self.popen = subprocess.Popen(' '.join(command), stdout=command_stdout, cwd=support.SAVEDCWD)
+        self._popen = subprocess.Popen(' '.join(command), stdout=command_stdout, cwd=support.SAVEDCWD)

         # Close our copy of the write end of the pipe
         os.close(command_stdout)
@@ -84,52 +89,88 @@ class WindowsLoadTracker():
         process_queue_length = counters_dict['44']
         return f'"\\{system}\\{process_queue_length}"'

-    def close(self):
-        if self.popen is None:
+    def close(self, kill=True):
+        if self._popen is None:
             return
-        self.popen.kill()
-        self.popen.wait()
-        self.popen = None
+
+        self._load = None
+
+        if kill:
+            self._popen.kill()
+        self._popen.wait()
+        self._popen = None

     def __del__(self):
         self.close()

-    def read_output(self):
+    def _parse_line(self, line):
+        # typeperf outputs in a CSV format like this:
+        # "07/19/2018 01:32:26.605","3.000000"
+        # (date, process queue length)
+        tokens = line.split(',')
+        if len(tokens) != 2:
+            raise ValueError
+
+        value = tokens[1]
+        if not value.startswith('"') or not value.endswith('"'):
+            raise ValueError
+        value = value[1:-1]
+        return float(value)
+
+    def _read_lines(self):
         overlapped, _ = _winapi.ReadFile(self.pipe, BUFSIZE, True)
         bytes_read, res = overlapped.GetOverlappedResult(False)
         if res != 0:
-            return
+            return ()

         output = overlapped.getbuffer()
-        return output.decode('oem', 'replace')
+        output = output.decode('oem', 'replace')
+        output = self._buffer + output
+        lines = output.splitlines(True)
+
+        # bpo-36670: typeperf only writes a newline *before* writing a value,
+        # not after. Sometimes, the written line in incomplete (ex: only
+        # timestamp, without the process queue length). Only pass the last line
+        # to the parser if it's a valid value, otherwise store it in
+        # self._buffer.
+        try:
+            self._parse_line(lines[-1])
+        except ValueError:
+            self._buffer = lines.pop(-1)
+        else:
+            self._buffer = ''
+
+        return lines

     def getloadavg(self):
-        typeperf_output = self.read_output()
-        # Nothing to update, just return the current load
-        if not typeperf_output:
-            return self.load
+        if self._popen is None:
+            return None
+
+        returncode = self._popen.poll()
+        if returncode is not None:
+            self.close(kill=False)
+            return None
+
+        try:
+            lines = self._read_lines()
+        except BrokenPipeError:
+            self.close()
+            return None
+
+        for line in lines:
+            line = line.rstrip()

-        # Process the backlog of load values
-        for line in typeperf_output.splitlines():
             # Ignore the initial header:
             # "(PDH-CSV 4.0)","\\\\WIN\\System\\Processor Queue Length"
-            if '\\\\' in line:
+            if 'PDH-CSV' in line:
                 continue

             # Ignore blank lines
-            if not line.strip():
+            if not line:
                 continue

-            # typeperf outputs in a CSV format like this:
-            # "07/19/2018 01:32:26.605","3.000000"
-            # (date, process queue length)
             try:
-                tokens = line.split(',')
-                if len(tokens) != 2:
-                    raise ValueError
-
-                value = tokens[1].replace('"', '')
-                load = float(value)
+                processor_queue_length = self._parse_line(line)
             except ValueError:
                 print_warning("Failed to parse typeperf output: %a" % line)
                 continue
@@ -137,7 +178,13 @@ class WindowsLoadTracker():
             # We use an exponentially weighted moving average, imitating the
             # load calculation on Unix systems.
-            # https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation
-            new_load = self.load * LOAD_FACTOR_1 + load * (1.0 - LOAD_FACTOR_1)
-            self.load = new_load
+            # https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
+            if self._load is not None:
+                self._load = (self._load * LOAD_FACTOR_1
+                              + processor_queue_length * (1.0 - LOAD_FACTOR_1))
+            elif len(self._values) < NVALUE:
+                self._values.append(processor_queue_length)
+            else:
+                self._load = sum(self._values) / len(self._values)

-        return self.load
+        return self._load
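For reference, a minimal standalone simulation of the update rule in this hunk (LoadAverage is an illustrative name, not regrtest code): the first NVALUE samples only accumulate, the load is then seeded with their arithmetic mean, and every later Processor Queue Length sample is folded into the exponentially weighted moving average:

import math

SAMPLING_INTERVAL = 1
LOAD_FACTOR_1 = 1 / math.exp(SAMPLING_INTERVAL / 60)
NVALUE = 5

class LoadAverage:
    # Illustrative re-implementation of the update rule above, outside regrtest.
    def __init__(self):
        self._values = []
        self._load = None

    def update(self, processor_queue_length):
        if self._load is not None:
            self._load = (self._load * LOAD_FACTOR_1
                          + processor_queue_length * (1.0 - LOAD_FACTOR_1))
        elif len(self._values) < NVALUE:
            self._values.append(processor_queue_length)
        else:
            self._load = sum(self._values) / len(self._values)
        return self._load

tracker = LoadAverage()
for sample in [3.0, 1.0, 2.0, 4.0, 5.0, 0.0, 10.0]:
    load = tracker.update(sample)
print(load)   # seeded at mean(3, 1, 2, 4, 5) = 3.0, then nudged toward 10.0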
@@ -24,6 +24,7 @@ from test.libregrtest import utils
 Py_DEBUG = hasattr(sys, 'gettotalrefcount')
 ROOT_DIR = os.path.join(os.path.dirname(__file__), '..', '..')
 ROOT_DIR = os.path.abspath(os.path.normpath(ROOT_DIR))
+LOG_PREFIX = r'[0-9]+:[0-9]+:[0-9]+ (?:load avg: [0-9]+\.[0-9]{2} )?'

 TEST_INTERRUPTED = textwrap.dedent("""
     from signal import SIGINT
@@ -390,8 +391,8 @@ class BaseTestCase(unittest.TestCase):
         self.assertRegex(output, regex)

     def parse_executed_tests(self, output):
-        regex = (r'^[0-9]+:[0-9]+:[0-9]+ (?:load avg: [0-9]+\.[0-9]{2} )?\[ *[0-9]+(?:/ *[0-9]+)*\] (%s)'
-                 % self.TESTNAME_REGEX)
+        regex = (r'^%s\[ *[0-9]+(?:/ *[0-9]+)*\] (%s)'
+                 % (LOG_PREFIX, self.TESTNAME_REGEX))
         parser = re.finditer(regex, output, re.MULTILINE)
         return list(match.group(1) for match in parser)
@@ -451,9 +452,10 @@ class BaseTestCase(unittest.TestCase):
         if rerun:
             regex = list_regex('%s re-run test%s', rerun)
             self.check_line(output, regex)
-            self.check_line(output, "Re-running failed tests in verbose mode")
+            regex = LOG_PREFIX + r"Re-running failed tests in verbose mode"
+            self.check_line(output, regex)
             for test_name in rerun:
-                regex = f"Re-running {test_name} in verbose mode"
+                regex = LOG_PREFIX + f"Re-running {test_name} in verbose mode"
                 self.check_line(output, regex)

         if no_test_ran:
@@ -1173,9 +1175,9 @@ class TestUtils(unittest.TestCase):
         self.assertEqual(utils.format_duration(10e-3),
                          '10 ms')
         self.assertEqual(utils.format_duration(1.5),
-                         '1 sec 500 ms')
+                         '1.5 sec')
         self.assertEqual(utils.format_duration(1),
-                         '1 sec')
+                         '1.0 sec')
         self.assertEqual(utils.format_duration(2 * 60),
                          '2 min')
         self.assertEqual(utils.format_duration(2 * 60 + 1),