Merged revisions 87710 via svnmerge from

svn+ssh://pythondev@svn.python.org/python/branches/py3k

........
  r87710 | gregory.p.smith | 2011-01-03 13:06:12 -0800 (Mon, 03 Jan 2011) | 4 lines

  issue6643 - Two locks held within the threading module on each thread instance
  needed to be reinitialized after fork().  Adds tests to confirm that they are
  and that a potential deadlock and crasher bug are fixed (platform dependant).
........

This required a bit more fiddling for 2.x as __block and __started are __
private as well as the __started Event's __cond.  A new "private"
_reset_internal_locks() method is added to Thread and _Event objects to
address this.
This commit is contained in:
Gregory P. Smith 2011-01-04 01:10:08 +00:00
parent 7247d67e87
commit 2b79a81461
2 changed files with 166 additions and 4 deletions

View File

@ -10,6 +10,8 @@ threading = test.test_support.import_module('threading')
import time
import unittest
import weakref
import os
import subprocess
from test import lock_tests
@ -275,7 +277,6 @@ class ThreadTests(BaseTestCase):
print("test_finalize_with_runnning_thread can't import ctypes")
return # can't do anything
import subprocess
rc = subprocess.call([sys.executable, "-c", """if 1:
import ctypes, sys, time, thread
@ -306,7 +307,6 @@ class ThreadTests(BaseTestCase):
def test_finalize_with_trace(self):
# Issue1733757
# Avoid a deadlock when sys.settrace steps into threading._shutdown
import subprocess
p = subprocess.Popen([sys.executable, "-c", """if 1:
import sys, threading
@ -341,7 +341,6 @@ class ThreadTests(BaseTestCase):
def test_join_nondaemon_on_shutdown(self):
# Issue 1722344
# Raising SystemExit skipped threading._shutdown
import subprocess
p = subprocess.Popen([sys.executable, "-c", """if 1:
import threading
from time import sleep
@ -428,7 +427,6 @@ class ThreadJoinOnShutdown(BaseTestCase):
print 'end of thread'
\n""" + script
import subprocess
p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE)
rc = p.wait()
data = p.stdout.read().replace('\r', '')
@ -500,6 +498,152 @@ class ThreadJoinOnShutdown(BaseTestCase):
"""
self._run_and_join(script)
def assertScriptHasOutput(self, script, expected_output):
p = subprocess.Popen([sys.executable, "-c", script],
stdout=subprocess.PIPE)
rc = p.wait()
data = p.stdout.read().decode().replace('\r', '')
self.assertEqual(rc, 0, "Unexpected error")
self.assertEqual(data, expected_output)
@unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
def test_4_joining_across_fork_in_worker_thread(self):
# There used to be a possible deadlock when forking from a child
# thread. See http://bugs.python.org/issue6643.
# Skip platforms with known problems forking from a worker thread.
# See http://bugs.python.org/issue3863.
if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'os2emx'):
raise unittest.SkipTest('due to known OS bugs on ' + sys.platform)
# The script takes the following steps:
# - The main thread in the parent process starts a new thread and then
# tries to join it.
# - The join operation acquires the Lock inside the thread's _block
# Condition. (See threading.py:Thread.join().)
# - We stub out the acquire method on the condition to force it to wait
# until the child thread forks. (See LOCK ACQUIRED HERE)
# - The child thread forks. (See LOCK HELD and WORKER THREAD FORKS
# HERE)
# - The main thread of the parent process enters Condition.wait(),
# which releases the lock on the child thread.
# - The child process returns. Without the necessary fix, when the
# main thread of the child process (which used to be the child thread
# in the parent process) attempts to exit, it will try to acquire the
# lock in the Thread._block Condition object and hang, because the
# lock was held across the fork.
script = """if 1:
import os, time, threading
finish_join = False
start_fork = False
def worker():
# Wait until this thread's lock is acquired before forking to
# create the deadlock.
global finish_join
while not start_fork:
time.sleep(0.01)
# LOCK HELD: Main thread holds lock across this call.
childpid = os.fork()
finish_join = True
if childpid != 0:
# Parent process just waits for child.
os.waitpid(childpid, 0)
# Child process should just return.
w = threading.Thread(target=worker)
# Stub out the private condition variable's lock acquire method.
# This acquires the lock and then waits until the child has forked
# before returning, which will release the lock soon after. If
# someone else tries to fix this test case by acquiring this lock
# before forking instead of reseting it, the test case will
# deadlock when it shouldn't.
condition = w._block
orig_acquire = condition.acquire
call_count_lock = threading.Lock()
call_count = 0
def my_acquire():
global call_count
global start_fork
orig_acquire() # LOCK ACQUIRED HERE
start_fork = True
if call_count == 0:
while not finish_join:
time.sleep(0.01) # WORKER THREAD FORKS HERE
with call_count_lock:
call_count += 1
condition.acquire = my_acquire
w.start()
w.join()
print('end of main')
"""
self.assertScriptHasOutput(script, "end of main\n")
@unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
def test_5_clear_waiter_locks_to_avoid_crash(self):
# Check that a spawned thread that forks doesn't segfault on certain
# platforms, namely OS X. This used to happen if there was a waiter
# lock in the thread's condition variable's waiters list. Even though
# we know the lock will be held across the fork, it is not safe to
# release locks held across forks on all platforms, so releasing the
# waiter lock caused a segfault on OS X. Furthermore, since locks on
# OS X are (as of this writing) implemented with a mutex + condition
# variable instead of a semaphore, while we know that the Python-level
# lock will be acquired, we can't know if the internal mutex will be
# acquired at the time of the fork.
# Skip platforms with known problems forking from a worker thread.
# See http://bugs.python.org/issue3863.
if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'os2emx'):
raise unittest.SkipTest('due to known OS bugs on ' + sys.platform)
script = """if True:
import os, time, threading
start_fork = False
def worker():
# Wait until the main thread has attempted to join this thread
# before continuing.
while not start_fork:
time.sleep(0.01)
childpid = os.fork()
if childpid != 0:
# Parent process just waits for child.
(cpid, rc) = os.waitpid(childpid, 0)
assert cpid == childpid
assert rc == 0
print('end of worker thread')
else:
# Child process should just return.
pass
w = threading.Thread(target=worker)
# Stub out the private condition variable's _release_save method.
# This releases the condition's lock and flips the global that
# causes the worker to fork. At this point, the problematic waiter
# lock has been acquired once by the waiter and has been put onto
# the waiters list.
condition = w._block
orig_release_save = condition._release_save
def my_release_save():
global start_fork
orig_release_save()
# Waiter lock held here, condition lock released.
start_fork = True
condition._release_save = my_release_save
w.start()
w.join()
print('end of main thread')
"""
output = "end of worker thread\nend of main thread\n"
self.assertScriptHasOutput(script, output)
class ThreadingExceptionTests(BaseTestCase):
# A RuntimeError should be raised if Thread.start() is called

View File

@ -373,6 +373,10 @@ class _Event(_Verbose):
self.__cond = Condition(Lock())
self.__flag = False
def _reset_internal_locks(self):
# private! called by Thread._reset_internal_locks by _after_fork()
self.__cond.__init__()
def isSet(self):
return self.__flag
@ -449,6 +453,17 @@ class Thread(_Verbose):
# sys.exc_info since it can be changed between instances
self.__stderr = _sys.stderr
def _reset_internal_locks(self):
# private! Called by _after_fork() to reset our internal locks as
# they may be in an invalid state leading to a deadlock or crash.
self.__block.__init__()
self.__started._reset_internal_locks()
@property
def _block(self):
# used by a unittest
return self.__block
def _set_daemon(self):
# Overridden in _MainThread and _DummyThread
return current_thread().daemon
@ -867,6 +882,9 @@ def _after_fork():
# its new value since it can have changed.
ident = _get_ident()
thread._Thread__ident = ident
# Any condition variables hanging off of the active thread may
# be in an invalid state, so we reinitialize them.
thread._reset_internal_locks()
new_active[ident] = thread
else:
# All the others are already stopped.