Issue #18808: Thread.join() now waits for the underlying thread state to be destroyed before returning.
This prevents unpredictable aborts in Py_EndInterpreter() when some non-daemon threads are still running.
This commit is contained in:
parent
eda7c64151
commit
7b4769937f
|
@ -118,6 +118,32 @@ typedef struct _ts {
|
||||||
int trash_delete_nesting;
|
int trash_delete_nesting;
|
||||||
PyObject *trash_delete_later;
|
PyObject *trash_delete_later;
|
||||||
|
|
||||||
|
/* Called when a thread state is deleted normally, but not when it
|
||||||
|
* is destroyed after fork().
|
||||||
|
* Pain: to prevent rare but fatal shutdown errors (issue 18808),
|
||||||
|
* Thread.join() must wait for the join'ed thread's tstate to be unlinked
|
||||||
|
* from the tstate chain. That happens at the end of a thread's life,
|
||||||
|
* in pystate.c.
|
||||||
|
* The obvious way doesn't quite work: create a lock which the tstate
|
||||||
|
* unlinking code releases, and have Thread.join() wait to acquire that
|
||||||
|
* lock. The problem is that we _are_ at the end of the thread's life:
|
||||||
|
* if the thread holds the last reference to the lock, decref'ing the
|
||||||
|
* lock will delete the lock, and that may trigger arbitrary Python code
|
||||||
|
* if there's a weakref, with a callback, to the lock. But by this time
|
||||||
|
* _PyThreadState_Current is already NULL, so only the simplest of C code
|
||||||
|
* can be allowed to run (in particular it must not be possible to
|
||||||
|
* release the GIL).
|
||||||
|
* So instead of holding the lock directly, the tstate holds a weakref to
|
||||||
|
* the lock: that's the value of on_delete_data below. Decref'ing a
|
||||||
|
* weakref is harmless.
|
||||||
|
* on_delete points to _threadmodule.c's static release_sentinel() function.
|
||||||
|
* After the tstate is unlinked, release_sentinel is called with the
|
||||||
|
* weakref-to-lock (on_delete_data) argument, and release_sentinel releases
|
||||||
|
* the indirectly held lock.
|
||||||
|
*/
|
||||||
|
void (*on_delete)(void *);
|
||||||
|
void *on_delete_data;
|
||||||
|
|
||||||
/* XXX signal handlers should also be here */
|
/* XXX signal handlers should also be here */
|
||||||
|
|
||||||
} PyThreadState;
|
} PyThreadState;
|
||||||
|
|
|
@ -81,6 +81,10 @@ def stack_size(size=None):
|
||||||
raise error("setting thread stack size not supported")
|
raise error("setting thread stack size not supported")
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
def _set_sentinel():
|
||||||
|
"""Dummy implementation of _thread._set_sentinel()."""
|
||||||
|
return LockType()
|
||||||
|
|
||||||
class LockType(object):
|
class LockType(object):
|
||||||
"""Class implementing dummy implementation of _thread.LockType.
|
"""Class implementing dummy implementation of _thread.LockType.
|
||||||
|
|
||||||
|
|
|
@ -539,6 +539,40 @@ class ThreadTests(BaseTestCase):
|
||||||
self.assertEqual(err, b"")
|
self.assertEqual(err, b"")
|
||||||
self.assertEqual(data, "Thread-1\nTrue\nTrue\n")
|
self.assertEqual(data, "Thread-1\nTrue\nTrue\n")
|
||||||
|
|
||||||
|
def test_tstate_lock(self):
|
||||||
|
# Test an implementation detail of Thread objects.
|
||||||
|
started = _thread.allocate_lock()
|
||||||
|
finish = _thread.allocate_lock()
|
||||||
|
started.acquire()
|
||||||
|
finish.acquire()
|
||||||
|
def f():
|
||||||
|
started.release()
|
||||||
|
finish.acquire()
|
||||||
|
time.sleep(0.01)
|
||||||
|
# The tstate lock is None until the thread is started
|
||||||
|
t = threading.Thread(target=f)
|
||||||
|
self.assertIs(t._tstate_lock, None)
|
||||||
|
t.start()
|
||||||
|
started.acquire()
|
||||||
|
self.assertTrue(t.is_alive())
|
||||||
|
# The tstate lock can't be acquired when the thread is running
|
||||||
|
# (or suspended).
|
||||||
|
tstate_lock = t._tstate_lock
|
||||||
|
self.assertFalse(tstate_lock.acquire(timeout=0), False)
|
||||||
|
finish.release()
|
||||||
|
# When the thread ends, the tstate_lock can be successfully
|
||||||
|
# acquired.
|
||||||
|
self.assertTrue(tstate_lock.acquire(timeout=5), False)
|
||||||
|
# But is_alive() is still True: we hold _tstate_lock now, which
|
||||||
|
# prevents is_alive() from knowing the thread's end-of-life C code
|
||||||
|
# is done.
|
||||||
|
self.assertTrue(t.is_alive())
|
||||||
|
# Let is_alive() find out the C code is done.
|
||||||
|
tstate_lock.release()
|
||||||
|
self.assertFalse(t.is_alive())
|
||||||
|
# And verify the thread disposed of _tstate_lock.
|
||||||
|
self.assertTrue(t._tstate_lock is None)
|
||||||
|
|
||||||
|
|
||||||
class ThreadJoinOnShutdown(BaseTestCase):
|
class ThreadJoinOnShutdown(BaseTestCase):
|
||||||
|
|
||||||
|
@ -669,7 +703,7 @@ class ThreadJoinOnShutdown(BaseTestCase):
|
||||||
# someone else tries to fix this test case by acquiring this lock
|
# someone else tries to fix this test case by acquiring this lock
|
||||||
# before forking instead of resetting it, the test case will
|
# before forking instead of resetting it, the test case will
|
||||||
# deadlock when it shouldn't.
|
# deadlock when it shouldn't.
|
||||||
condition = w._block
|
condition = w._stopped._cond
|
||||||
orig_acquire = condition.acquire
|
orig_acquire = condition.acquire
|
||||||
call_count_lock = threading.Lock()
|
call_count_lock = threading.Lock()
|
||||||
call_count = 0
|
call_count = 0
|
||||||
|
@ -733,7 +767,7 @@ class ThreadJoinOnShutdown(BaseTestCase):
|
||||||
# causes the worker to fork. At this point, the problematic waiter
|
# causes the worker to fork. At this point, the problematic waiter
|
||||||
# lock has been acquired once by the waiter and has been put onto
|
# lock has been acquired once by the waiter and has been put onto
|
||||||
# the waiters list.
|
# the waiters list.
|
||||||
condition = w._block
|
condition = w._stopped._cond
|
||||||
orig_release_save = condition._release_save
|
orig_release_save = condition._release_save
|
||||||
def my_release_save():
|
def my_release_save():
|
||||||
global start_fork
|
global start_fork
|
||||||
|
@ -867,6 +901,38 @@ class SubinterpThreadingTests(BaseTestCase):
|
||||||
# The thread was joined properly.
|
# The thread was joined properly.
|
||||||
self.assertEqual(os.read(r, 1), b"x")
|
self.assertEqual(os.read(r, 1), b"x")
|
||||||
|
|
||||||
|
def test_threads_join_2(self):
|
||||||
|
# Same as above, but a delay gets introduced after the thread's
|
||||||
|
# Python code returned but before the thread state is deleted.
|
||||||
|
# To achieve this, we register a thread-local object which sleeps
|
||||||
|
# a bit when deallocated.
|
||||||
|
r, w = os.pipe()
|
||||||
|
self.addCleanup(os.close, r)
|
||||||
|
self.addCleanup(os.close, w)
|
||||||
|
code = r"""if 1:
|
||||||
|
import os
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
|
||||||
|
class Sleeper:
|
||||||
|
def __del__(self):
|
||||||
|
time.sleep(0.05)
|
||||||
|
|
||||||
|
tls = threading.local()
|
||||||
|
|
||||||
|
def f():
|
||||||
|
# Sleep a bit so that the thread is still running when
|
||||||
|
# Py_EndInterpreter is called.
|
||||||
|
time.sleep(0.05)
|
||||||
|
tls.x = Sleeper()
|
||||||
|
os.write(%d, b"x")
|
||||||
|
threading.Thread(target=f).start()
|
||||||
|
""" % (w,)
|
||||||
|
ret = _testcapi.run_in_subinterp(code)
|
||||||
|
self.assertEqual(ret, 0)
|
||||||
|
# The thread was joined properly.
|
||||||
|
self.assertEqual(os.read(r, 1), b"x")
|
||||||
|
|
||||||
def test_daemon_threads_fatal_error(self):
|
def test_daemon_threads_fatal_error(self):
|
||||||
subinterp_code = r"""if 1:
|
subinterp_code = r"""if 1:
|
||||||
import os
|
import os
|
||||||
|
|
|
@ -33,6 +33,7 @@ __all__ = ['active_count', 'Condition', 'current_thread', 'enumerate', 'Event',
|
||||||
# Rename some stuff so "from threading import *" is safe
|
# Rename some stuff so "from threading import *" is safe
|
||||||
_start_new_thread = _thread.start_new_thread
|
_start_new_thread = _thread.start_new_thread
|
||||||
_allocate_lock = _thread.allocate_lock
|
_allocate_lock = _thread.allocate_lock
|
||||||
|
_set_sentinel = _thread._set_sentinel
|
||||||
get_ident = _thread.get_ident
|
get_ident = _thread.get_ident
|
||||||
ThreadError = _thread.error
|
ThreadError = _thread.error
|
||||||
try:
|
try:
|
||||||
|
@ -548,28 +549,33 @@ class Thread:
|
||||||
else:
|
else:
|
||||||
self._daemonic = current_thread().daemon
|
self._daemonic = current_thread().daemon
|
||||||
self._ident = None
|
self._ident = None
|
||||||
|
self._tstate_lock = None
|
||||||
self._started = Event()
|
self._started = Event()
|
||||||
self._stopped = False
|
self._stopped = Event()
|
||||||
self._block = Condition(Lock())
|
|
||||||
self._initialized = True
|
self._initialized = True
|
||||||
# sys.stderr is not stored in the class like
|
# sys.stderr is not stored in the class like
|
||||||
# sys.exc_info since it can be changed between instances
|
# sys.exc_info since it can be changed between instances
|
||||||
self._stderr = _sys.stderr
|
self._stderr = _sys.stderr
|
||||||
_dangling.add(self)
|
_dangling.add(self)
|
||||||
|
|
||||||
def _reset_internal_locks(self):
|
def _reset_internal_locks(self, is_alive):
|
||||||
# private! Called by _after_fork() to reset our internal locks as
|
# private! Called by _after_fork() to reset our internal locks as
|
||||||
# they may be in an invalid state leading to a deadlock or crash.
|
# they may be in an invalid state leading to a deadlock or crash.
|
||||||
if hasattr(self, '_block'): # DummyThread deletes _block
|
|
||||||
self._block.__init__()
|
|
||||||
self._started._reset_internal_locks()
|
self._started._reset_internal_locks()
|
||||||
|
self._stopped._reset_internal_locks()
|
||||||
|
if is_alive:
|
||||||
|
self._set_tstate_lock()
|
||||||
|
else:
|
||||||
|
# The thread isn't alive after fork: it doesn't have a tstate
|
||||||
|
# anymore.
|
||||||
|
self._tstate_lock = None
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
assert self._initialized, "Thread.__init__() was not called"
|
assert self._initialized, "Thread.__init__() was not called"
|
||||||
status = "initial"
|
status = "initial"
|
||||||
if self._started.is_set():
|
if self._started.is_set():
|
||||||
status = "started"
|
status = "started"
|
||||||
if self._stopped:
|
if self._stopped.is_set():
|
||||||
status = "stopped"
|
status = "stopped"
|
||||||
if self._daemonic:
|
if self._daemonic:
|
||||||
status += " daemon"
|
status += " daemon"
|
||||||
|
@ -625,9 +631,18 @@ class Thread:
|
||||||
def _set_ident(self):
|
def _set_ident(self):
|
||||||
self._ident = get_ident()
|
self._ident = get_ident()
|
||||||
|
|
||||||
|
def _set_tstate_lock(self):
|
||||||
|
"""
|
||||||
|
Set a lock object which will be released by the interpreter when
|
||||||
|
the underlying thread state (see pystate.h) gets deleted.
|
||||||
|
"""
|
||||||
|
self._tstate_lock = _set_sentinel()
|
||||||
|
self._tstate_lock.acquire()
|
||||||
|
|
||||||
def _bootstrap_inner(self):
|
def _bootstrap_inner(self):
|
||||||
try:
|
try:
|
||||||
self._set_ident()
|
self._set_ident()
|
||||||
|
self._set_tstate_lock()
|
||||||
self._started.set()
|
self._started.set()
|
||||||
with _active_limbo_lock:
|
with _active_limbo_lock:
|
||||||
_active[self._ident] = self
|
_active[self._ident] = self
|
||||||
|
@ -691,10 +706,7 @@ class Thread:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def _stop(self):
|
def _stop(self):
|
||||||
self._block.acquire()
|
self._stopped.set()
|
||||||
self._stopped = True
|
|
||||||
self._block.notify_all()
|
|
||||||
self._block.release()
|
|
||||||
|
|
||||||
def _delete(self):
|
def _delete(self):
|
||||||
"Remove current thread from the dict of currently running threads."
|
"Remove current thread from the dict of currently running threads."
|
||||||
|
@ -738,21 +750,29 @@ class Thread:
|
||||||
raise RuntimeError("cannot join thread before it is started")
|
raise RuntimeError("cannot join thread before it is started")
|
||||||
if self is current_thread():
|
if self is current_thread():
|
||||||
raise RuntimeError("cannot join current thread")
|
raise RuntimeError("cannot join current thread")
|
||||||
|
if not self.is_alive():
|
||||||
|
return
|
||||||
|
self._stopped.wait(timeout)
|
||||||
|
if self._stopped.is_set():
|
||||||
|
self._wait_for_tstate_lock(timeout is None)
|
||||||
|
|
||||||
self._block.acquire()
|
def _wait_for_tstate_lock(self, block):
|
||||||
try:
|
# Issue #18808: wait for the thread state to be gone.
|
||||||
if timeout is None:
|
# When self._stopped is set, the Python part of the thread is done,
|
||||||
while not self._stopped:
|
# but the thread's tstate has not yet been destroyed. The C code
|
||||||
self._block.wait()
|
# releases self._tstate_lock when the C part of the thread is done
|
||||||
else:
|
# (the code at the end of the thread's life to remove all knowledge
|
||||||
deadline = _time() + timeout
|
# of the thread from the C data structures).
|
||||||
while not self._stopped:
|
# This method waits to acquire _tstate_lock if `block` is True, or
|
||||||
delay = deadline - _time()
|
# sees whether it can be acquired immediately if `block` is False.
|
||||||
if delay <= 0:
|
# If it does acquire the lock, the C code is done, and _tstate_lock
|
||||||
break
|
# is set to None.
|
||||||
self._block.wait(delay)
|
lock = self._tstate_lock
|
||||||
finally:
|
if lock is None:
|
||||||
self._block.release()
|
return # already determined that the C code is done
|
||||||
|
if lock.acquire(block):
|
||||||
|
lock.release()
|
||||||
|
self._tstate_lock = None
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def name(self):
|
def name(self):
|
||||||
|
@ -771,7 +791,14 @@ class Thread:
|
||||||
|
|
||||||
def is_alive(self):
|
def is_alive(self):
|
||||||
assert self._initialized, "Thread.__init__() not called"
|
assert self._initialized, "Thread.__init__() not called"
|
||||||
return self._started.is_set() and not self._stopped
|
if not self._started.is_set():
|
||||||
|
return False
|
||||||
|
if not self._stopped.is_set():
|
||||||
|
return True
|
||||||
|
# The Python part of the thread is done, but the C part may still be
|
||||||
|
# waiting to run.
|
||||||
|
self._wait_for_tstate_lock(False)
|
||||||
|
return self._tstate_lock is not None
|
||||||
|
|
||||||
isAlive = is_alive
|
isAlive = is_alive
|
||||||
|
|
||||||
|
@ -854,11 +881,6 @@ class _DummyThread(Thread):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
Thread.__init__(self, name=_newname("Dummy-%d"), daemon=True)
|
Thread.__init__(self, name=_newname("Dummy-%d"), daemon=True)
|
||||||
|
|
||||||
# Thread._block consumes an OS-level locking primitive, which
|
|
||||||
# can never be used by a _DummyThread. Since a _DummyThread
|
|
||||||
# instance is immortal, that's bad, so release this resource.
|
|
||||||
del self._block
|
|
||||||
|
|
||||||
self._started.set()
|
self._started.set()
|
||||||
self._set_ident()
|
self._set_ident()
|
||||||
with _active_limbo_lock:
|
with _active_limbo_lock:
|
||||||
|
@ -952,15 +974,16 @@ def _after_fork():
|
||||||
for thread in _enumerate():
|
for thread in _enumerate():
|
||||||
# Any lock/condition variable may be currently locked or in an
|
# Any lock/condition variable may be currently locked or in an
|
||||||
# invalid state, so we reinitialize them.
|
# invalid state, so we reinitialize them.
|
||||||
thread._reset_internal_locks()
|
|
||||||
if thread is current:
|
if thread is current:
|
||||||
# There is only one active thread. We reset the ident to
|
# There is only one active thread. We reset the ident to
|
||||||
# its new value since it can have changed.
|
# its new value since it can have changed.
|
||||||
|
thread._reset_internal_locks(True)
|
||||||
ident = get_ident()
|
ident = get_ident()
|
||||||
thread._ident = ident
|
thread._ident = ident
|
||||||
new_active[ident] = thread
|
new_active[ident] = thread
|
||||||
else:
|
else:
|
||||||
# All the others are already stopped.
|
# All the others are already stopped.
|
||||||
|
thread._reset_internal_locks(False)
|
||||||
thread._stop()
|
thread._stop()
|
||||||
|
|
||||||
_limbo.clear()
|
_limbo.clear()
|
||||||
|
|
|
@ -56,6 +56,10 @@ Core and Builtins
|
||||||
Library
|
Library
|
||||||
-------
|
-------
|
||||||
|
|
||||||
|
- Issue #18808: Thread.join() now waits for the underlying thread state to
|
||||||
|
be destroyed before returning. This prevents unpredictable aborts in
|
||||||
|
Py_EndInterpreter() when some non-daemon threads are still running.
|
||||||
|
|
||||||
- Issue #18458: Prevent crashes with newer versions of libedit. Its readline
|
- Issue #18458: Prevent crashes with newer versions of libedit. Its readline
|
||||||
emulation has changed from 0-based indexing to 1-based like gnu readline.
|
emulation has changed from 0-based indexing to 1-based like gnu readline.
|
||||||
|
|
||||||
|
|
|
@ -1172,6 +1172,66 @@ yet finished.\n\
|
||||||
This function is meant for internal and specialized purposes only.\n\
|
This function is meant for internal and specialized purposes only.\n\
|
||||||
In most applications `threading.enumerate()` should be used instead.");
|
In most applications `threading.enumerate()` should be used instead.");
|
||||||
|
|
||||||
|
static void
|
||||||
|
release_sentinel(void *wr)
|
||||||
|
{
|
||||||
|
/* Tricky: this function is called when the current thread state
|
||||||
|
is being deleted. Therefore, only simple C code can safely
|
||||||
|
execute here. */
|
||||||
|
PyObject *obj = PyWeakref_GET_OBJECT(wr);
|
||||||
|
lockobject *lock;
|
||||||
|
if (obj != Py_None) {
|
||||||
|
assert(Py_TYPE(obj) == &Locktype);
|
||||||
|
lock = (lockobject *) obj;
|
||||||
|
if (lock->locked) {
|
||||||
|
PyThread_release_lock(lock->lock_lock);
|
||||||
|
lock->locked = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/* Deallocating a weakref with a NULL callback only calls
|
||||||
|
PyObject_GC_Del(), which can't call any Python code. */
|
||||||
|
Py_DECREF(wr);
|
||||||
|
}
|
||||||
|
|
||||||
|
static PyObject *
|
||||||
|
thread__set_sentinel(PyObject *self)
|
||||||
|
{
|
||||||
|
PyObject *wr;
|
||||||
|
PyThreadState *tstate = PyThreadState_Get();
|
||||||
|
lockobject *lock;
|
||||||
|
|
||||||
|
if (tstate->on_delete_data != NULL) {
|
||||||
|
/* We must support the re-creation of the lock from a
|
||||||
|
fork()ed child. */
|
||||||
|
assert(tstate->on_delete == &release_sentinel);
|
||||||
|
wr = (PyObject *) tstate->on_delete_data;
|
||||||
|
tstate->on_delete = NULL;
|
||||||
|
tstate->on_delete_data = NULL;
|
||||||
|
Py_DECREF(wr);
|
||||||
|
}
|
||||||
|
lock = newlockobject();
|
||||||
|
if (lock == NULL)
|
||||||
|
return NULL;
|
||||||
|
/* The lock is owned by whoever called _set_sentinel(), but the weakref
|
||||||
|
hangs off the thread state. */
|
||||||
|
wr = PyWeakref_NewRef((PyObject *) lock, NULL);
|
||||||
|
if (wr == NULL) {
|
||||||
|
Py_DECREF(lock);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
tstate->on_delete_data = (void *) wr;
|
||||||
|
tstate->on_delete = &release_sentinel;
|
||||||
|
return (PyObject *) lock;
|
||||||
|
}
|
||||||
|
|
||||||
|
PyDoc_STRVAR(_set_sentinel_doc,
|
||||||
|
"_set_sentinel() -> lock\n\
|
||||||
|
\n\
|
||||||
|
Set a sentinel lock that will be released when the current thread\n\
|
||||||
|
state is finalized (after it is untied from the interpreter).\n\
|
||||||
|
\n\
|
||||||
|
This is a private API for the threading module.");
|
||||||
|
|
||||||
static PyObject *
|
static PyObject *
|
||||||
thread_stack_size(PyObject *self, PyObject *args)
|
thread_stack_size(PyObject *self, PyObject *args)
|
||||||
{
|
{
|
||||||
|
@ -1247,6 +1307,8 @@ static PyMethodDef thread_methods[] = {
|
||||||
METH_NOARGS, _count_doc},
|
METH_NOARGS, _count_doc},
|
||||||
{"stack_size", (PyCFunction)thread_stack_size,
|
{"stack_size", (PyCFunction)thread_stack_size,
|
||||||
METH_VARARGS, stack_size_doc},
|
METH_VARARGS, stack_size_doc},
|
||||||
|
{"_set_sentinel", (PyCFunction)thread__set_sentinel,
|
||||||
|
METH_NOARGS, _set_sentinel_doc},
|
||||||
{NULL, NULL} /* sentinel */
|
{NULL, NULL} /* sentinel */
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -208,6 +208,8 @@ new_threadstate(PyInterpreterState *interp, int init)
|
||||||
|
|
||||||
tstate->trash_delete_nesting = 0;
|
tstate->trash_delete_nesting = 0;
|
||||||
tstate->trash_delete_later = NULL;
|
tstate->trash_delete_later = NULL;
|
||||||
|
tstate->on_delete = NULL;
|
||||||
|
tstate->on_delete_data = NULL;
|
||||||
|
|
||||||
if (init)
|
if (init)
|
||||||
_PyThreadState_Init(tstate);
|
_PyThreadState_Init(tstate);
|
||||||
|
@ -390,6 +392,9 @@ tstate_delete_common(PyThreadState *tstate)
|
||||||
if (tstate->next)
|
if (tstate->next)
|
||||||
tstate->next->prev = tstate->prev;
|
tstate->next->prev = tstate->prev;
|
||||||
HEAD_UNLOCK();
|
HEAD_UNLOCK();
|
||||||
|
if (tstate->on_delete != NULL) {
|
||||||
|
tstate->on_delete(tstate->on_delete_data);
|
||||||
|
}
|
||||||
PyMem_RawFree(tstate);
|
PyMem_RawFree(tstate);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue