From 0e9c364f4ac18a2237bdbac702b96bcf8ef9cb09 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Sat, 4 Nov 2023 14:59:24 +0100 Subject: [PATCH] GH-110829: Ensure Thread.join() joins the OS thread (#110848) Joining a thread now ensures the underlying OS thread has exited. This is required for safer fork() in multi-threaded processes. --------- Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com> --- Include/cpython/pthread_stubs.h | 1 + Include/internal/pycore_pythread.h | 42 +++ Lib/test/_test_multiprocessing.py | 3 + Lib/test/audit-tests.py | 3 + Lib/test/test_audit.py | 2 + .../test_process_pool.py | 6 +- Lib/test/test_thread.py | 126 +++++++ Lib/test/test_threading.py | 47 ++- Lib/threading.py | 63 +++- ...-11-04-13-36-51.gh-issue-110829.Pa0CJI.rst | 1 + Modules/_threadmodule.c | 347 +++++++++++++++--- Python/thread_nt.h | 58 ++- Python/thread_pthread.h | 71 +++- Python/thread_pthread_stubs.h | 9 + 14 files changed, 676 insertions(+), 103 deletions(-) create mode 100644 Misc/NEWS.d/next/Core and Builtins/2023-11-04-13-36-51.gh-issue-110829.Pa0CJI.rst diff --git a/Include/cpython/pthread_stubs.h b/Include/cpython/pthread_stubs.h index 5246968ea05..e542eaa5bff 100644 --- a/Include/cpython/pthread_stubs.h +++ b/Include/cpython/pthread_stubs.h @@ -83,6 +83,7 @@ PyAPI_FUNC(int) pthread_create(pthread_t *restrict thread, void *(*start_routine)(void *), void *restrict arg); PyAPI_FUNC(int) pthread_detach(pthread_t thread); +PyAPI_FUNC(int) pthread_join(pthread_t thread, void** value_ptr); PyAPI_FUNC(pthread_t) pthread_self(void); PyAPI_FUNC(int) pthread_exit(void *retval) __attribute__ ((__noreturn__)); PyAPI_FUNC(int) pthread_attr_init(pthread_attr_t *attr); diff --git a/Include/internal/pycore_pythread.h b/Include/internal/pycore_pythread.h index d31ffc78130..9c9a09f60f3 100644 --- a/Include/internal/pycore_pythread.h +++ b/Include/internal/pycore_pythread.h @@ -106,6 +106,48 @@ PyAPI_FUNC(PyLockStatus) 
PyThread_acquire_lock_timed_with_retries( PyThread_type_lock, PY_TIMEOUT_T microseconds); +typedef unsigned long long PyThread_ident_t; +typedef Py_uintptr_t PyThread_handle_t; + +#define PY_FORMAT_THREAD_IDENT_T "llu" +#define Py_PARSE_THREAD_IDENT_T "K" + +PyAPI_FUNC(PyThread_ident_t) PyThread_get_thread_ident_ex(void); + +/* Thread joining APIs. + * + * These APIs have a strict contract: + * - Either PyThread_join_thread or PyThread_detach_thread must be called + * exactly once with the given handle. + * - Calling neither PyThread_join_thread nor PyThread_detach_thread results + * in a resource leak until the end of the process. + * - Any other usage, such as calling both PyThread_join_thread and + * PyThread_detach_thread, or calling them more than once (including + * simultaneously), results in undefined behavior. + */ +PyAPI_FUNC(int) PyThread_start_joinable_thread(void (*func)(void *), + void *arg, + PyThread_ident_t* ident, + PyThread_handle_t* handle); +/* + * Join a thread started with `PyThread_start_joinable_thread`. + * This function cannot be interrupted. It returns 0 on success, + * a non-zero value on failure. + */ +PyAPI_FUNC(int) PyThread_join_thread(PyThread_handle_t); +/* + * Detach a thread started with `PyThread_start_joinable_thread`, such + * that its resources are released as soon as it exits. + * This function cannot be interrupted. It returns 0 on success, + * a non-zero value on failure. + */ +PyAPI_FUNC(int) PyThread_detach_thread(PyThread_handle_t); + +/* + * Obtain the new thread ident and handle in a forked child process. 
+ */ +PyAPI_FUNC(void) PyThread_update_thread_after_fork(PyThread_ident_t* ident, + PyThread_handle_t* handle); #ifdef __cplusplus } diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index bf87a3e8d6f..ec003d8dc43 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -2693,6 +2693,9 @@ class _TestPool(BaseTestCase): p.join() def test_terminate(self): + if self.TYPE == 'threads': + self.skipTest("Threads cannot be terminated") + # Simulate slow tasks which take "forever" to complete p = self.Pool(3) args = [support.LONG_TIMEOUT for i in range(10_000)] diff --git a/Lib/test/audit-tests.py b/Lib/test/audit-tests.py index 89f407de4b0..ce4a11b119c 100644 --- a/Lib/test/audit-tests.py +++ b/Lib/test/audit-tests.py @@ -455,6 +455,9 @@ def test_threading(): i = _thread.start_new_thread(test_func(), ()) lock.acquire() + handle = _thread.start_joinable_thread(test_func()) + handle.join() + def test_threading_abort(): # Ensures that aborting PyThreadState_New raises the correct exception diff --git a/Lib/test/test_audit.py b/Lib/test/test_audit.py index 47e5832d311..cd0a4e22648 100644 --- a/Lib/test/test_audit.py +++ b/Lib/test/test_audit.py @@ -209,6 +209,8 @@ class AuditTest(unittest.TestCase): expected = [ ("_thread.start_new_thread", "(, (), None)"), ("test.test_func", "()"), + ("_thread.start_joinable_thread", "(,)"), + ("test.test_func", "()"), ] self.assertEqual(actual, expected) diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py index c73c2da1a01..3e61b0c9387 100644 --- a/Lib/test/test_concurrent_futures/test_process_pool.py +++ b/Lib/test/test_concurrent_futures/test_process_pool.py @@ -194,11 +194,11 @@ class ProcessPoolExecutorTest(ExecutorTest): context = self.get_context() - # gh-109047: Mock the threading.start_new_thread() function to inject + # gh-109047: Mock the threading.start_joinable_thread() function to inject # 
RuntimeError: simulate the error raised during Python finalization. # Block the second creation: create _ExecutorManagerThread, but block # QueueFeederThread. - orig_start_new_thread = threading._start_new_thread + orig_start_new_thread = threading._start_joinable_thread nthread = 0 def mock_start_new_thread(func, *args): nonlocal nthread @@ -208,7 +208,7 @@ class ProcessPoolExecutorTest(ExecutorTest): nthread += 1 return orig_start_new_thread(func, *args) - with support.swap_attr(threading, '_start_new_thread', + with support.swap_attr(threading, '_start_joinable_thread', mock_start_new_thread): executor = self.executor_type(max_workers=2, mp_context=context) with executor: diff --git a/Lib/test/test_thread.py b/Lib/test/test_thread.py index 831aaf5b6a7..931cb4b797e 100644 --- a/Lib/test/test_thread.py +++ b/Lib/test/test_thread.py @@ -160,6 +160,132 @@ class ThreadRunningTests(BasicThreadTest): f"Exception ignored in thread started by {task!r}") self.assertIsNotNone(cm.unraisable.exc_traceback) + def test_join_thread(self): + finished = [] + + def task(): + time.sleep(0.05) + finished.append(thread.get_ident()) + + with threading_helper.wait_threads_exit(): + handle = thread.start_joinable_thread(task) + handle.join() + self.assertEqual(len(finished), 1) + self.assertEqual(handle.ident, finished[0]) + + def test_join_thread_already_exited(self): + def task(): + pass + + with threading_helper.wait_threads_exit(): + handle = thread.start_joinable_thread(task) + time.sleep(0.05) + handle.join() + + def test_join_several_times(self): + def task(): + pass + + with threading_helper.wait_threads_exit(): + handle = thread.start_joinable_thread(task) + handle.join() + with self.assertRaisesRegex(ValueError, "not joinable"): + handle.join() + + def test_joinable_not_joined(self): + handle_destroyed = thread.allocate_lock() + handle_destroyed.acquire() + + def task(): + handle_destroyed.acquire() + + with threading_helper.wait_threads_exit(): + handle = 
thread.start_joinable_thread(task) + del handle + handle_destroyed.release() + + def test_join_from_self(self): + errors = [] + handles = [] + start_joinable_thread_returned = thread.allocate_lock() + start_joinable_thread_returned.acquire() + task_tried_to_join = thread.allocate_lock() + task_tried_to_join.acquire() + + def task(): + start_joinable_thread_returned.acquire() + try: + handles[0].join() + except Exception as e: + errors.append(e) + finally: + task_tried_to_join.release() + + with threading_helper.wait_threads_exit(): + handle = thread.start_joinable_thread(task) + handles.append(handle) + start_joinable_thread_returned.release() + # Can still join after joining failed in other thread + task_tried_to_join.acquire() + handle.join() + + assert len(errors) == 1 + with self.assertRaisesRegex(RuntimeError, "Cannot join current thread"): + raise errors[0] + + def test_detach_from_self(self): + errors = [] + handles = [] + start_joinable_thread_returned = thread.allocate_lock() + start_joinable_thread_returned.acquire() + thread_detached = thread.allocate_lock() + thread_detached.acquire() + + def task(): + start_joinable_thread_returned.acquire() + try: + handles[0].detach() + except Exception as e: + errors.append(e) + finally: + thread_detached.release() + + with threading_helper.wait_threads_exit(): + handle = thread.start_joinable_thread(task) + handles.append(handle) + start_joinable_thread_returned.release() + thread_detached.acquire() + with self.assertRaisesRegex(ValueError, "not joinable"): + handle.join() + + assert len(errors) == 0 + + def test_detach_then_join(self): + lock = thread.allocate_lock() + lock.acquire() + + def task(): + lock.acquire() + + with threading_helper.wait_threads_exit(): + handle = thread.start_joinable_thread(task) + # detach() returns even though the thread is blocked on lock + handle.detach() + # join() then cannot be called anymore + with self.assertRaisesRegex(ValueError, "not joinable"): + handle.join() + 
lock.release() + + def test_join_then_detach(self): + def task(): + pass + + with threading_helper.wait_threads_exit(): + handle = thread.start_joinable_thread(task) + handle.join() + with self.assertRaisesRegex(ValueError, "not joinable"): + handle.detach() + class Barrier: def __init__(self, num_threads): diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index 00a64372b39..146e2dbc0fc 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -376,8 +376,8 @@ class ThreadTests(BaseTestCase): # Issue 7481: Failure to start thread should cleanup the limbo map. def fail_new_thread(*args): raise threading.ThreadError() - _start_new_thread = threading._start_new_thread - threading._start_new_thread = fail_new_thread + _start_joinable_thread = threading._start_joinable_thread + threading._start_joinable_thread = fail_new_thread try: t = threading.Thread(target=lambda: None) self.assertRaises(threading.ThreadError, t.start) @@ -385,7 +385,7 @@ class ThreadTests(BaseTestCase): t in threading._limbo, "Failed to cleanup _limbo map on failure of Thread.start().") finally: - threading._start_new_thread = _start_new_thread + threading._start_joinable_thread = _start_joinable_thread def test_finalize_running_thread(self): # Issue 1402: the PyGILState_Ensure / _Release functions may be called @@ -482,6 +482,47 @@ class ThreadTests(BaseTestCase): finally: sys.setswitchinterval(old_interval) + def test_join_from_multiple_threads(self): + # Thread.join() should be thread-safe + errors = [] + + def worker(): + time.sleep(0.005) + + def joiner(thread): + try: + thread.join() + except Exception as e: + errors.append(e) + + for N in range(2, 20): + threads = [threading.Thread(target=worker)] + for i in range(N): + threads.append(threading.Thread(target=joiner, + args=(threads[0],))) + for t in threads: + t.start() + time.sleep(0.01) + for t in threads: + t.join() + if errors: + raise errors[0] + + def test_join_with_timeout(self): + lock = 
_thread.allocate_lock() + lock.acquire() + + def worker(): + lock.acquire() + + thread = threading.Thread(target=worker) + thread.start() + thread.join(timeout=0.01) + assert thread.is_alive() + lock.release() + thread.join() + assert not thread.is_alive() + def test_no_refcycle_through_target(self): class RunSelfFunction(object): def __init__(self, should_raise): diff --git a/Lib/threading.py b/Lib/threading.py index 41c3a9ff938..85aff589680 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -5,6 +5,7 @@ import sys as _sys import _thread import functools import warnings +import _weakref from time import monotonic as _time from _weakrefset import WeakSet @@ -33,7 +34,7 @@ __all__ = ['get_ident', 'active_count', 'Condition', 'current_thread', 'setprofile_all_threads','settrace_all_threads'] # Rename some stuff so "from threading import *" is safe -_start_new_thread = _thread.start_new_thread +_start_joinable_thread = _thread.start_joinable_thread _daemon_threads_allowed = _thread.daemon_threads_allowed _allocate_lock = _thread.allocate_lock _set_sentinel = _thread._set_sentinel @@ -589,7 +590,7 @@ class Event: return f"<{cls.__module__}.{cls.__qualname__} at {id(self):#x}: {status}>" def _at_fork_reinit(self): - # Private method called by Thread._reset_internal_locks() + # Private method called by Thread._after_fork() self._cond._at_fork_reinit() def is_set(self): @@ -924,6 +925,8 @@ class Thread: if _HAVE_THREAD_NATIVE_ID: self._native_id = None self._tstate_lock = None + self._join_lock = None + self._handle = None self._started = Event() self._is_stopped = False self._initialized = True @@ -933,22 +936,32 @@ class Thread: # For debugging and _after_fork() _dangling.add(self) - def _reset_internal_locks(self, is_alive): - # private! Called by _after_fork() to reset our internal locks as - # they may be in an invalid state leading to a deadlock or crash. + def _after_fork(self, new_ident=None): + # Private! Called by threading._after_fork(). 
self._started._at_fork_reinit() - if is_alive: + if new_ident is not None: + # This thread is alive. + self._ident = new_ident + if self._handle is not None: + self._handle.after_fork_alive() + assert self._handle.ident == new_ident # bpo-42350: If the fork happens when the thread is already stopped # (ex: after threading._shutdown() has been called), _tstate_lock # is None. Do nothing in this case. if self._tstate_lock is not None: self._tstate_lock._at_fork_reinit() self._tstate_lock.acquire() + if self._join_lock is not None: + self._join_lock._at_fork_reinit() else: - # The thread isn't alive after fork: it doesn't have a tstate + # This thread isn't alive after fork: it doesn't have a tstate # anymore. self._is_stopped = True self._tstate_lock = None + self._join_lock = None + if self._handle is not None: + self._handle.after_fork_dead() + self._handle = None def __repr__(self): assert self._initialized, "Thread.__init__() was not called" @@ -980,15 +993,18 @@ class Thread: if self._started.is_set(): raise RuntimeError("threads can only be started once") + self._join_lock = _allocate_lock() + with _active_limbo_lock: _limbo[self] = self try: - _start_new_thread(self._bootstrap, ()) + # Start joinable thread + self._handle = _start_joinable_thread(self._bootstrap) except Exception: with _active_limbo_lock: del _limbo[self] raise - self._started.wait() + self._started.wait() # Will set ident and native_id def run(self): """Method representing the thread's activity. @@ -1144,6 +1160,22 @@ class Thread: # historically .join(timeout=x) for x<0 has acted as if timeout=0 self._wait_for_tstate_lock(timeout=max(timeout, 0)) + if self._is_stopped: + self._join_os_thread() + + def _join_os_thread(self): + join_lock = self._join_lock + if join_lock is None: + return + with join_lock: + # Calling join() multiple times would raise an exception + # in one of the callers. 
+ if self._handle is not None: + self._handle.join() + self._handle = None + # No need to keep this around + self._join_lock = None + def _wait_for_tstate_lock(self, block=True, timeout=-1): # Issue #18808: wait for the thread state to be gone. # At the end of the thread's life, after all knowledge of the thread @@ -1223,7 +1255,10 @@ class Thread: if self._is_stopped or not self._started.is_set(): return False self._wait_for_tstate_lock(False) - return not self._is_stopped + if not self._is_stopped: + return True + self._join_os_thread() + return False @property def daemon(self): @@ -1679,15 +1714,13 @@ def _after_fork(): # Any lock/condition variable may be currently locked or in an # invalid state, so we reinitialize them. if thread is current: - # There is only one active thread. We reset the ident to - # its new value since it can have changed. - thread._reset_internal_locks(True) + # This is the one and only active thread. ident = get_ident() - thread._ident = ident + thread._after_fork(new_ident=ident) new_active[ident] = thread else: # All the others are already stopped. - thread._reset_internal_locks(False) + thread._after_fork() thread._stop() _limbo.clear() diff --git a/Misc/NEWS.d/next/Core and Builtins/2023-11-04-13-36-51.gh-issue-110829.Pa0CJI.rst b/Misc/NEWS.d/next/Core and Builtins/2023-11-04-13-36-51.gh-issue-110829.Pa0CJI.rst new file mode 100644 index 00000000000..f4fa61db369 --- /dev/null +++ b/Misc/NEWS.d/next/Core and Builtins/2023-11-04-13-36-51.gh-issue-110829.Pa0CJI.rst @@ -0,0 +1 @@ +Joining a thread now ensures the underlying OS thread has exited. This is required for safer fork() in multi-threaded processes. 
diff --git a/Modules/_threadmodule.c b/Modules/_threadmodule.c index 9eecebddb72..88ca9032b5e 100644 --- a/Modules/_threadmodule.c +++ b/Modules/_threadmodule.c @@ -22,12 +22,13 @@ // Forward declarations static struct PyModuleDef thread_module; - +// Module state typedef struct { PyTypeObject *excepthook_type; PyTypeObject *lock_type; PyTypeObject *local_type; PyTypeObject *local_dummy_type; + PyTypeObject *thread_handle_type; } thread_module_state; static inline thread_module_state* @@ -38,6 +39,145 @@ get_thread_state(PyObject *module) return (thread_module_state *)state; } +// _ThreadHandle type + +typedef struct { + PyObject_HEAD + PyThread_ident_t ident; + PyThread_handle_t handle; + char joinable; +} ThreadHandleObject; + +static ThreadHandleObject* +new_thread_handle(thread_module_state* state) +{ + ThreadHandleObject* self = PyObject_New(ThreadHandleObject, state->thread_handle_type); + if (self == NULL) { + return NULL; + } + self->ident = 0; + self->handle = 0; + self->joinable = 0; + return self; +} + +static void +ThreadHandle_dealloc(ThreadHandleObject *self) +{ + PyObject *tp = (PyObject *) Py_TYPE(self); + if (self->joinable) { + int ret = PyThread_detach_thread(self->handle); + if (ret) { + PyErr_SetString(ThreadError, "Failed detaching thread"); + PyErr_WriteUnraisable(tp); + } + } + PyObject_Free(self); + Py_DECREF(tp); +} + +static PyObject * +ThreadHandle_repr(ThreadHandleObject *self) +{ + return PyUnicode_FromFormat("<%s object: ident=%" PY_FORMAT_THREAD_IDENT_T ">", + Py_TYPE(self)->tp_name, self->ident); +} + +static PyObject * +ThreadHandle_get_ident(ThreadHandleObject *self, void *ignored) +{ + return PyLong_FromUnsignedLongLong(self->ident); +} + + +static PyObject * +ThreadHandle_after_fork_alive(ThreadHandleObject *self, void* ignored) +{ + PyThread_update_thread_after_fork(&self->ident, &self->handle); + Py_RETURN_NONE; +} + +static PyObject * +ThreadHandle_after_fork_dead(ThreadHandleObject *self, void* ignored) +{ + // Disallow 
calls to detach() and join() as they could crash. + self->joinable = 0; + Py_RETURN_NONE; +} + +static PyObject * +ThreadHandle_detach(ThreadHandleObject *self, void* ignored) +{ + if (!self->joinable) { + PyErr_SetString(PyExc_ValueError, + "the thread is not joinable and thus cannot be detached"); + return NULL; + } + self->joinable = 0; + // This is typically short so no need to release the GIL + int ret = PyThread_detach_thread(self->handle); + if (ret) { + PyErr_SetString(ThreadError, "Failed detaching thread"); + return NULL; + } + Py_RETURN_NONE; +} + +static PyObject * +ThreadHandle_join(ThreadHandleObject *self, void* ignored) +{ + if (!self->joinable) { + PyErr_SetString(PyExc_ValueError, "the thread is not joinable"); + return NULL; + } + if (self->ident == PyThread_get_thread_ident_ex()) { + // PyThread_join_thread() would deadlock or error out. + PyErr_SetString(ThreadError, "Cannot join current thread"); + return NULL; + } + // Before actually joining, we must first mark the thread as non-joinable, + // as joining several times simultaneously or sequentially is undefined behavior. 
+ self->joinable = 0; + int ret; + Py_BEGIN_ALLOW_THREADS + ret = PyThread_join_thread(self->handle); + Py_END_ALLOW_THREADS + if (ret) { + PyErr_SetString(ThreadError, "Failed joining thread"); + return NULL; + } + Py_RETURN_NONE; +} + +static PyGetSetDef ThreadHandle_getsetlist[] = { + {"ident", (getter)ThreadHandle_get_ident, NULL, NULL}, + {0}, +}; + +static PyMethodDef ThreadHandle_methods[] = +{ + {"after_fork_alive", (PyCFunction)ThreadHandle_after_fork_alive, METH_NOARGS}, + {"after_fork_dead", (PyCFunction)ThreadHandle_after_fork_dead, METH_NOARGS}, + {"detach", (PyCFunction)ThreadHandle_detach, METH_NOARGS}, + {"join", (PyCFunction)ThreadHandle_join, METH_NOARGS}, + {0, 0} +}; + +static PyType_Slot ThreadHandle_Type_slots[] = { + {Py_tp_dealloc, (destructor)ThreadHandle_dealloc}, + {Py_tp_repr, (reprfunc)ThreadHandle_repr}, + {Py_tp_getset, ThreadHandle_getsetlist}, + {Py_tp_methods, ThreadHandle_methods}, + {0, 0} +}; + +static PyType_Spec ThreadHandle_Type_spec = { + "_thread._ThreadHandle", + sizeof(ThreadHandleObject), + 0, + Py_TPFLAGS_DEFAULT | Py_TPFLAGS_DISALLOW_INSTANTIATION, + ThreadHandle_Type_slots, +}; /* Lock objects */ @@ -274,7 +414,7 @@ static PyType_Spec lock_type_spec = { typedef struct { PyObject_HEAD PyThread_type_lock rlock_lock; - unsigned long rlock_owner; + PyThread_ident_t rlock_owner; unsigned long rlock_count; PyObject *in_weakreflist; } rlockobject; @@ -311,13 +451,13 @@ static PyObject * rlock_acquire(rlockobject *self, PyObject *args, PyObject *kwds) { _PyTime_t timeout; - unsigned long tid; + PyThread_ident_t tid; PyLockStatus r = PY_LOCK_ACQUIRED; if (lock_acquire_parse_args(args, kwds, &timeout) < 0) return NULL; - tid = PyThread_get_thread_ident(); + tid = PyThread_get_thread_ident_ex(); if (self->rlock_count > 0 && tid == self->rlock_owner) { unsigned long count = self->rlock_count + 1; if (count <= self->rlock_count) { @@ -360,7 +500,7 @@ the lock is taken and its internal counter initialized to 1."); static PyObject * 
rlock_release(rlockobject *self, PyObject *Py_UNUSED(ignored)) { - unsigned long tid = PyThread_get_thread_ident(); + PyThread_ident_t tid = PyThread_get_thread_ident_ex(); if (self->rlock_count == 0 || self->rlock_owner != tid) { PyErr_SetString(PyExc_RuntimeError, @@ -389,11 +529,12 @@ to be available for other threads."); static PyObject * rlock_acquire_restore(rlockobject *self, PyObject *args) { - unsigned long owner; + PyThread_ident_t owner; unsigned long count; int r = 1; - if (!PyArg_ParseTuple(args, "(kk):_acquire_restore", &count, &owner)) + if (!PyArg_ParseTuple(args, "(k" Py_PARSE_THREAD_IDENT_T "):_acquire_restore", + &count, &owner)) return NULL; if (!PyThread_acquire_lock(self->rlock_lock, 0)) { @@ -419,7 +560,7 @@ For internal use by `threading.Condition`."); static PyObject * rlock_release_save(rlockobject *self, PyObject *Py_UNUSED(ignored)) { - unsigned long owner; + PyThread_ident_t owner; unsigned long count; if (self->rlock_count == 0) { @@ -433,7 +574,7 @@ rlock_release_save(rlockobject *self, PyObject *Py_UNUSED(ignored)) self->rlock_count = 0; self->rlock_owner = 0; PyThread_release_lock(self->rlock_lock); - return Py_BuildValue("kk", count, owner); + return Py_BuildValue("k" Py_PARSE_THREAD_IDENT_T, count, owner); } PyDoc_STRVAR(rlock_release_save_doc, @@ -444,7 +585,7 @@ For internal use by `threading.Condition`."); static PyObject * rlock_recursion_count(rlockobject *self, PyObject *Py_UNUSED(ignored)) { - unsigned long tid = PyThread_get_thread_ident(); + PyThread_ident_t tid = PyThread_get_thread_ident_ex(); return PyLong_FromUnsignedLong( self->rlock_owner == tid ? 
self->rlock_count : 0UL); } @@ -457,7 +598,7 @@ For internal use by reentrancy checks."); static PyObject * rlock_is_owned(rlockobject *self, PyObject *Py_UNUSED(ignored)) { - unsigned long tid = PyThread_get_thread_ident(); + PyThread_ident_t tid = PyThread_get_thread_ident_ex(); if (self->rlock_count > 0 && self->rlock_owner == tid) { Py_RETURN_TRUE; @@ -493,7 +634,8 @@ rlock_new(PyTypeObject *type, PyObject *args, PyObject *kwds) static PyObject * rlock_repr(rlockobject *self) { - return PyUnicode_FromFormat("<%s %s object owner=%ld count=%lu at %p>", + return PyUnicode_FromFormat( + "<%s %s object owner=%" PY_FORMAT_THREAD_IDENT_T " count=%lu at %p>", self->rlock_count ? "locked" : "unlocked", Py_TYPE(self)->tp_name, self->rlock_owner, self->rlock_count, self); @@ -1109,10 +1251,66 @@ PyDoc_STRVAR(daemon_threads_allowed_doc, Return True if daemon threads are allowed in the current interpreter,\n\ and False otherwise.\n"); +static int +do_start_new_thread(thread_module_state* state, + PyObject *func, PyObject* args, PyObject* kwargs, + int joinable, + PyThread_ident_t* ident, PyThread_handle_t* handle) +{ + PyInterpreterState *interp = _PyInterpreterState_GET(); + if (!_PyInterpreterState_HasFeature(interp, Py_RTFLAGS_THREADS)) { + PyErr_SetString(PyExc_RuntimeError, + "thread is not supported for isolated subinterpreters"); + return -1; + } + if (interp->finalizing) { + PyErr_SetString(PyExc_RuntimeError, + "can't create new thread at interpreter shutdown"); + return -1; + } + + // gh-109795: Use PyMem_RawMalloc() instead of PyMem_Malloc(), + // because it should be possible to call thread_bootstate_free() + // without holding the GIL. 
+ struct bootstate *boot = PyMem_RawMalloc(sizeof(struct bootstate)); + if (boot == NULL) { + PyErr_NoMemory(); + return -1; + } + boot->tstate = _PyThreadState_New(interp, _PyThreadState_WHENCE_THREADING); + if (boot->tstate == NULL) { + PyMem_RawFree(boot); + if (!PyErr_Occurred()) { + PyErr_NoMemory(); + } + return -1; + } + boot->func = Py_NewRef(func); + boot->args = Py_NewRef(args); + boot->kwargs = Py_XNewRef(kwargs); + + int err; + if (joinable) { + err = PyThread_start_joinable_thread(thread_run, (void*) boot, ident, handle); + } else { + *handle = 0; + *ident = PyThread_start_new_thread(thread_run, (void*) boot); + err = (*ident == PYTHREAD_INVALID_THREAD_ID); + } + if (err) { + PyErr_SetString(ThreadError, "can't start new thread"); + PyThreadState_Clear(boot->tstate); + thread_bootstate_free(boot, 1); + return -1; + } + return 0; +} + static PyObject * -thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs) +thread_PyThread_start_new_thread(PyObject *module, PyObject *fargs) { PyObject *func, *args, *kwargs = NULL; + thread_module_state *state = get_thread_state(module); if (!PyArg_UnpackTuple(fargs, "start_new_thread", 2, 3, &func, &args, &kwargs)) @@ -1138,57 +1336,73 @@ thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs) return NULL; } - PyInterpreterState *interp = _PyInterpreterState_GET(); - if (!_PyInterpreterState_HasFeature(interp, Py_RTFLAGS_THREADS)) { - PyErr_SetString(PyExc_RuntimeError, - "thread is not supported for isolated subinterpreters"); + PyThread_ident_t ident = 0; + PyThread_handle_t handle; + if (do_start_new_thread(state, func, args, kwargs, /*joinable=*/ 0, + &ident, &handle)) { return NULL; } - if (interp->finalizing) { - PyErr_SetString(PyExc_RuntimeError, - "can't create new thread at interpreter shutdown"); - return NULL; - } - - // gh-109795: Use PyMem_RawMalloc() instead of PyMem_Malloc(), - // because it should be possible to call thread_bootstate_free() - // without holding the GIL. 
- struct bootstate *boot = PyMem_RawMalloc(sizeof(struct bootstate)); - if (boot == NULL) { - return PyErr_NoMemory(); - } - boot->tstate = _PyThreadState_New(interp, _PyThreadState_WHENCE_THREADING); - if (boot->tstate == NULL) { - PyMem_RawFree(boot); - if (!PyErr_Occurred()) { - return PyErr_NoMemory(); - } - return NULL; - } - boot->func = Py_NewRef(func); - boot->args = Py_NewRef(args); - boot->kwargs = Py_XNewRef(kwargs); - - unsigned long ident = PyThread_start_new_thread(thread_run, (void*) boot); - if (ident == PYTHREAD_INVALID_THREAD_ID) { - PyErr_SetString(ThreadError, "can't start new thread"); - PyThreadState_Clear(boot->tstate); - thread_bootstate_free(boot, 1); - return NULL; - } - return PyLong_FromUnsignedLong(ident); + return PyLong_FromUnsignedLongLong(ident); } PyDoc_STRVAR(start_new_doc, "start_new_thread(function, args[, kwargs])\n\ (start_new() is an obsolete synonym)\n\ \n\ -Start a new thread and return its identifier. The thread will call the\n\ -function with positional arguments from the tuple args and keyword arguments\n\ -taken from the optional dictionary kwargs. The thread exits when the\n\ -function returns; the return value is ignored. The thread will also exit\n\ -when the function raises an unhandled exception; a stack trace will be\n\ -printed unless the exception is SystemExit.\n"); +Start a new thread and return its identifier.\n\ +\n\ +The thread will call the function with positional arguments from the\n\ +tuple args and keyword arguments taken from the optional dictionary\n\ +kwargs. The thread exits when the function returns; the return value\n\ +is ignored. 
The thread will also exit when the function raises an\n\ +unhandled exception; a stack trace will be printed unless the exception\n\ +is SystemExit.\n"); + +static PyObject * +thread_PyThread_start_joinable_thread(PyObject *module, PyObject *func) +{ + thread_module_state *state = get_thread_state(module); + + if (!PyCallable_Check(func)) { + PyErr_SetString(PyExc_TypeError, + "thread function must be callable"); + return NULL; + } + + if (PySys_Audit("_thread.start_joinable_thread", "O", func) < 0) { + return NULL; + } + + PyObject* args = PyTuple_New(0); + if (args == NULL) { + return NULL; + } + ThreadHandleObject* hobj = new_thread_handle(state); + if (hobj == NULL) { + Py_DECREF(args); + return NULL; + } + if (do_start_new_thread(state, func, args, /*kwargs=*/ NULL, /*joinable=*/ 1, + &hobj->ident, &hobj->handle)) { + Py_DECREF(args); + Py_DECREF(hobj); + return NULL; + } + Py_DECREF(args); + hobj->joinable = 1; + return (PyObject*) hobj; +} + +PyDoc_STRVAR(start_joinable_doc, +"start_joinable_thread(function)\n\ +\n\ +*For internal use only*: start a new thread.\n\ +\n\ +Like start_new_thread(), this starts a new thread calling the given function.\n\ +Unlike start_new_thread(), this returns a handle object with methods to join\n\ +or detach the given thread.\n\ +This function is not for third-party code, please use the\n\ +`threading` module instead.\n"); static PyObject * thread_PyThread_exit_thread(PyObject *self, PyObject *Py_UNUSED(ignored)) @@ -1248,12 +1462,12 @@ information about locks."); static PyObject * thread_get_ident(PyObject *self, PyObject *Py_UNUSED(ignored)) { - unsigned long ident = PyThread_get_thread_ident(); + PyThread_ident_t ident = PyThread_get_thread_ident_ex(); if (ident == PYTHREAD_INVALID_THREAD_ID) { PyErr_SetString(ThreadError, "no current thread ident"); return NULL; } - return PyLong_FromUnsignedLong(ident); + return PyLong_FromUnsignedLongLong(ident); } PyDoc_STRVAR(get_ident_doc, @@ -1440,8 +1654,8 @@ 
thread_excepthook_file(PyObject *file, PyObject *exc_type, PyObject *exc_value, Py_DECREF(name); } else { - unsigned long ident = PyThread_get_thread_ident(); - PyObject *str = PyUnicode_FromFormat("%lu", ident); + PyThread_ident_t ident = PyThread_get_thread_ident_ex(); + PyObject *str = PyUnicode_FromFormat("%" PY_FORMAT_THREAD_IDENT_T, ident); if (str != NULL) { if (PyFile_WriteObject(str, file, Py_PRINT_RAW) < 0) { Py_DECREF(str); @@ -1574,6 +1788,8 @@ static PyMethodDef thread_methods[] = { METH_VARARGS, start_new_doc}, {"start_new", (PyCFunction)thread_PyThread_start_new_thread, METH_VARARGS, start_new_doc}, + {"start_joinable_thread", (PyCFunction)thread_PyThread_start_joinable_thread, + METH_O, start_joinable_doc}, {"daemon_threads_allowed", (PyCFunction)thread_daemon_threads_allowed, METH_NOARGS, daemon_threads_allowed_doc}, {"allocate_lock", thread_PyThread_allocate_lock, @@ -1617,6 +1833,15 @@ thread_module_exec(PyObject *module) // Initialize the C thread library PyThread_init_thread(); + // _ThreadHandle + state->thread_handle_type = (PyTypeObject *)PyType_FromSpec(&ThreadHandle_Type_spec); + if (state->thread_handle_type == NULL) { + return -1; + } + if (PyDict_SetItemString(d, "_ThreadHandle", (PyObject *)state->thread_handle_type) < 0) { + return -1; + } + // Lock state->lock_type = (PyTypeObject *)PyType_FromSpec(&lock_type_spec); if (state->lock_type == NULL) { @@ -1690,6 +1915,7 @@ thread_module_traverse(PyObject *module, visitproc visit, void *arg) Py_VISIT(state->lock_type); Py_VISIT(state->local_type); Py_VISIT(state->local_dummy_type); + Py_VISIT(state->thread_handle_type); return 0; } @@ -1701,6 +1927,7 @@ thread_module_clear(PyObject *module) Py_CLEAR(state->lock_type); Py_CLEAR(state->local_type); Py_CLEAR(state->local_dummy_type); + Py_CLEAR(state->thread_handle_type); return 0; } diff --git a/Python/thread_nt.h b/Python/thread_nt.h index 26f441bd6d3..14b9cddc24c 100644 --- a/Python/thread_nt.h +++ b/Python/thread_nt.h @@ -182,9 +182,9 @@ 
bootstrap(void *call) return 0; } -unsigned long -PyThread_start_new_thread(void (*func)(void *), void *arg) -{ +int +PyThread_start_joinable_thread(void (*func)(void *), void *arg, + PyThread_ident_t* ident, PyThread_handle_t* handle) { HANDLE hThread; unsigned threadID; callobj *obj; @@ -194,7 +194,7 @@ PyThread_start_new_thread(void (*func)(void *), void *arg) obj = (callobj*)HeapAlloc(GetProcessHeap(), 0, sizeof(*obj)); if (!obj) - return PYTHREAD_INVALID_THREAD_ID; + return -1; obj->func = func; obj->arg = arg; PyThreadState *tstate = _PyThreadState_GET(); @@ -207,22 +207,51 @@ PyThread_start_new_thread(void (*func)(void *), void *arg) /* I've seen errno == EAGAIN here, which means "there are * too many threads". */ - int e = errno; - threadID = (unsigned)-1; HeapFree(GetProcessHeap(), 0, obj); + return -1; } - else { - CloseHandle(hThread); + *ident = threadID; + // The cast is safe since HANDLE is pointer-sized + *handle = (PyThread_handle_t) hThread; + return 0; +} + +unsigned long +PyThread_start_new_thread(void (*func)(void *), void *arg) { + PyThread_handle_t handle; + PyThread_ident_t ident; + if (PyThread_start_joinable_thread(func, arg, &ident, &handle)) { + return PYTHREAD_INVALID_THREAD_ID; } - return threadID; + CloseHandle((HANDLE) handle); + // The cast is safe since the ident is really an unsigned int + return (unsigned long) ident; +} + +int +PyThread_join_thread(PyThread_handle_t handle) { + HANDLE hThread = (HANDLE) handle; + int errored = (WaitForSingleObject(hThread, INFINITE) != WAIT_OBJECT_0); + CloseHandle(hThread); + return errored; +} + +int +PyThread_detach_thread(PyThread_handle_t handle) { + HANDLE hThread = (HANDLE) handle; + return (CloseHandle(hThread) == 0); +} + +void +PyThread_update_thread_after_fork(PyThread_ident_t* ident, PyThread_handle_t* handle) { } /* * Return the thread Id instead of a handle. 
The Id is said to uniquely identify the * thread in the system */ -unsigned long -PyThread_get_thread_ident(void) +PyThread_ident_t +PyThread_get_thread_ident_ex(void) { if (!initialized) PyThread_init_thread(); @@ -230,6 +259,13 @@ PyThread_get_thread_ident(void) return GetCurrentThreadId(); } +unsigned long +PyThread_get_thread_ident(void) +{ + return (unsigned long) PyThread_get_thread_ident_ex(); +} + + #ifdef PY_HAVE_THREAD_NATIVE_ID /* * Return the native Thread ID (TID) of the calling thread. diff --git a/Python/thread_pthread.h b/Python/thread_pthread.h index 76a1f7763f2..a8df5449714 100644 --- a/Python/thread_pthread.h +++ b/Python/thread_pthread.h @@ -235,8 +235,8 @@ pythread_wrapper(void *arg) return NULL; } -unsigned long -PyThread_start_new_thread(void (*func)(void *), void *arg) +static int +do_start_joinable_thread(void (*func)(void *), void *arg, pthread_t* out_id) { pthread_t th; int status; @@ -252,7 +252,7 @@ PyThread_start_new_thread(void (*func)(void *), void *arg) #if defined(THREAD_STACK_SIZE) || defined(PTHREAD_SYSTEM_SCHED_SUPPORTED) if (pthread_attr_init(&attrs) != 0) - return PYTHREAD_INVALID_THREAD_ID; + return -1; #endif #if defined(THREAD_STACK_SIZE) PyThreadState *tstate = _PyThreadState_GET(); @@ -261,7 +261,7 @@ PyThread_start_new_thread(void (*func)(void *), void *arg) if (tss != 0) { if (pthread_attr_setstacksize(&attrs, tss) != 0) { pthread_attr_destroy(&attrs); - return PYTHREAD_INVALID_THREAD_ID; + return -1; } } #endif @@ -272,7 +272,7 @@ PyThread_start_new_thread(void (*func)(void *), void *arg) pythread_callback *callback = PyMem_RawMalloc(sizeof(pythread_callback)); if (callback == NULL) { - return PYTHREAD_INVALID_THREAD_ID; + return -1; } callback->func = func; @@ -292,11 +292,34 @@ PyThread_start_new_thread(void (*func)(void *), void *arg) if (status != 0) { PyMem_RawFree(callback); + return -1; + } + *out_id = th; + return 0; +} + +int +PyThread_start_joinable_thread(void (*func)(void *), void *arg, + PyThread_ident_t* 
ident, PyThread_handle_t* handle) { + pthread_t th = (pthread_t) 0; + if (do_start_joinable_thread(func, arg, &th)) { + return -1; + } + *ident = (PyThread_ident_t) th; + *handle = (PyThread_handle_t) th; + assert(th == (pthread_t) *ident); + assert(th == (pthread_t) *handle); + return 0; +} + +unsigned long +PyThread_start_new_thread(void (*func)(void *), void *arg) +{ + pthread_t th = (pthread_t) 0; + if (do_start_joinable_thread(func, arg, &th)) { return PYTHREAD_INVALID_THREAD_ID; } - pthread_detach(th); - #if SIZEOF_PTHREAD_T <= SIZEOF_LONG return (unsigned long) th; #else @@ -304,20 +327,46 @@ PyThread_start_new_thread(void (*func)(void *), void *arg) #endif } +int +PyThread_join_thread(PyThread_handle_t th) { + return pthread_join((pthread_t) th, NULL); +} + +int +PyThread_detach_thread(PyThread_handle_t th) { + return pthread_detach((pthread_t) th); +} + +void +PyThread_update_thread_after_fork(PyThread_ident_t* ident, PyThread_handle_t* handle) { + // The thread id might have been updated in the forked child + pthread_t th = pthread_self(); + *ident = (PyThread_ident_t) th; + *handle = (PyThread_handle_t) th; + assert(th == (pthread_t) *ident); + assert(th == (pthread_t) *handle); +} + /* XXX This implementation is considered (to quote Tim Peters) "inherently hosed" because: - It does not guarantee the promise that a non-zero integer is returned. - The cast to unsigned long is inherently unsafe. - It is not clear that the 'volatile' (for AIX?) are any longer necessary. 
*/ -unsigned long -PyThread_get_thread_ident(void) -{ +PyThread_ident_t +PyThread_get_thread_ident_ex(void) { volatile pthread_t threadid; if (!initialized) PyThread_init_thread(); threadid = pthread_self(); - return (unsigned long) threadid; + assert(threadid == (pthread_t) (PyThread_ident_t) threadid); + return (PyThread_ident_t) threadid; +} + +unsigned long +PyThread_get_thread_ident(void) +{ + return (unsigned long) PyThread_get_thread_ident_ex(); } #ifdef PY_HAVE_THREAD_NATIVE_ID diff --git a/Python/thread_pthread_stubs.h b/Python/thread_pthread_stubs.h index 48bad36ec44..4741e594e52 100644 --- a/Python/thread_pthread_stubs.h +++ b/Python/thread_pthread_stubs.h @@ -94,6 +94,15 @@ pthread_detach(pthread_t thread) return 0; } +int +pthread_join(pthread_t thread, void** value_ptr) +{ + if (value_ptr) { + *value_ptr = NULL; + } + return 0; +} + PyAPI_FUNC(pthread_t) pthread_self(void) { return 0;