From 655fbf18068626c86f487ce94771fdd22b784ae8 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Sun, 14 Dec 2008 17:40:51 +0000 Subject: [PATCH] Backport r67759 (fix io.IncrementalNewlineDecoder for UTF-16 et al.). --- Lib/io.py | 27 +++++------ Lib/test/test_io.py | 114 ++++++++++++++++++++++++++++---------------- Misc/NEWS | 4 ++ 3 files changed, 89 insertions(+), 56 deletions(-) diff --git a/Lib/io.py b/Lib/io.py index 110804e607f..7f938987b71 100644 --- a/Lib/io.py +++ b/Lib/io.py @@ -1292,25 +1292,23 @@ class IncrementalNewlineDecoder(codecs.IncrementalDecoder): """ def __init__(self, decoder, translate, errors='strict'): codecs.IncrementalDecoder.__init__(self, errors=errors) - self.buffer = b'' self.translate = translate self.decoder = decoder self.seennl = 0 + self.pendingcr = False def decode(self, input, final=False): # decode input (with the eventual \r from a previous pass) - if self.buffer: - input = self.buffer + input - output = self.decoder.decode(input, final=final) + if self.pendingcr and (output or final): + output = "\r" + output + self.pendingcr = False # retain last \r even when not translating data: # then readline() is sure to get \r\n in one pass if output.endswith("\r") and not final: output = output[:-1] - self.buffer = b'\r' - else: - self.buffer = b'' + self.pendingcr = True # Record which newlines are read crlf = output.count('\r\n') @@ -1329,20 +1327,19 @@ class IncrementalNewlineDecoder(codecs.IncrementalDecoder): def getstate(self): buf, flag = self.decoder.getstate() - return buf + self.buffer, flag + flag <<= 1 + if self.pendingcr: + flag |= 1 + return buf, flag def setstate(self, state): buf, flag = state - if buf.endswith(b'\r'): - self.buffer = b'\r' - buf = buf[:-1] - else: - self.buffer = b'' - self.decoder.setstate((buf, flag)) + self.pendingcr = bool(flag & 1) + self.decoder.setstate((buf, flag >> 1)) def reset(self): self.seennl = 0 - self.buffer = b'' + self.pendingcr = False self.decoder.reset() _LF = 1 diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index eb41d1ffa7b..8a7da60947b 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -680,8 +680,9 @@ class StatefulIncrementalDecoder(codecs.IncrementalDecoder): @classmethod def lookupTestDecoder(cls, name): if cls.codecEnabled and name == 'test_decoder': + latin1 = codecs.lookup('latin-1') return codecs.CodecInfo( - name='test_decoder', encode=None, decode=None, + name='test_decoder', encode=latin1.encode, decode=None, incrementalencoder=None, streamreader=None, streamwriter=None, incrementaldecoder=cls) @@ -840,8 +841,11 @@ class TextIOWrapperTest(unittest.TestCase): [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ], [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ], ] - - encodings = ('utf-8', 'latin-1') + encodings = ( + 'utf-8', 'latin-1', + 'utf-16', 'utf-16-le', 'utf-16-be', + 'utf-32', 'utf-32-le', 'utf-32-be', + ) # Try a range of buffer sizes to test the case where \r is the last # character in TextIOWrapper._pending_line. @@ -1195,55 +1199,83 @@ class TextIOWrapperTest(unittest.TestCase): self.assertEqual(buffer.seekable(), txt.seekable()) - def test_newline_decoder(self): - import codecs - decoder = codecs.getincrementaldecoder("utf-8")() - decoder = io.IncrementalNewlineDecoder(decoder, translate=True) + def check_newline_decoder_utf8(self, decoder): + # UTF-8 specific tests for a newline decoder + def _check_decode(b, s, **kwargs): + # We exercise getstate() / setstate() as well as decode() + state = decoder.getstate() + self.assertEquals(decoder.decode(b, **kwargs), s) + decoder.setstate(state) + self.assertEquals(decoder.decode(b, **kwargs), s) - self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), u"\u8888") + _check_decode(b'\xe8\xa2\x88', "\u8888") - self.assertEquals(decoder.decode(b'\xe8'), u"") - self.assertEquals(decoder.decode(b'\xa2'), u"") - self.assertEquals(decoder.decode(b'\x88'), u"\u8888") + _check_decode(b'\xe8', "") + _check_decode(b'\xa2', "") + _check_decode(b'\x88', "\u8888") - self.assertEquals(decoder.decode(b'\xe8'), u"") + _check_decode(b'\xe8', "") + _check_decode(b'\xa2', "") + _check_decode(b'\x88', "\u8888") + + _check_decode(b'\xe8', "") self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True) - decoder.setstate((b'', 0)) - self.assertEquals(decoder.decode(b'\n'), u"\n") - self.assertEquals(decoder.decode(b'\r'), u"") - self.assertEquals(decoder.decode(b'', final=True), u"\n") - self.assertEquals(decoder.decode(b'\r', final=True), u"\n") + decoder.reset() + _check_decode(b'\n', "\n") + _check_decode(b'\r', "") + _check_decode(b'', "\n", final=True) + _check_decode(b'\r', "\n", final=True) - self.assertEquals(decoder.decode(b'\r'), u"") - self.assertEquals(decoder.decode(b'a'), u"\na") + _check_decode(b'\r', "") + _check_decode(b'a', "\na") - self.assertEquals(decoder.decode(b'\r\r\n'), u"\n\n") - self.assertEquals(decoder.decode(b'\r'), u"") - self.assertEquals(decoder.decode(b'\r'), u"\n") - self.assertEquals(decoder.decode(b'\na'), u"\na") + _check_decode(b'\r\r\n', "\n\n") + _check_decode(b'\r', "") + _check_decode(b'\r', "\n") + _check_decode(b'\na', "\na") - self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r\n'), u"\u8888\n") - self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), u"\u8888") - self.assertEquals(decoder.decode(b'\n'), u"\n") - self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r'), u"\u8888") - self.assertEquals(decoder.decode(b'\n'), u"\n") + _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n") + _check_decode(b'\xe8\xa2\x88', "\u8888") + _check_decode(b'\n', "\n") + _check_decode(b'\xe8\xa2\x88\r', "\u8888") + _check_decode(b'\n', "\n") + def check_newline_decoder(self, decoder, encoding): + result = [] + encoder = codecs.getincrementalencoder(encoding)() + def _decode_bytewise(s): + for b in encoder.encode(s): + result.append(decoder.decode(b)) + self.assertEquals(decoder.newlines, None) + _decode_bytewise("abc\n\r") + self.assertEquals(decoder.newlines, '\n') + _decode_bytewise("\nabc") + self.assertEquals(decoder.newlines, ('\n', '\r\n')) + _decode_bytewise("abc\r") + self.assertEquals(decoder.newlines, ('\n', '\r\n')) + _decode_bytewise("abc") + self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n')) + _decode_bytewise("abc\r") + self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc") + decoder.reset() + self.assertEquals(decoder.decode("abc".encode(encoding)), "abc") + self.assertEquals(decoder.newlines, None) + + def test_newline_decoder(self): + encodings = ( + 'utf-8', 'latin-1', + 'utf-16', 'utf-16-le', 'utf-16-be', + 'utf-32', 'utf-32-le', 'utf-32-be', + ) + for enc in encodings: + decoder = codecs.getincrementaldecoder(enc)() + decoder = io.IncrementalNewlineDecoder(decoder, translate=True) + self.check_newline_decoder(decoder, enc) decoder = codecs.getincrementaldecoder("utf-8")() decoder = io.IncrementalNewlineDecoder(decoder, translate=True) - self.assertEquals(decoder.newlines, None) - decoder.decode(b"abc\n\r") - self.assertEquals(decoder.newlines, u'\n') - decoder.decode(b"\nabc") - self.assertEquals(decoder.newlines, ('\n', '\r\n')) - decoder.decode(b"abc\r") - self.assertEquals(decoder.newlines, ('\n', '\r\n')) - decoder.decode(b"abc") - self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n')) - decoder.decode(b"abc\r") - decoder.reset() - self.assertEquals(decoder.decode(b"abc"), "abc") - self.assertEquals(decoder.newlines, None) + self.check_newline_decoder_utf8(decoder) + # XXX Tests for open() diff --git a/Misc/NEWS b/Misc/NEWS index ea5ee13e31a..79d4493cbaf 100644 --- a/Misc/NEWS +++ b/Misc/NEWS @@ -74,6 +74,10 @@ Core and Builtins Library ------- +- Issue #4574: fix a crash in io.IncrementalNewlineDecoder when a carriage + return encodes to more than one byte in the source encoding (e.g. UTF-16) + and gets split on a chunk boundary. + - Issue #4223: inspect.getsource() will now correctly display source code for packages loaded via zipimport (or any other conformant PEP 302 loader). Original patch by Alexander Belopolsky.