bpo-38456: Use /bin/true in test_subprocess (GH-16736)
* bpo-38456: Use /bin/true in test_subprocess. Instead of sys.executable, "-c", "pass" or "import sys; sys.exit(0)" use /bin/true when it is available. On a reasonable machine this shaves up to two seconds wall time off the otherwise ~40sec execution on a --with-pydebug build. It should be more notable on many buildbots or overloaded slower I/O systems (CI, etc).
This commit is contained in:
parent
f3751efb5c
commit
67b93f80c7
|
@ -54,6 +54,16 @@ NONEXISTING_CMD = ('nonexisting_i_hope',)
|
|||
# Ignore errors that indicate the command was not found
|
||||
NONEXISTING_ERRORS = (FileNotFoundError, NotADirectoryError, PermissionError)
|
||||
|
||||
ZERO_RETURN_CMD = (sys.executable, '-c', 'pass')
|
||||
|
||||
|
||||
def setUpModule():
|
||||
shell_true = shutil.which('true')
|
||||
if (os.access(shell_true, os.X_OK) and
|
||||
subprocess.run([shell_true]).returncode == 0):
|
||||
global ZERO_RETURN_CMD
|
||||
ZERO_RETURN_CMD = (shell_true,) # Faster than Python startup.
|
||||
|
||||
|
||||
class BaseTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
|
@ -98,7 +108,7 @@ class PopenExecuteChildRaises(subprocess.Popen):
|
|||
class ProcessTestCase(BaseTestCase):
|
||||
|
||||
def test_io_buffered_by_default(self):
|
||||
p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
|
||||
p = subprocess.Popen(ZERO_RETURN_CMD,
|
||||
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE)
|
||||
try:
|
||||
|
@ -112,7 +122,7 @@ class ProcessTestCase(BaseTestCase):
|
|||
p.wait()
|
||||
|
||||
def test_io_unbuffered_works(self):
|
||||
p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
|
||||
p = subprocess.Popen(ZERO_RETURN_CMD,
|
||||
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE, bufsize=0)
|
||||
try:
|
||||
|
@ -142,8 +152,7 @@ class ProcessTestCase(BaseTestCase):
|
|||
|
||||
def test_check_call_zero(self):
|
||||
# check_call() function with zero return code
|
||||
rc = subprocess.check_call([sys.executable, "-c",
|
||||
"import sys; sys.exit(0)"])
|
||||
rc = subprocess.check_call(ZERO_RETURN_CMD)
|
||||
self.assertEqual(rc, 0)
|
||||
|
||||
def test_check_call_nonzero(self):
|
||||
|
@ -709,19 +718,19 @@ class ProcessTestCase(BaseTestCase):
|
|||
newenv = os.environ.copy()
|
||||
newenv["FRUIT\0VEGETABLE"] = "cabbage"
|
||||
with self.assertRaises(ValueError):
|
||||
subprocess.Popen([sys.executable, "-c", "pass"], env=newenv)
|
||||
subprocess.Popen(ZERO_RETURN_CMD, env=newenv)
|
||||
|
||||
# null character in the environment variable value
|
||||
newenv = os.environ.copy()
|
||||
newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
|
||||
with self.assertRaises(ValueError):
|
||||
subprocess.Popen([sys.executable, "-c", "pass"], env=newenv)
|
||||
subprocess.Popen(ZERO_RETURN_CMD, env=newenv)
|
||||
|
||||
# equal character in the environment variable name
|
||||
newenv = os.environ.copy()
|
||||
newenv["FRUIT=ORANGE"] = "lemon"
|
||||
with self.assertRaises(ValueError):
|
||||
subprocess.Popen([sys.executable, "-c", "pass"], env=newenv)
|
||||
subprocess.Popen(ZERO_RETURN_CMD, env=newenv)
|
||||
|
||||
# equal character in the environment variable value
|
||||
newenv = os.environ.copy()
|
||||
|
@ -822,7 +831,7 @@ class ProcessTestCase(BaseTestCase):
|
|||
options['stderr'] = subprocess.PIPE
|
||||
if not options:
|
||||
continue
|
||||
p = subprocess.Popen((sys.executable, "-c", "pass"), **options)
|
||||
p = subprocess.Popen(ZERO_RETURN_CMD, **options)
|
||||
p.communicate()
|
||||
if p.stdin is not None:
|
||||
self.assertTrue(p.stdin.closed)
|
||||
|
@ -961,7 +970,7 @@ class ProcessTestCase(BaseTestCase):
|
|||
#
|
||||
# We set stdout to PIPE because, as of this writing, a different
|
||||
# code path is tested when the number of pipes is zero or one.
|
||||
p = subprocess.Popen([sys.executable, "-c", "pass"],
|
||||
p = subprocess.Popen(ZERO_RETURN_CMD,
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE,
|
||||
universal_newlines=True)
|
||||
|
@ -1109,7 +1118,7 @@ class ProcessTestCase(BaseTestCase):
|
|||
self.assertEqual(p.poll(), 0)
|
||||
|
||||
def test_wait(self):
|
||||
p = subprocess.Popen([sys.executable, "-c", "pass"])
|
||||
p = subprocess.Popen(ZERO_RETURN_CMD)
|
||||
self.assertEqual(p.wait(), 0)
|
||||
# Subsequent invocations should just return the returncode
|
||||
self.assertEqual(p.wait(), 0)
|
||||
|
@ -1128,14 +1137,14 @@ class ProcessTestCase(BaseTestCase):
|
|||
# an invalid type of the bufsize argument should raise
|
||||
# TypeError.
|
||||
with self.assertRaises(TypeError):
|
||||
subprocess.Popen([sys.executable, "-c", "pass"], "orange")
|
||||
subprocess.Popen(ZERO_RETURN_CMD, "orange")
|
||||
|
||||
def test_bufsize_is_none(self):
|
||||
# bufsize=None should be the same as bufsize=0.
|
||||
p = subprocess.Popen([sys.executable, "-c", "pass"], None)
|
||||
p = subprocess.Popen(ZERO_RETURN_CMD, None)
|
||||
self.assertEqual(p.wait(), 0)
|
||||
# Again with keyword arg
|
||||
p = subprocess.Popen([sys.executable, "-c", "pass"], bufsize=None)
|
||||
p = subprocess.Popen(ZERO_RETURN_CMD, bufsize=None)
|
||||
self.assertEqual(p.wait(), 0)
|
||||
|
||||
def _test_bufsize_equal_one(self, line, expected, universal_newlines):
|
||||
|
@ -1340,7 +1349,7 @@ class ProcessTestCase(BaseTestCase):
|
|||
|
||||
def test_communicate_epipe(self):
|
||||
# Issue 10963: communicate() should hide EPIPE
|
||||
p = subprocess.Popen([sys.executable, "-c", 'pass'],
|
||||
p = subprocess.Popen(ZERO_RETURN_CMD,
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE)
|
||||
|
@ -1351,7 +1360,7 @@ class ProcessTestCase(BaseTestCase):
|
|||
|
||||
def test_communicate_epipe_only_stdin(self):
|
||||
# Issue 10963: communicate() should hide EPIPE
|
||||
p = subprocess.Popen([sys.executable, "-c", 'pass'],
|
||||
p = subprocess.Popen(ZERO_RETURN_CMD,
|
||||
stdin=subprocess.PIPE)
|
||||
self.addCleanup(p.stdin.close)
|
||||
p.wait()
|
||||
|
@ -1390,7 +1399,7 @@ class ProcessTestCase(BaseTestCase):
|
|||
fds_before_popen = os.listdir(fd_directory)
|
||||
with self.assertRaises(PopenTestException):
|
||||
PopenExecuteChildRaises(
|
||||
[sys.executable, '-c', 'pass'], stdin=subprocess.PIPE,
|
||||
ZERO_RETURN_CMD, stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
|
||||
# NOTE: This test doesn't verify that the real _execute_child
|
||||
|
@ -1433,7 +1442,7 @@ class RunFuncTestCase(BaseTestCase):
|
|||
|
||||
def test_check_zero(self):
|
||||
# check_returncode shouldn't raise when returncode is zero
|
||||
cp = self.run_python("import sys; sys.exit(0)", check=True)
|
||||
cp = subprocess.run(ZERO_RETURN_CMD, check=True)
|
||||
self.assertEqual(cp.returncode, 0)
|
||||
|
||||
def test_timeout(self):
|
||||
|
@ -1791,16 +1800,16 @@ class POSIXProcessTestCase(BaseTestCase):
|
|||
self.assertEqual(child_user, user_uid)
|
||||
|
||||
with self.assertRaises(ValueError):
|
||||
subprocess.check_call([sys.executable, "-c", "pass"], user=-1)
|
||||
subprocess.check_call(ZERO_RETURN_CMD, user=-1)
|
||||
|
||||
if pwd is None:
|
||||
with self.assertRaises(ValueError):
|
||||
subprocess.check_call([sys.executable, "-c", "pass"], user=name_uid)
|
||||
subprocess.check_call(ZERO_RETURN_CMD, user=name_uid)
|
||||
|
||||
@unittest.skipIf(hasattr(os, 'setreuid'), 'setreuid() available on platform')
|
||||
def test_user_error(self):
|
||||
with self.assertRaises(ValueError):
|
||||
subprocess.check_call([sys.executable, "-c", "pass"], user=65535)
|
||||
subprocess.check_call(ZERO_RETURN_CMD, user=65535)
|
||||
|
||||
@unittest.skipUnless(hasattr(os, 'setregid'), 'no setregid() on platform')
|
||||
def test_group(self):
|
||||
|
@ -1834,16 +1843,16 @@ class POSIXProcessTestCase(BaseTestCase):
|
|||
|
||||
# make sure we bomb on negative values
|
||||
with self.assertRaises(ValueError):
|
||||
subprocess.check_call([sys.executable, "-c", "pass"], group=-1)
|
||||
subprocess.check_call(ZERO_RETURN_CMD, group=-1)
|
||||
|
||||
if grp is None:
|
||||
with self.assertRaises(ValueError):
|
||||
subprocess.check_call([sys.executable, "-c", "pass"], group=name_group)
|
||||
subprocess.check_call(ZERO_RETURN_CMD, group=name_group)
|
||||
|
||||
@unittest.skipIf(hasattr(os, 'setregid'), 'setregid() available on platform')
|
||||
def test_group_error(self):
|
||||
with self.assertRaises(ValueError):
|
||||
subprocess.check_call([sys.executable, "-c", "pass"], group=65535)
|
||||
subprocess.check_call(ZERO_RETURN_CMD, group=65535)
|
||||
|
||||
@unittest.skipUnless(hasattr(os, 'setgroups'), 'no setgroups() on platform')
|
||||
def test_extra_groups(self):
|
||||
|
@ -1882,17 +1891,17 @@ class POSIXProcessTestCase(BaseTestCase):
|
|||
|
||||
# make sure we bomb on negative values
|
||||
with self.assertRaises(ValueError):
|
||||
subprocess.check_call([sys.executable, "-c", "pass"], extra_groups=[-1])
|
||||
subprocess.check_call(ZERO_RETURN_CMD, extra_groups=[-1])
|
||||
|
||||
if grp is None:
|
||||
with self.assertRaises(ValueError):
|
||||
subprocess.check_call([sys.executable, "-c", "pass"],
|
||||
subprocess.check_call(ZERO_RETURN_CMD,
|
||||
extra_groups=[name_group])
|
||||
|
||||
@unittest.skipIf(hasattr(os, 'setgroups'), 'setgroups() available on platform')
|
||||
def test_extra_groups_error(self):
|
||||
with self.assertRaises(ValueError):
|
||||
subprocess.check_call([sys.executable, "-c", "pass"], extra_groups=[])
|
||||
subprocess.check_call(ZERO_RETURN_CMD, extra_groups=[])
|
||||
|
||||
@unittest.skipIf(mswindows or not hasattr(os, 'umask'),
|
||||
'POSIX umask() is not available.')
|
||||
|
@ -1904,7 +1913,7 @@ class POSIXProcessTestCase(BaseTestCase):
|
|||
# We set an unusual umask in the child so as a unique mode
|
||||
# for us to test the child's touched file for.
|
||||
subprocess.check_call(
|
||||
[sys.executable, "-c", f"open({name!r}, 'w')"], # touch
|
||||
[sys.executable, "-c", f"open({name!r}, 'w').close()"],
|
||||
umask=0o053)
|
||||
# Ignore execute permissions entirely in our test,
|
||||
# filesystems could be mounted to ignore or force that.
|
||||
|
@ -2007,7 +2016,7 @@ class POSIXProcessTestCase(BaseTestCase):
|
|||
|
||||
with self.assertRaises(subprocess.SubprocessError):
|
||||
self._TestExecuteChildPopen(
|
||||
self, [sys.executable, "-c", "pass"],
|
||||
self, ZERO_RETURN_CMD,
|
||||
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE, preexec_fn=raise_it)
|
||||
|
||||
|
@ -2464,7 +2473,7 @@ class POSIXProcessTestCase(BaseTestCase):
|
|||
|
||||
try:
|
||||
subprocess.call(
|
||||
[sys.executable, "-c", "pass"],
|
||||
ZERO_RETURN_CMD,
|
||||
preexec_fn=prepare)
|
||||
except ValueError as err:
|
||||
# Pure Python implementations keeps the message
|
||||
|
@ -2507,29 +2516,30 @@ class POSIXProcessTestCase(BaseTestCase):
|
|||
self.assertEqual(stdout.decode('ascii'), ascii(encoded_value))
|
||||
|
||||
def test_bytes_program(self):
|
||||
abs_program = os.fsencode(sys.executable)
|
||||
path, program = os.path.split(sys.executable)
|
||||
abs_program = os.fsencode(ZERO_RETURN_CMD[0])
|
||||
args = list(ZERO_RETURN_CMD[1:])
|
||||
path, program = os.path.split(ZERO_RETURN_CMD[0])
|
||||
program = os.fsencode(program)
|
||||
|
||||
# absolute bytes path
|
||||
exitcode = subprocess.call([abs_program, "-c", "pass"])
|
||||
exitcode = subprocess.call([abs_program]+args)
|
||||
self.assertEqual(exitcode, 0)
|
||||
|
||||
# absolute bytes path as a string
|
||||
cmd = b"'" + abs_program + b"' -c pass"
|
||||
cmd = b"'%s' %s" % (abs_program, " ".join(args).encode("utf-8"))
|
||||
exitcode = subprocess.call(cmd, shell=True)
|
||||
self.assertEqual(exitcode, 0)
|
||||
|
||||
# bytes program, unicode PATH
|
||||
env = os.environ.copy()
|
||||
env["PATH"] = path
|
||||
exitcode = subprocess.call([program, "-c", "pass"], env=env)
|
||||
exitcode = subprocess.call([program]+args, env=env)
|
||||
self.assertEqual(exitcode, 0)
|
||||
|
||||
# bytes program, bytes PATH
|
||||
envb = os.environb.copy()
|
||||
envb[b"PATH"] = os.fsencode(path)
|
||||
exitcode = subprocess.call([program, "-c", "pass"], env=envb)
|
||||
exitcode = subprocess.call([program]+args, env=envb)
|
||||
self.assertEqual(exitcode, 0)
|
||||
|
||||
def test_pipe_cloexec(self):
|
||||
|
@ -2757,7 +2767,7 @@ class POSIXProcessTestCase(BaseTestCase):
|
|||
# pass_fds overrides close_fds with a warning.
|
||||
with self.assertWarns(RuntimeWarning) as context:
|
||||
self.assertFalse(subprocess.call(
|
||||
[sys.executable, "-c", "import sys; sys.exit(0)"],
|
||||
ZERO_RETURN_CMD,
|
||||
close_fds=False, pass_fds=(fd, )))
|
||||
self.assertIn('overriding close_fds', str(context.warning))
|
||||
|
||||
|
@ -2819,19 +2829,19 @@ class POSIXProcessTestCase(BaseTestCase):
|
|||
|
||||
def test_stdout_stdin_are_single_inout_fd(self):
|
||||
with io.open(os.devnull, "r+") as inout:
|
||||
p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
|
||||
p = subprocess.Popen(ZERO_RETURN_CMD,
|
||||
stdout=inout, stdin=inout)
|
||||
p.wait()
|
||||
|
||||
def test_stdout_stderr_are_single_inout_fd(self):
|
||||
with io.open(os.devnull, "r+") as inout:
|
||||
p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
|
||||
p = subprocess.Popen(ZERO_RETURN_CMD,
|
||||
stdout=inout, stderr=inout)
|
||||
p.wait()
|
||||
|
||||
def test_stderr_stdin_are_single_inout_fd(self):
|
||||
with io.open(os.devnull, "r+") as inout:
|
||||
p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
|
||||
p = subprocess.Popen(ZERO_RETURN_CMD,
|
||||
stderr=inout, stdin=inout)
|
||||
p.wait()
|
||||
|
||||
|
@ -3031,7 +3041,7 @@ class POSIXProcessTestCase(BaseTestCase):
|
|||
def test_communicate_BrokenPipeError_stdin_close(self):
|
||||
# By not setting stdout or stderr or a timeout we force the fast path
|
||||
# that just calls _stdin_write() internally due to our mock.
|
||||
proc = subprocess.Popen([sys.executable, '-c', 'pass'])
|
||||
proc = subprocess.Popen(ZERO_RETURN_CMD)
|
||||
with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin:
|
||||
mock_proc_stdin.close.side_effect = BrokenPipeError
|
||||
proc.communicate() # Should swallow BrokenPipeError from close.
|
||||
|
@ -3040,7 +3050,7 @@ class POSIXProcessTestCase(BaseTestCase):
|
|||
def test_communicate_BrokenPipeError_stdin_write(self):
|
||||
# By not setting stdout or stderr or a timeout we force the fast path
|
||||
# that just calls _stdin_write() internally due to our mock.
|
||||
proc = subprocess.Popen([sys.executable, '-c', 'pass'])
|
||||
proc = subprocess.Popen(ZERO_RETURN_CMD)
|
||||
with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin:
|
||||
mock_proc_stdin.write.side_effect = BrokenPipeError
|
||||
proc.communicate(b'stuff') # Should swallow the BrokenPipeError.
|
||||
|
@ -3079,7 +3089,7 @@ class POSIXProcessTestCase(BaseTestCase):
|
|||
'need _testcapi.W_STOPCODE')
|
||||
def test_stopped(self):
|
||||
"""Test wait() behavior when waitpid returns WIFSTOPPED; issue29335."""
|
||||
args = [sys.executable, '-c', 'pass']
|
||||
args = ZERO_RETURN_CMD
|
||||
proc = subprocess.Popen(args)
|
||||
|
||||
# Wait until the real process completes to avoid zombie process
|
||||
|
@ -3109,7 +3119,7 @@ class Win32ProcessTestCase(BaseTestCase):
|
|||
# Since Python is a console process, it won't be affected
|
||||
# by wShowWindow, but the argument should be silently
|
||||
# ignored
|
||||
subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"],
|
||||
subprocess.call(ZERO_RETURN_CMD,
|
||||
startupinfo=startupinfo)
|
||||
|
||||
def test_startupinfo_keywords(self):
|
||||
|
@ -3125,7 +3135,7 @@ class Win32ProcessTestCase(BaseTestCase):
|
|||
# Since Python is a console process, it won't be affected
|
||||
# by wShowWindow, but the argument should be silently
|
||||
# ignored
|
||||
subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"],
|
||||
subprocess.call(ZERO_RETURN_CMD,
|
||||
startupinfo=startupinfo)
|
||||
|
||||
def test_startupinfo_copy(self):
|
||||
|
@ -3137,7 +3147,7 @@ class Win32ProcessTestCase(BaseTestCase):
|
|||
# Call Popen() twice with the same startupinfo object to make sure
|
||||
# that it's not modified
|
||||
for _ in range(2):
|
||||
cmd = [sys.executable, "-c", "pass"]
|
||||
cmd = ZERO_RETURN_CMD
|
||||
with open(os.devnull, 'w') as null:
|
||||
proc = subprocess.Popen(cmd,
|
||||
stdout=null,
|
||||
|
@ -3177,7 +3187,7 @@ class Win32ProcessTestCase(BaseTestCase):
|
|||
class BadEnv(dict):
|
||||
keys = None
|
||||
with self.assertRaises(TypeError):
|
||||
subprocess.Popen([sys.executable, "-c", "pass"], env=BadEnv())
|
||||
subprocess.Popen(ZERO_RETURN_CMD, env=BadEnv())
|
||||
|
||||
def test_close_fds(self):
|
||||
# close file descriptors
|
||||
|
@ -3238,13 +3248,13 @@ class Win32ProcessTestCase(BaseTestCase):
|
|||
def test_empty_attribute_list(self):
|
||||
startupinfo = subprocess.STARTUPINFO()
|
||||
startupinfo.lpAttributeList = {}
|
||||
subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"],
|
||||
subprocess.call(ZERO_RETURN_CMD,
|
||||
startupinfo=startupinfo)
|
||||
|
||||
def test_empty_handle_list(self):
|
||||
startupinfo = subprocess.STARTUPINFO()
|
||||
startupinfo.lpAttributeList = {"handle_list": []}
|
||||
subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"],
|
||||
subprocess.call(ZERO_RETURN_CMD,
|
||||
startupinfo=startupinfo)
|
||||
|
||||
def test_shell_sequence(self):
|
||||
|
@ -3543,7 +3553,7 @@ class ContextManagerTests(BaseTestCase):
|
|||
|
||||
def test_broken_pipe_cleanup(self):
|
||||
"""Broken pipe error should not prevent wait() (Issue 21619)"""
|
||||
proc = subprocess.Popen([sys.executable, '-c', 'pass'],
|
||||
proc = subprocess.Popen(ZERO_RETURN_CMD,
|
||||
stdin=subprocess.PIPE,
|
||||
bufsize=support.PIPE_MAX_SIZE*2)
|
||||
proc = proc.__enter__()
|
||||
|
|
Loading…
Reference in New Issue