cpython/Lib/bsddb/test/test_thread.py

516 lines
16 KiB
Python

"""TestCases for multi-threaded access to a DB.
"""
import os
import sys
import time
import errno
import shutil
import tempfile
from pprint import pprint
from random import random
DASH = b'-'
try:
from threading import Thread, currentThread
have_threads = True
except ImportError:
have_threads = False
try:
WindowsError
except NameError:
class WindowsError(Exception):
pass
import unittest
from bsddb.test.test_all import verbose
try:
# For Pythons w/distutils pybsddb
from bsddb3 import db, dbutils
except ImportError:
# For Python 2.3
from bsddb import db, dbutils
#----------------------------------------------------------------------
class BaseThreadedTestCase(unittest.TestCase):
dbtype = db.DB_UNKNOWN # must be set in derived class
dbopenflags = 0
dbsetflags = 0
envflags = 0
def setUp(self):
if verbose:
dbutils._deadlock_VerboseFile = sys.stdout
self.homeDir = tempfile.mkdtemp()
self.env = db.DBEnv()
self.setEnvOpts()
self.env.open(self.homeDir, self.envflags | db.DB_CREATE)
self.filename = self.__class__.__name__ + '.db'
self.d = db.DB(self.env)
if self.dbsetflags:
self.d.set_flags(self.dbsetflags)
self.d.open(self.filename, self.dbtype, self.dbopenflags|db.DB_CREATE)
def tearDown(self):
self.d.close()
self.env.close()
shutil.rmtree(self.homeDir)
def setEnvOpts(self):
pass
def makeData(self, key):
return DASH.join([key] * 5)
def _writerThread(self, *args, **kwargs):
raise RuntimeError("must override this in a subclass")
def _readerThread(self, *args, **kwargs):
raise RuntimeError("must override this in a subclass")
def writerThread(self, *args, **kwargs):
try:
self._writerThread(*args, **kwargs)
except db.DBLockDeadlockError:
if verbose:
print(currentThread().getName(), 'died from', e)
else:
if verbose:
print(currentThread().getName(), "finished.")
def readerThread(self, *args, **kwargs):
try:
self._readerThread(*args, **kwargs)
except db.DBLockDeadlockError as e:
if verbose:
print(currentThread().getName(), 'died from', e)
else:
if verbose:
print(currentThread().getName(), "finished.")
#----------------------------------------------------------------------
class ConcurrentDataStoreBase(BaseThreadedTestCase):
dbopenflags = db.DB_THREAD
envflags = db.DB_THREAD | db.DB_INIT_CDB | db.DB_INIT_MPOOL
readers = 0 # derived class should set
writers = 0
records = 1000
def test01_1WriterMultiReaders(self):
if verbose:
print('\n', '-=' * 30)
print("Running %s.test01_1WriterMultiReaders..." % \
self.__class__.__name__)
print('Using:', self.homeDir, self.filename)
threads = []
wt = Thread(target = self.writerThread,
args = (self.d, self.records),
name = 'the writer',
)#verbose = verbose)
threads.append(wt)
for x in range(self.readers):
rt = Thread(target = self.readerThread,
args = (self.d, x),
name = 'reader %d' % x,
)#verbose = verbose)
threads.append(rt)
for t in threads:
t.start()
for t in threads:
t.join()
def _writerThread(self, d, howMany):
name = currentThread().getName()
start = 0
stop = howMany
if verbose:
print(name+": creating records", start, "-", stop)
for x in range(start, stop):
key = ('%04d' % x).encode("ascii")
d.put(key, self.makeData(key))
if verbose and x > start and x % 50 == 0:
print(name+": records", start, "-", x, "finished")
if verbose:
print("%s: finished creating records" % name)
## # Each write-cursor will be exclusive, the only one that can update the DB...
## if verbose: print "%s: deleting a few records" % name
## c = d.cursor(flags = db.DB_WRITECURSOR)
## for x in range(10):
## key = int(random() * howMany) + start
## key = '%04d' % key
## if d.has_key(key):
## c.set(key)
## c.delete()
## c.close()
def _readerThread(self, d, readerNum):
time.sleep(0.01 * readerNum)
name = currentThread().getName()
for loop in range(5):
c = d.cursor()
count = 0
rec = c.first()
while rec:
count += 1
key, data = rec
self.assertEqual(self.makeData(key), data)
rec = c.next()
if verbose:
print(name+": found", count, "records")
c.close()
time.sleep(0.05)
class BTreeConcurrentDataStore(ConcurrentDataStoreBase):
dbtype = db.DB_BTREE
writers = 1
readers = 10
records = 1000
class HashConcurrentDataStore(ConcurrentDataStoreBase):
dbtype = db.DB_HASH
writers = 1
readers = 0
records = 1000
#----------------------------------------------------------------------
class SimpleThreadedBase(BaseThreadedTestCase):
dbopenflags = db.DB_THREAD
envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK
readers = 5
writers = 3
records = 1000
def setEnvOpts(self):
self.env.set_lk_detect(db.DB_LOCK_DEFAULT)
def test02_SimpleLocks(self):
if verbose:
print('\n', '-=' * 30)
print("Running %s.test02_SimpleLocks..." % self.__class__.__name__)
threads = []
for x in range(self.writers):
wt = Thread(target = self.writerThread,
args = (self.d, self.records, x),
name = 'writer %d' % x,
)#verbose = verbose)
threads.append(wt)
for x in range(self.readers):
rt = Thread(target = self.readerThread,
args = (self.d, x),
name = 'reader %d' % x,
)#verbose = verbose)
threads.append(rt)
for t in threads:
t.start()
for t in threads:
t.join()
def _writerThread(self, d, howMany, writerNum):
name = currentThread().getName()
start = howMany * writerNum
stop = howMany * (writerNum + 1) - 1
if verbose:
print("%s: creating records %d - %d" % (name, start, stop))
# create a bunch of records
for x in range(start, stop):
key = ('%04d' % x).encode("ascii")
dbutils.DeadlockWrap(d.put, key, self.makeData(key),
max_retries=20)
if verbose and x % 100 == 0:
print("%s: records %d - %d finished" % (name, start, x))
# do a bit or reading too
if random() <= 0.05:
for y in range(start, x):
key = ('%04d' % x).encode("ascii")
data = dbutils.DeadlockWrap(d.get, key, max_retries=20)
self.assertEqual(data, self.makeData(key))
# flush them
try:
dbutils.DeadlockWrap(d.sync, max_retries=20)
except db.DBIncompleteError as val:
if verbose:
print("could not complete sync()...")
# read them back, deleting a few
for x in range(start, stop):
key = ('%04d' % x).encode("ascii")
data = dbutils.DeadlockWrap(d.get, key, max_retries=20)
if verbose and x % 100 == 0:
print("%s: fetched record (%s, %s)" % (name, key, data))
self.assertEqual(data, self.makeData(key))
if random() <= 0.10:
dbutils.DeadlockWrap(d.delete, key, max_retries=20)
if verbose:
print("%s: deleted record %s" % (name, key))
if verbose:
print("%s: thread finished" % name)
def _readerThread(self, d, readerNum):
time.sleep(0.01 * readerNum)
name = currentThread().getName()
for loop in range(5):
c = d.cursor()
count = 0
rec = dbutils.DeadlockWrap(c.first, max_retries=20)
while rec:
count += 1
key, data = rec
self.assertEqual(self.makeData(key), data)
rec = dbutils.DeadlockWrap(c.next, max_retries=20)
if verbose:
print("%s: found %d records" % (name, count))
c.close()
time.sleep(0.05)
if verbose:
print("%s: thread finished" % name)
class BTreeSimpleThreaded(SimpleThreadedBase):
dbtype = db.DB_BTREE
class HashSimpleThreaded(SimpleThreadedBase):
dbtype = db.DB_HASH
#----------------------------------------------------------------------
class ThreadedTransactionsBase(BaseThreadedTestCase):
dbopenflags = db.DB_THREAD | db.DB_AUTO_COMMIT
envflags = (db.DB_THREAD |
db.DB_INIT_MPOOL |
db.DB_INIT_LOCK |
db.DB_INIT_LOG |
db.DB_INIT_TXN
)
readers = 0
writers = 0
records = 2000
txnFlag = 0
def setEnvOpts(self):
#self.env.set_lk_detect(db.DB_LOCK_DEFAULT)
pass
def test03_ThreadedTransactions(self):
if verbose:
print('\n', '-=' * 30)
print("Running %s.test03_ThreadedTransactions..." % \
self.__class__.__name__)
threads = []
for x in range(self.writers):
wt = Thread(target = self.writerThread,
args = (self.d, self.records, x),
name = 'writer %d' % x,
)#verbose = verbose)
threads.append(wt)
for x in range(self.readers):
rt = Thread(target = self.readerThread,
args = (self.d, x),
name = 'reader %d' % x,
)#verbose = verbose)
threads.append(rt)
dt = Thread(target = self.deadlockThread)
dt.start()
for t in threads:
t.start()
for t in threads:
t.join()
self.doLockDetect = False
dt.join()
def doWrite(self, d, name, start, stop):
finished = False
while not finished:
try:
txn = self.env.txn_begin(None, self.txnFlag)
for x in range(start, stop):
key = ('%04d' % x).encode("ascii")
d.put(key, self.makeData(key), txn)
if verbose and x % 100 == 0:
print("%s: records %d - %d finished" % (name, start, x))
txn.commit()
finished = True
except (db.DBLockDeadlockError, db.DBLockNotGrantedError) as val:
if verbose:
print("%s: Aborting transaction (%s)" % (name, val))
txn.abort()
time.sleep(0.05)
def _writerThread(self, d, howMany, writerNum):
name = currentThread().getName()
start = howMany * writerNum
stop = howMany * (writerNum + 1) - 1
if verbose:
print("%s: creating records %d - %d" % (name, start, stop))
step = 100
for x in range(start, stop, step):
self.doWrite(d, name, x, min(stop, x+step))
if verbose:
print("%s: finished creating records" % name)
if verbose:
print("%s: deleting a few records" % name)
finished = False
while not finished:
try:
recs = []
txn = self.env.txn_begin(None, self.txnFlag)
for x in range(10):
key = int(random() * howMany) + start
key = ('%04d' % key).encode("ascii")
data = d.get(key, None, txn, db.DB_RMW)
if data is not None:
d.delete(key, txn)
recs.append(key)
txn.commit()
finished = True
if verbose:
print("%s: deleted records %s" % (name, recs))
except (db.DBLockDeadlockError, db.DBLockNotGrantedError) as val:
if verbose:
print("%s: Aborting transaction (%s)" % (name, val))
txn.abort()
time.sleep(0.05)
if verbose:
print("%s: thread finished" % name)
def _readerThread(self, d, readerNum):
time.sleep(0.01 * readerNum + 0.05)
name = currentThread().getName()
for loop in range(5):
finished = False
while not finished:
try:
txn = self.env.txn_begin(None, self.txnFlag)
c = d.cursor(txn)
count = 0
rec = c.first()
while rec:
count += 1
key, data = rec
self.assertEqual(self.makeData(key), data)
rec = c.next()
if verbose: print("%s: found %d records" % (name, count))
c.close()
txn.commit()
finished = True
except (db.DBLockDeadlockError, db.DBLockNotGrantedError) as val:
if verbose:
print("%s: Aborting transaction (%s)" % (name, val))
c.close()
txn.abort()
time.sleep(0.05)
time.sleep(0.05)
if verbose:
print("%s: thread finished" % name)
def deadlockThread(self):
self.doLockDetect = True
while self.doLockDetect:
time.sleep(0.5)
try:
aborted = self.env.lock_detect(
db.DB_LOCK_RANDOM, db.DB_LOCK_CONFLICT)
if verbose and aborted:
print("deadlock: Aborted %d deadlocked transaction(s)" \
% aborted)
except db.DBError:
pass
class BTreeThreadedTransactions(ThreadedTransactionsBase):
dbtype = db.DB_BTREE
writers = 3
readers = 5
records = 2000
class HashThreadedTransactions(ThreadedTransactionsBase):
dbtype = db.DB_HASH
writers = 1
readers = 5
records = 2000
class BTreeThreadedNoWaitTransactions(ThreadedTransactionsBase):
dbtype = db.DB_BTREE
writers = 3
readers = 5
records = 2000
txnFlag = db.DB_TXN_NOWAIT
class HashThreadedNoWaitTransactions(ThreadedTransactionsBase):
dbtype = db.DB_HASH
writers = 1
readers = 5
records = 2000
txnFlag = db.DB_TXN_NOWAIT
#----------------------------------------------------------------------
def test_suite():
suite = unittest.TestSuite()
if have_threads:
suite.addTest(unittest.makeSuite(BTreeConcurrentDataStore))
suite.addTest(unittest.makeSuite(HashConcurrentDataStore))
suite.addTest(unittest.makeSuite(BTreeSimpleThreaded))
suite.addTest(unittest.makeSuite(HashSimpleThreaded))
suite.addTest(unittest.makeSuite(BTreeThreadedTransactions))
suite.addTest(unittest.makeSuite(HashThreadedTransactions))
suite.addTest(unittest.makeSuite(BTreeThreadedNoWaitTransactions))
suite.addTest(unittest.makeSuite(HashThreadedNoWaitTransactions))
else:
print("Threads not available, skipping thread tests.")
return suite
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')