diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py index 63388c957ce..b66f045045f 100644 --- a/Lib/test/libregrtest/main.py +++ b/Lib/test/libregrtest/main.py @@ -1,5 +1,4 @@ import faulthandler -import json import os import platform import random @@ -9,13 +8,10 @@ import sys import sysconfig import tempfile import textwrap -import traceback import unittest from test.libregrtest.runtest import ( - findtests, runtest, run_test_in_subprocess, - STDTESTS, NOTTESTS, - PASSED, FAILED, ENV_CHANGED, SKIPPED, - RESOURCE_DENIED, INTERRUPTED, CHILD_ERROR) + findtests, runtest, + STDTESTS, NOTTESTS, PASSED, FAILED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED) from test.libregrtest.refleak import warm_caches from test.libregrtest.cmdline import _parse_args from test import support @@ -39,23 +35,6 @@ else: TEMPDIR = os.path.abspath(TEMPDIR) -def slave_runner(slaveargs): - args, kwargs = json.loads(slaveargs) - if kwargs.get('huntrleaks'): - unittest.BaseTestSuite._cleanup = False - try: - result = runtest(*args, **kwargs) - except KeyboardInterrupt: - result = INTERRUPTED, '' - except BaseException as e: - traceback.print_exc() - result = CHILD_ERROR, str(e) - sys.stdout.flush() - print() # Force a newline (just in case) - print(json.dumps(result)) - sys.exit(0) - - def setup_python(): # Display the Python traceback on fatal errors (e.g. segfault) faulthandler.enable(all_threads=True) @@ -367,75 +346,6 @@ class Regrtest: print(count(len(self.bad), 'test'), "failed again:") printlist(self.bad) - def _run_tests_mp(self): - try: - from threading import Thread - except ImportError: - print("Multiprocess option requires thread support") - sys.exit(2) - from queue import Queue - - debug_output_pat = re.compile(r"\[\d+ refs, \d+ blocks\]$") - output = Queue() - pending = MultiprocessTests(self.tests) - - def work(): - # A worker thread. - try: - while True: - try: - test = next(pending) - except StopIteration: - output.put((None, None, None, None)) - return - retcode, stdout, stderr = run_test_in_subprocess(test, self.ns) - # Strip last refcount output line if it exists, since it - # comes from the shutdown of the interpreter in the subcommand. - stderr = debug_output_pat.sub("", stderr) - stdout, _, result = stdout.strip().rpartition("\n") - if retcode != 0: - result = (CHILD_ERROR, "Exit code %s" % retcode) - output.put((test, stdout.rstrip(), stderr.rstrip(), result)) - return - if not result: - output.put((None, None, None, None)) - return - result = json.loads(result) - output.put((test, stdout.rstrip(), stderr.rstrip(), result)) - except BaseException: - output.put((None, None, None, None)) - raise - - workers = [Thread(target=work) for i in range(self.ns.use_mp)] - for worker in workers: - worker.start() - finished = 0 - test_index = 1 - try: - while finished < self.ns.use_mp: - test, stdout, stderr, result = output.get() - if test is None: - finished += 1 - continue - self.accumulate_result(test, result) - self.display_progress(test_index, test) - if stdout: - print(stdout) - if stderr: - print(stderr, file=sys.stderr) - sys.stdout.flush() - sys.stderr.flush() - if result[0] == INTERRUPTED: - raise KeyboardInterrupt - if result[0] == CHILD_ERROR: - raise Exception("Child error on {}: {}".format(test, result[1])) - test_index += 1 - except KeyboardInterrupt: - self.interrupted = True - pending.interrupted = True - for worker in workers: - worker.join() - def _run_tests_sequential(self): save_modules = sys.modules.keys() @@ -491,7 +401,8 @@ class Regrtest: self.test_count_width = len(self.test_count) - 1 if self.ns.use_mp: - self._run_tests_mp() + from test.libregrtest.runtest_mp import run_tests_multiprocess + run_tests_multiprocess(self) else: self._run_tests_sequential() @@ -518,7 +429,8 @@ class Regrtest: if self.ns.wait: input("Press any key to continue...") if self.ns.slaveargs is not None: - slave_runner(self.ns.slaveargs) + from test.libregrtest.runtest_mp import run_tests_slave + run_tests_slave(self.ns.slaveargs) self.find_tests(tests) self.run_tests() self.display_result() @@ -526,26 +438,6 @@ class Regrtest: sys.exit(len(self.bad) > 0 or self.interrupted) -# We do not use a generator so multiple threads can call next(). -class MultiprocessTests(object): - - """A thread-safe iterator over tests for multiprocess mode.""" - - def __init__(self, tests): - self.interrupted = False - self.lock = threading.Lock() - self.tests = tests - - def __iter__(self): - return self - - def __next__(self): - with self.lock: - if self.interrupted: - raise StopIteration('tests interrupted') - return next(self.tests) - - def replace_stdout(): """Set stdout encoder error handler to backslashreplace (as stderr error handler) to avoid UnicodeEncodeError when printing a traceback""" diff --git a/Lib/test/libregrtest/runtest.py b/Lib/test/libregrtest/runtest.py index d8c0eb2a868..60b8f5f5380 100644 --- a/Lib/test/libregrtest/runtest.py +++ b/Lib/test/libregrtest/runtest.py @@ -1,7 +1,6 @@ import faulthandler import importlib import io -import json import os import sys import time @@ -22,38 +21,6 @@ INTERRUPTED = -4 CHILD_ERROR = -5 # error in a child process -def run_test_in_subprocess(testname, ns): - """Run the given test in a subprocess with --slaveargs. - - ns is the option Namespace parsed from command-line arguments. regrtest - is invoked in a subprocess with the --slaveargs argument; when the - subprocess exits, its return code, stdout and stderr are returned as a - 3-tuple. - """ - from subprocess import Popen, PIPE - base_cmd = ([sys.executable] + support.args_from_interpreter_flags() + - ['-X', 'faulthandler', '-m', 'test.regrtest']) - - slaveargs = ( - (testname, ns.verbose, ns.quiet), - dict(huntrleaks=ns.huntrleaks, - use_resources=ns.use_resources, - output_on_failure=ns.verbose3, - timeout=ns.timeout, failfast=ns.failfast, - match_tests=ns.match_tests)) - # Running the child from the same working directory as regrtest's original - # invocation ensures that TEMPDIR for the child is the same when - # sysconfig.is_python_build() is true. See issue 15300. - popen = Popen(base_cmd + ['--slaveargs', json.dumps(slaveargs)], - stdout=PIPE, stderr=PIPE, - universal_newlines=True, - close_fds=(os.name != 'nt'), - cwd=support.SAVEDCWD) - stdout, stderr = popen.communicate() - retcode = popen.wait() - return retcode, stdout, stderr - - # small set of tests to determine if we have a basically functioning interpreter # (i.e. if any of these fail, then anything else is likely to follow) STDTESTS = [ diff --git a/Lib/test/libregrtest/runtest_mp.py b/Lib/test/libregrtest/runtest_mp.py new file mode 100644 index 00000000000..b55c11afb53 --- /dev/null +++ b/Lib/test/libregrtest/runtest_mp.py @@ -0,0 +1,158 @@ +import json +import os +import re +import sys +import traceback +import unittest +from queue import Queue +from test import support +try: + import threading +except ImportError: + print("Multiprocess option requires thread support") + sys.exit(2) + +from test.libregrtest.runtest import runtest, INTERRUPTED, CHILD_ERROR + + +debug_output_pat = re.compile(r"\[\d+ refs, \d+ blocks\]$") + + +def run_tests_in_subprocess(testname, ns): + """Run the given test in a subprocess with --slaveargs. + + ns is the option Namespace parsed from command-line arguments. regrtest + is invoked in a subprocess with the --slaveargs argument; when the + subprocess exits, its return code, stdout and stderr are returned as a + 3-tuple. + """ + from subprocess import Popen, PIPE + base_cmd = ([sys.executable] + support.args_from_interpreter_flags() + + ['-X', 'faulthandler', '-m', 'test.regrtest']) + + slaveargs = ( + (testname, ns.verbose, ns.quiet), + dict(huntrleaks=ns.huntrleaks, + use_resources=ns.use_resources, + output_on_failure=ns.verbose3, + timeout=ns.timeout, failfast=ns.failfast, + match_tests=ns.match_tests)) + # Running the child from the same working directory as regrtest's original + # invocation ensures that TEMPDIR for the child is the same when + # sysconfig.is_python_build() is true. See issue 15300. + popen = Popen(base_cmd + ['--slaveargs', json.dumps(slaveargs)], + stdout=PIPE, stderr=PIPE, + universal_newlines=True, + close_fds=(os.name != 'nt'), + cwd=support.SAVEDCWD) + stdout, stderr = popen.communicate() + retcode = popen.wait() + return retcode, stdout, stderr + + +def run_tests_slave(slaveargs): + args, kwargs = json.loads(slaveargs) + if kwargs.get('huntrleaks'): + unittest.BaseTestSuite._cleanup = False + try: + result = runtest(*args, **kwargs) + except KeyboardInterrupt: + result = INTERRUPTED, '' + except BaseException as e: + traceback.print_exc() + result = CHILD_ERROR, str(e) + sys.stdout.flush() + print() # Force a newline (just in case) + print(json.dumps(result)) + sys.exit(0) + + +# We do not use a generator so multiple threads can call next(). +class MultiprocessIterator: + + """A thread-safe iterator over tests for multiprocess mode.""" + + def __init__(self, tests): + self.interrupted = False + self.lock = threading.Lock() + self.tests = tests + + def __iter__(self): + return self + + def __next__(self): + with self.lock: + if self.interrupted: + raise StopIteration('tests interrupted') + return next(self.tests) + + +class MultiprocessThread(threading.Thread): + def __init__(self, pending, output, ns): + super().__init__() + self.pending = pending + self.output = output + self.ns = ns + + def run(self): + # A worker thread. + try: + while True: + try: + test = next(self.pending) + except StopIteration: + self.output.put((None, None, None, None)) + return + retcode, stdout, stderr = run_tests_in_subprocess(test, self.ns) + # Strip last refcount output line if it exists, since it + # comes from the shutdown of the interpreter in the subcommand. + stderr = debug_output_pat.sub("", stderr) + stdout, _, result = stdout.strip().rpartition("\n") + if retcode != 0: + result = (CHILD_ERROR, "Exit code %s" % retcode) + self.output.put((test, stdout.rstrip(), stderr.rstrip(), result)) + return + if not result: + self.output.put((None, None, None, None)) + return + result = json.loads(result) + self.output.put((test, stdout.rstrip(), stderr.rstrip(), result)) + except BaseException: + self.output.put((None, None, None, None)) + raise + + +def run_tests_multiprocess(regrtest): + output = Queue() + pending = MultiprocessIterator(regrtest.tests) + + workers = [MultiprocessThread(pending, output, regrtest.ns) + for i in range(regrtest.ns.use_mp)] + for worker in workers: + worker.start() + finished = 0 + test_index = 1 + try: + while finished < regrtest.ns.use_mp: + test, stdout, stderr, result = output.get() + if test is None: + finished += 1 + continue + regrtest.accumulate_result(test, result) + regrtest.display_progress(test_index, test) + if stdout: + print(stdout) + if stderr: + print(stderr, file=sys.stderr) + sys.stdout.flush() + sys.stderr.flush() + if result[0] == INTERRUPTED: + raise KeyboardInterrupt + if result[0] == CHILD_ERROR: + raise Exception("Child error on {}: {}".format(test, result[1])) + test_index += 1 + except KeyboardInterrupt: + regrtest.interrupted = True + pending.interrupted = True + for worker in workers: + worker.join()