gh-111964: Add _PyRWMutex a "readers-writer" lock (gh-112859)

This adds `_PyRWMutex`, a "readers-writer" lock, which wil be used to
serialize global stop-the-world pauses with per-interpreter pauses.
This commit is contained in:
Sam Gross 2023-12-15 20:56:55 -05:00 committed by GitHub
parent 40574da019
commit 5ae75e1be2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 244 additions and 0 deletions

View File

@ -213,6 +213,45 @@ _PyOnceFlag_CallOnce(_PyOnceFlag *flag, _Py_once_fn_t *fn, void *arg)
return _PyOnceFlag_CallOnceSlow(flag, fn, arg);
}
// A readers-writer (RW) lock. The lock supports multiple concurrent readers or
// a single writer. The lock is write-preferring: if a writer is waiting while
// the lock is read-locked then, new readers will be blocked. This avoids
// starvation of writers.
//
// In C++, the equivalent synchronization primitive is std::shared_mutex
// with shared ("read") and exclusive ("write") locking.
//
// The two least significant bits are used to indicate if the lock is
// write-locked and if there are parked threads (either readers or writers)
// waiting to acquire the lock. The remaining bits are used to indicate the
// number of readers holding the lock.
//
// 0b000..00000: unlocked
// 0bnnn..nnn00: nnn..nnn readers holding the lock
// 0bnnn..nnn10: nnn..nnn readers holding the lock and a writer is waiting
// 0b00000..010: unlocked with awoken writer about to acquire lock
// 0b00000..001: write-locked
// 0b00000..011: write-locked and readers or other writers are waiting
//
// Note that reader_count must be zero if the lock is held by a writer, and
// vice versa. The lock can only be held by readers or a writer, but not both.
//
// The design is optimized for simplicity of the implementation. The lock is
// not fair: if fairness is desired, use an additional PyMutex to serialize
// writers. The lock is also not reentrant.
typedef struct {
uintptr_t bits;
} _PyRWMutex;
// Read lock (i.e., shared lock)
PyAPI_FUNC(void) _PyRWMutex_RLock(_PyRWMutex *rwmutex);
PyAPI_FUNC(void) _PyRWMutex_RUnlock(_PyRWMutex *rwmutex);
// Write lock (i.e., exclusive lock)
PyAPI_FUNC(void) _PyRWMutex_Lock(_PyRWMutex *rwmutex);
PyAPI_FUNC(void) _PyRWMutex_Unlock(_PyRWMutex *rwmutex);
#ifdef __cplusplus
}
#endif

View File

@ -372,6 +372,104 @@ test_lock_once(PyObject *self, PyObject *obj)
Py_RETURN_NONE;
}
struct test_rwlock_data {
Py_ssize_t nthreads;
_PyRWMutex rw;
PyEvent step1;
PyEvent step2;
PyEvent step3;
PyEvent done;
};
static void
rdlock_thread(void *arg)
{
struct test_rwlock_data *test_data = arg;
// Acquire the lock in read mode
_PyRWMutex_RLock(&test_data->rw);
PyEvent_Wait(&test_data->step1);
_PyRWMutex_RUnlock(&test_data->rw);
_PyRWMutex_RLock(&test_data->rw);
PyEvent_Wait(&test_data->step3);
_PyRWMutex_RUnlock(&test_data->rw);
if (_Py_atomic_add_ssize(&test_data->nthreads, -1) == 1) {
_PyEvent_Notify(&test_data->done);
}
}
static void
wrlock_thread(void *arg)
{
struct test_rwlock_data *test_data = arg;
// First acquire the lock in write mode
_PyRWMutex_Lock(&test_data->rw);
PyEvent_Wait(&test_data->step2);
_PyRWMutex_Unlock(&test_data->rw);
if (_Py_atomic_add_ssize(&test_data->nthreads, -1) == 1) {
_PyEvent_Notify(&test_data->done);
}
}
static void
wait_until(uintptr_t *ptr, uintptr_t value)
{
// wait up to two seconds for *ptr == value
int iters = 0;
uintptr_t bits;
do {
pysleep(10);
bits = _Py_atomic_load_uintptr(ptr);
iters++;
} while (bits != value && iters < 200);
}
static PyObject *
test_lock_rwlock(PyObject *self, PyObject *obj)
{
struct test_rwlock_data test_data = {.nthreads = 3};
_PyRWMutex_Lock(&test_data.rw);
assert(test_data.rw.bits == 1);
_PyRWMutex_Unlock(&test_data.rw);
assert(test_data.rw.bits == 0);
// Start two readers
PyThread_start_new_thread(rdlock_thread, &test_data);
PyThread_start_new_thread(rdlock_thread, &test_data);
// wait up to two seconds for the threads to attempt to read-lock "rw"
wait_until(&test_data.rw.bits, 8);
assert(test_data.rw.bits == 8);
// start writer (while readers hold lock)
PyThread_start_new_thread(wrlock_thread, &test_data);
wait_until(&test_data.rw.bits, 10);
assert(test_data.rw.bits == 10);
// readers release lock, writer should acquire it
_PyEvent_Notify(&test_data.step1);
wait_until(&test_data.rw.bits, 3);
assert(test_data.rw.bits == 3);
// writer releases lock, readers acquire it
_PyEvent_Notify(&test_data.step2);
wait_until(&test_data.rw.bits, 8);
assert(test_data.rw.bits == 8);
// readers release lock again
_PyEvent_Notify(&test_data.step3);
wait_until(&test_data.rw.bits, 0);
assert(test_data.rw.bits == 0);
PyEvent_Wait(&test_data.done);
Py_RETURN_NONE;
}
static PyMethodDef test_methods[] = {
{"test_lock_basic", test_lock_basic, METH_NOARGS},
{"test_lock_two_threads", test_lock_two_threads, METH_NOARGS},
@ -380,6 +478,7 @@ static PyMethodDef test_methods[] = {
_TESTINTERNALCAPI_BENCHMARK_LOCKS_METHODDEF
{"test_lock_benchmark", test_lock_benchmark, METH_NOARGS},
{"test_lock_once", test_lock_once, METH_NOARGS},
{"test_lock_rwlock", test_lock_rwlock, METH_NOARGS},
{NULL, NULL} /* sentinel */
};

View File

@ -353,3 +353,109 @@ _PyOnceFlag_CallOnceSlow(_PyOnceFlag *flag, _Py_once_fn_t *fn, void *arg)
v = _Py_atomic_load_uint8(&flag->v);
}
}
#define _Py_WRITE_LOCKED 1
#define _PyRWMutex_READER_SHIFT 2
#define _Py_RWMUTEX_MAX_READERS (UINTPTR_MAX >> _PyRWMutex_READER_SHIFT)
static uintptr_t
rwmutex_set_parked_and_wait(_PyRWMutex *rwmutex, uintptr_t bits)
{
// Set _Py_HAS_PARKED and wait until we are woken up.
if ((bits & _Py_HAS_PARKED) == 0) {
uintptr_t newval = bits | _Py_HAS_PARKED;
if (!_Py_atomic_compare_exchange_uintptr(&rwmutex->bits,
&bits, newval)) {
return bits;
}
bits = newval;
}
_PyParkingLot_Park(&rwmutex->bits, &bits, sizeof(bits), -1, NULL, 1);
return _Py_atomic_load_uintptr_relaxed(&rwmutex->bits);
}
// The number of readers holding the lock
static uintptr_t
rwmutex_reader_count(uintptr_t bits)
{
return bits >> _PyRWMutex_READER_SHIFT;
}
void
_PyRWMutex_RLock(_PyRWMutex *rwmutex)
{
uintptr_t bits = _Py_atomic_load_uintptr_relaxed(&rwmutex->bits);
for (;;) {
if ((bits & _Py_WRITE_LOCKED)) {
// A writer already holds the lock.
bits = rwmutex_set_parked_and_wait(rwmutex, bits);
continue;
}
else if ((bits & _Py_HAS_PARKED)) {
// Reader(s) hold the lock (or just gave up the lock), but there is
// at least one waiting writer. We can't grab the lock because we
// don't want to starve the writer. Instead, we park ourselves and
// wait for the writer to eventually wake us up.
bits = rwmutex_set_parked_and_wait(rwmutex, bits);
continue;
}
else {
// The lock is unlocked or read-locked. Try to grab it.
assert(rwmutex_reader_count(bits) < _Py_RWMUTEX_MAX_READERS);
uintptr_t newval = bits + (1 << _PyRWMutex_READER_SHIFT);
if (!_Py_atomic_compare_exchange_uintptr(&rwmutex->bits,
&bits, newval)) {
continue;
}
return;
}
}
}
void
_PyRWMutex_RUnlock(_PyRWMutex *rwmutex)
{
uintptr_t bits = _Py_atomic_add_uintptr(&rwmutex->bits, -(1 << _PyRWMutex_READER_SHIFT));
assert(rwmutex_reader_count(bits) > 0 && "lock was not read-locked");
bits -= (1 << _PyRWMutex_READER_SHIFT);
if (rwmutex_reader_count(bits) == 0 && (bits & _Py_HAS_PARKED)) {
_PyParkingLot_UnparkAll(&rwmutex->bits);
return;
}
}
void
_PyRWMutex_Lock(_PyRWMutex *rwmutex)
{
uintptr_t bits = _Py_atomic_load_uintptr_relaxed(&rwmutex->bits);
for (;;) {
// If there are no active readers and it's not already write-locked,
// then we can grab the lock.
if ((bits & ~_Py_HAS_PARKED) == 0) {
if (!_Py_atomic_compare_exchange_uintptr(&rwmutex->bits,
&bits,
bits | _Py_WRITE_LOCKED)) {
continue;
}
return;
}
// Otherwise, we have to wait.
bits = rwmutex_set_parked_and_wait(rwmutex, bits);
}
}
void
_PyRWMutex_Unlock(_PyRWMutex *rwmutex)
{
uintptr_t old_bits = _Py_atomic_exchange_uintptr(&rwmutex->bits, 0);
assert((old_bits & _Py_WRITE_LOCKED) && "lock was not write-locked");
assert(rwmutex_reader_count(old_bits) == 0 && "lock was read-locked");
if ((old_bits & _Py_HAS_PARKED) != 0) {
_PyParkingLot_UnparkAll(&rwmutex->bits);
}
}