Issue #24402: Merge potential test fix from 3.5
This commit is contained in:
commit
1431c5af94
|
@ -1493,21 +1493,14 @@ class PtyTests(unittest.TestCase):
|
|||
"""Tests that use a pseudo terminal to guarantee stdin and stdout are
|
||||
terminals in the test environment"""
|
||||
|
||||
def fork(self):
|
||||
def run_child(self, child, terminal_input):
|
||||
r, w = os.pipe() # Pipe test results from child back to parent
|
||||
try:
|
||||
return pty.fork()
|
||||
pid, fd = pty.fork()
|
||||
except (OSError, AttributeError) as e:
|
||||
self.skipTest("pty.fork() raised {}".format(e))
|
||||
|
||||
def check_input_tty(self, prompt, terminal_input, stdio_encoding=None):
|
||||
if not sys.stdin.isatty() or not sys.stdout.isatty():
|
||||
self.skipTest("stdin and stdout must be ttys")
|
||||
r, w = os.pipe()
|
||||
try:
|
||||
pid, fd = self.fork()
|
||||
except:
|
||||
os.close(r)
|
||||
os.close(w)
|
||||
self.skipTest("pty.fork() raised {}".format(e))
|
||||
raise
|
||||
if pid == 0:
|
||||
# Child
|
||||
|
@ -1515,17 +1508,8 @@ class PtyTests(unittest.TestCase):
|
|||
# Make sure we don't get stuck if there's a problem
|
||||
signal.alarm(2)
|
||||
os.close(r)
|
||||
# Check the error handlers are accounted for
|
||||
if stdio_encoding:
|
||||
sys.stdin = io.TextIOWrapper(sys.stdin.detach(),
|
||||
encoding=stdio_encoding,
|
||||
errors='surrogateescape')
|
||||
sys.stdout = io.TextIOWrapper(sys.stdout.detach(),
|
||||
encoding=stdio_encoding,
|
||||
errors='replace')
|
||||
with open(w, "w") as wpipe:
|
||||
print("tty =", sys.stdin.isatty() and sys.stdout.isatty(), file=wpipe)
|
||||
print(ascii(input(prompt)), file=wpipe)
|
||||
child(wpipe)
|
||||
except:
|
||||
traceback.print_exc()
|
||||
finally:
|
||||
|
@ -1533,7 +1517,7 @@ class PtyTests(unittest.TestCase):
|
|||
os._exit(0)
|
||||
# Parent
|
||||
os.close(w)
|
||||
os.write(fd, terminal_input + b"\r\n")
|
||||
os.write(fd, terminal_input)
|
||||
# Get results from the pipe
|
||||
with open(r, "r") as rpipe:
|
||||
lines = []
|
||||
|
@ -1546,10 +1530,38 @@ class PtyTests(unittest.TestCase):
|
|||
# Check the result was got and corresponds to the user's terminal input
|
||||
if len(lines) != 2:
|
||||
# Something went wrong, try to get at stderr
|
||||
with open(fd, "r", encoding="ascii", errors="ignore") as child_output:
|
||||
self.fail("got %d lines in pipe but expected 2, child output was:\n%s"
|
||||
% (len(lines), child_output.read()))
|
||||
# Beware of Linux raising EIO when the slave is closed
|
||||
child_output = bytearray()
|
||||
while True:
|
||||
try:
|
||||
chunk = os.read(fd, 3000)
|
||||
except OSError: # Assume EIO
|
||||
break
|
||||
if not chunk:
|
||||
break
|
||||
child_output.extend(chunk)
|
||||
os.close(fd)
|
||||
child_output = child_output.decode("ascii", "ignore")
|
||||
self.fail("got %d lines in pipe but expected 2, child output was:\n%s"
|
||||
% (len(lines), child_output))
|
||||
os.close(fd)
|
||||
return lines
|
||||
|
||||
def check_input_tty(self, prompt, terminal_input, stdio_encoding=None):
|
||||
if not sys.stdin.isatty() or not sys.stdout.isatty():
|
||||
self.skipTest("stdin and stdout must be ttys")
|
||||
def child(wpipe):
|
||||
# Check the error handlers are accounted for
|
||||
if stdio_encoding:
|
||||
sys.stdin = io.TextIOWrapper(sys.stdin.detach(),
|
||||
encoding=stdio_encoding,
|
||||
errors='surrogateescape')
|
||||
sys.stdout = io.TextIOWrapper(sys.stdout.detach(),
|
||||
encoding=stdio_encoding,
|
||||
errors='replace')
|
||||
print("tty =", sys.stdin.isatty() and sys.stdout.isatty(), file=wpipe)
|
||||
print(ascii(input(prompt)), file=wpipe)
|
||||
lines = self.run_child(child, terminal_input + b"\r\n")
|
||||
# Check we did exercise the GNU readline path
|
||||
self.assertIn(lines[0], {'tty = True', 'tty = False'})
|
||||
if lines[0] != 'tty = True':
|
||||
|
@ -1577,26 +1589,17 @@ class PtyTests(unittest.TestCase):
|
|||
def test_input_no_stdout_fileno(self):
|
||||
# Issue #24402: If stdin is the original terminal but stdout.fileno()
|
||||
# fails, do not use the original stdout file descriptor
|
||||
pid, pty = self.fork()
|
||||
if pid: # Parent process
|
||||
# Ideally this should read and write concurrently using select()
|
||||
# or similar, to avoid the possibility of a deadlock.
|
||||
os.write(pty, b"quux\r")
|
||||
_, status = os.waitpid(pid, 0)
|
||||
output = os.read(pty, 3000).decode("ascii", "backslashreplace")
|
||||
os.close(pty)
|
||||
self.assertEqual(status, 0, output)
|
||||
else: # Child process
|
||||
try:
|
||||
self.assertTrue(sys.stdin.isatty(), "stdin not a terminal")
|
||||
sys.stdout = io.StringIO() # Does not support fileno()
|
||||
input("prompt")
|
||||
self.assertEqual(sys.stdout.getvalue(), "prompt")
|
||||
os._exit(0) # Success!
|
||||
except:
|
||||
sys.excepthook(*sys.exc_info())
|
||||
finally:
|
||||
os._exit(1) # Failure
|
||||
def child(wpipe):
|
||||
print("stdin.isatty():", sys.stdin.isatty(), file=wpipe)
|
||||
sys.stdout = io.StringIO() # Does not support fileno()
|
||||
input("prompt")
|
||||
print("captured:", ascii(sys.stdout.getvalue()), file=wpipe)
|
||||
lines = self.run_child(child, b"quux\r")
|
||||
expected = (
|
||||
"stdin.isatty(): True",
|
||||
"captured: 'prompt'",
|
||||
)
|
||||
self.assertSequenceEqual(lines, expected)
|
||||
|
||||
class TestSorted(unittest.TestCase):
|
||||
|
||||
|
|
Loading…
Reference in New Issue