From 74bfb53e3afb6f5dd90dff3ef0e2dc3b2fba823e Mon Sep 17 00:00:00 2001 From: "Alexander P." <37912520+aplaikner@users.noreply.github.com> Date: Sat, 31 Aug 2024 07:57:22 +0200 Subject: [PATCH] gh-121313: Limit the reading size from pipes to their default buffer size on POSIX systems (GH-121315) This greatly reduces memory overallocation and overhead when multiprocessing receives messages larger than the default pipe buffer size over its pipes between processes. See https://github.com/python/cpython/issues/121313 for the analysis. --- Lib/multiprocessing/connection.py | 21 ++++++++++++++++--- ...-07-03-10-11-53.gh-issue-121313.D7gARW.rst | 1 + 2 files changed, 19 insertions(+), 3 deletions(-) create mode 100644 Misc/NEWS.d/next/C API/2024-07-03-10-11-53.gh-issue-121313.D7gARW.rst diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index b7e1e132172..d84b52fe6d4 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -11,13 +11,14 @@ __all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ] import errno import io +import itertools import os +import stat import sys import socket import struct -import time import tempfile -import itertools +import time from . 
import util @@ -360,6 +361,11 @@ if _winapi: f.write(ov.getbuffer()) return f +""" +The default size of a pipe on Linux systems is 16 times the base page size: +https://man7.org/linux/man-pages/man7/pipe.7.html +""" +PAGES_PER_PIPE = 16 class Connection(_ConnectionBase): """ @@ -372,11 +378,14 @@ class Connection(_ConnectionBase): _close(self._handle) _write = _multiprocessing.send _read = _multiprocessing.recv + _default_pipe_size = 0 else: def _close(self, _close=os.close): _close(self._handle) _write = os.write _read = os.read + _base_page_size = os.sysconf(os.sysconf_names['SC_PAGESIZE']) + _default_pipe_size = _base_page_size * PAGES_PER_PIPE def _send(self, buf, write=_write): remaining = len(buf) @@ -391,8 +400,14 @@ class Connection(_ConnectionBase): buf = io.BytesIO() handle = self._handle remaining = size + is_pipe = False + if size > self._default_pipe_size > 0: + mode = os.fstat(handle).st_mode + is_pipe = stat.S_ISFIFO(mode) + limit = self._default_pipe_size if is_pipe else remaining while remaining > 0: - chunk = read(handle, remaining) + to_read = min(limit, remaining) + chunk = read(handle, to_read) n = len(chunk) if n == 0: if remaining == size: diff --git a/Misc/NEWS.d/next/C API/2024-07-03-10-11-53.gh-issue-121313.D7gARW.rst b/Misc/NEWS.d/next/C API/2024-07-03-10-11-53.gh-issue-121313.D7gARW.rst new file mode 100644 index 00000000000..06abce9e67d --- /dev/null +++ b/Misc/NEWS.d/next/C API/2024-07-03-10-11-53.gh-issue-121313.D7gARW.rst @@ -0,0 +1 @@ +Limit the read size in multiprocessing.connection's Connection._recv for pipes to the default pipe size (16 times the base page size), in order to avoid memory overallocation and unnecessary memory management system calls.