"""This module includes tests of the code object representation. >>> def f(x): ... def g(y): ... return x + y ... return g ... >>> dump(f.__code__) name: f argcount: 1 posonlyargcount: 0 kwonlyargcount: 0 names: () varnames: ('x', 'g') cellvars: ('x',) freevars: () nlocals: 2 flags: 3 consts: ('None', '') >>> dump(f(4).__code__) name: g argcount: 1 posonlyargcount: 0 kwonlyargcount: 0 names: () varnames: ('y',) cellvars: () freevars: ('x',) nlocals: 1 flags: 19 consts: ('None',) >>> def h(x, y): ... a = x + y ... b = x - y ... c = a * b ... return c ... >>> dump(h.__code__) name: h argcount: 2 posonlyargcount: 0 kwonlyargcount: 0 names: () varnames: ('x', 'y', 'a', 'b', 'c') cellvars: () freevars: () nlocals: 5 flags: 3 consts: ('None',) >>> def attrs(obj): ... print(obj.attr1) ... print(obj.attr2) ... print(obj.attr3) >>> dump(attrs.__code__) name: attrs argcount: 1 posonlyargcount: 0 kwonlyargcount: 0 names: ('print', 'attr1', 'attr2', 'attr3') varnames: ('obj',) cellvars: () freevars: () nlocals: 1 flags: 3 consts: ('None',) >>> def optimize_away(): ... 'doc string' ... 'not a docstring' ... 53 ... 0x53 >>> dump(optimize_away.__code__) name: optimize_away argcount: 0 posonlyargcount: 0 kwonlyargcount: 0 names: () varnames: () cellvars: () freevars: () nlocals: 0 flags: 3 consts: ("'doc string'", 'None') >>> def keywordonly_args(a,b,*,k1): ... return a,b,k1 ... >>> dump(keywordonly_args.__code__) name: keywordonly_args argcount: 2 posonlyargcount: 0 kwonlyargcount: 1 names: () varnames: ('a', 'b', 'k1') cellvars: () freevars: () nlocals: 3 flags: 3 consts: ('None',) >>> def posonly_args(a,b,/,c): ... return a,b,c ... >>> dump(posonly_args.__code__) name: posonly_args argcount: 3 posonlyargcount: 2 kwonlyargcount: 0 names: () varnames: ('a', 'b', 'c') cellvars: () freevars: () nlocals: 3 flags: 3 consts: ('None',) """ import copy import inspect import sys import threading import doctest import unittest import textwrap import weakref import dis try: import ctypes except ImportError: ctypes = None from test.support import (cpython_only, check_impl_detail, requires_debug_ranges, gc_collect) from test.support.script_helper import assert_python_ok from test.support import threading_helper from opcode import opmap, opname COPY_FREE_VARS = opmap['COPY_FREE_VARS'] def consts(t): """Yield a doctest-safe sequence of object reprs.""" for elt in t: r = repr(elt) if r.startswith("" % elt.co_name else: yield r def dump(co): """Print out a text representation of a code object.""" for attr in ["name", "argcount", "posonlyargcount", "kwonlyargcount", "names", "varnames", "cellvars", "freevars", "nlocals", "flags"]: print("%s: %s" % (attr, getattr(co, "co_" + attr))) print("consts:", tuple(consts(co.co_consts))) # Needed for test_closure_injection below # Defined at global scope to avoid implicitly closing over __class__ def external_getitem(self, i): return f"Foreign getitem: {super().__getitem__(i)}" class CodeTest(unittest.TestCase): @cpython_only def test_newempty(self): import _testcapi co = _testcapi.code_newempty("filename", "funcname", 15) self.assertEqual(co.co_filename, "filename") self.assertEqual(co.co_name, "funcname") self.assertEqual(co.co_firstlineno, 15) #Empty code object should raise, but not crash the VM with self.assertRaises(Exception): exec(co) @cpython_only def test_closure_injection(self): # From https://bugs.python.org/issue32176 from types import FunctionType def create_closure(__class__): return (lambda: __class__).__closure__ def new_code(c): '''A new code object with a __class__ cell added to freevars''' return c.replace(co_freevars=c.co_freevars + ('__class__',), co_code=bytes([COPY_FREE_VARS, 1])+c.co_code) def add_foreign_method(cls, name, f): code = new_code(f.__code__) assert not f.__closure__ closure = create_closure(cls) defaults = f.__defaults__ setattr(cls, name, FunctionType(code, globals(), name, defaults, closure)) class List(list): pass add_foreign_method(List, "__getitem__", external_getitem) # Ensure the closure injection actually worked function = List.__getitem__ class_ref = function.__closure__[0].cell_contents self.assertIs(class_ref, List) # Ensure the zero-arg super() call in the injected method works obj = List([1, 2, 3]) self.assertEqual(obj[0], "Foreign getitem: 1") def test_constructor(self): def func(): pass co = func.__code__ CodeType = type(co) # test code constructor CodeType(co.co_argcount, co.co_posonlyargcount, co.co_kwonlyargcount, co.co_nlocals, co.co_stacksize, co.co_flags, co.co_code, co.co_consts, co.co_names, co.co_varnames, co.co_filename, co.co_name, co.co_qualname, co.co_firstlineno, co.co_linetable, co.co_exceptiontable, co.co_freevars, co.co_cellvars) def test_qualname(self): self.assertEqual( CodeTest.test_qualname.__code__.co_qualname, CodeTest.test_qualname.__qualname__ ) def test_replace(self): def func(): x = 1 return x code = func.__code__ # different co_name, co_varnames, co_consts def func2(): y = 2 z = 3 return y code2 = func2.__code__ for attr, value in ( ("co_argcount", 0), ("co_posonlyargcount", 0), ("co_kwonlyargcount", 0), ("co_nlocals", 1), ("co_stacksize", 1), ("co_flags", code.co_flags | inspect.CO_COROUTINE), ("co_firstlineno", 100), ("co_code", code2.co_code), ("co_consts", code2.co_consts), ("co_names", ("myname",)), ("co_varnames", ('spam',)), ("co_freevars", ("freevar",)), ("co_cellvars", ("cellvar",)), ("co_filename", "newfilename"), ("co_name", "newname"), ("co_linetable", code2.co_linetable), ): with self.subTest(attr=attr, value=value): new_code = code.replace(**{attr: value}) self.assertEqual(getattr(new_code, attr), value) new_code = copy.replace(code, **{attr: value}) self.assertEqual(getattr(new_code, attr), value) new_code = code.replace(co_varnames=code2.co_varnames, co_nlocals=code2.co_nlocals) self.assertEqual(new_code.co_varnames, code2.co_varnames) self.assertEqual(new_code.co_nlocals, code2.co_nlocals) new_code = copy.replace(code, co_varnames=code2.co_varnames, co_nlocals=code2.co_nlocals) self.assertEqual(new_code.co_varnames, code2.co_varnames) self.assertEqual(new_code.co_nlocals, code2.co_nlocals) def test_nlocals_mismatch(self): def func(): x = 1 return x co = func.__code__ assert co.co_nlocals > 0; # First we try the constructor. CodeType = type(co) for diff in (-1, 1): with self.assertRaises(ValueError): CodeType(co.co_argcount, co.co_posonlyargcount, co.co_kwonlyargcount, # This is the only change. co.co_nlocals + diff, co.co_stacksize, co.co_flags, co.co_code, co.co_consts, co.co_names, co.co_varnames, co.co_filename, co.co_name, co.co_qualname, co.co_firstlineno, co.co_linetable, co.co_exceptiontable, co.co_freevars, co.co_cellvars, ) # Then we try the replace method. with self.assertRaises(ValueError): co.replace(co_nlocals=co.co_nlocals - 1) with self.assertRaises(ValueError): co.replace(co_nlocals=co.co_nlocals + 1) def test_shrinking_localsplus(self): # Check that PyCode_NewWithPosOnlyArgs resizes both # localsplusnames and localspluskinds, if an argument is a cell. def func(arg): return lambda: arg code = func.__code__ newcode = code.replace(co_name="func") # Should not raise SystemError self.assertEqual(code, newcode) def test_empty_linetable(self): def func(): pass new_code = code = func.__code__.replace(co_linetable=b'') self.assertEqual(list(new_code.co_lines()), []) def test_co_lnotab_is_deprecated(self): # TODO: remove in 3.14 def func(): pass with self.assertWarns(DeprecationWarning): func.__code__.co_lnotab def test_invalid_bytecode(self): def foo(): pass # assert that opcode 229 is invalid self.assertEqual(opname[229], '<229>') # change first opcode to 0xeb (=229) foo.__code__ = foo.__code__.replace( co_code=b'\xe5' + foo.__code__.co_code[1:]) msg = "unknown opcode 229" with self.assertRaisesRegex(SystemError, msg): foo() @requires_debug_ranges() def test_co_positions_artificial_instructions(self): import dis namespace = {} exec(textwrap.dedent("""\ try: 1/0 except Exception as e: exc = e """), namespace) exc = namespace['exc'] traceback = exc.__traceback__ code = traceback.tb_frame.f_code artificial_instructions = [] for instr, positions in zip( dis.get_instructions(code, show_caches=True), code.co_positions(), strict=True ): # If any of the positions is None, then all have to # be None as well for the case above. There are still # some places in the compiler, where the artificial instructions # get assigned the first_lineno but they don't have other positions. # There is no easy way of inferring them at that stage, so for now # we don't support it. self.assertIn(positions.count(None), [0, 3, 4]) if not any(positions): artificial_instructions.append(instr) self.assertEqual( [ (instruction.opname, instruction.argval) for instruction in artificial_instructions ], [ ("PUSH_EXC_INFO", None), ("LOAD_CONST", None), # artificial 'None' ("STORE_NAME", "e"), # XX: we know the location for this ("DELETE_NAME", "e"), ("RERAISE", 1), ("COPY", 3), ("POP_EXCEPT", None), ("RERAISE", 1) ] ) def test_endline_and_columntable_none_when_no_debug_ranges(self): # Make sure that if `-X no_debug_ranges` is used, there is # minimal debug info code = textwrap.dedent(""" def f(): pass positions = f.__code__.co_positions() for line, end_line, column, end_column in positions: assert line == end_line assert column is None assert end_column is None """) assert_python_ok('-X', 'no_debug_ranges', '-c', code) def test_endline_and_columntable_none_when_no_debug_ranges_env(self): # Same as above but using the environment variable opt out. code = textwrap.dedent(""" def f(): pass positions = f.__code__.co_positions() for line, end_line, column, end_column in positions: assert line == end_line assert column is None assert end_column is None """) assert_python_ok('-c', code, PYTHONNODEBUGRANGES='1') # co_positions behavior when info is missing. @requires_debug_ranges() def test_co_positions_empty_linetable(self): def func(): x = 1 new_code = func.__code__.replace(co_linetable=b'') positions = new_code.co_positions() for line, end_line, column, end_column in positions: self.assertIsNone(line) self.assertEqual(end_line, new_code.co_firstlineno + 1) def test_code_equality(self): def f(): try: a() except: b() else: c() finally: d() code_a = f.__code__ code_b = code_a.replace(co_linetable=b"") code_c = code_a.replace(co_exceptiontable=b"") code_d = code_b.replace(co_exceptiontable=b"") self.assertNotEqual(code_a, code_b) self.assertNotEqual(code_a, code_c) self.assertNotEqual(code_a, code_d) self.assertNotEqual(code_b, code_c) self.assertNotEqual(code_b, code_d) self.assertNotEqual(code_c, code_d) def test_code_hash_uses_firstlineno(self): c1 = (lambda: 1).__code__ c2 = (lambda: 1).__code__ self.assertNotEqual(c1, c2) self.assertNotEqual(hash(c1), hash(c2)) c3 = c1.replace(co_firstlineno=17) self.assertNotEqual(c1, c3) self.assertNotEqual(hash(c1), hash(c3)) def test_code_hash_uses_order(self): # Swapping posonlyargcount and kwonlyargcount should change the hash. c = (lambda x, y, *, z=1, w=1: 1).__code__ self.assertEqual(c.co_argcount, 2) self.assertEqual(c.co_posonlyargcount, 0) self.assertEqual(c.co_kwonlyargcount, 2) swapped = c.replace(co_posonlyargcount=2, co_kwonlyargcount=0) self.assertNotEqual(c, swapped) self.assertNotEqual(hash(c), hash(swapped)) def test_code_hash_uses_bytecode(self): c = (lambda x, y: x + y).__code__ d = (lambda x, y: x * y).__code__ c1 = c.replace(co_code=d.co_code) self.assertNotEqual(c, c1) self.assertNotEqual(hash(c), hash(c1)) def isinterned(s): return s is sys.intern(('_' + s + '_')[1:-1]) class CodeConstsTest(unittest.TestCase): def find_const(self, consts, value): for v in consts: if v == value: return v self.assertIn(value, consts) # raises an exception self.fail('Should never be reached') def assertIsInterned(self, s): if not isinterned(s): self.fail('String %r is not interned' % (s,)) def assertIsNotInterned(self, s): if isinterned(s): self.fail('String %r is interned' % (s,)) @cpython_only def test_interned_string(self): co = compile('res = "str_value"', '?', 'exec') v = self.find_const(co.co_consts, 'str_value') self.assertIsInterned(v) @cpython_only def test_interned_string_in_tuple(self): co = compile('res = ("str_value",)', '?', 'exec') v = self.find_const(co.co_consts, ('str_value',)) self.assertIsInterned(v[0]) @cpython_only def test_interned_string_in_frozenset(self): co = compile('res = a in {"str_value"}', '?', 'exec') v = self.find_const(co.co_consts, frozenset(('str_value',))) self.assertIsInterned(tuple(v)[0]) @cpython_only def test_interned_string_default(self): def f(a='str_value'): return a self.assertIsInterned(f()) @cpython_only def test_interned_string_with_null(self): co = compile(r'res = "str\0value!"', '?', 'exec') v = self.find_const(co.co_consts, 'str\0value!') self.assertIsNotInterned(v) class CodeWeakRefTest(unittest.TestCase): def test_basic(self): # Create a code object in a clean environment so that we know we have # the only reference to it left. namespace = {} exec("def f(): pass", globals(), namespace) f = namespace["f"] del namespace self.called = False def callback(code): self.called = True # f is now the last reference to the function, and through it, the code # object. While we hold it, check that we can create a weakref and # deref it. Then delete it, and check that the callback gets called and # the reference dies. coderef = weakref.ref(f.__code__, callback) self.assertTrue(bool(coderef())) del f gc_collect() # For PyPy or other GCs. self.assertFalse(bool(coderef())) self.assertTrue(self.called) # Python implementation of location table parsing algorithm def read(it): return next(it) def read_varint(it): b = read(it) val = b & 63; shift = 0; while b & 64: b = read(it) shift += 6 val |= (b&63) << shift return val def read_signed_varint(it): uval = read_varint(it) if uval & 1: return -(uval >> 1) else: return uval >> 1 def parse_location_table(code): line = code.co_firstlineno it = iter(code.co_linetable) while True: try: first_byte = read(it) except StopIteration: return code = (first_byte >> 3) & 15 length = (first_byte & 7) + 1 if code == 15: yield (code, length, None, None, None, None) elif code == 14: line_delta = read_signed_varint(it) line += line_delta end_line = line + read_varint(it) col = read_varint(it) if col == 0: col = None else: col -= 1 end_col = read_varint(it) if end_col == 0: end_col = None else: end_col -= 1 yield (code, length, line, end_line, col, end_col) elif code == 13: # No column line_delta = read_signed_varint(it) line += line_delta yield (code, length, line, line, None, None) elif code in (10, 11, 12): # new line line_delta = code - 10 line += line_delta column = read(it) end_column = read(it) yield (code, length, line, line, column, end_column) else: assert (0 <= code < 10) second_byte = read(it) column = code << 3 | (second_byte >> 4) yield (code, length, line, line, column, column + (second_byte & 15)) def positions_from_location_table(code): for _, length, line, end_line, col, end_col in parse_location_table(code): for _ in range(length): yield (line, end_line, col, end_col) def dedup(lst, prev=object()): for item in lst: if item != prev: yield item prev = item def lines_from_postions(positions): return dedup(l for (l, _, _, _) in positions) def misshappen(): """ """ x = ( 4 + y ) y = ( a + b + d ) return q if ( x ) else p def bug93662(): example_report_generation_message= ( """ """ ).strip() raise ValueError() class CodeLocationTest(unittest.TestCase): def check_positions(self, func): pos1 = list(func.__code__.co_positions()) pos2 = list(positions_from_location_table(func.__code__)) for l1, l2 in zip(pos1, pos2): self.assertEqual(l1, l2) self.assertEqual(len(pos1), len(pos2)) def test_positions(self): self.check_positions(parse_location_table) self.check_positions(misshappen) self.check_positions(bug93662) def check_lines(self, func): co = func.__code__ lines1 = [line for _, _, line in co.co_lines()] self.assertEqual(lines1, list(dedup(lines1))) lines2 = list(lines_from_postions(positions_from_location_table(co))) for l1, l2 in zip(lines1, lines2): self.assertEqual(l1, l2) self.assertEqual(len(lines1), len(lines2)) def test_lines(self): self.check_lines(parse_location_table) self.check_lines(misshappen) self.check_lines(bug93662) @cpython_only def test_code_new_empty(self): # If this test fails, it means that the construction of PyCode_NewEmpty # needs to be modified! Please update this test *and* PyCode_NewEmpty, # so that they both stay in sync. def f(): pass PY_CODE_LOCATION_INFO_NO_COLUMNS = 13 f.__code__ = f.__code__.replace( co_stacksize=1, co_firstlineno=42, co_code=bytes( [ dis.opmap["RESUME"], 0, dis.opmap["LOAD_ASSERTION_ERROR"], 0, dis.opmap["RAISE_VARARGS"], 1, ] ), co_linetable=bytes( [ (1 << 7) | (PY_CODE_LOCATION_INFO_NO_COLUMNS << 3) | (3 - 1), 0, ] ), ) self.assertRaises(AssertionError, f) self.assertEqual( list(f.__code__.co_positions()), 3 * [(42, 42, None, None)], ) if check_impl_detail(cpython=True) and ctypes is not None: py = ctypes.pythonapi freefunc = ctypes.CFUNCTYPE(None,ctypes.c_voidp) RequestCodeExtraIndex = py.PyUnstable_Eval_RequestCodeExtraIndex RequestCodeExtraIndex.argtypes = (freefunc,) RequestCodeExtraIndex.restype = ctypes.c_ssize_t SetExtra = py.PyUnstable_Code_SetExtra SetExtra.argtypes = (ctypes.py_object, ctypes.c_ssize_t, ctypes.c_voidp) SetExtra.restype = ctypes.c_int GetExtra = py.PyUnstable_Code_GetExtra GetExtra.argtypes = (ctypes.py_object, ctypes.c_ssize_t, ctypes.POINTER(ctypes.c_voidp)) GetExtra.restype = ctypes.c_int LAST_FREED = None def myfree(ptr): global LAST_FREED LAST_FREED = ptr FREE_FUNC = freefunc(myfree) FREE_INDEX = RequestCodeExtraIndex(FREE_FUNC) class CoExtra(unittest.TestCase): def get_func(self): # Defining a function causes the containing function to have a # reference to the code object. We need the code objects to go # away, so we eval a lambda. return eval('lambda:42') def test_get_non_code(self): f = self.get_func() self.assertRaises(SystemError, SetExtra, 42, FREE_INDEX, ctypes.c_voidp(100)) self.assertRaises(SystemError, GetExtra, 42, FREE_INDEX, ctypes.c_voidp(100)) def test_bad_index(self): f = self.get_func() self.assertRaises(SystemError, SetExtra, f.__code__, FREE_INDEX+100, ctypes.c_voidp(100)) self.assertEqual(GetExtra(f.__code__, FREE_INDEX+100, ctypes.c_voidp(100)), 0) def test_free_called(self): # Verify that the provided free function gets invoked # when the code object is cleaned up. f = self.get_func() SetExtra(f.__code__, FREE_INDEX, ctypes.c_voidp(100)) del f self.assertEqual(LAST_FREED, 100) def test_get_set(self): # Test basic get/set round tripping. f = self.get_func() extra = ctypes.c_voidp() SetExtra(f.__code__, FREE_INDEX, ctypes.c_voidp(200)) # reset should free... SetExtra(f.__code__, FREE_INDEX, ctypes.c_voidp(300)) self.assertEqual(LAST_FREED, 200) extra = ctypes.c_voidp() GetExtra(f.__code__, FREE_INDEX, extra) self.assertEqual(extra.value, 300) del f @threading_helper.requires_working_threading() def test_free_different_thread(self): # Freeing a code object on a different thread then # where the co_extra was set should be safe. f = self.get_func() class ThreadTest(threading.Thread): def __init__(self, f, test): super().__init__() self.f = f self.test = test def run(self): del self.f self.test.assertEqual(LAST_FREED, 500) SetExtra(f.__code__, FREE_INDEX, ctypes.c_voidp(500)) tt = ThreadTest(f, self) del f tt.start() tt.join() self.assertEqual(LAST_FREED, 500) def load_tests(loader, tests, pattern): tests.addTest(doctest.DocTestSuite()) return tests if __name__ == "__main__": unittest.main()