Added stub for the Queue module to be renamed in 3.0.

Use the 3.0 module name to avoid spurious warnings.
This commit is contained in:
Alexandre Vassalotti 2008-05-11 19:39:48 +00:00
parent ca3ccd15ff
commit 30ece44f2e
8 changed files with 276 additions and 268 deletions

View File

@ -1,244 +1,8 @@
"""A multi-producer, multi-consumer queue.""" import sys
from warnings import warnpy3k
from time import time as _time warnpy3k("the Queue module has been renamed "
from collections import deque "to 'queue' in Python 3.0", stacklevel=2)
import heapq
__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue'] import queue
sys.modules[__name__] = queue
class Empty(Exception):
"Exception raised by Queue.get(block=0)/get_nowait()."
pass
class Full(Exception):
"Exception raised by Queue.put(block=0)/put_nowait()."
pass
class Queue:
"""Create a queue object with a given maximum size.
If maxsize is <= 0, the queue size is infinite.
"""
def __init__(self, maxsize=0):
try:
import threading
except ImportError:
import dummy_threading as threading
self.maxsize = maxsize
self._init(maxsize)
# mutex must be held whenever the queue is mutating. All methods
# that acquire mutex must release it before returning. mutex
# is shared between the three conditions, so acquiring and
# releasing the conditions also acquires and releases mutex.
self.mutex = threading.Lock()
# Notify not_empty whenever an item is added to the queue; a
# thread waiting to get is notified then.
self.not_empty = threading.Condition(self.mutex)
# Notify not_full whenever an item is removed from the queue;
# a thread waiting to put is notified then.
self.not_full = threading.Condition(self.mutex)
# Notify all_tasks_done whenever the number of unfinished tasks
# drops to zero; thread waiting to join() is notified to resume
self.all_tasks_done = threading.Condition(self.mutex)
self.unfinished_tasks = 0
def task_done(self):
"""Indicate that a formerly enqueued task is complete.
Used by Queue consumer threads. For each get() used to fetch a task,
a subsequent call to task_done() tells the queue that the processing
on the task is complete.
If a join() is currently blocking, it will resume when all items
have been processed (meaning that a task_done() call was received
for every item that had been put() into the queue).
Raises a ValueError if called more times than there were items
placed in the queue.
"""
self.all_tasks_done.acquire()
try:
unfinished = self.unfinished_tasks - 1
if unfinished <= 0:
if unfinished < 0:
raise ValueError('task_done() called too many times')
self.all_tasks_done.notifyAll()
self.unfinished_tasks = unfinished
finally:
self.all_tasks_done.release()
def join(self):
"""Blocks until all items in the Queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the
queue. The count goes down whenever a consumer thread calls task_done()
to indicate the item was retrieved and all work on it is complete.
When the count of unfinished tasks drops to zero, join() unblocks.
"""
self.all_tasks_done.acquire()
try:
while self.unfinished_tasks:
self.all_tasks_done.wait()
finally:
self.all_tasks_done.release()
def qsize(self):
"""Return the approximate size of the queue (not reliable!)."""
self.mutex.acquire()
n = self._qsize()
self.mutex.release()
return n
def empty(self):
"""Return True if the queue is empty, False otherwise (not reliable!)."""
self.mutex.acquire()
n = not self._qsize()
self.mutex.release()
return n
def full(self):
"""Return True if the queue is full, False otherwise (not reliable!)."""
self.mutex.acquire()
n = 0 < self.maxsize == self._qsize()
self.mutex.release()
return n
def put(self, item, block=True, timeout=None):
"""Put an item into the queue.
If optional args 'block' is true and 'timeout' is None (the default),
block if necessary until a free slot is available. If 'timeout' is
a positive number, it blocks at most 'timeout' seconds and raises
the Full exception if no free slot was available within that time.
Otherwise ('block' is false), put an item on the queue if a free slot
is immediately available, else raise the Full exception ('timeout'
is ignored in that case).
"""
self.not_full.acquire()
try:
if self.maxsize > 0:
if not block:
if self._qsize() == self.maxsize:
raise Full
elif timeout is None:
while self._qsize() == self.maxsize:
self.not_full.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a positive number")
else:
endtime = _time() + timeout
while self._qsize() == self.maxsize:
remaining = endtime - _time()
if remaining <= 0.0:
raise Full
self.not_full.wait(remaining)
self._put(item)
self.unfinished_tasks += 1
self.not_empty.notify()
finally:
self.not_full.release()
def put_nowait(self, item):
"""Put an item into the queue without blocking.
Only enqueue the item if a free slot is immediately available.
Otherwise raise the Full exception.
"""
return self.put(item, False)
def get(self, block=True, timeout=None):
"""Remove and return an item from the queue.
If optional args 'block' is true and 'timeout' is None (the default),
block if necessary until an item is available. If 'timeout' is
a positive number, it blocks at most 'timeout' seconds and raises
the Empty exception if no item was available within that time.
Otherwise ('block' is false), return an item if one is immediately
available, else raise the Empty exception ('timeout' is ignored
in that case).
"""
self.not_empty.acquire()
try:
if not block:
if not self._qsize():
raise Empty
elif timeout is None:
while not self._qsize():
self.not_empty.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a positive number")
else:
endtime = _time() + timeout
while not self._qsize():
remaining = endtime - _time()
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
item = self._get()
self.not_full.notify()
return item
finally:
self.not_empty.release()
def get_nowait(self):
"""Remove and return an item from the queue without blocking.
Only get an item if one is immediately available. Otherwise
raise the Empty exception.
"""
return self.get(False)
# Override these methods to implement other queue organizations
# (e.g. stack or priority queue).
# These will only be called with appropriate locks held
# Initialize the queue representation
def _init(self, maxsize):
self.queue = deque()
def _qsize(self, len=len):
return len(self.queue)
# Put a new item in the queue
def _put(self, item):
self.queue.append(item)
# Get an item from the queue
def _get(self):
return self.queue.popleft()
class PriorityQueue(Queue):
'''Variant of Queue that retrieves open entries in priority order (lowest first).
Entries are typically tuples of the form: (priority number, data).
'''
def _init(self, maxsize):
self.queue = []
def _qsize(self, len=len):
return len(self.queue)
def _put(self, item, heappush=heapq.heappush):
heappush(self.queue, item)
def _get(self, heappop=heapq.heappop):
return heappop(self.queue)
class LifoQueue(Queue):
'''Variant of Queue that retrieves most recently added entries first.'''
def _init(self, maxsize):
self.queue = []
def _qsize(self, len=len):
return len(self.queue)
def _put(self, item):
self.queue.append(item)
def _get(self):
return self.queue.pop()

