"""This script contains the actual auditing tests. It should not be imported directly, but should be run by the test_audit module with arguments identifying each test. """ import contextlib import os import sys class TestHook: """Used in standard hook tests to collect any logged events. Should be used in a with block to ensure that it has no impact after the test completes. """ def __init__(self, raise_on_events=None, exc_type=RuntimeError): self.raise_on_events = raise_on_events or () self.exc_type = exc_type self.seen = [] self.closed = False def __enter__(self, *a): sys.addaudithook(self) return self def __exit__(self, *a): self.close() def close(self): self.closed = True @property def seen_events(self): return [i[0] for i in self.seen] def __call__(self, event, args): if self.closed: return self.seen.append((event, args)) if event in self.raise_on_events: raise self.exc_type("saw event " + event) # Simple helpers, since we are not in unittest here def assertEqual(x, y): if x != y: raise AssertionError(f"{x!r} should equal {y!r}") def assertIn(el, series): if el not in series: raise AssertionError(f"{el!r} should be in {series!r}") def assertNotIn(el, series): if el in series: raise AssertionError(f"{el!r} should not be in {series!r}") def assertSequenceEqual(x, y): if len(x) != len(y): raise AssertionError(f"{x!r} should equal {y!r}") if any(ix != iy for ix, iy in zip(x, y)): raise AssertionError(f"{x!r} should equal {y!r}") @contextlib.contextmanager def assertRaises(ex_type): try: yield assert False, f"expected {ex_type}" except BaseException as ex: if isinstance(ex, AssertionError): raise assert type(ex) is ex_type, f"{ex} should be {ex_type}" def test_basic(): with TestHook() as hook: sys.audit("test_event", 1, 2, 3) assertEqual(hook.seen[0][0], "test_event") assertEqual(hook.seen[0][1], (1, 2, 3)) def test_block_add_hook(): # Raising an exception should prevent a new hook from being added, # but will not propagate out. with TestHook(raise_on_events="sys.addaudithook") as hook1: with TestHook() as hook2: sys.audit("test_event") assertIn("test_event", hook1.seen_events) assertNotIn("test_event", hook2.seen_events) def test_block_add_hook_baseexception(): # Raising BaseException will propagate out when adding a hook with assertRaises(BaseException): with TestHook( raise_on_events="sys.addaudithook", exc_type=BaseException ) as hook1: # Adding this next hook should raise BaseException with TestHook() as hook2: pass def test_marshal(): import marshal o = ("a", "b", "c", 1, 2, 3) payload = marshal.dumps(o) with TestHook() as hook: assertEqual(o, marshal.loads(marshal.dumps(o))) try: with open("test-marshal.bin", "wb") as f: marshal.dump(o, f) with open("test-marshal.bin", "rb") as f: assertEqual(o, marshal.load(f)) finally: os.unlink("test-marshal.bin") actual = [(a[0], a[1]) for e, a in hook.seen if e == "marshal.dumps"] assertSequenceEqual(actual, [(o, marshal.version)] * 2) actual = [a[0] for e, a in hook.seen if e == "marshal.loads"] assertSequenceEqual(actual, [payload]) actual = [e for e, a in hook.seen if e == "marshal.load"] assertSequenceEqual(actual, ["marshal.load"]) def test_pickle(): import pickle class PicklePrint: def __reduce_ex__(self, p): return str, ("Pwned!",) payload_1 = pickle.dumps(PicklePrint()) payload_2 = pickle.dumps(("a", "b", "c", 1, 2, 3)) # Before we add the hook, ensure our malicious pickle loads assertEqual("Pwned!", pickle.loads(payload_1)) with TestHook(raise_on_events="pickle.find_class") as hook: with assertRaises(RuntimeError): # With the hook enabled, loading globals is not allowed pickle.loads(payload_1) # pickles with no globals are okay pickle.loads(payload_2) def test_monkeypatch(): class A: pass class B: pass class C(A): pass a = A() with TestHook() as hook: # Catch name changes C.__name__ = "X" # Catch type changes C.__bases__ = (B,) # Ensure bypassing __setattr__ is still caught type.__dict__["__bases__"].__set__(C, (B,)) # Catch attribute replacement C.__init__ = B.__init__ # Catch attribute addition C.new_attr = 123 # Catch class changes a.__class__ = B actual = [(a[0], a[1]) for e, a in hook.seen if e == "object.__setattr__"] assertSequenceEqual( [(C, "__name__"), (C, "__bases__"), (C, "__bases__"), (a, "__class__")], actual ) def test_open(testfn): # SSLContext.load_dh_params uses _Py_fopen_obj rather than normal open() try: import ssl load_dh_params = ssl.create_default_context().load_dh_params except ImportError: load_dh_params = None # Try a range of "open" functions. # All of them should fail with TestHook(raise_on_events={"open"}) as hook: for fn, *args in [ (open, testfn, "r"), (open, sys.executable, "rb"), (open, 3, "wb"), (open, testfn, "w", -1, None, None, None, False, lambda *a: 1), (load_dh_params, testfn), ]: if not fn: continue with assertRaises(RuntimeError): fn(*args) actual_mode = [(a[0], a[1]) for e, a in hook.seen if e == "open" and a[1]] actual_flag = [(a[0], a[2]) for e, a in hook.seen if e == "open" and not a[1]] assertSequenceEqual( [ i for i in [ (testfn, "r"), (sys.executable, "r"), (3, "w"), (testfn, "w"), (testfn, "rb") if load_dh_params else None, ] if i is not None ], actual_mode, ) assertSequenceEqual([], actual_flag) def test_cantrace(): traced = [] def trace(frame, event, *args): if frame.f_code == TestHook.__call__.__code__: traced.append(event) old = sys.settrace(trace) try: with TestHook() as hook: # No traced call eval("1") # No traced call hook.__cantrace__ = False eval("2") # One traced call hook.__cantrace__ = True eval("3") # Two traced calls (writing to private member, eval) hook.__cantrace__ = 1 eval("4") # One traced call (writing to private member) hook.__cantrace__ = 0 finally: sys.settrace(old) assertSequenceEqual(["call"] * 4, traced) def test_mmap(): import mmap with TestHook() as hook: mmap.mmap(-1, 8) assertEqual(hook.seen[0][1][:2], (-1, 8)) def test_excepthook(): def excepthook(exc_type, exc_value, exc_tb): if exc_type is not RuntimeError: sys.__excepthook__(exc_type, exc_value, exc_tb) def hook(event, args): if event == "sys.excepthook": if not isinstance(args[2], args[1]): raise TypeError(f"Expected isinstance({args[2]!r}, " f"{args[1]!r})") if args[0] != excepthook: raise ValueError(f"Expected {args[0]} == {excepthook}") print(event, repr(args[2])) sys.addaudithook(hook) sys.excepthook = excepthook raise RuntimeError("fatal-error") def test_unraisablehook(): from _testcapi import err_formatunraisable def unraisablehook(hookargs): pass def hook(event, args): if event == "sys.unraisablehook": if args[0] != unraisablehook: raise ValueError(f"Expected {args[0]} == {unraisablehook}") print(event, repr(args[1].exc_value), args[1].err_msg) sys.addaudithook(hook) sys.unraisablehook = unraisablehook err_formatunraisable(RuntimeError("nonfatal-error"), "Exception ignored for audit hook test") def test_winreg(): from winreg import OpenKey, EnumKey, CloseKey, HKEY_LOCAL_MACHINE def hook(event, args): if not event.startswith("winreg."): return print(event, *args) sys.addaudithook(hook) k = OpenKey(HKEY_LOCAL_MACHINE, "Software") EnumKey(k, 0) try: EnumKey(k, 10000) except OSError: pass else: raise RuntimeError("Expected EnumKey(HKLM, 10000) to fail") kv = k.Detach() CloseKey(kv) def test_socket(): import socket def hook(event, args): if event.startswith("socket."): print(event, *args) sys.addaudithook(hook) socket.gethostname() # Don't care if this fails, we just want the audit message sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: # Don't care if this fails, we just want the audit message sock.bind(('127.0.0.1', 8080)) except Exception: pass finally: sock.close() def test_gc(): import gc def hook(event, args): if event.startswith("gc."): print(event, *args) sys.addaudithook(hook) gc.get_objects(generation=1) x = object() y = [x] gc.get_referrers(x) gc.get_referents(y) def test_http_client(): import http.client def hook(event, args): if event.startswith("http.client."): print(event, *args[1:]) sys.addaudithook(hook) conn = http.client.HTTPConnection('www.python.org') try: conn.request('GET', '/') except OSError: print('http.client.send', '[cannot send]') finally: conn.close() def test_sqlite3(): import sqlite3 def hook(event, *args): if event.startswith("sqlite3."): print(event, *args) sys.addaudithook(hook) cx1 = sqlite3.connect(":memory:") cx2 = sqlite3.Connection(":memory:") # Configured without --enable-loadable-sqlite-extensions try: if hasattr(sqlite3.Connection, "enable_load_extension"): cx1.enable_load_extension(False) try: cx1.load_extension("test") except sqlite3.OperationalError: pass else: raise RuntimeError("Expected sqlite3.load_extension to fail") finally: cx1.close() cx2.close() def test_sys_getframe(): import sys def hook(event, args): if event.startswith("sys."): print(event, args[0].f_code.co_name) sys.addaudithook(hook) sys._getframe() def test_sys_getframemodulename(): import sys def hook(event, args): if event.startswith("sys."): print(event, *args) sys.addaudithook(hook) sys._getframemodulename() def test_threading(): import _thread def hook(event, args): if event.startswith(("_thread.", "cpython.PyThreadState", "test.")): print(event, args) sys.addaudithook(hook) lock = _thread.allocate_lock() lock.acquire() class test_func: def __repr__(self): return "" def __call__(self): sys.audit("test.test_func") lock.release() i = _thread.start_new_thread(test_func(), ()) lock.acquire() handle = _thread.start_joinable_thread(test_func()) handle.join() def test_threading_abort(): # Ensures that aborting PyThreadState_New raises the correct exception import _thread class ThreadNewAbortError(Exception): pass def hook(event, args): if event == "cpython.PyThreadState_New": raise ThreadNewAbortError() sys.addaudithook(hook) try: _thread.start_new_thread(lambda: None, ()) except ThreadNewAbortError: # Other exceptions are raised and the test will fail pass def test_wmi_exec_query(): import _wmi def hook(event, args): if event.startswith("_wmi."): print(event, args[0]) sys.addaudithook(hook) try: _wmi.exec_query("SELECT * FROM Win32_OperatingSystem") except WindowsError as e: # gh-112278: WMI may be slow response when first called, but we still # get the audit event, so just ignore the timeout if e.winerror != 258: raise def test_syslog(): import syslog def hook(event, args): if event.startswith("syslog."): print(event, *args) sys.addaudithook(hook) syslog.openlog('python') syslog.syslog('test') syslog.setlogmask(syslog.LOG_DEBUG) syslog.closelog() # implicit open syslog.syslog('test2') # open with default ident syslog.openlog(logoption=syslog.LOG_NDELAY, facility=syslog.LOG_LOCAL0) sys.argv = None syslog.openlog() syslog.closelog() def test_not_in_gc(): import gc hook = lambda *a: None sys.addaudithook(hook) for o in gc.get_objects(): if isinstance(o, list): assert hook not in o def test_time(mode): import time def hook(event, args): if event.startswith("time."): if mode == 'print': print(event, *args) elif mode == 'fail': raise AssertionError('hook failed') sys.addaudithook(hook) time.sleep(0) time.sleep(0.0625) # 1/16, a small exact float try: time.sleep(-1) except ValueError: pass def test_sys_monitoring_register_callback(): import sys def hook(event, args): if event.startswith("sys.monitoring"): print(event, args) sys.addaudithook(hook) sys.monitoring.register_callback(1, 1, None) def test_winapi_createnamedpipe(pipe_name): import _winapi def hook(event, args): if event == "_winapi.CreateNamedPipe": print(event, args) sys.addaudithook(hook) _winapi.CreateNamedPipe(pipe_name, _winapi.PIPE_ACCESS_DUPLEX, 8, 2, 0, 0, 0, 0) if __name__ == "__main__": from test.support import suppress_msvcrt_asserts suppress_msvcrt_asserts() test = sys.argv[1] globals()[test](*sys.argv[2:])