This commit is contained in:
Stefan Krah 2013-11-08 20:37:01 +01:00
commit 2199ea96c0
22 changed files with 349 additions and 445 deletions

View File

@ -109,12 +109,13 @@ PyAPI_FUNC(PyObject *) _PyDict_GetItemId(PyObject *dp, struct _Py_Identifier *ke
PyAPI_FUNC(int) PyDict_SetItemString(PyObject *dp, const char *key, PyObject *item);
PyAPI_FUNC(int) _PyDict_SetItemId(PyObject *dp, struct _Py_Identifier *key, PyObject *item);
PyAPI_FUNC(int) PyDict_DelItemString(PyObject *dp, const char *key);
PyAPI_FUNC(int) _PyDict_DelItemId(PyObject *mp, struct _Py_Identifier *key);
#ifndef Py_LIMITED_API
PyAPI_FUNC(int) _PyDict_DelItemId(PyObject *mp, struct _Py_Identifier *key);
PyAPI_FUNC(void) _PyDict_DebugMallocStats(FILE *out);
int _PyObjectDict_SetItem(PyTypeObject *tp, PyObject **dictptr, PyObject *name, PyObject *value);
PyObject *_PyDict_LoadGlobal(PyDictObject *, PyDictObject *, PyObject *);
PyAPI_FUNC(void) _PyDict_DebugMallocStats(FILE *out);
#endif
#ifdef __cplusplus

View File

@ -404,15 +404,23 @@ if mswindows:
hStdError = None
wShowWindow = 0
else:
import select
_has_poll = hasattr(select, 'poll')
import _posixsubprocess
import select
import selectors
# When select or poll has indicated that the file is writable,
# we can write up to _PIPE_BUF bytes without risk of blocking.
# POSIX defines PIPE_BUF as >= 512.
_PIPE_BUF = getattr(select, 'PIPE_BUF', 512)
# poll/select have the advantage of not requiring any extra file
# descriptor, contrarily to epoll/kqueue (also, they require a single
# syscall).
if hasattr(selectors, 'PollSelector'):
_PopenSelector = selectors.PollSelector
else:
_PopenSelector = selectors.SelectSelector
__all__ = ["Popen", "PIPE", "STDOUT", "call", "check_call", "getstatusoutput",
"getoutput", "check_output", "CalledProcessError", "DEVNULL"]
@ -1530,12 +1538,65 @@ class Popen(object):
if not input:
self.stdin.close()
if _has_poll:
stdout, stderr = self._communicate_with_poll(input, endtime,
orig_timeout)
else:
stdout, stderr = self._communicate_with_select(input, endtime,
orig_timeout)
stdout = None
stderr = None
# Only create this mapping if we haven't already.
if not self._communication_started:
self._fileobj2output = {}
if self.stdout:
self._fileobj2output[self.stdout] = []
if self.stderr:
self._fileobj2output[self.stderr] = []
if self.stdout:
stdout = self._fileobj2output[self.stdout]
if self.stderr:
stderr = self._fileobj2output[self.stderr]
self._save_input(input)
with _PopenSelector() as selector:
if self.stdin and input:
selector.register(self.stdin, selectors.EVENT_WRITE)
if self.stdout:
selector.register(self.stdout, selectors.EVENT_READ)
if self.stderr:
selector.register(self.stderr, selectors.EVENT_READ)
while selector.get_map():
timeout = self._remaining_time(endtime)
if timeout is not None and timeout < 0:
raise TimeoutExpired(self.args, orig_timeout)
ready = selector.select(timeout)
self._check_timeout(endtime, orig_timeout)
# XXX Rewrite these to use non-blocking I/O on the file
# objects; they are no longer using C stdio!
for key, events in ready:
if key.fileobj is self.stdin:
chunk = self._input[self._input_offset :
self._input_offset + _PIPE_BUF]
try:
self._input_offset += os.write(key.fd, chunk)
except OSError as e:
if e.errno == errno.EPIPE:
selector.unregister(key.fileobj)
key.fileobj.close()
else:
raise
else:
if self._input_offset >= len(self._input):
selector.unregister(key.fileobj)
key.fileobj.close()
elif key.fileobj in (self.stdout, self.stderr):
data = os.read(key.fd, 4096)
if not data:
selector.unregister(key.fileobj)
key.fileobj.close()
self._fileobj2output[key.fileobj].append(data)
self.wait(timeout=self._remaining_time(endtime))
@ -1569,167 +1630,6 @@ class Popen(object):
self._input = self._input.encode(self.stdin.encoding)
def _communicate_with_poll(self, input, endtime, orig_timeout):
stdout = None # Return
stderr = None # Return
if not self._communication_started:
self._fd2file = {}
poller = select.poll()
def register_and_append(file_obj, eventmask):
poller.register(file_obj.fileno(), eventmask)
self._fd2file[file_obj.fileno()] = file_obj
def close_unregister_and_remove(fd):
poller.unregister(fd)
self._fd2file[fd].close()
self._fd2file.pop(fd)
if self.stdin and input:
register_and_append(self.stdin, select.POLLOUT)
# Only create this mapping if we haven't already.
if not self._communication_started:
self._fd2output = {}
if self.stdout:
self._fd2output[self.stdout.fileno()] = []
if self.stderr:
self._fd2output[self.stderr.fileno()] = []
select_POLLIN_POLLPRI = select.POLLIN | select.POLLPRI
if self.stdout:
register_and_append(self.stdout, select_POLLIN_POLLPRI)
stdout = self._fd2output[self.stdout.fileno()]
if self.stderr:
register_and_append(self.stderr, select_POLLIN_POLLPRI)
stderr = self._fd2output[self.stderr.fileno()]
self._save_input(input)
while self._fd2file:
timeout = self._remaining_time(endtime)
if timeout is not None and timeout < 0:
raise TimeoutExpired(self.args, orig_timeout)
try:
ready = poller.poll(timeout)
except OSError as e:
if e.args[0] == errno.EINTR:
continue
raise
self._check_timeout(endtime, orig_timeout)
# XXX Rewrite these to use non-blocking I/O on the
# file objects; they are no longer using C stdio!
for fd, mode in ready:
if mode & select.POLLOUT:
chunk = self._input[self._input_offset :
self._input_offset + _PIPE_BUF]
try:
self._input_offset += os.write(fd, chunk)
except OSError as e:
if e.errno == errno.EPIPE:
close_unregister_and_remove(fd)
else:
raise
else:
if self._input_offset >= len(self._input):
close_unregister_and_remove(fd)
elif mode & select_POLLIN_POLLPRI:
data = os.read(fd, 4096)
if not data:
close_unregister_and_remove(fd)
self._fd2output[fd].append(data)
else:
# Ignore hang up or errors.
close_unregister_and_remove(fd)
return (stdout, stderr)
def _communicate_with_select(self, input, endtime, orig_timeout):
if not self._communication_started:
self._read_set = []
self._write_set = []
if self.stdin and input:
self._write_set.append(self.stdin)
if self.stdout:
self._read_set.append(self.stdout)
if self.stderr:
self._read_set.append(self.stderr)
self._save_input(input)
stdout = None # Return
stderr = None # Return
if self.stdout:
if not self._communication_started:
self._stdout_buff = []
stdout = self._stdout_buff
if self.stderr:
if not self._communication_started:
self._stderr_buff = []
stderr = self._stderr_buff
while self._read_set or self._write_set:
timeout = self._remaining_time(endtime)
if timeout is not None and timeout < 0:
raise TimeoutExpired(self.args, orig_timeout)
try:
(rlist, wlist, xlist) = \
select.select(self._read_set, self._write_set, [],
timeout)
except OSError as e:
if e.args[0] == errno.EINTR:
continue
raise
# According to the docs, returning three empty lists indicates
# that the timeout expired.
if not (rlist or wlist or xlist):
raise TimeoutExpired(self.args, orig_timeout)
# We also check what time it is ourselves for good measure.
self._check_timeout(endtime, orig_timeout)
# XXX Rewrite these to use non-blocking I/O on the
# file objects; they are no longer using C stdio!
if self.stdin in wlist:
chunk = self._input[self._input_offset :
self._input_offset + _PIPE_BUF]
try:
bytes_written = os.write(self.stdin.fileno(), chunk)
except OSError as e:
if e.errno == errno.EPIPE:
self.stdin.close()
self._write_set.remove(self.stdin)
else:
raise
else:
self._input_offset += bytes_written
if self._input_offset >= len(self._input):
self.stdin.close()
self._write_set.remove(self.stdin)
if self.stdout in rlist:
data = os.read(self.stdout.fileno(), 1024)
if not data:
self.stdout.close()
self._read_set.remove(self.stdout)
stdout.append(data)
if self.stderr in rlist:
data = os.read(self.stderr.fileno(), 1024)
if not data:
self.stderr.close()
self._read_set.remove(self.stderr)
stderr.append(data)
return (stdout, stderr)
def send_signal(self, sig):
"""Send a signal to the process
"""

View File

@ -3,7 +3,6 @@ import importlib
import importlib.util
from importlib._bootstrap import _get_sourcefile
import builtins
from test.test_importlib.import_ import util as importlib_util
import marshal
import os
import platform

View File

@ -4,17 +4,6 @@ Specifying the ``--builtin`` flag will run tests, where applicable, with
builtins.__import__ instead of importlib.__import__.
"""
from . import test_main
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(description='Execute the importlib test '
'suite')
parser.add_argument('-b', '--builtin', action='store_true', default=False,
help='use builtins.__import__() instead of importlib')
args = parser.parse_args()
if args.builtin:
util.using___import__ = True
from . import test_main
test_main()

View File

@ -1,15 +1,17 @@
from importlib import machinery
from .. import abc
from .. import util
machinery = util.import_importlib('importlib.machinery')
import unittest
class FinderTests(unittest.TestCase, abc.FinderTests):
class FinderTests(abc.FinderTests):
"""Test finding frozen modules."""
def find(self, name, path=None):
finder = machinery.FrozenImporter
finder = self.machinery.FrozenImporter
return finder.find_module(name, path)
def test_module(self):
@ -37,11 +39,9 @@ class FinderTests(unittest.TestCase, abc.FinderTests):
loader = self.find('<not real>')
self.assertIsNone(loader)
def test_main():
from test.support import run_unittest
run_unittest(FinderTests)
Frozen_FinderTests, Source_FinderTests = util.test_both(FinderTests,
machinery=machinery)
if __name__ == '__main__':
test_main()
unittest.main()

View File

@ -1,20 +1,21 @@
from .. import abc
from .. import util
from importlib import machinery
machinery = util.import_importlib('importlib.machinery')
import unittest
from test.support import captured_stdout
import types
class LoaderTests(unittest.TestCase, abc.LoaderTests):
class LoaderTests(abc.LoaderTests):
def test_module(self):
with util.uncache('__hello__'), captured_stdout() as stdout:
module = machinery.FrozenImporter.load_module('__hello__')
module = self.machinery.FrozenImporter.load_module('__hello__')
check = {'__name__': '__hello__',
'__package__': '',
'__loader__': machinery.FrozenImporter,
'__loader__': self.machinery.FrozenImporter,
}
for attr, value in check.items():
self.assertEqual(getattr(module, attr), value)
@ -23,11 +24,11 @@ class LoaderTests(unittest.TestCase, abc.LoaderTests):
def test_package(self):
with util.uncache('__phello__'), captured_stdout() as stdout:
module = machinery.FrozenImporter.load_module('__phello__')
module = self.machinery.FrozenImporter.load_module('__phello__')
check = {'__name__': '__phello__',
'__package__': '__phello__',
'__path__': [],
'__loader__': machinery.FrozenImporter,
'__loader__': self.machinery.FrozenImporter,
}
for attr, value in check.items():
attr_value = getattr(module, attr)
@ -40,10 +41,10 @@ class LoaderTests(unittest.TestCase, abc.LoaderTests):
def test_lacking_parent(self):
with util.uncache('__phello__', '__phello__.spam'), \
captured_stdout() as stdout:
module = machinery.FrozenImporter.load_module('__phello__.spam')
module = self.machinery.FrozenImporter.load_module('__phello__.spam')
check = {'__name__': '__phello__.spam',
'__package__': '__phello__',
'__loader__': machinery.FrozenImporter,
'__loader__': self.machinery.FrozenImporter,
}
for attr, value in check.items():
attr_value = getattr(module, attr)
@ -55,15 +56,15 @@ class LoaderTests(unittest.TestCase, abc.LoaderTests):
def test_module_reuse(self):
with util.uncache('__hello__'), captured_stdout() as stdout:
module1 = machinery.FrozenImporter.load_module('__hello__')
module2 = machinery.FrozenImporter.load_module('__hello__')
module1 = self.machinery.FrozenImporter.load_module('__hello__')
module2 = self.machinery.FrozenImporter.load_module('__hello__')
self.assertIs(module1, module2)
self.assertEqual(stdout.getvalue(),
'Hello world!\nHello world!\n')
def test_module_repr(self):
with util.uncache('__hello__'), captured_stdout():
module = machinery.FrozenImporter.load_module('__hello__')
module = self.machinery.FrozenImporter.load_module('__hello__')
self.assertEqual(repr(module),
"<module '__hello__' (frozen)>")
@ -72,13 +73,16 @@ class LoaderTests(unittest.TestCase, abc.LoaderTests):
pass
def test_unloadable(self):
assert machinery.FrozenImporter.find_module('_not_real') is None
assert self.machinery.FrozenImporter.find_module('_not_real') is None
with self.assertRaises(ImportError) as cm:
machinery.FrozenImporter.load_module('_not_real')
self.machinery.FrozenImporter.load_module('_not_real')
self.assertEqual(cm.exception.name, '_not_real')
Frozen_LoaderTests, Source_LoaderTests = util.test_both(LoaderTests,
machinery=machinery)
class InspectLoaderTests(unittest.TestCase):
class InspectLoaderTests:
"""Tests for the InspectLoader methods for FrozenImporter."""
@ -86,7 +90,7 @@ class InspectLoaderTests(unittest.TestCase):
# Make sure that the code object is good.
name = '__hello__'
with captured_stdout() as stdout:
code = machinery.FrozenImporter.get_code(name)
code = self.machinery.FrozenImporter.get_code(name)
mod = types.ModuleType(name)
exec(code, mod.__dict__)
self.assertTrue(hasattr(mod, 'initialized'))
@ -94,7 +98,7 @@ class InspectLoaderTests(unittest.TestCase):
def test_get_source(self):
# Should always return None.
result = machinery.FrozenImporter.get_source('__hello__')
result = self.machinery.FrozenImporter.get_source('__hello__')
self.assertIsNone(result)
def test_is_package(self):
@ -102,22 +106,20 @@ class InspectLoaderTests(unittest.TestCase):
test_for = (('__hello__', False), ('__phello__', True),
('__phello__.spam', False))
for name, is_package in test_for:
result = machinery.FrozenImporter.is_package(name)
result = self.machinery.FrozenImporter.is_package(name)
self.assertEqual(bool(result), is_package)
def test_failure(self):
# Raise ImportError for modules that are not frozen.
for meth_name in ('get_code', 'get_source', 'is_package'):
method = getattr(machinery.FrozenImporter, meth_name)
method = getattr(self.machinery.FrozenImporter, meth_name)
with self.assertRaises(ImportError) as cm:
method('importlib')
self.assertEqual(cm.exception.name, 'importlib')
def test_main():
from test.support import run_unittest
run_unittest(LoaderTests, InspectLoaderTests)
Frozen_ILTests, Source_ILTests = util.test_both(InspectLoaderTests,
machinery=machinery)
if __name__ == '__main__':
test_main()
unittest.main()

View File

@ -16,7 +16,7 @@ class LoaderMock:
return self.module
class LoaderAttributeTests(unittest.TestCase):
class LoaderAttributeTests:
def test___loader___missing(self):
module = types.ModuleType('blah')
@ -27,7 +27,7 @@ class LoaderAttributeTests(unittest.TestCase):
loader = LoaderMock()
loader.module = module
with util.uncache('blah'), util.import_state(meta_path=[loader]):
module = import_util.import_('blah')
module = self.__import__('blah')
self.assertEqual(loader, module.__loader__)
def test___loader___is_None(self):
@ -36,9 +36,13 @@ class LoaderAttributeTests(unittest.TestCase):
loader = LoaderMock()
loader.module = module
with util.uncache('blah'), util.import_state(meta_path=[loader]):
returned_module = import_util.import_('blah')
returned_module = self.__import__('blah')
self.assertEqual(loader, module.__loader__)
Frozen_Tests, Source_Tests = util.test_both(LoaderAttributeTests,
__import__=import_util.__import__)
if __name__ == '__main__':
unittest.main()

View File

@ -9,7 +9,7 @@ from .. import util
from . import util as import_util
class Using__package__(unittest.TestCase):
class Using__package__:
"""Use of __package__ supercedes the use of __name__/__path__ to calculate
what package a module belongs to. The basic algorithm is [__package__]::
@ -38,8 +38,8 @@ class Using__package__(unittest.TestCase):
# [__package__]
with util.mock_modules('pkg.__init__', 'pkg.fake') as importer:
with util.import_state(meta_path=[importer]):
import_util.import_('pkg.fake')
module = import_util.import_('',
self.__import__('pkg.fake')
module = self.__import__('',
globals={'__package__': 'pkg.fake'},
fromlist=['attr'], level=2)
self.assertEqual(module.__name__, 'pkg')
@ -51,8 +51,8 @@ class Using__package__(unittest.TestCase):
globals_['__package__'] = None
with util.mock_modules('pkg.__init__', 'pkg.fake') as importer:
with util.import_state(meta_path=[importer]):
import_util.import_('pkg.fake')
module = import_util.import_('', globals= globals_,
self.__import__('pkg.fake')
module = self.__import__('', globals= globals_,
fromlist=['attr'], level=2)
self.assertEqual(module.__name__, 'pkg')
@ -63,15 +63,17 @@ class Using__package__(unittest.TestCase):
def test_bad__package__(self):
globals = {'__package__': '<not real>'}
with self.assertRaises(SystemError):
import_util.import_('', globals, {}, ['relimport'], 1)
self.__import__('', globals, {}, ['relimport'], 1)
def test_bunk__package__(self):
globals = {'__package__': 42}
with self.assertRaises(TypeError):
import_util.import_('', globals, {}, ['relimport'], 1)
self.__import__('', globals, {}, ['relimport'], 1)
Frozen_UsingPackage, Source_UsingPackage = util.test_both(
Using__package__, __import__=import_util.__import__)
@import_util.importlib_only
class Setting__package__(unittest.TestCase):
"""Because __package__ is a new feature, it is not always set by a loader.
@ -84,12 +86,14 @@ class Setting__package__(unittest.TestCase):
"""
__import__ = import_util.__import__[1]
# [top-level]
def test_top_level(self):
with util.mock_modules('top_level') as mock:
with util.import_state(meta_path=[mock]):
del mock['top_level'].__package__
module = import_util.import_('top_level')
module = self.__import__('top_level')
self.assertEqual(module.__package__, '')
# [package]
@ -97,7 +101,7 @@ class Setting__package__(unittest.TestCase):
with util.mock_modules('pkg.__init__') as mock:
with util.import_state(meta_path=[mock]):
del mock['pkg'].__package__
module = import_util.import_('pkg')
module = self.__import__('pkg')
self.assertEqual(module.__package__, 'pkg')
# [submodule]
@ -105,15 +109,10 @@ class Setting__package__(unittest.TestCase):
with util.mock_modules('pkg.__init__', 'pkg.mod') as mock:
with util.import_state(meta_path=[mock]):
del mock['pkg.mod'].__package__
pkg = import_util.import_('pkg.mod')
pkg = self.__import__('pkg.mod')
module = getattr(pkg, 'mod')
self.assertEqual(module.__package__, 'pkg')
def test_main():
from test.support import run_unittest
run_unittest(Using__package__, Setting__package__)
if __name__ == '__main__':
test_main()
unittest.main()

View File

@ -1,5 +1,5 @@
from .. import util as importlib_test_util
from . import util
from .. import util
from . import util as import_util
import sys
import types
import unittest
@ -17,7 +17,7 @@ class BadLoaderFinder:
raise ImportError('I cannot be loaded!')
class APITest(unittest.TestCase):
class APITest:
"""Test API-specific details for __import__ (e.g. raising the right
exception when passing in an int for the module name)."""
@ -25,24 +25,24 @@ class APITest(unittest.TestCase):
def test_name_requires_rparition(self):
# Raise TypeError if a non-string is passed in for the module name.
with self.assertRaises(TypeError):
util.import_(42)
self.__import__(42)
def test_negative_level(self):
# Raise ValueError when a negative level is specified.
# PEP 328 did away with sys.module None entries and the ambiguity of
# absolute/relative imports.
with self.assertRaises(ValueError):
util.import_('os', globals(), level=-1)
self.__import__('os', globals(), level=-1)
def test_nonexistent_fromlist_entry(self):
# If something in fromlist doesn't exist, that's okay.
# issue15715
mod = types.ModuleType('fine')
mod.__path__ = ['XXX']
with importlib_test_util.import_state(meta_path=[BadLoaderFinder]):
with importlib_test_util.uncache('fine'):
with util.import_state(meta_path=[BadLoaderFinder]):
with util.uncache('fine'):
sys.modules['fine'] = mod
util.import_('fine', fromlist=['not here'])
self.__import__('fine', fromlist=['not here'])
def test_fromlist_load_error_propagates(self):
# If something in fromlist triggers an exception not related to not
@ -50,18 +50,15 @@ class APITest(unittest.TestCase):
# issue15316
mod = types.ModuleType('fine')
mod.__path__ = ['XXX']
with importlib_test_util.import_state(meta_path=[BadLoaderFinder]):
with importlib_test_util.uncache('fine'):
with util.import_state(meta_path=[BadLoaderFinder]):
with util.uncache('fine'):
sys.modules['fine'] = mod
with self.assertRaises(ImportError):
util.import_('fine', fromlist=['bogus'])
self.__import__('fine', fromlist=['bogus'])
def test_main():
from test.support import run_unittest
run_unittest(APITest)
Frozen_APITests, Source_APITests = util.test_both(
APITest, __import__=import_util.__import__)
if __name__ == '__main__':
test_main()
unittest.main()

View File

@ -6,7 +6,7 @@ from types import MethodType
import unittest
class UseCache(unittest.TestCase):
class UseCache:
"""When it comes to sys.modules, import prefers it over anything else.
@ -21,12 +21,13 @@ class UseCache(unittest.TestCase):
ImportError is raised [None in cache].
"""
def test_using_cache(self):
# [use cache]
module_to_use = "some module found!"
with util.uncache('some_module'):
sys.modules['some_module'] = module_to_use
module = import_util.import_('some_module')
module = self.__import__('some_module')
self.assertEqual(id(module_to_use), id(module))
def test_None_in_cache(self):
@ -35,7 +36,7 @@ class UseCache(unittest.TestCase):
with util.uncache(name):
sys.modules[name] = None
with self.assertRaises(ImportError) as cm:
import_util.import_(name)
self.__import__(name)
self.assertEqual(cm.exception.name, name)
def create_mock(self, *names, return_=None):
@ -47,42 +48,43 @@ class UseCache(unittest.TestCase):
mock.load_module = MethodType(load_module, mock)
return mock
Frozen_UseCache, Source_UseCache = util.test_both(
UseCache, __import__=import_util.__import__)
class ImportlibUseCache(UseCache, unittest.TestCase):
__import__ = import_util.__import__[1]
# __import__ inconsistent between loaders and built-in import when it comes
# to when to use the module in sys.modules and when not to.
@import_util.importlib_only
def test_using_cache_after_loader(self):
# [from cache on return]
with self.create_mock('module') as mock:
with util.import_state(meta_path=[mock]):
module = import_util.import_('module')
module = self.__import__('module')
self.assertEqual(id(module), id(sys.modules['module']))
# See test_using_cache_after_loader() for reasoning.
@import_util.importlib_only
def test_using_cache_for_assigning_to_attribute(self):
# [from cache to attribute]
with self.create_mock('pkg.__init__', 'pkg.module') as importer:
with util.import_state(meta_path=[importer]):
module = import_util.import_('pkg.module')
module = self.__import__('pkg.module')
self.assertTrue(hasattr(module, 'module'))
self.assertEqual(id(module.module),
id(sys.modules['pkg.module']))
# See test_using_cache_after_loader() for reasoning.
@import_util.importlib_only
def test_using_cache_for_fromlist(self):
# [from cache for fromlist]
with self.create_mock('pkg.__init__', 'pkg.module') as importer:
with util.import_state(meta_path=[importer]):
module = import_util.import_('pkg', fromlist=['module'])
module = self.__import__('pkg', fromlist=['module'])
self.assertTrue(hasattr(module, 'module'))
self.assertEqual(id(module.module),
id(sys.modules['pkg.module']))
def test_main():
from test.support import run_unittest
run_unittest(UseCache)
if __name__ == '__main__':
test_main()
unittest.main()

View File

@ -3,7 +3,8 @@ from .. import util
from . import util as import_util
import unittest
class ReturnValue(unittest.TestCase):
class ReturnValue:
"""The use of fromlist influences what import returns.
@ -18,18 +19,21 @@ class ReturnValue(unittest.TestCase):
# [import return]
with util.mock_modules('pkg.__init__', 'pkg.module') as importer:
with util.import_state(meta_path=[importer]):
module = import_util.import_('pkg.module')
module = self.__import__('pkg.module')
self.assertEqual(module.__name__, 'pkg')
def test_return_from_from_import(self):
# [from return]
with util.mock_modules('pkg.__init__', 'pkg.module')as importer:
with util.import_state(meta_path=[importer]):
module = import_util.import_('pkg.module', fromlist=['attr'])
module = self.__import__('pkg.module', fromlist=['attr'])
self.assertEqual(module.__name__, 'pkg.module')
Frozen_ReturnValue, Source_ReturnValue = util.test_both(
ReturnValue, __import__=import_util.__import__)
class HandlingFromlist(unittest.TestCase):
class HandlingFromlist:
"""Using fromlist triggers different actions based on what is being asked
of it.
@ -48,14 +52,14 @@ class HandlingFromlist(unittest.TestCase):
# [object case]
with util.mock_modules('module') as importer:
with util.import_state(meta_path=[importer]):
module = import_util.import_('module', fromlist=['attr'])
module = self.__import__('module', fromlist=['attr'])
self.assertEqual(module.__name__, 'module')
def test_nonexistent_object(self):
# [bad object]
with util.mock_modules('module') as importer:
with util.import_state(meta_path=[importer]):
module = import_util.import_('module', fromlist=['non_existent'])
module = self.__import__('module', fromlist=['non_existent'])
self.assertEqual(module.__name__, 'module')
self.assertTrue(not hasattr(module, 'non_existent'))
@ -63,7 +67,7 @@ class HandlingFromlist(unittest.TestCase):
# [module]
with util.mock_modules('pkg.__init__', 'pkg.module') as importer:
with util.import_state(meta_path=[importer]):
module = import_util.import_('pkg', fromlist=['module'])
module = self.__import__('pkg', fromlist=['module'])
self.assertEqual(module.__name__, 'pkg')
self.assertTrue(hasattr(module, 'module'))
self.assertEqual(module.module.__name__, 'pkg.module')
@ -78,13 +82,13 @@ class HandlingFromlist(unittest.TestCase):
module_code={'pkg.mod': module_code}) as importer:
with util.import_state(meta_path=[importer]):
with self.assertRaises(ImportError) as exc:
import_util.import_('pkg', fromlist=['mod'])
self.__import__('pkg', fromlist=['mod'])
self.assertEqual('i_do_not_exist', exc.exception.name)
def test_empty_string(self):
with util.mock_modules('pkg.__init__', 'pkg.mod') as importer:
with util.import_state(meta_path=[importer]):
module = import_util.import_('pkg.mod', fromlist=[''])
module = self.__import__('pkg.mod', fromlist=[''])
self.assertEqual(module.__name__, 'pkg.mod')
def basic_star_test(self, fromlist=['*']):
@ -92,7 +96,7 @@ class HandlingFromlist(unittest.TestCase):
with util.mock_modules('pkg.__init__', 'pkg.module') as mock:
with util.import_state(meta_path=[mock]):
mock['pkg'].__all__ = ['module']
module = import_util.import_('pkg', fromlist=fromlist)
module = self.__import__('pkg', fromlist=fromlist)
self.assertEqual(module.__name__, 'pkg')
self.assertTrue(hasattr(module, 'module'))
self.assertEqual(module.module.__name__, 'pkg.module')
@ -110,17 +114,16 @@ class HandlingFromlist(unittest.TestCase):
with context as mock:
with util.import_state(meta_path=[mock]):
mock['pkg'].__all__ = ['module1']
module = import_util.import_('pkg', fromlist=['module2', '*'])
module = self.__import__('pkg', fromlist=['module2', '*'])
self.assertEqual(module.__name__, 'pkg')
self.assertTrue(hasattr(module, 'module1'))
self.assertTrue(hasattr(module, 'module2'))
self.assertEqual(module.module1.__name__, 'pkg.module1')
self.assertEqual(module.module2.__name__, 'pkg.module2')
Frozen_FromList, Source_FromList = util.test_both(
HandlingFromlist, __import__=import_util.__import__)
def test_main():
from test.support import run_unittest
run_unittest(ReturnValue, HandlingFromlist)
if __name__ == '__main__':
test_main()
unittest.main()

View File

@ -7,7 +7,7 @@ import unittest
import warnings
class CallingOrder(unittest.TestCase):
class CallingOrder:
"""Calls to the importers on sys.meta_path happen in order that they are
specified in the sequence, starting with the first importer
@ -24,7 +24,7 @@ class CallingOrder(unittest.TestCase):
first.modules[mod] = 42
second.modules[mod] = -13
with util.import_state(meta_path=[first, second]):
self.assertEqual(import_util.import_(mod), 42)
self.assertEqual(self.__import__(mod), 42)
def test_continuing(self):
# [continuing]
@ -34,7 +34,7 @@ class CallingOrder(unittest.TestCase):
first.find_module = lambda self, fullname, path=None: None
second.modules[mod_name] = 42
with util.import_state(meta_path=[first, second]):
self.assertEqual(import_util.import_(mod_name), 42)
self.assertEqual(self.__import__(mod_name), 42)
def test_empty(self):
# Raise an ImportWarning if sys.meta_path is empty.
@ -51,8 +51,11 @@ class CallingOrder(unittest.TestCase):
self.assertEqual(len(w), 1)
self.assertTrue(issubclass(w[-1].category, ImportWarning))
Frozen_CallingOrder, Source_CallingOrder = util.test_both(
CallingOrder, __import__=import_util.__import__)
class CallSignature(unittest.TestCase):
class CallSignature:
"""If there is no __path__ entry on the parent module, then 'path' is None
[no path]. Otherwise, the value for __path__ is passed in for the 'path'
@ -74,7 +77,7 @@ class CallSignature(unittest.TestCase):
log, wrapped_call = self.log(importer.find_module)
importer.find_module = MethodType(wrapped_call, importer)
with util.import_state(meta_path=[importer]):
import_util.import_(mod_name)
self.__import__(mod_name)
assert len(log) == 1
args = log[0][0]
kwargs = log[0][1]
@ -95,7 +98,7 @@ class CallSignature(unittest.TestCase):
log, wrapped_call = self.log(importer.find_module)
importer.find_module = MethodType(wrapped_call, importer)
with util.import_state(meta_path=[importer]):
import_util.import_(mod_name)
self.__import__(mod_name)
assert len(log) == 2
args = log[1][0]
kwargs = log[1][1]
@ -104,12 +107,9 @@ class CallSignature(unittest.TestCase):
self.assertEqual(args[0], mod_name)
self.assertIs(args[1], path)
def test_main():
from test.support import run_unittest
run_unittest(CallingOrder, CallSignature)
Frozen_CallSignature, Source_CallSignature = util.test_both(
CallSignature, __import__=import_util.__import__)
if __name__ == '__main__':
test_main()
unittest.main()

View File

@ -6,21 +6,21 @@ import importlib
from test import support
class ParentModuleTests(unittest.TestCase):
class ParentModuleTests:
"""Importing a submodule should import the parent modules."""
def test_import_parent(self):
with util.mock_modules('pkg.__init__', 'pkg.module') as mock:
with util.import_state(meta_path=[mock]):
module = import_util.import_('pkg.module')
module = self.__import__('pkg.module')
self.assertIn('pkg', sys.modules)
def test_bad_parent(self):
with util.mock_modules('pkg.module') as mock:
with util.import_state(meta_path=[mock]):
with self.assertRaises(ImportError) as cm:
import_util.import_('pkg.module')
self.__import__('pkg.module')
self.assertEqual(cm.exception.name, 'pkg')
def test_raising_parent_after_importing_child(self):
@ -32,11 +32,11 @@ class ParentModuleTests(unittest.TestCase):
with mock:
with util.import_state(meta_path=[mock]):
with self.assertRaises(ZeroDivisionError):
import_util.import_('pkg')
self.__import__('pkg')
self.assertNotIn('pkg', sys.modules)
self.assertIn('pkg.module', sys.modules)
with self.assertRaises(ZeroDivisionError):
import_util.import_('pkg.module')
self.__import__('pkg.module')
self.assertNotIn('pkg', sys.modules)
self.assertIn('pkg.module', sys.modules)
@ -51,10 +51,10 @@ class ParentModuleTests(unittest.TestCase):
with self.assertRaises((ZeroDivisionError, ImportError)):
# This raises ImportError on the "from . import module"
# line, not sure why.
import_util.import_('pkg')
self.__import__('pkg')
self.assertNotIn('pkg', sys.modules)
with self.assertRaises((ZeroDivisionError, ImportError)):
import_util.import_('pkg.module')
self.__import__('pkg.module')
self.assertNotIn('pkg', sys.modules)
# XXX False
#self.assertIn('pkg.module', sys.modules)
@ -71,10 +71,10 @@ class ParentModuleTests(unittest.TestCase):
with self.assertRaises((ZeroDivisionError, ImportError)):
# This raises ImportError on the "from ..subpkg import module"
# line, not sure why.
import_util.import_('pkg.subpkg')
self.__import__('pkg.subpkg')
self.assertNotIn('pkg.subpkg', sys.modules)
with self.assertRaises((ZeroDivisionError, ImportError)):
import_util.import_('pkg.subpkg.module')
self.__import__('pkg.subpkg.module')
self.assertNotIn('pkg.subpkg', sys.modules)
# XXX False
#self.assertIn('pkg.subpkg.module', sys.modules)
@ -83,7 +83,7 @@ class ParentModuleTests(unittest.TestCase):
# Try to import a submodule from a non-package should raise ImportError.
assert not hasattr(sys, '__path__')
with self.assertRaises(ImportError) as cm:
import_util.import_('sys.no_submodules_here')
self.__import__('sys.no_submodules_here')
self.assertEqual(cm.exception.name, 'sys.no_submodules_here')
def test_module_not_package_but_side_effects(self):
@ -98,15 +98,13 @@ class ParentModuleTests(unittest.TestCase):
with mock_modules as mock:
with util.import_state(meta_path=[mock]):
try:
submodule = import_util.import_(subname)
submodule = self.__import__(subname)
finally:
support.unload(subname)
def test_main():
from test.support import run_unittest
run_unittest(ParentModuleTests)
Frozen_ParentTests, Source_ParentTests = util.test_both(
ParentModuleTests, __import__=import_util.__import__)
if __name__ == '__main__':
test_main()
unittest.main()

View File

@ -1,8 +1,9 @@
from importlib import _bootstrap
from importlib import machinery
from importlib import import_module
from .. import util
from . import util as import_util
importlib = util.import_importlib('importlib')
machinery = util.import_importlib('importlib.machinery')
import os
import sys
from types import ModuleType
@ -11,7 +12,7 @@ import warnings
import zipimport
class FinderTests(unittest.TestCase):
class FinderTests:
"""Tests for PathFinder."""
@ -19,7 +20,7 @@ class FinderTests(unittest.TestCase):
# Test None returned upon not finding a suitable finder.
module = '<test module>'
with util.import_state():
self.assertIsNone(machinery.PathFinder.find_module(module))
self.assertIsNone(self.machinery.PathFinder.find_module(module))
def test_sys_path(self):
# Test that sys.path is used when 'path' is None.
@ -29,7 +30,7 @@ class FinderTests(unittest.TestCase):
importer = util.mock_modules(module)
with util.import_state(path_importer_cache={path: importer},
path=[path]):
loader = machinery.PathFinder.find_module(module)
loader = self.machinery.PathFinder.find_module(module)
self.assertIs(loader, importer)
def test_path(self):
@ -39,7 +40,7 @@ class FinderTests(unittest.TestCase):
path = '<test path>'
importer = util.mock_modules(module)
with util.import_state(path_importer_cache={path: importer}):
loader = machinery.PathFinder.find_module(module, [path])
loader = self.machinery.PathFinder.find_module(module, [path])
self.assertIs(loader, importer)
def test_empty_list(self):
@ -49,7 +50,7 @@ class FinderTests(unittest.TestCase):
importer = util.mock_modules(module)
with util.import_state(path_importer_cache={path: importer},
path=[path]):
self.assertIsNone(machinery.PathFinder.find_module('module', []))
self.assertIsNone(self.machinery.PathFinder.find_module('module', []))
def test_path_hooks(self):
# Test that sys.path_hooks is used.
@ -59,7 +60,7 @@ class FinderTests(unittest.TestCase):
importer = util.mock_modules(module)
hook = import_util.mock_path_hook(path, importer=importer)
with util.import_state(path_hooks=[hook]):
loader = machinery.PathFinder.find_module(module, [path])
loader = self.machinery.PathFinder.find_module(module, [path])
self.assertIs(loader, importer)
self.assertIn(path, sys.path_importer_cache)
self.assertIs(sys.path_importer_cache[path], importer)
@ -72,7 +73,7 @@ class FinderTests(unittest.TestCase):
path=[path_entry]):
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter('always')
self.assertIsNone(machinery.PathFinder.find_module('os'))
self.assertIsNone(self.machinery.PathFinder.find_module('os'))
self.assertIsNone(sys.path_importer_cache[path_entry])
self.assertEqual(len(w), 1)
self.assertTrue(issubclass(w[-1].category, ImportWarning))
@ -84,7 +85,7 @@ class FinderTests(unittest.TestCase):
importer = util.mock_modules(module)
hook = import_util.mock_path_hook(os.getcwd(), importer=importer)
with util.import_state(path=[path], path_hooks=[hook]):
loader = machinery.PathFinder.find_module(module)
loader = self.machinery.PathFinder.find_module(module)
self.assertIs(loader, importer)
self.assertIn(os.getcwd(), sys.path_importer_cache)
@ -96,8 +97,8 @@ class FinderTests(unittest.TestCase):
new_path_importer_cache = sys.path_importer_cache.copy()
new_path_importer_cache.pop(None, None)
new_path_hooks = [zipimport.zipimporter,
_bootstrap.FileFinder.path_hook(
*_bootstrap._get_supported_file_loaders())]
self.machinery.FileFinder.path_hook(
*self.importlib._bootstrap._get_supported_file_loaders())]
missing = object()
email = sys.modules.pop('email', missing)
try:
@ -105,16 +106,15 @@ class FinderTests(unittest.TestCase):
path=new_path,
path_importer_cache=new_path_importer_cache,
path_hooks=new_path_hooks):
module = import_module('email')
module = self.importlib.import_module('email')
self.assertIsInstance(module, ModuleType)
finally:
if email is not missing:
sys.modules['email'] = email
Frozen_FinderTests, Source_FinderTests = util.test_both(
FinderTests, importlib=importlib, machinery=machinery)
def test_main():
from test.support import run_unittest
run_unittest(FinderTests)
if __name__ == '__main__':
test_main()
unittest.main()

