Merged revisions 79437 via svnmerge from
svn+ssh://pythondev@svn.python.org/python/trunk ........ r79437 | michael.foord | 2010-03-26 03:18:31 +0000 (Fri, 26 Mar 2010) | 1 line Addition of -c command line option to unittest, to handle ctrl-c during a test run more elegantly ........
This commit is contained in:
parent
8777f01d71
commit
65b69a1093
|
@ -47,7 +47,8 @@ SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
|
|||
__all__ = ['TestResult', 'TestCase', 'TestSuite',
|
||||
'TextTestRunner', 'TestLoader', 'FunctionTestCase', 'main',
|
||||
'defaultTestLoader', 'SkipTest', 'skip', 'skipIf', 'skipUnless',
|
||||
'expectedFailure', 'TextTestResult']
|
||||
'expectedFailure', 'TextTestResult', 'installHandler',
|
||||
'registerResult', 'removeResult']
|
||||
|
||||
# Expose obsolete functions for backwards compatibility
|
||||
__all__.extend(['getTestCaseNames', 'makeSuite', 'findTestCases'])
|
||||
|
@ -62,6 +63,7 @@ from .loader import (TestLoader, defaultTestLoader, makeSuite, getTestCaseNames,
|
|||
findTestCases)
|
||||
from .main import TestProgram, main
|
||||
from .runner import TextTestRunner, TextTestResult
|
||||
from .signals import installHandler, registerResult, removeResult
|
||||
|
||||
# deprecated
|
||||
_TextTestResult = TextTestResult
|
||||
|
|
|
@ -5,10 +5,14 @@ import os
|
|||
import types
|
||||
|
||||
from . import loader, runner
|
||||
from .signals import installHandler
|
||||
|
||||
__unittest = True
|
||||
|
||||
|
||||
FAILFAST = " -f, --failfast Stop on first failure\n"
|
||||
CATCHBREAK = " -c, --catch Catch control-C and display results\n"
|
||||
|
||||
USAGE_AS_MAIN = """\
|
||||
Usage: %(progName)s [options] [tests]
|
||||
|
||||
|
@ -16,8 +20,7 @@ Options:
|
|||
-h, --help Show this message
|
||||
-v, --verbose Verbose output
|
||||
-q, --quiet Minimal output
|
||||
-f, --failfast Stop on first failure
|
||||
|
||||
%(failfast)s%(catchbreak)s
|
||||
Examples:
|
||||
%(progName)s test_module - run tests from test_module
|
||||
%(progName)s test_module.TestClass - run tests from
|
||||
|
@ -31,8 +34,7 @@ Alternative Usage: %(progName)s discover [options]
|
|||
|
||||
Options:
|
||||
-v, --verbose Verbose output
|
||||
-f, --failfast Stop on first failure
|
||||
-s directory Directory to start discovery ('.' default)
|
||||
%(failfast)s%(catchbreak)s -s directory Directory to start discovery ('.' default)
|
||||
-p pattern Pattern to match test files ('test*.py' default)
|
||||
-t directory Top level directory of project (default to
|
||||
start directory)
|
||||
|
@ -48,8 +50,7 @@ Options:
|
|||
-h, --help Show this message
|
||||
-v, --verbose Verbose output
|
||||
-q, --quiet Minimal output
|
||||
-f, --failfast Stop on first failure
|
||||
|
||||
%(failfast)s%(catchbreak)s
|
||||
Examples:
|
||||
%(progName)s - run default set of tests
|
||||
%(progName)s MyTestSuite - run suite 'MyTestSuite'
|
||||
|
@ -58,15 +59,21 @@ Examples:
|
|||
in MyTestCase
|
||||
"""
|
||||
|
||||
|
||||
|
||||
class TestProgram(object):
|
||||
"""A command-line program that runs a set of tests; this is primarily
|
||||
for making test modules conveniently executable.
|
||||
"""
|
||||
USAGE = USAGE_FROM_MODULE
|
||||
|
||||
# defaults for testing
|
||||
failfast = catchbreak = None
|
||||
|
||||
def __init__(self, module='__main__', defaultTest=None,
|
||||
argv=None, testRunner=None,
|
||||
testLoader=loader.defaultTestLoader, exit=True,
|
||||
verbosity=1, failfast=False):
|
||||
verbosity=1, failfast=None, catchbreak=None):
|
||||
if isinstance(module, str):
|
||||
self.module = __import__(module)
|
||||
for part in module.split('.')[1:]:
|
||||
|
@ -78,6 +85,7 @@ class TestProgram(object):
|
|||
|
||||
self.exit = exit
|
||||
self.failfast = failfast
|
||||
self.catchbreak = catchbreak
|
||||
self.verbosity = verbosity
|
||||
self.defaultTest = defaultTest
|
||||
self.testRunner = testRunner
|
||||
|
@ -89,7 +97,12 @@ class TestProgram(object):
|
|||
def usageExit(self, msg=None):
|
||||
if msg:
|
||||
print(msg)
|
||||
print(self.USAGE % self.__dict__)
|
||||
usage = {'progName': self.progName, 'catchbreak': '', 'failfast': ''}
|
||||
if self.failfast != False:
|
||||
usage['failfast'] = FAILFAST
|
||||
if self.catchbreak != False:
|
||||
usage['catchbreak'] = CATCHBREAK
|
||||
print(self.USAGE % usage)
|
||||
sys.exit(2)
|
||||
|
||||
def parseArgs(self, argv):
|
||||
|
@ -98,9 +111,9 @@ class TestProgram(object):
|
|||
return
|
||||
|
||||
import getopt
|
||||
long_opts = ['help', 'verbose', 'quiet', 'failfast']
|
||||
long_opts = ['help', 'verbose', 'quiet', 'failfast', 'catch']
|
||||
try:
|
||||
options, args = getopt.getopt(argv[1:], 'hHvqf', long_opts)
|
||||
options, args = getopt.getopt(argv[1:], 'hHvqfc', long_opts)
|
||||
for opt, value in options:
|
||||
if opt in ('-h','-H','--help'):
|
||||
self.usageExit()
|
||||
|
@ -109,7 +122,13 @@ class TestProgram(object):
|
|||
if opt in ('-v','--verbose'):
|
||||
self.verbosity = 2
|
||||
if opt in ('-f','--failfast'):
|
||||
self.failfast = True
|
||||
if self.failfast is None:
|
||||
self.failfast = True
|
||||
# Should this raise an exception if -f is not valid?
|
||||
if opt in ('-c','--catch'):
|
||||
if self.catchbreak is None:
|
||||
self.catchbreak = True
|
||||
# Should this raise an exception if -c is not valid?
|
||||
if len(args) == 0 and self.defaultTest is None:
|
||||
# createTests will load tests from self.module
|
||||
self.testNames = None
|
||||
|
@ -137,8 +156,14 @@ class TestProgram(object):
|
|||
parser = optparse.OptionParser()
|
||||
parser.add_option('-v', '--verbose', dest='verbose', default=False,
|
||||
help='Verbose output', action='store_true')
|
||||
parser.add_option('-f', '--failfast', dest='failfast', default=False,
|
||||
help='Stop on first fail or error', action='store_true')
|
||||
if self.failfast != False:
|
||||
parser.add_option('-f', '--failfast', dest='failfast', default=False,
|
||||
help='Stop on first fail or error',
|
||||
action='store_true')
|
||||
if self.catchbreak != False:
|
||||
parser.add_option('-c', '--catch', dest='catchbreak', default=False,
|
||||
help='Catch ctrl-C and display results so far',
|
||||
action='store_true')
|
||||
parser.add_option('-s', '--start-directory', dest='start', default='.',
|
||||
help="Directory to start discovery ('.' default)")
|
||||
parser.add_option('-p', '--pattern', dest='pattern', default='test*.py',
|
||||
|
@ -153,7 +178,13 @@ class TestProgram(object):
|
|||
for name, value in zip(('start', 'pattern', 'top'), args):
|
||||
setattr(options, name, value)
|
||||
|
||||
self.failfast = options.failfast
|
||||
# only set options from the parsing here
|
||||
# if they weren't set explicitly in the constructor
|
||||
if self.failfast is None:
|
||||
self.failfast = options.failfast
|
||||
if self.catchbreak is None:
|
||||
self.catchbreak = options.catchbreak
|
||||
|
||||
if options.verbose:
|
||||
self.verbosity = 2
|
||||
|
||||
|
@ -165,6 +196,8 @@ class TestProgram(object):
|
|||
self.test = loader.discover(start_dir, pattern, top_level_dir)
|
||||
|
||||
def runTests(self):
|
||||
if self.catchbreak:
|
||||
installHandler()
|
||||
if self.testRunner is None:
|
||||
self.testRunner = runner.TextTestRunner
|
||||
if isinstance(self.testRunner, type):
|
||||
|
|
|
@ -4,6 +4,7 @@ import sys
|
|||
import time
|
||||
|
||||
from . import result
|
||||
from .signals import registerResult
|
||||
|
||||
__unittest = True
|
||||
|
||||
|
@ -138,6 +139,7 @@ class TextTestRunner(object):
|
|||
def run(self, test):
|
||||
"Run the given test case or test suite."
|
||||
result = self._makeResult()
|
||||
registerResult(result)
|
||||
result.failfast = self.failfast
|
||||
startTime = time.time()
|
||||
startTestRun = getattr(result, 'startTestRun', None)
|
||||
|
|
|
@ -0,0 +1,38 @@
|
|||
import signal
|
||||
import weakref
|
||||
|
||||
__unittest = True
|
||||
|
||||
|
||||
class _InterruptHandler(object):
|
||||
def __init__(self, default_handler):
|
||||
self.called = False
|
||||
self.default_handler = default_handler
|
||||
|
||||
def __call__(self, signum, frame):
|
||||
installed_handler = signal.getsignal(signal.SIGINT)
|
||||
if installed_handler is not self:
|
||||
# if we aren't the installed handler, then delegate immediately
|
||||
# to the default handler
|
||||
self.default_handler(signum, frame)
|
||||
|
||||
if self.called:
|
||||
self.default_handler(signum, frame)
|
||||
self.called = True
|
||||
for result in _results.keys():
|
||||
result.stop()
|
||||
|
||||
_results = weakref.WeakKeyDictionary()
|
||||
def registerResult(result):
|
||||
_results[result] = 1
|
||||
|
||||
def removeResult(result):
|
||||
return bool(_results.pop(result, None))
|
||||
|
||||
_interrupt_handler = None
|
||||
def installHandler():
|
||||
global _interrupt_handler
|
||||
if _interrupt_handler is None:
|
||||
default_handler = signal.getsignal(signal.SIGINT)
|
||||
_interrupt_handler = _InterruptHandler(default_handler)
|
||||
signal.signal(signal.SIGINT, _interrupt_handler)
|
|
@ -0,0 +1,223 @@
|
|||
import gc
|
||||
import io
|
||||
import os
|
||||
import signal
|
||||
import weakref
|
||||
|
||||
import unittest
|
||||
|
||||
|
||||
@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
|
||||
class TestBreak(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self._default_handler = signal.getsignal(signal.SIGINT)
|
||||
|
||||
def tearDown(self):
|
||||
signal.signal(signal.SIGINT, self._default_handler)
|
||||
unittest.signals._results = weakref.WeakKeyDictionary()
|
||||
unittest.signals._interrupt_handler = None
|
||||
|
||||
|
||||
def testInstallHandler(self):
|
||||
default_handler = signal.getsignal(signal.SIGINT)
|
||||
unittest.installHandler()
|
||||
self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
|
||||
|
||||
try:
|
||||
pid = os.getpid()
|
||||
os.kill(pid, signal.SIGINT)
|
||||
except KeyboardInterrupt:
|
||||
self.fail("KeyboardInterrupt not handled")
|
||||
|
||||
self.assertTrue(unittest.signals._interrupt_handler.called)
|
||||
|
||||
def testRegisterResult(self):
|
||||
result = unittest.TestResult()
|
||||
unittest.registerResult(result)
|
||||
|
||||
for ref in unittest.signals._results:
|
||||
if ref is result:
|
||||
break
|
||||
elif ref is not result:
|
||||
self.fail("odd object in result set")
|
||||
else:
|
||||
self.fail("result not found")
|
||||
|
||||
|
||||
def testInterruptCaught(self):
|
||||
default_handler = signal.getsignal(signal.SIGINT)
|
||||
|
||||
result = unittest.TestResult()
|
||||
unittest.installHandler()
|
||||
unittest.registerResult(result)
|
||||
|
||||
self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
|
||||
|
||||
def test(result):
|
||||
pid = os.getpid()
|
||||
os.kill(pid, signal.SIGINT)
|
||||
result.breakCaught = True
|
||||
self.assertTrue(result.shouldStop)
|
||||
|
||||
try:
|
||||
test(result)
|
||||
except KeyboardInterrupt:
|
||||
self.fail("KeyboardInterrupt not handled")
|
||||
self.assertTrue(result.breakCaught)
|
||||
|
||||
|
||||
def testSecondInterrupt(self):
|
||||
result = unittest.TestResult()
|
||||
unittest.installHandler()
|
||||
unittest.registerResult(result)
|
||||
|
||||
def test(result):
|
||||
pid = os.getpid()
|
||||
os.kill(pid, signal.SIGINT)
|
||||
result.breakCaught = True
|
||||
self.assertTrue(result.shouldStop)
|
||||
os.kill(pid, signal.SIGINT)
|
||||
self.fail("Second KeyboardInterrupt not raised")
|
||||
|
||||
try:
|
||||
test(result)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
else:
|
||||
self.fail("Second KeyboardInterrupt not raised")
|
||||
self.assertTrue(result.breakCaught)
|
||||
|
||||
|
||||
def testTwoResults(self):
|
||||
unittest.installHandler()
|
||||
|
||||
result = unittest.TestResult()
|
||||
unittest.registerResult(result)
|
||||
new_handler = signal.getsignal(signal.SIGINT)
|
||||
|
||||
result2 = unittest.TestResult()
|
||||
unittest.registerResult(result2)
|
||||
self.assertEqual(signal.getsignal(signal.SIGINT), new_handler)
|
||||
|
||||
result3 = unittest.TestResult()
|
||||
|
||||
def test(result):
|
||||
pid = os.getpid()
|
||||
os.kill(pid, signal.SIGINT)
|
||||
|
||||
try:
|
||||
test(result)
|
||||
except KeyboardInterrupt:
|
||||
self.fail("KeyboardInterrupt not handled")
|
||||
|
||||
self.assertTrue(result.shouldStop)
|
||||
self.assertTrue(result2.shouldStop)
|
||||
self.assertFalse(result3.shouldStop)
|
||||
|
||||
|
||||
def testHandlerReplacedButCalled(self):
|
||||
# If our handler has been replaced (is no longer installed) but is
|
||||
# called by the *new* handler, then it isn't safe to delay the
|
||||
# SIGINT and we should immediately delegate to the default handler
|
||||
unittest.installHandler()
|
||||
|
||||
handler = signal.getsignal(signal.SIGINT)
|
||||
def new_handler(frame, signum):
|
||||
handler(frame, signum)
|
||||
signal.signal(signal.SIGINT, new_handler)
|
||||
|
||||
try:
|
||||
pid = os.getpid()
|
||||
os.kill(pid, signal.SIGINT)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
else:
|
||||
self.fail("replaced but delegated handler doesn't raise interrupt")
|
||||
|
||||
def testRunner(self):
|
||||
# Creating a TextTestRunner with the appropriate argument should
|
||||
# register the TextTestResult it creates
|
||||
runner = unittest.TextTestRunner(stream=io.StringIO())
|
||||
|
||||
result = runner.run(unittest.TestSuite())
|
||||
self.assertIn(result, unittest.signals._results)
|
||||
|
||||
def testWeakReferences(self):
|
||||
# Calling registerResult on a result should not keep it alive
|
||||
result = unittest.TestResult()
|
||||
unittest.registerResult(result)
|
||||
|
||||
ref = weakref.ref(result)
|
||||
del result
|
||||
|
||||
# For non-reference counting implementations
|
||||
gc.collect();gc.collect()
|
||||
self.assertIsNone(ref())
|
||||
|
||||
|
||||
def testRemoveResult(self):
|
||||
result = unittest.TestResult()
|
||||
unittest.registerResult(result)
|
||||
|
||||
unittest.installHandler()
|
||||
self.assertTrue(unittest.removeResult(result))
|
||||
|
||||
# Should this raise an error instead?
|
||||
self.assertFalse(unittest.removeResult(unittest.TestResult()))
|
||||
|
||||
try:
|
||||
pid = os.getpid()
|
||||
os.kill(pid, signal.SIGINT)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
|
||||
self.assertFalse(result.shouldStop)
|
||||
|
||||
def testMainInstallsHandler(self):
|
||||
failfast = object()
|
||||
test = object()
|
||||
verbosity = object()
|
||||
result = object()
|
||||
default_handler = signal.getsignal(signal.SIGINT)
|
||||
|
||||
class FakeRunner(object):
|
||||
initArgs = []
|
||||
runArgs = []
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.initArgs.append((args, kwargs))
|
||||
def run(self, test):
|
||||
self.runArgs.append(test)
|
||||
return result
|
||||
|
||||
class Program(unittest.TestProgram):
|
||||
def __init__(self, catchbreak):
|
||||
self.exit = False
|
||||
self.verbosity = verbosity
|
||||
self.failfast = failfast
|
||||
self.catchbreak = catchbreak
|
||||
self.testRunner = FakeRunner
|
||||
self.test = test
|
||||
self.result = None
|
||||
|
||||
p = Program(False)
|
||||
p.runTests()
|
||||
|
||||
self.assertEqual(FakeRunner.initArgs, [((), {'verbosity': verbosity,
|
||||
'failfast': failfast})])
|
||||
self.assertEqual(FakeRunner.runArgs, [test])
|
||||
self.assertEqual(p.result, result)
|
||||
|
||||
self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
|
||||
|
||||
FakeRunner.initArgs = []
|
||||
FakeRunner.runArgs = []
|
||||
p = Program(True)
|
||||
p.runTests()
|
||||
|
||||
self.assertEqual(FakeRunner.initArgs, [((), {'verbosity': verbosity,
|
||||
'failfast': failfast})])
|
||||
self.assertEqual(FakeRunner.runArgs, [test])
|
||||
self.assertEqual(p.result, result)
|
||||
|
||||
self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
|
|
@ -275,11 +275,18 @@ class TestDiscovery(unittest.TestCase):
|
|||
self.assertEqual(program.test, 'tests')
|
||||
self.assertEqual(Loader.args, [('.', 'fish', None)])
|
||||
self.assertFalse(program.failfast)
|
||||
self.assertFalse(program.catchbreak)
|
||||
|
||||
Loader.args = []
|
||||
program = object.__new__(unittest.TestProgram)
|
||||
program._do_discovery(['-p', 'eggs', '-s', 'fish', '-v', '-f'], Loader=Loader)
|
||||
program._do_discovery(['-p', 'eggs', '-s', 'fish', '-v', '-f', '-c'],
|
||||
Loader=Loader)
|
||||
self.assertEqual(program.test, 'tests')
|
||||
self.assertEqual(Loader.args, [('fish', 'eggs', None)])
|
||||
self.assertEqual(program.verbosity, 2)
|
||||
self.assertTrue(program.failfast)
|
||||
self.assertTrue(program.catchbreak)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
Loading…
Reference in New Issue