mirror of https://github.com/python/cpython
1130 lines
34 KiB
Python
1130 lines
34 KiB
Python
import contextlib
|
|
import json
|
|
import os
|
|
import os.path
|
|
import sys
|
|
import threading
|
|
from textwrap import dedent
|
|
import unittest
|
|
import time
|
|
|
|
from test import support
|
|
from test.support import import_helper
|
|
from test.support import threading_helper
|
|
from test.support import os_helper
|
|
_interpreters = import_helper.import_module('_xxsubinterpreters')
|
|
_channels = import_helper.import_module('_xxinterpchannels')
|
|
from test.support import interpreters
|
|
|
|
|
|
def _captured_script(script):
|
|
r, w = os.pipe()
|
|
indented = script.replace('\n', '\n ')
|
|
wrapped = dedent(f"""
|
|
import contextlib
|
|
with open({w}, 'w', encoding='utf-8') as spipe:
|
|
with contextlib.redirect_stdout(spipe):
|
|
{indented}
|
|
""")
|
|
return wrapped, open(r, encoding='utf-8')
|
|
|
|
|
|
def clean_up_interpreters():
    """Close every interpreter except the main one (ID 0).

    An interpreter that has already been destroyed is skipped silently.
    """
    for interp in interpreters.list_all():
        if interp.id != 0:
            # close() raises RuntimeError for an already-destroyed
            # interpreter; that is fine during cleanup.
            with contextlib.suppress(RuntimeError):
                interp.close()
|
|
|
|
|
|
def _run_output(interp, request, channels=None):
    """Run *request* in *interp* and return the text it wrote to stdout.

    *channels* is forwarded unchanged to ``interp.run()``.
    """
    wrapped, reader = _captured_script(request)
    with reader:
        interp.run(wrapped, channels=channels)
        output = reader.read()
    return output
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def _running(interp):
|
|
r, w = os.pipe()
|
|
def run():
|
|
interp.run(dedent(f"""
|
|
# wait for "signal"
|
|
with open({r}) as rpipe:
|
|
rpipe.read()
|
|
"""))
|
|
|
|
t = threading.Thread(target=run)
|
|
t.start()
|
|
|
|
yield
|
|
|
|
with open(w, 'w') as spipe:
|
|
spipe.write('done')
|
|
t.join()
|
|
|
|
|
|
class TestBase(unittest.TestCase):
    """Shared base class: hands out pipes and cleans up interpreters."""

    def pipe(self):
        # Return a new (read_fd, write_fd) pair.  Both descriptors are
        # registered for cleanup; closing one that the test already closed
        # is not an error.
        def close_quietly(fd):
            with contextlib.suppress(OSError):
                os.close(fd)

        read_fd, write_fd = os.pipe()
        self.addCleanup(close_quietly, read_fd)
        self.addCleanup(close_quietly, write_fd)
        return read_fd, write_fd

    def tearDown(self):
        clean_up_interpreters()
|
|
|
|
|
|
class CreateTests(TestBase):
    """Checks for interpreters.create() called from various contexts."""

    def test_in_main(self):
        # A freshly created interpreter is an Interpreter and is listed.
        created = interpreters.create()
        self.assertIsInstance(created, interpreters.Interpreter)
        self.assertIn(created, interpreters.list_all())

    def test_in_thread(self):
        # Create the interpreter from a worker thread.  The main thread
        # holds the lock while starting the worker, so the worker cannot
        # get past the lock before start() has returned.
        gate = threading.Lock()
        created = None

        def worker():
            nonlocal created
            created = interpreters.create()
            with gate:
                pass

        thread = threading.Thread(target=worker)
        with gate:
            thread.start()
        thread.join()
        self.assertIn(created, interpreters.list_all())

    def test_in_subinterpreter(self):
        # Unpacking also checks that only one interpreter exists so far.
        main, = interpreters.list_all()
        parent = interpreters.create()
        script = dedent("""
            from test.support import interpreters
            interp = interpreters.create()
            print(interp.id)
            """)
        printed_id = _run_output(parent, script)
        child = interpreters.Interpreter(int(printed_id))
        self.assertEqual(interpreters.list_all(), [main, parent, child])

    def test_after_destroy_all(self):
        before = set(interpreters.list_all())
        # Create 3 subinterpreters, then destroy all of them.
        doomed = [interpreters.create() for _ in range(3)]
        for interp in doomed:
            interp.close()
        # Finally, create another; it must be the only addition.
        latest = interpreters.create()
        self.assertEqual(set(interpreters.list_all()), before | {latest})

    def test_after_destroy_some(self):
        before = set(interpreters.list_all())
        # Create 3 subinterpreters and destroy the first two.
        first = interpreters.create()
        second = interpreters.create()
        kept = interpreters.create()
        first.close()
        second.close()
        # Finally, create another; it joins the one that was kept.
        latest = interpreters.create()
        self.assertEqual(set(interpreters.list_all()), before | {kept, latest})
|
|
|
|
|
|
class GetCurrentTests(TestBase):
    """Checks for interpreters.get_current()."""

    def test_main(self):
        # In the main interpreter, "current" and "main" are the same.
        main = interpreters.get_main()
        current = interpreters.get_current()
        self.assertEqual(current, main)

    def test_subinterpreter(self):
        # Use the high-level get_main() so that both sides of the final
        # comparison are Interpreter objects.  Comparing against the value
        # returned by the low-level module is a mixed-type comparison, which
        # would let assertNotEqual pass without checking anything
        # (NOTE(review): confirm against Interpreter.__eq__).
        main = interpreters.get_main()
        interp = interpreters.create()
        out = _run_output(interp, dedent("""
            from test.support import interpreters
            cur = interpreters.get_current()
            print(cur.id)
            """))
        current = interpreters.Interpreter(int(out))
        # The ID printed inside the subinterpreter must be its own ID...
        self.assertEqual(current, interp)
        # ...and therefore not the main interpreter's.
        self.assertNotEqual(current, main)
|
|
|
|
|
|
class ListAllTests(TestBase):
    """Checks for interpreters.list_all()."""

    def test_initial(self):
        # Only one interpreter exists before anything is created.
        self.assertEqual(1, len(interpreters.list_all()))

    def test_after_creating(self):
        main = interpreters.get_current()
        first = interpreters.create()
        second = interpreters.create()

        listed_ids = [interp.id for interp in interpreters.list_all()]

        self.assertEqual(listed_ids, [main.id, first.id, second.id])

    def test_after_destroying(self):
        main = interpreters.get_current()
        first = interpreters.create()
        second = interpreters.create()
        first.close()

        listed_ids = [interp.id for interp in interpreters.list_all()]

        self.assertEqual(listed_ids, [main.id, second.id])
|
|
|
|
|
|
class TestInterpreterAttrs(TestBase):
    """Checks for Interpreter.id, Interpreter.isolated and equality."""

    def test_id_type(self):
        # However the Interpreter was obtained, its id is an InterpreterID.
        candidates = (
            interpreters.get_main(),
            interpreters.get_current(),
            interpreters.create(),
        )
        for candidate in candidates:
            self.assertIsInstance(candidate.id, _interpreters.InterpreterID)

    def test_main_id(self):
        main = interpreters.get_main()
        self.assertEqual(main.id, 0)

    def test_custom_id(self):
        wrapper = interpreters.Interpreter(1)
        self.assertEqual(wrapper.id, 1)

        # A string is rejected as an ID.
        with self.assertRaises(TypeError):
            interpreters.Interpreter('1')

    def test_id_readonly(self):
        wrapper = interpreters.Interpreter(1)
        with self.assertRaises(AttributeError):
            wrapper.id = 2

    @unittest.skip('not ready yet (see bpo-32604)')
    def test_main_isolated(self):
        main = interpreters.get_main()
        self.assertFalse(main.isolated)

    @unittest.skip('not ready yet (see bpo-32604)')
    def test_subinterpreter_isolated_default(self):
        created = interpreters.create()
        self.assertFalse(created.isolated)

    def test_subinterpreter_isolated_explicit(self):
        isolated = interpreters.create(isolated=True)
        shared = interpreters.create(isolated=False)
        self.assertTrue(isolated.isolated)
        self.assertFalse(shared.isolated)

    @unittest.skip('not ready yet (see bpo-32604)')
    def test_custom_isolated_default(self):
        wrapper = interpreters.Interpreter(1)
        self.assertFalse(wrapper.isolated)

    def test_custom_isolated_explicit(self):
        isolated = interpreters.Interpreter(1, isolated=True)
        shared = interpreters.Interpreter(1, isolated=False)
        self.assertTrue(isolated.isolated)
        self.assertFalse(shared.isolated)

    def test_isolated_readonly(self):
        wrapper = interpreters.Interpreter(1)
        with self.assertRaises(AttributeError):
            wrapper.isolated = True

    def test_equality(self):
        # An interpreter equals itself and differs from any other one.
        left = interpreters.create()
        right = interpreters.create()
        self.assertEqual(left, left)
        self.assertNotEqual(left, right)
|
|
|
|
|
|
class TestInterpreterIsRunning(TestBase):
    """Checks for Interpreter.is_running()."""

    def test_main(self):
        # The main interpreter is running this very test.
        main = interpreters.get_main()
        self.assertTrue(main.is_running())

    @unittest.skip('Fails on FreeBSD')
    def test_subinterpreter(self):
        interp = interpreters.create()
        self.assertFalse(interp.is_running())

        # _running() keeps a script blocked inside the interpreter for the
        # duration of the with-block.
        with _running(interp):
            self.assertTrue(interp.is_running())
        self.assertFalse(interp.is_running())

    def test_finished(self):
        r, w = self.pipe()
        interp = interpreters.create()
        interp.run(f"""if True:
            import os
            os.write({w}, b'x')
            """)
        # run() has returned, so the interpreter must no longer be running;
        # the byte in the pipe proves the script actually executed.
        self.assertFalse(interp.is_running())
        self.assertEqual(os.read(r, 1), b'x')

    def test_from_subinterpreter(self):
        # Ask the question from inside the interpreter itself, while its
        # own script is executing.
        interp = interpreters.create()
        out = _run_output(interp, dedent(f"""
            import _xxsubinterpreters as _interpreters
            if _interpreters.is_running({interp.id}):
                print(True)
            else:
                print(False)
            """))
        self.assertEqual(out.strip(), 'True')

    def test_already_destroyed(self):
        interp = interpreters.create()
        interp.close()
        with self.assertRaises(RuntimeError):
            interp.is_running()

    def test_does_not_exist(self):
        # An ID that no interpreter has been created with.
        interp = interpreters.Interpreter(1_000_000)
        with self.assertRaises(RuntimeError):
            interp.is_running()

    def test_bad_id(self):
        # A negative ID is reported as ValueError rather than RuntimeError.
        interp = interpreters.Interpreter(-1)
        with self.assertRaises(ValueError):
            interp.is_running()

    def test_with_only_background_threads(self):
        # r_thread/w_thread: main -> background thread ("you may finish").
        # r_interp/w_interp: background thread -> main ("I finished").
        r_interp, w_interp = self.pipe()
        r_thread, w_thread = self.pipe()

        DONE = b'D'
        FINISHED = b'F'

        interp = interpreters.create()
        interp.run(f"""if True:
            import os
            import threading

            def task():
                v = os.read({r_thread}, 1)
                assert v == {DONE!r}
                os.write({w_interp}, {FINISHED!r})
            t = threading.Thread(target=task)
            t.start()
            """)
        # Nothing has been written to w_thread yet, so the background
        # thread is still waiting in os.read(); the interpreter must
        # nevertheless not count as running.
        self.assertFalse(interp.is_running())

        # Release the background thread and wait for it inside the
        # interpreter before checking its acknowledgement.
        os.write(w_thread, DONE)
        interp.run('t.join()')
        self.assertEqual(os.read(r_interp, 1), FINISHED)
|
|
|
|
|
|
class TestInterpreterClose(TestBase):
    """Checks for Interpreter.close()."""

    def test_basic(self):
        # Closing one interpreter removes exactly that one from list_all().
        main = interpreters.get_main()
        interp1 = interpreters.create()
        interp2 = interpreters.create()
        interp3 = interpreters.create()
        self.assertEqual(set(interpreters.list_all()),
                         {main, interp1, interp2, interp3})
        interp2.close()
        self.assertEqual(set(interpreters.list_all()),
                         {main, interp1, interp3})

    def test_all(self):
        # Closing everything that was created restores the initial set.
        before = set(interpreters.list_all())
        interps = set()
        for _ in range(3):
            interp = interpreters.create()
            interps.add(interp)
        self.assertEqual(set(interpreters.list_all()), before | interps)
        for interp in interps:
            interp.close()
        self.assertEqual(set(interpreters.list_all()), before)

    def test_main(self):
        # Unpacking also checks that only one interpreter exists.
        main, = interpreters.list_all()
        with self.assertRaises(RuntimeError):
            main.close()

        # Repeat the attempt from another thread.
        # NOTE(review): an assertion failure raised inside f() stays in the
        # thread and is not propagated to the test by t.join().
        def f():
            with self.assertRaises(RuntimeError):
                main.close()

        t = threading.Thread(target=f)
        t.start()
        t.join()

    def test_already_destroyed(self):
        # A second close() on the same interpreter is an error.
        interp = interpreters.create()
        interp.close()
        with self.assertRaises(RuntimeError):
            interp.close()

    def test_does_not_exist(self):
        # An ID that no interpreter has been created with.
        interp = interpreters.Interpreter(1_000_000)
        with self.assertRaises(RuntimeError):
            interp.close()

    def test_bad_id(self):
        # A negative ID is reported as ValueError rather than RuntimeError.
        interp = interpreters.Interpreter(-1)
        with self.assertRaises(ValueError):
            interp.close()

    def test_from_current(self):
        # An interpreter tries to close itself while its script is running;
        # the script reports the RuntimeError by printing 'failed'.
        main, = interpreters.list_all()
        interp = interpreters.create()
        out = _run_output(interp, dedent(f"""
            from test.support import interpreters
            interp = interpreters.Interpreter({int(interp.id)})
            try:
                interp.close()
            except RuntimeError:
                print('failed')
            """))
        self.assertEqual(out.strip(), 'failed')
        # The interpreter must still exist afterwards.
        self.assertEqual(set(interpreters.list_all()), {main, interp})

    def test_from_sibling(self):
        main, = interpreters.list_all()
        interp1 = interpreters.create()
        interp2 = interpreters.create()
        self.assertEqual(set(interpreters.list_all()),
                         {main, interp1, interp2})
        # From inside interp1: close interp2, then create and close a
        # third interpreter, so that only main and interp1 remain.
        interp1.run(dedent(f"""
            from test.support import interpreters
            interp2 = interpreters.Interpreter(int({interp2.id}))
            interp2.close()
            interp3 = interpreters.create()
            interp3.close()
            """))
        self.assertEqual(set(interpreters.list_all()), {main, interp1})

    def test_from_other_thread(self):
        # Close from a thread other than the one that created the
        # interpreter.  There is no explicit assertion after the join.
        interp = interpreters.create()
        def f():
            interp.close()

        t = threading.Thread(target=f)
        t.start()
        t.join()

    @unittest.skip('Fails on FreeBSD')
    def test_still_running(self):
        main, = interpreters.list_all()
        interp = interpreters.create()
        with _running(interp):
            # close() must refuse while a script is executing, and the
            # failed attempt must not stop the interpreter.
            with self.assertRaises(RuntimeError):
                interp.close()
            self.assertTrue(interp.is_running())

    def test_subthreads_still_running(self):
        # Only the r_interp/w_interp pair is used below; the background
        # thread reports through it just before it exits.
        r_interp, w_interp = self.pipe()
        r_thread, w_thread = self.pipe()

        FINISHED = b'F'

        interp = interpreters.create()
        # The script leaves a thread polling the "done" flag.  The flag is
        # only set by notify_fini(), which is registered through
        # threading._register_atexit() and also joins the thread.
        interp.run(f"""if True:
            import os
            import threading
            import time

            done = False

            def notify_fini():
                global done
                done = True
                t.join()
            threading._register_atexit(notify_fini)

            def task():
                while not done:
                    time.sleep(0.1)
                os.write({w_interp}, {FINISHED!r})
            t = threading.Thread(target=task)
            t.start()
            """)
        interp.close()

        # The thread can only have written FINISHED if notify_fini() ran.
        self.assertEqual(os.read(r_interp, 1), FINISHED)
|
|
|
|
|
|
class TestInterpreterRun(TestBase):
    """Checks for Interpreter.run()."""

    def test_success(self):
        interp = interpreters.create()
        # The wrapped script redirects its stdout to a pipe; "file" is the
        # read end.
        script, file = _captured_script('print("it worked!", end="")')
        with file:
            interp.run(script)
            out = file.read()

        self.assertEqual(out, 'it worked!')

    def test_in_thread(self):
        # Same as test_success, but run() is called from a helper thread.
        interp = interpreters.create()
        script, file = _captured_script('print("it worked!", end="")')
        with file:
            def f():
                interp.run(script)

            t = threading.Thread(target=f)
            t.start()
            t.join()
            out = file.read()

        self.assertEqual(out, 'it worked!')

    @support.requires_fork()
    def test_fork(self):
        interp = interpreters.create()
        import tempfile
        with tempfile.NamedTemporaryFile('w+', encoding='utf-8') as file:
            file.write('')
            file.flush()

            expected = 'spam spam spam spam spam'
            # The script only writes the marker text when os.fork() raises
            # RuntimeError, so the final assertion checks that forking was
            # refused inside the subinterpreter.
            script = dedent(f"""
                import os
                try:
                    os.fork()
                except RuntimeError:
                    with open('{file.name}', 'w', encoding='utf-8') as out:
                        out.write('{expected}')
                """)
            interp.run(script)

            file.seek(0)
            content = file.read()
            self.assertEqual(content, expected)

    @unittest.skip('Fails on FreeBSD')
    def test_already_running(self):
        # A second run() while a script is still executing is an error.
        interp = interpreters.create()
        with _running(interp):
            with self.assertRaises(RuntimeError):
                interp.run('print("spam")')

    def test_does_not_exist(self):
        # An ID that no interpreter has been created with.
        interp = interpreters.Interpreter(1_000_000)
        with self.assertRaises(RuntimeError):
            interp.run('print("spam")')

    def test_bad_id(self):
        # A negative ID is reported as ValueError rather than RuntimeError.
        interp = interpreters.Interpreter(-1)
        with self.assertRaises(ValueError):
            interp.run('print("spam")')

    def test_bad_script(self):
        # The script must be a string; an int is rejected.
        interp = interpreters.create()
        with self.assertRaises(TypeError):
            interp.run(10)

    def test_bytes_for_script(self):
        # Bytes are rejected as well.
        interp = interpreters.create()
        with self.assertRaises(TypeError):
            interp.run(b'print("spam")')

    def test_with_background_threads_still_running(self):
        # r_thread/w_thread: main -> background thread ("you may finish").
        # r_interp/w_interp: interpreter -> main (progress markers).
        r_interp, w_interp = self.pipe()
        r_thread, w_thread = self.pipe()

        RAN = b'R'
        DONE = b'D'
        FINISHED = b'F'

        interp = interpreters.create()
        # First run: start a thread that blocks on r_thread, then write RAN.
        interp.run(f"""if True:
            import os
            import threading

            def task():
                v = os.read({r_thread}, 1)
                assert v == {DONE!r}
                os.write({w_interp}, {FINISHED!r})
            t = threading.Thread(target=task)
            t.start()
            os.write({w_interp}, {RAN!r})
            """)
        # Second run while that thread is still blocked: must succeed and
        # write RAN again.  It reuses the "os" name imported by the first
        # run.
        interp.run(f"""if True:
            os.write({w_interp}, {RAN!r})
            """)

        os.write(w_thread, DONE)
        interp.run('t.join()')
        # Expected order in the pipe: RAN, RAN, then FINISHED.
        self.assertEqual(os.read(r_interp, 1), RAN)
        self.assertEqual(os.read(r_interp, 1), RAN)
        self.assertEqual(os.read(r_interp, 1), FINISHED)

    # test_xxsubinterpreters covers the remaining Interpreter.run() behavior.
|
|
|
|
|
|
class StressTests(TestBase):
    """Create many interpreters at once and keep them all alive."""

    # In these tests we generally want a lot of interpreters,
    # but not so many that any test takes too long.

    @support.requires_resource('cpu')
    def test_create_many_sequential(self):
        # Hold on to every interpreter until the test returns.
        alive = [interpreters.create() for _ in range(100)]

    @support.requires_resource('cpu')
    def test_create_many_threaded(self):
        alive = []

        def create_one():
            alive.append(interpreters.create())

        workers = (threading.Thread(target=create_one) for _ in range(200))
        # start_threads() starts the workers on entry; the empty body means
        # we simply wait for them when the block is left.
        with threading_helper.start_threads(workers):
            pass
|
|
|
|
|
|
class StartupTests(TestBase):
    """Checks on the initial state of a subinterpreter."""

    # We want to ensure the initial state of subinterpreters
    # matches expectations.

    # Number of subtests entered so far by this test instance.
    _subtest_count = 0

    @contextlib.contextmanager
    def subTest(self, *args):
        # Wrap unittest's subTest() so that debug output printed inside a
        # subtest is terminated by an "end" marker.
        with super().subTest(*args) as ctx:
            self._subtest_count += 1
            try:
                yield ctx
            finally:
                if self._debugged_in_subtest:
                    if self._subtest_count == 1:
                        # The first subtest adds a leading newline, so we
                        # compensate here by not printing a trailing newline.
                        print('### end subtest debug ###', end='')
                    else:
                        print('### end subtest debug ###')
                self._debugged_in_subtest = False

    def debug(self, msg, *, header=None):
        # Print *msg* as debug output.  With *header*, the message is
        # framed by a header line and a closing '------' line, one trailing
        # os.linesep is stripped, and a message lacking one is flagged with
        # '<no newline>'.
        if header:
            self._debug(f'--- {header} ---')
            if msg:
                if msg.endswith(os.linesep):
                    self._debug(msg[:-len(os.linesep)])
                else:
                    self._debug(msg)
                    self._debug('<no newline>')
            self._debug('------')
        else:
            self._debug(msg)

    # Whether any debug output has been printed at all / inside the
    # current subtest.
    _debugged = False
    _debugged_in_subtest = False
    def _debug(self, msg):
        # Print a blank line before the very first debug message.
        if not self._debugged:
            print()
            self._debugged = True
        # self._subtest is not defined in this file; presumably unittest
        # sets it while a subTest() block is active -- TODO confirm.
        if self._subtest is not None:
            if True:
                if not self._debugged_in_subtest:
                    self._debugged_in_subtest = True
                    print('### start subtest debug ###')
                print(msg)
        else:
            print(msg)

    def create_temp_dir(self):
        # Create a temporary directory, resolve it with realpath() and
        # register it for removal during cleanup.
        import tempfile
        tmp = tempfile.mkdtemp(prefix='test_interpreters_')
        tmp = os.path.realpath(tmp)
        self.addCleanup(os_helper.rmtree, tmp)
        return tmp

    def write_script(self, *path, text):
        # Write dedent(text) to the file named by joining *path*, creating
        # parent directories as needed, and return the filename.
        filename = os.path.join(*path)
        dirname = os.path.dirname(filename)
        if dirname:
            os.makedirs(dirname, exist_ok=True)
        with open(filename, 'w', encoding='utf-8') as outfile:
            outfile.write(dedent(text))
        return filename

    @support.requires_subprocess()
    def run_python(self, argv, *, cwd=None):
        # This method is inspired by
        # EmbeddingTestsMixin.run_embedded_interpreter() in test_embed.py.
        #
        # Run sys.executable with *argv* (a list, or a string that is split
        # with shlex) in *cwd*, require exit code 0 and empty stderr, and
        # return the captured stdout.
        import shlex
        import subprocess
        if isinstance(argv, str):
            argv = shlex.split(argv)
        argv = [sys.executable, *argv]
        try:
            proc = subprocess.run(
                argv,
                cwd=cwd,
                capture_output=True,
                text=True,
            )
        except Exception as exc:
            # Report the command (and, for a FileNotFoundError without a
            # filename, whether the executable exists) before re-raising.
            self.debug(f'# cmd: {shlex.join(argv)}')
            if isinstance(exc, FileNotFoundError) and not exc.filename:
                if os.path.exists(argv[0]):
                    exists = 'exists'
                else:
                    exists = 'does not exist'
                self.debug(f'{argv[0]} {exists}')
            raise  # re-raise
        # A successful run must not have written anything to stderr.
        assert proc.stderr == '' or proc.returncode != 0, proc.stderr
        if proc.returncode != 0 and support.verbose:
            self.debug(f'# python3 {shlex.join(argv[1:])} failed:')
            self.debug(proc.stdout, header='stdout')
            self.debug(proc.stderr, header='stderr')
        self.assertEqual(proc.returncode, 0)
        self.assertEqual(proc.stderr, '')
        return proc.stdout

    def test_sys_path_0(self):
        # The main interpreter's sys.path[0] should be used by subinterpreters.
        #
        # The script below is written to several files and run in a child
        # process.  It prints a JSON object holding sys.path[0] as seen by
        # the child's main interpreter ('main') and by a subinterpreter
        # created there ('sub').
        script = '''
            import sys
            from test.support import interpreters

            orig = sys.path[0]

            interp = interpreters.create()
            interp.run(f"""if True:
                import json
                import sys
                print(json.dumps({{
                    'main': {orig!r},
                    'sub': sys.path[0],
                }}, indent=4), flush=True)
                """)
            '''
        # <tmp>/
        #   pkg/
        #     __init__.py
        #     __main__.py
        #     script.py
        #   script.py
        cwd = self.create_temp_dir()
        self.write_script(cwd, 'pkg', '__init__.py', text='')
        self.write_script(cwd, 'pkg', '__main__.py', text=script)
        self.write_script(cwd, 'pkg', 'script.py', text=script)
        self.write_script(cwd, 'script.py', text=script)

        # (command-line arguments, expected sys.path[0]) pairs.
        cases = [
            ('script.py', cwd),
            ('-m script', cwd),
            ('-m pkg', cwd),
            ('-m pkg.script', cwd),
            ('-c "import script"', ''),
        ]
        for argv, expected in cases:
            with self.subTest(f'python3 {argv}'):
                out = self.run_python(argv, cwd=cwd)
                data = json.loads(out)
                sp0_main, sp0_sub = data['main'], data['sub']
                self.assertEqual(sp0_sub, sp0_main)
                self.assertEqual(sp0_sub, expected)
        # XXX Also check them all with the -P cmdline flag?
|
|
|
|
|
|
class FinalizationTests(TestBase):
    """Checks on runtime finalization when subinterpreters are left over."""

    # The test spawns a child Python process, so skip it where subprocesses
    # are unavailable.
    @support.requires_subprocess()
    def test_gh_109793(self):
        # Run a child process that creates a subinterpreter and then dies
        # from an unhandled exception.  The traceback must be printed and
        # the child must exit with status 1.
        import subprocess
        argv = [sys.executable, '-c', '''if True:
            import _xxsubinterpreters as _interpreters
            interpid = _interpreters.create()
            raise Exception
            ''']
        proc = subprocess.run(argv, capture_output=True, text=True)
        self.assertIn('Traceback', proc.stderr)
        if proc.returncode == 0 and support.verbose:
            # Dump the child's output to help diagnose the failure below.
            print()
            print("--- cmd unexpected succeeded ---")
            print(f"stdout:\n{proc.stdout}")
            print(f"stderr:\n{proc.stderr}")
            print("------")
        self.assertEqual(proc.returncode, 1)
|
|
|
|
|
|
class TestIsShareable(TestBase):
    """Checks for interpreters.is_shareable()."""

    def test_default_shareables(self):
        shareables = (
            # singletons
            None,
            # builtin objects
            b'spam',
            'spam',
            10,
            -10,
            100.0,
        )
        for obj in shareables:
            with self.subTest(obj):
                self.assertTrue(interpreters.is_shareable(obj))

    def test_not_shareable(self):
        class Cheese:
            def __init__(self, name):
                self.name = name
            def __str__(self):
                return self.name

        class SubBytes(bytes):
            """A subclass of a shareable type."""

        not_shareables = (
            # singletons
            True,
            False,
            NotImplemented,
            ...,
            # builtin types and objects
            type,
            object,
            object(),
            Exception(),
            # user-defined types and objects
            Cheese,
            Cheese('Wensleydale'),
            SubBytes(b'spam'),
        )
        for obj in not_shareables:
            with self.subTest(repr(obj)):
                shareable = interpreters.is_shareable(obj)
                self.assertFalse(shareable)
|
|
|
|
|
|
class TestChannels(TestBase):
    """Checks for channel creation, listing, sharing and closing."""

    def test_create(self):
        # create_channel() returns a (receiving end, sending end) pair.
        recv_end, send_end = interpreters.create_channel()
        self.assertIsInstance(recv_end, interpreters.RecvChannel)
        self.assertIsInstance(send_end, interpreters.SendChannel)

    def test_list_all(self):
        self.assertEqual(interpreters.list_all_channels(), [])
        created = {interpreters.create_channel() for _ in range(3)}
        listed = set(interpreters.list_all_channels())
        self.assertEqual(listed, created)

    def test_shareable(self):
        rch, sch = interpreters.create_channel()

        for end in (rch, sch):
            self.assertTrue(interpreters.is_shareable(end))

        # Send both ends through the channel itself and read them back.
        sch.send_nowait(rch)
        sch.send_nowait(sch)
        rch_back = rch.recv()
        sch_back = rch.recv()

        self.assertEqual(rch_back, rch)
        self.assertEqual(sch_back, sch)

    def test_is_closed(self):
        rch, sch = interpreters.create_channel()
        # Sample both ends before and after closing the receiving end.
        r_was_closed, s_was_closed = rch.is_closed, sch.is_closed
        rch.close()
        r_now_closed, s_now_closed = rch.is_closed, sch.is_closed

        self.assertFalse(r_was_closed)
        self.assertFalse(s_was_closed)
        self.assertTrue(r_now_closed)
        self.assertTrue(s_now_closed)
