bpo-40089: Add _at_fork_reinit() method to locks (GH-19195)
Add a private _at_fork_reinit() method to _thread.Lock, _thread.RLock, threading.RLock and threading.Condition classes: reinitialize the lock after fork in the child process; reset the lock to the unlocked state. Rename also the private _reset_internal_locks() method of threading.Event to _at_fork_reinit(). * Add _PyThread_at_fork_reinit() private function. It is excluded from the limited C API. * threading.Thread._reset_internal_locks() now calls _at_fork_reinit() on self._tstate_lock rather than creating a new Python lock object.
This commit is contained in:
parent
48b069a003
commit
87255be696
|
@ -36,6 +36,15 @@ PyAPI_FUNC(int) PyThread_acquire_lock(PyThread_type_lock, int);
|
|||
#define WAIT_LOCK 1
|
||||
#define NOWAIT_LOCK 0
|
||||
|
||||
#ifndef Py_LIMITED_API
|
||||
#ifdef HAVE_FORK
|
||||
/* Private function to reinitialize a lock at fork in the child process.
|
||||
Reset the lock to the unlocked state.
|
||||
Return 0 on success, return -1 on error. */
|
||||
PyAPI_FUNC(int) _PyThread_at_fork_reinit(PyThread_type_lock *lock);
|
||||
#endif /* HAVE_FORK */
|
||||
#endif /* !Py_LIMITED_API */
|
||||
|
||||
/* PY_TIMEOUT_T is the integral type used to specify timeouts when waiting
|
||||
on a lock (see PyThread_acquire_lock_timed() below).
|
||||
PY_TIMEOUT_MAX is the highest usable value (in microseconds) of that
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
Various tests for synchronization primitives.
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from _thread import start_new_thread, TIMEOUT_MAX
|
||||
|
@ -12,6 +13,11 @@ import weakref
|
|||
from test import support
|
||||
|
||||
|
||||
requires_fork = unittest.skipUnless(hasattr(os, 'fork'),
|
||||
"platform doesn't support fork "
|
||||
"(no _at_fork_reinit method)")
|
||||
|
||||
|
||||
def _wait():
|
||||
# A crude wait/yield function not relying on synchronization primitives.
|
||||
time.sleep(0.01)
|
||||
|
@ -265,6 +271,25 @@ class LockTests(BaseLockTests):
|
|||
self.assertFalse(lock.locked())
|
||||
self.assertTrue(lock.acquire(blocking=False))
|
||||
|
||||
@requires_fork
|
||||
def test_at_fork_reinit(self):
|
||||
def use_lock(lock):
|
||||
# make sure that the lock still works normally
|
||||
# after _at_fork_reinit()
|
||||
lock.acquire()
|
||||
lock.release()
|
||||
|
||||
# unlocked
|
||||
lock = self.locktype()
|
||||
lock._at_fork_reinit()
|
||||
use_lock(lock)
|
||||
|
||||
# locked: _at_fork_reinit() resets the lock to the unlocked state
|
||||
lock2 = self.locktype()
|
||||
lock2.acquire()
|
||||
lock2._at_fork_reinit()
|
||||
use_lock(lock2)
|
||||
|
||||
|
||||
class RLockTests(BaseLockTests):
|
||||
"""
|
||||
|
@ -417,12 +442,13 @@ class EventTests(BaseTestCase):
|
|||
b.wait_for_finished()
|
||||
self.assertEqual(results, [True] * N)
|
||||
|
||||
def test_reset_internal_locks(self):
|
||||
@requires_fork
|
||||
def test_at_fork_reinit(self):
|
||||
# ensure that condition is still using a Lock after reset
|
||||
evt = self.eventtype()
|
||||
with evt._cond:
|
||||
self.assertFalse(evt._cond.acquire(False))
|
||||
evt._reset_internal_locks()
|
||||
evt._at_fork_reinit()
|
||||
with evt._cond:
|
||||
self.assertFalse(evt._cond.acquire(False))
|
||||
|
||||
|
|
|
@ -123,6 +123,11 @@ class _RLock:
|
|||
hex(id(self))
|
||||
)
|
||||
|
||||
def _at_fork_reinit(self):
|
||||
self._block._at_fork_reinit()
|
||||
self._owner = None
|
||||
self._count = 0
|
||||
|
||||
def acquire(self, blocking=True, timeout=-1):
|
||||
"""Acquire a lock, blocking or non-blocking.
|
||||
|
||||
|
@ -245,6 +250,10 @@ class Condition:
|
|||
pass
|
||||
self._waiters = _deque()
|
||||
|
||||
def _at_fork_reinit(self):
|
||||
self._lock._at_fork_reinit()
|
||||
self._waiters.clear()
|
||||
|
||||
def __enter__(self):
|
||||
return self._lock.__enter__()
|
||||
|
||||
|
@ -514,9 +523,9 @@ class Event:
|
|||
self._cond = Condition(Lock())
|
||||
self._flag = False
|
||||
|
||||
def _reset_internal_locks(self):
|
||||
# private! called by Thread._reset_internal_locks by _after_fork()
|
||||
self._cond.__init__(Lock())
|
||||
def _at_fork_reinit(self):
|
||||
# Private method called by Thread._reset_internal_locks()
|
||||
self._cond._at_fork_reinit()
|
||||
|
||||
def is_set(self):
|
||||
"""Return true if and only if the internal flag is true."""
|
||||
|
@ -816,9 +825,10 @@ class Thread:
|
|||
def _reset_internal_locks(self, is_alive):
|
||||
# private! Called by _after_fork() to reset our internal locks as
|
||||
# they may be in an invalid state leading to a deadlock or crash.
|
||||
self._started._reset_internal_locks()
|
||||
self._started._at_fork_reinit()
|
||||
if is_alive:
|
||||
self._set_tstate_lock()
|
||||
self._tstate_lock._at_fork_reinit()
|
||||
self._tstate_lock.acquire()
|
||||
else:
|
||||
# The thread isn't alive after fork: it doesn't have a tstate
|
||||
# anymore.
|
||||
|
|
|
@ -0,0 +1,6 @@
|
|||
Add a private ``_at_fork_reinit()`` method to :class:`_thread.Lock`,
|
||||
:class:`_thread.RLock`, :class:`threading.RLock` and
|
||||
:class:`threading.Condition` classes: reinitialize the lock at fork in the
|
||||
child process, reset the lock to the unlocked state.
|
||||
Rename also the private ``_reset_internal_locks()`` method of
|
||||
:class:`threading.Event` to ``_at_fork_reinit()``.
|
|
@ -213,6 +213,22 @@ lock_repr(lockobject *self)
|
|||
self->locked ? "locked" : "unlocked", Py_TYPE(self)->tp_name, self);
|
||||
}
|
||||
|
||||
#ifdef HAVE_FORK
|
||||
static PyObject *
|
||||
lock__at_fork_reinit(lockobject *self, PyObject *Py_UNUSED(args))
|
||||
{
|
||||
if (_PyThread_at_fork_reinit(&self->lock_lock) < 0) {
|
||||
PyErr_SetString(ThreadError, "failed to reinitialize lock at fork");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
self->locked = 0;
|
||||
|
||||
Py_RETURN_NONE;
|
||||
}
|
||||
#endif /* HAVE_FORK */
|
||||
|
||||
|
||||
static PyMethodDef lock_methods[] = {
|
||||
{"acquire_lock", (PyCFunction)(void(*)(void))lock_PyThread_acquire_lock,
|
||||
METH_VARARGS | METH_KEYWORDS, acquire_doc},
|
||||
|
@ -230,6 +246,10 @@ static PyMethodDef lock_methods[] = {
|
|||
METH_VARARGS | METH_KEYWORDS, acquire_doc},
|
||||
{"__exit__", (PyCFunction)lock_PyThread_release_lock,
|
||||
METH_VARARGS, release_doc},
|
||||
#ifdef HAVE_FORK
|
||||
{"_at_fork_reinit", (PyCFunction)lock__at_fork_reinit,
|
||||
METH_NOARGS, NULL},
|
||||
#endif
|
||||
{NULL, NULL} /* sentinel */
|
||||
};
|
||||
|
||||
|
@ -446,22 +466,20 @@ For internal use by `threading.Condition`.");
|
|||
static PyObject *
|
||||
rlock_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
|
||||
{
|
||||
rlockobject *self;
|
||||
|
||||
self = (rlockobject *) type->tp_alloc(type, 0);
|
||||
if (self != NULL) {
|
||||
self->in_weakreflist = NULL;
|
||||
self->rlock_owner = 0;
|
||||
self->rlock_count = 0;
|
||||
|
||||
self->rlock_lock = PyThread_allocate_lock();
|
||||
if (self->rlock_lock == NULL) {
|
||||
Py_DECREF(self);
|
||||
PyErr_SetString(ThreadError, "can't allocate lock");
|
||||
return NULL;
|
||||
}
|
||||
rlockobject *self = (rlockobject *) type->tp_alloc(type, 0);
|
||||
if (self == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
self->in_weakreflist = NULL;
|
||||
self->rlock_owner = 0;
|
||||
self->rlock_count = 0;
|
||||
|
||||
self->rlock_lock = PyThread_allocate_lock();
|
||||
if (self->rlock_lock == NULL) {
|
||||
Py_DECREF(self);
|
||||
PyErr_SetString(ThreadError, "can't allocate lock");
|
||||
return NULL;
|
||||
}
|
||||
return (PyObject *) self;
|
||||
}
|
||||
|
||||
|
@ -475,6 +493,23 @@ rlock_repr(rlockobject *self)
|
|||
}
|
||||
|
||||
|
||||
#ifdef HAVE_FORK
|
||||
static PyObject *
|
||||
rlock__at_fork_reinit(rlockobject *self, PyObject *Py_UNUSED(args))
|
||||
{
|
||||
if (_PyThread_at_fork_reinit(&self->rlock_lock) < 0) {
|
||||
PyErr_SetString(ThreadError, "failed to reinitialize lock at fork");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
self->rlock_owner = 0;
|
||||
self->rlock_count = 0;
|
||||
|
||||
Py_RETURN_NONE;
|
||||
}
|
||||
#endif /* HAVE_FORK */
|
||||
|
||||
|
||||
static PyMethodDef rlock_methods[] = {
|
||||
{"acquire", (PyCFunction)(void(*)(void))rlock_acquire,
|
||||
METH_VARARGS | METH_KEYWORDS, rlock_acquire_doc},
|
||||
|
@ -490,6 +525,10 @@ static PyMethodDef rlock_methods[] = {
|
|||
METH_VARARGS | METH_KEYWORDS, rlock_acquire_doc},
|
||||
{"__exit__", (PyCFunction)rlock_release,
|
||||
METH_VARARGS, rlock_release_doc},
|
||||
#ifdef HAVE_FORK
|
||||
{"_at_fork_reinit", (PyCFunction)rlock__at_fork_reinit,
|
||||
METH_NOARGS, NULL},
|
||||
#endif
|
||||
{NULL, NULL} /* sentinel */
|
||||
};
|
||||
|
||||
|
|
|
@ -491,7 +491,8 @@ register_at_forker(PyObject **lst, PyObject *func)
|
|||
}
|
||||
return PyList_Append(*lst, func);
|
||||
}
|
||||
#endif
|
||||
#endif /* HAVE_FORK */
|
||||
|
||||
|
||||
/* Legacy wrapper */
|
||||
void
|
||||
|
|
|
@ -693,6 +693,26 @@ PyThread_release_lock(PyThread_type_lock lock)
|
|||
|
||||
#endif /* USE_SEMAPHORES */
|
||||
|
||||
int
|
||||
_PyThread_at_fork_reinit(PyThread_type_lock *lock)
|
||||
{
|
||||
PyThread_type_lock new_lock = PyThread_allocate_lock();
|
||||
if (new_lock == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* bpo-6721, bpo-40089: The old lock can be in an inconsistent state.
|
||||
fork() can be called in the middle of an operation on the lock done by
|
||||
another thread. So don't call PyThread_free_lock(*lock).
|
||||
|
||||
Leak memory on purpose. Don't release the memory either since the
|
||||
address of a mutex is relevant. Putting two mutexes at the same address
|
||||
can lead to problems. */
|
||||
|
||||
*lock = new_lock;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int
|
||||
PyThread_acquire_lock(PyThread_type_lock lock, int waitflag)
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue