205 lines
6.5 KiB
Python
205 lines
6.5 KiB
Python
'''Test runner and result class for the regression test suite.
|
|
|
|
'''
|
|
|
|
import functools
|
|
import io
|
|
import sys
|
|
import time
|
|
import traceback
|
|
import unittest
|
|
|
|
import xml.etree.ElementTree as ET
|
|
|
|
from datetime import datetime
|
|
|
|
class RegressionTestResult(unittest.TextTestResult):
|
|
separator1 = '=' * 70 + '\n'
|
|
separator2 = '-' * 70 + '\n'
|
|
|
|
def __init__(self, stream, descriptions, verbosity):
|
|
super().__init__(stream=stream, descriptions=descriptions, verbosity=0)
|
|
self.buffer = True
|
|
self.__suite = ET.Element('testsuite')
|
|
self.__suite.set('start', datetime.utcnow().isoformat(' '))
|
|
|
|
self.__e = None
|
|
self.__start_time = None
|
|
self.__results = []
|
|
self.__verbose = bool(verbosity)
|
|
|
|
@classmethod
|
|
def __getId(cls, test):
|
|
try:
|
|
test_id = test.id
|
|
except AttributeError:
|
|
return str(test)
|
|
try:
|
|
return test_id()
|
|
except TypeError:
|
|
return str(test_id)
|
|
return repr(test)
|
|
|
|
def startTest(self, test):
|
|
super().startTest(test)
|
|
self.__e = e = ET.SubElement(self.__suite, 'testcase')
|
|
self.__start_time = time.perf_counter()
|
|
if self.__verbose:
|
|
self.stream.write(f'{self.getDescription(test)} ... ')
|
|
self.stream.flush()
|
|
|
|
def _add_result(self, test, capture=False, **args):
|
|
e = self.__e
|
|
self.__e = None
|
|
if e is None:
|
|
return
|
|
e.set('name', args.pop('name', self.__getId(test)))
|
|
e.set('status', args.pop('status', 'run'))
|
|
e.set('result', args.pop('result', 'completed'))
|
|
if self.__start_time:
|
|
e.set('time', f'{time.perf_counter() - self.__start_time:0.6f}')
|
|
|
|
if capture:
|
|
if self._stdout_buffer is not None:
|
|
stdout = self._stdout_buffer.getvalue().rstrip()
|
|
ET.SubElement(e, 'system-out').text = stdout
|
|
if self._stderr_buffer is not None:
|
|
stderr = self._stderr_buffer.getvalue().rstrip()
|
|
ET.SubElement(e, 'system-err').text = stderr
|
|
|
|
for k, v in args.items():
|
|
if not k or not v:
|
|
continue
|
|
e2 = ET.SubElement(e, k)
|
|
if hasattr(v, 'items'):
|
|
for k2, v2 in v.items():
|
|
if k2:
|
|
e2.set(k2, str(v2))
|
|
else:
|
|
e2.text = str(v2)
|
|
else:
|
|
e2.text = str(v)
|
|
|
|
def __write(self, c, word):
|
|
if self.__verbose:
|
|
self.stream.write(f'{word}\n')
|
|
|
|
@classmethod
|
|
def __makeErrorDict(cls, err_type, err_value, err_tb):
|
|
if isinstance(err_type, type):
|
|
if err_type.__module__ == 'builtins':
|
|
typename = err_type.__name__
|
|
else:
|
|
typename = f'{err_type.__module__}.{err_type.__name__}'
|
|
else:
|
|
typename = repr(err_type)
|
|
|
|
msg = traceback.format_exception(err_type, err_value, None)
|
|
tb = traceback.format_exception(err_type, err_value, err_tb)
|
|
|
|
return {
|
|
'type': typename,
|
|
'message': ''.join(msg),
|
|
'': ''.join(tb),
|
|
}
|
|
|
|
def addError(self, test, err):
|
|
self._add_result(test, True, error=self.__makeErrorDict(*err))
|
|
super().addError(test, err)
|
|
self.__write('E', 'ERROR')
|
|
|
|
def addExpectedFailure(self, test, err):
|
|
self._add_result(test, True, output=self.__makeErrorDict(*err))
|
|
super().addExpectedFailure(test, err)
|
|
self.__write('x', 'expected failure')
|
|
|
|
def addFailure(self, test, err):
|
|
self._add_result(test, True, failure=self.__makeErrorDict(*err))
|
|
super().addFailure(test, err)
|
|
self.__write('F', 'FAIL')
|
|
|
|
def addSkip(self, test, reason):
|
|
self._add_result(test, skipped=reason)
|
|
super().addSkip(test, reason)
|
|
self.__write('S', f'skipped {reason!r}')
|
|
|
|
def addSuccess(self, test):
|
|
self._add_result(test)
|
|
super().addSuccess(test)
|
|
self.__write('.', 'ok')
|
|
|
|
def addUnexpectedSuccess(self, test):
|
|
self._add_result(test, outcome='UNEXPECTED_SUCCESS')
|
|
super().addUnexpectedSuccess(test)
|
|
self.__write('u', 'unexpected success')
|
|
|
|
def printErrors(self):
|
|
if self.__verbose:
|
|
self.stream.write('\n')
|
|
self.printErrorList('ERROR', self.errors)
|
|
self.printErrorList('FAIL', self.failures)
|
|
|
|
def printErrorList(self, flavor, errors):
|
|
for test, err in errors:
|
|
self.stream.write(self.separator1)
|
|
self.stream.write(f'{flavor}: {self.getDescription(test)}\n')
|
|
self.stream.write(self.separator2)
|
|
self.stream.write('%s\n' % err)
|
|
|
|
def get_xml_element(self):
|
|
e = self.__suite
|
|
e.set('tests', str(self.testsRun))
|
|
e.set('errors', str(len(self.errors)))
|
|
e.set('failures', str(len(self.failures)))
|
|
return e
|
|
|
|
class QuietRegressionTestRunner:
|
|
def __init__(self, stream, buffer=False):
|
|
self.result = RegressionTestResult(stream, None, 0)
|
|
self.result.buffer = buffer
|
|
|
|
def run(self, test):
|
|
test(self.result)
|
|
return self.result
|
|
|
|
def get_test_runner_class(verbosity, buffer=False):
|
|
if verbosity:
|
|
return functools.partial(unittest.TextTestRunner,
|
|
resultclass=RegressionTestResult,
|
|
buffer=buffer,
|
|
verbosity=verbosity)
|
|
return functools.partial(QuietRegressionTestRunner, buffer=buffer)
|
|
|
|
def get_test_runner(stream, verbosity, capture_output=False):
|
|
return get_test_runner_class(verbosity, capture_output)(stream)
|
|
|
|
if __name__ == '__main__':
|
|
class TestTests(unittest.TestCase):
|
|
def test_pass(self):
|
|
pass
|
|
|
|
def test_pass_slow(self):
|
|
time.sleep(1.0)
|
|
|
|
def test_fail(self):
|
|
print('stdout', file=sys.stdout)
|
|
print('stderr', file=sys.stderr)
|
|
self.fail('failure message')
|
|
|
|
def test_error(self):
|
|
print('stdout', file=sys.stdout)
|
|
print('stderr', file=sys.stderr)
|
|
raise RuntimeError('error message')
|
|
|
|
suite = unittest.TestSuite()
|
|
suite.addTest(unittest.makeSuite(TestTests))
|
|
stream = io.StringIO()
|
|
runner_cls = get_test_runner_class(sum(a == '-v' for a in sys.argv))
|
|
runner = runner_cls(sys.stdout)
|
|
result = runner.run(suite)
|
|
print('Output:', stream.getvalue())
|
|
print('XML: ', end='')
|
|
for s in ET.tostringlist(result.get_xml_element()):
|
|
print(s.decode(), end='')
|
|
print()
|