cpython/Lib/test/test_finalization.py

539 lines
15 KiB
Python
Raw Permalink Normal View History

"""
Tests for object finalization semantics, as outlined in PEP 442.
"""
import contextlib
import gc
import unittest
import weakref
try:
from _testcapi import with_tp_del
except ImportError:
def with_tp_del(cls):
class C(object):
def __new__(cls, *args, **kwargs):
raise TypeError('requires _testcapi.with_tp_del')
return C
try:
from _testcapi import without_gc
except ImportError:
def without_gc(cls):
class C:
def __new__(cls, *args, **kwargs):
raise TypeError('requires _testcapi.without_gc')
return C
from test import support
class NonGCSimpleBase:
"""
The base class for all the objects under test, equipped with various
testing features.
"""
survivors = []
del_calls = []
tp_del_calls = []
errors = []
_cleaning = False
__slots__ = ()
@classmethod
def _cleanup(cls):
cls.survivors.clear()
cls.errors.clear()
gc.garbage.clear()
gc.collect()
cls.del_calls.clear()
cls.tp_del_calls.clear()
@classmethod
@contextlib.contextmanager
def test(cls):
"""
A context manager to use around all finalization tests.
"""
with support.disable_gc():
cls.del_calls.clear()
cls.tp_del_calls.clear()
NonGCSimpleBase._cleaning = False
try:
yield
if cls.errors:
raise cls.errors[0]
finally:
NonGCSimpleBase._cleaning = True
cls._cleanup()
def check_sanity(self):
"""
Check the object is sane (non-broken).
"""
def __del__(self):
"""
PEP 442 finalizer. Record that this was called, check the
object is in a sane state, and invoke a side effect.
"""
try:
if not self._cleaning:
self.del_calls.append(id(self))
self.check_sanity()
self.side_effect()
except Exception as e:
self.errors.append(e)
def side_effect(self):
"""
A side effect called on destruction.
"""
class SimpleBase(NonGCSimpleBase):
def __init__(self):
self.id_ = id(self)
def check_sanity(self):
assert self.id_ == id(self)
@without_gc
class NonGC(NonGCSimpleBase):
__slots__ = ()
@without_gc
class NonGCResurrector(NonGCSimpleBase):
__slots__ = ()
def side_effect(self):
"""
Resurrect self by storing self in a class-wide list.
"""
self.survivors.append(self)
class Simple(SimpleBase):
pass
# Can't inherit from NonGCResurrector, in case importing without_gc fails.
class SimpleResurrector(SimpleBase):
def side_effect(self):
"""
Resurrect self by storing self in a class-wide list.
"""
self.survivors.append(self)
class TestBase:
def setUp(self):
self.old_garbage = gc.garbage[:]
gc.garbage[:] = []
def tearDown(self):
# None of the tests here should put anything in gc.garbage
try:
self.assertEqual(gc.garbage, [])
finally:
del self.old_garbage
gc.collect()
def assert_del_calls(self, ids):
self.assertEqual(sorted(SimpleBase.del_calls), sorted(ids))
def assert_tp_del_calls(self, ids):
self.assertEqual(sorted(SimpleBase.tp_del_calls), sorted(ids))
def assert_survivors(self, ids):
self.assertEqual(sorted(id(x) for x in SimpleBase.survivors), sorted(ids))
def assert_garbage(self, ids):
self.assertEqual(sorted(id(x) for x in gc.garbage), sorted(ids))
def clear_survivors(self):
SimpleBase.survivors.clear()
class SimpleFinalizationTest(TestBase, unittest.TestCase):
"""
Test finalization without refcycles.
"""
def test_simple(self):
with SimpleBase.test():
s = Simple()
ids = [id(s)]
wr = weakref.ref(s)
del s
gc.collect()
self.assert_del_calls(ids)
self.assert_survivors([])
self.assertIs(wr(), None)
gc.collect()
self.assert_del_calls(ids)
self.assert_survivors([])
def test_simple_resurrect(self):
with SimpleBase.test():
s = SimpleResurrector()
ids = [id(s)]
wr = weakref.ref(s)
del s
gc.collect()
self.assert_del_calls(ids)
self.assert_survivors(ids)
self.assertIsNot(wr(), None)
self.clear_survivors()
gc.collect()
self.assert_del_calls(ids)
self.assert_survivors([])
self.assertIs(wr(), None)
@support.cpython_only
def test_non_gc(self):
with SimpleBase.test():
s = NonGC()
self.assertFalse(gc.is_tracked(s))
ids = [id(s)]
del s
gc.collect()
self.assert_del_calls(ids)
self.assert_survivors([])
gc.collect()
self.assert_del_calls(ids)
self.assert_survivors([])
@support.cpython_only
def test_non_gc_resurrect(self):
with SimpleBase.test():
s = NonGCResurrector()
self.assertFalse(gc.is_tracked(s))
ids = [id(s)]
del s
gc.collect()
self.assert_del_calls(ids)
self.assert_survivors(ids)
self.clear_survivors()
gc.collect()
self.assert_del_calls(ids * 2)
self.assert_survivors(ids)
class SelfCycleBase:
def __init__(self):
super().__init__()
self.ref = self
def check_sanity(self):
super().check_sanity()
assert self.ref is self
class SimpleSelfCycle(SelfCycleBase, Simple):
pass
class SelfCycleResurrector(SelfCycleBase, SimpleResurrector):
pass
class SuicidalSelfCycle(SelfCycleBase, Simple):
def side_effect(self):
"""
Explicitly break the reference cycle.
"""
self.ref = None
class SelfCycleFinalizationTest(TestBase, unittest.TestCase):
"""
Test finalization of an object having a single cyclic reference to
itself.
"""
def test_simple(self):
with SimpleBase.test():
s = SimpleSelfCycle()
ids = [id(s)]
wr = weakref.ref(s)
del s
gc.collect()
self.assert_del_calls(ids)
self.assert_survivors([])
self.assertIs(wr(), None)
gc.collect()
self.assert_del_calls(ids)
self.assert_survivors([])
def test_simple_resurrect(self):
# Test that __del__ can resurrect the object being finalized.
with SimpleBase.test():
s = SelfCycleResurrector()
ids = [id(s)]
wr = weakref.ref(s)
del s
gc.collect()
self.assert_del_calls(ids)
self.assert_survivors(ids)
# XXX is this desirable?
self.assertIs(wr(), None)
# When trying to destroy the object a second time, __del__
# isn't called anymore (and the object isn't resurrected).
self.clear_survivors()
gc.collect()
self.assert_del_calls(ids)
self.assert_survivors([])
self.assertIs(wr(), None)
def test_simple_suicide(self):
# Test the GC is able to deal with an object that kills its last
# reference during __del__.
with SimpleBase.test():
s = SuicidalSelfCycle()
ids = [id(s)]
wr = weakref.ref(s)
del s
gc.collect()
self.assert_del_calls(ids)
self.assert_survivors([])
self.assertIs(wr(), None)
gc.collect()
self.assert_del_calls(ids)
self.assert_survivors([])
self.assertIs(wr(), None)
class ChainedBase:
def chain(self, left):
self.suicided = False
self.left = left
left.right = self
def check_sanity(self):
super().check_sanity()
if self.suicided:
assert self.left is None
assert self.right is None
else:
left = self.left
if left.suicided:
assert left.right is None
else:
assert left.right is self
right = self.right
if right.suicided:
assert right.left is None
else:
assert right.left is self
class SimpleChained(ChainedBase, Simple):
pass
class ChainedResurrector(ChainedBase, SimpleResurrector):
pass
class SuicidalChained(ChainedBase, Simple):
def side_effect(self):
"""
Explicitly break the reference cycle.
"""
self.suicided = True
self.left = None
self.right = None
class CycleChainFinalizationTest(TestBase, unittest.TestCase):
"""
Test finalization of a cyclic chain. These tests are similar in
spirit to the self-cycle tests above, but the collectable object
graph isn't trivial anymore.
"""
def build_chain(self, classes):
nodes = [cls() for cls in classes]
for i in range(len(nodes)):
nodes[i].chain(nodes[i-1])
return nodes
def check_non_resurrecting_chain(self, classes):
N = len(classes)
with SimpleBase.test():
nodes = self.build_chain(classes)
ids = [id(s) for s in nodes]
wrs = [weakref.ref(s) for s in nodes]
del nodes
gc.collect()
self.assert_del_calls(ids)
self.assert_survivors([])
self.assertEqual([wr() for wr in wrs], [None] * N)
gc.collect()
self.assert_del_calls(ids)
def check_resurrecting_chain(self, classes):
N = len(classes)
with SimpleBase.test():
nodes = self.build_chain(classes)
N = len(nodes)
ids = [id(s) for s in nodes]
survivor_ids = [id(s) for s in nodes if isinstance(s, SimpleResurrector)]
wrs = [weakref.ref(s) for s in nodes]
del nodes
gc.collect()
self.assert_del_calls(ids)
self.assert_survivors(survivor_ids)
# XXX desirable?
self.assertEqual([wr() for wr in wrs], [None] * N)
self.clear_survivors()
gc.collect()
self.assert_del_calls(ids)
self.assert_survivors([])
def test_homogenous(self):
self.check_non_resurrecting_chain([SimpleChained] * 3)
def test_homogenous_resurrect(self):
self.check_resurrecting_chain([ChainedResurrector] * 3)
def test_homogenous_suicidal(self):
self.check_non_resurrecting_chain([SuicidalChained] * 3)
def test_heterogenous_suicidal_one(self):
self.check_non_resurrecting_chain([SuicidalChained, SimpleChained] * 2)
def test_heterogenous_suicidal_two(self):
self.check_non_resurrecting_chain(
[SuicidalChained] * 2 + [SimpleChained] * 2)
def test_heterogenous_resurrect_one(self):
self.check_resurrecting_chain([ChainedResurrector, SimpleChained] * 2)
def test_heterogenous_resurrect_two(self):
self.check_resurrecting_chain(
[ChainedResurrector, SimpleChained, SuicidalChained] * 2)
def test_heterogenous_resurrect_three(self):
self.check_resurrecting_chain(
[ChainedResurrector] * 2 + [SimpleChained] * 2 + [SuicidalChained] * 2)
# NOTE: the tp_del slot isn't automatically inherited, so we have to call
# with_tp_del() for each instantiated class.
class LegacyBase(SimpleBase):
def __del__(self):
try:
# Do not invoke side_effect here, since we are now exercising
# the tp_del slot.
if not self._cleaning:
self.del_calls.append(id(self))
self.check_sanity()
except Exception as e:
self.errors.append(e)
def __tp_del__(self):
"""
Legacy (pre-PEP 442) finalizer, mapped to a tp_del slot.
"""
try:
if not self._cleaning:
self.tp_del_calls.append(id(self))
self.check_sanity()
self.side_effect()
except Exception as e:
self.errors.append(e)
@with_tp_del
class Legacy(LegacyBase):
pass
@with_tp_del
class LegacyResurrector(LegacyBase):
def side_effect(self):
"""
Resurrect self by storing self in a class-wide list.
"""
self.survivors.append(self)
@with_tp_del
class LegacySelfCycle(SelfCycleBase, LegacyBase):
pass
@support.cpython_only
class LegacyFinalizationTest(TestBase, unittest.TestCase):
"""
Test finalization of objects with a tp_del.
"""
def tearDown(self):
# These tests need to clean up a bit more, since they create
# uncollectable objects.
gc.garbage.clear()
gc.collect()
super().tearDown()
def test_legacy(self):
with SimpleBase.test():
s = Legacy()
ids = [id(s)]
wr = weakref.ref(s)
del s
gc.collect()
self.assert_del_calls(ids)
self.assert_tp_del_calls(ids)
self.assert_survivors([])
self.assertIs(wr(), None)
gc.collect()
self.assert_del_calls(ids)
self.assert_tp_del_calls(ids)
def test_legacy_resurrect(self):
with SimpleBase.test():
s = LegacyResurrector()
ids = [id(s)]
wr = weakref.ref(s)
del s
gc.collect()
self.assert_del_calls(ids)
self.assert_tp_del_calls(ids)
self.assert_survivors(ids)
# weakrefs are cleared before tp_del is called.
self.assertIs(wr(), None)
self.clear_survivors()
gc.collect()
self.assert_del_calls(ids)
self.assert_tp_del_calls(ids * 2)
self.assert_survivors(ids)
self.assertIs(wr(), None)
def test_legacy_self_cycle(self):
# Self-cycles with legacy finalizers end up in gc.garbage.
with SimpleBase.test():
s = LegacySelfCycle()
ids = [id(s)]
wr = weakref.ref(s)
del s
gc.collect()
self.assert_del_calls([])
self.assert_tp_del_calls([])
self.assert_survivors([])
self.assert_garbage(ids)
self.assertIsNot(wr(), None)
# Break the cycle to allow collection
gc.garbage[0].ref = None
self.assert_garbage([])
self.assertIs(wr(), None)
if __name__ == "__main__":
unittest.main()