gh-123856: Fix PyREPL failure when a keyboard interrupt is triggered after using a history search (#124396)

Co-authored-by: Łukasz Langa <lukasz@langa.pl>
This commit is contained in:
Emily Morehouse 2024-09-25 11:22:03 -07:00 committed by GitHub
parent 28efeefab7
commit c1600c78e4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 115 additions and 90 deletions

View File

@ -28,6 +28,7 @@ from __future__ import annotations
import _sitebuiltins import _sitebuiltins
import linecache import linecache
import functools import functools
import os
import sys import sys
import code import code
@ -50,7 +51,9 @@ def check() -> str:
try: try:
_get_reader() _get_reader()
except _error as e: except _error as e:
return str(e) or repr(e) or "unknown error" if term := os.environ.get("TERM", ""):
term = f"; TERM={term}"
return str(str(e) or repr(e) or "unknown error") + term
return "" return ""
@ -159,10 +162,8 @@ def run_multiline_interactive_console(
input_n += 1 input_n += 1
except KeyboardInterrupt: except KeyboardInterrupt:
r = _get_reader() r = _get_reader()
if r.last_command and 'isearch' in r.last_command.__name__: if r.input_trans is r.isearch_trans:
r.isearch_direction = '' r.do_cmd(("isearch-end", [""]))
r.console.forgetinput()
r.pop_input_trans()
r.pos = len(r.get_unicode()) r.pos = len(r.get_unicode())
r.dirty = True r.dirty = True
r.refresh() r.refresh()

View File

@ -8,7 +8,7 @@ import select
import subprocess import subprocess
import sys import sys
import tempfile import tempfile
from unittest import TestCase, skipUnless from unittest import TestCase, skipUnless, skipIf
from unittest.mock import patch from unittest.mock import patch
from test.support import force_not_colorized from test.support import force_not_colorized
from test.support import SHORT_TIMEOUT from test.support import SHORT_TIMEOUT
@ -35,6 +35,94 @@ try:
except ImportError: except ImportError:
pty = None pty = None
class ReplTestCase(TestCase):
def run_repl(
self,
repl_input: str | list[str],
env: dict | None = None,
*,
cmdline_args: list[str] | None = None,
cwd: str | None = None,
) -> tuple[str, int]:
temp_dir = None
if cwd is None:
temp_dir = tempfile.TemporaryDirectory(ignore_cleanup_errors=True)
cwd = temp_dir.name
try:
return self._run_repl(
repl_input, env=env, cmdline_args=cmdline_args, cwd=cwd
)
finally:
if temp_dir is not None:
temp_dir.cleanup()
def _run_repl(
self,
repl_input: str | list[str],
*,
env: dict | None,
cmdline_args: list[str] | None,
cwd: str,
) -> tuple[str, int]:
assert pty
master_fd, slave_fd = pty.openpty()
cmd = [sys.executable, "-i", "-u"]
if env is None:
cmd.append("-I")
elif "PYTHON_HISTORY" not in env:
env["PYTHON_HISTORY"] = os.path.join(cwd, ".regrtest_history")
if cmdline_args is not None:
cmd.extend(cmdline_args)
try:
import termios
except ModuleNotFoundError:
pass
else:
term_attr = termios.tcgetattr(slave_fd)
term_attr[6][termios.VREPRINT] = 0 # pass through CTRL-R
term_attr[6][termios.VINTR] = 0 # pass through CTRL-C
termios.tcsetattr(slave_fd, termios.TCSANOW, term_attr)
process = subprocess.Popen(
cmd,
stdin=slave_fd,
stdout=slave_fd,
stderr=slave_fd,
cwd=cwd,
text=True,
close_fds=True,
env=env if env else os.environ,
)
os.close(slave_fd)
if isinstance(repl_input, list):
repl_input = "\n".join(repl_input) + "\n"
os.write(master_fd, repl_input.encode("utf-8"))
output = []
while select.select([master_fd], [], [], SHORT_TIMEOUT)[0]:
try:
data = os.read(master_fd, 1024).decode("utf-8")
if not data:
break
except OSError:
break
output.append(data)
else:
os.close(master_fd)
process.kill()
self.fail(f"Timeout while waiting for output, got: {''.join(output)}")
os.close(master_fd)
try:
exit_code = process.wait(timeout=SHORT_TIMEOUT)
except subprocess.TimeoutExpired:
process.kill()
exit_code = process.wait()
return "".join(output), exit_code
class TestCursorPosition(TestCase): class TestCursorPosition(TestCase):
def prepare_reader(self, events): def prepare_reader(self, events):
console = FakeConsole(events) console = FakeConsole(events)
@ -968,7 +1056,20 @@ class TestPasteEvent(TestCase):
@skipUnless(pty, "requires pty") @skipUnless(pty, "requires pty")
class TestMain(TestCase): class TestDumbTerminal(ReplTestCase):
def test_dumb_terminal_exits_cleanly(self):
env = os.environ.copy()
env.update({"TERM": "dumb"})
output, exit_code = self.run_repl("exit()\n", env=env)
self.assertEqual(exit_code, 0)
self.assertIn("warning: can't use pyrepl", output)
self.assertNotIn("Exception", output)
self.assertNotIn("Traceback", output)
@skipUnless(pty, "requires pty")
@skipIf((os.environ.get("TERM") or "dumb") == "dumb", "can't use pyrepl in dumb terminal")
class TestMain(ReplTestCase):
def setUp(self): def setUp(self):
# Cleanup from PYTHON* variables to isolate from local # Cleanup from PYTHON* variables to isolate from local
# user settings, see #121359. Such variables should be # user settings, see #121359. Such variables should be
@ -1078,15 +1179,6 @@ class TestMain(TestCase):
} }
self._run_repl_globals_test(expectations, as_module=True) self._run_repl_globals_test(expectations, as_module=True)
def test_dumb_terminal_exits_cleanly(self):
env = os.environ.copy()
env.update({"TERM": "dumb"})
output, exit_code = self.run_repl("exit()\n", env=env)
self.assertEqual(exit_code, 0)
self.assertIn("warning: can't use pyrepl", output)
self.assertNotIn("Exception", output)
self.assertNotIn("Traceback", output)
@force_not_colorized @force_not_colorized
def test_python_basic_repl(self): def test_python_basic_repl(self):
env = os.environ.copy() env = os.environ.copy()
@ -1209,80 +1301,6 @@ class TestMain(TestCase):
self.assertIn("in x3", output) self.assertIn("in x3", output)
self.assertIn("in <module>", output) self.assertIn("in <module>", output)
def run_repl(
self,
repl_input: str | list[str],
env: dict | None = None,
*,
cmdline_args: list[str] | None = None,
cwd: str | None = None,
) -> tuple[str, int]:
temp_dir = None
if cwd is None:
temp_dir = tempfile.TemporaryDirectory(ignore_cleanup_errors=True)
cwd = temp_dir.name
try:
return self._run_repl(
repl_input, env=env, cmdline_args=cmdline_args, cwd=cwd
)
finally:
if temp_dir is not None:
temp_dir.cleanup()
def _run_repl(
self,
repl_input: str | list[str],
*,
env: dict | None,
cmdline_args: list[str] | None,
cwd: str,
) -> tuple[str, int]:
assert pty
master_fd, slave_fd = pty.openpty()
cmd = [sys.executable, "-i", "-u"]
if env is None:
cmd.append("-I")
elif "PYTHON_HISTORY" not in env:
env["PYTHON_HISTORY"] = os.path.join(cwd, ".regrtest_history")
if cmdline_args is not None:
cmd.extend(cmdline_args)
process = subprocess.Popen(
cmd,
stdin=slave_fd,
stdout=slave_fd,
stderr=slave_fd,
cwd=cwd,
text=True,
close_fds=True,
env=env if env else os.environ,
)
os.close(slave_fd)
if isinstance(repl_input, list):
repl_input = "\n".join(repl_input) + "\n"
os.write(master_fd, repl_input.encode("utf-8"))
output = []
while select.select([master_fd], [], [], SHORT_TIMEOUT)[0]:
try:
data = os.read(master_fd, 1024).decode("utf-8")
if not data:
break
except OSError:
break
output.append(data)
else:
os.close(master_fd)
process.kill()
self.fail(f"Timeout while waiting for output, got: {''.join(output)}")
os.close(master_fd)
try:
exit_code = process.wait(timeout=SHORT_TIMEOUT)
except subprocess.TimeoutExpired:
process.kill()
exit_code = process.wait()
return "".join(output), exit_code
def test_readline_history_file(self): def test_readline_history_file(self):
# skip, if readline module is not available # skip, if readline module is not available
readline = import_module('readline') readline = import_module('readline')
@ -1305,3 +1323,7 @@ class TestMain(TestCase):
output, exit_code = self.run_repl("exit\n", env=env) output, exit_code = self.run_repl("exit\n", env=env)
self.assertEqual(exit_code, 0) self.assertEqual(exit_code, 0)
self.assertNotIn("\\040", pathlib.Path(hfile.name).read_text()) self.assertNotIn("\\040", pathlib.Path(hfile.name).read_text())
def test_keyboard_interrupt_after_isearch(self):
output, exit_code = self.run_repl(["\x12", "\x03", "exit"])
self.assertEqual(exit_code, 0)

View File

@ -0,0 +1,2 @@
Fix PyREPL failure when a keyboard interrupt is triggered after using a
history search