#include "Python.h" #include "pycore_llist.h" #include "pycore_lock.h" // _PyRawMutex #include "pycore_parking_lot.h" #include "pycore_pyerrors.h" // _Py_FatalErrorFormat #include "pycore_pystate.h" // _PyThreadState_GET #include "pycore_semaphore.h" // _PySemaphore #include "pycore_time.h" // _PyTime_Add() #include typedef struct { // The mutex protects the waiter queue and the num_waiters counter. _PyRawMutex mutex; // Linked list of `struct wait_entry` waiters in this bucket. struct llist_node root; size_t num_waiters; } Bucket; struct wait_entry { void *park_arg; uintptr_t addr; _PySemaphore sema; struct llist_node node; bool is_unparking; }; // Prime number to avoid correlations with memory addresses. // We want this to be roughly proportional to the number of CPU cores // to minimize contention on the bucket locks, but not too big to avoid // wasting memory. The exact choice does not matter much. #define NUM_BUCKETS 257 #define BUCKET_INIT(b, i) [i] = { .root = LLIST_INIT(b[i].root) } #define BUCKET_INIT_2(b, i) BUCKET_INIT(b, i), BUCKET_INIT(b, i+1) #define BUCKET_INIT_4(b, i) BUCKET_INIT_2(b, i), BUCKET_INIT_2(b, i+2) #define BUCKET_INIT_8(b, i) BUCKET_INIT_4(b, i), BUCKET_INIT_4(b, i+4) #define BUCKET_INIT_16(b, i) BUCKET_INIT_8(b, i), BUCKET_INIT_8(b, i+8) #define BUCKET_INIT_32(b, i) BUCKET_INIT_16(b, i), BUCKET_INIT_16(b, i+16) #define BUCKET_INIT_64(b, i) BUCKET_INIT_32(b, i), BUCKET_INIT_32(b, i+32) #define BUCKET_INIT_128(b, i) BUCKET_INIT_64(b, i), BUCKET_INIT_64(b, i+64) #define BUCKET_INIT_256(b, i) BUCKET_INIT_128(b, i), BUCKET_INIT_128(b, i+128) // Table of waiters (hashed by address) static Bucket buckets[NUM_BUCKETS] = { BUCKET_INIT_256(buckets, 0), BUCKET_INIT(buckets, 256), }; void _PySemaphore_Init(_PySemaphore *sema) { #if defined(MS_WINDOWS) sema->platform_sem = CreateSemaphore( NULL, // attributes 0, // initial count 10, // maximum count NULL // unnamed ); if (!sema->platform_sem) { Py_FatalError("parking_lot: CreateSemaphore failed"); } #elif defined(_Py_USE_SEMAPHORES) if (sem_init(&sema->platform_sem, /*pshared=*/0, /*value=*/0) < 0) { Py_FatalError("parking_lot: sem_init failed"); } #else if (pthread_mutex_init(&sema->mutex, NULL) != 0) { Py_FatalError("parking_lot: pthread_mutex_init failed"); } if (pthread_cond_init(&sema->cond, NULL)) { Py_FatalError("parking_lot: pthread_cond_init failed"); } sema->counter = 0; #endif } void _PySemaphore_Destroy(_PySemaphore *sema) { #if defined(MS_WINDOWS) CloseHandle(sema->platform_sem); #elif defined(_Py_USE_SEMAPHORES) sem_destroy(&sema->platform_sem); #else pthread_mutex_destroy(&sema->mutex); pthread_cond_destroy(&sema->cond); #endif } static int _PySemaphore_PlatformWait(_PySemaphore *sema, PyTime_t timeout) { int res; #if defined(MS_WINDOWS) DWORD wait; DWORD millis = 0; if (timeout < 0) { millis = INFINITE; } else { PyTime_t div = _PyTime_AsMilliseconds(timeout, _PyTime_ROUND_TIMEOUT); // Prevent overflow with clamping the result if ((PyTime_t)PY_DWORD_MAX < div) { millis = PY_DWORD_MAX; } else { millis = (DWORD) div; } } wait = WaitForSingleObjectEx(sema->platform_sem, millis, FALSE); if (wait == WAIT_OBJECT_0) { res = Py_PARK_OK; } else if (wait == WAIT_TIMEOUT) { res = Py_PARK_TIMEOUT; } else { res = Py_PARK_INTR; } #elif defined(_Py_USE_SEMAPHORES) int err; if (timeout >= 0) { struct timespec ts; #if defined(CLOCK_MONOTONIC) && defined(HAVE_SEM_CLOCKWAIT) && !defined(_Py_THREAD_SANITIZER) PyTime_t now; // silently ignore error: cannot report error to the caller (void)PyTime_MonotonicRaw(&now); PyTime_t deadline = _PyTime_Add(now, timeout); _PyTime_AsTimespec_clamp(deadline, &ts); err = sem_clockwait(&sema->platform_sem, CLOCK_MONOTONIC, &ts); #else PyTime_t now; // silently ignore error: cannot report error to the caller (void)PyTime_TimeRaw(&now); PyTime_t deadline = _PyTime_Add(now, timeout); _PyTime_AsTimespec_clamp(deadline, &ts); err = sem_timedwait(&sema->platform_sem, &ts); #endif } else { err = sem_wait(&sema->platform_sem); } if (err == -1) { err = errno; if (err == EINTR) { res = Py_PARK_INTR; } else if (err == ETIMEDOUT) { res = Py_PARK_TIMEOUT; } else { _Py_FatalErrorFormat(__func__, "unexpected error from semaphore: %d", err); } } else { res = Py_PARK_OK; } #else pthread_mutex_lock(&sema->mutex); int err = 0; if (sema->counter == 0) { if (timeout >= 0) { struct timespec ts; #if defined(HAVE_PTHREAD_COND_TIMEDWAIT_RELATIVE_NP) _PyTime_AsTimespec_clamp(timeout, &ts); err = pthread_cond_timedwait_relative_np(&sema->cond, &sema->mutex, &ts); #else PyTime_t now; (void)PyTime_TimeRaw(&now); PyTime_t deadline = _PyTime_Add(now, timeout); _PyTime_AsTimespec_clamp(deadline, &ts); err = pthread_cond_timedwait(&sema->cond, &sema->mutex, &ts); #endif // HAVE_PTHREAD_COND_TIMEDWAIT_RELATIVE_NP } else { err = pthread_cond_wait(&sema->cond, &sema->mutex); } } if (sema->counter > 0) { sema->counter--; res = Py_PARK_OK; } else if (err) { res = Py_PARK_TIMEOUT; } else { res = Py_PARK_INTR; } pthread_mutex_unlock(&sema->mutex); #endif return res; } int _PySemaphore_Wait(_PySemaphore *sema, PyTime_t timeout, int detach) { PyThreadState *tstate = NULL; if (detach) { tstate = _PyThreadState_GET(); if (tstate && _Py_atomic_load_int_relaxed(&tstate->state) == _Py_THREAD_ATTACHED) { // Only detach if we are attached PyEval_ReleaseThread(tstate); } else { tstate = NULL; } } int res = _PySemaphore_PlatformWait(sema, timeout); if (tstate) { PyEval_AcquireThread(tstate); } return res; } void _PySemaphore_Wakeup(_PySemaphore *sema) { #if defined(MS_WINDOWS) if (!ReleaseSemaphore(sema->platform_sem, 1, NULL)) { Py_FatalError("parking_lot: ReleaseSemaphore failed"); } #elif defined(_Py_USE_SEMAPHORES) int err = sem_post(&sema->platform_sem); if (err != 0) { Py_FatalError("parking_lot: sem_post failed"); } #else pthread_mutex_lock(&sema->mutex); sema->counter++; pthread_cond_signal(&sema->cond); pthread_mutex_unlock(&sema->mutex); #endif } static void enqueue(Bucket *bucket, const void *address, struct wait_entry *wait) { llist_insert_tail(&bucket->root, &wait->node); ++bucket->num_waiters; } static struct wait_entry * dequeue(Bucket *bucket, const void *address) { // find the first waiter that is waiting on `address` struct llist_node *root = &bucket->root; struct llist_node *node; llist_for_each(node, root) { struct wait_entry *wait = llist_data(node, struct wait_entry, node); if (wait->addr == (uintptr_t)address) { llist_remove(node); --bucket->num_waiters; wait->is_unparking = true; return wait; } } return NULL; } static void dequeue_all(Bucket *bucket, const void *address, struct llist_node *dst) { // remove and append all matching waiters to dst struct llist_node *root = &bucket->root; struct llist_node *node; llist_for_each_safe(node, root) { struct wait_entry *wait = llist_data(node, struct wait_entry, node); if (wait->addr == (uintptr_t)address) { llist_remove(node); llist_insert_tail(dst, node); --bucket->num_waiters; wait->is_unparking = true; } } } // Checks that `*addr == *expected` (only works for 1, 2, 4, or 8 bytes) static int atomic_memcmp(const void *addr, const void *expected, size_t addr_size) { switch (addr_size) { case 1: return _Py_atomic_load_uint8(addr) == *(const uint8_t *)expected; case 2: return _Py_atomic_load_uint16(addr) == *(const uint16_t *)expected; case 4: return _Py_atomic_load_uint32(addr) == *(const uint32_t *)expected; case 8: return _Py_atomic_load_uint64(addr) == *(const uint64_t *)expected; default: Py_UNREACHABLE(); } } int _PyParkingLot_Park(const void *addr, const void *expected, size_t size, PyTime_t timeout_ns, void *park_arg, int detach) { struct wait_entry wait = { .park_arg = park_arg, .addr = (uintptr_t)addr, .is_unparking = false, }; Bucket *bucket = &buckets[((uintptr_t)addr) % NUM_BUCKETS]; _PyRawMutex_Lock(&bucket->mutex); if (!atomic_memcmp(addr, expected, size)) { _PyRawMutex_Unlock(&bucket->mutex); return Py_PARK_AGAIN; } _PySemaphore_Init(&wait.sema); enqueue(bucket, addr, &wait); _PyRawMutex_Unlock(&bucket->mutex); int res = _PySemaphore_Wait(&wait.sema, timeout_ns, detach); if (res == Py_PARK_OK) { goto done; } // timeout or interrupt _PyRawMutex_Lock(&bucket->mutex); if (wait.is_unparking) { _PyRawMutex_Unlock(&bucket->mutex); // Another thread has started to unpark us. Wait until we process the // wakeup signal. do { res = _PySemaphore_Wait(&wait.sema, -1, detach); } while (res != Py_PARK_OK); goto done; } else { llist_remove(&wait.node); --bucket->num_waiters; } _PyRawMutex_Unlock(&bucket->mutex); done: _PySemaphore_Destroy(&wait.sema); return res; } void _PyParkingLot_Unpark(const void *addr, _Py_unpark_fn_t *fn, void *arg) { Bucket *bucket = &buckets[((uintptr_t)addr) % NUM_BUCKETS]; // Find the first waiter that is waiting on `addr` _PyRawMutex_Lock(&bucket->mutex); struct wait_entry *waiter = dequeue(bucket, addr); if (waiter) { int has_more_waiters = (bucket->num_waiters > 0); fn(arg, waiter->park_arg, has_more_waiters); } else { fn(arg, NULL, 0); } _PyRawMutex_Unlock(&bucket->mutex); if (waiter) { // Wakeup the waiter outside of the bucket lock _PySemaphore_Wakeup(&waiter->sema); } } void _PyParkingLot_UnparkAll(const void *addr) { struct llist_node head = LLIST_INIT(head); Bucket *bucket = &buckets[((uintptr_t)addr) % NUM_BUCKETS]; _PyRawMutex_Lock(&bucket->mutex); dequeue_all(bucket, addr, &head); _PyRawMutex_Unlock(&bucket->mutex); struct llist_node *node; llist_for_each_safe(node, &head) { struct wait_entry *waiter = llist_data(node, struct wait_entry, node); llist_remove(node); _PySemaphore_Wakeup(&waiter->sema); } } void _PyParkingLot_AfterFork(void) { // After a fork only one thread remains. That thread cannot be blocked // so all entries in the parking lot are for dead threads. memset(buckets, 0, sizeof(buckets)); for (Py_ssize_t i = 0; i < NUM_BUCKETS; i++) { llist_init(&buckets[i].root); } }