some test suite cleanup, use tempfile.mkdtemp() in setUp and

shutil.rmtree() in tearDown().  add missing tests to the list
in the test_bsddb3 suite.
This commit is contained in:
Gregory P. Smith 2007-08-28 08:05:56 +00:00
parent a280ca7594
commit 3fd22da612
22 changed files with 180 additions and 229 deletions

View File

@ -10,7 +10,7 @@
# software has been tested, but no warranty is expressed or
# implied.
#
# -- Gregory P. Smith <greg@electricrain.com>
# -- Gregory P. Smith <greg@krypto.org>
# This provides a simple database table interface built on top of
# the Python BerkeleyDB 3 interface.

View File

@ -9,7 +9,7 @@
# software has been tested, but no warranty is expressed or
# implied.
#
# Author: Gregory P. Smith <greg@electricrain.com>
# Author: Gregory P. Smith <greg@krypto.org>
#
# Note: I don't know how useful this is in reality since when a
# DBLockDeadlockError happens the current transaction is supposed to be
@ -19,13 +19,7 @@
#
#------------------------------------------------------------------------
#
# import the time.sleep function in a namespace safe way to allow
# "from bsddb.dbutils import *"
#
from time import sleep as _sleep
import time
from . import db
# always sleep at least N seconds between retrys
@ -60,17 +54,22 @@ def DeadlockWrap(function, *_args, **_kwargs):
while True:
try:
return function(*_args, **_kwargs)
except db.DBLockDeadlockError:
except db.DBLockDeadlockError as e:
if _deadlock_VerboseFile:
_deadlock_VerboseFile.write(
'dbutils.DeadlockWrap: sleeping %1.3f\n' % sleeptime)
_sleep(sleeptime)
'bsddb.dbutils.DeadlockWrap: ' +
'sleeping %1.3f\n' % sleeptime)
time.sleep(sleeptime)
# exponential backoff in the sleep time
sleeptime *= 2
if sleeptime > _deadlock_MaxSleepTime:
sleeptime = _deadlock_MaxSleepTime
max_retries -= 1
if max_retries == -1:
if _deadlock_VerboseFile:
_deadlock_VerboseFile.write(
'bsddb.dbutils.DeadlockWrap: ' +
'max_retries reached, reraising %s\n' % e)
raise

View File

@ -11,13 +11,13 @@ except ImportError:
# For Python 2.3
from bsddb import db
verbose = 0
verbose = False
if 'verbose' in sys.argv:
verbose = 1
verbose = True
sys.argv.remove('verbose')
if 'silent' in sys.argv: # take care of old flag, just in case
verbose = 0
verbose = False
sys.argv.remove('silent')

View File

