gh-119842: Honor PyOS_InputHook in the new REPL (GH-119843)

Signed-off-by: Pablo Galindo <pablogsal@gmail.com>
Co-authored-by: Łukasz Langa <lukasz@langa.pl>
Co-authored-by: Michael Droettboom <mdboom@gmail.com>
This commit is contained in:
Pablo Galindo Salgado 2024-06-04 19:32:43 +01:00 committed by GitHub
parent bf5e1065f4
commit d9095194dd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 144 additions and 11 deletions

View File

@ -33,6 +33,7 @@ TYPE_CHECKING = False
if TYPE_CHECKING:
from typing import IO
from typing import Callable
@dataclass
@ -134,8 +135,15 @@ class Console(ABC):
...
@abstractmethod
def wait(self) -> None:
"""Wait for an event."""
def wait(self, timeout: float | None) -> bool:
"""Wait for an event. The return value is True if an event is
available, False if the timeout has been reached. If timeout is
None, wait forever. The timeout is in milliseconds."""
...
@property
def input_hook(self) -> Callable[[], int] | None:
"""Returns the current input hook."""
...
@abstractmethod

View File

@ -650,7 +650,15 @@ class Reader:
self.dirty = True
while True:
event = self.console.get_event(block)
input_hook = self.console.input_hook
if input_hook:
input_hook()
# We use the same timeout as in readline.c: 100ms
while not self.console.wait(100):
input_hook()
event = self.console.get_event(block=False)
else:
event = self.console.get_event(block)
if not event: # can only happen if we're not blocking
return False

View File

@ -118,9 +118,12 @@ except AttributeError:
def register(self, fd, flag):
self.fd = fd
def poll(self): # note: a 'timeout' argument would be *milliseconds*
r, w, e = select.select([self.fd], [], [])
# note: The 'timeout' argument is received as *milliseconds*
def poll(self, timeout: float | None = None) -> list[int]:
if timeout is None:
r, w, e = select.select([self.fd], [], [])
else:
r, w, e = select.select([self.fd], [], [], timeout/1000)
return r
poll = MinimalPoll # type: ignore[assignment]
@ -385,11 +388,11 @@ class UnixConsole(Console):
break
return self.event_queue.get()
def wait(self):
def wait(self, timeout: float | None = None) -> bool:
"""
Wait for events on the console.
"""
self.pollob.poll()
return bool(self.pollob.poll(timeout))
def set_cursor_vis(self, visible):
"""
@ -527,6 +530,15 @@ class UnixConsole(Console):
self.__posxy = 0, 0
self.screen = []
@property
def input_hook(self):
try:
import posix
except ImportError:
return None
if posix._is_inputhook_installed():
return posix._inputhook
def __enable_bracketed_paste(self) -> None:
os.write(self.output_fd, b"\x1b[?2004h")

View File

@ -23,6 +23,8 @@ import io
from multiprocessing import Value
import os
import sys
import time
import msvcrt
from abc import ABC, abstractmethod
from collections import deque
@ -202,6 +204,15 @@ class WindowsConsole(Console):
self.screen = screen
self.move_cursor(cx, cy)
@property
def input_hook(self):
try:
import nt
except ImportError:
return None
if nt._is_inputhook_installed():
return nt._inputhook
def __write_changed_line(
self, y: int, oldline: str, newline: str, px_coord: int
) -> None:
@ -460,9 +471,16 @@ class WindowsConsole(Console):
processed."""
return Event("key", "", b"")
def wait(self) -> None:
def wait(self, timeout: float | None) -> bool:
"""Wait for an event."""
raise NotImplementedError("No wait support")
# Poor man's Windows select loop
start_time = time.time()
while True:
if msvcrt.kbhit(): # type: ignore[attr-defined]
return True
if timeout and time.time() - start_time > timeout:
return False
time.sleep(0.01)
def repaint(self) -> None:
raise NotImplementedError("No repaint support")

View File

@ -2,8 +2,10 @@ import itertools
import functools
import rlcompleter
from unittest import TestCase
from unittest.mock import MagicMock, patch
from .support import handle_all_events, handle_events_narrow_console, code_to_events, prepare_reader
from test.support import import_helper
from _pyrepl.console import Event
from _pyrepl.reader import Reader
@ -179,6 +181,21 @@ class TestReader(TestCase):
self.assert_screen_equals(reader, expected)
self.assertTrue(reader.finished)
def test_input_hook_is_called_if_set(self):
input_hook = MagicMock()
def _prepare_console(events):
console = MagicMock()
console.get_event.side_effect = events
console.height = 100
console.width = 80
console.input_hook = input_hook
return console
events = code_to_events("a")
reader, _ = handle_all_events(events, prepare_console=_prepare_console)
self.assertEqual(len(input_hook.mock_calls), 4)
def test_keyboard_interrupt_clears_screen(self):
namespace = {"itertools": itertools}
code = "import itertools\nitertools."

View File

@ -0,0 +1 @@
Honor :c:func:`PyOS_InputHook` in the new REPL. Patch by Pablo Galindo

View File

@ -12116,6 +12116,42 @@ os__supports_virtual_terminal(PyObject *module, PyObject *Py_UNUSED(ignored))
#endif /* defined(MS_WINDOWS) */
PyDoc_STRVAR(os__inputhook__doc__,
"_inputhook($module, /)\n"
"--\n"
"\n"
"Calls PyOS_CallInputHook droppong the GIL first");
#define OS__INPUTHOOK_METHODDEF \
{"_inputhook", (PyCFunction)os__inputhook, METH_NOARGS, os__inputhook__doc__},
static PyObject *
os__inputhook_impl(PyObject *module);
static PyObject *
os__inputhook(PyObject *module, PyObject *Py_UNUSED(ignored))
{
return os__inputhook_impl(module);
}
PyDoc_STRVAR(os__is_inputhook_installed__doc__,
"_is_inputhook_installed($module, /)\n"
"--\n"
"\n"
"Checks if PyOS_CallInputHook is set");
#define OS__IS_INPUTHOOK_INSTALLED_METHODDEF \
{"_is_inputhook_installed", (PyCFunction)os__is_inputhook_installed, METH_NOARGS, os__is_inputhook_installed__doc__},
static PyObject *
os__is_inputhook_installed_impl(PyObject *module);
static PyObject *
os__is_inputhook_installed(PyObject *module, PyObject *Py_UNUSED(ignored))
{
return os__is_inputhook_installed_impl(module);
}
#ifndef OS_TTYNAME_METHODDEF
#define OS_TTYNAME_METHODDEF
#endif /* !defined(OS_TTYNAME_METHODDEF) */
@ -12783,4 +12819,4 @@ os__supports_virtual_terminal(PyObject *module, PyObject *Py_UNUSED(ignored))
#ifndef OS__SUPPORTS_VIRTUAL_TERMINAL_METHODDEF
#define OS__SUPPORTS_VIRTUAL_TERMINAL_METHODDEF
#endif /* !defined(OS__SUPPORTS_VIRTUAL_TERMINAL_METHODDEF) */
/*[clinic end generated code: output=49c2d7a65f7a9f3b input=a9049054013a1b77]*/
/*[clinic end generated code: output=faaa5e5ffb7b165d input=a9049054013a1b77]*/

View File

@ -16784,6 +16784,37 @@ os__supports_virtual_terminal_impl(PyObject *module)
}
#endif
/*[clinic input]
os._inputhook
Calls PyOS_CallInputHook droppong the GIL first
[clinic start generated code]*/
static PyObject *
os__inputhook_impl(PyObject *module)
/*[clinic end generated code: output=525aca4ef3c6149f input=fc531701930d064f]*/
{
int result = 0;
if (PyOS_InputHook) {
Py_BEGIN_ALLOW_THREADS;
result = PyOS_InputHook();
Py_END_ALLOW_THREADS;
}
return PyLong_FromLong(result);
}
/*[clinic input]
os._is_inputhook_installed
Checks if PyOS_CallInputHook is set
[clinic start generated code]*/
static PyObject *
os__is_inputhook_installed_impl(PyObject *module)
/*[clinic end generated code: output=3b3eab4f672c689a input=ff177c9938dd76d8]*/
{
return PyBool_FromLong(PyOS_InputHook != NULL);
}
static PyMethodDef posix_methods[] = {
@ -16997,6 +17028,8 @@ static PyMethodDef posix_methods[] = {
OS__PATH_LEXISTS_METHODDEF
OS__SUPPORTS_VIRTUAL_TERMINAL_METHODDEF
OS__INPUTHOOK_METHODDEF
OS__IS_INPUTHOOK_INSTALLED_METHODDEF
{NULL, NULL} /* Sentinel */
};