From ee06fffd38cb51ce1c045da9d8336d9ce13c318a Mon Sep 17 00:00:00 2001 From: Serhiy Storchaka Date: Tue, 14 Nov 2023 17:37:56 +0200 Subject: [PATCH] gh-111942: Fix crashes in TextIOWrapper.reconfigure() (GH-111976) * Fix crash when encoding is not string or None. * Fix crash when both line_buffering and write_through raise exception when converted ti int. * Add a number of tests for constructor and reconfigure() method with invalid arguments. --- Lib/test/test_io.py | 86 ++++++++++++++++++- ...-11-10-22-08-28.gh-issue-111942.MDFm6v.rst | 2 + Modules/_io/textio.c | 39 ++++++++- 3 files changed, 122 insertions(+), 5 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2023-11-10-22-08-28.gh-issue-111942.MDFm6v.rst diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index 3c3be870fac..fe622e836dd 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -77,6 +77,10 @@ requires_alarm = unittest.skipUnless( ) +class BadIndex: + def __index__(self): + 1/0 + class MockRawIOWithoutRead: """A RawIO implementation without read(), so as to exercise the default RawIO.read() which calls readinto().""" @@ -2709,8 +2713,31 @@ class TextIOWrapperTest(unittest.TestCase): self.assertEqual(t.encoding, "utf-8") self.assertEqual(t.line_buffering, True) self.assertEqual("\xe9\n", t.readline()) - self.assertRaises(TypeError, t.__init__, b, encoding="utf-8", newline=42) - self.assertRaises(ValueError, t.__init__, b, encoding="utf-8", newline='xyzzy') + invalid_type = TypeError if self.is_C else ValueError + with self.assertRaises(invalid_type): + t.__init__(b, encoding=42) + with self.assertRaises(UnicodeEncodeError): + t.__init__(b, encoding='\udcfe') + with self.assertRaises(ValueError): + t.__init__(b, encoding='utf-8\0') + with self.assertRaises(invalid_type): + t.__init__(b, encoding="utf-8", errors=42) + if support.Py_DEBUG or sys.flags.dev_mode or self.is_C: + with self.assertRaises(UnicodeEncodeError): + t.__init__(b, encoding="utf-8", errors='\udcfe') + if support.Py_DEBUG or sys.flags.dev_mode: + # TODO: If encoded to UTF-8, should also be checked for + # embedded null characters. + with self.assertRaises(ValueError): + t.__init__(b, encoding="utf-8", errors='replace\0') + with self.assertRaises(TypeError): + t.__init__(b, encoding="utf-8", newline=42) + with self.assertRaises(ValueError): + t.__init__(b, encoding="utf-8", newline='\udcfe') + with self.assertRaises(ValueError): + t.__init__(b, encoding="utf-8", newline='\n\0') + with self.assertRaises(ValueError): + t.__init__(b, encoding="utf-8", newline='xyzzy') def test_uninitialized(self): t = self.TextIOWrapper.__new__(self.TextIOWrapper) @@ -3756,6 +3783,59 @@ class TextIOWrapperTest(unittest.TestCase): self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n') + def test_reconfigure_errors(self): + txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\r') + with self.assertRaises(TypeError): # there was a crash + txt.reconfigure(encoding=42) + if self.is_C: + with self.assertRaises(UnicodeEncodeError): + txt.reconfigure(encoding='\udcfe') + with self.assertRaises(LookupError): + txt.reconfigure(encoding='locale\0') + # TODO: txt.reconfigure(encoding='utf-8\0') + # TODO: txt.reconfigure(encoding='nonexisting') + with self.assertRaises(TypeError): + txt.reconfigure(errors=42) + if self.is_C: + with self.assertRaises(UnicodeEncodeError): + txt.reconfigure(errors='\udcfe') + # TODO: txt.reconfigure(errors='ignore\0') + # TODO: txt.reconfigure(errors='nonexisting') + with self.assertRaises(TypeError): + txt.reconfigure(newline=42) + with self.assertRaises(ValueError): + txt.reconfigure(newline='\udcfe') + with self.assertRaises(ValueError): + txt.reconfigure(newline='xyz') + if not self.is_C: + # TODO: Should fail in C too. + with self.assertRaises(ValueError): + txt.reconfigure(newline='\n\0') + if self.is_C: + # TODO: Use __bool__(), not __index__(). + with self.assertRaises(ZeroDivisionError): + txt.reconfigure(line_buffering=BadIndex()) + with self.assertRaises(OverflowError): + txt.reconfigure(line_buffering=2**1000) + with self.assertRaises(ZeroDivisionError): + txt.reconfigure(write_through=BadIndex()) + with self.assertRaises(OverflowError): + txt.reconfigure(write_through=2**1000) + with self.assertRaises(ZeroDivisionError): # there was a crash + txt.reconfigure(line_buffering=BadIndex(), + write_through=BadIndex()) + self.assertEqual(txt.encoding, 'ascii') + self.assertEqual(txt.errors, 'replace') + self.assertIs(txt.line_buffering, False) + self.assertIs(txt.write_through, False) + + txt.reconfigure(encoding='latin1', errors='ignore', newline='\r\n', + line_buffering=True, write_through=True) + self.assertEqual(txt.encoding, 'latin1') + self.assertEqual(txt.errors, 'ignore') + self.assertIs(txt.line_buffering, True) + self.assertIs(txt.write_through, True) + def test_reconfigure_newline(self): raw = self.BytesIO(b'CR\rEOF') txt = self.TextIOWrapper(raw, 'ascii', newline='\n') @@ -4781,9 +4861,11 @@ def load_tests(loader, tests, pattern): if test.__name__.startswith("C"): for name, obj in c_io_ns.items(): setattr(test, name, obj) + test.is_C = True elif test.__name__.startswith("Py"): for name, obj in py_io_ns.items(): setattr(test, name, obj) + test.is_C = False suite = loader.suiteClass() for test in tests: diff --git a/Misc/NEWS.d/next/Library/2023-11-10-22-08-28.gh-issue-111942.MDFm6v.rst b/Misc/NEWS.d/next/Library/2023-11-10-22-08-28.gh-issue-111942.MDFm6v.rst new file mode 100644 index 00000000000..4fc505c8f25 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2023-11-10-22-08-28.gh-issue-111942.MDFm6v.rst @@ -0,0 +1,2 @@ +Fix crashes in :meth:`io.TextIOWrapper.reconfigure` when pass invalid +arguments, e.g. non-string encoding. diff --git a/Modules/_io/textio.c b/Modules/_io/textio.c index e6a971e2250..5c37e99067f 100644 --- a/Modules/_io/textio.c +++ b/Modules/_io/textio.c @@ -1284,30 +1284,40 @@ textiowrapper_change_encoding(textio *self, PyObject *encoding, errors = &_Py_ID(strict); } } + Py_INCREF(errors); + const char *c_encoding = PyUnicode_AsUTF8(encoding); + if (c_encoding == NULL) { + Py_DECREF(encoding); + Py_DECREF(errors); + return -1; + } const char *c_errors = PyUnicode_AsUTF8(errors); if (c_errors == NULL) { Py_DECREF(encoding); + Py_DECREF(errors); return -1; } // Create new encoder & decoder PyObject *codec_info = _PyCodec_LookupTextEncoding( - PyUnicode_AsUTF8(encoding), "codecs.open()"); + c_encoding, "codecs.open()"); if (codec_info == NULL) { Py_DECREF(encoding); + Py_DECREF(errors); return -1; } if (_textiowrapper_set_decoder(self, codec_info, c_errors) != 0 || _textiowrapper_set_encoder(self, codec_info, c_errors) != 0) { Py_DECREF(codec_info); Py_DECREF(encoding); + Py_DECREF(errors); return -1; } Py_DECREF(codec_info); Py_SETREF(self->encoding, encoding); - Py_SETREF(self->errors, Py_NewRef(errors)); + Py_SETREF(self->errors, errors); return _textiowrapper_fix_encoder_state(self); } @@ -1338,6 +1348,26 @@ _io_TextIOWrapper_reconfigure_impl(textio *self, PyObject *encoding, int write_through; const char *newline = NULL; + if (encoding != Py_None && !PyUnicode_Check(encoding)) { + PyErr_Format(PyExc_TypeError, + "reconfigure() argument 'encoding' must be str or None, not %s", + Py_TYPE(encoding)->tp_name); + return NULL; + } + if (errors != Py_None && !PyUnicode_Check(errors)) { + PyErr_Format(PyExc_TypeError, + "reconfigure() argument 'errors' must be str or None, not %s", + Py_TYPE(errors)->tp_name); + return NULL; + } + if (newline_obj != NULL && newline_obj != Py_None && + !PyUnicode_Check(newline_obj)) + { + PyErr_Format(PyExc_TypeError, + "reconfigure() argument 'newline' must be str or None, not %s", + Py_TYPE(newline_obj)->tp_name); + return NULL; + } /* Check if something is in the read buffer */ if (self->decoded_chars != NULL) { if (encoding != Py_None || errors != Py_None || newline_obj != NULL) { @@ -1357,9 +1387,12 @@ _io_TextIOWrapper_reconfigure_impl(textio *self, PyObject *encoding, line_buffering = convert_optional_bool(line_buffering_obj, self->line_buffering); + if (line_buffering < 0) { + return NULL; + } write_through = convert_optional_bool(write_through_obj, self->write_through); - if (line_buffering < 0 || write_through < 0) { + if (write_through < 0) { return NULL; }