diff --git a/Doc/library/shutil.rst b/Doc/library/shutil.rst
index d089dc48ec5..11a14266187 100644
--- a/Doc/library/shutil.rst
+++ b/Doc/library/shutil.rst
@@ -288,13 +288,75 @@ Archives operations
    .. versionadded:: 3.2
 
 
-.. function::  unregister_archive_format(name)
+.. function:: unregister_archive_format(name)
 
    Remove the archive format *name* from the list of supported formats.
 
    .. versionadded:: 3.2
 
 
+.. function:: unpack_archive(filename[, extract_dir[, format]])
+
+   Unpack an archive. *filename* is the full path of the archive.
+
+   *extract_dir* is the name of the target directory where the archive is
+   unpacked. If not provided, the current working directory is used.
+
+   *format* is the archive format: one of "zip", "tar", or "gztar". Or any
+   other format registered with :func:`register_unpack_format`. If not
+   provided, :func:`unpack_archive` will use the archive file name extension
+   and see if an unpacker was registered for that extension. In case none is
+   found, a :exc:`ValueError` is raised.
+
+   .. versionadded:: 3.2
+
+
+.. function:: register_unpack_format(name, extensions, function[, extra_args[,description]])
+
+   Registers an unpack format. *name* is the name of the format and
+   *extensions* is a list of extensions corresponding to the format, like
+   ``.zip`` for Zip files.
+
+   *function* is the callable that will be used to unpack archives. The
+   callable will receive the path of the archive, followed by the directory
+   the archive must be extracted to.
+
+   When provided, *extra_args* is a sequence of ``(name, value)`` tuples that
+   will be passed as keyword arguments to the callable.
+
+   *description* can be provided to describe the format, and will be returned
+   by the :func:`get_unpack_formats` function.
+
+   .. versionadded:: 3.2
+
+
+.. function:: unregister_unpack_format(name)
+
+   Unregister an unpack format. *name* is the name of the format.
+
+   .. versionadded:: 3.2
+
+
+.. function:: get_unpack_formats()
+
+   Return a list of all registered formats for unpacking.
+   Each element of the returned sequence is a tuple
+   ``(name, extensions, description)``.
+
+   By default :mod:`shutil` provides these formats:
+
+   - *gztar*: gzip'ed tar-file
+   - *bztar*: bzip2'ed tar-file
+   - *tar*: uncompressed tar file
+   - *zip*: ZIP file
+
+   You can register new formats or provide your own unpacker for any existing
+   formats, by using :func:`register_unpack_format`.
+
+   .. versionadded:: 3.2
+
+
+
 Archiving example
 :::::::::::::::::
 
diff --git a/Lib/shutil.py b/Lib/shutil.py
index 8890d24a54f..c07f394b6b1 100644
--- a/Lib/shutil.py
+++ b/Lib/shutil.py
@@ -11,6 +11,7 @@ from os.path import abspath
 import fnmatch
 import collections
 import errno
+import tarfile
 
 try:
     from pwd import getpwnam
@@ -25,7 +26,9 @@ except ImportError:
 __all__ = ["copyfileobj", "copyfile", "copymode", "copystat", "copy", "copy2",
            "copytree", "move", "rmtree", "Error", "SpecialFileError",
            "ExecError", "make_archive", "get_archive_formats",
-           "register_archive_format", "unregister_archive_format"]
+           "register_archive_format", "unregister_archive_format",
+           "get_unpack_formats", "register_unpack_format",
+           "unregister_unpack_format", "unpack_archive"]
 
 class Error(EnvironmentError):
     pass
@@ -37,6 +40,14 @@ class SpecialFileError(EnvironmentError):
 class ExecError(EnvironmentError):
     """Raised when a command could not be executed"""
 
+class ReadError(EnvironmentError):
+    """Raised when an archive cannot be read"""
+
+class RegistryError(Exception):
+    """Raised when a registry operation with the archiving
+    and unpacking registries fails"""
+
+
 try:
     WindowsError
 except NameError:
@@ -381,10 +392,7 @@ def _make_tarball(base_name, base_dir, compress="gzip", verbose=0, dry_run=0,
         if not dry_run:
             os.makedirs(archive_dir)
 
-
     # creating the tarball
-    import tarfile  # late import so Python build itself doesn't break
-
     if logger is not None:
         logger.info('Creating tar archive')
 
@@ -567,3 +575,166 @@ def make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0,
             os.chdir(save_cwd)
 
     return filename
+
+
+def get_unpack_formats():
+    """Returns a list of supported formats for unpacking.
+
+    Each element of the returned sequence is a tuple
+    (name, extensions, description)
+    """
+    formats = [(name, info[0], info[3]) for name, info in
+               _UNPACK_FORMATS.items()]
+    formats.sort()
+    return formats
+
+def _check_unpack_options(extensions, function, extra_args):
+    """Checks what gets registered as an unpacker."""
+    # first make sure no other unpacker is registered for this extension
+    existing_extensions = {}
+    for name, info in _UNPACK_FORMATS.items():
+        for ext in info[0]:
+            existing_extensions[ext] = name
+
+    for extension in extensions:
+        if extension in existing_extensions:
+            msg = '%s is already registered for "%s"'
+            raise RegistryError(msg % (extension,
+                                       existing_extensions[extension]))
+
+    if not isinstance(function, collections.Callable):
+        raise TypeError('The registered function must be a callable')
+
+
+def register_unpack_format(name, extensions, function, extra_args=None,
+                           description=''):
+    """Registers an unpack format.
+
+    `name` is the name of the format. `extensions` is a list of extensions
+    corresponding to the format.
+
+    `function` is the callable that will be
+    used to unpack archives. The callable will receive archives to unpack.
+    If it's unable to handle an archive, it needs to raise a ReadError
+    exception.
+
+    If provided, `extra_args` is a sequence of
+    (name, value) tuples that will be passed as arguments to the callable.
+    description can be provided to describe the format, and will be returned
+    by the get_unpack_formats() function.
+    """
+    if extra_args is None:
+        extra_args = []
+    _check_unpack_options(extensions, function, extra_args)
+    _UNPACK_FORMATS[name] = extensions, function, extra_args, description
+
+def unregister_unpack_format(name):
+    """Removes the unpack format from the registry."""
+    del _UNPACK_FORMATS[name]
+
+def _ensure_directory(path):
+    """Ensure that the parent directory of `path` exists"""
+    dirname = os.path.dirname(path)
+    if not os.path.isdir(dirname):
+        os.makedirs(dirname)
+
+def _unpack_zipfile(filename, extract_dir):
+    """Unpack zip `filename` to `extract_dir`
+    """
+    try:
+        import zipfile
+    except ImportError:
+        raise ReadError('zlib not supported, cannot unpack this archive.')
+
+    if not zipfile.is_zipfile(filename):
+        raise ReadError("%s is not a zip file" % filename)
+
+    zip = zipfile.ZipFile(filename)
+    try:
+        for info in zip.infolist():
+            name = info.filename
+
+            # don't extract absolute paths or ones with .. in them
+            if name.startswith('/') or '..' in name:
+                continue
+
+            target = os.path.join(extract_dir, *name.split('/'))
+            if not target:
+                continue
+
+            _ensure_directory(target)
+            if not name.endswith('/'):
+                # file
+                data = zip.read(info.filename)
+                f = open(target,'wb')
+                try:
+                    f.write(data)
+                finally:
+                    f.close()
+                    del data
+    finally:
+        zip.close()
+
+def _unpack_tarfile(filename, extract_dir):
+    """Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir`
+    """
+    try:
+        tarobj = tarfile.open(filename)
+    except tarfile.TarError:
+        raise ReadError(
+            "%s is not a compressed or uncompressed tar file" % filename)
+    try:
+        tarobj.extractall(extract_dir)
+    finally:
+        tarobj.close()
+
+_UNPACK_FORMATS = {
+    'gztar': (['.tar.gz', '.tgz'], _unpack_tarfile, [], "gzip'ed tar-file"),
+    'bztar': (['.bz2'], _unpack_tarfile, [], "bzip2'ed tar-file"),
+    'tar': (['.tar'], _unpack_tarfile, [], "uncompressed tar file"),
+    'zip': (['.zip'], _unpack_zipfile, [], "ZIP file")
+    }
+
+def _find_unpack_format(filename):
+    for name, info in _UNPACK_FORMATS.items():
+        for extension in info[0]:
+            if filename.endswith(extension):
+                return name
+    return None
+
+def unpack_archive(filename, extract_dir=None, format=None):
+    """Unpack an archive.
+
+    `filename` is the name of the archive.
+
+    `extract_dir` is the name of the target directory, where the archive
+    is unpacked. If not provided, the current working directory is used.
+
+    `format` is the archive format: one of "zip", "tar", or "gztar". Or any
+    other registered format. If not provided, unpack_archive will use the
+    filename extension and see if an unpacker was registered for that
+    extension.
+
+    Raises ValueError if `format` is unknown, ReadError if none is found.
+    """
+    if extract_dir is None:
+        extract_dir = os.getcwd()
+
+    if format is not None:
+        try:
+            format_info = _UNPACK_FORMATS[format]
+        except KeyError:
+            raise ValueError("Unknown unpack format '{0}'".format(format))
+
+        # registry entries are (extensions, function, extra_args, description)
+        func = format_info[1]
+        func(filename, extract_dir, **dict(format_info[2]))
+    else:
+        # we need to look at the registered unpackers supported extensions
+        format = _find_unpack_format(filename)
+        if format is None:
+            raise ReadError("Unknown archive format '{0}'".format(filename))
+
+        func = _UNPACK_FORMATS[format][1]
+        kwargs = dict(_UNPACK_FORMATS[format][2])
+        func(filename, extract_dir, **kwargs)
diff --git a/Lib/test/test_shutil.py b/Lib/test/test_shutil.py
index 18164abb188..b34165d1cd2 100644
--- a/Lib/test/test_shutil.py
+++ b/Lib/test/test_shutil.py
@@ -13,7 +13,9 @@ from os.path import splitdrive
 from distutils.spawn import find_executable, spawn
 from shutil import (_make_tarball, _make_zipfile, make_archive,
                     register_archive_format, unregister_archive_format,
-                    get_archive_formats, Error)
+                    get_archive_formats, Error, unpack_archive,
+                    register_unpack_format, RegistryError,
+                    unregister_unpack_format, get_unpack_formats)
 import tarfile
 import warnings
 
@@ -538,6 +540,7 @@ class TestShutil(unittest.TestCase):
                            owner='kjhkjhkjg', group='oihohoh')
         self.assertTrue(os.path.exists(res))
 
+    @unittest.skipUnless(zlib, "Requires zlib")
     @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
     def test_tarfile_root_owner(self):
 
@@ -595,6 +598,60 @@ class TestShutil(unittest.TestCase):
         formats = [name for name, params in get_archive_formats()]
         self.assertNotIn('xxx', formats)
 
+    def _compare_dirs(self, dir1, dir2):
+        # check that dir1 and dir2 are equivalent,
+        # return the diff
+        diff = []
+        for root, dirs, files in os.walk(dir1):
+            for file_ in files:
+                path = os.path.join(root, file_)
+                target_path = os.path.join(dir2, os.path.split(path)[-1])
+                if not os.path.exists(target_path):
+                    diff.append(file_)
+        return diff
+
+    @unittest.skipUnless(zlib, "Requires zlib")
+    def test_unpack_archive(self):
+
+        for format in ('tar', 'gztar', 'bztar', 'zip'):
+            tmpdir = self.mkdtemp()
+            base_dir, root_dir, base_name = self._create_files()
+            tmpdir2 = self.mkdtemp()
+            filename = make_archive(base_name, format, root_dir, base_dir)
+
+            # let's try to unpack it now
+            unpack_archive(filename, tmpdir2)
+            diff = self._compare_dirs(tmpdir, tmpdir2)
+            self.assertEquals(diff, [])
+
+    def test_unpack_registery(self):
+
+        formats = get_unpack_formats()
+
+        def _boo(filename, extract_dir, extra):
+            self.assertEquals(extra, 1)
+            self.assertEquals(filename, 'stuff.boo')
+            self.assertEquals(extract_dir, 'xx')
+
+        register_unpack_format('Boo', ['.boo', '.b2'], _boo, [('extra', 1)])
+        unpack_archive('stuff.boo', 'xx')
+        # an explicitly named format must dispatch to the same unpacker
+        unpack_archive('stuff.boo', 'xx', format='Boo')
+
+        # trying to register a .boo unpacker again
+        self.assertRaises(RegistryError, register_unpack_format, 'Boo2',
+                          ['.boo'], _boo)
+
+        # should work now
+        unregister_unpack_format('Boo')
+        register_unpack_format('Boo2', ['.boo'], _boo)
+        self.assertIn(('Boo2', ['.boo'], ''), get_unpack_formats())
+        self.assertNotIn(('Boo', ['.boo'], ''), get_unpack_formats())
+
+        # let's leave a clean state
+        unregister_unpack_format('Boo2')
+        self.assertEquals(get_unpack_formats(), formats)
+
 
 class TestMove(unittest.TestCase):
 
diff --git a/Misc/NEWS b/Misc/NEWS
index e83285aba4b..622851d8138 100644
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -339,6 +339,8 @@ C-API
 Library
 -------
 
+- Issue #8295: Added shutil.unpack_archive.
+
 - Issue #6312: Fixed http HEAD request when the transfer encoding is chunked.
   It should correctly return an empty response now.
 