test_logging coverage improvements.

This commit is contained in:
Vinay Sajip 2011-04-26 18:43:05 +01:00
parent 45456a09f0
commit 26fe4b70cf
2 changed files with 136 additions and 33 deletions

View File

@ -37,13 +37,13 @@ __all__ = ['BASIC_FORMAT', 'BufferingFormatter', 'CRITICAL', 'DEBUG', 'ERROR',
try:
import codecs
except ImportError:
except ImportError: #pragma: no cover
codecs = None
try:
import _thread as thread
import threading
except ImportError:
except ImportError: #pragma: no cover
thread = None
__author__ = "Vinay Sajip <vinay_sajip@red-dove.com>"
@ -67,7 +67,10 @@ else:
_srcfile = __file__
_srcfile = os.path.normcase(_srcfile)
# next bit filched from 1.5.2's inspect.py
if hasattr(sys, '_getframe'):
currentframe = lambda: sys._getframe(3)
else: #pragma: no cover
def currentframe():
"""Return the frame object for the caller's stack frame."""
try:
@ -75,9 +78,6 @@ def currentframe():
except:
return sys.exc_info()[2].tb_frame.f_back
if hasattr(sys, '_getframe'): currentframe = lambda: sys._getframe(3)
# done filching
# _srcfile is only used in conjunction with sys._getframe().
# To provide compatibility with older versions of Python, set _srcfile
# to None if _getframe() is not available; this value will prevent
@ -94,22 +94,22 @@ _startTime = time.time()
#raiseExceptions is used to see if exceptions during handling should be
#propagated
#
raiseExceptions = 1
raiseExceptions = True
#
# If you don't want threading information in the log, set this to zero
#
logThreads = 1
logThreads = True
#
# If you don't want multiprocessing information in the log, set this to zero
#
logMultiprocessing = 1
logMultiprocessing = True
#
# If you don't want process information in the log, set this to zero
#
logProcesses = 1
logProcesses = True
#---------------------------------------------------------------------------
# Level related stuff
@ -201,7 +201,7 @@ def _checkLevel(level):
#
if thread:
_lock = threading.RLock()
else:
else: #pragma: no cover
_lock = None
@ -281,10 +281,10 @@ class LogRecord(object):
if logThreads and thread:
self.thread = thread.get_ident()
self.threadName = threading.current_thread().name
else:
else: # pragma: no cover
self.thread = None
self.threadName = None
if not logMultiprocessing:
if not logMultiprocessing: # pragma: no cover
self.processName = None
else:
self.processName = 'MainProcess'
@ -644,11 +644,11 @@ class Filter(object):
yes. If deemed appropriate, the record may be modified in-place.
"""
if self.nlen == 0:
return 1
return True
elif self.name == record.name:
return 1
return True
elif record.name.find(self.name, 0, self.nlen) != 0:
return 0
return False
return (record.name[self.nlen] == ".")
class Filterer(object):
@ -775,7 +775,7 @@ class Handler(Filterer):
"""
if thread:
self.lock = threading.RLock()
else:
else: #pragma: no cover
self.lock = None
def acquire(self):
@ -939,7 +939,7 @@ class StreamHandler(Handler):
stream.write(msg)
stream.write(self.terminator)
self.flush()
except (KeyboardInterrupt, SystemExit):
except (KeyboardInterrupt, SystemExit): #pragma: no cover
raise
except:
self.handleError(record)
@ -948,13 +948,13 @@ class FileHandler(StreamHandler):
"""
A handler class which writes formatted logging records to disk files.
"""
def __init__(self, filename, mode='a', encoding=None, delay=0):
def __init__(self, filename, mode='a', encoding=None, delay=False):
"""
Open the specified file and use it as the stream for logging.
"""
#keep the absolute path, otherwise derived classes which use this
#may come a cropper when the current directory changes
if codecs is None:
if codecs is None: #pragma: no cover
encoding = None
self.baseFilename = os.path.abspath(filename)
self.mode = mode
@ -1352,9 +1352,9 @@ class Logger(Filterer):
#IronPython can use logging.
try:
fn, lno, func, sinfo = self.findCaller(stack_info)
except ValueError:
except ValueError: # pragma: no cover
fn, lno, func = "(unknown file)", 0, "(unknown function)"
else:
else: # pragma: no cover
fn, lno, func = "(unknown file)", 0, "(unknown function)"
if exc_info:
if not isinstance(exc_info, tuple):
@ -1465,7 +1465,7 @@ class Logger(Filterer):
Is this logger enabled for level 'level'?
"""
if self.manager.disable >= level:
return 0
return False
return level >= self.getEffectiveLevel()
def getChild(self, suffix):

View File

@ -494,6 +494,37 @@ class CustomLevelsAndFiltersTest(BaseTest):
handler.removeFilter(garr)
class HandlerTest(BaseTest):
def test_name(self):
h = logging.Handler()
h.name = 'generic'
self.assertEqual(h.name, 'generic')
h.name = 'anothergeneric'
self.assertEqual(h.name, 'anothergeneric')
self.assertRaises(NotImplementedError, h.emit, None)
def test_abc(self):
pass
class BadStream(object):
def write(self, data):
raise RuntimeError('deliberate mistake')
class TestStreamHandler(logging.StreamHandler):
def handleError(self, record):
self.error_record = record
class StreamHandlerTest(BaseTest):
def test_error_handling(self):
h = TestStreamHandler(BadStream())
r = logging.makeLogRecord({})
old_raise = logging.raiseExceptions
try:
h.handle(r)
self.assertIs(h.error_record, r)
finally:
logging.raiseExceptions = old_raise
class MemoryHandlerTest(BaseTest):
"""Tests for the MemoryHandler."""
@ -2196,6 +2227,39 @@ class FormatterTest(unittest.TestCase):
f = logging.Formatter('asctime', style='$')
self.assertFalse(f.usesTime())
def test_invalid_style(self):
self.assertRaises(ValueError, logging.Formatter, None, None, 'x')
def test_time(self):
r = self.get_record()
r.created = 735375780.0 # 21 April 1993 08:03:00
r.msecs = 123
f = logging.Formatter('%(asctime)s %(message)s')
self.assertEqual(f.formatTime(r), '1993-04-21 08:03:00,123')
self.assertEqual(f.formatTime(r, '%Y:%d'), '1993:21')
class ExceptionTest(BaseTest):
def test_formatting(self):
r = self.root_logger
h = RecordingHandler()
r.addHandler(h)
try:
raise RuntimeError('deliberate mistake')
except:
logging.exception('failed', stack_info=True)
r.removeHandler(h)
h.close()
r = h.records[0]
self.assertTrue(r.exc_text.startswith('Traceback (most recent '
'call last):\n'))
self.assertTrue(r.exc_text.endswith('\nRuntimeError: '
'deliberate mistake'))
self.assertTrue(r.stack_info.startswith('Stack (most recent '
'call last):\n'))
self.assertTrue(r.stack_info.endswith('logging.exception(\'failed\', '
'stack_info=True)'))
class LastResortTest(BaseTest):
def test_last_resort(self):
# Test the last resort handler
@ -2407,6 +2471,23 @@ class ModuleLevelMiscTest(BaseTest):
logging.setLoggerClass(logging.Logger)
self.assertEqual(logging.getLoggerClass(), logging.Logger)
class LogRecordTest(BaseTest):
def test_str_rep(self):
r = logging.makeLogRecord({})
s = str(r)
self.assertTrue(s.startswith('<LogRecord: '))
self.assertTrue(s.endswith('>'))
def test_dict_arg(self):
h = RecordingHandler()
r = logging.getLogger()
r.addHandler(h)
d = {'less' : 'more' }
logging.warning('less is %(less)s', d)
self.assertIs(h.records[0].args, d)
self.assertEqual(h.records[0].message, 'less is more')
r.removeHandler(h)
h.close()
class BasicConfigTest(unittest.TestCase):
@ -2508,6 +2589,9 @@ class BasicConfigTest(unittest.TestCase):
logging.basicConfig(level=57)
self.assertEqual(logging.root.level, 57)
# Test that second call has no effect
logging.basicConfig(level=58)
self.assertEqual(logging.root.level, 57)
def test_incompatible(self):
assertRaises = self.assertRaises
@ -2521,12 +2605,20 @@ class BasicConfigTest(unittest.TestCase):
handlers=handlers)
def test_handlers(self):
handlers = [logging.StreamHandler(), logging.StreamHandler(sys.stdout)]
handlers = [
logging.StreamHandler(),
logging.StreamHandler(sys.stdout),
logging.StreamHandler(),
]
f = logging.Formatter()
handlers[2].setFormatter(f)
logging.basicConfig(handlers=handlers)
self.assertIs(handlers[0], logging.root.handlers[0])
self.assertIs(handlers[1], logging.root.handlers[1])
self.assertIs(handlers[2], logging.root.handlers[2])
self.assertIsNotNone(handlers[0].formatter)
self.assertIsNotNone(handlers[1].formatter)
self.assertIs(handlers[2].formatter, f)
self.assertIs(handlers[0].formatter, handlers[1].formatter)
def _test_log(self, method, level=None):
@ -2758,6 +2850,17 @@ class BaseFileTest(BaseTest):
self.rmfiles.append(filename)
class FileHandlerTest(BaseFileTest):
def test_delay(self):
os.unlink(self.fn)
fh = logging.FileHandler(self.fn, delay=True)
self.assertIsNone(fh.stream)
self.assertFalse(os.path.exists(self.fn))
fh.handle(logging.makeLogRecord({}))
self.assertIsNotNone(fh.stream)
self.assertTrue(os.path.exists(self.fn))
fh.close()
class RotatingFileHandlerTest(BaseFileTest):
def next_rec(self):
return logging.LogRecord('n', logging.DEBUG, 'p', 1,
@ -2851,15 +2954,15 @@ for when, exp in (('S', 1),
@run_with_locale('LC_ALL', '')
def test_main():
run_unittest(BuiltinLevelsTest, BasicFilterTest,
CustomLevelsAndFiltersTest, MemoryHandlerTest,
CustomLevelsAndFiltersTest, HandlerTest, MemoryHandlerTest,
ConfigFileTest, SocketHandlerTest, MemoryTest,
EncodingTest, WarningsTest, ConfigDictTest, ManagerTest,
FormatterTest,
FormatterTest, StreamHandlerTest,
LogRecordFactoryTest, ChildLoggerTest, QueueHandlerTest,
ShutdownTest, ModuleLevelMiscTest, BasicConfigTest,
LoggerAdapterTest, LoggerTest,
RotatingFileHandlerTest,
LastResortTest,
FileHandlerTest, RotatingFileHandlerTest,
LastResortTest, LogRecordTest, ExceptionTest,
TimedRotatingFileHandlerTest
)