/* interpreters module */ /* low-level access to interpreter primitives */ #ifndef Py_BUILD_CORE_BUILTIN # define Py_BUILD_CORE_MODULE 1 #endif #include "Python.h" #include "pycore_crossinterp.h" // struct _xid #define REGISTERS_HEAP_TYPES #define HAS_UNBOUND_ITEMS #include "_interpreters_common.h" #undef HAS_UNBOUND_ITEMS #undef REGISTERS_HEAP_TYPES #define MODULE_NAME _interpqueues #define MODULE_NAME_STR Py_STRINGIFY(MODULE_NAME) #define MODINIT_FUNC_NAME RESOLVE_MODINIT_FUNC_NAME(MODULE_NAME) #define GLOBAL_MALLOC(TYPE) \ PyMem_RawMalloc(sizeof(TYPE)) #define GLOBAL_FREE(VAR) \ PyMem_RawFree(VAR) #define XID_IGNORE_EXC 1 #define XID_FREE 2 static int _release_xid_data(_PyCrossInterpreterData *data, int flags) { int ignoreexc = flags & XID_IGNORE_EXC; PyObject *exc; if (ignoreexc) { exc = PyErr_GetRaisedException(); } int res; if (flags & XID_FREE) { res = _PyCrossInterpreterData_ReleaseAndRawFree(data); } else { res = _PyCrossInterpreterData_Release(data); } if (res < 0) { /* The owning interpreter is already destroyed. */ if (ignoreexc) { // XXX Emit a warning? PyErr_Clear(); } } if (flags & XID_FREE) { /* Either way, we free the data. */ } if (ignoreexc) { PyErr_SetRaisedException(exc); } return res; } static PyInterpreterState * _get_current_interp(void) { // PyInterpreterState_Get() aborts if lookup fails, so don't need // to check the result for NULL. return PyInterpreterState_Get(); } static PyObject * _get_current_module(void) { PyObject *name = PyUnicode_FromString(MODULE_NAME_STR); if (name == NULL) { return NULL; } PyObject *mod = PyImport_GetModule(name); Py_DECREF(name); if (mod == NULL) { return NULL; } assert(mod != Py_None); return mod; } struct idarg_int64_converter_data { // input: const char *label; // output: int64_t id; }; static int idarg_int64_converter(PyObject *arg, void *ptr) { int64_t id; struct idarg_int64_converter_data *data = ptr; const char *label = data->label; if (label == NULL) { label = "ID"; } if (PyIndex_Check(arg)) { int overflow = 0; id = PyLong_AsLongLongAndOverflow(arg, &overflow); if (id == -1 && PyErr_Occurred()) { return 0; } else if (id == -1 && overflow == 1) { PyErr_Format(PyExc_OverflowError, "max %s is %lld, got %R", label, INT64_MAX, arg); return 0; } else if (id < 0) { PyErr_Format(PyExc_ValueError, "%s must be a non-negative int, got %R", label, arg); return 0; } } else { PyErr_Format(PyExc_TypeError, "%s must be an int, got %.100s", label, Py_TYPE(arg)->tp_name); return 0; } data->id = id; return 1; } static int ensure_highlevel_module_loaded(void) { PyObject *highlevel = PyImport_ImportModule("interpreters.queues"); if (highlevel == NULL) { PyErr_Clear(); highlevel = PyImport_ImportModule("test.support.interpreters.queues"); if (highlevel == NULL) { return -1; } } Py_DECREF(highlevel); return 0; } /* module state *************************************************************/ typedef struct { /* external types (added at runtime by interpreters module) */ PyTypeObject *queue_type; /* QueueError (and its subclasses) */ PyObject *QueueError; PyObject *QueueNotFoundError; PyObject *QueueEmpty; PyObject *QueueFull; } module_state; static inline module_state * get_module_state(PyObject *mod) { assert(mod != NULL); module_state *state = PyModule_GetState(mod); assert(state != NULL); return state; } static int traverse_module_state(module_state *state, visitproc visit, void *arg) { /* external types */ Py_VISIT(state->queue_type); /* QueueError */ Py_VISIT(state->QueueError); Py_VISIT(state->QueueNotFoundError); Py_VISIT(state->QueueEmpty); Py_VISIT(state->QueueFull); return 0; } static int clear_module_state(module_state *state) { /* external types */ if (state->queue_type != NULL) { (void)clear_xid_class(state->queue_type); } Py_CLEAR(state->queue_type); /* QueueError */ Py_CLEAR(state->QueueError); Py_CLEAR(state->QueueNotFoundError); Py_CLEAR(state->QueueEmpty); Py_CLEAR(state->QueueFull); return 0; } /* error codes **************************************************************/ #define ERR_EXCEPTION_RAISED (-1) // multi-queue errors #define ERR_QUEUES_ALLOC (-11) #define ERR_QUEUE_ALLOC (-12) #define ERR_NO_NEXT_QUEUE_ID (-13) #define ERR_QUEUE_NOT_FOUND (-14) // single-queue errors #define ERR_QUEUE_EMPTY (-21) #define ERR_QUEUE_FULL (-22) #define ERR_QUEUE_NEVER_BOUND (-23) static int ensure_external_exc_types(module_state *); static int resolve_module_errcode(module_state *state, int errcode, int64_t qid, PyObject **p_exctype, PyObject **p_msgobj) { PyObject *exctype = NULL; PyObject *msg = NULL; switch (errcode) { case ERR_NO_NEXT_QUEUE_ID: exctype = state->QueueError; msg = PyUnicode_FromString("ran out of queue IDs"); break; case ERR_QUEUE_NOT_FOUND: exctype = state->QueueNotFoundError; msg = PyUnicode_FromFormat("queue %" PRId64 " not found", qid); break; case ERR_QUEUE_EMPTY: if (ensure_external_exc_types(state) < 0) { return -1; } exctype = state->QueueEmpty; msg = PyUnicode_FromFormat("queue %" PRId64 " is empty", qid); break; case ERR_QUEUE_FULL: if (ensure_external_exc_types(state) < 0) { return -1; } exctype = state->QueueFull; msg = PyUnicode_FromFormat("queue %" PRId64 " is full", qid); break; case ERR_QUEUE_NEVER_BOUND: exctype = state->QueueError; msg = PyUnicode_FromFormat("queue %" PRId64 " never bound", qid); break; default: PyErr_Format(PyExc_ValueError, "unsupported error code %d", errcode); return -1; } if (msg == NULL) { assert(PyErr_Occurred()); return -1; } *p_exctype = exctype; *p_msgobj = msg; return 0; } /* QueueError ***************************************************************/ static int add_exctype(PyObject *mod, PyObject **p_state_field, const char *qualname, const char *doc, PyObject *base) { #ifndef NDEBUG const char *dot = strrchr(qualname, '.'); assert(dot != NULL); const char *name = dot+1; assert(*p_state_field == NULL); assert(!PyObject_HasAttrStringWithError(mod, name)); #endif PyObject *exctype = PyErr_NewExceptionWithDoc(qualname, doc, base, NULL); if (exctype == NULL) { return -1; } if (PyModule_AddType(mod, (PyTypeObject *)exctype) < 0) { Py_DECREF(exctype); return -1; } *p_state_field = exctype; return 0; } static int add_QueueError(PyObject *mod) { module_state *state = get_module_state(mod); #define PREFIX "test.support.interpreters." #define ADD_EXCTYPE(NAME, BASE, DOC) \ assert(state->NAME == NULL); \ if (add_exctype(mod, &state->NAME, PREFIX #NAME, DOC, BASE) < 0) { \ return -1; \ } ADD_EXCTYPE(QueueError, PyExc_RuntimeError, "Indicates that a queue-related error happened.") ADD_EXCTYPE(QueueNotFoundError, state->QueueError, NULL) // QueueEmpty and QueueFull are set by set_external_exc_types(). state->QueueEmpty = NULL; state->QueueFull = NULL; #undef ADD_EXCTYPE #undef PREFIX return 0; } static int set_external_exc_types(module_state *state, PyObject *emptyerror, PyObject *fullerror) { if (state->QueueEmpty != NULL) { assert(state->QueueFull != NULL); Py_CLEAR(state->QueueEmpty); Py_CLEAR(state->QueueFull); } else { assert(state->QueueFull == NULL); } assert(PyObject_IsSubclass(emptyerror, state->QueueError)); assert(PyObject_IsSubclass(fullerror, state->QueueError)); state->QueueEmpty = Py_NewRef(emptyerror); state->QueueFull = Py_NewRef(fullerror); return 0; } static int ensure_external_exc_types(module_state *state) { if (state->QueueEmpty != NULL) { assert(state->QueueFull != NULL); return 0; } assert(state->QueueFull == NULL); // Force the module to be loaded, to register the type. if (ensure_highlevel_module_loaded() < 0) { return -1; } assert(state->QueueEmpty != NULL); assert(state->QueueFull != NULL); return 0; } static int handle_queue_error(int err, PyObject *mod, int64_t qid) { if (err == 0) { assert(!PyErr_Occurred()); return 0; } assert(err < 0); assert((err == -1) == (PyErr_Occurred() != NULL)); module_state *state; switch (err) { case ERR_QUEUE_ALLOC: // fall through case ERR_QUEUES_ALLOC: PyErr_NoMemory(); break; case -1: return -1; default: state = get_module_state(mod); assert(state->QueueError != NULL); PyObject *exctype = NULL; PyObject *msg = NULL; if (resolve_module_errcode(state, err, qid, &exctype, &msg) < 0) { return -1; } PyObject *exc = PyObject_CallOneArg(exctype, msg); Py_DECREF(msg); if (exc == NULL) { return -1; } PyErr_SetObject(exctype, exc); Py_DECREF(exc); } return 1; } /* the basic queue **********************************************************/ struct _queueitem; typedef struct _queueitem { /* The interpreter that added the item to the queue. The actual bound interpid is found in item->data. This is necessary because item->data might be NULL, meaning the interpreter has been destroyed. */ int64_t interpid; _PyCrossInterpreterData *data; int fmt; int unboundop; struct _queueitem *next; } _queueitem; static void _queueitem_init(_queueitem *item, int64_t interpid, _PyCrossInterpreterData *data, int fmt, int unboundop) { if (interpid < 0) { interpid = _get_interpid(data); } else { assert(data == NULL || _PyCrossInterpreterData_INTERPID(data) < 0 || interpid == _PyCrossInterpreterData_INTERPID(data)); } assert(check_unbound(unboundop)); *item = (_queueitem){ .interpid = interpid, .data = data, .fmt = fmt, .unboundop = unboundop, }; } static void _queueitem_clear_data(_queueitem *item) { if (item->data == NULL) { return; } // It was allocated in queue_put(). (void)_release_xid_data(item->data, XID_IGNORE_EXC & XID_FREE); item->data = NULL; } static void _queueitem_clear(_queueitem *item) { item->next = NULL; _queueitem_clear_data(item); } static _queueitem * _queueitem_new(int64_t interpid, _PyCrossInterpreterData *data, int fmt, int unboundop) { _queueitem *item = GLOBAL_MALLOC(_queueitem); if (item == NULL) { PyErr_NoMemory(); return NULL; } _queueitem_init(item, interpid, data, fmt, unboundop); return item; } static void _queueitem_free(_queueitem *item) { _queueitem_clear(item); GLOBAL_FREE(item); } static void _queueitem_free_all(_queueitem *item) { while (item != NULL) { _queueitem *last = item; item = item->next; _queueitem_free(last); } } static void _queueitem_popped(_queueitem *item, _PyCrossInterpreterData **p_data, int *p_fmt, int *p_unboundop) { *p_data = item->data; *p_fmt = item->fmt; *p_unboundop = item->unboundop; // We clear them here, so they won't be released in _queueitem_clear(). item->data = NULL; _queueitem_free(item); } static int _queueitem_clear_interpreter(_queueitem *item) { assert(item->interpid >= 0); if (item->data == NULL) { // Its interpreter was already cleared (or it was never bound). // For UNBOUND_REMOVE it should have been freed at that time. assert(item->unboundop != UNBOUND_REMOVE); return 0; } assert(_PyCrossInterpreterData_INTERPID(item->data) == item->interpid); switch (item->unboundop) { case UNBOUND_REMOVE: // The caller must free/clear it. return 1; case UNBOUND_ERROR: case UNBOUND_REPLACE: // We won't need the cross-interpreter data later // so we completely throw it away. _queueitem_clear_data(item); return 0; default: Py_FatalError("not reachable"); return -1; } } /* the queue */ typedef struct _queue { Py_ssize_t num_waiters; // protected by global lock PyThread_type_lock mutex; int alive; struct _queueitems { Py_ssize_t maxsize; Py_ssize_t count; _queueitem *first; _queueitem *last; } items; struct { int fmt; int unboundop; } defaults; } _queue; static int _queue_init(_queue *queue, Py_ssize_t maxsize, int fmt, int unboundop) { assert(check_unbound(unboundop)); PyThread_type_lock mutex = PyThread_allocate_lock(); if (mutex == NULL) { return ERR_QUEUE_ALLOC; } *queue = (_queue){ .mutex = mutex, .alive = 1, .items = { .maxsize = maxsize, }, .defaults = { .fmt = fmt, .unboundop = unboundop, }, }; return 0; } static void _queue_clear(_queue *queue) { assert(!queue->alive); assert(queue->num_waiters == 0); _queueitem_free_all(queue->items.first); assert(queue->mutex != NULL); PyThread_free_lock(queue->mutex); *queue = (_queue){0}; } static void _queue_free(_queue *); static void _queue_kill_and_wait(_queue *queue) { // Mark it as dead. PyThread_acquire_lock(queue->mutex, WAIT_LOCK); assert(queue->alive); queue->alive = 0; PyThread_release_lock(queue->mutex); // Wait for all waiters to fail. while (queue->num_waiters > 0) { PyThread_acquire_lock(queue->mutex, WAIT_LOCK); PyThread_release_lock(queue->mutex); }; } static void _queue_mark_waiter(_queue *queue, PyThread_type_lock parent_mutex) { if (parent_mutex != NULL) { PyThread_acquire_lock(parent_mutex, WAIT_LOCK); queue->num_waiters += 1; PyThread_release_lock(parent_mutex); } else { // The caller must be holding the parent lock already. queue->num_waiters += 1; } } static void _queue_unmark_waiter(_queue *queue, PyThread_type_lock parent_mutex) { if (parent_mutex != NULL) { PyThread_acquire_lock(parent_mutex, WAIT_LOCK); queue->num_waiters -= 1; PyThread_release_lock(parent_mutex); } else { // The caller must be holding the parent lock already. queue->num_waiters -= 1; } } static int _queue_lock(_queue *queue) { // The queue must be marked as a waiter already. PyThread_acquire_lock(queue->mutex, WAIT_LOCK); if (!queue->alive) { PyThread_release_lock(queue->mutex); return ERR_QUEUE_NOT_FOUND; } return 0; } static void _queue_unlock(_queue *queue) { PyThread_release_lock(queue->mutex); } static int _queue_add(_queue *queue, int64_t interpid, _PyCrossInterpreterData *data, int fmt, int unboundop) { int err = _queue_lock(queue); if (err < 0) { return err; } Py_ssize_t maxsize = queue->items.maxsize; if (maxsize <= 0) { maxsize = PY_SSIZE_T_MAX; } if (queue->items.count >= maxsize) { _queue_unlock(queue); return ERR_QUEUE_FULL; } _queueitem *item = _queueitem_new(interpid, data, fmt, unboundop); if (item == NULL) { _queue_unlock(queue); return -1; } queue->items.count += 1; if (queue->items.first == NULL) { queue->items.first = item; } else { queue->items.last->next = item; } queue->items.last = item; _queue_unlock(queue); return 0; } static int _queue_next(_queue *queue, _PyCrossInterpreterData **p_data, int *p_fmt, int *p_unboundop) { int err = _queue_lock(queue); if (err < 0) { return err; } assert(queue->items.count >= 0); _queueitem *item = queue->items.first; if (item == NULL) { _queue_unlock(queue); return ERR_QUEUE_EMPTY; } queue->items.first = item->next; if (queue->items.last == item) { queue->items.last = NULL; } queue->items.count -= 1; _queueitem_popped(item, p_data, p_fmt, p_unboundop); _queue_unlock(queue); return 0; } static int _queue_get_maxsize(_queue *queue, Py_ssize_t *p_maxsize) { int err = _queue_lock(queue); if (err < 0) { return err; } *p_maxsize = queue->items.maxsize; _queue_unlock(queue); return 0; } static int _queue_is_full(_queue *queue, int *p_is_full) { int err = _queue_lock(queue); if (err < 0) { return err; } assert(queue->items.count <= queue->items.maxsize); *p_is_full = queue->items.count == queue->items.maxsize; _queue_unlock(queue); return 0; } static int _queue_get_count(_queue *queue, Py_ssize_t *p_count) { int err = _queue_lock(queue); if (err < 0) { return err; } *p_count = queue->items.count; _queue_unlock(queue); return 0; } static void _queue_clear_interpreter(_queue *queue, int64_t interpid) { int err = _queue_lock(queue); if (err == ERR_QUEUE_NOT_FOUND) { // The queue is already destroyed, so there's nothing to clear. assert(!PyErr_Occurred()); return; } assert(err == 0); // There should be no other errors. _queueitem *prev = NULL; _queueitem *next = queue->items.first; while (next != NULL) { _queueitem *item = next; next = item->next; int remove = (item->interpid == interpid) ? _queueitem_clear_interpreter(item) : 0; if (remove) { _queueitem_free(item); if (prev == NULL) { queue->items.first = next; } else { prev->next = next; } queue->items.count -= 1; } else { prev = item; } } _queue_unlock(queue); } /* external queue references ************************************************/ struct _queueref; typedef struct _queueref { struct _queueref *next; int64_t qid; Py_ssize_t refcount; _queue *queue; } _queueref; static _queueref * _queuerefs_find(_queueref *first, int64_t qid, _queueref **pprev) { _queueref *prev = NULL; _queueref *ref = first; while (ref != NULL) { if (ref->qid == qid) { break; } prev = ref; ref = ref->next; } if (pprev != NULL) { *pprev = prev; } return ref; } static void _queuerefs_clear(_queueref *head) { _queueref *next = head; while (next != NULL) { _queueref *ref = next; next = ref->next; #ifdef Py_DEBUG int64_t qid = ref->qid; fprintf(stderr, "queue %" PRId64 " still exists\n", qid); #endif _queue *queue = ref->queue; GLOBAL_FREE(ref); _queue_kill_and_wait(queue); #ifdef Py_DEBUG if (queue->items.count > 0) { fprintf(stderr, "queue %" PRId64 " still holds %zd items\n", qid, queue->items.count); } #endif _queue_free(queue); } } /* a collection of queues ***************************************************/ typedef struct _queues { PyThread_type_lock mutex; _queueref *head; int64_t count; int64_t next_id; } _queues; static void _queues_init(_queues *queues, PyThread_type_lock mutex) { queues->mutex = mutex; queues->head = NULL; queues->count = 0; queues->next_id = 1; } static void _queues_fini(_queues *queues) { if (queues->count > 0) { PyThread_acquire_lock(queues->mutex, WAIT_LOCK); assert((queues->count == 0) != (queues->head != NULL)); _queueref *head = queues->head; queues->head = NULL; queues->count = 0; PyThread_release_lock(queues->mutex); _queuerefs_clear(head); } if (queues->mutex != NULL) { PyThread_free_lock(queues->mutex); queues->mutex = NULL; } } static int64_t _queues_next_id(_queues *queues) // needs lock { int64_t qid = queues->next_id; if (qid < 0) { /* overflow */ return ERR_NO_NEXT_QUEUE_ID; } queues->next_id += 1; return qid; } static int _queues_lookup(_queues *queues, int64_t qid, _queue **res) { PyThread_acquire_lock(queues->mutex, WAIT_LOCK); _queueref *ref = _queuerefs_find(queues->head, qid, NULL); if (ref == NULL) { PyThread_release_lock(queues->mutex); return ERR_QUEUE_NOT_FOUND; } assert(ref->queue != NULL); _queue *queue = ref->queue; _queue_mark_waiter(queue, NULL); // The caller must unmark it. PyThread_release_lock(queues->mutex); *res = queue; return 0; } static int64_t _queues_add(_queues *queues, _queue *queue) { int64_t qid = -1; PyThread_acquire_lock(queues->mutex, WAIT_LOCK); // Create a new ref. int64_t _qid = _queues_next_id(queues); if (_qid < 0) { goto done; } _queueref *ref = GLOBAL_MALLOC(_queueref); if (ref == NULL) { qid = ERR_QUEUE_ALLOC; goto done; } *ref = (_queueref){ .qid = _qid, .queue = queue, }; // Add it to the list. // We assume that the queue is a new one (not already in the list). ref->next = queues->head; queues->head = ref; queues->count += 1; qid = _qid; done: PyThread_release_lock(queues->mutex); return qid; } static void _queues_remove_ref(_queues *queues, _queueref *ref, _queueref *prev, _queue **p_queue) { assert(ref->queue != NULL); if (ref == queues->head) { queues->head = ref->next; } else { prev->next = ref->next; } ref->next = NULL; queues->count -= 1; *p_queue = ref->queue; ref->queue = NULL; GLOBAL_FREE(ref); } static int _queues_remove(_queues *queues, int64_t qid, _queue **p_queue) { PyThread_acquire_lock(queues->mutex, WAIT_LOCK); _queueref *prev = NULL; _queueref *ref = _queuerefs_find(queues->head, qid, &prev); if (ref == NULL) { PyThread_release_lock(queues->mutex); return ERR_QUEUE_NOT_FOUND; } _queues_remove_ref(queues, ref, prev, p_queue); PyThread_release_lock(queues->mutex); return 0; } static int _queues_incref(_queues *queues, int64_t qid) { // XXX Track interpreter IDs? int res = -1; PyThread_acquire_lock(queues->mutex, WAIT_LOCK); _queueref *ref = _queuerefs_find(queues->head, qid, NULL); if (ref == NULL) { assert(!PyErr_Occurred()); res = ERR_QUEUE_NOT_FOUND; goto done; } ref->refcount += 1; res = 0; done: PyThread_release_lock(queues->mutex); return res; } static int _queues_decref(_queues *queues, int64_t qid) { int res = -1; PyThread_acquire_lock(queues->mutex, WAIT_LOCK); _queueref *prev = NULL; _queueref *ref = _queuerefs_find(queues->head, qid, &prev); if (ref == NULL) { assert(!PyErr_Occurred()); res = ERR_QUEUE_NOT_FOUND; goto finally; } if (ref->refcount == 0) { res = ERR_QUEUE_NEVER_BOUND; goto finally; } assert(ref->refcount > 0); ref->refcount -= 1; // Destroy if no longer used. assert(ref->queue != NULL); if (ref->refcount == 0) { _queue *queue = NULL; _queues_remove_ref(queues, ref, prev, &queue); PyThread_release_lock(queues->mutex); _queue_kill_and_wait(queue); _queue_free(queue); return 0; } res = 0; finally: PyThread_release_lock(queues->mutex); return res; } struct queue_id_and_info { int64_t id; int fmt; int unboundop; }; static struct queue_id_and_info * _queues_list_all(_queues *queues, int64_t *p_count) { struct queue_id_and_info *qids = NULL; PyThread_acquire_lock(queues->mutex, WAIT_LOCK); struct queue_id_and_info *ids = PyMem_NEW(struct queue_id_and_info, (Py_ssize_t)(queues->count)); if (ids == NULL) { goto done; } _queueref *ref = queues->head; for (int64_t i=0; ref != NULL; ref = ref->next, i++) { ids[i].id = ref->qid; assert(ref->queue != NULL); ids[i].fmt = ref->queue->defaults.fmt; ids[i].unboundop = ref->queue->defaults.unboundop; } *p_count = queues->count; qids = ids; done: PyThread_release_lock(queues->mutex); return qids; } static void _queues_clear_interpreter(_queues *queues, int64_t interpid) { PyThread_acquire_lock(queues->mutex, WAIT_LOCK); _queueref *ref = queues->head; for (; ref != NULL; ref = ref->next) { assert(ref->queue != NULL); _queue_clear_interpreter(ref->queue, interpid); } PyThread_release_lock(queues->mutex); } /* "high"-level queue-related functions *************************************/ static void _queue_free(_queue *queue) { _queue_clear(queue); GLOBAL_FREE(queue); } // Create a new queue. static int64_t queue_create(_queues *queues, Py_ssize_t maxsize, int fmt, int unboundop) { _queue *queue = GLOBAL_MALLOC(_queue); if (queue == NULL) { return ERR_QUEUE_ALLOC; } int err = _queue_init(queue, maxsize, fmt, unboundop); if (err < 0) { GLOBAL_FREE(queue); return (int64_t)err; } int64_t qid = _queues_add(queues, queue); if (qid < 0) { _queue_clear(queue); GLOBAL_FREE(queue); } return qid; } // Completely destroy the queue. static int queue_destroy(_queues *queues, int64_t qid) { _queue *queue = NULL; int err = _queues_remove(queues, qid, &queue); if (err < 0) { return err; } _queue_kill_and_wait(queue); _queue_free(queue); return 0; } // Push an object onto the queue. static int queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt, int unboundop) { // Look up the queue. _queue *queue = NULL; int err = _queues_lookup(queues, qid, &queue); if (err != 0) { return err; } assert(queue != NULL); // Convert the object to cross-interpreter data. _PyCrossInterpreterData *data = GLOBAL_MALLOC(_PyCrossInterpreterData); if (data == NULL) { _queue_unmark_waiter(queue, queues->mutex); return -1; } if (_PyObject_GetCrossInterpreterData(obj, data) != 0) { _queue_unmark_waiter(queue, queues->mutex); GLOBAL_FREE(data); return -1; } assert(_PyCrossInterpreterData_INTERPID(data) == \ PyInterpreterState_GetID(PyInterpreterState_Get())); // Add the data to the queue. int64_t interpid = -1; // _queueitem_init() will set it. int res = _queue_add(queue, interpid, data, fmt, unboundop); _queue_unmark_waiter(queue, queues->mutex); if (res != 0) { // We may chain an exception here: (void)_release_xid_data(data, 0); GLOBAL_FREE(data); return res; } return 0; } // Pop the next object off the queue. Fail if empty. // XXX Support a "wait" mutex? static int queue_get(_queues *queues, int64_t qid, PyObject **res, int *p_fmt, int *p_unboundop) { int err; *res = NULL; // Look up the queue. _queue *queue = NULL; err = _queues_lookup(queues, qid, &queue); if (err != 0) { return err; } // Past this point we are responsible for releasing the mutex. assert(queue != NULL); // Pop off the next item from the queue. _PyCrossInterpreterData *data = NULL; err = _queue_next(queue, &data, p_fmt, p_unboundop); _queue_unmark_waiter(queue, queues->mutex); if (err != 0) { return err; } else if (data == NULL) { assert(!PyErr_Occurred()); return 0; } // Convert the data back to an object. PyObject *obj = _PyCrossInterpreterData_NewObject(data); if (obj == NULL) { assert(PyErr_Occurred()); // It was allocated in queue_put(), so we free it. (void)_release_xid_data(data, XID_IGNORE_EXC | XID_FREE); return -1; } // It was allocated in queue_put(), so we free it. int release_res = _release_xid_data(data, XID_FREE); if (release_res < 0) { // The source interpreter has been destroyed already. assert(PyErr_Occurred()); Py_DECREF(obj); return -1; } *res = obj; return 0; } static int queue_get_maxsize(_queues *queues, int64_t qid, Py_ssize_t *p_maxsize) { _queue *queue = NULL; int err = _queues_lookup(queues, qid, &queue); if (err < 0) { return err; } err = _queue_get_maxsize(queue, p_maxsize); _queue_unmark_waiter(queue, queues->mutex); return err; } static int queue_is_full(_queues *queues, int64_t qid, int *p_is_full) { _queue *queue = NULL; int err = _queues_lookup(queues, qid, &queue); if (err < 0) { return err; } err = _queue_is_full(queue, p_is_full); _queue_unmark_waiter(queue, queues->mutex); return err; } static int queue_get_count(_queues *queues, int64_t qid, Py_ssize_t *p_count) { _queue *queue = NULL; int err = _queues_lookup(queues, qid, &queue); if (err < 0) { return err; } err = _queue_get_count(queue, p_count); _queue_unmark_waiter(queue, queues->mutex); return err; } /* external Queue objects ***************************************************/ static int _queueobj_shared(PyThreadState *, PyObject *, _PyCrossInterpreterData *); static int set_external_queue_type(module_state *state, PyTypeObject *queue_type) { // Clear the old value if the .py module was reloaded. if (state->queue_type != NULL) { (void)clear_xid_class(state->queue_type); Py_CLEAR(state->queue_type); } // Add and register the new type. if (ensure_xid_class(queue_type, _queueobj_shared) < 0) { return -1; } state->queue_type = (PyTypeObject *)Py_NewRef(queue_type); return 0; } static PyTypeObject * get_external_queue_type(PyObject *module) { module_state *state = get_module_state(module); PyTypeObject *cls = state->queue_type; if (cls == NULL) { // Force the module to be loaded, to register the type. if (ensure_highlevel_module_loaded() < 0) { return NULL; } cls = state->queue_type; assert(cls != NULL); } return cls; } // XXX Use a new __xid__ protocol instead? struct _queueid_xid { int64_t qid; }; static _queues * _get_global_queues(void); static void * _queueid_xid_new(int64_t qid) { _queues *queues = _get_global_queues(); if (_queues_incref(queues, qid) < 0) { return NULL; } struct _queueid_xid *data = PyMem_RawMalloc(sizeof(struct _queueid_xid)); if (data == NULL) { _queues_incref(queues, qid); return NULL; } data->qid = qid; return (void *)data; } static void _queueid_xid_free(void *data) { int64_t qid = ((struct _queueid_xid *)data)->qid; PyMem_RawFree(data); _queues *queues = _get_global_queues(); int res = _queues_decref(queues, qid); if (res == ERR_QUEUE_NOT_FOUND) { // Already destroyed. // XXX Warn? } else { assert(res == 0); } } static PyObject * _queueobj_from_xid(_PyCrossInterpreterData *data) { int64_t qid = *(int64_t *)_PyCrossInterpreterData_DATA(data); PyObject *qidobj = PyLong_FromLongLong(qid); if (qidobj == NULL) { return NULL; } PyObject *mod = _get_current_module(); if (mod == NULL) { // XXX import it? PyErr_SetString(PyExc_RuntimeError, MODULE_NAME_STR " module not imported yet"); return NULL; } PyTypeObject *cls = get_external_queue_type(mod); Py_DECREF(mod); if (cls == NULL) { Py_DECREF(qidobj); return NULL; } PyObject *obj = PyObject_CallOneArg((PyObject *)cls, (PyObject *)qidobj); Py_DECREF(qidobj); return obj; } static int _queueobj_shared(PyThreadState *tstate, PyObject *queueobj, _PyCrossInterpreterData *data) { PyObject *qidobj = PyObject_GetAttrString(queueobj, "_id"); if (qidobj == NULL) { return -1; } struct idarg_int64_converter_data converted = { .label = "queue ID", }; int res = idarg_int64_converter(qidobj, &converted); Py_CLEAR(qidobj); if (!res) { assert(PyErr_Occurred()); return -1; } void *raw = _queueid_xid_new(converted.id); if (raw == NULL) { return -1; } _PyCrossInterpreterData_Init(data, tstate->interp, raw, NULL, _queueobj_from_xid); _PyCrossInterpreterData_SET_FREE(data, _queueid_xid_free); return 0; } /* module level code ********************************************************/ /* globals is the process-global state for the module. It holds all the data that we need to share between interpreters, so it cannot hold PyObject values. */ static struct globals { int module_count; _queues queues; } _globals = {0}; static int _globals_init(void) { // XXX This isn't thread-safe. _globals.module_count++; if (_globals.module_count > 1) { // Already initialized. return 0; } assert(_globals.queues.mutex == NULL); PyThread_type_lock mutex = PyThread_allocate_lock(); if (mutex == NULL) { return ERR_QUEUES_ALLOC; } _queues_init(&_globals.queues, mutex); return 0; } static void _globals_fini(void) { // XXX This isn't thread-safe. _globals.module_count--; if (_globals.module_count > 0) { return; } _queues_fini(&_globals.queues); } static _queues * _get_global_queues(void) { return &_globals.queues; } static void clear_interpreter(void *data) { if (_globals.module_count == 0) { return; } PyInterpreterState *interp = (PyInterpreterState *)data; assert(interp == _get_current_interp()); int64_t interpid = PyInterpreterState_GetID(interp); _queues_clear_interpreter(&_globals.queues, interpid); } typedef struct idarg_int64_converter_data qidarg_converter_data; static int qidarg_converter(PyObject *arg, void *ptr) { qidarg_converter_data *data = ptr; if (data->label == NULL) { data->label = "queue ID"; } return idarg_int64_converter(arg, ptr); } static PyObject * queuesmod_create(PyObject *self, PyObject *args, PyObject *kwds) { static char *kwlist[] = {"maxsize", "fmt", "unboundop", NULL}; Py_ssize_t maxsize; int fmt; int unboundop; if (!PyArg_ParseTupleAndKeywords(args, kwds, "nii:create", kwlist, &maxsize, &fmt, &unboundop)) { return NULL; } if (!check_unbound(unboundop)) { PyErr_Format(PyExc_ValueError, "unsupported unboundop %d", unboundop); return NULL; } int64_t qid = queue_create(&_globals.queues, maxsize, fmt, unboundop); if (qid < 0) { (void)handle_queue_error((int)qid, self, qid); return NULL; } PyObject *qidobj = PyLong_FromLongLong(qid); if (qidobj == NULL) { PyObject *exc = PyErr_GetRaisedException(); int err = queue_destroy(&_globals.queues, qid); if (handle_queue_error(err, self, qid)) { // XXX issue a warning? PyErr_Clear(); } PyErr_SetRaisedException(exc); return NULL; } return qidobj; } PyDoc_STRVAR(queuesmod_create_doc, "create(maxsize, fmt, unboundop) -> qid\n\ \n\ Create a new cross-interpreter queue and return its unique generated ID.\n\ It is a new reference as though bind() had been called on the queue.\n\ \n\ The caller is responsible for calling destroy() for the new queue\n\ before the runtime is finalized."); static PyObject * queuesmod_destroy(PyObject *self, PyObject *args, PyObject *kwds) { static char *kwlist[] = {"qid", NULL}; qidarg_converter_data qidarg; if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&:destroy", kwlist, qidarg_converter, &qidarg)) { return NULL; } int64_t qid = qidarg.id; int err = queue_destroy(&_globals.queues, qid); if (handle_queue_error(err, self, qid)) { return NULL; } Py_RETURN_NONE; } PyDoc_STRVAR(queuesmod_destroy_doc, "destroy(qid)\n\ \n\ Clear and destroy the queue. Afterward attempts to use the queue\n\ will behave as though it never existed."); static PyObject * queuesmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored)) { int64_t count = 0; struct queue_id_and_info *qids = _queues_list_all(&_globals.queues, &count); if (qids == NULL) { if (!PyErr_Occurred() && count == 0) { return PyList_New(0); } return NULL; } PyObject *ids = PyList_New((Py_ssize_t)count); if (ids == NULL) { goto finally; } struct queue_id_and_info *cur = qids; for (int64_t i=0; i < count; cur++, i++) { PyObject *item = Py_BuildValue("Lii", cur->id, cur->fmt, cur->unboundop); if (item == NULL) { Py_SETREF(ids, NULL); break; } PyList_SET_ITEM(ids, (Py_ssize_t)i, item); } finally: PyMem_Free(qids); return ids; } PyDoc_STRVAR(queuesmod_list_all_doc, "list_all() -> [(qid, fmt)]\n\ \n\ Return the list of IDs for all queues.\n\ Each corresponding default format is also included."); static PyObject * queuesmod_put(PyObject *self, PyObject *args, PyObject *kwds) { static char *kwlist[] = {"qid", "obj", "fmt", "unboundop", NULL}; qidarg_converter_data qidarg; PyObject *obj; int fmt; int unboundop; if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&Oii:put", kwlist, qidarg_converter, &qidarg, &obj, &fmt, &unboundop)) { return NULL; } int64_t qid = qidarg.id; if (!check_unbound(unboundop)) { PyErr_Format(PyExc_ValueError, "unsupported unboundop %d", unboundop); return NULL; } /* Queue up the object. */ int err = queue_put(&_globals.queues, qid, obj, fmt, unboundop); // This is the only place that raises QueueFull. if (handle_queue_error(err, self, qid)) { return NULL; } Py_RETURN_NONE; } PyDoc_STRVAR(queuesmod_put_doc, "put(qid, obj, fmt)\n\ \n\ Add the object's data to the queue."); static PyObject * queuesmod_get(PyObject *self, PyObject *args, PyObject *kwds) { static char *kwlist[] = {"qid", NULL}; qidarg_converter_data qidarg; if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&:get", kwlist, qidarg_converter, &qidarg)) { return NULL; } int64_t qid = qidarg.id; PyObject *obj = NULL; int fmt = 0; int unboundop = 0; int err = queue_get(&_globals.queues, qid, &obj, &fmt, &unboundop); // This is the only place that raises QueueEmpty. if (handle_queue_error(err, self, qid)) { return NULL; } if (obj == NULL) { return Py_BuildValue("Oii", Py_None, fmt, unboundop); } PyObject *res = Py_BuildValue("OiO", obj, fmt, Py_None); Py_DECREF(obj); return res; } PyDoc_STRVAR(queuesmod_get_doc, "get(qid) -> (obj, fmt)\n\ \n\ Return a new object from the data at the front of the queue.\n\ The object's format is also returned.\n\ \n\ If there is nothing to receive then raise QueueEmpty."); static PyObject * queuesmod_bind(PyObject *self, PyObject *args, PyObject *kwds) { static char *kwlist[] = {"qid", NULL}; qidarg_converter_data qidarg; if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&:bind", kwlist, qidarg_converter, &qidarg)) { return NULL; } int64_t qid = qidarg.id; // XXX Check module state if bound already. int err = _queues_incref(&_globals.queues, qid); if (handle_queue_error(err, self, qid)) { return NULL; } // XXX Update module state. Py_RETURN_NONE; } PyDoc_STRVAR(queuesmod_bind_doc, "bind(qid)\n\ \n\ Take a reference to the identified queue.\n\ The queue is not destroyed until there are no references left."); static PyObject * queuesmod_release(PyObject *self, PyObject *args, PyObject *kwds) { // Note that only the current interpreter is affected. static char *kwlist[] = {"qid", NULL}; qidarg_converter_data qidarg; if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&:release", kwlist, qidarg_converter, &qidarg)) { return NULL; } int64_t qid = qidarg.id; // XXX Check module state if bound already. // XXX Update module state. int err = _queues_decref(&_globals.queues, qid); if (handle_queue_error(err, self, qid)) { return NULL; } Py_RETURN_NONE; } PyDoc_STRVAR(queuesmod_release_doc, "release(qid)\n\ \n\ Release a reference to the queue.\n\ The queue is destroyed once there are no references left."); static PyObject * queuesmod_get_maxsize(PyObject *self, PyObject *args, PyObject *kwds) { static char *kwlist[] = {"qid", NULL}; qidarg_converter_data qidarg; if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&:get_maxsize", kwlist, qidarg_converter, &qidarg)) { return NULL; } int64_t qid = qidarg.id; Py_ssize_t maxsize = -1; int err = queue_get_maxsize(&_globals.queues, qid, &maxsize); if (handle_queue_error(err, self, qid)) { return NULL; } return PyLong_FromLongLong(maxsize); } PyDoc_STRVAR(queuesmod_get_maxsize_doc, "get_maxsize(qid)\n\ \n\ Return the maximum number of items in the queue."); static PyObject * queuesmod_get_queue_defaults(PyObject *self, PyObject *args, PyObject *kwds) { static char *kwlist[] = {"qid", NULL}; qidarg_converter_data qidarg; if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&:get_queue_defaults", kwlist, qidarg_converter, &qidarg)) { return NULL; } int64_t qid = qidarg.id; _queue *queue = NULL; int err = _queues_lookup(&_globals.queues, qid, &queue); if (handle_queue_error(err, self, qid)) { return NULL; } int fmt = queue->defaults.fmt; int unboundop = queue->defaults.unboundop; _queue_unmark_waiter(queue, _globals.queues.mutex); PyObject *defaults = Py_BuildValue("ii", fmt, unboundop); return defaults; } PyDoc_STRVAR(queuesmod_get_queue_defaults_doc, "get_queue_defaults(qid)\n\ \n\ Return the queue's default values, set when it was created."); static PyObject * queuesmod_is_full(PyObject *self, PyObject *args, PyObject *kwds) { static char *kwlist[] = {"qid", NULL}; qidarg_converter_data qidarg; if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&:is_full", kwlist, qidarg_converter, &qidarg)) { return NULL; } int64_t qid = qidarg.id; int is_full = 0; int err = queue_is_full(&_globals.queues, qid, &is_full); if (handle_queue_error(err, self, qid)) { return NULL; } if (is_full) { Py_RETURN_TRUE; } Py_RETURN_FALSE; } PyDoc_STRVAR(queuesmod_is_full_doc, "is_full(qid)\n\ \n\ Return true if the queue has a maxsize and has reached it."); static PyObject * queuesmod_get_count(PyObject *self, PyObject *args, PyObject *kwds) { static char *kwlist[] = {"qid", NULL}; qidarg_converter_data qidarg; if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&:get_count", kwlist, qidarg_converter, &qidarg)) { return NULL; } int64_t qid = qidarg.id; Py_ssize_t count = -1; int err = queue_get_count(&_globals.queues, qid, &count); if (handle_queue_error(err, self, qid)) { return NULL; } assert(count >= 0); return PyLong_FromSsize_t(count); } PyDoc_STRVAR(queuesmod_get_count_doc, "get_count(qid)\n\ \n\ Return the number of items in the queue."); static PyObject * queuesmod__register_heap_types(PyObject *self, PyObject *args, PyObject *kwds) { static char *kwlist[] = {"queuetype", "emptyerror", "fullerror", NULL}; PyObject *queuetype; PyObject *emptyerror; PyObject *fullerror; if (!PyArg_ParseTupleAndKeywords(args, kwds, "OOO:_register_heap_types", kwlist, &queuetype, &emptyerror, &fullerror)) { return NULL; } if (!PyType_Check(queuetype)) { PyErr_SetString(PyExc_TypeError, "expected a type for 'queuetype'"); return NULL; } if (!PyExceptionClass_Check(emptyerror)) { PyErr_SetString(PyExc_TypeError, "expected an exception type for 'emptyerror'"); return NULL; } if (!PyExceptionClass_Check(fullerror)) { PyErr_SetString(PyExc_TypeError, "expected an exception type for 'fullerror'"); return NULL; } module_state *state = get_module_state(self); if (set_external_queue_type(state, (PyTypeObject *)queuetype) < 0) { return NULL; } if (set_external_exc_types(state, emptyerror, fullerror) < 0) { return NULL; } Py_RETURN_NONE; } static PyMethodDef module_functions[] = { {"create", _PyCFunction_CAST(queuesmod_create), METH_VARARGS | METH_KEYWORDS, queuesmod_create_doc}, {"destroy", _PyCFunction_CAST(queuesmod_destroy), METH_VARARGS | METH_KEYWORDS, queuesmod_destroy_doc}, {"list_all", queuesmod_list_all, METH_NOARGS, queuesmod_list_all_doc}, {"put", _PyCFunction_CAST(queuesmod_put), METH_VARARGS | METH_KEYWORDS, queuesmod_put_doc}, {"get", _PyCFunction_CAST(queuesmod_get), METH_VARARGS | METH_KEYWORDS, queuesmod_get_doc}, {"bind", _PyCFunction_CAST(queuesmod_bind), METH_VARARGS | METH_KEYWORDS, queuesmod_bind_doc}, {"release", _PyCFunction_CAST(queuesmod_release), METH_VARARGS | METH_KEYWORDS, queuesmod_release_doc}, {"get_maxsize", _PyCFunction_CAST(queuesmod_get_maxsize), METH_VARARGS | METH_KEYWORDS, queuesmod_get_maxsize_doc}, {"get_queue_defaults", _PyCFunction_CAST(queuesmod_get_queue_defaults), METH_VARARGS | METH_KEYWORDS, queuesmod_get_queue_defaults_doc}, {"is_full", _PyCFunction_CAST(queuesmod_is_full), METH_VARARGS | METH_KEYWORDS, queuesmod_is_full_doc}, {"get_count", _PyCFunction_CAST(queuesmod_get_count), METH_VARARGS | METH_KEYWORDS, queuesmod_get_count_doc}, {"_register_heap_types", _PyCFunction_CAST(queuesmod__register_heap_types), METH_VARARGS | METH_KEYWORDS, NULL}, {NULL, NULL} /* sentinel */ }; /* initialization function */ PyDoc_STRVAR(module_doc, "This module provides primitive operations to manage Python interpreters.\n\ The 'interpreters' module provides a more convenient interface."); static int module_exec(PyObject *mod) { if (_globals_init() != 0) { return -1; } /* Add exception types */ if (add_QueueError(mod) < 0) { goto error; } /* Make sure queues drop objects owned by this interpreter. */ PyInterpreterState *interp = _get_current_interp(); PyUnstable_AtExit(interp, clear_interpreter, (void *)interp); return 0; error: _globals_fini(); return -1; } static struct PyModuleDef_Slot module_slots[] = { {Py_mod_exec, module_exec}, {Py_mod_multiple_interpreters, Py_MOD_PER_INTERPRETER_GIL_SUPPORTED}, {Py_mod_gil, Py_MOD_GIL_NOT_USED}, {0, NULL}, }; static int module_traverse(PyObject *mod, visitproc visit, void *arg) { module_state *state = get_module_state(mod); traverse_module_state(state, visit, arg); return 0; } static int module_clear(PyObject *mod) { module_state *state = get_module_state(mod); // Now we clear the module state. clear_module_state(state); return 0; } static void module_free(void *mod) { module_state *state = get_module_state(mod); // Now we clear the module state. clear_module_state(state); _globals_fini(); } static struct PyModuleDef moduledef = { .m_base = PyModuleDef_HEAD_INIT, .m_name = MODULE_NAME_STR, .m_doc = module_doc, .m_size = sizeof(module_state), .m_methods = module_functions, .m_slots = module_slots, .m_traverse = module_traverse, .m_clear = module_clear, .m_free = (freefunc)module_free, }; PyMODINIT_FUNC MODINIT_FUNC_NAME(void) { return PyModuleDef_Init(&moduledef); }