cpython/Lib/test/test__xxsubinterpreters.py

1138 lines
33 KiB
Python

import contextlib
import itertools
import os
import pickle
import sys
from textwrap import dedent
import threading
import unittest
import _testinternalcapi
from test import support
from test.support import import_helper
from test.support import os_helper
from test.support import script_helper
interpreters = import_helper.import_module('_xxsubinterpreters')
from _xxsubinterpreters import InterpreterNotFoundError
##################################
# helpers
def _captured_script(script):
r, w = os.pipe()
indented = script.replace('\n', '\n ')
wrapped = dedent(f"""
import contextlib
with open({w}, 'w', encoding="utf-8") as spipe:
with contextlib.redirect_stdout(spipe):
{indented}
""")
return wrapped, open(r, encoding="utf-8")
def _run_output(interp, request):
script, rpipe = _captured_script(request)
with rpipe:
interpreters.run_string(interp, script)
return rpipe.read()
def _wait_for_interp_to_run(interp, timeout=None):
# bpo-37224: Running this test file in multiprocesses will fail randomly.
# The failure reason is that the thread can't acquire the cpu to
# run subinterpreter eariler than the main thread in multiprocess.
if timeout is None:
timeout = support.SHORT_TIMEOUT
for _ in support.sleeping_retry(timeout, error=False):
if interpreters.is_running(interp):
break
else:
raise RuntimeError('interp is not running')
@contextlib.contextmanager
def _running(interp):
r, w = os.pipe()
def run():
interpreters.run_string(interp, dedent(f"""
# wait for "signal"
with open({r}, encoding="utf-8") as rpipe:
rpipe.read()
"""))
t = threading.Thread(target=run)
t.start()
_wait_for_interp_to_run(interp)
yield
with open(w, 'w', encoding="utf-8") as spipe:
spipe.write('done')
t.join()
def clean_up_interpreters():
for id in interpreters.list_all():
if id == 0: # main
continue
try:
interpreters.destroy(id)
except RuntimeError:
pass # already destroyed
class TestBase(unittest.TestCase):
def tearDown(self):
clean_up_interpreters()
##################################
# misc. tests
class IsShareableTests(unittest.TestCase):
def test_default_shareables(self):
shareables = [
# singletons
None,
# builtin objects
b'spam',
'spam',
10,
-10,
True,
False,
100.0,
(1, ('spam', 'eggs')),
]
for obj in shareables:
with self.subTest(obj):
self.assertTrue(
interpreters.is_shareable(obj))
def test_not_shareable(self):
class Cheese:
def __init__(self, name):
self.name = name
def __str__(self):
return self.name
class SubBytes(bytes):
"""A subclass of a shareable type."""
not_shareables = [
# singletons
NotImplemented,
...,
# builtin types and objects
type,
object,
object(),
Exception(),
# user-defined types and objects
Cheese,
Cheese('Wensleydale'),
SubBytes(b'spam'),
]
for obj in not_shareables:
with self.subTest(repr(obj)):
self.assertFalse(
interpreters.is_shareable(obj))
class ShareableTypeTests(unittest.TestCase):
def _assert_values(self, values):
for obj in values:
with self.subTest(obj):
xid = _testinternalcapi.get_crossinterp_data(obj)
got = _testinternalcapi.restore_crossinterp_data(xid)
self.assertEqual(got, obj)
self.assertIs(type(got), type(obj))
def test_singletons(self):
for obj in [None]:
with self.subTest(obj):
xid = _testinternalcapi.get_crossinterp_data(obj)
got = _testinternalcapi.restore_crossinterp_data(xid)
# XXX What about between interpreters?
self.assertIs(got, obj)
def test_types(self):
self._assert_values([
b'spam',
9999,
])
def test_bytes(self):
self._assert_values(i.to_bytes(2, 'little', signed=True)
for i in range(-1, 258))
def test_strs(self):
self._assert_values(['hello world', '你好世界', ''])
def test_int(self):
self._assert_values(itertools.chain(range(-1, 258),
[sys.maxsize, -sys.maxsize - 1]))
def test_non_shareable_int(self):
ints = [
sys.maxsize + 1,
-sys.maxsize - 2,
2**1000,
]
for i in ints:
with self.subTest(i):
with self.assertRaises(OverflowError):
_testinternalcapi.get_crossinterp_data(i)
def test_bool(self):
self._assert_values([True, False])
def test_float(self):
self._assert_values([0.0, 1.1, -1.0, 0.12345678, -0.12345678])
def test_tuple(self):
self._assert_values([(), (1,), ("hello", "world", ), (1, True, "hello")])
# Test nesting
self._assert_values([
((1,),),
((1, 2), (3, 4)),
((1, 2), (3, 4), (5, 6)),
])
def test_tuples_containing_non_shareable_types(self):
non_shareables = [
Exception(),
object(),
]
for s in non_shareables:
value = tuple([0, 1.0, s])
with self.subTest(repr(value)):
# XXX Assert the NotShareableError when it is exported
with self.assertRaises(ValueError):
_testinternalcapi.get_crossinterp_data(value)
# Check nested as well
value = tuple([0, 1., (s,)])
with self.subTest("nested " + repr(value)):
# XXX Assert the NotShareableError when it is exported
with self.assertRaises(ValueError):
_testinternalcapi.get_crossinterp_data(value)
class ModuleTests(TestBase):
def test_import_in_interpreter(self):
_run_output(
interpreters.create(),
'import _xxsubinterpreters as _interpreters',
)
##################################
# interpreter tests
class ListAllTests(TestBase):
def test_initial(self):
main = interpreters.get_main()
ids = interpreters.list_all()
self.assertEqual(ids, [main])
def test_after_creating(self):
main = interpreters.get_main()
first = interpreters.create()
second = interpreters.create()
ids = interpreters.list_all()
self.assertEqual(ids, [main, first, second])
def test_after_destroying(self):
main = interpreters.get_main()
first = interpreters.create()
second = interpreters.create()
interpreters.destroy(first)
ids = interpreters.list_all()
self.assertEqual(ids, [main, second])
class GetCurrentTests(TestBase):
def test_main(self):
main = interpreters.get_main()
cur = interpreters.get_current()
self.assertEqual(cur, main)
self.assertIsInstance(cur, int)
def test_subinterpreter(self):
main = interpreters.get_main()
interp = interpreters.create()
out = _run_output(interp, dedent("""
import _xxsubinterpreters as _interpreters
cur = _interpreters.get_current()
print(cur)
assert isinstance(cur, int)
"""))
cur = int(out.strip())
_, expected = interpreters.list_all()
self.assertEqual(cur, expected)
self.assertNotEqual(cur, main)
class GetMainTests(TestBase):
def test_from_main(self):
[expected] = interpreters.list_all()
main = interpreters.get_main()
self.assertEqual(main, expected)
self.assertIsInstance(main, int)
def test_from_subinterpreter(self):
[expected] = interpreters.list_all()
interp = interpreters.create()
out = _run_output(interp, dedent("""
import _xxsubinterpreters as _interpreters
main = _interpreters.get_main()
print(main)
assert isinstance(main, int)
"""))
main = int(out.strip())
self.assertEqual(main, expected)
class IsRunningTests(TestBase):
def test_main(self):
main = interpreters.get_main()
self.assertTrue(interpreters.is_running(main))
@unittest.skip('Fails on FreeBSD')
def test_subinterpreter(self):
interp = interpreters.create()
self.assertFalse(interpreters.is_running(interp))
with _running(interp):
self.assertTrue(interpreters.is_running(interp))
self.assertFalse(interpreters.is_running(interp))
def test_from_subinterpreter(self):
interp = interpreters.create()
out = _run_output(interp, dedent(f"""
import _xxsubinterpreters as _interpreters
if _interpreters.is_running({interp}):
print(True)
else:
print(False)
"""))
self.assertEqual(out.strip(), 'True')
def test_already_destroyed(self):
interp = interpreters.create()
interpreters.destroy(interp)
with self.assertRaises(InterpreterNotFoundError):
interpreters.is_running(interp)
def test_does_not_exist(self):
with self.assertRaises(InterpreterNotFoundError):
interpreters.is_running(1_000_000)
def test_bad_id(self):
with self.assertRaises(ValueError):
interpreters.is_running(-1)
class CreateTests(TestBase):
def test_in_main(self):
id = interpreters.create()
self.assertIsInstance(id, int)
self.assertIn(id, interpreters.list_all())
@unittest.skip('enable this test when working on pystate.c')
def test_unique_id(self):
seen = set()
for _ in range(100):
id = interpreters.create()
interpreters.destroy(id)
seen.add(id)
self.assertEqual(len(seen), 100)
def test_in_thread(self):
lock = threading.Lock()
id = None
def f():
nonlocal id
id = interpreters.create()
lock.acquire()
lock.release()
t = threading.Thread(target=f)
with lock:
t.start()
t.join()
self.assertIn(id, interpreters.list_all())
def test_in_subinterpreter(self):
main, = interpreters.list_all()
id1 = interpreters.create()
out = _run_output(id1, dedent("""
import _xxsubinterpreters as _interpreters
id = _interpreters.create()
print(id)
assert isinstance(id, int)
"""))
id2 = int(out.strip())
self.assertEqual(set(interpreters.list_all()), {main, id1, id2})
def test_in_threaded_subinterpreter(self):
main, = interpreters.list_all()
id1 = interpreters.create()
id2 = None
def f():
nonlocal id2
out = _run_output(id1, dedent("""
import _xxsubinterpreters as _interpreters
id = _interpreters.create()
print(id)
"""))
id2 = int(out.strip())
t = threading.Thread(target=f)
t.start()
t.join()
self.assertEqual(set(interpreters.list_all()), {main, id1, id2})
def test_after_destroy_all(self):
before = set(interpreters.list_all())
# Create 3 subinterpreters.
ids = []
for _ in range(3):
id = interpreters.create()
ids.append(id)
# Now destroy them.
for id in ids:
interpreters.destroy(id)
# Finally, create another.
id = interpreters.create()
self.assertEqual(set(interpreters.list_all()), before | {id})
def test_after_destroy_some(self):
before = set(interpreters.list_all())
# Create 3 subinterpreters.
id1 = interpreters.create()
id2 = interpreters.create()
id3 = interpreters.create()
# Now destroy 2 of them.
interpreters.destroy(id1)
interpreters.destroy(id3)
# Finally, create another.
id = interpreters.create()
self.assertEqual(set(interpreters.list_all()), before | {id, id2})
class DestroyTests(TestBase):
def test_one(self):
id1 = interpreters.create()
id2 = interpreters.create()
id3 = interpreters.create()
self.assertIn(id2, interpreters.list_all())
interpreters.destroy(id2)
self.assertNotIn(id2, interpreters.list_all())
self.assertIn(id1, interpreters.list_all())
self.assertIn(id3, interpreters.list_all())
def test_all(self):
before = set(interpreters.list_all())
ids = set()
for _ in range(3):
id = interpreters.create()
ids.add(id)
self.assertEqual(set(interpreters.list_all()), before | ids)
for id in ids:
interpreters.destroy(id)
self.assertEqual(set(interpreters.list_all()), before)
def test_main(self):
main, = interpreters.list_all()
with self.assertRaises(RuntimeError):
interpreters.destroy(main)
def f():
with self.assertRaises(RuntimeError):
interpreters.destroy(main)
t = threading.Thread(target=f)
t.start()
t.join()
def test_already_destroyed(self):
id = interpreters.create()
interpreters.destroy(id)
with self.assertRaises(InterpreterNotFoundError):
interpreters.destroy(id)
def test_does_not_exist(self):
with self.assertRaises(InterpreterNotFoundError):
interpreters.destroy(1_000_000)
def test_bad_id(self):
with self.assertRaises(ValueError):
interpreters.destroy(-1)
def test_from_current(self):
main, = interpreters.list_all()
id = interpreters.create()
script = dedent(f"""
import _xxsubinterpreters as _interpreters
try:
_interpreters.destroy({id})
except RuntimeError:
pass
""")
interpreters.run_string(id, script)
self.assertEqual(set(interpreters.list_all()), {main, id})
def test_from_sibling(self):
main, = interpreters.list_all()
id1 = interpreters.create()
id2 = interpreters.create()
script = dedent(f"""
import _xxsubinterpreters as _interpreters
_interpreters.destroy({id2})
""")
interpreters.run_string(id1, script)
self.assertEqual(set(interpreters.list_all()), {main, id1})
def test_from_other_thread(self):
id = interpreters.create()
def f():
interpreters.destroy(id)
t = threading.Thread(target=f)
t.start()
t.join()
def test_still_running(self):
main, = interpreters.list_all()
interp = interpreters.create()
with _running(interp):
self.assertTrue(interpreters.is_running(interp),
msg=f"Interp {interp} should be running before destruction.")
with self.assertRaises(RuntimeError,
msg=f"Should not be able to destroy interp {interp} while it's still running."):
interpreters.destroy(interp)
self.assertTrue(interpreters.is_running(interp))
class RunStringTests(TestBase):
def setUp(self):
super().setUp()
self.id = interpreters.create()
def test_success(self):
script, file = _captured_script('print("it worked!", end="")')
with file:
interpreters.run_string(self.id, script)
out = file.read()
self.assertEqual(out, 'it worked!')
def test_in_thread(self):
script, file = _captured_script('print("it worked!", end="")')
with file:
def f():
interpreters.run_string(self.id, script)
t = threading.Thread(target=f)
t.start()
t.join()
out = file.read()
self.assertEqual(out, 'it worked!')
def test_create_thread(self):
subinterp = interpreters.create()
script, file = _captured_script("""
import threading
def f():
print('it worked!', end='')
t = threading.Thread(target=f)
t.start()
t.join()
""")
with file:
interpreters.run_string(subinterp, script)
out = file.read()
self.assertEqual(out, 'it worked!')
def test_create_daemon_thread(self):
with self.subTest('isolated'):
expected = 'spam spam spam spam spam'
subinterp = interpreters.create(isolated=True)
script, file = _captured_script(f"""
import threading
def f():
print('it worked!', end='')
try:
t = threading.Thread(target=f, daemon=True)
t.start()
t.join()
except RuntimeError:
print('{expected}', end='')
""")
with file:
interpreters.run_string(subinterp, script)
out = file.read()
self.assertEqual(out, expected)
with self.subTest('not isolated'):
subinterp = interpreters.create(isolated=False)
script, file = _captured_script("""
import threading
def f():
print('it worked!', end='')
t = threading.Thread(target=f, daemon=True)
t.start()
t.join()
""")
with file:
interpreters.run_string(subinterp, script)
out = file.read()
self.assertEqual(out, 'it worked!')
def test_shareable_types(self):
interp = interpreters.create()
objects = [
None,
'spam',
b'spam',
42,
]
for obj in objects:
with self.subTest(obj):
interpreters.set___main___attrs(interp, dict(obj=obj))
interpreters.run_string(
interp,
f'assert(obj == {obj!r})',
)
def test_os_exec(self):
expected = 'spam spam spam spam spam'
subinterp = interpreters.create()
script, file = _captured_script(f"""
import os, sys
try:
os.execl(sys.executable)
except RuntimeError:
print('{expected}', end='')
""")
with file:
interpreters.run_string(subinterp, script)
out = file.read()
self.assertEqual(out, expected)
@support.requires_fork()
def test_fork(self):
import tempfile
with tempfile.NamedTemporaryFile('w+', encoding="utf-8") as file:
file.write('')
file.flush()
expected = 'spam spam spam spam spam'
script = dedent(f"""
import os
try:
os.fork()
except RuntimeError:
with open('{file.name}', 'w', encoding='utf-8') as out:
out.write('{expected}')
""")
interpreters.run_string(self.id, script)
file.seek(0)
content = file.read()
self.assertEqual(content, expected)
def test_already_running(self):
with _running(self.id):
with self.assertRaises(RuntimeError):
interpreters.run_string(self.id, 'print("spam")')
def test_does_not_exist(self):
id = 0
while id in interpreters.list_all():
id += 1
with self.assertRaises(InterpreterNotFoundError):
interpreters.run_string(id, 'print("spam")')
def test_error_id(self):
with self.assertRaises(ValueError):
interpreters.run_string(-1, 'print("spam")')
def test_bad_id(self):
with self.assertRaises(TypeError):
interpreters.run_string('spam', 'print("spam")')
def test_bad_script(self):
with self.assertRaises(TypeError):
interpreters.run_string(self.id, 10)
def test_bytes_for_script(self):
with self.assertRaises(TypeError):
interpreters.run_string(self.id, b'print("spam")')
def test_with_shared(self):
r, w = os.pipe()
shared = {
'spam': b'ham',
'eggs': b'-1',
'cheddar': None,
}
script = dedent(f"""
eggs = int(eggs)
spam = 42
result = spam + eggs
ns = dict(vars())
del ns['__builtins__']
import pickle
with open({w}, 'wb') as chan:
pickle.dump(ns, chan)
""")
interpreters.set___main___attrs(self.id, shared)
interpreters.run_string(self.id, script)
with open(r, 'rb') as chan:
ns = pickle.load(chan)
self.assertEqual(ns['spam'], 42)
self.assertEqual(ns['eggs'], -1)
self.assertEqual(ns['result'], 41)
self.assertIsNone(ns['cheddar'])
def test_shared_overwrites(self):
interpreters.run_string(self.id, dedent("""
spam = 'eggs'
ns1 = dict(vars())
del ns1['__builtins__']
"""))
shared = {'spam': b'ham'}
script = dedent("""
ns2 = dict(vars())
del ns2['__builtins__']
""")
interpreters.set___main___attrs(self.id, shared)
interpreters.run_string(self.id, script)
r, w = os.pipe()
script = dedent(f"""
ns = dict(vars())
del ns['__builtins__']
import pickle
with open({w}, 'wb') as chan:
pickle.dump(ns, chan)
""")
interpreters.run_string(self.id, script)
with open(r, 'rb') as chan:
ns = pickle.load(chan)
self.assertEqual(ns['ns1']['spam'], 'eggs')
self.assertEqual(ns['ns2']['spam'], b'ham')
self.assertEqual(ns['spam'], b'ham')
def test_shared_overwrites_default_vars(self):
r, w = os.pipe()
shared = {'__name__': b'not __main__'}
script = dedent(f"""
spam = 42
ns = dict(vars())
del ns['__builtins__']
import pickle
with open({w}, 'wb') as chan:
pickle.dump(ns, chan)
""")
interpreters.set___main___attrs(self.id, shared)
interpreters.run_string(self.id, script)
with open(r, 'rb') as chan:
ns = pickle.load(chan)
self.assertEqual(ns['__name__'], b'not __main__')
def test_main_reused(self):
r, w = os.pipe()
interpreters.run_string(self.id, dedent(f"""
spam = True
ns = dict(vars())
del ns['__builtins__']
import pickle
with open({w}, 'wb') as chan:
pickle.dump(ns, chan)
del ns, pickle, chan
"""))
with open(r, 'rb') as chan:
ns1 = pickle.load(chan)
r, w = os.pipe()
interpreters.run_string(self.id, dedent(f"""
eggs = False
ns = dict(vars())
del ns['__builtins__']
import pickle
with open({w}, 'wb') as chan:
pickle.dump(ns, chan)
"""))
with open(r, 'rb') as chan:
ns2 = pickle.load(chan)
self.assertIn('spam', ns1)
self.assertNotIn('eggs', ns1)
self.assertIn('eggs', ns2)
self.assertIn('spam', ns2)
def test_execution_namespace_is_main(self):
r, w = os.pipe()
script = dedent(f"""
spam = 42
ns = dict(vars())
ns['__builtins__'] = str(ns['__builtins__'])
import pickle
with open({w}, 'wb') as chan:
pickle.dump(ns, chan)
""")
interpreters.run_string(self.id, script)
with open(r, 'rb') as chan:
ns = pickle.load(chan)
ns.pop('__builtins__')
ns.pop('__loader__')
self.assertEqual(ns, {
'__name__': '__main__',
'__annotations__': {},
'__doc__': None,
'__package__': None,
'__spec__': None,
'spam': 42,
})
# XXX Fix this test!
@unittest.skip('blocking forever')
def test_still_running_at_exit(self):
script = dedent("""
from textwrap import dedent
import threading
import _xxsubinterpreters as _interpreters
id = _interpreters.create()
def f():
_interpreters.run_string(id, dedent('''
import time
# Give plenty of time for the main interpreter to finish.
time.sleep(1_000_000)
'''))
t = threading.Thread(target=f)
t.start()
""")
with support.temp_dir() as dirname:
filename = script_helper.make_script(dirname, 'interp', script)
with script_helper.spawn_python(filename) as proc:
retcode = proc.wait()
self.assertEqual(retcode, 0)
class RunFailedTests(TestBase):
def setUp(self):
super().setUp()
self.id = interpreters.create()
def add_module(self, modname, text):
import tempfile
tempdir = tempfile.mkdtemp()
self.addCleanup(lambda: os_helper.rmtree(tempdir))
interpreters.run_string(self.id, dedent(f"""
import sys
sys.path.insert(0, {tempdir!r})
"""))
return script_helper.make_script(tempdir, modname, text)
def run_script(self, text, *, fails=False):
r, w = os.pipe()
try:
script = dedent(f"""
import os, sys
os.write({w}, b'0')
# This raises an exception:
{{}}
# Nothing from here down should ever run.
os.write({w}, b'1')
class NeverError(Exception): pass
raise NeverError # never raised
""").format(dedent(text))
if fails:
err = interpreters.run_string(self.id, script)
self.assertIsNot(err, None)
return err
else:
err = interpreters.run_string(self.id, script)
self.assertIs(err, None)
return None
except:
raise # re-raise
else:
msg = os.read(r, 100)
self.assertEqual(msg, b'0')
finally:
os.close(r)
os.close(w)
def _assert_run_failed(self, exctype, msg, script):
if isinstance(exctype, str):
exctype_name = exctype
exctype = None
else:
exctype_name = exctype.__name__
# Run the script.
excinfo = self.run_script(script, fails=True)
# Check the wrapper exception.
self.assertEqual(excinfo.type.__name__, exctype_name)
if msg is None:
self.assertEqual(excinfo.formatted.split(':')[0],
exctype_name)
else:
self.assertEqual(excinfo.formatted,
'{}: {}'.format(exctype_name, msg))
return excinfo
def assert_run_failed(self, exctype, script):
self._assert_run_failed(exctype, None, script)
def assert_run_failed_msg(self, exctype, msg, script):
self._assert_run_failed(exctype, msg, script)
def test_exit(self):
with self.subTest('sys.exit(0)'):
# XXX Should an unhandled SystemExit(0) be handled as not-an-error?
self.assert_run_failed(SystemExit, """
sys.exit(0)
""")
with self.subTest('sys.exit()'):
self.assert_run_failed(SystemExit, """
import sys
sys.exit()
""")
with self.subTest('sys.exit(42)'):
self.assert_run_failed_msg(SystemExit, '42', """
import sys
sys.exit(42)
""")
with self.subTest('SystemExit'):
self.assert_run_failed_msg(SystemExit, '42', """
raise SystemExit(42)
""")
# XXX Also check os._exit() (via a subprocess)?
def test_plain_exception(self):
self.assert_run_failed_msg(Exception, 'spam', """
raise Exception("spam")
""")
def test_invalid_syntax(self):
script = dedent("""
x = 1 + 2
y = 2 + 4
z = 4 + 8
# missing close paren
print("spam"
if x + y + z < 20:
...
""")
with self.subTest('script'):
self.assert_run_failed(SyntaxError, script)
with self.subTest('module'):
modname = 'spam_spam_spam'
filename = self.add_module(modname, script)
self.assert_run_failed(SyntaxError, f"""
import {modname}
""")
def test_NameError(self):
self.assert_run_failed(NameError, """
res = spam + eggs
""")
# XXX check preserved suggestions
def test_AttributeError(self):
self.assert_run_failed(AttributeError, """
object().spam
""")
# XXX check preserved suggestions
def test_ExceptionGroup(self):
self.assert_run_failed(ExceptionGroup, """
raise ExceptionGroup('exceptions', [
Exception('spam'),
ImportError('eggs'),
])
""")
def test_user_defined_exception(self):
self.assert_run_failed_msg('MyError', 'spam', """
class MyError(Exception):
pass
raise MyError('spam')
""")
class RunFuncTests(TestBase):
def setUp(self):
super().setUp()
self.id = interpreters.create()
def test_success(self):
r, w = os.pipe()
def script():
global w
import contextlib
with open(w, 'w', encoding="utf-8") as spipe:
with contextlib.redirect_stdout(spipe):
print('it worked!', end='')
interpreters.set___main___attrs(self.id, dict(w=w))
interpreters.run_func(self.id, script)
with open(r, encoding="utf-8") as outfile:
out = outfile.read()
self.assertEqual(out, 'it worked!')
def test_in_thread(self):
r, w = os.pipe()
def script():
global w
import contextlib
with open(w, 'w', encoding="utf-8") as spipe:
with contextlib.redirect_stdout(spipe):
print('it worked!', end='')
def f():
interpreters.set___main___attrs(self.id, dict(w=w))
interpreters.run_func(self.id, script)
t = threading.Thread(target=f)
t.start()
t.join()
with open(r, encoding="utf-8") as outfile:
out = outfile.read()
self.assertEqual(out, 'it worked!')
def test_code_object(self):
r, w = os.pipe()
def script():
global w
import contextlib
with open(w, 'w', encoding="utf-8") as spipe:
with contextlib.redirect_stdout(spipe):
print('it worked!', end='')
code = script.__code__
interpreters.set___main___attrs(self.id, dict(w=w))
interpreters.run_func(self.id, code)
with open(r, encoding="utf-8") as outfile:
out = outfile.read()
self.assertEqual(out, 'it worked!')
def test_closure(self):
spam = True
def script():
assert spam
with self.assertRaises(ValueError):
interpreters.run_func(self.id, script)
# XXX This hasn't been fixed yet.
@unittest.expectedFailure
def test_return_value(self):
def script():
return 'spam'
with self.assertRaises(ValueError):
interpreters.run_func(self.id, script)
def test_args(self):
with self.subTest('args'):
def script(a, b=0):
assert a == b
with self.assertRaises(ValueError):
interpreters.run_func(self.id, script)
with self.subTest('*args'):
def script(*args):
assert not args
with self.assertRaises(ValueError):
interpreters.run_func(self.id, script)
with self.subTest('**kwargs'):
def script(**kwargs):
assert not kwargs
with self.assertRaises(ValueError):
interpreters.run_func(self.id, script)
with self.subTest('kwonly'):
def script(*, spam=True):
assert spam
with self.assertRaises(ValueError):
interpreters.run_func(self.id, script)
with self.subTest('posonly'):
def script(spam, /):
assert spam
with self.assertRaises(ValueError):
interpreters.run_func(self.id, script)
if __name__ == '__main__':
unittest.main()