Issue #8407: signal.pthread_sigmask() returns a set instead of a list

Update the doc. Refactor also related tests.
This commit is contained in:
Victor Stinner 2011-05-04 13:20:35 +02:00
parent 6fd49e152a
commit 35b300c5fd
3 changed files with 59 additions and 46 deletions

View File

@ -184,7 +184,7 @@ The :mod:`signal` module defines the following functions:
Fetch and/or change the signal mask of the calling thread. The signal mask Fetch and/or change the signal mask of the calling thread. The signal mask
is the set of signals whose delivery is currently blocked for the caller. is the set of signals whose delivery is currently blocked for the caller.
The old signal mask is returned. Return the old signal mask as a set of signals.
The behavior of the call is dependent on the value of *how*, as follows. The behavior of the call is dependent on the value of *how*, as follows.
@ -196,8 +196,9 @@ The :mod:`signal` module defines the following functions:
* :data:`SIG_SETMASK`: The set of blocked signals is set to the *mask* * :data:`SIG_SETMASK`: The set of blocked signals is set to the *mask*
argument. argument.
*mask* is a list of signal numbers (e.g. [:const:`signal.SIGINT`, *mask* is a set of signal numbers (e.g. {:const:`signal.SIGINT`,
:const:`signal.SIGTERM`]). :const:`signal.SIGTERM`}). Use ``range(1, signal.NSIG)`` for a full mask
including all signals.
For example, ``signal.pthread_sigmask(signal.SIG_BLOCK, [])`` reads the For example, ``signal.pthread_sigmask(signal.SIG_BLOCK, [])`` reads the
signal mask of the calling thread. signal mask of the calling thread.

View File

@ -486,24 +486,27 @@ class ItimerTest(unittest.TestCase):
@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
'need signal.pthread_sigmask()') 'need signal.pthread_sigmask()')
class PthreadSigmaskTests(unittest.TestCase): class PendingSignalsTests(unittest.TestCase):
def test_arguments(self): """
Tests for the pthread_sigmask() function.
"""
def handler(self, signum, frame):
1/0
def read_sigmask(self):
return signal.pthread_sigmask(signal.SIG_BLOCK, [])
def test_pthread_sigmask_arguments(self):
self.assertRaises(TypeError, signal.pthread_sigmask) self.assertRaises(TypeError, signal.pthread_sigmask)
self.assertRaises(TypeError, signal.pthread_sigmask, 1) self.assertRaises(TypeError, signal.pthread_sigmask, 1)
self.assertRaises(TypeError, signal.pthread_sigmask, 1, 2, 3) self.assertRaises(TypeError, signal.pthread_sigmask, 1, 2, 3)
self.assertRaises(RuntimeError, signal.pthread_sigmask, 1700, []) self.assertRaises(RuntimeError, signal.pthread_sigmask, 1700, [])
def test_block_unlock(self): def test_pthread_sigmask(self):
import faulthandler import faulthandler
pid = os.getpid() pid = os.getpid()
signum = signal.SIGUSR1 signum = signal.SIGUSR1
def handler(signum, frame):
1/0
def read_sigmask():
return signal.pthread_sigmask(signal.SIG_BLOCK, [])
# The fault handler timeout thread masks all signals. If the main # The fault handler timeout thread masks all signals. If the main
# thread masks also SIGUSR1, all threads mask this signal. In this # thread masks also SIGUSR1, all threads mask this signal. In this
# case, if we send SIGUSR1 to the process, the signal is pending in the # case, if we send SIGUSR1 to the process, the signal is pending in the
@ -527,7 +530,7 @@ class PthreadSigmaskTests(unittest.TestCase):
"blocked by pthread_sigmask() (issue #11998)") "blocked by pthread_sigmask() (issue #11998)")
# Install our signal handler # Install our signal handler
old_handler = signal.signal(signum, handler) old_handler = signal.signal(signum, self.handler)
self.addCleanup(signal.signal, signum, old_handler) self.addCleanup(signal.signal, signum, old_handler)
# Unblock SIGUSR1 (and copy the old mask) to test our signal handler # Unblock SIGUSR1 (and copy the old mask) to test our signal handler
@ -543,9 +546,9 @@ class PthreadSigmaskTests(unittest.TestCase):
os.kill(pid, signum) os.kill(pid, signum)
# Check the new mask # Check the new mask
blocked = read_sigmask() blocked = self.read_sigmask()
self.assertIn(signum, blocked) self.assertIn(signum, blocked)
self.assertEqual(set(old_mask) ^ set(blocked), {signum}) self.assertEqual(old_mask ^ blocked, {signum})
# Unblock SIGUSR1 # Unblock SIGUSR1
if can_test_blocked_signals: if can_test_blocked_signals:
@ -558,9 +561,9 @@ class PthreadSigmaskTests(unittest.TestCase):
os.kill(pid, signum) os.kill(pid, signum)
# Check the new mask # Check the new mask
unblocked = read_sigmask() unblocked = self.read_sigmask()
self.assertNotIn(signum, unblocked) self.assertNotIn(signum, unblocked)
self.assertEqual(set(blocked) ^ set(unblocked), {signum}) self.assertEqual(blocked ^ unblocked, {signum})
self.assertSequenceEqual(old_mask, unblocked) self.assertSequenceEqual(old_mask, unblocked)
# Finally, restore the previous signal handler and the signal mask # Finally, restore the previous signal handler and the signal mask
@ -570,7 +573,7 @@ def test_main():
support.run_unittest(BasicSignalTests, InterProcessSignalTests, support.run_unittest(BasicSignalTests, InterProcessSignalTests,
WakeupSignalTests, SiginterruptTest, WakeupSignalTests, SiginterruptTest,
ItimerTest, WindowsSignalTests, ItimerTest, WindowsSignalTests,
PthreadSigmaskTests) PendingSignalsTests)
finally: finally:
support.reap_children() support.reap_children()

