Fix for #1444: utf_8_sig.StreamReader was (indirectly through decode())
calling codecs.utf_8_decode() with final==True, which failed with incomplete byte sequences. Fix and test by James G. Sack.
This commit is contained in:
parent
fc7e72d1c6
commit
183744d6b9
|
@ -84,12 +84,18 @@ class StreamReader(codecs.StreamReader):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def decode(self, input, errors='strict'):
|
def decode(self, input, errors='strict'):
|
||||||
if len(input) < 3 and codecs.BOM_UTF8.startswith(input):
|
if len(input) < 3:
|
||||||
|
if codecs.BOM_UTF8.startswith(input):
|
||||||
# not enough data to decide if this is a BOM
|
# not enough data to decide if this is a BOM
|
||||||
# => try again on the next call
|
# => try again on the next call
|
||||||
return (u"", 0)
|
return (u"", 0)
|
||||||
|
elif input[:3] == codecs.BOM_UTF8:
|
||||||
self.decode = codecs.utf_8_decode
|
self.decode = codecs.utf_8_decode
|
||||||
return decode(input, errors)
|
(output, consumed) = codecs.utf_8_decode(input[3:],errors)
|
||||||
|
return (output, consumed+3)
|
||||||
|
# (else) no BOM present
|
||||||
|
self.decode = codecs.utf_8_decode
|
||||||
|
return codecs.utf_8_decode(input, errors)
|
||||||
|
|
||||||
### encodings module API
|
### encodings module API
|
||||||
|
|
||||||
|
|
|
@ -565,6 +565,50 @@ class UTF8SigTest(ReadTest):
|
||||||
s = u"spam"
|
s = u"spam"
|
||||||
self.assertEqual(d.decode(s.encode("utf-8-sig")), s)
|
self.assertEqual(d.decode(s.encode("utf-8-sig")), s)
|
||||||
|
|
||||||
|
def test_stream_bom(self):
|
||||||
|
unistring = u"ABC\u00A1\u2200XYZ"
|
||||||
|
bytestring = codecs.BOM_UTF8 + "ABC\xC2\xA1\xE2\x88\x80XYZ"
|
||||||
|
|
||||||
|
reader = codecs.getreader("utf-8-sig")
|
||||||
|
for sizehint in [None] + range(1, 11) + \
|
||||||
|
[64, 128, 256, 512, 1024]:
|
||||||
|
istream = reader(StringIO.StringIO(bytestring))
|
||||||
|
ostream = StringIO.StringIO()
|
||||||
|
while 1:
|
||||||
|
if sizehint is not None:
|
||||||
|
data = istream.read(sizehint)
|
||||||
|
else:
|
||||||
|
data = istream.read()
|
||||||
|
|
||||||
|
if not data:
|
||||||
|
break
|
||||||
|
ostream.write(data)
|
||||||
|
|
||||||
|
got = ostream.getvalue()
|
||||||
|
self.assertEqual(got, unistring)
|
||||||
|
|
||||||
|
def test_stream_bare(self):
|
||||||
|
unistring = u"ABC\u00A1\u2200XYZ"
|
||||||
|
bytestring = "ABC\xC2\xA1\xE2\x88\x80XYZ"
|
||||||
|
|
||||||
|
reader = codecs.getreader("utf-8-sig")
|
||||||
|
for sizehint in [None] + range(1, 11) + \
|
||||||
|
[64, 128, 256, 512, 1024]:
|
||||||
|
istream = reader(StringIO.StringIO(bytestring))
|
||||||
|
ostream = StringIO.StringIO()
|
||||||
|
while 1:
|
||||||
|
if sizehint is not None:
|
||||||
|
data = istream.read(sizehint)
|
||||||
|
else:
|
||||||
|
data = istream.read()
|
||||||
|
|
||||||
|
if not data:
|
||||||
|
break
|
||||||
|
ostream.write(data)
|
||||||
|
|
||||||
|
got = ostream.getvalue()
|
||||||
|
self.assertEqual(got, unistring)
|
||||||
|
|
||||||
class EscapeDecodeTest(unittest.TestCase):
|
class EscapeDecodeTest(unittest.TestCase):
|
||||||
def test_empty(self):
|
def test_empty(self):
|
||||||
self.assertEquals(codecs.escape_decode(""), ("", 0))
|
self.assertEquals(codecs.escape_decode(""), ("", 0))
|
||||||
|
|
Loading…
Reference in New Issue