diff --git a/Lib/codecs.py b/Lib/codecs.py index ab12237f888..f6d480cc982 100644 --- a/Lib/codecs.py +++ b/Lib/codecs.py @@ -230,6 +230,7 @@ class StreamReader(Codec): self.errors = errors self.bytebuffer = "" self.charbuffer = u"" + self.atcr = False def decode(self, input, errors='strict'): raise NotImplementedError @@ -256,41 +257,39 @@ class StreamReader(Codec): definition of the encoding and the given size, e.g. if optional encoding endings or state markers are available on the stream, these should be read too. - """ # read until we get the required number of characters (if available) - done = False while True: # can the request can be satisfied from the character buffer? if chars < 0: if self.charbuffer: - done = True + break else: if len(self.charbuffer) >= chars: - done = True - if done: - if chars < 0: - result = self.charbuffer - self.charbuffer = u"" - break - else: - result = self.charbuffer[:chars] - self.charbuffer = self.charbuffer[chars:] break # we need more data if size < 0: newdata = self.stream.read() else: newdata = self.stream.read(size) + # decode bytes (those remaining from the last call included) data = self.bytebuffer + newdata - object, decodedbytes = self.decode(data, self.errors) + newchars, decodedbytes = self.decode(data, self.errors) # keep undecoded bytes until the next call self.bytebuffer = data[decodedbytes:] # put new characters in the character buffer - self.charbuffer += object + self.charbuffer += newchars # there was no data available if not newdata: - done = True + break + if chars < 0: + # Return everything we've got + result = self.charbuffer + self.charbuffer = u"" + else: + # Return the first chars characters + result = self.charbuffer[:chars] + self.charbuffer = self.charbuffer[chars:] return result def readline(self, size=None, keepends=True): @@ -302,24 +301,36 @@ class StreamReader(Codec): read() method. """ - if size is None: - size = 10 + readsize = size or 72 line = u"" + # If size is given, we call read() only once while True: - data = self.read(size) + data = self.read(readsize) + if self.atcr and data.startswith(u"\n"): + data = data[1:] + if data: + self.atcr = data.endswith(u"\r") line += data - pos = line.find("\n") - if pos>=0: - self.charbuffer = line[pos+1:] + self.charbuffer - if keepends: - line = line[:pos+1] - else: - line = line[:pos] - return line - elif not data: - return line - if size<8000: - size *= 2 + lines = line.splitlines(True) + if lines: + line0withend = lines[0] + line0withoutend = lines[0].splitlines(False)[0] + if line0withend != line0withoutend: # We really have a line end + # Put the rest back together and keep it until the next call + self.charbuffer = u"".join(lines[1:]) + self.charbuffer + if keepends: + line = line0withend + else: + line = line0withoutend + break + # we didn't get anything or this was our only try + elif not data or size is not None: + if line and not keepends: + line = line.splitlines(False)[0] + break + if readsize<8000: + readsize *= 2 + return line def readlines(self, sizehint=None, keepends=True): diff --git a/Lib/test/test_codecs.py b/Lib/test/test_codecs.py index 21ae46798cb..36c40401770 100644 --- a/Lib/test/test_codecs.py +++ b/Lib/test/test_codecs.py @@ -23,16 +23,16 @@ class Queue(object): self._buffer = self._buffer[size:] return s -class PartialReadTest(unittest.TestCase): - def check_partial(self, encoding, input, partialresults): +class ReadTest(unittest.TestCase): + def check_partial(self, input, partialresults): # get a StreamReader for the encoding and feed the bytestring version # of input to the reader byte by byte. Read every available from # the StreamReader and check that the results equal the appropriate # entries from partialresults. q = Queue() - r = codecs.getreader(encoding)(q) + r = codecs.getreader(self.encoding)(q) result = u"" - for (c, partialresult) in zip(input.encode(encoding), partialresults): + for (c, partialresult) in zip(input.encode(self.encoding), partialresults): q.write(c) result += r.read() self.assertEqual(result, partialresult) @@ -41,13 +41,81 @@ class PartialReadTest(unittest.TestCase): self.assertEqual(r.bytebuffer, "") self.assertEqual(r.charbuffer, u"") -class UTF16Test(PartialReadTest): + def test_readline(self): + def getreader(input): + stream = StringIO.StringIO(input.encode(self.encoding)) + return codecs.getreader(self.encoding)(stream) + + def readalllines(input, keepends=True): + reader = getreader(input) + lines = [] + while True: + line = reader.readline(keepends=keepends) + if not line: + break + lines.append(line) + return "".join(lines) + + s = u"foo\nbar\r\nbaz\rspam\u2028eggs" + self.assertEqual(readalllines(s, True), s) + self.assertEqual(readalllines(s, False), u"foobarbazspameggs") + + # Test long lines (multiple calls to read() in readline()) + vw = [] + vwo = [] + for (i, lineend) in enumerate(u"\n \r\n \r \u2028".split()): + vw.append((i*200)*u"\3042" + lineend) + vwo.append((i*200)*u"\3042") + self.assertEqual(readalllines("".join(vw), True), "".join(vw)) + self.assertEqual(readalllines("".join(vw), False),"".join(vwo)) + + # Test lines where the first read might end with \r, so the + # reader has to look ahead whether this is a lone \r or a \r\n + for size in xrange(80): + for lineend in u"\n \r\n \r \u2028".split(): + s = size*u"a" + lineend + u"xxx\n" + self.assertEqual( + getreader(s).readline(keepends=True), + size*u"a" + lineend, + ) + self.assertEqual( + getreader(s).readline(keepends=False), + size*u"a", + ) + + def test_readlinequeue(self): + q = Queue() + writer = codecs.getwriter(self.encoding)(q) + reader = codecs.getreader(self.encoding)(q) + + # No lineends + writer.write(u"foo\r") + self.assertEqual(reader.readline(keepends=False), u"foo") + writer.write(u"\nbar\r") + self.assertEqual(reader.readline(keepends=False), u"bar") + writer.write(u"baz") + self.assertEqual(reader.readline(keepends=False), u"baz") + self.assertEqual(reader.readline(keepends=False), u"") + + # Lineends + writer.write(u"foo\r") + self.assertEqual(reader.readline(keepends=True), u"foo\r") + writer.write(u"\nbar\r") + self.assertEqual(reader.readline(keepends=True), u"bar\r") + writer.write(u"baz") + self.assertEqual(reader.readline(keepends=True), u"baz") + self.assertEqual(reader.readline(keepends=True), u"") + writer.write(u"foo\r\n") + self.assertEqual(reader.readline(keepends=True), u"foo\r\n") + +class UTF16Test(ReadTest): + encoding = "utf-16" spamle = '\xff\xfes\x00p\x00a\x00m\x00s\x00p\x00a\x00m\x00' spambe = '\xfe\xff\x00s\x00p\x00a\x00m\x00s\x00p\x00a\x00m' def test_only_one_bom(self): - _,_,reader,writer = codecs.lookup("utf-16") + _,_,reader,writer = codecs.lookup(self.encoding) # encode some stream s = StringIO.StringIO() f = writer(s) @@ -63,7 +131,6 @@ class UTF16Test(PartialReadTest): def test_partial(self): self.check_partial( - "utf-16", u"\x00\xff\u0100\uffff", [ u"", # first byte of BOM read @@ -79,11 +146,11 @@ class UTF16Test(PartialReadTest): ] ) -class UTF16LETest(PartialReadTest): +class UTF16LETest(ReadTest): + encoding = "utf-16-le" def test_partial(self): self.check_partial( - "utf-16-le", u"\x00\xff\u0100\uffff", [ u"", @@ -97,11 +164,11 @@ class UTF16LETest(PartialReadTest): ] ) -class UTF16BETest(PartialReadTest): +class UTF16BETest(ReadTest): + encoding = "utf-16-be" def test_partial(self): self.check_partial( - "utf-16-be", u"\x00\xff\u0100\uffff", [ u"", @@ -115,11 +182,11 @@ class UTF16BETest(PartialReadTest): ] ) -class UTF8Test(PartialReadTest): +class UTF8Test(ReadTest): + encoding = "utf-8" def test_partial(self): self.check_partial( - "utf-8", u"\x00\xff\u07ff\u0800\uffff", [ u"\x00",