bpo-32604: Swap threads only if the interpreter is different. (gh-5778)
The CPython runtime assumes that there is a one-to-one relationship (for a given interpreter) between PyThreadState and OS threads. Sending and receiving on a channel in the same interpreter was causing crashes because of this (specifically due to a check in PyThreadState_Swap()). The solution is to not switch threads if the interpreter is the same.
This commit is contained in:
parent
80d20b918b
commit
f53d9f2778
|
@ -3,6 +3,7 @@ import os
|
|||
import pickle
|
||||
from textwrap import dedent, indent
|
||||
import threading
|
||||
import time
|
||||
import unittest
|
||||
|
||||
from test import support
|
||||
|
@ -1147,6 +1148,54 @@ class ChannelTests(TestBase):
|
|||
|
||||
self.assertEqual(obj, b'spam')
|
||||
|
||||
def test_send_recv_different_threads(self):
|
||||
cid = interpreters.channel_create()
|
||||
|
||||
def f():
|
||||
while True:
|
||||
try:
|
||||
obj = interpreters.channel_recv(cid)
|
||||
break
|
||||
except interpreters.ChannelEmptyError:
|
||||
time.sleep(0.1)
|
||||
interpreters.channel_send(cid, obj)
|
||||
t = threading.Thread(target=f)
|
||||
t.start()
|
||||
|
||||
interpreters.channel_send(cid, b'spam')
|
||||
t.join()
|
||||
obj = interpreters.channel_recv(cid)
|
||||
|
||||
self.assertEqual(obj, b'spam')
|
||||
|
||||
def test_send_recv_different_interpreters_and_threads(self):
|
||||
cid = interpreters.channel_create()
|
||||
id1 = interpreters.create()
|
||||
out = None
|
||||
|
||||
def f():
|
||||
nonlocal out
|
||||
out = _run_output(id1, dedent(f"""
|
||||
import time
|
||||
import _xxsubinterpreters as _interpreters
|
||||
while True:
|
||||
try:
|
||||
obj = _interpreters.channel_recv({int(cid)})
|
||||
break
|
||||
except _interpreters.ChannelEmptyError:
|
||||
time.sleep(0.1)
|
||||
assert(obj == b'spam')
|
||||
_interpreters.channel_send({int(cid)}, b'eggs')
|
||||
"""))
|
||||
t = threading.Thread(target=f)
|
||||
t.start()
|
||||
|
||||
interpreters.channel_send(cid, b'spam')
|
||||
t.join()
|
||||
obj = interpreters.channel_recv(cid)
|
||||
|
||||
self.assertEqual(obj, b'eggs')
|
||||
|
||||
def test_send_not_found(self):
|
||||
with self.assertRaises(interpreters.ChannelNotFoundError):
|
||||
interpreters.channel_send(10, b'spam')
|
||||
|
|
|
@ -1759,8 +1759,13 @@ _run_script_in_interpreter(PyInterpreterState *interp, const char *codestr,
|
|||
}
|
||||
|
||||
// Switch to interpreter.
|
||||
PyThreadState *tstate = PyInterpreterState_ThreadHead(interp);
|
||||
PyThreadState *save_tstate = PyThreadState_Swap(tstate);
|
||||
PyThreadState *save_tstate = NULL;
|
||||
if (interp != PyThreadState_Get()->interp) {
|
||||
// XXX Using the "head" thread isn't strictly correct.
|
||||
PyThreadState *tstate = PyInterpreterState_ThreadHead(interp);
|
||||
// XXX Possible GILState issues?
|
||||
save_tstate = PyThreadState_Swap(tstate);
|
||||
}
|
||||
|
||||
// Run the script.
|
||||
_sharedexception *exc = NULL;
|
||||
|
@ -2079,9 +2084,9 @@ interp_create(PyObject *self, PyObject *args)
|
|||
}
|
||||
|
||||
// Create and initialize the new interpreter.
|
||||
PyThreadState *tstate, *save_tstate;
|
||||
save_tstate = PyThreadState_Swap(NULL);
|
||||
tstate = Py_NewInterpreter();
|
||||
PyThreadState *save_tstate = PyThreadState_Swap(NULL);
|
||||
// XXX Possible GILState issues?
|
||||
PyThreadState *tstate = Py_NewInterpreter();
|
||||
PyThreadState_Swap(save_tstate);
|
||||
if (tstate == NULL) {
|
||||
/* Since no new thread state was created, there is no exception to
|
||||
|
@ -2096,6 +2101,7 @@ interp_create(PyObject *self, PyObject *args)
|
|||
return _get_id(tstate->interp);
|
||||
|
||||
error:
|
||||
// XXX Possible GILState issues?
|
||||
save_tstate = PyThreadState_Swap(tstate);
|
||||
Py_EndInterpreter(tstate);
|
||||
PyThreadState_Swap(save_tstate);
|
||||
|
@ -2146,9 +2152,9 @@ interp_destroy(PyObject *self, PyObject *args)
|
|||
|
||||
// Destroy the interpreter.
|
||||
//PyInterpreterState_Delete(interp);
|
||||
PyThreadState *tstate, *save_tstate;
|
||||
tstate = PyInterpreterState_ThreadHead(interp);
|
||||
save_tstate = PyThreadState_Swap(tstate);
|
||||
PyThreadState *tstate = PyInterpreterState_ThreadHead(interp);
|
||||
// XXX Possible GILState issues?
|
||||
PyThreadState *save_tstate = PyThreadState_Swap(tstate);
|
||||
Py_EndInterpreter(tstate);
|
||||
PyThreadState_Swap(save_tstate);
|
||||
|
||||
|
|
|
@ -331,9 +331,10 @@ _PyInterpreterState_IDDecref(PyInterpreterState *interp)
|
|||
PyThread_release_lock(interp->id_mutex);
|
||||
|
||||
if (refcount == 0) {
|
||||
PyThreadState *tstate, *save_tstate;
|
||||
tstate = PyInterpreterState_ThreadHead(interp);
|
||||
save_tstate = PyThreadState_Swap(tstate);
|
||||
// XXX Using the "head" thread isn't strictly correct.
|
||||
PyThreadState *tstate = PyInterpreterState_ThreadHead(interp);
|
||||
// XXX Possible GILState issues?
|
||||
PyThreadState *save_tstate = PyThreadState_Swap(tstate);
|
||||
Py_EndInterpreter(tstate);
|
||||
PyThreadState_Swap(save_tstate);
|
||||
}
|
||||
|
@ -1213,8 +1214,14 @@ _PyCrossInterpreterData_Release(_PyCrossInterpreterData *data)
|
|||
}
|
||||
return;
|
||||
}
|
||||
PyThreadState *tstate = PyInterpreterState_ThreadHead(interp);
|
||||
PyThreadState *save_tstate = PyThreadState_Swap(tstate);
|
||||
|
||||
PyThreadState *save_tstate = NULL;
|
||||
if (interp != PyThreadState_Get()->interp) {
|
||||
// XXX Using the "head" thread isn't strictly correct.
|
||||
PyThreadState *tstate = PyInterpreterState_ThreadHead(interp);
|
||||
// XXX Possible GILState issues?
|
||||
save_tstate = PyThreadState_Swap(tstate);
|
||||
}
|
||||
|
||||
// "Release" the data and/or the object.
|
||||
if (data->free != NULL) {
|
||||
|
@ -1223,8 +1230,9 @@ _PyCrossInterpreterData_Release(_PyCrossInterpreterData *data)
|
|||
Py_XDECREF(data->obj);
|
||||
|
||||
// Switch back.
|
||||
if (save_tstate != NULL)
|
||||
if (save_tstate != NULL) {
|
||||
PyThreadState_Swap(save_tstate);
|
||||
}
|
||||
}
|
||||
|
||||
PyObject *
|
||||
|
|
Loading…
Reference in New Issue