|
|
|
|
|
|
class TestRecvChannelAttrs(TestBase):
    """Checks for RecvChannel.id and RecvChannel equality."""

    def test_id_type(self):
        recv_end, _ = interpreters.create_channel()
        self.assertIsInstance(recv_end.id, _channels.ChannelID)

    def test_custom_id(self):
        wrapper = interpreters.RecvChannel(1)
        self.assertEqual(wrapper.id, 1)

        # A string is rejected as an ID.
        with self.assertRaises(TypeError):
            interpreters.RecvChannel('1')

    def test_id_readonly(self):
        wrapper = interpreters.RecvChannel(1)
        with self.assertRaises(AttributeError):
            wrapper.id = 2

    def test_equality(self):
        # A receiving end equals itself and differs from another channel's.
        left, _ = interpreters.create_channel()
        right, _ = interpreters.create_channel()
        self.assertEqual(left, left)
        self.assertNotEqual(left, right)
|
|
|
|
|
|
class TestSendChannelAttrs(TestBase):
    """Checks for SendChannel.id and SendChannel equality."""

    def test_id_type(self):
        _, send_end = interpreters.create_channel()
        self.assertIsInstance(send_end.id, _channels.ChannelID)

    def test_custom_id(self):
        wrapper = interpreters.SendChannel(1)
        self.assertEqual(wrapper.id, 1)

        # A string is rejected as an ID.
        with self.assertRaises(TypeError):
            interpreters.SendChannel('1')

    def test_id_readonly(self):
        wrapper = interpreters.SendChannel(1)
        with self.assertRaises(AttributeError):
            wrapper.id = 2

    def test_equality(self):
        # A sending end equals itself and differs from another channel's.
        _, left = interpreters.create_channel()
        _, right = interpreters.create_channel()
        self.assertEqual(left, left)
        self.assertNotEqual(left, right)
