Issue #14366: Support lzma compression in zip files.

Patch by Serhiy Storchaka.
This commit is contained in:
Martin v. Löwis 2012-05-13 10:06:36 +02:00
parent bb54b33cec
commit 7fb79fcb64
5 changed files with 257 additions and 27 deletions

View File

@ -97,12 +97,20 @@ The module defines the following items:
.. versionadded:: 3.3
.. data:: ZIP_LZMA
The numeric constant for the LZMA compression method. This requires the
lzma module.
.. versionadded:: 3.3
.. note::
The ZIP file format specification has included support for bzip2 compression
since 2001. However, some tools (including older Python releases) do not
support it, and may either refuse to process the ZIP file altogether, or
fail to extract individual files.
since 2001, and for LZMA compression since 2006. However, some tools
(including older Python releases) do not support these compression
methods, and may either refuse to process the ZIP file altogether,
or fail to extract individual files.
.. seealso::
@ -133,11 +141,11 @@ ZipFile Objects
adding a ZIP archive to another file (such as :file:`python.exe`). If
*mode* is ``a`` and the file does not exist at all, it is created.
*compression* is the ZIP compression method to use when writing the archive,
and should be :const:`ZIP_STORED`, :const:`ZIP_DEFLATED`; or
:const:`ZIP_DEFLATED`; unrecognized
values will cause :exc:`RuntimeError` to be raised. If :const:`ZIP_DEFLATED` or
:const:`ZIP_BZIP2` is specified but the corresponded module
(:mod:`zlib` or :mod:`bz2`) is not available, :exc:`RuntimeError`
and should be :const:`ZIP_STORED`, :const:`ZIP_DEFLATED`,
:const:`ZIP_BZIP2` or :const:`ZIP_LZMA`; unrecognized
values will cause :exc:`RuntimeError` to be raised. If :const:`ZIP_DEFLATED`,
:const:`ZIP_BZIP2` or :const:`ZIP_LZMA` is specified but the corresponded module
(:mod:`zlib`, :mod:`bz2` or :mod:`lzma`) is not available, :exc:`RuntimeError`
is also raised. The default is :const:`ZIP_STORED`. If *allowZip64* is
``True`` zipfile will create ZIP files that use the ZIP64 extensions when
the zipfile is larger than 2 GB. If it is false (the default) :mod:`zipfile`
@ -161,7 +169,7 @@ ZipFile Objects
Added the ability to use :class:`ZipFile` as a context manager.
.. versionchanged:: 3.3
Added support for :mod:`bzip2` compression.
Added support for :mod:`bzip2` and :mod:`lzma` compression.
.. method:: ZipFile.close()

View File

@ -45,6 +45,11 @@ try:
except ImportError:
bz2 = None
try:
import lzma
except ImportError:
lzma = None
__all__ = [
"Error", "TestFailed", "ResourceDenied", "import_module",
"verbose", "use_resources", "max_memuse", "record_original_stdout",
@ -62,7 +67,7 @@ __all__ = [
"get_attribute", "swap_item", "swap_attr", "requires_IEEE_754",
"TestHandler", "Matcher", "can_symlink", "skip_unless_symlink",
"import_fresh_module", "requires_zlib", "PIPE_MAX_SIZE", "failfast",
"anticipate_failure", "run_with_tz", "requires_bz2"
"anticipate_failure", "run_with_tz", "requires_bz2", "requires_lzma"
]
class Error(Exception):
@ -513,6 +518,8 @@ requires_zlib = unittest.skipUnless(zlib, 'requires zlib')
requires_bz2 = unittest.skipUnless(bz2, 'requires bz2')
requires_lzma = unittest.skipUnless(lzma, 'requires lzma')
is_jython = sys.platform.startswith('java')
# Filename used for testing

View File

@ -13,7 +13,7 @@ from tempfile import TemporaryFile
from random import randint, random
from unittest import skipUnless
from test.support import TESTFN, run_unittest, findfile, unlink, requires_zlib, requires_bz2
from test.support import TESTFN, run_unittest, findfile, unlink, requires_zlib, requires_bz2, requires_lzma
TESTFN2 = TESTFN + "2"
TESTFNDIR = TESTFN + "d"
@ -361,6 +361,55 @@ class TestsWithSourceFile(unittest.TestCase):
self.assertEqual(openobj.read(1), b'1')
self.assertEqual(openobj.read(1), b'2')
@requires_lzma
def test_lzma(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_test(f, zipfile.ZIP_LZMA)
@requires_lzma
def test_open_lzma(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_open_test(f, zipfile.ZIP_LZMA)
@requires_lzma
def test_random_open_lzma(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_random_open_test(f, zipfile.ZIP_LZMA)
@requires_lzma
def test_readline_read_lzma(self):
# Issue #7610: calls to readline() interleaved with calls to read().
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_readline_read_test(f, zipfile.ZIP_LZMA)
@requires_lzma
def test_readline_lzma(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_readline_test(f, zipfile.ZIP_LZMA)
@requires_lzma
def test_readlines_lzma(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_readlines_test(f, zipfile.ZIP_LZMA)
@requires_lzma
def test_iterlines_lzma(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_iterlines_test(f, zipfile.ZIP_LZMA)
@requires_lzma
def test_low_compression_lzma(self):
"""Check for cases where compressed data is larger than original."""
# Create the ZIP archive
with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_LZMA) as zipfp:
zipfp.writestr("strfile", '12')
# Get an open object for strfile
with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_LZMA) as zipfp:
with zipfp.open("strfile") as openobj:
self.assertEqual(openobj.read(1), b'1')
self.assertEqual(openobj.read(1), b'2')
def test_absolute_arcnames(self):
with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
zipfp.write(TESTFN, "/absolute")
@ -508,6 +557,13 @@ class TestsWithSourceFile(unittest.TestCase):
info = zipfp.getinfo('b.txt')
self.assertEqual(info.compress_type, zipfile.ZIP_BZIP2)
@requires_lzma
def test_writestr_compression_lzma(self):
zipfp = zipfile.ZipFile(TESTFN2, "w")
zipfp.writestr("b.txt", "hello world", compress_type=zipfile.ZIP_LZMA)
info = zipfp.getinfo('b.txt')
self.assertEqual(info.compress_type, zipfile.ZIP_LZMA)
def zip_test_writestr_permissions(self, f, compression):
# Make sure that writestr creates files with mode 0600,
# when it is passed a name rather than a ZipInfo instance.
@ -686,6 +742,11 @@ class TestZip64InSmallFiles(unittest.TestCase):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_test(f, zipfile.ZIP_BZIP2)
@requires_lzma
def test_lzma(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_test(f, zipfile.ZIP_LZMA)
def test_absolute_arcnames(self):
with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED,
allowZip64=True) as zipfp:
@ -826,6 +887,16 @@ class OtherTests(unittest.TestCase):
b'\x00 \x80\x80\x81\x00\x00\x00\x00afilePK'
b'\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00\x00[\x00'
b'\x00\x00\x00\x00'),
zipfile.ZIP_LZMA: (
b'PK\x03\x04\x14\x03\x00\x00\x0e\x00nu\x0c=FA'
b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
b'ile\t\x04\x05\x00]\x00\x00\x00\x04\x004\x19I'
b'\xee\x8d\xe9\x17\x89:3`\tq!.8\x00PK'
b'\x01\x02\x14\x03\x14\x03\x00\x00\x0e\x00nu\x0c=FA'
b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00'
b'\x00\x00\x00\x00 \x80\x80\x81\x00\x00\x00\x00afil'
b'ePK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00'
b'\x00>\x00\x00\x00\x00\x00'),
}
def test_unsupported_version(self):
@ -1104,6 +1175,10 @@ class OtherTests(unittest.TestCase):
def test_testzip_with_bad_crc_bzip2(self):
self.check_testzip_with_bad_crc(zipfile.ZIP_BZIP2)
@requires_lzma
def test_testzip_with_bad_crc_lzma(self):
self.check_testzip_with_bad_crc(zipfile.ZIP_LZMA)
def check_read_with_bad_crc(self, compression):
"""Tests that files with bad CRCs raise a BadZipFile exception when read."""
zipdata = self.zips_with_bad_crc[compression]
@ -1136,6 +1211,10 @@ class OtherTests(unittest.TestCase):
def test_read_with_bad_crc_bzip2(self):
self.check_read_with_bad_crc(zipfile.ZIP_BZIP2)
@requires_lzma
def test_read_with_bad_crc_lzma(self):
self.check_read_with_bad_crc(zipfile.ZIP_LZMA)
def check_read_return_size(self, compression):
# Issue #9837: ZipExtFile.read() shouldn't return more bytes
# than requested.
@ -1160,6 +1239,10 @@ class OtherTests(unittest.TestCase):
def test_read_return_size_bzip2(self):
self.check_read_return_size(zipfile.ZIP_BZIP2)
@requires_lzma
def test_read_return_size_lzma(self):
self.check_read_return_size(zipfile.ZIP_LZMA)
def test_empty_zipfile(self):
# Check that creating a file in 'w' or 'a' mode and closing without
# adding any files to the archives creates a valid empty ZIP file
@ -1306,6 +1389,11 @@ class TestsWithRandomBinaryFiles(unittest.TestCase):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_test(f, zipfile.ZIP_BZIP2)
@requires_lzma
def test_lzma(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_test(f, zipfile.ZIP_LZMA)
def zip_open_test(self, f, compression):
self.make_test_archive(f, compression)
@ -1351,6 +1439,11 @@ class TestsWithRandomBinaryFiles(unittest.TestCase):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_open_test(f, zipfile.ZIP_BZIP2)
@requires_lzma
def test_open_lzma(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_open_test(f, zipfile.ZIP_LZMA)
def zip_random_open_test(self, f, compression):
self.make_test_archive(f, compression)
@ -1384,6 +1477,11 @@ class TestsWithRandomBinaryFiles(unittest.TestCase):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_random_open_test(f, zipfile.ZIP_BZIP2)
@requires_lzma
def test_random_open_lzma(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_random_open_test(f, zipfile.ZIP_LZMA)
@requires_zlib
class TestsWithMultipleOpens(unittest.TestCase):
@ -1628,6 +1726,31 @@ class UniversalNewlineTests(unittest.TestCase):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.iterlines_test(f, zipfile.ZIP_BZIP2)
@requires_lzma
def test_read_lzma(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.read_test(f, zipfile.ZIP_LZMA)
@requires_lzma
def test_readline_read_lzma(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.readline_read_test(f, zipfile.ZIP_LZMA)
@requires_lzma
def test_readline_lzma(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.readline_test(f, zipfile.ZIP_LZMA)
@requires_lzma
def test_readlines_lzma(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.readlines_test(f, zipfile.ZIP_LZMA)
@requires_lzma
def test_iterlines_lzma(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.iterlines_test(f, zipfile.ZIP_LZMA)
def tearDown(self):
for sep, fn in self.arcfiles.items():
os.remove(fn)

View File

@ -27,8 +27,13 @@ try:
except ImportError:
bz2 = None
try:
import lzma # We may need its compression method
except ImportError:
lzma = None
__all__ = ["BadZipFile", "BadZipfile", "error",
"ZIP_STORED", "ZIP_DEFLATED", "ZIP_BZIP2",
"ZIP_STORED", "ZIP_DEFLATED", "ZIP_BZIP2", "ZIP_LZMA",
"is_zipfile", "ZipInfo", "ZipFile", "PyZipFile", "LargeZipFile"]
class BadZipFile(Exception):
@ -52,13 +57,15 @@ ZIP_MAX_COMMENT = (1 << 16) - 1
ZIP_STORED = 0
ZIP_DEFLATED = 8
ZIP_BZIP2 = 12
ZIP_LZMA = 14
# Other ZIP compression methods not supported
DEFAULT_VERSION = 20
ZIP64_VERSION = 45
BZIP2_VERSION = 46
LZMA_VERSION = 63
# we recognize (but not necessarily support) all features up to that version
MAX_EXTRACT_VERSION = 46
MAX_EXTRACT_VERSION = 63
# Below are some formats and associated data for reading/writing headers using
# the struct module. The names and structures of headers/records are those used
@ -367,6 +374,8 @@ class ZipInfo (object):
if self.compress_type == ZIP_BZIP2:
min_version = max(BZIP2_VERSION, min_version)
elif self.compress_type == ZIP_LZMA:
min_version = max(LZMA_VERSION, min_version)
self.extract_version = max(min_version, self.extract_version)
self.create_version = max(min_version, self.create_version)
@ -480,6 +489,77 @@ class _ZipDecrypter:
return c
class LZMACompressor:
def __init__(self):
self._comp = None
def _init(self):
props = lzma.encode_filter_properties({'id': lzma.FILTER_LZMA1})
self._comp = lzma.LZMACompressor(lzma.FORMAT_RAW, filters=[
lzma.decode_filter_properties(lzma.FILTER_LZMA1, props)
])
return struct.pack('<BBH', 9, 4, len(props)) + props
def compress(self, data):
if self._comp is None:
return self._init() + self._comp.compress(data)
return self._comp.compress(data)
def flush(self):
if self._comp is None:
return self._init() + self._comp.flush()
return self._comp.flush()
class LZMADecompressor:
def __init__(self):
self._decomp = None
self._unconsumed = b''
self.eof = False
def decompress(self, data):
if self._decomp is None:
self._unconsumed += data
if len(self._unconsumed) <= 4:
return b''
psize, = struct.unpack('<H', self._unconsumed[2:4])
if len(self._unconsumed) <= 4 + psize:
return b''
self._decomp = lzma.LZMADecompressor(lzma.FORMAT_RAW, filters=[
lzma.decode_filter_properties(lzma.FILTER_LZMA1,
self._unconsumed[4:4 + psize])
])
data = self._unconsumed[4 + psize:]
del self._unconsumed
result = self._decomp.decompress(data)
self.eof = self._decomp.eof
return result
compressor_names = {
0: 'store',
1: 'shrink',
2: 'reduce',
3: 'reduce',
4: 'reduce',
5: 'reduce',
6: 'implode',
7: 'tokenize',
8: 'deflate',
9: 'deflate64',
10: 'implode',
12: 'bzip2',
14: 'lzma',
18: 'terse',
19: 'lz77',
97: 'wavpack',
98: 'ppmd',
}
def _check_compression(compression):
if compression == ZIP_STORED:
pass
@ -491,6 +571,10 @@ def _check_compression(compression):
if not bz2:
raise RuntimeError(
"Compression requires the (missing) bz2 module")
elif compression == ZIP_LZMA:
if not lzma:
raise RuntimeError(
"Compression requires the (missing) lzma module")
else:
raise RuntimeError("That compression method is not supported")
@ -501,6 +585,8 @@ def _get_compressor(compress_type):
zlib.DEFLATED, -15)
elif compress_type == ZIP_BZIP2:
return bz2.BZ2Compressor()
elif compress_type == ZIP_LZMA:
return LZMACompressor()
else:
return None
@ -512,19 +598,10 @@ def _get_decompressor(compress_type):
return zlib.decompressobj(-15)
elif compress_type == ZIP_BZIP2:
return bz2.BZ2Decompressor()
elif compress_type == ZIP_LZMA:
return LZMADecompressor()
else:
unknown_compressors = {
1: 'shrink',
2: 'reduce',
3: 'reduce',
4: 'reduce',
5: 'reduce',
6: 'implode',
9: 'enhanced deflate',
10: 'implode',
14: 'lzma',
}
descr = unknown_compressors.get(compress_type)
descr = compressor_names.get(compress_type)
if descr:
raise NotImplementedError("compression type %d (%s)" % (compress_type, descr))
else:
@ -781,8 +858,8 @@ class ZipFile:
file: Either the path to the file, or a file-like object.
If it is a path, the file will be opened and closed by ZipFile.
mode: The mode can be either read "r", write "w" or append "a".
compression: ZIP_STORED (no compression), ZIP_DEFLATED (requires zlib) or
ZIP_BZIP2 (requires bz2).
compression: ZIP_STORED (no compression), ZIP_DEFLATED (requires zlib),
ZIP_BZIP2 (requires bz2) or ZIP_LZMA (requires lzma).
allowZip64: if True ZipFile will create files with ZIP64 extensions when
needed, otherwise it will raise an exception when this would
be necessary.
@ -1062,6 +1139,10 @@ class ZipFile:
# Zip 2.7: compressed patched data
raise NotImplementedError("compressed patched data (flag bit 5)")
if zinfo.flag_bits & 0x40:
# strong encryption
raise NotImplementedError("strong encryption (flag bit 6)")
if zinfo.flag_bits & 0x800:
# UTF-8 filename
fname_str = fname.decode("utf-8")
@ -1220,6 +1301,9 @@ class ZipFile:
zinfo.file_size = st.st_size
zinfo.flag_bits = 0x00
zinfo.header_offset = self.fp.tell() # Start of header bytes
if zinfo.compress_type == ZIP_LZMA:
# Compressed data includes an end-of-stream (EOS) marker
zinfo.flag_bits |= 0x02
self._writecheck(zinfo)
self._didModify = True
@ -1292,6 +1376,9 @@ class ZipFile:
zinfo.header_offset = self.fp.tell() # Start of header data
if compress_type is not None:
zinfo.compress_type = compress_type
if zinfo.compress_type == ZIP_LZMA:
# Compressed data includes an end-of-stream (EOS) marker
zinfo.flag_bits |= 0x02
self._writecheck(zinfo)
self._didModify = True
@ -1360,6 +1447,8 @@ class ZipFile:
if zinfo.compress_type == ZIP_BZIP2:
min_version = max(BZIP2_VERSION, min_version)
elif zinfo.compress_type == ZIP_LZMA:
min_version = max(LZMA_VERSION, min_version)
extract_version = max(min_version, zinfo.extract_version)
create_version = max(min_version, zinfo.create_version)

View File

@ -23,6 +23,9 @@ Core and Builtins
Library
-------
- Issue #14366: Support lzma compression in zip files.
Patch by Serhiy Storchaka.
- Issue #13959: Introduce importlib.find_loader().
- Issue #14082: shutil.copy2() now copies extended attributes, if possible.