Try doing a raw test of os.fork()/os.kill().

This commit is contained in:
Richard Oudkerk 2013-10-17 14:24:06 +01:00
parent 7d270ee05d
commit 1e2f67c05f
1 changed files with 36 additions and 5 deletions

View File

@ -273,10 +273,11 @@ class _TestProcess(BaseTestCase):
@classmethod
def _test_terminate(cls):
print('signal.getsignal(SIGTERM) =', signal.getsignal(signal.SIGTERM))
print('starting sleep')
print('signal.getsignal(SIGTERM) =',
signal.getsignal(signal.SIGTERM), file=sys.stderr)
print('starting sleep', file=sys.stderr)
time.sleep(100)
print('finished sleep')
print('finished sleep', file=sys.stderr)
def test_terminate(self):
if self.TYPE == 'threads':
@ -314,11 +315,12 @@ class _TestProcess(BaseTestCase):
try:
signal.alarm(10)
self.assertEqual(join(), None)
signal.alarm(0)
except RuntimeError:
print('os.waitpid() =', os.waitpid(p.pid, os.WNOHANG))
print('os.waitpid() =',
os.waitpid(p.pid, os.WNOHANG), file=sys.stderr)
raise
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, old_handler)
else:
self.assertEqual(join(), None)
@ -333,6 +335,35 @@ class _TestProcess(BaseTestCase):
# XXX sometimes get p.exitcode == 0 on Windows ...
#self.assertEqual(p.exitcode, -signal.SIGTERM)
@unittest.skipIf(WIN32, 'Unix only')
def test_sigterm(self):
# A test for the Gentoo build bot which does not directly use
# multiprocessing. Start and terminate child processes.
if self.TYPE != 'processes':
return
for i in range(10):
pid = os.fork()
if pid == 0:
try:
print('sleeping', file=sys.stderr)
time.sleep(100)
print('waking', file=sys.stderr)
finally:
sys.stderr.flush()
os._exit(0)
else:
os.kill(pid, signal.SIGTERM)
def handler(*args):
raise RuntimeError('join took too long: %s' % p)
old_handler = signal.signal(signal.SIGALRM, handler)
try:
signal.alarm(10)
pid_status = os.waitpid(pid, 0)
self.assertEqual(pid_status[0], pid)
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, old_handler)
def test_cpu_count(self):
try:
cpus = multiprocessing.cpu_count()