Make test_subprocess work. Fix universal newlines in io.py.

This commit is contained in:
Guido van Rossum 2007-05-24 04:05:35 +00:00
parent c126e8aae3
commit fa0054aa73
3 changed files with 31 additions and 39 deletions

View File

@ -1114,14 +1114,6 @@ class TextIOWrapper(TextIOBase):
self._decoder = decoder
return orig_pos
def _simplify(self, u):
# XXX Hack until str/unicode unification: return str instead
# of unicode if it's all ASCII
try:
return str(u)
except UnicodeEncodeError:
return u
def read(self, n=None):
if n is None:
n = -1
@ -1131,7 +1123,7 @@ class TextIOWrapper(TextIOBase):
res += decoder.decode(self.buffer.read(), True)
self._pending = ""
self._snapshot = None
return self._simplify(res)
return res.replace("\r\n", "\n")
else:
while len(res) < n:
readahead, pending = self._read_chunk()
@ -1139,7 +1131,7 @@ class TextIOWrapper(TextIOBase):
if not readahead:
break
self._pending = res[n:]
return self._simplify(res[:n])
return res[:n].replace("\r\n", "\n")
def __next__(self):
self._telling = False
@ -1155,9 +1147,9 @@ class TextIOWrapper(TextIOBase):
# XXX Hack to support limit argument, for backwards compatibility
line = self.readline()
if len(line) <= limit:
return self._simplify(line)
return line
line, self._pending = line[:limit], line[limit:] + self._pending
return self._simplify(line)
return line
line = self._pending
start = 0
@ -1210,9 +1202,9 @@ class TextIOWrapper(TextIOBase):
# XXX Update self.newlines here if we want to support that
if self._fix_newlines and ending not in ("\n", ""):
return self._simplify(line[:endpos] + "\n")
return line[:endpos] + "\n"
else:
return self._simplify(line[:nextpos])
return line[:nextpos]
class StringIO(TextIOWrapper):

View File

@ -286,6 +286,7 @@ Popen(["/bin/mycmd", "myarg"], env={"PATH": "/usr/bin"})
import sys
mswindows = (sys.platform == "win32")
import io
import os
import traceback
@ -542,23 +543,23 @@ class Popen(object):
if bufsize == 0:
bufsize = 1 # Nearly unbuffered (XXX for now)
if p2cwrite is not None:
self.stdin = os.fdopen(p2cwrite, 'wb', bufsize)
self.stdin = io.open(p2cwrite, 'wb', bufsize)
if self.universal_newlines:
self.stdin = io.TextIOWrapper(self.stdin)
if c2pread is not None:
self.stdout = io.open(c2pread, 'rb', bufsize)
if universal_newlines:
self.stdout = os.fdopen(c2pread, 'rU', bufsize)
else:
self.stdout = os.fdopen(c2pread, 'rb', bufsize)
self.stdout = io.TextIOWrapper(self.stdout)
if errread is not None:
self.stderr = io.open(errread, 'rb', bufsize)
if universal_newlines:
self.stderr = os.fdopen(errread, 'rU', bufsize)
else:
self.stderr = os.fdopen(errread, 'rb', bufsize)
self.stderr = io.TextIOWrapper(self.stderr)
def _translate_newlines(self, data):
data = data.replace("\r\n", "\n")
data = data.replace("\r", "\n")
return data
return str(data)
def __del__(self, sys=sys):
@ -833,9 +834,9 @@ class Popen(object):
# impossible to combine with select (unless forcing no
# buffering).
if self.universal_newlines:
if stdout:
if stdout is not None:
stdout = self._translate_newlines(stdout)
if stderr:
if stderr is not None:
stderr = self._translate_newlines(stderr)
self.wait()
@ -1009,7 +1010,6 @@ class Popen(object):
if data:
os.waitpid(self.pid, 0)
child_exception = pickle.loads(data)
print("exc:", child_exception)
raise child_exception
@ -1108,9 +1108,9 @@ class Popen(object):
# impossible to combine with select (unless forcing no
# buffering).
if self.universal_newlines:
if stdout:
if stdout is not None:
stdout = self._translate_newlines(stdout)
if stderr:
if stderr is not None:
stderr = self._translate_newlines(stderr)
self.wait()

View File

@ -151,7 +151,7 @@ class ProcessTestCase(unittest.TestCase):
p = subprocess.Popen([sys.executable, "-c",
'import sys; sys.stdout.write("orange")'],
stdout=subprocess.PIPE)
self.assertEqual(p.stdout.read(), "orange")
self.assertEqual(p.stdout.read(), b"orange")
def test_stdout_filedes(self):
# stdout is set to open file descriptor
@ -172,7 +172,7 @@ class ProcessTestCase(unittest.TestCase):
stdout=tf)
p.wait()
tf.seek(0)
self.assertEqual(tf.read(), "orange")
self.assertEqual(tf.read(), b"orange")
def test_stderr_pipe(self):
# stderr redirection
@ -264,7 +264,7 @@ class ProcessTestCase(unittest.TestCase):
'sys.stdout.write(os.getenv("FRUIT"))'],
stdout=subprocess.PIPE,
env=newenv)
self.assertEqual(p.stdout.read(), "orange")
self.assertEqual(p.stdout.read(), b"orange")
def test_communicate_stdin(self):
p = subprocess.Popen([sys.executable, "-c",
@ -278,7 +278,7 @@ class ProcessTestCase(unittest.TestCase):
'import sys; sys.stdout.write("pineapple")'],
stdout=subprocess.PIPE)
(stdout, stderr) = p.communicate()
self.assertEqual(stdout, "pineapple")
self.assertEqual(stdout, b"pineapple")
self.assertEqual(stderr, None)
def test_communicate_stderr(self):
@ -353,7 +353,7 @@ class ProcessTestCase(unittest.TestCase):
'import sys,os;' + SETBINARY +
'sys.stdout.write("line1\\n");'
'sys.stdout.flush();'
'sys.stdout.write("line2\\r");'
'sys.stdout.write("line2\\n");'
'sys.stdout.flush();'
'sys.stdout.write("line3\\r\\n");'
'sys.stdout.flush();'
@ -373,7 +373,7 @@ class ProcessTestCase(unittest.TestCase):
'import sys,os;' + SETBINARY +
'sys.stdout.write("line1\\n");'
'sys.stdout.flush();'
'sys.stdout.write("line2\\r");'
'sys.stdout.write("line2\\n");'
'sys.stdout.flush();'
'sys.stdout.write("line3\\r\\n");'
'sys.stdout.flush();'
@ -385,7 +385,7 @@ class ProcessTestCase(unittest.TestCase):
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=1)
(stdout, stderr) = p.communicate()
self.assertEqual(stdout, b"line1\nline2\nline3\nline4\nline5\nline6")
self.assertEqual(stdout, "line1\nline2\nline3\nline4\nline5\nline6")
def test_no_leaking(self):
# Make sure we leak no resources
@ -460,10 +460,10 @@ class ProcessTestCase(unittest.TestCase):
#
if not mswindows:
def test_exceptions(self):
# catched & re-raised exceptions
# caught & re-raised exceptions
try:
p = subprocess.Popen([sys.executable, "-c", ""],
cwd="/this/path/does/not/exist")
cwd="/this/path/does/not/exist")
except OSError as e:
# The attribute child_traceback should contain "os.chdir"
# somewhere.
@ -511,7 +511,7 @@ class ProcessTestCase(unittest.TestCase):
'sys.stdout.write(os.getenv("FRUIT"))'],
stdout=subprocess.PIPE,
preexec_fn=lambda: os.putenv("FRUIT", "apple"))
self.assertEqual(p.stdout.read(), "apple")
self.assertEqual(p.stdout.read(), b"apple")
def test_args_string(self):
# args is a string
@ -544,7 +544,7 @@ class ProcessTestCase(unittest.TestCase):
p = subprocess.Popen(["echo $FRUIT"], shell=1,
stdout=subprocess.PIPE,
env=newenv)
self.assertEqual(p.stdout.read().strip(), "apple")
self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple")
def test_shell_string(self):
# Run command through the shell (string)
@ -553,7 +553,7 @@ class ProcessTestCase(unittest.TestCase):
p = subprocess.Popen("echo $FRUIT", shell=1,
stdout=subprocess.PIPE,
env=newenv)
self.assertEqual(p.stdout.read().strip(), "apple")
self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple")
def test_call_string(self):
# call() function with string argument on UNIX