cpython/Lib/test/test_audit.py

261 lines
7.9 KiB
Python

"""Tests for sys.audit and sys.addaudithook
"""
import os
import subprocess
import sys
import unittest
from test import support
# Both entry points are needed by every test below; skip the whole module
# when the running interpreter lacks either of them.
if not (hasattr(sys, "addaudithook") and hasattr(sys, "audit")):
    raise unittest.SkipTest("test only relevant when sys.audit is available")
class TestHook:
    """Audit hook used by the standard hook tests to collect logged events.

    Use it in a ``with`` block so that it has no impact after the test
    completes.  Audit hooks cannot be removed, so the best we can do for
    the test run is disable the hook by calling close().
    """

    def __init__(self, raise_on_events=None, exc_type=RuntimeError):
        """Set up an empty event log.

        raise_on_events -- a single event name, or a collection of event
            names, for which the hook raises after recording the event.
        exc_type -- exception class raised for those events.
        """
        events = raise_on_events or ()
        # A bare string is one event name.  Wrap it so that the membership
        # test in __call__ compares whole names rather than doing a
        # substring search (e.g. "sys" in "sys.addaudithook").
        if isinstance(events, str):
            events = (events,)
        self.raise_on_events = events
        self.exc_type = exc_type
        # List of (event, args) pairs in the order they were received.
        self.seen = []
        self.closed = False

    def __enter__(self, *a):
        """Register this object as an audit hook and return it."""
        sys.addaudithook(self)
        return self

    def __exit__(self, *a):
        """Disable the hook; exceptions from the block are not suppressed."""
        self.close()

    def close(self):
        """Make every later call to the hook a no-op."""
        self.closed = True

    @property
    def seen_events(self):
        """Names of the recorded events, in order, without their args."""
        return [i[0] for i in self.seen]

    def __call__(self, event, args):
        """Record (event, args); raise exc_type if the event is blocked."""
        if self.closed:
            return
        self.seen.append((event, args))
        if event in self.raise_on_events:
            raise self.exc_type("saw event " + event)
class TestFinalizeHook:
    """Hook installed by run_finalize_test for the test_finalize_hooks test.

    It reports its own creation and every event it receives on stderr, and
    raises from the two cleanup events.  The test uses this to check that
    hooks are cleaned up, are told about the cleanup, and cannot stop it.
    """

    def __init__(self):
        print("Created", id(self), file=sys.stderr, flush=True)

    def __call__(self, event, args):
        # Reporting an event calls id(); skip the event that id() itself
        # produces so that the hook does not recurse.
        if event != "builtins.id":
            print(event, id(self), file=sys.stderr, flush=True)
            cleanup_events = (
                "cpython._PySys_ClearAuditHooks",
                "cpython.PyInterpreterState_Clear",
            )
            if event in cleanup_events:
                raise RuntimeError("Should be ignored")
def run_finalize_test():
    """Install a TestFinalizeHook; run in a subprocess by test_finalize_hooks."""
    finalize_hook = TestFinalizeHook()
    sys.addaudithook(finalize_hook)
class AuditTest(unittest.TestCase):
    """Tests for sys.audit(), sys.addaudithook() and runtime audit events."""

    def test_basic(self):
        # A hook receives the event name and the positional args as a tuple.
        with TestHook() as hook:
            sys.audit("test_event", 1, 2, 3)
            self.assertEqual(hook.seen[0][0], "test_event")
            self.assertEqual(hook.seen[0][1], (1, 2, 3))

    def test_block_add_hook(self):
        # Raising an exception should prevent a new hook from being added,
        # but will not propagate out.
        with TestHook(raise_on_events="sys.addaudithook") as hook1:
            with TestHook() as hook2:
                sys.audit("test_event")
                self.assertIn("test_event", hook1.seen_events)
                self.assertNotIn("test_event", hook2.seen_events)

    def test_block_add_hook_baseexception(self):
        # Raising BaseException will propagate out when adding a hook
        with self.assertRaises(BaseException):
            with TestHook(
                raise_on_events="sys.addaudithook", exc_type=BaseException
            ) as hook1:
                # Adding this next hook should raise BaseException
                with TestHook() as hook2:
                    pass

    def test_finalize_hooks(self):
        # Run run_finalize_test() in a fresh interpreter and collect the
        # "<event> <id>" lines that its hook prints to stderr.
        with subprocess.Popen(
            [
                sys.executable,
                "-c",
                "import test.test_audit; test.test_audit.run_finalize_test()",
            ],
            encoding="utf-8",
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        ) as p:
            # communicate() drains both pipes while waiting for the child.
            # A bare wait() with piped output can deadlock once the child
            # fills an OS pipe buffer.
            _, stderr = p.communicate()

        events = [line.strip().partition(" ") for line in stderr.splitlines()]
        # Fail with a readable message rather than an IndexError below.
        self.assertTrue(events, "child process wrote no events to stderr")

        # Every line must carry the id printed by the "Created" line.
        firstId = events[0][2]
        self.assertSequenceEqual(
            [
                ("Created", " ", firstId),
                ("cpython._PySys_ClearAuditHooks", " ", firstId),
            ],
            events,
        )

    def test_pickle(self):
        pickle = support.import_module("pickle")

        class PicklePrint:
            # Reduces to a call of the global ``str``, so loading the
            # pickle has to look up a global.
            def __reduce_ex__(self, p):
                return str, ("Pwned!",)

        payload_1 = pickle.dumps(PicklePrint())
        payload_2 = pickle.dumps(("a", "b", "c", 1, 2, 3))

        # Before we add the hook, ensure our malicious pickle loads
        self.assertEqual("Pwned!", pickle.loads(payload_1))

        with TestHook(raise_on_events="pickle.find_class") as hook:
            with self.assertRaises(RuntimeError):
                # With the hook enabled, loading globals is not allowed
                pickle.loads(payload_1)
            # pickles with no globals are okay
            pickle.loads(payload_2)

    def test_monkeypatch(self):
        class A:
            pass

        class B:
            pass

        class C(A):
            pass

        a = A()

        with TestHook() as hook:
            # Catch name changes
            C.__name__ = "X"
            # Catch type changes
            C.__bases__ = (B,)
            # Ensure bypassing __setattr__ is still caught
            type.__dict__["__bases__"].__set__(C, (B,))
            # Catch attribute replacement
            C.__init__ = B.__init__
            # Catch attribute addition
            C.new_attr = 123
            # Catch class changes
            a.__class__ = B

        # Keep (target, attribute name) of each object.__setattr__ event.
        # The loop names deliberately differ from the instance ``a`` above,
        # which is still needed in the expected list.
        actual = [
            (evt_args[0], evt_args[1])
            for evt, evt_args in hook.seen
            if evt == "object.__setattr__"
        ]
        self.assertSequenceEqual(
            [(C, "__name__"), (C, "__bases__"), (C, "__bases__"), (a, "__class__")],
            actual,
        )

    def test_open(self):
        # SSLContext.load_dh_params uses _Py_fopen_obj rather than normal open()
        try:
            import ssl

            load_dh_params = ssl.create_default_context().load_dh_params
        except ImportError:
            load_dh_params = None

        # Try a range of "open" functions.
        # All of them should fail
        with TestHook(raise_on_events={"open"}) as hook:
            for fn, *args in [
                (open, support.TESTFN, "r"),
                (open, sys.executable, "rb"),
                (open, 3, "wb"),
                (open, support.TESTFN, "w", -1, None, None, None, False, lambda *a: 1),
                (load_dh_params, support.TESTFN),
            ]:
                # load_dh_params is None when ssl could not be imported.
                if not fn:
                    continue
                self.assertRaises(RuntimeError, fn, *args)

        # Split the "open" events on whether their second argument is set:
        # pair the first argument with the second when it is, and with the
        # third when it is not.
        actual_mode = [
            (evt_args[0], evt_args[1])
            for evt, evt_args in hook.seen
            if evt == "open" and evt_args[1]
        ]
        actual_flag = [
            (evt_args[0], evt_args[2])
            for evt, evt_args in hook.seen
            if evt == "open" and not evt_args[1]
        ]
        self.assertSequenceEqual(
            [
                i
                for i in [
                    (support.TESTFN, "r"),
                    (sys.executable, "r"),
                    (3, "w"),
                    (support.TESTFN, "w"),
                    (support.TESTFN, "rb") if load_dh_params else None,
                ]
                if i is not None
            ],
            actual_mode,
        )
        self.assertSequenceEqual([], actual_flag)

    def test_cantrace(self):
        traced = []

        # Record trace events only for frames running TestHook.__call__.
        def trace(frame, event, *args):
            if frame.f_code == TestHook.__call__.__code__:
                traced.append(event)

        old = sys.settrace(trace)
        try:
            with TestHook() as hook:
                # No traced call
                eval("1")

                # No traced call
                hook.__cantrace__ = False
                eval("2")

                # One traced call
                hook.__cantrace__ = True
                eval("3")

                # Two traced calls (writing to private member, eval)
                hook.__cantrace__ = 1
                eval("4")

                # One traced call (writing to private member)
                hook.__cantrace__ = 0
        finally:
            # Always restore the previous trace function.
            sys.settrace(old)

        self.assertSequenceEqual(["call"] * 4, traced)
if __name__ == "__main__":
    # The slice is empty when no argument was given, so this never indexes
    # past the end of argv.
    if sys.argv[1:2] == ["spython_test"]:
        # Doesn't matter what we add - it will be blocked
        sys.addaudithook(None)
        sys.exit(0)

    unittest.main()