# Test the internal _wmi module on Windows # This is used by the platform module, and potentially others import time import unittest from test.support import import_helper, requires_resource, LOOPBACK_TIMEOUT # Do this first so test will be skipped if module doesn't exist _wmi = import_helper.import_module('_wmi', required_on=['win']) def wmi_exec_query(query): # gh-112278: WMI maybe slow response when first call. try: return _wmi.exec_query(query) except BrokenPipeError: pass except WindowsError as e: if e.winerror != 258: raise time.sleep(LOOPBACK_TIMEOUT) return _wmi.exec_query(query) class WmiTests(unittest.TestCase): def test_wmi_query_os_version(self): r = wmi_exec_query("SELECT Version FROM Win32_OperatingSystem").split("\0") self.assertEqual(1, len(r)) k, eq, v = r[0].partition("=") self.assertEqual("=", eq, r[0]) self.assertEqual("Version", k, r[0]) # Best we can check for the version is that it's digits, dot, digits, anything # Otherwise, we are likely checking the result of the query against itself self.assertRegex(v, r"\d+\.\d+.+$", r[0]) def test_wmi_query_repeated(self): # Repeated queries should not break for _ in range(10): self.test_wmi_query_os_version() def test_wmi_query_error(self): # Invalid queries fail with OSError try: wmi_exec_query("SELECT InvalidColumnName FROM InvalidTableName") except OSError as ex: if ex.winerror & 0xFFFFFFFF == 0x80041010: # This is the expected error code. All others should fail the test return self.fail("Expected OSError") def test_wmi_query_repeated_error(self): for _ in range(10): self.test_wmi_query_error() def test_wmi_query_not_select(self): # Queries other than SELECT are blocked to avoid potential exploits with self.assertRaises(ValueError): wmi_exec_query("not select, just in case someone tries something") @requires_resource('cpu') def test_wmi_query_overflow(self): # Ensure very big queries fail # Test multiple times to ensure consistency for _ in range(2): with self.assertRaises(OSError): wmi_exec_query("SELECT * FROM CIM_DataFile") def test_wmi_query_multiple_rows(self): # Multiple instances should have an extra null separator r = wmi_exec_query("SELECT ProcessId FROM Win32_Process WHERE ProcessId < 1000") self.assertFalse(r.startswith("\0"), r) self.assertFalse(r.endswith("\0"), r) it = iter(r.split("\0")) try: while True: self.assertRegex(next(it), r"ProcessId=\d+") self.assertEqual("", next(it)) except StopIteration: pass def test_wmi_query_threads(self): from concurrent.futures import ThreadPoolExecutor query = "SELECT ProcessId FROM Win32_Process WHERE ProcessId < 1000" with ThreadPoolExecutor(4) as pool: task = [pool.submit(wmi_exec_query, query) for _ in range(32)] for t in task: self.assertRegex(t.result(), "ProcessId=")