@ -2,6 +2,7 @@
TestCases for DB.associate.
"""
import shutil
import sys, os
import tempfile
import time
@ -14,7 +15,7 @@ except ImportError:
have_threads = 0
import unittest
from .test_all import verbose
from bsddb.test.test_all import verbose
try:
# For Pythons w/distutils pybsddb
@ -91,25 +92,14 @@ musicdata = {
class AssociateErrorTestCase(unittest.TestCase):
def setUp(self):
self.filename = self.__class__.__name__ + '.db'
homeDir = os.path.join(tempfile.gettempdir(), 'db_home')
self.homeDir = homeDir
try:
os.mkdir(homeDir)
except os.error:
import glob
files = glob.glob(os.path.join(self.homeDir, '*'))
for file in files:
os.remove(file)
self.homeDir = tempfile.mkdtemp()
self.env = db.DBEnv()
self.env.open(homeDir, db.DB_CREATE | db.DB_INIT_MPOOL)
self.env.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL)
def tearDown(self):
self.env.close()
self.env = None
import glob
files = glob.glob(os.path.join(self.homeDir, '*'))
for file in files:
os.remove(file)
shutil.rmtree(self.homeDir)
def test00_associateDBError(self):
@ -151,27 +141,16 @@ class AssociateTestCase(unittest.TestCase):
def setUp(self):
self.filename = self.__class__.__name__ + '.db'
homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home')
self.homeDir = homeDir
try:
os.mkdir(homeDir)
except os.error:
import glob
files = glob.glob(os.path.join(self.homeDir, '*'))
for file in files:
os.remove(file)
self.homeDir = tempfile.mkdtemp()
self.env = db.DBEnv()
self.env.open(homeDir, db.DB_CREATE | db.DB_INIT_MPOOL |
self.env.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL |
db.DB_INIT_LOCK | db.DB_THREAD | self.envFlags)
def tearDown(self):
self.closeDB()
self.env.close()
self.env = None
import glob
files = glob.glob(os.path.join(self.homeDir, '*'))
for file in files:
os.remove(file)
shutil.rmtree(self.homeDir)
def addDataToDB(self, d, txn=None):
for key, value in musicdata.items():

View File

@ -20,7 +20,7 @@ except ImportError:
# For Python 2.3
from bsddb import db
from .test_all import verbose
from bsddb.test.test_all import verbose
DASH = b'-'
letters = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
@ -54,27 +54,21 @@ class BasicTestCase(unittest.TestCase):
def setUp(self):
if self.useEnv:
homeDir = os.path.join(tempfile.gettempdir(), 'db_home')
self.homeDir = homeDir
try:
shutil.rmtree(homeDir)
except OSError as e:
# unix returns ENOENT, windows returns ESRCH
if e.errno not in (errno.ENOENT, errno.ESRCH): raise
os.mkdir(homeDir)
self.homeDir = tempfile.mkdtemp()
try:
self.env = db.DBEnv()
self.env.set_lg_max(1024*1024)
self.env.set_tx_max(30)
self.env.set_tx_timestamp(int(time.time()))
self.env.set_flags(self.envsetflags, 1)
self.env.open(homeDir, self.envflags | db.DB_CREATE)
tempfile.tempdir = homeDir
self.env.open(self.homeDir, self.envflags | db.DB_CREATE)
old_tempfile_tempdir = tempfile.tempdir
tempfile.tempdir = self.homeDir
self.filename = os.path.split(tempfile.mktemp())[1]
tempfile.tempdir = None
tempfile.tempdir = old_tempfile_tempdir
# Yes, a bare except is intended, since we're re-raising the exc.
except:
shutil.rmtree(homeDir)
shutil.rmtree(self.homeDir)
raise
else:
self.env = None

View File

@ -2,10 +2,10 @@
TestCases for python DB Btree key comparison function.
"""
import shutil
import sys, os, re
from io import StringIO
from . import test_all
import tempfile
import unittest
try:
@ -18,15 +18,18 @@ except ImportError:
lexical_cmp = cmp
def lowercase_cmp(left, right):
return cmp (left.lower(), right.lower())
return cmp (str(left, encoding='ascii').lower(),
str(right, encoding='ascii').lower())
def make_reverse_comparator (cmp):
def reverse (left, right, delegate=cmp):
return - delegate (left, right)
return reverse
_expected_lexical_test_data = ['', 'CCCP', 'a', 'aaa', 'b', 'c', 'cccce', 'ccccf']
_expected_lowercase_test_data = ['', 'a', 'aaa', 'b', 'c', 'CC', 'cccce', 'ccccf', 'CCCP']
_expected_lexical_test_data = [bytes(_) for _ in
('', 'CCCP', 'a', 'aaa', 'b', 'c', 'cccce', 'ccccf')]
_expected_lowercase_test_data = [bytes(_) for _ in
('', 'a', 'aaa', 'b', 'c', 'CC', 'cccce', 'ccccf', 'CCCP')]
class ComparatorTests (unittest.TestCase):
def comparator_test_helper (self, comparator, expected_data):
@ -52,15 +55,10 @@ class AbstractBtreeKeyCompareTestCase (unittest.TestCase):
def setUp (self):
self.filename = self.__class__.__name__ + '.db'
homeDir = os.path.join (os.path.dirname (sys.argv[0]), 'db_home')
self.homeDir = homeDir
try:
os.mkdir (homeDir)
except os.error:
pass
self.homeDir = tempfile.mkdtemp()
env = db.DBEnv ()
env.open (homeDir,
env.open (self.homeDir,
db.DB_CREATE | db.DB_INIT_MPOOL
| db.DB_INIT_LOCK | db.DB_THREAD)
self.env = env
@ -70,8 +68,7 @@ class AbstractBtreeKeyCompareTestCase (unittest.TestCase):
if self.env is not None:
self.env.close ()
self.env = None
import glob
map (os.remove, glob.glob (os.path.join (self.homeDir, '*')))
shutil.rmtree(self.homeDir)
def addDataToDB (self, data):
i = 0
@ -110,7 +107,7 @@ class AbstractBtreeKeyCompareTestCase (unittest.TestCase):
self.failUnless (index < len (expected),
"to many values returned from cursor")
self.failUnless (expected[index] == key,
"expected value `%s' at %d but got `%s'"
"expected value %r at %d but got %r"
% (expected[index], index, key))
index = index + 1
rec = curs.next ()
@ -140,10 +137,10 @@ class BtreeKeyCompareTestCase (AbstractBtreeKeyCompareTestCase):
def socialist_comparator (l, r):
return 0
self.createDB (socialist_comparator)
self.addDataToDB (['b', 'a', 'd'])
self.addDataToDB ([b'b', b'a', b'd'])
# all things being equal the first key will be the only key
# in the database... (with the last key's value fwiw)
self.finishTest (['b'])
self.finishTest ([b'b'])
class BtreeExceptionsTestCase (AbstractBtreeKeyCompareTestCase):
@ -247,4 +244,4 @@ def test_suite ():
return res
if __name__ == '__main__':
unittest.main (defaultTest = 'suite')
unittest.main (defaultTest = 'test_suite')

View File

@ -7,7 +7,7 @@ import sys, os
import unittest
import tempfile
from .test_all import verbose
from bsddb.test.test_all import verbose
from bsddb import db, hashopen, btopen, rnopen

View File

@ -1,5 +1,7 @@
import unittest
import sys, os, glob
import shutil
import tempfile
from bsddb import db
@ -11,11 +13,7 @@ class pget_bugTestCase(unittest.TestCase):
db_name = 'test-cursor_pget.db'
def setUp(self):
self.homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home')
try:
os.mkdir(self.homeDir)
except os.error:
pass
self.homeDir = tempfile.mkdtemp()
self.env = db.DBEnv()
self.env.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL)
self.primary_db = db.DB(self.env)
@ -36,9 +34,7 @@ class pget_bugTestCase(unittest.TestCase):
del self.secondary_db
del self.primary_db
del self.env
for file in glob.glob(os.path.join(self.homeDir, '*')):
os.remove(file)
os.removedirs(self.homeDir)
shutil.rmtree(self.homeDir)
def test_pget(self):
cursor = self.secondary_db.cursor()

