mirror of https://github.com/python/cpython
470 lines
15 KiB
Python
470 lines
15 KiB
Python
import gc
|
|
import operator
|
|
import re
|
|
import sys
|
|
import textwrap
|
|
import threading
|
|
import types
|
|
import unittest
|
|
import weakref
|
|
try:
|
|
import _testcapi
|
|
except ImportError:
|
|
_testcapi = None
|
|
|
|
from test import support
|
|
from test.support import threading_helper, Py_GIL_DISABLED
|
|
from test.support.script_helper import assert_python_ok
|
|
|
|
|
|
class ClearTest(unittest.TestCase):
|
|
"""
|
|
Tests for frame.clear().
|
|
"""
|
|
|
|
def inner(self, x=5, **kwargs):
|
|
1/0
|
|
|
|
def outer(self, **kwargs):
|
|
try:
|
|
self.inner(**kwargs)
|
|
except ZeroDivisionError as e:
|
|
exc = e
|
|
return exc
|
|
|
|
def clear_traceback_frames(self, tb):
|
|
"""
|
|
Clear all frames in a traceback.
|
|
"""
|
|
while tb is not None:
|
|
tb.tb_frame.clear()
|
|
tb = tb.tb_next
|
|
|
|
def test_clear_locals(self):
|
|
class C:
|
|
pass
|
|
c = C()
|
|
wr = weakref.ref(c)
|
|
exc = self.outer(c=c)
|
|
del c
|
|
support.gc_collect()
|
|
# A reference to c is held through the frames
|
|
self.assertIsNot(None, wr())
|
|
self.clear_traceback_frames(exc.__traceback__)
|
|
support.gc_collect()
|
|
# The reference was released by .clear()
|
|
self.assertIs(None, wr())
|
|
|
|
def test_clear_locals_after_f_locals_access(self):
|
|
# see gh-113939
|
|
class C:
|
|
pass
|
|
|
|
wr = None
|
|
def inner():
|
|
nonlocal wr
|
|
c = C()
|
|
wr = weakref.ref(c)
|
|
1/0
|
|
|
|
try:
|
|
inner()
|
|
except ZeroDivisionError as exc:
|
|
support.gc_collect()
|
|
self.assertIsNotNone(wr())
|
|
exc.__traceback__.tb_next.tb_frame.clear()
|
|
support.gc_collect()
|
|
self.assertIsNone(wr())
|
|
|
|
def test_clear_does_not_clear_specials(self):
|
|
class C:
|
|
pass
|
|
c = C()
|
|
exc = self.outer(c=c)
|
|
del c
|
|
f = exc.__traceback__.tb_frame
|
|
f.clear()
|
|
self.assertIsNot(f.f_code, None)
|
|
self.assertIsNot(f.f_locals, None)
|
|
self.assertIsNot(f.f_builtins, None)
|
|
self.assertIsNot(f.f_globals, None)
|
|
|
|
def test_clear_generator(self):
|
|
endly = False
|
|
def g():
|
|
nonlocal endly
|
|
try:
|
|
yield
|
|
self.inner()
|
|
finally:
|
|
endly = True
|
|
gen = g()
|
|
next(gen)
|
|
self.assertFalse(endly)
|
|
|
|
# Cannot clear a suspended frame
|
|
with self.assertRaisesRegex(RuntimeError, r'suspended frame'):
|
|
gen.gi_frame.clear()
|
|
self.assertFalse(endly)
|
|
|
|
def test_clear_executing(self):
|
|
# Attempting to clear an executing frame is forbidden.
|
|
try:
|
|
1/0
|
|
except ZeroDivisionError as e:
|
|
f = e.__traceback__.tb_frame
|
|
with self.assertRaises(RuntimeError):
|
|
f.clear()
|
|
with self.assertRaises(RuntimeError):
|
|
f.f_back.clear()
|
|
|
|
def test_clear_executing_generator(self):
|
|
# Attempting to clear an executing generator frame is forbidden.
|
|
endly = False
|
|
def g():
|
|
nonlocal endly
|
|
try:
|
|
1/0
|
|
except ZeroDivisionError as e:
|
|
f = e.__traceback__.tb_frame
|
|
with self.assertRaises(RuntimeError):
|
|
f.clear()
|
|
with self.assertRaises(RuntimeError):
|
|
f.f_back.clear()
|
|
yield f
|
|
finally:
|
|
endly = True
|
|
gen = g()
|
|
f = next(gen)
|
|
self.assertFalse(endly)
|
|
# Cannot clear a suspended frame
|
|
with self.assertRaisesRegex(RuntimeError, 'suspended frame'):
|
|
f.clear()
|
|
self.assertFalse(endly)
|
|
|
|
def test_lineno_with_tracing(self):
|
|
def record_line():
|
|
f = sys._getframe(1)
|
|
lines.append(f.f_lineno-f.f_code.co_firstlineno)
|
|
|
|
def test(trace):
|
|
record_line()
|
|
if trace:
|
|
sys._getframe(0).f_trace = True
|
|
record_line()
|
|
record_line()
|
|
|
|
expected_lines = [1, 4, 5]
|
|
lines = []
|
|
test(False)
|
|
self.assertEqual(lines, expected_lines)
|
|
lines = []
|
|
test(True)
|
|
self.assertEqual(lines, expected_lines)
|
|
|
|
@support.cpython_only
|
|
def test_clear_refcycles(self):
|
|
# .clear() doesn't leave any refcycle behind
|
|
with support.disable_gc():
|
|
class C:
|
|
pass
|
|
c = C()
|
|
wr = weakref.ref(c)
|
|
exc = self.outer(c=c)
|
|
del c
|
|
self.assertIsNot(None, wr())
|
|
self.clear_traceback_frames(exc.__traceback__)
|
|
self.assertIs(None, wr())
|
|
|
|
|
|
class FrameAttrsTest(unittest.TestCase):
|
|
|
|
def make_frames(self):
|
|
def outer():
|
|
x = 5
|
|
y = 6
|
|
def inner():
|
|
z = x + 2
|
|
1/0
|
|
t = 9
|
|
return inner()
|
|
try:
|
|
outer()
|
|
except ZeroDivisionError as e:
|
|
tb = e.__traceback__
|
|
frames = []
|
|
while tb:
|
|
frames.append(tb.tb_frame)
|
|
tb = tb.tb_next
|
|
return frames
|
|
|
|
def test_locals(self):
|
|
f, outer, inner = self.make_frames()
|
|
outer_locals = outer.f_locals
|
|
self.assertIsInstance(outer_locals.pop('inner'), types.FunctionType)
|
|
self.assertEqual(outer_locals, {'x': 5, 'y': 6})
|
|
inner_locals = inner.f_locals
|
|
self.assertEqual(inner_locals, {'x': 5, 'z': 7})
|
|
|
|
def test_clear_locals(self):
|
|
# Test f_locals after clear() (issue #21897)
|
|
f, outer, inner = self.make_frames()
|
|
outer.clear()
|
|
inner.clear()
|
|
self.assertEqual(outer.f_locals, {})
|
|
self.assertEqual(inner.f_locals, {})
|
|
|
|
def test_locals_clear_locals(self):
|
|
# Test f_locals before and after clear() (to exercise caching)
|
|
f, outer, inner = self.make_frames()
|
|
outer.f_locals
|
|
inner.f_locals
|
|
outer.clear()
|
|
inner.clear()
|
|
self.assertEqual(outer.f_locals, {})
|
|
self.assertEqual(inner.f_locals, {})
|
|
|
|
def test_f_lineno_del_segfault(self):
|
|
f, _, _ = self.make_frames()
|
|
with self.assertRaises(AttributeError):
|
|
del f.f_lineno
|
|
|
|
|
|
class ReprTest(unittest.TestCase):
|
|
"""
|
|
Tests for repr(frame).
|
|
"""
|
|
|
|
def test_repr(self):
|
|
def outer():
|
|
x = 5
|
|
y = 6
|
|
def inner():
|
|
z = x + 2
|
|
1/0
|
|
t = 9
|
|
return inner()
|
|
|
|
offset = outer.__code__.co_firstlineno
|
|
try:
|
|
outer()
|
|
except ZeroDivisionError as e:
|
|
tb = e.__traceback__
|
|
frames = []
|
|
while tb:
|
|
frames.append(tb.tb_frame)
|
|
tb = tb.tb_next
|
|
else:
|
|
self.fail("should have raised")
|
|
|
|
f_this, f_outer, f_inner = frames
|
|
file_repr = re.escape(repr(__file__))
|
|
self.assertRegex(repr(f_this),
|
|
r"^<frame at 0x[0-9a-fA-F]+, file %s, line %d, code test_repr>$"
|
|
% (file_repr, offset + 23))
|
|
self.assertRegex(repr(f_outer),
|
|
r"^<frame at 0x[0-9a-fA-F]+, file %s, line %d, code outer>$"
|
|
% (file_repr, offset + 7))
|
|
self.assertRegex(repr(f_inner),
|
|
r"^<frame at 0x[0-9a-fA-F]+, file %s, line %d, code inner>$"
|
|
% (file_repr, offset + 5))
|
|
|
|
class TestIncompleteFrameAreInvisible(unittest.TestCase):
|
|
|
|
def test_issue95818(self):
|
|
# See GH-95818 for details
|
|
code = textwrap.dedent(f"""
|
|
import gc
|
|
|
|
gc.set_threshold(1,1,1)
|
|
class GCHello:
|
|
def __del__(self):
|
|
print("Destroyed from gc")
|
|
|
|
def gen():
|
|
yield
|
|
|
|
fd = open({__file__!r})
|
|
l = [fd, GCHello()]
|
|
l.append(l)
|
|
del fd
|
|
del l
|
|
gen()
|
|
""")
|
|
assert_python_ok("-c", code)
|
|
|
|
@support.cpython_only
|
|
@unittest.skipIf(Py_GIL_DISABLED, "test requires precise GC scheduling")
|
|
def test_sneaky_frame_object(self):
|
|
|
|
def trace(frame, event, arg):
|
|
"""
|
|
Don't actually do anything, just force a frame object to be created.
|
|
"""
|
|
|
|
def callback(phase, info):
|
|
"""
|
|
Yo dawg, I heard you like frames, so I'm allocating a frame while
|
|
you're allocating a frame, so you can have a frame while you have a
|
|
frame!
|
|
"""
|
|
nonlocal sneaky_frame_object
|
|
sneaky_frame_object = sys._getframe().f_back.f_back
|
|
# We're done here:
|
|
gc.callbacks.remove(callback)
|
|
|
|
def f():
|
|
while True:
|
|
yield
|
|
|
|
old_threshold = gc.get_threshold()
|
|
old_callbacks = gc.callbacks[:]
|
|
old_enabled = gc.isenabled()
|
|
old_trace = sys.gettrace()
|
|
try:
|
|
# Stop the GC for a second while we set things up:
|
|
gc.disable()
|
|
# Create a paused generator:
|
|
g = f()
|
|
next(g)
|
|
# Move all objects to the oldest generation, and tell the GC to run
|
|
# on the *very next* allocation:
|
|
gc.collect()
|
|
gc.set_threshold(1, 0, 0)
|
|
sys._clear_internal_caches()
|
|
# Okay, so here's the nightmare scenario:
|
|
# - We're tracing the resumption of a generator, which creates a new
|
|
# frame object.
|
|
# - The allocation of this frame object triggers a collection
|
|
# *before* the frame object is actually created.
|
|
# - During the collection, we request the exact same frame object.
|
|
# This test does it with a GC callback, but in real code it would
|
|
# likely be a trace function, weakref callback, or finalizer.
|
|
# - The collection finishes, and the original frame object is
|
|
# created. We now have two frame objects fighting over ownership
|
|
# of the same interpreter frame!
|
|
sys.settrace(trace)
|
|
gc.callbacks.append(callback)
|
|
sneaky_frame_object = None
|
|
gc.enable()
|
|
next(g)
|
|
# g.gi_frame should be the frame object from the callback (the
|
|
# one that was *requested* second, but *created* first):
|
|
self.assertIs(g.gi_frame, sneaky_frame_object)
|
|
finally:
|
|
gc.set_threshold(*old_threshold)
|
|
gc.callbacks[:] = old_callbacks
|
|
sys.settrace(old_trace)
|
|
if old_enabled:
|
|
gc.enable()
|
|
|
|
@support.cpython_only
|
|
@threading_helper.requires_working_threading()
|
|
def test_sneaky_frame_object_teardown(self):
|
|
|
|
class SneakyDel:
|
|
def __del__(self):
|
|
"""
|
|
Stash a reference to the entire stack for walking later.
|
|
|
|
It may look crazy, but you'd be surprised how common this is
|
|
when using a test runner (like pytest). The typical recipe is:
|
|
ResourceWarning + -Werror + a custom sys.unraisablehook.
|
|
"""
|
|
nonlocal sneaky_frame_object
|
|
sneaky_frame_object = sys._getframe()
|
|
|
|
class SneakyThread(threading.Thread):
|
|
"""
|
|
A separate thread isn't needed to make this code crash, but it does
|
|
make crashes more consistent, since it means sneaky_frame_object is
|
|
backed by freed memory after the thread completes!
|
|
"""
|
|
|
|
def run(self):
|
|
"""Run SneakyDel.__del__ as this frame is popped."""
|
|
ref = SneakyDel()
|
|
|
|
sneaky_frame_object = None
|
|
t = SneakyThread()
|
|
t.start()
|
|
t.join()
|
|
# sneaky_frame_object can be anything, really, but it's crucial that
|
|
# SneakyThread.run's frame isn't anywhere on the stack while it's being
|
|
# torn down:
|
|
self.assertIsNotNone(sneaky_frame_object)
|
|
while sneaky_frame_object is not None:
|
|
self.assertIsNot(
|
|
sneaky_frame_object.f_code, SneakyThread.run.__code__
|
|
)
|
|
sneaky_frame_object = sneaky_frame_object.f_back
|
|
|
|
def test_entry_frames_are_invisible_during_teardown(self):
|
|
class C:
|
|
"""A weakref'able class."""
|
|
|
|
def f():
|
|
"""Try to find globals and locals as this frame is being cleared."""
|
|
ref = C()
|
|
# Ignore the fact that exec(C()) is a nonsense callback. We're only
|
|
# using exec here because it tries to access the current frame's
|
|
# globals and locals. If it's trying to get those from a shim frame,
|
|
# we'll crash before raising:
|
|
return weakref.ref(ref, exec)
|
|
|
|
with support.catch_unraisable_exception() as catcher:
|
|
# Call from C, so there is a shim frame directly above f:
|
|
weak = operator.call(f) # BOOM!
|
|
# Cool, we didn't crash. Check that the callback actually happened:
|
|
self.assertIs(catcher.unraisable.exc_type, TypeError)
|
|
self.assertIsNone(weak())
|
|
|
|
@unittest.skipIf(_testcapi is None, 'need _testcapi')
|
|
class TestCAPI(unittest.TestCase):
|
|
def getframe(self):
|
|
return sys._getframe()
|
|
|
|
def test_frame_getters(self):
|
|
frame = self.getframe()
|
|
self.assertEqual(frame.f_locals, _testcapi.frame_getlocals(frame))
|
|
self.assertIs(frame.f_globals, _testcapi.frame_getglobals(frame))
|
|
self.assertIs(frame.f_builtins, _testcapi.frame_getbuiltins(frame))
|
|
self.assertEqual(frame.f_lasti, _testcapi.frame_getlasti(frame))
|
|
|
|
def test_getvar(self):
|
|
current_frame = sys._getframe()
|
|
x = 1
|
|
self.assertEqual(_testcapi.frame_getvar(current_frame, "x"), 1)
|
|
self.assertEqual(_testcapi.frame_getvarstring(current_frame, b"x"), 1)
|
|
with self.assertRaises(NameError):
|
|
_testcapi.frame_getvar(current_frame, "y")
|
|
with self.assertRaises(NameError):
|
|
_testcapi.frame_getvarstring(current_frame, b"y")
|
|
|
|
# wrong name type
|
|
with self.assertRaises(TypeError):
|
|
_testcapi.frame_getvar(current_frame, b'x')
|
|
with self.assertRaises(TypeError):
|
|
_testcapi.frame_getvar(current_frame, 123)
|
|
|
|
def getgenframe(self):
|
|
yield sys._getframe()
|
|
|
|
def test_frame_get_generator(self):
|
|
gen = self.getgenframe()
|
|
frame = next(gen)
|
|
self.assertIs(gen, _testcapi.frame_getgenerator(frame))
|
|
|
|
def test_frame_fback_api(self):
|
|
"""Test that accessing `f_back` does not cause a segmentation fault on
|
|
a frame created with `PyFrame_New` (GH-99110)."""
|
|
def dummy():
|
|
pass
|
|
|
|
frame = _testcapi.frame_new(dummy.__code__, globals(), locals())
|
|
# The following line should not cause a segmentation fault.
|
|
self.assertIsNone(frame.f_back)
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|