|
|
|
|
|
|
class TestSendRecv(TestBase):
    """Checks for sending and receiving objects over channels."""

    def test_send_recv_main(self):
        r, s = interpreters.create_channel()
        orig = b'spam'
        s.send_nowait(orig)
        obj = r.recv()

        # The received object is an equal copy, not the original.
        self.assertEqual(obj, orig)
        self.assertIsNot(obj, orig)

    def test_send_recv_same_interpreter(self):
        # Same round trip as test_send_recv_main, but performed entirely
        # inside a subinterpreter.
        interp = interpreters.create()
        interp.run(dedent("""
            from test.support import interpreters
            r, s = interpreters.create_channel()
            orig = b'spam'
            s.send_nowait(orig)
            obj = r.recv()
            assert obj == orig, 'expected: obj == orig'
            assert obj is not orig, 'expected: obj is not orig'
            """))

    @unittest.skip('broken (see BPO-...)')
    def test_send_recv_different_interpreters(self):
        # Channel 1 carries b'spam' from main to the subinterpreter;
        # channel 2 carries b'eggs' back.  The script prints id(orig2) so
        # that the identities can be compared afterwards.
        r1, s1 = interpreters.create_channel()
        r2, s2 = interpreters.create_channel()
        orig1 = b'spam'
        s1.send_nowait(orig1)
        out = _run_output(
            interpreters.create(),
            dedent(f"""
                obj1 = r.recv()
                assert obj1 == b'spam', 'expected: obj1 == orig1'
                # When going to another interpreter we get a copy.
                assert id(obj1) != {id(orig1)}, 'expected: obj1 is not orig1'
                orig2 = b'eggs'
                print(id(orig2))
                s.send_nowait(orig2)
                """),
            channels=dict(r=r1, s=s2),
            )
        obj2 = r2.recv()

        self.assertEqual(obj2, b'eggs')
        self.assertNotEqual(id(obj2), int(out))

    def test_send_recv_different_threads(self):
        r, s = interpreters.create_channel()

        # The helper thread receives one object (retrying while the channel
        # is empty) and sends it straight back.
        def f():
            while True:
                try:
                    obj = r.recv()
                    break
                except interpreters.ChannelEmptyError:
                    time.sleep(0.1)
            s.send(obj)
        t = threading.Thread(target=f)
        t.start()

        orig = b'spam'
        s.send(orig)
        obj = r.recv()
        t.join()

        self.assertEqual(obj, orig)
        self.assertIsNot(obj, orig)

    def test_send_recv_nowait_main(self):
        r, s = interpreters.create_channel()
        orig = b'spam'
        s.send_nowait(orig)
        obj = r.recv_nowait()

        self.assertEqual(obj, orig)
        self.assertIsNot(obj, orig)

    def test_send_recv_nowait_main_with_default(self):
        # On an empty channel, recv_nowait() returns the supplied default.
        r, _ = interpreters.create_channel()
        obj = r.recv_nowait(None)

        self.assertIsNone(obj)

    def test_send_recv_nowait_same_interpreter(self):
        interp = interpreters.create()
        interp.run(dedent("""
            from test.support import interpreters
            r, s = interpreters.create_channel()
            orig = b'spam'
            s.send_nowait(orig)
            obj = r.recv_nowait()
            assert obj == orig, 'expected: obj == orig'
            # Even within the same interpreter we get a copy back.
            assert obj is not orig, 'expected: obj is not orig'
            """))

    @unittest.skip('broken (see BPO-...)')
    def test_send_recv_nowait_different_interpreters(self):
        # Non-blocking variant of test_send_recv_different_interpreters.
        r1, s1 = interpreters.create_channel()
        r2, s2 = interpreters.create_channel()
        orig1 = b'spam'
        s1.send_nowait(orig1)
        out = _run_output(
            interpreters.create(),
            dedent(f"""
                obj1 = r.recv_nowait()
                assert obj1 == b'spam', 'expected: obj1 == orig1'
                # When going to another interpreter we get a copy.
                assert id(obj1) != {id(orig1)}, 'expected: obj1 is not orig1'
                orig2 = b'eggs'
                print(id(orig2))
                s.send_nowait(orig2)
                """),
            channels=dict(r=r1, s=s2),
            )
        obj2 = r2.recv_nowait()

        self.assertEqual(obj2, b'eggs')
        self.assertNotEqual(id(obj2), int(out))

    def test_recv_timeout(self):
        # Nothing is ever sent, so the blocking recv() must time out.
        r, _ = interpreters.create_channel()
        with self.assertRaises(TimeoutError):
            r.recv(timeout=1)

    def test_recv_channel_does_not_exist(self):
        ch = interpreters.RecvChannel(1_000_000)
        with self.assertRaises(interpreters.ChannelNotFoundError):
            ch.recv()

    def test_send_channel_does_not_exist(self):
        ch = interpreters.SendChannel(1_000_000)
        with self.assertRaises(interpreters.ChannelNotFoundError):
            ch.send(b'spam')

    def test_recv_nowait_channel_does_not_exist(self):
        ch = interpreters.RecvChannel(1_000_000)
        with self.assertRaises(interpreters.ChannelNotFoundError):
            ch.recv_nowait()

    def test_send_nowait_channel_does_not_exist(self):
        ch = interpreters.SendChannel(1_000_000)
        with self.assertRaises(interpreters.ChannelNotFoundError):
            ch.send_nowait(b'spam')

    def test_recv_nowait_empty(self):
        # Without a default, an empty channel raises ChannelEmptyError.
        ch, _ = interpreters.create_channel()
        with self.assertRaises(interpreters.ChannelEmptyError):
            ch.recv_nowait()

    def test_recv_nowait_default(self):
        default = object()
        rch, sch = interpreters.create_channel()
        # Empty channel: the default comes back.
        obj1 = rch.recv_nowait(default)
        sch.send_nowait(None)
        sch.send_nowait(1)
        sch.send_nowait(b'spam')
        sch.send_nowait(b'eggs')
        # Four queued items come back in order, with or without a default;
        # a queued None must not be mistaken for "empty".
        obj2 = rch.recv_nowait(default)
        obj3 = rch.recv_nowait(default)
        obj4 = rch.recv_nowait()
        obj5 = rch.recv_nowait(default)
        # Empty again: the default comes back.
        obj6 = rch.recv_nowait(default)

        self.assertIs(obj1, default)
        self.assertIs(obj2, None)
        self.assertEqual(obj3, 1)
        self.assertEqual(obj4, b'spam')
        self.assertEqual(obj5, b'eggs')
        self.assertIs(obj6, default)

    def test_send_buffer(self):
        buf = bytearray(b'spamspamspam')
        obj = None
        rch, sch = interpreters.create_channel()

        # The helper thread receives one object, retrying while the channel
        # is empty, and stores it in the enclosing "obj".
        def f():
            nonlocal obj
            while True:
                try:
                    obj = rch.recv()
                    break
                except interpreters.ChannelEmptyError:
                    time.sleep(0.1)
        t = threading.Thread(target=f)
        t.start()

        sch.send_buffer(buf)
        t.join()

        self.assertIsNot(obj, buf)
        self.assertIsInstance(obj, memoryview)
        self.assertEqual(obj, buf)

        # Writes through either object must be visible through the other.
        buf[4:8] = b'eggs'
        self.assertEqual(obj, buf)
        obj[4:8] = b'ham.'
        self.assertEqual(obj, buf)

    def test_send_buffer_nowait(self):
        buf = bytearray(b'spamspamspam')
        rch, sch = interpreters.create_channel()
        sch.send_buffer_nowait(buf)
        obj = rch.recv()

        self.assertIsNot(obj, buf)
        self.assertIsInstance(obj, memoryview)
        self.assertEqual(obj, buf)

        # Writes through either object must be visible through the other.
        buf[4:8] = b'eggs'
        self.assertEqual(obj, buf)
        obj[4:8] = b'ham.'
        self.assertEqual(obj, buf)
|