View File

@ -4,7 +4,7 @@ from . import util as import_util
import sys
import unittest
class RelativeImports(unittest.TestCase):
class RelativeImports:
"""PEP 328 introduced relative imports. This allows for imports to occur
from within a package without having to specify the actual package name.
@ -76,8 +76,8 @@ class RelativeImports(unittest.TestCase):
create = 'pkg.__init__', 'pkg.mod2'
globals_ = {'__package__': 'pkg'}, {'__name__': 'pkg.mod1'}
def callback(global_):
import_util.import_('pkg') # For __import__().
module = import_util.import_('', global_, fromlist=['mod2'], level=1)
self.__import__('pkg') # For __import__().
module = self.__import__('', global_, fromlist=['mod2'], level=1)
self.assertEqual(module.__name__, 'pkg')
self.assertTrue(hasattr(module, 'mod2'))
self.assertEqual(module.mod2.attr, 'pkg.mod2')
@ -88,8 +88,8 @@ class RelativeImports(unittest.TestCase):
create = 'pkg.__init__', 'pkg.mod2'
globals_ = {'__package__': 'pkg'}, {'__name__': 'pkg.mod1'}
def callback(global_):
import_util.import_('pkg') # For __import__().
module = import_util.import_('mod2', global_, fromlist=['attr'],
self.__import__('pkg') # For __import__().
module = self.__import__('mod2', global_, fromlist=['attr'],
level=1)
self.assertEqual(module.__name__, 'pkg.mod2')
self.assertEqual(module.attr, 'pkg.mod2')
@ -101,8 +101,8 @@ class RelativeImports(unittest.TestCase):
globals_ = ({'__package__': 'pkg'},
{'__name__': 'pkg', '__path__': ['blah']})
def callback(global_):
import_util.import_('pkg') # For __import__().
module = import_util.import_('', global_, fromlist=['module'],
self.__import__('pkg') # For __import__().
module = self.__import__('', global_, fromlist=['module'],
level=1)
self.assertEqual(module.__name__, 'pkg')
self.assertTrue(hasattr(module, 'module'))
@ -114,8 +114,8 @@ class RelativeImports(unittest.TestCase):
create = 'pkg.__init__', 'pkg.module'
globals_ = {'__package__': 'pkg'}, {'__name__': 'pkg.module'}
def callback(global_):
import_util.import_('pkg') # For __import__().
module = import_util.import_('', global_, fromlist=['attr'], level=1)
self.__import__('pkg') # For __import__().
module = self.__import__('', global_, fromlist=['attr'], level=1)
self.assertEqual(module.__name__, 'pkg')
self.relative_import_test(create, globals_, callback)
@ -126,7 +126,7 @@ class RelativeImports(unittest.TestCase):
globals_ = ({'__package__': 'pkg.subpkg1'},
{'__name__': 'pkg.subpkg1', '__path__': ['blah']})
def callback(global_):
module = import_util.import_('', global_, fromlist=['subpkg2'],
module = self.__import__('', global_, fromlist=['subpkg2'],
level=2)
self.assertEqual(module.__name__, 'pkg')
self.assertTrue(hasattr(module, 'subpkg2'))
@ -142,8 +142,8 @@ class RelativeImports(unittest.TestCase):
{'__name__': 'pkg.pkg1.pkg2.pkg3.pkg4.pkg5',
'__path__': ['blah']})
def callback(global_):
import_util.import_(globals_[0]['__package__'])
module = import_util.import_('', global_, fromlist=['attr'], level=6)
self.__import__(globals_[0]['__package__'])
module = self.__import__('', global_, fromlist=['attr'], level=6)
self.assertEqual(module.__name__, 'pkg')
self.relative_import_test(create, globals_, callback)
@ -153,9 +153,9 @@ class RelativeImports(unittest.TestCase):
globals_ = ({'__package__': 'pkg'},
{'__name__': 'pkg', '__path__': ['blah']})
def callback(global_):
import_util.import_('pkg')
self.__import__('pkg')
with self.assertRaises(ValueError):
import_util.import_('', global_, fromlist=['top_level'],
self.__import__('', global_, fromlist=['top_level'],
level=2)
self.relative_import_test(create, globals_, callback)
@ -164,16 +164,16 @@ class RelativeImports(unittest.TestCase):
create = ['top_level', 'pkg.__init__', 'pkg.module']
globals_ = {'__package__': 'pkg'}, {'__name__': 'pkg.module'}
def callback(global_):
import_util.import_('pkg')
self.__import__('pkg')
with self.assertRaises(ValueError):
import_util.import_('', global_, fromlist=['top_level'],
self.__import__('', global_, fromlist=['top_level'],
level=2)
self.relative_import_test(create, globals_, callback)
def test_empty_name_w_level_0(self):
# [empty name]
with self.assertRaises(ValueError):
import_util.import_('')
self.__import__('')
def test_import_from_different_package(self):
# Test importing from a different package than the caller.
@ -186,8 +186,8 @@ class RelativeImports(unittest.TestCase):
'__runpy_pkg__.uncle.cousin.nephew']
globals_ = {'__package__': '__runpy_pkg__.__runpy_pkg__'}
def callback(global_):
import_util.import_('__runpy_pkg__.__runpy_pkg__')
module = import_util.import_('uncle.cousin', globals_, {},
self.__import__('__runpy_pkg__.__runpy_pkg__')
module = self.__import__('uncle.cousin', globals_, {},
fromlist=['nephew'],
level=2)
self.assertEqual(module.__name__, '__runpy_pkg__.uncle.cousin')
@ -198,20 +198,19 @@ class RelativeImports(unittest.TestCase):
create = ['crash.__init__', 'crash.mod']
globals_ = [{'__package__': 'crash', '__name__': 'crash'}]
def callback(global_):
import_util.import_('crash')
mod = import_util.import_('mod', global_, {}, [], 1)
self.__import__('crash')
mod = self.__import__('mod', global_, {}, [], 1)
self.assertEqual(mod.__name__, 'crash.mod')
self.relative_import_test(create, globals_, callback)
def test_relative_import_no_globals(self):
# No globals for a relative import is an error.
with self.assertRaises(KeyError):
import_util.import_('sys', level=1)
self.__import__('sys', level=1)
Frozen_RelativeImports, Source_RelativeImports = util.test_both(
RelativeImports, __import__=import_util.__import__)
def test_main():
from test.support import run_unittest
run_unittest(RelativeImports)
if __name__ == '__main__':
test_main()
unittest.main()

View File

@ -1,22 +1,14 @@
from .. import util
frozen_importlib, source_importlib = util.import_importlib('importlib')
import builtins
import functools
import importlib
import unittest
using___import__ = False
def import_(*args, **kwargs):
"""Delegate to allow for injecting different implementations of import."""
if using___import__:
return __import__(*args, **kwargs)
else:
return importlib.__import__(*args, **kwargs)
def importlib_only(fxn):
"""Decorator to skip a test if using __builtins__.__import__."""
return unittest.skipIf(using___import__, "importlib-specific test")(fxn)
__import__ = staticmethod(builtins.__import__), staticmethod(source_importlib.__import__)
def mock_path_hook(*entries, importer):

View File

@ -2,8 +2,9 @@
from .. import util
from . import util as source_util
from importlib import _bootstrap
from importlib import machinery
importlib = util.import_importlib('importlib')
machinery = util.import_importlib('importlib.machinery')
import os
import sys
from test import support as test_support
@ -11,7 +12,7 @@ import unittest
@util.case_insensitive_tests
class CaseSensitivityTest(unittest.TestCase):
class CaseSensitivityTest:
"""PEP 235 dictates that on case-preserving, case-insensitive file systems
that imports are case-sensitive unless the PYTHONCASEOK environment
@ -21,11 +22,11 @@ class CaseSensitivityTest(unittest.TestCase):
assert name != name.lower()
def find(self, path):
finder = machinery.FileFinder(path,
(machinery.SourceFileLoader,
machinery.SOURCE_SUFFIXES),
(machinery.SourcelessFileLoader,
machinery.BYTECODE_SUFFIXES))
finder = self.machinery.FileFinder(path,
(self.machinery.SourceFileLoader,
self.machinery.SOURCE_SUFFIXES),
(self.machinery.SourcelessFileLoader,
self.machinery.BYTECODE_SUFFIXES))
return finder.find_module(self.name)
def sensitivity_test(self):
@ -41,7 +42,7 @@ class CaseSensitivityTest(unittest.TestCase):
def test_sensitive(self):
with test_support.EnvironmentVarGuard() as env:
env.unset('PYTHONCASEOK')
if b'PYTHONCASEOK' in _bootstrap._os.environ:
if b'PYTHONCASEOK' in self.importlib._bootstrap._os.environ:
self.skipTest('os.environ changes not reflected in '
'_os.environ')
sensitive, insensitive = self.sensitivity_test()
@ -52,7 +53,7 @@ class CaseSensitivityTest(unittest.TestCase):
def test_insensitive(self):
with test_support.EnvironmentVarGuard() as env:
env.set('PYTHONCASEOK', '1')
if b'PYTHONCASEOK' not in _bootstrap._os.environ:
if b'PYTHONCASEOK' not in self.importlib._bootstrap._os.environ:
self.skipTest('os.environ changes not reflected in '
'_os.environ')
sensitive, insensitive = self.sensitivity_test()
@ -61,10 +62,9 @@ class CaseSensitivityTest(unittest.TestCase):
self.assertTrue(hasattr(insensitive, 'load_module'))
self.assertIn(self.name, insensitive.get_filename(self.name))
def test_main():
test_support.run_unittest(CaseSensitivityTest)
Frozen_CaseSensitivityTest, Source_CaseSensitivityTest = util.test_both(
CaseSensitivityTest, importlib=importlib, machinery=machinery)
if __name__ == '__main__':
test_main()
unittest.main()