View File

@ -35,7 +35,7 @@ import SocketServer
import struct import struct
import cPickle as pickle import cPickle as pickle
import threading import threading
import Queue import queue
import traceback import traceback
import copyreg import copyreg
import types import types
@ -117,8 +117,8 @@ class RPCServer(SocketServer.TCPServer):
#----------------- end class RPCServer -------------------- #----------------- end class RPCServer --------------------
objecttable = {} objecttable = {}
request_queue = Queue.Queue(0) request_queue = queue.Queue(0)
response_queue = Queue.Queue(0) response_queue = queue.Queue(0)
class SocketIO(object): class SocketIO(object):
@ -413,7 +413,7 @@ class SocketIO(object):
# send queued response if there is one available # send queued response if there is one available
try: try:
qmsg = response_queue.get(0) qmsg = response_queue.get(0)
except Queue.Empty: except queue.Empty:
pass pass
else: else:
seq, response = qmsg seq, response = qmsg

View File

@ -5,7 +5,7 @@ import socket
import traceback import traceback
import thread import thread
import threading import threading
import Queue import queue
import CallTips import CallTips
import AutoComplete import AutoComplete
@ -85,7 +85,7 @@ def main(del_exitfunc=False):
continue continue
try: try:
seq, request = rpc.request_queue.get(block=True, timeout=0.05) seq, request = rpc.request_queue.get(block=True, timeout=0.05)
except Queue.Empty: except queue.Empty:
continue continue
method, args, kwargs = request method, args, kwargs = request
ret = method(*args, **kwargs) ret = method(*args, **kwargs)

