bpo-15216: io: TextIOWrapper.reconfigure() accepts encoding, errors and newline (GH-2343)

This commit is contained in:
INADA Naoki 2017-12-21 09:59:53 +09:00 committed by GitHub
parent 31e99080f6
commit 507434fd50
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 523 additions and 181 deletions

View File

@ -904,7 +904,7 @@ Text I/O
locale encoding using :func:`locale.setlocale`, use the current locale locale encoding using :func:`locale.setlocale`, use the current locale
encoding instead of the user preferred encoding. encoding instead of the user preferred encoding.
:class:`TextIOWrapper` provides one attribute in addition to those of :class:`TextIOWrapper` provides these members in addition to those of
:class:`TextIOBase` and its parents: :class:`TextIOBase` and its parents:
.. attribute:: line_buffering .. attribute:: line_buffering
@ -918,11 +918,19 @@ Text I/O
.. versionadded:: 3.7 .. versionadded:: 3.7
.. method:: reconfigure(*, line_buffering=None, write_through=None) .. method:: reconfigure(*[, encoding][, errors][, newline][, \
line_buffering][, write_through])
Reconfigure this text stream using new settings for *line_buffering* Reconfigure this text stream using new settings for *encoding*,
and *write_through*. Passing ``None`` as an argument will retain *errors*, *newline*, *line_buffering* and *write_through*.
the current setting for that parameter.
Parameters not specified keep current settings, except
``errors='strict'`` is used when *encoding* is specified but
*errors* is not specified.
It is not possible to change the encoding or newline if some data
has already been read from the stream. On the other hand, changing
encoding after write is possible.
This method does an implicit stream flush before setting the This method does an implicit stream flush before setting the
new parameters. new parameters.

View File

@ -1938,10 +1938,7 @@ class TextIOWrapper(TextIOBase):
# so that the signature can match the signature of the C version. # so that the signature can match the signature of the C version.
def __init__(self, buffer, encoding=None, errors=None, newline=None, def __init__(self, buffer, encoding=None, errors=None, newline=None,
line_buffering=False, write_through=False): line_buffering=False, write_through=False):
if newline is not None and not isinstance(newline, str): self._check_newline(newline)
raise TypeError("illegal newline type: %r" % (type(newline),))
if newline not in (None, "", "\n", "\r", "\r\n"):
raise ValueError("illegal newline value: %r" % (newline,))
if encoding is None: if encoding is None:
try: try:
encoding = os.device_encoding(buffer.fileno()) encoding = os.device_encoding(buffer.fileno())
@ -1971,22 +1968,38 @@ class TextIOWrapper(TextIOBase):
raise ValueError("invalid errors: %r" % errors) raise ValueError("invalid errors: %r" % errors)
self._buffer = buffer self._buffer = buffer
self._encoding = encoding
self._errors = errors
self._readuniversal = not newline
self._readtranslate = newline is None
self._readnl = newline
self._writetranslate = newline != ''
self._writenl = newline or os.linesep
self._encoder = None
self._decoder = None
self._decoded_chars = '' # buffer for text returned from decoder self._decoded_chars = '' # buffer for text returned from decoder
self._decoded_chars_used = 0 # offset into _decoded_chars for read() self._decoded_chars_used = 0 # offset into _decoded_chars for read()
self._snapshot = None # info for reconstructing decoder state self._snapshot = None # info for reconstructing decoder state
self._seekable = self._telling = self.buffer.seekable() self._seekable = self._telling = self.buffer.seekable()
self._has_read1 = hasattr(self.buffer, 'read1') self._has_read1 = hasattr(self.buffer, 'read1')
self._configure(encoding, errors, newline,
line_buffering, write_through)
def _check_newline(self, newline):
if newline is not None and not isinstance(newline, str):
raise TypeError("illegal newline type: %r" % (type(newline),))
if newline not in (None, "", "\n", "\r", "\r\n"):
raise ValueError("illegal newline value: %r" % (newline,))
def _configure(self, encoding=None, errors=None, newline=None,
line_buffering=False, write_through=False):
self._encoding = encoding
self._errors = errors
self._encoder = None
self._decoder = None
self._b2cratio = 0.0 self._b2cratio = 0.0
self._readuniversal = not newline
self._readtranslate = newline is None
self._readnl = newline
self._writetranslate = newline != ''
self._writenl = newline or os.linesep
self._line_buffering = line_buffering
self._write_through = write_through
# don't write a BOM in the middle of a file
if self._seekable and self.writable(): if self._seekable and self.writable():
position = self.buffer.tell() position = self.buffer.tell()
if position != 0: if position != 0:
@ -1996,12 +2009,6 @@ class TextIOWrapper(TextIOBase):
# Sometimes the encoder doesn't exist # Sometimes the encoder doesn't exist
pass pass
self._configure(line_buffering, write_through)
def _configure(self, line_buffering=False, write_through=False):
self._line_buffering = line_buffering
self._write_through = write_through
# self._snapshot is either None, or a tuple (dec_flags, next_input) # self._snapshot is either None, or a tuple (dec_flags, next_input)
# where dec_flags is the second (integer) item of the decoder state # where dec_flags is the second (integer) item of the decoder state
# and next_input is the chunk of input bytes that comes next after the # and next_input is the chunk of input bytes that comes next after the
@ -2048,17 +2055,46 @@ class TextIOWrapper(TextIOBase):
def buffer(self): def buffer(self):
return self._buffer return self._buffer
def reconfigure(self, *, line_buffering=None, write_through=None): def reconfigure(self, *,
encoding=None, errors=None, newline=Ellipsis,
line_buffering=None, write_through=None):
"""Reconfigure the text stream with new parameters. """Reconfigure the text stream with new parameters.
This also flushes the stream. This also flushes the stream.
""" """
if (self._decoder is not None
and (encoding is not None or errors is not None
or newline is not Ellipsis)):
raise UnsupportedOperation(
"It is not possible to set the encoding or newline of stream "
"after the first read")
if errors is None:
if encoding is None:
errors = self._errors
else:
errors = 'strict'
elif not isinstance(errors, str):
raise TypeError("invalid errors: %r" % errors)
if encoding is None:
encoding = self._encoding
else:
if not isinstance(encoding, str):
raise TypeError("invalid encoding: %r" % encoding)
if newline is Ellipsis:
newline = self._readnl
self._check_newline(newline)
if line_buffering is None: if line_buffering is None:
line_buffering = self.line_buffering line_buffering = self.line_buffering
if write_through is None: if write_through is None:
write_through = self.write_through write_through = self.write_through
self.flush() self.flush()
self._configure(line_buffering, write_through) self._configure(encoding, errors, newline,
line_buffering, write_through)
def seekable(self): def seekable(self):
if self.closed: if self.closed:

View File

@ -3408,6 +3408,123 @@ class TextIOWrapperTest(unittest.TestCase):
F.tell = lambda x: 0 F.tell = lambda x: 0
t = self.TextIOWrapper(F(), encoding='utf-8') t = self.TextIOWrapper(F(), encoding='utf-8')
def test_reconfigure_encoding_read(self):
    # After a read has happened, reconfigure() must refuse to change
    # either the encoding or the newline mode.
    # latin1 -> utf8
    # (latin1 can decode utf-8 encoded string)
    data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
    raw = self.BytesIO(data)
    txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
    # Consume the first line so that the stream counts as "already read".
    self.assertEqual(txt.readline(), 'abc\xe9\n')
    with self.assertRaises(self.UnsupportedOperation):
        txt.reconfigure(encoding='utf-8')
    with self.assertRaises(self.UnsupportedOperation):
        txt.reconfigure(newline=None)
def test_reconfigure_write_fromascii(self):
    # ascii has a specific encodefunc in the C implementation,
    # but utf-8-sig has not. Make sure that we get rid of the
    # cached encodefunc when we switch encoders.
    raw = self.BytesIO()
    txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
    txt.write('foo\n')
    txt.reconfigure(encoding='utf-8-sig')
    # '\xe9' is not encodable as ascii; the expected bytes below are its
    # UTF-8 form, and no BOM is expected in the output.
    txt.write('\xe9\n')
    txt.flush()
    self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n')
def test_reconfigure_write(self):
    # Changing the encoding between writes: text written before the
    # switch keeps the old encoding, text written after uses the new one.
    # latin -> utf8
    raw = self.BytesIO()
    txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
    txt.write('abc\xe9\n')
    txt.reconfigure(encoding='utf-8')
    # No explicit flush() here: the pending text is expected to be in
    # the raw stream already, encoded as latin1.
    self.assertEqual(raw.getvalue(), b'abc\xe9\n')
    txt.write('d\xe9f\n')
    txt.flush()
    self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')

    # ascii -> utf-8-sig: ensure that no BOM is written in the middle of
    # the file
    raw = self.BytesIO()
    txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
    txt.write('abc\n')
    txt.reconfigure(encoding='utf-8-sig')
    txt.write('d\xe9f\n')
    txt.flush()
    self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n')
def test_reconfigure_write_non_seekable(self):
    # Same ascii -> utf-8-sig switch as in test_reconfigure_write, but
    # on a raw stream that reports itself as non-seekable.
    raw = self.BytesIO()
    raw.seekable = lambda: False
    raw.seek = None
    txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
    txt.write('abc\n')
    txt.reconfigure(encoding='utf-8-sig')
    txt.write('d\xe9f\n')
    txt.flush()

    # If the raw stream is not seekable, there'll be a BOM
    self.assertEqual(raw.getvalue(), b'abc\n\xef\xbb\xbfd\xc3\xa9f\n')
def test_reconfigure_defaults(self):
    # Checks which settings are retained when reconfigure() is given
    # only some of its arguments.
    txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n')
    # encoding=None means "keep": neither encoding nor errors change.
    txt.reconfigure(encoding=None)
    self.assertEqual(txt.encoding, 'ascii')
    self.assertEqual(txt.errors, 'replace')
    txt.write('LF\n')

    # Changing only the newline leaves encoding and errors alone.
    txt.reconfigure(newline='\r\n')
    self.assertEqual(txt.encoding, 'ascii')
    self.assertEqual(txt.errors, 'replace')

    # Changing only errors leaves the encoding alone.
    txt.reconfigure(errors='ignore')
    self.assertEqual(txt.encoding, 'ascii')
    self.assertEqual(txt.errors, 'ignore')
    txt.write('CRLF\n')

    # A new encoding without an explicit errors resets errors to 'strict'.
    txt.reconfigure(encoding='utf-8', newline=None)
    self.assertEqual(txt.errors, 'strict')
    txt.seek(0)
    # With newline=None the '\r\n' written above reads back as '\n',
    # while the underlying bytes still contain '\r\n'.
    self.assertEqual(txt.read(), 'LF\nCRLF\n')

    self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n')
def test_reconfigure_newline(self):
    # Each read case below creates a fresh wrapper, switches the newline
    # mode before any read, and checks what the first readline() returns.

    # newline=None: a lone '\r' ends the line and is returned as '\n'.
    raw = self.BytesIO(b'CR\rEOF')
    txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
    txt.reconfigure(newline=None)
    self.assertEqual(txt.readline(), 'CR\n')
    # newline='': a lone '\r' ends the line and is returned unchanged.
    raw = self.BytesIO(b'CR\rEOF')
    txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
    txt.reconfigure(newline='')
    self.assertEqual(txt.readline(), 'CR\r')
    # newline='\n': '\r' no longer terminates a line.
    raw = self.BytesIO(b'CR\rLF\nEOF')
    txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
    txt.reconfigure(newline='\n')
    self.assertEqual(txt.readline(), 'CR\rLF\n')
    # newline='\r': '\n' no longer terminates a line.
    raw = self.BytesIO(b'LF\nCR\rEOF')
    txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
    txt.reconfigure(newline='\r')
    self.assertEqual(txt.readline(), 'LF\nCR\r')
    # newline='\r\n': a lone '\r' no longer terminates a line.
    raw = self.BytesIO(b'CR\rCRLF\r\nEOF')
    txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
    txt.reconfigure(newline='\r\n')
    self.assertEqual(txt.readline(), 'CR\rCRLF\r\n')

    # Write side: cycle through every newline mode on one stream and
    # check how the '\n' of each write() ends up in the output.
    txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
    txt.reconfigure(newline=None)
    txt.write('linesep\n')
    txt.reconfigure(newline='')
    txt.write('LF\n')
    txt.reconfigure(newline='\n')
    txt.write('LF\n')
    txt.reconfigure(newline='\r')
    txt.write('CR\n')
    txt.reconfigure(newline='\r\n')
    txt.write('CRLF\n')
    expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
    self.assertEqual(txt.detach().getvalue().decode('ascii'), expected)
class MemviewBytesIO(io.BytesIO): class MemviewBytesIO(io.BytesIO):
'''A BytesIO object whose read method returns memoryviews '''A BytesIO object whose read method returns memoryviews

View File

@ -0,0 +1,2 @@
``TextIOWrapper.reconfigure()`` supports changing *encoding*, *errors*, and
*newline*.

View File

@ -149,7 +149,7 @@ PyDoc_STRVAR(_io_TextIOWrapper___init____doc__,
static int static int
_io_TextIOWrapper___init___impl(textio *self, PyObject *buffer, _io_TextIOWrapper___init___impl(textio *self, PyObject *buffer,
const char *encoding, const char *errors, const char *encoding, PyObject *errors,
const char *newline, int line_buffering, const char *newline, int line_buffering,
int write_through); int write_through);
@ -158,10 +158,10 @@ _io_TextIOWrapper___init__(PyObject *self, PyObject *args, PyObject *kwargs)
{ {
int return_value = -1; int return_value = -1;
static const char * const _keywords[] = {"buffer", "encoding", "errors", "newline", "line_buffering", "write_through", NULL}; static const char * const _keywords[] = {"buffer", "encoding", "errors", "newline", "line_buffering", "write_through", NULL};
static _PyArg_Parser _parser = {"O|zzzii:TextIOWrapper", _keywords, 0}; static _PyArg_Parser _parser = {"O|zOzii:TextIOWrapper", _keywords, 0};
PyObject *buffer; PyObject *buffer;
const char *encoding = NULL; const char *encoding = NULL;
const char *errors = NULL; PyObject *errors = Py_None;
const char *newline = NULL; const char *newline = NULL;
int line_buffering = 0; int line_buffering = 0;
int write_through = 0; int write_through = 0;
@ -177,7 +177,8 @@ exit:
} }
PyDoc_STRVAR(_io_TextIOWrapper_reconfigure__doc__, PyDoc_STRVAR(_io_TextIOWrapper_reconfigure__doc__,
"reconfigure($self, /, *, line_buffering=None, write_through=None)\n" "reconfigure($self, /, *, encoding=None, errors=None, newline=None,\n"
" line_buffering=None, write_through=None)\n"
"--\n" "--\n"
"\n" "\n"
"Reconfigure the text stream with new parameters.\n" "Reconfigure the text stream with new parameters.\n"
@ -188,7 +189,8 @@ PyDoc_STRVAR(_io_TextIOWrapper_reconfigure__doc__,
{"reconfigure", (PyCFunction)_io_TextIOWrapper_reconfigure, METH_FASTCALL|METH_KEYWORDS, _io_TextIOWrapper_reconfigure__doc__}, {"reconfigure", (PyCFunction)_io_TextIOWrapper_reconfigure, METH_FASTCALL|METH_KEYWORDS, _io_TextIOWrapper_reconfigure__doc__},
static PyObject * static PyObject *
_io_TextIOWrapper_reconfigure_impl(textio *self, _io_TextIOWrapper_reconfigure_impl(textio *self, PyObject *encoding,
PyObject *errors, PyObject *newline_obj,
PyObject *line_buffering_obj, PyObject *line_buffering_obj,
PyObject *write_through_obj); PyObject *write_through_obj);
@ -196,16 +198,19 @@ static PyObject *
_io_TextIOWrapper_reconfigure(textio *self, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) _io_TextIOWrapper_reconfigure(textio *self, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames)
{ {
PyObject *return_value = NULL; PyObject *return_value = NULL;
static const char * const _keywords[] = {"line_buffering", "write_through", NULL}; static const char * const _keywords[] = {"encoding", "errors", "newline", "line_buffering", "write_through", NULL};
static _PyArg_Parser _parser = {"|$OO:reconfigure", _keywords, 0}; static _PyArg_Parser _parser = {"|$OOOOO:reconfigure", _keywords, 0};
PyObject *encoding = Py_None;
PyObject *errors = Py_None;
PyObject *newline_obj = NULL;
PyObject *line_buffering_obj = Py_None; PyObject *line_buffering_obj = Py_None;
PyObject *write_through_obj = Py_None; PyObject *write_through_obj = Py_None;
if (!_PyArg_ParseStackAndKeywords(args, nargs, kwnames, &_parser, if (!_PyArg_ParseStackAndKeywords(args, nargs, kwnames, &_parser,
&line_buffering_obj, &write_through_obj)) { &encoding, &errors, &newline_obj, &line_buffering_obj, &write_through_obj)) {
goto exit; goto exit;
} }
return_value = _io_TextIOWrapper_reconfigure_impl(self, line_buffering_obj, write_through_obj); return_value = _io_TextIOWrapper_reconfigure_impl(self, encoding, errors, newline_obj, line_buffering_obj, write_through_obj);
exit: exit:
return return_value; return return_value;
@ -499,4 +504,4 @@ _io_TextIOWrapper_close(textio *self, PyObject *Py_UNUSED(ignored))
{ {
return _io_TextIOWrapper_close_impl(self); return _io_TextIOWrapper_close_impl(self);
} }
/*[clinic end generated code: output=679b3ac5284df4e0 input=a9049054013a1b77]*/ /*[clinic end generated code: output=b5be870b0039d577 input=a9049054013a1b77]*/

View File

@ -36,6 +36,7 @@ _Py_IDENTIFIER(reset);
_Py_IDENTIFIER(seek); _Py_IDENTIFIER(seek);
_Py_IDENTIFIER(seekable); _Py_IDENTIFIER(seekable);
_Py_IDENTIFIER(setstate); _Py_IDENTIFIER(setstate);
_Py_IDENTIFIER(strict);
_Py_IDENTIFIER(tell); _Py_IDENTIFIER(tell);
_Py_IDENTIFIER(writable); _Py_IDENTIFIER(writable);
@ -252,14 +253,14 @@ _io_IncrementalNewlineDecoder___init___impl(nldecoder_object *self,
Py_INCREF(decoder); Py_INCREF(decoder);
if (errors == NULL) { if (errors == NULL) {
self->errors = PyUnicode_FromString("strict"); self->errors = _PyUnicode_FromId(&PyId_strict);
if (self->errors == NULL) if (self->errors == NULL)
return -1; return -1;
} }
else { else {
Py_INCREF(errors);
self->errors = errors; self->errors = errors;
} }
Py_INCREF(self->errors);
self->translate = translate; self->translate = translate;
self->seennl = 0; self->seennl = 0;
@ -647,7 +648,7 @@ typedef struct
PyObject *decoder; PyObject *decoder;
PyObject *readnl; PyObject *readnl;
PyObject *errors; PyObject *errors;
const char *writenl; /* utf-8 encoded, NULL stands for \n */ const char *writenl; /* ASCII-encoded; NULL stands for \n */
char line_buffering; char line_buffering;
char write_through; char write_through;
char readuniversal; char readuniversal;
@ -700,21 +701,21 @@ typedef struct
static PyObject * static PyObject *
ascii_encode(textio *self, PyObject *text) ascii_encode(textio *self, PyObject *text)
{ {
return _PyUnicode_AsASCIIString(text, PyBytes_AS_STRING(self->errors)); return _PyUnicode_AsASCIIString(text, PyUnicode_AsUTF8(self->errors));
} }
static PyObject * static PyObject *
utf16be_encode(textio *self, PyObject *text) utf16be_encode(textio *self, PyObject *text)
{ {
return _PyUnicode_EncodeUTF16(text, return _PyUnicode_EncodeUTF16(text,
PyBytes_AS_STRING(self->errors), 1); PyUnicode_AsUTF8(self->errors), 1);
} }
static PyObject * static PyObject *
utf16le_encode(textio *self, PyObject *text) utf16le_encode(textio *self, PyObject *text)
{ {
return _PyUnicode_EncodeUTF16(text, return _PyUnicode_EncodeUTF16(text,
PyBytes_AS_STRING(self->errors), -1); PyUnicode_AsUTF8(self->errors), -1);
} }
static PyObject * static PyObject *
@ -729,21 +730,21 @@ utf16_encode(textio *self, PyObject *text)
#endif #endif
} }
return _PyUnicode_EncodeUTF16(text, return _PyUnicode_EncodeUTF16(text,
PyBytes_AS_STRING(self->errors), 0); PyUnicode_AsUTF8(self->errors), 0);
} }
static PyObject * static PyObject *
utf32be_encode(textio *self, PyObject *text) utf32be_encode(textio *self, PyObject *text)
{ {
return _PyUnicode_EncodeUTF32(text, return _PyUnicode_EncodeUTF32(text,
PyBytes_AS_STRING(self->errors), 1); PyUnicode_AsUTF8(self->errors), 1);
} }
static PyObject * static PyObject *
utf32le_encode(textio *self, PyObject *text) utf32le_encode(textio *self, PyObject *text)
{ {
return _PyUnicode_EncodeUTF32(text, return _PyUnicode_EncodeUTF32(text,
PyBytes_AS_STRING(self->errors), -1); PyUnicode_AsUTF8(self->errors), -1);
} }
static PyObject * static PyObject *
@ -758,19 +759,19 @@ utf32_encode(textio *self, PyObject *text)
#endif #endif
} }
return _PyUnicode_EncodeUTF32(text, return _PyUnicode_EncodeUTF32(text,
PyBytes_AS_STRING(self->errors), 0); PyUnicode_AsUTF8(self->errors), 0);
} }
static PyObject * static PyObject *
utf8_encode(textio *self, PyObject *text) utf8_encode(textio *self, PyObject *text)
{ {
return _PyUnicode_AsUTF8String(text, PyBytes_AS_STRING(self->errors)); return _PyUnicode_AsUTF8String(text, PyUnicode_AsUTF8(self->errors));
} }
static PyObject * static PyObject *
latin1_encode(textio *self, PyObject *text) latin1_encode(textio *self, PyObject *text)
{ {
return _PyUnicode_AsLatin1String(text, PyBytes_AS_STRING(self->errors)); return _PyUnicode_AsLatin1String(text, PyUnicode_AsUTF8(self->errors));
} }
/* Map normalized encoding names onto the specialized encoding funcs */ /* Map normalized encoding names onto the specialized encoding funcs */
@ -793,12 +794,198 @@ static const encodefuncentry encodefuncs[] = {
{NULL, NULL} {NULL, NULL}
}; };
static int
validate_newline(const char *newline)
{
if (newline && newline[0] != '\0'
&& !(newline[0] == '\n' && newline[1] == '\0')
&& !(newline[0] == '\r' && newline[1] == '\0')
&& !(newline[0] == '\r' && newline[1] == '\n' && newline[2] == '\0')) {
PyErr_Format(PyExc_ValueError,
"illegal newline value: %s", newline);
return -1;
}
return 0;
}
/* Install a new newline setting on `self`.

   Updates readnl, readuniversal, readtranslate, writetranslate and
   writenl from `newline` (NULL, "" or an explicit line ending).  The
   value is assumed to have been checked with validate_newline() by the
   caller -- see the assert below.

   Returns 0 on success.  Returns -1 with an exception set if the new
   readnl string cannot be created; in that case the previous readnl is
   left in place. */
static int
set_newline(textio *self, const char *newline)
{
    /* Hold on to the previous readnl: it is restored on failure and only
       released once the new state is fully installed. */
    PyObject *old = self->readnl;
    if (newline == NULL) {
        self->readnl = NULL;
    }
    else {
        self->readnl = PyUnicode_FromString(newline);
        if (self->readnl == NULL) {
            self->readnl = old;
            return -1;
        }
    }
    self->readuniversal = (newline == NULL || newline[0] == '\0');
    self->readtranslate = (newline == NULL);
    self->writetranslate = (newline == NULL || newline[0] != '\0');
    if (!self->readuniversal && self->readnl != NULL) {
        // validate_newline() accepts only ASCII newlines.
        assert(PyUnicode_KIND(self->readnl) == PyUnicode_1BYTE_KIND);
        /* writenl points into the storage of self->readnl; it is not a
           separate copy. */
        self->writenl = (const char *)PyUnicode_1BYTE_DATA(self->readnl);
        if (strcmp(self->writenl, "\n") == 0) {
            /* A NULL writenl is used for "\n". */
            self->writenl = NULL;
        }
    }
    else {
        /* NULL or "" newline: pick the platform-dependent value. */
#ifdef MS_WINDOWS
        self->writenl = "\r\n";
#else
        self->writenl = NULL;
#endif
    }
    Py_XDECREF(old);
    return 0;
}
/* (Re)build self->decoder from `codec_info` using error handler `errors`.

   Does nothing and returns 0 when self->buffer reports that it is not
   readable.  Otherwise the current decoder is dropped and replaced by a
   new incremental decoder, which is additionally wrapped in an
   IncrementalNewlineDecoder when self->readuniversal is set.

   Returns 0 on success and -1 with an exception set on failure. */
static int
_textiowrapper_set_decoder(textio *self, PyObject *codec_info,
                           const char *errors)
{
    PyObject *res;
    int r;

    /* Only streams whose buffer is readable get a decoder. */
    res = _PyObject_CallMethodId(self->buffer, &PyId_readable, NULL);
    if (res == NULL)
        return -1;

    r = PyObject_IsTrue(res);
    Py_DECREF(res);
    if (r == -1)
        return -1;

    if (r != 1)
        return 0;

    /* Drop the old decoder before creating the replacement. */
    Py_CLEAR(self->decoder);
    self->decoder = _PyCodecInfo_GetIncrementalDecoder(codec_info, errors);
    if (self->decoder == NULL)
        return -1;

    if (self->readuniversal) {
        /* Wrap the codec's decoder; readtranslate is passed as the
           wrapper's second argument. */
        PyObject *incrementalDecoder = PyObject_CallFunction(
            (PyObject *)&PyIncrementalNewlineDecoder_Type,
            "Oi", self->decoder, (int)self->readtranslate);
        if (incrementalDecoder == NULL)
            return -1;
        /* Release our reference to the inner decoder before storing the
           wrapper in its place. */
        Py_CLEAR(self->decoder);
        self->decoder = incrementalDecoder;
    }

    return 0;
}
/* Decode `bytes` with `decoder`, passing `eof` as the "final" flag.

   An IncrementalNewlineDecoder is called directly through its C entry
   point; any other decoder goes through its decode() method.  The result
   is passed through check_decoded(); NULL is returned when that check
   fails. */
static PyObject*
_textiowrapper_decode(PyObject *decoder, PyObject *bytes, int eof)
{
    PyObject *decoded;

    if (Py_TYPE(decoder) != &PyIncrementalNewlineDecoder_Type) {
        PyObject *final_flag = eof ? Py_True : Py_False;
        decoded = PyObject_CallMethodObjArgs(decoder, _PyIO_str_decode,
                                             bytes, final_flag, NULL);
    }
    else {
        decoded = _PyIncrementalNewlineDecoder_decode(decoder, bytes, eof);
    }

    if (check_decoded(decoded) < 0) {
        // check_decoded already decreases refcount
        return NULL;
    }
    return decoded;
}
/* (Re)build self->encoder from `codec_info` using error handler `errors`.

   Does nothing and returns 0 when self->buffer reports that it is not
   writable.  Otherwise the current encoder and the cached encodefunc are
   dropped, a new incremental encoder is created, and self->encodefunc is
   set again if the codec's name matches an entry in `encodefuncs`.

   Returns 0 on success and -1 with an exception set on failure. */