View File

@ -552,11 +552,45 @@ error:
return result; return result;
} }
static PyObject*
sigset_to_set(sigset_t mask)
{
PyObject *signum, *result;
int sig;
result = PySet_New(0);
if (result == NULL)
return NULL;
for (sig = 1; sig < NSIG; sig++) {
if (sigismember(&mask, sig) != 1)
continue;
/* Handle the case where it is a member by adding the signal to
the result list. Ignore the other cases because they mean the
signal isn't a member of the mask or the signal was invalid,
and an invalid signal must have been our fault in constructing
the loop boundaries. */
signum = PyLong_FromLong(sig);
if (signum == NULL) {
Py_DECREF(result);
return NULL;
}
if (PySet_Add(result, signum) == -1) {
Py_DECREF(signum);
Py_DECREF(result);
return NULL;
}
Py_DECREF(signum);
}
return result;
}
static PyObject * static PyObject *
signal_pthread_sigmask(PyObject *self, PyObject *args) signal_pthread_sigmask(PyObject *self, PyObject *args)
{ {
int how, sig; int how;
PyObject *signals, *result, *signum; PyObject *signals;
sigset_t mask, previous; sigset_t mask, previous;
int err; int err;
@ -577,32 +611,7 @@ signal_pthread_sigmask(PyObject *self, PyObject *args)
if (PyErr_CheckSignals()) if (PyErr_CheckSignals())
return NULL; return NULL;
result = PyList_New(0); return sigset_to_set(previous);
if (result == NULL)
return NULL;
for (sig = 1; sig < NSIG; sig++) {
if (sigismember(&previous, sig) != 1)
continue;
/* Handle the case where it is a member by adding the signal to
the result list. Ignore the other cases because they mean the
signal isn't a member of the mask or the signal was invalid,
and an invalid signal must have been our fault in constructing
the loop boundaries. */
signum = PyLong_FromLong(sig);
if (signum == NULL) {
Py_DECREF(result);
return NULL;
}
if (PyList_Append(result, signum) == -1) {
Py_DECREF(signum);
Py_DECREF(result);
return NULL;
}
Py_DECREF(signum);
}
return result;
} }
PyDoc_STRVAR(signal_pthread_sigmask_doc, PyDoc_STRVAR(signal_pthread_sigmask_doc,