"""Test suite for the sys.monitoring.""" import collections import dis import functools import math import operator import sys import textwrap import types import unittest import test.support from test.support import requires_specialization, script_helper from test.support.import_helper import import_module _testcapi = test.support.import_helper.import_module("_testcapi") PAIR = (0,1) def f1(): pass def f2(): len([]) sys.getsizeof(0) def floop(): for item in PAIR: pass def gen(): yield yield def g1(): for _ in gen(): pass TEST_TOOL = 2 TEST_TOOL2 = 3 TEST_TOOL3 = 4 def nth_line(func, offset): return func.__code__.co_firstlineno + offset class MonitoringBasicTest(unittest.TestCase): def test_has_objects(self): m = sys.monitoring m.events m.use_tool_id m.free_tool_id m.get_tool m.get_events m.set_events m.get_local_events m.set_local_events m.register_callback m.restart_events m.DISABLE m.MISSING m.events.NO_EVENTS def test_tool(self): sys.monitoring.use_tool_id(TEST_TOOL, "MonitoringTest.Tool") self.assertEqual(sys.monitoring.get_tool(TEST_TOOL), "MonitoringTest.Tool") sys.monitoring.set_events(TEST_TOOL, 15) self.assertEqual(sys.monitoring.get_events(TEST_TOOL), 15) sys.monitoring.set_events(TEST_TOOL, 0) with self.assertRaises(ValueError): sys.monitoring.set_events(TEST_TOOL, sys.monitoring.events.C_RETURN) with self.assertRaises(ValueError): sys.monitoring.set_events(TEST_TOOL, sys.monitoring.events.C_RAISE) sys.monitoring.free_tool_id(TEST_TOOL) self.assertEqual(sys.monitoring.get_tool(TEST_TOOL), None) with self.assertRaises(ValueError): sys.monitoring.set_events(TEST_TOOL, sys.monitoring.events.CALL) class MonitoringTestBase: def setUp(self): # Check that a previous test hasn't left monitoring on. for tool in range(6): self.assertEqual(sys.monitoring.get_events(tool), 0) self.assertIs(sys.monitoring.get_tool(TEST_TOOL), None) self.assertIs(sys.monitoring.get_tool(TEST_TOOL2), None) self.assertIs(sys.monitoring.get_tool(TEST_TOOL3), None) sys.monitoring.use_tool_id(TEST_TOOL, "test " + self.__class__.__name__) sys.monitoring.use_tool_id(TEST_TOOL2, "test2 " + self.__class__.__name__) sys.monitoring.use_tool_id(TEST_TOOL3, "test3 " + self.__class__.__name__) def tearDown(self): # Check that test hasn't left monitoring on. for tool in range(6): self.assertEqual(sys.monitoring.get_events(tool), 0) sys.monitoring.free_tool_id(TEST_TOOL) sys.monitoring.free_tool_id(TEST_TOOL2) sys.monitoring.free_tool_id(TEST_TOOL3) class MonitoringCountTest(MonitoringTestBase, unittest.TestCase): def check_event_count(self, func, event, expected): class Counter: def __init__(self): self.count = 0 def __call__(self, *args): self.count += 1 counter = Counter() sys.monitoring.register_callback(TEST_TOOL, event, counter) if event == E.C_RETURN or event == E.C_RAISE: sys.monitoring.set_events(TEST_TOOL, E.CALL) else: sys.monitoring.set_events(TEST_TOOL, event) self.assertEqual(counter.count, 0) counter.count = 0 func() self.assertEqual(counter.count, expected) prev = sys.monitoring.register_callback(TEST_TOOL, event, None) counter.count = 0 func() self.assertEqual(counter.count, 0) self.assertEqual(prev, counter) sys.monitoring.set_events(TEST_TOOL, 0) def test_start_count(self): self.check_event_count(f1, E.PY_START, 1) def test_resume_count(self): self.check_event_count(g1, E.PY_RESUME, 2) def test_return_count(self): self.check_event_count(f1, E.PY_RETURN, 1) def test_call_count(self): self.check_event_count(f2, E.CALL, 3) def test_c_return_count(self): self.check_event_count(f2, E.C_RETURN, 2) E = sys.monitoring.events INSTRUMENTED_EVENTS = [ (E.PY_START, "start"), (E.PY_RESUME, "resume"), (E.PY_RETURN, "return"), (E.PY_YIELD, "yield"), (E.JUMP, "jump"), (E.BRANCH, "branch"), ] EXCEPT_EVENTS = [ (E.RAISE, "raise"), (E.PY_UNWIND, "unwind"), (E.EXCEPTION_HANDLED, "exception_handled"), ] SIMPLE_EVENTS = INSTRUMENTED_EVENTS + EXCEPT_EVENTS + [ (E.C_RAISE, "c_raise"), (E.C_RETURN, "c_return"), ] SIMPLE_EVENT_SET = functools.reduce(operator.or_, [ev for (ev, _) in SIMPLE_EVENTS], 0) | E.CALL def just_pass(): pass just_pass.events = [ "py_call", "start", "return", ] def just_raise(): raise Exception just_raise.events = [ 'py_call', "start", "raise", "unwind", ] def just_call(): len([]) just_call.events = [ 'py_call', "start", "c_call", "c_return", "return", ] def caught(): try: 1/0 except Exception: pass caught.events = [ 'py_call', "start", "raise", "exception_handled", "branch", "return", ] def nested_call(): just_pass() nested_call.events = [ "py_call", "start", "py_call", "start", "return", "return", ] PY_CALLABLES = (types.FunctionType, types.MethodType) class MonitoringEventsBase(MonitoringTestBase): def gather_events(self, func): events = [] for event, event_name in SIMPLE_EVENTS: def record(*args, event_name=event_name): events.append(event_name) sys.monitoring.register_callback(TEST_TOOL, event, record) def record_call(code, offset, obj, arg): if isinstance(obj, PY_CALLABLES): events.append("py_call") else: events.append("c_call") sys.monitoring.register_callback(TEST_TOOL, E.CALL, record_call) sys.monitoring.set_events(TEST_TOOL, SIMPLE_EVENT_SET) events = [] try: func() except: pass sys.monitoring.set_events(TEST_TOOL, 0) #Remove the final event, the call to `sys.monitoring.set_events` events = events[:-1] return events def check_events(self, func, expected=None): events = self.gather_events(func) if expected is None: expected = func.events self.assertEqual(events, expected) class MonitoringEventsTest(MonitoringEventsBase, unittest.TestCase): def test_just_pass(self): self.check_events(just_pass) def test_just_raise(self): try: self.check_events(just_raise) except Exception: pass self.assertEqual(sys.monitoring.get_events(TEST_TOOL), 0) def test_just_call(self): self.check_events(just_call) def test_caught(self): self.check_events(caught) def test_nested_call(self): self.check_events(nested_call) UP_EVENTS = (E.C_RETURN, E.C_RAISE, E.PY_RETURN, E.PY_UNWIND, E.PY_YIELD) DOWN_EVENTS = (E.PY_START, E.PY_RESUME) from test.profilee import testfunc class SimulateProfileTest(MonitoringEventsBase, unittest.TestCase): def test_balanced(self): events = self.gather_events(testfunc) c = collections.Counter(events) self.assertEqual(c["c_call"], c["c_return"]) self.assertEqual(c["start"], c["return"] + c["unwind"]) self.assertEqual(c["raise"], c["exception_handled"] + c["unwind"]) def test_frame_stack(self): self.maxDiff = None stack = [] errors = [] seen = set() def up(*args): frame = sys._getframe(1) if not stack: errors.append("empty") else: expected = stack.pop() if frame != expected: errors.append(f" Popping {frame} expected {expected}") def down(*args): frame = sys._getframe(1) stack.append(frame) seen.add(frame.f_code) def call(code, offset, callable, arg): if not isinstance(callable, PY_CALLABLES): stack.append(sys._getframe(1)) for event in UP_EVENTS: sys.monitoring.register_callback(TEST_TOOL, event, up) for event in DOWN_EVENTS: sys.monitoring.register_callback(TEST_TOOL, event, down) sys.monitoring.register_callback(TEST_TOOL, E.CALL, call) sys.monitoring.set_events(TEST_TOOL, SIMPLE_EVENT_SET) testfunc() sys.monitoring.set_events(TEST_TOOL, 0) self.assertEqual(errors, []) self.assertEqual(stack, [sys._getframe()]) self.assertEqual(len(seen), 9) class CounterWithDisable: def __init__(self): self.disable = False self.count = 0 def __call__(self, *args): self.count += 1 if self.disable: return sys.monitoring.DISABLE class RecorderWithDisable: def __init__(self, events): self.disable = False self.events = events def __call__(self, code, event): self.events.append(event) if self.disable: return sys.monitoring.DISABLE class MontoringDisableAndRestartTest(MonitoringTestBase, unittest.TestCase): def test_disable(self): try: counter = CounterWithDisable() sys.monitoring.register_callback(TEST_TOOL, E.PY_START, counter) sys.monitoring.set_events(TEST_TOOL, E.PY_START) self.assertEqual(counter.count, 0) counter.count = 0 f1() self.assertEqual(counter.count, 1) counter.disable = True counter.count = 0 f1() self.assertEqual(counter.count, 1) counter.count = 0 f1() self.assertEqual(counter.count, 0) sys.monitoring.set_events(TEST_TOOL, 0) finally: sys.monitoring.restart_events() def test_restart(self): try: counter = CounterWithDisable() sys.monitoring.register_callback(TEST_TOOL, E.PY_START, counter) sys.monitoring.set_events(TEST_TOOL, E.PY_START) counter.disable = True f1() counter.count = 0 f1() self.assertEqual(counter.count, 0) sys.monitoring.restart_events() counter.count = 0 f1() self.assertEqual(counter.count, 1) sys.monitoring.set_events(TEST_TOOL, 0) finally: sys.monitoring.restart_events() class MultipleMonitorsTest(MonitoringTestBase, unittest.TestCase): def test_two_same(self): try: self.assertEqual(sys.monitoring._all_events(), {}) counter1 = CounterWithDisable() counter2 = CounterWithDisable() sys.monitoring.register_callback(TEST_TOOL, E.PY_START, counter1) sys.monitoring.register_callback(TEST_TOOL2, E.PY_START, counter2) sys.monitoring.set_events(TEST_TOOL, E.PY_START) sys.monitoring.set_events(TEST_TOOL2, E.PY_START) self.assertEqual(sys.monitoring.get_events(TEST_TOOL), E.PY_START) self.assertEqual(sys.monitoring.get_events(TEST_TOOL2), E.PY_START) self.assertEqual(sys.monitoring._all_events(), {'PY_START': (1 << TEST_TOOL) | (1 << TEST_TOOL2)}) counter1.count = 0 counter2.count = 0 f1() count1 = counter1.count count2 = counter2.count self.assertEqual((count1, count2), (1, 1)) finally: sys.monitoring.set_events(TEST_TOOL, 0) sys.monitoring.set_events(TEST_TOOL2, 0) sys.monitoring.register_callback(TEST_TOOL, E.PY_START, None) sys.monitoring.register_callback(TEST_TOOL2, E.PY_START, None) self.assertEqual(sys.monitoring._all_events(), {}) def test_three_same(self): try: self.assertEqual(sys.monitoring._all_events(), {}) counter1 = CounterWithDisable() counter2 = CounterWithDisable() counter3 = CounterWithDisable() sys.monitoring.register_callback(TEST_TOOL, E.PY_START, counter1) sys.monitoring.register_callback(TEST_TOOL2, E.PY_START, counter2) sys.monitoring.register_callback(TEST_TOOL3, E.PY_START, counter3) sys.monitoring.set_events(TEST_TOOL, E.PY_START) sys.monitoring.set_events(TEST_TOOL2, E.PY_START) sys.monitoring.set_events(TEST_TOOL3, E.PY_START) self.assertEqual(sys.monitoring.get_events(TEST_TOOL), E.PY_START) self.assertEqual(sys.monitoring.get_events(TEST_TOOL2), E.PY_START) self.assertEqual(sys.monitoring.get_events(TEST_TOOL3), E.PY_START) self.assertEqual(sys.monitoring._all_events(), {'PY_START': (1 << TEST_TOOL) | (1 << TEST_TOOL2) | (1 << TEST_TOOL3)}) counter1.count = 0 counter2.count = 0 counter3.count = 0 f1() count1 = counter1.count count2 = counter2.count count3 = counter3.count self.assertEqual((count1, count2, count3), (1, 1, 1)) finally: sys.monitoring.set_events(TEST_TOOL, 0) sys.monitoring.set_events(TEST_TOOL2, 0) sys.monitoring.set_events(TEST_TOOL3, 0) sys.monitoring.register_callback(TEST_TOOL, E.PY_START, None) sys.monitoring.register_callback(TEST_TOOL2, E.PY_START, None) sys.monitoring.register_callback(TEST_TOOL3, E.PY_START, None) self.assertEqual(sys.monitoring._all_events(), {}) def test_two_different(self): try: self.assertEqual(sys.monitoring._all_events(), {}) counter1 = CounterWithDisable() counter2 = CounterWithDisable() sys.monitoring.register_callback(TEST_TOOL, E.PY_START, counter1) sys.monitoring.register_callback(TEST_TOOL2, E.PY_RETURN, counter2) sys.monitoring.set_events(TEST_TOOL, E.PY_START) sys.monitoring.set_events(TEST_TOOL2, E.PY_RETURN) self.assertEqual(sys.monitoring.get_events(TEST_TOOL), E.PY_START) self.assertEqual(sys.monitoring.get_events(TEST_TOOL2), E.PY_RETURN) self.assertEqual(sys.monitoring._all_events(), {'PY_START': 1 << TEST_TOOL, 'PY_RETURN': 1 << TEST_TOOL2}) counter1.count = 0 counter2.count = 0 f1() count1 = counter1.count count2 = counter2.count self.assertEqual((count1, count2), (1, 1)) finally: sys.monitoring.set_events(TEST_TOOL, 0) sys.monitoring.set_events(TEST_TOOL2, 0) sys.monitoring.register_callback(TEST_TOOL, E.PY_START, None) sys.monitoring.register_callback(TEST_TOOL2, E.PY_RETURN, None) self.assertEqual(sys.monitoring._all_events(), {}) def test_two_with_disable(self): try: self.assertEqual(sys.monitoring._all_events(), {}) counter1 = CounterWithDisable() counter2 = CounterWithDisable() sys.monitoring.register_callback(TEST_TOOL, E.PY_START, counter1) sys.monitoring.register_callback(TEST_TOOL2, E.PY_START, counter2) sys.monitoring.set_events(TEST_TOOL, E.PY_START) sys.monitoring.set_events(TEST_TOOL2, E.PY_START) self.assertEqual(sys.monitoring.get_events(TEST_TOOL), E.PY_START) self.assertEqual(sys.monitoring.get_events(TEST_TOOL2), E.PY_START) self.assertEqual(sys.monitoring._all_events(), {'PY_START': (1 << TEST_TOOL) | (1 << TEST_TOOL2)}) counter1.count = 0 counter2.count = 0 counter1.disable = True f1() count1 = counter1.count count2 = counter2.count self.assertEqual((count1, count2), (1, 1)) counter1.count = 0 counter2.count = 0 f1() count1 = counter1.count count2 = counter2.count self.assertEqual((count1, count2), (0, 1)) finally: sys.monitoring.set_events(TEST_TOOL, 0) sys.monitoring.set_events(TEST_TOOL2, 0) sys.monitoring.register_callback(TEST_TOOL, E.PY_START, None) sys.monitoring.register_callback(TEST_TOOL2, E.PY_START, None) self.assertEqual(sys.monitoring._all_events(), {}) sys.monitoring.restart_events() def test_with_instruction_event(self): """Test that the second tool can set events with instruction events set by the first tool.""" def f(): pass code = f.__code__ try: self.assertEqual(sys.monitoring._all_events(), {}) sys.monitoring.set_local_events(TEST_TOOL, code, E.INSTRUCTION | E.LINE) sys.monitoring.set_local_events(TEST_TOOL2, code, E.LINE) finally: sys.monitoring.set_events(TEST_TOOL, 0) sys.monitoring.set_events(TEST_TOOL2, 0) self.assertEqual(sys.monitoring._all_events(), {}) class LineMonitoringTest(MonitoringTestBase, unittest.TestCase): def test_lines_single(self): try: self.assertEqual(sys.monitoring._all_events(), {}) events = [] recorder = RecorderWithDisable(events) sys.monitoring.register_callback(TEST_TOOL, E.LINE, recorder) sys.monitoring.set_events(TEST_TOOL, E.LINE) f1() sys.monitoring.set_events(TEST_TOOL, 0) sys.monitoring.register_callback(TEST_TOOL, E.LINE, None) start = nth_line(LineMonitoringTest.test_lines_single, 0) self.assertEqual(events, [start+7, nth_line(f1, 1), start+8]) finally: sys.monitoring.set_events(TEST_TOOL, 0) sys.monitoring.register_callback(TEST_TOOL, E.LINE, None) self.assertEqual(sys.monitoring._all_events(), {}) sys.monitoring.restart_events() def test_lines_loop(self): try: self.assertEqual(sys.monitoring._all_events(), {}) events = [] recorder = RecorderWithDisable(events) sys.monitoring.register_callback(TEST_TOOL, E.LINE, recorder) sys.monitoring.set_events(TEST_TOOL, E.LINE) floop() sys.monitoring.set_events(TEST_TOOL, 0) sys.monitoring.register_callback(TEST_TOOL, E.LINE, None) start = nth_line(LineMonitoringTest.test_lines_loop, 0) floop_1 = nth_line(floop, 1) floop_2 = nth_line(floop, 2) self.assertEqual( events, [start+7, floop_1, floop_2, floop_1, floop_2, floop_1, start+8] ) finally: sys.monitoring.set_events(TEST_TOOL, 0) sys.monitoring.register_callback(TEST_TOOL, E.LINE, None) self.assertEqual(sys.monitoring._all_events(), {}) sys.monitoring.restart_events() def test_lines_two(self): try: self.assertEqual(sys.monitoring._all_events(), {}) events = [] recorder = RecorderWithDisable(events) events2 = [] recorder2 = RecorderWithDisable(events2) sys.monitoring.register_callback(TEST_TOOL, E.LINE, recorder) sys.monitoring.register_callback(TEST_TOOL2, E.LINE, recorder2) sys.monitoring.set_events(TEST_TOOL, E.LINE); sys.monitoring.set_events(TEST_TOOL2, E.LINE) f1() sys.monitoring.set_events(TEST_TOOL, 0); sys.monitoring.set_events(TEST_TOOL2, 0) sys.monitoring.register_callback(TEST_TOOL, E.LINE, None) sys.monitoring.register_callback(TEST_TOOL2, E.LINE, None) start = nth_line(LineMonitoringTest.test_lines_two, 0) expected = [start+10, nth_line(f1, 1), start+11] self.assertEqual(events, expected) self.assertEqual(events2, expected) finally: sys.monitoring.set_events(TEST_TOOL, 0) sys.monitoring.set_events(TEST_TOOL2, 0) sys.monitoring.register_callback(TEST_TOOL, E.LINE, None) sys.monitoring.register_callback(TEST_TOOL2, E.LINE, None) self.assertEqual(sys.monitoring._all_events(), {}) sys.monitoring.restart_events() def check_lines(self, func, expected, tool=TEST_TOOL): try: self.assertEqual(sys.monitoring._all_events(), {}) events = [] recorder = RecorderWithDisable(events) sys.monitoring.register_callback(tool, E.LINE, recorder) sys.monitoring.set_events(tool, E.LINE) func() sys.monitoring.set_events(tool, 0) sys.monitoring.register_callback(tool, E.LINE, None) lines = [ line - func.__code__.co_firstlineno for line in events[1:-1] ] self.assertEqual(lines, expected) finally: sys.monitoring.set_events(tool, 0) def test_linear(self): def func(): line = 1 line = 2 line = 3 line = 4 line = 5 self.check_lines(func, [1,2,3,4,5]) def test_branch(self): def func(): if "true".startswith("t"): line = 2 line = 3 else: line = 5 line = 6 self.check_lines(func, [1,2,3,6]) def test_try_except(self): def func1(): try: line = 2 line = 3 except: line = 5 line = 6 self.check_lines(func1, [1,2,3,6]) def func2(): try: line = 2 raise 3 except: line = 5 line = 6 self.check_lines(func2, [1,2,3,4,5,6]) def test_generator_with_line(self): def f(): def a(): yield def b(): yield from a() next(b()) self.check_lines(f, [1,3,5,4,2,4]) class TestDisable(MonitoringTestBase, unittest.TestCase): def gen(self, cond): for i in range(10): if cond: yield 1 else: yield 2 def raise_handle_reraise(self): try: 1/0 except: raise def test_disable_legal_events(self): for event, name in INSTRUMENTED_EVENTS: try: counter = CounterWithDisable() counter.disable = True sys.monitoring.register_callback(TEST_TOOL, event, counter) sys.monitoring.set_events(TEST_TOOL, event) for _ in self.gen(1): pass self.assertLess(counter.count, 4) finally: sys.monitoring.set_events(TEST_TOOL, 0) sys.monitoring.register_callback(TEST_TOOL, event, None) def test_disable_illegal_events(self): for event, name in EXCEPT_EVENTS: try: counter = CounterWithDisable() counter.disable = True sys.monitoring.register_callback(TEST_TOOL, event, counter) sys.monitoring.set_events(TEST_TOOL, event) with self.assertRaises(ValueError): self.raise_handle_reraise() finally: sys.monitoring.set_events(TEST_TOOL, 0) sys.monitoring.register_callback(TEST_TOOL, event, None) class ExceptionRecorder: event_type = E.RAISE def __init__(self, events): self.events = events def __call__(self, code, offset, exc): self.events.append(("raise", type(exc))) class CheckEvents(MonitoringTestBase, unittest.TestCase): def get_events(self, func, tool, recorders): try: self.assertEqual(sys.monitoring._all_events(), {}) event_list = [] all_events = 0 for recorder in recorders: ev = recorder.event_type sys.monitoring.register_callback(tool, ev, recorder(event_list)) all_events |= ev sys.monitoring.set_events(tool, all_events) func() sys.monitoring.set_events(tool, 0) for recorder in recorders: sys.monitoring.register_callback(tool, recorder.event_type, None) return event_list finally: sys.monitoring.set_events(tool, 0) for recorder in recorders: sys.monitoring.register_callback(tool, recorder.event_type, None) def check_events(self, func, expected, tool=TEST_TOOL, recorders=(ExceptionRecorder,)): events = self.get_events(func, tool, recorders) if events != expected: print(events, file = sys.stderr) self.assertEqual(events, expected) def check_balanced(self, func, recorders): events = self.get_events(func, TEST_TOOL, recorders) self.assertEqual(len(events)%2, 0) for r, h in zip(events[::2],events[1::2]): r0 = r[0] self.assertIn(r0, ("raise", "reraise")) h0 = h[0] self.assertIn(h0, ("handled", "unwind")) self.assertEqual(r[1], h[1]) class StopiterationRecorder(ExceptionRecorder): event_type = E.STOP_ITERATION class ReraiseRecorder(ExceptionRecorder): event_type = E.RERAISE def __call__(self, code, offset, exc): self.events.append(("reraise", type(exc))) class UnwindRecorder(ExceptionRecorder): event_type = E.PY_UNWIND def __call__(self, code, offset, exc): self.events.append(("unwind", type(exc), code.co_name)) class ExceptionHandledRecorder(ExceptionRecorder): event_type = E.EXCEPTION_HANDLED def __call__(self, code, offset, exc): self.events.append(("handled", type(exc))) class ThrowRecorder(ExceptionRecorder): event_type = E.PY_THROW def __call__(self, code, offset, exc): self.events.append(("throw", type(exc))) class CallRecorder: event_type = E.CALL def __init__(self, events): self.events = events def __call__(self, code, offset, func, arg): self.events.append(("call", func.__name__, arg)) class ReturnRecorder: event_type = E.PY_RETURN def __init__(self, events): self.events = events def __call__(self, code, offset, val): self.events.append(("return", code.co_name, val)) class ExceptionMonitoringTest(CheckEvents): exception_recorders = ( ExceptionRecorder, ReraiseRecorder, ExceptionHandledRecorder, UnwindRecorder ) def test_simple_try_except(self): def func1(): try: line = 2 raise KeyError except: line = 5 line = 6 self.check_events(func1, [("raise", KeyError)]) # gh-116090: This test doesn't really require specialization, but running # it without specialization exposes a monitoring bug. @requires_specialization def test_implicit_stop_iteration(self): def gen(): yield 1 return 2 def implicit_stop_iteration(): for _ in gen(): pass self.check_events(implicit_stop_iteration, [("raise", StopIteration)], recorders=(StopiterationRecorder,)) initial = [ ("raise", ZeroDivisionError), ("handled", ZeroDivisionError) ] reraise = [ ("reraise", ZeroDivisionError), ("handled", ZeroDivisionError) ] def test_explicit_reraise(self): def func(): try: try: 1/0 except: raise except: pass self.check_balanced( func, recorders = self.exception_recorders) def test_explicit_reraise_named(self): def func(): try: try: 1/0 except Exception as ex: raise except: pass self.check_balanced( func, recorders = self.exception_recorders) def test_implicit_reraise(self): def func(): try: try: 1/0 except ValueError: pass except: pass self.check_balanced( func, recorders = self.exception_recorders) def test_implicit_reraise_named(self): def func(): try: try: 1/0 except ValueError as ex: pass except: pass self.check_balanced( func, recorders = self.exception_recorders) def test_try_finally(self): def func(): try: try: 1/0 finally: pass except: pass self.check_balanced( func, recorders = self.exception_recorders) def test_async_for(self): def func(): async def async_generator(): for i in range(1): raise ZeroDivisionError yield i async def async_loop(): try: async for item in async_generator(): pass except Exception: pass try: async_loop().send(None) except StopIteration: pass self.check_balanced( func, recorders = self.exception_recorders) def test_throw(self): def gen(): yield 1 yield 2 def func(): try: g = gen() next(g) g.throw(IndexError) except IndexError: pass self.check_balanced( func, recorders = self.exception_recorders) events = self.get_events( func, TEST_TOOL, self.exception_recorders + (ThrowRecorder,) ) self.assertEqual(events[0], ("throw", IndexError)) @requires_specialization def test_no_unwind_for_shim_frame(self): class B: def __init__(self): raise ValueError() def f(): try: return B() except ValueError: pass for _ in range(100): f() recorders = ( ReturnRecorder, UnwindRecorder ) events = self.get_events(f, TEST_TOOL, recorders) adaptive_insts = dis.get_instructions(f, adaptive=True) self.assertIn( "CALL_ALLOC_AND_ENTER_INIT", [i.opname for i in adaptive_insts] ) #There should be only one unwind event expected = [ ('unwind', ValueError, '__init__'), ('return', 'f', None), ] self.assertEqual(events, expected) class LineRecorder: event_type = E.LINE def __init__(self, events): self.events = events def __call__(self, code, line): self.events.append(("line", code.co_name, line - code.co_firstlineno)) class CEventRecorder: def __init__(self, events): self.events = events def __call__(self, code, offset, func, arg): self.events.append((self.event_name, func.__name__, arg)) class CReturnRecorder(CEventRecorder): event_type = E.C_RETURN event_name = "C return" class CRaiseRecorder(CEventRecorder): event_type = E.C_RAISE event_name = "C raise" MANY_RECORDERS = ExceptionRecorder, CallRecorder, LineRecorder, CReturnRecorder, CRaiseRecorder class TestManyEvents(CheckEvents): def test_simple(self): def func1(): line1 = 1 line2 = 2 line3 = 3 self.check_events(func1, recorders = MANY_RECORDERS, expected = [ ('line', 'get_events', 10), ('call', 'func1', sys.monitoring.MISSING), ('line', 'func1', 1), ('line', 'func1', 2), ('line', 'func1', 3), ('line', 'get_events', 11), ('call', 'set_events', 2)]) def test_c_call(self): def func2(): line1 = 1 [].append(2) line3 = 3 self.check_events(func2, recorders = MANY_RECORDERS, expected = [ ('line', 'get_events', 10), ('call', 'func2', sys.monitoring.MISSING), ('line', 'func2', 1), ('line', 'func2', 2), ('call', 'append', [2]), ('C return', 'append', [2]), ('line', 'func2', 3), ('line', 'get_events', 11), ('call', 'set_events', 2)]) def test_try_except(self): def func3(): try: line = 2 raise KeyError except: line = 5 line = 6 self.check_events(func3, recorders = MANY_RECORDERS, expected = [ ('line', 'get_events', 10), ('call', 'func3', sys.monitoring.MISSING), ('line', 'func3', 1), ('line', 'func3', 2), ('line', 'func3', 3), ('raise', KeyError), ('line', 'func3', 4), ('line', 'func3', 5), ('line', 'func3', 6), ('line', 'get_events', 11), ('call', 'set_events', 2)]) class InstructionRecorder: event_type = E.INSTRUCTION def __init__(self, events): self.events = events def __call__(self, code, offset): # Filter out instructions in check_events to lower noise if code.co_name != "get_events": self.events.append(("instruction", code.co_name, offset)) LINE_AND_INSTRUCTION_RECORDERS = InstructionRecorder, LineRecorder class TestLineAndInstructionEvents(CheckEvents): maxDiff = None def test_simple(self): def func1(): line1 = 1 line2 = 2 line3 = 3 self.check_events(func1, recorders = LINE_AND_INSTRUCTION_RECORDERS, expected = [ ('line', 'get_events', 10), ('line', 'func1', 1), ('instruction', 'func1', 2), ('instruction', 'func1', 4), ('line', 'func1', 2), ('instruction', 'func1', 6), ('instruction', 'func1', 8), ('line', 'func1', 3), ('instruction', 'func1', 10), ('instruction', 'func1', 12), ('instruction', 'func1', 14), ('line', 'get_events', 11)]) def test_c_call(self): def func2(): line1 = 1 [].append(2) line3 = 3 self.check_events(func2, recorders = LINE_AND_INSTRUCTION_RECORDERS, expected = [ ('line', 'get_events', 10), ('line', 'func2', 1), ('instruction', 'func2', 2), ('instruction', 'func2', 4), ('line', 'func2', 2), ('instruction', 'func2', 6), ('instruction', 'func2', 8), ('instruction', 'func2', 28), ('instruction', 'func2', 30), ('instruction', 'func2', 38), ('line', 'func2', 3), ('instruction', 'func2', 40), ('instruction', 'func2', 42), ('instruction', 'func2', 44), ('line', 'get_events', 11)]) def test_try_except(self): def func3(): try: line = 2 raise KeyError except: line = 5 line = 6 self.check_events(func3, recorders = LINE_AND_INSTRUCTION_RECORDERS, expected = [ ('line', 'get_events', 10), ('line', 'func3', 1), ('instruction', 'func3', 2), ('line', 'func3', 2), ('instruction', 'func3', 4), ('instruction', 'func3', 6), ('line', 'func3', 3), ('instruction', 'func3', 8), ('instruction', 'func3', 18), ('instruction', 'func3', 20), ('line', 'func3', 4), ('instruction', 'func3', 22), ('line', 'func3', 5), ('instruction', 'func3', 24), ('instruction', 'func3', 26), ('instruction', 'func3', 28), ('line', 'func3', 6), ('instruction', 'func3', 30), ('instruction', 'func3', 32), ('instruction', 'func3', 34), ('line', 'get_events', 11)]) def test_with_restart(self): def func1(): line1 = 1 line2 = 2 line3 = 3 self.check_events(func1, recorders = LINE_AND_INSTRUCTION_RECORDERS, expected = [ ('line', 'get_events', 10), ('line', 'func1', 1), ('instruction', 'func1', 2), ('instruction', 'func1', 4), ('line', 'func1', 2), ('instruction', 'func1', 6), ('instruction', 'func1', 8), ('line', 'func1', 3), ('instruction', 'func1', 10), ('instruction', 'func1', 12), ('instruction', 'func1', 14), ('line', 'get_events', 11)]) sys.monitoring.restart_events() self.check_events(func1, recorders = LINE_AND_INSTRUCTION_RECORDERS, expected = [ ('line', 'get_events', 10), ('line', 'func1', 1), ('instruction', 'func1', 2), ('instruction', 'func1', 4), ('line', 'func1', 2), ('instruction', 'func1', 6), ('instruction', 'func1', 8), ('line', 'func1', 3), ('instruction', 'func1', 10), ('instruction', 'func1', 12), ('instruction', 'func1', 14), ('line', 'get_events', 11)]) def test_turn_off_only_instruction(self): """ LINE events should be recorded after INSTRUCTION event is turned off """ events = [] def line(*args): events.append("line") sys.monitoring.set_events(TEST_TOOL, 0) sys.monitoring.register_callback(TEST_TOOL, E.LINE, line) sys.monitoring.register_callback(TEST_TOOL, E.INSTRUCTION, lambda *args: None) sys.monitoring.set_events(TEST_TOOL, E.LINE | E.INSTRUCTION) sys.monitoring.set_events(TEST_TOOL, E.LINE) events = [] a = 0 sys.monitoring.set_events(TEST_TOOL, 0) self.assertGreater(len(events), 0) class TestInstallIncrementally(MonitoringTestBase, unittest.TestCase): def check_events(self, func, must_include, tool=TEST_TOOL, recorders=(ExceptionRecorder,)): try: self.assertEqual(sys.monitoring._all_events(), {}) event_list = [] all_events = 0 for recorder in recorders: all_events |= recorder.event_type sys.monitoring.set_events(tool, all_events) for recorder in recorders: sys.monitoring.register_callback(tool, recorder.event_type, recorder(event_list)) func() sys.monitoring.set_events(tool, 0) for recorder in recorders: sys.monitoring.register_callback(tool, recorder.event_type, None) for line in must_include: self.assertIn(line, event_list) finally: sys.monitoring.set_events(tool, 0) for recorder in recorders: sys.monitoring.register_callback(tool, recorder.event_type, None) @staticmethod def func1(): line1 = 1 MUST_INCLUDE_LI = [ ('instruction', 'func1', 2), ('line', 'func1', 2), ('instruction', 'func1', 4), ('instruction', 'func1', 6)] def test_line_then_instruction(self): recorders = [ LineRecorder, InstructionRecorder ] self.check_events(self.func1, recorders = recorders, must_include = self.MUST_INCLUDE_LI) def test_instruction_then_line(self): recorders = [ InstructionRecorder, LineRecorder ] self.check_events(self.func1, recorders = recorders, must_include = self.MUST_INCLUDE_LI) @staticmethod def func2(): len(()) MUST_INCLUDE_CI = [ ('instruction', 'func2', 2), ('call', 'func2', sys.monitoring.MISSING), ('call', 'len', ()), ('instruction', 'func2', 12), ('instruction', 'func2', 14)] def test_call_then_instruction(self): recorders = [ CallRecorder, InstructionRecorder ] self.check_events(self.func2, recorders = recorders, must_include = self.MUST_INCLUDE_CI) def test_instruction_then_call(self): recorders = [ InstructionRecorder, CallRecorder ] self.check_events(self.func2, recorders = recorders, must_include = self.MUST_INCLUDE_CI) LOCAL_RECORDERS = CallRecorder, LineRecorder, CReturnRecorder, CRaiseRecorder class TestLocalEvents(MonitoringTestBase, unittest.TestCase): def check_events(self, func, expected, tool=TEST_TOOL, recorders=()): try: self.assertEqual(sys.monitoring._all_events(), {}) event_list = [] all_events = 0 for recorder in recorders: ev = recorder.event_type sys.monitoring.register_callback(tool, ev, recorder(event_list)) all_events |= ev sys.monitoring.set_local_events(tool, func.__code__, all_events) func() sys.monitoring.set_local_events(tool, func.__code__, 0) for recorder in recorders: sys.monitoring.register_callback(tool, recorder.event_type, None) self.assertEqual(event_list, expected) finally: sys.monitoring.set_local_events(tool, func.__code__, 0) for recorder in recorders: sys.monitoring.register_callback(tool, recorder.event_type, None) def test_simple(self): def func1(): line1 = 1 line2 = 2 line3 = 3 self.check_events(func1, recorders = LOCAL_RECORDERS, expected = [ ('line', 'func1', 1), ('line', 'func1', 2), ('line', 'func1', 3)]) def test_c_call(self): def func2(): line1 = 1 [].append(2) line3 = 3 self.check_events(func2, recorders = LOCAL_RECORDERS, expected = [ ('line', 'func2', 1), ('line', 'func2', 2), ('call', 'append', [2]), ('C return', 'append', [2]), ('line', 'func2', 3)]) def test_try_except(self): def func3(): try: line = 2 raise KeyError except: line = 5 line = 6 self.check_events(func3, recorders = LOCAL_RECORDERS, expected = [ ('line', 'func3', 1), ('line', 'func3', 2), ('line', 'func3', 3), ('line', 'func3', 4), ('line', 'func3', 5), ('line', 'func3', 6)]) def test_set_non_local_event(self): with self.assertRaises(ValueError): sys.monitoring.set_local_events(TEST_TOOL, just_call.__code__, E.RAISE) def line_from_offset(code, offset): for start, end, line in code.co_lines(): if start <= offset < end: if line is None: return f"[offset={offset}]" return line - code.co_firstlineno return -1 class JumpRecorder: event_type = E.JUMP name = "jump" def __init__(self, events): self.events = events def __call__(self, code, from_, to): from_line = line_from_offset(code, from_) to_line = line_from_offset(code, to) self.events.append((self.name, code.co_name, from_line, to_line)) class BranchRecorder(JumpRecorder): event_type = E.BRANCH name = "branch" JUMP_AND_BRANCH_RECORDERS = JumpRecorder, BranchRecorder JUMP_BRANCH_AND_LINE_RECORDERS = JumpRecorder, BranchRecorder, LineRecorder FLOW_AND_LINE_RECORDERS = JumpRecorder, BranchRecorder, LineRecorder, ExceptionRecorder, ReturnRecorder class TestBranchAndJumpEvents(CheckEvents): maxDiff = None def test_loop(self): def func(): x = 1 for a in range(2): if a: x = 4 else: x = 6 7 self.check_events(func, recorders = JUMP_AND_BRANCH_RECORDERS, expected = [ ('branch', 'func', 2, 2), ('branch', 'func', 3, 6), ('jump', 'func', 6, 2), ('branch', 'func', 2, 2), ('branch', 'func', 3, 4), ('jump', 'func', 4, 2), ('branch', 'func', 2, 7)]) self.check_events(func, recorders = JUMP_BRANCH_AND_LINE_RECORDERS, expected = [ ('line', 'get_events', 10), ('line', 'func', 1), ('line', 'func', 2), ('branch', 'func', 2, 2), ('line', 'func', 3), ('branch', 'func', 3, 6), ('line', 'func', 6), ('jump', 'func', 6, 2), ('line', 'func', 2), ('branch', 'func', 2, 2), ('line', 'func', 3), ('branch', 'func', 3, 4), ('line', 'func', 4), ('jump', 'func', 4, 2), ('line', 'func', 2), ('branch', 'func', 2, 7), ('line', 'func', 7), ('line', 'get_events', 11)]) def test_except_star(self): class Foo: def meth(self): pass def func(): try: try: raise KeyError except* Exception as e: f = Foo(); f.meth() except KeyError: pass self.check_events(func, recorders = JUMP_BRANCH_AND_LINE_RECORDERS, expected = [ ('line', 'get_events', 10), ('line', 'func', 1), ('line', 'func', 2), ('line', 'func', 3), ('line', 'func', 4), ('branch', 'func', 4, 4), ('line', 'func', 5), ('line', 'meth', 1), ('jump', 'func', 5, '[offset=118]'), ('branch', 'func', '[offset=122]', '[offset=126]'), ('line', 'get_events', 11)]) self.check_events(func, recorders = FLOW_AND_LINE_RECORDERS, expected = [ ('line', 'get_events', 10), ('line', 'func', 1), ('line', 'func', 2), ('line', 'func', 3), ('raise', KeyError), ('line', 'func', 4), ('branch', 'func', 4, 4), ('line', 'func', 5), ('line', 'meth', 1), ('return', 'meth', None), ('jump', 'func', 5, '[offset=118]'), ('branch', 'func', '[offset=122]', '[offset=126]'), ('return', 'func', None), ('line', 'get_events', 11)]) class TestLoadSuperAttr(CheckEvents): RECORDERS = CallRecorder, LineRecorder, CRaiseRecorder, CReturnRecorder def _exec(self, co): d = {} exec(co, d, d) return d def _exec_super(self, codestr, optimized=False): # The compiler checks for statically visible shadowing of the name # `super`, and declines to emit `LOAD_SUPER_ATTR` if shadowing is found. # So inserting `super = super` prevents the compiler from emitting # `LOAD_SUPER_ATTR`, and allows us to test that monitoring events for # `LOAD_SUPER_ATTR` are equivalent to those we'd get from the # un-optimized `LOAD_GLOBAL super; CALL; LOAD_ATTR` form. assignment = "x = 1" if optimized else "super = super" codestr = f"{assignment}\n{textwrap.dedent(codestr)}" co = compile(codestr, "", "exec") # validate that we really do have a LOAD_SUPER_ATTR, only when optimized self.assertEqual(self._has_load_super_attr(co), optimized) return self._exec(co) def _has_load_super_attr(self, co): has = any(instr.opname == "LOAD_SUPER_ATTR" for instr in dis.get_instructions(co)) if not has: has = any( isinstance(c, types.CodeType) and self._has_load_super_attr(c) for c in co.co_consts ) return has def _super_method_call(self, optimized=False): codestr = """ class A: def method(self, x): return x class B(A): def method(self, x): return super( ).method( x ) b = B() def f(): return b.method(1) """ d = self._exec_super(codestr, optimized) expected = [ ('line', 'get_events', 10), ('call', 'f', sys.monitoring.MISSING), ('line', 'f', 1), ('call', 'method', d["b"]), ('line', 'method', 1), ('call', 'super', sys.monitoring.MISSING), ('C return', 'super', sys.monitoring.MISSING), ('line', 'method', 2), ('line', 'method', 3), ('line', 'method', 2), ('call', 'method', 1), ('line', 'method', 1), ('line', 'method', 1), ('line', 'get_events', 11), ('call', 'set_events', 2), ] return d["f"], expected def test_method_call(self): nonopt_func, nonopt_expected = self._super_method_call(optimized=False) opt_func, opt_expected = self._super_method_call(optimized=True) self.check_events(nonopt_func, recorders=self.RECORDERS, expected=nonopt_expected) self.check_events(opt_func, recorders=self.RECORDERS, expected=opt_expected) def _super_method_call_error(self, optimized=False): codestr = """ class A: def method(self, x): return x class B(A): def method(self, x): return super( x, self, ).method( x ) b = B() def f(): try: return b.method(1) except TypeError: pass else: assert False, "should have raised TypeError" """ d = self._exec_super(codestr, optimized) expected = [ ('line', 'get_events', 10), ('call', 'f', sys.monitoring.MISSING), ('line', 'f', 1), ('line', 'f', 2), ('call', 'method', d["b"]), ('line', 'method', 1), ('line', 'method', 2), ('line', 'method', 3), ('line', 'method', 1), ('call', 'super', 1), ('C raise', 'super', 1), ('line', 'f', 3), ('line', 'f', 4), ('line', 'get_events', 11), ('call', 'set_events', 2), ] return d["f"], expected def test_method_call_error(self): nonopt_func, nonopt_expected = self._super_method_call_error(optimized=False) opt_func, opt_expected = self._super_method_call_error(optimized=True) self.check_events(nonopt_func, recorders=self.RECORDERS, expected=nonopt_expected) self.check_events(opt_func, recorders=self.RECORDERS, expected=opt_expected) def _super_attr(self, optimized=False): codestr = """ class A: x = 1 class B(A): def method(self): return super( ).x b = B() def f(): return b.method() """ d = self._exec_super(codestr, optimized) expected = [ ('line', 'get_events', 10), ('call', 'f', sys.monitoring.MISSING), ('line', 'f', 1), ('call', 'method', d["b"]), ('line', 'method', 1), ('call', 'super', sys.monitoring.MISSING), ('C return', 'super', sys.monitoring.MISSING), ('line', 'method', 2), ('line', 'method', 1), ('line', 'get_events', 11), ('call', 'set_events', 2) ] return d["f"], expected def test_attr(self): nonopt_func, nonopt_expected = self._super_attr(optimized=False) opt_func, opt_expected = self._super_attr(optimized=True) self.check_events(nonopt_func, recorders=self.RECORDERS, expected=nonopt_expected) self.check_events(opt_func, recorders=self.RECORDERS, expected=opt_expected) def test_vs_other_type_call(self): code_template = textwrap.dedent(""" class C: def method(self): return {cls}().__repr__{call} c = C() def f(): return c.method() """) def get_expected(name, call_method, ns): repr_arg = 0 if name == "int" else sys.monitoring.MISSING return [ ('line', 'get_events', 10), ('call', 'f', sys.monitoring.MISSING), ('line', 'f', 1), ('call', 'method', ns["c"]), ('line', 'method', 1), ('call', name, sys.monitoring.MISSING), ('C return', name, sys.monitoring.MISSING), *( [ ('call', '__repr__', repr_arg), ('C return', '__repr__', repr_arg), ] if call_method else [] ), ('line', 'get_events', 11), ('call', 'set_events', 2), ] for call_method in [True, False]: with self.subTest(call_method=call_method): call_str = "()" if call_method else "" code_super = code_template.format(cls="super", call=call_str) code_int = code_template.format(cls="int", call=call_str) co_super = compile(code_super, '', 'exec') self.assertTrue(self._has_load_super_attr(co_super)) ns_super = self._exec(co_super) ns_int = self._exec(code_int) self.check_events( ns_super["f"], recorders=self.RECORDERS, expected=get_expected("super", call_method, ns_super) ) self.check_events( ns_int["f"], recorders=self.RECORDERS, expected=get_expected("int", call_method, ns_int) ) class TestSetGetEvents(MonitoringTestBase, unittest.TestCase): def test_global(self): sys.monitoring.set_events(TEST_TOOL, E.PY_START) self.assertEqual(sys.monitoring.get_events(TEST_TOOL), E.PY_START) sys.monitoring.set_events(TEST_TOOL2, E.PY_START) self.assertEqual(sys.monitoring.get_events(TEST_TOOL2), E.PY_START) sys.monitoring.set_events(TEST_TOOL, 0) self.assertEqual(sys.monitoring.get_events(TEST_TOOL), 0) sys.monitoring.set_events(TEST_TOOL2,0) self.assertEqual(sys.monitoring.get_events(TEST_TOOL2), 0) def test_local(self): code = f1.__code__ sys.monitoring.set_local_events(TEST_TOOL, code, E.PY_START) self.assertEqual(sys.monitoring.get_local_events(TEST_TOOL, code), E.PY_START) sys.monitoring.set_local_events(TEST_TOOL2, code, E.PY_START) self.assertEqual(sys.monitoring.get_local_events(TEST_TOOL2, code), E.PY_START) sys.monitoring.set_local_events(TEST_TOOL, code, 0) self.assertEqual(sys.monitoring.get_local_events(TEST_TOOL, code), 0) sys.monitoring.set_local_events(TEST_TOOL2, code, 0) self.assertEqual(sys.monitoring.get_local_events(TEST_TOOL2, code), 0) class TestUninitialized(unittest.TestCase, MonitoringTestBase): @staticmethod def f(): pass def test_get_local_events_uninitialized(self): self.assertEqual(sys.monitoring.get_local_events(TEST_TOOL, self.f.__code__), 0) class TestRegressions(MonitoringTestBase, unittest.TestCase): def test_105162(self): caught = None def inner(): nonlocal caught try: yield except Exception: caught = "inner" yield def outer(): nonlocal caught try: yield from inner() except Exception: caught = "outer" yield def run(): gen = outer() gen.send(None) gen.throw(Exception) run() self.assertEqual(caught, "inner") caught = None try: sys.monitoring.set_events(TEST_TOOL, E.PY_RESUME) run() self.assertEqual(caught, "inner") finally: sys.monitoring.set_events(TEST_TOOL, 0) def test_108390(self): class Foo: def __init__(self, set_event): if set_event: sys.monitoring.set_events(TEST_TOOL, E.PY_RESUME) def make_foo_optimized_then_set_event(): for i in range(100): Foo(i == 99) try: make_foo_optimized_then_set_event() finally: sys.monitoring.set_events(TEST_TOOL, 0) def test_gh108976(self): sys.monitoring.use_tool_id(0, "test") self.addCleanup(sys.monitoring.free_tool_id, 0) sys.monitoring.set_events(0, 0) sys.monitoring.register_callback(0, E.LINE, lambda *args: sys.monitoring.set_events(0, 0)) sys.monitoring.register_callback(0, E.INSTRUCTION, lambda *args: 0) sys.monitoring.set_events(0, E.LINE | E.INSTRUCTION) sys.monitoring.set_events(0, 0) def test_call_function_ex(self): def f(a=1, b=2): return a + b args = (1, 2) empty_args = [] call_data = [] sys.monitoring.use_tool_id(0, "test") self.addCleanup(sys.monitoring.free_tool_id, 0) sys.monitoring.set_events(0, 0) sys.monitoring.register_callback(0, E.CALL, lambda code, offset, callable, arg0: call_data.append((callable, arg0))) sys.monitoring.set_events(0, E.CALL) f(*args) f(*empty_args) sys.monitoring.set_events(0, 0) self.assertEqual(call_data[0], (f, 1)) self.assertEqual(call_data[1], (f, sys.monitoring.MISSING)) class TestOptimizer(MonitoringTestBase, unittest.TestCase): def setUp(self): _testinternalcapi = import_module("_testinternalcapi") if hasattr(_testinternalcapi, "get_optimizer"): self.old_opt = _testinternalcapi.get_optimizer() opt = _testinternalcapi.new_counter_optimizer() _testinternalcapi.set_optimizer(opt) super(TestOptimizer, self).setUp() def tearDown(self): super(TestOptimizer, self).tearDown() import _testinternalcapi if hasattr(_testinternalcapi, "get_optimizer"): _testinternalcapi.set_optimizer(self.old_opt) def test_for_loop(self): def test_func(x): i = 0 while i < x: i += 1 code = test_func.__code__ sys.monitoring.set_local_events(TEST_TOOL, code, E.PY_START) self.assertEqual(sys.monitoring.get_local_events(TEST_TOOL, code), E.PY_START) test_func(1000) sys.monitoring.set_local_events(TEST_TOOL, code, 0) self.assertEqual(sys.monitoring.get_local_events(TEST_TOOL, code), 0) class TestTier2Optimizer(CheckEvents): def test_monitoring_already_opimized_loop(self): def test_func(recorder): set_events = sys.monitoring.set_events line = E.LINE i = 0 for i in range(551): # Turn on events without branching once i reaches 500. set_events(TEST_TOOL, line * int(i >= 500)) pass pass pass self.assertEqual(sys.monitoring._all_events(), {}) events = [] recorder = LineRecorder(events) sys.monitoring.register_callback(TEST_TOOL, E.LINE, recorder) try: test_func(recorder) finally: sys.monitoring.register_callback(TEST_TOOL, E.LINE, None) sys.monitoring.set_events(TEST_TOOL, 0) self.assertGreater(len(events), 250) class TestMonitoringAtShutdown(unittest.TestCase): def test_monitoring_live_at_shutdown(self): # gh-115832: An object destructor running during the final GC of # interpreter shutdown triggered an infinite loop in the # instrumentation code. script = test.support.findfile("_test_monitoring_shutdown.py") script_helper.run_test_script(script) class TestCApiEventGeneration(MonitoringTestBase, unittest.TestCase): class Scope: def __init__(self, *args): self.args = args def __enter__(self): _testcapi.monitoring_enter_scope(*self.args) def __exit__(self, *args): _testcapi.monitoring_exit_scope() def setUp(self): super(TestCApiEventGeneration, self).setUp() capi = _testcapi self.codelike = capi.CodeLike(2) self.cases = [ # (Event, function, *args) ( 1, E.PY_START, capi.fire_event_py_start), ( 1, E.PY_RESUME, capi.fire_event_py_resume), ( 1, E.PY_YIELD, capi.fire_event_py_yield, 10), ( 1, E.PY_RETURN, capi.fire_event_py_return, 20), ( 2, E.CALL, capi.fire_event_call, callable, 40), ( 1, E.JUMP, capi.fire_event_jump, 60), ( 1, E.BRANCH, capi.fire_event_branch, 70), ( 1, E.PY_THROW, capi.fire_event_py_throw, ValueError(1)), ( 1, E.RAISE, capi.fire_event_raise, ValueError(2)), ( 1, E.EXCEPTION_HANDLED, capi.fire_event_exception_handled, ValueError(5)), ( 1, E.PY_UNWIND, capi.fire_event_py_unwind, ValueError(6)), ( 1, E.STOP_ITERATION, capi.fire_event_stop_iteration, 7), ( 1, E.STOP_ITERATION, capi.fire_event_stop_iteration, StopIteration(8)), ] self.EXPECT_RAISED_EXCEPTION = [E.PY_THROW, E.RAISE, E.EXCEPTION_HANDLED, E.PY_UNWIND] def check_event_count(self, event, func, args, expected, callback_raises=None): class Counter: def __init__(self, callback_raises): self.callback_raises = callback_raises self.count = 0 def __call__(self, *args): self.count += 1 if self.callback_raises: exc = self.callback_raises self.callback_raises = None raise exc try: counter = Counter(callback_raises) sys.monitoring.register_callback(TEST_TOOL, event, counter) if event == E.C_RETURN or event == E.C_RAISE: sys.monitoring.set_events(TEST_TOOL, E.CALL) else: sys.monitoring.set_events(TEST_TOOL, event) event_value = int(math.log2(event)) with self.Scope(self.codelike, event_value): counter.count = 0 try: func(*args) except ValueError as e: self.assertIsInstance(expected, ValueError) self.assertEqual(str(e), str(expected)) return else: self.assertEqual(counter.count, expected) prev = sys.monitoring.register_callback(TEST_TOOL, event, None) with self.Scope(self.codelike, event_value): counter.count = 0 func(*args) self.assertEqual(counter.count, 0) self.assertEqual(prev, counter) finally: sys.monitoring.set_events(TEST_TOOL, 0) def test_fire_event(self): for expected, event, function, *args in self.cases: offset = 0 self.codelike = _testcapi.CodeLike(1) with self.subTest(function.__name__): args_ = (self.codelike, offset) + tuple(args) self.check_event_count(event, function, args_, expected) def test_missing_exception(self): for _, event, function, *args in self.cases: if event not in self.EXPECT_RAISED_EXCEPTION: continue assert args and isinstance(args[-1], BaseException) offset = 0 self.codelike = _testcapi.CodeLike(1) with self.subTest(function.__name__): args_ = (self.codelike, offset) + tuple(args[:-1]) + (None,) evt = int(math.log2(event)) expected = ValueError(f"Firing event {evt} with no exception set") self.check_event_count(event, function, args_, expected) def test_fire_event_failing_callback(self): for expected, event, function, *args in self.cases: offset = 0 self.codelike = _testcapi.CodeLike(1) with self.subTest(function.__name__): args_ = (self.codelike, offset) + tuple(args) exc = OSError(42) with self.assertRaises(type(exc)): self.check_event_count(event, function, args_, expected, callback_raises=exc) CANNOT_DISABLE = { E.PY_THROW, E.RAISE, E.RERAISE, E.EXCEPTION_HANDLED, E.PY_UNWIND } def check_disable(self, event, func, args, expected): try: counter = CounterWithDisable() sys.monitoring.register_callback(TEST_TOOL, event, counter) if event == E.C_RETURN or event == E.C_RAISE: sys.monitoring.set_events(TEST_TOOL, E.CALL) else: sys.monitoring.set_events(TEST_TOOL, event) event_value = int(math.log2(event)) with self.Scope(self.codelike, event_value): counter.count = 0 func(*args) self.assertEqual(counter.count, expected) counter.disable = True if event in self.CANNOT_DISABLE: # use try-except rather then assertRaises to avoid # events from framework code try: counter.count = 0 func(*args) self.assertEqual(counter.count, expected) except ValueError: pass else: self.Error("Expected a ValueError") else: counter.count = 0 func(*args) self.assertEqual(counter.count, expected) counter.count = 0 func(*args) self.assertEqual(counter.count, expected - 1) finally: sys.monitoring.set_events(TEST_TOOL, 0) def test_disable_event(self): for expected, event, function, *args in self.cases: offset = 0 self.codelike = _testcapi.CodeLike(2) with self.subTest(function.__name__): args_ = (self.codelike, 0) + tuple(args) self.check_disable(event, function, args_, expected) def test_enter_scope_two_events(self): try: yield_counter = CounterWithDisable() unwind_counter = CounterWithDisable() sys.monitoring.register_callback(TEST_TOOL, E.PY_YIELD, yield_counter) sys.monitoring.register_callback(TEST_TOOL, E.PY_UNWIND, unwind_counter) sys.monitoring.set_events(TEST_TOOL, E.PY_YIELD | E.PY_UNWIND) yield_value = int(math.log2(E.PY_YIELD)) unwind_value = int(math.log2(E.PY_UNWIND)) cl = _testcapi.CodeLike(2) common_args = (cl, 0) with self.Scope(cl, yield_value, unwind_value): yield_counter.count = 0 unwind_counter.count = 0 _testcapi.fire_event_py_unwind(*common_args, ValueError(42)) assert(yield_counter.count == 0) assert(unwind_counter.count == 1) _testcapi.fire_event_py_yield(*common_args, ValueError(42)) assert(yield_counter.count == 1) assert(unwind_counter.count == 1) yield_counter.disable = True _testcapi.fire_event_py_yield(*common_args, ValueError(42)) assert(yield_counter.count == 2) assert(unwind_counter.count == 1) _testcapi.fire_event_py_yield(*common_args, ValueError(42)) assert(yield_counter.count == 2) assert(unwind_counter.count == 1) finally: sys.monitoring.set_events(TEST_TOOL, 0)