gh-101659: Use the Raw Allocator in the _xxinterpchannels Module (gh-103287)

Using the raw allocator for any of the global state makes sense, especially as we move to a per-interpreter obmalloc state (gh-101660).
This commit is contained in:
Eric Snow 2023-04-05 15:13:12 -06:00 committed by GitHub
parent de18267685
commit aa5a9b5eb7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 93 additions and 22 deletions

View File

@ -10,9 +10,77 @@
#include "pycore_interpreteridobject.h" #include "pycore_interpreteridobject.h"
/*
This module has the following process-global state:
_globals (static struct globals):
module_count (int)
channels (struct _channels):
numopen (int64_t)
next_id; (int64_t)
mutex (PyThread_type_lock)
head (linked list of struct _channelref *):
id (int64_t)
objcount (Py_ssize_t)
next (struct _channelref *):
...
chan (struct _channel *):
open (int)
mutex (PyThread_type_lock)
closing (struct _channel_closing *):
ref (struct _channelref *):
...
ends (struct _channelends *):
numsendopen (int64_t)
numrecvopen (int64_t)
send (struct _channelend *):
interp (int64_t)
open (int)
next (struct _channelend *)
recv (struct _channelend *):
...
queue (struct _channelqueue *):
count (int64_t)
first (struct _channelitem *):
next (struct _channelitem *):
...
data (_PyCrossInterpreterData *):
data (void *)
obj (PyObject *)
interp (int64_t)
new_object (xid_newobjectfunc)
free (xid_freefunc)
last (struct _channelitem *):
...
The above state includes the following allocations by the module:
* 1 top-level mutex (to protect the rest of the state)
* for each channel:
* 1 struct _channelref
* 1 struct _channel
* 0-1 struct _channel_closing
* 1 struct _channelends
* 2 struct _channelend
* 1 struct _channelqueue
* for each item in each channel:
* 1 struct _channelitem
* 1 _PyCrossInterpreterData
The only objects in that global state are the references held by each
channel's queue, which are safely managed via the _PyCrossInterpreterData_*()
API.. The module does not create any objects that are shared globally.
*/
#define MODULE_NAME "_xxinterpchannels" #define MODULE_NAME "_xxinterpchannels"
#define GLOBAL_MALLOC(TYPE) \
PyMem_RawMalloc(sizeof(TYPE))
#define GLOBAL_FREE(VAR) \
PyMem_RawFree(VAR)
static PyInterpreterState * static PyInterpreterState *
_get_current_interp(void) _get_current_interp(void)
{ {
@ -301,7 +369,7 @@ typedef struct _channelitem {
static _channelitem * static _channelitem *
_channelitem_new(void) _channelitem_new(void)
{ {
_channelitem *item = PyMem_NEW(_channelitem, 1); _channelitem *item = GLOBAL_MALLOC(_channelitem);
if (item == NULL) { if (item == NULL) {
PyErr_NoMemory(); PyErr_NoMemory();
return NULL; return NULL;
@ -316,7 +384,8 @@ _channelitem_clear(_channelitem *item)
{ {
if (item->data != NULL) { if (item->data != NULL) {
(void)_release_xid_data(item->data, 1); (void)_release_xid_data(item->data, 1);
PyMem_Free(item->data); // It was allocated in _channel_send().
GLOBAL_FREE(item->data);
item->data = NULL; item->data = NULL;
} }
item->next = NULL; item->next = NULL;
@ -326,7 +395,7 @@ static void
_channelitem_free(_channelitem *item) _channelitem_free(_channelitem *item)
{ {
_channelitem_clear(item); _channelitem_clear(item);
PyMem_Free(item); GLOBAL_FREE(item);
} }
static void static void
@ -357,7 +426,7 @@ typedef struct _channelqueue {
static _channelqueue * static _channelqueue *
_channelqueue_new(void) _channelqueue_new(void)
{ {
_channelqueue *queue = PyMem_NEW(_channelqueue, 1); _channelqueue *queue = GLOBAL_MALLOC(_channelqueue);
if (queue == NULL) { if (queue == NULL) {
PyErr_NoMemory(); PyErr_NoMemory();
return NULL; return NULL;
@ -381,7 +450,7 @@ static void
_channelqueue_free(_channelqueue *queue) _channelqueue_free(_channelqueue *queue)
{ {
_channelqueue_clear(queue); _channelqueue_clear(queue);
PyMem_Free(queue); GLOBAL_FREE(queue);
} }
static int static int
@ -433,7 +502,7 @@ typedef struct _channelend {
static _channelend * static _channelend *
_channelend_new(int64_t interp) _channelend_new(int64_t interp)
{ {
_channelend *end = PyMem_NEW(_channelend, 1); _channelend *end = GLOBAL_MALLOC(_channelend);
if (end == NULL) { if (end == NULL) {
PyErr_NoMemory(); PyErr_NoMemory();
return NULL; return NULL;
@ -447,7 +516,7 @@ _channelend_new(int64_t interp)
static void static void
_channelend_free(_channelend *end) _channelend_free(_channelend *end)
{ {
PyMem_Free(end); GLOBAL_FREE(end);
} }
static void static void
@ -492,7 +561,7 @@ typedef struct _channelassociations {
static _channelends * static _channelends *
_channelends_new(void) _channelends_new(void)
{ {
_channelends *ends = PyMem_NEW(_channelends, 1); _channelends *ends = GLOBAL_MALLOC(_channelends);
if (ends== NULL) { if (ends== NULL) {
return NULL; return NULL;
} }
@ -519,7 +588,7 @@ static void
_channelends_free(_channelends *ends) _channelends_free(_channelends *ends)
{ {
_channelends_clear(ends); _channelends_clear(ends);
PyMem_Free(ends); GLOBAL_FREE(ends);
} }
static _channelend * static _channelend *
@ -660,20 +729,20 @@ typedef struct _channel {
static _PyChannelState * static _PyChannelState *
_channel_new(PyThread_type_lock mutex) _channel_new(PyThread_type_lock mutex)
{ {
_PyChannelState *chan = PyMem_NEW(_PyChannelState, 1); _PyChannelState *chan = GLOBAL_MALLOC(_PyChannelState);
if (chan == NULL) { if (chan == NULL) {
return NULL; return NULL;
} }
chan->mutex = mutex; chan->mutex = mutex;
chan->queue = _channelqueue_new(); chan->queue = _channelqueue_new();
if (chan->queue == NULL) { if (chan->queue == NULL) {
PyMem_Free(chan); GLOBAL_FREE(chan);
return NULL; return NULL;
} }
chan->ends = _channelends_new(); chan->ends = _channelends_new();
if (chan->ends == NULL) { if (chan->ends == NULL) {
_channelqueue_free(chan->queue); _channelqueue_free(chan->queue);
PyMem_Free(chan); GLOBAL_FREE(chan);
return NULL; return NULL;
} }
chan->open = 1; chan->open = 1;
@ -691,7 +760,7 @@ _channel_free(_PyChannelState *chan)
PyThread_release_lock(chan->mutex); PyThread_release_lock(chan->mutex);
PyThread_free_lock(chan->mutex); PyThread_free_lock(chan->mutex);
PyMem_Free(chan); GLOBAL_FREE(chan);
} }
static int static int
@ -814,7 +883,7 @@ typedef struct _channelref {
static _channelref * static _channelref *
_channelref_new(int64_t id, _PyChannelState *chan) _channelref_new(int64_t id, _PyChannelState *chan)
{ {
_channelref *ref = PyMem_NEW(_channelref, 1); _channelref *ref = GLOBAL_MALLOC(_channelref);
if (ref == NULL) { if (ref == NULL) {
return NULL; return NULL;
} }
@ -841,7 +910,7 @@ _channelref_free(_channelref *ref)
_channel_clear_closing(ref->chan); _channel_clear_closing(ref->chan);
} }
//_channelref_clear(ref); //_channelref_clear(ref);
PyMem_Free(ref); GLOBAL_FREE(ref);
} }
static _channelref * static _channelref *
@ -1163,7 +1232,7 @@ _channel_set_closing(struct _channelref *ref, PyThread_type_lock mutex) {
res = ERR_CHANNEL_CLOSED; res = ERR_CHANNEL_CLOSED;
goto done; goto done;
} }
chan->closing = PyMem_NEW(struct _channel_closing, 1); chan->closing = GLOBAL_MALLOC(struct _channel_closing);
if (chan->closing == NULL) { if (chan->closing == NULL) {
goto done; goto done;
} }
@ -1179,7 +1248,7 @@ static void
_channel_clear_closing(struct _channel *chan) { _channel_clear_closing(struct _channel *chan) {
PyThread_acquire_lock(chan->mutex, WAIT_LOCK); PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
if (chan->closing != NULL) { if (chan->closing != NULL) {
PyMem_Free(chan->closing); GLOBAL_FREE(chan->closing);
chan->closing = NULL; chan->closing = NULL;
} }
PyThread_release_lock(chan->mutex); PyThread_release_lock(chan->mutex);
@ -1257,14 +1326,14 @@ _channel_send(_channels *channels, int64_t id, PyObject *obj)
} }
// Convert the object to cross-interpreter data. // Convert the object to cross-interpreter data.
_PyCrossInterpreterData *data = PyMem_NEW(_PyCrossInterpreterData, 1); _PyCrossInterpreterData *data = GLOBAL_MALLOC(_PyCrossInterpreterData);
if (data == NULL) { if (data == NULL) {
PyThread_release_lock(mutex); PyThread_release_lock(mutex);
return -1; return -1;
} }
if (_PyObject_GetCrossInterpreterData(obj, data) != 0) { if (_PyObject_GetCrossInterpreterData(obj, data) != 0) {
PyThread_release_lock(mutex); PyThread_release_lock(mutex);
PyMem_Free(data); GLOBAL_FREE(data);
return -1; return -1;
} }
@ -1274,7 +1343,7 @@ _channel_send(_channels *channels, int64_t id, PyObject *obj)
if (res != 0) { if (res != 0) {
// We may chain an exception here: // We may chain an exception here:
(void)_release_xid_data(data, 0); (void)_release_xid_data(data, 0);
PyMem_Free(data); GLOBAL_FREE(data);
return res; return res;
} }
@ -1323,11 +1392,13 @@ _channel_recv(_channels *channels, int64_t id, PyObject **res)
if (obj == NULL) { if (obj == NULL) {
assert(PyErr_Occurred()); assert(PyErr_Occurred());
(void)_release_xid_data(data, 1); (void)_release_xid_data(data, 1);
PyMem_Free(data); // It was allocated in _channel_send().
GLOBAL_FREE(data);
return -1; return -1;
} }
int release_res = _release_xid_data(data, 0); int release_res = _release_xid_data(data, 0);
PyMem_Free(data); // It was allocated in _channel_send().
GLOBAL_FREE(data);
if (release_res < 0) { if (release_res < 0) {
// The source interpreter has been destroyed already. // The source interpreter has been destroyed already.
assert(PyErr_Occurred()); assert(PyErr_Occurred());