import contextlib import os import sys import tracemalloc import unittest from unittest.mock import patch from test.support.script_helper import (assert_python_ok, assert_python_failure, interpreter_requires_environment) from test import support from test.support import os_helper try: import _testcapi except ImportError: _testcapi = None EMPTY_STRING_SIZE = sys.getsizeof(b'') INVALID_NFRAME = (-1, 2**30) def get_frames(nframe, lineno_delta): frames = [] frame = sys._getframe(1) for index in range(nframe): code = frame.f_code lineno = frame.f_lineno + lineno_delta frames.append((code.co_filename, lineno)) lineno_delta = 0 frame = frame.f_back if frame is None: break return tuple(frames) def allocate_bytes(size): nframe = tracemalloc.get_traceback_limit() bytes_len = (size - EMPTY_STRING_SIZE) frames = get_frames(nframe, 1) data = b'x' * bytes_len return data, tracemalloc.Traceback(frames, min(len(frames), nframe)) def create_snapshots(): traceback_limit = 2 # _tracemalloc._get_traces() returns a list of (domain, size, # traceback_frames) tuples. traceback_frames is a tuple of (filename, # line_number) tuples. raw_traces = [ (0, 10, (('a.py', 2), ('b.py', 4)), 3), (0, 10, (('a.py', 2), ('b.py', 4)), 3), (0, 10, (('a.py', 2), ('b.py', 4)), 3), (1, 2, (('a.py', 5), ('b.py', 4)), 3), (2, 66, (('b.py', 1),), 1), (3, 7, (('', 0),), 1), ] snapshot = tracemalloc.Snapshot(raw_traces, traceback_limit) raw_traces2 = [ (0, 10, (('a.py', 2), ('b.py', 4)), 3), (0, 10, (('a.py', 2), ('b.py', 4)), 3), (0, 10, (('a.py', 2), ('b.py', 4)), 3), (2, 2, (('a.py', 5), ('b.py', 4)), 3), (2, 5000, (('a.py', 5), ('b.py', 4)), 3), (4, 400, (('c.py', 578),), 1), ] snapshot2 = tracemalloc.Snapshot(raw_traces2, traceback_limit) return (snapshot, snapshot2) def frame(filename, lineno): return tracemalloc._Frame((filename, lineno)) def traceback(*frames): return tracemalloc.Traceback(frames) def traceback_lineno(filename, lineno): return traceback((filename, lineno)) def traceback_filename(filename): return traceback_lineno(filename, 0) class TestTraceback(unittest.TestCase): def test_repr(self): def get_repr(*args) -> str: return repr(tracemalloc.Traceback(*args)) self.assertEqual(get_repr(()), "") self.assertEqual(get_repr((), 0), "") frames = (("f1", 1), ("f2", 2)) exp_repr_frames = ( "(," " )" ) self.assertEqual(get_repr(frames), f"") self.assertEqual(get_repr(frames, 2), f"") class TestTracemallocEnabled(unittest.TestCase): def setUp(self): if tracemalloc.is_tracing(): self.skipTest("tracemalloc must be stopped before the test") tracemalloc.start(1) def tearDown(self): tracemalloc.stop() def test_get_tracemalloc_memory(self): data = [allocate_bytes(123) for count in range(1000)] size = tracemalloc.get_tracemalloc_memory() self.assertGreaterEqual(size, 0) tracemalloc.clear_traces() size2 = tracemalloc.get_tracemalloc_memory() self.assertGreaterEqual(size2, 0) self.assertLessEqual(size2, size) def test_get_object_traceback(self): tracemalloc.clear_traces() obj_size = 12345 obj, obj_traceback = allocate_bytes(obj_size) traceback = tracemalloc.get_object_traceback(obj) self.assertEqual(traceback, obj_traceback) def test_new_reference(self): tracemalloc.clear_traces() # gc.collect() indirectly calls PyList_ClearFreeList() support.gc_collect() # Create a list and "destroy it": put it in the PyListObject free list obj = [] obj = None # Create a list which should reuse the previously created empty list obj = [] nframe = tracemalloc.get_traceback_limit() frames = get_frames(nframe, -3) obj_traceback = tracemalloc.Traceback(frames, min(len(frames), nframe)) traceback = tracemalloc.get_object_traceback(obj) self.assertIsNotNone(traceback) self.assertEqual(traceback, obj_traceback) def test_set_traceback_limit(self): obj_size = 10 tracemalloc.stop() self.assertRaises(ValueError, tracemalloc.start, -1) tracemalloc.stop() tracemalloc.start(10) obj2, obj2_traceback = allocate_bytes(obj_size) traceback = tracemalloc.get_object_traceback(obj2) self.assertEqual(len(traceback), 10) self.assertEqual(traceback, obj2_traceback) tracemalloc.stop() tracemalloc.start(1) obj, obj_traceback = allocate_bytes(obj_size) traceback = tracemalloc.get_object_traceback(obj) self.assertEqual(len(traceback), 1) self.assertEqual(traceback, obj_traceback) def find_trace(self, traces, traceback): for trace in traces: if trace[2] == traceback._frames: return trace self.fail("trace not found") def test_get_traces(self): tracemalloc.clear_traces() obj_size = 12345 obj, obj_traceback = allocate_bytes(obj_size) traces = tracemalloc._get_traces() trace = self.find_trace(traces, obj_traceback) self.assertIsInstance(trace, tuple) domain, size, traceback, length = trace self.assertEqual(size, obj_size) self.assertEqual(traceback, obj_traceback._frames) tracemalloc.stop() self.assertEqual(tracemalloc._get_traces(), []) def test_get_traces_intern_traceback(self): # dummy wrappers to get more useful and identical frames in the traceback def allocate_bytes2(size): return allocate_bytes(size) def allocate_bytes3(size): return allocate_bytes2(size) def allocate_bytes4(size): return allocate_bytes3(size) # Ensure that two identical tracebacks are not duplicated tracemalloc.stop() tracemalloc.start(4) obj_size = 123 obj1, obj1_traceback = allocate_bytes4(obj_size) obj2, obj2_traceback = allocate_bytes4(obj_size) traces = tracemalloc._get_traces() obj1_traceback._frames = tuple(reversed(obj1_traceback._frames)) obj2_traceback._frames = tuple(reversed(obj2_traceback._frames)) trace1 = self.find_trace(traces, obj1_traceback) trace2 = self.find_trace(traces, obj2_traceback) domain1, size1, traceback1, length1 = trace1 domain2, size2, traceback2, length2 = trace2 self.assertIs(traceback2, traceback1) def test_get_traced_memory(self): # Python allocates some internals objects, so the test must tolerate # a small difference between the expected size and the real usage max_error = 2048 # allocate one object obj_size = 1024 * 1024 tracemalloc.clear_traces() obj, obj_traceback = allocate_bytes(obj_size) size, peak_size = tracemalloc.get_traced_memory() self.assertGreaterEqual(size, obj_size) self.assertGreaterEqual(peak_size, size) self.assertLessEqual(size - obj_size, max_error) self.assertLessEqual(peak_size - size, max_error) # destroy the object obj = None size2, peak_size2 = tracemalloc.get_traced_memory() self.assertLess(size2, size) self.assertGreaterEqual(size - size2, obj_size - max_error) self.assertGreaterEqual(peak_size2, peak_size) # clear_traces() must reset traced memory counters tracemalloc.clear_traces() self.assertEqual(tracemalloc.get_traced_memory(), (0, 0)) # allocate another object obj, obj_traceback = allocate_bytes(obj_size) size, peak_size = tracemalloc.get_traced_memory() self.assertGreaterEqual(size, obj_size) # stop() also resets traced memory counters tracemalloc.stop() self.assertEqual(tracemalloc.get_traced_memory(), (0, 0)) def test_clear_traces(self): obj, obj_traceback = allocate_bytes(123) traceback = tracemalloc.get_object_traceback(obj) self.assertIsNotNone(traceback) tracemalloc.clear_traces() traceback2 = tracemalloc.get_object_traceback(obj) self.assertIsNone(traceback2) def test_reset_peak(self): # Python allocates some internals objects, so the test must tolerate # a small difference between the expected size and the real usage tracemalloc.clear_traces() # Example: allocate a large piece of memory, temporarily large_sum = sum(list(range(100000))) size1, peak1 = tracemalloc.get_traced_memory() # reset_peak() resets peak to traced memory: peak2 < peak1 tracemalloc.reset_peak() size2, peak2 = tracemalloc.get_traced_memory() self.assertGreaterEqual(peak2, size2) self.assertLess(peak2, peak1) # check that peak continue to be updated if new memory is allocated: # peak3 > peak2 obj_size = 1024 * 1024 obj, obj_traceback = allocate_bytes(obj_size) size3, peak3 = tracemalloc.get_traced_memory() self.assertGreaterEqual(peak3, size3) self.assertGreater(peak3, peak2) self.assertGreaterEqual(peak3 - peak2, obj_size) def test_is_tracing(self): tracemalloc.stop() self.assertFalse(tracemalloc.is_tracing()) tracemalloc.start() self.assertTrue(tracemalloc.is_tracing()) def test_snapshot(self): obj, source = allocate_bytes(123) # take a snapshot snapshot = tracemalloc.take_snapshot() # This can vary self.assertGreater(snapshot.traces[1].traceback.total_nframe, 10) # write on disk snapshot.dump(os_helper.TESTFN) self.addCleanup(os_helper.unlink, os_helper.TESTFN) # load from disk snapshot2 = tracemalloc.Snapshot.load(os_helper.TESTFN) self.assertEqual(snapshot2.traces, snapshot.traces) # tracemalloc must be tracing memory allocations to take a snapshot tracemalloc.stop() with self.assertRaises(RuntimeError) as cm: tracemalloc.take_snapshot() self.assertEqual(str(cm.exception), "the tracemalloc module must be tracing memory " "allocations to take a snapshot") def test_snapshot_save_attr(self): # take a snapshot with a new attribute snapshot = tracemalloc.take_snapshot() snapshot.test_attr = "new" snapshot.dump(os_helper.TESTFN) self.addCleanup(os_helper.unlink, os_helper.TESTFN) # load() should recreate the attribute snapshot2 = tracemalloc.Snapshot.load(os_helper.TESTFN) self.assertEqual(snapshot2.test_attr, "new") def fork_child(self): if not tracemalloc.is_tracing(): return 2 obj_size = 12345 obj, obj_traceback = allocate_bytes(obj_size) traceback = tracemalloc.get_object_traceback(obj) if traceback is None: return 3 # everything is fine return 0 @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork()') def test_fork(self): # check that tracemalloc is still working after fork pid = os.fork() if not pid: # child exitcode = 1 try: exitcode = self.fork_child() finally: os._exit(exitcode) else: support.wait_process(pid, exitcode=0) class TestSnapshot(unittest.TestCase): maxDiff = 4000 def test_create_snapshot(self): raw_traces = [(0, 5, (('a.py', 2),), 10)] with contextlib.ExitStack() as stack: stack.enter_context(patch.object(tracemalloc, 'is_tracing', return_value=True)) stack.enter_context(patch.object(tracemalloc, 'get_traceback_limit', return_value=5)) stack.enter_context(patch.object(tracemalloc, '_get_traces', return_value=raw_traces)) snapshot = tracemalloc.take_snapshot() self.assertEqual(snapshot.traceback_limit, 5) self.assertEqual(len(snapshot.traces), 1) trace = snapshot.traces[0] self.assertEqual(trace.size, 5) self.assertEqual(trace.traceback.total_nframe, 10) self.assertEqual(len(trace.traceback), 1) self.assertEqual(trace.traceback[0].filename, 'a.py') self.assertEqual(trace.traceback[0].lineno, 2) def test_filter_traces(self): snapshot, snapshot2 = create_snapshots() filter1 = tracemalloc.Filter(False, "b.py") filter2 = tracemalloc.Filter(True, "a.py", 2) filter3 = tracemalloc.Filter(True, "a.py", 5) original_traces = list(snapshot.traces._traces) # exclude b.py snapshot3 = snapshot.filter_traces((filter1,)) self.assertEqual(snapshot3.traces._traces, [ (0, 10, (('a.py', 2), ('b.py', 4)), 3), (0, 10, (('a.py', 2), ('b.py', 4)), 3), (0, 10, (('a.py', 2), ('b.py', 4)), 3), (1, 2, (('a.py', 5), ('b.py', 4)), 3), (3, 7, (('', 0),), 1), ]) # filter_traces() must not touch the original snapshot self.assertEqual(snapshot.traces._traces, original_traces) # only include two lines of a.py snapshot4 = snapshot3.filter_traces((filter2, filter3)) self.assertEqual(snapshot4.traces._traces, [ (0, 10, (('a.py', 2), ('b.py', 4)), 3), (0, 10, (('a.py', 2), ('b.py', 4)), 3), (0, 10, (('a.py', 2), ('b.py', 4)), 3), (1, 2, (('a.py', 5), ('b.py', 4)), 3), ]) # No filter: just duplicate the snapshot snapshot5 = snapshot.filter_traces(()) self.assertIsNot(snapshot5, snapshot) self.assertIsNot(snapshot5.traces, snapshot.traces) self.assertEqual(snapshot5.traces, snapshot.traces) self.assertRaises(TypeError, snapshot.filter_traces, filter1) def test_filter_traces_domain(self): snapshot, snapshot2 = create_snapshots() filter1 = tracemalloc.Filter(False, "a.py", domain=1) filter2 = tracemalloc.Filter(True, "a.py", domain=1) original_traces = list(snapshot.traces._traces) # exclude a.py of domain 1 snapshot3 = snapshot.filter_traces((filter1,)) self.assertEqual(snapshot3.traces._traces, [ (0, 10, (('a.py', 2), ('b.py', 4)), 3), (0, 10, (('a.py', 2), ('b.py', 4)), 3), (0, 10, (('a.py', 2), ('b.py', 4)), 3), (2, 66, (('b.py', 1),), 1), (3, 7, (('', 0),), 1), ]) # include domain 1 snapshot3 = snapshot.filter_traces((filter1,)) self.assertEqual(snapshot3.traces._traces, [ (0, 10, (('a.py', 2), ('b.py', 4)), 3), (0, 10, (('a.py', 2), ('b.py', 4)), 3), (0, 10, (('a.py', 2), ('b.py', 4)), 3), (2, 66, (('b.py', 1),), 1), (3, 7, (('', 0),), 1), ]) def test_filter_traces_domain_filter(self): snapshot, snapshot2 = create_snapshots() filter1 = tracemalloc.DomainFilter(False, domain=3) filter2 = tracemalloc.DomainFilter(True, domain=3) # exclude domain 2 snapshot3 = snapshot.filter_traces((filter1,)) self.assertEqual(snapshot3.traces._traces, [ (0, 10, (('a.py', 2), ('b.py', 4)), 3), (0, 10, (('a.py', 2), ('b.py', 4)), 3), (0, 10, (('a.py', 2), ('b.py', 4)), 3), (1, 2, (('a.py', 5), ('b.py', 4)), 3), (2, 66, (('b.py', 1),), 1), ]) # include domain 2 snapshot3 = snapshot.filter_traces((filter2,)) self.assertEqual(snapshot3.traces._traces, [ (3, 7, (('', 0),), 1), ]) def test_snapshot_group_by_line(self): snapshot, snapshot2 = create_snapshots() tb_0 = traceback_lineno('', 0) tb_a_2 = traceback_lineno('a.py', 2) tb_a_5 = traceback_lineno('a.py', 5) tb_b_1 = traceback_lineno('b.py', 1) tb_c_578 = traceback_lineno('c.py', 578) # stats per file and line stats1 = snapshot.statistics('lineno') self.assertEqual(stats1, [ tracemalloc.Statistic(tb_b_1, 66, 1), tracemalloc.Statistic(tb_a_2, 30, 3), tracemalloc.Statistic(tb_0, 7, 1), tracemalloc.Statistic(tb_a_5, 2, 1), ]) # stats per file and line (2) stats2 = snapshot2.statistics('lineno') self.assertEqual(stats2, [ tracemalloc.Statistic(tb_a_5, 5002, 2), tracemalloc.Statistic(tb_c_578, 400, 1), tracemalloc.Statistic(tb_a_2, 30, 3), ]) # stats diff per file and line statistics = snapshot2.compare_to(snapshot, 'lineno') self.assertEqual(statistics, [ tracemalloc.StatisticDiff(tb_a_5, 5002, 5000, 2, 1), tracemalloc.StatisticDiff(tb_c_578, 400, 400, 1, 1), tracemalloc.StatisticDiff(tb_b_1, 0, -66, 0, -1), tracemalloc.StatisticDiff(tb_0, 0, -7, 0, -1), tracemalloc.StatisticDiff(tb_a_2, 30, 0, 3, 0), ]) def test_snapshot_group_by_file(self): snapshot, snapshot2 = create_snapshots() tb_0 = traceback_filename('') tb_a = traceback_filename('a.py') tb_b = traceback_filename('b.py') tb_c = traceback_filename('c.py') # stats per file stats1 = snapshot.statistics('filename') self.assertEqual(stats1, [ tracemalloc.Statistic(tb_b, 66, 1), tracemalloc.Statistic(tb_a, 32, 4), tracemalloc.Statistic(tb_0, 7, 1), ]) # stats per file (2) stats2 = snapshot2.statistics('filename') self.assertEqual(stats2, [ tracemalloc.Statistic(tb_a, 5032, 5), tracemalloc.Statistic(tb_c, 400, 1), ]) # stats diff per file diff = snapshot2.compare_to(snapshot, 'filename') self.assertEqual(diff, [ tracemalloc.StatisticDiff(tb_a, 5032, 5000, 5, 1), tracemalloc.StatisticDiff(tb_c, 400, 400, 1, 1), tracemalloc.StatisticDiff(tb_b, 0, -66, 0, -1), tracemalloc.StatisticDiff(tb_0, 0, -7, 0, -1), ]) def test_snapshot_group_by_traceback(self): snapshot, snapshot2 = create_snapshots() # stats per file tb1 = traceback(('a.py', 2), ('b.py', 4)) tb2 = traceback(('a.py', 5), ('b.py', 4)) tb3 = traceback(('b.py', 1)) tb4 = traceback(('', 0)) stats1 = snapshot.statistics('traceback') self.assertEqual(stats1, [ tracemalloc.Statistic(tb3, 66, 1), tracemalloc.Statistic(tb1, 30, 3), tracemalloc.Statistic(tb4, 7, 1), tracemalloc.Statistic(tb2, 2, 1), ]) # stats per file (2) tb5 = traceback(('c.py', 578)) stats2 = snapshot2.statistics('traceback') self.assertEqual(stats2, [ tracemalloc.Statistic(tb2, 5002, 2), tracemalloc.Statistic(tb5, 400, 1), tracemalloc.Statistic(tb1, 30, 3), ]) # stats diff per file diff = snapshot2.compare_to(snapshot, 'traceback') self.assertEqual(diff, [ tracemalloc.StatisticDiff(tb2, 5002, 5000, 2, 1), tracemalloc.StatisticDiff(tb5, 400, 400, 1, 1), tracemalloc.StatisticDiff(tb3, 0, -66, 0, -1), tracemalloc.StatisticDiff(tb4, 0, -7, 0, -1), tracemalloc.StatisticDiff(tb1, 30, 0, 3, 0), ]) self.assertRaises(ValueError, snapshot.statistics, 'traceback', cumulative=True) def test_snapshot_group_by_cumulative(self): snapshot, snapshot2 = create_snapshots() tb_0 = traceback_filename('') tb_a = traceback_filename('a.py') tb_b = traceback_filename('b.py') tb_a_2 = traceback_lineno('a.py', 2) tb_a_5 = traceback_lineno('a.py', 5) tb_b_1 = traceback_lineno('b.py', 1) tb_b_4 = traceback_lineno('b.py', 4) # per file stats = snapshot.statistics('filename', True) self.assertEqual(stats, [ tracemalloc.Statistic(tb_b, 98, 5), tracemalloc.Statistic(tb_a, 32, 4), tracemalloc.Statistic(tb_0, 7, 1), ]) # per line stats = snapshot.statistics('lineno', True) self.assertEqual(stats, [ tracemalloc.Statistic(tb_b_1, 66, 1), tracemalloc.Statistic(tb_b_4, 32, 4), tracemalloc.Statistic(tb_a_2, 30, 3), tracemalloc.Statistic(tb_0, 7, 1), tracemalloc.Statistic(tb_a_5, 2, 1), ]) def test_trace_format(self): snapshot, snapshot2 = create_snapshots() trace = snapshot.traces[0] self.assertEqual(str(trace), 'b.py:4: 10 B') traceback = trace.traceback self.assertEqual(str(traceback), 'b.py:4') frame = traceback[0] self.assertEqual(str(frame), 'b.py:4') def test_statistic_format(self): snapshot, snapshot2 = create_snapshots() stats = snapshot.statistics('lineno') stat = stats[0] self.assertEqual(str(stat), 'b.py:1: size=66 B, count=1, average=66 B') def test_statistic_diff_format(self): snapshot, snapshot2 = create_snapshots() stats = snapshot2.compare_to(snapshot, 'lineno') stat = stats[0] self.assertEqual(str(stat), 'a.py:5: size=5002 B (+5000 B), count=2 (+1), average=2501 B') def test_slices(self): snapshot, snapshot2 = create_snapshots() self.assertEqual(snapshot.traces[:2], (snapshot.traces[0], snapshot.traces[1])) traceback = snapshot.traces[0].traceback self.assertEqual(traceback[:2], (traceback[0], traceback[1])) def test_format_traceback(self): snapshot, snapshot2 = create_snapshots() def getline(filename, lineno): return ' <%s, %s>' % (filename, lineno) with unittest.mock.patch('tracemalloc.linecache.getline', side_effect=getline): tb = snapshot.traces[0].traceback self.assertEqual(tb.format(), [' File "b.py", line 4', ' ', ' File "a.py", line 2', ' ']) self.assertEqual(tb.format(limit=1), [' File "a.py", line 2', ' ']) self.assertEqual(tb.format(limit=-1), [' File "b.py", line 4', ' ']) self.assertEqual(tb.format(most_recent_first=True), [' File "a.py", line 2', ' ', ' File "b.py", line 4', ' ']) self.assertEqual(tb.format(limit=1, most_recent_first=True), [' File "a.py", line 2', ' ']) self.assertEqual(tb.format(limit=-1, most_recent_first=True), [' File "b.py", line 4', ' ']) class TestFilters(unittest.TestCase): maxDiff = 2048 def test_filter_attributes(self): # test default values f = tracemalloc.Filter(True, "abc") self.assertEqual(f.inclusive, True) self.assertEqual(f.filename_pattern, "abc") self.assertIsNone(f.lineno) self.assertEqual(f.all_frames, False) # test custom values f = tracemalloc.Filter(False, "test.py", 123, True) self.assertEqual(f.inclusive, False) self.assertEqual(f.filename_pattern, "test.py") self.assertEqual(f.lineno, 123) self.assertEqual(f.all_frames, True) # parameters passed by keyword f = tracemalloc.Filter(inclusive=False, filename_pattern="test.py", lineno=123, all_frames=True) self.assertEqual(f.inclusive, False) self.assertEqual(f.filename_pattern, "test.py") self.assertEqual(f.lineno, 123) self.assertEqual(f.all_frames, True) # read-only attribute self.assertRaises(AttributeError, setattr, f, "filename_pattern", "abc") def test_filter_match(self): # filter without line number f = tracemalloc.Filter(True, "abc") self.assertTrue(f._match_frame("abc", 0)) self.assertTrue(f._match_frame("abc", 5)) self.assertTrue(f._match_frame("abc", 10)) self.assertFalse(f._match_frame("12356", 0)) self.assertFalse(f._match_frame("12356", 5)) self.assertFalse(f._match_frame("12356", 10)) f = tracemalloc.Filter(False, "abc") self.assertFalse(f._match_frame("abc", 0)) self.assertFalse(f._match_frame("abc", 5)) self.assertFalse(f._match_frame("abc", 10)) self.assertTrue(f._match_frame("12356", 0)) self.assertTrue(f._match_frame("12356", 5)) self.assertTrue(f._match_frame("12356", 10)) # filter with line number > 0 f = tracemalloc.Filter(True, "abc", 5) self.assertFalse(f._match_frame("abc", 0)) self.assertTrue(f._match_frame("abc", 5)) self.assertFalse(f._match_frame("abc", 10)) self.assertFalse(f._match_frame("12356", 0)) self.assertFalse(f._match_frame("12356", 5)) self.assertFalse(f._match_frame("12356", 10)) f = tracemalloc.Filter(False, "abc", 5) self.assertTrue(f._match_frame("abc", 0)) self.assertFalse(f._match_frame("abc", 5)) self.assertTrue(f._match_frame("abc", 10)) self.assertTrue(f._match_frame("12356", 0)) self.assertTrue(f._match_frame("12356", 5)) self.assertTrue(f._match_frame("12356", 10)) # filter with line number 0 f = tracemalloc.Filter(True, "abc", 0) self.assertTrue(f._match_frame("abc", 0)) self.assertFalse(f._match_frame("abc", 5)) self.assertFalse(f._match_frame("abc", 10)) self.assertFalse(f._match_frame("12356", 0)) self.assertFalse(f._match_frame("12356", 5)) self.assertFalse(f._match_frame("12356", 10)) f = tracemalloc.Filter(False, "abc", 0) self.assertFalse(f._match_frame("abc", 0)) self.assertTrue(f._match_frame("abc", 5)) self.assertTrue(f._match_frame("abc", 10)) self.assertTrue(f._match_frame("12356", 0)) self.assertTrue(f._match_frame("12356", 5)) self.assertTrue(f._match_frame("12356", 10)) def test_filter_match_filename(self): def fnmatch(inclusive, filename, pattern): f = tracemalloc.Filter(inclusive, pattern) return f._match_frame(filename, 0) self.assertTrue(fnmatch(True, "abc", "abc")) self.assertFalse(fnmatch(True, "12356", "abc")) self.assertFalse(fnmatch(True, "", "abc")) self.assertFalse(fnmatch(False, "abc", "abc")) self.assertTrue(fnmatch(False, "12356", "abc")) self.assertTrue(fnmatch(False, "", "abc")) def test_filter_match_filename_joker(self): def fnmatch(filename, pattern): filter = tracemalloc.Filter(True, pattern) return filter._match_frame(filename, 0) # empty string self.assertFalse(fnmatch('abc', '')) self.assertFalse(fnmatch('', 'abc')) self.assertTrue(fnmatch('', '')) self.assertTrue(fnmatch('', '*')) # no * self.assertTrue(fnmatch('abc', 'abc')) self.assertFalse(fnmatch('abc', 'abcd')) self.assertFalse(fnmatch('abc', 'def')) # a* self.assertTrue(fnmatch('abc', 'a*')) self.assertTrue(fnmatch('abc', 'abc*')) self.assertFalse(fnmatch('abc', 'b*')) self.assertFalse(fnmatch('abc', 'abcd*')) # a*b self.assertTrue(fnmatch('abc', 'a*c')) self.assertTrue(fnmatch('abcdcx', 'a*cx')) self.assertFalse(fnmatch('abb', 'a*c')) self.assertFalse(fnmatch('abcdce', 'a*cx')) # a*b*c self.assertTrue(fnmatch('abcde', 'a*c*e')) self.assertTrue(fnmatch('abcbdefeg', 'a*bd*eg')) self.assertFalse(fnmatch('abcdd', 'a*c*e')) self.assertFalse(fnmatch('abcbdefef', 'a*bd*eg')) # replace .pyc suffix with .py self.assertTrue(fnmatch('a.pyc', 'a.py')) self.assertTrue(fnmatch('a.py', 'a.pyc')) if os.name == 'nt': # case insensitive self.assertTrue(fnmatch('aBC', 'ABc')) self.assertTrue(fnmatch('aBcDe', 'Ab*dE')) self.assertTrue(fnmatch('a.pyc', 'a.PY')) self.assertTrue(fnmatch('a.py', 'a.PYC')) else: # case sensitive self.assertFalse(fnmatch('aBC', 'ABc')) self.assertFalse(fnmatch('aBcDe', 'Ab*dE')) self.assertFalse(fnmatch('a.pyc', 'a.PY')) self.assertFalse(fnmatch('a.py', 'a.PYC')) if os.name == 'nt': # normalize alternate separator "/" to the standard separator "\" self.assertTrue(fnmatch(r'a/b', r'a\b')) self.assertTrue(fnmatch(r'a\b', r'a/b')) self.assertTrue(fnmatch(r'a/b\c', r'a\b/c')) self.assertTrue(fnmatch(r'a/b/c', r'a\b\c')) else: # there is no alternate separator self.assertFalse(fnmatch(r'a/b', r'a\b')) self.assertFalse(fnmatch(r'a\b', r'a/b')) self.assertFalse(fnmatch(r'a/b\c', r'a\b/c')) self.assertFalse(fnmatch(r'a/b/c', r'a\b\c')) # as of 3.5, .pyo is no longer munged to .py self.assertFalse(fnmatch('a.pyo', 'a.py')) def test_filter_match_trace(self): t1 = (("a.py", 2), ("b.py", 3)) t2 = (("b.py", 4), ("b.py", 5)) t3 = (("c.py", 5), ('', 0)) unknown = (('', 0),) f = tracemalloc.Filter(True, "b.py", all_frames=True) self.assertTrue(f._match_traceback(t1)) self.assertTrue(f._match_traceback(t2)) self.assertFalse(f._match_traceback(t3)) self.assertFalse(f._match_traceback(unknown)) f = tracemalloc.Filter(True, "b.py", all_frames=False) self.assertFalse(f._match_traceback(t1)) self.assertTrue(f._match_traceback(t2)) self.assertFalse(f._match_traceback(t3)) self.assertFalse(f._match_traceback(unknown)) f = tracemalloc.Filter(False, "b.py", all_frames=True) self.assertFalse(f._match_traceback(t1)) self.assertFalse(f._match_traceback(t2)) self.assertTrue(f._match_traceback(t3)) self.assertTrue(f._match_traceback(unknown)) f = tracemalloc.Filter(False, "b.py", all_frames=False) self.assertTrue(f._match_traceback(t1)) self.assertFalse(f._match_traceback(t2)) self.assertTrue(f._match_traceback(t3)) self.assertTrue(f._match_traceback(unknown)) f = tracemalloc.Filter(False, "", all_frames=False) self.assertTrue(f._match_traceback(t1)) self.assertTrue(f._match_traceback(t2)) self.assertTrue(f._match_traceback(t3)) self.assertFalse(f._match_traceback(unknown)) f = tracemalloc.Filter(True, "", all_frames=True) self.assertFalse(f._match_traceback(t1)) self.assertFalse(f._match_traceback(t2)) self.assertTrue(f._match_traceback(t3)) self.assertTrue(f._match_traceback(unknown)) f = tracemalloc.Filter(False, "", all_frames=True) self.assertTrue(f._match_traceback(t1)) self.assertTrue(f._match_traceback(t2)) self.assertFalse(f._match_traceback(t3)) self.assertFalse(f._match_traceback(unknown)) class TestCommandLine(unittest.TestCase): def test_env_var_disabled_by_default(self): # not tracing by default code = 'import tracemalloc; print(tracemalloc.is_tracing())' ok, stdout, stderr = assert_python_ok('-c', code) stdout = stdout.rstrip() self.assertEqual(stdout, b'False') @unittest.skipIf(interpreter_requires_environment(), 'Cannot run -E tests when PYTHON env vars are required.') def test_env_var_ignored_with_E(self): """PYTHON* environment variables must be ignored when -E is present.""" code = 'import tracemalloc; print(tracemalloc.is_tracing())' ok, stdout, stderr = assert_python_ok('-E', '-c', code, PYTHONTRACEMALLOC='1') stdout = stdout.rstrip() self.assertEqual(stdout, b'False') def test_env_var_disabled(self): # tracing at startup code = 'import tracemalloc; print(tracemalloc.is_tracing())' ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='0') stdout = stdout.rstrip() self.assertEqual(stdout, b'False') def test_env_var_enabled_at_startup(self): # tracing at startup code = 'import tracemalloc; print(tracemalloc.is_tracing())' ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='1') stdout = stdout.rstrip() self.assertEqual(stdout, b'True') def test_env_limit(self): # start and set the number of frames code = 'import tracemalloc; print(tracemalloc.get_traceback_limit())' ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='10') stdout = stdout.rstrip() self.assertEqual(stdout, b'10') def check_env_var_invalid(self, nframe): with support.SuppressCrashReport(): ok, stdout, stderr = assert_python_failure( '-c', 'pass', PYTHONTRACEMALLOC=str(nframe)) if b'ValueError: the number of frames must be in range' in stderr: return if b'PYTHONTRACEMALLOC: invalid number of frames' in stderr: return self.fail(f"unexpected output: {stderr!a}") def test_env_var_invalid(self): for nframe in INVALID_NFRAME: with self.subTest(nframe=nframe): self.check_env_var_invalid(nframe) def test_sys_xoptions(self): for xoptions, nframe in ( ('tracemalloc', 1), ('tracemalloc=1', 1), ('tracemalloc=15', 15), ): with self.subTest(xoptions=xoptions, nframe=nframe): code = 'import tracemalloc; print(tracemalloc.get_traceback_limit())' ok, stdout, stderr = assert_python_ok('-X', xoptions, '-c', code) stdout = stdout.rstrip() self.assertEqual(stdout, str(nframe).encode('ascii')) def check_sys_xoptions_invalid(self, nframe): args = ('-X', 'tracemalloc=%s' % nframe, '-c', 'pass') with support.SuppressCrashReport(): ok, stdout, stderr = assert_python_failure(*args) if b'ValueError: the number of frames must be in range' in stderr: return if b'-X tracemalloc=NFRAME: invalid number of frames' in stderr: return self.fail(f"unexpected output: {stderr!a}") def test_sys_xoptions_invalid(self): for nframe in INVALID_NFRAME: with self.subTest(nframe=nframe): self.check_sys_xoptions_invalid(nframe) @unittest.skipIf(_testcapi is None, 'need _testcapi') def test_pymem_alloc0(self): # Issue #21639: Check that PyMem_Malloc(0) with tracemalloc enabled # does not crash. code = 'import _testcapi; _testcapi.test_pymem_alloc0(); 1' assert_python_ok('-X', 'tracemalloc', '-c', code) @unittest.skipIf(_testcapi is None, 'need _testcapi') class TestCAPI(unittest.TestCase): maxDiff = 80 * 20 def setUp(self): if tracemalloc.is_tracing(): self.skipTest("tracemalloc must be stopped before the test") self.domain = 5 self.size = 123 self.obj = allocate_bytes(self.size)[0] # for the type "object", id(obj) is the address of its memory block. # This type is not tracked by the garbage collector self.ptr = id(self.obj) def tearDown(self): tracemalloc.stop() def get_traceback(self): frames = _testcapi.tracemalloc_get_traceback(self.domain, self.ptr) if frames is not None: return tracemalloc.Traceback(frames) else: return None def track(self, release_gil=False, nframe=1): frames = get_frames(nframe, 1) _testcapi.tracemalloc_track(self.domain, self.ptr, self.size, release_gil) return frames def untrack(self): _testcapi.tracemalloc_untrack(self.domain, self.ptr) def get_traced_memory(self): # Get the traced size in the domain snapshot = tracemalloc.take_snapshot() domain_filter = tracemalloc.DomainFilter(True, self.domain) snapshot = snapshot.filter_traces([domain_filter]) return sum(trace.size for trace in snapshot.traces) def check_track(self, release_gil): nframe = 5 tracemalloc.start(nframe) size = tracemalloc.get_traced_memory()[0] frames = self.track(release_gil, nframe) self.assertEqual(self.get_traceback(), tracemalloc.Traceback(frames)) self.assertEqual(self.get_traced_memory(), self.size) def test_track(self): self.check_track(False) def test_track_without_gil(self): # check that calling _PyTraceMalloc_Track() without holding the GIL # works too self.check_track(True) def test_track_already_tracked(self): nframe = 5 tracemalloc.start(nframe) # track a first time self.track() # calling _PyTraceMalloc_Track() must remove the old trace and add # a new trace with the new traceback frames = self.track(nframe=nframe) self.assertEqual(self.get_traceback(), tracemalloc.Traceback(frames)) def test_untrack(self): tracemalloc.start() self.track() self.assertIsNotNone(self.get_traceback()) self.assertEqual(self.get_traced_memory(), self.size) # untrack must remove the trace self.untrack() self.assertIsNone(self.get_traceback()) self.assertEqual(self.get_traced_memory(), 0) # calling _PyTraceMalloc_Untrack() multiple times must not crash self.untrack() self.untrack() def test_stop_track(self): tracemalloc.start() tracemalloc.stop() with self.assertRaises(RuntimeError): self.track() self.assertIsNone(self.get_traceback()) def test_stop_untrack(self): tracemalloc.start() self.track() tracemalloc.stop() with self.assertRaises(RuntimeError): self.untrack() def test_main(): support.run_unittest( TestTraceback, TestTracemallocEnabled, TestSnapshot, TestFilters, TestCommandLine, TestCAPI, ) if __name__ == "__main__": test_main()