191 lines
6.4 KiB
Python
191 lines
6.4 KiB
Python
import _winapi
|
|
import math
|
|
import msvcrt
|
|
import os
|
|
import subprocess
|
|
import uuid
|
|
import winreg
|
|
from test import support
|
|
from test.libregrtest.utils import print_warning
|
|
|
|
|
|
# Size, in bytes, of the named pipe buffers and of each asynchronous read
BUFSIZE = 8192

# Interval, in seconds, between two typeperf measurements
SAMPLING_INTERVAL = 1

# Damping factor of the exponentially weighted moving average computed
# over a 1 minute (60 seconds) window
LOAD_FACTOR_1 = 1 / math.exp(SAMPLING_INTERVAL / 60)

# Number of initial Processor Queue Length samples whose arithmetic mean
# is used to initialize the load
NVALUE = 5

# Subkey of HKEY_LOCAL_MACHINE in the Windows registry under which the
# counter names used by typeperf are registered
COUNTER_REGISTRY_KEY = (
    r"SOFTWARE\Microsoft\Windows NT"
    r"\CurrentVersion\Perflib\CurrentLanguage"
)
|
|
|
|
|
|
class WindowsLoadTracker:
    """
    This class asynchronously interacts with the `typeperf` command to read
    the system load on Windows. Multiprocessing and threads can't be used
    here because they interfere with the test suite's cases for those
    modules.

    `typeperf` writes its CSV output into a named pipe whose read end
    (`self.pipe`) is polled with overlapped (non-blocking) reads each time
    getloadavg() is called.
    """

    def __init__(self):
        # Samples collected before the load is initialized
        self._values = []
        # Current load estimate; None until NVALUE samples were collected
        self._load = None
        # Trailing, possibly incomplete, typeperf output kept between reads
        self._buffer = ''
        # The running typeperf process, or None when the tracker is closed
        self._popen = None
        # Read end of the named pipe, or None when no pipe is open
        self.pipe = None
        self.start()

    def start(self):
        """Create the named pipe and spawn `typeperf` writing into it.

        Exceptions raised during the setup (for example FileNotFoundError
        when `typeperf` cannot be spawned) are propagated to the caller
        after the pipe created here has been closed.
        """
        # Create a named pipe which allows for asynchronous IO in Windows
        pipe_name = r'\\.\pipe\typeperf_output_' + str(uuid.uuid4())

        open_mode = _winapi.PIPE_ACCESS_INBOUND
        open_mode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
        open_mode |= _winapi.FILE_FLAG_OVERLAPPED

        # This is the read end of the pipe, where we will be grabbing output
        self.pipe = _winapi.CreateNamedPipe(
            pipe_name, open_mode, _winapi.PIPE_WAIT,
            1, BUFSIZE, BUFSIZE, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL
        )
        try:
            self._spawn_typeperf(pipe_name)
        except BaseException:
            # Don't leak the read end of the pipe if the setup failed
            self._close_pipe()
            raise

    def _spawn_typeperf(self, pipe_name):
        """Start `typeperf` with its stdout connected to pipe *pipe_name*."""
        # The write end of the pipe which is passed to the created process
        pipe_write_end = _winapi.CreateFile(
            pipe_name, _winapi.GENERIC_WRITE, 0, _winapi.NULL,
            _winapi.OPEN_EXISTING, 0, _winapi.NULL
        )
        # Wrap the handle in a C runtime file descriptor so we can pass it
        # to subprocess
        try:
            command_stdout = msvcrt.open_osfhandle(pipe_write_end, 0)
        except BaseException:
            # The descriptor doesn't own the handle yet: close it directly
            _winapi.CloseHandle(pipe_write_end)
            raise

        try:
            # Connect to the read end of the pipe in overlap/async mode
            overlap = _winapi.ConnectNamedPipe(self.pipe, overlapped=True)
            overlap.GetOverlappedResult(True)

            # Spawn off the load monitor. The counter name is already
            # quoted by _get_counter_name(), so the command is passed as a
            # single string.
            counter_name = self._get_counter_name()
            command = ['typeperf', counter_name, '-si', str(SAMPLING_INTERVAL)]
            self._popen = subprocess.Popen(' '.join(command),
                                           stdout=command_stdout,
                                           cwd=support.SAVEDCWD)
        finally:
            # Close our copy of the write end of the pipe, whether or not
            # the process could be spawned
            os.close(command_stdout)

    def _get_counter_name(self):
        """Return the quoted, localized `\\System\\Processor Queue Length`
        counter path to pass to `typeperf`."""
        # accessing the registry to get the counter localization name
        with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE,
                            COUNTER_REGISTRY_KEY) as perfkey:
            counters = winreg.QueryValueEx(perfkey, 'Counter')[0]

        # Convert [key1, value1, key2, value2, ...] list
        # to {key1: value1, key2: value2, ...} dict
        counters = iter(counters)
        counters_dict = dict(zip(counters, counters))

        # System counter has key '2' and Processor Queue Length has key '44'
        system = counters_dict['2']
        process_queue_length = counters_dict['44']
        return f'"\\{system}\\{process_queue_length}"'

    def _close_pipe(self):
        """Close the read end of the named pipe, if it is open."""
        pipe = getattr(self, 'pipe', None)
        self.pipe = None
        if pipe is not None:
            _winapi.CloseHandle(pipe)

    def close(self, kill=True):
        """Stop tracking the load and release the process and the pipe.

        If *kill* is true, the `typeperf` process is killed before being
        waited for; pass False when the process already exited. Calling
        close() on an already closed tracker does nothing.
        """
        if self._popen is None:
            return

        # Forget the state so nothing stale is reported or reused later
        self._load = None
        self._values = []
        self._buffer = ''

        try:
            if kill:
                self._popen.kill()
            self._popen.wait()
        finally:
            self._popen = None
            self._close_pipe()

    def __del__(self):
        self.close()

    def _parse_line(self, line):
        """Return the processor queue length stored in a typeperf CSV line.

        Raise ValueError if *line* is not a complete `"date","value"` line.
        """
        # typeperf outputs in a CSV format like this:
        # "07/19/2018 01:32:26.605","3.000000"
        # (date, process queue length)
        tokens = line.split(',')
        if len(tokens) != 2:
            raise ValueError

        value = tokens[1]
        if not value.startswith('"') or not value.endswith('"'):
            raise ValueError
        value = value[1:-1]
        return float(value)

    def _read_lines(self):
        """Return the typeperf output lines which are currently available.

        The read doesn't block: an empty sequence is returned if no data
        could be read. A trailing incomplete line is kept in self._buffer
        until the rest of it is read.
        """
        overlapped, _ = _winapi.ReadFile(self.pipe, BUFSIZE, True)
        _, res = overlapped.GetOverlappedResult(False)
        if res != 0:
            return ()

        output = overlapped.getbuffer()
        output = output.decode('oem', 'replace')
        output = self._buffer + output
        lines = output.splitlines(True)
        if not lines:
            # Nothing was read and nothing was buffered
            return ()

        # bpo-36670: typeperf only writes a newline *before* writing a value,
        # not after. Sometimes, the written line is incomplete (ex: only
        # timestamp, without the process queue length). Only pass the last line
        # to the parser if it's a valid value, otherwise store it in
        # self._buffer.
        try:
            self._parse_line(lines[-1])
        except ValueError:
            self._buffer = lines.pop(-1)
        else:
            self._buffer = ''

        return lines

    def getloadavg(self):
        """Read the pending typeperf output and return the system load.

        Return None if the tracker is closed, if `typeperf` exited, if the
        pipe is broken, or as long as fewer than NVALUE samples have been
        collected.
        """
        if self._popen is None:
            return None

        returncode = self._popen.poll()
        if returncode is not None:
            self.close(kill=False)
            return None

        try:
            lines = self._read_lines()
        except BrokenPipeError:
            self.close()
            return None

        for line in lines:
            line = line.rstrip()

            # Ignore the initial header:
            # "(PDH-CSV 4.0)","\\\\WIN\\System\\Processor Queue Length"
            if 'PDH-CSV' in line:
                continue

            # Ignore blank lines
            if not line:
                continue

            try:
                processor_queue_length = self._parse_line(line)
            except ValueError:
                print_warning("Failed to parse typeperf output: %a" % line)
                continue

            # We use an exponentially weighted moving average, imitating the
            # load calculation on Unix systems.
            # https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation
            # https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
            if self._load is not None:
                self._load = (self._load * LOAD_FACTOR_1
                              + processor_queue_length * (1.0 - LOAD_FACTOR_1))
                continue

            # The load is initialized to the arithmetic mean of the first
            # NVALUE samples, as soon as the last of them is read, so that
            # no sample is thrown away.
            self._values.append(processor_queue_length)
            if len(self._values) >= NVALUE:
                self._load = sum(self._values) / len(self._values)

        return self._load
|