Issue #25220: Create libregrtest/runtest_mp.py

Move the code to run tests in multiple processes using threading and subprocess
to a new submodule.

Move also slave_runner() (renamed to run_tests_slave()) and
run_test_in_subprocess() (renamed to run_tests_in_subprocess()) there.
This commit is contained in:
Victor Stinner 2015-09-29 23:15:38 +02:00
parent dad20e4876
commit 56e05dd0b0
3 changed files with 164 additions and 147 deletions

View File

@ -1,5 +1,4 @@
import faulthandler import faulthandler
import json
import os import os
import platform import platform
import random import random
@ -9,13 +8,10 @@ import sys
import sysconfig import sysconfig
import tempfile import tempfile
import textwrap import textwrap
import traceback
import unittest import unittest
from test.libregrtest.runtest import ( from test.libregrtest.runtest import (
findtests, runtest, run_test_in_subprocess, findtests, runtest,
STDTESTS, NOTTESTS, STDTESTS, NOTTESTS, PASSED, FAILED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED)
PASSED, FAILED, ENV_CHANGED, SKIPPED,
RESOURCE_DENIED, INTERRUPTED, CHILD_ERROR)
from test.libregrtest.refleak import warm_caches from test.libregrtest.refleak import warm_caches
from test.libregrtest.cmdline import _parse_args from test.libregrtest.cmdline import _parse_args
from test import support from test import support
@ -39,23 +35,6 @@ else:
TEMPDIR = os.path.abspath(TEMPDIR) TEMPDIR = os.path.abspath(TEMPDIR)
def slave_runner(slaveargs):
args, kwargs = json.loads(slaveargs)
if kwargs.get('huntrleaks'):
unittest.BaseTestSuite._cleanup = False
try:
result = runtest(*args, **kwargs)
except KeyboardInterrupt:
result = INTERRUPTED, ''
except BaseException as e:
traceback.print_exc()
result = CHILD_ERROR, str(e)
sys.stdout.flush()
print() # Force a newline (just in case)
print(json.dumps(result))
sys.exit(0)
def setup_python(): def setup_python():
# Display the Python traceback on fatal errors (e.g. segfault) # Display the Python traceback on fatal errors (e.g. segfault)
faulthandler.enable(all_threads=True) faulthandler.enable(all_threads=True)
@ -367,75 +346,6 @@ class Regrtest:
print(count(len(self.bad), 'test'), "failed again:") print(count(len(self.bad), 'test'), "failed again:")
printlist(self.bad) printlist(self.bad)
def _run_tests_mp(self):
try:
from threading import Thread
except ImportError:
print("Multiprocess option requires thread support")
sys.exit(2)
from queue import Queue
debug_output_pat = re.compile(r"\[\d+ refs, \d+ blocks\]$")
output = Queue()
pending = MultiprocessTests(self.tests)
def work():
# A worker thread.
try:
while True:
try:
test = next(pending)
except StopIteration:
output.put((None, None, None, None))
return
retcode, stdout, stderr = run_test_in_subprocess(test, self.ns)
# Strip last refcount output line if it exists, since it
# comes from the shutdown of the interpreter in the subcommand.
stderr = debug_output_pat.sub("", stderr)
stdout, _, result = stdout.strip().rpartition("\n")
if retcode != 0:
result = (CHILD_ERROR, "Exit code %s" % retcode)
output.put((test, stdout.rstrip(), stderr.rstrip(), result))
return
if not result:
output.put((None, None, None, None))
return
result = json.loads(result)
output.put((test, stdout.rstrip(), stderr.rstrip(), result))
except BaseException:
output.put((None, None, None, None))
raise
workers = [Thread(target=work) for i in range(self.ns.use_mp)]
for worker in workers:
worker.start()
finished = 0
test_index = 1
try:
while finished < self.ns.use_mp:
test, stdout, stderr, result = output.get()
if test is None:
finished += 1
continue
self.accumulate_result(test, result)
self.display_progress(test_index, test)
if stdout:
print(stdout)
if stderr:
print(stderr, file=sys.stderr)
sys.stdout.flush()
sys.stderr.flush()
if result[0] == INTERRUPTED:
raise KeyboardInterrupt
if result[0] == CHILD_ERROR:
raise Exception("Child error on {}: {}".format(test, result[1]))
test_index += 1
except KeyboardInterrupt:
self.interrupted = True
pending.interrupted = True
for worker in workers:
worker.join()
def _run_tests_sequential(self): def _run_tests_sequential(self):
save_modules = sys.modules.keys() save_modules = sys.modules.keys()
@ -491,7 +401,8 @@ class Regrtest:
self.test_count_width = len(self.test_count) - 1 self.test_count_width = len(self.test_count) - 1
if self.ns.use_mp: if self.ns.use_mp:
self._run_tests_mp() from test.libregrtest.runtest_mp import run_tests_multiprocess
run_tests_multiprocess(self)
else: else:
self._run_tests_sequential() self._run_tests_sequential()
@ -518,7 +429,8 @@ class Regrtest:
if self.ns.wait: if self.ns.wait:
input("Press any key to continue...") input("Press any key to continue...")
if self.ns.slaveargs is not None: if self.ns.slaveargs is not None:
slave_runner(self.ns.slaveargs) from test.libregrtest.runtest_mp import run_tests_slave
run_tests_slave(self.ns.slaveargs)
self.find_tests(tests) self.find_tests(tests)
self.run_tests() self.run_tests()
self.display_result() self.display_result()
@ -526,26 +438,6 @@ class Regrtest:
sys.exit(len(self.bad) > 0 or self.interrupted) sys.exit(len(self.bad) > 0 or self.interrupted)
# We do not use a generator so multiple threads can call next().
class MultiprocessTests(object):
"""A thread-safe iterator over tests for multiprocess mode."""
def __init__(self, tests):
self.interrupted = False
self.lock = threading.Lock()
self.tests = tests
def __iter__(self):
return self
def __next__(self):
with self.lock:
if self.interrupted:
raise StopIteration('tests interrupted')
return next(self.tests)
def replace_stdout(): def replace_stdout():
"""Set stdout encoder error handler to backslashreplace (as stderr error """Set stdout encoder error handler to backslashreplace (as stderr error
handler) to avoid UnicodeEncodeError when printing a traceback""" handler) to avoid UnicodeEncodeError when printing a traceback"""

