From 982bfa4da07b2e5749a0f4e68f99e972bcc3a549 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Tue, 1 Oct 2019 12:29:36 +0200 Subject: [PATCH] bpo-36670: Multiple regrtest bugfixes (GH-16511) * Windows: Fix counter name in WindowsLoadTracker. Counter names are localized: use the registry to get the counter name. Original change written by Lorenz Mende. * Regrtest.main() now ensures that the Windows load tracker is also killed if an exception is raised * TestWorkerProcess now ensures that worker processes are no longer running before exiting: kill also worker processes when an exception is raised. * Enhance regrtest messages and warnings: include test name, duration, add a worker identifier, etc. * Rename MultiprocessRunner to TestWorkerProcess * Use print_warning() to display warnings. Co-Authored-By: Lorenz Mende --- Lib/test/libregrtest/main.py | 17 ++- Lib/test/libregrtest/runtest_mp.py | 228 +++++++++++++++-------------- Lib/test/libregrtest/win_utils.py | 71 ++++++--- 3 files changed, 179 insertions(+), 137 deletions(-) diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py index 29970068692..fd701c452ce 100644 --- a/Lib/test/libregrtest/main.py +++ b/Lib/test/libregrtest/main.py @@ -508,10 +508,6 @@ class Regrtest: self.run_tests_sequential() def finalize(self): - if self.win_load_tracker is not None: - self.win_load_tracker.close() - self.win_load_tracker = None - if self.next_single_filename: if self.next_single_test: with open(self.next_single_filename, 'w') as fp: @@ -680,11 +676,16 @@ class Regrtest: # typeperf.exe for x64, x86 or ARM print(f'Failed to create WindowsLoadTracker: {error}') - self.run_tests() - self.display_result() + try: + self.run_tests() + self.display_result() - if self.ns.verbose2 and self.bad: - self.rerun_failed_tests() + if self.ns.verbose2 and self.bad: + self.rerun_failed_tests() + finally: + if self.win_load_tracker is not None: + self.win_load_tracker.close() + self.win_load_tracker = None self.finalize() diff --git a/Lib/test/libregrtest/runtest_mp.py b/Lib/test/libregrtest/runtest_mp.py index 9cb5be6bb8a..38b05781de5 100644 --- a/Lib/test/libregrtest/runtest_mp.py +++ b/Lib/test/libregrtest/runtest_mp.py @@ -15,7 +15,7 @@ from test.libregrtest.runtest import ( runtest, INTERRUPTED, CHILD_ERROR, PROGRESS_MIN_TIME, format_test_result, TestResult, is_failed, TIMEOUT) from test.libregrtest.setup import setup_tests -from test.libregrtest.utils import format_duration +from test.libregrtest.utils import format_duration, print_warning # Display the running tests if nothing happened last N seconds @@ -103,9 +103,10 @@ class ExitThread(Exception): pass -class MultiprocessThread(threading.Thread): - def __init__(self, pending, output, ns, timeout): +class TestWorkerProcess(threading.Thread): + def __init__(self, worker_id, pending, output, ns, timeout): super().__init__() + self.worker_id = worker_id self.pending = pending self.output = output self.ns = ns @@ -114,12 +115,16 @@ class MultiprocessThread(threading.Thread): self.start_time = None self._popen = None self._killed = False + self._stopped = False def __repr__(self): - info = ['MultiprocessThread'] - test = self.current_test_name + info = [f'TestWorkerProcess #{self.worker_id}'] if self.is_alive(): - info.append('alive') + dt = time.monotonic() - self.start_time + info.append("running for %s" % format_duration(dt)) + else: + info.append('stopped') + test = self.current_test_name if test: info.append(f'test={test}') popen = self._popen @@ -128,53 +133,24 @@ class MultiprocessThread(threading.Thread): return '<%s>' % ' '.join(info) def _kill(self): - dt = time.monotonic() - self.start_time - - popen = self._popen - pid = popen.pid - print("Kill worker process %s running for %.1f sec" % (pid, dt), - file=sys.stderr, flush=True) - - try: - popen.kill() - return True - except OSError as exc: - print("WARNING: Failed to kill worker process %s: %r" % (pid, exc), - file=sys.stderr, flush=True) - return False - - def _close_wait(self): - popen = self._popen - - # stdout and stderr must be closed to ensure that communicate() - # does not hang - popen.stdout.close() - popen.stderr.close() - - try: - popen.wait(JOIN_TIMEOUT) - except (subprocess.TimeoutExpired, OSError) as exc: - print("WARNING: Failed to wait for worker process %s " - "completion (timeout=%.1f sec): %r" - % (popen.pid, JOIN_TIMEOUT, exc), - file=sys.stderr, flush=True) - - def kill(self): - """ - Kill the current process (if any). - - This method can be called by the thread running the process, - or by another thread. - """ + if self._killed: + return self._killed = True - if self._popen is None: + popen = self._popen + if popen is None: return - if not self._kill(): - return + print(f"Kill {self}", file=sys.stderr, flush=True) + try: + popen.kill() + except OSError as exc: + print_warning(f"Failed to kill {self}: {exc!r}") - self._close_wait() + def stop(self): + # Method called from a different thread to stop this thread + self._stopped = True + self._kill() def mp_result_error(self, test_name, error_type, stdout='', stderr='', err_msg=None): @@ -190,59 +166,69 @@ class MultiprocessThread(threading.Thread): try: stdout, stderr = popen.communicate(timeout=JOIN_TIMEOUT) except (subprocess.TimeoutExpired, OSError) as exc: - print("WARNING: Failed to read worker process %s output " - "(timeout=%.1f sec): %r" - % (popen.pid, JOIN_TIMEOUT, exc), - file=sys.stderr, flush=True) - - self._close_wait() + print_warning(f"Failed to read {self} output " + f"(timeout={format_duration(JOIN_TIMEOUT)}): " + f"{exc!r}") return self.mp_result_error(test_name, TIMEOUT, stdout, stderr) - def _runtest(self, test_name): - try: - self.start_time = time.monotonic() - self.current_test_name = test_name + def _run_process(self, test_name): + self.start_time = time.monotonic() + self.current_test_name = test_name + try: + self._killed = False self._popen = run_test_in_subprocess(test_name, self.ns) popen = self._popen + except: + self.current_test_name = None + raise + + try: + if self._stopped: + # If kill() has been called before self._popen is set, + # self._popen is still running. Call again kill() + # to ensure that the process is killed. + self._kill() + raise ExitThread + try: - try: - if self._killed: - # If kill() has been called before self._popen is set, - # self._popen is still running. Call again kill() - # to ensure that the process is killed. - self.kill() - raise ExitThread + stdout, stderr = popen.communicate(timeout=self.timeout) + except subprocess.TimeoutExpired: + if self._stopped: + # kill() has been called: communicate() fails + # on reading closed stdout/stderr + raise ExitThread - try: - stdout, stderr = popen.communicate(timeout=self.timeout) - except subprocess.TimeoutExpired: - if self._killed: - # kill() has been called: communicate() fails - # on reading closed stdout/stderr - raise ExitThread - - return self._timedout(test_name) - except OSError: - if self._killed: - # kill() has been called: communicate() fails - # on reading closed stdout/stderr - raise ExitThread - raise - except: - self.kill() - raise - finally: - self._close_wait() + return self._timedout(test_name) + except OSError: + if self._stopped: + # kill() has been called: communicate() fails + # on reading closed stdout/stderr + raise ExitThread + raise retcode = popen.returncode - finally: - self.current_test_name = None - self._popen = None + stdout = stdout.strip() + stderr = stderr.rstrip() - stdout = stdout.strip() - stderr = stderr.rstrip() + return (retcode, stdout, stderr) + except: + self._kill() + raise + finally: + self._wait_completed() + self._popen = None + self.current_test_name = None + + def _runtest(self, test_name): + result = self._run_process(test_name) + + if isinstance(result, MultiprocessResult): + # _timedout() case + return result + + retcode, stdout, stderr = result err_msg = None if retcode != 0: @@ -266,7 +252,7 @@ class MultiprocessThread(threading.Thread): return MultiprocessResult(result, stdout, stderr, err_msg) def run(self): - while not self._killed: + while not self._stopped: try: try: test_name = next(self.pending) @@ -284,6 +270,33 @@ class MultiprocessThread(threading.Thread): self.output.put((True, traceback.format_exc())) break + def _wait_completed(self): + popen = self._popen + + # stdout and stderr must be closed to ensure that communicate() + # does not hang + popen.stdout.close() + popen.stderr.close() + + try: + popen.wait(JOIN_TIMEOUT) + except (subprocess.TimeoutExpired, OSError) as exc: + print_warning(f"Failed to wait for {self} completion " + f"(timeout={format_duration(JOIN_TIMEOUT)}): " + f"{exc!r}") + + def wait_stopped(self, start_time): + while True: + # Write a message every second + self.join(1.0) + if not self.is_alive(): + break + dt = time.monotonic() - start_time + print(f"Waiting for {self} thread for {format_duration(dt)}", flush=True) + if dt > JOIN_TIMEOUT: + print_warning(f"Failed to join {self} in {format_duration(dt)}") + break + def get_running(workers): running = [] @@ -298,7 +311,7 @@ def get_running(workers): return running -class MultiprocessRunner: +class MultiprocessTestRunner: def __init__(self, regrtest): self.regrtest = regrtest self.ns = regrtest.ns @@ -311,30 +324,20 @@ class MultiprocessRunner: self.workers = None def start_workers(self): - self.workers = [MultiprocessThread(self.pending, self.output, - self.ns, self.worker_timeout) - for _ in range(self.ns.use_mp)] + self.workers = [TestWorkerProcess(index, self.pending, self.output, + self.ns, self.worker_timeout) + for index in range(1, self.ns.use_mp + 1)] print("Run tests in parallel using %s child processes" % len(self.workers)) for worker in self.workers: worker.start() - def wait_workers(self): + def stop_workers(self): start_time = time.monotonic() for worker in self.workers: - worker.kill() + worker.stop() for worker in self.workers: - while True: - worker.join(1.0) - if not worker.is_alive(): - break - dt = time.monotonic() - start_time - print("Wait for regrtest worker %r for %.1f sec" % (worker, dt), - flush=True) - if dt > JOIN_TIMEOUT: - print("Warning -- failed to join a regrtest worker %s" - % worker, flush=True) - break + worker.wait_stopped(start_time) def _get_result(self): if not any(worker.is_alive() for worker in self.workers): @@ -418,10 +421,11 @@ class MultiprocessRunner: if self.ns.timeout is not None: faulthandler.cancel_dump_traceback_later() - # a test failed (and --failfast is set) or all tests completed - self.pending.stop() - self.wait_workers() + # Always ensure that all worker processes are no longer + # worker when we exit this function + self.pending.stop() + self.stop_workers() def run_tests_multiprocess(regrtest): - MultiprocessRunner(regrtest).run_tests() + MultiprocessTestRunner(regrtest).run_tests() diff --git a/Lib/test/libregrtest/win_utils.py b/Lib/test/libregrtest/win_utils.py index ec2d6c663e8..f0c17b906f5 100644 --- a/Lib/test/libregrtest/win_utils.py +++ b/Lib/test/libregrtest/win_utils.py @@ -3,16 +3,22 @@ import msvcrt import os import subprocess import uuid +import winreg from test import support +from test.libregrtest.utils import print_warning # Max size of asynchronous reads BUFSIZE = 8192 # Exponential damping factor (see below) LOAD_FACTOR_1 = 0.9200444146293232478931553241 + # Seconds per measurement SAMPLING_INTERVAL = 5 -COUNTER_NAME = r'\System\Processor Queue Length' +# Windows registry subkey of HKEY_LOCAL_MACHINE where the counter names +# of typeperf are registered +COUNTER_REGISTRY_KEY = (r"SOFTWARE\Microsoft\Windows NT\CurrentVersion" + r"\Perflib\CurrentLanguage") class WindowsLoadTracker(): @@ -25,7 +31,8 @@ class WindowsLoadTracker(): def __init__(self): self.load = 0.0 - self.p = None + self.counter_name = '' + self.popen = None self.start() def start(self): @@ -55,31 +62,46 @@ class WindowsLoadTracker(): overlap.GetOverlappedResult(True) # Spawn off the load monitor - command = ['typeperf', COUNTER_NAME, '-si', str(SAMPLING_INTERVAL)] - self.p = subprocess.Popen(command, stdout=command_stdout, cwd=support.SAVEDCWD) + counter_name = self._get_counter_name() + command = ['typeperf', counter_name, '-si', str(SAMPLING_INTERVAL)] + self.popen = subprocess.Popen(' '.join(command), stdout=command_stdout, cwd=support.SAVEDCWD) # Close our copy of the write end of the pipe os.close(command_stdout) + def _get_counter_name(self): + # accessing the registry to get the counter localization name + with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, COUNTER_REGISTRY_KEY) as perfkey: + counters = winreg.QueryValueEx(perfkey, 'Counter')[0] + + # Convert [key1, value1, key2, value2, ...] list + # to {key1: value1, key2: value2, ...} dict + counters = iter(counters) + counters_dict = dict(zip(counters, counters)) + + # System counter has key '2' and Processor Queue Length has key '44' + system = counters_dict['2'] + process_queue_length = counters_dict['44'] + return f'"\\{system}\\{process_queue_length}"' + def close(self): - if self.p is None: + if self.popen is None: return - self.p.kill() - self.p.wait() - self.p = None + self.popen.kill() + self.popen.wait() + self.popen = None def __del__(self): self.close() def read_output(self): - import _winapi - overlapped, _ = _winapi.ReadFile(self.pipe, BUFSIZE, True) bytes_read, res = overlapped.GetOverlappedResult(False) if res != 0: return - return overlapped.getbuffer().decode() + output = overlapped.getbuffer() + return output.decode('oem', 'replace') def getloadavg(self): typeperf_output = self.read_output() @@ -89,14 +111,29 @@ class WindowsLoadTracker(): # Process the backlog of load values for line in typeperf_output.splitlines(): - # typeperf outputs in a CSV format like this: - # "07/19/2018 01:32:26.605","3.000000" - toks = line.split(',') - # Ignore blank lines and the initial header - if line.strip() == '' or (COUNTER_NAME in line) or len(toks) != 2: + # Ignore the initial header: + # "(PDH-CSV 4.0)","\\\\WIN\\System\\Processor Queue Length" + if '\\\\' in line: + continue + + # Ignore blank lines + if not line.strip(): + continue + + # typeperf outputs in a CSV format like this: + # "07/19/2018 01:32:26.605","3.000000" + # (date, process queue length) + try: + tokens = line.split(',') + if len(tokens) != 2: + raise ValueError + + value = tokens[1].replace('"', '') + load = float(value) + except ValueError: + print_warning("Failed to parse typeperf output: %a" % line) continue - load = float(toks[1].replace('"', '')) # We use an exponentially weighted moving average, imitating the # load calculation on Unix systems. # https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation