bpo-38880: List interpreters associated with a channel end (GH-17323)

This PR adds the functionality requested by https://github.com/ericsnowcurrently/multi-core-python/issues/52.

Automerge-Triggered-By: @ericsnowcurrently
This commit is contained in:
Lewis Gaul 2020-04-29 01:18:42 +01:00 committed by GitHub
parent 49f70db83e
commit f7bbf58aa9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 282 additions and 6 deletions

View File

@ -1207,6 +1207,185 @@ class ChannelTests(TestBase):
self.assertEqual(cid2, int(cid1) + 1) self.assertEqual(cid2, int(cid1) + 1)
def test_channel_list_interpreters_none(self):
"""Test listing interpreters for a channel with no associations."""
# Test for channel with no associated interpreters.
cid = interpreters.channel_create()
send_interps = interpreters.channel_list_interpreters(cid, send=True)
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
self.assertEqual(send_interps, [])
self.assertEqual(recv_interps, [])
def test_channel_list_interpreters_basic(self):
"""Test basic listing channel interpreters."""
interp0 = interpreters.get_main()
cid = interpreters.channel_create()
interpreters.channel_send(cid, "send")
# Test for a channel that has one end associated to an interpreter.
send_interps = interpreters.channel_list_interpreters(cid, send=True)
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
self.assertEqual(send_interps, [interp0])
self.assertEqual(recv_interps, [])
interp1 = interpreters.create()
_run_output(interp1, dedent(f"""
import _xxsubinterpreters as _interpreters
obj = _interpreters.channel_recv({cid})
"""))
# Test for channel that has boths ends associated to an interpreter.
send_interps = interpreters.channel_list_interpreters(cid, send=True)
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
self.assertEqual(send_interps, [interp0])
self.assertEqual(recv_interps, [interp1])
def test_channel_list_interpreters_multiple(self):
"""Test listing interpreters for a channel with many associations."""
interp0 = interpreters.get_main()
interp1 = interpreters.create()
interp2 = interpreters.create()
interp3 = interpreters.create()
cid = interpreters.channel_create()
interpreters.channel_send(cid, "send")
_run_output(interp1, dedent(f"""
import _xxsubinterpreters as _interpreters
_interpreters.channel_send({cid}, "send")
"""))
_run_output(interp2, dedent(f"""
import _xxsubinterpreters as _interpreters
obj = _interpreters.channel_recv({cid})
"""))
_run_output(interp3, dedent(f"""
import _xxsubinterpreters as _interpreters
obj = _interpreters.channel_recv({cid})
"""))
send_interps = interpreters.channel_list_interpreters(cid, send=True)
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
self.assertEqual(set(send_interps), {interp0, interp1})
self.assertEqual(set(recv_interps), {interp2, interp3})
def test_channel_list_interpreters_destroyed(self):
"""Test listing channel interpreters with a destroyed interpreter."""
interp0 = interpreters.get_main()
interp1 = interpreters.create()
cid = interpreters.channel_create()
interpreters.channel_send(cid, "send")
_run_output(interp1, dedent(f"""
import _xxsubinterpreters as _interpreters
obj = _interpreters.channel_recv({cid})
"""))
# Should be one interpreter associated with each end.
send_interps = interpreters.channel_list_interpreters(cid, send=True)
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
self.assertEqual(send_interps, [interp0])
self.assertEqual(recv_interps, [interp1])
interpreters.destroy(interp1)
# Destroyed interpreter should not be listed.
send_interps = interpreters.channel_list_interpreters(cid, send=True)
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
self.assertEqual(send_interps, [interp0])
self.assertEqual(recv_interps, [])
def test_channel_list_interpreters_released(self):
"""Test listing channel interpreters with a released channel."""
# Set up one channel with main interpreter on the send end and two
# subinterpreters on the receive end.
interp0 = interpreters.get_main()
interp1 = interpreters.create()
interp2 = interpreters.create()
cid = interpreters.channel_create()
interpreters.channel_send(cid, "data")
_run_output(interp1, dedent(f"""
import _xxsubinterpreters as _interpreters
obj = _interpreters.channel_recv({cid})
"""))
interpreters.channel_send(cid, "data")
_run_output(interp2, dedent(f"""
import _xxsubinterpreters as _interpreters
obj = _interpreters.channel_recv({cid})
"""))
# Check the setup.
send_interps = interpreters.channel_list_interpreters(cid, send=True)
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
self.assertEqual(len(send_interps), 1)
self.assertEqual(len(recv_interps), 2)
# Release the main interpreter from the send end.
interpreters.channel_release(cid, send=True)
# Send end should have no associated interpreters.
send_interps = interpreters.channel_list_interpreters(cid, send=True)
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
self.assertEqual(len(send_interps), 0)
self.assertEqual(len(recv_interps), 2)
# Release one of the subinterpreters from the receive end.
_run_output(interp2, dedent(f"""
import _xxsubinterpreters as _interpreters
_interpreters.channel_release({cid})
"""))
# Receive end should have the released interpreter removed.
send_interps = interpreters.channel_list_interpreters(cid, send=True)
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
self.assertEqual(len(send_interps), 0)
self.assertEqual(recv_interps, [interp1])
def test_channel_list_interpreters_closed(self):
"""Test listing channel interpreters with a closed channel."""
interp0 = interpreters.get_main()
interp1 = interpreters.create()
cid = interpreters.channel_create()
# Put something in the channel so that it's not empty.
interpreters.channel_send(cid, "send")
# Check initial state.
send_interps = interpreters.channel_list_interpreters(cid, send=True)
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
self.assertEqual(len(send_interps), 1)
self.assertEqual(len(recv_interps), 0)
# Force close the channel.
interpreters.channel_close(cid, force=True)
# Both ends should raise an error.
with self.assertRaises(interpreters.ChannelClosedError):
interpreters.channel_list_interpreters(cid, send=True)
with self.assertRaises(interpreters.ChannelClosedError):
interpreters.channel_list_interpreters(cid, send=False)
def test_channel_list_interpreters_closed_send_end(self):
"""Test listing channel interpreters with a channel's send end closed."""
interp0 = interpreters.get_main()
interp1 = interpreters.create()
cid = interpreters.channel_create()
# Put something in the channel so that it's not empty.
interpreters.channel_send(cid, "send")
# Check initial state.
send_interps = interpreters.channel_list_interpreters(cid, send=True)
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
self.assertEqual(len(send_interps), 1)
self.assertEqual(len(recv_interps), 0)
# Close the send end of the channel.
interpreters.channel_close(cid, send=True)
# Send end should raise an error.
with self.assertRaises(interpreters.ChannelClosedError):
interpreters.channel_list_interpreters(cid, send=True)
# Receive end should not be closed (since channel is not empty).
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
self.assertEqual(len(recv_interps), 0)
# Close the receive end of the channel from a subinterpreter.
_run_output(interp1, dedent(f"""
import _xxsubinterpreters as _interpreters
_interpreters.channel_close({cid}, force=True)
"""))
# Both ends should raise an error.
with self.assertRaises(interpreters.ChannelClosedError):
interpreters.channel_list_interpreters(cid, send=True)
with self.assertRaises(interpreters.ChannelClosedError):
interpreters.channel_list_interpreters(cid, send=False)
#################### ####################
def test_send_recv_main(self): def test_send_recv_main(self):
@ -1540,6 +1719,23 @@ class ChannelTests(TestBase):
with self.assertRaises(interpreters.ChannelClosedError): with self.assertRaises(interpreters.ChannelClosedError):
interpreters.channel_recv(cid) interpreters.channel_recv(cid)
def test_channel_list_interpreters_invalid_channel(self):
cid = interpreters.channel_create()
# Test for invalid channel ID.
with self.assertRaises(interpreters.ChannelNotFoundError):
interpreters.channel_list_interpreters(1000, send=True)
interpreters.channel_close(cid)
# Test for a channel that has been closed.
with self.assertRaises(interpreters.ChannelClosedError):
interpreters.channel_list_interpreters(cid, send=True)
def test_channel_list_interpreters_invalid_args(self):
# Tests for invalid arguments passed to the API.
cid = interpreters.channel_create()
with self.assertRaises(TypeError):
interpreters.channel_list_interpreters(cid)
class ChannelReleaseTests(TestBase): class ChannelReleaseTests(TestBase):