View File

@ -1,7 +1,6 @@
import faulthandler import faulthandler
import importlib import importlib
import io import io
import json
import os import os
import sys import sys
import time import time
@ -22,38 +21,6 @@ INTERRUPTED = -4
CHILD_ERROR = -5 # error in a child process CHILD_ERROR = -5 # error in a child process
def run_test_in_subprocess(testname, ns):
"""Run the given test in a subprocess with --slaveargs.
ns is the option Namespace parsed from command-line arguments. regrtest
is invoked in a subprocess with the --slaveargs argument; when the
subprocess exits, its return code, stdout and stderr are returned as a
3-tuple.
"""
from subprocess import Popen, PIPE
base_cmd = ([sys.executable] + support.args_from_interpreter_flags() +
['-X', 'faulthandler', '-m', 'test.regrtest'])
slaveargs = (
(testname, ns.verbose, ns.quiet),
dict(huntrleaks=ns.huntrleaks,
use_resources=ns.use_resources,
output_on_failure=ns.verbose3,
timeout=ns.timeout, failfast=ns.failfast,
match_tests=ns.match_tests))
# Running the child from the same working directory as regrtest's original
# invocation ensures that TEMPDIR for the child is the same when
# sysconfig.is_python_build() is true. See issue 15300.
popen = Popen(base_cmd + ['--slaveargs', json.dumps(slaveargs)],
stdout=PIPE, stderr=PIPE,
universal_newlines=True,
close_fds=(os.name != 'nt'),
cwd=support.SAVEDCWD)
stdout, stderr = popen.communicate()
retcode = popen.wait()
return retcode, stdout, stderr
# small set of tests to determine if we have a basically functioning interpreter # small set of tests to determine if we have a basically functioning interpreter
# (i.e. if any of these fail, then anything else is likely to follow) # (i.e. if any of these fail, then anything else is likely to follow)
STDTESTS = [ STDTESTS = [

View File

@ -0,0 +1,158 @@
import json
import os
import re
import sys
import traceback
import unittest
from queue import Queue
from test import support
try:
import threading
except ImportError:
print("Multiprocess option requires thread support")
sys.exit(2)
from test.libregrtest.runtest import runtest, INTERRUPTED, CHILD_ERROR
debug_output_pat = re.compile(r"\[\d+ refs, \d+ blocks\]$")
def run_tests_in_subprocess(testname, ns):
"""Run the given test in a subprocess with --slaveargs.
ns is the option Namespace parsed from command-line arguments. regrtest
is invoked in a subprocess with the --slaveargs argument; when the
subprocess exits, its return code, stdout and stderr are returned as a
3-tuple.
"""
from subprocess import Popen, PIPE
base_cmd = ([sys.executable] + support.args_from_interpreter_flags() +
['-X', 'faulthandler', '-m', 'test.regrtest'])
slaveargs = (
(testname, ns.verbose, ns.quiet),
dict(huntrleaks=ns.huntrleaks,
use_resources=ns.use_resources,
output_on_failure=ns.verbose3,
timeout=ns.timeout, failfast=ns.failfast,
match_tests=ns.match_tests))
# Running the child from the same working directory as regrtest's original
# invocation ensures that TEMPDIR for the child is the same when
# sysconfig.is_python_build() is true. See issue 15300.
popen = Popen(base_cmd + ['--slaveargs', json.dumps(slaveargs)],
stdout=PIPE, stderr=PIPE,
universal_newlines=True,
close_fds=(os.name != 'nt'),
cwd=support.SAVEDCWD)
stdout, stderr = popen.communicate()
retcode = popen.wait()
return retcode, stdout, stderr
def run_tests_slave(slaveargs):
args, kwargs = json.loads(slaveargs)
if kwargs.get('huntrleaks'):
unittest.BaseTestSuite._cleanup = False
try:
result = runtest(*args, **kwargs)
except KeyboardInterrupt:
result = INTERRUPTED, ''
except BaseException as e:
traceback.print_exc()
result = CHILD_ERROR, str(e)
sys.stdout.flush()
print() # Force a newline (just in case)
print(json.dumps(result))
sys.exit(0)
# We do not use a generator so multiple threads can call next().
class MultiprocessIterator:
"""A thread-safe iterator over tests for multiprocess mode."""
def __init__(self, tests):
self.interrupted = False
self.lock = threading.Lock()
self.tests = tests
def __iter__(self):
return self
def __next__(self):
with self.lock:
if self.interrupted:
raise StopIteration('tests interrupted')
return next(self.tests)
class MultiprocessThread(threading.Thread):
def __init__(self, pending, output, ns):
super().__init__()
self.pending = pending
self.output = output
self.ns = ns
def run(self):
# A worker thread.
try:
while True:
try:
test = next(self.pending)
except StopIteration:
self.output.put((None, None, None, None))
return
retcode, stdout, stderr = run_tests_in_subprocess(test, self.ns)
# Strip last refcount output line if it exists, since it
# comes from the shutdown of the interpreter in the subcommand.
stderr = debug_output_pat.sub("", stderr)
stdout, _, result = stdout.strip().rpartition("\n")
if retcode != 0:
result = (CHILD_ERROR, "Exit code %s" % retcode)
self.output.put((test, stdout.rstrip(), stderr.rstrip(), result))
return
if not result:
self.output.put((None, None, None, None))
return
result = json.loads(result)
self.output.put((test, stdout.rstrip(), stderr.rstrip(), result))
except BaseException:
self.output.put((None, None, None, None))
raise
def run_tests_multiprocess(regrtest):
output = Queue()
pending = MultiprocessIterator(regrtest.tests)
workers = [MultiprocessThread(pending, output, regrtest.ns)
for i in range(regrtest.ns.use_mp)]
for worker in workers:
worker.start()
finished = 0
test_index = 1
try:
while finished < regrtest.ns.use_mp:
test, stdout, stderr, result = output.get()
if test is None:
finished += 1
continue
regrtest.accumulate_result(test, result)
regrtest.display_progress(test_index, test)
if stdout:
print(stdout)
if stderr:
print(stderr, file=sys.stderr)
sys.stdout.flush()
sys.stderr.flush()
if result[0] == INTERRUPTED:
raise KeyboardInterrupt
if result[0] == CHILD_ERROR:
raise Exception("Child error on {}: {}".format(test, result[1]))
test_index += 1
except KeyboardInterrupt:
regrtest.interrupted = True
pending.interrupted = True
for worker in workers:
worker.join()