gh-76785: Expand How Interpreter Queues Handle Interpreter Finalization (gh-116431)

Any cross-interpreter mechanism for passing objects between interpreters must be very careful to respect isolation, even when the object is effectively immutable (e.g. int, str).  Here this especially relates to when an interpreter sends one of its objects, and then is destroyed while the inter-interpreter machinery (e.g. queue) still holds a reference to the object.

When I added interpreters.Queue, I dealt with that case (using an atexit hook) by silently removing all items from the queue that were added by the finalizing interpreter.

Later, while working on concurrent.futures.InterpreterPoolExecutor (gh-116430), I noticed it was somewhat surprising when items were silently removed from the queue when the originating interpreter was destroyed.  (See my comment on that PR.) 
 It took me a little while to realize what was going on.  I expect that users, which much less context than I have, would experience the same pain.

My approach, here, to improving the situation is to give users three options:

1. return a singleton (interpreters.queues.UNBOUND) from Queue.get() in place of each removed item
2. raise an exception (interpreters.queues.ItemInterpreterDestroyed) from Queue.get() in place of each removed item
3. existing behavior: silently remove each item (i.e. Queue.get() skips each one)

The default will now be (1), but users can still explicitly opt in any of them, including to the silent removal behavior.

The behavior for each item may be set with the corresponding Queue.put() call. and a queue-wide default may be set when the queue is created.  (This is the same as I did for "synconly".)
This commit is contained in:
Eric Snow 2024-07-15 12:49:23 -06:00 committed by GitHub
parent 985dd8e17b
commit 6b98b274b6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 513 additions and 90 deletions

View File

@ -12,9 +12,11 @@ from _interpqueues import (
)
__all__ = [
'UNBOUND', 'UNBOUND_ERROR', 'UNBOUND_REMOVE',
'create', 'list_all',
'Queue',
'QueueError', 'QueueNotFoundError', 'QueueEmpty', 'QueueFull',
'ItemInterpreterDestroyed',
]
@ -32,26 +34,90 @@ class QueueFull(QueueError, queue.Full):
"""
class ItemInterpreterDestroyed(QueueError):
"""Raised from get() and get_nowait()."""
_SHARED_ONLY = 0
_PICKLED = 1
def create(maxsize=0, *, syncobj=False):
class UnboundItem:
"""Represents a Queue item no longer bound to an interpreter.
An item is unbound when the interpreter that added it to the queue
is destroyed.
"""
__slots__ = ()
def __new__(cls):
return UNBOUND
def __repr__(self):
return f'interpreters.queues.UNBOUND'
UNBOUND = object.__new__(UnboundItem)
UNBOUND_ERROR = object()
UNBOUND_REMOVE = object()
_UNBOUND_CONSTANT_TO_FLAG = {
UNBOUND_REMOVE: 1,
UNBOUND_ERROR: 2,
UNBOUND: 3,
}
_UNBOUND_FLAG_TO_CONSTANT = {v: k
for k, v in _UNBOUND_CONSTANT_TO_FLAG.items()}
def _serialize_unbound(unbound):
op = unbound
try:
flag = _UNBOUND_CONSTANT_TO_FLAG[op]
except KeyError:
raise NotImplementedError(f'unsupported unbound replacement op {op!r}')
return flag,
def _resolve_unbound(flag):
try:
op = _UNBOUND_FLAG_TO_CONSTANT[flag]
except KeyError:
raise NotImplementedError(f'unsupported unbound replacement op {flag!r}')
if op is UNBOUND_REMOVE:
# "remove" not possible here
raise NotImplementedError
elif op is UNBOUND_ERROR:
raise ItemInterpreterDestroyed("item's original interpreter destroyed")
elif op is UNBOUND:
return UNBOUND
else:
raise NotImplementedError(repr(op))
def create(maxsize=0, *, syncobj=False, unbounditems=UNBOUND):
"""Return a new cross-interpreter queue.
The queue may be used to pass data safely between interpreters.
"syncobj" sets the default for Queue.put()
and Queue.put_nowait().
"unbounditems" likewise sets the default. See Queue.put() for
supported values. The default value is UNBOUND, which replaces
the unbound item.
"""
fmt = _SHARED_ONLY if syncobj else _PICKLED
qid = _queues.create(maxsize, fmt)
return Queue(qid, _fmt=fmt)
unbound = _serialize_unbound(unbounditems)
unboundop, = unbound
qid = _queues.create(maxsize, fmt, unboundop)
return Queue(qid, _fmt=fmt, _unbound=unbound)
def list_all():
"""Return a list of all open queues."""
return [Queue(qid, _fmt=fmt)
for qid, fmt in _queues.list_all()]
return [Queue(qid, _fmt=fmt, _unbound=(unboundop,))
for qid, fmt, unboundop in _queues.list_all()]
_known_queues = weakref.WeakValueDictionary()
@ -59,20 +125,28 @@ _known_queues = weakref.WeakValueDictionary()
class Queue:
"""A cross-interpreter queue."""
def __new__(cls, id, /, *, _fmt=None):
def __new__(cls, id, /, *, _fmt=None, _unbound=None):
# There is only one instance for any given ID.
if isinstance(id, int):
id = int(id)
else:
raise TypeError(f'id must be an int, got {id!r}')
if _fmt is None:
_fmt, = _queues.get_queue_defaults(id)
if _unbound is None:
_fmt, op = _queues.get_queue_defaults(id)
_unbound = (op,)
else:
_fmt, _ = _queues.get_queue_defaults(id)
elif _unbound is None:
_, op = _queues.get_queue_defaults(id)
_unbound = (op,)
try:
self = _known_queues[id]
except KeyError:
self = super().__new__(cls)
self._id = id
self._fmt = _fmt
self._unbound = _unbound
_known_queues[id] = self
_queues.bind(id)
return self
@ -124,6 +198,7 @@ class Queue:
def put(self, obj, timeout=None, *,
syncobj=None,
unbound=None,
_delay=10 / 1000, # 10 milliseconds
):
"""Add the object to the queue.
@ -131,7 +206,7 @@ class Queue:
This blocks while the queue is full.
If "syncobj" is None (the default) then it uses the
queue's default, set with create_queue()..
queue's default, set with create_queue().
If "syncobj" is false then all objects are supported,
at the expense of worse performance.
@ -152,11 +227,37 @@ class Queue:
actually is. That's a slightly different and stronger promise
than just (initial) equality, which is all "syncobj=False"
can promise.
"unbound" controls the behavior of Queue.get() for the given
object if the current interpreter (calling put()) is later
destroyed.
If "unbound" is None (the default) then it uses the
queue's default, set with create_queue(),
which is usually UNBOUND.
If "unbound" is UNBOUND_ERROR then get() will raise an
ItemInterpreterDestroyed exception if the original interpreter
has been destroyed. This does not otherwise affect the queue;
the next call to put() will work like normal, returning the next
item in the queue.
If "unbound" is UNBOUND_REMOVE then the item will be removed
from the queue as soon as the original interpreter is destroyed.
Be aware that this will introduce an imbalance between put()
and get() calls.
If "unbound" is UNBOUND then it is returned by get() in place
of the unbound item.
"""
if syncobj is None:
fmt = self._fmt
else:
fmt = _SHARED_ONLY if syncobj else _PICKLED
if unbound is None:
unboundop, = self._unbound
else:
unboundop, = _serialize_unbound(unbound)
if timeout is not None:
timeout = int(timeout)
if timeout < 0:
@ -166,7 +267,7 @@ class Queue:
obj = pickle.dumps(obj)
while True:
try:
_queues.put(self._id, obj, fmt)
_queues.put(self._id, obj, fmt, unboundop)
except QueueFull as exc:
if timeout is not None and time.time() >= end:
raise # re-raise
@ -174,14 +275,18 @@ class Queue:
else:
break
def put_nowait(self, obj, *, syncobj=None):
def put_nowait(self, obj, *, syncobj=None, unbound=None):
if syncobj is None:
fmt = self._fmt
else:
fmt = _SHARED_ONLY if syncobj else _PICKLED
if unbound is None:
unboundop, = self._unbound
else:
unboundop, = _serialize_unbound(unbound)
if fmt is _PICKLED:
obj = pickle.dumps(obj)
_queues.put(self._id, obj, fmt)
_queues.put(self._id, obj, fmt, unboundop)
def get(self, timeout=None, *,
_delay=10 / 1000, # 10 milliseconds
@ -189,6 +294,10 @@ class Queue:
"""Return the next object from the queue.
This blocks while the queue is empty.
If the next item's original interpreter has been destroyed
then the "next object" is determined by the value of the
"unbound" argument to put().
"""
if timeout is not None:
timeout = int(timeout)
@ -197,13 +306,16 @@ class Queue:
end = time.time() + timeout
while True:
try:
obj, fmt = _queues.get(self._id)
obj, fmt, unboundop = _queues.get(self._id)
except QueueEmpty as exc:
if timeout is not None and time.time() >= end:
raise # re-raise
time.sleep(_delay)
else:
break
if unboundop is not None:
assert obj is None, repr(obj)
return _resolve_unbound(unboundop)
if fmt == _PICKLED:
obj = pickle.loads(obj)
else:
@ -217,9 +329,12 @@ class Queue:
is the same as get().
"""
try:
obj, fmt = _queues.get(self._id)
obj, fmt, unboundop = _queues.get(self._id)
except QueueEmpty as exc:
raise # re-raise
if unboundop is not None:
assert obj is None, repr(obj)
return _resolve_unbound(unboundop)
if fmt == _PICKLED:
obj = pickle.loads(obj)
else:

View File

@ -12,13 +12,16 @@ from test.support.interpreters import queues
from .utils import _run_output, TestBase as _TestBase
REPLACE = queues._UNBOUND_CONSTANT_TO_FLAG[queues.UNBOUND]
def get_num_queues():
return len(_queues.list_all())
class TestBase(_TestBase):
def tearDown(self):
for qid, _ in _queues.list_all():
for qid, _, _ in _queues.list_all():
try:
_queues.destroy(qid)
except Exception:
@ -39,7 +42,7 @@ class LowLevelTests(TestBase):
importlib.reload(queues)
def test_create_destroy(self):
qid = _queues.create(2, 0)
qid = _queues.create(2, 0, REPLACE)
_queues.destroy(qid)
self.assertEqual(get_num_queues(), 0)
with self.assertRaises(queues.QueueNotFoundError):
@ -53,7 +56,7 @@ class LowLevelTests(TestBase):
'-c',
dedent(f"""
import {_queues.__name__} as _queues
_queues.create(2, 0)
_queues.create(2, 0, {REPLACE})
"""),
)
self.assertEqual(stdout, '')
@ -64,13 +67,13 @@ class LowLevelTests(TestBase):
def test_bind_release(self):
with self.subTest('typical'):
qid = _queues.create(2, 0)
qid = _queues.create(2, 0, REPLACE)
_queues.bind(qid)
_queues.release(qid)
self.assertEqual(get_num_queues(), 0)
with self.subTest('bind too much'):
qid = _queues.create(2, 0)
qid = _queues.create(2, 0, REPLACE)
_queues.bind(qid)
_queues.bind(qid)
_queues.release(qid)
@ -78,7 +81,7 @@ class LowLevelTests(TestBase):
self.assertEqual(get_num_queues(), 0)
with self.subTest('nested'):
qid = _queues.create(2, 0)
qid = _queues.create(2, 0, REPLACE)
_queues.bind(qid)
_queues.bind(qid)
_queues.release(qid)
@ -86,7 +89,7 @@ class LowLevelTests(TestBase):
self.assertEqual(get_num_queues(), 0)
with self.subTest('release without binding'):
qid = _queues.create(2, 0)
qid = _queues.create(2, 0, REPLACE)
with self.assertRaises(queues.QueueError):
_queues.release(qid)
@ -426,26 +429,206 @@ class TestQueueOps(TestBase):
self.assertNotEqual(id(obj2), int(out))
def test_put_cleared_with_subinterpreter(self):
interp = interpreters.create()
queue = queues.create()
def common(queue, unbound=None, presize=0):
if not unbound:
extraargs = ''
elif unbound is queues.UNBOUND:
extraargs = ', unbound=queues.UNBOUND'
elif unbound is queues.UNBOUND_ERROR:
extraargs = ', unbound=queues.UNBOUND_ERROR'
elif unbound is queues.UNBOUND_REMOVE:
extraargs = ', unbound=queues.UNBOUND_REMOVE'
else:
raise NotImplementedError(repr(unbound))
interp = interpreters.create()
out = _run_output(
interp,
dedent(f"""
_run_output(interp, dedent(f"""
from test.support.interpreters import queues
queue = queues.Queue({queue.id})
obj1 = b'spam'
obj2 = b'eggs'
queue.put(obj1, syncobj=True)
queue.put(obj2, syncobj=True)
queue.put(obj1, syncobj=True{extraargs})
queue.put(obj2, syncobj=True{extraargs})
"""))
self.assertEqual(queue.qsize(), 2)
self.assertEqual(queue.qsize(), presize + 2)
obj1 = queue.get()
self.assertEqual(obj1, b'spam')
self.assertEqual(queue.qsize(), 1)
if presize == 0:
obj1 = queue.get()
self.assertEqual(obj1, b'spam')
self.assertEqual(queue.qsize(), presize + 1)
return interp
with self.subTest('default'): # UNBOUND
queue = queues.create()
interp = common(queue)
del interp
obj1 = queue.get()
self.assertIs(obj1, queues.UNBOUND)
self.assertEqual(queue.qsize(), 0)
with self.assertRaises(queues.QueueEmpty):
queue.get_nowait()
with self.subTest('UNBOUND'):
queue = queues.create()
interp = common(queue, queues.UNBOUND)
del interp
obj1 = queue.get()
self.assertIs(obj1, queues.UNBOUND)
self.assertEqual(queue.qsize(), 0)
with self.assertRaises(queues.QueueEmpty):
queue.get_nowait()
with self.subTest('UNBOUND_ERROR'):
queue = queues.create()
interp = common(queue, queues.UNBOUND_ERROR)
del interp
self.assertEqual(queue.qsize(), 1)
with self.assertRaises(queues.ItemInterpreterDestroyed):
queue.get()
self.assertEqual(queue.qsize(), 0)
with self.assertRaises(queues.QueueEmpty):
queue.get_nowait()
with self.subTest('UNBOUND_REMOVE'):
queue = queues.create()
interp = common(queue, queues.UNBOUND_REMOVE)
del interp
self.assertEqual(queue.qsize(), 0)
with self.assertRaises(queues.QueueEmpty):
queue.get_nowait()
queue.put(b'ham', unbound=queues.UNBOUND_REMOVE)
self.assertEqual(queue.qsize(), 1)
interp = common(queue, queues.UNBOUND_REMOVE, 1)
self.assertEqual(queue.qsize(), 3)
queue.put(42, unbound=queues.UNBOUND_REMOVE)
self.assertEqual(queue.qsize(), 4)
del interp
self.assertEqual(queue.qsize(), 2)
obj1 = queue.get()
obj2 = queue.get()
self.assertEqual(obj1, b'ham')
self.assertEqual(obj2, 42)
self.assertEqual(queue.qsize(), 0)
with self.assertRaises(queues.QueueEmpty):
queue.get_nowait()
def test_put_cleared_with_subinterpreter_mixed(self):
queue = queues.create()
interp = interpreters.create()
_run_output(interp, dedent(f"""
from test.support.interpreters import queues
queue = queues.Queue({queue.id})
queue.put(1, syncobj=True, unbound=queues.UNBOUND)
queue.put(2, syncobj=True, unbound=queues.UNBOUND_ERROR)
queue.put(3, syncobj=True)
queue.put(4, syncobj=True, unbound=queues.UNBOUND_REMOVE)
queue.put(5, syncobj=True, unbound=queues.UNBOUND)
"""))
self.assertEqual(queue.qsize(), 5)
del interp
self.assertEqual(queue.qsize(), 4)
obj1 = queue.get()
self.assertIs(obj1, queues.UNBOUND)
self.assertEqual(queue.qsize(), 3)
with self.assertRaises(queues.ItemInterpreterDestroyed):
queue.get()
self.assertEqual(queue.qsize(), 2)
obj2 = queue.get()
self.assertIs(obj2, queues.UNBOUND)
self.assertEqual(queue.qsize(), 1)
obj3 = queue.get()
self.assertIs(obj3, queues.UNBOUND)
self.assertEqual(queue.qsize(), 0)
def test_put_cleared_with_subinterpreter_multiple(self):
queue = queues.create()
interp1 = interpreters.create()
interp2 = interpreters.create()
queue.put(1, syncobj=True)
_run_output(interp1, dedent(f"""
from test.support.interpreters import queues
queue = queues.Queue({queue.id})
obj1 = queue.get()
queue.put(2, syncobj=True, unbound=queues.UNBOUND)
queue.put(obj1, syncobj=True, unbound=queues.UNBOUND_REMOVE)
"""))
_run_output(interp2, dedent(f"""
from test.support.interpreters import queues
queue = queues.Queue({queue.id})
obj2 = queue.get()
obj1 = queue.get()
"""))
self.assertEqual(queue.qsize(), 0)
queue.put(3)
_run_output(interp1, dedent("""
queue.put(4, syncobj=True, unbound=queues.UNBOUND)
# interp closed here
queue.put(5, syncobj=True, unbound=queues.UNBOUND_REMOVE)
queue.put(6, syncobj=True, unbound=queues.UNBOUND)
"""))
_run_output(interp2, dedent("""
queue.put(7, syncobj=True, unbound=queues.UNBOUND_ERROR)
# interp closed here
queue.put(obj1, syncobj=True, unbound=queues.UNBOUND_ERROR)
queue.put(obj2, syncobj=True, unbound=queues.UNBOUND_REMOVE)
queue.put(8, syncobj=True, unbound=queues.UNBOUND)
"""))
_run_output(interp1, dedent("""
queue.put(9, syncobj=True, unbound=queues.UNBOUND_REMOVE)
queue.put(10, syncobj=True, unbound=queues.UNBOUND)
"""))
self.assertEqual(queue.qsize(), 10)
obj3 = queue.get()
self.assertEqual(obj3, 3)
self.assertEqual(queue.qsize(), 9)
obj4 = queue.get()
self.assertEqual(obj4, 4)
self.assertEqual(queue.qsize(), 8)
del interp1
self.assertEqual(queue.qsize(), 6)
# obj5 was removed
obj6 = queue.get()
self.assertIs(obj6, queues.UNBOUND)
self.assertEqual(queue.qsize(), 5)
obj7 = queue.get()
self.assertEqual(obj7, 7)
self.assertEqual(queue.qsize(), 4)
del interp2
self.assertEqual(queue.qsize(), 3)
# obj1
with self.assertRaises(queues.ItemInterpreterDestroyed):
queue.get()
self.assertEqual(queue.qsize(), 2)
# obj2 was removed
obj8 = queue.get()
self.assertIs(obj8, queues.UNBOUND)
self.assertEqual(queue.qsize(), 1)
# obj9 was removed
obj10 = queue.get()
self.assertIs(obj10, queues.UNBOUND)
self.assertEqual(queue.qsize(), 0)
def test_put_get_different_threads(self):

View File

@ -58,6 +58,19 @@ _release_xid_data(_PyCrossInterpreterData *data, int flags)
return res;
}
static inline int64_t
_get_interpid(_PyCrossInterpreterData *data)
{
int64_t interpid;
if (data != NULL) {
interpid = _PyCrossInterpreterData_INTERPID(data);
assert(!PyErr_Occurred());
}
else {
interpid = PyInterpreterState_GetID(PyInterpreterState_Get());
}
return interpid;
}
static PyInterpreterState *
_get_current_interp(void)
@ -389,47 +402,98 @@ handle_queue_error(int err, PyObject *mod, int64_t qid)
}
/* unbound items ************************************************************/
#define UNBOUND_REMOVE 1
#define UNBOUND_ERROR 2
#define UNBOUND_REPLACE 3
// It would also be possible to add UNBOUND_REPLACE where the replacement
// value is user-provided. There would be some limitations there, though.
// Another possibility would be something like UNBOUND_COPY, where the
// object is released but the underlying data is copied (with the "raw"
// allocator) and used when the item is popped off the queue.
static int
check_unbound(int unboundop)
{
switch (unboundop) {
case UNBOUND_REMOVE:
case UNBOUND_ERROR:
case UNBOUND_REPLACE:
return 1;
default:
return 0;
}
}
/* the basic queue **********************************************************/
struct _queueitem;
typedef struct _queueitem {
/* The interpreter that added the item to the queue.
The actual bound interpid is found in item->data.
This is necessary because item->data might be NULL,
meaning the interpreter has been destroyed. */
int64_t interpid;
_PyCrossInterpreterData *data;
int fmt;
int unboundop;
struct _queueitem *next;
} _queueitem;
static void
_queueitem_init(_queueitem *item,
_PyCrossInterpreterData *data, int fmt)
int64_t interpid, _PyCrossInterpreterData *data,
int fmt, int unboundop)
{
if (interpid < 0) {
interpid = _get_interpid(data);
}
else {
assert(data == NULL
|| _PyCrossInterpreterData_INTERPID(data) < 0
|| interpid == _PyCrossInterpreterData_INTERPID(data));
}
assert(check_unbound(unboundop));
*item = (_queueitem){
.interpid = interpid,
.data = data,
.fmt = fmt,
.unboundop = unboundop,
};
}
static void
_queueitem_clear_data(_queueitem *item)
{
if (item->data == NULL) {
return;
}
// It was allocated in queue_put().
(void)_release_xid_data(item->data, XID_IGNORE_EXC & XID_FREE);
item->data = NULL;
}
static void
_queueitem_clear(_queueitem *item)
{
item->next = NULL;
if (item->data != NULL) {
// It was allocated in queue_put().
(void)_release_xid_data(item->data, XID_IGNORE_EXC & XID_FREE);
item->data = NULL;
}
_queueitem_clear_data(item);
}
static _queueitem *
_queueitem_new(_PyCrossInterpreterData *data, int fmt)
_queueitem_new(int64_t interpid, _PyCrossInterpreterData *data,
int fmt, int unboundop)
{
_queueitem *item = GLOBAL_MALLOC(_queueitem);
if (item == NULL) {
PyErr_NoMemory();
return NULL;
}
_queueitem_init(item, data, fmt);
_queueitem_init(item, interpid, data, fmt, unboundop);
return item;
}
@ -452,15 +516,44 @@ _queueitem_free_all(_queueitem *item)
static void
_queueitem_popped(_queueitem *item,
_PyCrossInterpreterData **p_data, int *p_fmt)
_PyCrossInterpreterData **p_data, int *p_fmt, int *p_unboundop)
{
*p_data = item->data;
*p_fmt = item->fmt;
*p_unboundop = item->unboundop;
// We clear them here, so they won't be released in _queueitem_clear().
item->data = NULL;
_queueitem_free(item);
}
static int
_queueitem_clear_interpreter(_queueitem *item)
{
assert(item->interpid >= 0);
if (item->data == NULL) {
// Its interpreter was already cleared (or it was never bound).
// For UNBOUND_REMOVE it should have been freed at that time.
assert(item->unboundop != UNBOUND_REMOVE);
return 0;
}
assert(_PyCrossInterpreterData_INTERPID(item->data) == item->interpid);
switch (item->unboundop) {
case UNBOUND_REMOVE:
// The caller must free/clear it.
return 1;
case UNBOUND_ERROR:
case UNBOUND_REPLACE:
// We won't need the cross-interpreter data later
// so we completely throw it away.
_queueitem_clear_data(item);
return 0;
default:
Py_FatalError("not reachable");
return -1;
}
}
/* the queue */
@ -474,12 +567,16 @@ typedef struct _queue {
_queueitem *first;
_queueitem *last;
} items;
int fmt;
struct {
int fmt;
int unboundop;
} defaults;
} _queue;
static int
_queue_init(_queue *queue, Py_ssize_t maxsize, int fmt)
_queue_init(_queue *queue, Py_ssize_t maxsize, int fmt, int unboundop)
{
assert(check_unbound(unboundop));
PyThread_type_lock mutex = PyThread_allocate_lock();
if (mutex == NULL) {
return ERR_QUEUE_ALLOC;
@ -490,7 +587,10 @@ _queue_init(_queue *queue, Py_ssize_t maxsize, int fmt)
.items = {
.maxsize = maxsize,
},
.fmt = fmt,
.defaults = {
.fmt = fmt,
.unboundop = unboundop,
},
};
return 0;
}
@ -571,7 +671,8 @@ _queue_unlock(_queue *queue)
}
static int
_queue_add(_queue *queue, _PyCrossInterpreterData *data, int fmt)
_queue_add(_queue *queue, int64_t interpid, _PyCrossInterpreterData *data,
int fmt, int unboundop)
{
int err = _queue_lock(queue);
if (err < 0) {
@ -587,7 +688,7 @@ _queue_add(_queue *queue, _PyCrossInterpreterData *data, int fmt)
return ERR_QUEUE_FULL;
}
_queueitem *item = _queueitem_new(data, fmt);
_queueitem *item = _queueitem_new(interpid, data, fmt, unboundop);
if (item == NULL) {
_queue_unlock(queue);
return -1;
@ -608,7 +709,7 @@ _queue_add(_queue *queue, _PyCrossInterpreterData *data, int fmt)
static int
_queue_next(_queue *queue,
_PyCrossInterpreterData **p_data, int *p_fmt)
_PyCrossInterpreterData **p_data, int *p_fmt, int *p_unboundop)
{
int err = _queue_lock(queue);
if (err < 0) {
@ -627,7 +728,7 @@ _queue_next(_queue *queue,
}
queue->items.count -= 1;
_queueitem_popped(item, p_data, p_fmt);
_queueitem_popped(item, p_data, p_fmt, p_unboundop);
_queue_unlock(queue);
return 0;
@ -692,14 +793,17 @@ _queue_clear_interpreter(_queue *queue, int64_t interpid)
while (next != NULL) {
_queueitem *item = next;
next = item->next;
if (_PyCrossInterpreterData_INTERPID(item->data) == interpid) {
int remove = (item->interpid == interpid)
? _queueitem_clear_interpreter(item)
: 0;
if (remove) {
_queueitem_free(item);
if (prev == NULL) {
queue->items.first = item->next;
queue->items.first = next;
}
else {
prev->next = item->next;
prev->next = next;
}
_queueitem_free(item);
queue->items.count -= 1;
}
else {
@ -966,18 +1070,19 @@ finally:
return res;
}
struct queue_id_and_fmt {
struct queue_id_and_info {
int64_t id;
int fmt;
int unboundop;
};
static struct queue_id_and_fmt *
_queues_list_all(_queues *queues, int64_t *count)
static struct queue_id_and_info *
_queues_list_all(_queues *queues, int64_t *p_count)
{
struct queue_id_and_fmt *qids = NULL;
struct queue_id_and_info *qids = NULL;
PyThread_acquire_lock(queues->mutex, WAIT_LOCK);
struct queue_id_and_fmt *ids = PyMem_NEW(struct queue_id_and_fmt,
(Py_ssize_t)(queues->count));
struct queue_id_and_info *ids = PyMem_NEW(struct queue_id_and_info,
(Py_ssize_t)(queues->count));
if (ids == NULL) {
goto done;
}
@ -985,9 +1090,10 @@ _queues_list_all(_queues *queues, int64_t *count)
for (int64_t i=0; ref != NULL; ref = ref->next, i++) {
ids[i].id = ref->qid;
assert(ref->queue != NULL);
ids[i].fmt = ref->queue->fmt;
ids[i].fmt = ref->queue->defaults.fmt;
ids[i].unboundop = ref->queue->defaults.unboundop;
}
*count = queues->count;
*p_count = queues->count;
qids = ids;
done:
@ -1021,13 +1127,13 @@ _queue_free(_queue *queue)
// Create a new queue.
static int64_t
queue_create(_queues *queues, Py_ssize_t maxsize, int fmt)
queue_create(_queues *queues, Py_ssize_t maxsize, int fmt, int unboundop)
{
_queue *queue = GLOBAL_MALLOC(_queue);
if (queue == NULL) {
return ERR_QUEUE_ALLOC;
}
int err = _queue_init(queue, maxsize, fmt);
int err = _queue_init(queue, maxsize, fmt, unboundop);
if (err < 0) {
GLOBAL_FREE(queue);
return (int64_t)err;
@ -1056,7 +1162,7 @@ queue_destroy(_queues *queues, int64_t qid)
// Push an object onto the queue.
static int
queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt)
queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt, int unboundop)
{
// Look up the queue.
_queue *queue = NULL;
@ -1077,9 +1183,12 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt)
GLOBAL_FREE(data);
return -1;
}
assert(_PyCrossInterpreterData_INTERPID(data) == \
PyInterpreterState_GetID(PyInterpreterState_Get()));
// Add the data to the queue.
int res = _queue_add(queue, data, fmt);
int64_t interpid = -1; // _queueitem_init() will set it.
int res = _queue_add(queue, interpid, data, fmt, unboundop);
_queue_unmark_waiter(queue, queues->mutex);
if (res != 0) {
// We may chain an exception here:
@ -1094,7 +1203,8 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt)
// Pop the next object off the queue. Fail if empty.
// XXX Support a "wait" mutex?
static int
queue_get(_queues *queues, int64_t qid, PyObject **res, int *p_fmt)
queue_get(_queues *queues, int64_t qid,
PyObject **res, int *p_fmt, int *p_unboundop)
{
int err;
*res = NULL;
@ -1110,7 +1220,7 @@ queue_get(_queues *queues, int64_t qid, PyObject **res, int *p_fmt)
// Pop off the next item from the queue.
_PyCrossInterpreterData *data = NULL;
err = _queue_next(queue, &data, p_fmt);
err = _queue_next(queue, &data, p_fmt, p_unboundop);
_queue_unmark_waiter(queue, queues->mutex);
if (err != 0) {
return err;
@ -1397,15 +1507,22 @@ qidarg_converter(PyObject *arg, void *ptr)
static PyObject *
queuesmod_create(PyObject *self, PyObject *args, PyObject *kwds)
{
static char *kwlist[] = {"maxsize", "fmt", NULL};
static char *kwlist[] = {"maxsize", "fmt", "unboundop", NULL};
Py_ssize_t maxsize;
int fmt;
if (!PyArg_ParseTupleAndKeywords(args, kwds, "ni:create", kwlist,
&maxsize, &fmt)) {
int unboundop;
if (!PyArg_ParseTupleAndKeywords(args, kwds, "nii:create", kwlist,
&maxsize, &fmt, &unboundop))
{
return NULL;
}
if (!check_unbound(unboundop)) {
PyErr_Format(PyExc_ValueError,
"unsupported unboundop %d", unboundop);
return NULL;
}
int64_t qid = queue_create(&_globals.queues, maxsize, fmt);
int64_t qid = queue_create(&_globals.queues, maxsize, fmt, unboundop);
if (qid < 0) {
(void)handle_queue_error((int)qid, self, qid);
return NULL;
@ -1427,7 +1544,7 @@ queuesmod_create(PyObject *self, PyObject *args, PyObject *kwds)
}
PyDoc_STRVAR(queuesmod_create_doc,
"create(maxsize, fmt) -> qid\n\
"create(maxsize, fmt, unboundop) -> qid\n\
\n\
Create a new cross-interpreter queue and return its unique generated ID.\n\
It is a new reference as though bind() had been called on the queue.\n\
@ -1463,9 +1580,9 @@ static PyObject *
queuesmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
{
int64_t count = 0;
struct queue_id_and_fmt *qids = _queues_list_all(&_globals.queues, &count);
struct queue_id_and_info *qids = _queues_list_all(&_globals.queues, &count);
if (qids == NULL) {
if (count == 0) {
if (!PyErr_Occurred() && count == 0) {
return PyList_New(0);
}
return NULL;
@ -1474,9 +1591,10 @@ queuesmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
if (ids == NULL) {
goto finally;
}
struct queue_id_and_fmt *cur = qids;
struct queue_id_and_info *cur = qids;
for (int64_t i=0; i < count; cur++, i++) {
PyObject *item = Py_BuildValue("Li", cur->id, cur->fmt);
PyObject *item = Py_BuildValue("Lii", cur->id, cur->fmt,
cur->unboundop);
if (item == NULL) {
Py_SETREF(ids, NULL);
break;
@ -1498,18 +1616,26 @@ Each corresponding default format is also included.");
static PyObject *
queuesmod_put(PyObject *self, PyObject *args, PyObject *kwds)
{
static char *kwlist[] = {"qid", "obj", "fmt", NULL};
static char *kwlist[] = {"qid", "obj", "fmt", "unboundop", NULL};
qidarg_converter_data qidarg;
PyObject *obj;
int fmt;
if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&Oi:put", kwlist,
qidarg_converter, &qidarg, &obj, &fmt)) {
int unboundop;
if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&Oii:put", kwlist,
qidarg_converter, &qidarg, &obj, &fmt,
&unboundop))
{
return NULL;
}
int64_t qid = qidarg.id;
if (!check_unbound(unboundop)) {
PyErr_Format(PyExc_ValueError,
"unsupported unboundop %d", unboundop);
return NULL;
}
/* Queue up the object. */
int err = queue_put(&_globals.queues, qid, obj, fmt);
int err = queue_put(&_globals.queues, qid, obj, fmt, unboundop);
// This is the only place that raises QueueFull.
if (handle_queue_error(err, self, qid)) {
return NULL;
@ -1536,13 +1662,17 @@ queuesmod_get(PyObject *self, PyObject *args, PyObject *kwds)
PyObject *obj = NULL;
int fmt = 0;
int err = queue_get(&_globals.queues, qid, &obj, &fmt);
int unboundop = 0;
int err = queue_get(&_globals.queues, qid, &obj, &fmt, &unboundop);
// This is the only place that raises QueueEmpty.
if (handle_queue_error(err, self, qid)) {
return NULL;
}
PyObject *res = Py_BuildValue("Oi", obj, fmt);
if (obj == NULL) {
return Py_BuildValue("Oii", Py_None, fmt, unboundop);
}
PyObject *res = Py_BuildValue("OiO", obj, fmt, Py_None);
Py_DECREF(obj);
return res;
}
@ -1656,17 +1786,12 @@ queuesmod_get_queue_defaults(PyObject *self, PyObject *args, PyObject *kwds)
if (handle_queue_error(err, self, qid)) {
return NULL;
}
int fmt = queue->fmt;
int fmt = queue->defaults.fmt;
int unboundop = queue->defaults.unboundop;
_queue_unmark_waiter(queue, _globals.queues.mutex);
PyObject *fmt_obj = PyLong_FromLong(fmt);
if (fmt_obj == NULL) {
return NULL;
}
// For now queues only have one default.
PyObject *res = PyTuple_Pack(1, fmt_obj);
Py_DECREF(fmt_obj);
return res;
PyObject *defaults = Py_BuildValue("ii", fmt, unboundop);
return defaults;
}
PyDoc_STRVAR(queuesmod_get_queue_defaults_doc,