bpo-36533: Reinit logging.Handler locks on fork(). (GH-12704)
Instead of attempting to acquire and release them all across fork which was leading to deadlocks in some applications that had chained their own handlers while holding multiple locks.
This commit is contained in:
parent
e85ef7a7ea
commit
64aa6d2000
|
@ -231,49 +231,38 @@ def _releaseLock():
|
||||||
# Prevent a held logging lock from blocking a child from logging.
|
# Prevent a held logging lock from blocking a child from logging.
|
||||||
|
|
||||||
if not hasattr(os, 'register_at_fork'): # Windows and friends.
|
if not hasattr(os, 'register_at_fork'): # Windows and friends.
|
||||||
def _register_at_fork_acquire_release(instance):
|
def _register_at_fork_reinit_lock(instance):
|
||||||
pass # no-op when os.register_at_fork does not exist.
|
pass # no-op when os.register_at_fork does not exist.
|
||||||
else: # The os.register_at_fork API exists
|
else:
|
||||||
os.register_at_fork(before=_acquireLock,
|
# A collection of instances with a createLock method (logging.Handler)
|
||||||
after_in_child=_releaseLock,
|
# to be called in the child after forking. The weakref avoids us keeping
|
||||||
after_in_parent=_releaseLock)
|
# discarded Handler instances alive. A set is used to avoid accumulating
|
||||||
|
# duplicate registrations as createLock() is responsible for registering
|
||||||
|
# a new Handler instance with this set in the first place.
|
||||||
|
_at_fork_reinit_lock_weakset = weakref.WeakSet()
|
||||||
|
|
||||||
# A collection of instances with acquire and release methods (logging.Handler)
|
def _register_at_fork_reinit_lock(instance):
|
||||||
# to be called before and after fork. The weakref avoids us keeping discarded
|
_acquireLock()
|
||||||
# Handler instances alive forever in case an odd program creates and destroys
|
try:
|
||||||
# many over its lifetime.
|
_at_fork_reinit_lock_weakset.add(instance)
|
||||||
_at_fork_acquire_release_weakset = weakref.WeakSet()
|
finally:
|
||||||
|
_releaseLock()
|
||||||
|
|
||||||
|
def _after_at_fork_child_reinit_locks():
|
||||||
def _register_at_fork_acquire_release(instance):
|
# _acquireLock() was called in the parent before forking.
|
||||||
# We put the instance itself in a single WeakSet as we MUST have only
|
for handler in _at_fork_reinit_lock_weakset:
|
||||||
# one atomic weak ref. used by both before and after atfork calls to
|
|
||||||
# guarantee matched pairs of acquire and release calls.
|
|
||||||
_at_fork_acquire_release_weakset.add(instance)
|
|
||||||
|
|
||||||
|
|
||||||
def _at_fork_weak_calls(method_name):
|
|
||||||
for instance in _at_fork_acquire_release_weakset:
|
|
||||||
method = getattr(instance, method_name)
|
|
||||||
try:
|
try:
|
||||||
method()
|
handler.createLock()
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
# Similar to what PyErr_WriteUnraisable does.
|
# Similar to what PyErr_WriteUnraisable does.
|
||||||
print("Ignoring exception from logging atfork", instance,
|
print("Ignoring exception from logging atfork", instance,
|
||||||
method_name, "method:", err, file=sys.stderr)
|
"._reinit_lock() method:", err, file=sys.stderr)
|
||||||
|
_releaseLock() # Acquired by os.register_at_fork(before=.
|
||||||
|
|
||||||
|
|
||||||
def _before_at_fork_weak_calls():
|
os.register_at_fork(before=_acquireLock,
|
||||||
_at_fork_weak_calls('acquire')
|
after_in_child=_after_at_fork_child_reinit_locks,
|
||||||
|
after_in_parent=_releaseLock)
|
||||||
|
|
||||||
def _after_at_fork_weak_calls():
|
|
||||||
_at_fork_weak_calls('release')
|
|
||||||
|
|
||||||
|
|
||||||
os.register_at_fork(before=_before_at_fork_weak_calls,
|
|
||||||
after_in_child=_after_at_fork_weak_calls,
|
|
||||||
after_in_parent=_after_at_fork_weak_calls)
|
|
||||||
|
|
||||||
|
|
||||||
#---------------------------------------------------------------------------
|
#---------------------------------------------------------------------------
|
||||||
|
@ -900,7 +889,7 @@ class Handler(Filterer):
|
||||||
Acquire a thread lock for serializing access to the underlying I/O.
|
Acquire a thread lock for serializing access to the underlying I/O.
|
||||||
"""
|
"""
|
||||||
self.lock = threading.RLock()
|
self.lock = threading.RLock()
|
||||||
_register_at_fork_acquire_release(self)
|
_register_at_fork_reinit_lock(self)
|
||||||
|
|
||||||
def acquire(self):
|
def acquire(self):
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -668,10 +668,28 @@ class HandlerTest(BaseTest):
|
||||||
# register_at_fork mechanism is also present and used.
|
# register_at_fork mechanism is also present and used.
|
||||||
@unittest.skipIf(not hasattr(os, 'fork'), 'Test requires os.fork().')
|
@unittest.skipIf(not hasattr(os, 'fork'), 'Test requires os.fork().')
|
||||||
def test_post_fork_child_no_deadlock(self):
|
def test_post_fork_child_no_deadlock(self):
|
||||||
"""Ensure forked child logging locks are not held; bpo-6721."""
|
"""Ensure child logging locks are not held; bpo-6721 & bpo-36533."""
|
||||||
refed_h = logging.Handler()
|
class _OurHandler(logging.Handler):
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__()
|
||||||
|
self.sub_handler = logging.StreamHandler(
|
||||||
|
stream=open('/dev/null', 'wt'))
|
||||||
|
|
||||||
|
def emit(self, record):
|
||||||
|
self.sub_handler.acquire()
|
||||||
|
try:
|
||||||
|
self.sub_handler.emit(record)
|
||||||
|
finally:
|
||||||
|
self.sub_handler.release()
|
||||||
|
|
||||||
|
self.assertEqual(len(logging._handlers), 0)
|
||||||
|
refed_h = _OurHandler()
|
||||||
refed_h.name = 'because we need at least one for this test'
|
refed_h.name = 'because we need at least one for this test'
|
||||||
self.assertGreater(len(logging._handlers), 0)
|
self.assertGreater(len(logging._handlers), 0)
|
||||||
|
self.assertGreater(len(logging._at_fork_reinit_lock_weakset), 1)
|
||||||
|
test_logger = logging.getLogger('test_post_fork_child_no_deadlock')
|
||||||
|
test_logger.addHandler(refed_h)
|
||||||
|
test_logger.setLevel(logging.DEBUG)
|
||||||
|
|
||||||
locks_held__ready_to_fork = threading.Event()
|
locks_held__ready_to_fork = threading.Event()
|
||||||
fork_happened__release_locks_and_end_thread = threading.Event()
|
fork_happened__release_locks_and_end_thread = threading.Event()
|
||||||
|
@ -709,19 +727,24 @@ class HandlerTest(BaseTest):
|
||||||
locks_held__ready_to_fork.wait()
|
locks_held__ready_to_fork.wait()
|
||||||
pid = os.fork()
|
pid = os.fork()
|
||||||
if pid == 0: # Child.
|
if pid == 0: # Child.
|
||||||
logging.error(r'Child process did not deadlock. \o/')
|
try:
|
||||||
os._exit(0)
|
test_logger.info(r'Child process did not deadlock. \o/')
|
||||||
|
finally:
|
||||||
|
os._exit(0)
|
||||||
else: # Parent.
|
else: # Parent.
|
||||||
|
test_logger.info(r'Parent process returned from fork. \o/')
|
||||||
fork_happened__release_locks_and_end_thread.set()
|
fork_happened__release_locks_and_end_thread.set()
|
||||||
lock_holder_thread.join()
|
lock_holder_thread.join()
|
||||||
start_time = time.monotonic()
|
start_time = time.monotonic()
|
||||||
while True:
|
while True:
|
||||||
|
test_logger.debug('Waiting for child process.')
|
||||||
waited_pid, status = os.waitpid(pid, os.WNOHANG)
|
waited_pid, status = os.waitpid(pid, os.WNOHANG)
|
||||||
if waited_pid == pid:
|
if waited_pid == pid:
|
||||||
break # child process exited.
|
break # child process exited.
|
||||||
if time.monotonic() - start_time > 7:
|
if time.monotonic() - start_time > 7:
|
||||||
break # so long? implies child deadlock.
|
break # so long? implies child deadlock.
|
||||||
time.sleep(0.05)
|
time.sleep(0.05)
|
||||||
|
test_logger.debug('Done waiting.')
|
||||||
if waited_pid != pid:
|
if waited_pid != pid:
|
||||||
os.kill(pid, signal.SIGKILL)
|
os.kill(pid, signal.SIGKILL)
|
||||||
waited_pid, status = os.waitpid(pid, 0)
|
waited_pid, status = os.waitpid(pid, 0)
|
||||||
|
|
|
@ -0,0 +1,6 @@
|
||||||
|
Reinitialize logging.Handler locks in forked child processes instead of
|
||||||
|
attempting to acquire them all in the parent before forking only to be
|
||||||
|
released in the child process. The acquire/release pattern was leading to
|
||||||
|
deadlocks in code that has implemented any form of chained logging handlers
|
||||||
|
that depend upon one another as the lock acquision order cannot be
|
||||||
|
guaranteed.
|
Loading…
Reference in New Issue