Added optional lock parameter to condition class.
Added mrsw (multiple-reader-single-writer) lock.
This commit is contained in:
parent
c95f7248d6
commit
a6970580d1
|
@ -1,16 +1,23 @@
|
|||
# Defines classes that provide synchronization objects. Note that use of
|
||||
# this module requires that your Python support threads.
|
||||
#
|
||||
# condition() # a POSIX-like condition-variable object
|
||||
# condition(lock=None) # a POSIX-like condition-variable object
|
||||
# barrier(n) # an n-thread barrier
|
||||
# event() # an event object
|
||||
# semaphore(n=1)# a semaphore object, with initial count n
|
||||
# semaphore(n=1) # a semaphore object, with initial count n
|
||||
# mrsw() # a multiple-reader single-writer lock
|
||||
#
|
||||
# CONDITIONS
|
||||
#
|
||||
# A condition object is created via
|
||||
# import this_module
|
||||
# your_condition_object = this_module.condition()
|
||||
# your_condition_object = this_module.condition(lock=None)
|
||||
#
|
||||
# As explained below, a condition object has a lock associated with it,
|
||||
# used in the protocol to protect condition data. You can specify a
|
||||
# lock to use in the constructor, else the constructor will allocate
|
||||
# an anonymous lock for you. Specifying a lock explicitly can be useful
|
||||
# when more than one condition keys off the same set of shared data.
|
||||
#
|
||||
# Methods:
|
||||
# .acquire()
|
||||
|
@ -209,13 +216,63 @@
|
|||
# any) blocked by a .p(). It's an (detected) error for a .v() to
|
||||
# increase the semaphore's count to a value larger than the initial
|
||||
# count.
|
||||
#
|
||||
# MULTIPLE-READER SINGLE-WRITER LOCKS
|
||||
#
|
||||
# A mrsw lock is created via
|
||||
# import this_module
|
||||
# your_mrsw_lock = this_module.mrsw()
|
||||
#
|
||||
# This kind of lock is often useful with complex shared data structures.
|
||||
# The object lets any number of "readers" proceed, so long as no thread
|
||||
# wishes to "write". When a (one or more) thread declares its intention
|
||||
# to "write" (e.g., to update a shared structure), all current readers
|
||||
# are allowed to finish, and then a writer gets exclusive access; all
|
||||
# other readers & writers are blocked until the current writer completes.
|
||||
# Finally, if some thread is waiting to write and another is waiting to
|
||||
# read, the writer takes precedence.
|
||||
#
|
||||
# Methods:
|
||||
#
|
||||
# .read_in()
|
||||
# If no thread is writing or waiting to write, returns immediately.
|
||||
# Else blocks until no thread is writing or waiting to write. So
|
||||
# long as some thread has completed a .read_in but not a .read_out,
|
||||
# writers are blocked.
|
||||
#
|
||||
# .read_out()
|
||||
# Use sometime after a .read_in to declare that the thread is done
|
||||
# reading. When all threads complete reading, a writer can proceed.
|
||||
#
|
||||
# .write_in()
|
||||
# If no thread is writing (has completed a .write_in, but hasn't yet
|
||||
# done a .write_out) or reading (similarly), returns immediately.
|
||||
# Else blocks the calling thread, and threads waiting to read, until
|
||||
# the current writer completes writing or all the current readers
|
||||
# complete reading; if then more than one thread is waiting to
|
||||
# write, one of them is allowed to proceed, but which one is not
|
||||
# specified.
|
||||
#
|
||||
# .write_out()
|
||||
# Use sometime after a .write_in to declare that the thread is done
|
||||
# writing. Then if some other thread is waiting to write, it's
|
||||
# allowed to proceed. Else all threads (if any) waiting to read are
|
||||
# allowed to proceed.
|
||||
|
||||
import thread
|
||||
|
||||
class condition:
|
||||
def __init__(self):
|
||||
def __init__(self, lock=None):
|
||||
# the lock actually used by .acquire() and .release()
|
||||
if lock is None:
|
||||
self.mutex = thread.allocate_lock()
|
||||
else:
|
||||
if hasattr(lock, 'acquire') and \
|
||||
hasattr(lock, 'release'):
|
||||
self.mutex = lock
|
||||
else:
|
||||
raise TypeError, 'condition constructor requires ' \
|
||||
'a lock argument'
|
||||
|
||||
# lock used to block threads until a signal
|
||||
self.checkout = thread.allocate_lock()
|
||||
|
@ -357,6 +414,56 @@ class semaphore:
|
|||
self.nonzero.signal()
|
||||
self.nonzero.release()
|
||||
|
||||
class mrsw:
|
||||
def __init__(self):
|
||||
# critical-section lock & the data it protects
|
||||
self.rwOK = thread.allocate_lock()
|
||||
self.nr = 0 # number readers actively reading (not just waiting)
|
||||
self.nw = 0 # number writers either waiting to write or writing
|
||||
self.writing = 0 # 1 iff some thread is writing
|
||||
|
||||
# conditions
|
||||
self.readOK = condition(self.rwOK) # OK to unblock readers
|
||||
self.writeOK = condition(self.rwOK) # OK to unblock writers
|
||||
|
||||
def read_in(self):
|
||||
self.rwOK.acquire()
|
||||
while self.nw:
|
||||
self.readOK.wait()
|
||||
self.nr = self.nr + 1
|
||||
self.rwOK.release()
|
||||
|
||||
def read_out(self):
|
||||
self.rwOK.acquire()
|
||||
if self.nr <= 0:
|
||||
raise ValueError, \
|
||||
'.read_out() invoked without an active reader'
|
||||
self.nr = self.nr - 1
|
||||
if self.nr == 0:
|
||||
self.writeOK.signal()
|
||||
self.rwOK.release()
|
||||
|
||||
def write_in(self):
|
||||
self.rwOK.acquire()
|
||||
self.nw = self.nw + 1
|
||||
while self.writing or self.nr:
|
||||
self.writeOK.wait()
|
||||
self.writing = 1
|
||||
self.rwOK.release()
|
||||
|
||||
def write_out(self):
|
||||
self.rwOK.acquire()
|
||||
if not self.writing:
|
||||
raise ValueError, \
|
||||
'.write_out() invoked without an active writer'
|
||||
self.writing = 0
|
||||
self.nw = self.nw - 1
|
||||
if self.nw:
|
||||
self.writeOK.signal()
|
||||
else:
|
||||
self.readOK.broadcast()
|
||||
self.rwOK.release()
|
||||
|
||||
# The rest of the file is a test case, that runs a number of parallelized
|
||||
# quicksorts in parallel. If it works, you'll get about 600 lines of
|
||||
# tracing output, with a line like
|
||||
|
|
Loading…
Reference in New Issue