diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py index 1ed4b9e731b..ea4062732d7 100644 --- a/Lib/test/test_signal.py +++ b/Lib/test/test_signal.py @@ -598,34 +598,73 @@ class PendingSignalsTests(unittest.TestCase): with self.assertRaises(ZeroDivisionError): signal.pthread_kill(current, signum) + def check_sigwait(self, test, signum): + # sigwait must be called with the signal blocked: since the current + # process might have several threads running, we fork() a child process + # to have a single thread. + pid = os.fork() + if pid == 0: + # child: block and wait the signal + try: + signal.signal(signum, self.handler) + signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) + + # Do the tests + test(signum) + + # The handler must not be called on unblock + try: + signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) + except ZeroDivisionError: + print("the signal handler has been called", + file=sys.stderr) + os._exit(1) + + os._exit(0) + finally: + os._exit(1) + else: + # parent: let the child some time to wait, send him the signal, and + # check it correcty received it + self.assertEqual(os.waitpid(pid, 0), (pid, 0)) + @unittest.skipUnless(hasattr(signal, 'sigwait'), 'need signal.sigwait()') + @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork()') def test_sigwait(self): - old_handler = signal.signal(signal.SIGALRM, self.handler) - self.addCleanup(signal.signal, signal.SIGALRM, old_handler) + def test(signum): + signal.alarm(1) + received = signal.sigwait([signum]) + if received != signum: + print("sigwait() received %s, not %s" + % (received, signum), + file=sys.stderr) + os._exit(1) - signal.alarm(1) - self.assertEqual(signal.sigwait([signal.SIGALRM]), signal.SIGALRM) + self.check_sigwait(test, signal.SIGALRM) @unittest.skipUnless(hasattr(signal, 'sigwait'), 'need signal.sigwait()') @unittest.skipIf(threading is None, "test needs threading module") + @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork()') def test_sigwait_thread(self): - signum = signal.SIGUSR1 - old_handler = signal.signal(signum, self.handler) - self.addCleanup(signal.signal, signum, old_handler) - - def kill_later(): + def kill_later(signum): + # wait until the main thread is waiting in sigwait() time.sleep(1) os.kill(os.getpid(), signum) - killer = threading.Thread(target=kill_later) - killer.start() - try: - self.assertEqual(signal.sigwait([signum]), signum) - finally: + def test(signum): + killer = threading.Thread(target=kill_later, args=(signum,)) + killer.start() + received = signal.sigwait([signum]) + if received != signum: + print("sigwait() received %s, not %s" % (received, signum), + file=sys.stderr) + os._exit(1) killer.join() + self.check_sigwait(test, signal.SIGUSR1) + @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 'need signal.pthread_sigmask()') def test_pthread_sigmask_arguments(self):