diff --git a/Lib/test/test_zipfile.py b/Lib/test/test_zipfile.py index 5e837cd6de6..fb866d8b4dc 100644 --- a/Lib/test/test_zipfile.py +++ b/Lib/test/test_zipfile.py @@ -10,10 +10,9 @@ import unittest from tempfile import TemporaryFile -from random import randint, random -from unittest import skipUnless +from random import randint, random, getrandbits -from test.support import (TESTFN, run_unittest, findfile, unlink, +from test.support import (TESTFN, findfile, unlink, requires_zlib, requires_bz2, requires_lzma, captured_stdout) @@ -27,14 +26,24 @@ SMALL_TEST_DATA = [('_ziptest1', '1q2w3e4r5t'), ('ziptest2dir/ziptest3dir/_ziptest3', 'azsxdcfvgb'), ('ziptest2dir/ziptest3dir/ziptest4dir/_ziptest3', '6y7u8i9o0p')] +def get_files(test): + yield TESTFN2 + with TemporaryFile() as f: + yield f + test.assertFalse(f.closed) + with io.BytesIO() as f: + yield f + test.assertFalse(f.closed) + +class AbstractTestsWithSourceFile: + @classmethod + def setUpClass(cls): + cls.line_gen = [bytes("Zipfile test line %d. random float: %f\n" % + (i, random()), "ascii") + for i in range(FIXEDTEST_SIZE)] + cls.data = b''.join(cls.line_gen) -class TestsWithSourceFile(unittest.TestCase): def setUp(self): - self.line_gen = (bytes("Zipfile test line %d. random float: %f" % - (i, random()), "ascii") - for i in range(FIXEDTEST_SIZE)) - self.data = b'\n'.join(self.line_gen) + b'\n' - # Make a source file with some lines with open(TESTFN, "wb") as fp: fp.write(self.data) @@ -97,12 +106,10 @@ class TestsWithSourceFile(unittest.TestCase): # Check that testzip doesn't raise an exception zipfp.testzip() - if not isinstance(f, str): - f.close() - def test_stored(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_test(f, zipfile.ZIP_STORED) + def test_basic(self): + for f in get_files(self): + self.zip_test(f, self.compression) def zip_open_test(self, f, compression): self.make_test_archive(f, compression) @@ -127,30 +134,10 @@ class TestsWithSourceFile(unittest.TestCase): self.assertEqual(b''.join(zipdata1), self.data) self.assertEqual(b''.join(zipdata2), self.data) - if not isinstance(f, str): - f.close() - def test_open_stored(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_open_test(f, zipfile.ZIP_STORED) - - def test_open_via_zip_info(self): - # Create the ZIP archive - with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: - zipfp.writestr("name", "foo") - zipfp.writestr("name", "bar") - - with zipfile.ZipFile(TESTFN2, "r") as zipfp: - infos = zipfp.infolist() - data = b"" - for info in infos: - with zipfp.open(info) as zipopen: - data += zipopen.read() - self.assertTrue(data == b"foobar" or data == b"barfoo") - data = b"" - for info in infos: - data += zipfp.read(info) - self.assertTrue(data == b"foobar" or data == b"barfoo") + def test_open(self): + for f in get_files(self): + self.zip_open_test(f, self.compression) def zip_random_open_test(self, f, compression): self.make_test_archive(f, compression) @@ -166,36 +153,17 @@ class TestsWithSourceFile(unittest.TestCase): zipdata1.append(read_data) self.assertEqual(b''.join(zipdata1), self.data) - if not isinstance(f, str): - f.close() - def test_random_open_stored(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_random_open_test(f, zipfile.ZIP_STORED) - - def test_univeral_readaheads(self): - f = io.BytesIO() - - data = b'a\r\n' * 16 * 1024 - zipfp = zipfile.ZipFile(f, 'w', zipfile.ZIP_STORED) - zipfp.writestr(TESTFN, data) - zipfp.close() - - data2 = b'' - zipfp = zipfile.ZipFile(f, 'r') - with zipfp.open(TESTFN, 'rU') as zipopen: - for line in zipopen: - data2 += line - zipfp.close() - - self.assertEqual(data, data2.replace(b'\n', b'\r\n')) + def test_random_open(self): + for f in get_files(self): + self.zip_random_open_test(f, self.compression) def zip_readline_read_test(self, f, compression): self.make_test_archive(f, compression) # Read the ZIP archive - zipfp = zipfile.ZipFile(f, "r") - with zipfp.open(TESTFN) as zipopen: + with zipfile.ZipFile(f, "r") as zipfp, \ + zipfp.open(TESTFN) as zipopen: data = b'' while True: read = zipopen.readline() @@ -209,9 +177,11 @@ class TestsWithSourceFile(unittest.TestCase): data += read self.assertEqual(data, self.data) - zipfp.close() - if not isinstance(f, str): - f.close() + + def test_readline_read(self): + # Issue #7610: calls to readline() interleaved with calls to read(). + for f in get_files(self): + self.zip_readline_read_test(f, self.compression) def zip_readline_test(self, f, compression): self.make_test_archive(f, compression) @@ -221,9 +191,11 @@ class TestsWithSourceFile(unittest.TestCase): with zipfp.open(TESTFN) as zipopen: for line in self.line_gen: linedata = zipopen.readline() - self.assertEqual(linedata, line + '\n') - if not isinstance(f, str): - f.close() + self.assertEqual(linedata, line) + + def test_readline(self): + for f in get_files(self): + self.zip_readline_test(f, self.compression) def zip_readlines_test(self, f, compression): self.make_test_archive(f, compression) @@ -233,9 +205,11 @@ class TestsWithSourceFile(unittest.TestCase): with zipfp.open(TESTFN) as zipopen: ziplines = zipopen.readlines() for line, zipline in zip(self.line_gen, ziplines): - self.assertEqual(zipline, line + '\n') - if not isinstance(f, str): - f.close() + self.assertEqual(zipline, line) + + def test_readlines(self): + for f in get_files(self): + self.zip_readlines_test(f, self.compression) def zip_iterlines_test(self, f, compression): self.make_test_archive(f, compression) @@ -244,173 +218,64 @@ class TestsWithSourceFile(unittest.TestCase): with zipfile.ZipFile(f, "r") as zipfp: with zipfp.open(TESTFN) as zipopen: for line, zipline in zip(self.line_gen, zipopen): - self.assertEqual(zipline, line + '\n') - if not isinstance(f, str): - f.close() + self.assertEqual(zipline, line) - def test_readline_read_stored(self): - # Issue #7610: calls to readline() interleaved with calls to read(). - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_readline_read_test(f, zipfile.ZIP_STORED) + def test_iterlines(self): + for f in get_files(self): + self.zip_iterlines_test(f, self.compression) - def test_readline_stored(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_readline_test(f, zipfile.ZIP_STORED) - - def test_readlines_stored(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_readlines_test(f, zipfile.ZIP_STORED) - - def test_iterlines_stored(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_iterlines_test(f, zipfile.ZIP_STORED) - - @requires_zlib - def test_deflated(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_test(f, zipfile.ZIP_DEFLATED) - - - @requires_zlib - def test_open_deflated(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_open_test(f, zipfile.ZIP_DEFLATED) - - @requires_zlib - def test_random_open_deflated(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_random_open_test(f, zipfile.ZIP_DEFLATED) - - @requires_zlib - def test_readline_read_deflated(self): - # Issue #7610: calls to readline() interleaved with calls to read(). - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_readline_read_test(f, zipfile.ZIP_DEFLATED) - - @requires_zlib - def test_readline_deflated(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_readline_test(f, zipfile.ZIP_DEFLATED) - - @requires_zlib - def test_readlines_deflated(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_readlines_test(f, zipfile.ZIP_DEFLATED) - - @requires_zlib - def test_iterlines_deflated(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_iterlines_test(f, zipfile.ZIP_DEFLATED) - - @requires_zlib def test_low_compression(self): """Check for cases where compressed data is larger than original.""" # Create the ZIP archive - with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED) as zipfp: + with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipfp: zipfp.writestr("strfile", '12') # Get an open object for strfile - with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_DEFLATED) as zipfp: + with zipfile.ZipFile(TESTFN2, "r", self.compression) as zipfp: with zipfp.open("strfile") as openobj: self.assertEqual(openobj.read(1), b'1') self.assertEqual(openobj.read(1), b'2') - @requires_bz2 - def test_bzip2(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_test(f, zipfile.ZIP_BZIP2) + def test_writestr_compression(self): + zipfp = zipfile.ZipFile(TESTFN2, "w") + zipfp.writestr("b.txt", "hello world", compress_type=self.compression) + info = zipfp.getinfo('b.txt') + self.assertEqual(info.compress_type, self.compression) - @requires_bz2 - def test_open_bzip2(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_open_test(f, zipfile.ZIP_BZIP2) + def test_read_return_size(self): + # Issue #9837: ZipExtFile.read() shouldn't return more bytes + # than requested. + for test_size in (1, 4095, 4096, 4097, 16384): + file_size = test_size + 1 + junk = getrandbits(8 * file_size).to_bytes(file_size, 'little') + with zipfile.ZipFile(io.BytesIO(), "w", self.compression) as zipf: + zipf.writestr('foo', junk) + with zipf.open('foo', 'r') as fp: + buf = fp.read(test_size) + self.assertEqual(len(buf), test_size) - @requires_bz2 - def test_random_open_bzip2(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_random_open_test(f, zipfile.ZIP_BZIP2) + def tearDown(self): + unlink(TESTFN) + unlink(TESTFN2) - @requires_bz2 - def test_readline_read_bzip2(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_readline_read_test(f, zipfile.ZIP_BZIP2) - @requires_bz2 - def test_readline_bzip2(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_readline_test(f, zipfile.ZIP_BZIP2) +class StoredTestsWithSourceFile(AbstractTestsWithSourceFile, + unittest.TestCase): + compression = zipfile.ZIP_STORED + test_low_compression = None - @requires_bz2 - def test_readlines_bzip2(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_readlines_test(f, zipfile.ZIP_BZIP2) + def zip_test_writestr_permissions(self, f, compression): + # Make sure that writestr creates files with mode 0600, + # when it is passed a name rather than a ZipInfo instance. - @requires_bz2 - def test_iterlines_bzip2(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_iterlines_test(f, zipfile.ZIP_BZIP2) + self.make_test_archive(f, compression) + with zipfile.ZipFile(f, "r") as zipfp: + zinfo = zipfp.getinfo('strfile') + self.assertEqual(zinfo.external_attr, 0o600 << 16) - @requires_bz2 - def test_low_compression_bzip2(self): - """Check for cases where compressed data is larger than original.""" - # Create the ZIP archive - with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_BZIP2) as zipfp: - zipfp.writestr("strfile", '12') - - # Get an open object for strfile - with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_BZIP2) as zipfp: - with zipfp.open("strfile") as openobj: - self.assertEqual(openobj.read(1), b'1') - self.assertEqual(openobj.read(1), b'2') - - @requires_lzma - def test_lzma(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_test(f, zipfile.ZIP_LZMA) - - @requires_lzma - def test_open_lzma(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_open_test(f, zipfile.ZIP_LZMA) - - @requires_lzma - def test_random_open_lzma(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_random_open_test(f, zipfile.ZIP_LZMA) - - @requires_lzma - def test_readline_read_lzma(self): - # Issue #7610: calls to readline() interleaved with calls to read(). - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_readline_read_test(f, zipfile.ZIP_LZMA) - - @requires_lzma - def test_readline_lzma(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_readline_test(f, zipfile.ZIP_LZMA) - - @requires_lzma - def test_readlines_lzma(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_readlines_test(f, zipfile.ZIP_LZMA) - - @requires_lzma - def test_iterlines_lzma(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_iterlines_test(f, zipfile.ZIP_LZMA) - - @requires_lzma - def test_low_compression_lzma(self): - """Check for cases where compressed data is larger than original.""" - # Create the ZIP archive - with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_LZMA) as zipfp: - zipfp.writestr("strfile", '12') - - # Get an open object for strfile - with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_LZMA) as zipfp: - with zipfp.open("strfile") as openobj: - self.assertEqual(openobj.read(1), b'1') - self.assertEqual(openobj.read(1), b'2') + def test_writestr_permissions(self): + for f in get_files(self): + self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED) def test_absolute_arcnames(self): with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: @@ -470,7 +335,26 @@ class TestsWithSourceFile(unittest.TestCase): with open(TESTFN, "rb") as f: self.assertEqual(zipfp.read(TESTFN), f.read()) - @requires_zlib + def test_write_to_readonly(self): + """Check that trying to call write() on a readonly ZipFile object + raises a RuntimeError.""" + with zipfile.ZipFile(TESTFN2, mode="w") as zipfp: + zipfp.writestr("somefile.txt", "bogus") + + with zipfile.ZipFile(TESTFN2, mode="r") as zipfp: + self.assertRaises(RuntimeError, zipfp.write, TESTFN) + + def test_add_file_before_1980(self): + # Set atime and mtime to 1970-01-01 + os.utime(TESTFN, (0, 0)) + with zipfile.ZipFile(TESTFN2, "w") as zipfp: + self.assertRaises(ValueError, zipfp.write, TESTFN) + +@requires_zlib +class DeflateTestsWithSourceFile(AbstractTestsWithSourceFile, + unittest.TestCase): + compression = zipfile.ZIP_DEFLATED + def test_per_file_compression(self): """Check that files within a Zip archive can have different compression options.""" @@ -482,15 +366,263 @@ class TestsWithSourceFile(unittest.TestCase): self.assertEqual(sinfo.compress_type, zipfile.ZIP_STORED) self.assertEqual(dinfo.compress_type, zipfile.ZIP_DEFLATED) - def test_write_to_readonly(self): - """Check that trying to call write() on a readonly ZipFile object - raises a RuntimeError.""" - with zipfile.ZipFile(TESTFN2, mode="w") as zipfp: - zipfp.writestr("somefile.txt", "bogus") +@requires_bz2 +class Bzip2TestsWithSourceFile(AbstractTestsWithSourceFile, + unittest.TestCase): + compression = zipfile.ZIP_BZIP2 - with zipfile.ZipFile(TESTFN2, mode="r") as zipfp: - self.assertRaises(RuntimeError, zipfp.write, TESTFN) +@requires_lzma +class LzmaTestsWithSourceFile(AbstractTestsWithSourceFile, + unittest.TestCase): + compression = zipfile.ZIP_LZMA + +class AbstractTestZip64InSmallFiles: + # These tests test the ZIP64 functionality without using large files, + # see test_zipfile64 for proper tests. + + @classmethod + def setUpClass(cls): + line_gen = (bytes("Test of zipfile line %d." % i, "ascii") + for i in range(0, FIXEDTEST_SIZE)) + cls.data = b'\n'.join(line_gen) + + def setUp(self): + self._limit = zipfile.ZIP64_LIMIT + zipfile.ZIP64_LIMIT = 5 + + # Make a source file with some lines + with open(TESTFN, "wb") as fp: + fp.write(self.data) + + def zip_test(self, f, compression): + # Create the ZIP archive + with zipfile.ZipFile(f, "w", compression, allowZip64=True) as zipfp: + zipfp.write(TESTFN, "another.name") + zipfp.write(TESTFN, TESTFN) + zipfp.writestr("strfile", self.data) + + # Read the ZIP archive + with zipfile.ZipFile(f, "r", compression) as zipfp: + self.assertEqual(zipfp.read(TESTFN), self.data) + self.assertEqual(zipfp.read("another.name"), self.data) + self.assertEqual(zipfp.read("strfile"), self.data) + + # Print the ZIP directory + fp = io.StringIO() + zipfp.printdir(fp) + + directory = fp.getvalue() + lines = directory.splitlines() + self.assertEqual(len(lines), 4) # Number of files + header + + self.assertIn('File Name', lines[0]) + self.assertIn('Modified', lines[0]) + self.assertIn('Size', lines[0]) + + fn, date, time_, size = lines[1].split() + self.assertEqual(fn, 'another.name') + self.assertTrue(time.strptime(date, '%Y-%m-%d')) + self.assertTrue(time.strptime(time_, '%H:%M:%S')) + self.assertEqual(size, str(len(self.data))) + + # Check the namelist + names = zipfp.namelist() + self.assertEqual(len(names), 3) + self.assertIn(TESTFN, names) + self.assertIn("another.name", names) + self.assertIn("strfile", names) + + # Check infolist + infos = zipfp.infolist() + names = [i.filename for i in infos] + self.assertEqual(len(names), 3) + self.assertIn(TESTFN, names) + self.assertIn("another.name", names) + self.assertIn("strfile", names) + for i in infos: + self.assertEqual(i.file_size, len(self.data)) + + # check getinfo + for nm in (TESTFN, "another.name", "strfile"): + info = zipfp.getinfo(nm) + self.assertEqual(info.filename, nm) + self.assertEqual(info.file_size, len(self.data)) + + # Check that testzip doesn't raise an exception + zipfp.testzip() + + def test_basic(self): + for f in get_files(self): + self.zip_test(f, self.compression) + + def tearDown(self): + zipfile.ZIP64_LIMIT = self._limit + unlink(TESTFN) + unlink(TESTFN2) + + +class StoredTestZip64InSmallFiles(AbstractTestZip64InSmallFiles, + unittest.TestCase): + compression = zipfile.ZIP_STORED + + def large_file_exception_test(self, f, compression): + with zipfile.ZipFile(f, "w", compression) as zipfp: + self.assertRaises(zipfile.LargeZipFile, + zipfp.write, TESTFN, "another.name") + + def large_file_exception_test2(self, f, compression): + with zipfile.ZipFile(f, "w", compression) as zipfp: + self.assertRaises(zipfile.LargeZipFile, + zipfp.writestr, "another.name", self.data) + + def test_large_file_exception(self): + for f in get_files(self): + self.large_file_exception_test(f, zipfile.ZIP_STORED) + self.large_file_exception_test2(f, zipfile.ZIP_STORED) + + def test_absolute_arcnames(self): + with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED, + allowZip64=True) as zipfp: + zipfp.write(TESTFN, "/absolute") + + with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp: + self.assertEqual(zipfp.namelist(), ["absolute"]) + +@requires_zlib +class DeflateTestZip64InSmallFiles(AbstractTestZip64InSmallFiles, + unittest.TestCase): + compression = zipfile.ZIP_DEFLATED + +@requires_bz2 +class Bzip2TestZip64InSmallFiles(AbstractTestZip64InSmallFiles, + unittest.TestCase): + compression = zipfile.ZIP_BZIP2 + +@requires_lzma +class LzmaTestZip64InSmallFiles(AbstractTestZip64InSmallFiles, + unittest.TestCase): + compression = zipfile.ZIP_LZMA + + +class PyZipFileTests(unittest.TestCase): + def assertCompiledIn(self, name, namelist): + if name + 'o' not in namelist: + self.assertIn(name + 'c', namelist) + + def test_write_pyfile(self): + with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: + fn = __file__ + if fn.endswith('.pyc') or fn.endswith('.pyo'): + path_split = fn.split(os.sep) + if os.altsep is not None: + path_split.extend(fn.split(os.altsep)) + if '__pycache__' in path_split: + fn = imp.source_from_cache(fn) + else: + fn = fn[:-1] + + zipfp.writepy(fn) + + bn = os.path.basename(fn) + self.assertNotIn(bn, zipfp.namelist()) + self.assertCompiledIn(bn, zipfp.namelist()) + + with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: + fn = __file__ + if fn.endswith(('.pyc', '.pyo')): + fn = fn[:-1] + + zipfp.writepy(fn, "testpackage") + + bn = "%s/%s" % ("testpackage", os.path.basename(fn)) + self.assertNotIn(bn, zipfp.namelist()) + self.assertCompiledIn(bn, zipfp.namelist()) + + def test_write_python_package(self): + import email + packagedir = os.path.dirname(email.__file__) + + with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: + zipfp.writepy(packagedir) + + # Check for a couple of modules at different levels of the + # hierarchy + names = zipfp.namelist() + self.assertCompiledIn('email/__init__.py', names) + self.assertCompiledIn('email/mime/text.py', names) + + def test_write_with_optimization(self): + import email + packagedir = os.path.dirname(email.__file__) + # use .pyc if running test in optimization mode, + # use .pyo if running test in debug mode + optlevel = 1 if __debug__ else 0 + ext = '.pyo' if optlevel == 1 else '.pyc' + + with TemporaryFile() as t, \ + zipfile.PyZipFile(t, "w", optimize=optlevel) as zipfp: + zipfp.writepy(packagedir) + + names = zipfp.namelist() + self.assertIn('email/__init__' + ext, names) + self.assertIn('email/mime/text' + ext, names) + + def test_write_python_directory(self): + os.mkdir(TESTFN2) + try: + with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp: + fp.write("print(42)\n") + + with open(os.path.join(TESTFN2, "mod2.py"), "w") as fp: + fp.write("print(42 * 42)\n") + + with open(os.path.join(TESTFN2, "mod2.txt"), "w") as fp: + fp.write("bla bla bla\n") + + with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: + zipfp.writepy(TESTFN2) + + names = zipfp.namelist() + self.assertCompiledIn('mod1.py', names) + self.assertCompiledIn('mod2.py', names) + self.assertNotIn('mod2.txt', names) + + finally: + shutil.rmtree(TESTFN2) + + def test_write_non_pyfile(self): + with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: + with open(TESTFN, 'w') as f: + f.write('most definitely not a python file') + self.assertRaises(RuntimeError, zipfp.writepy, TESTFN) + os.remove(TESTFN) + + def test_write_pyfile_bad_syntax(self): + os.mkdir(TESTFN2) + try: + with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp: + fp.write("Bad syntax in python file\n") + + with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: + # syntax errors are printed to stdout + with captured_stdout() as s: + zipfp.writepy(os.path.join(TESTFN2, "mod1.py")) + + self.assertIn("SyntaxError", s.getvalue()) + + # as it will not have compiled the python file, it will + # include the .py file not .pyc or .pyo + names = zipfp.namelist() + self.assertIn('mod1.py', names) + self.assertNotIn('mod1.pyc', names) + self.assertNotIn('mod1.pyo', names) + + finally: + shutil.rmtree(TESTFN2) + + +class ExtractTests(unittest.TestCase): def test_extract(self): with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: for fpath, fdata in SMALL_TEST_DATA: @@ -636,47 +768,40 @@ class TestsWithSourceFile(unittest.TestCase): os.remove(TESTFN2) - def test_writestr_compression_stored(self): - zipfp = zipfile.ZipFile(TESTFN2, "w") - zipfp.writestr("a.txt", "hello world", compress_type=zipfile.ZIP_STORED) - info = zipfp.getinfo('a.txt') - self.assertEqual(info.compress_type, zipfile.ZIP_STORED) - @requires_zlib - def test_writestr_compression_deflated(self): - zipfp = zipfile.ZipFile(TESTFN2, "w") - zipfp.writestr("b.txt", "hello world", compress_type=zipfile.ZIP_DEFLATED) - info = zipfp.getinfo('b.txt') - self.assertEqual(info.compress_type, zipfile.ZIP_DEFLATED) +class OtherTests(unittest.TestCase): + def test_open_via_zip_info(self): + # Create the ZIP archive + with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: + zipfp.writestr("name", "foo") + zipfp.writestr("name", "bar") - @requires_bz2 - def test_writestr_compression_bzip2(self): - zipfp = zipfile.ZipFile(TESTFN2, "w") - zipfp.writestr("b.txt", "hello world", compress_type=zipfile.ZIP_BZIP2) - info = zipfp.getinfo('b.txt') - self.assertEqual(info.compress_type, zipfile.ZIP_BZIP2) + with zipfile.ZipFile(TESTFN2, "r") as zipfp: + infos = zipfp.infolist() + data = b"" + for info in infos: + with zipfp.open(info) as zipopen: + data += zipopen.read() + self.assertIn(data, {b"foobar", b"barfoo"}) + data = b"" + for info in infos: + data += zipfp.read(info) + self.assertIn(data, {b"foobar", b"barfoo"}) - @requires_lzma - def test_writestr_compression_lzma(self): - zipfp = zipfile.ZipFile(TESTFN2, "w") - zipfp.writestr("b.txt", "hello world", compress_type=zipfile.ZIP_LZMA) - info = zipfp.getinfo('b.txt') - self.assertEqual(info.compress_type, zipfile.ZIP_LZMA) + def test_universal_readaheads(self): + f = io.BytesIO() - def zip_test_writestr_permissions(self, f, compression): - # Make sure that writestr creates files with mode 0600, - # when it is passed a name rather than a ZipInfo instance. + data = b'a\r\n' * 16 * 1024 + with zipfile.ZipFile(f, 'w', zipfile.ZIP_STORED) as zipfp: + zipfp.writestr(TESTFN, data) - self.make_test_archive(f, compression) - with zipfile.ZipFile(f, "r") as zipfp: - zinfo = zipfp.getinfo('strfile') - self.assertEqual(zinfo.external_attr, 0o600 << 16) - if not isinstance(f, str): - f.close() + data2 = b'' + with zipfile.ZipFile(f, 'r') as zipfp, \ + zipfp.open(TESTFN, 'rU') as zipopen: + for line in zipopen: + data2 += line - def test_writestr_permissions(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED) + self.assertEqual(data, data2.replace(b'\n', b'\r\n')) def test_writestr_extended_local_header_issue1202(self): with zipfile.ZipFile(TESTFN2, 'w') as orig_zip: @@ -690,12 +815,12 @@ class TestsWithSourceFile(unittest.TestCase): with zipfile.ZipFile(TESTFN2, "w") as zipfp: for fpath, fdata in SMALL_TEST_DATA: zipfp.writestr(fpath, fdata) - self.assertTrue(zipfp.fp is not None, 'zipfp is not open') - self.assertTrue(zipfp.fp is None, 'zipfp is not closed') + self.assertIsNotNone(zipfp.fp, 'zipfp is not open') + self.assertIsNone(zipfp.fp, 'zipfp is not closed') with zipfile.ZipFile(TESTFN2, "r") as zipfp: - self.assertTrue(zipfp.fp is not None, 'zipfp is not open') - self.assertTrue(zipfp.fp is None, 'zipfp is not closed') + self.assertIsNotNone(zipfp.fp, 'zipfp is not open') + self.assertIsNone(zipfp.fp, 'zipfp is not closed') def test_close_on_exception(self): """Check that the zipfile is closed if an exception is raised in the @@ -708,317 +833,7 @@ class TestsWithSourceFile(unittest.TestCase): with zipfile.ZipFile(TESTFN2, "r") as zipfp2: raise zipfile.BadZipFile() except zipfile.BadZipFile: - self.assertTrue(zipfp2.fp is None, 'zipfp is not closed') - - def test_add_file_before_1980(self): - # Set atime and mtime to 1970-01-01 - os.utime(TESTFN, (0, 0)) - with zipfile.ZipFile(TESTFN2, "w") as zipfp: - self.assertRaises(ValueError, zipfp.write, TESTFN) - - - - - - - - @requires_zlib - def test_unicode_filenames(self): - # bug #10801 - fname = findfile('zip_cp437_header.zip') - with zipfile.ZipFile(fname) as zipfp: - for name in zipfp.namelist(): - zipfp.open(name).close() - - def tearDown(self): - unlink(TESTFN) - unlink(TESTFN2) - - -class TestZip64InSmallFiles(unittest.TestCase): - # These tests test the ZIP64 functionality without using large files, - # see test_zipfile64 for proper tests. - - def setUp(self): - self._limit = zipfile.ZIP64_LIMIT - zipfile.ZIP64_LIMIT = 5 - - line_gen = (bytes("Test of zipfile line %d." % i, "ascii") - for i in range(0, FIXEDTEST_SIZE)) - self.data = b'\n'.join(line_gen) - - # Make a source file with some lines - with open(TESTFN, "wb") as fp: - fp.write(self.data) - - def large_file_exception_test(self, f, compression): - with zipfile.ZipFile(f, "w", compression) as zipfp: - self.assertRaises(zipfile.LargeZipFile, - zipfp.write, TESTFN, "another.name") - - def large_file_exception_test2(self, f, compression): - with zipfile.ZipFile(f, "w", compression) as zipfp: - self.assertRaises(zipfile.LargeZipFile, - zipfp.writestr, "another.name", self.data) - if not isinstance(f, str): - f.close() - - def test_large_file_exception(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.large_file_exception_test(f, zipfile.ZIP_STORED) - self.large_file_exception_test2(f, zipfile.ZIP_STORED) - - def zip_test(self, f, compression): - # Create the ZIP archive - with zipfile.ZipFile(f, "w", compression, allowZip64=True) as zipfp: - zipfp.write(TESTFN, "another.name") - zipfp.write(TESTFN, TESTFN) - zipfp.writestr("strfile", self.data) - - # Read the ZIP archive - with zipfile.ZipFile(f, "r", compression) as zipfp: - self.assertEqual(zipfp.read(TESTFN), self.data) - self.assertEqual(zipfp.read("another.name"), self.data) - self.assertEqual(zipfp.read("strfile"), self.data) - - # Print the ZIP directory - fp = io.StringIO() - zipfp.printdir(fp) - - directory = fp.getvalue() - lines = directory.splitlines() - self.assertEqual(len(lines), 4) # Number of files + header - - self.assertIn('File Name', lines[0]) - self.assertIn('Modified', lines[0]) - self.assertIn('Size', lines[0]) - - fn, date, time_, size = lines[1].split() - self.assertEqual(fn, 'another.name') - self.assertTrue(time.strptime(date, '%Y-%m-%d')) - self.assertTrue(time.strptime(time_, '%H:%M:%S')) - self.assertEqual(size, str(len(self.data))) - - # Check the namelist - names = zipfp.namelist() - self.assertEqual(len(names), 3) - self.assertIn(TESTFN, names) - self.assertIn("another.name", names) - self.assertIn("strfile", names) - - # Check infolist - infos = zipfp.infolist() - names = [i.filename for i in infos] - self.assertEqual(len(names), 3) - self.assertIn(TESTFN, names) - self.assertIn("another.name", names) - self.assertIn("strfile", names) - for i in infos: - self.assertEqual(i.file_size, len(self.data)) - - # check getinfo - for nm in (TESTFN, "another.name", "strfile"): - info = zipfp.getinfo(nm) - self.assertEqual(info.filename, nm) - self.assertEqual(info.file_size, len(self.data)) - - # Check that testzip doesn't raise an exception - zipfp.testzip() - if not isinstance(f, str): - f.close() - - def test_stored(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_test(f, zipfile.ZIP_STORED) - - @requires_zlib - def test_deflated(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_test(f, zipfile.ZIP_DEFLATED) - - @requires_bz2 - def test_bzip2(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_test(f, zipfile.ZIP_BZIP2) - - @requires_lzma - def test_lzma(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_test(f, zipfile.ZIP_LZMA) - - def test_absolute_arcnames(self): - with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED, - allowZip64=True) as zipfp: - zipfp.write(TESTFN, "/absolute") - - with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp: - self.assertEqual(zipfp.namelist(), ["absolute"]) - - def tearDown(self): - zipfile.ZIP64_LIMIT = self._limit - unlink(TESTFN) - unlink(TESTFN2) - - -class PyZipFileTests(unittest.TestCase): - def test_write_pyfile(self): - with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: - fn = __file__ - if fn.endswith('.pyc') or fn.endswith('.pyo'): - path_split = fn.split(os.sep) - if os.altsep is not None: - path_split.extend(fn.split(os.altsep)) - if '__pycache__' in path_split: - fn = imp.source_from_cache(fn) - else: - fn = fn[:-1] - - zipfp.writepy(fn) - - bn = os.path.basename(fn) - self.assertNotIn(bn, zipfp.namelist()) - self.assertTrue(bn + 'o' in zipfp.namelist() or - bn + 'c' in zipfp.namelist()) - - with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: - fn = __file__ - if fn.endswith(('.pyc', '.pyo')): - fn = fn[:-1] - - zipfp.writepy(fn, "testpackage") - - bn = "%s/%s" % ("testpackage", os.path.basename(fn)) - self.assertNotIn(bn, zipfp.namelist()) - self.assertTrue(bn + 'o' in zipfp.namelist() or - bn + 'c' in zipfp.namelist()) - - def test_write_python_package(self): - import email - packagedir = os.path.dirname(email.__file__) - - with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: - zipfp.writepy(packagedir) - - # Check for a couple of modules at different levels of the - # hierarchy - names = zipfp.namelist() - self.assertTrue('email/__init__.pyo' in names or - 'email/__init__.pyc' in names) - self.assertTrue('email/mime/text.pyo' in names or - 'email/mime/text.pyc' in names) - - def test_write_with_optimization(self): - import email - packagedir = os.path.dirname(email.__file__) - # use .pyc if running test in optimization mode, - # use .pyo if running test in debug mode - optlevel = 1 if __debug__ else 0 - ext = '.pyo' if optlevel == 1 else '.pyc' - - with TemporaryFile() as t, \ - zipfile.PyZipFile(t, "w", optimize=optlevel) as zipfp: - zipfp.writepy(packagedir) - - names = zipfp.namelist() - self.assertIn('email/__init__' + ext, names) - self.assertIn('email/mime/text' + ext, names) - - def test_write_python_directory(self): - os.mkdir(TESTFN2) - try: - with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp: - fp.write("print(42)\n") - - with open(os.path.join(TESTFN2, "mod2.py"), "w") as fp: - fp.write("print(42 * 42)\n") - - with open(os.path.join(TESTFN2, "mod2.txt"), "w") as fp: - fp.write("bla bla bla\n") - - with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: - zipfp.writepy(TESTFN2) - - names = zipfp.namelist() - self.assertTrue('mod1.pyc' in names or 'mod1.pyo' in names) - self.assertTrue('mod2.pyc' in names or 'mod2.pyo' in names) - self.assertNotIn('mod2.txt', names) - - finally: - shutil.rmtree(TESTFN2) - - def test_write_non_pyfile(self): - with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: - with open(TESTFN, 'w') as f: - f.write('most definitely not a python file') - self.assertRaises(RuntimeError, zipfp.writepy, TESTFN) - os.remove(TESTFN) - - def test_write_pyfile_bad_syntax(self): - os.mkdir(TESTFN2) - try: - with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp: - fp.write("Bad syntax in python file\n") - - with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: - # syntax errors are printed to stdout - with captured_stdout() as s: - zipfp.writepy(os.path.join(TESTFN2, "mod1.py")) - - self.assertIn("SyntaxError", s.getvalue()) - - # as it will not have compiled the python file, it will - # include the .py file not .pyc or .pyo - names = zipfp.namelist() - self.assertIn('mod1.py', names) - self.assertNotIn('mod1.pyc', names) - self.assertNotIn('mod1.pyo', names) - - finally: - shutil.rmtree(TESTFN2) - -class OtherTests(unittest.TestCase): - zips_with_bad_crc = { - zipfile.ZIP_STORED: ( - b'PK\003\004\024\0\0\0\0\0 \213\212;:r' - b'\253\377\f\0\0\0\f\0\0\0\005\0\0\000af' - b'ilehello,AworldP' - b'K\001\002\024\003\024\0\0\0\0\0 \213\212;:' - b'r\253\377\f\0\0\0\f\0\0\0\005\0\0\0\0' - b'\0\0\0\0\0\0\0\200\001\0\0\0\000afi' - b'lePK\005\006\0\0\0\0\001\0\001\0003\000' - b'\0\0/\0\0\0\0\0'), - zipfile.ZIP_DEFLATED: ( - b'PK\x03\x04\x14\x00\x00\x00\x08\x00n}\x0c=FA' - b'KE\x10\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' - b'ile\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xc9\xa0' - b'=\x13\x00PK\x01\x02\x14\x03\x14\x00\x00\x00\x08\x00n' - b'}\x0c=FAKE\x10\x00\x00\x00n\x00\x00\x00\x05' - b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00' - b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00' - b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00'), - zipfile.ZIP_BZIP2: ( - b'PK\x03\x04\x14\x03\x00\x00\x0c\x00nu\x0c=FA' - b'KE8\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' - b'ileBZh91AY&SY\xd4\xa8\xca' - b'\x7f\x00\x00\x0f\x11\x80@\x00\x06D\x90\x80 \x00 \xa5' - b'P\xd9!\x03\x03\x13\x13\x13\x89\xa9\xa9\xc2u5:\x9f' - b'\x8b\xb9"\x9c(HjTe?\x80PK\x01\x02\x14' - b'\x03\x14\x03\x00\x00\x0c\x00nu\x0c=FAKE8' - b'\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00' - b'\x00 \x80\x80\x81\x00\x00\x00\x00afilePK' - b'\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00\x00[\x00' - b'\x00\x00\x00\x00'), - zipfile.ZIP_LZMA: ( - b'PK\x03\x04\x14\x03\x00\x00\x0e\x00nu\x0c=FA' - b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' - b'ile\t\x04\x05\x00]\x00\x00\x00\x04\x004\x19I' - b'\xee\x8d\xe9\x17\x89:3`\tq!.8\x00PK' - b'\x01\x02\x14\x03\x14\x03\x00\x00\x0e\x00nu\x0c=FA' - b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00' - b'\x00\x00\x00\x00 \x80\x80\x81\x00\x00\x00\x00afil' - b'ePK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00' - b'\x00>\x00\x00\x00\x00\x00'), - } + self.assertIsNone(zipfp2.fp, 'zipfp is not closed') def test_unsupported_version(self): # File has an extract_version of 120 @@ -1027,10 +842,19 @@ class OtherTests(unittest.TestCase): b'\x00!p\xa1@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00' b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00\x00xPK\x05\x06' b'\x00\x00\x00\x00\x01\x00\x01\x00/\x00\x00\x00\x1f\x00\x00\x00\x00\x00') + self.assertRaises(NotImplementedError, zipfile.ZipFile, io.BytesIO(data), 'r') - def test_unicode_filenames(self): + @requires_zlib + def test_read_unicode_filenames(self): + # bug #10801 + fname = findfile('zip_cp437_header.zip') + with zipfile.ZipFile(fname) as zipfp: + for name in zipfp.namelist(): + zipfp.open(name).close() + + def test_write_unicode_filenames(self): with zipfile.ZipFile(TESTFN, "w") as zf: zf.writestr("foo.txt", "Test for unicode filename") zf.writestr("\xf6.txt", "Test for unicode filename") @@ -1078,20 +902,16 @@ class OtherTests(unittest.TestCase): # - passing a filename with open(TESTFN, "w") as fp: fp.write("this is not a legal zip file\n") - chk = zipfile.is_zipfile(TESTFN) - self.assertFalse(chk) + self.assertFalse(zipfile.is_zipfile(TESTFN)) # - passing a file object with open(TESTFN, "rb") as fp: - chk = zipfile.is_zipfile(fp) - self.assertTrue(not chk) + self.assertFalse(zipfile.is_zipfile(fp)) # - passing a file-like object fp = io.BytesIO() fp.write(b"this is not a legal zip file\n") - chk = zipfile.is_zipfile(fp) - self.assertTrue(not chk) + self.assertFalse(zipfile.is_zipfile(fp)) fp.seek(0, 0) - chk = zipfile.is_zipfile(fp) - self.assertTrue(not chk) + self.assertFalse(zipfile.is_zipfile(fp)) def test_damaged_zipfile(self): """Check that zipfiles with missing bytes at the end raise BadZipFile.""" @@ -1113,22 +933,18 @@ class OtherTests(unittest.TestCase): with zipfile.ZipFile(TESTFN, mode="w") as zipf: zipf.writestr("foo.txt", b"O, for a Muse of Fire!") - chk = zipfile.is_zipfile(TESTFN) - self.assertTrue(chk) + self.assertTrue(zipfile.is_zipfile(TESTFN)) # - passing a file object with open(TESTFN, "rb") as fp: - chk = zipfile.is_zipfile(fp) - self.assertTrue(chk) + self.assertTrue(zipfile.is_zipfile(fp)) fp.seek(0, 0) zip_contents = fp.read() # - passing a file-like object fp = io.BytesIO() fp.write(zip_contents) - chk = zipfile.is_zipfile(fp) - self.assertTrue(chk) + self.assertTrue(zipfile.is_zipfile(fp)) fp.seek(0, 0) - chk = zipfile.is_zipfile(fp) - self.assertTrue(chk) + self.assertTrue(zipfile.is_zipfile(fp)) def test_non_existent_file_raises_IOError(self): # make sure we don't raise an AttributeError when a partially-constructed @@ -1309,93 +1125,6 @@ class OtherTests(unittest.TestCase): with zipfile.ZipFile(TESTFN, "r") as zipf: self.assertEqual(zipf.comment, b"this is a comment") - def check_testzip_with_bad_crc(self, compression): - """Tests that files with bad CRCs return their name from testzip.""" - zipdata = self.zips_with_bad_crc[compression] - - with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: - # testzip returns the name of the first corrupt file, or None - self.assertEqual('afile', zipf.testzip()) - - def test_testzip_with_bad_crc_stored(self): - self.check_testzip_with_bad_crc(zipfile.ZIP_STORED) - - @requires_zlib - def test_testzip_with_bad_crc_deflated(self): - self.check_testzip_with_bad_crc(zipfile.ZIP_DEFLATED) - - @requires_bz2 - def test_testzip_with_bad_crc_bzip2(self): - self.check_testzip_with_bad_crc(zipfile.ZIP_BZIP2) - - @requires_lzma - def test_testzip_with_bad_crc_lzma(self): - self.check_testzip_with_bad_crc(zipfile.ZIP_LZMA) - - def check_read_with_bad_crc(self, compression): - """Tests that files with bad CRCs raise a BadZipFile exception when read.""" - zipdata = self.zips_with_bad_crc[compression] - - # Using ZipFile.read() - with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: - self.assertRaises(zipfile.BadZipFile, zipf.read, 'afile') - - # Using ZipExtFile.read() - with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: - with zipf.open('afile', 'r') as corrupt_file: - self.assertRaises(zipfile.BadZipFile, corrupt_file.read) - - # Same with small reads (in order to exercise the buffering logic) - with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: - with zipf.open('afile', 'r') as corrupt_file: - corrupt_file.MIN_READ_SIZE = 2 - with self.assertRaises(zipfile.BadZipFile): - while corrupt_file.read(2): - pass - - def test_read_with_bad_crc_stored(self): - self.check_read_with_bad_crc(zipfile.ZIP_STORED) - - @requires_zlib - def test_read_with_bad_crc_deflated(self): - self.check_read_with_bad_crc(zipfile.ZIP_DEFLATED) - - @requires_bz2 - def test_read_with_bad_crc_bzip2(self): - self.check_read_with_bad_crc(zipfile.ZIP_BZIP2) - - @requires_lzma - def test_read_with_bad_crc_lzma(self): - self.check_read_with_bad_crc(zipfile.ZIP_LZMA) - - def check_read_return_size(self, compression): - # Issue #9837: ZipExtFile.read() shouldn't return more bytes - # than requested. - for test_size in (1, 4095, 4096, 4097, 16384): - file_size = test_size + 1 - junk = b''.join(struct.pack('B', randint(0, 255)) - for x in range(file_size)) - with zipfile.ZipFile(io.BytesIO(), "w", compression) as zipf: - zipf.writestr('foo', junk) - with zipf.open('foo', 'r') as fp: - buf = fp.read(test_size) - self.assertEqual(len(buf), test_size) - - def test_read_return_size_stored(self): - self.check_read_return_size(zipfile.ZIP_STORED) - - @requires_zlib - def test_read_return_size_deflated(self): - self.check_read_return_size(zipfile.ZIP_DEFLATED) - - @requires_bz2 - def test_read_return_size_bzip2(self): - self.check_read_return_size(zipfile.ZIP_BZIP2) - - @requires_lzma - def test_read_return_size_lzma(self): - self.check_read_return_size(zipfile.ZIP_LZMA) - def test_empty_zipfile(self): # Check that creating a file in 'w' or 'a' mode and closing without # adding any files to the archives creates a valid empty ZIP file @@ -1430,6 +1159,93 @@ class OtherTests(unittest.TestCase): unlink(TESTFN2) +class AbstractBadCrcTests: + def test_testzip_with_bad_crc(self): + """Tests that files with bad CRCs return their name from testzip.""" + zipdata = self.zip_with_bad_crc + + with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: + # testzip returns the name of the first corrupt file, or None + self.assertEqual('afile', zipf.testzip()) + + def test_read_with_bad_crc(self): + """Tests that files with bad CRCs raise a BadZipFile exception when read.""" + zipdata = self.zip_with_bad_crc + + # Using ZipFile.read() + with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: + self.assertRaises(zipfile.BadZipFile, zipf.read, 'afile') + + # Using ZipExtFile.read() + with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: + with zipf.open('afile', 'r') as corrupt_file: + self.assertRaises(zipfile.BadZipFile, corrupt_file.read) + + # Same with small reads (in order to exercise the buffering logic) + with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: + with zipf.open('afile', 'r') as corrupt_file: + corrupt_file.MIN_READ_SIZE = 2 + with self.assertRaises(zipfile.BadZipFile): + while corrupt_file.read(2): + pass + + +class StoredBadCrcTests(AbstractBadCrcTests, unittest.TestCase): + compression = zipfile.ZIP_STORED + zip_with_bad_crc = ( + b'PK\003\004\024\0\0\0\0\0 \213\212;:r' + b'\253\377\f\0\0\0\f\0\0\0\005\0\0\000af' + b'ilehello,AworldP' + b'K\001\002\024\003\024\0\0\0\0\0 \213\212;:' + b'r\253\377\f\0\0\0\f\0\0\0\005\0\0\0\0' + b'\0\0\0\0\0\0\0\200\001\0\0\0\000afi' + b'lePK\005\006\0\0\0\0\001\0\001\0003\000' + b'\0\0/\0\0\0\0\0') + +@requires_zlib +class DeflateBadCrcTests(AbstractBadCrcTests, unittest.TestCase): + compression = zipfile.ZIP_DEFLATED + zip_with_bad_crc = ( + b'PK\x03\x04\x14\x00\x00\x00\x08\x00n}\x0c=FA' + b'KE\x10\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' + b'ile\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xc9\xa0' + b'=\x13\x00PK\x01\x02\x14\x03\x14\x00\x00\x00\x08\x00n' + b'}\x0c=FAKE\x10\x00\x00\x00n\x00\x00\x00\x05' + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00' + b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00' + b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00') + +@requires_bz2 +class Bzip2BadCrcTests(AbstractBadCrcTests, unittest.TestCase): + compression = zipfile.ZIP_BZIP2 + zip_with_bad_crc = ( + b'PK\x03\x04\x14\x03\x00\x00\x0c\x00nu\x0c=FA' + b'KE8\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' + b'ileBZh91AY&SY\xd4\xa8\xca' + b'\x7f\x00\x00\x0f\x11\x80@\x00\x06D\x90\x80 \x00 \xa5' + b'P\xd9!\x03\x03\x13\x13\x13\x89\xa9\xa9\xc2u5:\x9f' + b'\x8b\xb9"\x9c(HjTe?\x80PK\x01\x02\x14' + b'\x03\x14\x03\x00\x00\x0c\x00nu\x0c=FAKE8' + b'\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00' + b'\x00 \x80\x80\x81\x00\x00\x00\x00afilePK' + b'\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00\x00[\x00' + b'\x00\x00\x00\x00') + +@requires_lzma +class LzmaBadCrcTests(AbstractBadCrcTests, unittest.TestCase): + compression = zipfile.ZIP_LZMA + zip_with_bad_crc = ( + b'PK\x03\x04\x14\x03\x00\x00\x0e\x00nu\x0c=FA' + b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' + b'ile\t\x04\x05\x00]\x00\x00\x00\x04\x004\x19I' + b'\xee\x8d\xe9\x17\x89:3`\tq!.8\x00PK' + b'\x01\x02\x14\x03\x14\x03\x00\x00\x0e\x00nu\x0c=FA' + b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00' + b'\x00\x00\x00\x00 \x80\x80\x81\x00\x00\x00\x00afil' + b'ePK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00' + b'\x00>\x00\x00\x00\x00\x00') + + class DecryptionTests(unittest.TestCase): """Check that ZIP decryption works. Since the library does not support encryption at the moment, we use a pre-generated encrypted @@ -1495,13 +1311,14 @@ class DecryptionTests(unittest.TestCase): self.assertRaises(TypeError, self.zip.open, "test.txt", pwd="python") self.assertRaises(TypeError, self.zip.extract, "test.txt", pwd="python") - -class TestsWithRandomBinaryFiles(unittest.TestCase): - def setUp(self): +class AbstractTestsWithRandomBinaryFiles: + @classmethod + def setUpClass(cls): datacount = randint(16, 64)*1024 + randint(1, 1024) - self.data = b''.join(struct.pack('