mirror of https://github.com/python/cpython
314 lines
9.4 KiB
Python
314 lines
9.4 KiB
Python
"""Cross-interpreter Queues High Level Module."""
|
|
|
|
import pickle
|
|
import queue
|
|
import time
|
|
import weakref
|
|
import _interpqueues as _queues
|
|
from . import _crossinterp
|
|
|
|
# aliases:
|
|
from _interpqueues import (
|
|
QueueError, QueueNotFoundError,
|
|
)
|
|
from ._crossinterp import (
|
|
UNBOUND_ERROR, UNBOUND_REMOVE,
|
|
)
|
|
|
|
__all__ = [
|
|
'UNBOUND', 'UNBOUND_ERROR', 'UNBOUND_REMOVE',
|
|
'create', 'list_all',
|
|
'Queue',
|
|
'QueueError', 'QueueNotFoundError', 'QueueEmpty', 'QueueFull',
|
|
'ItemInterpreterDestroyed',
|
|
]
|
|
|
|
|
|
class QueueEmpty(QueueError, queue.Empty):
    """Signal that no item could be taken from the queue.

    get_nowait() raises this when the queue has nothing in it, and
    get() raises it when its timeout expires first.  Because it also
    derives from queue.Empty, handlers written for the stdlib "queue"
    module catch it too.
    """
|
|
|
|
|
|
class QueueFull(QueueError, queue.Full):
    """Signal that no item could be added to the queue.

    put_nowait() raises this when the queue has no room left, and
    put() raises it when its timeout expires first.  Because it also
    derives from queue.Full, handlers written for the stdlib "queue"
    module catch it too.
    """
|
|
|
|
|
|
class ItemInterpreterDestroyed(
    QueueError,
    _crossinterp.ItemInterpreterDestroyed,
):
    """Raised from get() and get_nowait() for an unbound item.

    This is the error used when an item was added with
    unbound=UNBOUND_ERROR and the interpreter that added it has since
    been destroyed.
    """
|
|
|
|
|
|
# Item "format" codes, passed as the "fmt" argument to the low-level
# _interpqueues functions (see create() and Queue.put()).
# _SHARED_ONLY is selected when "syncobj" is true: the object is handed
# to the low-level queue without being pickled first.
_SHARED_ONLY = 0
# _PICKLED is selected when "syncobj" is false: Queue.put() pickles the
# object and Queue.get() unpickles it.
_PICKLED = 1


# Queue-specific singleton that Queue.get() returns in place of an item
# whose original interpreter was destroyed (the default "unbound" mode).
UNBOUND = _crossinterp.UnboundItem.singleton('queue', __name__)
|
|
|
|
|
|
def _serialize_unbound(unbound):
    """Return the serialized form of an "unbound" setting.

    The queue-specific UNBOUND singleton is swapped for the generic
    _crossinterp.UNBOUND before delegating to
    _crossinterp.serialize_unbound(); any other value is passed along
    untouched.
    """
    generic = _crossinterp.UNBOUND if unbound is UNBOUND else unbound
    return _crossinterp.serialize_unbound(generic)
|
|
|
|
|
|
def _resolve_unbound(flag):
    """Turn a low-level "unbound" flag into the value get() should yield.

    Resolution is delegated to _crossinterp.resolve_unbound(), which is
    given this module's ItemInterpreterDestroyed as the exception type.
    If the outcome is the generic _crossinterp.UNBOUND, the
    queue-specific UNBOUND singleton is returned in its place.
    """
    outcome = _crossinterp.resolve_unbound(flag, ItemInterpreterDestroyed)
    return UNBOUND if outcome is _crossinterp.UNBOUND else outcome
|
|
|
|
|
|
def create(maxsize=0, *, syncobj=False, unbounditems=UNBOUND):
    """Create a cross-interpreter queue and return its Queue object.

    Such a queue can be used to hand data safely from one interpreter
    to another.

    "maxsize" is forwarded unchanged to the low-level queue.

    "syncobj" becomes the queue's default for Queue.put() and
    Queue.put_nowait().

    "unbounditems" likewise becomes the queue's default; the accepted
    values are described under Queue.put().  It defaults to UNBOUND,
    meaning an unbound item is replaced by the UNBOUND marker.
    """
    if syncobj:
        item_format = _SHARED_ONLY
    else:
        item_format = _PICKLED
    serialized = _serialize_unbound(unbounditems)
    # The serialized form is a 1-tuple holding the low-level op code.
    (op,) = serialized
    queue_id = _queues.create(maxsize, item_format, op)
    return Queue(queue_id, _fmt=item_format, _unbound=serialized)
