bpo-41586: Add pipesize parameter to subprocess & F_GETPIPE_SZ and F_SETPIPE_SZ to fcntl. (GH-21921)

* Add F_SETPIPE_SZ and F_GETPIPE_SZ to fcntl module
* Add pipesize parameter for subprocess.Popen class

This will allow the user to control the size of the pipes.
On Linux the default is 64K. When a pipe is full it blocks for writing.
When a pipe is empty it blocks for reading. On processes that are
very fast this can lead to a lot of wasted CPU cycles. On a typical
Linux system the max pipe size is 1024K which is much better.
For high performance-oriented libraries such as xopen it is nice to
be able to set the pipe size.

The workaround without this feature is to use my_popen_process.stdout.fileno() in
conjunction with fcntl and 1031 (value of F_SETPIPE_SZ) to acquire this behavior.
This commit is contained in:
Ruben Vorderman 2020-10-20 01:30:02 +02:00 committed by GitHub
parent bf838227c3
commit 23c0fb8edd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 102 additions and 3 deletions

View File

@ -39,6 +39,11 @@ descriptor.
On Linux(>=3.15), the fcntl module exposes the ``F_OFD_GETLK``, ``F_OFD_SETLK`` On Linux(>=3.15), the fcntl module exposes the ``F_OFD_GETLK``, ``F_OFD_SETLK``
and ``F_OFD_SETLKW`` constants, which working with open file description locks. and ``F_OFD_SETLKW`` constants, which working with open file description locks.
.. versionchanged:: 3.10
On Linux >= 2.6.11, the fcntl module exposes the ``F_GETPIPE_SZ`` and
``F_SETPIPE_SZ`` constants, which allow checking and modifying a pipe's size,
respectively.
The module defines the following functions: The module defines the following functions:

View File

@ -341,7 +341,7 @@ functions.
startupinfo=None, creationflags=0, restore_signals=True, \ startupinfo=None, creationflags=0, restore_signals=True, \
start_new_session=False, pass_fds=(), \*, group=None, \ start_new_session=False, pass_fds=(), \*, group=None, \
extra_groups=None, user=None, umask=-1, \ extra_groups=None, user=None, umask=-1, \
encoding=None, errors=None, text=None) encoding=None, errors=None, text=None, pipesize=-1)
Execute a child program in a new process. On POSIX, the class uses Execute a child program in a new process. On POSIX, the class uses
:meth:`os.execvp`-like behavior to execute the child program. On Windows, :meth:`os.execvp`-like behavior to execute the child program. On Windows,
@ -625,6 +625,14 @@ functions.
* :data:`CREATE_DEFAULT_ERROR_MODE` * :data:`CREATE_DEFAULT_ERROR_MODE`
* :data:`CREATE_BREAKAWAY_FROM_JOB` * :data:`CREATE_BREAKAWAY_FROM_JOB`
*pipesize* can be used to change the size of the pipe when
:data:`PIPE` is used for *stdin*, *stdout* or *stderr*. The size of the pipe
is only changed on platforms that support this (at the time of writing, only
Linux). Other platforms will ignore this parameter.
.. versionadded:: 3.10
The ``pipesize`` parameter was added.
Popen objects are supported as context managers via the :keyword:`with` statement: Popen objects are supported as context managers via the :keyword:`with` statement:
on exit, standard file descriptors are closed, and the process is waited for. on exit, standard file descriptors are closed, and the process is waited for.
:: ::

View File

