gh-105716: Support Background Threads in Subinterpreters Consistently (gh-109921)

The existence of background threads running on a subinterpreter was preventing interpreters from getting properly destroyed, as well as impacting the ability to run the interpreter again. It also affected how we wait for non-daemon threads to finish.

We add PyInterpreterState.threads.main, with some internal C-API functions.
This commit is contained in:
Eric Snow 2023-10-02 14:12:12 -06:00 committed by GitHub
parent a040a32ea2
commit 1dd9dee45d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 258 additions and 46 deletions

View File

@ -211,6 +211,7 @@ struct _ts {
* if it is NULL. */
PyAPI_FUNC(PyThreadState *) _PyThreadState_UncheckedGet(void);
// Disable tracing and profiling.
PyAPI_FUNC(void) PyThreadState_EnterTracing(PyThreadState *tstate);

View File

@ -73,6 +73,8 @@ struct _is {
uint64_t next_unique_id;
/* The linked list of threads, newest first. */
PyThreadState *head;
/* The thread currently executing in the __main__ module, if any. */
PyThreadState *main;
/* Used in Modules/_threadmodule.c. */
long count;
/* Support for runtime thread stack size tuning.

View File

@ -44,6 +44,11 @@ _Py_IsMainInterpreterFinalizing(PyInterpreterState *interp)
interp == &_PyRuntime._main_interpreter);
}
// Export for _xxsubinterpreters module.
PyAPI_FUNC(int) _PyInterpreterState_SetRunningMain(PyInterpreterState *);
PyAPI_FUNC(void) _PyInterpreterState_SetNotRunningMain(PyInterpreterState *);
PyAPI_FUNC(int) _PyInterpreterState_IsRunningMain(PyInterpreterState *);
static inline const PyConfig *
_Py_GetMainConfig(void)

View File

@ -261,6 +261,16 @@ class TestInterpreterIsRunning(TestBase):
self.assertTrue(interp.is_running())
self.assertFalse(interp.is_running())
def test_finished(self):
r, w = os.pipe()
interp = interpreters.create()
interp.run(f"""if True:
import os
os.write({w}, b'x')
""")
self.assertFalse(interp.is_running())
self.assertEqual(os.read(r, 1), b'x')
def test_from_subinterpreter(self):
interp = interpreters.create()
out = _run_output(interp, dedent(f"""
@ -288,6 +298,31 @@ class TestInterpreterIsRunning(TestBase):
with self.assertRaises(ValueError):
interp.is_running()
def test_with_only_background_threads(self):
r_interp, w_interp = os.pipe()
r_thread, w_thread = os.pipe()
DONE = b'D'
FINISHED = b'F'
interp = interpreters.create()
interp.run(f"""if True:
import os
import threading
def task():
v = os.read({r_thread}, 1)
assert v == {DONE!r}
os.write({w_interp}, {FINISHED!r})
t = threading.Thread(target=task)
t.start()
""")
self.assertFalse(interp.is_running())
os.write(w_thread, DONE)
interp.run('t.join()')
self.assertEqual(os.read(r_interp, 1), FINISHED)
class TestInterpreterClose(TestBase):
@ -389,6 +424,37 @@ class TestInterpreterClose(TestBase):
interp.close()
self.assertTrue(interp.is_running())
def test_subthreads_still_running(self):
r_interp, w_interp = os.pipe()
r_thread, w_thread = os.pipe()
FINISHED = b'F'
interp = interpreters.create()
interp.run(f"""if True:
import os
import threading
import time
done = False
def notify_fini():
global done
done = True
t.join()
threading._register_atexit(notify_fini)
def task():
while not done:
time.sleep(0.1)
os.write({w_interp}, {FINISHED!r})
t = threading.Thread(target=task)
t.start()
""")
interp.close()
self.assertEqual(os.read(r_interp, 1), FINISHED)
class TestInterpreterRun(TestBase):
@ -465,6 +531,37 @@ class TestInterpreterRun(TestBase):
with self.assertRaises(TypeError):
interp.run(b'print("spam")')
def test_with_background_threads_still_running(self):
r_interp, w_interp = os.pipe()
r_thread, w_thread = os.pipe()
RAN = b'R'
DONE = b'D'
FINISHED = b'F'
interp = interpreters.create()
interp.run(f"""if True:
import os
import threading
def task():
v = os.read({r_thread}, 1)
assert v == {DONE!r}
os.write({w_interp}, {FINISHED!r})
t = threading.Thread(target=task)
t.start()
os.write({w_interp}, {RAN!r})
""")
interp.run(f"""if True:
os.write({w_interp}, {RAN!r})
""")
os.write(w_thread, DONE)
interp.run('t.join()')
self.assertEqual(os.read(r_interp, 1), RAN)
self.assertEqual(os.read(r_interp, 1), RAN)
self.assertEqual(os.read(r_interp, 1), FINISHED)
# test_xxsubinterpreters covers the remaining Interpreter.run() behavior.

View File

@ -26,6 +26,11 @@ from unittest import mock
from test import lock_tests
from test import support
try:
from test.support import interpreters
except ModuleNotFoundError:
interpreters = None
threading_helper.requires_working_threading(module=True)
# Between fork() and exec(), only async-safe functions are allowed (issues
@ -52,6 +57,12 @@ def skip_unless_reliable_fork(test):
return test
def requires_subinterpreters(meth):
"""Decorator to skip a test if subinterpreters are not supported."""
return unittest.skipIf(interpreters is None,
'subinterpreters required')(meth)
def restore_default_excepthook(testcase):
testcase.addCleanup(setattr, threading, 'excepthook', threading.excepthook)
threading.excepthook = threading.__excepthook__
@ -1311,6 +1322,44 @@ class SubinterpThreadingTests(BaseTestCase):
# The thread was joined properly.
self.assertEqual(os.read(r, 1), b"x")
@requires_subinterpreters
def test_threads_join_with_no_main(self):
r_interp, w_interp = self.pipe()
INTERP = b'I'
FINI = b'F'
DONE = b'D'
interp = interpreters.create()
interp.run(f"""if True:
import os
import threading
import time
done = False
def notify_fini():
global done
done = True
os.write({w_interp}, {FINI!r})
t.join()
threading._register_atexit(notify_fini)
def task():
while not done:
time.sleep(0.1)
os.write({w_interp}, {DONE!r})
t = threading.Thread(target=task)
t.start()
os.write({w_interp}, {INTERP!r})
""")
interp.close()
self.assertEqual(os.read(r_interp, 1), INTERP)
self.assertEqual(os.read(r_interp, 1), FINI)
self.assertEqual(os.read(r_interp, 1), DONE)
@cpython_only
def test_daemon_threads_fatal_error(self):
subinterp_code = f"""if 1:

View File

@ -38,6 +38,7 @@ _daemon_threads_allowed = _thread.daemon_threads_allowed
_allocate_lock = _thread.allocate_lock
_set_sentinel = _thread._set_sentinel
get_ident = _thread.get_ident
_is_main_interpreter = _thread._is_main_interpreter
try:
get_native_id = _thread.get_native_id
_HAVE_THREAD_NATIVE_ID = True
@ -1574,7 +1575,7 @@ def _shutdown():
# the main thread's tstate_lock - that won't happen until the interpreter
# is nearly dead. So we release it here. Note that just calling _stop()
# isn't enough: other threads may already be waiting on _tstate_lock.
if _main_thread._is_stopped:
if _main_thread._is_stopped and _is_main_interpreter():
# _shutdown() was already called
return
@ -1627,6 +1628,7 @@ def main_thread():
In normal conditions, the main thread is the thread from which the
Python interpreter was started.
"""
# XXX Figure this out for subinterpreters. (See gh-75698.)
return _main_thread
# get thread-local implementation, either from the thread

View File

@ -0,0 +1,3 @@
Subinterpreters now correctly handle the case where they have threads
running in the background. Before, such threads would interfere with
cleaning up and destroying them, as well as prevent running another script.

View File

@ -1605,6 +1605,18 @@ PyDoc_STRVAR(excepthook_doc,
\n\
Handle uncaught Thread.run() exception.");
static PyObject *
thread__is_main_interpreter(PyObject *module, PyObject *Py_UNUSED(ignored))
{
PyInterpreterState *interp = _PyInterpreterState_GET();
return PyBool_FromLong(_Py_IsMainInterpreter(interp));
}
PyDoc_STRVAR(thread__is_main_interpreter_doc,
"_is_main_interpreter()\n\
\n\
Return True if the current interpreter is the main Python interpreter.");
static PyMethodDef thread_methods[] = {
{"start_new_thread", (PyCFunction)thread_PyThread_start_new_thread,
METH_VARARGS, start_new_doc},
@ -1634,8 +1646,10 @@ static PyMethodDef thread_methods[] = {
METH_VARARGS, stack_size_doc},
{"_set_sentinel", thread__set_sentinel,
METH_NOARGS, _set_sentinel_doc},
{"_excepthook", thread_excepthook,
{"_excepthook", thread_excepthook,
METH_O, excepthook_doc},
{"_is_main_interpreter", thread__is_main_interpreter,
METH_NOARGS, thread__is_main_interpreter_doc},
{NULL, NULL} /* sentinel */
};

View File

@ -8,6 +8,7 @@
#include "Python.h"
#include "pycore_initconfig.h" // _PyErr_SetFromPyStatus()
#include "pycore_pyerrors.h" // _PyErr_ChainExceptions1()
#include "pycore_pystate.h" // _PyInterpreterState_SetRunningMain()
#include "interpreteridobject.h"
@ -357,42 +358,15 @@ exceptions_init(PyObject *mod)
return 0;
}
static int
_is_running(PyInterpreterState *interp)
{
PyThreadState *tstate = PyInterpreterState_ThreadHead(interp);
if (PyThreadState_Next(tstate) != NULL) {
PyErr_SetString(PyExc_RuntimeError,
"interpreter has more than one thread");
return -1;
}
assert(!PyErr_Occurred());
struct _PyInterpreterFrame *frame = tstate->current_frame;
if (frame == NULL) {
return 0;
}
return 1;
}
static int
_ensure_not_running(PyInterpreterState *interp)
{
int is_running = _is_running(interp);
if (is_running < 0) {
return -1;
}
if (is_running) {
PyErr_Format(PyExc_RuntimeError, "interpreter already running");
return -1;
}
return 0;
}
static int
_run_script(PyInterpreterState *interp, const char *codestr,
_sharedns *shared, _sharedexception *sharedexc)
{
if (_PyInterpreterState_SetRunningMain(interp) < 0) {
// We skip going through the shared exception.
return -1;
}
PyObject *excval = NULL;
PyObject *main_mod = PyUnstable_InterpreterState_GetMainModule(interp);
if (main_mod == NULL) {
@ -422,6 +396,7 @@ _run_script(PyInterpreterState *interp, const char *codestr,
else {
Py_DECREF(result); // We throw away the result.
}
_PyInterpreterState_SetNotRunningMain(interp);
*sharedexc = no_exception;
return 0;
@ -437,6 +412,7 @@ error:
}
Py_XDECREF(excval);
assert(!PyErr_Occurred());
_PyInterpreterState_SetNotRunningMain(interp);
return -1;
}
@ -444,9 +420,6 @@ static int
_run_script_in_interpreter(PyObject *mod, PyInterpreterState *interp,
const char *codestr, PyObject *shareables)
{
if (_ensure_not_running(interp) < 0) {
return -1;
}
module_state *state = get_module_state(mod);
_sharedns *shared = _get_shared_ns(shareables);
@ -457,8 +430,26 @@ _run_script_in_interpreter(PyObject *mod, PyInterpreterState *interp,
// Switch to interpreter.
PyThreadState *save_tstate = NULL;
if (interp != PyInterpreterState_Get()) {
// XXX Using the "head" thread isn't strictly correct.
// XXX gh-109860: Using the "head" thread isn't strictly correct.
PyThreadState *tstate = PyInterpreterState_ThreadHead(interp);
assert(tstate != NULL);
// Hack (until gh-109860): The interpreter's initial thread state
// is least likely to break.
while(tstate->next != NULL) {
tstate = tstate->next;
}
// We must do this check before switching interpreters, so any
// exception gets raised in the right one.
// XXX gh-109860: Drop this redundant check once we stop
// re-using tstates that might already be in use.
if (_PyInterpreterState_IsRunningMain(interp)) {
PyErr_SetString(PyExc_RuntimeError,
"interpreter already running");
if (shared != NULL) {
_sharedns_free(shared);
}
return -1;
}
// XXX Possible GILState issues?
save_tstate = PyThreadState_Swap(tstate);
}
@ -478,8 +469,10 @@ _run_script_in_interpreter(PyObject *mod, PyInterpreterState *interp,
_sharedexception_apply(&exc, state->RunFailedError);
}
else if (result != 0) {
// We were unable to allocate a shared exception.
PyErr_NoMemory();
if (!PyErr_Occurred()) {
// We were unable to allocate a shared exception.
PyErr_NoMemory();
}
}
if (shared != NULL) {
@ -574,12 +567,20 @@ interp_destroy(PyObject *self, PyObject *args, PyObject *kwds)
// Ensure the interpreter isn't running.
/* XXX We *could* support destroying a running interpreter but
aren't going to worry about it for now. */
if (_ensure_not_running(interp) < 0) {
if (_PyInterpreterState_IsRunningMain(interp)) {
PyErr_Format(PyExc_RuntimeError, "interpreter running");
return NULL;
}
// Destroy the interpreter.
// XXX gh-109860: Using the "head" thread isn't strictly correct.
PyThreadState *tstate = PyInterpreterState_ThreadHead(interp);
assert(tstate != NULL);
// Hack (until gh-109860): The interpreter's initial thread state
// is least likely to break.
while(tstate->next != NULL) {
tstate = tstate->next;
}
// XXX Possible GILState issues?
PyThreadState *save_tstate = PyThreadState_Swap(tstate);
Py_EndInterpreter(tstate);
@ -748,11 +749,7 @@ interp_is_running(PyObject *self, PyObject *args, PyObject *kwds)
if (interp == NULL) {
return NULL;
}
int is_running = _is_running(interp);
if (is_running < 0) {
return NULL;
}
if (is_running) {
if (_PyInterpreterState_IsRunningMain(interp)) {
Py_RETURN_TRUE;
}
Py_RETURN_FALSE;
@ -763,6 +760,7 @@ PyDoc_STRVAR(is_running_doc,
\n\
Return whether or not the identified interpreter is running.");
static PyMethodDef module_functions[] = {
{"create", _PyCFunction_CAST(interp_create),
METH_VARARGS | METH_KEYWORDS, create_doc},

View File

@ -612,6 +612,9 @@ pymain_run_python(int *exitcode)
pymain_header(config);
_PyInterpreterState_SetRunningMain(interp);
assert(!PyErr_Occurred());
if (config->run_command) {
*exitcode = pymain_run_command(config->run_command);
}
@ -635,6 +638,7 @@ error:
*exitcode = pymain_exit_err_print();
done:
_PyInterpreterState_SetNotRunningMain(interp);
Py_XDECREF(main_importer_path);
}

View File

@ -1091,6 +1091,39 @@ _PyInterpreterState_DeleteExceptMain(_PyRuntimeState *runtime)
#endif
int
_PyInterpreterState_SetRunningMain(PyInterpreterState *interp)
{
if (interp->threads.main != NULL) {
PyErr_SetString(PyExc_RuntimeError,
"interpreter already running");
return -1;
}
PyThreadState *tstate = current_fast_get(&_PyRuntime);
_Py_EnsureTstateNotNULL(tstate);
if (tstate->interp != interp) {
PyErr_SetString(PyExc_RuntimeError,
"current tstate has wrong interpreter");
return -1;
}
interp->threads.main = tstate;
return 0;
}
void
_PyInterpreterState_SetNotRunningMain(PyInterpreterState *interp)
{
assert(interp->threads.main == current_fast_get(&_PyRuntime));
interp->threads.main = NULL;
}
int
_PyInterpreterState_IsRunningMain(PyInterpreterState *interp)
{
return (interp->threads.main != NULL);
}
//----------
// accessors
//----------
@ -2801,6 +2834,10 @@ _register_builtins_for_crossinterpreter_data(struct _xidregistry *xidregistry)
}
/*************/
/* Other API */
/*************/
_PyFrameEvalFunction
_PyInterpreterState_GetEvalFrameFunc(PyInterpreterState *interp)
{