#ifndef Py_BUILD_CORE_BUILTIN # define Py_BUILD_CORE_MODULE 1 #endif #include "Python.h" #include "pycore_ceval.h" // Py_MakePendingCalls() #include "pycore_moduleobject.h" // _PyModule_GetState() #include "pycore_parking_lot.h" #include "pycore_time.h" // _PyTime_t #include #include // offsetof() typedef struct { PyTypeObject *SimpleQueueType; PyObject *EmptyError; } simplequeue_state; static simplequeue_state * simplequeue_get_state(PyObject *module) { simplequeue_state *state = _PyModule_GetState(module); assert(state); return state; } static struct PyModuleDef queuemodule; #define simplequeue_get_state_by_type(type) \ (simplequeue_get_state(PyType_GetModuleByDef(type, &queuemodule))) static const Py_ssize_t INITIAL_RING_BUF_CAPACITY = 8; typedef struct { // Where to place the next item Py_ssize_t put_idx; // Where to get the next item Py_ssize_t get_idx; PyObject **items; // Total number of items that may be stored Py_ssize_t items_cap; // Number of items stored Py_ssize_t num_items; } RingBuf; static int RingBuf_Init(RingBuf *buf) { buf->put_idx = 0; buf->get_idx = 0; buf->items_cap = INITIAL_RING_BUF_CAPACITY; buf->num_items = 0; buf->items = PyMem_Calloc(buf->items_cap, sizeof(PyObject *)); if (buf->items == NULL) { PyErr_NoMemory(); return -1; } return 0; } static PyObject * RingBuf_At(RingBuf *buf, Py_ssize_t idx) { assert(idx >= 0 && idx < buf->num_items); return buf->items[(buf->get_idx + idx) % buf->items_cap]; } static void RingBuf_Fini(RingBuf *buf) { PyObject **items = buf->items; Py_ssize_t num_items = buf->num_items; Py_ssize_t cap = buf->items_cap; Py_ssize_t idx = buf->get_idx; buf->items = NULL; buf->put_idx = 0; buf->get_idx = 0; buf->num_items = 0; buf->items_cap = 0; for (Py_ssize_t n = num_items; n > 0; idx = (idx + 1) % cap, n--) { Py_DECREF(items[idx]); } PyMem_Free(items); } // Resize the underlying items array of buf to the new capacity and arrange // the items contiguously in the new items array. // // Returns -1 on allocation failure or 0 on success. static int resize_ringbuf(RingBuf *buf, Py_ssize_t capacity) { Py_ssize_t new_capacity = Py_MAX(INITIAL_RING_BUF_CAPACITY, capacity); if (new_capacity == buf->items_cap) { return 0; } assert(buf->num_items <= new_capacity); PyObject **new_items = PyMem_Calloc(new_capacity, sizeof(PyObject *)); if (new_items == NULL) { return -1; } // Copy the "tail" of the old items array. This corresponds to "head" of // the abstract ring buffer. Py_ssize_t tail_size = Py_MIN(buf->num_items, buf->items_cap - buf->get_idx); if (tail_size > 0) { memcpy(new_items, buf->items + buf->get_idx, tail_size * sizeof(PyObject *)); } // Copy the "head" of the old items array, if any. This corresponds to the // "tail" of the abstract ring buffer. Py_ssize_t head_size = buf->num_items - tail_size; if (head_size > 0) { memcpy(new_items + tail_size, buf->items, head_size * sizeof(PyObject *)); } PyMem_Free(buf->items); buf->items = new_items; buf->items_cap = new_capacity; buf->get_idx = 0; buf->put_idx = buf->num_items; return 0; } // Returns a strong reference from the head of the buffer. static PyObject * RingBuf_Get(RingBuf *buf) { assert(buf->num_items > 0); if (buf->num_items < (buf->items_cap / 4)) { // Items is less than 25% occupied, shrink it by 50%. This allows for // growth without immediately needing to resize the underlying items // array. // // It's safe it ignore allocation failures here; shrinking is an // optimization that isn't required for correctness. (void)resize_ringbuf(buf, buf->items_cap / 2); } PyObject *item = buf->items[buf->get_idx]; buf->items[buf->get_idx] = NULL; buf->get_idx = (buf->get_idx + 1) % buf->items_cap; buf->num_items--; return item; } // Returns 0 on success or -1 if the buffer failed to grow. // // Steals a reference to item. static int RingBuf_Put(RingBuf *buf, PyObject *item) { assert(buf->num_items <= buf->items_cap); if (buf->num_items == buf->items_cap) { // Buffer is full, grow it. if (resize_ringbuf(buf, buf->items_cap * 2) < 0) { PyErr_NoMemory(); return -1; } } buf->items[buf->put_idx] = item; buf->put_idx = (buf->put_idx + 1) % buf->items_cap; buf->num_items++; return 0; } static Py_ssize_t RingBuf_Len(RingBuf *buf) { return buf->num_items; } static bool RingBuf_IsEmpty(RingBuf *buf) { return buf->num_items == 0; } typedef struct { PyObject_HEAD // Are there threads waiting for items bool has_threads_waiting; // Items in the queue RingBuf buf; PyObject *weakreflist; } simplequeueobject; /*[clinic input] module _queue class _queue.SimpleQueue "simplequeueobject *" "simplequeue_get_state_by_type(type)->SimpleQueueType" [clinic start generated code]*/ /*[clinic end generated code: output=da39a3ee5e6b4b0d input=0a4023fe4d198c8d]*/ static int simplequeue_clear(simplequeueobject *self) { RingBuf_Fini(&self->buf); return 0; } static void simplequeue_dealloc(simplequeueobject *self) { PyTypeObject *tp = Py_TYPE(self); PyObject_GC_UnTrack(self); (void)simplequeue_clear(self); if (self->weakreflist != NULL) PyObject_ClearWeakRefs((PyObject *) self); Py_TYPE(self)->tp_free(self); Py_DECREF(tp); } static int simplequeue_traverse(simplequeueobject *self, visitproc visit, void *arg) { RingBuf *buf = &self->buf; for (Py_ssize_t i = 0, num_items = buf->num_items; i < num_items; i++) { Py_VISIT(RingBuf_At(buf, i)); } Py_VISIT(Py_TYPE(self)); return 0; } /*[clinic input] @classmethod _queue.SimpleQueue.__new__ as simplequeue_new Simple, unbounded, reentrant FIFO queue. [clinic start generated code]*/ static PyObject * simplequeue_new_impl(PyTypeObject *type) /*[clinic end generated code: output=ba97740608ba31cd input=a0674a1643e3e2fb]*/ { simplequeueobject *self; self = (simplequeueobject *) type->tp_alloc(type, 0); if (self != NULL) { self->weakreflist = NULL; if (RingBuf_Init(&self->buf) < 0) { Py_DECREF(self); return NULL; } } return (PyObject *) self; } typedef struct { bool handed_off; simplequeueobject *queue; PyObject *item; } HandoffData; static void maybe_handoff_item(HandoffData *data, PyObject **item, int has_more_waiters) { if (item == NULL) { // No threads were waiting data->handed_off = false; } else { // There was at least one waiting thread, hand off the item *item = data->item; data->handed_off = true; } data->queue->has_threads_waiting = has_more_waiters; } /*[clinic input] @critical_section _queue.SimpleQueue.put item: object block: bool = True timeout: object = None Put the item on the queue. The optional 'block' and 'timeout' arguments are ignored, as this method never blocks. They are provided for compatibility with the Queue class. [clinic start generated code]*/ static PyObject * _queue_SimpleQueue_put_impl(simplequeueobject *self, PyObject *item, int block, PyObject *timeout) /*[clinic end generated code: output=4333136e88f90d8b input=a16dbb33363c0fa8]*/ { HandoffData data = { .handed_off = 0, .item = Py_NewRef(item), .queue = self, }; if (self->has_threads_waiting) { // Try to hand the item off directly if there are threads waiting _PyParkingLot_Unpark(&self->has_threads_waiting, (_Py_unpark_fn_t *)maybe_handoff_item, &data); } if (!data.handed_off) { if (RingBuf_Put(&self->buf, item) < 0) { return NULL; } } Py_RETURN_NONE; } /*[clinic input] @critical_section _queue.SimpleQueue.put_nowait item: object Put an item into the queue without blocking. This is exactly equivalent to `put(item)` and is only provided for compatibility with the Queue class. [clinic start generated code]*/ static PyObject * _queue_SimpleQueue_put_nowait_impl(simplequeueobject *self, PyObject *item) /*[clinic end generated code: output=0990536715efb1f1 input=ce949cc2cd8a4119]*/ { return _queue_SimpleQueue_put_impl(self, item, 0, Py_None); } static PyObject * empty_error(PyTypeObject *cls) { PyObject *module = PyType_GetModule(cls); assert(module != NULL); simplequeue_state *state = simplequeue_get_state(module); PyErr_SetNone(state->EmptyError); return NULL; } /*[clinic input] @critical_section _queue.SimpleQueue.get cls: defining_class / block: bool = True timeout as timeout_obj: object = None Remove and return an item from the queue. If optional args 'block' is true and 'timeout' is None (the default), block if necessary until an item is available. If 'timeout' is a non-negative number, it blocks at most 'timeout' seconds and raises the Empty exception if no item was available within that time. Otherwise ('block' is false), return an item if one is immediately available, else raise the Empty exception ('timeout' is ignored in that case). [clinic start generated code]*/ static PyObject * _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls, int block, PyObject *timeout_obj) /*[clinic end generated code: output=5c2cca914cd1e55b input=f7836c65e5839c51]*/ { _PyTime_t endtime = 0; // XXX Use PyThread_ParseTimeoutArg(). if (block != 0 && !Py_IsNone(timeout_obj)) { /* With timeout */ _PyTime_t timeout; if (_PyTime_FromSecondsObject(&timeout, timeout_obj, _PyTime_ROUND_CEILING) < 0) { return NULL; } if (timeout < 0) { PyErr_SetString(PyExc_ValueError, "'timeout' must be a non-negative number"); return NULL; } endtime = _PyDeadline_Init(timeout); } for (;;) { if (!RingBuf_IsEmpty(&self->buf)) { return RingBuf_Get(&self->buf); } if (!block) { return empty_error(cls); } int64_t timeout_ns = -1; if (endtime != 0) { timeout_ns = _PyDeadline_Get(endtime); if (timeout_ns < 0) { return empty_error(cls); } } bool waiting = 1; self->has_threads_waiting = waiting; PyObject *item = NULL; int st = _PyParkingLot_Park(&self->has_threads_waiting, &waiting, sizeof(bool), timeout_ns, &item, /* detach */ 1); switch (st) { case Py_PARK_OK: { assert(item != NULL); return item; } case Py_PARK_TIMEOUT: { return empty_error(cls); } case Py_PARK_INTR: { // Interrupted if (Py_MakePendingCalls() < 0) { return NULL; } break; } case Py_PARK_AGAIN: { // This should be impossible with the current implementation of // PyParkingLot, but would be possible if critical sections / // the GIL were released before the thread was added to the // internal thread queue in the parking lot. break; } default: { Py_UNREACHABLE(); } } } } /*[clinic input] @critical_section _queue.SimpleQueue.get_nowait cls: defining_class / Remove and return an item from the queue without blocking. Only get an item if one is immediately available. Otherwise raise the Empty exception. [clinic start generated code]*/ static PyObject * _queue_SimpleQueue_get_nowait_impl(simplequeueobject *self, PyTypeObject *cls) /*[clinic end generated code: output=620c58e2750f8b8a input=d48be63633fefae9]*/ { return _queue_SimpleQueue_get_impl(self, cls, 0, Py_None); } /*[clinic input] @critical_section _queue.SimpleQueue.empty -> bool Return True if the queue is empty, False otherwise (not reliable!). [clinic start generated code]*/ static int _queue_SimpleQueue_empty_impl(simplequeueobject *self) /*[clinic end generated code: output=1a02a1b87c0ef838 input=96cb22df5a67d831]*/ { return RingBuf_IsEmpty(&self->buf); } /*[clinic input] @critical_section _queue.SimpleQueue.qsize -> Py_ssize_t Return the approximate size of the queue (not reliable!). [clinic start generated code]*/ static Py_ssize_t _queue_SimpleQueue_qsize_impl(simplequeueobject *self) /*[clinic end generated code: output=f9dcd9d0a90e121e input=e218623cb8c16a79]*/ { return RingBuf_Len(&self->buf); } static int queue_traverse(PyObject *m, visitproc visit, void *arg) { simplequeue_state *state = simplequeue_get_state(m); Py_VISIT(state->SimpleQueueType); Py_VISIT(state->EmptyError); return 0; } static int queue_clear(PyObject *m) { simplequeue_state *state = simplequeue_get_state(m); Py_CLEAR(state->SimpleQueueType); Py_CLEAR(state->EmptyError); return 0; } static void queue_free(void *m) { queue_clear((PyObject *)m); } #include "clinic/_queuemodule.c.h" static PyMethodDef simplequeue_methods[] = { _QUEUE_SIMPLEQUEUE_EMPTY_METHODDEF _QUEUE_SIMPLEQUEUE_GET_METHODDEF _QUEUE_SIMPLEQUEUE_GET_NOWAIT_METHODDEF _QUEUE_SIMPLEQUEUE_PUT_METHODDEF _QUEUE_SIMPLEQUEUE_PUT_NOWAIT_METHODDEF _QUEUE_SIMPLEQUEUE_QSIZE_METHODDEF {"__class_getitem__", Py_GenericAlias, METH_O|METH_CLASS, PyDoc_STR("See PEP 585")}, {NULL, NULL} /* sentinel */ }; static struct PyMemberDef simplequeue_members[] = { {"__weaklistoffset__", Py_T_PYSSIZET, offsetof(simplequeueobject, weakreflist), Py_READONLY}, {NULL}, }; static PyType_Slot simplequeue_slots[] = { {Py_tp_dealloc, simplequeue_dealloc}, {Py_tp_doc, (void *)simplequeue_new__doc__}, {Py_tp_traverse, simplequeue_traverse}, {Py_tp_clear, simplequeue_clear}, {Py_tp_members, simplequeue_members}, {Py_tp_methods, simplequeue_methods}, {Py_tp_new, simplequeue_new}, {0, NULL}, }; static PyType_Spec simplequeue_spec = { .name = "_queue.SimpleQueue", .basicsize = sizeof(simplequeueobject), .flags = (Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_GC | Py_TPFLAGS_IMMUTABLETYPE), .slots = simplequeue_slots, }; /* Initialization function */ PyDoc_STRVAR(queue_module_doc, "C implementation of the Python queue module.\n\ This module is an implementation detail, please do not use it directly."); static int queuemodule_exec(PyObject *module) { simplequeue_state *state = simplequeue_get_state(module); state->EmptyError = PyErr_NewExceptionWithDoc( "_queue.Empty", "Exception raised by Queue.get(block=0)/get_nowait().", NULL, NULL); if (state->EmptyError == NULL) { return -1; } if (PyModule_AddObjectRef(module, "Empty", state->EmptyError) < 0) { return -1; } state->SimpleQueueType = (PyTypeObject *)PyType_FromModuleAndSpec( module, &simplequeue_spec, NULL); if (state->SimpleQueueType == NULL) { return -1; } if (PyModule_AddType(module, state->SimpleQueueType) < 0) { return -1; } return 0; } static PyModuleDef_Slot queuemodule_slots[] = { {Py_mod_exec, queuemodule_exec}, {Py_mod_multiple_interpreters, Py_MOD_PER_INTERPRETER_GIL_SUPPORTED}, {0, NULL} }; static struct PyModuleDef queuemodule = { .m_base = PyModuleDef_HEAD_INIT, .m_name = "_queue", .m_doc = queue_module_doc, .m_size = sizeof(simplequeue_state), .m_slots = queuemodule_slots, .m_traverse = queue_traverse, .m_clear = queue_clear, .m_free = queue_free, }; PyMODINIT_FUNC PyInit__queue(void) { return PyModuleDef_Init(&queuemodule); }