Cleanups, and conversion of assert to assertEqual()

This commit is contained in:
Barry Warsaw 2003-01-10 19:03:29 +00:00
parent 71731d7f70
commit a21bdeae51
1 changed files with 70 additions and 72 deletions

View File

@ -1,23 +1,31 @@
""" """TestCases for multi-threaded access to a DB.
TestCases for multi-threaded access to a DB.
""" """
import sys, os, string import os
import tempfile import sys
import time import time
import errno
import shutil
import tempfile
from pprint import pprint from pprint import pprint
from whrandom import random from whrandom import random
try: try:
from threading import Thread, currentThread True, False
have_threads = 1 except NameError:
except ImportError: True = 1
have_threads = 0 False = 0
DASH = '-'
try:
from threading import Thread, currentThread
have_threads = True
except ImportError:
have_threads = False
import unittest import unittest
from test_all import verbose from test_all import verbose
from bsddb import db, dbutils from bsddb import db, dbutils
#---------------------------------------------------------------------- #----------------------------------------------------------------------
@ -28,15 +36,16 @@ class BaseThreadedTestCase(unittest.TestCase):
dbsetflags = 0 dbsetflags = 0
envflags = 0 envflags = 0
def setUp(self): def setUp(self):
if verbose: if verbose:
dbutils._deadlock_VerboseFile = sys.stdout dbutils._deadlock_VerboseFile = sys.stdout
homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home') homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home')
self.homeDir = homeDir self.homeDir = homeDir
try: os.mkdir(homeDir) try:
except os.error: pass os.mkdir(homeDir)
except OSError, e:
if e.errno <> errno.EEXIST: raise
self.env = db.DBEnv() self.env = db.DBEnv()
self.setEnvOpts() self.setEnvOpts()
self.env.open(homeDir, self.envflags | db.DB_CREATE) self.env.open(homeDir, self.envflags | db.DB_CREATE)
@ -47,22 +56,16 @@ class BaseThreadedTestCase(unittest.TestCase):
self.d.set_flags(self.dbsetflags) self.d.set_flags(self.dbsetflags)
self.d.open(self.filename, self.dbtype, self.dbopenflags|db.DB_CREATE) self.d.open(self.filename, self.dbtype, self.dbopenflags|db.DB_CREATE)
def tearDown(self): def tearDown(self):
self.d.close() self.d.close()
self.env.close() self.env.close()
import glob shutil.rmtree(self.homeDir)
files = glob.glob(os.path.join(self.homeDir, '*'))
for file in files:
os.remove(file)
def setEnvOpts(self): def setEnvOpts(self):
pass pass
def makeData(self, key): def makeData(self, key):
return string.join([key] * 5, '-') return DASH.join([key] * 5)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
@ -75,7 +78,6 @@ class ConcurrentDataStoreBase(BaseThreadedTestCase):
writers = 0 writers = 0
records = 1000 records = 1000
def test01_1WriterMultiReaders(self): def test01_1WriterMultiReaders(self):
if verbose: if verbose:
print '\n', '-=' * 30 print '\n', '-=' * 30
@ -102,11 +104,11 @@ class ConcurrentDataStoreBase(BaseThreadedTestCase):
for t in threads: for t in threads:
t.join() t.join()
def writerThread(self, d, howMany, writerNum): def writerThread(self, d, howMany, writerNum):
#time.sleep(0.01 * writerNum + 0.01) #time.sleep(0.01 * writerNum + 0.01)
name = currentThread().getName() name = currentThread().getName()
start, stop = howMany * writerNum, howMany * (writerNum + 1) - 1 start = howMany * writerNum
stop = howMany * (writerNum + 1) - 1
if verbose: if verbose:
print "%s: creating records %d - %d" % (name, start, stop) print "%s: creating records %d - %d" % (name, start, stop)
@ -117,7 +119,8 @@ class ConcurrentDataStoreBase(BaseThreadedTestCase):
if verbose and x % 100 == 0: if verbose and x % 100 == 0:
print "%s: records %d - %d finished" % (name, start, x) print "%s: records %d - %d finished" % (name, start, x)
if verbose: print "%s: finished creating records" % name if verbose:
print "%s: finished creating records" % name
## # Each write-cursor will be exclusive, the only one that can update the DB... ## # Each write-cursor will be exclusive, the only one that can update the DB...
## if verbose: print "%s: deleting a few records" % name ## if verbose: print "%s: deleting a few records" % name
@ -130,8 +133,8 @@ class ConcurrentDataStoreBase(BaseThreadedTestCase):
## c.delete() ## c.delete()
## c.close() ## c.close()
if verbose: print "%s: thread finished" % name if verbose:
print "%s: thread finished" % name
def readerThread(self, d, readerNum): def readerThread(self, d, readerNum):
time.sleep(0.01 * readerNum) time.sleep(0.01 * readerNum)
@ -142,16 +145,17 @@ class ConcurrentDataStoreBase(BaseThreadedTestCase):
count = 0 count = 0
rec = c.first() rec = c.first()
while rec: while rec:
count = count + 1 count += 1
key, data = rec key, data = rec
assert self.makeData(key) == data self.assertEqual(self.makeData(key), data)
rec = c.next() rec = c.next()
if verbose: print "%s: found %d records" % (name, count) if verbose:
print "%s: found %d records" % (name, count)
c.close() c.close()
time.sleep(0.05) time.sleep(0.05)
if verbose: print "%s: thread finished" % name if verbose:
print "%s: thread finished" % name
class BTreeConcurrentDataStore(ConcurrentDataStoreBase): class BTreeConcurrentDataStore(ConcurrentDataStoreBase):
@ -167,6 +171,7 @@ class HashConcurrentDataStore(ConcurrentDataStoreBase):
readers = 10 readers = 10
records = 1000 records = 1000
#---------------------------------------------------------------------- #----------------------------------------------------------------------
class SimpleThreadedBase(BaseThreadedTestCase): class SimpleThreadedBase(BaseThreadedTestCase):
@ -176,11 +181,9 @@ class SimpleThreadedBase(BaseThreadedTestCase):
writers = 3 writers = 3
records = 1000 records = 1000
def setEnvOpts(self): def setEnvOpts(self):
self.env.set_lk_detect(db.DB_LOCK_DEFAULT) self.env.set_lk_detect(db.DB_LOCK_DEFAULT)
def test02_SimpleLocks(self): def test02_SimpleLocks(self):
if verbose: if verbose:
print '\n', '-=' * 30 print '\n', '-=' * 30
@ -205,11 +208,10 @@ class SimpleThreadedBase(BaseThreadedTestCase):
for t in threads: for t in threads:
t.join() t.join()
def writerThread(self, d, howMany, writerNum): def writerThread(self, d, howMany, writerNum):
name = currentThread().getName() name = currentThread().getName()
start, stop = howMany * writerNum, howMany * (writerNum + 1) - 1 start = howMany * writerNum
stop = howMany * (writerNum + 1) - 1
if verbose: if verbose:
print "%s: creating records %d - %d" % (name, start, stop) print "%s: creating records %d - %d" % (name, start, stop)
@ -227,7 +229,7 @@ class SimpleThreadedBase(BaseThreadedTestCase):
for y in xrange(start, x): for y in xrange(start, x):
key = '%04d' % x key = '%04d' % x
data = dbutils.DeadlockWrap(d.get, key, max_retries=12) data = dbutils.DeadlockWrap(d.get, key, max_retries=12)
assert data == self.makeData(key) self.assertEqual(data, self.makeData(key))
# flush them # flush them
try: try:
@ -242,14 +244,14 @@ class SimpleThreadedBase(BaseThreadedTestCase):
data = dbutils.DeadlockWrap(d.get, key, max_retries=12) data = dbutils.DeadlockWrap(d.get, key, max_retries=12)
if verbose and x % 100 == 0: if verbose and x % 100 == 0:
print "%s: fetched record (%s, %s)" % (name, key, data) print "%s: fetched record (%s, %s)" % (name, key, data)
assert data == self.makeData(key), (key, data, self.makeData(key)) self.assertEqual(data, self.makeData(key))
if random() <= 0.10: if random() <= 0.10:
dbutils.DeadlockWrap(d.delete, key, max_retries=12) dbutils.DeadlockWrap(d.delete, key, max_retries=12)
if verbose: if verbose:
print "%s: deleted record %s" % (name, key) print "%s: deleted record %s" % (name, key)
if verbose: print "%s: thread finished" % name if verbose:
print "%s: thread finished" % name
def readerThread(self, d, readerNum): def readerThread(self, d, readerNum):
time.sleep(0.01 * readerNum) time.sleep(0.01 * readerNum)
@ -260,17 +262,17 @@ class SimpleThreadedBase(BaseThreadedTestCase):
count = 0 count = 0
rec = c.first() rec = c.first()
while rec: while rec:
count = count + 1 count += 1
key, data = rec key, data = rec
assert self.makeData(key) == data self.assertEqual(self.makeData(key), data)
rec = c.next() rec = c.next()
if verbose: print "%s: found %d records" % (name, count) if verbose:
print "%s: found %d records" % (name, count)
c.close() c.close()
time.sleep(0.05) time.sleep(0.05)
if verbose: print "%s: thread finished" % name if verbose:
print "%s: thread finished" % name
class BTreeSimpleThreaded(SimpleThreadedBase): class BTreeSimpleThreaded(SimpleThreadedBase):
@ -284,7 +286,6 @@ class HashSimpleThreaded(SimpleThreadedBase):
#---------------------------------------------------------------------- #----------------------------------------------------------------------
class ThreadedTransactionsBase(BaseThreadedTestCase): class ThreadedTransactionsBase(BaseThreadedTestCase):
dbopenflags = db.DB_THREAD | db.DB_AUTO_COMMIT dbopenflags = db.DB_THREAD | db.DB_AUTO_COMMIT
envflags = (db.DB_THREAD | envflags = (db.DB_THREAD |
@ -296,15 +297,12 @@ class ThreadedTransactionsBase(BaseThreadedTestCase):
readers = 0 readers = 0
writers = 0 writers = 0
records = 2000 records = 2000
txnFlag = 0 txnFlag = 0
def setEnvOpts(self): def setEnvOpts(self):
#self.env.set_lk_detect(db.DB_LOCK_DEFAULT) #self.env.set_lk_detect(db.DB_LOCK_DEFAULT)
pass pass
def test03_ThreadedTransactions(self): def test03_ThreadedTransactions(self):
if verbose: if verbose:
print '\n', '-=' * 30 print '\n', '-=' * 30
@ -334,12 +332,11 @@ class ThreadedTransactionsBase(BaseThreadedTestCase):
for t in threads: for t in threads:
t.join() t.join()
self.doLockDetect = 0 self.doLockDetect = False
dt.join() dt.join()
def doWrite(self, d, name, start, stop): def doWrite(self, d, name, start, stop):
finished = 0 finished = False
while not finished: while not finished:
try: try:
txn = self.env.txn_begin(None, self.txnFlag) txn = self.env.txn_begin(None, self.txnFlag)
@ -349,18 +346,17 @@ class ThreadedTransactionsBase(BaseThreadedTestCase):
if verbose and x % 100 == 0: if verbose and x % 100 == 0:
print "%s: records %d - %d finished" % (name, start, x) print "%s: records %d - %d finished" % (name, start, x)
txn.commit() txn.commit()
finished = 1 finished = True
except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val: except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val:
if verbose: if verbose:
print "%s: Aborting transaction (%s)" % (name, val[1]) print "%s: Aborting transaction (%s)" % (name, val[1])
txn.abort() txn.abort()
time.sleep(0.05) time.sleep(0.05)
def writerThread(self, d, howMany, writerNum): def writerThread(self, d, howMany, writerNum):
name = currentThread().getName() name = currentThread().getName()
start, stop = howMany * writerNum, howMany * (writerNum + 1) - 1 start = howMany * writerNum
stop = howMany * (writerNum + 1) - 1
if verbose: if verbose:
print "%s: creating records %d - %d" % (name, start, stop) print "%s: creating records %d - %d" % (name, start, stop)
@ -368,10 +364,12 @@ class ThreadedTransactionsBase(BaseThreadedTestCase):
for x in range(start, stop, step): for x in range(start, stop, step):
self.doWrite(d, name, x, min(stop, x+step)) self.doWrite(d, name, x, min(stop, x+step))
if verbose: print "%s: finished creating records" % name if verbose:
if verbose: print "%s: deleting a few records" % name print "%s: finished creating records" % name
if verbose:
print "%s: deleting a few records" % name
finished = 0 finished = False
while not finished: while not finished:
try: try:
recs = [] recs = []
@ -384,23 +382,24 @@ class ThreadedTransactionsBase(BaseThreadedTestCase):
d.delete(key, txn) d.delete(key, txn)
recs.append(key) recs.append(key)
txn.commit() txn.commit()
finished = 1 finished = True
if verbose: print "%s: deleted records %s" % (name, recs) if verbose:
print "%s: deleted records %s" % (name, recs)
except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val: except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val:
if verbose: if verbose:
print "%s: Aborting transaction (%s)" % (name, val[1]) print "%s: Aborting transaction (%s)" % (name, val[1])
txn.abort() txn.abort()
time.sleep(0.05) time.sleep(0.05)
if verbose: print "%s: thread finished" % name if verbose:
print "%s: thread finished" % name
def readerThread(self, d, readerNum): def readerThread(self, d, readerNum):
time.sleep(0.01 * readerNum + 0.05) time.sleep(0.01 * readerNum + 0.05)
name = currentThread().getName() name = currentThread().getName()
for loop in range(5): for loop in range(5):
finished = 0 finished = False
while not finished: while not finished:
try: try:
txn = self.env.txn_begin(None, self.txnFlag) txn = self.env.txn_begin(None, self.txnFlag)
@ -408,14 +407,14 @@ class ThreadedTransactionsBase(BaseThreadedTestCase):
count = 0 count = 0
rec = c.first() rec = c.first()
while rec: while rec:
count = count + 1 count += 1
key, data = rec key, data = rec
assert self.makeData(key) == data self.assertEqual(self.makeData(key), data)
rec = c.next() rec = c.next()
if verbose: print "%s: found %d records" % (name, count) if verbose: print "%s: found %d records" % (name, count)
c.close() c.close()
txn.commit() txn.commit()
finished = 1 finished = True
except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val: except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val:
if verbose: if verbose:
print "%s: Aborting transaction (%s)" % (name, val[1]) print "%s: Aborting transaction (%s)" % (name, val[1])
@ -425,11 +424,11 @@ class ThreadedTransactionsBase(BaseThreadedTestCase):
time.sleep(0.05) time.sleep(0.05)
if verbose: print "%s: thread finished" % name if verbose:
print "%s: thread finished" % name
def deadlockThread(self): def deadlockThread(self):
self.doLockDetect = 1 self.doLockDetect = True
while self.doLockDetect: while self.doLockDetect:
time.sleep(0.5) time.sleep(0.5)
try: try:
@ -442,7 +441,6 @@ class ThreadedTransactionsBase(BaseThreadedTestCase):
pass pass
class BTreeThreadedTransactions(ThreadedTransactionsBase): class BTreeThreadedTransactions(ThreadedTransactionsBase):
dbtype = db.DB_BTREE dbtype = db.DB_BTREE
writers = 3 writers = 3