mirror of https://github.com/python/cpython
gh-98098: Create packages from zipfile and test_zipfile (gh-98103)
* gh-98098: Move zipfile into a package. * Moved test_zipfile to a package * Extracted module for test_path. * Add blurb * Add jaraco as owner of zipfile.Path. * Synchronize with minor changes found at jaraco/zipp@d9e7f4352d.
This commit is contained in:
parent
dc063a25d2
commit
7796d3179b
|
@ -154,3 +154,6 @@ Lib/ast.py @isidentical
|
||||||
|
|
||||||
# pathlib
|
# pathlib
|
||||||
**/*pathlib* @brettcannon
|
**/*pathlib* @brettcannon
|
||||||
|
|
||||||
|
# zipfile.Path
|
||||||
|
**/*zipfile/*_path.py @jaraco
|
||||||
|
|
|
@ -0,0 +1,5 @@
|
||||||
|
import os
|
||||||
|
from test.support import load_package_tests
|
||||||
|
|
||||||
|
def load_tests(*args):
|
||||||
|
return load_package_tests(os.path.dirname(__file__), *args)
|
|
@ -6,7 +6,6 @@ import itertools
|
||||||
import os
|
import os
|
||||||
import pathlib
|
import pathlib
|
||||||
import posixpath
|
import posixpath
|
||||||
import string
|
|
||||||
import struct
|
import struct
|
||||||
import subprocess
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
|
@ -14,7 +13,6 @@ import time
|
||||||
import unittest
|
import unittest
|
||||||
import unittest.mock as mock
|
import unittest.mock as mock
|
||||||
import zipfile
|
import zipfile
|
||||||
import functools
|
|
||||||
|
|
||||||
|
|
||||||
from tempfile import TemporaryFile
|
from tempfile import TemporaryFile
|
||||||
|
@ -2715,13 +2713,13 @@ class TestWithDirectory(unittest.TestCase):
|
||||||
class ZipInfoTests(unittest.TestCase):
|
class ZipInfoTests(unittest.TestCase):
|
||||||
def test_from_file(self):
|
def test_from_file(self):
|
||||||
zi = zipfile.ZipInfo.from_file(__file__)
|
zi = zipfile.ZipInfo.from_file(__file__)
|
||||||
self.assertEqual(posixpath.basename(zi.filename), 'test_zipfile.py')
|
self.assertEqual(posixpath.basename(zi.filename), 'test_core.py')
|
||||||
self.assertFalse(zi.is_dir())
|
self.assertFalse(zi.is_dir())
|
||||||
self.assertEqual(zi.file_size, os.path.getsize(__file__))
|
self.assertEqual(zi.file_size, os.path.getsize(__file__))
|
||||||
|
|
||||||
def test_from_file_pathlike(self):
|
def test_from_file_pathlike(self):
|
||||||
zi = zipfile.ZipInfo.from_file(pathlib.Path(__file__))
|
zi = zipfile.ZipInfo.from_file(pathlib.Path(__file__))
|
||||||
self.assertEqual(posixpath.basename(zi.filename), 'test_zipfile.py')
|
self.assertEqual(posixpath.basename(zi.filename), 'test_core.py')
|
||||||
self.assertFalse(zi.is_dir())
|
self.assertFalse(zi.is_dir())
|
||||||
self.assertEqual(zi.file_size, os.path.getsize(__file__))
|
self.assertEqual(zi.file_size, os.path.getsize(__file__))
|
||||||
|
|
||||||
|
@ -2867,420 +2865,6 @@ class TestExecutablePrependedZip(unittest.TestCase):
|
||||||
self.assertIn(b'number in executable: 5', output)
|
self.assertIn(b'number in executable: 5', output)
|
||||||
|
|
||||||
|
|
||||||
# Poor man's technique to consume a (smallish) iterable.
|
|
||||||
consume = tuple
|
|
||||||
|
|
||||||
|
|
||||||
# from jaraco.itertools 5.0
|
|
||||||
class jaraco:
|
|
||||||
class itertools:
|
|
||||||
class Counter:
|
|
||||||
def __init__(self, i):
|
|
||||||
self.count = 0
|
|
||||||
self._orig_iter = iter(i)
|
|
||||||
|
|
||||||
def __iter__(self):
|
|
||||||
return self
|
|
||||||
|
|
||||||
def __next__(self):
|
|
||||||
result = next(self._orig_iter)
|
|
||||||
self.count += 1
|
|
||||||
return result
|
|
||||||
|
|
||||||
|
|
||||||
def add_dirs(zf):
|
|
||||||
"""
|
|
||||||
Given a writable zip file zf, inject directory entries for
|
|
||||||
any directories implied by the presence of children.
|
|
||||||
"""
|
|
||||||
for name in zipfile.CompleteDirs._implied_dirs(zf.namelist()):
|
|
||||||
zf.writestr(name, b"")
|
|
||||||
return zf
|
|
||||||
|
|
||||||
|
|
||||||
def build_alpharep_fixture():
|
|
||||||
"""
|
|
||||||
Create a zip file with this structure:
|
|
||||||
|
|
||||||
.
|
|
||||||
├── a.txt
|
|
||||||
├── b
|
|
||||||
│ ├── c.txt
|
|
||||||
│ ├── d
|
|
||||||
│ │ └── e.txt
|
|
||||||
│ └── f.txt
|
|
||||||
└── g
|
|
||||||
└── h
|
|
||||||
└── i.txt
|
|
||||||
|
|
||||||
This fixture has the following key characteristics:
|
|
||||||
|
|
||||||
- a file at the root (a)
|
|
||||||
- a file two levels deep (b/d/e)
|
|
||||||
- multiple files in a directory (b/c, b/f)
|
|
||||||
- a directory containing only a directory (g/h)
|
|
||||||
|
|
||||||
"alpha" because it uses alphabet
|
|
||||||
"rep" because it's a representative example
|
|
||||||
"""
|
|
||||||
data = io.BytesIO()
|
|
||||||
zf = zipfile.ZipFile(data, "w")
|
|
||||||
zf.writestr("a.txt", b"content of a")
|
|
||||||
zf.writestr("b/c.txt", b"content of c")
|
|
||||||
zf.writestr("b/d/e.txt", b"content of e")
|
|
||||||
zf.writestr("b/f.txt", b"content of f")
|
|
||||||
zf.writestr("g/h/i.txt", b"content of i")
|
|
||||||
zf.filename = "alpharep.zip"
|
|
||||||
return zf
|
|
||||||
|
|
||||||
|
|
||||||
def pass_alpharep(meth):
|
|
||||||
"""
|
|
||||||
Given a method, wrap it in a for loop that invokes method
|
|
||||||
with each subtest.
|
|
||||||
"""
|
|
||||||
|
|
||||||
@functools.wraps(meth)
|
|
||||||
def wrapper(self):
|
|
||||||
for alpharep in self.zipfile_alpharep():
|
|
||||||
meth(self, alpharep=alpharep)
|
|
||||||
|
|
||||||
return wrapper
|
|
||||||
|
|
||||||
|
|
||||||
class TestPath(unittest.TestCase):
|
|
||||||
def setUp(self):
|
|
||||||
self.fixtures = contextlib.ExitStack()
|
|
||||||
self.addCleanup(self.fixtures.close)
|
|
||||||
|
|
||||||
def zipfile_alpharep(self):
|
|
||||||
with self.subTest():
|
|
||||||
yield build_alpharep_fixture()
|
|
||||||
with self.subTest():
|
|
||||||
yield add_dirs(build_alpharep_fixture())
|
|
||||||
|
|
||||||
def zipfile_ondisk(self, alpharep):
|
|
||||||
tmpdir = pathlib.Path(self.fixtures.enter_context(temp_dir()))
|
|
||||||
buffer = alpharep.fp
|
|
||||||
alpharep.close()
|
|
||||||
path = tmpdir / alpharep.filename
|
|
||||||
with path.open("wb") as strm:
|
|
||||||
strm.write(buffer.getvalue())
|
|
||||||
return path
|
|
||||||
|
|
||||||
@pass_alpharep
|
|
||||||
def test_iterdir_and_types(self, alpharep):
|
|
||||||
root = zipfile.Path(alpharep)
|
|
||||||
assert root.is_dir()
|
|
||||||
a, b, g = root.iterdir()
|
|
||||||
assert a.is_file()
|
|
||||||
assert b.is_dir()
|
|
||||||
assert g.is_dir()
|
|
||||||
c, f, d = b.iterdir()
|
|
||||||
assert c.is_file() and f.is_file()
|
|
||||||
(e,) = d.iterdir()
|
|
||||||
assert e.is_file()
|
|
||||||
(h,) = g.iterdir()
|
|
||||||
(i,) = h.iterdir()
|
|
||||||
assert i.is_file()
|
|
||||||
|
|
||||||
@pass_alpharep
|
|
||||||
def test_is_file_missing(self, alpharep):
|
|
||||||
root = zipfile.Path(alpharep)
|
|
||||||
assert not root.joinpath('missing.txt').is_file()
|
|
||||||
|
|
||||||
@pass_alpharep
|
|
||||||
def test_iterdir_on_file(self, alpharep):
|
|
||||||
root = zipfile.Path(alpharep)
|
|
||||||
a, b, g = root.iterdir()
|
|
||||||
with self.assertRaises(ValueError):
|
|
||||||
a.iterdir()
|
|
||||||
|
|
||||||
@pass_alpharep
|
|
||||||
def test_subdir_is_dir(self, alpharep):
|
|
||||||
root = zipfile.Path(alpharep)
|
|
||||||
assert (root / 'b').is_dir()
|
|
||||||
assert (root / 'b/').is_dir()
|
|
||||||
assert (root / 'g').is_dir()
|
|
||||||
assert (root / 'g/').is_dir()
|
|
||||||
|
|
||||||
@pass_alpharep
|
|
||||||
def test_open(self, alpharep):
|
|
||||||
root = zipfile.Path(alpharep)
|
|
||||||
a, b, g = root.iterdir()
|
|
||||||
with a.open(encoding="utf-8") as strm:
|
|
||||||
data = strm.read()
|
|
||||||
assert data == "content of a"
|
|
||||||
|
|
||||||
def test_open_write(self):
|
|
||||||
"""
|
|
||||||
If the zipfile is open for write, it should be possible to
|
|
||||||
write bytes or text to it.
|
|
||||||
"""
|
|
||||||
zf = zipfile.Path(zipfile.ZipFile(io.BytesIO(), mode='w'))
|
|
||||||
with zf.joinpath('file.bin').open('wb') as strm:
|
|
||||||
strm.write(b'binary contents')
|
|
||||||
with zf.joinpath('file.txt').open('w', encoding="utf-8") as strm:
|
|
||||||
strm.write('text file')
|
|
||||||
|
|
||||||
def test_open_extant_directory(self):
|
|
||||||
"""
|
|
||||||
Attempting to open a directory raises IsADirectoryError.
|
|
||||||
"""
|
|
||||||
zf = zipfile.Path(add_dirs(build_alpharep_fixture()))
|
|
||||||
with self.assertRaises(IsADirectoryError):
|
|
||||||
zf.joinpath('b').open()
|
|
||||||
|
|
||||||
@pass_alpharep
|
|
||||||
def test_open_binary_invalid_args(self, alpharep):
|
|
||||||
root = zipfile.Path(alpharep)
|
|
||||||
with self.assertRaises(ValueError):
|
|
||||||
root.joinpath('a.txt').open('rb', encoding='utf-8')
|
|
||||||
with self.assertRaises(ValueError):
|
|
||||||
root.joinpath('a.txt').open('rb', 'utf-8')
|
|
||||||
|
|
||||||
def test_open_missing_directory(self):
|
|
||||||
"""
|
|
||||||
Attempting to open a missing directory raises FileNotFoundError.
|
|
||||||
"""
|
|
||||||
zf = zipfile.Path(add_dirs(build_alpharep_fixture()))
|
|
||||||
with self.assertRaises(FileNotFoundError):
|
|
||||||
zf.joinpath('z').open()
|
|
||||||
|
|
||||||
@pass_alpharep
|
|
||||||
def test_read(self, alpharep):
|
|
||||||
root = zipfile.Path(alpharep)
|
|
||||||
a, b, g = root.iterdir()
|
|
||||||
assert a.read_text(encoding="utf-8") == "content of a"
|
|
||||||
assert a.read_bytes() == b"content of a"
|
|
||||||
|
|
||||||
@pass_alpharep
|
|
||||||
def test_joinpath(self, alpharep):
|
|
||||||
root = zipfile.Path(alpharep)
|
|
||||||
a = root.joinpath("a.txt")
|
|
||||||
assert a.is_file()
|
|
||||||
e = root.joinpath("b").joinpath("d").joinpath("e.txt")
|
|
||||||
assert e.read_text(encoding="utf-8") == "content of e"
|
|
||||||
|
|
||||||
@pass_alpharep
|
|
||||||
def test_joinpath_multiple(self, alpharep):
|
|
||||||
root = zipfile.Path(alpharep)
|
|
||||||
e = root.joinpath("b", "d", "e.txt")
|
|
||||||
assert e.read_text(encoding="utf-8") == "content of e"
|
|
||||||
|
|
||||||
@pass_alpharep
|
|
||||||
def test_traverse_truediv(self, alpharep):
|
|
||||||
root = zipfile.Path(alpharep)
|
|
||||||
a = root / "a.txt"
|
|
||||||
assert a.is_file()
|
|
||||||
e = root / "b" / "d" / "e.txt"
|
|
||||||
assert e.read_text(encoding="utf-8") == "content of e"
|
|
||||||
|
|
||||||
@pass_alpharep
|
|
||||||
def test_traverse_simplediv(self, alpharep):
|
|
||||||
"""
|
|
||||||
Disable the __future__.division when testing traversal.
|
|
||||||
"""
|
|
||||||
code = compile(
|
|
||||||
source="zipfile.Path(alpharep) / 'a'",
|
|
||||||
filename="(test)",
|
|
||||||
mode="eval",
|
|
||||||
dont_inherit=True,
|
|
||||||
)
|
|
||||||
eval(code)
|
|
||||||
|
|
||||||
@pass_alpharep
|
|
||||||
def test_pathlike_construction(self, alpharep):
|
|
||||||
"""
|
|
||||||
zipfile.Path should be constructable from a path-like object
|
|
||||||
"""
|
|
||||||
zipfile_ondisk = self.zipfile_ondisk(alpharep)
|
|
||||||
pathlike = pathlib.Path(str(zipfile_ondisk))
|
|
||||||
zipfile.Path(pathlike)
|
|
||||||
|
|
||||||
@pass_alpharep
|
|
||||||
def test_traverse_pathlike(self, alpharep):
|
|
||||||
root = zipfile.Path(alpharep)
|
|
||||||
root / pathlib.Path("a")
|
|
||||||
|
|
||||||
@pass_alpharep
|
|
||||||
def test_parent(self, alpharep):
|
|
||||||
root = zipfile.Path(alpharep)
|
|
||||||
assert (root / 'a').parent.at == ''
|
|
||||||
assert (root / 'a' / 'b').parent.at == 'a/'
|
|
||||||
|
|
||||||
@pass_alpharep
|
|
||||||
def test_dir_parent(self, alpharep):
|
|
||||||
root = zipfile.Path(alpharep)
|
|
||||||
assert (root / 'b').parent.at == ''
|
|
||||||
assert (root / 'b/').parent.at == ''
|
|
||||||
|
|
||||||
@pass_alpharep
|
|
||||||
def test_missing_dir_parent(self, alpharep):
|
|
||||||
root = zipfile.Path(alpharep)
|
|
||||||
assert (root / 'missing dir/').parent.at == ''
|
|
||||||
|
|
||||||
@pass_alpharep
|
|
||||||
def test_mutability(self, alpharep):
|
|
||||||
"""
|
|
||||||
If the underlying zipfile is changed, the Path object should
|
|
||||||
reflect that change.
|
|
||||||
"""
|
|
||||||
root = zipfile.Path(alpharep)
|
|
||||||
a, b, g = root.iterdir()
|
|
||||||
alpharep.writestr('foo.txt', 'foo')
|
|
||||||
alpharep.writestr('bar/baz.txt', 'baz')
|
|
||||||
assert any(child.name == 'foo.txt' for child in root.iterdir())
|
|
||||||
assert (root / 'foo.txt').read_text(encoding="utf-8") == 'foo'
|
|
||||||
(baz,) = (root / 'bar').iterdir()
|
|
||||||
assert baz.read_text(encoding="utf-8") == 'baz'
|
|
||||||
|
|
||||||
HUGE_ZIPFILE_NUM_ENTRIES = 2 ** 13
|
|
||||||
|
|
||||||
def huge_zipfile(self):
|
|
||||||
"""Create a read-only zipfile with a huge number of entries entries."""
|
|
||||||
strm = io.BytesIO()
|
|
||||||
zf = zipfile.ZipFile(strm, "w")
|
|
||||||
for entry in map(str, range(self.HUGE_ZIPFILE_NUM_ENTRIES)):
|
|
||||||
zf.writestr(entry, entry)
|
|
||||||
zf.mode = 'r'
|
|
||||||
return zf
|
|
||||||
|
|
||||||
def test_joinpath_constant_time(self):
|
|
||||||
"""
|
|
||||||
Ensure joinpath on items in zipfile is linear time.
|
|
||||||
"""
|
|
||||||
root = zipfile.Path(self.huge_zipfile())
|
|
||||||
entries = jaraco.itertools.Counter(root.iterdir())
|
|
||||||
for entry in entries:
|
|
||||||
entry.joinpath('suffix')
|
|
||||||
# Check the file iterated all items
|
|
||||||
assert entries.count == self.HUGE_ZIPFILE_NUM_ENTRIES
|
|
||||||
|
|
||||||
# @func_timeout.func_set_timeout(3)
|
|
||||||
def test_implied_dirs_performance(self):
|
|
||||||
data = ['/'.join(string.ascii_lowercase + str(n)) for n in range(10000)]
|
|
||||||
zipfile.CompleteDirs._implied_dirs(data)
|
|
||||||
|
|
||||||
@pass_alpharep
|
|
||||||
def test_read_does_not_close(self, alpharep):
|
|
||||||
alpharep = self.zipfile_ondisk(alpharep)
|
|
||||||
with zipfile.ZipFile(alpharep) as file:
|
|
||||||
for rep in range(2):
|
|
||||||
zipfile.Path(file, 'a.txt').read_text(encoding="utf-8")
|
|
||||||
|
|
||||||
@pass_alpharep
|
|
||||||
def test_subclass(self, alpharep):
|
|
||||||
class Subclass(zipfile.Path):
|
|
||||||
pass
|
|
||||||
|
|
||||||
root = Subclass(alpharep)
|
|
||||||
assert isinstance(root / 'b', Subclass)
|
|
||||||
|
|
||||||
@pass_alpharep
|
|
||||||
def test_filename(self, alpharep):
|
|
||||||
root = zipfile.Path(alpharep)
|
|
||||||
assert root.filename == pathlib.Path('alpharep.zip')
|
|
||||||
|
|
||||||
@pass_alpharep
|
|
||||||
def test_root_name(self, alpharep):
|
|
||||||
"""
|
|
||||||
The name of the root should be the name of the zipfile
|
|
||||||
"""
|
|
||||||
root = zipfile.Path(alpharep)
|
|
||||||
assert root.name == 'alpharep.zip' == root.filename.name
|
|
||||||
|
|
||||||
@pass_alpharep
|
|
||||||
def test_suffix(self, alpharep):
|
|
||||||
"""
|
|
||||||
The suffix of the root should be the suffix of the zipfile.
|
|
||||||
The suffix of each nested file is the final component's last suffix, if any.
|
|
||||||
Includes the leading period, just like pathlib.Path.
|
|
||||||
"""
|
|
||||||
root = zipfile.Path(alpharep)
|
|
||||||
assert root.suffix == '.zip' == root.filename.suffix
|
|
||||||
|
|
||||||
b = root / "b.txt"
|
|
||||||
assert b.suffix == ".txt"
|
|
||||||
|
|
||||||
c = root / "c" / "filename.tar.gz"
|
|
||||||
assert c.suffix == ".gz"
|
|
||||||
|
|
||||||
d = root / "d"
|
|
||||||
assert d.suffix == ""
|
|
||||||
|
|
||||||
@pass_alpharep
|
|
||||||
def test_suffixes(self, alpharep):
|
|
||||||
"""
|
|
||||||
The suffix of the root should be the suffix of the zipfile.
|
|
||||||
The suffix of each nested file is the final component's last suffix, if any.
|
|
||||||
Includes the leading period, just like pathlib.Path.
|
|
||||||
"""
|
|
||||||
root = zipfile.Path(alpharep)
|
|
||||||
assert root.suffixes == ['.zip'] == root.filename.suffixes
|
|
||||||
|
|
||||||
b = root / 'b.txt'
|
|
||||||
assert b.suffixes == ['.txt']
|
|
||||||
|
|
||||||
c = root / 'c' / 'filename.tar.gz'
|
|
||||||
assert c.suffixes == ['.tar', '.gz']
|
|
||||||
|
|
||||||
d = root / 'd'
|
|
||||||
assert d.suffixes == []
|
|
||||||
|
|
||||||
e = root / '.hgrc'
|
|
||||||
assert e.suffixes == []
|
|
||||||
|
|
||||||
@pass_alpharep
|
|
||||||
def test_stem(self, alpharep):
|
|
||||||
"""
|
|
||||||
The final path component, without its suffix
|
|
||||||
"""
|
|
||||||
root = zipfile.Path(alpharep)
|
|
||||||
assert root.stem == 'alpharep' == root.filename.stem
|
|
||||||
|
|
||||||
b = root / "b.txt"
|
|
||||||
assert b.stem == "b"
|
|
||||||
|
|
||||||
c = root / "c" / "filename.tar.gz"
|
|
||||||
assert c.stem == "filename.tar"
|
|
||||||
|
|
||||||
d = root / "d"
|
|
||||||
assert d.stem == "d"
|
|
||||||
|
|
||||||
@pass_alpharep
|
|
||||||
def test_root_parent(self, alpharep):
|
|
||||||
root = zipfile.Path(alpharep)
|
|
||||||
assert root.parent == pathlib.Path('.')
|
|
||||||
root.root.filename = 'foo/bar.zip'
|
|
||||||
assert root.parent == pathlib.Path('foo')
|
|
||||||
|
|
||||||
@pass_alpharep
|
|
||||||
def test_root_unnamed(self, alpharep):
|
|
||||||
"""
|
|
||||||
It is an error to attempt to get the name
|
|
||||||
or parent of an unnamed zipfile.
|
|
||||||
"""
|
|
||||||
alpharep.filename = None
|
|
||||||
root = zipfile.Path(alpharep)
|
|
||||||
with self.assertRaises(TypeError):
|
|
||||||
root.name
|
|
||||||
with self.assertRaises(TypeError):
|
|
||||||
root.parent
|
|
||||||
|
|
||||||
# .name and .parent should still work on subs
|
|
||||||
sub = root / "b"
|
|
||||||
assert sub.name == "b"
|
|
||||||
assert sub.parent
|
|
||||||
|
|
||||||
@pass_alpharep
|
|
||||||
def test_inheritance(self, alpharep):
|
|
||||||
cls = type('PathChild', (zipfile.Path,), {})
|
|
||||||
for alpharep in self.zipfile_alpharep():
|
|
||||||
file = cls(alpharep).joinpath('some dir').parent
|
|
||||||
assert isinstance(file, cls)
|
|
||||||
|
|
||||||
|
|
||||||
class EncodedMetadataTests(unittest.TestCase):
|
class EncodedMetadataTests(unittest.TestCase):
|
||||||
file_names = ['\u4e00', '\u4e8c', '\u4e09'] # Han 'one', 'two', 'three'
|
file_names = ['\u4e00', '\u4e8c', '\u4e09'] # Han 'one', 'two', 'three'
|
||||||
file_content = [
|
file_content = [
|
|
@ -0,0 +1,423 @@
|
||||||
|
import io
|
||||||
|
import zipfile
|
||||||
|
import contextlib
|
||||||
|
import pathlib
|
||||||
|
import unittest
|
||||||
|
import string
|
||||||
|
import functools
|
||||||
|
|
||||||
|
from test.support.os_helper import temp_dir
|
||||||
|
|
||||||
|
|
||||||
|
# Poor man's technique to consume a (smallish) iterable.
|
||||||
|
consume = tuple
|
||||||
|
|
||||||
|
|
||||||
|
# from jaraco.itertools 5.0
|
||||||
|
class jaraco:
|
||||||
|
class itertools:
|
||||||
|
class Counter:
|
||||||
|
def __init__(self, i):
|
||||||
|
self.count = 0
|
||||||
|
self._orig_iter = iter(i)
|
||||||
|
|
||||||
|
def __iter__(self):
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __next__(self):
|
||||||
|
result = next(self._orig_iter)
|
||||||
|
self.count += 1
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
def add_dirs(zf):
|
||||||
|
"""
|
||||||
|
Given a writable zip file zf, inject directory entries for
|
||||||
|
any directories implied by the presence of children.
|
||||||
|
"""
|
||||||
|
for name in zipfile.CompleteDirs._implied_dirs(zf.namelist()):
|
||||||
|
zf.writestr(name, b"")
|
||||||
|
return zf
|
||||||
|
|
||||||
|
|
||||||
|
def build_alpharep_fixture():
|
||||||
|
"""
|
||||||
|
Create a zip file with this structure:
|
||||||
|
|
||||||
|
.
|
||||||
|
├── a.txt
|
||||||
|
├── b
|
||||||
|
│ ├── c.txt
|
||||||
|
│ ├── d
|
||||||
|
│ │ └── e.txt
|
||||||
|
│ └── f.txt
|
||||||
|
└── g
|
||||||
|
└── h
|
||||||
|
└── i.txt
|
||||||
|
|
||||||
|
This fixture has the following key characteristics:
|
||||||
|
|
||||||
|
- a file at the root (a)
|
||||||
|
- a file two levels deep (b/d/e)
|
||||||
|
- multiple files in a directory (b/c, b/f)
|
||||||
|
- a directory containing only a directory (g/h)
|
||||||
|
|
||||||
|
"alpha" because it uses alphabet
|
||||||
|
"rep" because it's a representative example
|
||||||
|
"""
|
||||||
|
data = io.BytesIO()
|
||||||
|
zf = zipfile.ZipFile(data, "w")
|
||||||
|
zf.writestr("a.txt", b"content of a")
|
||||||
|
zf.writestr("b/c.txt", b"content of c")
|
||||||
|
zf.writestr("b/d/e.txt", b"content of e")
|
||||||
|
zf.writestr("b/f.txt", b"content of f")
|
||||||
|
zf.writestr("g/h/i.txt", b"content of i")
|
||||||
|
zf.filename = "alpharep.zip"
|
||||||
|
return zf
|
||||||
|
|
||||||
|
|
||||||
|
def pass_alpharep(meth):
|
||||||
|
"""
|
||||||
|
Given a method, wrap it in a for loop that invokes method
|
||||||
|
with each subtest.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@functools.wraps(meth)
|
||||||
|
def wrapper(self):
|
||||||
|
for alpharep in self.zipfile_alpharep():
|
||||||
|
meth(self, alpharep=alpharep)
|
||||||
|
|
||||||
|
return wrapper
|
||||||
|
|
||||||
|
|
||||||
|
class TestPath(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.fixtures = contextlib.ExitStack()
|
||||||
|
self.addCleanup(self.fixtures.close)
|
||||||
|
|
||||||
|
def zipfile_alpharep(self):
|
||||||
|
with self.subTest():
|
||||||
|
yield build_alpharep_fixture()
|
||||||
|
with self.subTest():
|
||||||
|
yield add_dirs(build_alpharep_fixture())
|
||||||
|
|
||||||
|
def zipfile_ondisk(self, alpharep):
|
||||||
|
tmpdir = pathlib.Path(self.fixtures.enter_context(temp_dir()))
|
||||||
|
buffer = alpharep.fp
|
||||||
|
alpharep.close()
|
||||||
|
path = tmpdir / alpharep.filename
|
||||||
|
with path.open("wb") as strm:
|
||||||
|
strm.write(buffer.getvalue())
|
||||||
|
return path
|
||||||
|
|
||||||
|
@pass_alpharep
|
||||||
|
def test_iterdir_and_types(self, alpharep):
|
||||||
|
root = zipfile.Path(alpharep)
|
||||||
|
assert root.is_dir()
|
||||||
|
a, b, g = root.iterdir()
|
||||||
|
assert a.is_file()
|
||||||
|
assert b.is_dir()
|
||||||
|
assert g.is_dir()
|
||||||
|
c, f, d = b.iterdir()
|
||||||
|
assert c.is_file() and f.is_file()
|
||||||
|
(e,) = d.iterdir()
|
||||||
|
assert e.is_file()
|
||||||
|
(h,) = g.iterdir()
|
||||||
|
(i,) = h.iterdir()
|
||||||
|
assert i.is_file()
|
||||||
|
|
||||||
|
@pass_alpharep
|
||||||
|
def test_is_file_missing(self, alpharep):
|
||||||
|
root = zipfile.Path(alpharep)
|
||||||
|
assert not root.joinpath('missing.txt').is_file()
|
||||||
|
|
||||||
|
@pass_alpharep
|
||||||
|
def test_iterdir_on_file(self, alpharep):
|
||||||
|
root = zipfile.Path(alpharep)
|
||||||
|
a, b, g = root.iterdir()
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
a.iterdir()
|
||||||
|
|
||||||
|
@pass_alpharep
|
||||||
|
def test_subdir_is_dir(self, alpharep):
|
||||||
|
root = zipfile.Path(alpharep)
|
||||||
|
assert (root / 'b').is_dir()
|
||||||
|
assert (root / 'b/').is_dir()
|
||||||
|
assert (root / 'g').is_dir()
|
||||||
|
assert (root / 'g/').is_dir()
|
||||||
|
|
||||||
|
@pass_alpharep
|
||||||
|
def test_open(self, alpharep):
|
||||||
|
root = zipfile.Path(alpharep)
|
||||||
|
a, b, g = root.iterdir()
|
||||||
|
with a.open(encoding="utf-8") as strm:
|
||||||
|
data = strm.read()
|
||||||
|
assert data == "content of a"
|
||||||
|
|
||||||
|
def test_open_write(self):
|
||||||
|
"""
|
||||||
|
If the zipfile is open for write, it should be possible to
|
||||||
|
write bytes or text to it.
|
||||||
|
"""
|
||||||
|
zf = zipfile.Path(zipfile.ZipFile(io.BytesIO(), mode='w'))
|
||||||
|
with zf.joinpath('file.bin').open('wb') as strm:
|
||||||
|
strm.write(b'binary contents')
|
||||||
|
with zf.joinpath('file.txt').open('w', encoding="utf-8") as strm:
|
||||||
|
strm.write('text file')
|
||||||
|
|
||||||
|
def test_open_extant_directory(self):
|
||||||
|
"""
|
||||||
|
Attempting to open a directory raises IsADirectoryError.
|
||||||
|
"""
|
||||||
|
zf = zipfile.Path(add_dirs(build_alpharep_fixture()))
|
||||||
|
with self.assertRaises(IsADirectoryError):
|
||||||
|
zf.joinpath('b').open()
|
||||||
|
|
||||||
|
@pass_alpharep
|
||||||
|
def test_open_binary_invalid_args(self, alpharep):
|
||||||
|
root = zipfile.Path(alpharep)
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
root.joinpath('a.txt').open('rb', encoding='utf-8')
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
root.joinpath('a.txt').open('rb', 'utf-8')
|
||||||
|
|
||||||
|
def test_open_missing_directory(self):
|
||||||
|
"""
|
||||||
|
Attempting to open a missing directory raises FileNotFoundError.
|
||||||
|
"""
|
||||||
|
zf = zipfile.Path(add_dirs(build_alpharep_fixture()))
|
||||||
|
with self.assertRaises(FileNotFoundError):
|
||||||
|
zf.joinpath('z').open()
|
||||||
|
|
||||||
|
@pass_alpharep
|
||||||
|
def test_read(self, alpharep):
|
||||||
|
root = zipfile.Path(alpharep)
|
||||||
|
a, b, g = root.iterdir()
|
||||||
|
assert a.read_text(encoding="utf-8") == "content of a"
|
||||||
|
assert a.read_bytes() == b"content of a"
|
||||||
|
|
||||||
|
@pass_alpharep
|
||||||
|
def test_joinpath(self, alpharep):
|
||||||
|
root = zipfile.Path(alpharep)
|
||||||
|
a = root.joinpath("a.txt")
|
||||||
|
assert a.is_file()
|
||||||
|
e = root.joinpath("b").joinpath("d").joinpath("e.txt")
|
||||||
|
assert e.read_text(encoding="utf-8") == "content of e"
|
||||||
|
|
||||||
|
@pass_alpharep
|
||||||
|
def test_joinpath_multiple(self, alpharep):
|
||||||
|
root = zipfile.Path(alpharep)
|
||||||
|
e = root.joinpath("b", "d", "e.txt")
|
||||||
|
assert e.read_text(encoding="utf-8") == "content of e"
|
||||||
|
|
||||||
|
@pass_alpharep
|
||||||
|
def test_traverse_truediv(self, alpharep):
|
||||||
|
root = zipfile.Path(alpharep)
|
||||||
|
a = root / "a.txt"
|
||||||
|
assert a.is_file()
|
||||||
|
e = root / "b" / "d" / "e.txt"
|
||||||
|
assert e.read_text(encoding="utf-8") == "content of e"
|
||||||
|
|
||||||
|
@pass_alpharep
|
||||||
|
def test_traverse_simplediv(self, alpharep):
|
||||||
|
"""
|
||||||
|
Disable the __future__.division when testing traversal.
|
||||||
|
"""
|
||||||
|
code = compile(
|
||||||
|
source="zipfile.Path(alpharep) / 'a'",
|
||||||
|
filename="(test)",
|
||||||
|
mode="eval",
|
||||||
|
dont_inherit=True,
|
||||||
|
)
|
||||||
|
eval(code)
|
||||||
|
|
||||||
|
@pass_alpharep
|
||||||
|
def test_pathlike_construction(self, alpharep):
|
||||||
|
"""
|
||||||
|
zipfile.Path should be constructable from a path-like object
|
||||||
|
"""
|
||||||
|
zipfile_ondisk = self.zipfile_ondisk(alpharep)
|
||||||
|
pathlike = pathlib.Path(str(zipfile_ondisk))
|
||||||
|
zipfile.Path(pathlike)
|
||||||
|
|
||||||
|
@pass_alpharep
|
||||||
|
def test_traverse_pathlike(self, alpharep):
|
||||||
|
root = zipfile.Path(alpharep)
|
||||||
|
root / pathlib.Path("a")
|
||||||
|
|
||||||
|
@pass_alpharep
|
||||||
|
def test_parent(self, alpharep):
|
||||||
|
root = zipfile.Path(alpharep)
|
||||||
|
assert (root / 'a').parent.at == ''
|
||||||
|
assert (root / 'a' / 'b').parent.at == 'a/'
|
||||||
|
|
||||||
|
@pass_alpharep
|
||||||
|
def test_dir_parent(self, alpharep):
|
||||||
|
root = zipfile.Path(alpharep)
|
||||||
|
assert (root / 'b').parent.at == ''
|
||||||
|
assert (root / 'b/').parent.at == ''
|
||||||
|
|
||||||
|
@pass_alpharep
|
||||||
|
def test_missing_dir_parent(self, alpharep):
|
||||||
|
root = zipfile.Path(alpharep)
|
||||||
|
assert (root / 'missing dir/').parent.at == ''
|
||||||
|
|
||||||
|
@pass_alpharep
|
||||||
|
def test_mutability(self, alpharep):
|
||||||
|
"""
|
||||||
|
If the underlying zipfile is changed, the Path object should
|
||||||
|
reflect that change.
|
||||||
|
"""
|
||||||
|
root = zipfile.Path(alpharep)
|
||||||
|
a, b, g = root.iterdir()
|
||||||
|
alpharep.writestr('foo.txt', 'foo')
|
||||||
|
alpharep.writestr('bar/baz.txt', 'baz')
|
||||||
|
assert any(child.name == 'foo.txt' for child in root.iterdir())
|
||||||
|
assert (root / 'foo.txt').read_text(encoding="utf-8") == 'foo'
|
||||||
|
(baz,) = (root / 'bar').iterdir()
|
||||||
|
assert baz.read_text(encoding="utf-8") == 'baz'
|
||||||
|
|
||||||
|
HUGE_ZIPFILE_NUM_ENTRIES = 2**13
|
||||||
|
|
||||||
|
def huge_zipfile(self):
|
||||||
|
"""Create a read-only zipfile with a huge number of entries entries."""
|
||||||
|
strm = io.BytesIO()
|
||||||
|
zf = zipfile.ZipFile(strm, "w")
|
||||||
|
for entry in map(str, range(self.HUGE_ZIPFILE_NUM_ENTRIES)):
|
||||||
|
zf.writestr(entry, entry)
|
||||||
|
zf.mode = 'r'
|
||||||
|
return zf
|
||||||
|
|
||||||
|
def test_joinpath_constant_time(self):
|
||||||
|
"""
|
||||||
|
Ensure joinpath on items in zipfile is linear time.
|
||||||
|
"""
|
||||||
|
root = zipfile.Path(self.huge_zipfile())
|
||||||
|
entries = jaraco.itertools.Counter(root.iterdir())
|
||||||
|
for entry in entries:
|
||||||
|
entry.joinpath('suffix')
|
||||||
|
# Check the file iterated all items
|
||||||
|
assert entries.count == self.HUGE_ZIPFILE_NUM_ENTRIES
|
||||||
|
|
||||||
|
# @func_timeout.func_set_timeout(3)
|
||||||
|
def test_implied_dirs_performance(self):
|
||||||
|
data = ['/'.join(string.ascii_lowercase + str(n)) for n in range(10000)]
|
||||||
|
zipfile.CompleteDirs._implied_dirs(data)
|
||||||
|
|
||||||
|
@pass_alpharep
|
||||||
|
def test_read_does_not_close(self, alpharep):
|
||||||
|
alpharep = self.zipfile_ondisk(alpharep)
|
||||||
|
with zipfile.ZipFile(alpharep) as file:
|
||||||
|
for rep in range(2):
|
||||||
|
zipfile.Path(file, 'a.txt').read_text(encoding="utf-8")
|
||||||
|
|
||||||
|
@pass_alpharep
|
||||||
|
def test_subclass(self, alpharep):
|
||||||
|
class Subclass(zipfile.Path):
|
||||||
|
pass
|
||||||
|
|
||||||
|
root = Subclass(alpharep)
|
||||||
|
assert isinstance(root / 'b', Subclass)
|
||||||
|
|
||||||
|
@pass_alpharep
|
||||||
|
def test_filename(self, alpharep):
|
||||||
|
root = zipfile.Path(alpharep)
|
||||||
|
assert root.filename == pathlib.Path('alpharep.zip')
|
||||||
|
|
||||||
|
@pass_alpharep
|
||||||
|
def test_root_name(self, alpharep):
|
||||||
|
"""
|
||||||
|
The name of the root should be the name of the zipfile
|
||||||
|
"""
|
||||||
|
root = zipfile.Path(alpharep)
|
||||||
|
assert root.name == 'alpharep.zip' == root.filename.name
|
||||||
|
|
||||||
|
@pass_alpharep
|
||||||
|
def test_suffix(self, alpharep):
|
||||||
|
"""
|
||||||
|
The suffix of the root should be the suffix of the zipfile.
|
||||||
|
The suffix of each nested file is the final component's last suffix, if any.
|
||||||
|
Includes the leading period, just like pathlib.Path.
|
||||||
|
"""
|
||||||
|
root = zipfile.Path(alpharep)
|
||||||
|
assert root.suffix == '.zip' == root.filename.suffix
|
||||||
|
|
||||||
|
b = root / "b.txt"
|
||||||
|
assert b.suffix == ".txt"
|
||||||
|
|
||||||
|
c = root / "c" / "filename.tar.gz"
|
||||||
|
assert c.suffix == ".gz"
|
||||||
|
|
||||||
|
d = root / "d"
|
||||||
|
assert d.suffix == ""
|
||||||
|
|
||||||
|
@pass_alpharep
|
||||||
|
def test_suffixes(self, alpharep):
|
||||||
|
"""
|
||||||
|
The suffix of the root should be the suffix of the zipfile.
|
||||||
|
The suffix of each nested file is the final component's last suffix, if any.
|
||||||
|
Includes the leading period, just like pathlib.Path.
|
||||||
|
"""
|
||||||
|
root = zipfile.Path(alpharep)
|
||||||
|
assert root.suffixes == ['.zip'] == root.filename.suffixes
|
||||||
|
|
||||||
|
b = root / 'b.txt'
|
||||||
|
assert b.suffixes == ['.txt']
|
||||||
|
|
||||||
|
c = root / 'c' / 'filename.tar.gz'
|
||||||
|
assert c.suffixes == ['.tar', '.gz']
|
||||||
|
|
||||||
|
d = root / 'd'
|
||||||
|
assert d.suffixes == []
|
||||||
|
|
||||||
|
e = root / '.hgrc'
|
||||||
|
assert e.suffixes == []
|
||||||
|
|
||||||
|
@pass_alpharep
|
||||||
|
def test_stem(self, alpharep):
|
||||||
|
"""
|
||||||
|
The final path component, without its suffix
|
||||||
|
"""
|
||||||
|
root = zipfile.Path(alpharep)
|
||||||
|
assert root.stem == 'alpharep' == root.filename.stem
|
||||||
|
|
||||||
|
b = root / "b.txt"
|
||||||
|
assert b.stem == "b"
|
||||||
|
|
||||||
|
c = root / "c" / "filename.tar.gz"
|
||||||
|
assert c.stem == "filename.tar"
|
||||||
|
|
||||||
|
d = root / "d"
|
||||||
|
assert d.stem == "d"
|
||||||
|
|
||||||
|
@pass_alpharep
|
||||||
|
def test_root_parent(self, alpharep):
|
||||||
|
root = zipfile.Path(alpharep)
|
||||||
|
assert root.parent == pathlib.Path('.')
|
||||||
|
root.root.filename = 'foo/bar.zip'
|
||||||
|
assert root.parent == pathlib.Path('foo')
|
||||||
|
|
||||||
|
@pass_alpharep
|
||||||
|
def test_root_unnamed(self, alpharep):
|
||||||
|
"""
|
||||||
|
It is an error to attempt to get the name
|
||||||
|
or parent of an unnamed zipfile.
|
||||||
|
"""
|
||||||
|
alpharep.filename = None
|
||||||
|
root = zipfile.Path(alpharep)
|
||||||
|
with self.assertRaises(TypeError):
|
||||||
|
root.name
|
||||||
|
with self.assertRaises(TypeError):
|
||||||
|
root.parent
|
||||||
|
|
||||||
|
# .name and .parent should still work on subs
|
||||||
|
sub = root / "b"
|
||||||
|
assert sub.name == "b"
|
||||||
|
assert sub.parent
|
||||||
|
|
||||||
|
@pass_alpharep
|
||||||
|
def test_inheritance(self, alpharep):
|
||||||
|
cls = type('PathChild', (zipfile.Path,), {})
|
||||||
|
for alpharep in self.zipfile_alpharep():
|
||||||
|
file = cls(alpharep).joinpath('some dir').parent
|
||||||
|
assert isinstance(file, cls)
|
|
@ -6,17 +6,13 @@ XXX references to utf-8 need further investigation.
|
||||||
import binascii
|
import binascii
|
||||||
import importlib.util
|
import importlib.util
|
||||||
import io
|
import io
|
||||||
import itertools
|
|
||||||
import os
|
import os
|
||||||
import posixpath
|
|
||||||
import shutil
|
import shutil
|
||||||
import stat
|
import stat
|
||||||
import struct
|
import struct
|
||||||
import sys
|
import sys
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
import contextlib
|
|
||||||
import pathlib
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
import zlib # We may need its compression method
|
import zlib # We may need its compression method
|
||||||
|
@ -2186,381 +2182,12 @@ class PyZipFile(ZipFile):
|
||||||
return (fname, archivename)
|
return (fname, archivename)
|
||||||
|
|
||||||
|
|
||||||
def _parents(path):
|
from ._path import ( # noqa: E402
|
||||||
"""
|
Path,
|
||||||
Given a path with elements separated by
|
|
||||||
posixpath.sep, generate all parents of that path.
|
|
||||||
|
|
||||||
>>> list(_parents('b/d'))
|
# used privately for tests
|
||||||
['b']
|
CompleteDirs, # noqa: F401
|
||||||
>>> list(_parents('/b/d/'))
|
)
|
||||||
['/b']
|
|
||||||
>>> list(_parents('b/d/f/'))
|
|
||||||
['b/d', 'b']
|
|
||||||
>>> list(_parents('b'))
|
|
||||||
[]
|
|
||||||
>>> list(_parents(''))
|
|
||||||
[]
|
|
||||||
"""
|
|
||||||
return itertools.islice(_ancestry(path), 1, None)
|
|
||||||
|
|
||||||
|
# used privately for tests
|
||||||
def _ancestry(path):
|
from .__main__ import main # noqa: F401, E402
|
||||||
"""
|
|
||||||
Given a path with elements separated by
|
|
||||||
posixpath.sep, generate all elements of that path
|
|
||||||
|
|
||||||
>>> list(_ancestry('b/d'))
|
|
||||||
['b/d', 'b']
|
|
||||||
>>> list(_ancestry('/b/d/'))
|
|
||||||
['/b/d', '/b']
|
|
||||||
>>> list(_ancestry('b/d/f/'))
|
|
||||||
['b/d/f', 'b/d', 'b']
|
|
||||||
>>> list(_ancestry('b'))
|
|
||||||
['b']
|
|
||||||
>>> list(_ancestry(''))
|
|
||||||
[]
|
|
||||||
"""
|
|
||||||
path = path.rstrip(posixpath.sep)
|
|
||||||
while path and path != posixpath.sep:
|
|
||||||
yield path
|
|
||||||
path, tail = posixpath.split(path)
|
|
||||||
|
|
||||||
|
|
||||||
_dedupe = dict.fromkeys
|
|
||||||
"""Deduplicate an iterable in original order"""
|
|
||||||
|
|
||||||
|
|
||||||
def _difference(minuend, subtrahend):
|
|
||||||
"""
|
|
||||||
Return items in minuend not in subtrahend, retaining order
|
|
||||||
with O(1) lookup.
|
|
||||||
"""
|
|
||||||
return itertools.filterfalse(set(subtrahend).__contains__, minuend)
|
|
||||||
|
|
||||||
|
|
||||||
class CompleteDirs(ZipFile):
|
|
||||||
"""
|
|
||||||
A ZipFile subclass that ensures that implied directories
|
|
||||||
are always included in the namelist.
|
|
||||||
"""
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _implied_dirs(names):
|
|
||||||
parents = itertools.chain.from_iterable(map(_parents, names))
|
|
||||||
as_dirs = (p + posixpath.sep for p in parents)
|
|
||||||
return _dedupe(_difference(as_dirs, names))
|
|
||||||
|
|
||||||
def namelist(self):
|
|
||||||
names = super(CompleteDirs, self).namelist()
|
|
||||||
return names + list(self._implied_dirs(names))
|
|
||||||
|
|
||||||
def _name_set(self):
|
|
||||||
return set(self.namelist())
|
|
||||||
|
|
||||||
def resolve_dir(self, name):
|
|
||||||
"""
|
|
||||||
If the name represents a directory, return that name
|
|
||||||
as a directory (with the trailing slash).
|
|
||||||
"""
|
|
||||||
names = self._name_set()
|
|
||||||
dirname = name + '/'
|
|
||||||
dir_match = name not in names and dirname in names
|
|
||||||
return dirname if dir_match else name
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def make(cls, source):
|
|
||||||
"""
|
|
||||||
Given a source (filename or zipfile), return an
|
|
||||||
appropriate CompleteDirs subclass.
|
|
||||||
"""
|
|
||||||
if isinstance(source, CompleteDirs):
|
|
||||||
return source
|
|
||||||
|
|
||||||
if not isinstance(source, ZipFile):
|
|
||||||
return cls(source)
|
|
||||||
|
|
||||||
# Only allow for FastLookup when supplied zipfile is read-only
|
|
||||||
if 'r' not in source.mode:
|
|
||||||
cls = CompleteDirs
|
|
||||||
|
|
||||||
source.__class__ = cls
|
|
||||||
return source
|
|
||||||
|
|
||||||
|
|
||||||
class FastLookup(CompleteDirs):
|
|
||||||
"""
|
|
||||||
ZipFile subclass to ensure implicit
|
|
||||||
dirs exist and are resolved rapidly.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def namelist(self):
|
|
||||||
with contextlib.suppress(AttributeError):
|
|
||||||
return self.__names
|
|
||||||
self.__names = super(FastLookup, self).namelist()
|
|
||||||
return self.__names
|
|
||||||
|
|
||||||
def _name_set(self):
|
|
||||||
with contextlib.suppress(AttributeError):
|
|
||||||
return self.__lookup
|
|
||||||
self.__lookup = super(FastLookup, self)._name_set()
|
|
||||||
return self.__lookup
|
|
||||||
|
|
||||||
|
|
||||||
class Path:
|
|
||||||
"""
|
|
||||||
A pathlib-compatible interface for zip files.
|
|
||||||
|
|
||||||
Consider a zip file with this structure::
|
|
||||||
|
|
||||||
.
|
|
||||||
├── a.txt
|
|
||||||
└── b
|
|
||||||
├── c.txt
|
|
||||||
└── d
|
|
||||||
└── e.txt
|
|
||||||
|
|
||||||
>>> data = io.BytesIO()
|
|
||||||
>>> zf = ZipFile(data, 'w')
|
|
||||||
>>> zf.writestr('a.txt', 'content of a')
|
|
||||||
>>> zf.writestr('b/c.txt', 'content of c')
|
|
||||||
>>> zf.writestr('b/d/e.txt', 'content of e')
|
|
||||||
>>> zf.filename = 'mem/abcde.zip'
|
|
||||||
|
|
||||||
Path accepts the zipfile object itself or a filename
|
|
||||||
|
|
||||||
>>> root = Path(zf)
|
|
||||||
|
|
||||||
From there, several path operations are available.
|
|
||||||
|
|
||||||
Directory iteration (including the zip file itself):
|
|
||||||
|
|
||||||
>>> a, b = root.iterdir()
|
|
||||||
>>> a
|
|
||||||
Path('mem/abcde.zip', 'a.txt')
|
|
||||||
>>> b
|
|
||||||
Path('mem/abcde.zip', 'b/')
|
|
||||||
|
|
||||||
name property:
|
|
||||||
|
|
||||||
>>> b.name
|
|
||||||
'b'
|
|
||||||
|
|
||||||
join with divide operator:
|
|
||||||
|
|
||||||
>>> c = b / 'c.txt'
|
|
||||||
>>> c
|
|
||||||
Path('mem/abcde.zip', 'b/c.txt')
|
|
||||||
>>> c.name
|
|
||||||
'c.txt'
|
|
||||||
|
|
||||||
Read text:
|
|
||||||
|
|
||||||
>>> c.read_text()
|
|
||||||
'content of c'
|
|
||||||
|
|
||||||
existence:
|
|
||||||
|
|
||||||
>>> c.exists()
|
|
||||||
True
|
|
||||||
>>> (b / 'missing.txt').exists()
|
|
||||||
False
|
|
||||||
|
|
||||||
Coercion to string:
|
|
||||||
|
|
||||||
>>> import os
|
|
||||||
>>> str(c).replace(os.sep, posixpath.sep)
|
|
||||||
'mem/abcde.zip/b/c.txt'
|
|
||||||
|
|
||||||
At the root, ``name``, ``filename``, and ``parent``
|
|
||||||
resolve to the zipfile. Note these attributes are not
|
|
||||||
valid and will raise a ``ValueError`` if the zipfile
|
|
||||||
has no filename.
|
|
||||||
|
|
||||||
>>> root.name
|
|
||||||
'abcde.zip'
|
|
||||||
>>> str(root.filename).replace(os.sep, posixpath.sep)
|
|
||||||
'mem/abcde.zip'
|
|
||||||
>>> str(root.parent)
|
|
||||||
'mem'
|
|
||||||
"""
|
|
||||||
|
|
||||||
__repr = "{self.__class__.__name__}({self.root.filename!r}, {self.at!r})"
|
|
||||||
|
|
||||||
def __init__(self, root, at=""):
|
|
||||||
"""
|
|
||||||
Construct a Path from a ZipFile or filename.
|
|
||||||
|
|
||||||
Note: When the source is an existing ZipFile object,
|
|
||||||
its type (__class__) will be mutated to a
|
|
||||||
specialized type. If the caller wishes to retain the
|
|
||||||
original type, the caller should either create a
|
|
||||||
separate ZipFile object or pass a filename.
|
|
||||||
"""
|
|
||||||
self.root = FastLookup.make(root)
|
|
||||||
self.at = at
|
|
||||||
|
|
||||||
def open(self, mode='r', *args, pwd=None, **kwargs):
|
|
||||||
"""
|
|
||||||
Open this entry as text or binary following the semantics
|
|
||||||
of ``pathlib.Path.open()`` by passing arguments through
|
|
||||||
to io.TextIOWrapper().
|
|
||||||
"""
|
|
||||||
if self.is_dir():
|
|
||||||
raise IsADirectoryError(self)
|
|
||||||
zip_mode = mode[0]
|
|
||||||
if not self.exists() and zip_mode == 'r':
|
|
||||||
raise FileNotFoundError(self)
|
|
||||||
stream = self.root.open(self.at, zip_mode, pwd=pwd)
|
|
||||||
if 'b' in mode:
|
|
||||||
if args or kwargs:
|
|
||||||
raise ValueError("encoding args invalid for binary operation")
|
|
||||||
return stream
|
|
||||||
else:
|
|
||||||
kwargs["encoding"] = io.text_encoding(kwargs.get("encoding"))
|
|
||||||
return io.TextIOWrapper(stream, *args, **kwargs)
|
|
||||||
|
|
||||||
@property
|
|
||||||
def name(self):
|
|
||||||
return pathlib.Path(self.at).name or self.filename.name
|
|
||||||
|
|
||||||
@property
|
|
||||||
def suffix(self):
|
|
||||||
return pathlib.Path(self.at).suffix or self.filename.suffix
|
|
||||||
|
|
||||||
@property
|
|
||||||
def suffixes(self):
|
|
||||||
return pathlib.Path(self.at).suffixes or self.filename.suffixes
|
|
||||||
|
|
||||||
@property
|
|
||||||
def stem(self):
|
|
||||||
return pathlib.Path(self.at).stem or self.filename.stem
|
|
||||||
|
|
||||||
@property
|
|
||||||
def filename(self):
|
|
||||||
return pathlib.Path(self.root.filename).joinpath(self.at)
|
|
||||||
|
|
||||||
def read_text(self, *args, **kwargs):
|
|
||||||
kwargs["encoding"] = io.text_encoding(kwargs.get("encoding"))
|
|
||||||
with self.open('r', *args, **kwargs) as strm:
|
|
||||||
return strm.read()
|
|
||||||
|
|
||||||
def read_bytes(self):
|
|
||||||
with self.open('rb') as strm:
|
|
||||||
return strm.read()
|
|
||||||
|
|
||||||
def _is_child(self, path):
|
|
||||||
return posixpath.dirname(path.at.rstrip("/")) == self.at.rstrip("/")
|
|
||||||
|
|
||||||
def _next(self, at):
|
|
||||||
return self.__class__(self.root, at)
|
|
||||||
|
|
||||||
def is_dir(self):
|
|
||||||
return not self.at or self.at.endswith("/")
|
|
||||||
|
|
||||||
def is_file(self):
|
|
||||||
return self.exists() and not self.is_dir()
|
|
||||||
|
|
||||||
def exists(self):
|
|
||||||
return self.at in self.root._name_set()
|
|
||||||
|
|
||||||
def iterdir(self):
|
|
||||||
if not self.is_dir():
|
|
||||||
raise ValueError("Can't listdir a file")
|
|
||||||
subs = map(self._next, self.root.namelist())
|
|
||||||
return filter(self._is_child, subs)
|
|
||||||
|
|
||||||
def __str__(self):
|
|
||||||
return posixpath.join(self.root.filename, self.at)
|
|
||||||
|
|
||||||
def __repr__(self):
|
|
||||||
return self.__repr.format(self=self)
|
|
||||||
|
|
||||||
def joinpath(self, *other):
|
|
||||||
next = posixpath.join(self.at, *other)
|
|
||||||
return self._next(self.root.resolve_dir(next))
|
|
||||||
|
|
||||||
__truediv__ = joinpath
|
|
||||||
|
|
||||||
@property
|
|
||||||
def parent(self):
|
|
||||||
if not self.at:
|
|
||||||
return self.filename.parent
|
|
||||||
parent_at = posixpath.dirname(self.at.rstrip('/'))
|
|
||||||
if parent_at:
|
|
||||||
parent_at += '/'
|
|
||||||
return self._next(parent_at)
|
|
||||||
|
|
||||||
|
|
||||||
def main(args=None):
|
|
||||||
import argparse
|
|
||||||
|
|
||||||
description = 'A simple command-line interface for zipfile module.'
|
|
||||||
parser = argparse.ArgumentParser(description=description)
|
|
||||||
group = parser.add_mutually_exclusive_group(required=True)
|
|
||||||
group.add_argument('-l', '--list', metavar='<zipfile>',
|
|
||||||
help='Show listing of a zipfile')
|
|
||||||
group.add_argument('-e', '--extract', nargs=2,
|
|
||||||
metavar=('<zipfile>', '<output_dir>'),
|
|
||||||
help='Extract zipfile into target dir')
|
|
||||||
group.add_argument('-c', '--create', nargs='+',
|
|
||||||
metavar=('<name>', '<file>'),
|
|
||||||
help='Create zipfile from sources')
|
|
||||||
group.add_argument('-t', '--test', metavar='<zipfile>',
|
|
||||||
help='Test if a zipfile is valid')
|
|
||||||
parser.add_argument('--metadata-encoding', metavar='<encoding>',
|
|
||||||
help='Specify encoding of member names for -l, -e and -t')
|
|
||||||
args = parser.parse_args(args)
|
|
||||||
|
|
||||||
encoding = args.metadata_encoding
|
|
||||||
|
|
||||||
if args.test is not None:
|
|
||||||
src = args.test
|
|
||||||
with ZipFile(src, 'r', metadata_encoding=encoding) as zf:
|
|
||||||
badfile = zf.testzip()
|
|
||||||
if badfile:
|
|
||||||
print("The following enclosed file is corrupted: {!r}".format(badfile))
|
|
||||||
print("Done testing")
|
|
||||||
|
|
||||||
elif args.list is not None:
|
|
||||||
src = args.list
|
|
||||||
with ZipFile(src, 'r', metadata_encoding=encoding) as zf:
|
|
||||||
zf.printdir()
|
|
||||||
|
|
||||||
elif args.extract is not None:
|
|
||||||
src, curdir = args.extract
|
|
||||||
with ZipFile(src, 'r', metadata_encoding=encoding) as zf:
|
|
||||||
zf.extractall(curdir)
|
|
||||||
|
|
||||||
elif args.create is not None:
|
|
||||||
if encoding:
|
|
||||||
print("Non-conforming encodings not supported with -c.",
|
|
||||||
file=sys.stderr)
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
zip_name = args.create.pop(0)
|
|
||||||
files = args.create
|
|
||||||
|
|
||||||
def addToZip(zf, path, zippath):
|
|
||||||
if os.path.isfile(path):
|
|
||||||
zf.write(path, zippath, ZIP_DEFLATED)
|
|
||||||
elif os.path.isdir(path):
|
|
||||||
if zippath:
|
|
||||||
zf.write(path, zippath)
|
|
||||||
for nm in sorted(os.listdir(path)):
|
|
||||||
addToZip(zf,
|
|
||||||
os.path.join(path, nm), os.path.join(zippath, nm))
|
|
||||||
# else: ignore
|
|
||||||
|
|
||||||
with ZipFile(zip_name, 'w') as zf:
|
|
||||||
for path in files:
|
|
||||||
zippath = os.path.basename(path)
|
|
||||||
if not zippath:
|
|
||||||
zippath = os.path.basename(os.path.dirname(path))
|
|
||||||
if zippath in ('', os.curdir, os.pardir):
|
|
||||||
zippath = ''
|
|
||||||
addToZip(zf, path, zippath)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
|
@ -0,0 +1,77 @@
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
from . import ZipFile, ZIP_DEFLATED
|
||||||
|
|
||||||
|
|
||||||
|
def main(args=None):
|
||||||
|
import argparse
|
||||||
|
|
||||||
|
description = 'A simple command-line interface for zipfile module.'
|
||||||
|
parser = argparse.ArgumentParser(description=description)
|
||||||
|
group = parser.add_mutually_exclusive_group(required=True)
|
||||||
|
group.add_argument('-l', '--list', metavar='<zipfile>',
|
||||||
|
help='Show listing of a zipfile')
|
||||||
|
group.add_argument('-e', '--extract', nargs=2,
|
||||||
|
metavar=('<zipfile>', '<output_dir>'),
|
||||||
|
help='Extract zipfile into target dir')
|
||||||
|
group.add_argument('-c', '--create', nargs='+',
|
||||||
|
metavar=('<name>', '<file>'),
|
||||||
|
help='Create zipfile from sources')
|
||||||
|
group.add_argument('-t', '--test', metavar='<zipfile>',
|
||||||
|
help='Test if a zipfile is valid')
|
||||||
|
parser.add_argument('--metadata-encoding', metavar='<encoding>',
|
||||||
|
help='Specify encoding of member names for -l, -e and -t')
|
||||||
|
args = parser.parse_args(args)
|
||||||
|
|
||||||
|
encoding = args.metadata_encoding
|
||||||
|
|
||||||
|
if args.test is not None:
|
||||||
|
src = args.test
|
||||||
|
with ZipFile(src, 'r', metadata_encoding=encoding) as zf:
|
||||||
|
badfile = zf.testzip()
|
||||||
|
if badfile:
|
||||||
|
print("The following enclosed file is corrupted: {!r}".format(badfile))
|
||||||
|
print("Done testing")
|
||||||
|
|
||||||
|
elif args.list is not None:
|
||||||
|
src = args.list
|
||||||
|
with ZipFile(src, 'r', metadata_encoding=encoding) as zf:
|
||||||
|
zf.printdir()
|
||||||
|
|
||||||
|
elif args.extract is not None:
|
||||||
|
src, curdir = args.extract
|
||||||
|
with ZipFile(src, 'r', metadata_encoding=encoding) as zf:
|
||||||
|
zf.extractall(curdir)
|
||||||
|
|
||||||
|
elif args.create is not None:
|
||||||
|
if encoding:
|
||||||
|
print("Non-conforming encodings not supported with -c.",
|
||||||
|
file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
zip_name = args.create.pop(0)
|
||||||
|
files = args.create
|
||||||
|
|
||||||
|
def addToZip(zf, path, zippath):
|
||||||
|
if os.path.isfile(path):
|
||||||
|
zf.write(path, zippath, ZIP_DEFLATED)
|
||||||
|
elif os.path.isdir(path):
|
||||||
|
if zippath:
|
||||||
|
zf.write(path, zippath)
|
||||||
|
for nm in sorted(os.listdir(path)):
|
||||||
|
addToZip(zf,
|
||||||
|
os.path.join(path, nm), os.path.join(zippath, nm))
|
||||||
|
# else: ignore
|
||||||
|
|
||||||
|
with ZipFile(zip_name, 'w') as zf:
|
||||||
|
for path in files:
|
||||||
|
zippath = os.path.basename(path)
|
||||||
|
if not zippath:
|
||||||
|
zippath = os.path.basename(os.path.dirname(path))
|
||||||
|
if zippath in ('', os.curdir, os.pardir):
|
||||||
|
zippath = ''
|
||||||
|
addToZip(zf, path, zippath)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
|
@ -0,0 +1,315 @@
|
||||||
|
import io
|
||||||
|
import posixpath
|
||||||
|
import zipfile
|
||||||
|
import itertools
|
||||||
|
import contextlib
|
||||||
|
import pathlib
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = ['Path']
|
||||||
|
|
||||||
|
|
||||||
|
def _parents(path):
|
||||||
|
"""
|
||||||
|
Given a path with elements separated by
|
||||||
|
posixpath.sep, generate all parents of that path.
|
||||||
|
|
||||||
|
>>> list(_parents('b/d'))
|
||||||
|
['b']
|
||||||
|
>>> list(_parents('/b/d/'))
|
||||||
|
['/b']
|
||||||
|
>>> list(_parents('b/d/f/'))
|
||||||
|
['b/d', 'b']
|
||||||
|
>>> list(_parents('b'))
|
||||||
|
[]
|
||||||
|
>>> list(_parents(''))
|
||||||
|
[]
|
||||||
|
"""
|
||||||
|
return itertools.islice(_ancestry(path), 1, None)
|
||||||
|
|
||||||
|
|
||||||
|
def _ancestry(path):
|
||||||
|
"""
|
||||||
|
Given a path with elements separated by
|
||||||
|
posixpath.sep, generate all elements of that path
|
||||||
|
|
||||||
|
>>> list(_ancestry('b/d'))
|
||||||
|
['b/d', 'b']
|
||||||
|
>>> list(_ancestry('/b/d/'))
|
||||||
|
['/b/d', '/b']
|
||||||
|
>>> list(_ancestry('b/d/f/'))
|
||||||
|
['b/d/f', 'b/d', 'b']
|
||||||
|
>>> list(_ancestry('b'))
|
||||||
|
['b']
|
||||||
|
>>> list(_ancestry(''))
|
||||||
|
[]
|
||||||
|
"""
|
||||||
|
path = path.rstrip(posixpath.sep)
|
||||||
|
while path and path != posixpath.sep:
|
||||||
|
yield path
|
||||||
|
path, tail = posixpath.split(path)
|
||||||
|
|
||||||
|
|
||||||
|
_dedupe = dict.fromkeys
|
||||||
|
"""Deduplicate an iterable in original order"""
|
||||||
|
|
||||||
|
|
||||||
|
def _difference(minuend, subtrahend):
|
||||||
|
"""
|
||||||
|
Return items in minuend not in subtrahend, retaining order
|
||||||
|
with O(1) lookup.
|
||||||
|
"""
|
||||||
|
return itertools.filterfalse(set(subtrahend).__contains__, minuend)
|
||||||
|
|
||||||
|
|
||||||
|
class CompleteDirs(zipfile.ZipFile):
|
||||||
|
"""
|
||||||
|
A ZipFile subclass that ensures that implied directories
|
||||||
|
are always included in the namelist.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _implied_dirs(names):
|
||||||
|
parents = itertools.chain.from_iterable(map(_parents, names))
|
||||||
|
as_dirs = (p + posixpath.sep for p in parents)
|
||||||
|
return _dedupe(_difference(as_dirs, names))
|
||||||
|
|
||||||
|
def namelist(self):
|
||||||
|
names = super(CompleteDirs, self).namelist()
|
||||||
|
return names + list(self._implied_dirs(names))
|
||||||
|
|
||||||
|
def _name_set(self):
|
||||||
|
return set(self.namelist())
|
||||||
|
|
||||||
|
def resolve_dir(self, name):
|
||||||
|
"""
|
||||||
|
If the name represents a directory, return that name
|
||||||
|
as a directory (with the trailing slash).
|
||||||
|
"""
|
||||||
|
names = self._name_set()
|
||||||
|
dirname = name + '/'
|
||||||
|
dir_match = name not in names and dirname in names
|
||||||
|
return dirname if dir_match else name
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def make(cls, source):
|
||||||
|
"""
|
||||||
|
Given a source (filename or zipfile), return an
|
||||||
|
appropriate CompleteDirs subclass.
|
||||||
|
"""
|
||||||
|
if isinstance(source, CompleteDirs):
|
||||||
|
return source
|
||||||
|
|
||||||
|
if not isinstance(source, zipfile.ZipFile):
|
||||||
|
return cls(source)
|
||||||
|
|
||||||
|
# Only allow for FastLookup when supplied zipfile is read-only
|
||||||
|
if 'r' not in source.mode:
|
||||||
|
cls = CompleteDirs
|
||||||
|
|
||||||
|
source.__class__ = cls
|
||||||
|
return source
|
||||||
|
|
||||||
|
|
||||||
|
class FastLookup(CompleteDirs):
|
||||||
|
"""
|
||||||
|
ZipFile subclass to ensure implicit
|
||||||
|
dirs exist and are resolved rapidly.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def namelist(self):
|
||||||
|
with contextlib.suppress(AttributeError):
|
||||||
|
return self.__names
|
||||||
|
self.__names = super(FastLookup, self).namelist()
|
||||||
|
return self.__names
|
||||||
|
|
||||||
|
def _name_set(self):
|
||||||
|
with contextlib.suppress(AttributeError):
|
||||||
|
return self.__lookup
|
||||||
|
self.__lookup = super(FastLookup, self)._name_set()
|
||||||
|
return self.__lookup
|
||||||
|
|
||||||
|
|
||||||
|
class Path:
|
||||||
|
"""
|
||||||
|
A pathlib-compatible interface for zip files.
|
||||||
|
|
||||||
|
Consider a zip file with this structure::
|
||||||
|
|
||||||
|
.
|
||||||
|
├── a.txt
|
||||||
|
└── b
|
||||||
|
├── c.txt
|
||||||
|
└── d
|
||||||
|
└── e.txt
|
||||||
|
|
||||||
|
>>> data = io.BytesIO()
|
||||||
|
>>> zf = ZipFile(data, 'w')
|
||||||
|
>>> zf.writestr('a.txt', 'content of a')
|
||||||
|
>>> zf.writestr('b/c.txt', 'content of c')
|
||||||
|
>>> zf.writestr('b/d/e.txt', 'content of e')
|
||||||
|
>>> zf.filename = 'mem/abcde.zip'
|
||||||
|
|
||||||
|
Path accepts the zipfile object itself or a filename
|
||||||
|
|
||||||
|
>>> root = Path(zf)
|
||||||
|
|
||||||
|
From there, several path operations are available.
|
||||||
|
|
||||||
|
Directory iteration (including the zip file itself):
|
||||||
|
|
||||||
|
>>> a, b = root.iterdir()
|
||||||
|
>>> a
|
||||||
|
Path('mem/abcde.zip', 'a.txt')
|
||||||
|
>>> b
|
||||||
|
Path('mem/abcde.zip', 'b/')
|
||||||
|
|
||||||
|
name property:
|
||||||
|
|
||||||
|
>>> b.name
|
||||||
|
'b'
|
||||||
|
|
||||||
|
join with divide operator:
|
||||||
|
|
||||||
|
>>> c = b / 'c.txt'
|
||||||
|
>>> c
|
||||||
|
Path('mem/abcde.zip', 'b/c.txt')
|
||||||
|
>>> c.name
|
||||||
|
'c.txt'
|
||||||
|
|
||||||
|
Read text:
|
||||||
|
|
||||||
|
>>> c.read_text()
|
||||||
|
'content of c'
|
||||||
|
|
||||||
|
existence:
|
||||||
|
|
||||||
|
>>> c.exists()
|
||||||
|
True
|
||||||
|
>>> (b / 'missing.txt').exists()
|
||||||
|
False
|
||||||
|
|
||||||
|
Coercion to string:
|
||||||
|
|
||||||
|
>>> import os
|
||||||
|
>>> str(c).replace(os.sep, posixpath.sep)
|
||||||
|
'mem/abcde.zip/b/c.txt'
|
||||||
|
|
||||||
|
At the root, ``name``, ``filename``, and ``parent``
|
||||||
|
resolve to the zipfile. Note these attributes are not
|
||||||
|
valid and will raise a ``ValueError`` if the zipfile
|
||||||
|
has no filename.
|
||||||
|
|
||||||
|
>>> root.name
|
||||||
|
'abcde.zip'
|
||||||
|
>>> str(root.filename).replace(os.sep, posixpath.sep)
|
||||||
|
'mem/abcde.zip'
|
||||||
|
>>> str(root.parent)
|
||||||
|
'mem'
|
||||||
|
"""
|
||||||
|
|
||||||
|
__repr = "{self.__class__.__name__}({self.root.filename!r}, {self.at!r})"
|
||||||
|
|
||||||
|
def __init__(self, root, at=""):
|
||||||
|
"""
|
||||||
|
Construct a Path from a ZipFile or filename.
|
||||||
|
|
||||||
|
Note: When the source is an existing ZipFile object,
|
||||||
|
its type (__class__) will be mutated to a
|
||||||
|
specialized type. If the caller wishes to retain the
|
||||||
|
original type, the caller should either create a
|
||||||
|
separate ZipFile object or pass a filename.
|
||||||
|
"""
|
||||||
|
self.root = FastLookup.make(root)
|
||||||
|
self.at = at
|
||||||
|
|
||||||
|
def open(self, mode='r', *args, pwd=None, **kwargs):
|
||||||
|
"""
|
||||||
|
Open this entry as text or binary following the semantics
|
||||||
|
of ``pathlib.Path.open()`` by passing arguments through
|
||||||
|
to io.TextIOWrapper().
|
||||||
|
"""
|
||||||
|
if self.is_dir():
|
||||||
|
raise IsADirectoryError(self)
|
||||||
|
zip_mode = mode[0]
|
||||||
|
if not self.exists() and zip_mode == 'r':
|
||||||
|
raise FileNotFoundError(self)
|
||||||
|
stream = self.root.open(self.at, zip_mode, pwd=pwd)
|
||||||
|
if 'b' in mode:
|
||||||
|
if args or kwargs:
|
||||||
|
raise ValueError("encoding args invalid for binary operation")
|
||||||
|
return stream
|
||||||
|
else:
|
||||||
|
kwargs["encoding"] = io.text_encoding(kwargs.get("encoding"))
|
||||||
|
return io.TextIOWrapper(stream, *args, **kwargs)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def name(self):
|
||||||
|
return pathlib.Path(self.at).name or self.filename.name
|
||||||
|
|
||||||
|
@property
|
||||||
|
def suffix(self):
|
||||||
|
return pathlib.Path(self.at).suffix or self.filename.suffix
|
||||||
|
|
||||||
|
@property
|
||||||
|
def suffixes(self):
|
||||||
|
return pathlib.Path(self.at).suffixes or self.filename.suffixes
|
||||||
|
|
||||||
|
@property
|
||||||
|
def stem(self):
|
||||||
|
return pathlib.Path(self.at).stem or self.filename.stem
|
||||||
|
|
||||||
|
@property
|
||||||
|
def filename(self):
|
||||||
|
return pathlib.Path(self.root.filename).joinpath(self.at)
|
||||||
|
|
||||||
|
def read_text(self, *args, **kwargs):
|
||||||
|
kwargs["encoding"] = io.text_encoding(kwargs.get("encoding"))
|
||||||
|
with self.open('r', *args, **kwargs) as strm:
|
||||||
|
return strm.read()
|
||||||
|
|
||||||
|
def read_bytes(self):
|
||||||
|
with self.open('rb') as strm:
|
||||||
|
return strm.read()
|
||||||
|
|
||||||
|
def _is_child(self, path):
|
||||||
|
return posixpath.dirname(path.at.rstrip("/")) == self.at.rstrip("/")
|
||||||
|
|
||||||
|
def _next(self, at):
|
||||||
|
return self.__class__(self.root, at)
|
||||||
|
|
||||||
|
def is_dir(self):
|
||||||
|
return not self.at or self.at.endswith("/")
|
||||||
|
|
||||||
|
def is_file(self):
|
||||||
|
return self.exists() and not self.is_dir()
|
||||||
|
|
||||||
|
def exists(self):
|
||||||
|
return self.at in self.root._name_set()
|
||||||
|
|
||||||
|
def iterdir(self):
|
||||||
|
if not self.is_dir():
|
||||||
|
raise ValueError("Can't listdir a file")
|
||||||
|
subs = map(self._next, self.root.namelist())
|
||||||
|
return filter(self._is_child, subs)
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return posixpath.join(self.root.filename, self.at)
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return self.__repr.format(self=self)
|
||||||
|
|
||||||
|
def joinpath(self, *other):
|
||||||
|
next = posixpath.join(self.at, *other)
|
||||||
|
return self._next(self.root.resolve_dir(next))
|
||||||
|
|
||||||
|
__truediv__ = joinpath
|
||||||
|
|
||||||
|
@property
|
||||||
|
def parent(self):
|
||||||
|
if not self.at:
|
||||||
|
return self.filename.parent
|
||||||
|
parent_at = posixpath.dirname(self.at.rstrip('/'))
|
||||||
|
if parent_at:
|
||||||
|
parent_at += '/'
|
||||||
|
return self._next(parent_at)
|
|
@ -0,0 +1,2 @@
|
||||||
|
Created packages from zipfile and test_zipfile modules, separating
|
||||||
|
``zipfile.Path`` functionality.
|
Loading…
Reference in New Issue