gh-110693: Pending Calls Machinery Cleanups (gh-118296)

This does some cleanup in preparation for later changes: it renames the pending-calls bookkeeping fields (calls_to_do becomes npending, last becomes next), gives _PyEval_AddPendingCall() an explicit _Py_add_pending_call_result return type (_Py_ADD_PENDING_SUCCESS or _Py_ADD_PENDING_FULL), adds per-queue max and maxloop limits with matching runtime-init values, and extends the _testcapi/_testinternalcapi pending_threadfunc helpers and their tests to exercise those limits.
Eric Snow 2024-04-25 19:05:51 -06:00 committed by GitHub
parent d5df25268b
commit 09c2947581
7 changed files with 319 additions and 117 deletions

View File

@@ -48,8 +48,12 @@ extern void _PyEval_SignalReceived(void);
 #define _Py_PENDING_MAINTHREADONLY 1
 #define _Py_PENDING_RAWFREE 2

+typedef int _Py_add_pending_call_result;
+#define _Py_ADD_PENDING_SUCCESS 0
+#define _Py_ADD_PENDING_FULL -1
+
 // Export for '_testinternalcapi' shared extension
-PyAPI_FUNC(int) _PyEval_AddPendingCall(
+PyAPI_FUNC(_Py_add_pending_call_result) _PyEval_AddPendingCall(
     PyInterpreterState *interp,
     _Py_pending_call_func func,
     void *arg,
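The header hunk above only declares the new result type. As a quick orientation, here is a minimal sketch (not part of the commit) of how a caller inside CPython is expected to consume it, mirroring the Py_AddPendingCall() change in the last file of this diff; the helper name schedule_or_fail and the my_callback/my_arg names are hypothetical.

/* Sketch only: assumes Py_BUILD_CORE and the internal header that declares
 * _PyEval_AddPendingCall() are available; names below are illustrative. */
#include <assert.h>
#include "Python.h"

static int
schedule_or_fail(PyInterpreterState *interp,
                 _Py_pending_call_func my_callback, void *my_arg)
{
    _Py_add_pending_call_result r =
        _PyEval_AddPendingCall(interp, my_callback, my_arg, 0);
    if (r == _Py_ADD_PENDING_FULL) {
        return -1;   /* queue full; the caller decides whether to retry */
    }
    assert(r == _Py_ADD_PENDING_SUCCESS);
    return 0;
}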

View File

@@ -14,28 +14,56 @@ extern "C" {
 typedef int (*_Py_pending_call_func)(void *);

+struct _pending_call {
+    _Py_pending_call_func func;
+    void *arg;
+    int flags;
+};
+
+#define PENDINGCALLSARRAYSIZE 32
+
+#define MAXPENDINGCALLS PENDINGCALLSARRAYSIZE
+/* For interpreter-level pending calls, we want to avoid spending too
+   much time on pending calls in any one thread, so we apply a limit. */
+#if MAXPENDINGCALLS > 100
+# define MAXPENDINGCALLSLOOP 100
+#else
+# define MAXPENDINGCALLSLOOP MAXPENDINGCALLS
+#endif
+
+#define MAXPENDINGCALLS_MAIN PENDINGCALLSARRAYSIZE
+/* For the main thread, we want to make sure all pending calls are
+   run at once, for the sake of prompt signal handling. This is
+   unlikely to cause any problems since there should be very few
+   pending calls for the main thread. */
+#define MAXPENDINGCALLSLOOP_MAIN 0
+
 struct _pending_calls {
     int busy;
     PyMutex mutex;
     /* Request for running pending calls. */
-    int32_t calls_to_do;
-#define NPENDINGCALLS 32
-    struct _pending_call {
-        _Py_pending_call_func func;
-        void *arg;
-        int flags;
-    } calls[NPENDINGCALLS];
+    int32_t npending;
+    /* The maximum allowed number of pending calls.
+       If the queue fills up to this point then _PyEval_AddPendingCall()
+       will return _Py_ADD_PENDING_FULL. */
+    int32_t max;
+    /* We don't want a flood of pending calls to interrupt any one thread
+       for too long, so we keep a limit on the number handled per pass.
+       A value of 0 means there is no limit (other than the maximum
+       size of the list of pending calls). */
+    int32_t maxloop;
+    struct _pending_call calls[PENDINGCALLSARRAYSIZE];
     int first;
-    int last;
+    int next;
 };

 typedef enum {
     PERF_STATUS_FAILED = -1,  // Perf trampoline is in an invalid state
     PERF_STATUS_NO_INIT = 0,  // Perf trampoline is not initialized
     PERF_STATUS_OK = 1,       // Perf trampoline is ready to be executed
 } perf_status_t;

 #ifdef PY_HAVE_PERF_TRAMPOLINE
 struct code_arena_st;

@@ -48,6 +76,7 @@ struct trampoline_api_st {
 };
 #endif

 struct _ceval_runtime_state {
     struct {
 #ifdef PY_HAVE_PERF_TRAMPOLINE

@@ -62,10 +91,15 @@ struct _ceval_runtime_state {
 #endif
     } perf;

     /* Pending calls to be made only on the main thread. */
+    // The signal machinery falls back on this
+    // so it must be especially stable and efficient.
+    // For example, we use a preallocated array
+    // for the list of pending calls.
     struct _pending_calls pending_mainthread;
     PyMutex sys_trace_profile_mutex;
 };

 #ifdef PY_HAVE_PERF_TRAMPOLINE
 # define _PyEval_RUNTIME_PERF_INIT \
     { \
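The max/maxloop pair added above controls how many calls a single pass may run. A small sketch of how the effective per-pass bound is derived (this mirrors the _make_pending_calls() change in the last file of this diff; effective_maxloop is a hypothetical helper name, and struct _pending_calls is the internal type shown above):

/* Sketch only, not part of the commit. */
static int32_t
effective_maxloop(const struct _pending_calls *pending)
{
    int32_t maxloop = pending->maxloop;
    if (maxloop == 0) {
        /* 0 means "no per-pass limit beyond the queue capacity",
           which is what MAXPENDINGCALLSLOOP_MAIN requests for the
           main thread's queue. */
        maxloop = pending->max;
    }
    assert(maxloop > 0 && maxloop <= pending->max);
    return maxloop;
}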

View File

@@ -114,6 +114,10 @@ extern PyTypeObject _PyExc_MemoryError;
         .autoTSSkey = Py_tss_NEEDS_INIT, \
         .parser = _parser_runtime_state_INIT, \
         .ceval = { \
+            .pending_mainthread = { \
+                .max = MAXPENDINGCALLS_MAIN, \
+                .maxloop = MAXPENDINGCALLSLOOP_MAIN, \
+            }, \
             .perf = _PyEval_RUNTIME_PERF_INIT, \
         }, \
         .gilstate = { \

@@ -166,6 +170,10 @@ extern PyTypeObject _PyExc_MemoryError;
         .imports = IMPORTS_INIT, \
         .ceval = { \
             .recursion_limit = Py_DEFAULT_RECURSION_LIMIT, \
+            .pending = { \
+                .max = MAXPENDINGCALLS, \
+                .maxloop = MAXPENDINGCALLSLOOP, \
+            }, \
         }, \
         .gc = { \
             .enabled = 1, \

View File

@@ -1172,6 +1172,12 @@ class CAPITest(unittest.TestCase):
         self.assertEqual(get_type_fullyqualname(MyType), 'my_qualname')

+    def test_gen_get_code(self):
+        def genf(): yield
+        gen = genf()
+        self.assertEqual(_testcapi.gen_get_code(gen), gen.gi_code)
+

 @requires_limited_api
 class TestHeapTypeRelative(unittest.TestCase):
     """Test API for extending opaque types (PEP 697)"""

@@ -1452,7 +1458,7 @@ class TestPendingCalls(unittest.TestCase):
     # about when pending calls get run. This is especially relevant
     # here for creating deterministic tests.

-    def pendingcalls_submit(self, l, n):
+    def main_pendingcalls_submit(self, l, n):
         def callback():
             #this function can be interrupted by thread switching so let's
             #use an atomic operation

@@ -1467,12 +1473,27 @@ class TestPendingCalls(unittest.TestCase):
             if _testcapi._pending_threadfunc(callback):
                 break

+    def pendingcalls_submit(self, l, n, *, main=True, ensure=False):
+        def callback():
+            #this function can be interrupted by thread switching so let's
+            #use an atomic operation
+            l.append(None)
+
+        if main:
+            return _testcapi._pending_threadfunc(callback, n,
+                                                 blocking=False,
+                                                 ensure_added=ensure)
+        else:
+            return _testinternalcapi.pending_threadfunc(callback, n,
+                                                        blocking=False,
+                                                        ensure_added=ensure)
+
-    def pendingcalls_wait(self, l, n, context = None):
+    def pendingcalls_wait(self, l, numadded, context = None):
         #now, stick around until l[0] has grown to 10
         count = 0
-        while len(l) != n:
+        while len(l) != numadded:
             #this busy loop is where we expect to be interrupted to
-            #run our callbacks. Note that callbacks are only run on the
+            #run our callbacks. Note that some callbacks are only run on the
             #main thread
             if False and support.verbose:
                 print("(%i)"%(len(l),),)

@@ -1482,12 +1503,12 @@ class TestPendingCalls(unittest.TestCase):
                 continue
             count += 1
             self.assertTrue(count < 10000,
-                "timeout waiting for %i callbacks, got %i"%(n, len(l)))
+                "timeout waiting for %i callbacks, got %i"%(numadded, len(l)))
         if False and support.verbose:
             print("(%i)"%(len(l),))

     @threading_helper.requires_working_threading()
-    def test_pendingcalls_threaded(self):
+    def test_main_pendingcalls_threaded(self):
         #do every callback on a separate thread
         n = 32 #total callbacks

@@ -1501,15 +1522,15 @@ class TestPendingCalls(unittest.TestCase):
         context.lock = threading.Lock()
         context.event = threading.Event()

-        threads = [threading.Thread(target=self.pendingcalls_thread,
+        threads = [threading.Thread(target=self.main_pendingcalls_thread,
                                     args=(context,))
                    for i in range(context.nThreads)]
         with threading_helper.start_threads(threads):
             self.pendingcalls_wait(context.l, n, context)

-    def pendingcalls_thread(self, context):
+    def main_pendingcalls_thread(self, context):
         try:
-            self.pendingcalls_submit(context.l, context.n)
+            self.main_pendingcalls_submit(context.l, context.n)
         finally:
             with context.lock:
                 context.nFinished += 1

@@ -1519,20 +1540,54 @@ class TestPendingCalls(unittest.TestCase):
             if nFinished == context.nThreads:
                 context.event.set()

-    def test_pendingcalls_non_threaded(self):
+    def test_main_pendingcalls_non_threaded(self):
         #again, just using the main thread, likely they will all be dispatched at
         #once. It is ok to ask for too many, because we loop until we find a slot.
         #the loop can be interrupted to dispatch.
         #there are only 32 dispatch slots, so we go for twice that!
         l = []
         n = 64
-        self.pendingcalls_submit(l, n)
+        self.main_pendingcalls_submit(l, n)
         self.pendingcalls_wait(l, n)

-    def test_gen_get_code(self):
-        def genf(): yield
-        gen = genf()
-        self.assertEqual(_testcapi.gen_get_code(gen), gen.gi_code)
+    def test_max_pending(self):
+        with self.subTest('main-only'):
+            maxpending = 32
+
+            l = []
+            added = self.pendingcalls_submit(l, 1, main=True)
+            self.pendingcalls_wait(l, added)
+            self.assertEqual(added, 1)
+
+            l = []
+            added = self.pendingcalls_submit(l, maxpending, main=True)
+            self.pendingcalls_wait(l, added)
+            self.assertEqual(added, maxpending)
+
+            l = []
+            added = self.pendingcalls_submit(l, maxpending+1, main=True)
+            self.pendingcalls_wait(l, added)
+            self.assertEqual(added, maxpending)
+
+        with self.subTest('not main-only'):
+            # Per-interpreter pending calls has the same low limit
+            # on how many may be pending at a time.
+            maxpending = 32
+
+            l = []
+            added = self.pendingcalls_submit(l, 1, main=False)
+            self.pendingcalls_wait(l, added)
+            self.assertEqual(added, 1)
+
+            l = []
+            added = self.pendingcalls_submit(l, maxpending, main=False)
+            self.pendingcalls_wait(l, added)
+            self.assertEqual(added, maxpending)
+
+            l = []
+            added = self.pendingcalls_submit(l, maxpending+1, main=False)
+            self.pendingcalls_wait(l, added)
+            self.assertEqual(added, maxpending)

 class PendingTask(types.SimpleNamespace):

View File

@@ -819,25 +819,55 @@ static int _pending_callback(void *arg)
  * run from any python thread.
  */
 static PyObject *
-pending_threadfunc(PyObject *self, PyObject *arg)
+pending_threadfunc(PyObject *self, PyObject *arg, PyObject *kwargs)
 {
+    static char *kwlist[] = {"callback", "num",
+                             "blocking", "ensure_added", NULL};
     PyObject *callable;
-    int r;
-    if (PyArg_ParseTuple(arg, "O", &callable) == 0)
+    unsigned int num = 1;
+    int blocking = 0;
+    int ensure_added = 0;
+    if (!PyArg_ParseTupleAndKeywords(arg, kwargs,
+                                     "O|I$pp:_pending_threadfunc", kwlist,
+                                     &callable, &num, &blocking, &ensure_added))
+    {
         return NULL;
+    }

     /* create the reference for the callback while we hold the lock */
-    Py_INCREF(callable);
-
-    Py_BEGIN_ALLOW_THREADS
-    r = Py_AddPendingCall(&_pending_callback, callable);
-    Py_END_ALLOW_THREADS
-
-    if (r<0) {
-        Py_DECREF(callable); /* unsuccessful add, destroy the extra reference */
-        Py_RETURN_FALSE;
+    for (unsigned int i = 0; i < num; i++) {
+        Py_INCREF(callable);
     }
-    Py_RETURN_TRUE;
+
+    PyThreadState *save_tstate = NULL;
+    if (!blocking) {
+        save_tstate = PyEval_SaveThread();
+    }
+
+    unsigned int num_added = 0;
+    for (; num_added < num; num_added++) {
+        if (ensure_added) {
+            int r;
+            do {
+                r = Py_AddPendingCall(&_pending_callback, callable);
+            } while (r < 0);
+        }
+        else {
+            if (Py_AddPendingCall(&_pending_callback, callable) < 0) {
+                break;
+            }
+        }
+    }
+
+    if (!blocking) {
+        PyEval_RestoreThread(save_tstate);
+    }
+
+    for (unsigned int i = num_added; i < num; i++) {
+        Py_DECREF(callable); /* unsuccessful add, destroy the extra reference */
+    }
+    /* The callable is decref'ed above in each added _pending_callback(). */
+    return PyLong_FromUnsignedLong((unsigned long)num_added);
 }

 /* Test PyOS_string_to_double. */

@@ -3232,7 +3262,8 @@ static PyMethodDef TestMethods[] = {
     {"_spawn_pthread_waiter", spawn_pthread_waiter, METH_NOARGS},
     {"_end_spawned_pthread", end_spawned_pthread, METH_NOARGS},
 #endif
-    {"_pending_threadfunc", pending_threadfunc, METH_VARARGS},
+    {"_pending_threadfunc", _PyCFunction_CAST(pending_threadfunc),
+     METH_VARARGS|METH_KEYWORDS},
 #ifdef HAVE_GETTIMEOFDAY
     {"profile_int", profile_int, METH_NOARGS},
 #endif

View File

@@ -1062,37 +1062,56 @@ static PyObject *
 pending_threadfunc(PyObject *self, PyObject *args, PyObject *kwargs)
 {
     PyObject *callable;
+    unsigned int num = 1;
+    int blocking = 0;
     int ensure_added = 0;
-    static char *kwlist[] = {"", "ensure_added", NULL};
+    static char *kwlist[] = {"callback", "num",
+                             "blocking", "ensure_added", NULL};
     if (!PyArg_ParseTupleAndKeywords(args, kwargs,
-                                     "O|$p:pending_threadfunc", kwlist,
-                                     &callable, &ensure_added))
+                                     "O|I$pp:pending_threadfunc", kwlist,
+                                     &callable, &num, &blocking, &ensure_added))
     {
         return NULL;
     }

     PyInterpreterState *interp = _PyInterpreterState_GET();

     /* create the reference for the callback while we hold the lock */
-    Py_INCREF(callable);
-
-    int r;
-    Py_BEGIN_ALLOW_THREADS
-    r = _PyEval_AddPendingCall(interp, &_pending_callback, callable, 0);
-    Py_END_ALLOW_THREADS
-    if (r < 0) {
-        /* unsuccessful add */
-        if (!ensure_added) {
-            Py_DECREF(callable);
-            Py_RETURN_FALSE;
-        }
-        do {
-            Py_BEGIN_ALLOW_THREADS
-            r = _PyEval_AddPendingCall(interp, &_pending_callback, callable, 0);
-            Py_END_ALLOW_THREADS
-        } while (r < 0);
+    for (unsigned int i = 0; i < num; i++) {
+        Py_INCREF(callable);
     }

-    Py_RETURN_TRUE;
+    PyThreadState *save_tstate = NULL;
+    if (!blocking) {
+        save_tstate = PyEval_SaveThread();
+    }
+
+    unsigned int num_added = 0;
+    for (; num_added < num; num_added++) {
+        if (ensure_added) {
+            _Py_add_pending_call_result r;
+            do {
+                r = _PyEval_AddPendingCall(interp, &_pending_callback, callable, 0);
+                assert(r == _Py_ADD_PENDING_SUCCESS
+                       || r == _Py_ADD_PENDING_FULL);
+            } while (r == _Py_ADD_PENDING_FULL);
+        }
+        else {
+            if (_PyEval_AddPendingCall(interp, &_pending_callback, callable, 0) < 0) {
+                break;
+            }
+        }
+    }
+
+    if (!blocking) {
+        PyEval_RestoreThread(save_tstate);
+    }
+
+    for (unsigned int i = num_added; i < num; i++) {
+        Py_DECREF(callable); /* unsuccessful add, destroy the extra reference */
+    }
+    /* The callable is decref'ed in _pending_callback() above. */
+    return PyLong_FromUnsignedLong((unsigned long)num_added);
 }

@@ -1135,14 +1154,16 @@ pending_identify(PyObject *self, PyObject *args)
     PyThread_acquire_lock(mutex, WAIT_LOCK);
     /* It gets released in _pending_identify_callback(). */

-    int r;
+    _Py_add_pending_call_result r;
     do {
         Py_BEGIN_ALLOW_THREADS
         r = _PyEval_AddPendingCall(interp,
                                    &_pending_identify_callback, (void *)mutex,
                                    0);
         Py_END_ALLOW_THREADS
-    } while (r < 0);
+        assert(r == _Py_ADD_PENDING_SUCCESS
+               || r == _Py_ADD_PENDING_FULL);
+    } while (r == _Py_ADD_PENDING_FULL);

     /* Wait for the pending call to complete. */
     PyThread_acquire_lock(mutex, WAIT_LOCK);

View File

@@ -84,15 +84,15 @@ update_eval_breaker_for_thread(PyInterpreterState *interp, PyThreadState *tstate
         return;
 #endif

-    int32_t calls_to_do = _Py_atomic_load_int32_relaxed(
-        &interp->ceval.pending.calls_to_do);
-    if (calls_to_do) {
+    int32_t npending = _Py_atomic_load_int32_relaxed(
+        &interp->ceval.pending.npending);
+    if (npending) {
         _Py_set_eval_breaker_bit(tstate, _PY_CALLS_TO_DO_BIT);
     }
     else if (_Py_IsMainThread()) {
-        calls_to_do = _Py_atomic_load_int32_relaxed(
-            &_PyRuntime.ceval.pending_mainthread.calls_to_do);
-        if (calls_to_do) {
+        npending = _Py_atomic_load_int32_relaxed(
+            &_PyRuntime.ceval.pending_mainthread.npending);
+        if (npending) {
             _Py_set_eval_breaker_bit(tstate, _PY_CALLS_TO_DO_BIT);
         }
     }

@@ -624,6 +624,34 @@ PyEval_RestoreThread(PyThreadState *tstate)
 }

+void
+_PyEval_SignalReceived(void)
+{
+    _Py_set_eval_breaker_bit(_PyRuntime.main_tstate, _PY_SIGNALS_PENDING_BIT);
+}
+
+#ifndef Py_GIL_DISABLED
+static void
+signal_active_thread(PyInterpreterState *interp, uintptr_t bit)
+{
+    struct _gil_runtime_state *gil = interp->ceval.gil;
+
+    // If a thread from the targeted interpreter is holding the GIL, signal
+    // that thread. Otherwise, the next thread to run from the targeted
+    // interpreter will have its bit set as part of taking the GIL.
+    MUTEX_LOCK(gil->mutex);
+    if (_Py_atomic_load_int_relaxed(&gil->locked)) {
+        PyThreadState *holder = (PyThreadState*)_Py_atomic_load_ptr_relaxed(&gil->last_holder);
+        if (holder->interp == interp) {
+            _Py_set_eval_breaker_bit(holder, bit);
+        }
+    }
+    MUTEX_UNLOCK(gil->mutex);
+}
+#endif
+
 /* Mechanism whereby asynchronously executing callbacks (e.g. UNIX
    signal handlers or Mac I/O completion routines) can schedule calls
    to a function to be called synchronously.

@@ -646,29 +674,31 @@ PyEval_RestoreThread(PyThreadState *tstate)
    threadstate.
 */

-void
-_PyEval_SignalReceived(void)
-{
-    _Py_set_eval_breaker_bit(_PyRuntime.main_tstate, _PY_SIGNALS_PENDING_BIT);
-}
-
 /* Push one item onto the queue while holding the lock. */
 static int
 _push_pending_call(struct _pending_calls *pending,
                    _Py_pending_call_func func, void *arg, int flags)
 {
-    int i = pending->last;
-    int j = (i + 1) % NPENDINGCALLS;
-    if (j == pending->first) {
-        return -1; /* Queue full */
+    if (pending->npending == pending->max) {
+        return _Py_ADD_PENDING_FULL;
     }
+    assert(pending->npending < pending->max);
+
+    int i = pending->next;
+    assert(pending->calls[i].func == NULL);
+
     pending->calls[i].func = func;
     pending->calls[i].arg = arg;
     pending->calls[i].flags = flags;
-    pending->last = j;

-    assert(pending->calls_to_do < NPENDINGCALLS);
-    _Py_atomic_add_int32(&pending->calls_to_do, 1);
-    return 0;
+    assert(pending->npending < PENDINGCALLSARRAYSIZE);
+    _Py_atomic_add_int32(&pending->npending, 1);
+
+    pending->next = (i + 1) % PENDINGCALLSARRAYSIZE;
+    assert(pending->next != pending->first
+           || pending->npending == pending->max);
+
+    return _Py_ADD_PENDING_SUCCESS;
 }

 static int

@@ -676,8 +706,9 @@ _next_pending_call(struct _pending_calls *pending,
                    int (**func)(void *), void **arg, int *flags)
 {
     int i = pending->first;
-    if (i == pending->last) {
+    if (pending->npending == 0) {
         /* Queue empty */
+        assert(i == pending->next);
         assert(pending->calls[i].func == NULL);
         return -1;
     }

@@ -695,38 +726,18 @@ _pop_pending_call(struct _pending_calls *pending,
     int i = _next_pending_call(pending, func, arg, flags);
     if (i >= 0) {
         pending->calls[i] = (struct _pending_call){0};
-        pending->first = (i + 1) % NPENDINGCALLS;
-        assert(pending->calls_to_do > 0);
-        _Py_atomic_add_int32(&pending->calls_to_do, -1);
+        pending->first = (i + 1) % PENDINGCALLSARRAYSIZE;
+        assert(pending->npending > 0);
+        _Py_atomic_add_int32(&pending->npending, -1);
     }
 }

-#ifndef Py_GIL_DISABLED
-static void
-signal_active_thread(PyInterpreterState *interp, uintptr_t bit)
-{
-    struct _gil_runtime_state *gil = interp->ceval.gil;
-
-    // If a thread from the targeted interpreter is holding the GIL, signal
-    // that thread. Otherwise, the next thread to run from the targeted
-    // interpreter will have its bit set as part of taking the GIL.
-    MUTEX_LOCK(gil->mutex);
-    if (_Py_atomic_load_int_relaxed(&gil->locked)) {
-        PyThreadState *holder = (PyThreadState*)_Py_atomic_load_ptr_relaxed(&gil->last_holder);
-        if (holder->interp == interp) {
-            _Py_set_eval_breaker_bit(holder, bit);
-        }
-    }
-    MUTEX_UNLOCK(gil->mutex);
-}
-#endif
-
 /* This implementation is thread-safe. It allows
    scheduling to be made from any thread, and even from an executing
    callback.
  */
-int
+_Py_add_pending_call_result
 _PyEval_AddPendingCall(PyInterpreterState *interp,
                        _Py_pending_call_func func, void *arg, int flags)
 {

@@ -739,7 +750,8 @@ _PyEval_AddPendingCall(PyInterpreterState *interp,
     }

     PyMutex_Lock(&pending->mutex);
-    int result = _push_pending_call(pending, func, arg, flags);
+    _Py_add_pending_call_result result =
+        _push_pending_call(pending, func, arg, flags);
     PyMutex_Unlock(&pending->mutex);

     if (main_only) {

@@ -762,7 +774,15 @@ Py_AddPendingCall(_Py_pending_call_func func, void *arg)
     /* Legacy users of this API will continue to target the main thread
        (of the main interpreter). */
     PyInterpreterState *interp = _PyInterpreterState_Main();
-    return _PyEval_AddPendingCall(interp, func, arg, _Py_PENDING_MAINTHREADONLY);
+    _Py_add_pending_call_result r =
+        _PyEval_AddPendingCall(interp, func, arg, _Py_PENDING_MAINTHREADONLY);
+    if (r == _Py_ADD_PENDING_FULL) {
+        return -1;
+    }
+    else {
+        assert(r == _Py_ADD_PENDING_SUCCESS);
+        return 0;
+    }
 }

 static int

@@ -782,10 +802,21 @@ handle_signals(PyThreadState *tstate)
 }

 static int
-_make_pending_calls(struct _pending_calls *pending)
+_make_pending_calls(struct _pending_calls *pending, int32_t *p_npending)
 {
+    int res = 0;
+    int32_t npending = -1;
+
+    assert(sizeof(pending->max) <= sizeof(size_t)
+           && ((size_t)pending->max) <= Py_ARRAY_LENGTH(pending->calls));
+    int32_t maxloop = pending->maxloop;
+    if (maxloop == 0) {
+        maxloop = pending->max;
+    }
+    assert(maxloop > 0 && maxloop <= pending->max);
+
     /* perform a bounded number of calls, in case of recursion */
-    for (int i=0; i<NPENDINGCALLS; i++) {
+    for (int i=0; i<maxloop; i++) {
         _Py_pending_call_func func = NULL;
         void *arg = NULL;
         int flags = 0;

@@ -793,21 +824,29 @@ _make_pending_calls(struct _pending_calls *pending)
         /* pop one item off the queue while holding the lock */
         PyMutex_Lock(&pending->mutex);
         _pop_pending_call(pending, &func, &arg, &flags);
+        npending = pending->npending;
         PyMutex_Unlock(&pending->mutex);

-        /* having released the lock, perform the callback */
+        /* Check if there are any more pending calls. */
         if (func == NULL) {
+            assert(npending == 0);
             break;
         }
-        int res = func(arg);
+
+        /* having released the lock, perform the callback */
+        res = func(arg);
         if ((flags & _Py_PENDING_RAWFREE) && arg != NULL) {
             PyMem_RawFree(arg);
         }
         if (res != 0) {
-            return -1;
+            res = -1;
+            goto finally;
         }
     }
-    return 0;
+
+finally:
+    *p_npending = npending;
+    return res;
 }

 static void

@@ -861,26 +900,36 @@ make_pending_calls(PyThreadState *tstate)
        added in-between re-signals */
     unsignal_pending_calls(tstate, interp);

-    if (_make_pending_calls(pending) != 0) {
+    int32_t npending;
+    if (_make_pending_calls(pending, &npending) != 0) {
         pending->busy = 0;
         /* There might not be more calls to make, but we play it safe. */
         signal_pending_calls(tstate, interp);
         return -1;
     }
+    if (npending > 0) {
+        /* We hit pending->maxloop. */
+        signal_pending_calls(tstate, interp);
+    }

     if (_Py_IsMainThread() && _Py_IsMainInterpreter(interp)) {
-        if (_make_pending_calls(pending_main) != 0) {
+        if (_make_pending_calls(pending_main, &npending) != 0) {
             pending->busy = 0;
             /* There might not be more calls to make, but we play it safe. */
             signal_pending_calls(tstate, interp);
             return -1;
         }
+        if (npending > 0) {
+            /* We hit pending_main->maxloop. */
+            signal_pending_calls(tstate, interp);
+        }
     }

     pending->busy = 0;
     return 0;
 }

 void
 _Py_set_eval_breaker_bit_all(PyInterpreterState *interp, uintptr_t bit)
 {
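For readers new to this code, the queue change in _push_pending_call()/_next_pending_call() above boils down to tracking an explicit npending count instead of comparing the first/last ring indices; a side effect is that all PENDINGCALLSARRAYSIZE entries can now be occupied, where the old `(last + 1) % NPENDINGCALLS == first` test left one slot unused. Below is a simplified standalone model of that discipline (not CPython code; names and the queue size are illustrative):

/* Simplified model of a counter-based ring buffer queue. */
#include <assert.h>
#include <stdio.h>

#define QSIZE 4

struct ring {
    int items[QSIZE];
    int first;      /* index of the oldest queued item */
    int next;       /* index where the next item will be stored */
    int npending;   /* number of items currently queued */
};

static int
ring_push(struct ring *q, int item)
{
    if (q->npending == QSIZE) {
        return -1;                      /* full: decided by the counter */
    }
    q->items[q->next] = item;
    q->next = (q->next + 1) % QSIZE;
    q->npending++;
    return 0;
}

static int
ring_pop(struct ring *q, int *item)
{
    if (q->npending == 0) {
        return -1;                      /* empty: decided by the counter */
    }
    *item = q->items[q->first];
    q->first = (q->first + 1) % QSIZE;
    q->npending--;
    return 0;
}

int main(void)
{
    struct ring q = {0};
    assert(ring_push(&q, 1) == 0);
    assert(ring_push(&q, 2) == 0);
    int v;
    assert(ring_pop(&q, &v) == 0 && v == 1);
    assert(ring_pop(&q, &v) == 0 && v == 2);
    assert(ring_pop(&q, &v) == -1);     /* empty again */
    printf("ok\n");
    return 0;
}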