View File

@ -1,4 +1,5 @@
import shutil
import sys, os
import unittest
import glob
@ -16,23 +17,17 @@ except ImportError:
class dbobjTestCase(unittest.TestCase):
"""Verify that dbobj.DB and dbobj.DBEnv work properly"""
db_home = 'db_home'
db_name = 'test-dbobj.db'
def setUp(self):
homeDir = os.path.join(tempfile.gettempdir(), 'db_home')
self.homeDir = homeDir
try: os.mkdir(homeDir)
except os.error: pass
self.homeDir = tempfile.mkdtemp()
def tearDown(self):
if hasattr(self, 'db'):
del self.db
if hasattr(self, 'env'):
del self.env
files = glob.glob(os.path.join(self.homeDir, '*'))
for file in files:
os.remove(file)
shutil.rmtree(self.homeDir)
def test01_both(self):
class TestDBEnv(dbobj.DBEnv): pass

View File

@ -3,12 +3,13 @@ TestCases for checking dbShelve objects.
"""
import os
import shutil
import tempfile, random
import unittest
from bsddb import db, dbshelve
from .test_all import verbose
from bsddb.test.test_all import verbose
#----------------------------------------------------------------------
@ -78,9 +79,15 @@ class DBShelveTestCase(unittest.TestCase):
print("Running %s.test01_basics..." % self.__class__.__name__)
self.populateDB(self.d)
if verbose:
print(1, self.d.keys())
self.d.sync()
if verbose:
print(2, self.d.keys())
self.do_close()
self.do_open()
if verbose:
print(3, self.d.keys())
d = self.d
l = len(d)
@ -93,15 +100,15 @@ class DBShelveTestCase(unittest.TestCase):
print("keys:", k)
print("stats:", s)
assert 0 == d.has_key(b'bad key')
assert 1 == d.has_key(b'IA')
assert 1 == d.has_key(b'OA')
self.assertFalse(d.has_key(b'bad key'))
self.assertTrue(d.has_key(b'IA'), d.keys())
self.assertTrue(d.has_key(b'OA'))
d.delete(b'IA')
del d[b'OA']
assert 0 == d.has_key(b'IA')
assert 0 == d.has_key(b'OA')
assert len(d) == l-2
self.assertFalse(d.has_key(b'IA'))
self.assertFalse(d.has_key(b'OA'))
self.assertEqual(len(d), l-2)
values = []
for key in d.keys():
@ -112,28 +119,28 @@ class DBShelveTestCase(unittest.TestCase):
self.checkrec(key, value)
dbvalues = sorted(d.values(), key=lambda x: (str(type(x)), x))
assert len(dbvalues) == len(d.keys())
self.assertEqual(len(dbvalues), len(d.keys()))
values.sort(key=lambda x: (str(type(x)), x))
assert values == dbvalues, "%r != %r" % (values, dbvalues)
self.assertEqual(values, dbvalues, "%r != %r" % (values, dbvalues))
items = d.items()
assert len(items) == len(values)
self.assertEqual(len(items), len(values))
for key, value in items:
self.checkrec(key, value)
assert d.get(b'bad key') == None
assert d.get(b'bad key', None) == None
assert d.get(b'bad key', b'a string') == b'a string'
assert d.get(b'bad key', [1, 2, 3]) == [1, 2, 3]
self.assertEqual(d.get(b'bad key'), None)
self.assertEqual(d.get(b'bad key', None), None)
self.assertEqual(d.get(b'bad key', b'a string'), b'a string')
self.assertEqual(d.get(b'bad key', [1, 2, 3]), [1, 2, 3])
d.set_get_returns_none(0)
self.assertRaises(db.DBNotFoundError, d.get, b'bad key')
d.set_get_returns_none(1)
d.put(b'new key', b'new data')
assert d.get(b'new key') == b'new data'
assert d[b'new key'] == b'new data'
self.assertEqual(d.get(b'new key'), b'new data')
self.assertEqual(d[b'new key'], b'new data')
@ -157,7 +164,7 @@ class DBShelveTestCase(unittest.TestCase):
rec = c.next()
del c
assert count == len(d)
self.assertEqual(count, len(d))
count = 0
c = d.cursor()
@ -170,7 +177,7 @@ class DBShelveTestCase(unittest.TestCase):
self.checkrec(key, value)
rec = c.prev()
assert count == len(d)
self.assertEqual(count, len(d))
c.set(b'SS')
key, value = c.current()
@ -233,15 +240,15 @@ class ThreadHashShelveTestCase(BasicShelveTestCase):
#----------------------------------------------------------------------
class BasicEnvShelveTestCase(DBShelveTestCase):
def do_open(self):
self.homeDir = homeDir = os.path.join(
tempfile.gettempdir(), 'db_home')
try: os.mkdir(homeDir)
except os.error: pass
self.env = db.DBEnv()
self.env.open(homeDir, self.envflags | db.DB_INIT_MPOOL | db.DB_CREATE)
def setUp(self):
self.homeDir = tempfile.mkdtemp()
self.filename = 'dbshelve_db_file.db'
self.do_open()
def do_open(self):
self.env = db.DBEnv()
self.env.open(self.homeDir, self.envflags | db.DB_INIT_MPOOL | db.DB_CREATE)
self.filename = os.path.split(self.filename)[1]
self.d = dbshelve.DBShelf(self.env)
self.d.open(self.filename, self.dbtype, self.dbflags)
@ -253,10 +260,7 @@ class BasicEnvShelveTestCase(DBShelveTestCase):
def tearDown(self):
self.do_close()
import glob
files = glob.glob(os.path.join(self.homeDir, '*'))
for file in files:
os.remove(file)
shutil.rmtree(self.homeDir)

View File

@ -16,16 +16,17 @@
# software has been tested, but no warranty is expressed or
# implied.
#
# -- Gregory P. Smith <greg@electricrain.com>
# -- Gregory P. Smith <greg@krypto.org>
#
# $Id$
import shutil
import sys, os, re
import pickle
import tempfile
import unittest
from .test_all import verbose
from bsddb.test.test_all import verbose
try:
# For Pythons w/distutils pybsddb
@ -39,23 +40,16 @@ except ImportError:
#----------------------------------------------------------------------
class TableDBTestCase(unittest.TestCase):
db_home = 'db_home'
db_name = 'test-table.db'
def setUp(self):
homeDir = os.path.join(tempfile.gettempdir(), 'db_home')
self.homeDir = homeDir
try: os.mkdir(homeDir)
except os.error: pass
self.homeDir = tempfile.mkdtemp()
self.tdb = dbtables.bsdTableDB(
filename='tabletest.db', dbhome=homeDir, create=1)
filename='tabletest.db', dbhome=self.homeDir, create=1)
def tearDown(self):
self.tdb.close()
import glob
files = glob.glob(os.path.join(self.homeDir, '*'))
for file in files:
os.remove(file)
shutil.rmtree(self.homeDir)
def test01(self):
tabname = "test01"

View File

@ -3,6 +3,7 @@ is closed before its DB objects.
"""
import os
import shutil
import sys
import tempfile
import glob
@ -15,7 +16,7 @@ except ImportError:
# For Python 2.3
from bsddb import db
from .test_all import verbose
from bsddb.test.test_all import verbose
# We're going to get warnings in this module about trying to close the db when
# its env is already closed. Let's just ignore those.
@ -33,17 +34,14 @@ else:
class DBEnvClosedEarlyCrash(unittest.TestCase):
def setUp(self):
self.homeDir = os.path.join(tempfile.gettempdir(), 'db_home')
try: os.mkdir(self.homeDir)
except os.error: pass
self.homeDir = tempfile.mkdtemp()
old_tempfile_tempdir = tempfile.tempdir
tempfile.tempdir = self.homeDir
self.filename = os.path.split(tempfile.mktemp())[1]
tempfile.tempdir = None
tempfile.tempdir = old_tempfile_tempdir
def tearDown(self):
files = glob.glob(os.path.join(self.homeDir, '*'))
for file in files:
os.remove(file)
shutil.rmtree(self.homeDir)
def test01_close_dbenv_before_db(self):

