diff --git a/Lib/Queue.py b/Lib/Queue.py new file mode 100644 index 00000000000..eb089b88a58 --- /dev/null +++ b/Lib/Queue.py @@ -0,0 +1,122 @@ +# A multi-producer, multi-consumer queue. + +Empty = 'Queue.Empty' # Exception raised by get_nowait() + +class Queue: + + # Initialize a queue object with a given maximum size + # (If maxsize is <= 0, the maximum size is infinite) + def init(self, maxsize): + import thread + self._init(maxsize) + self.mutex = thread.allocate_lock() + self.esema = thread.allocate_lock() + self.esema.acquire_lock() + self.fsema = thread.allocate_lock() + return self + + # Get an approximation of the queue size (not reliable!) + def qsize(self): + self.mutex.acquire_lock() + n = self._qsize() + self.mutex.release_lock() + return n + + # Check if the queue is empty (not reliable!) + def empty(self): + self.mutex.acquire_lock() + n = self._empty() + self.mutex.release_lock() + return n + + # Check if the queue is full (not reliable!) + def full(self): + self.mutex.acquire_lock() + n = self._full() + self.mutex.release_lock() + return n + + # Put a new item into the queue + def put(self, item): + self.fsema.acquire_lock() + self.mutex.acquire_lock() + was_empty = self._empty() + self._put(item) + if was_empty: + self.esema.release_lock() + if not self._full(): + self.fsema.release_lock() + self.mutex.release_lock() + + # Get an item from the queue, + # blocking if necessary until one is available + def get(self): + self.esema.acquire_lock() + self.mutex.acquire_lock() + was_full = self._full() + item = self._get() + if was_full: + self.fsema.release_lock() + if not self._empty(): + self.esema.release_lock() + self.mutex.release_lock() + return item + + # Get an item from the queue if one is immediately available, + # raise Empty if the queue is empty or temporarily unavailable + def get_nowait(self): + locked = self.esema.acquire_lock(0) + self.mutex.acquire_lock() + if self._empty(): + # The queue is empyt -- we can't have esema + self.mutex.release_lock() + raise Empty + if not locked: + locked = self.esema.acquire_lock(0) + if not locked: + # Somebody else has esema + # but we have mutex -- + # go out of their way + self.mutex.release_lock() + raise Empty + was_full = self._full() + item = self._get() + if was_full: + self.fsema.release_lock() + if not self._empty(): + self.esema.release_lock() + self.mutex.release_lock() + return item + + # XXX Need to define put_nowait() as well. + + + # Override these methods to implement other queue organizations + # (e.g. stack or priority queue). + # These will only be called with appropriate locks held + + # Initialize the queue representation + def _init(self, maxsize): + self.maxsize = maxsize + self.queue = [] + + def _qsize(self): + return len(self.queue) + + # Check wheter the queue is empty + def _empty(self): + return not self.queue + + # Check whether the queue is full + def _full(self): + return self.maxsize > 0 and len(self.queue) == self.maxsize + + # Put a new item in the queue + def _put(self, item): + self.queue.append(item) + + # Get an item from the queue + def _get(self): + item = self.queue[0] + del self.queue[0] + return item