[Bug #1074261, patch #1074381] Restrict the size of chunks read from the file in order to avoid overflow or huge memory consumption. Patch by Mark Eichin

This commit is contained in:
Andrew M. Kuchling 2005-06-09 14:19:32 +00:00
parent a6f68e1b1f
commit 01cb47b59c
2 changed files with 26 additions and 2 deletions

View File

@ -55,6 +55,7 @@ class GzipFile:
""" """
myfileobj = None myfileobj = None
max_read_chunk = 10 * 1024 * 1024 # 10Mb
def __init__(self, filename=None, mode=None, def __init__(self, filename=None, mode=None,
compresslevel=9, fileobj=None): compresslevel=9, fileobj=None):
@ -215,14 +216,14 @@ class GzipFile:
try: try:
while True: while True:
self._read(readsize) self._read(readsize)
readsize = readsize * 2 readsize = min(self.max_read_chunk, readsize * 2)
except EOFError: except EOFError:
size = self.extrasize size = self.extrasize
else: # just get some more of it else: # just get some more of it
try: try:
while size > self.extrasize: while size > self.extrasize:
self._read(readsize) self._read(readsize)
readsize = readsize * 2 readsize = min(self.max_read_chunk, readsize * 2)
except EOFError: except EOFError:
if size > self.extrasize: if size > self.extrasize:
size = self.extrasize size = self.extrasize

View File

@ -58,6 +58,29 @@ class TestGzip(unittest.TestCase):
f = gzip.GzipFile(self.filename, 'rb') ; d = f.read() ; f.close() f = gzip.GzipFile(self.filename, 'rb') ; d = f.read() ; f.close()
self.assertEqual(d, (data1*50) + (data2*15)) self.assertEqual(d, (data1*50) + (data2*15))
def test_many_append(self):
# Bug #1074261 was triggered when reading a file that contained
# many, many members. Create such a file and verify that reading it
# works.
f = gzip.open(self.filename, 'wb', 9)
f.write('a')
f.close()
for i in range(0,200):
f = gzip.open(self.filename, "ab", 9) # append
f.write('a')
f.close()
# Try reading the file
zgfile = gzip.open(self.filename, "rb")
contents = ""
while 1:
ztxt = zgfile.read(8192)
contents += ztxt
if not ztxt: break
zgfile.close()
self.assertEquals(contents, 'a'*201)
def test_readline(self): def test_readline(self):
self.test_write() self.test_write()
# Try .readline() with varying line lengths # Try .readline() with varying line lengths