import os import pickle import threading from textwrap import dedent import unittest from test import support from test.support import import_helper # Raise SkipTest if subinterpreters not supported. import_helper.import_module('_xxsubinterpreters') from test.support import interpreters from test.support.interpreters import InterpreterNotFoundError from .utils import _captured_script, _run_output, _running, TestBase class ModuleTests(TestBase): def test_queue_aliases(self): first = [ interpreters.create_queue, interpreters.Queue, interpreters.QueueEmpty, interpreters.QueueFull, ] second = [ interpreters.create_queue, interpreters.Queue, interpreters.QueueEmpty, interpreters.QueueFull, ] self.assertEqual(second, first) class CreateTests(TestBase): def test_in_main(self): interp = interpreters.create() self.assertIsInstance(interp, interpreters.Interpreter) self.assertIn(interp, interpreters.list_all()) def test_in_thread(self): lock = threading.Lock() interp = None def f(): nonlocal interp interp = interpreters.create() lock.acquire() lock.release() t = threading.Thread(target=f) with lock: t.start() t.join() self.assertIn(interp, interpreters.list_all()) def test_in_subinterpreter(self): main, = interpreters.list_all() interp = interpreters.create() out = _run_output(interp, dedent(""" from test.support import interpreters interp = interpreters.create() print(interp.id) """)) interp2 = interpreters.Interpreter(int(out)) self.assertEqual(interpreters.list_all(), [main, interp, interp2]) def test_after_destroy_all(self): before = set(interpreters.list_all()) # Create 3 subinterpreters. interp_lst = [] for _ in range(3): interps = interpreters.create() interp_lst.append(interps) # Now destroy them. for interp in interp_lst: interp.close() # Finally, create another. interp = interpreters.create() self.assertEqual(set(interpreters.list_all()), before | {interp}) def test_after_destroy_some(self): before = set(interpreters.list_all()) # Create 3 subinterpreters. interp1 = interpreters.create() interp2 = interpreters.create() interp3 = interpreters.create() # Now destroy 2 of them. interp1.close() interp2.close() # Finally, create another. interp = interpreters.create() self.assertEqual(set(interpreters.list_all()), before | {interp3, interp}) class GetMainTests(TestBase): def test_id(self): main = interpreters.get_main() self.assertEqual(main.id, 0) def test_current(self): main = interpreters.get_main() current = interpreters.get_current() self.assertIs(main, current) def test_idempotent(self): main1 = interpreters.get_main() main2 = interpreters.get_main() self.assertIs(main1, main2) class GetCurrentTests(TestBase): def test_main(self): main = interpreters.get_main() current = interpreters.get_current() self.assertEqual(current, main) def test_subinterpreter(self): main = interpreters.get_main() interp = interpreters.create() out = _run_output(interp, dedent(""" from test.support import interpreters cur = interpreters.get_current() print(cur.id) """)) current = interpreters.Interpreter(int(out)) self.assertEqual(current, interp) self.assertNotEqual(current, main) def test_idempotent(self): with self.subTest('main'): cur1 = interpreters.get_current() cur2 = interpreters.get_current() self.assertIs(cur1, cur2) with self.subTest('subinterpreter'): interp = interpreters.create() out = _run_output(interp, dedent(""" from test.support import interpreters cur = interpreters.get_current() print(id(cur)) cur = interpreters.get_current() print(id(cur)) """)) objid1, objid2 = (int(v) for v in out.splitlines()) self.assertEqual(objid1, objid2) with self.subTest('per-interpreter'): interp = interpreters.create() out = _run_output(interp, dedent(""" from test.support import interpreters cur = interpreters.get_current() print(id(cur)) """)) id1 = int(out) id2 = id(interp) self.assertNotEqual(id1, id2) class ListAllTests(TestBase): def test_initial(self): interps = interpreters.list_all() self.assertEqual(1, len(interps)) def test_after_creating(self): main = interpreters.get_current() first = interpreters.create() second = interpreters.create() ids = [] for interp in interpreters.list_all(): ids.append(interp.id) self.assertEqual(ids, [main.id, first.id, second.id]) def test_after_destroying(self): main = interpreters.get_current() first = interpreters.create() second = interpreters.create() first.close() ids = [] for interp in interpreters.list_all(): ids.append(interp.id) self.assertEqual(ids, [main.id, second.id]) def test_idempotent(self): main = interpreters.get_current() first = interpreters.create() second = interpreters.create() expected = [main, first, second] actual = interpreters.list_all() self.assertEqual(actual, expected) for interp1, interp2 in zip(actual, expected): self.assertIs(interp1, interp2) class InterpreterObjectTests(TestBase): def test_init_int(self): interpid = interpreters.get_current().id interp = interpreters.Interpreter(interpid) self.assertEqual(interp.id, interpid) def test_init_interpreter_id(self): interpid = interpreters.get_current()._id interp = interpreters.Interpreter(interpid) self.assertEqual(interp._id, interpid) def test_init_unsupported(self): actualid = interpreters.get_current().id for interpid in [ str(actualid), float(actualid), object(), None, '', ]: with self.subTest(repr(interpid)): with self.assertRaises(TypeError): interpreters.Interpreter(interpid) def test_idempotent(self): main = interpreters.get_main() interp = interpreters.Interpreter(main.id) self.assertIs(interp, main) def test_init_does_not_exist(self): with self.assertRaises(InterpreterNotFoundError): interpreters.Interpreter(1_000_000) def test_init_bad_id(self): with self.assertRaises(ValueError): interpreters.Interpreter(-1) def test_id_type(self): main = interpreters.get_main() current = interpreters.get_current() interp = interpreters.create() self.assertIsInstance(main.id, int) self.assertIsInstance(current.id, int) self.assertIsInstance(interp.id, int) def test_id_readonly(self): interp = interpreters.create() with self.assertRaises(AttributeError): interp.id = 1_000_000 def test_hashable(self): interp = interpreters.create() expected = hash(interp.id) actual = hash(interp) self.assertEqual(actual, expected) def test_equality(self): interp1 = interpreters.create() interp2 = interpreters.create() self.assertEqual(interp1, interp1) self.assertNotEqual(interp1, interp2) def test_pickle(self): interp = interpreters.create() data = pickle.dumps(interp) unpickled = pickle.loads(data) self.assertEqual(unpickled, interp) class TestInterpreterIsRunning(TestBase): def test_main(self): main = interpreters.get_main() self.assertTrue(main.is_running()) @unittest.skip('Fails on FreeBSD') def test_subinterpreter(self): interp = interpreters.create() self.assertFalse(interp.is_running()) with _running(interp): self.assertTrue(interp.is_running()) self.assertFalse(interp.is_running()) def test_finished(self): r, w = self.pipe() interp = interpreters.create() interp.exec(f"""if True: import os os.write({w}, b'x') """) self.assertFalse(interp.is_running()) self.assertEqual(os.read(r, 1), b'x') def test_from_subinterpreter(self): interp = interpreters.create() out = _run_output(interp, dedent(f""" import _xxsubinterpreters as _interpreters if _interpreters.is_running({interp.id}): print(True) else: print(False) """)) self.assertEqual(out.strip(), 'True') def test_already_destroyed(self): interp = interpreters.create() interp.close() with self.assertRaises(InterpreterNotFoundError): interp.is_running() def test_with_only_background_threads(self): r_interp, w_interp = self.pipe() r_thread, w_thread = self.pipe() DONE = b'D' FINISHED = b'F' interp = interpreters.create() interp.exec(f"""if True: import os import threading def task(): v = os.read({r_thread}, 1) assert v == {DONE!r} os.write({w_interp}, {FINISHED!r}) t = threading.Thread(target=task) t.start() """) self.assertFalse(interp.is_running()) os.write(w_thread, DONE) interp.exec('t.join()') self.assertEqual(os.read(r_interp, 1), FINISHED) class TestInterpreterClose(TestBase): def test_basic(self): main = interpreters.get_main() interp1 = interpreters.create() interp2 = interpreters.create() interp3 = interpreters.create() self.assertEqual(set(interpreters.list_all()), {main, interp1, interp2, interp3}) interp2.close() self.assertEqual(set(interpreters.list_all()), {main, interp1, interp3}) def test_all(self): before = set(interpreters.list_all()) interps = set() for _ in range(3): interp = interpreters.create() interps.add(interp) self.assertEqual(set(interpreters.list_all()), before | interps) for interp in interps: interp.close() self.assertEqual(set(interpreters.list_all()), before) def test_main(self): main, = interpreters.list_all() with self.assertRaises(RuntimeError): main.close() def f(): with self.assertRaises(RuntimeError): main.close() t = threading.Thread(target=f) t.start() t.join() def test_already_destroyed(self): interp = interpreters.create() interp.close() with self.assertRaises(InterpreterNotFoundError): interp.close() def test_from_current(self): main, = interpreters.list_all() interp = interpreters.create() out = _run_output(interp, dedent(f""" from test.support import interpreters interp = interpreters.Interpreter({interp.id}) try: interp.close() except RuntimeError: print('failed') """)) self.assertEqual(out.strip(), 'failed') self.assertEqual(set(interpreters.list_all()), {main, interp}) def test_from_sibling(self): main, = interpreters.list_all() interp1 = interpreters.create() interp2 = interpreters.create() self.assertEqual(set(interpreters.list_all()), {main, interp1, interp2}) interp1.exec(dedent(f""" from test.support import interpreters interp2 = interpreters.Interpreter({interp2.id}) interp2.close() interp3 = interpreters.create() interp3.close() """)) self.assertEqual(set(interpreters.list_all()), {main, interp1}) def test_from_other_thread(self): interp = interpreters.create() def f(): interp.close() t = threading.Thread(target=f) t.start() t.join() @unittest.skip('Fails on FreeBSD') def test_still_running(self): main, = interpreters.list_all() interp = interpreters.create() with _running(interp): with self.assertRaises(RuntimeError): interp.close() self.assertTrue(interp.is_running()) def test_subthreads_still_running(self): r_interp, w_interp = self.pipe() r_thread, w_thread = self.pipe() FINISHED = b'F' interp = interpreters.create() interp.exec(f"""if True: import os import threading import time done = False def notify_fini(): global done done = True t.join() threading._register_atexit(notify_fini) def task(): while not done: time.sleep(0.1) os.write({w_interp}, {FINISHED!r}) t = threading.Thread(target=task) t.start() """) interp.close() self.assertEqual(os.read(r_interp, 1), FINISHED) class TestInterpreterPrepareMain(TestBase): def test_empty(self): interp = interpreters.create() with self.assertRaises(ValueError): interp.prepare_main() def test_dict(self): values = {'spam': 42, 'eggs': 'ham'} interp = interpreters.create() interp.prepare_main(values) out = _run_output(interp, dedent(""" print(spam, eggs) """)) self.assertEqual(out.strip(), '42 ham') def test_tuple(self): values = {'spam': 42, 'eggs': 'ham'} values = tuple(values.items()) interp = interpreters.create() interp.prepare_main(values) out = _run_output(interp, dedent(""" print(spam, eggs) """)) self.assertEqual(out.strip(), '42 ham') def test_kwargs(self): values = {'spam': 42, 'eggs': 'ham'} interp = interpreters.create() interp.prepare_main(**values) out = _run_output(interp, dedent(""" print(spam, eggs) """)) self.assertEqual(out.strip(), '42 ham') def test_dict_and_kwargs(self): values = {'spam': 42, 'eggs': 'ham'} interp = interpreters.create() interp.prepare_main(values, foo='bar') out = _run_output(interp, dedent(""" print(spam, eggs, foo) """)) self.assertEqual(out.strip(), '42 ham bar') def test_not_shareable(self): interp = interpreters.create() # XXX TypeError? with self.assertRaises(ValueError): interp.prepare_main(spam={'spam': 'eggs', 'foo': 'bar'}) # Make sure neither was actually bound. with self.assertRaises(interpreters.ExecutionFailed): interp.exec('print(foo)') with self.assertRaises(interpreters.ExecutionFailed): interp.exec('print(spam)') class TestInterpreterExec(TestBase): def test_success(self): interp = interpreters.create() script, file = _captured_script('print("it worked!", end="")') with file: interp.exec(script) out = file.read() self.assertEqual(out, 'it worked!') def test_failure(self): interp = interpreters.create() with self.assertRaises(interpreters.ExecutionFailed): interp.exec('raise Exception') def test_display_preserved_exception(self): tempdir = self.temp_dir() modfile = self.make_module('spam', tempdir, text=""" def ham(): raise RuntimeError('uh-oh!') def eggs(): ham() """) scriptfile = self.make_script('script.py', tempdir, text=""" from test.support import interpreters def script(): import spam spam.eggs() interp = interpreters.create() interp.exec(script) """) stdout, stderr = self.assert_python_failure(scriptfile) self.maxDiff = None interpmod_line, = (l for l in stderr.splitlines() if ' exec' in l) # File "{interpreters.__file__}", line 179, in exec self.assertEqual(stderr, dedent(f"""\ Traceback (most recent call last): File "{scriptfile}", line 9, in interp.exec(script) ~~~~~~~~~~~^^^^^^^^ {interpmod_line.strip()} raise ExecutionFailed(excinfo) test.support.interpreters.ExecutionFailed: RuntimeError: uh-oh! Uncaught in the interpreter: Traceback (most recent call last): File "{scriptfile}", line 6, in script spam.eggs() ~~~~~~~~~^^ File "{modfile}", line 6, in eggs ham() ~~~^^ File "{modfile}", line 3, in ham raise RuntimeError('uh-oh!') RuntimeError: uh-oh! """)) self.assertEqual(stdout, '') def test_in_thread(self): interp = interpreters.create() script, file = _captured_script('print("it worked!", end="")') with file: def f(): interp.exec(script) t = threading.Thread(target=f) t.start() t.join() out = file.read() self.assertEqual(out, 'it worked!') @support.requires_fork() def test_fork(self): interp = interpreters.create() import tempfile with tempfile.NamedTemporaryFile('w+', encoding='utf-8') as file: file.write('') file.flush() expected = 'spam spam spam spam spam' script = dedent(f""" import os try: os.fork() except RuntimeError: with open('{file.name}', 'w', encoding='utf-8') as out: out.write('{expected}') """) interp.exec(script) file.seek(0) content = file.read() self.assertEqual(content, expected) @unittest.skip('Fails on FreeBSD') def test_already_running(self): interp = interpreters.create() with _running(interp): with self.assertRaises(RuntimeError): interp.exec('print("spam")') def test_bad_script(self): interp = interpreters.create() with self.assertRaises(TypeError): interp.exec(10) def test_bytes_for_script(self): interp = interpreters.create() with self.assertRaises(TypeError): interp.exec(b'print("spam")') def test_with_background_threads_still_running(self): r_interp, w_interp = self.pipe() r_thread, w_thread = self.pipe() RAN = b'R' DONE = b'D' FINISHED = b'F' interp = interpreters.create() interp.exec(f"""if True: import os import threading def task(): v = os.read({r_thread}, 1) assert v == {DONE!r} os.write({w_interp}, {FINISHED!r}) t = threading.Thread(target=task) t.start() os.write({w_interp}, {RAN!r}) """) interp.exec(f"""if True: os.write({w_interp}, {RAN!r}) """) os.write(w_thread, DONE) interp.exec('t.join()') self.assertEqual(os.read(r_interp, 1), RAN) self.assertEqual(os.read(r_interp, 1), RAN) self.assertEqual(os.read(r_interp, 1), FINISHED) # test_xxsubinterpreters covers the remaining # Interpreter.exec() behavior. def call_func_noop(): pass def call_func_return_shareable(): return (1, None) def call_func_return_not_shareable(): return [1, 2, 3] def call_func_failure(): raise Exception('spam!') def call_func_ident(value): return value def get_call_func_closure(value): def call_func_closure(): return value return call_func_closure class Spam: @staticmethod def noop(): pass @classmethod def from_values(cls, *values): return cls(values) def __init__(self, value): self.value = value def __call__(self, *args, **kwargs): return (self.value, args, kwargs) def __eq__(self, other): if not isinstance(other, Spam): return NotImplemented return self.value == other.value def run(self, *args, **kwargs): return (self.value, args, kwargs) def call_func_complex(op, /, value=None, *args, exc=None, **kwargs): if exc is not None: raise exc if op == '': raise ValueError('missing op') elif op == 'ident': if args or kwargs: raise Exception((args, kwargs)) return value elif op == 'full-ident': return (value, args, kwargs) elif op == 'globals': if value is not None or args or kwargs: raise Exception((value, args, kwargs)) return __name__ elif op == 'interpid': if value is not None or args or kwargs: raise Exception((value, args, kwargs)) return interpreters.get_current().id elif op == 'closure': if args or kwargs: raise Exception((args, kwargs)) return get_call_func_closure(value) elif op == 'custom': if args or kwargs: raise Exception((args, kwargs)) return Spam(value) elif op == 'custom-inner': if args or kwargs: raise Exception((args, kwargs)) class Eggs(Spam): pass return Eggs(value) elif not isinstance(op, str): raise TypeError(op) else: raise NotImplementedError(op) class TestInterpreterCall(TestBase): # signature # - blank # - args # - kwargs # - args, kwargs # return # - nothing (None) # - simple # - closure # - custom # ops: # - do nothing # - fail # - echo # - do complex, relative to interpreter # scope # - global func # - local closure # - returned closure # - callable type instance # - type # - classmethod # - staticmethod # - instance method # exception # - builtin # - custom # - preserves info (e.g. SyntaxError) # - matching error display def test_call(self): interp = interpreters.create() for i, (callable, args, kwargs) in enumerate([ (call_func_noop, (), {}), (call_func_return_shareable, (), {}), (call_func_return_not_shareable, (), {}), (Spam.noop, (), {}), ]): with self.subTest(f'success case #{i+1}'): res = interp.call(callable) self.assertIs(res, None) for i, (callable, args, kwargs) in enumerate([ (call_func_ident, ('spamspamspam',), {}), (get_call_func_closure, (42,), {}), (get_call_func_closure(42), (), {}), (Spam.from_values, (), {}), (Spam.from_values, (1, 2, 3), {}), (Spam, ('???'), {}), (Spam(101), (), {}), (Spam(10101).run, (), {}), (call_func_complex, ('ident', 'spam'), {}), (call_func_complex, ('full-ident', 'spam'), {}), (call_func_complex, ('full-ident', 'spam', 'ham'), {'eggs': '!!!'}), (call_func_complex, ('globals',), {}), (call_func_complex, ('interpid',), {}), (call_func_complex, ('closure',), {'value': '~~~'}), (call_func_complex, ('custom', 'spam!'), {}), (call_func_complex, ('custom-inner', 'eggs!'), {}), (call_func_complex, ('???',), {'exc': ValueError('spam')}), ]): with self.subTest(f'invalid case #{i+1}'): with self.assertRaises(Exception): if args or kwargs: raise Exception((args, kwargs)) interp.call(callable) with self.assertRaises(interpreters.ExecutionFailed): interp.call(call_func_failure) def test_call_in_thread(self): interp = interpreters.create() for i, (callable, args, kwargs) in enumerate([ (call_func_noop, (), {}), (call_func_return_shareable, (), {}), (call_func_return_not_shareable, (), {}), (Spam.noop, (), {}), ]): with self.subTest(f'success case #{i+1}'): with self.captured_thread_exception() as ctx: t = interp.call_in_thread(callable) t.join() self.assertIsNone(ctx.caught) for i, (callable, args, kwargs) in enumerate([ (call_func_ident, ('spamspamspam',), {}), (get_call_func_closure, (42,), {}), (get_call_func_closure(42), (), {}), (Spam.from_values, (), {}), (Spam.from_values, (1, 2, 3), {}), (Spam, ('???'), {}), (Spam(101), (), {}), (Spam(10101).run, (), {}), (call_func_complex, ('ident', 'spam'), {}), (call_func_complex, ('full-ident', 'spam'), {}), (call_func_complex, ('full-ident', 'spam', 'ham'), {'eggs': '!!!'}), (call_func_complex, ('globals',), {}), (call_func_complex, ('interpid',), {}), (call_func_complex, ('closure',), {'value': '~~~'}), (call_func_complex, ('custom', 'spam!'), {}), (call_func_complex, ('custom-inner', 'eggs!'), {}), (call_func_complex, ('???',), {'exc': ValueError('spam')}), ]): with self.subTest(f'invalid case #{i+1}'): if args or kwargs: continue with self.captured_thread_exception() as ctx: t = interp.call_in_thread(callable) t.join() self.assertIsNotNone(ctx.caught) with self.captured_thread_exception() as ctx: t = interp.call_in_thread(call_func_failure) t.join() self.assertIsNotNone(ctx.caught) class TestIsShareable(TestBase): def test_default_shareables(self): shareables = [ # singletons None, # builtin objects b'spam', 'spam', 10, -10, True, False, 100.0, (), (1, ('spam', 'eggs'), True), ] for obj in shareables: with self.subTest(obj): shareable = interpreters.is_shareable(obj) self.assertTrue(shareable) def test_not_shareable(self): class Cheese: def __init__(self, name): self.name = name def __str__(self): return self.name class SubBytes(bytes): """A subclass of a shareable type.""" not_shareables = [ # singletons NotImplemented, ..., # builtin types and objects type, object, object(), Exception(), # user-defined types and objects Cheese, Cheese('Wensleydale'), SubBytes(b'spam'), ] for obj in not_shareables: with self.subTest(repr(obj)): self.assertFalse( interpreters.is_shareable(obj)) if __name__ == '__main__': # Test needs to be a package, so we can do relative imports. unittest.main()