from collections import namedtuple import contextlib import json import io import os import os.path import pickle import queue #import select import subprocess import sys import tempfile from textwrap import dedent, indent import threading import types import unittest import warnings from test import support from test.support import os_helper from test.support import import_helper _interpreters = import_helper.import_module('_xxsubinterpreters') from test.support import interpreters try: import _testinternalcapi import _testcapi except ImportError: _testinternalcapi = None _testcapi = None def requires_test_modules(func): return unittest.skipIf(_testinternalcapi is None, "test requires _testinternalcapi module")(func) def _dump_script(text): lines = text.splitlines() print() print('-' * 20) for i, line in enumerate(lines, 1): print(f' {i:>{len(str(len(lines)))}} {line}') print('-' * 20) def _close_file(file): try: if hasattr(file, 'close'): file.close() else: os.close(file) except OSError as exc: if exc.errno != 9: raise # re-raise # It was closed already. def pack_exception(exc=None): captured = _interpreters.capture_exception(exc) data = dict(captured.__dict__) data['type'] = dict(captured.type.__dict__) return json.dumps(data) def unpack_exception(packed): try: data = json.loads(packed) except json.decoder.JSONDecodeError: warnings.warn('incomplete exception data', RuntimeWarning) print(packed if isinstance(packed, str) else packed.decode('utf-8')) return None exc = types.SimpleNamespace(**data) exc.type = types.SimpleNamespace(**exc.type) return exc; class CapturingResults: STDIO = dedent("""\ with open({w_pipe}, 'wb', buffering=0) as _spipe_{stream}: _captured_std{stream} = io.StringIO() with contextlib.redirect_std{stream}(_captured_std{stream}): ######################### # begin wrapped script {indented} # end wrapped script ######################### text = _captured_std{stream}.getvalue() _spipe_{stream}.write(text.encode('utf-8')) """)[:-1] EXC = dedent("""\ with open({w_pipe}, 'wb', buffering=0) as _spipe_exc: try: ######################### # begin wrapped script {indented} # end wrapped script ######################### except Exception as exc: text = _interp_utils.pack_exception(exc) _spipe_exc.write(text.encode('utf-8')) """)[:-1] @classmethod def wrap_script(cls, script, *, stdout=True, stderr=False, exc=False): script = dedent(script).strip(os.linesep) imports = [ f'import {__name__} as _interp_utils', ] wrapped = script # Handle exc. if exc: exc = os.pipe() r_exc, w_exc = exc indented = wrapped.replace('\n', '\n ') wrapped = cls.EXC.format( w_pipe=w_exc, indented=indented, ) else: exc = None # Handle stdout. if stdout: imports.extend([ 'import contextlib, io', ]) stdout = os.pipe() r_out, w_out = stdout indented = wrapped.replace('\n', '\n ') wrapped = cls.STDIO.format( w_pipe=w_out, indented=indented, stream='out', ) else: stdout = None # Handle stderr. if stderr == 'stdout': stderr = None elif stderr: if not stdout: imports.extend([ 'import contextlib, io', ]) stderr = os.pipe() r_err, w_err = stderr indented = wrapped.replace('\n', '\n ') wrapped = cls.STDIO.format( w_pipe=w_err, indented=indented, stream='err', ) else: stderr = None if wrapped == script: raise NotImplementedError else: for line in imports: wrapped = f'{line}{os.linesep}{wrapped}' results = cls(stdout, stderr, exc) return wrapped, results def __init__(self, out, err, exc): self._rf_out = None self._rf_err = None self._rf_exc = None self._w_out = None self._w_err = None self._w_exc = None if out is not None: r_out, w_out = out self._rf_out = open(r_out, 'rb', buffering=0) self._w_out = w_out if err is not None: r_err, w_err = err self._rf_err = open(r_err, 'rb', buffering=0) self._w_err = w_err if exc is not None: r_exc, w_exc = exc self._rf_exc = open(r_exc, 'rb', buffering=0) self._w_exc = w_exc self._buf_out = b'' self._buf_err = b'' self._buf_exc = b'' self._exc = None self._closed = False def __enter__(self): return self def __exit__(self, *args): self.close() @property def closed(self): return self._closed def close(self): if self._closed: return self._closed = True if self._w_out is not None: _close_file(self._w_out) self._w_out = None if self._w_err is not None: _close_file(self._w_err) self._w_err = None if self._w_exc is not None: _close_file(self._w_exc) self._w_exc = None self._capture() if self._rf_out is not None: _close_file(self._rf_out) self._rf_out = None if self._rf_err is not None: _close_file(self._rf_err) self._rf_err = None if self._rf_exc is not None: _close_file(self._rf_exc) self._rf_exc = None def _capture(self): # Ideally this is called only after the script finishes # (and thus has closed the write end of the pipe. if self._rf_out is not None: chunk = self._rf_out.read(100) while chunk: self._buf_out += chunk chunk = self._rf_out.read(100) if self._rf_err is not None: chunk = self._rf_err.read(100) while chunk: self._buf_err += chunk chunk = self._rf_err.read(100) if self._rf_exc is not None: chunk = self._rf_exc.read(100) while chunk: self._buf_exc += chunk chunk = self._rf_exc.read(100) def _unpack_stdout(self): return self._buf_out.decode('utf-8') def _unpack_stderr(self): return self._buf_err.decode('utf-8') def _unpack_exc(self): if self._exc is not None: return self._exc if not self._buf_exc: return None self._exc = unpack_exception(self._buf_exc) return self._exc def stdout(self): if self.closed: return self.final().stdout self._capture() return self._unpack_stdout() def stderr(self): if self.closed: return self.final().stderr self._capture() return self._unpack_stderr() def exc(self): if self.closed: return self.final().exc self._capture() return self._unpack_exc() def final(self, *, force=False): try: return self._final except AttributeError: if not self._closed: if not force: raise Exception('no final results available yet') else: return CapturedResults.Proxy(self) self._final = CapturedResults( self._unpack_stdout(), self._unpack_stderr(), self._unpack_exc(), ) return self._final class CapturedResults(namedtuple('CapturedResults', 'stdout stderr exc')): class Proxy: def __init__(self, capturing): self._capturing = capturing def _finish(self): if self._capturing is None: return self._final = self._capturing.final() self._capturing = None def __iter__(self): self._finish() yield from self._final def __len__(self): self._finish() return len(self._final) def __getattr__(self, name): self._finish() if name.startswith('_'): raise AttributeError(name) return getattr(self._final, name) def raise_if_failed(self): if self.exc is not None: raise interpreters.ExecutionFailed(self.exc) def _captured_script(script, *, stdout=True, stderr=False, exc=False): return CapturingResults.wrap_script( script, stdout=stdout, stderr=stderr, exc=exc, ) def clean_up_interpreters(): for interp in interpreters.list_all(): if interp.id == 0: # main continue try: interp.close() except _interpreters.InterpreterError: pass # already destroyed def _run_output(interp, request, init=None): script, results = _captured_script(request) with results: if init: interp.prepare_main(init) interp.exec(script) return results.stdout() @contextlib.contextmanager def _running(interp): r, w = os.pipe() def run(): interp.exec(dedent(f""" # wait for "signal" with open({r}) as rpipe: rpipe.read() """)) t = threading.Thread(target=run) t.start() yield with open(w, 'w') as spipe: spipe.write('done') t.join() class TestBase(unittest.TestCase): def tearDown(self): clean_up_interpreters() def pipe(self): def ensure_closed(fd): try: os.close(fd) except OSError: pass r, w = os.pipe() self.addCleanup(lambda: ensure_closed(r)) self.addCleanup(lambda: ensure_closed(w)) return r, w def temp_dir(self): tempdir = tempfile.mkdtemp() tempdir = os.path.realpath(tempdir) self.addCleanup(lambda: os_helper.rmtree(tempdir)) return tempdir @contextlib.contextmanager def captured_thread_exception(self): ctx = types.SimpleNamespace(caught=None) def excepthook(args): ctx.caught = args orig_excepthook = threading.excepthook threading.excepthook = excepthook try: yield ctx finally: threading.excepthook = orig_excepthook def make_script(self, filename, dirname=None, text=None): if text: text = dedent(text) if dirname is None: dirname = self.temp_dir() filename = os.path.join(dirname, filename) os.makedirs(os.path.dirname(filename), exist_ok=True) with open(filename, 'w', encoding='utf-8') as outfile: outfile.write(text or '') return filename def make_module(self, name, pathentry=None, text=None): if text: text = dedent(text) if pathentry is None: pathentry = self.temp_dir() else: os.makedirs(pathentry, exist_ok=True) *subnames, basename = name.split('.') dirname = pathentry for subname in subnames: dirname = os.path.join(dirname, subname) if os.path.isdir(dirname): pass elif os.path.exists(dirname): raise Exception(dirname) else: os.mkdir(dirname) initfile = os.path.join(dirname, '__init__.py') if not os.path.exists(initfile): with open(initfile, 'w'): pass filename = os.path.join(dirname, basename + '.py') with open(filename, 'w', encoding='utf-8') as outfile: outfile.write(text or '') return filename @support.requires_subprocess() def run_python(self, *argv): proc = subprocess.run( [sys.executable, *argv], capture_output=True, text=True, ) return proc.returncode, proc.stdout, proc.stderr def assert_python_ok(self, *argv): exitcode, stdout, stderr = self.run_python(*argv) self.assertNotEqual(exitcode, 1) return stdout, stderr def assert_python_failure(self, *argv): exitcode, stdout, stderr = self.run_python(*argv) self.assertNotEqual(exitcode, 0) return stdout, stderr def assert_ns_equal(self, ns1, ns2, msg=None): # This is mostly copied from TestCase.assertDictEqual. self.assertEqual(type(ns1), type(ns2)) if ns1 == ns2: return import difflib import pprint from unittest.util import _common_shorten_repr standardMsg = '%s != %s' % _common_shorten_repr(ns1, ns2) diff = ('\n' + '\n'.join(difflib.ndiff( pprint.pformat(vars(ns1)).splitlines(), pprint.pformat(vars(ns2)).splitlines()))) diff = f'namespace({diff})' standardMsg = self._truncateMessage(standardMsg, diff) self.fail(self._formatMessage(msg, standardMsg)) def _run_string(self, interp, script): wrapped, results = _captured_script(script, exc=False) #_dump_script(wrapped) with results: if isinstance(interp, interpreters.Interpreter): interp.exec(script) else: err = _interpreters.run_string(interp, wrapped) if err is not None: return None, err return results.stdout(), None def run_and_capture(self, interp, script): text, err = self._run_string(interp, script) if err is not None: raise interpreters.ExecutionFailed(err) else: return text def interp_exists(self, interpid): try: _interpreters.whence(interpid) except _interpreters.InterpreterNotFoundError: return False else: return True @requires_test_modules @contextlib.contextmanager def interpreter_from_capi(self, config=None, whence=None): if config is False: if whence is None: whence = _interpreters.WHENCE_LEGACY_CAPI else: assert whence in (_interpreters.WHENCE_LEGACY_CAPI, _interpreters.WHENCE_UNKNOWN), repr(whence) config = None elif config is True: config = _interpreters.new_config('default') elif config is None: if whence not in ( _interpreters.WHENCE_LEGACY_CAPI, _interpreters.WHENCE_UNKNOWN, ): config = _interpreters.new_config('legacy') elif isinstance(config, str): config = _interpreters.new_config(config) if whence is None: whence = _interpreters.WHENCE_XI interpid = _testinternalcapi.create_interpreter(config, whence=whence) try: yield interpid finally: try: _testinternalcapi.destroy_interpreter(interpid) except _interpreters.InterpreterNotFoundError: pass @contextlib.contextmanager def interpreter_obj_from_capi(self, config='legacy'): with self.interpreter_from_capi(config) as interpid: interp = interpreters.Interpreter( interpid, _whence=_interpreters.WHENCE_CAPI, _ownsref=False, ) yield interp, interpid @contextlib.contextmanager def capturing(self, script): wrapped, capturing = _captured_script(script, stdout=True, exc=True) #_dump_script(wrapped) with capturing: yield wrapped, capturing.final(force=True) @requires_test_modules def run_from_capi(self, interpid, script, *, main=False): with self.capturing(script) as (wrapped, results): rc = _testinternalcapi.exec_interpreter(interpid, wrapped, main=main) assert rc == 0, rc results.raise_if_failed() return results.stdout @contextlib.contextmanager def _running(self, run_interp, exec_interp): token = b'\0' r_in, w_in = self.pipe() r_out, w_out = self.pipe() def close(): _close_file(r_in) _close_file(w_in) _close_file(r_out) _close_file(w_out) # Start running (and wait). script = dedent(f""" import os try: # handshake token = os.read({r_in}, 1) os.write({w_out}, token) # Wait for the "done" message. os.read({r_in}, 1) except BrokenPipeError: pass except OSError as exc: if exc.errno != 9: raise # re-raise # It was closed already. """) failed = None def run(): nonlocal failed try: run_interp(script) except Exception as exc: failed = exc close() t = threading.Thread(target=run) t.start() # handshake try: os.write(w_in, token) token2 = os.read(r_out, 1) assert token2 == token, (token2, token) except OSError: t.join() if failed is not None: raise failed # CM __exit__() try: try: yield finally: # Send "done". os.write(w_in, b'\0') finally: close() t.join() if failed is not None: raise failed @contextlib.contextmanager def running(self, interp): if isinstance(interp, int): interpid = interp def exec_interp(script): exc = _interpreters.exec(interpid, script) assert exc is None, exc run_interp = exec_interp else: def run_interp(script): text = self.run_and_capture(interp, script) assert text == '', repr(text) def exec_interp(script): interp.exec(script) with self._running(run_interp, exec_interp): yield @requires_test_modules @contextlib.contextmanager def running_from_capi(self, interpid, *, main=False): def run_interp(script): text = self.run_from_capi(interpid, script, main=main) assert text == '', repr(text) def exec_interp(script): rc = _testinternalcapi.exec_interpreter(interpid, script) assert rc == 0, rc with self._running(run_interp, exec_interp): yield @requires_test_modules def run_temp_from_capi(self, script, config='legacy'): if config is False: # Force using Py_NewInterpreter(). run_in_interp = (lambda s, c: _testcapi.run_in_subinterp(s)) config = None else: run_in_interp = _testinternalcapi.run_in_subinterp_with_config if config is True: config = 'default' if isinstance(config, str): config = _interpreters.new_config(config) with self.capturing(script) as (wrapped, results): rc = run_in_interp(wrapped, config) assert rc == 0, rc results.raise_if_failed() return results.stdout