bpo-30850: Use specialized assert methods in bsddb tests. (#2584)

This provides more information on test failures.
This commit is contained in:
Serhiy Storchaka 2017-07-05 15:09:36 +03:00 committed by Victor Stinner
parent 8767de2f77
commit 2b92cd3b16
13 changed files with 82 additions and 83 deletions

View File

@ -233,7 +233,7 @@ class AssociateTestCase(unittest.TestCase):
self.assertEqual(vals, None, vals)
vals = secDB.pget('Unknown', txn=txn)
self.assertTrue(vals[0] == 99 or vals[0] == '99', vals)
self.assertIn(vals[0], (99, '99'), vals)
vals[1].index('Unknown')
vals[1].index('Unnamed')
vals[1].index('unknown')
@ -247,7 +247,8 @@ class AssociateTestCase(unittest.TestCase):
if type(self.keytype) == type(''):
self.assertTrue(int(rec[0])) # for primary db, key is a number
else:
self.assertTrue(rec[0] and type(rec[0]) == type(0))
self.assertTrue(rec[0])
self.assertIs(type(rec[0]), int)
count = count + 1
if verbose:
print rec
@ -262,7 +263,7 @@ class AssociateTestCase(unittest.TestCase):
# test cursor pget
vals = self.cur.pget('Unknown', flags=db.DB_LAST)
self.assertTrue(vals[1] == 99 or vals[1] == '99', vals)
self.assertIn(vals[1], (99, '99'), vals)
self.assertEqual(vals[0], 'Unknown')
vals[2].index('Unknown')
vals[2].index('Unnamed')

View File

@ -597,7 +597,7 @@ class BasicTestCase(unittest.TestCase):
d.put("abcde", "ABCDE");
num = d.truncate()
self.assertTrue(num >= 1, "truncate returned <= 0 on non-empty database")
self.assertGreaterEqual(num, 1, "truncate returned <= 0 on non-empty database")
num = d.truncate()
self.assertEqual(num, 0,
"truncate on empty DB returned nonzero (%r)" % (num,))
@ -616,9 +616,9 @@ class BasicTestCase(unittest.TestCase):
if db.version() >= (4, 6):
def test08_exists(self) :
self.d.put("abcde", "ABCDE")
self.assertTrue(self.d.exists("abcde") == True,
self.assertEqual(self.d.exists("abcde"), True,
"DB->exists() returns wrong value")
self.assertTrue(self.d.exists("x") == False,
self.assertEqual(self.d.exists("x"), False,
"DB->exists() returns wrong value")
#----------------------------------------
@ -773,7 +773,7 @@ class BasicTransactionTestCase(BasicTestCase):
if verbose:
print 'log file: ' + log
logs = self.env.log_archive(db.DB_ARCH_REMOVE)
self.assertTrue(not logs)
self.assertFalse(logs)
self.txn = self.env.txn_begin()
@ -785,9 +785,9 @@ class BasicTransactionTestCase(BasicTestCase):
self.d.put("abcde", "ABCDE", txn=txn)
txn.commit()
txn = self.env.txn_begin()
self.assertTrue(self.d.exists("abcde", txn=txn) == True,
self.assertEqual(self.d.exists("abcde", txn=txn), True,
"DB->exists() returns wrong value")
self.assertTrue(self.d.exists("x", txn=txn) == False,
self.assertEqual(self.d.exists("x", txn=txn), False,
"DB->exists() returns wrong value")
txn.abort()
@ -802,7 +802,7 @@ class BasicTransactionTestCase(BasicTestCase):
d.put("abcde", "ABCDE");
txn = self.env.txn_begin()
num = d.truncate(txn)
self.assertTrue(num >= 1, "truncate returned <= 0 on non-empty database")
self.assertGreaterEqual(num, 1, "truncate returned <= 0 on non-empty database")
num = d.truncate(txn)
self.assertEqual(num, 0,
"truncate on empty DB returned nonzero (%r)" % (num,))
@ -1086,7 +1086,7 @@ class PrivateObject(unittest.TestCase) :
a = "example of private object"
self.obj.set_private(a)
b = self.obj.get_private()
self.assertTrue(a is b) # Object identity
self.assertIs(a, b) # Object identity
def test03_leak_assignment(self) :
a = "example of private object"

View File

@ -54,15 +54,15 @@ class DBEnv_general(DBEnv) :
self.env.set_cache_max(0, size)
size2 = self.env.get_cache_max()
self.assertEqual(0, size2[0])
self.assertTrue(size <= size2[1])
self.assertTrue(2*size > size2[1])
self.assertLessEqual(size, size2[1])
self.assertGreater(2*size, size2[1])
if db.version() >= (4, 4) :
def test_mutex_stat(self) :
self.env.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL |
db.DB_INIT_LOCK)
stat = self.env.mutex_stat()
self.assertTrue("mutex_inuse_max" in stat)
self.assertIn("mutex_inuse_max", stat)
def test_lg_filemode(self) :
for i in [0600, 0660, 0666] :
@ -128,8 +128,8 @@ class DBEnv_general(DBEnv) :
i = i*1024*1024
self.env.set_lg_regionmax(i)
j = self.env.get_lg_regionmax()
self.assertTrue(i <= j)
self.assertTrue(2*i > j)
self.assertLessEqual(i, j)
self.assertGreater(2*i, j)
def test_lk_detect(self) :
flags= [db.DB_LOCK_DEFAULT, db.DB_LOCK_EXPIRE, db.DB_LOCK_MAXLOCKS,
@ -150,10 +150,10 @@ class DBEnv_general(DBEnv) :
def test_lg_bsize(self) :
log_size = 70*1024
self.env.set_lg_bsize(log_size)
self.assertTrue(self.env.get_lg_bsize() >= log_size)
self.assertTrue(self.env.get_lg_bsize() < 4*log_size)
self.assertGreaterEqual(self.env.get_lg_bsize(), log_size)
self.assertLess(self.env.get_lg_bsize(), 4*log_size)
self.env.set_lg_bsize(4*log_size)
self.assertTrue(self.env.get_lg_bsize() >= 4*log_size)
self.assertGreaterEqual(self.env.get_lg_bsize(), 4*log_size)
def test_setget_data_dirs(self) :
dirs = ("a", "b", "c", "d")
@ -185,7 +185,7 @@ class DBEnv_general(DBEnv) :
self.assertEqual(cachesize2[0], cachesize3[0])
self.assertEqual(cachesize2[2], cachesize3[2])
# In Berkeley DB 5.1, the cachesize can change when opening the Env
self.assertTrue(cachesize2[1] <= cachesize3[1])
self.assertLessEqual(cachesize2[1], cachesize3[1])
def test_set_cachesize_dbenv_db(self) :
# You can not configure the cachesize using
@ -299,7 +299,7 @@ class DBEnv_log(DBEnv) :
msg = "This is a test..."
self.env.log_printf(msg)
logc = self.env.log_cursor()
self.assertTrue(msg in (logc.last()[1]))
self.assertIn(msg, logc.last()[1])
if db.version() >= (4, 7) :
def test_log_config(self) :
@ -341,21 +341,21 @@ class DBEnv_log_txn(DBEnv) :
txn.commit()
logc = self.env.log_cursor()
logc.last() # Skip the commit
self.assertTrue(msg in (logc.prev()[1]))
self.assertIn(msg, logc.prev()[1])
msg = "This is another test..."
txn = self.env.txn_begin()
self.env.log_printf(msg, txn=txn)
txn.abort() # Do not store the new message
logc.last() # Skip the abort
self.assertTrue(msg not in (logc.prev()[1]))
self.assertNotIn(msg, logc.prev()[1])
msg = "This is a third test..."
txn = self.env.txn_begin()
self.env.log_printf(msg, txn=txn)
txn.commit() # Do not store the new message
logc.last() # Skip the commit
self.assertTrue(msg in (logc.prev()[1]))
self.assertIn(msg, logc.prev()[1])
class DBEnv_memp(DBEnv):
@ -372,39 +372,39 @@ class DBEnv_memp(DBEnv):
def test_memp_1_trickle(self) :
self.db.put("hi", "bye")
self.assertTrue(self.env.memp_trickle(100) > 0)
self.assertGreater(self.env.memp_trickle(100), 0)
# Preserve the order, do "memp_trickle" test first
def test_memp_2_sync(self) :
self.db.put("hi", "bye")
self.env.memp_sync() # Full flush
# Nothing to do...
self.assertTrue(self.env.memp_trickle(100) == 0)
self.assertEqual(self.env.memp_trickle(100), 0)
self.db.put("hi", "bye2")
self.env.memp_sync((1, 0)) # NOP, probably
# Something to do... or not
self.assertTrue(self.env.memp_trickle(100) >= 0)
self.assertGreaterEqual(self.env.memp_trickle(100), 0)
self.db.put("hi", "bye3")
self.env.memp_sync((123, 99)) # Full flush
# Nothing to do...
self.assertTrue(self.env.memp_trickle(100) == 0)
self.assertEqual(self.env.memp_trickle(100), 0)
def test_memp_stat_1(self) :
stats = self.env.memp_stat() # No param
self.assertTrue(len(stats)==2)
self.assertTrue("cache_miss" in stats[0])
self.assertEqual(len(stats), 2)
self.assertIn("cache_miss", stats[0])
stats = self.env.memp_stat(db.DB_STAT_CLEAR) # Positional param
self.assertTrue("cache_miss" in stats[0])
self.assertIn("cache_miss", stats[0])
stats = self.env.memp_stat(flags=0) # Keyword param
self.assertTrue("cache_miss" in stats[0])
self.assertIn("cache_miss", stats[0])
def test_memp_stat_2(self) :
stats=self.env.memp_stat()[1]
self.assertTrue(len(stats))==1
self.assertTrue("test" in stats)
self.assertTrue("page_in" in stats["test"])
self.assertEqual(len(stats), 1)
self.assertIn("test", stats)
self.assertIn("page_in", stats["test"])
class DBEnv_logcursor(DBEnv):
def setUp(self):
@ -426,28 +426,28 @@ class DBEnv_logcursor(DBEnv):
DBEnv.tearDown(self)
def _check_return(self, value) :
self.assertTrue(isinstance(value, tuple))
self.assertIsInstance(value, tuple)
self.assertEqual(len(value), 2)
self.assertTrue(isinstance(value[0], tuple))
self.assertIsInstance(value[0], tuple)
self.assertEqual(len(value[0]), 2)
self.assertTrue(isinstance(value[0][0], int))
self.assertTrue(isinstance(value[0][1], int))
self.assertTrue(isinstance(value[1], str))
self.assertIsInstance(value[0][0], int)
self.assertIsInstance(value[0][1], int)
self.assertIsInstance(value[1], str)
# Preserve test order
def test_1_first(self) :
logc = self.env.log_cursor()
v = logc.first()
self._check_return(v)
self.assertTrue((1, 1) < v[0])
self.assertTrue(len(v[1])>0)
self.assertLess((1, 1), v[0])
self.assertGreater(len(v[1]), 0)
def test_2_last(self) :
logc = self.env.log_cursor()
lsn_first = logc.first()[0]
v = logc.last()
self._check_return(v)
self.assertTrue(lsn_first < v[0])
self.assertLess(lsn_first, v[0])
def test_3_next(self) :
logc = self.env.log_cursor()
@ -456,16 +456,16 @@ class DBEnv_logcursor(DBEnv):
lsn_first = logc.first()[0]
v = logc.next()
self._check_return(v)
self.assertTrue(lsn_first < v[0])
self.assertTrue(lsn_last > v[0])
self.assertLess(lsn_first, v[0])
self.assertGreater(lsn_last, v[0])
v2 = logc.next()
self.assertTrue(v2[0] > v[0])
self.assertTrue(lsn_last > v2[0])
self.assertGreater(v2[0], v[0])
self.assertGreater(lsn_last, v2[0])
v3 = logc.next()
self.assertTrue(v3[0] > v2[0])
self.assertTrue(lsn_last > v3[0])
self.assertGreater(v3[0], v2[0])
self.assertGreater(lsn_last, v3[0])
def test_4_prev(self) :
logc = self.env.log_cursor()
@ -474,16 +474,16 @@ class DBEnv_logcursor(DBEnv):
lsn_last = logc.last()[0]
v = logc.prev()
self._check_return(v)
self.assertTrue(lsn_first < v[0])
self.assertTrue(lsn_last > v[0])
self.assertLess(lsn_first, v[0])
self.assertGreater(lsn_last, v[0])
v2 = logc.prev()
self.assertTrue(v2[0] < v[0])
self.assertTrue(lsn_first < v2[0])
self.assertLess(v2[0], v[0])
self.assertLess(lsn_first, v2[0])
v3 = logc.prev()
self.assertTrue(v3[0] < v2[0])
self.assertTrue(lsn_first < v3[0])
self.assertLess(v3[0], v2[0])
self.assertLess(lsn_first, v3[0])
def test_5_current(self) :
logc = self.env.log_cursor()

View File

@ -248,7 +248,7 @@ class DBShelveTestCase(unittest.TestCase):
self.assertEqual(value.L, [x] * 10)
else:
self.assertTrue(0, 'Unknown key type, fix the test')
self.fail('Unknown key type, fix the test')
#----------------------------------------------------------------------

View File

@ -82,8 +82,8 @@ class TableDBTestCase(unittest.TestCase):
colval = pickle.loads(values[0][colname])
else :
colval = pickle.loads(bytes(values[0][colname], "iso8859-1"))
self.assertTrue(colval > 3.141)
self.assertTrue(colval < 3.142)
self.assertGreater(colval, 3.141)
self.assertLess(colval, 3.142)
def test02(self):

View File

@ -79,7 +79,7 @@ class DBTxn_distributed(unittest.TestCase):
recovered_txns=self.dbenv.txn_recover()
self.assertEqual(self.num_txns,len(recovered_txns))
for gid,txn in recovered_txns :
self.assertTrue(gid in txns)
self.assertIn(gid, txns)
del txn
del recovered_txns
@ -122,7 +122,7 @@ class DBTxn_distributed(unittest.TestCase):
# Be sure there are not pending transactions.
# Check also database size.
recovered_txns=self.dbenv.txn_recover()
self.assertTrue(len(recovered_txns)==0)
self.assertEqual(len(recovered_txns), 0)
self.assertEqual(len(committed_txns),self.db.stat()["nkeys"])
class DBTxn_distributedSYNC(DBTxn_distributed):

View File

@ -129,7 +129,7 @@ class LockingTestCase(unittest.TestCase):
end_time=time.time()
deadlock_detection.end=True
# Floating point rounding
self.assertTrue((end_time-start_time) >= 0.0999)
self.assertGreaterEqual(end_time-start_time, 0.0999)
self.env.lock_put(lock)
t.join()
@ -137,7 +137,7 @@ class LockingTestCase(unittest.TestCase):
self.env.lock_id_free(anID2)
if db.version() >= (4,6):
self.assertTrue(deadlock_detection.count>0)
self.assertGreater(deadlock_detection.count, 0)
def theThread(self, lockType):
import sys

View File

@ -25,7 +25,7 @@ class MiscTestCase(unittest.TestCase):
def test02_db_home(self):
env = db.DBEnv()
# check for crash fixed when db_home is used before open()
self.assertTrue(env.db_home is None)
self.assertIsNone(env.db_home)
env.open(self.homeDir, db.DB_CREATE)
if sys.version_info[0] < 3 :
self.assertEqual(self.homeDir, env.db_home)

View File

@ -18,7 +18,7 @@ class SimpleRecnoTestCase(unittest.TestCase):
def assertIsInstance(self, obj, datatype, msg=None) :
return self.assertEqual(type(obj), datatype, msg=msg)
def assertGreaterEqual(self, a, b, msg=None) :
return self.assertTrue(a>=b, msg=msg)
return self.assertGreaterEqual(a, b, msg=msg)
def setUp(self):

View File

@ -186,20 +186,18 @@ class DBReplicationManager(DBReplication) :
d = d.values()[0] # There is only one
self.assertEqual(d[0], "127.0.0.1")
self.assertEqual(d[1], client_port)
self.assertTrue((d[2]==db.DB_REPMGR_CONNECTED) or \
(d[2]==db.DB_REPMGR_DISCONNECTED))
self.assertIn(d[2], (db.DB_REPMGR_CONNECTED, db.DB_REPMGR_DISCONNECTED))
d = self.dbenvClient.repmgr_site_list()
self.assertEqual(len(d), 1)
d = d.values()[0] # There is only one
self.assertEqual(d[0], "127.0.0.1")
self.assertEqual(d[1], master_port)
self.assertTrue((d[2]==db.DB_REPMGR_CONNECTED) or \
(d[2]==db.DB_REPMGR_DISCONNECTED))
self.assertIn(d[2], (db.DB_REPMGR_CONNECTED, db.DB_REPMGR_DISCONNECTED))
if db.version() >= (4,6) :
d = self.dbenvMaster.repmgr_stat(flags=db.DB_STAT_CLEAR);
self.assertTrue("msgs_queued" in d)
self.assertIn("msgs_queued", d)
self.dbMaster=db.DB(self.dbenvMaster)
txn=self.dbenvMaster.txn_begin()
@ -247,7 +245,7 @@ class DBReplicationManager(DBReplication) :
if time.time()>=timeout and startup_timeout:
self.skipTest("replication test skipped due to random failure, "
"see issue 3892")
self.assertTrue(time.time()<timeout)
self.assertLess(time.time(), timeout)
self.assertEqual("123", v)
txn=self.dbenvMaster.txn_begin()
@ -260,7 +258,7 @@ class DBReplicationManager(DBReplication) :
txn.commit()
if v is None :
time.sleep(0.02)
self.assertTrue(time.time()<timeout)
self.assertLess(time.time(), timeout)
self.assertEqual(None, v)
class DBBaseReplication(DBReplication) :
@ -381,7 +379,7 @@ class DBBaseReplication(DBReplication) :
while (time.time()<timeout) and not (self.confirmed_master and
self.client_startupdone) :
time.sleep(0.02)
self.assertTrue(time.time()<timeout)
self.assertLess(time.time(), timeout)
self.dbMaster=db.DB(self.dbenvMaster)
txn=self.dbenvMaster.txn_begin()
@ -410,7 +408,7 @@ class DBBaseReplication(DBReplication) :
break
d = self.dbenvMaster.rep_stat(flags=db.DB_STAT_CLEAR);
self.assertTrue("master_changes" in d)
self.assertIn("master_changes", d)
txn=self.dbenvMaster.txn_begin()
self.dbMaster.put("ABC", "123", txn=txn)
@ -424,7 +422,7 @@ class DBBaseReplication(DBReplication) :
txn.commit()
if v is None :
time.sleep(0.02)
self.assertTrue(time.time()<timeout)
self.assertLess(time.time(), timeout)
self.assertEqual("123", v)
txn=self.dbenvMaster.txn_begin()
@ -437,7 +435,7 @@ class DBBaseReplication(DBReplication) :
txn.commit()
if v is None :
time.sleep(0.02)
self.assertTrue(time.time()<timeout)
self.assertLess(time.time(), timeout)
self.assertEqual(None, v)
if db.version() >= (4,7) :

View File

@ -82,7 +82,7 @@ class DBSequenceTest(unittest.TestCase):
stat = self.seq.stat()
for param in ('nowait', 'min', 'max', 'value', 'current',
'flags', 'cache_size', 'last_value', 'wait'):
self.assertTrue(param in stat, "parameter %s isn't in stat info" % param)
self.assertIn(param, stat, "parameter %s isn't in stat info" % param)
if db.version() >= (4,7) :
# This code checks a crash solved in Berkeley DB 4.7

View File

@ -85,7 +85,7 @@ class ConcurrentDataStoreBase(BaseThreadedTestCase):
readers_per_writer=self.readers//self.writers
self.assertEqual(self.records,self.writers*records_per_writer)
self.assertEqual(self.readers,self.writers*readers_per_writer)
self.assertTrue((records_per_writer%readers_per_writer)==0)
self.assertEqual(records_per_writer%readers_per_writer, 0)
readers = []
for x in xrange(self.readers):
@ -213,7 +213,7 @@ class SimpleThreadedBase(BaseThreadedTestCase):
readers_per_writer=self.readers//self.writers
self.assertEqual(self.records,self.writers*records_per_writer)
self.assertEqual(self.readers,self.writers*readers_per_writer)
self.assertTrue((records_per_writer%readers_per_writer)==0)
self.assertEqual(records_per_writer%readers_per_writer, 0)
readers = []
for x in xrange(self.readers):
@ -339,7 +339,7 @@ class ThreadedTransactionsBase(BaseThreadedTestCase):
readers_per_writer=self.readers//self.writers
self.assertEqual(self.records,self.writers*records_per_writer)
self.assertEqual(self.readers,self.writers*readers_per_writer)
self.assertTrue((records_per_writer%readers_per_writer)==0)
self.assertEqual(records_per_writer%readers_per_writer, 0)
readers=[]
for x in xrange(self.readers):

View File

@ -172,7 +172,7 @@ class TestBSDDB(unittest.TestCase):
def test_first_while_deleting(self):
# Test for bug 1725856
self.assertTrue(len(self.d) >= 2, "test requires >=2 items")
self.assertGreaterEqual(len(self.d), 2, "test requires >=2 items")
for _ in self.d:
key = self.f.first()[0]
del self.f[key]
@ -180,7 +180,7 @@ class TestBSDDB(unittest.TestCase):
def test_last_while_deleting(self):
# Test for bug 1725856's evil twin
self.assertTrue(len(self.d) >= 2, "test requires >=2 items")
self.assertGreaterEqual(len(self.d), 2, "test requires >=2 items")
for _ in self.d:
key = self.f.last()[0]
del self.f[key]
@ -197,7 +197,7 @@ class TestBSDDB(unittest.TestCase):
def test_has_key(self):
for k in self.d:
self.assertTrue(self.f.has_key(k))
self.assertTrue(not self.f.has_key('not here'))
self.assertFalse(self.f.has_key('not here'))
def test_clear(self):
self.f.clear()
@ -271,7 +271,7 @@ class TestBSDDB(unittest.TestCase):
self.assertEqual(nc1, nc2)
self.assertEqual(nc1, nc4)
self.assertTrue(nc3 == nc1+1)
self.assertEqual(nc3, nc1+1)
def test_popitem(self):
k, v = self.f.popitem()