mirror of https://github.com/python/cpython
161 lines
4.3 KiB
Python
161 lines
4.3 KiB
Python
import contextlib
|
|
import os
|
|
import os.path
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
from textwrap import dedent
|
|
import threading
|
|
import types
|
|
import unittest
|
|
|
|
from test import support
|
|
from test.support import os_helper
|
|
|
|
from test.support import interpreters
|
|
|
|
|
|
def _captured_script(script):
|
|
r, w = os.pipe()
|
|
indented = script.replace('\n', '\n ')
|
|
wrapped = dedent(f"""
|
|
import contextlib
|
|
with open({w}, 'w', encoding='utf-8') as spipe:
|
|
with contextlib.redirect_stdout(spipe):
|
|
{indented}
|
|
""")
|
|
return wrapped, open(r, encoding='utf-8')
|
|
|
|
|
|
def clean_up_interpreters():
|
|
for interp in interpreters.list_all():
|
|
if interp.id == 0: # main
|
|
continue
|
|
try:
|
|
interp.close()
|
|
except RuntimeError:
|
|
pass # already destroyed
|
|
|
|
|
|
def _run_output(interp, request, init=None):
|
|
script, rpipe = _captured_script(request)
|
|
with rpipe:
|
|
if init:
|
|
interp.prepare_main(init)
|
|
interp.exec(script)
|
|
return rpipe.read()
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def _running(interp):
|
|
r, w = os.pipe()
|
|
def run():
|
|
interp.exec(dedent(f"""
|
|
# wait for "signal"
|
|
with open({r}) as rpipe:
|
|
rpipe.read()
|
|
"""))
|
|
|
|
t = threading.Thread(target=run)
|
|
t.start()
|
|
|
|
yield
|
|
|
|
with open(w, 'w') as spipe:
|
|
spipe.write('done')
|
|
t.join()
|
|
|
|
|
|
class TestBase(unittest.TestCase):
|
|
|
|
def pipe(self):
|
|
def ensure_closed(fd):
|
|
try:
|
|
os.close(fd)
|
|
except OSError:
|
|
pass
|
|
r, w = os.pipe()
|
|
self.addCleanup(lambda: ensure_closed(r))
|
|
self.addCleanup(lambda: ensure_closed(w))
|
|
return r, w
|
|
|
|
def temp_dir(self):
|
|
tempdir = tempfile.mkdtemp()
|
|
tempdir = os.path.realpath(tempdir)
|
|
self.addCleanup(lambda: os_helper.rmtree(tempdir))
|
|
return tempdir
|
|
|
|
@contextlib.contextmanager
|
|
def captured_thread_exception(self):
|
|
ctx = types.SimpleNamespace(caught=None)
|
|
def excepthook(args):
|
|
ctx.caught = args
|
|
orig_excepthook = threading.excepthook
|
|
threading.excepthook = excepthook
|
|
try:
|
|
yield ctx
|
|
finally:
|
|
threading.excepthook = orig_excepthook
|
|
|
|
def make_script(self, filename, dirname=None, text=None):
|
|
if text:
|
|
text = dedent(text)
|
|
if dirname is None:
|
|
dirname = self.temp_dir()
|
|
filename = os.path.join(dirname, filename)
|
|
|
|
os.makedirs(os.path.dirname(filename), exist_ok=True)
|
|
with open(filename, 'w', encoding='utf-8') as outfile:
|
|
outfile.write(text or '')
|
|
return filename
|
|
|
|
def make_module(self, name, pathentry=None, text=None):
|
|
if text:
|
|
text = dedent(text)
|
|
if pathentry is None:
|
|
pathentry = self.temp_dir()
|
|
else:
|
|
os.makedirs(pathentry, exist_ok=True)
|
|
*subnames, basename = name.split('.')
|
|
|
|
dirname = pathentry
|
|
for subname in subnames:
|
|
dirname = os.path.join(dirname, subname)
|
|
if os.path.isdir(dirname):
|
|
pass
|
|
elif os.path.exists(dirname):
|
|
raise Exception(dirname)
|
|
else:
|
|
os.mkdir(dirname)
|
|
initfile = os.path.join(dirname, '__init__.py')
|
|
if not os.path.exists(initfile):
|
|
with open(initfile, 'w'):
|
|
pass
|
|
filename = os.path.join(dirname, basename + '.py')
|
|
|
|
with open(filename, 'w', encoding='utf-8') as outfile:
|
|
outfile.write(text or '')
|
|
return filename
|
|
|
|
@support.requires_subprocess()
|
|
def run_python(self, *argv):
|
|
proc = subprocess.run(
|
|
[sys.executable, *argv],
|
|
capture_output=True,
|
|
text=True,
|
|
)
|
|
return proc.returncode, proc.stdout, proc.stderr
|
|
|
|
def assert_python_ok(self, *argv):
|
|
exitcode, stdout, stderr = self.run_python(*argv)
|
|
self.assertNotEqual(exitcode, 1)
|
|
return stdout, stderr
|
|
|
|
def assert_python_failure(self, *argv):
|
|
exitcode, stdout, stderr = self.run_python(*argv)
|
|
self.assertNotEqual(exitcode, 0)
|
|
return stdout, stderr
|
|
|
|
def tearDown(self):
|
|
clean_up_interpreters()
|