gh-76785: Add SendChannel.send_buffer() (#110246)

(This is still a test module.)
This commit is contained in:
Eric Snow 2023-10-09 07:39:51 -06:00 committed by GitHub
parent f4cb0d27cc
commit 7bd560ce8d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 467 additions and 67 deletions

View File

@ -41,12 +41,26 @@ extern void _PyEval_InitState(PyInterpreterState *, PyThread_type_lock);
extern void _PyEval_FiniState(struct _ceval_state *ceval);
extern void _PyEval_SignalReceived(PyInterpreterState *interp);
// bitwise flags:
#define _Py_PENDING_MAINTHREADONLY 1
#define _Py_PENDING_RAWFREE 2
// Export for '_testinternalcapi' shared extension
PyAPI_FUNC(int) _PyEval_AddPendingCall(
PyInterpreterState *interp,
_Py_pending_call_func func,
void *arg,
int mainthreadonly);
int flags);
typedef int (*_Py_simple_func)(void *);
extern int _Py_CallInInterpreter(
PyInterpreterState *interp,
_Py_simple_func func,
void *arg);
extern int _Py_CallInInterpreterAndRawFree(
PyInterpreterState *interp,
_Py_simple_func func,
void *arg);
extern void _PyEval_SignalAsyncExc(PyInterpreterState *interp);
#ifdef HAVE_FORK

View File