View File

@ -456,6 +456,7 @@ Rodolpho Eckhardt
Ulrich Eckhardt Ulrich Eckhardt
David Edelsohn David Edelsohn
John Edmonds John Edmonds
Benjamin Edwards
Grant Edwards Grant Edwards
Zvi Effron Zvi Effron
John Ehresman John Ehresman
@ -570,6 +571,7 @@ Jake Garver
Dan Gass Dan Gass
Tim Gates Tim Gates
Andrew Gaul Andrew Gaul
Lewis Gaul
Matthieu Gautier Matthieu Gautier
Stephen M. Gava Stephen M. Gava
Xavier de Gaye Xavier de Gaye

View File

@ -0,0 +1 @@
Added the ability to list interpreters associated with channel ends in the internal subinterpreters module.

View File

@ -538,7 +538,7 @@ _channelend_find(_channelend *first, int64_t interp, _channelend **pprev)
typedef struct _channelassociations { typedef struct _channelassociations {
// Note that the list entries are never removed for interpreter // Note that the list entries are never removed for interpreter
// for which the channel is closed. This should be a problem in // for which the channel is closed. This should not be a problem in
// practice. Also, a channel isn't automatically closed when an // practice. Also, a channel isn't automatically closed when an
// interpreter is destroyed. // interpreter is destroyed.
int64_t numsendopen; int64_t numsendopen;
@ -1179,11 +1179,6 @@ _channels_list_all(_channels *channels, int64_t *count)
{ {
int64_t *cids = NULL; int64_t *cids = NULL;
PyThread_acquire_lock(channels->mutex, WAIT_LOCK); PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
int64_t numopen = channels->numopen;
if (numopen >= PY_SSIZE_T_MAX) {
PyErr_SetString(PyExc_RuntimeError, "too many channels open");
goto done;
}
int64_t *ids = PyMem_NEW(int64_t, (Py_ssize_t)(channels->numopen)); int64_t *ids = PyMem_NEW(int64_t, (Py_ssize_t)(channels->numopen));
if (ids == NULL) { if (ids == NULL) {
goto done; goto done;
@ -1392,6 +1387,24 @@ _channel_close(_channels *channels, int64_t id, int end, int force)
return _channels_close(channels, id, NULL, end, force); return _channels_close(channels, id, NULL, end, force);
} }
static int
_channel_is_associated(_channels *channels, int64_t cid, int64_t interp,
int send)
{
_PyChannelState *chan = _channels_lookup(channels, cid, NULL);
if (chan == NULL) {
return -1;
} else if (send && chan->closing != NULL) {
PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", cid);
return -1;
}
_channelend *end = _channelend_find(send ? chan->ends->send : chan->ends->recv,
interp, NULL);
return (end != NULL && end->open);
}
/* ChannelID class */ /* ChannelID class */
static PyTypeObject ChannelIDtype; static PyTypeObject ChannelIDtype;
@ -2323,6 +2336,68 @@ PyDoc_STRVAR(channel_list_all_doc,
\n\ \n\
Return the list of all IDs for active channels."); Return the list of all IDs for active channels.");
static PyObject *
channel_list_interpreters(PyObject *self, PyObject *args, PyObject *kwds)
{
static char *kwlist[] = {"cid", "send", NULL};
int64_t cid; /* Channel ID */
int send = 0; /* Send or receive end? */
int64_t id;
PyObject *ids, *id_obj;
PyInterpreterState *interp;
if (!PyArg_ParseTupleAndKeywords(
args, kwds, "O&$p:channel_list_interpreters",
kwlist, channel_id_converter, &cid, &send)) {
return NULL;
}
ids = PyList_New(0);
if (ids == NULL) {
goto except;
}
interp = PyInterpreterState_Head();
while (interp != NULL) {
id = PyInterpreterState_GetID(interp);
assert(id >= 0);
int res = _channel_is_associated(&_globals.channels, cid, id, send);
if (res < 0) {
goto except;
}
if (res) {
id_obj = _PyInterpreterState_GetIDObject(interp);
if (id_obj == NULL) {
goto except;
}
res = PyList_Insert(ids, 0, id_obj);
Py_DECREF(id_obj);
if (res < 0) {
goto except;
}
}
interp = PyInterpreterState_Next(interp);
}
goto finally;
except:
Py_XDECREF(ids);
ids = NULL;
finally:
return ids;
}
PyDoc_STRVAR(channel_list_interpreters_doc,
"channel_list_interpreters(cid, *, send) -> [id]\n\
\n\
Return the list of all interpreter IDs associated with an end of the channel.\n\
\n\
The 'send' argument should be a boolean indicating whether to use the send or\n\
receive end.");
static PyObject * static PyObject *
channel_send(PyObject *self, PyObject *args, PyObject *kwds) channel_send(PyObject *self, PyObject *args, PyObject *kwds)
{ {
@ -2493,6 +2568,8 @@ static PyMethodDef module_functions[] = {
METH_VARARGS | METH_KEYWORDS, channel_destroy_doc}, METH_VARARGS | METH_KEYWORDS, channel_destroy_doc},
{"channel_list_all", channel_list_all, {"channel_list_all", channel_list_all,
METH_NOARGS, channel_list_all_doc}, METH_NOARGS, channel_list_all_doc},
{"channel_list_interpreters", (PyCFunction)(void(*)(void))channel_list_interpreters,
METH_VARARGS | METH_KEYWORDS, channel_list_interpreters_doc},
{"channel_send", (PyCFunction)(void(*)(void))channel_send, {"channel_send", (PyCFunction)(void(*)(void))channel_send,
METH_VARARGS | METH_KEYWORDS, channel_send_doc}, METH_VARARGS | METH_KEYWORDS, channel_send_doc},
{"channel_recv", (PyCFunction)(void(*)(void))channel_recv, {"channel_recv", (PyCFunction)(void(*)(void))channel_recv,