mirror of https://github.com/python/cpython
3382 lines
87 KiB
C
3382 lines
87 KiB
C
/* interpreters module */
|
|
/* low-level access to interpreter primitives */
|
|
|
|
#ifndef Py_BUILD_CORE_BUILTIN
|
|
# define Py_BUILD_CORE_MODULE 1
|
|
#endif
|
|
|
|
#include "Python.h"
|
|
#include "pycore_crossinterp.h" // struct _xid
|
|
#include "pycore_interp.h" // _PyInterpreterState_LookUpID()
|
|
#include "pycore_pystate.h" // _PyInterpreterState_GetIDObject()
|
|
|
|
#ifdef MS_WINDOWS
|
|
#define WIN32_LEAN_AND_MEAN
|
|
#include <windows.h> // SwitchToThread()
|
|
#elif defined(HAVE_SCHED_H)
|
|
#include <sched.h> // sched_yield()
|
|
#endif
|
|
|
|
#define REGISTERS_HEAP_TYPES
|
|
#include "_interpreters_common.h"
|
|
#undef REGISTERS_HEAP_TYPES
|
|
|
|
|
|
/*
|
|
This module has the following process-global state:
|
|
|
|
_globals (static struct globals):
|
|
module_count (int)
|
|
channels (struct _channels):
|
|
numopen (int64_t)
|
|
next_id; (int64_t)
|
|
mutex (PyThread_type_lock)
|
|
head (linked list of struct _channelref *):
|
|
cid (int64_t)
|
|
objcount (Py_ssize_t)
|
|
next (struct _channelref *):
|
|
...
|
|
chan (struct _channel *):
|
|
open (int)
|
|
mutex (PyThread_type_lock)
|
|
closing (struct _channel_closing *):
|
|
ref (struct _channelref *):
|
|
...
|
|
ends (struct _channelends *):
|
|
numsendopen (int64_t)
|
|
numrecvopen (int64_t)
|
|
send (struct _channelend *):
|
|
interpid (int64_t)
|
|
open (int)
|
|
next (struct _channelend *)
|
|
recv (struct _channelend *):
|
|
...
|
|
queue (struct _channelqueue *):
|
|
count (int64_t)
|
|
first (struct _channelitem *):
|
|
next (struct _channelitem *):
|
|
...
|
|
data (_PyCrossInterpreterData *):
|
|
data (void *)
|
|
obj (PyObject *)
|
|
interpid (int64_t)
|
|
new_object (xid_newobjectfunc)
|
|
free (xid_freefunc)
|
|
last (struct _channelitem *):
|
|
...
|
|
|
|
The above state includes the following allocations by the module:
|
|
|
|
* 1 top-level mutex (to protect the rest of the state)
|
|
* for each channel:
|
|
* 1 struct _channelref
|
|
* 1 struct _channel
|
|
* 0-1 struct _channel_closing
|
|
* 1 struct _channelends
|
|
* 2 struct _channelend
|
|
* 1 struct _channelqueue
|
|
* for each item in each channel:
|
|
* 1 struct _channelitem
|
|
* 1 _PyCrossInterpreterData
|
|
|
|
The only objects in that global state are the references held by each
|
|
channel's queue, which are safely managed via the _PyCrossInterpreterData_*()
|
|
API.. The module does not create any objects that are shared globally.
|
|
*/
|
|
|
|
#define MODULE_NAME _interpchannels
|
|
#define MODULE_NAME_STR Py_STRINGIFY(MODULE_NAME)
|
|
#define MODINIT_FUNC_NAME RESOLVE_MODINIT_FUNC_NAME(MODULE_NAME)
|
|
|
|
|
|
#define GLOBAL_MALLOC(TYPE) \
|
|
PyMem_RawMalloc(sizeof(TYPE))
|
|
#define GLOBAL_FREE(VAR) \
|
|
PyMem_RawFree(VAR)
|
|
|
|
|
|
#define XID_IGNORE_EXC 1
|
|
#define XID_FREE 2
|
|
|
|
static int
|
|
_release_xid_data(_PyCrossInterpreterData *data, int flags)
|
|
{
|
|
int ignoreexc = flags & XID_IGNORE_EXC;
|
|
PyObject *exc;
|
|
if (ignoreexc) {
|
|
exc = PyErr_GetRaisedException();
|
|
}
|
|
int res;
|
|
if (flags & XID_FREE) {
|
|
res = _PyCrossInterpreterData_ReleaseAndRawFree(data);
|
|
}
|
|
else {
|
|
res = _PyCrossInterpreterData_Release(data);
|
|
}
|
|
if (res < 0) {
|
|
/* The owning interpreter is already destroyed. */
|
|
if (ignoreexc) {
|
|
// XXX Emit a warning?
|
|
PyErr_Clear();
|
|
}
|
|
}
|
|
if (flags & XID_FREE) {
|
|
/* Either way, we free the data. */
|
|
}
|
|
if (ignoreexc) {
|
|
PyErr_SetRaisedException(exc);
|
|
}
|
|
return res;
|
|
}
|
|
|
|
|
|
static PyInterpreterState *
|
|
_get_current_interp(void)
|
|
{
|
|
// PyInterpreterState_Get() aborts if lookup fails, so don't need
|
|
// to check the result for NULL.
|
|
return PyInterpreterState_Get();
|
|
}
|
|
|
|
static PyObject *
|
|
_get_current_module(void)
|
|
{
|
|
PyObject *name = PyUnicode_FromString(MODULE_NAME_STR);
|
|
if (name == NULL) {
|
|
return NULL;
|
|
}
|
|
PyObject *mod = PyImport_GetModule(name);
|
|
Py_DECREF(name);
|
|
if (mod == NULL) {
|
|
return NULL;
|
|
}
|
|
assert(mod != Py_None);
|
|
return mod;
|
|
}
|
|
|
|
static PyObject *
|
|
get_module_from_owned_type(PyTypeObject *cls)
|
|
{
|
|
assert(cls != NULL);
|
|
return _get_current_module();
|
|
// XXX Use the more efficient API now that we use heap types:
|
|
//return PyType_GetModule(cls);
|
|
}
|
|
|
|
static struct PyModuleDef moduledef;
|
|
|
|
static PyObject *
|
|
get_module_from_type(PyTypeObject *cls)
|
|
{
|
|
assert(cls != NULL);
|
|
return _get_current_module();
|
|
// XXX Use the more efficient API now that we use heap types:
|
|
//return PyType_GetModuleByDef(cls, &moduledef);
|
|
}
|
|
|
|
static PyObject *
|
|
add_new_exception(PyObject *mod, const char *name, PyObject *base)
|
|
{
|
|
assert(!PyObject_HasAttrStringWithError(mod, name));
|
|
PyObject *exctype = PyErr_NewException(name, base, NULL);
|
|
if (exctype == NULL) {
|
|
return NULL;
|
|
}
|
|
int res = PyModule_AddType(mod, (PyTypeObject *)exctype);
|
|
if (res < 0) {
|
|
Py_DECREF(exctype);
|
|
return NULL;
|
|
}
|
|
return exctype;
|
|
}
|
|
|
|
#define ADD_NEW_EXCEPTION(MOD, NAME, BASE) \
|
|
add_new_exception(MOD, MODULE_NAME_STR "." Py_STRINGIFY(NAME), BASE)
|
|
|
|
static int
|
|
wait_for_lock(PyThread_type_lock mutex, PY_TIMEOUT_T timeout)
|
|
{
|
|
PyLockStatus res = PyThread_acquire_lock_timed_with_retries(mutex, timeout);
|
|
if (res == PY_LOCK_INTR) {
|
|
/* KeyboardInterrupt, etc. */
|
|
assert(PyErr_Occurred());
|
|
return -1;
|
|
}
|
|
else if (res == PY_LOCK_FAILURE) {
|
|
assert(!PyErr_Occurred());
|
|
assert(timeout > 0);
|
|
PyErr_SetString(PyExc_TimeoutError, "timed out");
|
|
return -1;
|
|
}
|
|
assert(res == PY_LOCK_ACQUIRED);
|
|
PyThread_release_lock(mutex);
|
|
return 0;
|
|
}
|
|
|
|
|
|
/* module state *************************************************************/
|
|
|
|
typedef struct {
|
|
/* Added at runtime by interpreters module. */
|
|
PyTypeObject *send_channel_type;
|
|
PyTypeObject *recv_channel_type;
|
|
|
|
/* heap types */
|
|
PyTypeObject *ChannelInfoType;
|
|
PyTypeObject *ChannelIDType;
|
|
|
|
/* exceptions */
|
|
PyObject *ChannelError;
|
|
PyObject *ChannelNotFoundError;
|
|
PyObject *ChannelClosedError;
|
|
PyObject *ChannelEmptyError;
|
|
PyObject *ChannelNotEmptyError;
|
|
} module_state;
|
|
|
|
static inline module_state *
|
|
get_module_state(PyObject *mod)
|
|
{
|
|
assert(mod != NULL);
|
|
module_state *state = PyModule_GetState(mod);
|
|
assert(state != NULL);
|
|
return state;
|
|
}
|
|
|
|
static module_state *
|
|
_get_current_module_state(void)
|
|
{
|
|
PyObject *mod = _get_current_module();
|
|
if (mod == NULL) {
|
|
// XXX import it?
|
|
PyErr_SetString(PyExc_RuntimeError,
|
|
MODULE_NAME_STR " module not imported yet");
|
|
return NULL;
|
|
}
|
|
module_state *state = get_module_state(mod);
|
|
Py_DECREF(mod);
|
|
return state;
|
|
}
|
|
|
|
static int
|
|
traverse_module_state(module_state *state, visitproc visit, void *arg)
|
|
{
|
|
/* external types */
|
|
Py_VISIT(state->send_channel_type);
|
|
Py_VISIT(state->recv_channel_type);
|
|
|
|
/* heap types */
|
|
Py_VISIT(state->ChannelInfoType);
|
|
Py_VISIT(state->ChannelIDType);
|
|
|
|
/* exceptions */
|
|
Py_VISIT(state->ChannelError);
|
|
Py_VISIT(state->ChannelNotFoundError);
|
|
Py_VISIT(state->ChannelClosedError);
|
|
Py_VISIT(state->ChannelEmptyError);
|
|
Py_VISIT(state->ChannelNotEmptyError);
|
|
|
|
return 0;
|
|
}
|
|
|
|
static void
|
|
clear_xid_types(module_state *state)
|
|
{
|
|
/* external types */
|
|
if (state->send_channel_type != NULL) {
|
|
(void)clear_xid_class(state->send_channel_type);
|
|
Py_CLEAR(state->send_channel_type);
|
|
}
|
|
if (state->recv_channel_type != NULL) {
|
|
(void)clear_xid_class(state->recv_channel_type);
|
|
Py_CLEAR(state->recv_channel_type);
|
|
}
|
|
|
|
/* heap types */
|
|
if (state->ChannelIDType != NULL) {
|
|
(void)clear_xid_class(state->ChannelIDType);
|
|
Py_CLEAR(state->ChannelIDType);
|
|
}
|
|
}
|
|
|
|
static int
|
|
clear_module_state(module_state *state)
|
|
{
|
|
clear_xid_types(state);
|
|
|
|
/* heap types */
|
|
Py_CLEAR(state->ChannelInfoType);
|
|
|
|
/* exceptions */
|
|
Py_CLEAR(state->ChannelError);
|
|
Py_CLEAR(state->ChannelNotFoundError);
|
|
Py_CLEAR(state->ChannelClosedError);
|
|
Py_CLEAR(state->ChannelEmptyError);
|
|
Py_CLEAR(state->ChannelNotEmptyError);
|
|
|
|
return 0;
|
|
}
|
|
|
|
|
|
/* channel-specific code ****************************************************/
|
|
|
|
#define CHANNEL_SEND 1
|
|
#define CHANNEL_BOTH 0
|
|
#define CHANNEL_RECV -1
|
|
|
|
|
|
/* channel errors */
|
|
|
|
#define ERR_CHANNEL_NOT_FOUND -2
|
|
#define ERR_CHANNEL_CLOSED -3
|
|
#define ERR_CHANNEL_INTERP_CLOSED -4
|
|
#define ERR_CHANNEL_EMPTY -5
|
|
#define ERR_CHANNEL_NOT_EMPTY -6
|
|
#define ERR_CHANNEL_MUTEX_INIT -7
|
|
#define ERR_CHANNELS_MUTEX_INIT -8
|
|
#define ERR_NO_NEXT_CHANNEL_ID -9
|
|
#define ERR_CHANNEL_CLOSED_WAITING -10
|
|
|
|
static int
|
|
exceptions_init(PyObject *mod)
|
|
{
|
|
module_state *state = get_module_state(mod);
|
|
if (state == NULL) {
|
|
return -1;
|
|
}
|
|
|
|
#define ADD(NAME, BASE) \
|
|
do { \
|
|
assert(state->NAME == NULL); \
|
|
state->NAME = ADD_NEW_EXCEPTION(mod, NAME, BASE); \
|
|
if (state->NAME == NULL) { \
|
|
return -1; \
|
|
} \
|
|
} while (0)
|
|
|
|
// A channel-related operation failed.
|
|
ADD(ChannelError, PyExc_RuntimeError);
|
|
// An operation tried to use a channel that doesn't exist.
|
|
ADD(ChannelNotFoundError, state->ChannelError);
|
|
// An operation tried to use a closed channel.
|
|
ADD(ChannelClosedError, state->ChannelError);
|
|
// An operation tried to pop from an empty channel.
|
|
ADD(ChannelEmptyError, state->ChannelError);
|
|
// An operation tried to close a non-empty channel.
|
|
ADD(ChannelNotEmptyError, state->ChannelError);
|
|
#undef ADD
|
|
|
|
return 0;
|
|
}
|
|
|
|
static int
|
|
handle_channel_error(int err, PyObject *mod, int64_t cid)
|
|
{
|
|
if (err == 0) {
|
|
assert(!PyErr_Occurred());
|
|
return 0;
|
|
}
|
|
assert(err < 0);
|
|
module_state *state = get_module_state(mod);
|
|
assert(state != NULL);
|
|
if (err == ERR_CHANNEL_NOT_FOUND) {
|
|
PyErr_Format(state->ChannelNotFoundError,
|
|
"channel %" PRId64 " not found", cid);
|
|
}
|
|
else if (err == ERR_CHANNEL_CLOSED) {
|
|
PyErr_Format(state->ChannelClosedError,
|
|
"channel %" PRId64 " is closed", cid);
|
|
}
|
|
else if (err == ERR_CHANNEL_CLOSED_WAITING) {
|
|
PyErr_Format(state->ChannelClosedError,
|
|
"channel %" PRId64 " has closed", cid);
|
|
}
|
|
else if (err == ERR_CHANNEL_INTERP_CLOSED) {
|
|
PyErr_Format(state->ChannelClosedError,
|
|
"channel %" PRId64 " is already closed", cid);
|
|
}
|
|
else if (err == ERR_CHANNEL_EMPTY) {
|
|
PyErr_Format(state->ChannelEmptyError,
|
|
"channel %" PRId64 " is empty", cid);
|
|
}
|
|
else if (err == ERR_CHANNEL_NOT_EMPTY) {
|
|
PyErr_Format(state->ChannelNotEmptyError,
|
|
"channel %" PRId64 " may not be closed "
|
|
"if not empty (try force=True)",
|
|
cid);
|
|
}
|
|
else if (err == ERR_CHANNEL_MUTEX_INIT) {
|
|
PyErr_SetString(state->ChannelError,
|
|
"can't initialize mutex for new channel");
|
|
}
|
|
else if (err == ERR_CHANNELS_MUTEX_INIT) {
|
|
PyErr_SetString(state->ChannelError,
|
|
"can't initialize mutex for channel management");
|
|
}
|
|
else if (err == ERR_NO_NEXT_CHANNEL_ID) {
|
|
PyErr_SetString(state->ChannelError,
|
|
"failed to get a channel ID");
|
|
}
|
|
else {
|
|
assert(PyErr_Occurred());
|
|
}
|
|
return 1;
|
|
}
|
|
|
|
|
|
/* the channel queue */
|
|
|
|
typedef uintptr_t _channelitem_id_t;
|
|
|
|
typedef struct wait_info {
|
|
PyThread_type_lock mutex;
|
|
enum {
|
|
WAITING_NO_STATUS = 0,
|
|
WAITING_ACQUIRED = 1,
|
|
WAITING_RELEASING = 2,
|
|
WAITING_RELEASED = 3,
|
|
} status;
|
|
int received;
|
|
_channelitem_id_t itemid;
|
|
} _waiting_t;
|
|
|
|
static int
|
|
_waiting_init(_waiting_t *waiting)
|
|
{
|
|
PyThread_type_lock mutex = PyThread_allocate_lock();
|
|
if (mutex == NULL) {
|
|
PyErr_NoMemory();
|
|
return -1;
|
|
}
|
|
|
|
*waiting = (_waiting_t){
|
|
.mutex = mutex,
|
|
.status = WAITING_NO_STATUS,
|
|
};
|
|
return 0;
|
|
}
|
|
|
|
static void
|
|
_waiting_clear(_waiting_t *waiting)
|
|
{
|
|
assert(waiting->status != WAITING_ACQUIRED
|
|
&& waiting->status != WAITING_RELEASING);
|
|
if (waiting->mutex != NULL) {
|
|
PyThread_free_lock(waiting->mutex);
|
|
waiting->mutex = NULL;
|
|
}
|
|
}
|
|
|
|
static _channelitem_id_t
|
|
_waiting_get_itemid(_waiting_t *waiting)
|
|
{
|
|
return waiting->itemid;
|
|
}
|
|
|
|
static void
|
|
_waiting_acquire(_waiting_t *waiting)
|
|
{
|
|
assert(waiting->status == WAITING_NO_STATUS);
|
|
PyThread_acquire_lock(waiting->mutex, NOWAIT_LOCK);
|
|
waiting->status = WAITING_ACQUIRED;
|
|
}
|
|
|
|
static void
|
|
_waiting_release(_waiting_t *waiting, int received)
|
|
{
|
|
assert(waiting->mutex != NULL);
|
|
assert(waiting->status == WAITING_ACQUIRED);
|
|
assert(!waiting->received);
|
|
|
|
waiting->status = WAITING_RELEASING;
|
|
PyThread_release_lock(waiting->mutex);
|
|
if (waiting->received != received) {
|
|
assert(received == 1);
|
|
waiting->received = received;
|
|
}
|
|
waiting->status = WAITING_RELEASED;
|
|
}
|
|
|
|
static void
|
|
_waiting_finish_releasing(_waiting_t *waiting)
|
|
{
|
|
while (waiting->status == WAITING_RELEASING) {
|
|
#ifdef MS_WINDOWS
|
|
SwitchToThread();
|
|
#elif defined(HAVE_SCHED_H)
|
|
sched_yield();
|
|
#endif
|
|
}
|
|
}
|
|
|
|
struct _channelitem;
|
|
|
|
typedef struct _channelitem {
|
|
_PyCrossInterpreterData *data;
|
|
_waiting_t *waiting;
|
|
struct _channelitem *next;
|
|
} _channelitem;
|
|
|
|
static inline _channelitem_id_t
|
|
_channelitem_ID(_channelitem *item)
|
|
{
|
|
return (_channelitem_id_t)item;
|
|
}
|
|
|
|
static void
|
|
_channelitem_init(_channelitem *item,
|
|
_PyCrossInterpreterData *data, _waiting_t *waiting)
|
|
{
|
|
*item = (_channelitem){
|
|
.data = data,
|
|
.waiting = waiting,
|
|
};
|
|
if (waiting != NULL) {
|
|
waiting->itemid = _channelitem_ID(item);
|
|
}
|
|
}
|
|
|
|
static void
|
|
_channelitem_clear(_channelitem *item)
|
|
{
|
|
item->next = NULL;
|
|
|
|
if (item->data != NULL) {
|
|
// It was allocated in channel_send().
|
|
(void)_release_xid_data(item->data, XID_IGNORE_EXC & XID_FREE);
|
|
item->data = NULL;
|
|
}
|
|
|
|
if (item->waiting != NULL) {
|
|
if (item->waiting->status == WAITING_ACQUIRED) {
|
|
_waiting_release(item->waiting, 0);
|
|
}
|
|
item->waiting = NULL;
|
|
}
|
|
}
|
|
|
|
static _channelitem *
|
|
_channelitem_new(_PyCrossInterpreterData *data, _waiting_t *waiting)
|
|
{
|
|
_channelitem *item = GLOBAL_MALLOC(_channelitem);
|
|
if (item == NULL) {
|
|
PyErr_NoMemory();
|
|
return NULL;
|
|
}
|
|
_channelitem_init(item, data, waiting);
|
|
return item;
|
|
}
|
|
|
|
static void
|
|
_channelitem_free(_channelitem *item)
|
|
{
|
|
_channelitem_clear(item);
|
|
GLOBAL_FREE(item);
|
|
}
|
|
|
|
static void
|
|
_channelitem_free_all(_channelitem *item)
|
|
{
|
|
while (item != NULL) {
|
|
_channelitem *last = item;
|
|
item = item->next;
|
|
_channelitem_free(last);
|
|
}
|
|
}
|
|
|
|
static void
|
|
_channelitem_popped(_channelitem *item,
|
|
_PyCrossInterpreterData **p_data, _waiting_t **p_waiting)
|
|
{
|
|
assert(item->waiting == NULL || item->waiting->status == WAITING_ACQUIRED);
|
|
*p_data = item->data;
|
|
*p_waiting = item->waiting;
|
|
// We clear them here, so they won't be released in _channelitem_clear().
|
|
item->data = NULL;
|
|
item->waiting = NULL;
|
|
_channelitem_free(item);
|
|
}
|
|
|
|
typedef struct _channelqueue {
|
|
int64_t count;
|
|
_channelitem *first;
|
|
_channelitem *last;
|
|
} _channelqueue;
|
|
|
|
static _channelqueue *
|
|
_channelqueue_new(void)
|
|
{
|
|
_channelqueue *queue = GLOBAL_MALLOC(_channelqueue);
|
|
if (queue == NULL) {
|
|
PyErr_NoMemory();
|
|
return NULL;
|
|
}
|
|
queue->count = 0;
|
|
queue->first = NULL;
|
|
queue->last = NULL;
|
|
return queue;
|
|
}
|
|
|
|
static void
|
|
_channelqueue_clear(_channelqueue *queue)
|
|
{
|
|
_channelitem_free_all(queue->first);
|
|
queue->count = 0;
|
|
queue->first = NULL;
|
|
queue->last = NULL;
|
|
}
|
|
|
|
static void
|
|
_channelqueue_free(_channelqueue *queue)
|
|
{
|
|
_channelqueue_clear(queue);
|
|
GLOBAL_FREE(queue);
|
|
}
|
|
|
|
static int
|
|
_channelqueue_put(_channelqueue *queue,
|
|
_PyCrossInterpreterData *data, _waiting_t *waiting)
|
|
{
|
|
_channelitem *item = _channelitem_new(data, waiting);
|
|
if (item == NULL) {
|
|
return -1;
|
|
}
|
|
|
|
queue->count += 1;
|
|
if (queue->first == NULL) {
|
|
queue->first = item;
|
|
}
|
|
else {
|
|
queue->last->next = item;
|
|
}
|
|
queue->last = item;
|
|
|
|
if (waiting != NULL) {
|
|
_waiting_acquire(waiting);
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
static int
|
|
_channelqueue_get(_channelqueue *queue,
|
|
_PyCrossInterpreterData **p_data, _waiting_t **p_waiting)
|
|
{
|
|
_channelitem *item = queue->first;
|
|
if (item == NULL) {
|
|
return ERR_CHANNEL_EMPTY;
|
|
}
|
|
queue->first = item->next;
|
|
if (queue->last == item) {
|
|
queue->last = NULL;
|
|
}
|
|
queue->count -= 1;
|
|
|
|
_channelitem_popped(item, p_data, p_waiting);
|
|
return 0;
|
|
}
|
|
|
|
static int
|
|
_channelqueue_find(_channelqueue *queue, _channelitem_id_t itemid,
|
|
_channelitem **p_item, _channelitem **p_prev)
|
|
{
|
|
_channelitem *prev = NULL;
|
|
_channelitem *item = NULL;
|
|
if (queue->first != NULL) {
|
|
if (_channelitem_ID(queue->first) == itemid) {
|
|
item = queue->first;
|
|
}
|
|
else {
|
|
prev = queue->first;
|
|
while (prev->next != NULL) {
|
|
if (_channelitem_ID(prev->next) == itemid) {
|
|
item = prev->next;
|
|
break;
|
|
}
|
|
prev = prev->next;
|
|
}
|
|
if (item == NULL) {
|
|
prev = NULL;
|
|
}
|
|
}
|
|
}
|
|
if (p_item != NULL) {
|
|
*p_item = item;
|
|
}
|
|
if (p_prev != NULL) {
|
|
*p_prev = prev;
|
|
}
|
|
return (item != NULL);
|
|
}
|
|
|
|
static void
|
|
_channelqueue_remove(_channelqueue *queue, _channelitem_id_t itemid,
|
|
_PyCrossInterpreterData **p_data, _waiting_t **p_waiting)
|
|
{
|
|
_channelitem *prev = NULL;
|
|
_channelitem *item = NULL;
|
|
int found = _channelqueue_find(queue, itemid, &item, &prev);
|
|
if (!found) {
|
|
return;
|
|
}
|
|
|
|
assert(item->waiting != NULL);
|
|
assert(!item->waiting->received);
|
|
if (prev == NULL) {
|
|
assert(queue->first == item);
|
|
queue->first = item->next;
|
|
}
|
|
else {
|
|
assert(queue->first != item);
|
|
assert(prev->next == item);
|
|
prev->next = item->next;
|
|
}
|
|
item->next = NULL;
|
|
|
|
if (queue->last == item) {
|
|
queue->last = prev;
|
|
}
|
|
queue->count -= 1;
|
|
|
|
_channelitem_popped(item, p_data, p_waiting);
|
|
}
|
|
|
|
static void
|
|
_channelqueue_clear_interpreter(_channelqueue *queue, int64_t interpid)
|
|
{
|
|
_channelitem *prev = NULL;
|
|
_channelitem *next = queue->first;
|
|
while (next != NULL) {
|
|
_channelitem *item = next;
|
|
next = item->next;
|
|
if (_PyCrossInterpreterData_INTERPID(item->data) == interpid) {
|
|
if (prev == NULL) {
|
|
queue->first = item->next;
|
|
}
|
|
else {
|
|
prev->next = item->next;
|
|
}
|
|
_channelitem_free(item);
|
|
queue->count -= 1;
|
|
}
|
|
else {
|
|
prev = item;
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
/* channel-interpreter associations */
|
|
|
|
struct _channelend;
|
|
|
|
typedef struct _channelend {
|
|
struct _channelend *next;
|
|
int64_t interpid;
|
|
int open;
|
|
} _channelend;
|
|
|
|
static _channelend *
|
|
_channelend_new(int64_t interpid)
|
|
{
|
|
_channelend *end = GLOBAL_MALLOC(_channelend);
|
|
if (end == NULL) {
|
|
PyErr_NoMemory();
|
|
return NULL;
|
|
}
|
|
end->next = NULL;
|
|
end->interpid = interpid;
|
|
end->open = 1;
|
|
return end;
|
|
}
|
|
|
|
static void
|
|
_channelend_free(_channelend *end)
|
|
{
|
|
GLOBAL_FREE(end);
|
|
}
|
|
|
|
static void
|
|
_channelend_free_all(_channelend *end)
|
|
{
|
|
while (end != NULL) {
|
|
_channelend *last = end;
|
|
end = end->next;
|
|
_channelend_free(last);
|
|
}
|
|
}
|
|
|
|
static _channelend *
|
|
_channelend_find(_channelend *first, int64_t interpid, _channelend **pprev)
|
|
{
|
|
_channelend *prev = NULL;
|
|
_channelend *end = first;
|
|
while (end != NULL) {
|
|
if (end->interpid == interpid) {
|
|
break;
|
|
}
|
|
prev = end;
|
|
end = end->next;
|
|
}
|
|
if (pprev != NULL) {
|
|
*pprev = prev;
|
|
}
|
|
return end;
|
|
}
|
|
|
|
typedef struct _channelassociations {
|
|
// Note that the list entries are never removed for interpreter
|
|
// for which the channel is closed. This should not be a problem in
|
|
// practice. Also, a channel isn't automatically closed when an
|
|
// interpreter is destroyed.
|
|
int64_t numsendopen;
|
|
int64_t numrecvopen;
|
|
_channelend *send;
|
|
_channelend *recv;
|
|
} _channelends;
|
|
|
|
static _channelends *
|
|
_channelends_new(void)
|
|
{
|
|
_channelends *ends = GLOBAL_MALLOC(_channelends);
|
|
if (ends== NULL) {
|
|
return NULL;
|
|
}
|
|
ends->numsendopen = 0;
|
|
ends->numrecvopen = 0;
|
|
ends->send = NULL;
|
|
ends->recv = NULL;
|
|
return ends;
|
|
}
|
|
|
|
static void
|
|
_channelends_clear(_channelends *ends)
|
|
{
|
|
_channelend_free_all(ends->send);
|
|
ends->send = NULL;
|
|
ends->numsendopen = 0;
|
|
|
|
_channelend_free_all(ends->recv);
|
|
ends->recv = NULL;
|
|
ends->numrecvopen = 0;
|
|
}
|
|
|
|
static void
|
|
_channelends_free(_channelends *ends)
|
|
{
|
|
_channelends_clear(ends);
|
|
GLOBAL_FREE(ends);
|
|
}
|
|
|
|
static _channelend *
|
|
_channelends_add(_channelends *ends, _channelend *prev, int64_t interpid,
|
|
int send)
|
|
{
|
|
_channelend *end = _channelend_new(interpid);
|
|
if (end == NULL) {
|
|
return NULL;
|
|
}
|
|
|
|
if (prev == NULL) {
|
|
if (send) {
|
|
ends->send = end;
|
|
}
|
|
else {
|
|
ends->recv = end;
|
|
}
|
|
}
|
|
else {
|
|
prev->next = end;
|
|
}
|
|
if (send) {
|
|
ends->numsendopen += 1;
|
|
}
|
|
else {
|
|
ends->numrecvopen += 1;
|
|
}
|
|
return end;
|
|
}
|
|
|
|
static int
|
|
_channelends_associate(_channelends *ends, int64_t interpid, int send)
|
|
{
|
|
_channelend *prev;
|
|
_channelend *end = _channelend_find(send ? ends->send : ends->recv,
|
|
interpid, &prev);
|
|
if (end != NULL) {
|
|
if (!end->open) {
|
|
return ERR_CHANNEL_CLOSED;
|
|
}
|
|
// already associated
|
|
return 0;
|
|
}
|
|
if (_channelends_add(ends, prev, interpid, send) == NULL) {
|
|
return -1;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
static int
|
|
_channelends_is_open(_channelends *ends)
|
|
{
|
|
if (ends->numsendopen != 0 || ends->numrecvopen != 0) {
|
|
// At least one interpreter is still associated with the channel
|
|
// (and hasn't been released).
|
|
return 1;
|
|
}
|
|
// XXX This is wrong if an end can ever be removed.
|
|
if (ends->send == NULL && ends->recv == NULL) {
|
|
// The channel has never had any interpreters associated with it.
|
|
return 1;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
static void
|
|
_channelends_release_end(_channelends *ends, _channelend *end, int send)
|
|
{
|
|
end->open = 0;
|
|
if (send) {
|
|
ends->numsendopen -= 1;
|
|
}
|
|
else {
|
|
ends->numrecvopen -= 1;
|
|
}
|
|
}
|
|
|
|
static int
|
|
_channelends_release_interpreter(_channelends *ends, int64_t interpid, int which)
|
|
{
|
|
_channelend *prev;
|
|
_channelend *end;
|
|
if (which >= 0) { // send/both
|
|
end = _channelend_find(ends->send, interpid, &prev);
|
|
if (end == NULL) {
|
|
// never associated so add it
|
|
end = _channelends_add(ends, prev, interpid, 1);
|
|
if (end == NULL) {
|
|
return -1;
|
|
}
|
|
}
|
|
_channelends_release_end(ends, end, 1);
|
|
}
|
|
if (which <= 0) { // recv/both
|
|
end = _channelend_find(ends->recv, interpid, &prev);
|
|
if (end == NULL) {
|
|
// never associated so add it
|
|
end = _channelends_add(ends, prev, interpid, 0);
|
|
if (end == NULL) {
|
|
return -1;
|
|
}
|
|
}
|
|
_channelends_release_end(ends, end, 0);
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
static void
|
|
_channelends_release_all(_channelends *ends, int which, int force)
|
|
{
|
|
// XXX Handle the ends.
|
|
// XXX Handle force is True.
|
|
|
|
// Ensure all the "send"-associated interpreters are closed.
|
|
_channelend *end;
|
|
for (end = ends->send; end != NULL; end = end->next) {
|
|
_channelends_release_end(ends, end, 1);
|
|
}
|
|
|
|
// Ensure all the "recv"-associated interpreters are closed.
|
|
for (end = ends->recv; end != NULL; end = end->next) {
|
|
_channelends_release_end(ends, end, 0);
|
|
}
|
|
}
|
|
|
|
static void
|
|
_channelends_clear_interpreter(_channelends *ends, int64_t interpid)
|
|
{
|
|
// XXX Actually remove the entries?
|
|
_channelend *end;
|
|
end = _channelend_find(ends->send, interpid, NULL);
|
|
if (end != NULL) {
|
|
_channelends_release_end(ends, end, 1);
|
|
}
|
|
end = _channelend_find(ends->recv, interpid, NULL);
|
|
if (end != NULL) {
|
|
_channelends_release_end(ends, end, 0);
|
|
}
|
|
}
|
|
|
|
|
|
/* each channel's state */
|
|
|
|
struct _channel;
|
|
struct _channel_closing;
|
|
static void _channel_clear_closing(struct _channel *);
|
|
static void _channel_finish_closing(struct _channel *);
|
|
|
|
typedef struct _channel {
|
|
PyThread_type_lock mutex;
|
|
_channelqueue *queue;
|
|
_channelends *ends;
|
|
int open;
|
|
struct _channel_closing *closing;
|
|
} _channel_state;
|
|
|
|
static _channel_state *
|
|
_channel_new(PyThread_type_lock mutex)
|
|
{
|
|
_channel_state *chan = GLOBAL_MALLOC(_channel_state);
|
|
if (chan == NULL) {
|
|
return NULL;
|
|
}
|
|
chan->mutex = mutex;
|
|
chan->queue = _channelqueue_new();
|
|
if (chan->queue == NULL) {
|
|
GLOBAL_FREE(chan);
|
|
return NULL;
|
|
}
|
|
chan->ends = _channelends_new();
|
|
if (chan->ends == NULL) {
|
|
_channelqueue_free(chan->queue);
|
|
GLOBAL_FREE(chan);
|
|
return NULL;
|
|
}
|
|
chan->open = 1;
|
|
chan->closing = NULL;
|
|
return chan;
|
|
}
|
|
|
|
static void
|
|
_channel_free(_channel_state *chan)
|
|
{
|
|
_channel_clear_closing(chan);
|
|
PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
|
|
_channelqueue_free(chan->queue);
|
|
_channelends_free(chan->ends);
|
|
PyThread_release_lock(chan->mutex);
|
|
|
|
PyThread_free_lock(chan->mutex);
|
|
GLOBAL_FREE(chan);
|
|
}
|
|
|
|
static int
|
|
_channel_add(_channel_state *chan, int64_t interpid,
|
|
_PyCrossInterpreterData *data, _waiting_t *waiting)
|
|
{
|
|
int res = -1;
|
|
PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
|
|
|
|
if (!chan->open) {
|
|
res = ERR_CHANNEL_CLOSED;
|
|
goto done;
|
|
}
|
|
if (_channelends_associate(chan->ends, interpid, 1) != 0) {
|
|
res = ERR_CHANNEL_INTERP_CLOSED;
|
|
goto done;
|
|
}
|
|
|
|
if (_channelqueue_put(chan->queue, data, waiting) != 0) {
|
|
goto done;
|
|
}
|
|
// Any errors past this point must cause a _waiting_release() call.
|
|
|
|
res = 0;
|
|
done:
|
|
PyThread_release_lock(chan->mutex);
|
|
return res;
|
|
}
|
|
|
|
static int
|
|
_channel_next(_channel_state *chan, int64_t interpid,
|
|
_PyCrossInterpreterData **p_data, _waiting_t **p_waiting)
|
|
{
|
|
int err = 0;
|
|
PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
|
|
|
|
if (!chan->open) {
|
|
err = ERR_CHANNEL_CLOSED;
|
|
goto done;
|
|
}
|
|
if (_channelends_associate(chan->ends, interpid, 0) != 0) {
|
|
err = ERR_CHANNEL_INTERP_CLOSED;
|
|
goto done;
|
|
}
|
|
|
|
int empty = _channelqueue_get(chan->queue, p_data, p_waiting);
|
|
assert(empty == 0 || empty == ERR_CHANNEL_EMPTY);
|
|
assert(!PyErr_Occurred());
|
|
if (empty && chan->closing != NULL) {
|
|
chan->open = 0;
|
|
}
|
|
|
|
done:
|
|
PyThread_release_lock(chan->mutex);
|
|
if (chan->queue->count == 0) {
|
|
_channel_finish_closing(chan);
|
|
}
|
|
return err;
|
|
}
|
|
|
|
static void
|
|
_channel_remove(_channel_state *chan, _channelitem_id_t itemid)
|
|
{
|
|
_PyCrossInterpreterData *data = NULL;
|
|
_waiting_t *waiting = NULL;
|
|
|
|
PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
|
|
_channelqueue_remove(chan->queue, itemid, &data, &waiting);
|
|
PyThread_release_lock(chan->mutex);
|
|
|
|
(void)_release_xid_data(data, XID_IGNORE_EXC | XID_FREE);
|
|
if (waiting != NULL) {
|
|
_waiting_release(waiting, 0);
|
|
}
|
|
|
|
if (chan->queue->count == 0) {
|
|
_channel_finish_closing(chan);
|
|
}
|
|
}
|
|
|
|
static int
|
|
_channel_release_interpreter(_channel_state *chan, int64_t interpid, int end)
|
|
{
|
|
PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
|
|
|
|
int res = -1;
|
|
if (!chan->open) {
|
|
res = ERR_CHANNEL_CLOSED;
|
|
goto done;
|
|
}
|
|
|
|
if (_channelends_release_interpreter(chan->ends, interpid, end) != 0) {
|
|
goto done;
|
|
}
|
|
chan->open = _channelends_is_open(chan->ends);
|
|
// XXX Clear the queue if not empty?
|
|
// XXX Activate the "closing" mechanism?
|
|
|
|
res = 0;
|
|
done:
|
|
PyThread_release_lock(chan->mutex);
|
|
return res;
|
|
}
|
|
|
|
static int
|
|
_channel_release_all(_channel_state *chan, int end, int force)
|
|
{
|
|
int res = -1;
|
|
PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
|
|
|
|
if (!chan->open) {
|
|
res = ERR_CHANNEL_CLOSED;
|
|
goto done;
|
|
}
|
|
|
|
if (!force && chan->queue->count > 0) {
|
|
res = ERR_CHANNEL_NOT_EMPTY;
|
|
goto done;
|
|
}
|
|
// XXX Clear the queue?
|
|
|
|
chan->open = 0;
|
|
|
|
// We *could* also just leave these in place, since we've marked
|
|
// the channel as closed already.
|
|
_channelends_release_all(chan->ends, end, force);
|
|
|
|
res = 0;
|
|
done:
|
|
PyThread_release_lock(chan->mutex);
|
|
return res;
|
|
}
|
|
|
|
static void
|
|
_channel_clear_interpreter(_channel_state *chan, int64_t interpid)
|
|
{
|
|
PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
|
|
|
|
_channelqueue_clear_interpreter(chan->queue, interpid);
|
|
_channelends_clear_interpreter(chan->ends, interpid);
|
|
chan->open = _channelends_is_open(chan->ends);
|
|
|
|
PyThread_release_lock(chan->mutex);
|
|
}
|
|
|
|
|
|
/* the set of channels */
|
|
|
|
struct _channelref;
|
|
|
|
typedef struct _channelref {
|
|
int64_t cid;
|
|
_channel_state *chan;
|
|
struct _channelref *next;
|
|
// The number of ChannelID objects referring to this channel.
|
|
Py_ssize_t objcount;
|
|
} _channelref;
|
|
|
|
static _channelref *
|
|
_channelref_new(int64_t cid, _channel_state *chan)
|
|
{
|
|
_channelref *ref = GLOBAL_MALLOC(_channelref);
|
|
if (ref == NULL) {
|
|
return NULL;
|
|
}
|
|
ref->cid = cid;
|
|
ref->chan = chan;
|
|
ref->next = NULL;
|
|
ref->objcount = 0;
|
|
return ref;
|
|
}
|
|
|
|
//static void
|
|
//_channelref_clear(_channelref *ref)
|
|
//{
|
|
// ref->cid = -1;
|
|
// ref->chan = NULL;
|
|
// ref->next = NULL;
|
|
// ref->objcount = 0;
|
|
//}
|
|
|
|
static void
|
|
_channelref_free(_channelref *ref)
|
|
{
|
|
if (ref->chan != NULL) {
|
|
_channel_clear_closing(ref->chan);
|
|
}
|
|
//_channelref_clear(ref);
|
|
GLOBAL_FREE(ref);
|
|
}
|
|
|
|
static _channelref *
|
|
_channelref_find(_channelref *first, int64_t cid, _channelref **pprev)
|
|
{
|
|
_channelref *prev = NULL;
|
|
_channelref *ref = first;
|
|
while (ref != NULL) {
|
|
if (ref->cid == cid) {
|
|
break;
|
|
}
|
|
prev = ref;
|
|
ref = ref->next;
|
|
}
|
|
if (pprev != NULL) {
|
|
*pprev = prev;
|
|
}
|
|
return ref;
|
|
}
|
|
|
|
|
|
typedef struct _channels {
|
|
PyThread_type_lock mutex;
|
|
_channelref *head;
|
|
int64_t numopen;
|
|
int64_t next_id;
|
|
} _channels;
|
|
|
|
static void
|
|
_channels_init(_channels *channels, PyThread_type_lock mutex)
|
|
{
|
|
channels->mutex = mutex;
|
|
channels->head = NULL;
|
|
channels->numopen = 0;
|
|
channels->next_id = 0;
|
|
}
|
|
|
|
static void
|
|
_channels_fini(_channels *channels)
|
|
{
|
|
assert(channels->numopen == 0);
|
|
assert(channels->head == NULL);
|
|
if (channels->mutex != NULL) {
|
|
PyThread_free_lock(channels->mutex);
|
|
channels->mutex = NULL;
|
|
}
|
|
}
|
|
|
|
static int64_t
|
|
_channels_next_id(_channels *channels) // needs lock
|
|
{
|
|
int64_t cid = channels->next_id;
|
|
if (cid < 0) {
|
|
/* overflow */
|
|
return -1;
|
|
}
|
|
channels->next_id += 1;
|
|
return cid;
|
|
}
|
|
|
|
static int
|
|
_channels_lookup(_channels *channels, int64_t cid, PyThread_type_lock *pmutex,
|
|
_channel_state **res)
|
|
{
|
|
int err = -1;
|
|
_channel_state *chan = NULL;
|
|
PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
|
|
if (pmutex != NULL) {
|
|
*pmutex = NULL;
|
|
}
|
|
|
|
_channelref *ref = _channelref_find(channels->head, cid, NULL);
|
|
if (ref == NULL) {
|
|
err = ERR_CHANNEL_NOT_FOUND;
|
|
goto done;
|
|
}
|
|
if (ref->chan == NULL || !ref->chan->open) {
|
|
err = ERR_CHANNEL_CLOSED;
|
|
goto done;
|
|
}
|
|
|
|
if (pmutex != NULL) {
|
|
// The mutex will be closed by the caller.
|
|
*pmutex = channels->mutex;
|
|
}
|
|
|
|
chan = ref->chan;
|
|
err = 0;
|
|
|
|
done:
|
|
if (pmutex == NULL || *pmutex == NULL) {
|
|
PyThread_release_lock(channels->mutex);
|
|
}
|
|
*res = chan;
|
|
return err;
|
|
}
|
|
|
|
static int64_t
|
|
_channels_add(_channels *channels, _channel_state *chan)
|
|
{
|
|
int64_t cid = -1;
|
|
PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
|
|
|
|
// Create a new ref.
|
|
int64_t _cid = _channels_next_id(channels);
|
|
if (_cid < 0) {
|
|
cid = ERR_NO_NEXT_CHANNEL_ID;
|
|
goto done;
|
|
}
|
|
_channelref *ref = _channelref_new(_cid, chan);
|
|
if (ref == NULL) {
|
|
goto done;
|
|
}
|
|
|
|
// Add it to the list.
|
|
// We assume that the channel is a new one (not already in the list).
|
|
ref->next = channels->head;
|
|
channels->head = ref;
|
|
channels->numopen += 1;
|
|
|
|
cid = _cid;
|
|
done:
|
|
PyThread_release_lock(channels->mutex);
|
|
return cid;
|
|
}
|
|
|
|
/* forward */
|
|
static int _channel_set_closing(_channelref *, PyThread_type_lock);
|
|
|
|
static int
|
|
_channels_close(_channels *channels, int64_t cid, _channel_state **pchan,
|
|
int end, int force)
|
|
{
|
|
int res = -1;
|
|
PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
|
|
if (pchan != NULL) {
|
|
*pchan = NULL;
|
|
}
|
|
|
|
_channelref *ref = _channelref_find(channels->head, cid, NULL);
|
|
if (ref == NULL) {
|
|
res = ERR_CHANNEL_NOT_FOUND;
|
|
goto done;
|
|
}
|
|
|
|
if (ref->chan == NULL) {
|
|
res = ERR_CHANNEL_CLOSED;
|
|
goto done;
|
|
}
|
|
else if (!force && end == CHANNEL_SEND && ref->chan->closing != NULL) {
|
|
res = ERR_CHANNEL_CLOSED;
|
|
goto done;
|
|
}
|
|
else {
|
|
int err = _channel_release_all(ref->chan, end, force);
|
|
if (err != 0) {
|
|
if (end == CHANNEL_SEND && err == ERR_CHANNEL_NOT_EMPTY) {
|
|
if (ref->chan->closing != NULL) {
|
|
res = ERR_CHANNEL_CLOSED;
|
|
goto done;
|
|
}
|
|
// Mark the channel as closing and return. The channel
|
|
// will be cleaned up in _channel_next().
|
|
PyErr_Clear();
|
|
int err = _channel_set_closing(ref, channels->mutex);
|
|
if (err != 0) {
|
|
res = err;
|
|
goto done;
|
|
}
|
|
if (pchan != NULL) {
|
|
*pchan = ref->chan;
|
|
}
|
|
res = 0;
|
|
}
|
|
else {
|
|
res = err;
|
|
}
|
|
goto done;
|
|
}
|
|
if (pchan != NULL) {
|
|
*pchan = ref->chan;
|
|
}
|
|
else {
|
|
_channel_free(ref->chan);
|
|
}
|
|
ref->chan = NULL;
|
|
}
|
|
|
|
res = 0;
|
|
done:
|
|
PyThread_release_lock(channels->mutex);
|
|
return res;
|
|
}
|
|
|
|
static void
|
|
_channels_remove_ref(_channels *channels, _channelref *ref, _channelref *prev,
|
|
_channel_state **pchan)
|
|
{
|
|
if (ref == channels->head) {
|
|
channels->head = ref->next;
|
|
}
|
|
else {
|
|
prev->next = ref->next;
|
|
}
|
|
channels->numopen -= 1;
|
|
|
|
if (pchan != NULL) {
|
|
*pchan = ref->chan;
|
|
}
|
|
_channelref_free(ref);
|
|
}
|
|
|
|
static int
|
|
_channels_remove(_channels *channels, int64_t cid, _channel_state **pchan)
|
|
{
|
|
int res = -1;
|
|
PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
|
|
|
|
if (pchan != NULL) {
|
|
*pchan = NULL;
|
|
}
|
|
|
|
_channelref *prev = NULL;
|
|
_channelref *ref = _channelref_find(channels->head, cid, &prev);
|
|
if (ref == NULL) {
|
|
res = ERR_CHANNEL_NOT_FOUND;
|
|
goto done;
|
|
}
|
|
|
|
_channels_remove_ref(channels, ref, prev, pchan);
|
|
|
|
res = 0;
|
|
done:
|
|
PyThread_release_lock(channels->mutex);
|
|
return res;
|
|
}
|
|
|
|
static int
|
|
_channels_add_id_object(_channels *channels, int64_t cid)
|
|
{
|
|
int res = -1;
|
|
PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
|
|
|
|
_channelref *ref = _channelref_find(channels->head, cid, NULL);
|
|
if (ref == NULL) {
|
|
res = ERR_CHANNEL_NOT_FOUND;
|
|
goto done;
|
|
}
|
|
ref->objcount += 1;
|
|
|
|
res = 0;
|
|
done:
|
|
PyThread_release_lock(channels->mutex);
|
|
return res;
|
|
}
|
|
|
|
static void
|
|
_channels_release_cid_object(_channels *channels, int64_t cid)
|
|
{
|
|
PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
|
|
|
|
_channelref *prev = NULL;
|
|
_channelref *ref = _channelref_find(channels->head, cid, &prev);
|
|
if (ref == NULL) {
|
|
// Already destroyed.
|
|
goto done;
|
|
}
|
|
ref->objcount -= 1;
|
|
|
|
// Destroy if no longer used.
|
|
if (ref->objcount == 0) {
|
|
_channel_state *chan = NULL;
|
|
_channels_remove_ref(channels, ref, prev, &chan);
|
|
if (chan != NULL) {
|
|
_channel_free(chan);
|
|
}
|
|
}
|
|
|
|
done:
|
|
PyThread_release_lock(channels->mutex);
|
|
}
|
|
|
|
static int64_t *
|
|
_channels_list_all(_channels *channels, int64_t *count)
|
|
{
|
|
int64_t *cids = NULL;
|
|
PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
|
|
int64_t *ids = PyMem_NEW(int64_t, (Py_ssize_t)(channels->numopen));
|
|
if (ids == NULL) {
|
|
goto done;
|
|
}
|
|
_channelref *ref = channels->head;
|
|
for (int64_t i=0; ref != NULL; ref = ref->next, i++) {
|
|
ids[i] = ref->cid;
|
|
}
|
|
*count = channels->numopen;
|
|
|
|
cids = ids;
|
|
done:
|
|
PyThread_release_lock(channels->mutex);
|
|
return cids;
|
|
}
|
|
|
|
static void
|
|
_channels_clear_interpreter(_channels *channels, int64_t interpid)
|
|
{
|
|
PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
|
|
|
|
_channelref *ref = channels->head;
|
|
for (; ref != NULL; ref = ref->next) {
|
|
if (ref->chan != NULL) {
|
|
_channel_clear_interpreter(ref->chan, interpid);
|
|
}
|
|
}
|
|
|
|
PyThread_release_lock(channels->mutex);
|
|
}
|
|
|
|
|
|
/* support for closing non-empty channels */
|
|
|
|
struct _channel_closing {
|
|
_channelref *ref;
|
|
};
|
|
|
|
static int
|
|
_channel_set_closing(_channelref *ref, PyThread_type_lock mutex) {
|
|
_channel_state *chan = ref->chan;
|
|
if (chan == NULL) {
|
|
// already closed
|
|
return 0;
|
|
}
|
|
int res = -1;
|
|
PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
|
|
if (chan->closing != NULL) {
|
|
res = ERR_CHANNEL_CLOSED;
|
|
goto done;
|
|
}
|
|
chan->closing = GLOBAL_MALLOC(struct _channel_closing);
|
|
if (chan->closing == NULL) {
|
|
goto done;
|
|
}
|
|
chan->closing->ref = ref;
|
|
|
|
res = 0;
|
|
done:
|
|
PyThread_release_lock(chan->mutex);
|
|
return res;
|
|
}
|
|
|
|
static void
|
|
_channel_clear_closing(_channel_state *chan) {
|
|
PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
|
|
if (chan->closing != NULL) {
|
|
GLOBAL_FREE(chan->closing);
|
|
chan->closing = NULL;
|
|
}
|
|
PyThread_release_lock(chan->mutex);
|
|
}
|
|
|
|
static void
|
|
_channel_finish_closing(_channel_state *chan) {
|
|
struct _channel_closing *closing = chan->closing;
|
|
if (closing == NULL) {
|
|
return;
|
|
}
|
|
_channelref *ref = closing->ref;
|
|
_channel_clear_closing(chan);
|
|
// Do the things that would have been done in _channels_close().
|
|
ref->chan = NULL;
|
|
_channel_free(chan);
|
|
}
|
|
|
|
|
|
/* "high"-level channel-related functions */
|
|
|
|
// Create a new channel.
|
|
static int64_t
|
|
channel_create(_channels *channels)
|
|
{
|
|
PyThread_type_lock mutex = PyThread_allocate_lock();
|
|
if (mutex == NULL) {
|
|
return ERR_CHANNEL_MUTEX_INIT;
|
|
}
|
|
_channel_state *chan = _channel_new(mutex);
|
|
if (chan == NULL) {
|
|
PyThread_free_lock(mutex);
|
|
return -1;
|
|
}
|
|
int64_t cid = _channels_add(channels, chan);
|
|
if (cid < 0) {
|
|
_channel_free(chan);
|
|
}
|
|
return cid;
|
|
}
|
|
|
|
// Completely destroy the channel.
|
|
static int
|
|
channel_destroy(_channels *channels, int64_t cid)
|
|
{
|
|
_channel_state *chan = NULL;
|
|
int err = _channels_remove(channels, cid, &chan);
|
|
if (err != 0) {
|
|
return err;
|
|
}
|
|
if (chan != NULL) {
|
|
_channel_free(chan);
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
// Push an object onto the channel.
|
|
// The current interpreter gets associated with the send end of the channel.
|
|
// Optionally request to be notified when it is received.
|
|
static int
|
|
channel_send(_channels *channels, int64_t cid, PyObject *obj,
|
|
_waiting_t *waiting)
|
|
{
|
|
PyInterpreterState *interp = _get_current_interp();
|
|
if (interp == NULL) {
|
|
return -1;
|
|
}
|
|
int64_t interpid = PyInterpreterState_GetID(interp);
|
|
|
|
// Look up the channel.
|
|
PyThread_type_lock mutex = NULL;
|
|
_channel_state *chan = NULL;
|
|
int err = _channels_lookup(channels, cid, &mutex, &chan);
|
|
if (err != 0) {
|
|
return err;
|
|
}
|
|
assert(chan != NULL);
|
|
// Past this point we are responsible for releasing the mutex.
|
|
|
|
if (chan->closing != NULL) {
|
|
PyThread_release_lock(mutex);
|
|
return ERR_CHANNEL_CLOSED;
|
|
}
|
|
|
|
// Convert the object to cross-interpreter data.
|
|
_PyCrossInterpreterData *data = GLOBAL_MALLOC(_PyCrossInterpreterData);
|
|
if (data == NULL) {
|
|
PyThread_release_lock(mutex);
|
|
return -1;
|
|
}
|
|
if (_PyObject_GetCrossInterpreterData(obj, data) != 0) {
|
|
PyThread_release_lock(mutex);
|
|
GLOBAL_FREE(data);
|
|
return -1;
|
|
}
|
|
|
|
// Add the data to the channel.
|
|
int res = _channel_add(chan, interpid, data, waiting);
|
|
PyThread_release_lock(mutex);
|
|
if (res != 0) {
|
|
// We may chain an exception here:
|
|
(void)_release_xid_data(data, 0);
|
|
GLOBAL_FREE(data);
|
|
return res;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
// Basically, un-send an object.
|
|
static void
|
|
channel_clear_sent(_channels *channels, int64_t cid, _waiting_t *waiting)
|
|
{
|
|
// Look up the channel.
|
|
PyThread_type_lock mutex = NULL;
|
|
_channel_state *chan = NULL;
|
|
int err = _channels_lookup(channels, cid, &mutex, &chan);
|
|
if (err != 0) {
|
|
// The channel was already closed, etc.
|
|
assert(waiting->status == WAITING_RELEASED);
|
|
return; // Ignore the error.
|
|
}
|
|
assert(chan != NULL);
|
|
// Past this point we are responsible for releasing the mutex.
|
|
|
|
_channelitem_id_t itemid = _waiting_get_itemid(waiting);
|
|
_channel_remove(chan, itemid);
|
|
|
|
PyThread_release_lock(mutex);
|
|
}
|
|
|
|
// Like channel_send(), but strictly wait for the object to be received.
|
|
static int
|
|
channel_send_wait(_channels *channels, int64_t cid, PyObject *obj,
|
|
PY_TIMEOUT_T timeout)
|
|
{
|
|
// We use a stack variable here, so we must ensure that &waiting
|
|
// is not held by any channel item at the point this function exits.
|
|
_waiting_t waiting;
|
|
if (_waiting_init(&waiting) < 0) {
|
|
assert(PyErr_Occurred());
|
|
return -1;
|
|
}
|
|
|
|
/* Queue up the object. */
|
|
int res = channel_send(channels, cid, obj, &waiting);
|
|
if (res < 0) {
|
|
assert(waiting.status == WAITING_NO_STATUS);
|
|
goto finally;
|
|
}
|
|
|
|
/* Wait until the object is received. */
|
|
if (wait_for_lock(waiting.mutex, timeout) < 0) {
|
|
assert(PyErr_Occurred());
|
|
_waiting_finish_releasing(&waiting);
|
|
/* The send() call is failing now, so make sure the item
|
|
won't be received. */
|
|
channel_clear_sent(channels, cid, &waiting);
|
|
assert(waiting.status == WAITING_RELEASED);
|
|
if (!waiting.received) {
|
|
res = -1;
|
|
goto finally;
|
|
}
|
|
// XXX Emit a warning if not a TimeoutError?
|
|
PyErr_Clear();
|
|
}
|
|
else {
|
|
_waiting_finish_releasing(&waiting);
|
|
assert(waiting.status == WAITING_RELEASED);
|
|
if (!waiting.received) {
|
|
res = ERR_CHANNEL_CLOSED_WAITING;
|
|
goto finally;
|
|
}
|
|
}
|
|
|
|
/* success! */
|
|
res = 0;
|
|
|
|
finally:
|
|
_waiting_clear(&waiting);
|
|
return res;
|
|
}
|
|
|
|
// Pop the next object off the channel. Fail if empty.
|
|
// The current interpreter gets associated with the recv end of the channel.
|
|
// XXX Support a "wait" mutex?
|
|
static int
|
|
channel_recv(_channels *channels, int64_t cid, PyObject **res)
|
|
{
|
|
int err;
|
|
*res = NULL;
|
|
|
|
PyInterpreterState *interp = _get_current_interp();
|
|
if (interp == NULL) {
|
|
// XXX Is this always an error?
|
|
if (PyErr_Occurred()) {
|
|
return -1;
|
|
}
|
|
return 0;
|
|
}
|
|
int64_t interpid = PyInterpreterState_GetID(interp);
|
|
|
|
// Look up the channel.
|
|
PyThread_type_lock mutex = NULL;
|
|
_channel_state *chan = NULL;
|
|
err = _channels_lookup(channels, cid, &mutex, &chan);
|
|
if (err != 0) {
|
|
return err;
|
|
}
|
|
assert(chan != NULL);
|
|
// Past this point we are responsible for releasing the mutex.
|
|
|
|
// Pop off the next item from the channel.
|
|
_PyCrossInterpreterData *data = NULL;
|
|
_waiting_t *waiting = NULL;
|
|
err = _channel_next(chan, interpid, &data, &waiting);
|
|
PyThread_release_lock(mutex);
|
|
if (err != 0) {
|
|
return err;
|
|
}
|
|
else if (data == NULL) {
|
|
assert(!PyErr_Occurred());
|
|
return 0;
|
|
}
|
|
|
|
// Convert the data back to an object.
|
|
PyObject *obj = _PyCrossInterpreterData_NewObject(data);
|
|
if (obj == NULL) {
|
|
assert(PyErr_Occurred());
|
|
// It was allocated in channel_send(), so we free it.
|
|
(void)_release_xid_data(data, XID_IGNORE_EXC | XID_FREE);
|
|
if (waiting != NULL) {
|
|
_waiting_release(waiting, 0);
|
|
}
|
|
return -1;
|
|
}
|
|
// It was allocated in channel_send(), so we free it.
|
|
int release_res = _release_xid_data(data, XID_FREE);
|
|
if (release_res < 0) {
|
|
// The source interpreter has been destroyed already.
|
|
assert(PyErr_Occurred());
|
|
Py_DECREF(obj);
|
|
if (waiting != NULL) {
|
|
_waiting_release(waiting, 0);
|
|
}
|
|
return -1;
|
|
}
|
|
|
|
// Notify the sender.
|
|
if (waiting != NULL) {
|
|
_waiting_release(waiting, 1);
|
|
}
|
|
|
|
*res = obj;
|
|
return 0;
|
|
}
|
|
|
|
// Disallow send/recv for the current interpreter.
|
|
// The channel is marked as closed if no other interpreters
|
|
// are currently associated.
|
|
static int
|
|
channel_release(_channels *channels, int64_t cid, int send, int recv)
|
|
{
|
|
PyInterpreterState *interp = _get_current_interp();
|
|
if (interp == NULL) {
|
|
return -1;
|
|
}
|
|
int64_t interpid = PyInterpreterState_GetID(interp);
|
|
|
|
// Look up the channel.
|
|
PyThread_type_lock mutex = NULL;
|
|
_channel_state *chan = NULL;
|
|
int err = _channels_lookup(channels, cid, &mutex, &chan);
|
|
if (err != 0) {
|
|
return err;
|
|
}
|
|
// Past this point we are responsible for releasing the mutex.
|
|
|
|
// Close one or both of the two ends.
|
|
int res = _channel_release_interpreter(chan, interpid, send-recv);
|
|
PyThread_release_lock(mutex);
|
|
return res;
|
|
}
|
|
|
|
// Close the channel (for all interpreters). Fail if it's already closed.
|
|
// Close immediately if it's empty. Otherwise, disallow sending and
|
|
// finally close once empty. Optionally, immediately clear and close it.
|
|
static int
|
|
channel_close(_channels *channels, int64_t cid, int end, int force)
|
|
{
|
|
return _channels_close(channels, cid, NULL, end, force);
|
|
}
|
|
|
|
// Return true if the identified interpreter is associated
|
|
// with the given end of the channel.
|
|
static int
|
|
channel_is_associated(_channels *channels, int64_t cid, int64_t interpid,
|
|
int send)
|
|
{
|
|
_channel_state *chan = NULL;
|
|
int err = _channels_lookup(channels, cid, NULL, &chan);
|
|
if (err != 0) {
|
|
return err;
|
|
}
|
|
else if (send && chan->closing != NULL) {
|
|
return ERR_CHANNEL_CLOSED;
|
|
}
|
|
|
|
_channelend *end = _channelend_find(send ? chan->ends->send : chan->ends->recv,
|
|
interpid, NULL);
|
|
|
|
return (end != NULL && end->open);
|
|
}
|
|
|
|
|
|
/* channel info */
|
|
|
|
struct channel_info {
|
|
struct {
|
|
// 1: closed; -1: closing
|
|
int closed;
|
|
struct {
|
|
Py_ssize_t nsend_only; // not released
|
|
Py_ssize_t nsend_only_released;
|
|
Py_ssize_t nrecv_only; // not released
|
|
Py_ssize_t nrecv_only_released;
|
|
Py_ssize_t nboth; // not released
|
|
Py_ssize_t nboth_released;
|
|
Py_ssize_t nboth_send_released;
|
|
Py_ssize_t nboth_recv_released;
|
|
} all;
|
|
struct {
|
|
// 1: associated; -1: released
|
|
int send;
|
|
int recv;
|
|
} cur;
|
|
} status;
|
|
Py_ssize_t count;
|
|
};
|
|
|
|
static int
|
|
_channel_get_info(_channels *channels, int64_t cid, struct channel_info *info)
|
|
{
|
|
int err = 0;
|
|
*info = (struct channel_info){0};
|
|
|
|
// Get the current interpreter.
|
|
PyInterpreterState *interp = _get_current_interp();
|
|
if (interp == NULL) {
|
|
return -1;
|
|
}
|
|
Py_ssize_t interpid = PyInterpreterState_GetID(interp);
|
|
|
|
// Hold the global lock until we're done.
|
|
PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
|
|
|
|
// Find the channel.
|
|
_channelref *ref = _channelref_find(channels->head, cid, NULL);
|
|
if (ref == NULL) {
|
|
err = ERR_CHANNEL_NOT_FOUND;
|
|
goto finally;
|
|
}
|
|
_channel_state *chan = ref->chan;
|
|
|
|
// Check if open.
|
|
if (chan == NULL) {
|
|
info->status.closed = 1;
|
|
goto finally;
|
|
}
|
|
if (!chan->open) {
|
|
assert(chan->queue->count == 0);
|
|
info->status.closed = 1;
|
|
goto finally;
|
|
}
|
|
if (chan->closing != NULL) {
|
|
assert(chan->queue->count > 0);
|
|
info->status.closed = -1;
|
|
}
|
|
else {
|
|
info->status.closed = 0;
|
|
}
|
|
|
|
// Get the number of queued objects.
|
|
info->count = chan->queue->count;
|
|
|
|
// Get the ends statuses.
|
|
assert(info->status.cur.send == 0);
|
|
assert(info->status.cur.recv == 0);
|
|
_channelend *send = chan->ends->send;
|
|
while (send != NULL) {
|
|
if (send->interpid == interpid) {
|
|
info->status.cur.send = send->open ? 1 : -1;
|
|
}
|
|
|
|
if (send->open) {
|
|
info->status.all.nsend_only += 1;
|
|
}
|
|
else {
|
|
info->status.all.nsend_only_released += 1;
|
|
}
|
|
send = send->next;
|
|
}
|
|
_channelend *recv = chan->ends->recv;
|
|
while (recv != NULL) {
|
|
if (recv->interpid == interpid) {
|
|
info->status.cur.recv = recv->open ? 1 : -1;
|
|
}
|
|
|
|
// XXX This is O(n*n). Why do we have 2 linked lists?
|
|
_channelend *send = chan->ends->send;
|
|
while (send != NULL) {
|
|
if (send->interpid == recv->interpid) {
|
|
break;
|
|
}
|
|
send = send->next;
|
|
}
|
|
if (send == NULL) {
|
|
if (recv->open) {
|
|
info->status.all.nrecv_only += 1;
|
|
}
|
|
else {
|
|
info->status.all.nrecv_only_released += 1;
|
|
}
|
|
}
|
|
else {
|
|
if (recv->open) {
|
|
if (send->open) {
|
|
info->status.all.nboth += 1;
|
|
info->status.all.nsend_only -= 1;
|
|
}
|
|
else {
|
|
info->status.all.nboth_recv_released += 1;
|
|
info->status.all.nsend_only_released -= 1;
|
|
}
|
|
}
|
|
else {
|
|
if (send->open) {
|
|
info->status.all.nboth_send_released += 1;
|
|
info->status.all.nsend_only -= 1;
|
|
}
|
|
else {
|
|
info->status.all.nboth_released += 1;
|
|
info->status.all.nsend_only_released -= 1;
|
|
}
|
|
}
|
|
}
|
|
recv = recv->next;
|
|
}
|
|
|
|
finally:
|
|
PyThread_release_lock(channels->mutex);
|
|
return err;
|
|
}
|
|
|
|
PyDoc_STRVAR(channel_info_doc,
|
|
"ChannelInfo\n\
|
|
\n\
|
|
A named tuple of a channel's state.");
|
|
|
|
static PyStructSequence_Field channel_info_fields[] = {
|
|
{"open", "both ends are open"},
|
|
{"closing", "send is closed, recv is non-empty"},
|
|
{"closed", "both ends are closed"},
|
|
{"count", "queued objects"},
|
|
|
|
{"num_interp_send", "interpreters bound to the send end"},
|
|
{"num_interp_send_released",
|
|
"interpreters bound to the send end and released"},
|
|
|
|
{"num_interp_recv", "interpreters bound to the send end"},
|
|
{"num_interp_recv_released",
|
|
"interpreters bound to the send end and released"},
|
|
|
|
{"num_interp_both", "interpreters bound to both ends"},
|
|
{"num_interp_both_released",
|
|
"interpreters bound to both ends and released_from_both"},
|
|
{"num_interp_both_send_released",
|
|
"interpreters bound to both ends and released_from_the send end"},
|
|
{"num_interp_both_recv_released",
|
|
"interpreters bound to both ends and released_from_the recv end"},
|
|
|
|
{"send_associated", "current interpreter is bound to the send end"},
|
|
{"send_released", "current interpreter *was* bound to the send end"},
|
|
{"recv_associated", "current interpreter is bound to the recv end"},
|
|
{"recv_released", "current interpreter *was* bound to the recv end"},
|
|
{0}
|
|
};
|
|
|
|
static PyStructSequence_Desc channel_info_desc = {
|
|
.name = MODULE_NAME_STR ".ChannelInfo",
|
|
.doc = channel_info_doc,
|
|
.fields = channel_info_fields,
|
|
.n_in_sequence = 8,
|
|
};
|
|
|
|
static PyObject *
|
|
new_channel_info(PyObject *mod, struct channel_info *info)
|
|
{
|
|
module_state *state = get_module_state(mod);
|
|
if (state == NULL) {
|
|
return NULL;
|
|
}
|
|
|
|
assert(state->ChannelInfoType != NULL);
|
|
PyObject *self = PyStructSequence_New(state->ChannelInfoType);
|
|
if (self == NULL) {
|
|
return NULL;
|
|
}
|
|
|
|
int pos = 0;
|
|
#define SET_BOOL(val) \
|
|
PyStructSequence_SET_ITEM(self, pos++, \
|
|
Py_NewRef(val ? Py_True : Py_False))
|
|
#define SET_COUNT(val) \
|
|
do { \
|
|
PyObject *obj = PyLong_FromLongLong(val); \
|
|
if (obj == NULL) { \
|
|
Py_CLEAR(self); \
|
|
return NULL; \
|
|
} \
|
|
PyStructSequence_SET_ITEM(self, pos++, obj); \
|
|
} while(0)
|
|
SET_BOOL(info->status.closed == 0);
|
|
SET_BOOL(info->status.closed == -1);
|
|
SET_BOOL(info->status.closed == 1);
|
|
SET_COUNT(info->count);
|
|
SET_COUNT(info->status.all.nsend_only);
|
|
SET_COUNT(info->status.all.nsend_only_released);
|
|
SET_COUNT(info->status.all.nrecv_only);
|
|
SET_COUNT(info->status.all.nrecv_only_released);
|
|
SET_COUNT(info->status.all.nboth);
|
|
SET_COUNT(info->status.all.nboth_released);
|
|
SET_COUNT(info->status.all.nboth_send_released);
|
|
SET_COUNT(info->status.all.nboth_recv_released);
|
|
SET_BOOL(info->status.cur.send == 1);
|
|
SET_BOOL(info->status.cur.send == -1);
|
|
SET_BOOL(info->status.cur.recv == 1);
|
|
SET_BOOL(info->status.cur.recv == -1);
|
|
#undef SET_COUNT
|
|
#undef SET_BOOL
|
|
assert(!PyErr_Occurred());
|
|
return self;
|
|
}
|
|
|
|
|
|
/* ChannelID class */
|
|
|
|
typedef struct channelid {
|
|
PyObject_HEAD
|
|
int64_t cid;
|
|
int end;
|
|
int resolve;
|
|
_channels *channels;
|
|
} channelid;
|
|
|
|
struct channel_id_converter_data {
|
|
PyObject *module;
|
|
int64_t cid;
|
|
int end;
|
|
};
|
|
|
|
static int
|
|
channel_id_converter(PyObject *arg, void *ptr)
|
|
{
|
|
int64_t cid;
|
|
int end = 0;
|
|
struct channel_id_converter_data *data = ptr;
|
|
module_state *state = get_module_state(data->module);
|
|
assert(state != NULL);
|
|
if (PyObject_TypeCheck(arg, state->ChannelIDType)) {
|
|
cid = ((channelid *)arg)->cid;
|
|
end = ((channelid *)arg)->end;
|
|
}
|
|
else if (PyIndex_Check(arg)) {
|
|
cid = PyLong_AsLongLong(arg);
|
|
if (cid == -1 && PyErr_Occurred()) {
|
|
return 0;
|
|
}
|
|
if (cid < 0) {
|
|
PyErr_Format(PyExc_ValueError,
|
|
"channel ID must be a non-negative int, got %R", arg);
|
|
return 0;
|
|
}
|
|
}
|
|
else {
|
|
PyErr_Format(PyExc_TypeError,
|
|
"channel ID must be an int, got %.100s",
|
|
Py_TYPE(arg)->tp_name);
|
|
return 0;
|
|
}
|
|
data->cid = cid;
|
|
data->end = end;
|
|
return 1;
|
|
}
|
|
|
|
static int
|
|
newchannelid(PyTypeObject *cls, int64_t cid, int end, _channels *channels,
|
|
int force, int resolve, channelid **res)
|
|
{
|
|
*res = NULL;
|
|
|
|
channelid *self = PyObject_New(channelid, cls);
|
|
if (self == NULL) {
|
|
return -1;
|
|
}
|
|
self->cid = cid;
|
|
self->end = end;
|
|
self->resolve = resolve;
|
|
self->channels = channels;
|
|
|
|
int err = _channels_add_id_object(channels, cid);
|
|
if (err != 0) {
|
|
if (force && err == ERR_CHANNEL_NOT_FOUND) {
|
|
assert(!PyErr_Occurred());
|
|
}
|
|
else {
|
|
Py_DECREF((PyObject *)self);
|
|
return err;
|
|
}
|
|
}
|
|
|
|
*res = self;
|
|
return 0;
|
|
}
|
|
|
|
static _channels * _global_channels(void);
|
|
|
|
static PyObject *
|
|
_channelid_new(PyObject *mod, PyTypeObject *cls,
|
|
PyObject *args, PyObject *kwds)
|
|
{
|
|
static char *kwlist[] = {"id", "send", "recv", "force", "_resolve", NULL};
|
|
int64_t cid;
|
|
int end;
|
|
struct channel_id_converter_data cid_data = {
|
|
.module = mod,
|
|
};
|
|
int send = -1;
|
|
int recv = -1;
|
|
int force = 0;
|
|
int resolve = 0;
|
|
if (!PyArg_ParseTupleAndKeywords(args, kwds,
|
|
"O&|$pppp:ChannelID.__new__", kwlist,
|
|
channel_id_converter, &cid_data,
|
|
&send, &recv, &force, &resolve)) {
|
|
return NULL;
|
|
}
|
|
cid = cid_data.cid;
|
|
end = cid_data.end;
|
|
|
|
// Handle "send" and "recv".
|
|
if (send == 0 && recv == 0) {
|
|
PyErr_SetString(PyExc_ValueError,
|
|
"'send' and 'recv' cannot both be False");
|
|
return NULL;
|
|
}
|
|
else if (send == 1) {
|
|
if (recv == 0 || recv == -1) {
|
|
end = CHANNEL_SEND;
|
|
}
|
|
else {
|
|
assert(recv == 1);
|
|
end = 0;
|
|
}
|
|
}
|
|
else if (recv == 1) {
|
|
assert(send == 0 || send == -1);
|
|
end = CHANNEL_RECV;
|
|
}
|
|
|
|
PyObject *cidobj = NULL;
|
|
int err = newchannelid(cls, cid, end, _global_channels(),
|
|
force, resolve,
|
|
(channelid **)&cidobj);
|
|
if (handle_channel_error(err, mod, cid)) {
|
|
assert(cidobj == NULL);
|
|
return NULL;
|
|
}
|
|
assert(cidobj != NULL);
|
|
return cidobj;
|
|
}
|
|
|
|
static void
|
|
channelid_dealloc(PyObject *self)
|
|
{
|
|
int64_t cid = ((channelid *)self)->cid;
|
|
_channels *channels = ((channelid *)self)->channels;
|
|
|
|
PyTypeObject *tp = Py_TYPE(self);
|
|
tp->tp_free(self);
|
|
/* "Instances of heap-allocated types hold a reference to their type."
|
|
* See: https://docs.python.org/3.11/howto/isolating-extensions.html#garbage-collection-protocol
|
|
* See: https://docs.python.org/3.11/c-api/typeobj.html#c.PyTypeObject.tp_traverse
|
|
*/
|
|
// XXX Why don't we implement Py_TPFLAGS_HAVE_GC, e.g. Py_tp_traverse,
|
|
// like we do for _abc._abc_data?
|
|
Py_DECREF(tp);
|
|
|
|
_channels_release_cid_object(channels, cid);
|
|
}
|
|
|
|
static PyObject *
|
|
channelid_repr(PyObject *self)
|
|
{
|
|
PyTypeObject *type = Py_TYPE(self);
|
|
const char *name = _PyType_Name(type);
|
|
|
|
channelid *cidobj = (channelid *)self;
|
|
const char *fmt;
|
|
if (cidobj->end == CHANNEL_SEND) {
|
|
fmt = "%s(%" PRId64 ", send=True)";
|
|
}
|
|
else if (cidobj->end == CHANNEL_RECV) {
|
|
fmt = "%s(%" PRId64 ", recv=True)";
|
|
}
|
|
else {
|
|
fmt = "%s(%" PRId64 ")";
|
|
}
|
|
return PyUnicode_FromFormat(fmt, name, cidobj->cid);
|
|
}
|
|
|
|
static PyObject *
|
|
channelid_str(PyObject *self)
|
|
{
|
|
channelid *cidobj = (channelid *)self;
|
|
return PyUnicode_FromFormat("%" PRId64 "", cidobj->cid);
|
|
}
|
|
|
|
static PyObject *
|
|
channelid_int(PyObject *self)
|
|
{
|
|
channelid *cidobj = (channelid *)self;
|
|
return PyLong_FromLongLong(cidobj->cid);
|
|
}
|
|
|
|
static Py_hash_t
|
|
channelid_hash(PyObject *self)
|
|
{
|
|
channelid *cidobj = (channelid *)self;
|
|
PyObject *pyid = PyLong_FromLongLong(cidobj->cid);
|
|
if (pyid == NULL) {
|
|
return -1;
|
|
}
|
|
Py_hash_t hash = PyObject_Hash(pyid);
|
|
Py_DECREF(pyid);
|
|
return hash;
|
|
}
|
|
|
|
static PyObject *
|
|
channelid_richcompare(PyObject *self, PyObject *other, int op)
|
|
{
|
|
PyObject *res = NULL;
|
|
if (op != Py_EQ && op != Py_NE) {
|
|
Py_RETURN_NOTIMPLEMENTED;
|
|
}
|
|
|
|
PyObject *mod = get_module_from_type(Py_TYPE(self));
|
|
if (mod == NULL) {
|
|
return NULL;
|
|
}
|
|
module_state *state = get_module_state(mod);
|
|
if (state == NULL) {
|
|
goto done;
|
|
}
|
|
|
|
if (!PyObject_TypeCheck(self, state->ChannelIDType)) {
|
|
res = Py_NewRef(Py_NotImplemented);
|
|
goto done;
|
|
}
|
|
|
|
channelid *cidobj = (channelid *)self;
|
|
int equal;
|
|
if (PyObject_TypeCheck(other, state->ChannelIDType)) {
|
|
channelid *othercidobj = (channelid *)other;
|
|
equal = (cidobj->end == othercidobj->end) && (cidobj->cid == othercidobj->cid);
|
|
}
|
|
else if (PyLong_Check(other)) {
|
|
/* Fast path */
|
|
int overflow;
|
|
long long othercid = PyLong_AsLongLongAndOverflow(other, &overflow);
|
|
if (othercid == -1 && PyErr_Occurred()) {
|
|
goto done;
|
|
}
|
|
equal = !overflow && (othercid >= 0) && (cidobj->cid == othercid);
|
|
}
|
|
else if (PyNumber_Check(other)) {
|
|
PyObject *pyid = PyLong_FromLongLong(cidobj->cid);
|
|
if (pyid == NULL) {
|
|
goto done;
|
|
}
|
|
res = PyObject_RichCompare(pyid, other, op);
|
|
Py_DECREF(pyid);
|
|
goto done;
|
|
}
|
|
else {
|
|
res = Py_NewRef(Py_NotImplemented);
|
|
goto done;
|
|
}
|
|
|
|
if ((op == Py_EQ && equal) || (op == Py_NE && !equal)) {
|
|
res = Py_NewRef(Py_True);
|
|
}
|
|
else {
|
|
res = Py_NewRef(Py_False);
|
|
}
|
|
|
|
done:
|
|
Py_DECREF(mod);
|
|
return res;
|
|
}
|
|
|
|
static PyTypeObject * _get_current_channelend_type(int end);
|
|
|
|
static PyObject *
|
|
_channelobj_from_cidobj(PyObject *cidobj, int end)
|
|
{
|
|
PyObject *cls = (PyObject *)_get_current_channelend_type(end);
|
|
if (cls == NULL) {
|
|
return NULL;
|
|
}
|
|
PyObject *chan = PyObject_CallFunctionObjArgs(cls, cidobj, NULL);
|
|
Py_DECREF(cls);
|
|
if (chan == NULL) {
|
|
return NULL;
|
|
}
|
|
return chan;
|
|
}
|
|
|
|
struct _channelid_xid {
|
|
int64_t cid;
|
|
int end;
|
|
int resolve;
|
|
};
|
|
|
|
static PyObject *
|
|
_channelid_from_xid(_PyCrossInterpreterData *data)
|
|
{
|
|
struct _channelid_xid *xid = \
|
|
(struct _channelid_xid *)_PyCrossInterpreterData_DATA(data);
|
|
|
|
// It might not be imported yet, so we can't use _get_current_module().
|
|
PyObject *mod = PyImport_ImportModule(MODULE_NAME_STR);
|
|
if (mod == NULL) {
|
|
return NULL;
|
|
}
|
|
assert(mod != Py_None);
|
|
module_state *state = get_module_state(mod);
|
|
if (state == NULL) {
|
|
return NULL;
|
|
}
|
|
|
|
// Note that we do not preserve the "resolve" flag.
|
|
PyObject *cidobj = NULL;
|
|
int err = newchannelid(state->ChannelIDType, xid->cid, xid->end,
|
|
_global_channels(), 0, 0,
|
|
(channelid **)&cidobj);
|
|
if (err != 0) {
|
|
assert(cidobj == NULL);
|
|
(void)handle_channel_error(err, mod, xid->cid);
|
|
goto done;
|
|
}
|
|
assert(cidobj != NULL);
|
|
if (xid->end == 0) {
|
|
goto done;
|
|
}
|
|
if (!xid->resolve) {
|
|
goto done;
|
|
}
|
|
|
|
/* Try returning a high-level channel end but fall back to the ID. */
|
|
PyObject *chan = _channelobj_from_cidobj(cidobj, xid->end);
|
|
if (chan == NULL) {
|
|
PyErr_Clear();
|
|
goto done;
|
|
}
|
|
Py_DECREF(cidobj);
|
|
cidobj = chan;
|
|
|
|
done:
|
|
Py_DECREF(mod);
|
|
return cidobj;
|
|
}
|
|
|
|
static int
|
|
_channelid_shared(PyThreadState *tstate, PyObject *obj,
|
|
_PyCrossInterpreterData *data)
|
|
{
|
|
if (_PyCrossInterpreterData_InitWithSize(
|
|
data, tstate->interp, sizeof(struct _channelid_xid), obj,
|
|
_channelid_from_xid
|
|
) < 0)
|
|
{
|
|
return -1;
|
|
}
|
|
struct _channelid_xid *xid = \
|
|
(struct _channelid_xid *)_PyCrossInterpreterData_DATA(data);
|
|
xid->cid = ((channelid *)obj)->cid;
|
|
xid->end = ((channelid *)obj)->end;
|
|
xid->resolve = ((channelid *)obj)->resolve;
|
|
return 0;
|
|
}
|
|
|
|
static PyObject *
|
|
channelid_end(PyObject *self, void *end)
|
|
{
|
|
int force = 1;
|
|
channelid *cidobj = (channelid *)self;
|
|
if (end != NULL) {
|
|
PyObject *obj = NULL;
|
|
int err = newchannelid(Py_TYPE(self), cidobj->cid, *(int *)end,
|
|
cidobj->channels, force, cidobj->resolve,
|
|
(channelid **)&obj);
|
|
if (err != 0) {
|
|
assert(obj == NULL);
|
|
PyObject *mod = get_module_from_type(Py_TYPE(self));
|
|
if (mod == NULL) {
|
|
return NULL;
|
|
}
|
|
(void)handle_channel_error(err, mod, cidobj->cid);
|
|
Py_DECREF(mod);
|
|
return NULL;
|
|
}
|
|
assert(obj != NULL);
|
|
return obj;
|
|
}
|
|
|
|
if (cidobj->end == CHANNEL_SEND) {
|
|
return PyUnicode_InternFromString("send");
|
|
}
|
|
if (cidobj->end == CHANNEL_RECV) {
|
|
return PyUnicode_InternFromString("recv");
|
|
}
|
|
return PyUnicode_InternFromString("both");
|
|
}
|
|
|
|
static int _channelid_end_send = CHANNEL_SEND;
|
|
static int _channelid_end_recv = CHANNEL_RECV;
|
|
|
|
static PyGetSetDef channelid_getsets[] = {
|
|
{"end", (getter)channelid_end, NULL,
|
|
PyDoc_STR("'send', 'recv', or 'both'")},
|
|
{"send", (getter)channelid_end, NULL,
|
|
PyDoc_STR("the 'send' end of the channel"), &_channelid_end_send},
|
|
{"recv", (getter)channelid_end, NULL,
|
|
PyDoc_STR("the 'recv' end of the channel"), &_channelid_end_recv},
|
|
{NULL}
|
|
};
|
|
|
|
PyDoc_STRVAR(channelid_doc,
|
|
"A channel ID identifies a channel and may be used as an int.");
|
|
|
|
static PyType_Slot channelid_typeslots[] = {
|
|
{Py_tp_dealloc, (destructor)channelid_dealloc},
|
|
{Py_tp_doc, (void *)channelid_doc},
|
|
{Py_tp_repr, (reprfunc)channelid_repr},
|
|
{Py_tp_str, (reprfunc)channelid_str},
|
|
{Py_tp_hash, channelid_hash},
|
|
{Py_tp_richcompare, channelid_richcompare},
|
|
{Py_tp_getset, channelid_getsets},
|
|
// number slots
|
|
{Py_nb_int, (unaryfunc)channelid_int},
|
|
{Py_nb_index, (unaryfunc)channelid_int},
|
|
{0, NULL},
|
|
};
|
|
|
|
static PyType_Spec channelid_typespec = {
|
|
.name = MODULE_NAME_STR ".ChannelID",
|
|
.basicsize = sizeof(channelid),
|
|
.flags = (Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE |
|
|
Py_TPFLAGS_DISALLOW_INSTANTIATION | Py_TPFLAGS_IMMUTABLETYPE),
|
|
.slots = channelid_typeslots,
|
|
};
|
|
|
|
static PyTypeObject *
|
|
add_channelid_type(PyObject *mod)
|
|
{
|
|
PyTypeObject *cls = (PyTypeObject *)PyType_FromModuleAndSpec(
|
|
mod, &channelid_typespec, NULL);
|
|
if (cls == NULL) {
|
|
return NULL;
|
|
}
|
|
if (PyModule_AddType(mod, cls) < 0) {
|
|
Py_DECREF(cls);
|
|
return NULL;
|
|
}
|
|
if (ensure_xid_class(cls, _channelid_shared) < 0) {
|
|
Py_DECREF(cls);
|
|
return NULL;
|
|
}
|
|
return cls;
|
|
}
|
|
|
|
|
|
/* SendChannel and RecvChannel classes */
|
|
|
|
// XXX Use a new __xid__ protocol instead?
|
|
|
|
static PyTypeObject *
|
|
_get_current_channelend_type(int end)
|
|
{
|
|
module_state *state = _get_current_module_state();
|
|
if (state == NULL) {
|
|
return NULL;
|
|
}
|
|
PyTypeObject *cls;
|
|
if (end == CHANNEL_SEND) {
|
|
cls = state->send_channel_type;
|
|
}
|
|
else {
|
|
assert(end == CHANNEL_RECV);
|
|
cls = state->recv_channel_type;
|
|
}
|
|
if (cls == NULL) {
|
|
// Force the module to be loaded, to register the type.
|
|
PyObject *highlevel = PyImport_ImportModule("interpreters.channel");
|
|
if (highlevel == NULL) {
|
|
PyErr_Clear();
|
|
highlevel = PyImport_ImportModule("test.support.interpreters.channel");
|
|
if (highlevel == NULL) {
|
|
return NULL;
|
|
}
|
|
}
|
|
Py_DECREF(highlevel);
|
|
if (end == CHANNEL_SEND) {
|
|
cls = state->send_channel_type;
|
|
}
|
|
else {
|
|
cls = state->recv_channel_type;
|
|
}
|
|
assert(cls != NULL);
|
|
}
|
|
return cls;
|
|
}
|
|
|
|
static PyObject *
|
|
_channelend_from_xid(_PyCrossInterpreterData *data)
|
|
{
|
|
channelid *cidobj = (channelid *)_channelid_from_xid(data);
|
|
if (cidobj == NULL) {
|
|
return NULL;
|
|
}
|
|
PyTypeObject *cls = _get_current_channelend_type(cidobj->end);
|
|
if (cls == NULL) {
|
|
Py_DECREF(cidobj);
|
|
return NULL;
|
|
}
|
|
PyObject *obj = PyObject_CallOneArg((PyObject *)cls, (PyObject *)cidobj);
|
|
Py_DECREF(cidobj);
|
|
return obj;
|
|
}
|
|
|
|
static int
|
|
_channelend_shared(PyThreadState *tstate, PyObject *obj,
|
|
_PyCrossInterpreterData *data)
|
|
{
|
|
PyObject *cidobj = PyObject_GetAttrString(obj, "_id");
|
|
if (cidobj == NULL) {
|
|
return -1;
|
|
}
|
|
int res = _channelid_shared(tstate, cidobj, data);
|
|
Py_DECREF(cidobj);
|
|
if (res < 0) {
|
|
return -1;
|
|
}
|
|
_PyCrossInterpreterData_SET_NEW_OBJECT(data, _channelend_from_xid);
|
|
return 0;
|
|
}
|
|
|
|
static int
|
|
set_channelend_types(PyObject *mod, PyTypeObject *send, PyTypeObject *recv)
|
|
{
|
|
module_state *state = get_module_state(mod);
|
|
if (state == NULL) {
|
|
return -1;
|
|
}
|
|
|
|
// Clear the old values if the .py module was reloaded.
|
|
if (state->send_channel_type != NULL) {
|
|
(void)clear_xid_class(state->send_channel_type);
|
|
Py_CLEAR(state->send_channel_type);
|
|
}
|
|
if (state->recv_channel_type != NULL) {
|
|
(void)clear_xid_class(state->recv_channel_type);
|
|
Py_CLEAR(state->recv_channel_type);
|
|
}
|
|
|
|
// Add and register the types.
|
|
state->send_channel_type = (PyTypeObject *)Py_NewRef(send);
|
|
state->recv_channel_type = (PyTypeObject *)Py_NewRef(recv);
|
|
if (ensure_xid_class(send, _channelend_shared) < 0) {
|
|
Py_CLEAR(state->send_channel_type);
|
|
Py_CLEAR(state->recv_channel_type);
|
|
return -1;
|
|
}
|
|
if (ensure_xid_class(recv, _channelend_shared) < 0) {
|
|
(void)clear_xid_class(state->send_channel_type);
|
|
Py_CLEAR(state->send_channel_type);
|
|
Py_CLEAR(state->recv_channel_type);
|
|
return -1;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
|
|
/* module level code ********************************************************/
|
|
|
|
/* globals is the process-global state for the module. It holds all
|
|
the data that we need to share between interpreters, so it cannot
|
|
hold PyObject values. */
|
|
static struct globals {
|
|
int module_count;
|
|
_channels channels;
|
|
} _globals = {0};
|
|
|
|
static int
|
|
_globals_init(void)
|
|
{
|
|
// XXX This isn't thread-safe.
|
|
_globals.module_count++;
|
|
if (_globals.module_count > 1) {
|
|
// Already initialized.
|
|
return 0;
|
|
}
|
|
|
|
assert(_globals.channels.mutex == NULL);
|
|
PyThread_type_lock mutex = PyThread_allocate_lock();
|
|
if (mutex == NULL) {
|
|
return ERR_CHANNELS_MUTEX_INIT;
|
|
}
|
|
_channels_init(&_globals.channels, mutex);
|
|
return 0;
|
|
}
|
|
|
|
static void
|
|
_globals_fini(void)
|
|
{
|
|
// XXX This isn't thread-safe.
|
|
_globals.module_count--;
|
|
if (_globals.module_count > 0) {
|
|
return;
|
|
}
|
|
|
|
_channels_fini(&_globals.channels);
|
|
}
|
|
|
|
static _channels *
|
|
_global_channels(void) {
|
|
return &_globals.channels;
|
|
}
|
|
|
|
|
|
static void
|
|
clear_interpreter(void *data)
|
|
{
|
|
if (_globals.module_count == 0) {
|
|
return;
|
|
}
|
|
PyInterpreterState *interp = (PyInterpreterState *)data;
|
|
assert(interp == _get_current_interp());
|
|
int64_t interpid = PyInterpreterState_GetID(interp);
|
|
_channels_clear_interpreter(&_globals.channels, interpid);
|
|
}
|
|
|
|
|
|
static PyObject *
|
|
channelsmod_create(PyObject *self, PyObject *Py_UNUSED(ignored))
|
|
{
|
|
int64_t cid = channel_create(&_globals.channels);
|
|
if (cid < 0) {
|
|
(void)handle_channel_error(-1, self, cid);
|
|
return NULL;
|
|
}
|
|
module_state *state = get_module_state(self);
|
|
if (state == NULL) {
|
|
return NULL;
|
|
}
|
|
PyObject *cidobj = NULL;
|
|
int err = newchannelid(state->ChannelIDType, cid, 0,
|
|
&_globals.channels, 0, 0,
|
|
(channelid **)&cidobj);
|
|
if (handle_channel_error(err, self, cid)) {
|
|
assert(cidobj == NULL);
|
|
err = channel_destroy(&_globals.channels, cid);
|
|
if (handle_channel_error(err, self, cid)) {
|
|
// XXX issue a warning?
|
|
}
|
|
return NULL;
|
|
}
|
|
assert(cidobj != NULL);
|
|
assert(((channelid *)cidobj)->channels != NULL);
|
|
return cidobj;
|
|
}
|
|
|
|
PyDoc_STRVAR(channelsmod_create_doc,
|
|
"channel_create() -> cid\n\
|
|
\n\
|
|
Create a new cross-interpreter channel and return a unique generated ID.");
|
|
|
|
static PyObject *
|
|
channelsmod_destroy(PyObject *self, PyObject *args, PyObject *kwds)
|
|
{
|
|
static char *kwlist[] = {"cid", NULL};
|
|
int64_t cid;
|
|
struct channel_id_converter_data cid_data = {
|
|
.module = self,
|
|
};
|
|
if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&:channel_destroy", kwlist,
|
|
channel_id_converter, &cid_data)) {
|
|
return NULL;
|
|
}
|
|
cid = cid_data.cid;
|
|
|
|
int err = channel_destroy(&_globals.channels, cid);
|
|
if (handle_channel_error(err, self, cid)) {
|
|
return NULL;
|
|
}
|
|
Py_RETURN_NONE;
|
|
}
|
|
|
|
PyDoc_STRVAR(channelsmod_destroy_doc,
|
|
"channel_destroy(cid)\n\
|
|
\n\
|
|
Close and finalize the channel. Afterward attempts to use the channel\n\
|
|
will behave as though it never existed.");
|
|
|
|
static PyObject *
|
|
channelsmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
|
|
{
|
|
int64_t count = 0;
|
|
int64_t *cids = _channels_list_all(&_globals.channels, &count);
|
|
if (cids == NULL) {
|
|
if (count == 0) {
|
|
return PyList_New(0);
|
|
}
|
|
return NULL;
|
|
}
|
|
PyObject *ids = PyList_New((Py_ssize_t)count);
|
|
if (ids == NULL) {
|
|
goto finally;
|
|
}
|
|
module_state *state = get_module_state(self);
|
|
if (state == NULL) {
|
|
Py_DECREF(ids);
|
|
ids = NULL;
|
|
goto finally;
|
|
}
|
|
int64_t *cur = cids;
|
|
for (int64_t i=0; i < count; cur++, i++) {
|
|
PyObject *cidobj = NULL;
|
|
int err = newchannelid(state->ChannelIDType, *cur, 0,
|
|
&_globals.channels, 0, 0,
|
|
(channelid **)&cidobj);
|
|
if (handle_channel_error(err, self, *cur)) {
|
|
assert(cidobj == NULL);
|
|
Py_SETREF(ids, NULL);
|
|
break;
|
|
}
|
|
assert(cidobj != NULL);
|
|
PyList_SET_ITEM(ids, (Py_ssize_t)i, cidobj);
|
|
}
|
|
|
|
finally:
|
|
PyMem_Free(cids);
|
|
return ids;
|
|
}
|
|
|
|
PyDoc_STRVAR(channelsmod_list_all_doc,
|
|
"channel_list_all() -> [cid]\n\
|
|
\n\
|
|
Return the list of all IDs for active channels.");
|
|
|
|
static PyObject *
|
|
channelsmod_list_interpreters(PyObject *self, PyObject *args, PyObject *kwds)
|
|
{
|
|
static char *kwlist[] = {"cid", "send", NULL};
|
|
int64_t cid; /* Channel ID */
|
|
struct channel_id_converter_data cid_data = {
|
|
.module = self,
|
|
};
|
|
int send = 0; /* Send or receive end? */
|
|
int64_t interpid;
|
|
PyObject *ids, *interpid_obj;
|
|
PyInterpreterState *interp;
|
|
|
|
if (!PyArg_ParseTupleAndKeywords(
|
|
args, kwds, "O&$p:channel_list_interpreters",
|
|
kwlist, channel_id_converter, &cid_data, &send)) {
|
|
return NULL;
|
|
}
|
|
cid = cid_data.cid;
|
|
|
|
ids = PyList_New(0);
|
|
if (ids == NULL) {
|
|
goto except;
|
|
}
|
|
|
|
interp = PyInterpreterState_Head();
|
|
while (interp != NULL) {
|
|
interpid = PyInterpreterState_GetID(interp);
|
|
assert(interpid >= 0);
|
|
int res = channel_is_associated(&_globals.channels, cid, interpid, send);
|
|
if (res < 0) {
|
|
(void)handle_channel_error(res, self, cid);
|
|
goto except;
|
|
}
|
|
if (res) {
|
|
interpid_obj = _PyInterpreterState_GetIDObject(interp);
|
|
if (interpid_obj == NULL) {
|
|
goto except;
|
|
}
|
|
res = PyList_Insert(ids, 0, interpid_obj);
|
|
Py_DECREF(interpid_obj);
|
|
if (res < 0) {
|
|
goto except;
|
|
}
|
|
}
|
|
interp = PyInterpreterState_Next(interp);
|
|
}
|
|
|
|
goto finally;
|
|
|
|
except:
|
|
Py_CLEAR(ids);
|
|
|
|
finally:
|
|
return ids;
|
|
}
|
|
|
|
PyDoc_STRVAR(channelsmod_list_interpreters_doc,
|
|
"channel_list_interpreters(cid, *, send) -> [id]\n\
|
|
\n\
|
|
Return the list of all interpreter IDs associated with an end of the channel.\n\
|
|
\n\
|
|
The 'send' argument should be a boolean indicating whether to use the send or\n\
|
|
receive end.");
|
|
|
|
|
|
static PyObject *
|
|
channelsmod_send(PyObject *self, PyObject *args, PyObject *kwds)
|
|
{
|
|
static char *kwlist[] = {"cid", "obj", "blocking", "timeout", NULL};
|
|
struct channel_id_converter_data cid_data = {
|
|
.module = self,
|
|
};
|
|
PyObject *obj;
|
|
int blocking = 1;
|
|
PyObject *timeout_obj = NULL;
|
|
if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O|$pO:channel_send", kwlist,
|
|
channel_id_converter, &cid_data, &obj,
|
|
&blocking, &timeout_obj)) {
|
|
return NULL;
|
|
}
|
|
|
|
int64_t cid = cid_data.cid;
|
|
PY_TIMEOUT_T timeout;
|
|
if (PyThread_ParseTimeoutArg(timeout_obj, blocking, &timeout) < 0) {
|
|
return NULL;
|
|
}
|
|
|
|
/* Queue up the object. */
|
|
int err = 0;
|
|
if (blocking) {
|
|
err = channel_send_wait(&_globals.channels, cid, obj, timeout);
|
|
}
|
|
else {
|
|
err = channel_send(&_globals.channels, cid, obj, NULL);
|
|
}
|
|
if (handle_channel_error(err, self, cid)) {
|
|
return NULL;
|
|
}
|
|
|
|
Py_RETURN_NONE;
|
|
}
|
|
|
|
PyDoc_STRVAR(channelsmod_send_doc,
|
|
"channel_send(cid, obj, *, blocking=True, timeout=None)\n\
|
|
\n\
|
|
Add the object's data to the channel's queue.\n\
|
|
By default this waits for the object to be received.");
|
|
|
|
static PyObject *
|
|
channelsmod_send_buffer(PyObject *self, PyObject *args, PyObject *kwds)
|
|
{
|
|
static char *kwlist[] = {"cid", "obj", "blocking", "timeout", NULL};
|
|
struct channel_id_converter_data cid_data = {
|
|
.module = self,
|
|
};
|
|
PyObject *obj;
|
|
int blocking = 1;
|
|
PyObject *timeout_obj = NULL;
|
|
if (!PyArg_ParseTupleAndKeywords(args, kwds,
|
|
"O&O|$pO:channel_send_buffer", kwlist,
|
|
channel_id_converter, &cid_data, &obj,
|
|
&blocking, &timeout_obj)) {
|
|
return NULL;
|
|
}
|
|
|
|
int64_t cid = cid_data.cid;
|
|
PY_TIMEOUT_T timeout;
|
|
if (PyThread_ParseTimeoutArg(timeout_obj, blocking, &timeout) < 0) {
|
|
return NULL;
|
|
}
|
|
|
|
PyObject *tempobj = PyMemoryView_FromObject(obj);
|
|
if (tempobj == NULL) {
|
|
return NULL;
|
|
}
|
|
|
|
/* Queue up the object. */
|
|
int err = 0;
|
|
if (blocking) {
|
|
err = channel_send_wait(&_globals.channels, cid, tempobj, timeout);
|
|
}
|
|
else {
|
|
err = channel_send(&_globals.channels, cid, tempobj, NULL);
|
|
}
|
|
Py_DECREF(tempobj);
|
|
if (handle_channel_error(err, self, cid)) {
|
|
return NULL;
|
|
}
|
|
|
|
Py_RETURN_NONE;
|
|
}
|
|
|
|
PyDoc_STRVAR(channelsmod_send_buffer_doc,
|
|
"channel_send_buffer(cid, obj, *, blocking=True, timeout=None)\n\
|
|
\n\
|
|
Add the object's buffer to the channel's queue.\n\
|
|
By default this waits for the object to be received.");
|
|
|
|
static PyObject *
|
|
channelsmod_recv(PyObject *self, PyObject *args, PyObject *kwds)
|
|
{
|
|
static char *kwlist[] = {"cid", "default", NULL};
|
|
int64_t cid;
|
|
struct channel_id_converter_data cid_data = {
|
|
.module = self,
|
|
};
|
|
PyObject *dflt = NULL;
|
|
if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&|O:channel_recv", kwlist,
|
|
channel_id_converter, &cid_data, &dflt)) {
|
|
return NULL;
|
|
}
|
|
cid = cid_data.cid;
|
|
|
|
PyObject *obj = NULL;
|
|
int err = channel_recv(&_globals.channels, cid, &obj);
|
|
if (handle_channel_error(err, self, cid)) {
|
|
return NULL;
|
|
}
|
|
Py_XINCREF(dflt);
|
|
if (obj == NULL) {
|
|
// Use the default.
|
|
if (dflt == NULL) {
|
|
(void)handle_channel_error(ERR_CHANNEL_EMPTY, self, cid);
|
|
return NULL;
|
|
}
|
|
obj = Py_NewRef(dflt);
|
|
}
|
|
Py_XDECREF(dflt);
|
|
return obj;
|
|
}
|
|
|
|
PyDoc_STRVAR(channelsmod_recv_doc,
|
|
"channel_recv(cid, [default]) -> obj\n\
|
|
\n\
|
|
Return a new object from the data at the front of the channel's queue.\n\
|
|
\n\
|
|
If there is nothing to receive then raise ChannelEmptyError, unless\n\
|
|
a default value is provided. In that case return it.");
|
|
|
|
static PyObject *
|
|
channelsmod_close(PyObject *self, PyObject *args, PyObject *kwds)
|
|
{
|
|
static char *kwlist[] = {"cid", "send", "recv", "force", NULL};
|
|
int64_t cid;
|
|
struct channel_id_converter_data cid_data = {
|
|
.module = self,
|
|
};
|
|
int send = 0;
|
|
int recv = 0;
|
|
int force = 0;
|
|
if (!PyArg_ParseTupleAndKeywords(args, kwds,
|
|
"O&|$ppp:channel_close", kwlist,
|
|
channel_id_converter, &cid_data,
|
|
&send, &recv, &force)) {
|
|
return NULL;
|
|
}
|
|
cid = cid_data.cid;
|
|
|
|
int err = channel_close(&_globals.channels, cid, send-recv, force);
|
|
if (handle_channel_error(err, self, cid)) {
|
|
return NULL;
|
|
}
|
|
Py_RETURN_NONE;
|
|
}
|
|
|
|
PyDoc_STRVAR(channelsmod_close_doc,
|
|
"channel_close(cid, *, send=None, recv=None, force=False)\n\
|
|
\n\
|
|
Close the channel for all interpreters.\n\
|
|
\n\
|
|
If the channel is empty then the keyword args are ignored and both\n\
|
|
ends are immediately closed. Otherwise, if 'force' is True then\n\
|
|
all queued items are released and both ends are immediately\n\
|
|
closed.\n\
|
|
\n\
|
|
If the channel is not empty *and* 'force' is False then following\n\
|
|
happens:\n\
|
|
\n\
|
|
* recv is True (regardless of send):\n\
|
|
- raise ChannelNotEmptyError\n\
|
|
* recv is None and send is None:\n\
|
|
- raise ChannelNotEmptyError\n\
|
|
* send is True and recv is not True:\n\
|
|
- fully close the 'send' end\n\
|
|
- close the 'recv' end to interpreters not already receiving\n\
|
|
- fully close it once empty\n\
|
|
\n\
|
|
Closing an already closed channel results in a ChannelClosedError.\n\
|
|
\n\
|
|
Once the channel's ID has no more ref counts in any interpreter\n\
|
|
the channel will be destroyed.");
|
|
|
|
static PyObject *
|
|
channelsmod_release(PyObject *self, PyObject *args, PyObject *kwds)
|
|
{
|
|
// Note that only the current interpreter is affected.
|
|
static char *kwlist[] = {"cid", "send", "recv", "force", NULL};
|
|
int64_t cid;
|
|
struct channel_id_converter_data cid_data = {
|
|
.module = self,
|
|
};
|
|
int send = 0;
|
|
int recv = 0;
|
|
int force = 0;
|
|
if (!PyArg_ParseTupleAndKeywords(args, kwds,
|
|
"O&|$ppp:channel_release", kwlist,
|
|
channel_id_converter, &cid_data,
|
|
&send, &recv, &force)) {
|
|
return NULL;
|
|
}
|
|
cid = cid_data.cid;
|
|
if (send == 0 && recv == 0) {
|
|
send = 1;
|
|
recv = 1;
|
|
}
|
|
|
|
// XXX Handle force is True.
|
|
// XXX Fix implicit release.
|
|
|
|
int err = channel_release(&_globals.channels, cid, send, recv);
|
|
if (handle_channel_error(err, self, cid)) {
|
|
return NULL;
|
|
}
|
|
Py_RETURN_NONE;
|
|
}
|
|
|
|
PyDoc_STRVAR(channelsmod_release_doc,
|
|
"channel_release(cid, *, send=None, recv=None, force=True)\n\
|
|
\n\
|
|
Close the channel for the current interpreter. 'send' and 'recv'\n\
|
|
(bool) may be used to indicate the ends to close. By default both\n\
|
|
ends are closed. Closing an already closed end is a noop.");
|
|
|
|
static PyObject *
|
|
channelsmod_get_info(PyObject *self, PyObject *args, PyObject *kwds)
|
|
{
|
|
static char *kwlist[] = {"cid", NULL};
|
|
struct channel_id_converter_data cid_data = {
|
|
.module = self,
|
|
};
|
|
if (!PyArg_ParseTupleAndKeywords(args, kwds,
|
|
"O&:_get_info", kwlist,
|
|
channel_id_converter, &cid_data)) {
|
|
return NULL;
|
|
}
|
|
int64_t cid = cid_data.cid;
|
|
|
|
struct channel_info info;
|
|
int err = _channel_get_info(&_globals.channels, cid, &info);
|
|
if (handle_channel_error(err, self, cid)) {
|
|
return NULL;
|
|
}
|
|
return new_channel_info(self, &info);
|
|
}
|
|
|
|
PyDoc_STRVAR(channelsmod_get_info_doc,
|
|
"get_info(cid)\n\
|
|
\n\
|
|
Return details about the channel.");
|
|
|
|
static PyObject *
|
|
channelsmod__channel_id(PyObject *self, PyObject *args, PyObject *kwds)
|
|
{
|
|
module_state *state = get_module_state(self);
|
|
if (state == NULL) {
|
|
return NULL;
|
|
}
|
|
PyTypeObject *cls = state->ChannelIDType;
|
|
|
|
PyObject *mod = get_module_from_owned_type(cls);
|
|
assert(mod == self);
|
|
Py_DECREF(mod);
|
|
|
|
return _channelid_new(self, cls, args, kwds);
|
|
}
|
|
|
|
static PyObject *
|
|
channelsmod__register_end_types(PyObject *self, PyObject *args, PyObject *kwds)
|
|
{
|
|
static char *kwlist[] = {"send", "recv", NULL};
|
|
PyObject *send;
|
|
PyObject *recv;
|
|
if (!PyArg_ParseTupleAndKeywords(args, kwds,
|
|
"OO:_register_end_types", kwlist,
|
|
&send, &recv)) {
|
|
return NULL;
|
|
}
|
|
if (!PyType_Check(send)) {
|
|
PyErr_SetString(PyExc_TypeError, "expected a type for 'send'");
|
|
return NULL;
|
|
}
|
|
if (!PyType_Check(recv)) {
|
|
PyErr_SetString(PyExc_TypeError, "expected a type for 'recv'");
|
|
return NULL;
|
|
}
|
|
PyTypeObject *cls_send = (PyTypeObject *)send;
|
|
PyTypeObject *cls_recv = (PyTypeObject *)recv;
|
|
|
|
if (set_channelend_types(self, cls_send, cls_recv) < 0) {
|
|
return NULL;
|
|
}
|
|
|
|
Py_RETURN_NONE;
|
|
}
|
|
|
|
static PyMethodDef module_functions[] = {
|
|
{"create", channelsmod_create,
|
|
METH_NOARGS, channelsmod_create_doc},
|
|
{"destroy", _PyCFunction_CAST(channelsmod_destroy),
|
|
METH_VARARGS | METH_KEYWORDS, channelsmod_destroy_doc},
|
|
{"list_all", channelsmod_list_all,
|
|
METH_NOARGS, channelsmod_list_all_doc},
|
|
{"list_interpreters", _PyCFunction_CAST(channelsmod_list_interpreters),
|
|
METH_VARARGS | METH_KEYWORDS, channelsmod_list_interpreters_doc},
|
|
{"send", _PyCFunction_CAST(channelsmod_send),
|
|
METH_VARARGS | METH_KEYWORDS, channelsmod_send_doc},
|
|
{"send_buffer", _PyCFunction_CAST(channelsmod_send_buffer),
|
|
METH_VARARGS | METH_KEYWORDS, channelsmod_send_buffer_doc},
|
|
{"recv", _PyCFunction_CAST(channelsmod_recv),
|
|
METH_VARARGS | METH_KEYWORDS, channelsmod_recv_doc},
|
|
{"close", _PyCFunction_CAST(channelsmod_close),
|
|
METH_VARARGS | METH_KEYWORDS, channelsmod_close_doc},
|
|
{"release", _PyCFunction_CAST(channelsmod_release),
|
|
METH_VARARGS | METH_KEYWORDS, channelsmod_release_doc},
|
|
{"get_info", _PyCFunction_CAST(channelsmod_get_info),
|
|
METH_VARARGS | METH_KEYWORDS, channelsmod_get_info_doc},
|
|
{"_channel_id", _PyCFunction_CAST(channelsmod__channel_id),
|
|
METH_VARARGS | METH_KEYWORDS, NULL},
|
|
{"_register_end_types", _PyCFunction_CAST(channelsmod__register_end_types),
|
|
METH_VARARGS | METH_KEYWORDS, NULL},
|
|
|
|
{NULL, NULL} /* sentinel */
|
|
};
|
|
|
|
|
|
/* initialization function */
|
|
|
|
PyDoc_STRVAR(module_doc,
|
|
"This module provides primitive operations to manage Python interpreters.\n\
|
|
The 'interpreters' module provides a more convenient interface.");
|
|
|
|
static int
|
|
module_exec(PyObject *mod)
|
|
{
|
|
if (_globals_init() != 0) {
|
|
return -1;
|
|
}
|
|
|
|
module_state *state = get_module_state(mod);
|
|
if (state == NULL) {
|
|
goto error;
|
|
}
|
|
|
|
/* Add exception types */
|
|
if (exceptions_init(mod) != 0) {
|
|
goto error;
|
|
}
|
|
|
|
/* Add other types */
|
|
|
|
// ChannelInfo
|
|
state->ChannelInfoType = PyStructSequence_NewType(&channel_info_desc);
|
|
if (state->ChannelInfoType == NULL) {
|
|
goto error;
|
|
}
|
|
if (PyModule_AddType(mod, state->ChannelInfoType) < 0) {
|
|
goto error;
|
|
}
|
|
|
|
// ChannelID
|
|
state->ChannelIDType = add_channelid_type(mod);
|
|
if (state->ChannelIDType == NULL) {
|
|
goto error;
|
|
}
|
|
|
|
/* Make sure chnnels drop objects owned by this interpreter. */
|
|
PyInterpreterState *interp = _get_current_interp();
|
|
PyUnstable_AtExit(interp, clear_interpreter, (void *)interp);
|
|
|
|
return 0;
|
|
|
|
error:
|
|
if (state != NULL) {
|
|
clear_xid_types(state);
|
|
}
|
|
_globals_fini();
|
|
return -1;
|
|
}
|
|
|
|
static struct PyModuleDef_Slot module_slots[] = {
|
|
{Py_mod_exec, module_exec},
|
|
{Py_mod_multiple_interpreters, Py_MOD_PER_INTERPRETER_GIL_SUPPORTED},
|
|
{Py_mod_gil, Py_MOD_GIL_NOT_USED},
|
|
{0, NULL},
|
|
};
|
|
|
|
static int
|
|
module_traverse(PyObject *mod, visitproc visit, void *arg)
|
|
{
|
|
module_state *state = get_module_state(mod);
|
|
assert(state != NULL);
|
|
traverse_module_state(state, visit, arg);
|
|
return 0;
|
|
}
|
|
|
|
static int
|
|
module_clear(PyObject *mod)
|
|
{
|
|
module_state *state = get_module_state(mod);
|
|
assert(state != NULL);
|
|
|
|
// Now we clear the module state.
|
|
clear_module_state(state);
|
|
return 0;
|
|
}
|
|
|
|
static void
|
|
module_free(void *mod)
|
|
{
|
|
module_state *state = get_module_state(mod);
|
|
assert(state != NULL);
|
|
|
|
// Now we clear the module state.
|
|
clear_module_state(state);
|
|
|
|
_globals_fini();
|
|
}
|
|
|
|
static struct PyModuleDef moduledef = {
|
|
.m_base = PyModuleDef_HEAD_INIT,
|
|
.m_name = MODULE_NAME_STR,
|
|
.m_doc = module_doc,
|
|
.m_size = sizeof(module_state),
|
|
.m_methods = module_functions,
|
|
.m_slots = module_slots,
|
|
.m_traverse = module_traverse,
|
|
.m_clear = module_clear,
|
|
.m_free = (freefunc)module_free,
|
|
};
|
|
|
|
PyMODINIT_FUNC
|
|
MODINIT_FUNC_NAME(void)
|
|
{
|
|
return PyModuleDef_Init(&moduledef);
|
|
}
|