|
|
|
|
|
|
def list_all():
    """Return a list with a Queue object for every open queue."""
    found = []
    for queue_id, item_format, op in _queues.list_all():
        # Queue expects its unbound default wrapped in a 1-tuple.
        found.append(Queue(queue_id, _fmt=item_format, _unbound=(op,)))
    return found
|
|
|
|
|
|
# Registry of live Queue objects, keyed by queue ID.  Values are held
# weakly so the registry alone never keeps a Queue alive.
_known_queues = weakref.WeakValueDictionary()


class Queue:
    """A cross-interpreter queue.

    At most one Queue object exists per queue ID in the current
    interpreter: Queue(id) returns the already-registered object when
    there is one.  A newly created object binds itself to the underlying
    queue, and releases it again when it is finalized.
    """

    def __new__(cls, id, /, *, _fmt=None, _unbound=None):
        """Return the Queue object for the given queue ID.

        "_fmt" and "_unbound" are the queue's default item format and
        default (serialized) unbound handling.  Whichever is omitted is
        looked up from the underlying queue.

        Raises TypeError if "id" is not an int.
        """
        # There is only one instance for any given ID.
        if isinstance(id, int):
            # Normalize int subclasses to a plain int.
            id = int(id)
        else:
            raise TypeError(f'id must be an int, got {id!r}')
        if _fmt is None or _unbound is None:
            default_fmt, default_op = _queues.get_queue_defaults(id)
            if _fmt is None:
                _fmt = default_fmt
            if _unbound is None:
                _unbound = (default_op,)
        try:
            self = _known_queues[id]
        except KeyError:
            self = super().__new__(cls)
            self._id = id
            self._fmt = _fmt
            self._unbound = _unbound
            _known_queues[id] = self
            _queues.bind(id)
        return self

    def __del__(self):
        # Best-effort cleanup: the underlying queue may already be gone,
        # and so may the registry entry.
        try:
            _queues.release(self._id)
        except QueueNotFoundError:
            pass
        try:
            del _known_queues[self._id]
        except KeyError:
            pass

    def __repr__(self):
        return f'{type(self).__name__}({self.id})'

    def __hash__(self):
        return hash(self._id)

    # for pickling:
    def __getnewargs__(self):
        # Only the ID is needed to look the queue up again via __new__().
        return (self._id,)

    # for pickling:
    def __getstate__(self):
        # No per-instance state is pickled.
        return None

    @property
    def id(self):
        """The queue's integer ID."""
        return self._id

    @property
    def maxsize(self):
        """The queue's maximum size.

        The value is fetched from the underlying queue on first access
        and cached on the instance afterwards.
        """
        try:
            return self._maxsize
        except AttributeError:
            self._maxsize = _queues.get_maxsize(self._id)
            return self._maxsize

    def empty(self):
        """Return True if the queue currently holds no items."""
        return self.qsize() == 0

    def full(self):
        """Return True if the underlying queue reports itself as full."""
        return _queues.is_full(self._id)

    def qsize(self):
        """Return the number of items currently in the queue."""
        return _queues.get_count(self._id)

    @staticmethod
    def _check_timeout(timeout):
        """Validate a timeout and return it as a float (or None).

        Fractional values are kept as-is, so a sub-second timeout is
        honored rather than being truncated to zero.

        Raises ValueError if the timeout is negative or NaN.
        """
        if timeout is None:
            return None
        timeout = float(timeout)
        # "not >=" (rather than "<") also rejects NaN, which would
        # otherwise produce a deadline that never expires.
        if not timeout >= 0:
            raise ValueError('timeout value must be non-negative')
        return timeout

    def _resolve_put_options(self, syncobj, unbound):
        """Return (fmt, unboundop) for a put, applying queue defaults."""
        if syncobj is None:
            fmt = self._fmt
        else:
            fmt = _SHARED_ONLY if syncobj else _PICKLED
        if unbound is None:
            unboundop, = self._unbound
        else:
            unboundop, = _serialize_unbound(unbound)
        return fmt, unboundop

    @staticmethod
    def _unpack_item(obj, fmt, unboundop):
        """Convert a raw (obj, fmt, unboundop) triple into the result."""
        if unboundop is not None:
            # The item's interpreter was destroyed; there is no payload.
            assert obj is None, repr(obj)
            return _resolve_unbound(unboundop)
        if fmt == _PICKLED:
            # NOTE: the payload was produced by pickle.dumps() in put() /
            # put_nowait(); unpickling executes whatever it describes.
            obj = pickle.loads(obj)
        else:
            assert fmt == _SHARED_ONLY
        return obj

    def put(self, obj, timeout=None, *,
            syncobj=None,
            unbound=None,
            _delay=10 / 1000,  # 10 milliseconds
            ):
        """Add the object to the queue.

        This blocks while the queue is full.  If "timeout" (in seconds,
        fractions allowed) is given and the queue is still full once it
        has elapsed, QueueFull is raised.  A negative timeout raises
        ValueError.

        If "syncobj" is None (the default) then it uses the
        queue's default, set with create().

        If "syncobj" is false then all objects are supported,
        at the expense of worse performance.

        If "syncobj" is true then the object must be "shareable".
        Examples of "shareable" objects include the builtin singletons,
        str, and memoryview.  One benefit is that such objects are
        passed through the queue efficiently.

        The key difference, though, is conceptual: the corresponding
        object returned from Queue.get() will be strictly equivalent
        to the given obj.  In other words, the two objects will be
        effectively indistinguishable from each other, even if the
        object is mutable.  The received object may actually be the
        same object, or a copy (immutable values only), or a proxy.
        Regardless, the received object should be treated as though
        the original has been shared directly, whether or not it
        actually is.  That's a slightly different and stronger promise
        than just (initial) equality, which is all "syncobj=False"
        can promise.

        "unbound" controls the behavior of Queue.get() for the given
        object if the current interpreter (calling put()) is later
        destroyed.

        If "unbound" is None (the default) then it uses the
        queue's default, set with create(),
        which is usually UNBOUND.

        If "unbound" is UNBOUND_ERROR then get() will raise an
        ItemInterpreterDestroyed exception if the original interpreter
        has been destroyed.  This does not otherwise affect the queue;
        the next call to get() will work like normal, returning the next
        item in the queue.

        If "unbound" is UNBOUND_REMOVE then the item will be removed
        from the queue as soon as the original interpreter is destroyed.
        Be aware that this will introduce an imbalance between put()
        and get() calls.

        If "unbound" is UNBOUND then it is returned by get() in place
        of the unbound item.
        """
        fmt, unboundop = self._resolve_put_options(syncobj, unbound)
        timeout = self._check_timeout(timeout)
        # Use the monotonic clock so wall-clock adjustments cannot
        # shorten or lengthen the wait.
        end = None if timeout is None else time.monotonic() + timeout
        if fmt == _PICKLED:
            obj = pickle.dumps(obj)
        while True:
            try:
                _queues.put(self._id, obj, fmt, unboundop)
            except QueueFull:
                if end is not None and time.monotonic() >= end:
                    raise  # re-raise
                # Poll: wait a little before trying again.
                time.sleep(_delay)
            else:
                break

    def put_nowait(self, obj, *, syncobj=None, unbound=None):
        """Add the object to the queue without blocking.

        If the queue is full then raise QueueFull.  Otherwise this
        is the same as put(), including the meaning of "syncobj"
        and "unbound".
        """
        fmt, unboundop = self._resolve_put_options(syncobj, unbound)
        if fmt == _PICKLED:
            obj = pickle.dumps(obj)
        _queues.put(self._id, obj, fmt, unboundop)

    def get(self, timeout=None, *,
            _delay=10 / 1000,  # 10 milliseconds
            ):
        """Return the next object from the queue.

        This blocks while the queue is empty.  If "timeout" (in seconds,
        fractions allowed) is given and the queue is still empty once it
        has elapsed, QueueEmpty is raised.  A negative timeout raises
        ValueError.

        If the next item's original interpreter has been destroyed
        then the "next object" is determined by the value of the
        "unbound" argument to put().
        """
        timeout = self._check_timeout(timeout)
        # Use the monotonic clock so wall-clock adjustments cannot
        # shorten or lengthen the wait.
        end = None if timeout is None else time.monotonic() + timeout
        while True:
            try:
                obj, fmt, unboundop = _queues.get(self._id)
            except QueueEmpty:
                if end is not None and time.monotonic() >= end:
                    raise  # re-raise
                # Poll: wait a little before trying again.
                time.sleep(_delay)
            else:
                break
        return self._unpack_item(obj, fmt, unboundop)

    def get_nowait(self):
        """Return the next object from the queue.

        If the queue is empty then raise QueueEmpty.  Otherwise this
        is the same as get().
        """
        obj, fmt, unboundop = _queues.get(self._id)
        return self._unpack_item(obj, fmt, unboundop)
|
|
|
|
|
|
# Hand the Python-level Queue, QueueEmpty and QueueFull classes to the
# low-level _interpqueues module.
# NOTE(review): presumably this lets the extension module raise these
# exception types and recognize Queue objects -- confirm against the
# _interpqueues implementation.
_queues._register_heap_types(Queue, QueueEmpty, QueueFull)
|