issue6643 - Two locks held within the threading module on each thread instance
needed to be reinitialized after fork(). Adds tests to confirm that they are reinitialized, and that a potential deadlock and a crasher bug are fixed (platform dependent).
This commit is contained in:
parent
68530ac3c0
commit
96c886ce96
|
@ -11,6 +11,7 @@ import time
|
|||
import unittest
|
||||
import weakref
|
||||
import os
|
||||
import subprocess
|
||||
|
||||
from test import lock_tests
|
||||
|
||||
|
@ -272,7 +273,6 @@ class ThreadTests(BaseTestCase):
|
|||
except ImportError:
|
||||
raise unittest.SkipTest("cannot import ctypes")
|
||||
|
||||
import subprocess
|
||||
rc = subprocess.call([sys.executable, "-c", """if 1:
|
||||
import ctypes, sys, time, _thread
|
||||
|
||||
|
@ -303,7 +303,6 @@ class ThreadTests(BaseTestCase):
|
|||
def test_finalize_with_trace(self):
|
||||
# Issue1733757
|
||||
# Avoid a deadlock when sys.settrace steps into threading._shutdown
|
||||
import subprocess
|
||||
p = subprocess.Popen([sys.executable, "-c", """if 1:
|
||||
import sys, threading
|
||||
|
||||
|
@ -338,7 +337,6 @@ class ThreadTests(BaseTestCase):
|
|||
def test_join_nondaemon_on_shutdown(self):
|
||||
# Issue 1722344
|
||||
# Raising SystemExit skipped threading._shutdown
|
||||
import subprocess
|
||||
p = subprocess.Popen([sys.executable, "-c", """if 1:
|
||||
import threading
|
||||
from time import sleep
|
||||
|
@ -445,7 +443,6 @@ class ThreadJoinOnShutdown(BaseTestCase):
|
|||
sys.stdout.flush()
|
||||
\n""" + script
|
||||
|
||||
import subprocess
|
||||
p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE)
|
||||
rc = p.wait()
|
||||
data = p.stdout.read().decode().replace('\r', '')
|
||||
|
@ -512,6 +509,152 @@ class ThreadJoinOnShutdown(BaseTestCase):
|
|||
"""
|
||||
self._run_and_join(script)
|
||||
|
||||
def assertScriptHasOutput(self, script, expected_output):
    """Run *script* in a child interpreter and check its exit code and stdout.

    The script is executed with ``sys.executable -c``.  The check fails
    unless the child exits with status 0 and its decoded standard output,
    with every carriage return removed, equals *expected_output*.
    """
    p = subprocess.Popen([sys.executable, "-c", script],
                         stdout=subprocess.PIPE)
    # communicate() drains the pipe while waiting for the child.  Calling
    # wait() first and reading afterwards can deadlock once the child has
    # written more than the OS pipe buffer holds; communicate() also
    # closes the pipe when it is done.
    stdout, _ = p.communicate()
    data = stdout.decode().replace('\r', '')
    self.assertEqual(p.returncode, 0, "Unexpected error")
    self.assertEqual(data, expected_output)
|
||||
|
||||
@unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
def test_4_joining_across_fork_in_worker_thread(self):
    # There used to be a possible deadlock when forking from a child
    # thread.  See http://bugs.python.org/issue6643.
    #
    # NOTE: the embedded script relies on the private Thread._block
    # attribute and on the exact order of its statements; do not
    # reorder it.

    # Skip platforms with known problems forking from a worker thread.
    # See http://bugs.python.org/issue3863.
    if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'os2emx'):
        raise unittest.SkipTest('due to known OS bugs on ' + sys.platform)

    # The script takes the following steps:
    # - The main thread in the parent process starts a new thread and then
    #   tries to join it.
    # - The join operation acquires the Lock inside the thread's _block
    #   Condition.  (See threading.py:Thread.join().)
    # - We stub out the acquire method on the condition to force it to wait
    #   until the child thread forks.  (See LOCK ACQUIRED HERE)
    # - The child thread forks.  (See LOCK HELD and WORKER THREAD FORKS
    #   HERE)
    # - The main thread of the parent process enters Condition.wait(),
    #   which releases the lock on the child thread.
    # - The child process returns.  Without the necessary fix, when the
    #   main thread of the child process (which used to be the child thread
    #   in the parent process) attempts to exit, it will try to acquire the
    #   lock in the Thread._block Condition object and hang, because the
    #   lock was held across the fork.

    # The leading "if 1:" lets the script body stay indented inside this
    # literal; every following line must remain indented beneath it.
    script = """if 1:
        import os, time, threading

        finish_join = False
        start_fork = False

        def worker():
            # Wait until this thread's lock is acquired before forking to
            # create the deadlock.
            global finish_join
            while not start_fork:
                time.sleep(0.01)
            # LOCK HELD: Main thread holds lock across this call.
            childpid = os.fork()
            finish_join = True
            if childpid != 0:
                # Parent process just waits for child.
                os.waitpid(childpid, 0)
            # Child process should just return.

        w = threading.Thread(target=worker)

        # Stub out the private condition variable's lock acquire method.
        # This acquires the lock and then waits until the child has forked
        # before returning, which will release the lock soon after. If
        # someone else tries to fix this test case by acquiring this lock
        # before forking instead of reseting it, the test case will
        # deadlock when it shouldn't.
        condition = w._block
        orig_acquire = condition.acquire
        call_count_lock = threading.Lock()
        call_count = 0
        def my_acquire():
            global call_count
            global start_fork
            orig_acquire() # LOCK ACQUIRED HERE
            start_fork = True
            if call_count == 0:
                while not finish_join:
                    time.sleep(0.01) # WORKER THREAD FORKS HERE
            with call_count_lock:
                call_count += 1
        condition.acquire = my_acquire

        w.start()
        w.join()
        print('end of main')
        """
    self.assertScriptHasOutput(script, "end of main\n")
|
||||
|
||||
@unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
def test_5_clear_waiter_locks_to_avoid_crash(self):
    # Check that a spawned thread that forks doesn't segfault on certain
    # platforms, namely OS X.  This used to happen if there was a waiter
    # lock in the thread's condition variable's waiters list.  Even though
    # we know the lock will be held across the fork, it is not safe to
    # release locks held across forks on all platforms, so releasing the
    # waiter lock caused a segfault on OS X.  Furthermore, since locks on
    # OS X are (as of this writing) implemented with a mutex + condition
    # variable instead of a semaphore, while we know that the Python-level
    # lock will be acquired, we can't know if the internal mutex will be
    # acquired at the time of the fork.
    #
    # NOTE: the embedded script relies on the private Thread._block
    # attribute and its _release_save method; do not reorder it.

    # Skip platforms with known problems forking from a worker thread.
    # See http://bugs.python.org/issue3863.
    if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'os2emx'):
        raise unittest.SkipTest('due to known OS bugs on ' + sys.platform)

    # The leading "if True:" lets the script body stay indented inside
    # this literal; every following line must remain indented beneath it.
    script = """if True:
        import os, time, threading

        start_fork = False

        def worker():
            # Wait until the main thread has attempted to join this thread
            # before continuing.
            while not start_fork:
                time.sleep(0.01)
            childpid = os.fork()
            if childpid != 0:
                # Parent process just waits for child.
                (cpid, rc) = os.waitpid(childpid, 0)
                assert cpid == childpid
                assert rc == 0
                print('end of worker thread')
            else:
                # Child process should just return.
                pass

        w = threading.Thread(target=worker)

        # Stub out the private condition variable's _release_save method.
        # This releases the condition's lock and flips the global that
        # causes the worker to fork. At this point, the problematic waiter
        # lock has been acquired once by the waiter and has been put onto
        # the waiters list.
        condition = w._block
        orig_release_save = condition._release_save
        def my_release_save():
            global start_fork
            orig_release_save()
            # Waiter lock held here, condition lock released.
            start_fork = True
        condition._release_save = my_release_save

        w.start()
        w.join()
        print('end of main thread')
        """
    # Only the parent side of the fork prints the worker line (the child
    # branch is "pass"), so each line is expected exactly once.
    output = "end of worker thread\nend of main thread\n"
    self.assertScriptHasOutput(script, output)
|
||||
|
||||
|
||||
class ThreadingExceptionTests(BaseTestCase):
|
||||
# A RuntimeError should be raised if Thread.start() is called
|
||||
|
|
|
@ -1064,6 +1064,10 @@ def _after_fork():
|
|||
# its new value since it can have changed.
|
||||
ident = _get_ident()
|
||||
thread._ident = ident
|
||||
# Any condition variables hanging off of the active thread may
|
||||
# be in an invalid state, so we reinitialize them.
|
||||
thread._block.__init__()
|
||||
thread._started._cond.__init__()
|
||||
new_active[ident] = thread
|
||||
else:
|
||||
# All the others are already stopped.
|
||||
|
|
Loading…
Reference in New Issue