Merge r58434:

Fixes http://bugs.python.org/issue1233 - bsddb.dbshelve.DBShelf.append
was useless due to inverted logic.  Also adds a test case for RECNO dbs
to test_dbshelve.
This commit is contained in:
Gregory P. Smith 2007-10-12 19:13:19 +00:00
parent 041683df42
commit 5c5f1703e5
2 changed files with 79 additions and 23 deletions

View File

@ -77,6 +77,9 @@ def open(filename, flags=db.DB_CREATE, mode=0o660, filetype=db.DB_HASH,
#--------------------------------------------------------------------------- #---------------------------------------------------------------------------
class DBShelveError(db.DBError): pass
class DBShelf(DictMixin): class DBShelf(DictMixin):
"""A shelf to hold pickled objects, built upon a bsddb DB object. It """A shelf to hold pickled objects, built upon a bsddb DB object. It
automatically pickles/unpickles data objects going to/from the DB. automatically pickles/unpickles data objects going to/from the DB.
@ -152,10 +155,10 @@ class DBShelf(DictMixin):
return self.db.append(data, txn) return self.db.append(data, txn)
def append(self, value, txn=None): def append(self, value, txn=None):
if self.get_type() != db.DB_RECNO: if self.get_type() == db.DB_RECNO:
self.append = self.__append self.append = self.__append
return self.append(value, txn=txn) return self.append(value, txn=txn)
raise db.DBError("append() only supported when dbshelve opened with filetype=dbshelve.db.DB_RECNO") raise DBShelveError("append() only supported when dbshelve opened with filetype=dbshelve.db.DB_RECNO")
def associate(self, secondaryDB, callback, flags=0): def associate(self, secondaryDB, callback, flags=0):

View File

@ -50,17 +50,22 @@ class DBShelveTestCase(unittest.TestCase):
except os.error: except os.error:
pass pass
def mk(self, key):
"""Turn key into an appropriate key type for this db"""
# override in child class for RECNO
return key.encode("ascii")
def populateDB(self, d): def populateDB(self, d):
for x in letters: for x in letters:
d[('S' + x).encode("ascii")] = 10 * x # add a string d[self.mk('S' + x)] = 10 * x # add a string
d[('I' + x).encode("ascii")] = ord(x) # add an integer d[self.mk('I' + x)] = ord(x) # add an integer
d[('L' + x).encode("ascii")] = [x] * 10 # add a list d[self.mk('L' + x)] = [x] * 10 # add a list
inst = DataClass() # add an instance inst = DataClass() # add an instance
inst.S = 10 * x inst.S = 10 * x
inst.I = ord(x) inst.I = ord(x)
inst.L = [x] * 10 inst.L = [x] * 10
d[('O' + x).encode("ascii")] = inst d[self.mk('O' + x)] = inst
# overridable in derived classes to affect how the shelf is created/opened # overridable in derived classes to affect how the shelf is created/opened
@ -100,14 +105,14 @@ class DBShelveTestCase(unittest.TestCase):
print("keys:", k) print("keys:", k)
print("stats:", s) print("stats:", s)
self.assertFalse(d.has_key(b'bad key')) self.assertFalse(d.has_key(self.mk('bad key')))
self.assertTrue(d.has_key(b'IA'), d.keys()) self.assertTrue(d.has_key(self.mk('IA')), d.keys())
self.assertTrue(d.has_key(b'OA')) self.assertTrue(d.has_key(self.mk('OA')))
d.delete(b'IA') d.delete(self.mk('IA'))
del d[b'OA'] del d[self.mk('OA')]
self.assertFalse(d.has_key(b'IA')) self.assertFalse(d.has_key(self.mk('IA')))
self.assertFalse(d.has_key(b'OA')) self.assertFalse(d.has_key(self.mk('OA')))
self.assertEqual(len(d), l-2) self.assertEqual(len(d), l-2)
values = [] values = []
@ -129,18 +134,18 @@ class DBShelveTestCase(unittest.TestCase):
for key, value in items: for key, value in items:
self.checkrec(key, value) self.checkrec(key, value)
self.assertEqual(d.get(b'bad key'), None) self.assertEqual(d.get(self.mk('bad key')), None)
self.assertEqual(d.get(b'bad key', None), None) self.assertEqual(d.get(self.mk('bad key'), None), None)
self.assertEqual(d.get(b'bad key', b'a string'), b'a string') self.assertEqual(d.get(self.mk('bad key'), b'a string'), b'a string')
self.assertEqual(d.get(b'bad key', [1, 2, 3]), [1, 2, 3]) self.assertEqual(d.get(self.mk('bad key'), [1, 2, 3]), [1, 2, 3])
d.set_get_returns_none(0) d.set_get_returns_none(0)
self.assertRaises(db.DBNotFoundError, d.get, b'bad key') self.assertRaises(db.DBNotFoundError, d.get, self.mk('bad key'))
d.set_get_returns_none(1) d.set_get_returns_none(1)
d.put(b'new key', b'new data') d.put(self.mk('new key'), b'new data')
self.assertEqual(d.get(b'new key'), b'new data') self.assertEqual(d.get(self.mk('new key')), b'new data')
self.assertEqual(d[b'new key'], b'new data') self.assertEqual(d[self.mk('new key')], b'new data')
@ -179,12 +184,23 @@ class DBShelveTestCase(unittest.TestCase):
self.assertEqual(count, len(d)) self.assertEqual(count, len(d))
c.set(b'SS') c.set(self.mk('SS'))
key, value = c.current() key, value = c.current()
self.checkrec(key, value) self.checkrec(key, value)
del c del c
def test03_append(self):
# NOTE: this is overridden in RECNO subclass, don't change its name.
if verbose:
print('\n', '-=' * 30)
print("Running %s.test03_append..." % self.__class__.__name__)
self.assertRaises(dbshelve.DBShelveError,
self.d.append, b'unit test was here')
def checkrec(self, key, value): def checkrec(self, key, value):
# override this in a subclass if the key type is different
x = key[1:] x = key[1:]
if key[0:1] == b'S': if key[0:1] == b'S':
self.assertEquals(type(value), str) self.assertEquals(type(value), str)
@ -289,7 +305,43 @@ class EnvThreadHashShelveTestCase(BasicEnvShelveTestCase):
#---------------------------------------------------------------------- #----------------------------------------------------------------------
# TODO: Add test cases for a DBShelf in a RECNO DB. # test cases for a DBShelf in a RECNO DB.
class RecNoShelveTestCase(BasicShelveTestCase):
dbtype = db.DB_RECNO
dbflags = db.DB_CREATE
def setUp(self):
BasicShelveTestCase.setUp(self)
# pool to assign integer key values out of
self.key_pool = list(range(1, 5000))
self.key_map = {} # map string keys to the number we gave them
self.intkey_map = {} # reverse map of above
def mk(self, key):
if key not in self.key_map:
self.key_map[key] = self.key_pool.pop(0)
self.intkey_map[self.key_map[key]] = key.encode('ascii')
return self.key_map[key]
def checkrec(self, intkey, value):
key = self.intkey_map[intkey]
BasicShelveTestCase.checkrec(self, key, value)
def test03_append(self):
if verbose:
print('\n', '-=' * 30)
print("Running %s.test03_append..." % self.__class__.__name__)
self.d[1] = b'spam'
self.d[5] = b'eggs'
self.assertEqual(6, self.d.append(b'spam'))
self.assertEqual(7, self.d.append(b'baked beans'))
self.assertEqual(b'spam', self.d.get(6))
self.assertEqual(b'spam', self.d.get(1))
self.assertEqual(b'baked beans', self.d.get(7))
self.assertEqual(b'eggs', self.d.get(5))
#---------------------------------------------------------------------- #----------------------------------------------------------------------
@ -306,6 +358,7 @@ def test_suite():
suite.addTest(unittest.makeSuite(EnvHashShelveTestCase)) suite.addTest(unittest.makeSuite(EnvHashShelveTestCase))
suite.addTest(unittest.makeSuite(EnvThreadBTreeShelveTestCase)) suite.addTest(unittest.makeSuite(EnvThreadBTreeShelveTestCase))
suite.addTest(unittest.makeSuite(EnvThreadHashShelveTestCase)) suite.addTest(unittest.makeSuite(EnvThreadHashShelveTestCase))
suite.addTest(unittest.makeSuite(RecNoShelveTestCase))
return suite return suite