mirror of https://github.com/python/cpython
225 lines
7.4 KiB
Python
225 lines
7.4 KiB
Python
import pickle
|
|
import io
|
|
import collections
|
|
import struct
|
|
import sys
|
|
|
|
import unittest
|
|
from test import support
|
|
|
|
from test.pickletester import AbstractPickleTests
|
|
from test.pickletester import AbstractPickleModuleTests
|
|
from test.pickletester import AbstractPersistentPicklerTests
|
|
from test.pickletester import AbstractPicklerUnpicklerObjectTests
|
|
from test.pickletester import AbstractDispatchTableTests
|
|
from test.pickletester import BigmemPickleTests
|
|
|
|
try:
|
|
import _pickle
|
|
has_c_implementation = True
|
|
except ImportError:
|
|
has_c_implementation = False
|
|
|
|
|
|
class PickleTests(AbstractPickleModuleTests):
|
|
pass
|
|
|
|
|
|
class PyPicklerTests(AbstractPickleTests):
|
|
|
|
pickler = pickle._Pickler
|
|
unpickler = pickle._Unpickler
|
|
|
|
def dumps(self, arg, proto=None):
|
|
f = io.BytesIO()
|
|
p = self.pickler(f, proto)
|
|
p.dump(arg)
|
|
f.seek(0)
|
|
return bytes(f.read())
|
|
|
|
def loads(self, buf, **kwds):
|
|
f = io.BytesIO(buf)
|
|
u = self.unpickler(f, **kwds)
|
|
return u.load()
|
|
|
|
|
|
class InMemoryPickleTests(AbstractPickleTests, BigmemPickleTests):
|
|
|
|
pickler = pickle._Pickler
|
|
unpickler = pickle._Unpickler
|
|
|
|
def dumps(self, arg, protocol=None):
|
|
return pickle.dumps(arg, protocol)
|
|
|
|
def loads(self, buf, **kwds):
|
|
return pickle.loads(buf, **kwds)
|
|
|
|
|
|
class PyPersPicklerTests(AbstractPersistentPicklerTests):
|
|
|
|
pickler = pickle._Pickler
|
|
unpickler = pickle._Unpickler
|
|
|
|
def dumps(self, arg, proto=None):
|
|
class PersPickler(self.pickler):
|
|
def persistent_id(subself, obj):
|
|
return self.persistent_id(obj)
|
|
f = io.BytesIO()
|
|
p = PersPickler(f, proto)
|
|
p.dump(arg)
|
|
f.seek(0)
|
|
return f.read()
|
|
|
|
def loads(self, buf, **kwds):
|
|
class PersUnpickler(self.unpickler):
|
|
def persistent_load(subself, obj):
|
|
return self.persistent_load(obj)
|
|
f = io.BytesIO(buf)
|
|
u = PersUnpickler(f, **kwds)
|
|
return u.load()
|
|
|
|
|
|
class PyPicklerUnpicklerObjectTests(AbstractPicklerUnpicklerObjectTests):
|
|
|
|
pickler_class = pickle._Pickler
|
|
unpickler_class = pickle._Unpickler
|
|
|
|
|
|
class PyDispatchTableTests(AbstractDispatchTableTests):
|
|
|
|
pickler_class = pickle._Pickler
|
|
|
|
def get_dispatch_table(self):
|
|
return pickle.dispatch_table.copy()
|
|
|
|
|
|
class PyChainDispatchTableTests(AbstractDispatchTableTests):
|
|
|
|
pickler_class = pickle._Pickler
|
|
|
|
def get_dispatch_table(self):
|
|
return collections.ChainMap({}, pickle.dispatch_table)
|
|
|
|
|
|
if has_c_implementation:
|
|
class CPicklerTests(PyPicklerTests):
|
|
pickler = _pickle.Pickler
|
|
unpickler = _pickle.Unpickler
|
|
|
|
class CPersPicklerTests(PyPersPicklerTests):
|
|
pickler = _pickle.Pickler
|
|
unpickler = _pickle.Unpickler
|
|
|
|
class CDumpPickle_LoadPickle(PyPicklerTests):
|
|
pickler = _pickle.Pickler
|
|
unpickler = pickle._Unpickler
|
|
|
|
class DumpPickle_CLoadPickle(PyPicklerTests):
|
|
pickler = pickle._Pickler
|
|
unpickler = _pickle.Unpickler
|
|
|
|
class CPicklerUnpicklerObjectTests(AbstractPicklerUnpicklerObjectTests):
|
|
pickler_class = _pickle.Pickler
|
|
unpickler_class = _pickle.Unpickler
|
|
|
|
def test_issue18339(self):
|
|
unpickler = self.unpickler_class(io.BytesIO())
|
|
with self.assertRaises(TypeError):
|
|
unpickler.memo = object
|
|
# used to cause a segfault
|
|
with self.assertRaises(ValueError):
|
|
unpickler.memo = {-1: None}
|
|
unpickler.memo = {1: None}
|
|
|
|
class CDispatchTableTests(AbstractDispatchTableTests):
|
|
pickler_class = pickle.Pickler
|
|
def get_dispatch_table(self):
|
|
return pickle.dispatch_table.copy()
|
|
|
|
class CChainDispatchTableTests(AbstractDispatchTableTests):
|
|
pickler_class = pickle.Pickler
|
|
def get_dispatch_table(self):
|
|
return collections.ChainMap({}, pickle.dispatch_table)
|
|
|
|
@support.cpython_only
|
|
class SizeofTests(unittest.TestCase):
|
|
check_sizeof = support.check_sizeof
|
|
|
|
def test_pickler(self):
|
|
basesize = support.calcobjsize('5P2n3i2n3iP')
|
|
p = _pickle.Pickler(io.BytesIO())
|
|
self.assertEqual(object.__sizeof__(p), basesize)
|
|
MT_size = struct.calcsize('3nP0n')
|
|
ME_size = struct.calcsize('Pn0P')
|
|
check = self.check_sizeof
|
|
check(p, basesize +
|
|
MT_size + 8 * ME_size + # Minimal memo table size.
|
|
sys.getsizeof(b'x'*4096)) # Minimal write buffer size.
|
|
for i in range(6):
|
|
p.dump(chr(i))
|
|
check(p, basesize +
|
|
MT_size + 32 * ME_size + # Size of memo table required to
|
|
# save references to 6 objects.
|
|
0) # Write buffer is cleared after every dump().
|
|
|
|
def test_unpickler(self):
|
|
basesize = support.calcobjsize('2Pn2P 2P2n2i5P 2P3n6P2n2i')
|
|
unpickler = _pickle.Unpickler
|
|
P = struct.calcsize('P') # Size of memo table entry.
|
|
n = struct.calcsize('n') # Size of mark table entry.
|
|
check = self.check_sizeof
|
|
for encoding in 'ASCII', 'UTF-16', 'latin-1':
|
|
for errors in 'strict', 'replace':
|
|
u = unpickler(io.BytesIO(),
|
|
encoding=encoding, errors=errors)
|
|
self.assertEqual(object.__sizeof__(u), basesize)
|
|
check(u, basesize +
|
|
32 * P + # Minimal memo table size.
|
|
len(encoding) + 1 + len(errors) + 1)
|
|
|
|
stdsize = basesize + len('ASCII') + 1 + len('strict') + 1
|
|
def check_unpickler(data, memo_size, marks_size):
|
|
dump = pickle.dumps(data)
|
|
u = unpickler(io.BytesIO(dump),
|
|
encoding='ASCII', errors='strict')
|
|
u.load()
|
|
check(u, stdsize + memo_size * P + marks_size * n)
|
|
|
|
check_unpickler(0, 32, 0)
|
|
# 20 is minimal non-empty mark stack size.
|
|
check_unpickler([0] * 100, 32, 20)
|
|
# 128 is memo table size required to save references to 100 objects.
|
|
check_unpickler([chr(i) for i in range(100)], 128, 20)
|
|
def recurse(deep):
|
|
data = 0
|
|
for i in range(deep):
|
|
data = [data, data]
|
|
return data
|
|
check_unpickler(recurse(0), 32, 0)
|
|
check_unpickler(recurse(1), 32, 20)
|
|
check_unpickler(recurse(20), 32, 58)
|
|
check_unpickler(recurse(50), 64, 58)
|
|
check_unpickler(recurse(100), 128, 134)
|
|
|
|
u = unpickler(io.BytesIO(pickle.dumps('a', 0)),
|
|
encoding='ASCII', errors='strict')
|
|
u.load()
|
|
check(u, stdsize + 32 * P + 2 + 1)
|
|
|
|
|
|
def test_main():
|
|
tests = [PickleTests, PyPicklerTests, PyPersPicklerTests,
|
|
PyDispatchTableTests, PyChainDispatchTableTests]
|
|
if has_c_implementation:
|
|
tests.extend([CPicklerTests, CPersPicklerTests,
|
|
CDumpPickle_LoadPickle, DumpPickle_CLoadPickle,
|
|
PyPicklerUnpicklerObjectTests,
|
|
CPicklerUnpicklerObjectTests,
|
|
CDispatchTableTests, CChainDispatchTableTests,
|
|
InMemoryPickleTests, SizeofTests])
|
|
support.run_unittest(*tests)
|
|
support.run_doctest(pickle)
|
|
|
|
if __name__ == "__main__":
|
|
test_main()
|