cpython/Lib/test/test_fork1.py

73 lines
1.6 KiB
Python
Raw Normal View History

"""This test checks for correct fork() behavior.
We want fork1() semantics -- only the forking thread survives in the
child after a fork().
On some systems (e.g. Solaris without posix threads) we find that all
active threads survive in the child after a fork(); this is an error.
While BeOS doesn't officially support fork and native threading in
the same application, the present example should work just fine. DC
"""
import os, sys, time, thread
from test_support import verify, verbose, TestSkipped
try:
os.fork
except AttributeError:
raise TestSkipped, "os.fork not defined -- skipping test_fork1"
LONGSLEEP = 2
SHORTSLEEP = 0.5
NUM_THREADS = 4
alive = {}
stop = 0
def f(id):
while not stop:
alive[id] = os.getpid()
try:
time.sleep(SHORTSLEEP)
except IOError:
pass
def main():
for i in range(NUM_THREADS):
thread.start_new(f, (i,))
time.sleep(LONGSLEEP)
a = alive.keys()
a.sort()
verify(a == range(NUM_THREADS))
prefork_lives = alive.copy()
cpid = os.fork()
if cpid == 0:
# Child
time.sleep(LONGSLEEP)
n = 0
for key in alive.keys():
if alive[key] != prefork_lives[key]:
n = n+1
os._exit(n)
else:
# Parent
spid, status = os.waitpid(cpid, 0)
verify(spid == cpid)
verify(status == 0,
"cause = %d, exit = %d" % (status&0xff, status>>8) )
global stop
# Tell threads to die
stop = 1
time.sleep(2*SHORTSLEEP) # Wait for threads to die
main()