623 lines
19 KiB
Python
623 lines
19 KiB
Python
"""Run all test cases.
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import unittest
|
|
try:
|
|
# For Pythons w/distutils pybsddb
|
|
import bsddb3 as bsddb
|
|
except ImportError:
|
|
# For Python 2.3
|
|
import bsddb
|
|
|
|
|
|
if sys.version_info[0] >= 3 :
|
|
charset = "iso8859-1" # Full 8 bit
|
|
|
|
class logcursor_py3k(object) :
|
|
def __init__(self, env) :
|
|
self._logcursor = env.log_cursor()
|
|
|
|
def __getattr__(self, v) :
|
|
return getattr(self._logcursor, v)
|
|
|
|
def __next__(self) :
|
|
v = getattr(self._logcursor, "next")()
|
|
if v is not None :
|
|
v = (v[0], v[1].decode(charset))
|
|
return v
|
|
|
|
next = __next__
|
|
|
|
def first(self) :
|
|
v = self._logcursor.first()
|
|
if v is not None :
|
|
v = (v[0], v[1].decode(charset))
|
|
return v
|
|
|
|
def last(self) :
|
|
v = self._logcursor.last()
|
|
if v is not None :
|
|
v = (v[0], v[1].decode(charset))
|
|
return v
|
|
|
|
def prev(self) :
|
|
v = self._logcursor.prev()
|
|
if v is not None :
|
|
v = (v[0], v[1].decode(charset))
|
|
return v
|
|
|
|
def current(self) :
|
|
v = self._logcursor.current()
|
|
if v is not None :
|
|
v = (v[0], v[1].decode(charset))
|
|
return v
|
|
|
|
def set(self, lsn) :
|
|
v = self._logcursor.set(lsn)
|
|
if v is not None :
|
|
v = (v[0], v[1].decode(charset))
|
|
return v
|
|
|
|
class cursor_py3k(object) :
|
|
def __init__(self, db, *args, **kwargs) :
|
|
self._dbcursor = db.cursor(*args, **kwargs)
|
|
|
|
def __getattr__(self, v) :
|
|
return getattr(self._dbcursor, v)
|
|
|
|
def _fix(self, v) :
|
|
if v is None : return None
|
|
key, value = v
|
|
if isinstance(key, bytes) :
|
|
key = key.decode(charset)
|
|
return (key, value.decode(charset))
|
|
|
|
def __next__(self) :
|
|
v = getattr(self._dbcursor, "next")()
|
|
return self._fix(v)
|
|
|
|
next = __next__
|
|
|
|
def previous(self) :
|
|
v = self._dbcursor.previous()
|
|
return self._fix(v)
|
|
|
|
def last(self) :
|
|
v = self._dbcursor.last()
|
|
return self._fix(v)
|
|
|
|
def set(self, k) :
|
|
if isinstance(k, str) :
|
|
k = bytes(k, charset)
|
|
v = self._dbcursor.set(k)
|
|
return self._fix(v)
|
|
|
|
def set_recno(self, num) :
|
|
v = self._dbcursor.set_recno(num)
|
|
return self._fix(v)
|
|
|
|
def set_range(self, k, dlen=-1, doff=-1) :
|
|
if isinstance(k, str) :
|
|
k = bytes(k, charset)
|
|
v = self._dbcursor.set_range(k, dlen=dlen, doff=doff)
|
|
return self._fix(v)
|
|
|
|
def dup(self, flags=0) :
|
|
cursor = self._dbcursor.dup(flags)
|
|
return dup_cursor_py3k(cursor)
|
|
|
|
def next_dup(self) :
|
|
v = self._dbcursor.next_dup()
|
|
return self._fix(v)
|
|
|
|
def next_nodup(self) :
|
|
v = self._dbcursor.next_nodup()
|
|
return self._fix(v)
|
|
|
|
def put(self, key, data, flags=0, dlen=-1, doff=-1) :
|
|
if isinstance(key, str) :
|
|
key = bytes(key, charset)
|
|
if isinstance(data, str) :
|
|
value = bytes(data, charset)
|
|
return self._dbcursor.put(key, data, flags=flags, dlen=dlen,
|
|
doff=doff)
|
|
|
|
def current(self, flags=0, dlen=-1, doff=-1) :
|
|
v = self._dbcursor.current(flags=flags, dlen=dlen, doff=doff)
|
|
return self._fix(v)
|
|
|
|
def first(self) :
|
|
v = self._dbcursor.first()
|
|
return self._fix(v)
|
|
|
|
def pget(self, key=None, data=None, flags=0) :
|
|
# Incorrect because key can be a bare number,
|
|
# but enough to pass testsuite
|
|
if isinstance(key, int) and (data is None) and (flags == 0) :
|
|
flags = key
|
|
key = None
|
|
if isinstance(key, str) :
|
|
key = bytes(key, charset)
|
|
if isinstance(data, int) and (flags==0) :
|
|
flags = data
|
|
data = None
|
|
if isinstance(data, str) :
|
|
data = bytes(data, charset)
|
|
v=self._dbcursor.pget(key=key, data=data, flags=flags)
|
|
if v is not None :
|
|
v1, v2, v3 = v
|
|
if isinstance(v1, bytes) :
|
|
v1 = v1.decode(charset)
|
|
if isinstance(v2, bytes) :
|
|
v2 = v2.decode(charset)
|
|
|
|
v = (v1, v2, v3.decode(charset))
|
|
|
|
return v
|
|
|
|
def join_item(self) :
|
|
v = self._dbcursor.join_item()
|
|
if v is not None :
|
|
v = v.decode(charset)
|
|
return v
|
|
|
|
def get(self, *args, **kwargs) :
|
|
l = len(args)
|
|
if l == 2 :
|
|
k, f = args
|
|
if isinstance(k, str) :
|
|
k = bytes(k, "iso8859-1")
|
|
args = (k, f)
|
|
elif l == 3 :
|
|
k, d, f = args
|
|
if isinstance(k, str) :
|
|
k = bytes(k, charset)
|
|
if isinstance(d, str) :
|
|
d = bytes(d, charset)
|
|
args =(k, d, f)
|
|
|
|
v = self._dbcursor.get(*args, **kwargs)
|
|
if v is not None :
|
|
k, v = v
|
|
if isinstance(k, bytes) :
|
|
k = k.decode(charset)
|
|
v = (k, v.decode(charset))
|
|
return v
|
|
|
|
def get_both(self, key, value) :
|
|
if isinstance(key, str) :
|
|
key = bytes(key, charset)
|
|
if isinstance(value, str) :
|
|
value = bytes(value, charset)
|
|
v=self._dbcursor.get_both(key, value)
|
|
return self._fix(v)
|
|
|
|
class dup_cursor_py3k(cursor_py3k) :
|
|
def __init__(self, dbcursor) :
|
|
self._dbcursor = dbcursor
|
|
|
|
class DB_py3k(object) :
|
|
def __init__(self, *args, **kwargs) :
|
|
args2=[]
|
|
for i in args :
|
|
if isinstance(i, DBEnv_py3k) :
|
|
i = i._dbenv
|
|
args2.append(i)
|
|
args = tuple(args2)
|
|
for k, v in kwargs.items() :
|
|
if isinstance(v, DBEnv_py3k) :
|
|
kwargs[k] = v._dbenv
|
|
|
|
self._db = bsddb._db.DB_orig(*args, **kwargs)
|
|
|
|
def __contains__(self, k) :
|
|
if isinstance(k, str) :
|
|
k = bytes(k, charset)
|
|
return getattr(self._db, "has_key")(k)
|
|
|
|
def __getitem__(self, k) :
|
|
if isinstance(k, str) :
|
|
k = bytes(k, charset)
|
|
v = self._db[k]
|
|
if v is not None :
|
|
v = v.decode(charset)
|
|
return v
|
|
|
|
def __setitem__(self, k, v) :
|
|
if isinstance(k, str) :
|
|
k = bytes(k, charset)
|
|
if isinstance(v, str) :
|
|
v = bytes(v, charset)
|
|
self._db[k] = v
|
|
|
|
def __delitem__(self, k) :
|
|
if isinstance(k, str) :
|
|
k = bytes(k, charset)
|
|
del self._db[k]
|
|
|
|
def __getattr__(self, v) :
|
|
return getattr(self._db, v)
|
|
|
|
def __len__(self) :
|
|
return len(self._db)
|
|
|
|
def has_key(self, k, txn=None) :
|
|
if isinstance(k, str) :
|
|
k = bytes(k, charset)
|
|
return self._db.has_key(k, txn=txn)
|
|
|
|
def set_re_delim(self, c) :
|
|
if isinstance(c, str) : # We can use a numeric value byte too
|
|
c = bytes(c, charset)
|
|
return self._db.set_re_delim(c)
|
|
|
|
def set_re_pad(self, c) :
|
|
if isinstance(c, str) : # We can use a numeric value byte too
|
|
c = bytes(c, charset)
|
|
return self._db.set_re_pad(c)
|
|
|
|
def get_re_source(self) :
|
|
source = self._db.get_re_source()
|
|
return source.decode(charset)
|
|
|
|
def put(self, key, data, txn=None, flags=0, dlen=-1, doff=-1) :
|
|
if isinstance(key, str) :
|
|
key = bytes(key, charset)
|
|
if isinstance(data, str) :
|
|
value = bytes(data, charset)
|
|
return self._db.put(key, data, flags=flags, txn=txn, dlen=dlen,
|
|
doff=doff)
|
|
|
|
def append(self, value, txn=None) :
|
|
if isinstance(value, str) :
|
|
value = bytes(value, charset)
|
|
return self._db.append(value, txn=txn)
|
|
|
|
def get_size(self, key) :
|
|
if isinstance(key, str) :
|
|
key = bytes(key, charset)
|
|
return self._db.get_size(key)
|
|
|
|
def exists(self, key, *args, **kwargs) :
|
|
if isinstance(key, str) :
|
|
key = bytes(key, charset)
|
|
return self._db.exists(key, *args, **kwargs)
|
|
|
|
def get(self, key, default="MagicCookie", txn=None, flags=0, dlen=-1, doff=-1) :
|
|
if isinstance(key, str) :
|
|
key = bytes(key, charset)
|
|
if default != "MagicCookie" : # Magic for 'test_get_none.py'
|
|
v=self._db.get(key, default=default, txn=txn, flags=flags,
|
|
dlen=dlen, doff=doff)
|
|
else :
|
|
v=self._db.get(key, txn=txn, flags=flags,
|
|
dlen=dlen, doff=doff)
|
|
if (v is not None) and isinstance(v, bytes) :
|
|
v = v.decode(charset)
|
|
return v
|
|
|
|
def pget(self, key, txn=None) :
|
|
if isinstance(key, str) :
|
|
key = bytes(key, charset)
|
|
v=self._db.pget(key, txn=txn)
|
|
if v is not None :
|
|
v1, v2 = v
|
|
if isinstance(v1, bytes) :
|
|
v1 = v1.decode(charset)
|
|
|
|
v = (v1, v2.decode(charset))
|
|
return v
|
|
|
|
def get_both(self, key, value, txn=None, flags=0) :
|
|
if isinstance(key, str) :
|
|
key = bytes(key, charset)
|
|
if isinstance(value, str) :
|
|
value = bytes(value, charset)
|
|
v=self._db.get_both(key, value, txn=txn, flags=flags)
|
|
if v is not None :
|
|
v = v.decode(charset)
|
|
return v
|
|
|
|
def delete(self, key, txn=None) :
|
|
if isinstance(key, str) :
|
|
key = bytes(key, charset)
|
|
return self._db.delete(key, txn=txn)
|
|
|
|
def keys(self) :
|
|
k = self._db.keys()
|
|
if len(k) and isinstance(k[0], bytes) :
|
|
return [i.decode(charset) for i in self._db.keys()]
|
|
else :
|
|
return k
|
|
|
|
def items(self) :
|
|
data = self._db.items()
|
|
if not len(data) : return data
|
|
data2 = []
|
|
for k, v in data :
|
|
if isinstance(k, bytes) :
|
|
k = k.decode(charset)
|
|
data2.append((k, v.decode(charset)))
|
|
return data2
|
|
|
|
def associate(self, secondarydb, callback, flags=0, txn=None) :
|
|
class associate_callback(object) :
|
|
def __init__(self, callback) :
|
|
self._callback = callback
|
|
|
|
def callback(self, key, data) :
|
|
if isinstance(key, str) :
|
|
key = key.decode(charset)
|
|
data = data.decode(charset)
|
|
key = self._callback(key, data)
|
|
if (key != bsddb._db.DB_DONOTINDEX) :
|
|
if isinstance(key, str) :
|
|
key = bytes(key, charset)
|
|
elif isinstance(key, list) :
|
|
key2 = []
|
|
for i in key :
|
|
if isinstance(i, str) :
|
|
i = bytes(i, charset)
|
|
key2.append(i)
|
|
key = key2
|
|
return key
|
|
|
|
return self._db.associate(secondarydb._db,
|
|
associate_callback(callback).callback, flags=flags,
|
|
txn=txn)
|
|
|
|
def cursor(self, txn=None, flags=0) :
|
|
return cursor_py3k(self._db, txn=txn, flags=flags)
|
|
|
|
def join(self, cursor_list) :
|
|
cursor_list = [i._dbcursor for i in cursor_list]
|
|
return dup_cursor_py3k(self._db.join(cursor_list))
|
|
|
|
class DBEnv_py3k(object) :
|
|
def __init__(self, *args, **kwargs) :
|
|
self._dbenv = bsddb._db.DBEnv_orig(*args, **kwargs)
|
|
|
|
def __getattr__(self, v) :
|
|
return getattr(self._dbenv, v)
|
|
|
|
def log_cursor(self, flags=0) :
|
|
return logcursor_py3k(self._dbenv)
|
|
|
|
def get_lg_dir(self) :
|
|
return self._dbenv.get_lg_dir().decode(charset)
|
|
|
|
def get_tmp_dir(self) :
|
|
return self._dbenv.get_tmp_dir().decode(charset)
|
|
|
|
def get_data_dirs(self) :
|
|
# Have to use a list comprehension and not
|
|
# generators, because we are supporting Python 2.3.
|
|
return tuple(
|
|
[i.decode(charset) for i in self._dbenv.get_data_dirs()])
|
|
|
|
class DBSequence_py3k(object) :
|
|
def __init__(self, db, *args, **kwargs) :
|
|
self._db=db
|
|
self._dbsequence = bsddb._db.DBSequence_orig(db._db, *args, **kwargs)
|
|
|
|
def __getattr__(self, v) :
|
|
return getattr(self._dbsequence, v)
|
|
|
|
def open(self, key, *args, **kwargs) :
|
|
return self._dbsequence.open(bytes(key, charset), *args, **kwargs)
|
|
|
|
def get_key(self) :
|
|
return self._dbsequence.get_key().decode(charset)
|
|
|
|
def get_dbp(self) :
|
|
return self._db
|
|
|
|
import string
|
|
string.letters=[chr(i) for i in xrange(65,91)]
|
|
|
|
bsddb._db.DBEnv_orig = bsddb._db.DBEnv
|
|
bsddb._db.DB_orig = bsddb._db.DB
|
|
if bsddb.db.version() <= (4, 3) :
|
|
bsddb._db.DBSequence_orig = None
|
|
else :
|
|
bsddb._db.DBSequence_orig = bsddb._db.DBSequence
|
|
|
|
def do_proxy_db_py3k(flag) :
|
|
flag2 = do_proxy_db_py3k.flag
|
|
do_proxy_db_py3k.flag = flag
|
|
if flag :
|
|
bsddb.DBEnv = bsddb.db.DBEnv = bsddb._db.DBEnv = DBEnv_py3k
|
|
bsddb.DB = bsddb.db.DB = bsddb._db.DB = DB_py3k
|
|
bsddb._db.DBSequence = DBSequence_py3k
|
|
else :
|
|
bsddb.DBEnv = bsddb.db.DBEnv = bsddb._db.DBEnv = bsddb._db.DBEnv_orig
|
|
bsddb.DB = bsddb.db.DB = bsddb._db.DB = bsddb._db.DB_orig
|
|
bsddb._db.DBSequence = bsddb._db.DBSequence_orig
|
|
return flag2
|
|
|
|
do_proxy_db_py3k.flag = False
|
|
do_proxy_db_py3k(True)
|
|
|
|
try:
|
|
# For Pythons w/distutils pybsddb
|
|
from bsddb3 import db, dbtables, dbutils, dbshelve, \
|
|
hashopen, btopen, rnopen, dbobj
|
|
except ImportError:
|
|
# For Python 2.3
|
|
from bsddb import db, dbtables, dbutils, dbshelve, \
|
|
hashopen, btopen, rnopen, dbobj
|
|
|
|
try:
|
|
from bsddb3 import test_support
|
|
except ImportError:
|
|
if sys.version_info[0] < 3 :
|
|
from test import test_support
|
|
else :
|
|
from test import support as test_support
|
|
|
|
|
|
try:
|
|
if sys.version_info[0] < 3 :
|
|
from threading import Thread, currentThread
|
|
del Thread, currentThread
|
|
else :
|
|
from threading import Thread, current_thread
|
|
del Thread, current_thread
|
|
have_threads = True
|
|
except ImportError:
|
|
have_threads = False
|
|
|
|
verbose = 0
|
|
if 'verbose' in sys.argv:
|
|
verbose = 1
|
|
sys.argv.remove('verbose')
|
|
|
|
if 'silent' in sys.argv: # take care of old flag, just in case
|
|
verbose = 0
|
|
sys.argv.remove('silent')
|
|
|
|
|
|
def print_versions():
|
|
print
|
|
print '-=' * 38
|
|
print db.DB_VERSION_STRING
|
|
print 'bsddb.db.version(): %s' % (db.version(), )
|
|
print 'bsddb.db.__version__: %s' % db.__version__
|
|
print 'bsddb.db.cvsid: %s' % db.cvsid
|
|
|
|
# Workaround for allowing generating an EGGs as a ZIP files.
|
|
suffix="__"
|
|
print 'py module: %s' % getattr(bsddb, "__file"+suffix)
|
|
print 'extension module: %s' % getattr(bsddb, "__file"+suffix)
|
|
|
|
print 'python version: %s' % sys.version
|
|
print 'My pid: %s' % os.getpid()
|
|
print '-=' * 38
|
|
|
|
|
|
def get_new_path(name) :
|
|
get_new_path.mutex.acquire()
|
|
try :
|
|
import os
|
|
path=os.path.join(get_new_path.prefix,
|
|
name+"_"+str(os.getpid())+"_"+str(get_new_path.num))
|
|
get_new_path.num+=1
|
|
finally :
|
|
get_new_path.mutex.release()
|
|
return path
|
|
|
|
def get_new_environment_path() :
|
|
path=get_new_path("environment")
|
|
import os
|
|
try:
|
|
os.makedirs(path,mode=0700)
|
|
except os.error:
|
|
test_support.rmtree(path)
|
|
os.makedirs(path)
|
|
return path
|
|
|
|
def get_new_database_path() :
|
|
path=get_new_path("database")
|
|
import os
|
|
if os.path.exists(path) :
|
|
os.remove(path)
|
|
return path
|
|
|
|
|
|
# This path can be overriden via "set_test_path_prefix()".
|
|
import os, os.path
|
|
get_new_path.prefix=os.path.join(os.sep,"tmp","z-Berkeley_DB")
|
|
get_new_path.num=0
|
|
|
|
def get_test_path_prefix() :
|
|
return get_new_path.prefix
|
|
|
|
def set_test_path_prefix(path) :
|
|
get_new_path.prefix=path
|
|
|
|
def remove_test_path_directory() :
|
|
test_support.rmtree(get_new_path.prefix)
|
|
|
|
if have_threads :
|
|
import threading
|
|
get_new_path.mutex=threading.Lock()
|
|
del threading
|
|
else :
|
|
class Lock(object) :
|
|
def acquire(self) :
|
|
pass
|
|
def release(self) :
|
|
pass
|
|
get_new_path.mutex=Lock()
|
|
del Lock
|
|
|
|
|
|
|
|
class PrintInfoFakeTest(unittest.TestCase):
|
|
def testPrintVersions(self):
|
|
print_versions()
|
|
|
|
|
|
# This little hack is for when this module is run as main and all the
|
|
# other modules import it so they will still be able to get the right
|
|
# verbose setting. It's confusing but it works.
|
|
if sys.version_info[0] < 3 :
|
|
import test_all
|
|
test_all.verbose = verbose
|
|
else :
|
|
import sys
|
|
print >>sys.stderr, "Work to do!"
|
|
|
|
|
|
def suite(module_prefix='', timing_check=None):
|
|
test_modules = [
|
|
'test_associate',
|
|
'test_basics',
|
|
'test_dbenv',
|
|
'test_db',
|
|
'test_compare',
|
|
'test_compat',
|
|
'test_cursor_pget_bug',
|
|
'test_dbobj',
|
|
'test_dbshelve',
|
|
'test_dbtables',
|
|
'test_distributed_transactions',
|
|
'test_early_close',
|
|
'test_fileid',
|
|
'test_get_none',
|
|
'test_join',
|
|
'test_lock',
|
|
'test_misc',
|
|
'test_pickle',
|
|
'test_queue',
|
|
'test_recno',
|
|
'test_replication',
|
|
'test_sequence',
|
|
'test_thread',
|
|
]
|
|
|
|
alltests = unittest.TestSuite()
|
|
for name in test_modules:
|
|
#module = __import__(name)
|
|
# Do it this way so that suite may be called externally via
|
|
# python's Lib/test/test_bsddb3.
|
|
module = __import__(module_prefix+name, globals(), locals(), name)
|
|
|
|
alltests.addTest(module.test_suite())
|
|
if timing_check:
|
|
alltests.addTest(unittest.makeSuite(timing_check))
|
|
return alltests
|
|
|
|
|
|
def test_suite():
|
|
suite = unittest.TestSuite()
|
|
suite.addTest(unittest.makeSuite(PrintInfoFakeTest))
|
|
return suite
|
|
|
|
|
|
if __name__ == '__main__':
|
|
print_versions()
|
|
unittest.main(defaultTest='suite')
|