mirror of https://github.com/python/cpython
Convert the weakref test suite to PyUNIT, and add tests that exercise weak
references on function objects and both bound and unbound methods.
This commit is contained in:
parent
84a5934f8a
commit
b0fefc5121
|
@ -1,25 +1 @@
|
||||||
test_weakref
|
test_weakref
|
||||||
Basic Weak References
|
|
||||||
-- Liveness and referent identity
|
|
||||||
-- Reference objects with callbacks
|
|
||||||
-- Proxy objects with callbacks
|
|
||||||
-- Re-use of weak reference objects
|
|
||||||
reference objects
|
|
||||||
proxy objects
|
|
||||||
clearing ref 2
|
|
||||||
clearing ref 1
|
|
||||||
clearing ref 2
|
|
||||||
clearing ref 1
|
|
||||||
|
|
||||||
Weak Valued Dictionaries
|
|
||||||
objects are stored in weak dict
|
|
||||||
weak dict test complete
|
|
||||||
|
|
||||||
Weak Keyed Dictionaries
|
|
||||||
objects are stored in weak dict
|
|
||||||
weak key dict test complete
|
|
||||||
|
|
||||||
Non-callable Proxy References
|
|
||||||
XXX -- tests not written!
|
|
||||||
|
|
||||||
Callable Proxy References
|
|
||||||
|
|
|
@ -1,137 +1,202 @@
|
||||||
import sys
|
import sys
|
||||||
|
import unittest
|
||||||
import weakref
|
import weakref
|
||||||
|
|
||||||
from test_support import TestFailed, verify
|
from test_support import run_unittest, verify
|
||||||
|
|
||||||
|
|
||||||
class C:
|
class C:
|
||||||
|
def method(self):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
print "Basic Weak References"
|
class Callable:
|
||||||
|
bar = None
|
||||||
|
|
||||||
print "-- Liveness and referent identity"
|
def __call__(self, x):
|
||||||
|
self.bar = x
|
||||||
|
|
||||||
o = C()
|
|
||||||
ref = weakref.ref(o)
|
|
||||||
verify(ref() is not None, "weak reference to live object should be live")
|
|
||||||
o2 = ref()
|
|
||||||
verify(ref() is not None, "weak ref should still be live")
|
|
||||||
verify(o is o2, "<ref>() should return original object if live")
|
|
||||||
del o, o2
|
|
||||||
del ref
|
|
||||||
|
|
||||||
cbcalled = 0
|
def create_function():
|
||||||
def callback(o):
|
def f(): pass
|
||||||
global cbcalled
|
return f
|
||||||
cbcalled = 1
|
|
||||||
|
|
||||||
o = C()
|
def create_bound_method():
|
||||||
ref2 = weakref.ref(o, callback)
|
return C().method
|
||||||
del o
|
|
||||||
verify(cbcalled,
|
def create_unbound_method():
|
||||||
|
return C.method
|
||||||
|
|
||||||
|
|
||||||
|
class TestBase(unittest.TestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.cbcalled = 0
|
||||||
|
|
||||||
|
def callback(self, ref):
|
||||||
|
self.cbcalled += 1
|
||||||
|
|
||||||
|
|
||||||
|
class ReferencesTestCase(TestBase):
|
||||||
|
|
||||||
|
def test_basic_ref(self):
|
||||||
|
self.check_basic_ref(C)
|
||||||
|
self.check_basic_ref(create_function)
|
||||||
|
self.check_basic_ref(create_bound_method)
|
||||||
|
self.check_basic_ref(create_unbound_method)
|
||||||
|
|
||||||
|
def test_basic_callback(self):
|
||||||
|
self.check_basic_callback(C)
|
||||||
|
self.check_basic_callback(create_function)
|
||||||
|
self.check_basic_callback(create_bound_method)
|
||||||
|
self.check_basic_callback(create_unbound_method)
|
||||||
|
|
||||||
|
def test_multiple_callbacks(self):
|
||||||
|
o = C()
|
||||||
|
ref1 = weakref.ref(o, self.callback)
|
||||||
|
ref2 = weakref.ref(o, self.callback)
|
||||||
|
del o
|
||||||
|
self.assert_(ref1() is None,
|
||||||
|
"expected reference to be invalidated")
|
||||||
|
self.assert_(ref2() is None,
|
||||||
|
"expected reference to be invalidated")
|
||||||
|
self.assert_(self.cbcalled == 2,
|
||||||
|
"callback not called the right number of times")
|
||||||
|
|
||||||
|
def test_proxy_ref(self):
|
||||||
|
o = C()
|
||||||
|
o.bar = 1
|
||||||
|
ref1 = weakref.proxy(o, self.callback)
|
||||||
|
ref2 = weakref.proxy(o, self.callback)
|
||||||
|
del o
|
||||||
|
|
||||||
|
def check(proxy):
|
||||||
|
proxy.bar
|
||||||
|
|
||||||
|
self.assertRaises(weakref.ReferenceError, check, ref1)
|
||||||
|
self.assertRaises(weakref.ReferenceError, check, ref2)
|
||||||
|
self.assert_(self.cbcalled == 2)
|
||||||
|
|
||||||
|
def check_basic_ref(self, factory):
|
||||||
|
o = factory()
|
||||||
|
ref = weakref.ref(o)
|
||||||
|
self.assert_(ref() is not None,
|
||||||
|
"weak reference to live object should be live")
|
||||||
|
o2 = ref()
|
||||||
|
self.assert_(o is o2,
|
||||||
|
"<ref>() should return original object if live")
|
||||||
|
|
||||||
|
def check_basic_callback(self, factory):
|
||||||
|
self.cbcalled = 0
|
||||||
|
o = factory()
|
||||||
|
ref = weakref.ref(o, self.callback)
|
||||||
|
del o
|
||||||
|
verify(self.cbcalled == 1,
|
||||||
"callback did not properly set 'cbcalled'")
|
"callback did not properly set 'cbcalled'")
|
||||||
verify(ref2() is None,
|
verify(ref() is None,
|
||||||
"ref2 should be dead after deleting object reference")
|
"ref2 should be dead after deleting object reference")
|
||||||
del ref2
|
|
||||||
|
|
||||||
|
def test_ref_reuse(self):
|
||||||
|
o = C()
|
||||||
|
ref1 = weakref.ref(o)
|
||||||
|
# create a proxy to make sure that there's an intervening creation
|
||||||
|
# between these two; it should make no difference
|
||||||
|
proxy = weakref.proxy(o)
|
||||||
|
ref2 = weakref.ref(o)
|
||||||
|
self.assert_(ref1 is ref2,
|
||||||
|
"reference object w/out callback should be re-used")
|
||||||
|
|
||||||
print "-- Reference objects with callbacks"
|
o = C()
|
||||||
o = C()
|
proxy = weakref.proxy(o)
|
||||||
o.bar = 1
|
ref1 = weakref.ref(o)
|
||||||
ref1 = weakref.ref(o, id)
|
ref2 = weakref.ref(o)
|
||||||
ref2 = weakref.ref(o, id)
|
self.assert_(ref1 is ref2,
|
||||||
del o
|
"reference object w/out callback should be re-used")
|
||||||
verify(ref1() is None,
|
self.assert_(weakref.getweakrefcount(o) == 2,
|
||||||
"expected reference to be invalidated")
|
|
||||||
verify(ref2() is None,
|
|
||||||
"expected reference to be invalidated")
|
|
||||||
|
|
||||||
|
|
||||||
print "-- Proxy objects with callbacks"
|
|
||||||
o = C()
|
|
||||||
o.bar = 1
|
|
||||||
ref1 = weakref.proxy(o, id)
|
|
||||||
ref2 = weakref.proxy(o, id)
|
|
||||||
del o
|
|
||||||
try:
|
|
||||||
ref1.bar
|
|
||||||
except weakref.ReferenceError:
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
raise TestFailed("expected ReferenceError exception")
|
|
||||||
try:
|
|
||||||
ref2.bar
|
|
||||||
except weakref.ReferenceError:
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
raise TestFailed("expected ReferenceError exception")
|
|
||||||
|
|
||||||
|
|
||||||
print "-- Re-use of weak reference objects"
|
|
||||||
print " reference objects"
|
|
||||||
|
|
||||||
o = C()
|
|
||||||
ref1 = weakref.ref(o)
|
|
||||||
# create a proxy to make sure that there's an intervening creation
|
|
||||||
# between these two; it should make no difference
|
|
||||||
proxy = weakref.proxy(o)
|
|
||||||
ref2 = weakref.ref(o)
|
|
||||||
verify(ref1 is ref2,
|
|
||||||
"reference object w/out callback should have been re-used")
|
|
||||||
|
|
||||||
o = C()
|
|
||||||
proxy = weakref.proxy(o)
|
|
||||||
ref1 = weakref.ref(o)
|
|
||||||
ref2 = weakref.ref(o)
|
|
||||||
verify(ref1 is ref2,
|
|
||||||
"reference object w/out callback should have been re-used")
|
|
||||||
verify(weakref.getweakrefcount(o) == 2,
|
|
||||||
"wrong weak ref count for object")
|
"wrong weak ref count for object")
|
||||||
del proxy
|
del proxy
|
||||||
verify(weakref.getweakrefcount(o) == 1,
|
self.assert_(weakref.getweakrefcount(o) == 1,
|
||||||
"wrong weak ref count for object after deleting proxy")
|
"wrong weak ref count for object after deleting proxy")
|
||||||
|
|
||||||
print " proxy objects"
|
def test_proxy_reuse(self):
|
||||||
|
o = C()
|
||||||
o = C()
|
proxy1 = weakref.proxy(o)
|
||||||
ref3 = weakref.proxy(o)
|
ref = weakref.ref(o)
|
||||||
ref4 = weakref.proxy(o)
|
proxy2 = weakref.proxy(o)
|
||||||
verify(ref3 is ref4,
|
self.assert_(proxy1 is proxy2,
|
||||||
"proxy object w/out callback should have been re-used")
|
"proxy object w/out callback should have been re-used")
|
||||||
|
|
||||||
|
def test_basic_proxy(self):
|
||||||
|
o = C()
|
||||||
|
self.check_proxy(o, weakref.proxy(o))
|
||||||
|
|
||||||
def clearing1(r):
|
def test_callable_proxy(self):
|
||||||
print "clearing ref 1"
|
o = Callable()
|
||||||
|
ref1 = weakref.proxy(o)
|
||||||
|
|
||||||
def clearing2(r):
|
self.check_proxy(o, ref1)
|
||||||
print "clearing ref 2"
|
|
||||||
|
|
||||||
o = C()
|
self.assert_(type(ref1) is weakref.CallableProxyType,
|
||||||
ref1 = weakref.ref(o, clearing1)
|
"proxy is not of callable type")
|
||||||
ref2 = weakref.ref(o, clearing2)
|
ref1('twinkies!')
|
||||||
verify(weakref.getweakrefcount(o) == 2,
|
self.assert_(o.bar == 'twinkies!',
|
||||||
|
"call through proxy not passed through to original")
|
||||||
|
|
||||||
|
# expect due to too few args
|
||||||
|
self.assertRaises(TypeError, ref1)
|
||||||
|
|
||||||
|
# expect due to too many args
|
||||||
|
self.assertRaises(TypeError, ref1, 1, 2, 3)
|
||||||
|
|
||||||
|
def check_proxy(self, o, proxy):
|
||||||
|
o.foo = 1
|
||||||
|
self.assert_(proxy.foo == 1,
|
||||||
|
"proxy does not reflect attribute addition")
|
||||||
|
o.foo = 2
|
||||||
|
self.assert_(proxy.foo == 2,
|
||||||
|
"proxy does not reflect attribute modification")
|
||||||
|
del o.foo
|
||||||
|
self.assert_(not hasattr(proxy, 'foo'),
|
||||||
|
"proxy does not reflect attribute removal")
|
||||||
|
|
||||||
|
proxy.foo = 1
|
||||||
|
self.assert_(o.foo == 1,
|
||||||
|
"object does not reflect attribute addition via proxy")
|
||||||
|
proxy.foo = 2
|
||||||
|
self.assert_(
|
||||||
|
o.foo == 2,
|
||||||
|
"object does not reflect attribute modification via proxy")
|
||||||
|
del proxy.foo
|
||||||
|
self.assert_(not hasattr(o, 'foo'),
|
||||||
|
"object does not reflect attribute removal via proxy")
|
||||||
|
|
||||||
|
def test_getweakrefcount(self):
|
||||||
|
o = C()
|
||||||
|
ref1 = weakref.ref(o)
|
||||||
|
ref2 = weakref.ref(o, self.callback)
|
||||||
|
self.assert_(weakref.getweakrefcount(o) == 2,
|
||||||
"got wrong number of weak reference objects")
|
"got wrong number of weak reference objects")
|
||||||
del o
|
|
||||||
|
|
||||||
o = C()
|
proxy1 = weakref.proxy(o)
|
||||||
ref1 = weakref.ref(o, clearing1)
|
proxy2 = weakref.proxy(o, self.callback)
|
||||||
ref2 = weakref.ref(o, clearing2)
|
self.assert_(weakref.getweakrefcount(o) == 4,
|
||||||
del ref1
|
"got wrong number of weak reference objects")
|
||||||
verify(weakref.getweakrefs(o) == [ref2],
|
|
||||||
|
def test_getweakrefs(self):
|
||||||
|
o = C()
|
||||||
|
ref1 = weakref.ref(o, self.callback)
|
||||||
|
ref2 = weakref.ref(o, self.callback)
|
||||||
|
del ref1
|
||||||
|
self.assert_(weakref.getweakrefs(o) == [ref2],
|
||||||
"list of refs does not match")
|
"list of refs does not match")
|
||||||
del o
|
|
||||||
|
|
||||||
o = C()
|
o = C()
|
||||||
ref1 = weakref.ref(o, clearing1)
|
ref1 = weakref.ref(o, self.callback)
|
||||||
ref2 = weakref.ref(o, clearing2)
|
ref2 = weakref.ref(o, self.callback)
|
||||||
del ref2
|
del ref2
|
||||||
verify(weakref.getweakrefs(o) == [ref1],
|
self.assert_(weakref.getweakrefs(o) == [ref1],
|
||||||
"list of refs does not match")
|
"list of refs does not match")
|
||||||
del o
|
|
||||||
|
|
||||||
print
|
|
||||||
print "Weak Valued Dictionaries"
|
|
||||||
|
|
||||||
class Object:
|
class Object:
|
||||||
def __init__(self, arg):
|
def __init__(self, arg):
|
||||||
|
@ -139,112 +204,63 @@ class Object:
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return "<Object %r>" % self.arg
|
return "<Object %r>" % self.arg
|
||||||
|
|
||||||
dict = weakref.mapping()
|
|
||||||
objects = map(Object, range(10))
|
class MappingTestCase(TestBase):
|
||||||
for o in objects:
|
|
||||||
|
COUNT = 10
|
||||||
|
|
||||||
|
def test_weak_values(self):
|
||||||
|
dict = weakref.mapping()
|
||||||
|
objects = map(Object, range(self.COUNT))
|
||||||
|
for o in objects:
|
||||||
dict[o.arg] = o
|
dict[o.arg] = o
|
||||||
print "objects are stored in weak dict"
|
|
||||||
for o in objects:
|
for o in objects:
|
||||||
verify(weakref.getweakrefcount(o) == 1,
|
self.assert_(weakref.getweakrefcount(o) == 1,
|
||||||
"wrong number of weak references to %r!" % o)
|
"wrong number of weak references to %r!" % o)
|
||||||
verify(o is dict[o.arg],
|
self.assert_(o is dict[o.arg],
|
||||||
"wrong object returned by weak dict!")
|
"wrong object returned by weak dict!")
|
||||||
items1 = dict.items()
|
items1 = dict.items()
|
||||||
items2 = dict.copy().items()
|
items2 = dict.copy().items()
|
||||||
items1.sort()
|
items1.sort()
|
||||||
items2.sort()
|
items2.sort()
|
||||||
verify(items1 == items2,
|
self.assert_(items1 == items2,
|
||||||
"cloning of weak-valued dictionary did not work!")
|
"cloning of weak-valued dictionary did not work!")
|
||||||
del items1, items2
|
del items1, items2
|
||||||
dict.clear()
|
self.assert_(len(dict) == self.COUNT)
|
||||||
print "weak dict test complete"
|
del objects[0]
|
||||||
|
self.assert_(len(dict) == (self.COUNT - 1),
|
||||||
|
"deleting object did not cause dictionary update")
|
||||||
|
del objects, o
|
||||||
|
self.assert_(len(dict) == 0,
|
||||||
|
"deleting the values did not clear the dictionary")
|
||||||
|
|
||||||
print
|
def test_weak_keys(self):
|
||||||
print "Weak Keyed Dictionaries"
|
dict = weakref.mapping(weakkeys=1)
|
||||||
|
objects = map(Object, range(self.COUNT))
|
||||||
dict = weakref.mapping(weakkeys=1)
|
for o in objects:
|
||||||
objects = map(Object, range(10))
|
|
||||||
for o in objects:
|
|
||||||
dict[o] = o.arg
|
dict[o] = o.arg
|
||||||
print "objects are stored in weak dict"
|
|
||||||
for o in objects:
|
for o in objects:
|
||||||
verify(weakref.getweakrefcount(o) == 1,
|
self.assert_(weakref.getweakrefcount(o) == 1,
|
||||||
"wrong number of weak references to %r!" % o)
|
"wrong number of weak references to %r!" % o)
|
||||||
verify(o.arg is dict[o],
|
self.assert_(o.arg is dict[o],
|
||||||
"wrong object returned by weak dict!")
|
"wrong object returned by weak dict!")
|
||||||
items1 = dict.items()
|
items1 = dict.items()
|
||||||
items2 = dict.copy().items()
|
items2 = dict.copy().items()
|
||||||
items1.sort()
|
items1.sort()
|
||||||
items2.sort()
|
items2.sort()
|
||||||
verify(items1 == items2,
|
self.assert_(items1 == items2,
|
||||||
"cloning of weak-keyed dictionary did not work!")
|
"cloning of weak-keyed dictionary did not work!")
|
||||||
del items1, items2
|
del items1, items2
|
||||||
del objects, o
|
self.assert_(len(dict) == self.COUNT)
|
||||||
verify(len(dict)==0, "deleting the keys did not clear the dictionary")
|
del objects[0]
|
||||||
print "weak key dict test complete"
|
self.assert_(len(dict) == (self.COUNT - 1),
|
||||||
|
"deleting object did not cause dictionary update")
|
||||||
|
del objects, o
|
||||||
|
self.assert_(len(dict) == 0,
|
||||||
|
"deleting the keys did not clear the dictionary")
|
||||||
|
|
||||||
|
|
||||||
print
|
run_unittest(ReferencesTestCase)
|
||||||
print "Non-callable Proxy References"
|
run_unittest(MappingTestCase)
|
||||||
print "XXX -- tests not written!"
|
|
||||||
|
|
||||||
|
|
||||||
def test_proxy(o, proxy):
|
|
||||||
o.foo = 1
|
|
||||||
verify(proxy.foo == 1,
|
|
||||||
"proxy does not reflect attribute addition")
|
|
||||||
o.foo = 2
|
|
||||||
verify(proxy.foo == 2,
|
|
||||||
"proxy does not reflect attribute modification")
|
|
||||||
del o.foo
|
|
||||||
verify(not hasattr(proxy, 'foo'),
|
|
||||||
"proxy does not reflect attribute removal")
|
|
||||||
|
|
||||||
proxy.foo = 1
|
|
||||||
verify(o.foo == 1,
|
|
||||||
"object does not reflect attribute addition via proxy")
|
|
||||||
proxy.foo = 2
|
|
||||||
verify(o.foo == 2,
|
|
||||||
"object does not reflect attribute modification via proxy")
|
|
||||||
del proxy.foo
|
|
||||||
verify(not hasattr(o, 'foo'),
|
|
||||||
"object does not reflect attribute removal via proxy")
|
|
||||||
|
|
||||||
|
|
||||||
o = C()
|
|
||||||
test_proxy(o, weakref.proxy(o))
|
|
||||||
|
|
||||||
print
|
|
||||||
print "Callable Proxy References"
|
|
||||||
|
|
||||||
class Callable:
|
|
||||||
bar = None
|
|
||||||
def __call__(self, x):
|
|
||||||
self.bar = x
|
|
||||||
|
|
||||||
o = Callable()
|
|
||||||
ref1 = weakref.proxy(o)
|
|
||||||
|
|
||||||
test_proxy(o, ref1)
|
|
||||||
|
|
||||||
verify(type(ref1) is weakref.CallableProxyType,
|
|
||||||
"proxy is not of callable type")
|
|
||||||
ref1('twinkies!')
|
|
||||||
verify(o.bar == 'twinkies!',
|
|
||||||
"call through proxy not passed through to original")
|
|
||||||
|
|
||||||
try:
|
|
||||||
ref1()
|
|
||||||
except TypeError:
|
|
||||||
# expect due to too few args
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
raise TestFailed("did not catch expected TypeError -- too few args")
|
|
||||||
|
|
||||||
try:
|
|
||||||
ref1(1, 2, 3)
|
|
||||||
except TypeError:
|
|
||||||
# expect due to too many args
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
raise TestFailed("did not catch expected TypeError -- too many args")
|
|
||||||
|
|
Loading…
Reference in New Issue