From a6970580d14d1d024062522dc86f1c346b913164 Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Wed, 18 May 1994 08:14:04 +0000 Subject: [PATCH] Added optional lock parameter to condition class. Added mrsw (multiple-reader-single-writer) lock. --- Demo/threads/sync.py | 121 ++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 114 insertions(+), 7 deletions(-) diff --git a/Demo/threads/sync.py b/Demo/threads/sync.py index 4e999798a25..6cdf3e8f888 100644 --- a/Demo/threads/sync.py +++ b/Demo/threads/sync.py @@ -1,16 +1,23 @@ # Defines classes that provide synchronization objects. Note that use of # this module requires that your Python support threads. # -# condition() # a POSIX-like condition-variable object -# barrier(n) # an n-thread barrier -# event() # an event object -# semaphore(n=1)# a semaphore object, with initial count n +# condition(lock=None) # a POSIX-like condition-variable object +# barrier(n) # an n-thread barrier +# event() # an event object +# semaphore(n=1) # a semaphore object, with initial count n +# mrsw() # a multiple-reader single-writer lock # # CONDITIONS # # A condition object is created via # import this_module -# your_condition_object = this_module.condition() +# your_condition_object = this_module.condition(lock=None) +# +# As explained below, a condition object has a lock associated with it, +# used in the protocol to protect condition data. You can specify a +# lock to use in the constructor, else the constructor will allocate +# an anonymous lock for you. Specifying a lock explicitly can be useful +# when more than one condition keys off the same set of shared data. # # Methods: # .acquire() @@ -209,13 +216,63 @@ # any) blocked by a .p(). It's an (detected) error for a .v() to # increase the semaphore's count to a value larger than the initial # count. +# +# MULTIPLE-READER SINGLE-WRITER LOCKS +# +# A mrsw lock is created via +# import this_module +# your_mrsw_lock = this_module.mrsw() +# +# This kind of lock is often useful with complex shared data structures. +# The object lets any number of "readers" proceed, so long as no thread +# wishes to "write". When a (one or more) thread declares its intention +# to "write" (e.g., to update a shared structure), all current readers +# are allowed to finish, and then a writer gets exclusive access; all +# other readers & writers are blocked until the current writer completes. +# Finally, if some thread is waiting to write and another is waiting to +# read, the writer takes precedence. +# +# Methods: +# +# .read_in() +# If no thread is writing or waiting to write, returns immediately. +# Else blocks until no thread is writing or waiting to write. So +# long as some thread has completed a .read_in but not a .read_out, +# writers are blocked. +# +# .read_out() +# Use sometime after a .read_in to declare that the thread is done +# reading. When all threads complete reading, a writer can proceed. +# +# .write_in() +# If no thread is writing (has completed a .write_in, but hasn't yet +# done a .write_out) or reading (similarly), returns immediately. +# Else blocks the calling thread, and threads waiting to read, until +# the current writer completes writing or all the current readers +# complete reading; if then more than one thread is waiting to +# write, one of them is allowed to proceed, but which one is not +# specified. +# +# .write_out() +# Use sometime after a .write_in to declare that the thread is done +# writing. Then if some other thread is waiting to write, it's +# allowed to proceed. Else all threads (if any) waiting to read are +# allowed to proceed. import thread class condition: - def __init__(self): + def __init__(self, lock=None): # the lock actually used by .acquire() and .release() - self.mutex = thread.allocate_lock() + if lock is None: + self.mutex = thread.allocate_lock() + else: + if hasattr(lock, 'acquire') and \ + hasattr(lock, 'release'): + self.mutex = lock + else: + raise TypeError, 'condition constructor requires ' \ + 'a lock argument' # lock used to block threads until a signal self.checkout = thread.allocate_lock() @@ -357,6 +414,56 @@ class semaphore: self.nonzero.signal() self.nonzero.release() +class mrsw: + def __init__(self): + # critical-section lock & the data it protects + self.rwOK = thread.allocate_lock() + self.nr = 0 # number readers actively reading (not just waiting) + self.nw = 0 # number writers either waiting to write or writing + self.writing = 0 # 1 iff some thread is writing + + # conditions + self.readOK = condition(self.rwOK) # OK to unblock readers + self.writeOK = condition(self.rwOK) # OK to unblock writers + + def read_in(self): + self.rwOK.acquire() + while self.nw: + self.readOK.wait() + self.nr = self.nr + 1 + self.rwOK.release() + + def read_out(self): + self.rwOK.acquire() + if self.nr <= 0: + raise ValueError, \ + '.read_out() invoked without an active reader' + self.nr = self.nr - 1 + if self.nr == 0: + self.writeOK.signal() + self.rwOK.release() + + def write_in(self): + self.rwOK.acquire() + self.nw = self.nw + 1 + while self.writing or self.nr: + self.writeOK.wait() + self.writing = 1 + self.rwOK.release() + + def write_out(self): + self.rwOK.acquire() + if not self.writing: + raise ValueError, \ + '.write_out() invoked without an active writer' + self.writing = 0 + self.nw = self.nw - 1 + if self.nw: + self.writeOK.signal() + else: + self.readOK.broadcast() + self.rwOK.release() + # The rest of the file is a test case, that runs a number of parallelized # quicksorts in parallel. If it works, you'll get about 600 lines of # tracing output, with a line like