import _overlapped import _thread import _winapi import math import struct import winreg # Seconds per measurement SAMPLING_INTERVAL = 1 # Exponential damping factor to compute exponentially weighted moving average # on 1 minute (60 seconds) LOAD_FACTOR_1 = 1 / math.exp(SAMPLING_INTERVAL / 60) # Initialize the load using the arithmetic mean of the first NVALUE values # of the Processor Queue Length NVALUE = 5 class WindowsLoadTracker(): """ This class asynchronously reads the performance counters to calculate the system load on Windows. A "raw" thread is used here to prevent interference with the test suite's cases for the threading module. """ def __init__(self): # make __del__ not fail if pre-flight test fails self._running = None self._stopped = None # Pre-flight test for access to the performance data; # `PermissionError` will be raised if not allowed winreg.QueryInfoKey(winreg.HKEY_PERFORMANCE_DATA) self._values = [] self._load = None self._running = _overlapped.CreateEvent(None, True, False, None) self._stopped = _overlapped.CreateEvent(None, True, False, None) _thread.start_new_thread(self._update_load, (), {}) def _update_load(self, # localize module access to prevent shutdown errors _wait=_winapi.WaitForSingleObject, _signal=_overlapped.SetEvent): # run until signaled to stop while _wait(self._running, 1000): self._calculate_load() # notify stopped _signal(self._stopped) def _calculate_load(self, # localize module access to prevent shutdown errors _query=winreg.QueryValueEx, _hkey=winreg.HKEY_PERFORMANCE_DATA, _unpack=struct.unpack_from): # get the 'System' object data, _ = _query(_hkey, '2') # PERF_DATA_BLOCK { # WCHAR Signature[4] 8 + # DWOWD LittleEndian 4 + # DWORD Version 4 + # DWORD Revision 4 + # DWORD TotalByteLength 4 + # DWORD HeaderLength = 24 byte offset # ... # } obj_start, = _unpack('L', data, 24) # PERF_OBJECT_TYPE { # DWORD TotalByteLength # DWORD DefinitionLength # DWORD HeaderLength # ... # } data_start, defn_start = _unpack('4xLL', data, obj_start) data_base = obj_start + data_start defn_base = obj_start + defn_start # find the 'Processor Queue Length' counter (index=44) while defn_base < data_base: # PERF_COUNTER_DEFINITION { # DWORD ByteLength # DWORD CounterNameTitleIndex # ... [7 DWORDs/28 bytes] # DWORD CounterOffset # } size, idx, offset = _unpack('LL28xL', data, defn_base) defn_base += size if idx == 44: counter_offset = data_base + offset # the counter is known to be PERF_COUNTER_RAWCOUNT (DWORD) processor_queue_length, = _unpack('L', data, counter_offset) break else: return # We use an exponentially weighted moving average, imitating the # load calculation on Unix systems. # https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation # https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average if self._load is not None: self._load = (self._load * LOAD_FACTOR_1 + processor_queue_length * (1.0 - LOAD_FACTOR_1)) elif len(self._values) < NVALUE: self._values.append(processor_queue_length) else: self._load = sum(self._values) / len(self._values) def close(self, kill=True): self.__del__() return def __del__(self, # localize module access to prevent shutdown errors _wait=_winapi.WaitForSingleObject, _close=_winapi.CloseHandle, _signal=_overlapped.SetEvent): if self._running is not None: # tell the update thread to quit _signal(self._running) # wait for the update thread to signal done _wait(self._stopped, -1) # cleanup events _close(self._running) _close(self._stopped) self._running = self._stopped = None def getloadavg(self): return self._load