Reset internal buffers when seek() is called. This fixes SF bug #1156259.

This commit is contained in:
Walter Dörwald 2005-03-14 19:06:30 +00:00
parent 3390d33dd7
commit 729c31f5c3
3 changed files with 33 additions and 1 deletions

View File

@ -356,7 +356,17 @@ class StreamReader(Codec):
from decoding errors. from decoding errors.
""" """
pass self.bytebuffer = ""
self.charbuffer = u""
self.atcr = False
def seek(self, offset, whence):
""" Set the input stream's current position.
Resets the codec buffers used for keeping state.
"""
self.reset()
self.stream.seek(offset, whence)
def next(self): def next(self):

View File

@ -31,6 +31,13 @@ class StreamWriter(codecs.StreamWriter):
class StreamReader(codecs.StreamReader): class StreamReader(codecs.StreamReader):
def reset(self):
codecs.StreamReader.reset(self)
try:
del self.decode
except AttributeError:
pass
def decode(self, input, errors='strict'): def decode(self, input, errors='strict'):
(object, consumed, byteorder) = \ (object, consumed, byteorder) = \
codecs.utf_16_ex_decode(input, errors, 0, False) codecs.utf_16_ex_decode(input, errors, 0, False)

View File

@ -755,6 +755,21 @@ class BasicUnicodeTest(unittest.TestCase):
decodedresult += reader.read() decodedresult += reader.read()
self.assertEqual(decodedresult, s, "%r != %r (encoding=%r)" % (decodedresult, s, encoding)) self.assertEqual(decodedresult, s, "%r != %r (encoding=%r)" % (decodedresult, s, encoding))
def test_seek(self):
# all codecs should be able to encode these
s = u"%s\n%s\n" % (100*u"abc123", 100*u"def456")
for encoding in all_unicode_encodings:
if encoding == "idna": # FIXME: See SF bug #1163178
continue
if encoding in broken_unicode_with_streams:
continue
reader = codecs.getreader(encoding)(StringIO.StringIO(s.encode(encoding)))
for t in xrange(5):
# Test that calling seek resets the internal codec state and buffers
reader.seek(0, 0)
line = reader.readline()
self.assertEqual(s[:len(line)], line)
class BasicStrTest(unittest.TestCase): class BasicStrTest(unittest.TestCase):
def test_basics(self): def test_basics(self):
s = "abc123" s = "abc123"