View File

@ -9,7 +9,7 @@ import unittest
from bsddb import db
from .test_all import verbose
from bsddb.test.test_all import verbose
letters = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'

View File

@ -1,6 +1,7 @@
"""TestCases for using the DB.join and DBCursor.join_item methods.
"""
import shutil
import sys, os
import tempfile
import time
@ -13,7 +14,7 @@ except ImportError:
have_threads = 0
import unittest
from .test_all import verbose
from bsddb.test.test_all import verbose
from bsddb import db, dbshelve, StringKeys
@ -47,19 +48,13 @@ class JoinTestCase(unittest.TestCase):
def setUp(self):
self.filename = self.__class__.__name__ + '.db'
homeDir = os.path.join(tempfile.gettempdir(), 'db_home')
self.homeDir = homeDir
try: os.mkdir(homeDir)
except os.error: pass
self.homeDir = tempfile.mkdtemp()
self.env = db.DBEnv()
self.env.open(homeDir, db.DB_CREATE | db.DB_INIT_MPOOL | db.DB_INIT_LOCK )
self.env.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL | db.DB_INIT_LOCK )
def tearDown(self):
self.env.close()
import glob
files = glob.glob(os.path.join(self.homeDir, '*'))
for file in files:
os.remove(file)
shutil.rmtree(self.homeDir)
def test01_join(self):
if verbose:

