2022-09-07 17:09:20 -03:00
|
|
|
# Test the internal _wmi module on Windows
|
|
|
|
# This is used by the platform module, and potentially others
|
|
|
|
|
|
|
|
import unittest
|
2022-09-08 18:02:04 -03:00
|
|
|
from test.support import import_helper, requires_resource
|
2022-09-07 17:09:20 -03:00
|
|
|
|
|
|
|
|
|
|
|
# Do this first so test will be skipped if module doesn't exist
|
|
|
|
_wmi = import_helper.import_module('_wmi', required_on=['win'])
|
|
|
|
|
|
|
|
|
|
|
|
class WmiTests(unittest.TestCase):
|
|
|
|
def test_wmi_query_os_version(self):
|
|
|
|
r = _wmi.exec_query("SELECT Version FROM Win32_OperatingSystem").split("\0")
|
|
|
|
self.assertEqual(1, len(r))
|
|
|
|
k, eq, v = r[0].partition("=")
|
|
|
|
self.assertEqual("=", eq, r[0])
|
|
|
|
self.assertEqual("Version", k, r[0])
|
|
|
|
# Best we can check for the version is that it's digits, dot, digits, anything
|
|
|
|
# Otherwise, we are likely checking the result of the query against itself
|
2022-09-08 18:02:04 -03:00
|
|
|
self.assertRegex(v, r"\d+\.\d+.+$", r[0])
|
2022-09-07 17:09:20 -03:00
|
|
|
|
|
|
|
def test_wmi_query_repeated(self):
|
|
|
|
# Repeated queries should not break
|
|
|
|
for _ in range(10):
|
|
|
|
self.test_wmi_query_os_version()
|
|
|
|
|
|
|
|
def test_wmi_query_error(self):
|
|
|
|
# Invalid queries fail with OSError
|
|
|
|
try:
|
|
|
|
_wmi.exec_query("SELECT InvalidColumnName FROM InvalidTableName")
|
|
|
|
except OSError as ex:
|
|
|
|
if ex.winerror & 0xFFFFFFFF == 0x80041010:
|
|
|
|
# This is the expected error code. All others should fail the test
|
|
|
|
return
|
|
|
|
self.fail("Expected OSError")
|
|
|
|
|
|
|
|
def test_wmi_query_repeated_error(self):
|
|
|
|
for _ in range(10):
|
|
|
|
self.test_wmi_query_error()
|
|
|
|
|
|
|
|
def test_wmi_query_not_select(self):
|
|
|
|
# Queries other than SELECT are blocked to avoid potential exploits
|
|
|
|
with self.assertRaises(ValueError):
|
|
|
|
_wmi.exec_query("not select, just in case someone tries something")
|
|
|
|
|
2022-09-08 18:02:04 -03:00
|
|
|
@requires_resource('cpu')
|
2022-09-07 17:09:20 -03:00
|
|
|
def test_wmi_query_overflow(self):
|
|
|
|
# Ensure very big queries fail
|
|
|
|
# Test multiple times to ensure consistency
|
|
|
|
for _ in range(2):
|
|
|
|
with self.assertRaises(OSError):
|
|
|
|
_wmi.exec_query("SELECT * FROM CIM_DataFile")
|
|
|
|
|
|
|
|
def test_wmi_query_multiple_rows(self):
|
|
|
|
# Multiple instances should have an extra null separator
|
|
|
|
r = _wmi.exec_query("SELECT ProcessId FROM Win32_Process WHERE ProcessId < 1000")
|
|
|
|
self.assertFalse(r.startswith("\0"), r)
|
|
|
|
self.assertFalse(r.endswith("\0"), r)
|
|
|
|
it = iter(r.split("\0"))
|
|
|
|
try:
|
|
|
|
while True:
|
2022-09-08 18:02:04 -03:00
|
|
|
self.assertRegex(next(it), r"ProcessId=\d+")
|
2022-09-07 17:09:20 -03:00
|
|
|
self.assertEqual("", next(it))
|
|
|
|
except StopIteration:
|
|
|
|
pass
|
2022-09-08 18:02:04 -03:00
|
|
|
|
|
|
|
def test_wmi_query_threads(self):
|
|
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
|
|
query = "SELECT ProcessId FROM Win32_Process WHERE ProcessId < 1000"
|
|
|
|
with ThreadPoolExecutor(4) as pool:
|
|
|
|
task = [pool.submit(_wmi.exec_query, query) for _ in range(32)]
|
|
|
|
for t in task:
|
|
|
|
self.assertRegex(t.result(), "ProcessId=")
|