@ -22,6 +22,7 @@ struct _pending_calls {
struct _pending_call {
_Py_pending_call_func func;
void *arg;
int flags;
} calls[NPENDINGCALLS];
int first;
int last;

View File

@ -267,7 +267,8 @@ _PyInterpreterState_SetFinalizing(PyInterpreterState *interp, PyThreadState *tst
}
extern PyInterpreterState* _PyInterpreterState_LookUpID(int64_t);
// Export for the _xxinterpchannels module.
PyAPI_FUNC(PyInterpreterState *) _PyInterpreterState_LookUpID(int64_t);
extern int _PyInterpreterState_IDInitref(PyInterpreterState *);
extern int _PyInterpreterState_IDIncref(PyInterpreterState *);

View File

@ -0,0 +1,21 @@
#ifndef Py_INTERNAL_PYBUFFER_H
#define Py_INTERNAL_PYBUFFER_H
#ifdef __cplusplus
extern "C" {
#endif
#ifndef Py_BUILD_CORE
# error "this header requires Py_BUILD_CORE define"
#endif
// Exported for the _xxinterpchannels module.
PyAPI_FUNC(int) _PyBuffer_ReleaseInInterpreter(
PyInterpreterState *interp, Py_buffer *view);
PyAPI_FUNC(int) _PyBuffer_ReleaseInInterpreterAndRawFree(
PyInterpreterState *interp, Py_buffer *view);
#ifdef __cplusplus
}
#endif
#endif /* !Py_INTERNAL_PYBUFFER_H */

View File

@ -225,6 +225,21 @@ class SendChannel(_ChannelEnd):
# See bpo-32604 and gh-19829.
return _channels.send(self._id, obj)
def send_buffer(self, obj):
"""Send the object's buffer to the channel's receiving end.
This blocks until the object is received.
"""
_channels.send_buffer(self._id, obj)
def send_buffer_nowait(self, obj):
"""Send the object's buffer to the channel's receiving end.
If the object is immediately received then return True
(else False). Otherwise this is the same as send().
"""
return _channels.send_buffer(self._id, obj)
def close(self):
_channels.close(self._id, send=True)

View File

@ -703,6 +703,21 @@ class ChannelTests(TestBase):
channels.recv(cid2)
del cid2
def test_send_buffer(self):
buf = bytearray(b'spamspamspam')
cid = channels.create()
channels.send_buffer(cid, buf)
obj = channels.recv(cid)
self.assertIsNot(obj, buf)
self.assertIsInstance(obj, memoryview)
self.assertEqual(obj, buf)
buf[4:8] = b'eggs'
self.assertEqual(obj, buf)
obj[4:8] = b'ham.'
self.assertEqual(obj, buf)
def test_allowed_types(self):
cid = channels.create()
objects = [

View File

@ -1067,3 +1067,46 @@ class TestSendRecv(TestBase):
self.assertEqual(obj4, b'spam')
self.assertEqual(obj5, b'eggs')
self.assertIs(obj6, default)
def test_send_buffer(self):
buf = bytearray(b'spamspamspam')
obj = None
rch, sch = interpreters.create_channel()
def f():
nonlocal obj
while True:
try:
obj = rch.recv()
break
except interpreters.ChannelEmptyError:
time.sleep(0.1)
t = threading.Thread(target=f)
t.start()
sch.send_buffer(buf)
t.join()
self.assertIsNot(obj, buf)
self.assertIsInstance(obj, memoryview)
self.assertEqual(obj, buf)
buf[4:8] = b'eggs'
self.assertEqual(obj, buf)
obj[4:8] = b'ham.'
self.assertEqual(obj, buf)
def test_send_buffer_nowait(self):
buf = bytearray(b'spamspamspam')
rch, sch = interpreters.create_channel()
sch.send_buffer_nowait(buf)
obj = rch.recv()
self.assertIsNot(obj, buf)
self.assertIsInstance(obj, memoryview)
self.assertEqual(obj, buf)
buf[4:8] = b'eggs'
self.assertEqual(obj, buf)
obj[4:8] = b'ham.'
self.assertEqual(obj, buf)

View File

@ -1791,6 +1791,7 @@ PYTHON_HEADERS= \
$(srcdir)/Include/internal/pycore_parking_lot.h \
$(srcdir)/Include/internal/pycore_pathconfig.h \
$(srcdir)/Include/internal/pycore_pyarena.h \
$(srcdir)/Include/internal/pycore_pybuffer.h \
$(srcdir)/Include/internal/pycore_pyerrors.h \
$(srcdir)/Include/internal/pycore_pyhash.h \
$(srcdir)/Include/internal/pycore_pylifecycle.h \

View File

@ -1,8 +1,14 @@
/* interpreters module */
/* low-level access to interpreter primitives */
#ifndef Py_BUILD_CORE_BUILTIN
# define Py_BUILD_CORE_MODULE 1
#endif
#include "Python.h"
#include "interpreteridobject.h"
#include "pycore_pybuffer.h" // _PyBuffer_ReleaseInInterpreterAndRawFree()
#include "pycore_interp.h" // _PyInterpreterState_LookUpID()
/*
@ -76,6 +82,73 @@ API.. The module does not create any objects that are shared globally.
PyMem_RawFree(VAR)
struct xid_class_registry {
size_t count;
#define MAX_XID_CLASSES 5
struct {
PyTypeObject *cls;
} added[MAX_XID_CLASSES];
};
static int
register_xid_class(PyTypeObject *cls, crossinterpdatafunc shared,
struct xid_class_registry *classes)
{
int res = _PyCrossInterpreterData_RegisterClass(cls, shared);
if (res == 0) {
assert(classes->count < MAX_XID_CLASSES);
// The class has refs elsewhere, so we need to incref here.
classes->added[classes->count].cls = cls;
classes->count += 1;
}
return res;
}
static void
clear_xid_class_registry(struct xid_class_registry *classes)
{
while (classes->count > 0) {
classes->count -= 1;
PyTypeObject *cls = classes->added[classes->count].cls;
_PyCrossInterpreterData_UnregisterClass(cls);
}
}
#define XID_IGNORE_EXC 1
#define XID_FREE 2
static int
_release_xid_data(_PyCrossInterpreterData *data, int flags)
{
int ignoreexc = flags & XID_IGNORE_EXC;
PyObject *exc;
if (ignoreexc) {
exc = PyErr_GetRaisedException();
}
int res;
if (flags & XID_FREE) {
res = _PyCrossInterpreterData_ReleaseAndRawFree(data);
}
else {
res = _PyCrossInterpreterData_Release(data);
}
if (res < 0) {
/* The owning interpreter is already destroyed. */
if (ignoreexc) {
// XXX Emit a warning?
PyErr_Clear();
}
}
if (flags & XID_FREE) {
/* Either way, we free the data. */
}
if (ignoreexc) {
PyErr_SetRaisedException(exc);
}
return res;
}
static PyInterpreterState *
_get_current_interp(void)
{
@ -140,7 +213,8 @@ add_new_exception(PyObject *mod, const char *name, PyObject *base)
add_new_exception(MOD, MODULE_NAME "." Py_STRINGIFY(NAME), BASE)
static PyTypeObject *
add_new_type(PyObject *mod, PyType_Spec *spec, crossinterpdatafunc shared)
add_new_type(PyObject *mod, PyType_Spec *spec, crossinterpdatafunc shared,
struct xid_class_registry *classes)
{
PyTypeObject *cls = (PyTypeObject *)PyType_FromMetaclass(
NULL, mod, spec, NULL);
@ -152,7 +226,7 @@ add_new_type(PyObject *mod, PyType_Spec *spec, crossinterpdatafunc shared)
return NULL;
}
if (shared != NULL) {
if (_PyCrossInterpreterData_RegisterClass(cls, shared)) {
if (register_xid_class(cls, shared, classes)) {
Py_DECREF(cls);
return NULL;
}
@ -160,49 +234,149 @@ add_new_type(PyObject *mod, PyType_Spec *spec, crossinterpdatafunc shared)
return cls;
}
#define XID_IGNORE_EXC 1
#define XID_FREE 2
static int
_release_xid_data(_PyCrossInterpreterData *data, int flags)
/* Cross-interpreter Buffer Views *******************************************/
// XXX Release when the original interpreter is destroyed.
typedef struct {
PyObject_HEAD
Py_buffer *view;
int64_t interp;
} XIBufferViewObject;
static PyObject *
xibufferview_from_xid(PyTypeObject *cls, _PyCrossInterpreterData *data)
{
int ignoreexc = flags & XID_IGNORE_EXC;
PyObject *exc;
if (ignoreexc) {
exc = PyErr_GetRaisedException();
assert(data->data != NULL);
assert(data->obj == NULL);
assert(data->interp >= 0);
XIBufferViewObject *self = PyObject_Malloc(sizeof(XIBufferViewObject));
if (self == NULL) {
return NULL;
}
int res;
if (flags & XID_FREE) {
res = _PyCrossInterpreterData_ReleaseAndRawFree(data);
}
else {
res = _PyCrossInterpreterData_Release(data);
}
if (res < 0) {
/* The owning interpreter is already destroyed. */
if (ignoreexc) {
PyObject_Init((PyObject *)self, cls);
self->view = (Py_buffer *)data->data;
self->interp = data->interp;
return (PyObject *)self;
}
static void
xibufferview_dealloc(XIBufferViewObject *self)
{
PyInterpreterState *interp = _PyInterpreterState_LookUpID(self->interp);
/* If the interpreter is no longer alive then we have problems,
since other objects may be using the buffer still. */
assert(interp != NULL);
if (_PyBuffer_ReleaseInInterpreterAndRawFree(interp, self->view) < 0) {
// XXX Emit a warning?
PyErr_Clear();
}
PyTypeObject *tp = Py_TYPE(self);
tp->tp_free(self);
/* "Instances of heap-allocated types hold a reference to their type."
* See: https://docs.python.org/3.11/howto/isolating-extensions.html#garbage-collection-protocol
* See: https://docs.python.org/3.11/c-api/typeobj.html#c.PyTypeObject.tp_traverse
*/
// XXX Why don't we implement Py_TPFLAGS_HAVE_GC, e.g. Py_tp_traverse,
// like we do for _abc._abc_data?
Py_DECREF(tp);
}
static int
xibufferview_getbuf(XIBufferViewObject *self, Py_buffer *view, int flags)
{
/* Only PyMemoryView_FromObject() should ever call this,
via _memoryview_from_xid() below. */
*view = *self->view;
view->obj = (PyObject *)self;
// XXX Should we leave it alone?
view->internal = NULL;
return 0;
}
static PyType_Slot XIBufferViewType_slots[] = {
{Py_tp_dealloc, (destructor)xibufferview_dealloc},
{Py_bf_getbuffer, (getbufferproc)xibufferview_getbuf},
// We don't bother with Py_bf_releasebuffer since we don't need it.
{0, NULL},
};
static PyType_Spec XIBufferViewType_spec = {
.name = MODULE_NAME ".CrossInterpreterBufferView",
.basicsize = sizeof(XIBufferViewObject),
.flags = (Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE |
Py_TPFLAGS_DISALLOW_INSTANTIATION | Py_TPFLAGS_IMMUTABLETYPE),
.slots = XIBufferViewType_slots,
};
/* extra XID types **********************************************************/
static PyTypeObject * _get_current_xibufferview_type(void);
static PyObject *
_memoryview_from_xid(_PyCrossInterpreterData *data)
{
PyTypeObject *cls = _get_current_xibufferview_type();
if (cls == NULL) {
return NULL;
}
if (flags & XID_FREE) {
/* Either way, we free the data. */
PyObject *obj = xibufferview_from_xid(cls, data);
if (obj == NULL) {
return NULL;
}
if (ignoreexc) {
PyErr_SetRaisedException(exc);
return PyMemoryView_FromObject(obj);
}
static int
_memoryview_shared(PyThreadState *tstate, PyObject *obj,
_PyCrossInterpreterData *data)
{
Py_buffer *view = PyMem_RawMalloc(sizeof(Py_buffer));
if (view == NULL) {
return -1;
}
return res;
if (PyObject_GetBuffer(obj, view, PyBUF_FULL_RO) < 0) {
PyMem_RawFree(view);
return -1;
}
_PyCrossInterpreterData_Init(data, tstate->interp, view, NULL,
_memoryview_from_xid);
return 0;
}
static int
register_builtin_xid_types(struct xid_class_registry *classes)
{
PyTypeObject *cls;
crossinterpdatafunc func;
// builtin memoryview
cls = &PyMemoryView_Type;
func = _memoryview_shared;
if (register_xid_class(cls, func, classes)) {
return -1;
}
return 0;
}
/* module state *************************************************************/
typedef struct {
struct xid_class_registry xid_classes;
/* Added at runtime by interpreters module. */
PyTypeObject *send_channel_type;
PyTypeObject *recv_channel_type;
/* heap types */
PyTypeObject *ChannelIDType;
PyTypeObject *XIBufferViewType;
/* exceptions */
PyObject *ChannelError;
@ -241,6 +415,7 @@ traverse_module_state(module_state *state, visitproc visit, void *arg)
{
/* heap types */
Py_VISIT(state->ChannelIDType);
Py_VISIT(state->XIBufferViewType);
/* exceptions */
Py_VISIT(state->ChannelError);
@ -263,6 +438,7 @@ clear_module_state(module_state *state)
(void)_PyCrossInterpreterData_UnregisterClass(state->ChannelIDType);
}
Py_CLEAR(state->ChannelIDType);
Py_CLEAR(state->XIBufferViewType);
/* exceptions */
Py_CLEAR(state->ChannelError);
@ -275,6 +451,17 @@ clear_module_state(module_state *state)
}
static PyTypeObject *
_get_current_xibufferview_type(void)
{
module_state *state = _get_current_module_state();
if (state == NULL) {
return NULL;
}
return state->XIBufferViewType;
}
/* channel-specific code ****************************************************/
#define CHANNEL_SEND 1
@ -2045,6 +2232,7 @@ set_channel_end_types(PyObject *mod, PyTypeObject *send, PyTypeObject *recv)
if (state == NULL) {
return -1;
}
struct xid_class_registry *xid_classes = &state->xid_classes;
if (state->send_channel_type != NULL
|| state->recv_channel_type != NULL)
@ -2055,10 +2243,10 @@ set_channel_end_types(PyObject *mod, PyTypeObject *send, PyTypeObject *recv)
state->send_channel_type = (PyTypeObject *)Py_NewRef(send);
state->recv_channel_type = (PyTypeObject *)Py_NewRef(recv);
if (_PyCrossInterpreterData_RegisterClass(send, _channel_end_shared)) {
if (register_xid_class(send, _channel_end_shared, xid_classes)) {
return -1;
}
if (_PyCrossInterpreterData_RegisterClass(recv, _channel_end_shared)) {
if (register_xid_class(recv, _channel_end_shared, xid_classes)) {
return -1;
}
@ -2325,6 +2513,40 @@ PyDoc_STRVAR(channel_send_doc,
\n\
Add the object's data to the channel's queue.");
static PyObject *
channel_send_buffer(PyObject *self, PyObject *args, PyObject *kwds)
{
static char *kwlist[] = {"cid", "obj", NULL};
int64_t cid;
struct channel_id_converter_data cid_data = {
.module = self,
};
PyObject *obj;
if (!PyArg_ParseTupleAndKeywords(args, kwds,
"O&O:channel_send_buffer", kwlist,
channel_id_converter, &cid_data, &obj)) {
return NULL;
}
cid = cid_data.cid;
PyObject *tempobj = PyMemoryView_FromObject(obj);
if (tempobj == NULL) {
return NULL;
}
int err = _channel_send(&_globals.channels, cid, tempobj);
Py_DECREF(tempobj);
if (handle_channel_error(err, self, cid)) {
return NULL;
}
Py_RETURN_NONE;
}
PyDoc_STRVAR(channel_send_buffer_doc,
"channel_send_buffer(cid, obj)\n\
\n\
Add the object's buffer to the channel's queue.");
static PyObject *
channel_recv(PyObject *self, PyObject *args, PyObject *kwds)
{
@ -2516,6 +2738,8 @@ static PyMethodDef module_functions[] = {
METH_VARARGS | METH_KEYWORDS, channel_list_interpreters_doc},
{"send", _PyCFunction_CAST(channel_send),
METH_VARARGS | METH_KEYWORDS, channel_send_doc},
{"send_buffer", _PyCFunction_CAST(channel_send_buffer),
METH_VARARGS | METH_KEYWORDS, channel_send_buffer_doc},
{"recv", _PyCFunction_CAST(channel_recv),
METH_VARARGS | METH_KEYWORDS, channel_recv_doc},
{"close", _PyCFunction_CAST(channel_close),
@ -2543,6 +2767,13 @@ module_exec(PyObject *mod)
if (_globals_init() != 0) {
return -1;
}
struct xid_class_registry *xid_classes = NULL;
module_state *state = get_module_state(mod);
if (state == NULL) {
goto error;
}
xid_classes = &state->xid_classes;
/* Add exception types */
if (exceptions_init(mod) != 0) {
@ -2550,25 +2781,34 @@ module_exec(PyObject *mod)
}
/* Add other types */
module_state *state = get_module_state(mod);
if (state == NULL) {
goto error;
}
// ChannelID
state->ChannelIDType = add_new_type(
mod, &ChannelIDType_spec, _channelid_shared);
mod, &ChannelIDType_spec, _channelid_shared, xid_classes);
if (state->ChannelIDType == NULL) {
goto error;
}
// Make sure chnnels drop objects owned by this interpreter
state->XIBufferViewType = add_new_type(mod, &XIBufferViewType_spec, NULL,
xid_classes);
if (state->XIBufferViewType == NULL) {
goto error;
}
if (register_builtin_xid_types(xid_classes) < 0) {
goto error;
}
/* Make sure chnnels drop objects owned by this interpreter. */
PyInterpreterState *interp = _get_current_interp();
PyUnstable_AtExit(interp, clear_interpreter, (void *)interp);
return 0;
error:
if (xid_classes != NULL) {
clear_xid_class_registry(xid_classes);
}
_globals_fini();
return -1;
}
@ -2593,6 +2833,11 @@ module_clear(PyObject *mod)
{
module_state *state = get_module_state(mod);
assert(state != NULL);
// Before clearing anything, we unregister the various XID types. */
clear_xid_class_registry(&state->xid_classes);
// Now we clear the module state.
clear_module_state(state);
return 0;
}
@ -2602,7 +2847,13 @@ module_free(void *mod)
{
module_state *state = get_module_state(mod);
assert(state != NULL);
// Before clearing anything, we unregister the various XID types. */
clear_xid_class_registry(&state->xid_classes);
// Now we clear the module state.
clear_module_state(state);
_globals_fini();
}

View File

@ -315,7 +315,7 @@ trip_signal(int sig_num)
_PyEval_AddPendingCall(interp,
report_wakeup_send_error,
(void *)(intptr_t) last_error,
1);
_Py_PENDING_MAINTHREADONLY);
}
}
}
@ -335,7 +335,7 @@ trip_signal(int sig_num)
_PyEval_AddPendingCall(interp,
report_wakeup_write_error,
(void *)(intptr_t)errno,
1);
_Py_PENDING_MAINTHREADONLY);
}
}
}

View File

@ -2,6 +2,7 @@
#include "Python.h"
#include "pycore_abstract.h" // _PyIndex_Check()
#include "pycore_pybuffer.h"
#include "pycore_call.h" // _PyObject_CallNoArgs()
#include "pycore_ceval.h" // _Py_EnterRecursiveCallTstate()
#include "pycore_object.h" // _Py_CheckSlotResult()
@ -806,6 +807,27 @@ PyBuffer_Release(Py_buffer *view)
Py_DECREF(obj);
}
static int
_buffer_release_call(void *arg)
{
PyBuffer_Release((Py_buffer *)arg);
return 0;
}
int
_PyBuffer_ReleaseInInterpreter(PyInterpreterState *interp,
Py_buffer *view)
{
return _Py_CallInInterpreter(interp, _buffer_release_call, view);
}
int
_PyBuffer_ReleaseInInterpreterAndRawFree(PyInterpreterState *interp,
Py_buffer *view)
{
return _Py_CallInInterpreterAndRawFree(interp, _buffer_release_call, view);
}
PyObject *
PyObject_Format(PyObject *obj, PyObject *format_spec)
{

View File

@ -663,7 +663,7 @@ _PyEval_SignalReceived(PyInterpreterState *interp)
/* Push one item onto the queue while holding the lock. */
static int
_push_pending_call(struct _pending_calls *pending,
_Py_pending_call_func func, void *arg)
_Py_pending_call_func func, void *arg, int flags)
{
int i = pending->last;
int j = (i + 1) % NPENDINGCALLS;
@ -672,6 +672,7 @@ _push_pending_call(struct _pending_calls *pending,
}
pending->calls[i].func = func;
pending->calls[i].arg = arg;
pending->calls[i].flags = flags;
pending->last = j;
assert(pending->calls_to_do < NPENDINGCALLS);
pending->calls_to_do++;
@ -680,7 +681,7 @@ _push_pending_call(struct _pending_calls *pending,
static int
_next_pending_call(struct _pending_calls *pending,
int (**func)(void *), void **arg)
int (**func)(void *), void **arg, int *flags)
{
int i = pending->first;
if (i == pending->last) {
@ -690,15 +691,16 @@ _next_pending_call(struct _pending_calls *pending,
}
*func = pending->calls[i].func;
*arg = pending->calls[i].arg;
*flags = pending->calls[i].flags;
return i;
}
/* Pop one item off the queue while holding the lock. */
static void
_pop_pending_call(struct _pending_calls *pending,
int (**func)(void *), void **arg)
int (**func)(void *), void **arg, int *flags)
{
int i = _next_pending_call(pending, func, arg);
int i = _next_pending_call(pending, func, arg, flags);
if (i >= 0) {
pending->calls[i] = (struct _pending_call){0};
pending->first = (i + 1) % NPENDINGCALLS;
@ -714,12 +716,12 @@ _pop_pending_call(struct _pending_calls *pending,
int
_PyEval_AddPendingCall(PyInterpreterState *interp,
_Py_pending_call_func func, void *arg,
int mainthreadonly)
_Py_pending_call_func func, void *arg, int flags)
{
assert(!mainthreadonly || _Py_IsMainInterpreter(interp));
assert(!(flags & _Py_PENDING_MAINTHREADONLY)
|| _Py_IsMainInterpreter(interp));
struct _pending_calls *pending = &interp->ceval.pending;
if (mainthreadonly) {
if (flags & _Py_PENDING_MAINTHREADONLY) {
/* The main thread only exists in the main interpreter. */
assert(_Py_IsMainInterpreter(interp));
pending = &_PyRuntime.ceval.pending_mainthread;
@ -729,7 +731,7 @@ _PyEval_AddPendingCall(PyInterpreterState *interp,
assert(pending->lock != NULL);
PyThread_acquire_lock(pending->lock, WAIT_LOCK);
int result = _push_pending_call(pending, func, arg);
int result = _push_pending_call(pending, func, arg, flags);
PyThread_release_lock(pending->lock);
/* signal main loop */
@ -743,7 +745,7 @@ Py_AddPendingCall(_Py_pending_call_func func, void *arg)
/* Legacy users of this API will continue to target the main thread
(of the main interpreter). */
PyInterpreterState *interp = _PyInterpreterState_Main();
return _PyEval_AddPendingCall(interp, func, arg, 1);
return _PyEval_AddPendingCall(interp, func, arg, _Py_PENDING_MAINTHREADONLY);
}
static int
@ -769,17 +771,22 @@ _make_pending_calls(struct _pending_calls *pending)
for (int i=0; i<NPENDINGCALLS; i++) {
_Py_pending_call_func func = NULL;
void *arg = NULL;
int flags = 0;
/* pop one item off the queue while holding the lock */
PyThread_acquire_lock(pending->lock, WAIT_LOCK);
_pop_pending_call(pending, &func, &arg);
_pop_pending_call(pending, &func, &arg, &flags);
PyThread_release_lock(pending->lock);
/* having released the lock, perform the callback */
if (func == NULL) {
break;
}
if (func(arg) != 0) {
int res = func(arg);
if ((flags & _Py_PENDING_RAWFREE) && arg != NULL) {
PyMem_RawFree(arg);
}
if (res != 0) {
return -1;
}
}

View File

@ -2584,18 +2584,36 @@ _PyCrossInterpreterData_NewObject(_PyCrossInterpreterData *data)
return data->new_object(data);
}
static int
_release_xidata_pending(void *data)
int
_Py_CallInInterpreter(PyInterpreterState *interp,
_Py_simple_func func, void *arg)
{
_xidata_clear((_PyCrossInterpreterData *)data);
if (interp == current_fast_get(interp->runtime)->interp) {
return func(arg);
}
// XXX Emit a warning if this fails?
_PyEval_AddPendingCall(interp, (_Py_pending_call_func)func, arg, 0);
return 0;
}
int
_Py_CallInInterpreterAndRawFree(PyInterpreterState *interp,
_Py_simple_func func, void *arg)
{
if (interp == current_fast_get(interp->runtime)->interp) {
int res = func(arg);
PyMem_RawFree(arg);
return res;
}
// XXX Emit a warning if this fails?
_PyEval_AddPendingCall(interp, func, arg, _Py_PENDING_RAWFREE);
return 0;
}
static int
_xidata_release_and_rawfree_pending(void *data)
_call_clear_xidata(void *data)
{
_xidata_clear((_PyCrossInterpreterData *)data);
PyMem_RawFree(data);
return 0;
}
@ -2627,21 +2645,12 @@ _xidata_release(_PyCrossInterpreterData *data, int rawfree)
}
// "Release" the data and/or the object.
if (interp == current_fast_get(interp->runtime)->interp) {
_xidata_clear(data);
if (rawfree) {
PyMem_RawFree(data);
}
return _Py_CallInInterpreterAndRawFree(interp, _call_clear_xidata, data);
}
else {
_Py_pending_call_func func = _release_xidata_pending;
if (rawfree) {
func = _xidata_release_and_rawfree_pending;
return _Py_CallInInterpreter(interp, _call_clear_xidata, data);
}
// XXX Emit a warning if this fails?
_PyEval_AddPendingCall(interp, func, data, 0);
}
return 0;
}
int