static int
_textiowrapper_set_encoder(textio *self, PyObject *codec_info,
                           const char *errors)
{
    PyObject *res;
    int r;

    /* Only streams whose buffer is writable get an encoder. */
    res = _PyObject_CallMethodId(self->buffer, &PyId_writable, NULL);
    if (res == NULL)
        return -1;

    r = PyObject_IsTrue(res);
    Py_DECREF(res);
    if (r == -1)
        return -1;

    if (r != 1)
        return 0;

    Py_CLEAR(self->encoder);
    /* Forget the specialized encode function of the previous codec; it is
       only set again below when the new codec has one. */
    self->encodefunc = NULL;
    self->encoder = _PyCodecInfo_GetIncrementalEncoder(codec_info, errors);
    if (self->encoder == NULL)
        return -1;

    /* Get the normalized name of the codec */
    res = _PyObject_GetAttrId(codec_info, &PyId_name);
    if (res == NULL) {
        /* A codec without a `name` attribute is fine: it simply gets no
           specialized encode function. */
        if (PyErr_ExceptionMatches(PyExc_AttributeError))
            PyErr_Clear();
        else
            return -1;
    }
    else if (PyUnicode_Check(res)) {
        /* Linear search of the NULL-terminated encodefuncs table. */
        const encodefuncentry *e = encodefuncs;
        while (e->name != NULL) {
            if (_PyUnicode_EqualToASCIIString(res, e->name)) {
                self->encodefunc = e->encodefunc;
                break;
            }
            e++;
        }
    }
    /* res may be NULL here (AttributeError path), hence XDECREF. */
    Py_XDECREF(res);

    return 0;
}
/* Synchronise the encoder with the current position of the buffer.

   Nothing is done unless the stream is seekable and has an encoder.
   When buffer.tell() is zero, encoding_start_of_stream is set to 1.
   Otherwise it is cleared and the encoder's state is set to 0.

   Returns 0 on success and -1 with an exception set on failure. */
