diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py index c4e5ba2061c..043359bbd03 100644 --- a/Lib/asyncio/subprocess.py +++ b/Lib/asyncio/subprocess.py @@ -147,15 +147,17 @@ class Process: async def _feed_stdin(self, input): debug = self._loop.get_debug() - if input is not None: - self.stdin.write(input) - if debug: - logger.debug( - '%r communicate: feed stdin (%s bytes)', self, len(input)) try: + if input is not None: + self.stdin.write(input) + if debug: + logger.debug( + '%r communicate: feed stdin (%s bytes)', self, len(input)) + await self.stdin.drain() except (BrokenPipeError, ConnectionResetError) as exc: - # communicate() ignores BrokenPipeError and ConnectionResetError + # communicate() ignores BrokenPipeError and ConnectionResetError. + # write() and drain() can raise these exceptions. if debug: logger.debug('%r communicate: stdin got %r', self, exc) diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py index 429ef16fdb0..dc5a48d500e 100644 --- a/Lib/test/test_asyncio/test_subprocess.py +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -1,6 +1,7 @@ import os import signal import sys +import textwrap import unittest import warnings from unittest import mock @@ -12,9 +13,14 @@ from test.test_asyncio import utils as test_utils from test import support from test.support import os_helper -if sys.platform != 'win32': + +MS_WINDOWS = (sys.platform == 'win32') +if MS_WINDOWS: + import msvcrt +else: from asyncio import unix_events + if support.check_sanitizer(address=True): raise unittest.SkipTest("Exposes ASAN flakiness in GitHub CI") @@ -270,26 +276,43 @@ class SubprocessMixin: finally: signal.signal(signal.SIGHUP, old_handler) - def prepare_broken_pipe_test(self): + def test_stdin_broken_pipe(self): # buffer large enough to feed the whole pipe buffer large_data = b'x' * support.PIPE_MAX_SIZE + rfd, wfd = os.pipe() + self.addCleanup(os.close, rfd) + self.addCleanup(os.close, wfd) + if MS_WINDOWS: + handle = msvcrt.get_osfhandle(rfd) + os.set_handle_inheritable(handle, True) + code = textwrap.dedent(f''' + import os, msvcrt + handle = {handle} + fd = msvcrt.open_osfhandle(handle, os.O_RDONLY) + os.read(fd, 1) + ''') + from subprocess import STARTUPINFO + startupinfo = STARTUPINFO() + startupinfo.lpAttributeList = {"handle_list": [handle]} + kwargs = dict(startupinfo=startupinfo) + else: + code = f'import os; fd = {rfd}; os.read(fd, 1)' + kwargs = dict(pass_fds=(rfd,)) + # the program ends before the stdin can be fed proc = self.loop.run_until_complete( asyncio.create_subprocess_exec( - sys.executable, '-c', 'pass', + sys.executable, '-c', code, stdin=subprocess.PIPE, + **kwargs ) ) - return (proc, large_data) - - def test_stdin_broken_pipe(self): - proc, large_data = self.prepare_broken_pipe_test() - async def write_stdin(proc, data): - await asyncio.sleep(0.5) proc.stdin.write(data) + # Only exit the child process once the write buffer is filled + os.write(wfd, b'go') await proc.stdin.drain() coro = write_stdin(proc, large_data) @@ -300,7 +323,16 @@ class SubprocessMixin: self.loop.run_until_complete(proc.wait()) def test_communicate_ignore_broken_pipe(self): - proc, large_data = self.prepare_broken_pipe_test() + # buffer large enough to feed the whole pipe buffer + large_data = b'x' * support.PIPE_MAX_SIZE + + # the program ends before the stdin can be fed + proc = self.loop.run_until_complete( + asyncio.create_subprocess_exec( + sys.executable, '-c', 'pass', + stdin=subprocess.PIPE, + ) + ) # communicate() must ignore BrokenPipeError when feeding stdin self.loop.set_exception_handler(lambda loop, msg: None)