View File

@ -2,6 +2,7 @@
TestCases for testing the locking sub-system.
"""
import shutil
import sys, os
import tempfile
import time
@ -15,7 +16,7 @@ except ImportError:
import unittest
from .test_all import verbose
from bsddb.test.test_all import verbose
try:
# For Pythons w/distutils pybsddb
@ -30,21 +31,15 @@ except ImportError:
class LockingTestCase(unittest.TestCase):
def setUp(self):
homeDir = os.path.join(tempfile.gettempdir(), 'db_home')
self.homeDir = homeDir
try: os.mkdir(homeDir)
except os.error: pass
self.homeDir = tempfile.mkdtemp()
self.env = db.DBEnv()
self.env.open(homeDir, db.DB_THREAD | db.DB_INIT_MPOOL |
self.env.open(self.homeDir, db.DB_THREAD | db.DB_INIT_MPOOL |
db.DB_INIT_LOCK | db.DB_CREATE)
def tearDown(self):
self.env.close()
import glob
files = glob.glob(os.path.join(self.homeDir, '*'))
for file in files:
os.remove(file)
shutil.rmtree(self.homeDir)
def test01_simple(self):

View File

@ -2,6 +2,7 @@
"""
import os
import shutil
import sys
import unittest
import tempfile
@ -18,22 +19,14 @@ except ImportError:
class MiscTestCase(unittest.TestCase):
def setUp(self):
self.filename = self.__class__.__name__ + '.db'
homeDir = os.path.join(tempfile.gettempdir(), 'db_home')
self.homeDir = homeDir
try:
os.mkdir(homeDir)
except OSError:
pass
self.homeDir = tempfile.mkdtemp()
def tearDown(self):
try:
os.remove(self.filename)
except OSError:
pass
import glob
files = glob.glob(os.path.join(self.homeDir, '*'))
for file in files:
os.remove(file)
shutil.rmtree(self.homeDir)
def test01_badpointer(self):
dbs = dbshelve.open(self.filename)

