start banging on zipfile's file leakiness

This commit is contained in:
Benjamin Peterson 2010-10-31 17:57:22 +00:00
parent d6868b4ed4
commit d285bdb443
2 changed files with 50 additions and 10 deletions

View File

@ -99,6 +99,8 @@ class TestsWithSourceFile(unittest.TestCase):
# Check that testzip doesn't raise an exception
zipfp.testzip()
if not isinstance(f, str):
f.close()
def test_stored(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
@ -127,6 +129,8 @@ class TestsWithSourceFile(unittest.TestCase):
self.assertEqual(b''.join(zipdata1), self.data)
self.assertEqual(b''.join(zipdata2), self.data)
if not isinstance(f, str):
f.close()
def test_open_stored(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
@ -163,7 +167,8 @@ class TestsWithSourceFile(unittest.TestCase):
zipdata1.append(read_data)
self.assertEqual(b''.join(zipdata1), self.data)
if not isinstance(f, str):
f.close()
def test_random_open_stored(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
@ -207,6 +212,8 @@ class TestsWithSourceFile(unittest.TestCase):
self.assertEqual(data, self.data)
zipfp.close()
if not isinstance(f, str):
f.close()
def zip_readline_test(self, f, compression):
self.make_test_archive(f, compression)
@ -217,6 +224,8 @@ class TestsWithSourceFile(unittest.TestCase):
for line in self.line_gen:
linedata = zipopen.readline()
self.assertEqual(linedata, line + '\n')
if not isinstance(f, str):
f.close()
def zip_readlines_test(self, f, compression):
self.make_test_archive(f, compression)
@ -226,6 +235,8 @@ class TestsWithSourceFile(unittest.TestCase):
ziplines = zipfp.open(TESTFN).readlines()
for line, zipline in zip(self.line_gen, ziplines):
self.assertEqual(zipline, line + '\n')
if not isinstance(f, str):
f.close()
def zip_iterlines_test(self, f, compression):
self.make_test_archive(f, compression)
@ -234,6 +245,8 @@ class TestsWithSourceFile(unittest.TestCase):
with zipfile.ZipFile(f, "r") as zipfp:
for line, zipline in zip(self.line_gen, zipfp.open(TESTFN)):
self.assertEqual(zipline, line + '\n')
if not isinstance(f, str):
f.close()
def test_readline_read_stored(self):
# Issue #7610: calls to readline() interleaved with calls to read().
@ -430,6 +443,8 @@ class TestsWithSourceFile(unittest.TestCase):
with zipfile.ZipFile(f, "r") as zipfp:
zinfo = zipfp.getinfo('strfile')
self.assertEqual(zinfo.external_attr, 0o600 << 16)
if not isinstance(f, str):
f.close()
def test_writestr_permissions(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
@ -497,6 +512,8 @@ class TestZip64InSmallFiles(unittest.TestCase):
with zipfile.ZipFile(f, "w", compression) as zipfp:
self.assertRaises(zipfile.LargeZipFile,
zipfp.writestr, "another.name", self.data)
if not isinstance(f, str):
f.close()
def test_large_file_exception(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
@ -559,6 +576,8 @@ class TestZip64InSmallFiles(unittest.TestCase):
# Check that testzip doesn't raise an exception
zipfp.testzip()
if not isinstance(f, str):
f.close()
def test_stored(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
@ -1078,6 +1097,8 @@ class TestsWithRandomBinaryFiles(unittest.TestCase):
self.assertEqual(len(testdata), len(self.data))
self.assertEqual(testdata, self.data)
self.assertEqual(zipfp.read("another.name"), self.data)
if not isinstance(f, str):
f.close()
def test_stored(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
@ -1116,6 +1137,8 @@ class TestsWithRandomBinaryFiles(unittest.TestCase):
testdata2 = b''.join(zipdata2)
self.assertEqual(len(testdata2), len(self.data))
self.assertEqual(testdata2, self.data)
if not isinstance(f, str):
f.close()
def test_open_stored(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
@ -1142,6 +1165,8 @@ class TestsWithRandomBinaryFiles(unittest.TestCase):
testdata = b''.join(zipdata1)
self.assertEqual(len(testdata), len(self.data))
self.assertEqual(testdata, self.data)
if not isinstance(f, str):
f.close()
def test_random_open_stored(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
@ -1259,8 +1284,11 @@ class UniversalNewlineTests(unittest.TestCase):
# Read the ZIP archive
with zipfile.ZipFile(f, "r") as zipfp:
for sep, fn in self.arcfiles.items():
zipdata = zipfp.open(fn, "rU").read()
with zipfp.open(fn, "rU") as fp:
zipdata = fp.read()
self.assertEqual(self.arcdata[sep], zipdata)
if not isinstance(f, str):
f.close()
def readline_read_test(self, f, compression):
self.make_test_archive(f, compression)
@ -1281,9 +1309,12 @@ class UniversalNewlineTests(unittest.TestCase):
break
data += read
zipopen.close()
self.assertEqual(data, self.arcdata['\n'])
zipfp.close()
if not isinstance(f, str):
f.close()
def readline_test(self, f, compression):
self.make_test_archive(f, compression)
@ -1291,10 +1322,12 @@ class UniversalNewlineTests(unittest.TestCase):
# Read the ZIP archive
with zipfile.ZipFile(f, "r") as zipfp:
for sep, fn in self.arcfiles.items():
zipopen = zipfp.open(fn, "rU")
for line in self.line_gen:
linedata = zipopen.readline()
self.assertEqual(linedata, line + b'\n')
with zipfp.open(fn, "rU") as zipopen:
for line in self.line_gen:
linedata = zipopen.readline()
self.assertEqual(linedata, line + b'\n')
if not isinstance(f, str):
f.close()
def readlines_test(self, f, compression):
self.make_test_archive(f, compression)
@ -1302,9 +1335,12 @@ class UniversalNewlineTests(unittest.TestCase):
# Read the ZIP archive
with zipfile.ZipFile(f, "r") as zipfp:
for sep, fn in self.arcfiles.items():
ziplines = zipfp.open(fn, "rU").readlines()
with zipfp.open(fn, "rU") as fp:
ziplines = fp.readlines()
for line, zipline in zip(self.line_gen, ziplines):
self.assertEqual(zipline, line + b'\n')
if not isinstance(f, str):
f.close()
def iterlines_test(self, f, compression):
self.make_test_archive(f, compression)
@ -1312,8 +1348,11 @@ class UniversalNewlineTests(unittest.TestCase):
# Read the ZIP archive
with zipfile.ZipFile(f, "r") as zipfp:
for sep, fn in self.arcfiles.items():
for line, zipline in zip(self.line_gen, zipfp.open(fn, "rU")):
self.assertEqual(zipline, line + b'\n')
with zipfp.open(fn, "rU") as fp:
for line, zipline in zip(self.line_gen, fp):
self.assertEqual(zipline, line + b'\n')
if not isinstance(f, str):
f.close()
def test_read_stored(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):

View File

@ -874,7 +874,8 @@ class ZipFile:
def read(self, name, pwd=None):
"""Return file bytes (as a string) for name."""
return self.open(name, "r", pwd).read()
with self.open(name, "r", pwd) as fp:
return fp.read()
def open(self, name, mode="r", pwd=None):
"""Return file-like object for 'name'."""