Issue #10791: Implement missing method GzipFile.read1(), allowing GzipFile
to be wrapped in a TextIOWrapper. Patch by Nadeem Vawda.
This commit is contained in:
parent
457cdf5e96
commit
4ec4b0c041
22
Lib/gzip.py
22
Lib/gzip.py
|
@ -348,6 +348,28 @@ class GzipFile(io.BufferedIOBase):
|
|||
self.offset += size
|
||||
return chunk
|
||||
|
||||
def read1(self, size=-1):
|
||||
self._check_closed()
|
||||
if self.mode != READ:
|
||||
import errno
|
||||
raise IOError(errno.EBADF, "read1() on write-only GzipFile object")
|
||||
|
||||
if self.extrasize <= 0 and self.fileobj is None:
|
||||
return b''
|
||||
|
||||
try:
|
||||
self._read()
|
||||
except EOFError:
|
||||
pass
|
||||
if size < 0 or size > self.extrasize:
|
||||
size = self.extrasize
|
||||
|
||||
offset = self.offset - self.extrastart
|
||||
chunk = self.extrabuf[offset: offset + size]
|
||||
self.extrasize -= size
|
||||
self.offset += size
|
||||
return chunk
|
||||
|
||||
def peek(self, n):
|
||||
if self.mode != READ:
|
||||
import errno
|
||||
|
|
|
@ -64,6 +64,21 @@ class TestGzip(unittest.TestCase):
|
|||
d = f.read()
|
||||
self.assertEqual(d, data1*50)
|
||||
|
||||
def test_read1(self):
|
||||
self.test_write()
|
||||
blocks = []
|
||||
nread = 0
|
||||
with gzip.GzipFile(self.filename, 'r') as f:
|
||||
while True:
|
||||
d = f.read1()
|
||||
if not d:
|
||||
break
|
||||
blocks.append(d)
|
||||
nread += len(d)
|
||||
# Check that position was updated correctly (see issue10791).
|
||||
self.assertEqual(f.tell(), nread)
|
||||
self.assertEqual(b''.join(blocks), data1 * 50)
|
||||
|
||||
def test_io_on_closed_object(self):
|
||||
# Test that I/O operations on closed GzipFile objects raise a
|
||||
# ValueError, just like the corresponding functions on file objects.
|
||||
|
@ -323,6 +338,14 @@ class TestGzip(unittest.TestCase):
|
|||
self.assertEqual(f.read(100), b'')
|
||||
self.assertEqual(nread, len(uncompressed))
|
||||
|
||||
def test_textio_readlines(self):
|
||||
# Issue #10791: TextIOWrapper.readlines() fails when wrapping GzipFile.
|
||||
lines = (data1 * 50).decode("ascii").splitlines(True)
|
||||
self.test_write()
|
||||
with gzip.GzipFile(self.filename, 'r') as f:
|
||||
with io.TextIOWrapper(f, encoding="ascii") as t:
|
||||
self.assertEqual(t.readlines(), lines)
|
||||
|
||||
# Testing compress/decompress shortcut functions
|
||||
|
||||
def test_compress(self):
|
||||
|
|
Loading…
Reference in New Issue