mirror of https://github.com/python/cpython
Attempt to speed up some subprocess tests (and hopefully keep them reliable).
This commit is contained in:
parent
bc35bebb45
commit
ab66d2a6cb
|
@ -686,26 +686,19 @@ class ProcessTestCase(BaseTestCase):
|
|||
self.assertEqual(subprocess.list2cmdline(['ab', '']),
|
||||
'ab ""')
|
||||
|
||||
|
||||
def test_poll(self):
|
||||
p = subprocess.Popen([sys.executable,
|
||||
"-c", "import time; time.sleep(1)"])
|
||||
count = 0
|
||||
while p.poll() is None:
|
||||
time.sleep(0.1)
|
||||
count += 1
|
||||
# We expect that the poll loop probably went around about 10 times,
|
||||
# but, based on system scheduling we can't control, it's possible
|
||||
# poll() never returned None. It "should be" very rare that it
|
||||
# didn't go around at least twice.
|
||||
self.assertGreaterEqual(count, 2)
|
||||
p = subprocess.Popen([sys.executable, "-c",
|
||||
"import os",
|
||||
"os.read(1)"], stdin=subprocess.PIPE)
|
||||
self.addCleanup(p.stdin.close)
|
||||
self.assertIsNone(p.poll())
|
||||
os.write(p.stdin.fileno(), b'A')
|
||||
p.wait()
|
||||
# Subsequent invocations should just return the returncode
|
||||
self.assertEqual(p.poll(), 0)
|
||||
|
||||
|
||||
def test_wait(self):
|
||||
p = subprocess.Popen([sys.executable,
|
||||
"-c", "import time; time.sleep(2)"])
|
||||
p = subprocess.Popen([sys.executable, "-c", "pass"])
|
||||
self.assertEqual(p.wait(), 0)
|
||||
# Subsequent invocations should just return the returncode
|
||||
self.assertEqual(p.wait(), 0)
|
||||
|
@ -797,25 +790,29 @@ class ProcessTestCase(BaseTestCase):
|
|||
p = subprocess.Popen([sys.executable, "-c", 'pass'],
|
||||
stdin=subprocess.PIPE)
|
||||
self.addCleanup(p.stdin.close)
|
||||
time.sleep(2)
|
||||
p.wait()
|
||||
p.communicate(b"x" * 2**20)
|
||||
|
||||
@unittest.skipUnless(hasattr(signal, 'SIGALRM'),
|
||||
"Requires signal.SIGALRM")
|
||||
@unittest.skipUnless(hasattr(signal, 'SIGUSR1'),
|
||||
"Requires signal.SIGUSR1")
|
||||
@unittest.skipUnless(hasattr(os, 'kill'),
|
||||
"Requires os.kill")
|
||||
@unittest.skipUnless(hasattr(os, 'getppid'),
|
||||
"Requires os.getppid")
|
||||
def test_communicate_eintr(self):
|
||||
# Issue #12493: communicate() should handle EINTR
|
||||
def handler(signum, frame):
|
||||
pass
|
||||
old_handler = signal.signal(signal.SIGALRM, handler)
|
||||
self.addCleanup(signal.signal, signal.SIGALRM, old_handler)
|
||||
old_handler = signal.signal(signal.SIGUSR1, handler)
|
||||
self.addCleanup(signal.signal, signal.SIGUSR1, old_handler)
|
||||
|
||||
# the process is running for 2 seconds
|
||||
args = [sys.executable, "-c", 'import time; time.sleep(2)']
|
||||
args = [sys.executable, "-c",
|
||||
'import os, signal;'
|
||||
'os.kill(os.getppid(), signal.SIGUSR1)']
|
||||
for stream in ('stdout', 'stderr'):
|
||||
kw = {stream: subprocess.PIPE}
|
||||
with subprocess.Popen(args, **kw) as process:
|
||||
signal.alarm(1)
|
||||
# communicate() will be interrupted by SIGALRM
|
||||
# communicate() will be interrupted by SIGUSR1
|
||||
process.communicate()
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue