Issue #16182: Fix readline begidx, endidx, and use locale encoding

Based on patch by Serhiy Storchaka.
This commit is contained in:
Martin Panter 2016-06-14 01:16:16 +00:00
parent 19e21e4948
commit f00c49df10
3 changed files with 235 additions and 42 deletions

View File

@ -1,15 +1,23 @@
"""
Very minimal unittests for parts of the readline module.
"""
from contextlib import ExitStack
from errno import EIO
import os
import selectors
import subprocess
import sys
import tempfile
import unittest
from test.support import import_module, unlink
from test.support import import_module, unlink, TESTFN
from test.support.script_helper import assert_python_ok
# Skip tests if there is no readline module
readline = import_module('readline')
@unittest.skipUnless(hasattr(readline, "clear_history"),
"The history update test cannot be run because the "
"clear_history method is not available.")
class TestHistoryManipulation (unittest.TestCase):
"""
These tests were added to check that the libedit emulation on OSX and the
@ -17,9 +25,6 @@ class TestHistoryManipulation (unittest.TestCase):
why the tests cover only a small subset of the interface.
"""
@unittest.skipUnless(hasattr(readline, "clear_history"),
"The history update test cannot be run because the "
"clear_history method is not available.")
def testHistoryUpdates(self):
readline.clear_history()
@ -82,6 +87,21 @@ class TestHistoryManipulation (unittest.TestCase):
# write_history_file can create the target
readline.write_history_file(hfilename)
def test_nonascii_history(self):
readline.clear_history()
try:
readline.add_history("entrée 1")
except UnicodeEncodeError as err:
self.skipTest("Locale cannot encode test data: " + format(err))
readline.add_history("entrée 2")
readline.replace_history_item(1, "entrée 22")
readline.write_history_file(TESTFN)
self.addCleanup(os.remove, TESTFN)
readline.clear_history()
readline.read_history_file(TESTFN)
self.assertEqual(readline.get_history_item(1), "entrée 1")
self.assertEqual(readline.get_history_item(2), "entrée 22")
class TestReadline(unittest.TestCase):
@ -96,6 +116,119 @@ class TestReadline(unittest.TestCase):
TERM='xterm-256color')
self.assertEqual(stdout, b'')
def test_nonascii(self):
try:
readline.add_history("\xEB\xEF")
except UnicodeEncodeError as err:
self.skipTest("Locale cannot encode test data: " + format(err))
script = r"""import readline
if readline.__doc__ and "libedit" in readline.__doc__:
readline.parse_and_bind(r'bind ^B ed-prev-char')
readline.parse_and_bind(r'bind "\t" rl_complete')
readline.parse_and_bind('bind -s ^A "|t\xEB[after]"')
else:
readline.parse_and_bind(r'Control-b: backward-char')
readline.parse_and_bind(r'"\t": complete')
readline.parse_and_bind(r'set disable-completion off')
readline.parse_and_bind(r'set show-all-if-ambiguous off')
readline.parse_and_bind(r'set show-all-if-unmodified off')
readline.parse_and_bind('Control-a: "|t\xEB[after]"')
def pre_input_hook():
readline.insert_text("[\xEFnserted]")
readline.redisplay()
readline.set_pre_input_hook(pre_input_hook)
def completer(text, state):
if text == "t\xEB":
if state == 0:
print("text", ascii(text))
print("line", ascii(readline.get_line_buffer()))
print("indexes", readline.get_begidx(), readline.get_endidx())
return "t\xEBnt"
if state == 1:
return "t\xEBxt"
if text == "t\xEBx" and state == 0:
return "t\xEBxt"
return None
readline.set_completer(completer)
def display(substitution, matches, longest_match_length):
print("substitution", ascii(substitution))
print("matches", ascii(matches))
readline.set_completion_display_matches_hook(display)
print("result", ascii(input()))
print("history", ascii(readline.get_history_item(1)))
"""
input = b"\x01" # Ctrl-A, expands to "|t\xEB[after]"
input += b"\x02" * len("[after]") # Move cursor back
input += b"\t\t" # Display possible completions
input += b"x\t" # Complete "t\xEBx" -> "t\xEBxt"
input += b"\r"
output = run_pty(script, input)
self.assertIn(b"text 't\\xeb'\r\n", output)
self.assertIn(b"line '[\\xefnserted]|t\\xeb[after]'\r\n", output)
self.assertIn(b"indexes 11 13\r\n", output)
self.assertIn(b"substitution 't\\xeb'\r\n", output)
self.assertIn(b"matches ['t\\xebnt', 't\\xebxt']\r\n", output)
expected = br"'[\xefnserted]|t\xebxt[after]'"
self.assertIn(b"result " + expected + b"\r\n", output)
self.assertIn(b"history " + expected + b"\r\n", output)
def run_pty(script, input=b"dummy input\r"):
pty = import_module('pty')
output = bytearray()
[master, slave] = pty.openpty()
args = (sys.executable, '-c', script)
proc = subprocess.Popen(args, stdin=slave, stdout=slave, stderr=slave)
os.close(slave)
with ExitStack() as cleanup:
cleanup.enter_context(proc)
def terminate(proc):
try:
proc.terminate()
except ProcessLookupError:
# Workaround for Open/Net BSD bug (Issue 16762)
pass
cleanup.callback(terminate, proc)
cleanup.callback(os.close, master)
# Avoid using DefaultSelector and PollSelector. Kqueue() does not
# work with pseudo-terminals on OS X < 10.9 (Issue 20365) and Open
# BSD (Issue 20667). Poll() does not work with OS X 10.6 or 10.4
# either (Issue 20472). Hopefully the file descriptor is low enough
# to use with select().
sel = cleanup.enter_context(selectors.SelectSelector())
sel.register(master, selectors.EVENT_READ | selectors.EVENT_WRITE)
os.set_blocking(master, False)
while True:
for [_, events] in sel.select():
if events & selectors.EVENT_READ:
try:
chunk = os.read(master, 0x10000)
except OSError as err:
# Linux raises EIO when slave is closed (Issue 5380)
if err.errno != EIO:
raise
chunk = b""
if not chunk:
return output
output.extend(chunk)
if events & selectors.EVENT_WRITE:
try:
input = input[os.write(master, input):]
except OSError as err:
# Apparently EIO means the slave was closed
if err.errno != EIO:
raise
input = b"" # Stop writing
if not input:
sel.modify(master, selectors.EVENT_READ)
if __name__ == "__main__":
unittest.main()

View File

@ -13,6 +13,10 @@ Core and Builtins
Library
-------
- Issue #16182: Fix various functions in the "readline" module to use the
locale encoding, and fix get_begidx() and get_endidx() to return code point
indexes.
What's New in Python 3.5.2 final?
=================================

View File

@ -128,20 +128,40 @@ static PyModuleDef readlinemodule;
#define readlinestate_global ((readlinestate *)PyModule_GetState(PyState_FindModule(&readlinemodule)))
/* Convert to/from multibyte C strings */
static PyObject *
encode(PyObject *b)
{
return PyUnicode_EncodeLocale(b, "surrogateescape");
}
static PyObject *
decode(const char *s)
{
return PyUnicode_DecodeLocale(s, "surrogateescape");
}
/* Exported function to send one line to readline's init file parser */
static PyObject *
parse_and_bind(PyObject *self, PyObject *args)
parse_and_bind(PyObject *self, PyObject *string)
{
char *s, *copy;
if (!PyArg_ParseTuple(args, "s:parse_and_bind", &s))
char *copy;
PyObject *encoded = encode(string);
if (encoded == NULL) {
return NULL;
}
/* Make a copy -- rl_parse_and_bind() modifies its argument */
/* Bernard Herzog */
copy = PyMem_Malloc(1 + strlen(s));
if (copy == NULL)
copy = PyMem_Malloc(1 + PyBytes_GET_SIZE(encoded));
if (copy == NULL) {
Py_DECREF(encoded);
return PyErr_NoMemory();
strcpy(copy, s);
}
strcpy(copy, PyBytes_AS_STRING(encoded));
Py_DECREF(encoded);
rl_parse_and_bind(copy);
PyMem_Free(copy); /* Free the copy */
Py_RETURN_NONE;
@ -441,17 +461,18 @@ get the ending index of the completion scope");
/* Set the tab-completion word-delimiters that readline uses */
static PyObject *
set_completer_delims(PyObject *self, PyObject *args)
set_completer_delims(PyObject *self, PyObject *string)
{
char *break_chars;
if (!PyArg_ParseTuple(args, "s:set_completer_delims", &break_chars)) {
PyObject *encoded = encode(string);
if (encoded == NULL) {
return NULL;
}
/* Keep a reference to the allocated memory in the module state in case
some other module modifies rl_completer_word_break_characters
(see issue #17289). */
break_chars = strdup(break_chars);
break_chars = strdup(PyBytes_AS_STRING(encoded));
Py_DECREF(encoded);
if (break_chars) {
free(completer_word_break_characters);
completer_word_break_characters = break_chars;
@ -531,10 +552,11 @@ static PyObject *
py_replace_history(PyObject *self, PyObject *args)
{
int entry_number;
char *line;
PyObject *line;
PyObject *encoded;
HIST_ENTRY *old_entry;
if (!PyArg_ParseTuple(args, "is:replace_history_item", &entry_number,
if (!PyArg_ParseTuple(args, "iU:replace_history_item", &entry_number,
&line)) {
return NULL;
}
@ -543,7 +565,12 @@ py_replace_history(PyObject *self, PyObject *args)
"History index cannot be negative");
return NULL;
}
old_entry = replace_history_entry(entry_number, line, (void *)NULL);
encoded = encode(line);
if (encoded == NULL) {
return NULL;
}
old_entry = replace_history_entry(entry_number, PyBytes_AS_STRING(encoded), (void *)NULL);
Py_DECREF(encoded);
if (!old_entry) {
PyErr_Format(PyExc_ValueError,
"No history item at position %d",
@ -562,14 +589,14 @@ replaces history item given by its position with contents of line");
/* Add a line to the history buffer */
static PyObject *
py_add_history(PyObject *self, PyObject *args)
py_add_history(PyObject *self, PyObject *string)
{
char *line;
if(!PyArg_ParseTuple(args, "s:add_history", &line)) {
PyObject *encoded = encode(string);
if (encoded == NULL) {
return NULL;
}
add_history(line);
add_history(PyBytes_AS_STRING(encoded));
Py_DECREF(encoded);
Py_RETURN_NONE;
}
@ -583,7 +610,7 @@ add an item to the history buffer");
static PyObject *
get_completer_delims(PyObject *self, PyObject *noarg)
{
return PyUnicode_FromString(rl_completer_word_break_characters);
return decode(rl_completer_word_break_characters);
}
PyDoc_STRVAR(doc_get_completer_delims,
@ -673,7 +700,7 @@ get_history_item(PyObject *self, PyObject *args)
}
#endif /* __APPLE__ */
if ((hist_ent = history_get(idx)))
return PyUnicode_FromString(hist_ent->line);
return decode(hist_ent->line);
else {
Py_RETURN_NONE;
}
@ -702,7 +729,7 @@ return the current (not the maximum) length of history.");
static PyObject *
get_line_buffer(PyObject *self, PyObject *noarg)
{
return PyUnicode_FromString(rl_line_buffer);
return decode(rl_line_buffer);
}
PyDoc_STRVAR(doc_get_line_buffer,
@ -730,12 +757,14 @@ Clear the current readline history.");
/* Exported function to insert text into the line buffer */
static PyObject *
insert_text(PyObject *self, PyObject *args)
insert_text(PyObject *self, PyObject *string)
{
char *s;
if (!PyArg_ParseTuple(args, "s:insert_text", &s))
PyObject *encoded = encode(string);
if (encoded == NULL) {
return NULL;
rl_insert_text(s);
}
rl_insert_text(PyBytes_AS_STRING(encoded));
Py_DECREF(encoded);
Py_RETURN_NONE;
}
@ -763,9 +792,9 @@ contents of the line buffer.");
static struct PyMethodDef readline_methods[] =
{
{"parse_and_bind", parse_and_bind, METH_VARARGS, doc_parse_and_bind},
{"parse_and_bind", parse_and_bind, METH_O, doc_parse_and_bind},
{"get_line_buffer", get_line_buffer, METH_NOARGS, doc_get_line_buffer},
{"insert_text", insert_text, METH_VARARGS, doc_insert_text},
{"insert_text", insert_text, METH_O, doc_insert_text},
{"redisplay", redisplay, METH_NOARGS, doc_redisplay},
{"read_init_file", read_init_file, METH_VARARGS, doc_read_init_file},
{"read_history_file", read_history_file,
@ -792,8 +821,8 @@ static struct PyMethodDef readline_methods[] =
{"get_endidx", get_endidx, METH_NOARGS, doc_get_endidx},
{"set_completer_delims", set_completer_delims,
METH_VARARGS, doc_set_completer_delims},
{"add_history", py_add_history, METH_VARARGS, doc_add_history},
METH_O, doc_set_completer_delims},
{"add_history", py_add_history, METH_O, doc_add_history},
{"remove_history_item", py_remove_history, METH_VARARGS, doc_remove_history},
{"replace_history_item", py_replace_history, METH_VARARGS, doc_replace_history},
{"get_completer_delims", get_completer_delims,
@ -890,7 +919,7 @@ on_completion_display_matches_hook(char **matches,
int num_matches, int max_length)
{
int i;
PyObject *m=NULL, *s=NULL, *r=NULL;
PyObject *sub, *m=NULL, *s=NULL, *r=NULL;
#ifdef WITH_THREAD
PyGILState_STATE gilstate = PyGILState_Ensure();
#endif
@ -898,16 +927,17 @@ on_completion_display_matches_hook(char **matches,
if (m == NULL)
goto error;
for (i = 0; i < num_matches; i++) {
s = PyUnicode_FromString(matches[i+1]);
s = decode(matches[i+1]);
if (s == NULL)
goto error;
if (PyList_SetItem(m, i, s) == -1)
goto error;
}
sub = decode(matches[0]);
r = PyObject_CallFunction(readlinestate_global->completion_display_matches_hook,
"sOi", matches[0], m, max_length);
"NNi", sub, m, max_length);
Py_DECREF(m); m=NULL;
m=NULL;
if (r == NULL ||
(r != Py_None && PyLong_AsLong(r) == -1 && PyErr_Occurred())) {
@ -955,22 +985,24 @@ on_completion(const char *text, int state)
{
char *result = NULL;
if (readlinestate_global->completer != NULL) {
PyObject *r;
PyObject *r = NULL, *t;
#ifdef WITH_THREAD
PyGILState_STATE gilstate = PyGILState_Ensure();
#endif
rl_attempted_completion_over = 1;
r = PyObject_CallFunction(readlinestate_global->completer, "si", text, state);
t = decode(text);
r = PyObject_CallFunction(readlinestate_global->completer, "Ni", t, state);
if (r == NULL)
goto error;
if (r == Py_None) {
result = NULL;
}
else {
char *s = _PyUnicode_AsString(r);
if (s == NULL)
PyObject *encoded = encode(r);
if (encoded == NULL)
goto error;
result = strdup(s);
result = strdup(PyBytes_AS_STRING(encoded));
Py_DECREF(encoded);
}
Py_DECREF(r);
goto done;
@ -994,6 +1026,9 @@ static char **
flex_complete(const char *text, int start, int end)
{
char **result;
char saved;
size_t start_size, end_size;
wchar_t *s;
#ifdef WITH_THREAD
PyGILState_STATE gilstate = PyGILState_Ensure();
#endif
@ -1003,6 +1038,27 @@ flex_complete(const char *text, int start, int end)
#ifdef HAVE_RL_COMPLETION_SUPPRESS_APPEND
rl_completion_suppress_append = 0;
#endif
saved = rl_line_buffer[start];
rl_line_buffer[start] = 0;
s = Py_DecodeLocale(rl_line_buffer, &start_size);
rl_line_buffer[start] = saved;
if (s == NULL) {
goto done;
}
PyMem_RawFree(s);
saved = rl_line_buffer[end];
rl_line_buffer[end] = 0;
s = Py_DecodeLocale(rl_line_buffer + start, &end_size);
rl_line_buffer[end] = saved;
if (s == NULL) {
goto done;
}
PyMem_RawFree(s);
start = (int)start_size;
end = start + (int)end_size;
done:
Py_XDECREF(readlinestate_global->begidx);
Py_XDECREF(readlinestate_global->endidx);
readlinestate_global->begidx = PyLong_FromLong((long) start);