gh-113884: Refactor `queue.SimpleQueue` to use a ring buffer to store items (#114259)

Use a ring buffer instead of a Python list in order to simplify the
process of making queue.SimpleQueue thread-safe in free-threaded
builds. The ring buffer implementation has no places where critical
sections may be released.
This commit is contained in:
mpage 2024-01-19 04:17:51 -08:00 committed by GitHub
parent 05e47202a3
commit 28eacf27ef
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 169 additions and 40 deletions

View File

@ -7,6 +7,7 @@
#include "pycore_moduleobject.h" // _PyModule_GetState()
#include "pycore_time.h" // _PyTime_t
#include <stdbool.h>
#include <stddef.h> // offsetof()
typedef struct {
@ -25,12 +26,167 @@ static struct PyModuleDef queuemodule;
#define simplequeue_get_state_by_type(type) \
(simplequeue_get_state(PyType_GetModuleByDef(type, &queuemodule)))
static const Py_ssize_t INITIAL_RING_BUF_CAPACITY = 8;
typedef struct {
// Where to place the next item
Py_ssize_t put_idx;
// Where to get the next item
Py_ssize_t get_idx;
PyObject **items;
// Total number of items that may be stored
Py_ssize_t items_cap;
// Number of items stored
Py_ssize_t num_items;
} RingBuf;
static int
RingBuf_Init(RingBuf *buf)
{
buf->put_idx = 0;
buf->get_idx = 0;
buf->items_cap = INITIAL_RING_BUF_CAPACITY;
buf->num_items = 0;
buf->items = PyMem_Calloc(buf->items_cap, sizeof(PyObject *));
if (buf->items == NULL) {
PyErr_NoMemory();
return -1;
}
return 0;
}
static PyObject *
RingBuf_At(RingBuf *buf, Py_ssize_t idx)
{
assert(idx >= 0 && idx < buf->num_items);
return buf->items[(buf->get_idx + idx) % buf->items_cap];
}
static void
RingBuf_Fini(RingBuf *buf)
{
PyObject **items = buf->items;
Py_ssize_t num_items = buf->num_items;
Py_ssize_t cap = buf->items_cap;
Py_ssize_t idx = buf->get_idx;
buf->items = NULL;
buf->put_idx = 0;
buf->get_idx = 0;
buf->num_items = 0;
buf->items_cap = 0;
for (Py_ssize_t n = num_items; n > 0; idx = (idx + 1) % cap, n--) {
Py_DECREF(items[idx]);
}
PyMem_Free(items);
}
// Resize the underlying items array of buf to the new capacity and arrange
// the items contiguously in the new items array.
//
// Returns -1 on allocation failure or 0 on success.
static int
resize_ringbuf(RingBuf *buf, Py_ssize_t capacity)
{
Py_ssize_t new_capacity = Py_MAX(INITIAL_RING_BUF_CAPACITY, capacity);
if (new_capacity == buf->items_cap) {
return 0;
}
assert(buf->num_items <= new_capacity);
PyObject **new_items = PyMem_Calloc(new_capacity, sizeof(PyObject *));
if (new_items == NULL) {
return -1;
}
// Copy the "tail" of the old items array. This corresponds to "head" of
// the abstract ring buffer.
Py_ssize_t tail_size =
Py_MIN(buf->num_items, buf->items_cap - buf->get_idx);
if (tail_size > 0) {
memcpy(new_items, buf->items + buf->get_idx,
tail_size * sizeof(PyObject *));
}
// Copy the "head" of the old items array, if any. This corresponds to the
// "tail" of the abstract ring buffer.
Py_ssize_t head_size = buf->num_items - tail_size;
if (head_size > 0) {
memcpy(new_items + tail_size, buf->items,
head_size * sizeof(PyObject *));
}
PyMem_Free(buf->items);
buf->items = new_items;
buf->items_cap = new_capacity;
buf->get_idx = 0;
buf->put_idx = buf->num_items;
return 0;
}
// Returns a strong reference from the head of the buffer.
static PyObject *
RingBuf_Get(RingBuf *buf)
{
assert(buf->num_items > 0);
if (buf->num_items < (buf->items_cap / 4)) {
// Items is less than 25% occupied, shrink it by 50%. This allows for
// growth without immediately needing to resize the underlying items
// array.
//
// It's safe it ignore allocation failures here; shrinking is an
// optimization that isn't required for correctness.
(void)resize_ringbuf(buf, buf->items_cap / 2);
}
PyObject *item = buf->items[buf->get_idx];
buf->items[buf->get_idx] = NULL;
buf->get_idx = (buf->get_idx + 1) % buf->items_cap;
buf->num_items--;
return item;
}
// Returns 0 on success or -1 if the buffer failed to grow
static int
RingBuf_Put(RingBuf *buf, PyObject *item)
{
assert(buf->num_items <= buf->items_cap);
if (buf->num_items == buf->items_cap) {
// Buffer is full, grow it.
if (resize_ringbuf(buf, buf->items_cap * 2) < 0) {
PyErr_NoMemory();
return -1;
}
}
buf->items[buf->put_idx] = Py_NewRef(item);
buf->put_idx = (buf->put_idx + 1) % buf->items_cap;
buf->num_items++;
return 0;
}
static Py_ssize_t
RingBuf_Len(RingBuf *buf)
{
return buf->num_items;
}
static bool
RingBuf_IsEmpty(RingBuf *buf)
{
return buf->num_items == 0;
}
typedef struct {
PyObject_HEAD
PyThread_type_lock lock;
int locked;
PyObject *lst;
Py_ssize_t lst_pos;
RingBuf buf;
PyObject *weakreflist;
} simplequeueobject;
@ -43,7 +199,7 @@ class _queue.SimpleQueue "simplequeueobject *" "simplequeue_get_state_by_type(ty
static int
simplequeue_clear(simplequeueobject *self)
{
Py_CLEAR(self->lst);
RingBuf_Fini(&self->buf);
return 0;
}
@ -69,7 +225,10 @@ simplequeue_dealloc(simplequeueobject *self)
static int
simplequeue_traverse(simplequeueobject *self, visitproc visit, void *arg)
{
Py_VISIT(self->lst);
RingBuf *buf = &self->buf;
for (Py_ssize_t i = 0, num_items = buf->num_items; i < num_items; i++) {
Py_VISIT(RingBuf_At(buf, i));
}
Py_VISIT(Py_TYPE(self));
return 0;
}
@ -90,15 +249,13 @@ simplequeue_new_impl(PyTypeObject *type)
self = (simplequeueobject *) type->tp_alloc(type, 0);
if (self != NULL) {
self->weakreflist = NULL;
self->lst = PyList_New(0);
self->lock = PyThread_allocate_lock();
self->lst_pos = 0;
if (self->lock == NULL) {
Py_DECREF(self);
PyErr_SetString(PyExc_MemoryError, "can't allocate lock");
return NULL;
}
if (self->lst == NULL) {
if (RingBuf_Init(&self->buf) < 0) {
Py_DECREF(self);
return NULL;
}
@ -126,7 +283,7 @@ _queue_SimpleQueue_put_impl(simplequeueobject *self, PyObject *item,
/*[clinic end generated code: output=4333136e88f90d8b input=6e601fa707a782d5]*/
{
/* BEGIN GIL-protected critical section */
if (PyList_Append(self->lst, item) < 0)
if (RingBuf_Put(&self->buf, item) < 0)
return NULL;
if (self->locked) {
/* A get() may be waiting, wake it up */
@ -155,33 +312,6 @@ _queue_SimpleQueue_put_nowait_impl(simplequeueobject *self, PyObject *item)
return _queue_SimpleQueue_put_impl(self, item, 0, Py_None);
}
static PyObject *
simplequeue_pop_item(simplequeueobject *self)
{
Py_ssize_t count, n;
PyObject *item;
n = PyList_GET_SIZE(self->lst);
assert(self->lst_pos < n);
item = PyList_GET_ITEM(self->lst, self->lst_pos);
Py_INCREF(Py_None);
PyList_SET_ITEM(self->lst, self->lst_pos, Py_None);
self->lst_pos += 1;
count = n - self->lst_pos;
if (self->lst_pos > count) {
/* The list is more than 50% empty, reclaim space at the beginning */
if (PyList_SetSlice(self->lst, 0, self->lst_pos, NULL)) {
/* Undo pop */
self->lst_pos -= 1;
PyList_SET_ITEM(self->lst, self->lst_pos, item);
return NULL;
}
self->lst_pos = 0;
}
return item;
}
/*[clinic input]
_queue.SimpleQueue.get
@ -249,7 +379,7 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
* So we simply try to acquire the lock in a loop, until the condition
* (queue non-empty) becomes true.
*/
while (self->lst_pos == PyList_GET_SIZE(self->lst)) {
while (RingBuf_IsEmpty(&self->buf)) {
/* First a simple non-blocking try without releasing the GIL */
r = PyThread_acquire_lock_timed(self->lock, 0, 0);
if (r == PY_LOCK_FAILURE && microseconds != 0) {
@ -279,8 +409,7 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
}
/* BEGIN GIL-protected critical section */
assert(self->lst_pos < PyList_GET_SIZE(self->lst));
item = simplequeue_pop_item(self);
item = RingBuf_Get(&self->buf);
if (self->locked) {
PyThread_release_lock(self->lock);
self->locked = 0;
@ -320,7 +449,7 @@ static int
_queue_SimpleQueue_empty_impl(simplequeueobject *self)
/*[clinic end generated code: output=1a02a1b87c0ef838 input=1a98431c45fd66f9]*/
{
return self->lst_pos == PyList_GET_SIZE(self->lst);
return RingBuf_IsEmpty(&self->buf);
}
/*[clinic input]
@ -333,7 +462,7 @@ static Py_ssize_t
_queue_SimpleQueue_qsize_impl(simplequeueobject *self)
/*[clinic end generated code: output=f9dcd9d0a90e121e input=7a74852b407868a1]*/
{
return PyList_GET_SIZE(self->lst) - self->lst_pos;
return RingBuf_Len(&self->buf);
}
static int