@ -62,6 +62,11 @@ try:
import grp import grp
except ImportError: except ImportError:
grp = None grp = None
try:
import fcntl
except ImportError:
fcntl = None
__all__ = ["Popen", "PIPE", "STDOUT", "call", "check_call", "getstatusoutput", __all__ = ["Popen", "PIPE", "STDOUT", "call", "check_call", "getstatusoutput",
"getoutput", "check_output", "run", "CalledProcessError", "DEVNULL", "getoutput", "check_output", "run", "CalledProcessError", "DEVNULL",
@ -756,7 +761,7 @@ class Popen(object):
startupinfo=None, creationflags=0, startupinfo=None, creationflags=0,
restore_signals=True, start_new_session=False, restore_signals=True, start_new_session=False,
pass_fds=(), *, user=None, group=None, extra_groups=None, pass_fds=(), *, user=None, group=None, extra_groups=None,
encoding=None, errors=None, text=None, umask=-1): encoding=None, errors=None, text=None, umask=-1, pipesize=-1):
"""Create new Popen instance.""" """Create new Popen instance."""
_cleanup() _cleanup()
# Held while anything is calling waitpid before returncode has been # Held while anything is calling waitpid before returncode has been
@ -773,6 +778,11 @@ class Popen(object):
if not isinstance(bufsize, int): if not isinstance(bufsize, int):
raise TypeError("bufsize must be an integer") raise TypeError("bufsize must be an integer")
if pipesize is None:
pipesize = -1 # Restore default
if not isinstance(pipesize, int):
raise TypeError("pipesize must be an integer")
if _mswindows: if _mswindows:
if preexec_fn is not None: if preexec_fn is not None:
raise ValueError("preexec_fn is not supported on Windows " raise ValueError("preexec_fn is not supported on Windows "
@ -797,6 +807,7 @@ class Popen(object):
self.returncode = None self.returncode = None
self.encoding = encoding self.encoding = encoding
self.errors = errors self.errors = errors
self.pipesize = pipesize
# Validate the combinations of text and universal_newlines # Validate the combinations of text and universal_newlines
if (text is not None and universal_newlines is not None if (text is not None and universal_newlines is not None
@ -1575,6 +1586,8 @@ class Popen(object):
pass pass
elif stdin == PIPE: elif stdin == PIPE:
p2cread, p2cwrite = os.pipe() p2cread, p2cwrite = os.pipe()
if self.pipesize > 0 and hasattr(fcntl, "F_SETPIPE_SZ"):
fcntl.fcntl(p2cwrite, fcntl.F_SETPIPE_SZ, self.pipesize)
elif stdin == DEVNULL: elif stdin == DEVNULL:
p2cread = self._get_devnull() p2cread = self._get_devnull()
elif isinstance(stdin, int): elif isinstance(stdin, int):
@ -1587,6 +1600,8 @@ class Popen(object):
pass pass
elif stdout == PIPE: elif stdout == PIPE:
c2pread, c2pwrite = os.pipe() c2pread, c2pwrite = os.pipe()
if self.pipesize > 0 and hasattr(fcntl, "F_SETPIPE_SZ"):
fcntl.fcntl(c2pwrite, fcntl.F_SETPIPE_SZ, self.pipesize)
elif stdout == DEVNULL: elif stdout == DEVNULL:
c2pwrite = self._get_devnull() c2pwrite = self._get_devnull()
elif isinstance(stdout, int): elif isinstance(stdout, int):
@ -1599,6 +1614,8 @@ class Popen(object):
pass pass
elif stderr == PIPE: elif stderr == PIPE:
errread, errwrite = os.pipe() errread, errwrite = os.pipe()
if self.pipesize > 0 and hasattr(fcntl, "F_SETPIPE_SZ"):
fcntl.fcntl(errwrite, fcntl.F_SETPIPE_SZ, self.pipesize)
elif stderr == STDOUT: elif stderr == STDOUT:
if c2pwrite != -1: if c2pwrite != -1:
errwrite = c2pwrite errwrite = c2pwrite

View File

@ -190,6 +190,19 @@ class TestFcntl(unittest.TestCase):
res = fcntl.fcntl(self.f.fileno(), fcntl.F_GETPATH, bytes(len(expected))) res = fcntl.fcntl(self.f.fileno(), fcntl.F_GETPATH, bytes(len(expected)))
self.assertEqual(expected, res) self.assertEqual(expected, res)
@unittest.skipIf(not (hasattr(fcntl, "F_SETPIPE_SZ") and hasattr(fcntl, "F_GETPIPE_SZ")),
                 "F_SETPIPE_SZ and F_GETPIPE_SZ are not available on all unix platforms.")
def test_fcntl_f_pipesize(self):
    """Check that F_SETPIPE_SZ resizes a pipe and F_GETPIPE_SZ reports it.

    A fresh pipe is created, its current size is read, the size is
    doubled, and the new size is read back and compared.
    """
    test_pipe_r, test_pipe_w = os.pipe()
    try:
        # Get the default pipesize with F_GETPIPE_SZ
        pipesize_default = fcntl.fcntl(test_pipe_w, fcntl.F_GETPIPE_SZ)
        # Multiply the default with 2 to get a new value.
        new_pipesize = pipesize_default * 2
        fcntl.fcntl(test_pipe_w, fcntl.F_SETPIPE_SZ, new_pipesize)
        self.assertEqual(fcntl.fcntl(test_pipe_w, fcntl.F_GETPIPE_SZ),
                         new_pipesize)
    finally:
        # Close both ends even when an fcntl call or the assertion raises,
        # so a failing test does not leak file descriptors.
        os.close(test_pipe_r)
        os.close(test_pipe_w)
def test_main(): def test_main():
run_unittest(TestFcntl) run_unittest(TestFcntl)

View File

@ -39,6 +39,11 @@ try:
except ImportError: except ImportError:
grp = None grp = None
# fcntl is optional: fall back to None when the module cannot be imported so
# that tests can probe for it with hasattr(fcntl, ...).
try:
    import fcntl
except ImportError:
    # Catch only the missing-module case; a bare ``except:`` would also
    # swallow KeyboardInterrupt/SystemExit and hide unrelated errors.
    fcntl = None
if support.PGO: if support.PGO:
raise unittest.SkipTest("test is not helpful for PGO") raise unittest.SkipTest("test is not helpful for PGO")
@ -661,6 +666,46 @@ class ProcessTestCase(BaseTestCase):
p.wait() p.wait()
self.assertEqual(p.stdin, None) self.assertEqual(p.stdin, None)
def test_pipesizes(self):
    """Check that Popen(pipesize=...) resizes the stdin/stdout/stderr pipes.

    The pipe size is only asserted where fcntl exposes F_GETPIPE_SZ; on
    other platforms the test just verifies that passing pipesize works.
    """
    # stdin redirection
    pipesize = 16 * 1024
    p = subprocess.Popen([sys.executable, "-c",
                          'import sys; sys.stdin.read(); sys.stdout.write("out"); sys.stderr.write("error!")'],
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE,
                         pipesize=pipesize)
    # Popen's context manager closes stdin/stdout/stderr and waits for the
    # child on exit, so nothing is leaked even if an assertion fails.
    with p:
        # We only assert pipe size has changed on platforms that support it.
        if sys.platform != "win32" and hasattr(fcntl, "F_GETPIPE_SZ"):
            for fifo in [p.stdin, p.stdout, p.stderr]:
                self.assertEqual(
                    fcntl.fcntl(fifo.fileno(), fcntl.F_GETPIPE_SZ), pipesize)
        # Windows pipe size can be acquired with the GetNamedPipeInfo function
        # https://docs.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-getnamedpipeinfo
        # However, this function is not yet in _winapi.
        p.stdin.write(b"pear")
        p.stdin.close()
        p.wait()
def test_pipesize_default(self):
    """Check that pipesize=-1 leaves the pipes at the platform default size.

    Where fcntl exposes F_GETPIPE_SZ, the size of a freshly created
    os.pipe() is used as the reference default.
    """
    p = subprocess.Popen([sys.executable, "-c",
                          'import sys; sys.stdin.read(); sys.stdout.write("out");'
                          ' sys.stderr.write("error!")'],
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE,
                         pipesize=-1)
    # Popen's context manager closes stdin/stdout/stderr and waits for the
    # child on exit, so nothing is leaked even if an assertion fails.
    with p:
        # UNIX tests using fcntl
        if sys.platform != "win32" and hasattr(fcntl, "F_GETPIPE_SZ"):
            fp_r, fp_w = os.pipe()
            try:
                default_pipesize = fcntl.fcntl(fp_w, fcntl.F_GETPIPE_SZ)
            finally:
                # The reference pipe is only needed to read its size.
                os.close(fp_r)
                os.close(fp_w)
            for fifo in [p.stdin, p.stdout, p.stderr]:
                self.assertEqual(
                    fcntl.fcntl(fifo.fileno(), fcntl.F_GETPIPE_SZ),
                    default_pipesize)
        # On other platforms we cannot test the pipe size (yet). But above code
        # using pipesize=-1 should not crash.
        p.stdin.close()
        p.wait()
def test_env(self): def test_env(self):
newenv = os.environ.copy() newenv = os.environ.copy()
newenv["FRUIT"] = "orange" newenv["FRUIT"] = "orange"
@ -3503,7 +3548,7 @@ class MiscTests(unittest.TestCase):
def test__all__(self): def test__all__(self):
"""Ensure that __all__ is populated properly.""" """Ensure that __all__ is populated properly."""
intentionally_excluded = {"list2cmdline", "Handle", "pwd", "grp"} intentionally_excluded = {"list2cmdline", "Handle", "pwd", "grp", "fcntl"}
exported = set(subprocess.__all__) exported = set(subprocess.__all__)
possible_exports = set() possible_exports = set()
import types import types

View File

@ -1818,6 +1818,7 @@ Johannes Vogel
Michael Vogt Michael Vogt
Radu Voicilas Radu Voicilas
Alex Volkov Alex Volkov
Ruben Vorderman
Guido Vranken Guido Vranken
Martijn Vries Martijn Vries
Sjoerd de Vries Sjoerd de Vries

View File

@ -0,0 +1,2 @@
Add F_SETPIPE_SZ and F_GETPIPE_SZ to fcntl module. Allow setting pipesize on
subprocess.Popen.

View File

@ -565,6 +565,14 @@ all_ins(PyObject* m)
if (PyModule_AddIntMacro(m, F_SHLCK)) return -1; if (PyModule_AddIntMacro(m, F_SHLCK)) return -1;
#endif #endif
/* Linux specifics */
#ifdef F_SETPIPE_SZ
if (PyModule_AddIntMacro(m, F_SETPIPE_SZ)) return -1;
#endif
#ifdef F_GETPIPE_SZ
if (PyModule_AddIntMacro(m, F_GETPIPE_SZ)) return -1;
#endif
/* OS X specifics */ /* OS X specifics */
#ifdef F_FULLFSYNC #ifdef F_FULLFSYNC
if (PyModule_AddIntMacro(m, F_FULLFSYNC)) return -1; if (PyModule_AddIntMacro(m, F_FULLFSYNC)) return -1;