View File

@ -1,6 +1,8 @@
import shutil
import sys, os
import pickle
import tempfile
import unittest
import glob
@ -16,33 +18,27 @@ except ImportError as e:
class pickleTestCase(unittest.TestCase):
"""Verify that DBError can be pickled and unpickled"""
db_home = 'db_home'
db_name = 'test-dbobj.db'
def setUp(self):
homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home')
self.homeDir = homeDir
try: os.mkdir(homeDir)
except os.error: pass
self.homeDir = tempfile.mkdtemp()
def tearDown(self):
if hasattr(self, 'db'):
del self.db
if hasattr(self, 'env'):
del self.env
files = glob.glob(os.path.join(self.homeDir, '*'))
for file in files:
os.remove(file)
shutil.rmtree(self.homeDir)
def _base_test_pickle_DBError(self, pickle):
self.env = db.DBEnv()
self.env.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL)
self.db = db.DB(self.env)
self.db.open(self.db_name, db.DB_HASH, db.DB_CREATE)
self.db.put('spam', 'eggs')
assert self.db['spam'] == 'eggs'
self.db.put(b'spam', b'eggs')
self.assertEqual(self.db[b'spam'], b'eggs')
try:
self.db.put('spam', 'ham', flags=db.DB_NOOVERWRITE)
self.db.put(b'spam', b'ham', flags=db.DB_NOOVERWRITE)
except db.DBError as egg:
pickledEgg = pickle.dumps(egg)
#print repr(pickledEgg)
@ -50,7 +46,7 @@ class pickleTestCase(unittest.TestCase):
if rottenEgg.args != egg.args or type(rottenEgg) != type(egg):
raise Exception(rottenEgg, '!=', egg)
else:
raise Exception("where's my DBError exception?!?")
self.fail("where's my DBError exception?!?")
self.db.close()
self.env.close()

View File

@ -14,7 +14,7 @@ except ImportError:
# For Python 2.3
from bsddb import db
from .test_all import verbose
from bsddb.test.test_all import verbose
letters = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'

View File

@ -2,13 +2,14 @@
"""
import os
import shutil
import sys
import errno
import tempfile
from pprint import pprint
import unittest
from .test_all import verbose
from bsddb.test.test_all import verbose
try:
# For Pythons w/distutils pybsddb
@ -25,12 +26,14 @@ letters = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
class SimpleRecnoTestCase(unittest.TestCase):
def setUp(self):
self.filename = tempfile.mktemp()
self.homeDir = tempfile.mkdtemp()
def tearDown(self):
try:
os.remove(self.filename)
except OSError as e:
if e.errno != errno.EEXIST: raise
shutil.rmtree(self.homeDir)
def test01_basic(self):
d = db.DB()
@ -203,10 +206,7 @@ class SimpleRecnoTestCase(unittest.TestCase):
just a line in the file, but you can set a different record delimiter
if needed.
"""
homeDir = os.path.join(tempfile.gettempdir(), 'db_home')
source = os.path.join(homeDir, 'test_recno.txt')
if not os.path.isdir(homeDir):
os.mkdir(homeDir)
source = os.path.join(self.homeDir, 'test_recno.txt')
f = open(source, 'w') # create the file
f.close()

View File

@ -1,5 +1,6 @@
import unittest
import os
import shutil
import sys
import tempfile
import glob
@ -10,20 +11,17 @@ try:
except ImportError:
from bsddb import db
from .test_all import verbose
from bsddb.test.test_all import verbose
class DBSequenceTest(unittest.TestCase):
def setUp(self):
self.int_32_max = 0x100000000
self.homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home')
try:
os.mkdir(self.homeDir)
except os.error:
pass
self.homeDir = tempfile.mkdtemp()
old_tempfile_tempdir = tempfile.tempdir
tempfile.tempdir = self.homeDir
self.filename = os.path.split(tempfile.mktemp())[1]
tempfile.tempdir = None
tempfile.tempdir = old_tempfile_tempdir
self.dbenv = db.DBEnv()
self.dbenv.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL, 0o666)
@ -41,9 +39,7 @@ class DBSequenceTest(unittest.TestCase):
self.dbenv.close()
del self.dbenv
files = glob.glob(os.path.join(self.homeDir, '*'))
for file in files:
os.remove(file)
shutil.rmtree(self.homeDir)
def test_get(self):
self.seq = db.DBSequence(self.d, flags=0)

View File

@ -25,7 +25,7 @@ except NameError:
pass
import unittest
from .test_all import verbose
from bsddb.test.test_all import verbose
try:
# For Pythons w/distutils pybsddb
@ -47,15 +47,10 @@ class BaseThreadedTestCase(unittest.TestCase):
if verbose:
dbutils._deadlock_VerboseFile = sys.stdout
homeDir = os.path.join(tempfile.gettempdir(), 'db_home')
self.homeDir = homeDir
try:
os.mkdir(homeDir)
except OSError as e:
if e.errno != errno.EEXIST: raise
self.homeDir = tempfile.mkdtemp()
self.env = db.DBEnv()
self.setEnvOpts()
self.env.open(homeDir, self.envflags | db.DB_CREATE)
self.env.open(self.homeDir, self.envflags | db.DB_CREATE)
self.filename = self.__class__.__name__ + '.db'
self.d = db.DB(self.env)
@ -74,6 +69,28 @@ class BaseThreadedTestCase(unittest.TestCase):
def makeData(self, key):
return DASH.join([key] * 5)
def _writerThread(self, *args, **kwargs):
raise RuntimeError("must override this in a subclass")
def _readerThread(self, *args, **kwargs):
raise RuntimeError("must override this in a subclass")
def writerThread(self, *args, **kwargs):
try:
self._writerThread(*args, **kwargs)
except db.DBLockDeadlockError:
if verbose:
print(currentThread().getName(), 'died from', e)
def readerThread(self, *args, **kwargs):
try:
self._readerThread(*args, **kwargs)
except db.DBLockDeadlockError as e:
if verbose:
print(currentThread().getName(), 'died from', e)
#----------------------------------------------------------------------
@ -111,7 +128,7 @@ class ConcurrentDataStoreBase(BaseThreadedTestCase):
for t in threads:
t.join()
def writerThread(self, d, howMany, writerNum):
def _writerThread(self, d, howMany, writerNum):
#time.sleep(0.01 * writerNum + 0.01)
name = currentThread().getName()
start = howMany * writerNum
@ -122,7 +139,7 @@ class ConcurrentDataStoreBase(BaseThreadedTestCase):
for x in range(start, stop):
key = ('%04d' % x).encode("ascii")
dbutils.DeadlockWrap(d.put, key, self.makeData(key),
max_retries=12)
max_retries=20)
if verbose and x % 100 == 0:
print("%s: records %d - %d finished" % (name, start, x))
@ -143,7 +160,7 @@ class ConcurrentDataStoreBase(BaseThreadedTestCase):
if verbose:
print("%s: thread finished" % name)
def readerThread(self, d, readerNum):
def _readerThread(self, d, readerNum):
time.sleep(0.01 * readerNum)
name = currentThread().getName()
@ -215,7 +232,7 @@ class SimpleThreadedBase(BaseThreadedTestCase):
for t in threads:
t.join()
def writerThread(self, d, howMany, writerNum):
def _writerThread(self, d, howMany, writerNum):
name = currentThread().getName()
start = howMany * writerNum
stop = howMany * (writerNum + 1) - 1
@ -226,7 +243,7 @@ class SimpleThreadedBase(BaseThreadedTestCase):
for x in range(start, stop):
key = ('%04d' % x).encode("ascii")
dbutils.DeadlockWrap(d.put, key, self.makeData(key),
max_retries=12)
max_retries=20)
if verbose and x % 100 == 0:
print("%s: records %d - %d finished" % (name, start, x))
@ -235,12 +252,12 @@ class SimpleThreadedBase(BaseThreadedTestCase):
if random() <= 0.05:
for y in range(start, x):
key = ('%04d' % x).encode("ascii")
data = dbutils.DeadlockWrap(d.get, key, max_retries=12)
data = dbutils.DeadlockWrap(d.get, key, max_retries=20)
self.assertEqual(data, self.makeData(key))
# flush them
try:
dbutils.DeadlockWrap(d.sync, max_retries=12)
dbutils.DeadlockWrap(d.sync, max_retries=20)
except db.DBIncompleteError as val:
if verbose:
print("could not complete sync()...")
@ -248,31 +265,31 @@ class SimpleThreadedBase(BaseThreadedTestCase):
# read them back, deleting a few
for x in range(start, stop):
key = ('%04d' % x).encode("ascii")
data = dbutils.DeadlockWrap(d.get, key, max_retries=12)
data = dbutils.DeadlockWrap(d.get, key, max_retries=20)
if verbose and x % 100 == 0:
print("%s: fetched record (%s, %s)" % (name, key, data))
self.assertEqual(data, self.makeData(key))
if random() <= 0.10:
dbutils.DeadlockWrap(d.delete, key, max_retries=12)
dbutils.DeadlockWrap(d.delete, key, max_retries=20)
if verbose:
print("%s: deleted record %s" % (name, key))
if verbose:
print("%s: thread finished" % name)
def readerThread(self, d, readerNum):
def _readerThread(self, d, readerNum):
time.sleep(0.01 * readerNum)
name = currentThread().getName()
for loop in range(5):
c = d.cursor()
count = 0
rec = dbutils.DeadlockWrap(c.first, max_retries=10)
rec = dbutils.DeadlockWrap(c.first, max_retries=20)
while rec:
count += 1
key, data = rec
self.assertEqual(self.makeData(key), data)
rec = dbutils.DeadlockWrap(c.next, max_retries=10)
rec = dbutils.DeadlockWrap(c.next, max_retries=20)
if verbose:
print("%s: found %d records" % (name, count))
c.close()
@ -360,7 +377,7 @@ class ThreadedTransactionsBase(BaseThreadedTestCase):
txn.abort()
time.sleep(0.05)
def writerThread(self, d, howMany, writerNum):
def _writerThread(self, d, howMany, writerNum):
name = currentThread().getName()
start = howMany * writerNum
stop = howMany * (writerNum + 1) - 1
@ -401,7 +418,7 @@ class ThreadedTransactionsBase(BaseThreadedTestCase):
if verbose:
print("%s: thread finished" % name)
def readerThread(self, d, readerNum):
def _readerThread(self, d, readerNum):
time.sleep(0.01 * readerNum + 0.05)
name = currentThread().getName()