244
Lib/queue.py Normal file
View File

@ -0,0 +1,244 @@
"""A multi-producer, multi-consumer queue."""
from time import time as _time
from collections import deque
import heapq
__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue']
class Empty(Exception):
"Exception raised by Queue.get(block=0)/get_nowait()."
pass
class Full(Exception):
"Exception raised by Queue.put(block=0)/put_nowait()."
pass
class Queue:
"""Create a queue object with a given maximum size.
If maxsize is <= 0, the queue size is infinite.
"""
def __init__(self, maxsize=0):
try:
import threading
except ImportError:
import dummy_threading as threading
self.maxsize = maxsize
self._init(maxsize)
# mutex must be held whenever the queue is mutating. All methods
# that acquire mutex must release it before returning. mutex
# is shared between the three conditions, so acquiring and
# releasing the conditions also acquires and releases mutex.
self.mutex = threading.Lock()
# Notify not_empty whenever an item is added to the queue; a
# thread waiting to get is notified then.
self.not_empty = threading.Condition(self.mutex)
# Notify not_full whenever an item is removed from the queue;
# a thread waiting to put is notified then.
self.not_full = threading.Condition(self.mutex)
# Notify all_tasks_done whenever the number of unfinished tasks
# drops to zero; thread waiting to join() is notified to resume
self.all_tasks_done = threading.Condition(self.mutex)
self.unfinished_tasks = 0
def task_done(self):
"""Indicate that a formerly enqueued task is complete.
Used by Queue consumer threads. For each get() used to fetch a task,
a subsequent call to task_done() tells the queue that the processing
on the task is complete.
If a join() is currently blocking, it will resume when all items
have been processed (meaning that a task_done() call was received
for every item that had been put() into the queue).
Raises a ValueError if called more times than there were items
placed in the queue.
"""
self.all_tasks_done.acquire()
try:
unfinished = self.unfinished_tasks - 1
if unfinished <= 0:
if unfinished < 0:
raise ValueError('task_done() called too many times')
self.all_tasks_done.notifyAll()
self.unfinished_tasks = unfinished
finally:
self.all_tasks_done.release()
def join(self):
"""Blocks until all items in the Queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the
queue. The count goes down whenever a consumer thread calls task_done()
to indicate the item was retrieved and all work on it is complete.
When the count of unfinished tasks drops to zero, join() unblocks.
"""
self.all_tasks_done.acquire()
try:
while self.unfinished_tasks:
self.all_tasks_done.wait()
finally:
self.all_tasks_done.release()
def qsize(self):
"""Return the approximate size of the queue (not reliable!)."""
self.mutex.acquire()
n = self._qsize()
self.mutex.release()
return n
def empty(self):
"""Return True if the queue is empty, False otherwise (not reliable!)."""
self.mutex.acquire()
n = not self._qsize()
self.mutex.release()
return n
def full(self):
"""Return True if the queue is full, False otherwise (not reliable!)."""
self.mutex.acquire()
n = 0 < self.maxsize == self._qsize()
self.mutex.release()
return n
def put(self, item, block=True, timeout=None):
"""Put an item into the queue.
If optional args 'block' is true and 'timeout' is None (the default),
block if necessary until a free slot is available. If 'timeout' is
a positive number, it blocks at most 'timeout' seconds and raises
the Full exception if no free slot was available within that time.
Otherwise ('block' is false), put an item on the queue if a free slot
is immediately available, else raise the Full exception ('timeout'
is ignored in that case).
"""
self.not_full.acquire()
try:
if self.maxsize > 0:
if not block:
if self._qsize() == self.maxsize:
raise Full
elif timeout is None:
while self._qsize() == self.maxsize:
self.not_full.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a positive number")
else:
endtime = _time() + timeout
while self._qsize() == self.maxsize:
remaining = endtime - _time()
if remaining <= 0.0:
raise Full
self.not_full.wait(remaining)
self._put(item)
self.unfinished_tasks += 1
self.not_empty.notify()
finally:
self.not_full.release()
def put_nowait(self, item):
"""Put an item into the queue without blocking.
Only enqueue the item if a free slot is immediately available.
Otherwise raise the Full exception.
"""
return self.put(item, False)
def get(self, block=True, timeout=None):
"""Remove and return an item from the queue.
If optional args 'block' is true and 'timeout' is None (the default),
block if necessary until an item is available. If 'timeout' is
a positive number, it blocks at most 'timeout' seconds and raises
the Empty exception if no item was available within that time.
Otherwise ('block' is false), return an item if one is immediately
available, else raise the Empty exception ('timeout' is ignored
in that case).
"""
self.not_empty.acquire()
try:
if not block:
if not self._qsize():
raise Empty
elif timeout is None:
while not self._qsize():
self.not_empty.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a positive number")
else:
endtime = _time() + timeout
while not self._qsize():
remaining = endtime - _time()
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
item = self._get()
self.not_full.notify()
return item
finally:
self.not_empty.release()
def get_nowait(self):
"""Remove and return an item from the queue without blocking.
Only get an item if one is immediately available. Otherwise
raise the Empty exception.
"""
return self.get(False)
# Override these methods to implement other queue organizations
# (e.g. stack or priority queue).
# These will only be called with appropriate locks held
# Initialize the queue representation
def _init(self, maxsize):
self.queue = deque()
def _qsize(self, len=len):
return len(self.queue)
# Put a new item in the queue
def _put(self, item):
self.queue.append(item)
# Get an item from the queue
def _get(self):
return self.queue.popleft()
class PriorityQueue(Queue):
'''Variant of Queue that retrieves open entries in priority order (lowest first).
Entries are typically tuples of the form: (priority number, data).
'''
def _init(self, maxsize):
self.queue = []
def _qsize(self, len=len):
return len(self.queue)
def _put(self, item, heappush=heapq.heappush):
heappush(self.queue, item)
def _get(self, heappop=heapq.heappop):
return heappop(self.queue)
class LifoQueue(Queue):
'''Variant of Queue that retrieves most recently added entries first.'''
def _init(self, maxsize):
self.queue = []
def _qsize(self, len=len):
return len(self.queue)
def _put(self, item):
self.queue.append(item)
def _get(self):
return self.queue.pop()

View File

@ -7,7 +7,7 @@ implementation as its sole argument.
""" """
import dummy_thread as _thread import dummy_thread as _thread
import time import time
import Queue import queue
import random import random
import unittest import unittest
from test import test_support from test import test_support
@ -124,7 +124,7 @@ class ThreadTests(unittest.TestCase):
"""Use to test _thread.start_new_thread() passes args properly.""" """Use to test _thread.start_new_thread() passes args properly."""
queue.put((arg1, arg2)) queue.put((arg1, arg2))
testing_queue = Queue.Queue(1) testing_queue = queue.Queue(1)
_thread.start_new_thread(arg_tester, (testing_queue, True, True)) _thread.start_new_thread(arg_tester, (testing_queue, True, True))
result = testing_queue.get() result = testing_queue.get()
self.failUnless(result[0] and result[1], self.failUnless(result[0] and result[1],
@ -148,7 +148,7 @@ class ThreadTests(unittest.TestCase):
queue.put(_thread.get_ident()) queue.put(_thread.get_ident())
thread_count = 5 thread_count = 5
testing_queue = Queue.Queue(thread_count) testing_queue = queue.Queue(thread_count)
if test_support.verbose: if test_support.verbose:
print print
print "*** Testing multiple thread creation "\ print "*** Testing multiple thread creation "\

View File

@ -1,6 +1,6 @@
# Some simple Queue module tests, plus some failure conditions # Some simple queue module tests, plus some failure conditions
# to ensure the Queue locks remain stable. # to ensure the Queue locks remain stable.
import Queue import queue
import sys import sys
import threading import threading
import time import time
@ -107,12 +107,12 @@ class BaseQueueTest(unittest.TestCase, BlockingTestMixin):
try: try:
q.put("full", block=0) q.put("full", block=0)
self.fail("Didn't appear to block with a full queue") self.fail("Didn't appear to block with a full queue")
except Queue.Full: except queue.Full:
pass pass
try: try:
q.put("full", timeout=0.01) q.put("full", timeout=0.01)
self.fail("Didn't appear to time-out with a full queue") self.fail("Didn't appear to time-out with a full queue")
except Queue.Full: except queue.Full:
pass pass
# Test a blocking put # Test a blocking put
self.do_blocking_test(q.put, ("full",), q.get, ()) self.do_blocking_test(q.put, ("full",), q.get, ())
@ -124,12 +124,12 @@ class BaseQueueTest(unittest.TestCase, BlockingTestMixin):
try: try:
q.get(block=0) q.get(block=0)
self.fail("Didn't appear to block with an empty queue") self.fail("Didn't appear to block with an empty queue")
except Queue.Empty: except queue.Empty:
pass pass
try: try:
q.get(timeout=0.01) q.get(timeout=0.01)
self.fail("Didn't appear to time-out with an empty queue") self.fail("Didn't appear to time-out with an empty queue")
except Queue.Empty: except queue.Empty:
pass pass
# Test a blocking get # Test a blocking get
self.do_blocking_test(q.get, (), q.put, ('empty',)) self.do_blocking_test(q.get, (), q.put, ('empty',))
@ -191,13 +191,13 @@ class BaseQueueTest(unittest.TestCase, BlockingTestMixin):
class QueueTest(BaseQueueTest): class QueueTest(BaseQueueTest):
type2test = Queue.Queue type2test = queue.Queue
class LifoQueueTest(BaseQueueTest): class LifoQueueTest(BaseQueueTest):
type2test = Queue.LifoQueue type2test = queue.LifoQueue
class PriorityQueueTest(BaseQueueTest): class PriorityQueueTest(BaseQueueTest):
type2test = Queue.PriorityQueue type2test = queue.PriorityQueue
@ -205,21 +205,21 @@ class PriorityQueueTest(BaseQueueTest):
class FailingQueueException(Exception): class FailingQueueException(Exception):
pass pass
class FailingQueue(Queue.Queue): class FailingQueue(queue.Queue):
def __init__(self, *args): def __init__(self, *args):
self.fail_next_put = False self.fail_next_put = False
self.fail_next_get = False self.fail_next_get = False
Queue.Queue.__init__(self, *args) queue.Queue.__init__(self, *args)
def _put(self, item): def _put(self, item):
if self.fail_next_put: if self.fail_next_put:
self.fail_next_put = False self.fail_next_put = False
raise FailingQueueException, "You Lose" raise FailingQueueException, "You Lose"
return Queue.Queue._put(self, item) return queue.Queue._put(self, item)
def _get(self): def _get(self):
if self.fail_next_get: if self.fail_next_get:
self.fail_next_get = False self.fail_next_get = False
raise FailingQueueException, "You Lose" raise FailingQueueException, "You Lose"
return Queue.Queue._get(self) return queue.Queue._get(self)
class FailingQueueTest(unittest.TestCase, BlockingTestMixin): class FailingQueueTest(unittest.TestCase, BlockingTestMixin):

View File

@ -9,7 +9,7 @@ import select
import thread, threading import thread, threading
import time import time
import traceback import traceback
import Queue import queue
import sys import sys
import os import os
import array import array
@ -96,7 +96,7 @@ class ThreadableTest:
self.server_ready = threading.Event() self.server_ready = threading.Event()
self.client_ready = threading.Event() self.client_ready = threading.Event()
self.done = threading.Event() self.done = threading.Event()
self.queue = Queue.Queue(1) self.queue = queue.Queue(1)
# Do some munging to start the client test. # Do some munging to start the client test.
methodname = self.id() methodname = self.id()

View File

@ -10,7 +10,7 @@ from Tkinter import *
import websucker import websucker
import os import os
import threading import threading
import Queue import queue
import time import time
VERBOSE = 2 VERBOSE = 2
@ -139,7 +139,7 @@ class App:
def go(self, event=None): def go(self, event=None):
if not self.msgq: if not self.msgq:
self.msgq = Queue.Queue(0) self.msgq = queue.Queue(0)
self.check_msgq() self.check_msgq()
if not self.sucker: if not self.sucker:
self.sucker = SuckerThread(self.msgq) self.sucker = SuckerThread(self.msgq)