mirror of https://github.com/python/cpython
(merge 3.2) Issue #12469: Run wakeup and pending signal tests in a subprocess
to run the test in a fresh process with only one thread and to not change signal handling of the parent process.
This commit is contained in:
commit
d554cdf8b9
|
@ -224,117 +224,115 @@ class WindowsSignalTests(unittest.TestCase):
|
|||
|
||||
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
|
||||
class WakeupSignalTests(unittest.TestCase):
|
||||
TIMEOUT_FULL = 10
|
||||
TIMEOUT_HALF = 5
|
||||
def check_wakeup(self, test_body, *signals):
|
||||
# use a subprocess to have only one thread
|
||||
code = """if 1:
|
||||
import fcntl
|
||||
import os
|
||||
import signal
|
||||
import struct
|
||||
|
||||
def handler(self, signum, frame):
|
||||
pass
|
||||
signals = {!r}
|
||||
|
||||
def check_signum(self, *signals):
|
||||
data = os.read(self.read, len(signals)+1)
|
||||
raised = struct.unpack('%uB' % len(data), data)
|
||||
# We don't care of the signal delivery order (it's not portable or
|
||||
# reliable)
|
||||
raised = set(raised)
|
||||
signals = set(signals)
|
||||
self.assertEqual(raised, signals)
|
||||
def handler(signum, frame):
|
||||
pass
|
||||
|
||||
def check_signum(signals):
|
||||
data = os.read(read, len(signals)+1)
|
||||
raised = struct.unpack('%uB' % len(data), data)
|
||||
# We don't care of the signal delivery order (it's not portable or
|
||||
# reliable)
|
||||
raised = set(raised)
|
||||
signals = set(signals)
|
||||
assert raised == signals, "%r != %r" % (raised, signals)
|
||||
|
||||
{}
|
||||
|
||||
signal.signal(signal.SIGALRM, handler)
|
||||
read, write = os.pipe()
|
||||
for fd in (read, write):
|
||||
flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
|
||||
flags = flags | os.O_NONBLOCK
|
||||
fcntl.fcntl(fd, fcntl.F_SETFL, flags)
|
||||
signal.set_wakeup_fd(write)
|
||||
|
||||
test()
|
||||
check_signum(signals)
|
||||
|
||||
os.close(read)
|
||||
os.close(write)
|
||||
""".format(signals, test_body)
|
||||
|
||||
assert_python_ok('-c', code)
|
||||
|
||||
def test_wakeup_fd_early(self):
|
||||
import select
|
||||
self.check_wakeup("""def test():
|
||||
import select
|
||||
import time
|
||||
|
||||
signal.alarm(1)
|
||||
before_time = time.time()
|
||||
# We attempt to get a signal during the sleep,
|
||||
# before select is called
|
||||
time.sleep(self.TIMEOUT_FULL)
|
||||
mid_time = time.time()
|
||||
self.assertTrue(mid_time - before_time < self.TIMEOUT_HALF)
|
||||
select.select([self.read], [], [], self.TIMEOUT_FULL)
|
||||
after_time = time.time()
|
||||
self.assertTrue(after_time - mid_time < self.TIMEOUT_HALF)
|
||||
self.check_signum(signal.SIGALRM)
|
||||
TIMEOUT_FULL = 10
|
||||
TIMEOUT_HALF = 5
|
||||
|
||||
signal.alarm(1)
|
||||
before_time = time.time()
|
||||
# We attempt to get a signal during the sleep,
|
||||
# before select is called
|
||||
time.sleep(TIMEOUT_FULL)
|
||||
mid_time = time.time()
|
||||
dt = mid_time - before_time
|
||||
assert dt < TIMEOUT_HALF, dt
|
||||
select.select([read], [], [], TIMEOUT_FULL)
|
||||
after_time = time.time()
|
||||
dt = after_time - mid_time
|
||||
assert dt < TIMEOUT_HALF, dt
|
||||
""", signal.SIGALRM)
|
||||
|
||||
def test_wakeup_fd_during(self):
|
||||
import select
|
||||
self.check_wakeup("""def test():
|
||||
import select
|
||||
import time
|
||||
|
||||
signal.alarm(1)
|
||||
before_time = time.time()
|
||||
# We attempt to get a signal during the select call
|
||||
self.assertRaises(select.error, select.select,
|
||||
[self.read], [], [], self.TIMEOUT_FULL)
|
||||
after_time = time.time()
|
||||
self.assertTrue(after_time - before_time < self.TIMEOUT_HALF)
|
||||
self.check_signum(signal.SIGALRM)
|
||||
TIMEOUT_FULL = 10
|
||||
TIMEOUT_HALF = 5
|
||||
|
||||
signal.alarm(1)
|
||||
before_time = time.time()
|
||||
# We attempt to get a signal during the select call
|
||||
try:
|
||||
select.select([read], [], [], TIMEOUT_FULL)
|
||||
except select.error:
|
||||
pass
|
||||
else:
|
||||
raise Exception("select.error not raised")
|
||||
after_time = time.time()
|
||||
dt = after_time - before_time
|
||||
assert dt < TIMEOUT_HALF, dt
|
||||
""", signal.SIGALRM)
|
||||
|
||||
def test_signum(self):
|
||||
old_handler = signal.signal(signal.SIGUSR1, self.handler)
|
||||
self.addCleanup(signal.signal, signal.SIGUSR1, old_handler)
|
||||
os.kill(os.getpid(), signal.SIGUSR1)
|
||||
os.kill(os.getpid(), signal.SIGALRM)
|
||||
self.check_signum(signal.SIGUSR1, signal.SIGALRM)
|
||||
self.check_wakeup("""def test():
|
||||
signal.signal(signal.SIGUSR1, handler)
|
||||
os.kill(os.getpid(), signal.SIGUSR1)
|
||||
os.kill(os.getpid(), signal.SIGALRM)
|
||||
""", signal.SIGUSR1, signal.SIGALRM)
|
||||
|
||||
@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
|
||||
'need signal.pthread_sigmask()')
|
||||
@unittest.skipUnless(hasattr(signal, 'pthread_kill'),
|
||||
'need signal.pthread_kill()')
|
||||
def test_pending(self):
|
||||
signum1 = signal.SIGUSR1
|
||||
signum2 = signal.SIGUSR2
|
||||
tid = threading.current_thread().ident
|
||||
self.check_wakeup("""def test():
|
||||
signum1 = signal.SIGUSR1
|
||||
signum2 = signal.SIGUSR2
|
||||
|
||||
old_handler = signal.signal(signum1, self.handler)
|
||||
self.addCleanup(signal.signal, signum1, old_handler)
|
||||
old_handler = signal.signal(signum2, self.handler)
|
||||
self.addCleanup(signal.signal, signum2, old_handler)
|
||||
signal.signal(signum1, handler)
|
||||
signal.signal(signum2, handler)
|
||||
|
||||
signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
|
||||
signal.pthread_kill(tid, signum1)
|
||||
signal.pthread_kill(tid, signum2)
|
||||
# Unblocking the 2 signals calls the C signal handler twice
|
||||
signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
|
||||
signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
|
||||
os.kill(os.getpid(), signum1)
|
||||
os.kill(os.getpid(), signum2)
|
||||
# Unblocking the 2 signals calls the C signal handler twice
|
||||
signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
|
||||
""", signal.SIGUSR1, signal.SIGUSR2)
|
||||
|
||||
self.check_signum(signum1, signum2)
|
||||
|
||||
@unittest.skipUnless(hasattr(signal, 'pthread_kill'),
|
||||
'need signal.pthread_kill()')
|
||||
def test_pthread_kill_main_thread(self):
|
||||
# Test that a signal can be sent to the main thread with pthread_kill()
|
||||
# before any other thread has been created (see issue #12392).
|
||||
code = """if True:
|
||||
import threading
|
||||
import signal
|
||||
import sys
|
||||
|
||||
def handler(signum, frame):
|
||||
sys.exit(3)
|
||||
|
||||
signal.signal(signal.SIGUSR1, handler)
|
||||
signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
|
||||
sys.exit(1)
|
||||
"""
|
||||
|
||||
with spawn_python('-c', code) as process:
|
||||
stdout, stderr = process.communicate()
|
||||
exitcode = process.wait()
|
||||
if exitcode != 3:
|
||||
raise Exception("Child error (exit code %s): %s" %
|
||||
(exitcode, stdout))
|
||||
|
||||
def setUp(self):
|
||||
import fcntl
|
||||
|
||||
self.alrm = signal.signal(signal.SIGALRM, self.handler)
|
||||
self.read, self.write = os.pipe()
|
||||
flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0)
|
||||
flags = flags | os.O_NONBLOCK
|
||||
fcntl.fcntl(self.write, fcntl.F_SETFL, flags)
|
||||
self.old_wakeup = signal.set_wakeup_fd(self.write)
|
||||
|
||||
def tearDown(self):
|
||||
signal.set_wakeup_fd(self.old_wakeup)
|
||||
os.close(self.read)
|
||||
os.close(self.write)
|
||||
signal.signal(signal.SIGALRM, self.alrm)
|
||||
|
||||
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
|
||||
class SiginterruptTest(unittest.TestCase):
|
||||
|
@ -519,60 +517,6 @@ class PendingSignalsTests(unittest.TestCase):
|
|||
Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait()
|
||||
functions.
|
||||
"""
|
||||
def setUp(self):
|
||||
self.has_pthread_kill = hasattr(signal, 'pthread_kill')
|
||||
|
||||
def handler(self, signum, frame):
|
||||
1/0
|
||||
|
||||
def read_sigmask(self):
|
||||
return signal.pthread_sigmask(signal.SIG_BLOCK, [])
|
||||
|
||||
def can_test_blocked_signals(self, skip):
|
||||
"""
|
||||
Check if a blocked signal can be raised to the main thread without
|
||||
calling its signal handler. We need pthread_kill() or exactly one
|
||||
thread (the main thread).
|
||||
|
||||
Return True if it's possible. Otherwise, return False and print a
|
||||
warning if skip is False, or raise a SkipTest exception if skip is
|
||||
True.
|
||||
"""
|
||||
if self.has_pthread_kill:
|
||||
return True
|
||||
|
||||
# The fault handler timeout thread masks all signals. If the main
|
||||
# thread masks also SIGUSR1, all threads mask this signal. In this
|
||||
# case, if we send SIGUSR1 to the process, the signal is pending in the
|
||||
# main or the faulthandler timeout thread. Unblock SIGUSR1 in the main
|
||||
# thread calls the signal handler only if the signal is pending for the
|
||||
# main thread. Stop the faulthandler timeout thread to workaround this
|
||||
# problem.
|
||||
import faulthandler
|
||||
faulthandler.cancel_dump_tracebacks_later()
|
||||
|
||||
# Issue #11998: The _tkinter module loads the Tcl library which
|
||||
# creates a thread waiting events in select(). This thread receives
|
||||
# signals blocked by all other threads. We cannot test blocked
|
||||
# signals
|
||||
if '_tkinter' in sys.modules:
|
||||
message = ("_tkinter is loaded and pthread_kill() is missing, "
|
||||
"cannot test blocked signals (issue #11998)")
|
||||
if skip:
|
||||
self.skipTest(message)
|
||||
else:
|
||||
print("WARNING: %s" % message)
|
||||
return False
|
||||
return True
|
||||
|
||||
def kill(self, signum):
|
||||
if self.has_pthread_kill:
|
||||
tid = threading.get_ident()
|
||||
signal.pthread_kill(tid, signum)
|
||||
else:
|
||||
pid = os.getpid()
|
||||
os.kill(pid, signum)
|
||||
|
||||
@unittest.skipUnless(hasattr(signal, 'sigpending'),
|
||||
'need signal.sigpending()')
|
||||
def test_sigpending_empty(self):
|
||||
|
@ -583,70 +527,103 @@ class PendingSignalsTests(unittest.TestCase):
|
|||
@unittest.skipUnless(hasattr(signal, 'sigpending'),
|
||||
'need signal.sigpending()')
|
||||
def test_sigpending(self):
|
||||
self.can_test_blocked_signals(True)
|
||||
code = """if 1:
|
||||
import os
|
||||
import signal
|
||||
|
||||
signum = signal.SIGUSR1
|
||||
old_handler = signal.signal(signum, self.handler)
|
||||
self.addCleanup(signal.signal, signum, old_handler)
|
||||
def handler(signum, frame):
|
||||
1/0
|
||||
|
||||
signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
|
||||
self.kill(signum)
|
||||
self.assertEqual(signal.sigpending(), {signum})
|
||||
with self.assertRaises(ZeroDivisionError):
|
||||
signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
|
||||
signum = signal.SIGUSR1
|
||||
signal.signal(signum, handler)
|
||||
|
||||
signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
|
||||
os.kill(os.getpid(), signum)
|
||||
pending = signal.sigpending()
|
||||
assert pending == {signum}, '%s != {%s}' % (pending, signum)
|
||||
try:
|
||||
signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
|
||||
except ZeroDivisionError:
|
||||
pass
|
||||
else:
|
||||
raise Exception("ZeroDivisionError not raised")
|
||||
"""
|
||||
assert_python_ok('-c', code)
|
||||
|
||||
@unittest.skipUnless(hasattr(signal, 'pthread_kill'),
|
||||
'need signal.pthread_kill()')
|
||||
def test_pthread_kill(self):
|
||||
signum = signal.SIGUSR1
|
||||
current = threading.get_ident()
|
||||
code = """if 1:
|
||||
import signal
|
||||
import threading
|
||||
import sys
|
||||
|
||||
old_handler = signal.signal(signum, self.handler)
|
||||
self.addCleanup(signal.signal, signum, old_handler)
|
||||
signum = signal.SIGUSR1
|
||||
|
||||
with self.assertRaises(ZeroDivisionError):
|
||||
signal.pthread_kill(current, signum)
|
||||
def handler(signum, frame):
|
||||
1/0
|
||||
|
||||
signal.signal(signum, handler)
|
||||
|
||||
if sys.platform == 'freebsd6':
|
||||
# Issue #12392 and #12469: send a signal to the main thread
|
||||
# doesn't work before the creation of the first thread on
|
||||
# FreeBSD 6
|
||||
def noop():
|
||||
pass
|
||||
thread = threading.Thread(target=noop)
|
||||
thread.start()
|
||||
thread.join()
|
||||
|
||||
tid = threading.get_ident()
|
||||
try:
|
||||
signal.pthread_kill(tid, signum)
|
||||
except ZeroDivisionError:
|
||||
pass
|
||||
else:
|
||||
raise Exception("ZeroDivisionError not raised")
|
||||
"""
|
||||
assert_python_ok('-c', code)
|
||||
|
||||
@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
|
||||
'need signal.pthread_sigmask()')
|
||||
def wait_helper(self, test, blocked):
|
||||
def wait_helper(self, blocked, test):
|
||||
"""
|
||||
test: body of the "def test(signum):" function.
|
||||
blocked: number of the blocked signal
|
||||
"""
|
||||
code = '''
|
||||
import signal
|
||||
import sys
|
||||
code = '''if 1:
|
||||
import signal
|
||||
import sys
|
||||
|
||||
def handler(signum, frame):
|
||||
1/0
|
||||
def handler(signum, frame):
|
||||
1/0
|
||||
|
||||
def test(signum):
|
||||
%s
|
||||
%s
|
||||
|
||||
blocked = %s
|
||||
signum = signal.SIGALRM
|
||||
blocked = %s
|
||||
signum = signal.SIGALRM
|
||||
|
||||
# child: block and wait the signal
|
||||
try:
|
||||
signal.signal(signum, handler)
|
||||
signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])
|
||||
# child: block and wait the signal
|
||||
try:
|
||||
signal.signal(signum, handler)
|
||||
signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])
|
||||
|
||||
# Do the tests
|
||||
test(signum)
|
||||
# Do the tests
|
||||
test(signum)
|
||||
|
||||
# The handler must not be called on unblock
|
||||
try:
|
||||
signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked])
|
||||
except ZeroDivisionError:
|
||||
print("the signal handler has been called",
|
||||
file=sys.stderr)
|
||||
sys.exit(1)
|
||||
except BaseException as err:
|
||||
print("error: {}".format(err), file=sys.stderr)
|
||||
sys.stderr.flush()
|
||||
sys.exit(1)
|
||||
''' % (test, blocked)
|
||||
# The handler must not be called on unblock
|
||||
try:
|
||||
signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked])
|
||||
except ZeroDivisionError:
|
||||
print("the signal handler has been called",
|
||||
file=sys.stderr)
|
||||
sys.exit(1)
|
||||
except BaseException as err:
|
||||
print("error: {}".format(err), file=sys.stderr)
|
||||
sys.stderr.flush()
|
||||
sys.exit(1)
|
||||
''' % (test.strip(), blocked)
|
||||
|
||||
# sig*wait* must be called with the signal blocked: since the current
|
||||
# process might have several threads running, use a subprocess to have
|
||||
|
@ -656,61 +633,56 @@ except BaseException as err:
|
|||
@unittest.skipUnless(hasattr(signal, 'sigwait'),
|
||||
'need signal.sigwait()')
|
||||
def test_sigwait(self):
|
||||
test = '''
|
||||
self.wait_helper(signal.SIGALRM, '''
|
||||
def test(signum):
|
||||
signal.alarm(1)
|
||||
received = signal.sigwait([signum])
|
||||
assert received == signum , 'received %s, not %s' % (received, signum)
|
||||
'''
|
||||
|
||||
self.wait_helper(test, signal.SIGALRM)
|
||||
''')
|
||||
|
||||
@unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
|
||||
'need signal.sigwaitinfo()')
|
||||
def test_sigwaitinfo(self):
|
||||
test = '''
|
||||
self.wait_helper(signal.SIGALRM, '''
|
||||
def test(signum):
|
||||
signal.alarm(1)
|
||||
info = signal.sigwaitinfo([signum])
|
||||
assert info.si_signo == signum, "info.si_signo != %s" % signum
|
||||
'''
|
||||
|
||||
self.wait_helper(test, signal.SIGALRM)
|
||||
''')
|
||||
|
||||
@unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
|
||||
'need signal.sigtimedwait()')
|
||||
def test_sigtimedwait(self):
|
||||
test = '''
|
||||
self.wait_helper(signal.SIGALRM, '''
|
||||
def test(signum):
|
||||
signal.alarm(1)
|
||||
info = signal.sigtimedwait([signum], (10, 1000))
|
||||
assert info.si_signo == signum, 'info.si_signo != %s' % signum
|
||||
'''
|
||||
|
||||
self.wait_helper(test, signal.SIGALRM)
|
||||
''')
|
||||
|
||||
@unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
|
||||
'need signal.sigtimedwait()')
|
||||
# issue #12303: sigtimedwait() takes 30 seconds on FreeBSD 6 (kernel bug)
|
||||
@unittest.skipIf(sys.platform =='freebsd6',
|
||||
'sigtimedwait() with a null timeout doens\'t work on FreeBSD 6')
|
||||
"sigtimedwait() with a null timeout doens't work on FreeBSD 6")
|
||||
def test_sigtimedwait_poll(self):
|
||||
# check that polling with sigtimedwait works
|
||||
test = '''
|
||||
self.wait_helper(signal.SIGALRM, '''
|
||||
def test(signum):
|
||||
import os
|
||||
os.kill(os.getpid(), signum)
|
||||
info = signal.sigtimedwait([signum], (0, 0))
|
||||
assert info.si_signo == signum, 'info.si_signo != %s' % signum
|
||||
'''
|
||||
|
||||
self.wait_helper(test, signal.SIGALRM)
|
||||
''')
|
||||
|
||||
@unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
|
||||
'need signal.sigtimedwait()')
|
||||
def test_sigtimedwait_timeout(self):
|
||||
test = '''
|
||||
self.wait_helper(signal.SIGALRM, '''
|
||||
def test(signum):
|
||||
received = signal.sigtimedwait([signum], (1, 0))
|
||||
assert received is None, "received=%r" % (received,)
|
||||
'''
|
||||
|
||||
self.wait_helper(test, signal.SIGALRM)
|
||||
''')
|
||||
|
||||
@unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
|
||||
'need signal.sigtimedwait()')
|
||||
|
@ -723,7 +695,8 @@ except BaseException as err:
|
|||
@unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
|
||||
'need signal.sigwaitinfo()')
|
||||
def test_sigwaitinfo_interrupted(self):
|
||||
test = '''
|
||||
self.wait_helper(signal.SIGUSR1, '''
|
||||
def test(signum):
|
||||
import errno
|
||||
|
||||
hndl_called = True
|
||||
|
@ -741,9 +714,7 @@ except BaseException as err:
|
|||
raise Exception("Expected EINTR to be raised by sigwaitinfo")
|
||||
else:
|
||||
raise Exception("Expected EINTR to be raised by sigwaitinfo")
|
||||
'''
|
||||
|
||||
self.wait_helper(test, signal.SIGUSR1)
|
||||
''')
|
||||
|
||||
@unittest.skipUnless(hasattr(signal, 'sigwait'),
|
||||
'need signal.sigwait()')
|
||||
|
@ -791,46 +762,93 @@ except BaseException as err:
|
|||
@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
|
||||
'need signal.pthread_sigmask()')
|
||||
def test_pthread_sigmask(self):
|
||||
test_blocked_signals = self.can_test_blocked_signals(False)
|
||||
code = """if 1:
|
||||
import signal
|
||||
import os; import threading
|
||||
|
||||
def handler(signum, frame):
|
||||
1/0
|
||||
|
||||
def kill(signum):
|
||||
os.kill(os.getpid(), signum)
|
||||
|
||||
def read_sigmask():
|
||||
return signal.pthread_sigmask(signal.SIG_BLOCK, [])
|
||||
|
||||
signum = signal.SIGUSR1
|
||||
|
||||
# Install our signal handler
|
||||
old_handler = signal.signal(signum, self.handler)
|
||||
self.addCleanup(signal.signal, signum, old_handler)
|
||||
old_handler = signal.signal(signum, handler)
|
||||
|
||||
# Unblock SIGUSR1 (and copy the old mask) to test our signal handler
|
||||
old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
|
||||
self.addCleanup(signal.pthread_sigmask, signal.SIG_SETMASK, old_mask)
|
||||
with self.assertRaises(ZeroDivisionError):
|
||||
self.kill(signum)
|
||||
try:
|
||||
kill(signum)
|
||||
except ZeroDivisionError:
|
||||
pass
|
||||
else:
|
||||
raise Exception("ZeroDivisionError not raised")
|
||||
|
||||
# Block and then raise SIGUSR1. The signal is blocked: the signal
|
||||
# handler is not called, and the signal is now pending
|
||||
signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
|
||||
if test_blocked_signals:
|
||||
self.kill(signum)
|
||||
kill(signum)
|
||||
|
||||
# Check the new mask
|
||||
blocked = self.read_sigmask()
|
||||
self.assertIn(signum, blocked)
|
||||
self.assertEqual(old_mask ^ blocked, {signum})
|
||||
blocked = read_sigmask()
|
||||
assert signum in blocked, "%s not in %s" % (signum, blocked)
|
||||
assert old_mask ^ blocked == {signum}, "%s ^ %s != {%s}" % (old_mask, blocked, signum)
|
||||
|
||||
# Unblock SIGUSR1
|
||||
if test_blocked_signals:
|
||||
with self.assertRaises(ZeroDivisionError):
|
||||
# unblock the pending signal calls immediatly the signal handler
|
||||
signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
|
||||
else:
|
||||
try:
|
||||
# unblock the pending signal calls immediatly the signal handler
|
||||
signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
|
||||
with self.assertRaises(ZeroDivisionError):
|
||||
self.kill(signum)
|
||||
except ZeroDivisionError:
|
||||
pass
|
||||
else:
|
||||
raise Exception("ZeroDivisionError not raised")
|
||||
try:
|
||||
kill(signum)
|
||||
except ZeroDivisionError:
|
||||
pass
|
||||
else:
|
||||
raise Exception("ZeroDivisionError not raised")
|
||||
|
||||
# Check the new mask
|
||||
unblocked = self.read_sigmask()
|
||||
self.assertNotIn(signum, unblocked)
|
||||
self.assertEqual(blocked ^ unblocked, {signum})
|
||||
self.assertSequenceEqual(old_mask, unblocked)
|
||||
# Finally, restore the previous signal handler and the signal mask
|
||||
unblocked = read_sigmask()
|
||||
assert signum not in unblocked, "%s in %s" % (signum, unblocked)
|
||||
assert blocked ^ unblocked == {signum}, "%s ^ %s != {%s}" % (blocked, unblocked, signum)
|
||||
assert old_mask == unblocked, "%s != %s" % (old_mask, unblocked)
|
||||
"""
|
||||
assert_python_ok('-c', code)
|
||||
|
||||
@unittest.skipIf(sys.platform == 'freebsd6',
|
||||
"issue #12392: send a signal to the main thread doesn't work "
|
||||
"before the creation of the first thread on FreeBSD 6")
|
||||
@unittest.skipUnless(hasattr(signal, 'pthread_kill'),
|
||||
'need signal.pthread_kill()')
|
||||
def test_pthread_kill_main_thread(self):
|
||||
# Test that a signal can be sent to the main thread with pthread_kill()
|
||||
# before any other thread has been created (see issue #12392).
|
||||
code = """if True:
|
||||
import threading
|
||||
import signal
|
||||
import sys
|
||||
|
||||
def handler(signum, frame):
|
||||
sys.exit(3)
|
||||
|
||||
signal.signal(signal.SIGUSR1, handler)
|
||||
signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
|
||||
sys.exit(2)
|
||||
"""
|
||||
|
||||
with spawn_python('-c', code) as process:
|
||||
stdout, stderr = process.communicate()
|
||||
exitcode = process.wait()
|
||||
if exitcode != 3:
|
||||
raise Exception("Child error (exit code %s): %s" %
|
||||
(exitcode, stdout))
|
||||
|
||||
|
||||
def test_main():
|
||||
|
|
|
@ -981,6 +981,10 @@ Extension Modules
|
|||
Tests
|
||||
-----
|
||||
|
||||
- Issue #12469: Run wakeup and pending signal tests in a subprocess to run the
|
||||
test in a fresh process with only one thread and to not change signal
|
||||
handling of the parent process.
|
||||
|
||||
- Issue #8716: Avoid crashes caused by Aqua Tk on OSX when attempting to run
|
||||
test_tk or test_ttk_guionly under a username that is not currently logged
|
||||
in to the console windowserver (as may be the case under buildbot or ssh).
|
||||
|
|
Loading…
Reference in New Issue