static int
_textiowrapper_fix_encoder_state(textio *self)
{
    PyObject *position;
    PyObject *setstate_result;
    int at_start;

    if (!(self->seekable && self->encoder)) {
        return 0;
    }

    self->encoding_start_of_stream = 1;

    position = PyObject_CallMethodObjArgs(self->buffer, _PyIO_str_tell, NULL);
    if (position == NULL) {
        return -1;
    }

    at_start = PyObject_RichCompareBool(position, _PyLong_Zero, Py_EQ);
    Py_DECREF(position);
    if (at_start < 0) {
        return -1;
    }
    if (at_start != 0) {
        /* Position is zero: keep the start-of-stream flag set. */
        return 0;
    }

    /* Not at the start of the stream: clear the flag and reset the
       encoder state to 0. */
    self->encoding_start_of_stream = 0;
    setstate_result = PyObject_CallMethodObjArgs(
        self->encoder, _PyIO_str_setstate, _PyLong_Zero, NULL);
    if (setstate_result == NULL) {
        return -1;
    }
    Py_DECREF(setstate_result);
    return 0;
}
/*[clinic input] /*[clinic input]
_io.TextIOWrapper.__init__ _io.TextIOWrapper.__init__
buffer: object buffer: object
encoding: str(accept={str, NoneType}) = NULL encoding: str(accept={str, NoneType}) = NULL
errors: str(accept={str, NoneType}) = NULL errors: object = None
newline: str(accept={str, NoneType}) = NULL newline: str(accept={str, NoneType}) = NULL
line_buffering: bool(accept={int}) = False line_buffering: bool(accept={int}) = False
write_through: bool(accept={int}) = False write_through: bool(accept={int}) = False
@ -835,10 +1022,10 @@ write contains a newline character.
static int static int
_io_TextIOWrapper___init___impl(textio *self, PyObject *buffer, _io_TextIOWrapper___init___impl(textio *self, PyObject *buffer,
const char *encoding, const char *errors, const char *encoding, PyObject *errors,
const char *newline, int line_buffering, const char *newline, int line_buffering,
int write_through) int write_through)
/*[clinic end generated code: output=56a83402ce2a8381 input=598d10cc5f2ed7dd]*/ /*[clinic end generated code: output=72267c0c01032ed2 input=1c5dd5d78bfcc675]*/
{ {
PyObject *raw, *codec_info = NULL; PyObject *raw, *codec_info = NULL;
_PyIO_State *state = NULL; _PyIO_State *state = NULL;
@ -848,12 +1035,20 @@ _io_TextIOWrapper___init___impl(textio *self, PyObject *buffer,
self->ok = 0; self->ok = 0;
self->detached = 0; self->detached = 0;
if (newline && newline[0] != '\0' if (errors == Py_None) {
&& !(newline[0] == '\n' && newline[1] == '\0') errors = _PyUnicode_FromId(&PyId_strict); /* borrowed */
&& !(newline[0] == '\r' && newline[1] == '\0') }
&& !(newline[0] == '\r' && newline[1] == '\n' && newline[2] == '\0')) { else if (!PyUnicode_Check(errors)) {
PyErr_Format(PyExc_ValueError, // Check 'errors' argument here because Argument Clinic doesn't support
"illegal newline value: %s", newline); // 'str(accept={str, NoneType})' converter.
PyErr_Format(
PyExc_TypeError,
"TextIOWrapper() argument 'errors' must be str or None, not %.50s",
errors->ob_type->tp_name);
return -1;
}
if (validate_newline(newline) < 0) {
return -1; return -1;
} }
@ -955,99 +1150,29 @@ _io_TextIOWrapper___init___impl(textio *self, PyObject *buffer,
* of the partially constructed object (like self->encoding) * of the partially constructed object (like self->encoding)
*/ */
if (errors == NULL) Py_INCREF(errors);
errors = "strict"; self->errors = errors;
self->errors = PyBytes_FromString(errors);
if (self->errors == NULL)
goto error;
self->chunk_size = 8192; self->chunk_size = 8192;
self->readuniversal = (newline == NULL || newline[0] == '\0');
self->line_buffering = line_buffering; self->line_buffering = line_buffering;
self->write_through = write_through; self->write_through = write_through;
self->readtranslate = (newline == NULL); if (set_newline(self, newline) < 0) {
if (newline) {
self->readnl = PyUnicode_FromString(newline);
if (self->readnl == NULL)
goto error; goto error;
} }
self->writetranslate = (newline == NULL || newline[0] != '\0');
if (!self->readuniversal && self->readnl) {
self->writenl = PyUnicode_AsUTF8(self->readnl);
if (self->writenl == NULL)
goto error;
if (!strcmp(self->writenl, "\n"))
self->writenl = NULL;
}
#ifdef MS_WINDOWS
else
self->writenl = "\r\n";
#endif
/* Build the decoder object */
res = _PyObject_CallMethodId(buffer, &PyId_readable, NULL);
if (res == NULL)
goto error;
r = PyObject_IsTrue(res);
Py_DECREF(res);
if (r == -1)
goto error;
if (r == 1) {
self->decoder = _PyCodecInfo_GetIncrementalDecoder(codec_info,
errors);
if (self->decoder == NULL)
goto error;
if (self->readuniversal) {
PyObject *incrementalDecoder = PyObject_CallFunction(
(PyObject *)&PyIncrementalNewlineDecoder_Type,
"Oi", self->decoder, (int)self->readtranslate);
if (incrementalDecoder == NULL)
goto error;
Py_XSETREF(self->decoder, incrementalDecoder);
}
}
/* Build the encoder object */
res = _PyObject_CallMethodId(buffer, &PyId_writable, NULL);
if (res == NULL)
goto error;
r = PyObject_IsTrue(res);
Py_DECREF(res);
if (r == -1)
goto error;
if (r == 1) {
self->encoder = _PyCodecInfo_GetIncrementalEncoder(codec_info,
errors);
if (self->encoder == NULL)
goto error;
/* Get the normalized name of the codec */
res = _PyObject_GetAttrId(codec_info, &PyId_name);
if (res == NULL) {
if (PyErr_ExceptionMatches(PyExc_AttributeError))
PyErr_Clear();
else
goto error;
}
else if (PyUnicode_Check(res)) {
const encodefuncentry *e = encodefuncs;
while (e->name != NULL) {
if (_PyUnicode_EqualToASCIIString(res, e->name)) {
self->encodefunc = e->encodefunc;
break;
}
e++;
}
}
Py_XDECREF(res);
}
/* Finished sorting out the codec details */
Py_CLEAR(codec_info);
self->buffer = buffer; self->buffer = buffer;
Py_INCREF(buffer); Py_INCREF(buffer);
/* Build the decoder object */
if (_textiowrapper_set_decoder(self, codec_info, PyUnicode_AsUTF8(errors)) != 0)
goto error;
/* Build the encoder object */
if (_textiowrapper_set_encoder(self, codec_info, PyUnicode_AsUTF8(errors)) != 0)
goto error;
/* Finished sorting out the codec details */
Py_CLEAR(codec_info);
if (Py_TYPE(buffer) == &PyBufferedReader_Type || if (Py_TYPE(buffer) == &PyBufferedReader_Type ||
Py_TYPE(buffer) == &PyBufferedWriter_Type || Py_TYPE(buffer) == &PyBufferedWriter_Type ||
Py_TYPE(buffer) == &PyBufferedRandom_Type) { Py_TYPE(buffer) == &PyBufferedRandom_Type) {
@ -1077,30 +1202,8 @@ _io_TextIOWrapper___init___impl(textio *self, PyObject *buffer,
self->has_read1 = _PyObject_HasAttrId(buffer, &PyId_read1); self->has_read1 = _PyObject_HasAttrId(buffer, &PyId_read1);
self->encoding_start_of_stream = 0; self->encoding_start_of_stream = 0;
if (self->seekable && self->encoder) { if (_textiowrapper_fix_encoder_state(self) < 0) {
PyObject *cookieObj;
int cmp;
self->encoding_start_of_stream = 1;
cookieObj = PyObject_CallMethodObjArgs(buffer, _PyIO_str_tell, NULL);
if (cookieObj == NULL)
goto error; goto error;
cmp = PyObject_RichCompareBool(cookieObj, _PyLong_Zero, Py_EQ);
Py_DECREF(cookieObj);
if (cmp < 0) {
goto error;
}
if (cmp == 0) {
self->encoding_start_of_stream = 0;
res = PyObject_CallMethodObjArgs(self->encoder, _PyIO_str_setstate,
_PyLong_Zero, NULL);
if (res == NULL)
goto error;
Py_DECREF(res);
}
} }
self->ok = 1; self->ok = 1;
@ -1129,10 +1232,57 @@ convert_optional_bool(PyObject *obj, int default_value)
return v != 0; return v != 0;
} }
/* Rebuild the encoder and decoder of `self` for reconfigure().

   `encoding` and `errors` are the objects passed by the caller; Py_None
   means "not specified".  `newline_changed` is non-zero when the newline
   setting was changed, which forces a rebuild even if the other two are
   unspecified.

   Defaults:
     - encoding unspecified: keep self->encoding, and keep self->errors
       too when errors is unspecified as well;
     - encoding specified, errors unspecified: use "strict".

   On success the new encoding and errors are stored on `self` and the
   encoder state is fixed up for the current stream position.

   Returns 0 on success and -1 with an exception set on failure. */
static int
textiowrapper_change_encoding(textio *self, PyObject *encoding,
                              PyObject *errors, int newline_changed)
{
    /* Use existing settings where new settings are not specified */
    if (encoding == Py_None && errors == Py_None && !newline_changed) {
        return 0;  // no change
    }

    if (encoding == Py_None) {
        encoding = self->encoding;
        if (errors == Py_None) {
            errors = self->errors;
        }
    }
    else if (errors == Py_None) {
        errors = _PyUnicode_FromId(&PyId_strict);  /* borrowed */
        if (errors == NULL) {
            /* Creating the interned string failed; do not pass NULL on. */
            return -1;
        }
    }

    const char *c_errors = PyUnicode_AsUTF8(errors);
    if (c_errors == NULL) {
        return -1;
    }

    /* Reject an encoding that cannot be converted (e.g. not a str) here,
       rather than handing a NULL name to the codec lookup while an
       exception is already pending. */
    const char *c_encoding = PyUnicode_AsUTF8(encoding);
    if (c_encoding == NULL) {
        return -1;
    }

    // Create new encoder & decoder
    PyObject *codec_info = _PyCodec_LookupTextEncoding(
        c_encoding, "codecs.open()");
    if (codec_info == NULL) {
        return -1;
    }
    if (_textiowrapper_set_decoder(self, codec_info, c_errors) != 0 ||
            _textiowrapper_set_encoder(self, codec_info, c_errors) != 0) {
        Py_DECREF(codec_info);
        return -1;
    }
    Py_DECREF(codec_info);

    /* Take new references before Py_SETREF releases the old values;
       encoding/errors may be the very objects currently stored. */
    Py_INCREF(encoding);
    Py_INCREF(errors);
    Py_SETREF(self->encoding, encoding);
    Py_SETREF(self->errors, errors);

    return _textiowrapper_fix_encoder_state(self);
}
/*[clinic input] /*[clinic input]
_io.TextIOWrapper.reconfigure _io.TextIOWrapper.reconfigure
* *
encoding: object = None
errors: object = None
newline as newline_obj: object(c_default="NULL") = None
line_buffering as line_buffering_obj: object = None line_buffering as line_buffering_obj: object = None
write_through as write_through_obj: object = None write_through as write_through_obj: object = None
@ -1143,14 +1293,31 @@ This also does an implicit stream flush.
[clinic start generated code]*/ [clinic start generated code]*/
static PyObject * static PyObject *
_io_TextIOWrapper_reconfigure_impl(textio *self, _io_TextIOWrapper_reconfigure_impl(textio *self, PyObject *encoding,
PyObject *errors, PyObject *newline_obj,
PyObject *line_buffering_obj, PyObject *line_buffering_obj,
PyObject *write_through_obj) PyObject *write_through_obj)
/*[clinic end generated code: output=7cdf79e7001e2856 input=baade27ecb9db7bc]*/ /*[clinic end generated code: output=52b812ff4b3d4b0f input=671e82136e0f5822]*/
{ {
int line_buffering; int line_buffering;
int write_through; int write_through;
PyObject *res; const char *newline = NULL;
/* Check if something is in the read buffer */
if (self->decoded_chars != NULL) {
if (encoding != Py_None || errors != Py_None || newline_obj != NULL) {
_unsupported("It is not possible to set the encoding or newline"
"of stream after the first read");
return NULL;
}
}
if (newline_obj != NULL && newline_obj != Py_None) {
newline = PyUnicode_AsUTF8(newline_obj);
if (newline == NULL || validate_newline(newline) < 0) {
return NULL;
}
}
line_buffering = convert_optional_bool(line_buffering_obj, line_buffering = convert_optional_bool(line_buffering_obj,
self->line_buffering); self->line_buffering);
@ -1159,11 +1326,23 @@ _io_TextIOWrapper_reconfigure_impl(textio *self,
if (line_buffering < 0 || write_through < 0) { if (line_buffering < 0 || write_through < 0) {
return NULL; return NULL;
} }
res = PyObject_CallMethodObjArgs((PyObject *) self, _PyIO_str_flush, NULL);
Py_XDECREF(res); PyObject *res = PyObject_CallMethodObjArgs((PyObject *)self, _PyIO_str_flush, NULL);
if (res == NULL) { if (res == NULL) {
return NULL; return NULL;
} }
Py_DECREF(res);
self->b2cratio = 0;
if (newline_obj != NULL && set_newline(self, newline) < 0) {
return NULL;
}
if (textiowrapper_change_encoding(
self, encoding, errors, newline_obj != NULL) < 0) {
return NULL;
}
self->line_buffering = line_buffering; self->line_buffering = line_buffering;
self->write_through = write_through; self->write_through = write_through;
Py_RETURN_NONE; Py_RETURN_NONE;
@ -1565,18 +1744,12 @@ textiowrapper_read_chunk(textio *self, Py_ssize_t size_hint)
nbytes = input_chunk_buf.len; nbytes = input_chunk_buf.len;
eof = (nbytes == 0); eof = (nbytes == 0);
if (Py_TYPE(self->decoder) == &PyIncrementalNewlineDecoder_Type) {
decoded_chars = _PyIncrementalNewlineDecoder_decode(
self->decoder, input_chunk, eof);
}
else {
decoded_chars = PyObject_CallMethodObjArgs(self->decoder,
_PyIO_str_decode, input_chunk, eof ? Py_True : Py_False, NULL);
}
PyBuffer_Release(&input_chunk_buf);
if (check_decoded(decoded_chars) < 0) decoded_chars = _textiowrapper_decode(self->decoder, input_chunk, eof);
PyBuffer_Release(&input_chunk_buf);
if (decoded_chars == NULL)
goto fail; goto fail;
textiowrapper_set_decoded_chars(self, decoded_chars); textiowrapper_set_decoded_chars(self, decoded_chars);
nchars = PyUnicode_GET_LENGTH(decoded_chars); nchars = PyUnicode_GET_LENGTH(decoded_chars);
if (nchars > 0) if (nchars > 0)
@ -2851,7 +3024,8 @@ static PyObject *
textiowrapper_errors_get(textio *self, void *context) textiowrapper_errors_get(textio *self, void *context)
{ {
CHECK_INITIALIZED(self); CHECK_INITIALIZED(self);
return PyUnicode_FromString(PyBytes_AS_STRING(self->errors)); Py_INCREF(self->errors);
return self->errors;
} }
static PyObject * static PyObject *