gh-98040: Move the Single-Phase Init Tests Out of test_imp (gh-102561)

I recently added some tests to test_imp, but @warsaw is removing that file in gh-98573. The tests are worth keeping so here I'm moving them to test_import.
This commit is contained in:
Eric Snow 2023-04-19 16:09:35 -06:00 committed by GitHub
parent 2b1260c557
commit 6be7aee18c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 865 additions and 843 deletions

View File

@ -1,5 +1,4 @@
import gc
import json
import importlib
import importlib.util
import os
@ -11,28 +10,15 @@ from test.support import import_helper
from test.support import os_helper
from test.support import script_helper
from test.support import warnings_helper
import textwrap
import types
import unittest
import warnings
imp = warnings_helper.import_deprecated('imp')
import _imp
import _testinternalcapi
try:
import _xxsubinterpreters as _interpreters
except ModuleNotFoundError:
_interpreters = None
OS_PATH_NAME = os.path.__name__
def requires_subinterpreters(meth):
"""Decorator to skip a test if subinterpreters are not supported."""
return unittest.skipIf(_interpreters is None,
'subinterpreters required')(meth)
def requires_load_dynamic(meth):
"""Decorator to skip a test if not running under CPython or lacking
imp.load_dynamic()."""
@ -41,169 +27,6 @@ def requires_load_dynamic(meth):
'imp.load_dynamic() required')(meth)
class ModuleSnapshot(types.SimpleNamespace):
"""A representation of a module for testing.
Fields:
* id - the module's object ID
* module - the actual module or an adequate substitute
* __file__
* __spec__
* name
* origin
* ns - a copy (dict) of the module's __dict__ (or None)
* ns_id - the object ID of the module's __dict__
* cached - the sys.modules[mod.__spec__.name] entry (or None)
* cached_id - the object ID of the sys.modules entry (or None)
In cases where the value is not available (e.g. due to serialization),
the value will be None.
"""
_fields = tuple('id module ns ns_id cached cached_id'.split())
@classmethod
def from_module(cls, mod):
name = mod.__spec__.name
cached = sys.modules.get(name)
return cls(
id=id(mod),
module=mod,
ns=types.SimpleNamespace(**mod.__dict__),
ns_id=id(mod.__dict__),
cached=cached,
cached_id=id(cached),
)
SCRIPT = textwrap.dedent('''
{imports}
name = {name!r}
{prescript}
mod = {name}
{body}
{postscript}
''')
IMPORTS = textwrap.dedent('''
import sys
''').strip()
SCRIPT_BODY = textwrap.dedent('''
# Capture the snapshot data.
cached = sys.modules.get(name)
snapshot = dict(
id=id(mod),
module=dict(
__file__=mod.__file__,
__spec__=dict(
name=mod.__spec__.name,
origin=mod.__spec__.origin,
),
),
ns=None,
ns_id=id(mod.__dict__),
cached=None,
cached_id=id(cached) if cached else None,
)
''').strip()
CLEANUP_SCRIPT = textwrap.dedent('''
# Clean up the module.
sys.modules.pop(name, None)
''').strip()
@classmethod
def build_script(cls, name, *,
prescript=None,
import_first=False,
postscript=None,
postcleanup=False,
):
if postcleanup is True:
postcleanup = cls.CLEANUP_SCRIPT
elif isinstance(postcleanup, str):
postcleanup = textwrap.dedent(postcleanup).strip()
postcleanup = cls.CLEANUP_SCRIPT + os.linesep + postcleanup
else:
postcleanup = ''
prescript = textwrap.dedent(prescript).strip() if prescript else ''
postscript = textwrap.dedent(postscript).strip() if postscript else ''
if postcleanup:
if postscript:
postscript = postscript + os.linesep * 2 + postcleanup
else:
postscript = postcleanup
if import_first:
prescript += textwrap.dedent(f'''
# Now import the module.
assert name not in sys.modules
import {name}''')
return cls.SCRIPT.format(
imports=cls.IMPORTS.strip(),
name=name,
prescript=prescript.strip(),
body=cls.SCRIPT_BODY.strip(),
postscript=postscript,
)
@classmethod
def parse(cls, text):
raw = json.loads(text)
mod = raw['module']
mod['__spec__'] = types.SimpleNamespace(**mod['__spec__'])
raw['module'] = types.SimpleNamespace(**mod)
return cls(**raw)
@classmethod
def from_subinterp(cls, name, interpid=None, *, pipe=None, **script_kwds):
if pipe is not None:
return cls._from_subinterp(name, interpid, pipe, script_kwds)
pipe = os.pipe()
try:
return cls._from_subinterp(name, interpid, pipe, script_kwds)
finally:
r, w = pipe
os.close(r)
os.close(w)
@classmethod
def _from_subinterp(cls, name, interpid, pipe, script_kwargs):
r, w = pipe
# Build the script.
postscript = textwrap.dedent(f'''
# Send the result over the pipe.
import json
import os
os.write({w}, json.dumps(snapshot).encode())
''')
_postscript = script_kwargs.get('postscript')
if _postscript:
_postscript = textwrap.dedent(_postscript).lstrip()
postscript += _postscript
script_kwargs['postscript'] = postscript.strip()
script = cls.build_script(name, **script_kwargs)
# Run the script.
if interpid is None:
ret = support.run_in_subinterp(script)
if ret != 0:
raise AssertionError(f'{ret} != 0')
else:
_interpreters.run_string(interpid, script)
# Parse the results.
text = os.read(r, 1000)
return cls.parse(text.decode())
class LockTests(unittest.TestCase):
"""Very basic test of import lock functions."""
@ -620,669 +443,6 @@ class ImportTests(unittest.TestCase):
check_get_builtins()
class TestSinglePhaseSnapshot(ModuleSnapshot):
@classmethod
def from_module(cls, mod):
self = super().from_module(mod)
self.summed = mod.sum(1, 2)
self.lookedup = mod.look_up_self()
self.lookedup_id = id(self.lookedup)
self.state_initialized = mod.state_initialized()
if hasattr(mod, 'initialized_count'):
self.init_count = mod.initialized_count()
return self
SCRIPT_BODY = ModuleSnapshot.SCRIPT_BODY + textwrap.dedent(f'''
snapshot['module'].update(dict(
int_const=mod.int_const,
str_const=mod.str_const,
_module_initialized=mod._module_initialized,
))
snapshot.update(dict(
summed=mod.sum(1, 2),
lookedup_id=id(mod.look_up_self()),
state_initialized=mod.state_initialized(),
init_count=mod.initialized_count(),
has_spam=hasattr(mod, 'spam'),
spam=getattr(mod, 'spam', None),
))
''').rstrip()
@classmethod
def parse(cls, text):
self = super().parse(text)
if not self.has_spam:
del self.spam
del self.has_spam
return self
@requires_load_dynamic
class SinglephaseInitTests(unittest.TestCase):
NAME = '_testsinglephase'
@classmethod
def setUpClass(cls):
if '-R' in sys.argv or '--huntrleaks' in sys.argv:
# https://github.com/python/cpython/issues/102251
raise unittest.SkipTest('unresolved refleaks (see gh-102251)')
fileobj, filename, _ = imp.find_module(cls.NAME)
fileobj.close()
cls.FILE = filename
# Start fresh.
cls.clean_up()
def tearDown(self):
# Clean up the module.
self.clean_up()
@classmethod
def clean_up(cls):
name = cls.NAME
filename = cls.FILE
if name in sys.modules:
if hasattr(sys.modules[name], '_clear_globals'):
assert sys.modules[name].__file__ == filename
sys.modules[name]._clear_globals()
del sys.modules[name]
# Clear all internally cached data for the extension.
_testinternalcapi.clear_extension(name, filename)
#########################
# helpers
def add_module_cleanup(self, name):
def clean_up():
# Clear all internally cached data for the extension.
_testinternalcapi.clear_extension(name, self.FILE)
self.addCleanup(clean_up)
def load(self, name):
try:
already_loaded = self.already_loaded
except AttributeError:
already_loaded = self.already_loaded = {}
assert name not in already_loaded
mod = imp.load_dynamic(name, self.FILE)
self.assertNotIn(mod, already_loaded.values())
already_loaded[name] = mod
return types.SimpleNamespace(
name=name,
module=mod,
snapshot=TestSinglePhaseSnapshot.from_module(mod),
)
def re_load(self, name, mod):
assert sys.modules[name] is mod
assert mod.__dict__ == mod.__dict__
reloaded = imp.load_dynamic(name, self.FILE)
return types.SimpleNamespace(
name=name,
module=reloaded,
snapshot=TestSinglePhaseSnapshot.from_module(reloaded),
)
# subinterpreters
def add_subinterpreter(self):
interpid = _interpreters.create(isolated=False)
_interpreters.run_string(interpid, textwrap.dedent('''
import sys
import _testinternalcapi
'''))
def clean_up():
_interpreters.run_string(interpid, textwrap.dedent(f'''
name = {self.NAME!r}
if name in sys.modules:
sys.modules[name]._clear_globals()
_testinternalcapi.clear_extension(name, {self.FILE!r})
'''))
_interpreters.destroy(interpid)
self.addCleanup(clean_up)
return interpid
def import_in_subinterp(self, interpid=None, *,
postscript=None,
postcleanup=False,
):
name = self.NAME
if postcleanup:
import_ = 'import _testinternalcapi' if interpid is None else ''
postcleanup = f'''
{import_}
mod._clear_globals()
_testinternalcapi.clear_extension(name, {self.FILE!r})
'''
try:
pipe = self._pipe
except AttributeError:
r, w = pipe = self._pipe = os.pipe()
self.addCleanup(os.close, r)
self.addCleanup(os.close, w)
snapshot = TestSinglePhaseSnapshot.from_subinterp(
name,
interpid,
pipe=pipe,
import_first=True,
postscript=postscript,
postcleanup=postcleanup,
)
return types.SimpleNamespace(
name=name,
module=None,
snapshot=snapshot,
)
# checks
def check_common(self, loaded):
isolated = False
mod = loaded.module
if not mod:
# It came from a subinterpreter.
isolated = True
mod = loaded.snapshot.module
# mod.__name__ might not match, but the spec will.
self.assertEqual(mod.__spec__.name, loaded.name)
self.assertEqual(mod.__file__, self.FILE)
self.assertEqual(mod.__spec__.origin, self.FILE)
if not isolated:
self.assertTrue(issubclass(mod.error, Exception))
self.assertEqual(mod.int_const, 1969)
self.assertEqual(mod.str_const, 'something different')
self.assertIsInstance(mod._module_initialized, float)
self.assertGreater(mod._module_initialized, 0)
snap = loaded.snapshot
self.assertEqual(snap.summed, 3)
if snap.state_initialized is not None:
self.assertIsInstance(snap.state_initialized, float)
self.assertGreater(snap.state_initialized, 0)
if isolated:
# The "looked up" module is interpreter-specific
# (interp->imports.modules_by_index was set for the module).
self.assertEqual(snap.lookedup_id, snap.id)
self.assertEqual(snap.cached_id, snap.id)
with self.assertRaises(AttributeError):
snap.spam
else:
self.assertIs(snap.lookedup, mod)
self.assertIs(snap.cached, mod)
def check_direct(self, loaded):
# The module has its own PyModuleDef, with a matching name.
self.assertEqual(loaded.module.__name__, loaded.name)
self.assertIs(loaded.snapshot.lookedup, loaded.module)
def check_indirect(self, loaded, orig):
# The module re-uses another's PyModuleDef, with a different name.
assert orig is not loaded.module
assert orig.__name__ != loaded.name
self.assertNotEqual(loaded.module.__name__, loaded.name)
self.assertIs(loaded.snapshot.lookedup, loaded.module)
def check_basic(self, loaded, expected_init_count):
# m_size == -1
# The module loads fresh the first time and copies m_copy after.
snap = loaded.snapshot
self.assertIsNot(snap.state_initialized, None)
self.assertIsInstance(snap.init_count, int)
self.assertGreater(snap.init_count, 0)
self.assertEqual(snap.init_count, expected_init_count)
def check_with_reinit(self, loaded):
# m_size >= 0
# The module loads fresh every time.
pass
def check_fresh(self, loaded):
"""
The module had not been loaded before (at least since fully reset).
"""
snap = loaded.snapshot
# The module's init func was run.
# A copy of the module's __dict__ was stored in def->m_base.m_copy.
# The previous m_copy was deleted first.
# _PyRuntime.imports.extensions was set.
self.assertEqual(snap.init_count, 1)
# The global state was initialized.
# The module attrs were initialized from that state.
self.assertEqual(snap.module._module_initialized,
snap.state_initialized)
def check_semi_fresh(self, loaded, base, prev):
"""
The module had been loaded before and then reset
(but the module global state wasn't).
"""
snap = loaded.snapshot
# The module's init func was run again.
# A copy of the module's __dict__ was stored in def->m_base.m_copy.
# The previous m_copy was deleted first.
# The module globals did not get reset.
self.assertNotEqual(snap.id, base.snapshot.id)
self.assertNotEqual(snap.id, prev.snapshot.id)
self.assertEqual(snap.init_count, prev.snapshot.init_count + 1)
# The global state was updated.
# The module attrs were initialized from that state.
self.assertEqual(snap.module._module_initialized,
snap.state_initialized)
self.assertNotEqual(snap.state_initialized,
base.snapshot.state_initialized)
self.assertNotEqual(snap.state_initialized,
prev.snapshot.state_initialized)
def check_copied(self, loaded, base):
"""
The module had been loaded before and never reset.
"""
snap = loaded.snapshot
# The module's init func was not run again.
# The interpreter copied m_copy, as set by the other interpreter,
# with objects owned by the other interpreter.
# The module globals did not get reset.
self.assertNotEqual(snap.id, base.snapshot.id)
self.assertEqual(snap.init_count, base.snapshot.init_count)
# The global state was not updated since the init func did not run.
# The module attrs were not directly initialized from that state.
# The state and module attrs still match the previous loading.
self.assertEqual(snap.module._module_initialized,
snap.state_initialized)
self.assertEqual(snap.state_initialized,
base.snapshot.state_initialized)
#########################
# the tests
def test_cleared_globals(self):
loaded = self.load(self.NAME)
_testsinglephase = loaded.module
init_before = _testsinglephase.state_initialized()
_testsinglephase._clear_globals()
init_after = _testsinglephase.state_initialized()
init_count = _testsinglephase.initialized_count()
self.assertGreater(init_before, 0)
self.assertEqual(init_after, 0)
self.assertEqual(init_count, -1)
def test_variants(self):
# Exercise the most meaningful variants described in Python/import.c.
self.maxDiff = None
# Check the "basic" module.
name = self.NAME
expected_init_count = 1
with self.subTest(name):
loaded = self.load(name)
self.check_common(loaded)
self.check_direct(loaded)
self.check_basic(loaded, expected_init_count)
basic = loaded.module
# Check its indirect variants.
name = f'{self.NAME}_basic_wrapper'
self.add_module_cleanup(name)
expected_init_count += 1
with self.subTest(name):
loaded = self.load(name)
self.check_common(loaded)
self.check_indirect(loaded, basic)
self.check_basic(loaded, expected_init_count)
# Currently PyState_AddModule() always replaces the cached module.
self.assertIs(basic.look_up_self(), loaded.module)
self.assertEqual(basic.initialized_count(), expected_init_count)
# The cached module shouldn't change after this point.
basic_lookedup = loaded.module
# Check its direct variant.
name = f'{self.NAME}_basic_copy'
self.add_module_cleanup(name)
expected_init_count += 1
with self.subTest(name):
loaded = self.load(name)
self.check_common(loaded)
self.check_direct(loaded)
self.check_basic(loaded, expected_init_count)
# This should change the cached module for _testsinglephase.
self.assertIs(basic.look_up_self(), basic_lookedup)
self.assertEqual(basic.initialized_count(), expected_init_count)
# Check the non-basic variant that has no state.
name = f'{self.NAME}_with_reinit'
self.add_module_cleanup(name)
with self.subTest(name):
loaded = self.load(name)
self.check_common(loaded)
self.assertIs(loaded.snapshot.state_initialized, None)
self.check_direct(loaded)
self.check_with_reinit(loaded)
# This should change the cached module for _testsinglephase.
self.assertIs(basic.look_up_self(), basic_lookedup)
self.assertEqual(basic.initialized_count(), expected_init_count)
# Check the basic variant that has state.
name = f'{self.NAME}_with_state'
self.add_module_cleanup(name)
with self.subTest(name):
loaded = self.load(name)
self.check_common(loaded)
self.assertIsNot(loaded.snapshot.state_initialized, None)
self.check_direct(loaded)
self.check_with_reinit(loaded)
# This should change the cached module for _testsinglephase.
self.assertIs(basic.look_up_self(), basic_lookedup)
self.assertEqual(basic.initialized_count(), expected_init_count)
def test_basic_reloaded(self):
# m_copy is copied into the existing module object.
# Global state is not changed.
self.maxDiff = None
for name in [
self.NAME, # the "basic" module
f'{self.NAME}_basic_wrapper', # the indirect variant
f'{self.NAME}_basic_copy', # the direct variant
]:
self.add_module_cleanup(name)
with self.subTest(name):
loaded = self.load(name)
reloaded = self.re_load(name, loaded.module)
self.check_common(loaded)
self.check_common(reloaded)
# Make sure the original __dict__ did not get replaced.
self.assertEqual(id(loaded.module.__dict__),
loaded.snapshot.ns_id)
self.assertEqual(loaded.snapshot.ns.__dict__,
loaded.module.__dict__)
self.assertEqual(reloaded.module.__spec__.name, reloaded.name)
self.assertEqual(reloaded.module.__name__,
reloaded.snapshot.ns.__name__)
self.assertIs(reloaded.module, loaded.module)
self.assertIs(reloaded.module.__dict__, loaded.module.__dict__)
# It only happens to be the same but that's good enough here.
# We really just want to verify that the re-loaded attrs
# didn't change.
self.assertIs(reloaded.snapshot.lookedup,
loaded.snapshot.lookedup)
self.assertEqual(reloaded.snapshot.state_initialized,
loaded.snapshot.state_initialized)
self.assertEqual(reloaded.snapshot.init_count,
loaded.snapshot.init_count)
self.assertIs(reloaded.snapshot.cached, reloaded.module)
def test_with_reinit_reloaded(self):
# The module's m_init func is run again.
self.maxDiff = None
# Keep a reference around.
basic = self.load(self.NAME)
for name in [
f'{self.NAME}_with_reinit', # m_size == 0
f'{self.NAME}_with_state', # m_size > 0
]:
self.add_module_cleanup(name)
with self.subTest(name):
loaded = self.load(name)
reloaded = self.re_load(name, loaded.module)
self.check_common(loaded)
self.check_common(reloaded)
# Make sure the original __dict__ did not get replaced.
self.assertEqual(id(loaded.module.__dict__),
loaded.snapshot.ns_id)
self.assertEqual(loaded.snapshot.ns.__dict__,
loaded.module.__dict__)
self.assertEqual(reloaded.module.__spec__.name, reloaded.name)
self.assertEqual(reloaded.module.__name__,
reloaded.snapshot.ns.__name__)
self.assertIsNot(reloaded.module, loaded.module)
self.assertNotEqual(reloaded.module.__dict__,
loaded.module.__dict__)
self.assertIs(reloaded.snapshot.lookedup, reloaded.module)
if loaded.snapshot.state_initialized is None:
self.assertIs(reloaded.snapshot.state_initialized, None)
else:
self.assertGreater(reloaded.snapshot.state_initialized,
loaded.snapshot.state_initialized)
self.assertIs(reloaded.snapshot.cached, reloaded.module)
# Currently, for every single-phrase init module loaded
# in multiple interpreters, those interpreters share a
# PyModuleDef for that object, which can be a problem.
# Also, we test with a single-phase module that has global state,
# which is shared by all interpreters.
@requires_subinterpreters
def test_basic_multiple_interpreters_main_no_reset(self):
# without resetting; already loaded in main interpreter
# At this point:
# * alive in 0 interpreters
# * module def may or may not be loaded already
# * module def not in _PyRuntime.imports.extensions
# * mod init func has not run yet (since reset, at least)
# * m_copy not set (hasn't been loaded yet or already cleared)
# * module's global state has not been initialized yet
# (or already cleared)
main_loaded = self.load(self.NAME)
_testsinglephase = main_loaded.module
# Attrs set after loading are not in m_copy.
_testsinglephase.spam = 'spam, spam, spam, spam, eggs, and spam'
self.check_common(main_loaded)
self.check_fresh(main_loaded)
interpid1 = self.add_subinterpreter()
interpid2 = self.add_subinterpreter()
# At this point:
# * alive in 1 interpreter (main)
# * module def in _PyRuntime.imports.extensions
# * mod init func ran for the first time (since reset, at least)
# * m_copy was copied from the main interpreter (was NULL)
# * module's global state was initialized
# Use an interpreter that gets destroyed right away.
loaded = self.import_in_subinterp()
self.check_common(loaded)
self.check_copied(loaded, main_loaded)
# At this point:
# * alive in 1 interpreter (main)
# * module def still in _PyRuntime.imports.extensions
# * mod init func ran again
# * m_copy is NULL (claered when the interpreter was destroyed)
# (was from main interpreter)
# * module's global state was updated, not reset
# Use a subinterpreter that sticks around.
loaded = self.import_in_subinterp(interpid1)
self.check_common(loaded)
self.check_copied(loaded, main_loaded)
# At this point:
# * alive in 2 interpreters (main, interp1)
# * module def still in _PyRuntime.imports.extensions
# * mod init func ran again
# * m_copy was copied from interp1
# * module's global state was updated, not reset
# Use a subinterpreter while the previous one is still alive.
loaded = self.import_in_subinterp(interpid2)
self.check_common(loaded)
self.check_copied(loaded, main_loaded)
# At this point:
# * alive in 3 interpreters (main, interp1, interp2)
# * module def still in _PyRuntime.imports.extensions
# * mod init func ran again
# * m_copy was copied from interp2 (was from interp1)
# * module's global state was updated, not reset
@requires_subinterpreters
def test_basic_multiple_interpreters_deleted_no_reset(self):
# without resetting; already loaded in a deleted interpreter
# At this point:
# * alive in 0 interpreters
# * module def may or may not be loaded already
# * module def not in _PyRuntime.imports.extensions
# * mod init func has not run yet (since reset, at least)
# * m_copy not set (hasn't been loaded yet or already cleared)
# * module's global state has not been initialized yet
# (or already cleared)
interpid1 = self.add_subinterpreter()
interpid2 = self.add_subinterpreter()
# First, load in the main interpreter but then completely clear it.
loaded_main = self.load(self.NAME)
loaded_main.module._clear_globals()
_testinternalcapi.clear_extension(self.NAME, self.FILE)
# At this point:
# * alive in 0 interpreters
# * module def loaded already
# * module def was in _PyRuntime.imports.extensions, but cleared
# * mod init func ran for the first time (since reset, at least)
# * m_copy was set, but cleared (was NULL)
# * module's global state was initialized but cleared
# Start with an interpreter that gets destroyed right away.
base = self.import_in_subinterp(postscript='''
# Attrs set after loading are not in m_copy.
mod.spam = 'spam, spam, mash, spam, eggs, and spam'
''')
self.check_common(base)
self.check_fresh(base)
# At this point:
# * alive in 0 interpreters
# * module def in _PyRuntime.imports.extensions
# * mod init func ran again
# * m_copy is NULL (claered when the interpreter was destroyed)
# * module's global state was initialized, not reset
# Use a subinterpreter that sticks around.
loaded_interp1 = self.import_in_subinterp(interpid1)
self.check_common(loaded_interp1)
self.check_semi_fresh(loaded_interp1, loaded_main, base)
# At this point:
# * alive in 1 interpreter (interp1)
# * module def still in _PyRuntime.imports.extensions
# * mod init func ran again
# * m_copy was copied from interp1 (was NULL)
# * module's global state was updated, not reset
# Use a subinterpreter while the previous one is still alive.
loaded_interp2 = self.import_in_subinterp(interpid2)
self.check_common(loaded_interp2)
self.check_copied(loaded_interp2, loaded_interp1)
# At this point:
# * alive in 2 interpreters (interp1, interp2)
# * module def still in _PyRuntime.imports.extensions
# * mod init func ran again
# * m_copy was copied from interp2 (was from interp1)
# * module's global state was updated, not reset
@requires_subinterpreters
@requires_load_dynamic
def test_basic_multiple_interpreters_reset_each(self):
# resetting between each interpreter
# At this point:
# * alive in 0 interpreters
# * module def may or may not be loaded already
# * module def not in _PyRuntime.imports.extensions
# * mod init func has not run yet (since reset, at least)
# * m_copy not set (hasn't been loaded yet or already cleared)
# * module's global state has not been initialized yet
# (or already cleared)
interpid1 = self.add_subinterpreter()
interpid2 = self.add_subinterpreter()
# Use an interpreter that gets destroyed right away.
loaded = self.import_in_subinterp(
postscript='''
# Attrs set after loading are not in m_copy.
mod.spam = 'spam, spam, mash, spam, eggs, and spam'
''',
postcleanup=True,
)
self.check_common(loaded)
self.check_fresh(loaded)
# At this point:
# * alive in 0 interpreters
# * module def in _PyRuntime.imports.extensions
# * mod init func ran for the first time (since reset, at least)
# * m_copy is NULL (claered when the interpreter was destroyed)
# * module's global state was initialized, not reset
# Use a subinterpreter that sticks around.
loaded = self.import_in_subinterp(interpid1, postcleanup=True)
self.check_common(loaded)
self.check_fresh(loaded)
# At this point:
# * alive in 1 interpreter (interp1)
# * module def still in _PyRuntime.imports.extensions
# * mod init func ran again
# * m_copy was copied from interp1 (was NULL)
# * module's global state was initialized, not reset
# Use a subinterpreter while the previous one is still alive.
loaded = self.import_in_subinterp(interpid2, postcleanup=True)
self.check_common(loaded)
self.check_fresh(loaded)
# At this point:
# * alive in 2 interpreters (interp2, interp2)
# * module def still in _PyRuntime.imports.extensions
# * mod init func ran again
# * m_copy was copied from interp2 (was from interp1)
# * module's global state was initialized, not reset
class ReloadTests(unittest.TestCase):
"""Very basic tests to make sure that imp.reload() operates just like

View File

@ -2,6 +2,7 @@ import builtins
import contextlib
import errno
import glob
import json
import importlib.util
from importlib._bootstrap_external import _get_sourcefile
from importlib.machinery import (
@ -18,13 +19,15 @@ import sys
import textwrap
import threading
import time
import types
import unittest
from unittest import mock
import _testinternalcapi
from test.support import os_helper
from test.support import (
STDLIB_DIR, swap_attr, swap_item, cpython_only, is_emscripten,
is_wasi, run_in_subinterp_with_config)
is_wasi, run_in_subinterp, run_in_subinterp_with_config)
from test.support.import_helper import (
forget, make_legacy_pyc, unlink, unload, DirsOnSysPath, CleanImport)
from test.support.os_helper import (
@ -41,6 +44,10 @@ try:
import _testmultiphase
except ImportError:
_testmultiphase = None
try:
import _xxsubinterpreters as _interpreters
except ModuleNotFoundError:
_interpreters = None
skip_if_dont_write_bytecode = unittest.skipIf(
@ -120,6 +127,182 @@ def _ready_to_import(name=None, source=""):
del sys.modules[name]
def requires_subinterpreters(meth):
"""Decorator to skip a test if subinterpreters are not supported."""
return unittest.skipIf(_interpreters is None,
'subinterpreters required')(meth)
def requires_singlephase_init(meth):
"""Decorator to skip if single-phase init modules are not supported."""
meth = cpython_only(meth)
return unittest.skipIf(_testsinglephase is None,
'test requires _testsinglephase module')(meth)
class ModuleSnapshot(types.SimpleNamespace):
"""A representation of a module for testing.
Fields:
* id - the module's object ID
* module - the actual module or an adequate substitute
* __file__
* __spec__
* name
* origin
* ns - a copy (dict) of the module's __dict__ (or None)
* ns_id - the object ID of the module's __dict__
* cached - the sys.modules[mod.__spec__.name] entry (or None)
* cached_id - the object ID of the sys.modules entry (or None)
In cases where the value is not available (e.g. due to serialization),
the value will be None.
"""
_fields = tuple('id module ns ns_id cached cached_id'.split())
@classmethod
def from_module(cls, mod):
name = mod.__spec__.name
cached = sys.modules.get(name)
return cls(
id=id(mod),
module=mod,
ns=types.SimpleNamespace(**mod.__dict__),
ns_id=id(mod.__dict__),
cached=cached,
cached_id=id(cached),
)
SCRIPT = textwrap.dedent('''
{imports}
name = {name!r}
{prescript}
mod = {name}
{body}
{postscript}
''')
IMPORTS = textwrap.dedent('''
import sys
''').strip()
SCRIPT_BODY = textwrap.dedent('''
# Capture the snapshot data.
cached = sys.modules.get(name)
snapshot = dict(
id=id(mod),
module=dict(
__file__=mod.__file__,
__spec__=dict(
name=mod.__spec__.name,
origin=mod.__spec__.origin,
),
),
ns=None,
ns_id=id(mod.__dict__),
cached=None,
cached_id=id(cached) if cached else None,
)
''').strip()
CLEANUP_SCRIPT = textwrap.dedent('''
# Clean up the module.
sys.modules.pop(name, None)
''').strip()
@classmethod
def build_script(cls, name, *,
prescript=None,
import_first=False,
postscript=None,
postcleanup=False,
):
if postcleanup is True:
postcleanup = cls.CLEANUP_SCRIPT
elif isinstance(postcleanup, str):
postcleanup = textwrap.dedent(postcleanup).strip()
postcleanup = cls.CLEANUP_SCRIPT + os.linesep + postcleanup
else:
postcleanup = ''
prescript = textwrap.dedent(prescript).strip() if prescript else ''
postscript = textwrap.dedent(postscript).strip() if postscript else ''
if postcleanup:
if postscript:
postscript = postscript + os.linesep * 2 + postcleanup
else:
postscript = postcleanup
if import_first:
prescript += textwrap.dedent(f'''
# Now import the module.
assert name not in sys.modules
import {name}''')
return cls.SCRIPT.format(
imports=cls.IMPORTS.strip(),
name=name,
prescript=prescript.strip(),
body=cls.SCRIPT_BODY.strip(),
postscript=postscript,
)
@classmethod
def parse(cls, text):
raw = json.loads(text)
mod = raw['module']
mod['__spec__'] = types.SimpleNamespace(**mod['__spec__'])
raw['module'] = types.SimpleNamespace(**mod)
return cls(**raw)
@classmethod
def from_subinterp(cls, name, interpid=None, *, pipe=None, **script_kwds):
if pipe is not None:
return cls._from_subinterp(name, interpid, pipe, script_kwds)
pipe = os.pipe()
try:
return cls._from_subinterp(name, interpid, pipe, script_kwds)
finally:
r, w = pipe
os.close(r)
os.close(w)
@classmethod
def _from_subinterp(cls, name, interpid, pipe, script_kwargs):
r, w = pipe
# Build the script.
postscript = textwrap.dedent(f'''
# Send the result over the pipe.
import json
import os
os.write({w}, json.dumps(snapshot).encode())
''')
_postscript = script_kwargs.get('postscript')
if _postscript:
_postscript = textwrap.dedent(_postscript).lstrip()
postscript += _postscript
script_kwargs['postscript'] = postscript.strip()
script = cls.build_script(name, **script_kwargs)
# Run the script.
if interpid is None:
ret = run_in_subinterp(script)
if ret != 0:
raise AssertionError(f'{ret} != 0')
else:
_interpreters.run_string(interpid, script)
# Parse the results.
text = os.read(r, 1000)
return cls.parse(text.decode())
class ImportTests(unittest.TestCase):
def setUp(self):
@ -1604,7 +1787,7 @@ class SubinterpImportTests(unittest.TestCase):
with self.subTest(f'{module}: strict, not fresh'):
self.check_compatible_here(module, strict=True)
@unittest.skipIf(_testsinglephase is None, "test requires _testsinglephase module")
@requires_singlephase_init
def test_single_init_extension_compat(self):
module = '_testsinglephase'
require_extension(module)
@ -1636,7 +1819,7 @@ class SubinterpImportTests(unittest.TestCase):
with self.subTest(f'{module}: strict, fresh'):
self.check_compatible_fresh(module, strict=True)
@unittest.skipIf(_testsinglephase is None, "test requires _testsinglephase module")
@requires_singlephase_init
def test_singlephase_check_with_setting_and_override(self):
module = '_testsinglephase'
require_extension(module)
@ -1672,6 +1855,685 @@ class SubinterpImportTests(unittest.TestCase):
check_compatible(False, -1)
class TestSinglePhaseSnapshot(ModuleSnapshot):
@classmethod
def from_module(cls, mod):
self = super().from_module(mod)
self.summed = mod.sum(1, 2)
self.lookedup = mod.look_up_self()
self.lookedup_id = id(self.lookedup)
self.state_initialized = mod.state_initialized()
if hasattr(mod, 'initialized_count'):
self.init_count = mod.initialized_count()
return self
SCRIPT_BODY = ModuleSnapshot.SCRIPT_BODY + textwrap.dedent(f'''
snapshot['module'].update(dict(
int_const=mod.int_const,
str_const=mod.str_const,
_module_initialized=mod._module_initialized,
))
snapshot.update(dict(
summed=mod.sum(1, 2),
lookedup_id=id(mod.look_up_self()),
state_initialized=mod.state_initialized(),
init_count=mod.initialized_count(),
has_spam=hasattr(mod, 'spam'),
spam=getattr(mod, 'spam', None),
))
''').rstrip()
@classmethod
def parse(cls, text):
self = super().parse(text)
if not self.has_spam:
del self.spam
del self.has_spam
return self
@requires_singlephase_init
class SinglephaseInitTests(unittest.TestCase):
NAME = '_testsinglephase'
@classmethod
def setUpClass(cls):
if '-R' in sys.argv or '--huntrleaks' in sys.argv:
# https://github.com/python/cpython/issues/102251
raise unittest.SkipTest('unresolved refleaks (see gh-102251)')
spec = importlib.util.find_spec(cls.NAME)
from importlib.machinery import ExtensionFileLoader
cls.FILE = spec.origin
cls.LOADER = type(spec.loader)
assert cls.LOADER is ExtensionFileLoader
# Start fresh.
cls.clean_up()
def tearDown(self):
# Clean up the module.
self.clean_up()
@classmethod
def clean_up(cls):
name = cls.NAME
filename = cls.FILE
if name in sys.modules:
if hasattr(sys.modules[name], '_clear_globals'):
assert sys.modules[name].__file__ == filename
sys.modules[name]._clear_globals()
del sys.modules[name]
# Clear all internally cached data for the extension.
_testinternalcapi.clear_extension(name, filename)
#########################
# helpers
def add_module_cleanup(self, name):
def clean_up():
# Clear all internally cached data for the extension.
_testinternalcapi.clear_extension(name, self.FILE)
self.addCleanup(clean_up)
def _load_dynamic(self, name, path):
"""
Load an extension module.
"""
# This is essentially copied from the old imp module.
from importlib._bootstrap import _load
loader = self.LOADER(name, path)
# Issue bpo-24748: Skip the sys.modules check in _load_module_shim;
# always load new extension.
spec = importlib.util.spec_from_file_location(name, path,
loader=loader)
return _load(spec)
def load(self, name):
try:
already_loaded = self.already_loaded
except AttributeError:
already_loaded = self.already_loaded = {}
assert name not in already_loaded
mod = self._load_dynamic(name, self.FILE)
self.assertNotIn(mod, already_loaded.values())
already_loaded[name] = mod
return types.SimpleNamespace(
name=name,
module=mod,
snapshot=TestSinglePhaseSnapshot.from_module(mod),
)
def re_load(self, name, mod):
assert sys.modules[name] is mod
assert mod.__dict__ == mod.__dict__
reloaded = self._load_dynamic(name, self.FILE)
return types.SimpleNamespace(
name=name,
module=reloaded,
snapshot=TestSinglePhaseSnapshot.from_module(reloaded),
)
# subinterpreters
def add_subinterpreter(self):
interpid = _interpreters.create(isolated=False)
_interpreters.run_string(interpid, textwrap.dedent('''
import sys
import _testinternalcapi
'''))
def clean_up():
_interpreters.run_string(interpid, textwrap.dedent(f'''
name = {self.NAME!r}
if name in sys.modules:
sys.modules[name]._clear_globals()
_testinternalcapi.clear_extension(name, {self.FILE!r})
'''))
_interpreters.destroy(interpid)
self.addCleanup(clean_up)
return interpid
def import_in_subinterp(self, interpid=None, *,
postscript=None,
postcleanup=False,
):
name = self.NAME
if postcleanup:
import_ = 'import _testinternalcapi' if interpid is None else ''
postcleanup = f'''
{import_}
mod._clear_globals()
_testinternalcapi.clear_extension(name, {self.FILE!r})
'''
try:
pipe = self._pipe
except AttributeError:
r, w = pipe = self._pipe = os.pipe()
self.addCleanup(os.close, r)
self.addCleanup(os.close, w)
snapshot = TestSinglePhaseSnapshot.from_subinterp(
name,
interpid,
pipe=pipe,
import_first=True,
postscript=postscript,
postcleanup=postcleanup,
)
return types.SimpleNamespace(
name=name,
module=None,
snapshot=snapshot,
)
# checks
def check_common(self, loaded):
isolated = False
mod = loaded.module
if not mod:
# It came from a subinterpreter.
isolated = True
mod = loaded.snapshot.module
# mod.__name__ might not match, but the spec will.
self.assertEqual(mod.__spec__.name, loaded.name)
self.assertEqual(mod.__file__, self.FILE)
self.assertEqual(mod.__spec__.origin, self.FILE)
if not isolated:
self.assertTrue(issubclass(mod.error, Exception))
self.assertEqual(mod.int_const, 1969)
self.assertEqual(mod.str_const, 'something different')
self.assertIsInstance(mod._module_initialized, float)
self.assertGreater(mod._module_initialized, 0)
snap = loaded.snapshot
self.assertEqual(snap.summed, 3)
if snap.state_initialized is not None:
self.assertIsInstance(snap.state_initialized, float)
self.assertGreater(snap.state_initialized, 0)
if isolated:
# The "looked up" module is interpreter-specific
# (interp->imports.modules_by_index was set for the module).
self.assertEqual(snap.lookedup_id, snap.id)
self.assertEqual(snap.cached_id, snap.id)
with self.assertRaises(AttributeError):
snap.spam
else:
self.assertIs(snap.lookedup, mod)
self.assertIs(snap.cached, mod)
def check_direct(self, loaded):
# The module has its own PyModuleDef, with a matching name.
self.assertEqual(loaded.module.__name__, loaded.name)
self.assertIs(loaded.snapshot.lookedup, loaded.module)
def check_indirect(self, loaded, orig):
# The module re-uses another's PyModuleDef, with a different name.
assert orig is not loaded.module
assert orig.__name__ != loaded.name
self.assertNotEqual(loaded.module.__name__, loaded.name)
self.assertIs(loaded.snapshot.lookedup, loaded.module)
def check_basic(self, loaded, expected_init_count):
# m_size == -1
# The module loads fresh the first time and copies m_copy after.
snap = loaded.snapshot
self.assertIsNot(snap.state_initialized, None)
self.assertIsInstance(snap.init_count, int)
self.assertGreater(snap.init_count, 0)
self.assertEqual(snap.init_count, expected_init_count)
def check_with_reinit(self, loaded):
# m_size >= 0
# The module loads fresh every time.
pass
def check_fresh(self, loaded):
"""
The module had not been loaded before (at least since fully reset).
"""
snap = loaded.snapshot
# The module's init func was run.
# A copy of the module's __dict__ was stored in def->m_base.m_copy.
# The previous m_copy was deleted first.
# _PyRuntime.imports.extensions was set.
self.assertEqual(snap.init_count, 1)
# The global state was initialized.
# The module attrs were initialized from that state.
self.assertEqual(snap.module._module_initialized,
snap.state_initialized)
def check_semi_fresh(self, loaded, base, prev):
"""
The module had been loaded before and then reset
(but the module global state wasn't).
"""
snap = loaded.snapshot
# The module's init func was run again.
# A copy of the module's __dict__ was stored in def->m_base.m_copy.
# The previous m_copy was deleted first.
# The module globals did not get reset.
self.assertNotEqual(snap.id, base.snapshot.id)
self.assertNotEqual(snap.id, prev.snapshot.id)
self.assertEqual(snap.init_count, prev.snapshot.init_count + 1)
# The global state was updated.
# The module attrs were initialized from that state.
self.assertEqual(snap.module._module_initialized,
snap.state_initialized)
self.assertNotEqual(snap.state_initialized,
base.snapshot.state_initialized)
self.assertNotEqual(snap.state_initialized,
prev.snapshot.state_initialized)
def check_copied(self, loaded, base):
"""
The module had been loaded before and never reset.
"""
snap = loaded.snapshot
# The module's init func was not run again.
# The interpreter copied m_copy, as set by the other interpreter,
# with objects owned by the other interpreter.
# The module globals did not get reset.
self.assertNotEqual(snap.id, base.snapshot.id)
self.assertEqual(snap.init_count, base.snapshot.init_count)
# The global state was not updated since the init func did not run.
# The module attrs were not directly initialized from that state.
# The state and module attrs still match the previous loading.
self.assertEqual(snap.module._module_initialized,
snap.state_initialized)
self.assertEqual(snap.state_initialized,
base.snapshot.state_initialized)
#########################
# the tests
def test_cleared_globals(self):
loaded = self.load(self.NAME)
_testsinglephase = loaded.module
init_before = _testsinglephase.state_initialized()
_testsinglephase._clear_globals()
init_after = _testsinglephase.state_initialized()
init_count = _testsinglephase.initialized_count()
self.assertGreater(init_before, 0)
self.assertEqual(init_after, 0)
self.assertEqual(init_count, -1)
def test_variants(self):
# Exercise the most meaningful variants described in Python/import.c.
self.maxDiff = None
# Check the "basic" module.
name = self.NAME
expected_init_count = 1
with self.subTest(name):
loaded = self.load(name)
self.check_common(loaded)
self.check_direct(loaded)
self.check_basic(loaded, expected_init_count)
basic = loaded.module
# Check its indirect variants.
name = f'{self.NAME}_basic_wrapper'
self.add_module_cleanup(name)
expected_init_count += 1
with self.subTest(name):
loaded = self.load(name)
self.check_common(loaded)
self.check_indirect(loaded, basic)
self.check_basic(loaded, expected_init_count)
# Currently PyState_AddModule() always replaces the cached module.
self.assertIs(basic.look_up_self(), loaded.module)
self.assertEqual(basic.initialized_count(), expected_init_count)
# The cached module shouldn't change after this point.
basic_lookedup = loaded.module
# Check its direct variant.
name = f'{self.NAME}_basic_copy'
self.add_module_cleanup(name)
expected_init_count += 1
with self.subTest(name):
loaded = self.load(name)
self.check_common(loaded)
self.check_direct(loaded)
self.check_basic(loaded, expected_init_count)
# This should change the cached module for _testsinglephase.
self.assertIs(basic.look_up_self(), basic_lookedup)
self.assertEqual(basic.initialized_count(), expected_init_count)
# Check the non-basic variant that has no state.
name = f'{self.NAME}_with_reinit'
self.add_module_cleanup(name)
with self.subTest(name):
loaded = self.load(name)
self.check_common(loaded)
self.assertIs(loaded.snapshot.state_initialized, None)
self.check_direct(loaded)
self.check_with_reinit(loaded)
# This should change the cached module for _testsinglephase.
self.assertIs(basic.look_up_self(), basic_lookedup)
self.assertEqual(basic.initialized_count(), expected_init_count)
# Check the basic variant that has state.
name = f'{self.NAME}_with_state'
self.add_module_cleanup(name)
with self.subTest(name):
loaded = self.load(name)
self.check_common(loaded)
self.assertIsNot(loaded.snapshot.state_initialized, None)
self.check_direct(loaded)
self.check_with_reinit(loaded)
# This should change the cached module for _testsinglephase.
self.assertIs(basic.look_up_self(), basic_lookedup)
self.assertEqual(basic.initialized_count(), expected_init_count)
def test_basic_reloaded(self):
# m_copy is copied into the existing module object.
# Global state is not changed.
self.maxDiff = None
for name in [
self.NAME, # the "basic" module
f'{self.NAME}_basic_wrapper', # the indirect variant
f'{self.NAME}_basic_copy', # the direct variant
]:
self.add_module_cleanup(name)
with self.subTest(name):
loaded = self.load(name)
reloaded = self.re_load(name, loaded.module)
self.check_common(loaded)
self.check_common(reloaded)
# Make sure the original __dict__ did not get replaced.
self.assertEqual(id(loaded.module.__dict__),
loaded.snapshot.ns_id)
self.assertEqual(loaded.snapshot.ns.__dict__,
loaded.module.__dict__)
self.assertEqual(reloaded.module.__spec__.name, reloaded.name)
self.assertEqual(reloaded.module.__name__,
reloaded.snapshot.ns.__name__)
self.assertIs(reloaded.module, loaded.module)
self.assertIs(reloaded.module.__dict__, loaded.module.__dict__)
# It only happens to be the same but that's good enough here.
# We really just want to verify that the re-loaded attrs
# didn't change.
self.assertIs(reloaded.snapshot.lookedup,
loaded.snapshot.lookedup)
self.assertEqual(reloaded.snapshot.state_initialized,
loaded.snapshot.state_initialized)
self.assertEqual(reloaded.snapshot.init_count,
loaded.snapshot.init_count)
self.assertIs(reloaded.snapshot.cached, reloaded.module)
def test_with_reinit_reloaded(self):
# The module's m_init func is run again.
self.maxDiff = None
# Keep a reference around.
basic = self.load(self.NAME)
for name in [
f'{self.NAME}_with_reinit', # m_size == 0
f'{self.NAME}_with_state', # m_size > 0
]:
self.add_module_cleanup(name)
with self.subTest(name):
loaded = self.load(name)
reloaded = self.re_load(name, loaded.module)
self.check_common(loaded)
self.check_common(reloaded)
# Make sure the original __dict__ did not get replaced.
self.assertEqual(id(loaded.module.__dict__),
loaded.snapshot.ns_id)
self.assertEqual(loaded.snapshot.ns.__dict__,
loaded.module.__dict__)
self.assertEqual(reloaded.module.__spec__.name, reloaded.name)
self.assertEqual(reloaded.module.__name__,
reloaded.snapshot.ns.__name__)
self.assertIsNot(reloaded.module, loaded.module)
self.assertNotEqual(reloaded.module.__dict__,
loaded.module.__dict__)
self.assertIs(reloaded.snapshot.lookedup, reloaded.module)
if loaded.snapshot.state_initialized is None:
self.assertIs(reloaded.snapshot.state_initialized, None)
else:
self.assertGreater(reloaded.snapshot.state_initialized,
loaded.snapshot.state_initialized)
self.assertIs(reloaded.snapshot.cached, reloaded.module)
# Currently, for every single-phrase init module loaded
# in multiple interpreters, those interpreters share a
# PyModuleDef for that object, which can be a problem.
# Also, we test with a single-phase module that has global state,
# which is shared by all interpreters.
@requires_subinterpreters
def test_basic_multiple_interpreters_main_no_reset(self):
# without resetting; already loaded in main interpreter
# At this point:
# * alive in 0 interpreters
# * module def may or may not be loaded already
# * module def not in _PyRuntime.imports.extensions
# * mod init func has not run yet (since reset, at least)
# * m_copy not set (hasn't been loaded yet or already cleared)
# * module's global state has not been initialized yet
# (or already cleared)
main_loaded = self.load(self.NAME)
_testsinglephase = main_loaded.module
# Attrs set after loading are not in m_copy.
_testsinglephase.spam = 'spam, spam, spam, spam, eggs, and spam'
self.check_common(main_loaded)
self.check_fresh(main_loaded)
interpid1 = self.add_subinterpreter()
interpid2 = self.add_subinterpreter()
# At this point:
# * alive in 1 interpreter (main)
# * module def in _PyRuntime.imports.extensions
# * mod init func ran for the first time (since reset, at least)
# * m_copy was copied from the main interpreter (was NULL)
# * module's global state was initialized
# Use an interpreter that gets destroyed right away.
loaded = self.import_in_subinterp()
self.check_common(loaded)
self.check_copied(loaded, main_loaded)
# At this point:
# * alive in 1 interpreter (main)
# * module def still in _PyRuntime.imports.extensions
# * mod init func ran again
# * m_copy is NULL (claered when the interpreter was destroyed)
# (was from main interpreter)
# * module's global state was updated, not reset
# Use a subinterpreter that sticks around.
loaded = self.import_in_subinterp(interpid1)
self.check_common(loaded)
self.check_copied(loaded, main_loaded)
# At this point:
# * alive in 2 interpreters (main, interp1)
# * module def still in _PyRuntime.imports.extensions
# * mod init func ran again
# * m_copy was copied from interp1
# * module's global state was updated, not reset
# Use a subinterpreter while the previous one is still alive.
loaded = self.import_in_subinterp(interpid2)
self.check_common(loaded)
self.check_copied(loaded, main_loaded)
# At this point:
# * alive in 3 interpreters (main, interp1, interp2)
# * module def still in _PyRuntime.imports.extensions
# * mod init func ran again
# * m_copy was copied from interp2 (was from interp1)
# * module's global state was updated, not reset
@requires_subinterpreters
def test_basic_multiple_interpreters_deleted_no_reset(self):
# without resetting; already loaded in a deleted interpreter
# At this point:
# * alive in 0 interpreters
# * module def may or may not be loaded already
# * module def not in _PyRuntime.imports.extensions
# * mod init func has not run yet (since reset, at least)
# * m_copy not set (hasn't been loaded yet or already cleared)
# * module's global state has not been initialized yet
# (or already cleared)
interpid1 = self.add_subinterpreter()
interpid2 = self.add_subinterpreter()
# First, load in the main interpreter but then completely clear it.
loaded_main = self.load(self.NAME)
loaded_main.module._clear_globals()
_testinternalcapi.clear_extension(self.NAME, self.FILE)
# At this point:
# * alive in 0 interpreters
# * module def loaded already
# * module def was in _PyRuntime.imports.extensions, but cleared
# * mod init func ran for the first time (since reset, at least)
# * m_copy was set, but cleared (was NULL)
# * module's global state was initialized but cleared
# Start with an interpreter that gets destroyed right away.
base = self.import_in_subinterp(postscript='''
# Attrs set after loading are not in m_copy.
mod.spam = 'spam, spam, mash, spam, eggs, and spam'
''')
self.check_common(base)
self.check_fresh(base)
# At this point:
# * alive in 0 interpreters
# * module def in _PyRuntime.imports.extensions
# * mod init func ran again
# * m_copy is NULL (claered when the interpreter was destroyed)
# * module's global state was initialized, not reset
# Use a subinterpreter that sticks around.
loaded_interp1 = self.import_in_subinterp(interpid1)
self.check_common(loaded_interp1)
self.check_semi_fresh(loaded_interp1, loaded_main, base)
# At this point:
# * alive in 1 interpreter (interp1)
# * module def still in _PyRuntime.imports.extensions
# * mod init func ran again
# * m_copy was copied from interp1 (was NULL)
# * module's global state was updated, not reset
# Use a subinterpreter while the previous one is still alive.
loaded_interp2 = self.import_in_subinterp(interpid2)
self.check_common(loaded_interp2)
self.check_copied(loaded_interp2, loaded_interp1)
# At this point:
# * alive in 2 interpreters (interp1, interp2)
# * module def still in _PyRuntime.imports.extensions
# * mod init func ran again
# * m_copy was copied from interp2 (was from interp1)
# * module's global state was updated, not reset
@requires_subinterpreters
def test_basic_multiple_interpreters_reset_each(self):
# resetting between each interpreter
# At this point:
# * alive in 0 interpreters
# * module def may or may not be loaded already
# * module def not in _PyRuntime.imports.extensions
# * mod init func has not run yet (since reset, at least)
# * m_copy not set (hasn't been loaded yet or already cleared)
# * module's global state has not been initialized yet
# (or already cleared)
interpid1 = self.add_subinterpreter()
interpid2 = self.add_subinterpreter()
# Use an interpreter that gets destroyed right away.
loaded = self.import_in_subinterp(
postscript='''
# Attrs set after loading are not in m_copy.
mod.spam = 'spam, spam, mash, spam, eggs, and spam'
''',
postcleanup=True,
)
self.check_common(loaded)
self.check_fresh(loaded)
# At this point:
# * alive in 0 interpreters
# * module def in _PyRuntime.imports.extensions
# * mod init func ran for the first time (since reset, at least)
# * m_copy is NULL (claered when the interpreter was destroyed)
# * module's global state was initialized, not reset
# Use a subinterpreter that sticks around.
loaded = self.import_in_subinterp(interpid1, postcleanup=True)
self.check_common(loaded)
self.check_fresh(loaded)
# At this point:
# * alive in 1 interpreter (interp1)
# * module def still in _PyRuntime.imports.extensions
# * mod init func ran again
# * m_copy was copied from interp1 (was NULL)
# * module's global state was initialized, not reset
# Use a subinterpreter while the previous one is still alive.
loaded = self.import_in_subinterp(interpid2, postcleanup=True)
self.check_common(loaded)
self.check_fresh(loaded)
# At this point:
# * alive in 2 interpreters (interp2, interp2)
# * module def still in _PyRuntime.imports.extensions
# * mod init func ran again
# * m_copy was copied from interp2 (was from interp1)
# * module's global state was initialized, not reset
if __name__ == '__main__':
# Test needs to be a package, so we can do relative imports.
unittest.main()