View File

@ -4,20 +4,21 @@ Run all test cases.
"""
import sys
import unittest
from test.test_support import requires, verbose, run_unittest, unlink
import test.test_support
from test.test_support import requires, run_unittest, unlink
# When running as a script instead of within the regrtest framework, skip the
# requires test, since it's obvious we want to run them.
if __name__ != '__main__':
requires('bsddb')
verbose = False
import bsddb.test.test_all
if 'verbose' in sys.argv:
verbose = True
bsddb.test.test_all.verbose = 1
sys.argv.remove('verbose')
if 'silent' in sys.argv: # take care of old flag, just in case
verbose = False
bsddb.test.test_all.verbose = 0
sys.argv.remove('silent')
@ -25,14 +26,15 @@ def suite():
try:
# this is special, it used to segfault the interpreter
import bsddb.test.test_1413192
except:
for f in ['__db.001', '__db.002', '__db.003', 'log.0000000001']:
finally:
for f in ['xxx.db','__db.001','__db.002','__db.003','log.0000000001']:
unlink(f)
test_modules = [
'test_associate',
'test_basics',
'test_compat',
'test_compare',
'test_dbobj',
'test_dbshelve',
'test_dbtables',
@ -41,6 +43,7 @@ def suite():
'test_join',
'test_lock',
'test_misc',
'test_pickle',
'test_queue',
'test_recno',
'test_thread',