asyncio: Fix upstream issue 168: StreamReader.read(-1) from pipe may hang if data exceeds buffer limit.

This commit is contained in:
Guido van Rossum 2014-05-12 10:04:37 -07:00
parent a869fd3dc0
commit bf88ffba5e
2 changed files with 47 additions and 6 deletions

View File

@ -419,12 +419,17 @@ class StreamReader:
return b''
if n < 0:
while not self._eof:
self._waiter = self._create_waiter('read')
try:
yield from self._waiter
finally:
self._waiter = None
# This used to just loop creating a new waiter hoping to
# collect everything in self._buffer, but that would
# deadlock if the subprocess sends more than self.limit
# bytes. So just call self.read(self._limit) until EOF.
blocks = []
while True:
block = yield from self.read(self._limit)
if not block:
break
blocks.append(block)
return b''.join(blocks)
else:
if not self._buffer and not self._eof:
self._waiter = self._create_waiter('read')

View File

@ -1,7 +1,9 @@
"""Tests for streams.py."""
import gc
import os
import socket
import sys
import unittest
from unittest import mock
try:
@ -583,6 +585,40 @@ class StreamReaderTests(unittest.TestCase):
server.stop()
self.assertEqual(msg, b"hello world!\n")
@unittest.skipIf(sys.platform == 'win32', "Don't have pipes")
def test_read_all_from_pipe_reader(self):
# See Tulip issue 168. This test is derived from the example
# subprocess_attach_read_pipe.py, but we configure the
# StreamReader's limit so that twice it is less than the size
# of the data writter. Also we must explicitly attach a child
# watcher to the event loop.
watcher = asyncio.get_child_watcher()
watcher.attach_loop(self.loop)
code = """\
import os, sys
fd = int(sys.argv[1])
os.write(fd, b'data')
os.close(fd)
"""
rfd, wfd = os.pipe()
args = [sys.executable, '-c', code, str(wfd)]
pipe = open(rfd, 'rb', 0)
reader = asyncio.StreamReader(loop=self.loop, limit=1)
protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop)
transport, _ = self.loop.run_until_complete(
self.loop.connect_read_pipe(lambda: protocol, pipe))
proc = self.loop.run_until_complete(
asyncio.create_subprocess_exec(*args, pass_fds={wfd}, loop=self.loop))
self.loop.run_until_complete(proc.wait())
os.close(wfd)
data = self.loop.run_until_complete(reader.read(-1))
self.assertEqual(data, b'data')
if __name__ == '__main__':
unittest.main()