Closes #22685, asyncio: Set the transport of stdout and stderr StreamReader

objects in the SubprocessStreamProtocol. It allows to pause the transport to
not buffer too much stdout or stderr data.
This commit is contained in:
Victor Stinner 2014-11-25 17:20:33 +01:00
parent c8c64e30e8
commit 5ef586f25a
2 changed files with 44 additions and 5 deletions

View File

@ -41,15 +41,22 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
def connection_made(self, transport):
self._transport = transport
if transport.get_pipe_transport(1):
stdout_transport = transport.get_pipe_transport(1)
if stdout_transport is not None:
self.stdout = streams.StreamReader(limit=self._limit,
loop=self._loop)
if transport.get_pipe_transport(2):
self.stdout.set_transport(stdout_transport)
stderr_transport = transport.get_pipe_transport(2)
if stderr_transport is not None:
self.stderr = streams.StreamReader(limit=self._limit,
loop=self._loop)
stdin = transport.get_pipe_transport(0)
if stdin is not None:
self.stdin = streams.StreamWriter(stdin,
self.stderr.set_transport(stderr_transport)
stdin_transport = transport.get_pipe_transport(0)
if stdin_transport is not None:
self.stdin = streams.StreamWriter(stdin_transport,
protocol=self,
reader=None,
loop=self._loop)

View File

@ -4,6 +4,7 @@ import asyncio
import signal
import sys
import unittest
from unittest import mock
from test import support
if sys.platform != 'win32':
from asyncio import unix_events
@ -161,6 +162,37 @@ class SubprocessMixin:
self.loop.run_until_complete(proc.communicate(large_data))
self.loop.run_until_complete(proc.wait())
def test_pause_reading(self):
@asyncio.coroutine
def test_pause_reading():
limit = 100
code = '\n'.join((
'import sys',
'sys.stdout.write("x" * %s)' % (limit * 2 + 1),
'sys.stdout.flush()',
))
proc = yield from asyncio.create_subprocess_exec(
sys.executable, '-c', code,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
limit=limit,
loop=self.loop)
stdout_transport = proc._transport.get_pipe_transport(1)
stdout_transport.pause_reading = mock.Mock()
yield from proc.wait()
# The child process produced more than limit bytes of output,
# the stream reader transport should pause the protocol to not
# allocate too much memory.
return stdout_transport.pause_reading.called
# Issue #22685: Ensure that the stream reader pauses the protocol
# when the child process produces too much data
called = self.loop.run_until_complete(test_pause_reading())
self.assertTrue(called)
if sys.platform != 'win32':
# Unix