asyncio, tulip issue 190: Process.communicate() must ignore BrokenPipeError

If you want to handle the BrokenPipeError, you can easily reimplement
communicate().

Add also a unit test to ensure that stdin.write() + stdin.drain() raises
BrokenPipeError.
This commit is contained in:
Victor Stinner 2014-07-17 12:25:27 +02:00
parent 38bf87c7f2
commit cc996b5789
3 changed files with 32 additions and 8 deletions

View File

@ -191,6 +191,10 @@ Process
process, or ``None``, if no data should be sent to the child. The type process, or ``None``, if no data should be sent to the child. The type
of *input* must be bytes. of *input* must be bytes.
If a :exc:`BrokenPipeError` is raised when writing *input* into stdin,
the exception is ignored. It occurs when the process exits before all
data are written into stdin.
:meth:`communicate` returns a tuple ``(stdoutdata, stderrdata)``. :meth:`communicate` returns a tuple ``(stdoutdata, stderrdata)``.
Note that if you want to send data to the process's stdin, you need to Note that if you want to send data to the process's stdin, you need to
@ -205,6 +209,9 @@ Process
This method is a :ref:`coroutine <coroutine>`. This method is a :ref:`coroutine <coroutine>`.
.. versionchanged:: 3.4.2
The method now ignores :exc:`BrokenPipeError`.
.. method:: kill() .. method:: kill()
Kills the child. On Posix OSs the function sends :py:data:`SIGKILL` to Kills the child. On Posix OSs the function sends :py:data:`SIGKILL` to

View File

@ -143,7 +143,11 @@ class Process:
if self._loop.get_debug(): if self._loop.get_debug():
logger.debug('%r communicate: feed stdin (%s bytes)', logger.debug('%r communicate: feed stdin (%s bytes)',
self, len(input)) self, len(input))
yield from self.stdin.drain() try:
yield from self.stdin.drain()
except BrokenPipeError:
# ignore BrokenPipeError
pass
if self._loop.get_debug(): if self._loop.get_debug():
logger.debug('%r communicate: close stdin', self) logger.debug('%r communicate: close stdin', self)

View File

@ -11,9 +11,6 @@ if sys.platform != 'win32':
# Program blocking # Program blocking
PROGRAM_BLOCKED = [sys.executable, '-c', 'import time; time.sleep(3600)'] PROGRAM_BLOCKED = [sys.executable, '-c', 'import time; time.sleep(3600)']
# Program sleeping during 1 second
PROGRAM_SLEEP_1SEC = [sys.executable, '-c', 'import time; time.sleep(1)']
# Program copying input to output # Program copying input to output
PROGRAM_CAT = [ PROGRAM_CAT = [
sys.executable, '-c', sys.executable, '-c',
@ -118,16 +115,32 @@ class SubprocessMixin:
returncode = self.loop.run_until_complete(proc.wait()) returncode = self.loop.run_until_complete(proc.wait())
self.assertEqual(-signal.SIGHUP, returncode) self.assertEqual(-signal.SIGHUP, returncode)
def test_broken_pipe(self): def prepare_broken_pipe_test(self):
# buffer large enough to feed the whole pipe buffer
large_data = b'x' * support.PIPE_MAX_SIZE large_data = b'x' * support.PIPE_MAX_SIZE
# the program ends before the stdin can be feeded
create = asyncio.create_subprocess_exec( create = asyncio.create_subprocess_exec(
*PROGRAM_SLEEP_1SEC, sys.executable, '-c', 'pass',
stdin=subprocess.PIPE, stdin=subprocess.PIPE,
loop=self.loop) loop=self.loop)
proc = self.loop.run_until_complete(create) proc = self.loop.run_until_complete(create)
with self.assertRaises(BrokenPipeError): return (proc, large_data)
self.loop.run_until_complete(proc.communicate(large_data))
def test_stdin_broken_pipe(self):
proc, large_data = self.prepare_broken_pipe_test()
# drain() must raise BrokenPipeError
proc.stdin.write(large_data)
self.assertRaises(BrokenPipeError,
self.loop.run_until_complete, proc.stdin.drain())
self.loop.run_until_complete(proc.wait())
def test_communicate_ignore_broken_pipe(self):
proc, large_data = self.prepare_broken_pipe_test()
# communicate() must ignore BrokenPipeError when feeding stdin
self.loop.run_until_complete(proc.communicate(large_data))
self.loop.run_until_complete(proc.wait()) self.loop.run_until_complete(proc.wait())