Issue #26486: Backported some pickle tests from 3.x.

This commit is contained in:
Serhiy Storchaka 2016-03-06 09:05:47 +02:00
parent e41dc4f7bc
commit 7c033e60df
1 changed files with 208 additions and 4 deletions

View File

@ -60,6 +60,21 @@ def run_with_locale(catstr, *locales):
return inner
return decorator
def no_tracing(func):
"""Decorator to temporarily turn off tracing for the duration of a test."""
if not hasattr(sys, 'gettrace'):
return func
else:
def wrapper(*args, **kwargs):
original_trace = sys.gettrace()
try:
sys.settrace(None)
return func(*args, **kwargs)
finally:
sys.settrace(original_trace)
wrapper.__name__ = func.__name__
return wrapper
# Return True if opcode code appears in the pickle, else False.
def opcode_in_pickle(code, pickle):
@ -76,6 +91,16 @@ def count_opcode(code, pickle):
n += 1
return n
class UnseekableIO(StringIO.StringIO):
def peek(self, *args):
raise NotImplementedError
def seek(self, *args):
raise NotImplementedError
def tell(self):
raise NotImplementedError
# We can't very well test the extension registry without putting known stuff
# in it, but we have to be careful to restore its original state. Code
# should do this:
@ -922,6 +947,7 @@ class AbstractPickleTests(unittest.TestCase):
for proto in protocols:
s = self.dumps(i, proto)
x = self.loads(s)
self.assertIsInstance(x, C)
self.assertEqual(dir(x), dir(i))
self.assertIs(x.attr, x)
@ -934,6 +960,7 @@ class AbstractPickleTests(unittest.TestCase):
for proto in protocols:
s = self.dumps(l, proto)
x = self.loads(s)
self.assertIsInstance(x, list)
self.assertEqual(len(x), 1)
self.assertEqual(dir(x[0]), dir(i))
self.assertEqual(x[0].attr.keys(), [1])
@ -978,7 +1005,9 @@ class AbstractPickleTests(unittest.TestCase):
if have_unicode:
def test_unicode(self):
endcases = [u'', u'<\\u>', u'<\\\u1234>', u'<\n>',
u'<\\>', u'<\\\U00012345>']
u'<\\>', u'<\\\U00012345>',
# surrogates
u'<\udc80>']
for proto in protocols:
for u in endcases:
p = self.dumps(u, proto)
@ -1057,6 +1086,7 @@ class AbstractPickleTests(unittest.TestCase):
s = self.dumps(a, proto)
b = self.loads(s)
self.assertEqual(a, b)
self.assertIs(a.__class__, b.__class__)
def test_structseq(self):
import time
@ -1202,6 +1232,26 @@ class AbstractPickleTests(unittest.TestCase):
self.assertEqual(B(x), B(y), detail)
self.assertEqual(x.__dict__, y.__dict__, detail)
def test_newobj_proxies(self):
# NEWOBJ should use the __class__ rather than the raw type
import weakref
classes = myclasses[:]
# Cannot create weakproxies to these classes
for c in (MyInt, MyLong, MyStr, MyTuple):
classes.remove(c)
for proto in protocols:
for C in classes:
B = C.__base__
x = C(C.sample)
x.foo = 42
p = weakref.proxy(x)
s = self.dumps(p, proto)
y = self.loads(s)
self.assertEqual(type(y), type(x)) # rather than type(p)
detail = (proto, C, B, x, y, type(y))
self.assertEqual(B(x), B(y), detail)
self.assertEqual(x.__dict__, y.__dict__, detail)
# Register a type with copy_reg, with extension code extcode. Pickle
# an object of that type. Check that the resulting pickle uses opcode
# (EXT[124]) under proto 2, and not in proto 1.
@ -1296,10 +1346,30 @@ class AbstractPickleTests(unittest.TestCase):
self.assertTrue(num_setitems >= 2)
def test_simple_newobj(self):
x = object.__new__(SimpleNewObj) # avoid __init__
x = SimpleNewObj.__new__(SimpleNewObj, 0xface) # avoid __init__
x.abc = 666
for proto in protocols:
s = self.dumps(x, proto)
if proto < 1:
self.assertIn('\nI64206', s) # INT
else:
self.assertIn('M\xce\xfa', s) # BININT2
self.assertEqual(opcode_in_pickle(pickle.NEWOBJ, s), proto >= 2)
y = self.loads(s) # will raise TypeError if __init__ called
self.assertEqual(y.abc, 666)
self.assertEqual(x.__dict__, y.__dict__)
def test_complex_newobj(self):
x = ComplexNewObj.__new__(ComplexNewObj, 0xface) # avoid __init__
x.abc = 666
for proto in protocols:
s = self.dumps(x, proto)
if proto < 1:
self.assertIn('\nI64206', s) # INT
elif proto < 2:
self.assertIn('M\xce\xfa', s) # BININT2
else:
self.assertIn('U\x04FACE', s) # SHORT_BINSTRING
self.assertEqual(opcode_in_pickle(pickle.NEWOBJ, s), proto >= 2)
y = self.loads(s) # will raise TypeError if __init__ called
self.assertEqual(y.abc, 666)
@ -1361,6 +1431,13 @@ class AbstractPickleTests(unittest.TestCase):
y = self.loads(s)
self.assertEqual(y._reduce_called, 1)
@no_tracing
def test_bad_getattr(self):
# Issue #3514: crash when there is an infinite loop in __getattr__
x = BadGetattr()
for proto in protocols:
self.assertRaises(RuntimeError, self.dumps, x, proto)
def test_reduce_bad_iterator(self):
# Issue4176: crash when 4th and 5th items of __reduce__()
# are not iterators
@ -1413,6 +1490,50 @@ class AbstractPickleTests(unittest.TestCase):
for x_key, y_key in zip(x_keys, y_keys):
self.assertIs(x_key, y_key)
def test_large_pickles(self):
# Test the correctness of internal buffering routines when handling
# large data.
for proto in protocols:
data = (1, min, 'xy' * (30 * 1024), len)
dumped = self.dumps(data, proto)
loaded = self.loads(dumped)
self.assertEqual(len(loaded), len(data))
self.assertEqual(loaded, data)
def test_int_pickling_efficiency(self):
# Test compacity of int representation (see issue #12744)
for proto in protocols:
pickles = [self.dumps(2**n, proto) for n in xrange(0, 71, 5)]
sizes = list(map(len, pickles))
# the size function is monotonic
self.assertEqual(sorted(sizes), sizes)
if proto >= 2:
for p in pickles:
self.assertFalse(opcode_in_pickle(pickle.LONG, p))
def _check_pickling_with_opcode(self, obj, opcode, proto):
pickled = self.dumps(obj, proto)
self.assertTrue(opcode_in_pickle(opcode, pickled))
unpickled = self.loads(pickled)
self.assertEqual(obj, unpickled)
def test_appends_on_non_lists(self):
# Issue #17720
obj = REX_six([1, 2, 3])
for proto in protocols:
if proto == 0:
self._check_pickling_with_opcode(obj, pickle.APPEND, proto)
else:
self._check_pickling_with_opcode(obj, pickle.APPENDS, proto)
def test_setitems_on_non_dicts(self):
obj = REX_seven({1: -1, 2: -2, 3: -3})
for proto in protocols:
if proto == 0:
self._check_pickling_with_opcode(obj, pickle.SETITEM, proto)
else:
self._check_pickling_with_opcode(obj, pickle.SETITEMS, proto)
# Test classes for reduce_ex
@ -1452,6 +1573,39 @@ class REX_five(object):
return object.__reduce__(self)
# This one used to fail with infinite recursion
class REX_six(object):
"""This class is used to check the 4th argument (list iterator) of
the reduce protocol.
"""
def __init__(self, items=None):
if items is None:
items = []
self.items = items
def __eq__(self, other):
return type(self) is type(other) and self.items == other.items
def append(self, item):
self.items.append(item)
def extend(self, items):
for item in items:
self.append(item)
def __reduce__(self):
return type(self), (), None, iter(self.items), None
class REX_seven(object):
"""This class is used to check the 5th argument (dict iterator) of
the reduce protocol.
"""
def __init__(self, table=None):
if table is None:
table = {}
self.table = table
def __eq__(self, other):
return type(self) is type(other) and self.table == other.table
def __setitem__(self, key, value):
self.table[key] = value
def __reduce__(self):
return type(self), (), None, None, iter(self.table.items())
# Test classes for newobj
class MyInt(int):
@ -1490,10 +1644,20 @@ myclasses = [MyInt, MyLong, MyFloat,
class SlotList(MyList):
__slots__ = ["foo"]
class SimpleNewObj(object):
def __init__(self, a, b, c):
class SimpleNewObj(int):
def __init__(self, *args, **kwargs):
# raise an error, to make sure this isn't called
raise TypeError("SimpleNewObj.__init__() didn't expect to get called")
def __eq__(self, other):
return int(self) == int(other) and self.__dict__ == other.__dict__
class ComplexNewObj(SimpleNewObj):
def __getnewargs__(self):
return ('%X' % self, 16)
class BadGetattr:
def __getattr__(self, key):
self.foo
class AbstractPickleModuleTests(unittest.TestCase):
@ -1708,6 +1872,46 @@ class AbstractPicklerUnpicklerObjectTests(unittest.TestCase):
f.seek(0)
self.assertEqual(unpickler.load(), data2)
def _check_multiple_unpicklings(self, ioclass, seekable):
for proto in protocols:
data1 = [(x, str(x)) for x in xrange(2000)] + ["abcde", len]
f = ioclass()
pickler = self.pickler_class(f, protocol=proto)
pickler.dump(data1)
pickled = f.getvalue()
N = 5
f = ioclass(pickled * N)
unpickler = self.unpickler_class(f)
for i in xrange(N):
if seekable:
pos = f.tell()
self.assertEqual(unpickler.load(), data1)
if seekable:
self.assertEqual(f.tell(), pos + len(pickled))
self.assertRaises(EOFError, unpickler.load)
def test_multiple_unpicklings_seekable(self):
self._check_multiple_unpicklings(StringIO.StringIO, True)
def test_multiple_unpicklings_unseekable(self):
self._check_multiple_unpicklings(UnseekableIO, False)
def test_unpickling_buffering_readline(self):
# Issue #12687: the unpickler's buffering logic could fail with
# text mode opcodes.
import io
data = list(xrange(10))
for proto in protocols:
for buf_size in xrange(1, 11):
f = io.BufferedRandom(io.BytesIO(), buffer_size=buf_size)
pickler = self.pickler_class(f, protocol=proto)
pickler.dump(data)
f.seek(0)
unpickler = self.unpickler_class(f)
self.assertEqual(unpickler.load(), data)
class BigmemPickleTests(unittest.TestCase):
# Memory requirements: 1 byte per character for input strings, 1 byte