Issue #15592. Fix regression: subprocess.communicate() breaks on no input with universal newlines true.

Patch by Chris Jerdonek.
This commit is contained in:
Andrew Svetlov 2012-08-14 18:40:21 +03:00
commit aa0dbdc2dd
2 changed files with 25 additions and 14 deletions

View File

@ -1536,6 +1536,17 @@ class Popen(object):
return (stdout, stderr)
def _save_input(self, input):
# This method is called from the _communicate_with_*() methods
# so that if we time out while communicating, we can continue
# sending input if we retry.
if self.stdin and self._input is None:
self._input_offset = 0
self._input = input
if self.universal_newlines and input is not None:
self._input = self._input.encode(self.stdin.encoding)
def _communicate_with_poll(self, input, endtime, orig_timeout):
stdout = None # Return
stderr = None # Return
@ -1572,13 +1583,7 @@ class Popen(object):
register_and_append(self.stderr, select_POLLIN_POLLPRI)
stderr = self._fd2output[self.stderr.fileno()]
# Save the input here so that if we time out while communicating,
# we can continue sending input if we retry.
if self.stdin and self._input is None:
self._input_offset = 0
self._input = input
if self.universal_newlines:
self._input = self._input.encode(self.stdin.encoding)
self._save_input(input)
while self._fd2file:
timeout = self._remaining_time(endtime)
@ -1632,11 +1637,7 @@ class Popen(object):
if self.stderr:
self._read_set.append(self.stderr)
if self.stdin and self._input is None:
self._input_offset = 0
self._input = input
if self.universal_newlines:
self._input = self._input.encode(self.stdin.encoding)
self._save_input(input)
stdout = None # Return
stderr = None # Return

View File

@ -615,8 +615,6 @@ class ProcessTestCase(BaseTestCase):
universal_newlines=1)
self.addCleanup(p.stdout.close)
self.addCleanup(p.stderr.close)
# BUG: can't give a non-empty stdin because it breaks both the
# select- and poll-based communicate() implementations.
(stdout, stderr) = p.communicate()
self.assertEqual(stdout,
"line2\nline4\nline5\nline6\nline7\nline8")
@ -635,6 +633,18 @@ class ProcessTestCase(BaseTestCase):
(stdout, stderr) = p.communicate("line1\nline3\n")
self.assertEqual(p.returncode, 0)
def test_universal_newlines_communicate_input_none(self):
# Test communicate(input=None) with universal newlines.
#
# We set stdout to PIPE because, as of this writing, a different
# code path is tested when the number of pipes is zero or one.
p = subprocess.Popen([sys.executable, "-c", "pass"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
universal_newlines=True)
p.communicate()
self.assertEqual(p.returncode, 0)
def test_no_leaking(self):
# Make sure we leak no resources
if not mswindows: