cpython/Lib/test/test_importlib/util.py

583 lines
18 KiB
Python

import abc
import builtins
import contextlib
import errno
import functools
import importlib
from importlib import machinery, util, invalidate_caches
from importlib.abc import ResourceReader
import io
import marshal
import os
import os.path
from pathlib import Path, PurePath
from test import support
from test.support import import_helper
from test.support import os_helper
import unittest
import sys
import tempfile
import types
from . import data01
from . import zipdata01
BUILTINS = types.SimpleNamespace()
BUILTINS.good_name = None
BUILTINS.bad_name = None
if 'errno' in sys.builtin_module_names:
BUILTINS.good_name = 'errno'
if 'importlib' not in sys.builtin_module_names:
BUILTINS.bad_name = 'importlib'
EXTENSIONS = types.SimpleNamespace()
EXTENSIONS.path = None
EXTENSIONS.ext = None
EXTENSIONS.filename = None
EXTENSIONS.file_path = None
EXTENSIONS.name = '_testcapi'
def _extension_details():
global EXTENSIONS
for path in sys.path:
for ext in machinery.EXTENSION_SUFFIXES:
filename = EXTENSIONS.name + ext
file_path = os.path.join(path, filename)
if os.path.exists(file_path):
EXTENSIONS.path = path
EXTENSIONS.ext = ext
EXTENSIONS.filename = filename
EXTENSIONS.file_path = file_path
return
_extension_details()
def import_importlib(module_name):
"""Import a module from importlib both w/ and w/o _frozen_importlib."""
fresh = ('importlib',) if '.' in module_name else ()
frozen = import_helper.import_fresh_module(module_name)
source = import_helper.import_fresh_module(module_name, fresh=fresh,
blocked=('_frozen_importlib', '_frozen_importlib_external'))
return {'Frozen': frozen, 'Source': source}
def specialize_class(cls, kind, base=None, **kwargs):
# XXX Support passing in submodule names--load (and cache) them?
# That would clean up the test modules a bit more.
if base is None:
base = unittest.TestCase
elif not isinstance(base, type):
base = base[kind]
name = '{}_{}'.format(kind, cls.__name__)
bases = (cls, base)
specialized = types.new_class(name, bases)
specialized.__module__ = cls.__module__
specialized._NAME = cls.__name__
specialized._KIND = kind
for attr, values in kwargs.items():
value = values[kind]
setattr(specialized, attr, value)
return specialized
def split_frozen(cls, base=None, **kwargs):
frozen = specialize_class(cls, 'Frozen', base, **kwargs)
source = specialize_class(cls, 'Source', base, **kwargs)
return frozen, source
def test_both(test_class, base=None, **kwargs):
return split_frozen(test_class, base, **kwargs)
CASE_INSENSITIVE_FS = True
# Windows is the only OS that is *always* case-insensitive
# (OS X *can* be case-sensitive).
if sys.platform not in ('win32', 'cygwin'):
changed_name = __file__.upper()
if changed_name == __file__:
changed_name = __file__.lower()
if not os.path.exists(changed_name):
CASE_INSENSITIVE_FS = False
source_importlib = import_importlib('importlib')['Source']
__import__ = {'Frozen': staticmethod(builtins.__import__),
'Source': staticmethod(source_importlib.__import__)}
def case_insensitive_tests(test):
"""Class decorator that nullifies tests requiring a case-insensitive
file system."""
return unittest.skipIf(not CASE_INSENSITIVE_FS,
"requires a case-insensitive filesystem")(test)
def submodule(parent, name, pkg_dir, content=''):
path = os.path.join(pkg_dir, name + '.py')
with open(path, 'w') as subfile:
subfile.write(content)
return '{}.{}'.format(parent, name), path
def get_code_from_pyc(pyc_path):
"""Reads a pyc file and returns the unmarshalled code object within.
No header validation is performed.
"""
with open(pyc_path, 'rb') as pyc_f:
pyc_f.seek(16)
return marshal.load(pyc_f)
@contextlib.contextmanager
def uncache(*names):
"""Uncache a module from sys.modules.
A basic sanity check is performed to prevent uncaching modules that either
cannot/shouldn't be uncached.
"""
for name in names:
if name in ('sys', 'marshal', 'imp'):
raise ValueError(
"cannot uncache {0}".format(name))
try:
del sys.modules[name]
except KeyError:
pass
try:
yield
finally:
for name in names:
try:
del sys.modules[name]
except KeyError:
pass
@contextlib.contextmanager
def temp_module(name, content='', *, pkg=False):
conflicts = [n for n in sys.modules if n.partition('.')[0] == name]
with os_helper.temp_cwd(None) as cwd:
with uncache(name, *conflicts):
with import_helper.DirsOnSysPath(cwd):
invalidate_caches()
location = os.path.join(cwd, name)
if pkg:
modpath = os.path.join(location, '__init__.py')
os.mkdir(name)
else:
modpath = location + '.py'
if content is None:
# Make sure the module file gets created.
content = ''
if content is not None:
# not a namespace package
with open(modpath, 'w') as modfile:
modfile.write(content)
yield location
@contextlib.contextmanager
def import_state(**kwargs):
"""Context manager to manage the various importers and stored state in the
sys module.
The 'modules' attribute is not supported as the interpreter state stores a
pointer to the dict that the interpreter uses internally;
reassigning to sys.modules does not have the desired effect.
"""
originals = {}
try:
for attr, default in (('meta_path', []), ('path', []),
('path_hooks', []),
('path_importer_cache', {})):
originals[attr] = getattr(sys, attr)
if attr in kwargs:
new_value = kwargs[attr]
del kwargs[attr]
else:
new_value = default
setattr(sys, attr, new_value)
if len(kwargs):
raise ValueError(
'unrecognized arguments: {0}'.format(kwargs.keys()))
yield
finally:
for attr, value in originals.items():
setattr(sys, attr, value)
class _ImporterMock:
"""Base class to help with creating importer mocks."""
def __init__(self, *names, module_code={}):
self.modules = {}
self.module_code = {}
for name in names:
if not name.endswith('.__init__'):
import_name = name
else:
import_name = name[:-len('.__init__')]
if '.' not in name:
package = None
elif import_name == name:
package = name.rsplit('.', 1)[0]
else:
package = import_name
module = types.ModuleType(import_name)
module.__loader__ = self
module.__file__ = '<mock __file__>'
module.__package__ = package
module.attr = name
if import_name != name:
module.__path__ = ['<mock __path__>']
self.modules[import_name] = module
if import_name in module_code:
self.module_code[import_name] = module_code[import_name]
def __getitem__(self, name):
return self.modules[name]
def __enter__(self):
self._uncache = uncache(*self.modules.keys())
self._uncache.__enter__()
return self
def __exit__(self, *exc_info):
self._uncache.__exit__(None, None, None)
class mock_modules(_ImporterMock):
"""Importer mock using PEP 302 APIs."""
def find_module(self, fullname, path=None):
if fullname not in self.modules:
return None
else:
return self
def load_module(self, fullname):
if fullname not in self.modules:
raise ImportError
else:
sys.modules[fullname] = self.modules[fullname]
if fullname in self.module_code:
try:
self.module_code[fullname]()
except Exception:
del sys.modules[fullname]
raise
return self.modules[fullname]
class mock_spec(_ImporterMock):
"""Importer mock using PEP 451 APIs."""
def find_spec(self, fullname, path=None, parent=None):
try:
module = self.modules[fullname]
except KeyError:
return None
spec = util.spec_from_file_location(
fullname, module.__file__, loader=self,
submodule_search_locations=getattr(module, '__path__', None))
return spec
def create_module(self, spec):
if spec.name not in self.modules:
raise ImportError
return self.modules[spec.name]
def exec_module(self, module):
try:
self.module_code[module.__spec__.name]()
except KeyError:
pass
def writes_bytecode_files(fxn):
"""Decorator to protect sys.dont_write_bytecode from mutation and to skip
tests that require it to be set to False."""
if sys.dont_write_bytecode:
return lambda *args, **kwargs: None
@functools.wraps(fxn)
def wrapper(*args, **kwargs):
original = sys.dont_write_bytecode
sys.dont_write_bytecode = False
try:
to_return = fxn(*args, **kwargs)
finally:
sys.dont_write_bytecode = original
return to_return
return wrapper
def ensure_bytecode_path(bytecode_path):
"""Ensure that the __pycache__ directory for PEP 3147 pyc file exists.
:param bytecode_path: File system path to PEP 3147 pyc file.
"""
try:
os.mkdir(os.path.dirname(bytecode_path))
except OSError as error:
if error.errno != errno.EEXIST:
raise
@contextlib.contextmanager
def temporary_pycache_prefix(prefix):
"""Adjust and restore sys.pycache_prefix."""
_orig_prefix = sys.pycache_prefix
sys.pycache_prefix = prefix
try:
yield
finally:
sys.pycache_prefix = _orig_prefix
@contextlib.contextmanager
def create_modules(*names):
"""Temporarily create each named module with an attribute (named 'attr')
that contains the name passed into the context manager that caused the
creation of the module.
All files are created in a temporary directory returned by
tempfile.mkdtemp(). This directory is inserted at the beginning of
sys.path. When the context manager exits all created files (source and
bytecode) are explicitly deleted.
No magic is performed when creating packages! This means that if you create
a module within a package you must also create the package's __init__ as
well.
"""
source = 'attr = {0!r}'
created_paths = []
mapping = {}
state_manager = None
uncache_manager = None
try:
temp_dir = tempfile.mkdtemp()
mapping['.root'] = temp_dir
import_names = set()
for name in names:
if not name.endswith('__init__'):
import_name = name
else:
import_name = name[:-len('.__init__')]
import_names.add(import_name)
if import_name in sys.modules:
del sys.modules[import_name]
name_parts = name.split('.')
file_path = temp_dir
for directory in name_parts[:-1]:
file_path = os.path.join(file_path, directory)
if not os.path.exists(file_path):
os.mkdir(file_path)
created_paths.append(file_path)
file_path = os.path.join(file_path, name_parts[-1] + '.py')
with open(file_path, 'w') as file:
file.write(source.format(name))
created_paths.append(file_path)
mapping[name] = file_path
uncache_manager = uncache(*import_names)
uncache_manager.__enter__()
state_manager = import_state(path=[temp_dir])
state_manager.__enter__()
yield mapping
finally:
if state_manager is not None:
state_manager.__exit__(None, None, None)
if uncache_manager is not None:
uncache_manager.__exit__(None, None, None)
os_helper.rmtree(temp_dir)
def mock_path_hook(*entries, importer):
"""A mock sys.path_hooks entry."""
def hook(entry):
if entry not in entries:
raise ImportError
return importer
return hook
class CASEOKTestBase:
def caseok_env_changed(self, *, should_exist):
possibilities = b'PYTHONCASEOK', 'PYTHONCASEOK'
if any(x in self.importlib._bootstrap_external._os.environ
for x in possibilities) != should_exist:
self.skipTest('os.environ changes not reflected in _os.environ')
def create_package(file, path, is_package=True, contents=()):
class Reader(ResourceReader):
def get_resource_reader(self, package):
return self
def open_resource(self, path):
self._path = path
if isinstance(file, Exception):
raise file
else:
return file
def resource_path(self, path_):
self._path = path_
if isinstance(path, Exception):
raise path
else:
return path
def is_resource(self, path_):
self._path = path_
if isinstance(path, Exception):
raise path
for entry in contents:
parts = entry.split('/')
if len(parts) == 1 and parts[0] == path_:
return True
return False
def contents(self):
if isinstance(path, Exception):
raise path
# There's no yield from in baseball, er, Python 2.
for entry in contents:
yield entry
name = 'testingpackage'
# Unfortunately importlib.util.module_from_spec() was not introduced until
# Python 3.5.
module = types.ModuleType(name)
loader = Reader()
spec = machinery.ModuleSpec(
name, loader,
origin='does-not-exist',
is_package=is_package)
module.__spec__ = spec
module.__loader__ = loader
return module
class CommonResourceTests(abc.ABC):
@abc.abstractmethod
def execute(self, package, path):
raise NotImplementedError
def test_package_name(self):
# Passing in the package name should succeed.
self.execute(data01.__name__, 'utf-8.file')
def test_package_object(self):
# Passing in the package itself should succeed.
self.execute(data01, 'utf-8.file')
def test_string_path(self):
# Passing in a string for the path should succeed.
path = 'utf-8.file'
self.execute(data01, path)
@unittest.skipIf(sys.version_info < (3, 6), 'requires os.PathLike support')
def test_pathlib_path(self):
# Passing in a pathlib.PurePath object for the path should succeed.
path = PurePath('utf-8.file')
self.execute(data01, path)
def test_absolute_path(self):
# An absolute path is a ValueError.
path = Path(__file__)
full_path = path.parent/'utf-8.file'
with self.assertRaises(ValueError):
self.execute(data01, full_path)
def test_relative_path(self):
# A relative path is a ValueError.
with self.assertRaises(ValueError):
self.execute(data01, '../data01/utf-8.file')
def test_importing_module_as_side_effect(self):
# The anchor package can already be imported.
del sys.modules[data01.__name__]
self.execute(data01.__name__, 'utf-8.file')
def test_non_package_by_name(self):
# The anchor package cannot be a module.
with self.assertRaises(TypeError):
self.execute(__name__, 'utf-8.file')
def test_non_package_by_package(self):
# The anchor package cannot be a module.
with self.assertRaises(TypeError):
module = sys.modules['test.test_importlib.util']
self.execute(module, 'utf-8.file')
@unittest.skipIf(sys.version_info < (3,), 'No ResourceReader in Python 2')
def test_resource_opener(self):
bytes_data = io.BytesIO(b'Hello, world!')
package = create_package(file=bytes_data, path=FileNotFoundError())
self.execute(package, 'utf-8.file')
self.assertEqual(package.__loader__._path, 'utf-8.file')
@unittest.skipIf(sys.version_info < (3,), 'No ResourceReader in Python 2')
def test_resource_path(self):
bytes_data = io.BytesIO(b'Hello, world!')
path = __file__
package = create_package(file=bytes_data, path=path)
self.execute(package, 'utf-8.file')
self.assertEqual(package.__loader__._path, 'utf-8.file')
def test_useless_loader(self):
package = create_package(file=FileNotFoundError(),
path=FileNotFoundError())
with self.assertRaises(FileNotFoundError):
self.execute(package, 'utf-8.file')
class ZipSetupBase:
ZIP_MODULE = None
@classmethod
def setUpClass(cls):
data_path = Path(cls.ZIP_MODULE.__file__)
data_dir = data_path.parent
cls._zip_path = str(data_dir / 'ziptestdata.zip')
sys.path.append(cls._zip_path)
cls.data = importlib.import_module('ziptestdata')
@classmethod
def tearDownClass(cls):
try:
sys.path.remove(cls._zip_path)
except ValueError:
pass
try:
del sys.path_importer_cache[cls._zip_path]
del sys.modules[cls.data.__name__]
except KeyError:
pass
try:
del cls.data
del cls._zip_path
except AttributeError:
pass
def setUp(self):
modules = import_helper.modules_setup()
self.addCleanup(import_helper.modules_cleanup, *modules)
class ZipSetup(ZipSetupBase):
ZIP_MODULE = zipdata01 # type: ignore