mirror of https://github.com/python/cpython
gh-85984: Utilize new "winsize" functions from termios in pty tests. (#101831)
Utilize new functions termios.tcgetwinsize() and termios.tcsetwinsize in test_pty.py. Signed-off-by: Soumendra Ganguly <soumendraganguly@gmail.com> Co-authored-by: Gregory P. Smith <greg@krypto.org>
This commit is contained in:
parent
1d194235e4
commit
da2fb92643
|
@ -3,6 +3,8 @@ from test.support.import_helper import import_module
|
|||
|
||||
# Skip these tests if termios or fcntl are not available
|
||||
import_module('termios')
|
||||
# fcntl is a proxy for not being one of the wasm32 platforms even though we
|
||||
# don't use this module... a proper check for what crashes those is needed.
|
||||
import_module("fcntl")
|
||||
|
||||
import errno
|
||||
|
@ -15,20 +17,12 @@ import signal
|
|||
import socket
|
||||
import io # readline
|
||||
import unittest
|
||||
|
||||
import struct
|
||||
import fcntl
|
||||
import warnings
|
||||
|
||||
TEST_STRING_1 = b"I wish to buy a fish license.\n"
|
||||
TEST_STRING_2 = b"For my pet fish, Eric.\n"
|
||||
|
||||
try:
|
||||
_TIOCGWINSZ = tty.TIOCGWINSZ
|
||||
_TIOCSWINSZ = tty.TIOCSWINSZ
|
||||
_HAVE_WINSZ = True
|
||||
except AttributeError:
|
||||
_HAVE_WINSZ = False
|
||||
_HAVE_WINSZ = hasattr(tty, "TIOCGWINSZ") and hasattr(tty, "TIOCSWINSZ")
|
||||
|
||||
if verbose:
|
||||
def debug(msg):
|
||||
|
@ -82,14 +76,6 @@ def expectedFailureIfStdinIsTTY(fun):
|
|||
pass
|
||||
return fun
|
||||
|
||||
def _get_term_winsz(fd):
|
||||
s = struct.pack("HHHH", 0, 0, 0, 0)
|
||||
return fcntl.ioctl(fd, _TIOCGWINSZ, s)
|
||||
|
||||
def _set_term_winsz(fd, winsz):
|
||||
fcntl.ioctl(fd, _TIOCSWINSZ, winsz)
|
||||
|
||||
|
||||
# Marginal testing of pty suite. Cannot do extensive 'do or fail' testing
|
||||
# because pty code is not too portable.
|
||||
class PtyTest(unittest.TestCase):
|
||||
|
@ -105,18 +91,14 @@ class PtyTest(unittest.TestCase):
|
|||
self.addCleanup(signal.alarm, 0)
|
||||
signal.alarm(10)
|
||||
|
||||
# Save original stdin window size
|
||||
self.stdin_rows = None
|
||||
self.stdin_cols = None
|
||||
# Save original stdin window size.
|
||||
self.stdin_dim = None
|
||||
if _HAVE_WINSZ:
|
||||
try:
|
||||
stdin_dim = os.get_terminal_size(pty.STDIN_FILENO)
|
||||
self.stdin_rows = stdin_dim.lines
|
||||
self.stdin_cols = stdin_dim.columns
|
||||
old_stdin_winsz = struct.pack("HHHH", self.stdin_rows,
|
||||
self.stdin_cols, 0, 0)
|
||||
self.addCleanup(_set_term_winsz, pty.STDIN_FILENO, old_stdin_winsz)
|
||||
except OSError:
|
||||
self.stdin_dim = tty.tcgetwinsize(pty.STDIN_FILENO)
|
||||
self.addCleanup(tty.tcsetwinsize, pty.STDIN_FILENO,
|
||||
self.stdin_dim)
|
||||
except tty.error:
|
||||
pass
|
||||
|
||||
def handle_sig(self, sig, frame):
|
||||
|
@ -131,41 +113,40 @@ class PtyTest(unittest.TestCase):
|
|||
try:
|
||||
mode = tty.tcgetattr(pty.STDIN_FILENO)
|
||||
except tty.error:
|
||||
# not a tty or bad/closed fd
|
||||
# Not a tty or bad/closed fd.
|
||||
debug("tty.tcgetattr(pty.STDIN_FILENO) failed")
|
||||
mode = None
|
||||
|
||||
new_stdin_winsz = None
|
||||
if self.stdin_rows is not None and self.stdin_cols is not None:
|
||||
new_dim = None
|
||||
if self.stdin_dim:
|
||||
try:
|
||||
# Modify pty.STDIN_FILENO window size; we need to
|
||||
# check if pty.openpty() is able to set pty slave
|
||||
# window size accordingly.
|
||||
debug("Setting pty.STDIN_FILENO window size")
|
||||
debug(f"original size: (rows={self.stdin_rows}, cols={self.stdin_cols})")
|
||||
target_stdin_rows = self.stdin_rows + 1
|
||||
target_stdin_cols = self.stdin_cols + 1
|
||||
debug(f"target size: (rows={target_stdin_rows}, cols={target_stdin_cols})")
|
||||
target_stdin_winsz = struct.pack("HHHH", target_stdin_rows,
|
||||
target_stdin_cols, 0, 0)
|
||||
_set_term_winsz(pty.STDIN_FILENO, target_stdin_winsz)
|
||||
debug("Setting pty.STDIN_FILENO window size.")
|
||||
debug(f"original size: (row, col) = {self.stdin_dim}")
|
||||
target_dim = (self.stdin_dim[0] + 1, self.stdin_dim[1] + 1)
|
||||
debug(f"target size: (row, col) = {target_dim}")
|
||||
tty.tcsetwinsize(pty.STDIN_FILENO, target_dim)
|
||||
|
||||
# Were we able to set the window size
|
||||
# of pty.STDIN_FILENO successfully?
|
||||
new_stdin_winsz = _get_term_winsz(pty.STDIN_FILENO)
|
||||
self.assertEqual(new_stdin_winsz, target_stdin_winsz,
|
||||
new_dim = tty.tcgetwinsize(pty.STDIN_FILENO)
|
||||
self.assertEqual(new_dim, target_dim,
|
||||
"pty.STDIN_FILENO window size unchanged")
|
||||
except OSError:
|
||||
warnings.warn("Failed to set pty.STDIN_FILENO window size")
|
||||
warnings.warn("Failed to set pty.STDIN_FILENO window size.")
|
||||
pass
|
||||
|
||||
try:
|
||||
debug("Calling pty.openpty()")
|
||||
try:
|
||||
master_fd, slave_fd = pty.openpty(mode, new_stdin_winsz)
|
||||
master_fd, slave_fd, slave_name = pty.openpty(mode, new_dim,
|
||||
True)
|
||||
except TypeError:
|
||||
master_fd, slave_fd = pty.openpty()
|
||||
debug(f"Got master_fd '{master_fd}', slave_fd '{slave_fd}'")
|
||||
slave_name = None
|
||||
debug(f"Got {master_fd=}, {slave_fd=}, {slave_name=}")
|
||||
except OSError:
|
||||
# " An optional feature could not be imported " ... ?
|
||||
raise unittest.SkipTest("Pseudo-terminals (seemingly) not functional.")
|
||||
|
@ -181,8 +162,8 @@ class PtyTest(unittest.TestCase):
|
|||
if mode:
|
||||
self.assertEqual(tty.tcgetattr(slave_fd), mode,
|
||||
"openpty() failed to set slave termios")
|
||||
if new_stdin_winsz:
|
||||
self.assertEqual(_get_term_winsz(slave_fd), new_stdin_winsz,
|
||||
if new_dim:
|
||||
self.assertEqual(tty.tcgetwinsize(slave_fd), new_dim,
|
||||
"openpty() failed to set slave window size")
|
||||
|
||||
# Ensure the fd is non-blocking in case there's nothing to read.
|
||||
|
@ -367,9 +348,8 @@ class SmallPtyTests(unittest.TestCase):
|
|||
self.files.extend(socketpair)
|
||||
return socketpair
|
||||
|
||||
def _mock_select(self, rfds, wfds, xfds, timeout=0):
|
||||
def _mock_select(self, rfds, wfds, xfds):
|
||||
# This will raise IndexError when no more expected calls exist.
|
||||
# This ignores the timeout
|
||||
self.assertEqual(self.select_rfds_lengths.pop(0), len(rfds))
|
||||
return self.select_rfds_results.pop(0), [], []
|
||||
|
||||
|
@ -409,28 +389,6 @@ class SmallPtyTests(unittest.TestCase):
|
|||
self.assertEqual(os.read(read_from_stdout_fd, 20), b'from master')
|
||||
self.assertEqual(os.read(masters[1], 20), b'from stdin')
|
||||
|
||||
def test__copy_eof_on_all(self):
|
||||
"""Test the empty read EOF case on both master_fd and stdin."""
|
||||
read_from_stdout_fd, mock_stdout_fd = self._pipe()
|
||||
pty.STDOUT_FILENO = mock_stdout_fd
|
||||
mock_stdin_fd, write_to_stdin_fd = self._pipe()
|
||||
pty.STDIN_FILENO = mock_stdin_fd
|
||||
socketpair = self._socketpair()
|
||||
masters = [s.fileno() for s in socketpair]
|
||||
|
||||
socketpair[1].close()
|
||||
os.close(write_to_stdin_fd)
|
||||
|
||||
pty.select = self._mock_select
|
||||
self.select_rfds_lengths.append(2)
|
||||
self.select_rfds_results.append([mock_stdin_fd, masters[0]])
|
||||
# We expect that both fds were removed from the fds list as they
|
||||
# both encountered an EOF before the second select call.
|
||||
self.select_rfds_lengths.append(0)
|
||||
|
||||
# We expect the function to return without error.
|
||||
self.assertEqual(pty._copy(masters[0]), None)
|
||||
|
||||
def test__restore_tty_mode_normal_return(self):
|
||||
"""Test that spawn resets the tty mode no when _copy returns normally."""
|
||||
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
Utilize new "winsize" functions from termios in pty tests.
|
Loading…
Reference in New Issue