mirror of https://github.com/python/cpython
76 lines
1.7 KiB
Python
76 lines
1.7 KiB
Python
import contextlib
|
|
import os
|
|
import threading
|
|
from textwrap import dedent
|
|
import unittest
|
|
|
|
from test.support import interpreters
|
|
|
|
|
|
def _captured_script(script):
|
|
r, w = os.pipe()
|
|
indented = script.replace('\n', '\n ')
|
|
wrapped = dedent(f"""
|
|
import contextlib
|
|
with open({w}, 'w', encoding='utf-8') as spipe:
|
|
with contextlib.redirect_stdout(spipe):
|
|
{indented}
|
|
""")
|
|
return wrapped, open(r, encoding='utf-8')
|
|
|
|
|
|
def clean_up_interpreters():
    """Close every interpreter listed by ``interpreters.list_all()`` except main.

    The interpreter whose id is 0 (main) is left alone.  A ``RuntimeError``
    raised by ``close()`` is ignored: the interpreter was already destroyed.
    """
    for interp in interpreters.list_all():
        if interp.id != 0:  # skip main
            with contextlib.suppress(RuntimeError):  # already destroyed
                interp.close()
|
|
|
|
|
|
def _run_output(interp, request, init=None):
    """Run *request* in *interp* and return the text it wrote to stdout.

    When *init* is truthy it is handed to ``interp.prepare_main()`` before the
    script is executed.  The captured output is read from the pipe only after
    ``interp.exec_sync()`` has returned, and the read end is closed on exit.
    """
    wrapped, reader = _captured_script(request)
    with reader:
        if init:
            interp.prepare_main(init)
        interp.exec_sync(wrapped)
        captured = reader.read()
    return captured
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def _running(interp):
|
|
r, w = os.pipe()
|
|
def run():
|
|
interp.exec_sync(dedent(f"""
|
|
# wait for "signal"
|
|
with open({r}) as rpipe:
|
|
rpipe.read()
|
|
"""))
|
|
|
|
t = threading.Thread(target=run)
|
|
t.start()
|
|
|
|
yield
|
|
|
|
with open(w, 'w') as spipe:
|
|
spipe.write('done')
|
|
t.join()
|
|
|
|
|
|
class TestBase(unittest.TestCase):
    """Base test case that closes leftover interpreters after each test."""

    def pipe(self):
        """Create an OS pipe and return its ``(read_fd, write_fd)`` pair.

        Both descriptors are registered for closing at test cleanup; a
        descriptor that is already closed by then is silently skipped.
        """
        def ensure_closed(fd):
            with contextlib.suppress(OSError):
                os.close(fd)

        fds = os.pipe()
        # Register the read end first, then the write end, so the cleanup
        # order stays the same as registering them one by one.
        for fd in fds:
            self.addCleanup(ensure_closed, fd)
        return fds

    def tearDown(self):
        """Close the interpreters left over from the test."""
        clean_up_interpreters()
|