View File

@ -1,11 +1,12 @@
from importlib import machinery
import importlib
import importlib.abc
import importlib.util
from .. import abc
from .. import util
from . import util as source_util
importlib = util.import_importlib('importlib')
importlib_abc = util.import_importlib('importlib.abc')
machinery = util.import_importlib('importlib.machinery')
importlib_util = util.import_importlib('importlib.util')
import errno
import marshal
import os
@ -19,7 +20,7 @@ import unittest
from test.support import make_legacy_pyc, unload
class SimpleTest(unittest.TestCase, abc.LoaderTests):
class SimpleTest(abc.LoaderTests):
"""Should have no issue importing a source module [basic]. And if there is
a syntax error, it should raise a SyntaxError [syntax error].
@ -27,7 +28,7 @@ class SimpleTest(unittest.TestCase, abc.LoaderTests):
"""
def test_load_module_API(self):
class Tester(importlib.abc.FileLoader):
class Tester(self.abc.FileLoader):
def get_source(self, _): return 'attr = 42'
def is_package(self, _): return False
@ -37,7 +38,7 @@ class SimpleTest(unittest.TestCase, abc.LoaderTests):
def test_get_filename_API(self):
# If fullname is not set then assume self.path is desired.
class Tester(importlib.abc.FileLoader):
class Tester(self.abc.FileLoader):
def get_code(self, _): pass
def get_source(self, _): pass
def is_package(self, _): pass
@ -55,7 +56,7 @@ class SimpleTest(unittest.TestCase, abc.LoaderTests):
# [basic]
def test_module(self):
with source_util.create_modules('_temp') as mapping:
loader = machinery.SourceFileLoader('_temp', mapping['_temp'])
loader = self.machinery.SourceFileLoader('_temp', mapping['_temp'])
module = loader.load_module('_temp')
self.assertIn('_temp', sys.modules)
check = {'__name__': '_temp', '__file__': mapping['_temp'],
@ -65,7 +66,7 @@ class SimpleTest(unittest.TestCase, abc.LoaderTests):
def test_package(self):
with source_util.create_modules('_pkg.__init__') as mapping:
loader = machinery.SourceFileLoader('_pkg',
loader = self.machinery.SourceFileLoader('_pkg',
mapping['_pkg.__init__'])
module = loader.load_module('_pkg')
self.assertIn('_pkg', sys.modules)
@ -78,7 +79,7 @@ class SimpleTest(unittest.TestCase, abc.LoaderTests):
def test_lacking_parent(self):
with source_util.create_modules('_pkg.__init__', '_pkg.mod')as mapping:
loader = machinery.SourceFileLoader('_pkg.mod',
loader = self.machinery.SourceFileLoader('_pkg.mod',
mapping['_pkg.mod'])
module = loader.load_module('_pkg.mod')
self.assertIn('_pkg.mod', sys.modules)
@ -93,7 +94,7 @@ class SimpleTest(unittest.TestCase, abc.LoaderTests):
def test_module_reuse(self):
with source_util.create_modules('_temp') as mapping:
loader = machinery.SourceFileLoader('_temp', mapping['_temp'])
loader = self.machinery.SourceFileLoader('_temp', mapping['_temp'])
module = loader.load_module('_temp')
module_id = id(module)
module_dict_id = id(module.__dict__)
@ -118,7 +119,7 @@ class SimpleTest(unittest.TestCase, abc.LoaderTests):
setattr(orig_module, attr, value)
with open(mapping[name], 'w') as file:
file.write('+++ bad syntax +++')
loader = machinery.SourceFileLoader('_temp', mapping['_temp'])
loader = self.machinery.SourceFileLoader('_temp', mapping['_temp'])
with self.assertRaises(SyntaxError):
loader.load_module(name)
for attr in attributes:
@ -129,7 +130,7 @@ class SimpleTest(unittest.TestCase, abc.LoaderTests):
with source_util.create_modules('_temp') as mapping:
with open(mapping['_temp'], 'w') as file:
file.write('=')
loader = machinery.SourceFileLoader('_temp', mapping['_temp'])
loader = self.machinery.SourceFileLoader('_temp', mapping['_temp'])
with self.assertRaises(SyntaxError):
loader.load_module('_temp')
self.assertNotIn('_temp', sys.modules)
@ -142,14 +143,14 @@ class SimpleTest(unittest.TestCase, abc.LoaderTests):
file.write("# test file for importlib")
try:
with util.uncache('_temp'):
loader = machinery.SourceFileLoader('_temp', file_path)
loader = self.machinery.SourceFileLoader('_temp', file_path)
mod = loader.load_module('_temp')
self.assertEqual(file_path, mod.__file__)
self.assertEqual(importlib.util.cache_from_source(file_path),
self.assertEqual(self.util.cache_from_source(file_path),
mod.__cached__)
finally:
os.unlink(file_path)
pycache = os.path.dirname(importlib.util.cache_from_source(file_path))
pycache = os.path.dirname(self.util.cache_from_source(file_path))
if os.path.exists(pycache):
shutil.rmtree(pycache)
@ -158,7 +159,7 @@ class SimpleTest(unittest.TestCase, abc.LoaderTests):
# truncated rather than raise an OverflowError.
with source_util.create_modules('_temp') as mapping:
source = mapping['_temp']
compiled = importlib.util.cache_from_source(source)
compiled = self.util.cache_from_source(source)
with open(source, 'w') as f:
f.write("x = 5")
try:
@ -169,7 +170,7 @@ class SimpleTest(unittest.TestCase, abc.LoaderTests):
if e.errno != getattr(errno, 'EOVERFLOW', None):
raise
self.skipTest("cannot set modification time to large integer ({})".format(e))
loader = machinery.SourceFileLoader('_temp', mapping['_temp'])
loader = self.machinery.SourceFileLoader('_temp', mapping['_temp'])
mod = loader.load_module('_temp')
# Sanity checks.
self.assertEqual(mod.__cached__, compiled)
@ -178,12 +179,16 @@ class SimpleTest(unittest.TestCase, abc.LoaderTests):
os.stat(compiled)
def test_unloadable(self):
loader = machinery.SourceFileLoader('good name', {})
loader = self.machinery.SourceFileLoader('good name', {})
with self.assertRaises(ImportError):
loader.load_module('bad name')
Frozen_SimpleTest, Source_SimpleTest = util.test_both(
SimpleTest, importlib=importlib, machinery=machinery, abc=importlib_abc,
util=importlib_util)
class BadBytecodeTest(unittest.TestCase):
class BadBytecodeTest:
def import_(self, file, module_name):
loader = self.loader(module_name, file)
@ -200,7 +205,7 @@ class BadBytecodeTest(unittest.TestCase):
pass
py_compile.compile(mapping[name])
if not del_source:
bytecode_path = importlib.util.cache_from_source(mapping[name])
bytecode_path = self.util.cache_from_source(mapping[name])
else:
os.unlink(mapping[name])
bytecode_path = make_legacy_pyc(mapping[name])
@ -289,7 +294,9 @@ class BadBytecodeTest(unittest.TestCase):
class SourceLoaderBadBytecodeTest(BadBytecodeTest):
loader = machinery.SourceFileLoader
@classmethod
def setUpClass(cls):
cls.loader = cls.machinery.SourceFileLoader
@source_util.writes_bytecode_files
def test_empty_file(self):
@ -329,7 +336,7 @@ class SourceLoaderBadBytecodeTest(BadBytecodeTest):
self.import_(mapping[name], name)
with open(bytecode_path, 'rb') as bytecode_file:
self.assertEqual(bytecode_file.read(4),
importlib.util.MAGIC_NUMBER)
self.util.MAGIC_NUMBER)
self._test_bad_magic(test)
@ -379,13 +386,13 @@ class SourceLoaderBadBytecodeTest(BadBytecodeTest):
zeros = b'\x00\x00\x00\x00'
with source_util.create_modules('_temp') as mapping:
py_compile.compile(mapping['_temp'])
bytecode_path = importlib.util.cache_from_source(mapping['_temp'])
bytecode_path = self.util.cache_from_source(mapping['_temp'])
with open(bytecode_path, 'r+b') as bytecode_file:
bytecode_file.seek(4)
bytecode_file.write(zeros)
self.import_(mapping['_temp'], '_temp')
source_mtime = os.path.getmtime(mapping['_temp'])
source_timestamp = importlib._w_long(source_mtime)
source_timestamp = self.importlib._w_long(source_mtime)
with open(bytecode_path, 'rb') as bytecode_file:
bytecode_file.seek(4)
self.assertEqual(bytecode_file.read(4), source_timestamp)
@ -397,7 +404,7 @@ class SourceLoaderBadBytecodeTest(BadBytecodeTest):
with source_util.create_modules('_temp') as mapping:
# Create bytecode that will need to be re-created.
py_compile.compile(mapping['_temp'])
bytecode_path = importlib.util.cache_from_source(mapping['_temp'])
bytecode_path = self.util.cache_from_source(mapping['_temp'])
with open(bytecode_path, 'r+b') as bytecode_file:
bytecode_file.seek(0)
bytecode_file.write(b'\x00\x00\x00\x00')
@ -411,10 +418,16 @@ class SourceLoaderBadBytecodeTest(BadBytecodeTest):
# Make writable for eventual clean-up.
os.chmod(bytecode_path, stat.S_IWUSR)
Frozen_SourceBadBytecode, Source_SourceBadBytecode = util.test_both(
SourceLoaderBadBytecodeTest, importlib=importlib, machinery=machinery,
abc=importlib_abc, util=importlib_util)
class SourcelessLoaderBadBytecodeTest(BadBytecodeTest):
loader = machinery.SourcelessFileLoader
@classmethod
def setUpClass(cls):
cls.loader = cls.machinery.SourcelessFileLoader
def test_empty_file(self):
def test(name, mapping, bytecode_path):
@ -469,6 +482,9 @@ class SourcelessLoaderBadBytecodeTest(BadBytecodeTest):
def test_non_code_marshal(self):
self._test_non_code_marshal(del_source=True)
Frozen_SourcelessBadBytecode, Source_SourcelessBadBytecode = util.test_both(
SourcelessLoaderBadBytecodeTest, importlib=importlib,
machinery=machinery, abc=importlib_abc, util=importlib_util)
if __name__ == '__main__':

View File

@ -1,7 +1,9 @@
from .. import abc
from .. import util
from . import util as source_util
from importlib import machinery
machinery = util.import_importlib('importlib.machinery')
import errno
import os
import py_compile
@ -13,7 +15,7 @@ import unittest
import warnings
class FinderTests(unittest.TestCase, abc.FinderTests):
class FinderTests(abc.FinderTests):
"""For a top-level module, it should just be found directly in the
directory being searched. This is true for a directory with source
@ -38,11 +40,11 @@ class FinderTests(unittest.TestCase, abc.FinderTests):
"""
def get_finder(self, root):
loader_details = [(machinery.SourceFileLoader,
machinery.SOURCE_SUFFIXES),
(machinery.SourcelessFileLoader,
machinery.BYTECODE_SUFFIXES)]
return machinery.FileFinder(root, *loader_details)
loader_details = [(self.machinery.SourceFileLoader,
self.machinery.SOURCE_SUFFIXES),
(self.machinery.SourcelessFileLoader,
self.machinery.BYTECODE_SUFFIXES)]
return self.machinery.FileFinder(root, *loader_details)
def import_(self, root, module):
return self.get_finder(root).find_module(module)
@ -123,8 +125,8 @@ class FinderTests(unittest.TestCase, abc.FinderTests):
def test_empty_string_for_dir(self):
# The empty string from sys.path means to search in the cwd.
finder = machinery.FileFinder('', (machinery.SourceFileLoader,
machinery.SOURCE_SUFFIXES))
finder = self.machinery.FileFinder('', (self.machinery.SourceFileLoader,
self.machinery.SOURCE_SUFFIXES))
with open('mod.py', 'w') as file:
file.write("# test file for importlib")
try:
@ -135,8 +137,8 @@ class FinderTests(unittest.TestCase, abc.FinderTests):
def test_invalidate_caches(self):
# invalidate_caches() should reset the mtime.
finder = machinery.FileFinder('', (machinery.SourceFileLoader,
machinery.SOURCE_SUFFIXES))
finder = self.machinery.FileFinder('', (self.machinery.SourceFileLoader,
self.machinery.SOURCE_SUFFIXES))
finder._path_mtime = 42
finder.invalidate_caches()
self.assertEqual(finder._path_mtime, -1)
@ -180,11 +182,9 @@ class FinderTests(unittest.TestCase, abc.FinderTests):
finder = self.get_finder(file_obj.name)
self.assertEqual((None, []), finder.find_loader('doesnotexist'))
Frozen_FinderTests, Source_FinderTests = util.test_both(FinderTests, machinery=machinery)
def test_main():
from test.support import run_unittest
run_unittest(FinderTests)
if __name__ == '__main__':
test_main()
unittest.main()

View File

@ -1,16 +1,18 @@
from .. import util
from . import util as source_util
from importlib import machinery
machinery = util.import_importlib('importlib.machinery')
import unittest
class PathHookTest(unittest.TestCase):
class PathHookTest:
"""Test the path hook for source."""
def path_hook(self):
return machinery.FileFinder.path_hook((machinery.SourceFileLoader,
machinery.SOURCE_SUFFIXES))
return self.machinery.FileFinder.path_hook((self.machinery.SourceFileLoader,
self.machinery.SOURCE_SUFFIXES))
def test_success(self):
with source_util.create_modules('dummy') as mapping:
@ -21,11 +23,8 @@ class PathHookTest(unittest.TestCase):
# The empty string represents the cwd.
self.assertTrue(hasattr(self.path_hook()(''), 'find_module'))
def test_main():
from test.support import run_unittest
run_unittest(PathHookTest)
Frozen_PathHookTest, Source_PathHooktest = util.test_both(PathHookTest, machinery=machinery)
if __name__ == '__main__':
test_main()
unittest.main()

View File

@ -1,6 +1,8 @@
from .. import util
from . import util as source_util
from importlib import _bootstrap
machinery = util.import_importlib('importlib.machinery')
import codecs
import re
import sys
@ -13,7 +15,7 @@ import unittest
CODING_RE = re.compile(r'^[ \t\f]*#.*coding[:=][ \t]*([-\w.]+)', re.ASCII)
class EncodingTest(unittest.TestCase):
class EncodingTest:
"""PEP 3120 makes UTF-8 the default encoding for source code
[default encoding].
@ -35,7 +37,7 @@ class EncodingTest(unittest.TestCase):
with source_util.create_modules(self.module_name) as mapping:
with open(mapping[self.module_name], 'wb') as file:
file.write(source)
loader = _bootstrap.SourceFileLoader(self.module_name,
loader = self.machinery.SourceFileLoader(self.module_name,
mapping[self.module_name])
return loader.load_module(self.module_name)
@ -84,8 +86,10 @@ class EncodingTest(unittest.TestCase):
with self.assertRaises(SyntaxError):
self.run_test(source)
Frozen_EncodingTest, Source_EncodingTest = util.test_both(EncodingTest, machinery=machinery)
class LineEndingTest(unittest.TestCase):
class LineEndingTest:
r"""Source written with the three types of line endings (\n, \r\n, \r)
need to be readable [cr][crlf][lf]."""
@ -97,7 +101,7 @@ class LineEndingTest(unittest.TestCase):
with source_util.create_modules(module_name) as mapping:
with open(mapping[module_name], 'wb') as file:
file.write(source)
loader = _bootstrap.SourceFileLoader(module_name,
loader = self.machinery.SourceFileLoader(module_name,
mapping[module_name])
return loader.load_module(module_name)
@ -113,11 +117,9 @@ class LineEndingTest(unittest.TestCase):
def test_lf(self):
self.run_test(b'\n')
Frozen_LineEndings, Source_LineEndings = util.test_both(LineEndingTest, machinery=machinery)
def test_main():
from test.support import run_unittest
run_unittest(EncodingTest, LineEndingTest)
if __name__ == '__main__':
test_main()
unittest.main()

View File

@ -11,6 +11,7 @@ import errno
import tempfile
import time
import re
import selectors
import sysconfig
import warnings
import select
@ -2179,15 +2180,16 @@ class CommandTests(unittest.TestCase):
os.rmdir(dir)
@unittest.skipUnless(getattr(subprocess, '_has_poll', False),
"poll system call not supported")
@unittest.skipUnless(hasattr(selectors, 'PollSelector'),
"Test needs selectors.PollSelector")
class ProcessTestCaseNoPoll(ProcessTestCase):
def setUp(self):
subprocess._has_poll = False
self.orig_selector = subprocess._PopenSelector
subprocess._PopenSelector = selectors.SelectSelector
ProcessTestCase.setUp(self)
def tearDown(self):
subprocess._has_poll = True
subprocess._PopenSelector = self.orig_selector
ProcessTestCase.tearDown(self)