bpo-41818: Updated tests for the standard pty library (GH-22962)
This commit is contained in:
parent
be319c0c10
commit
c13d89955d
|
@ -14,9 +14,22 @@ import socket
|
|||
import io # readline
|
||||
import unittest
|
||||
|
||||
import struct
|
||||
import tty
|
||||
import fcntl
|
||||
import platform
|
||||
import warnings
|
||||
|
||||
TEST_STRING_1 = b"I wish to buy a fish license.\n"
|
||||
TEST_STRING_2 = b"For my pet fish, Eric.\n"
|
||||
|
||||
try:
|
||||
_TIOCGWINSZ = tty.TIOCGWINSZ
|
||||
_TIOCSWINSZ = tty.TIOCSWINSZ
|
||||
_HAVE_WINSZ = True
|
||||
except AttributeError:
|
||||
_HAVE_WINSZ = False
|
||||
|
||||
if verbose:
|
||||
def debug(msg):
|
||||
print(msg)
|
||||
|
@ -60,6 +73,27 @@ def _readline(fd):
|
|||
reader = io.FileIO(fd, mode='rb', closefd=False)
|
||||
return reader.readline()
|
||||
|
||||
def expectedFailureIfStdinIsTTY(fun):
|
||||
# avoid isatty() for now
|
||||
try:
|
||||
tty.tcgetattr(pty.STDIN_FILENO)
|
||||
return unittest.expectedFailure(fun)
|
||||
except tty.error:
|
||||
pass
|
||||
return fun
|
||||
|
||||
def expectedFailureOnBSD(fun):
|
||||
PLATFORM = platform.system()
|
||||
if PLATFORM.endswith("BSD") or PLATFORM == "Darwin":
|
||||
return unittest.expectedFailure(fun)
|
||||
return fun
|
||||
|
||||
def _get_term_winsz(fd):
|
||||
s = struct.pack("HHHH", 0, 0, 0, 0)
|
||||
return fcntl.ioctl(fd, _TIOCGWINSZ, s)
|
||||
|
||||
def _set_term_winsz(fd, winsz):
|
||||
fcntl.ioctl(fd, _TIOCSWINSZ, winsz)
|
||||
|
||||
|
||||
# Marginal testing of pty suite. Cannot do extensive 'do or fail' testing
|
||||
|
@ -78,6 +112,20 @@ class PtyTest(unittest.TestCase):
|
|||
self.addCleanup(signal.alarm, 0)
|
||||
signal.alarm(10)
|
||||
|
||||
# Save original stdin window size
|
||||
self.stdin_rows = None
|
||||
self.stdin_cols = None
|
||||
if _HAVE_WINSZ:
|
||||
try:
|
||||
stdin_dim = os.get_terminal_size(pty.STDIN_FILENO)
|
||||
self.stdin_rows = stdin_dim.lines
|
||||
self.stdin_cols = stdin_dim.columns
|
||||
old_stdin_winsz = struct.pack("HHHH", self.stdin_rows,
|
||||
self.stdin_cols, 0, 0)
|
||||
self.addCleanup(_set_term_winsz, pty.STDIN_FILENO, old_stdin_winsz)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
def handle_sig(self, sig, frame):
|
||||
self.fail("isatty hung")
|
||||
|
||||
|
@ -86,26 +134,65 @@ class PtyTest(unittest.TestCase):
|
|||
# bpo-38547: if the process is the session leader, os.close(master_fd)
|
||||
# of "master_fd, slave_name = pty.master_open()" raises SIGHUP
|
||||
# signal: just ignore the signal.
|
||||
#
|
||||
# NOTE: the above comment is from an older version of the test;
|
||||
# master_open() is not being used anymore.
|
||||
pass
|
||||
|
||||
def test_basic(self):
|
||||
@expectedFailureIfStdinIsTTY
|
||||
def test_openpty(self):
|
||||
try:
|
||||
debug("Calling master_open()")
|
||||
master_fd, slave_name = pty.master_open()
|
||||
debug("Got master_fd '%d', slave_name '%s'" %
|
||||
(master_fd, slave_name))
|
||||
debug("Calling slave_open(%r)" % (slave_name,))
|
||||
slave_fd = pty.slave_open(slave_name)
|
||||
debug("Got slave_fd '%d'" % slave_fd)
|
||||
mode = tty.tcgetattr(pty.STDIN_FILENO)
|
||||
except tty.error:
|
||||
# not a tty or bad/closed fd
|
||||
debug("tty.tcgetattr(pty.STDIN_FILENO) failed")
|
||||
mode = None
|
||||
|
||||
new_stdin_winsz = None
|
||||
if self.stdin_rows != None and self.stdin_cols != None:
|
||||
try:
|
||||
debug("Setting pty.STDIN_FILENO window size")
|
||||
# Set number of columns and rows to be the
|
||||
# floors of 1/5 of respective original values
|
||||
target_stdin_winsz = struct.pack("HHHH", self.stdin_rows//5,
|
||||
self.stdin_cols//5, 0, 0)
|
||||
_set_term_winsz(pty.STDIN_FILENO, target_stdin_winsz)
|
||||
|
||||
# Were we able to set the window size
|
||||
# of pty.STDIN_FILENO successfully?
|
||||
new_stdin_winsz = _get_term_winsz(pty.STDIN_FILENO)
|
||||
self.assertEqual(new_stdin_winsz, target_stdin_winsz,
|
||||
"pty.STDIN_FILENO window size unchanged")
|
||||
except OSError:
|
||||
warnings.warn("Failed to set pty.STDIN_FILENO window size")
|
||||
pass
|
||||
|
||||
try:
|
||||
debug("Calling pty.openpty()")
|
||||
try:
|
||||
master_fd, slave_fd = pty.openpty(mode, new_stdin_winsz)
|
||||
except TypeError:
|
||||
master_fd, slave_fd = pty.openpty()
|
||||
debug(f"Got master_fd '{master_fd}', slave_fd '{slave_fd}'")
|
||||
except OSError:
|
||||
# " An optional feature could not be imported " ... ?
|
||||
raise unittest.SkipTest("Pseudo-terminals (seemingly) not functional.")
|
||||
|
||||
self.assertTrue(os.isatty(slave_fd), 'slave_fd is not a tty')
|
||||
self.assertTrue(os.isatty(slave_fd), "slave_fd is not a tty")
|
||||
|
||||
if mode:
|
||||
self.assertEqual(tty.tcgetattr(slave_fd), mode,
|
||||
"openpty() failed to set slave termios")
|
||||
if new_stdin_winsz:
|
||||
self.assertEqual(_get_term_winsz(slave_fd), new_stdin_winsz,
|
||||
"openpty() failed to set slave window size")
|
||||
|
||||
# Solaris requires reading the fd before anything is returned.
|
||||
# My guess is that since we open and close the slave fd
|
||||
# in master_open(), we need to read the EOF.
|
||||
#
|
||||
# NOTE: the above comment is from an older version of the test;
|
||||
# master_open() is not being used anymore.
|
||||
|
||||
# Ensure the fd is non-blocking in case there's nothing to read.
|
||||
blocking = os.get_blocking(master_fd)
|
||||
|
@ -222,8 +309,20 @@ class PtyTest(unittest.TestCase):
|
|||
|
||||
os.close(master_fd)
|
||||
|
||||
# pty.fork() passed.
|
||||
@expectedFailureOnBSD
|
||||
def test_master_read(self):
|
||||
debug("Calling pty.openpty()")
|
||||
master_fd, slave_fd = pty.openpty()
|
||||
debug(f"Got master_fd '{master_fd}', slave_fd '{slave_fd}'")
|
||||
|
||||
debug("Closing slave_fd")
|
||||
os.close(slave_fd)
|
||||
|
||||
debug("Reading from master_fd")
|
||||
with self.assertRaises(OSError):
|
||||
os.read(master_fd, 1)
|
||||
|
||||
os.close(master_fd)
|
||||
|
||||
class SmallPtyTests(unittest.TestCase):
|
||||
"""These tests don't spawn children or hang."""
|
||||
|
@ -262,8 +361,9 @@ class SmallPtyTests(unittest.TestCase):
|
|||
self.files.extend(socketpair)
|
||||
return socketpair
|
||||
|
||||
def _mock_select(self, rfds, wfds, xfds):
|
||||
def _mock_select(self, rfds, wfds, xfds, timeout=0):
|
||||
# This will raise IndexError when no more expected calls exist.
|
||||
# This ignores the timeout
|
||||
self.assertEqual(self.select_rfds_lengths.pop(0), len(rfds))
|
||||
return self.select_rfds_results.pop(0), [], []
|
||||
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
Updated tests for the pty library. test_basic() has been changed to test_openpty(); this additionally checks if slave termios and slave winsize are being set properly by pty.openpty(). In order to add support for FreeBSD, NetBSD, OpenBSD, and Darwin, this also adds test_master_read(), which demonstrates that pty.spawn() should not depend on an OSError to exit from its copy loop.
|
Loading…
Reference in New Issue