mirror of https://github.com/python/cpython
2388 lines
76 KiB
Python
2388 lines
76 KiB
Python
#!/usr/bin/env python
|
|
#
|
|
# Copyright 2001-2012 by Vinay Sajip. All Rights Reserved.
|
|
#
|
|
# Permission to use, copy, modify, and distribute this software and its
|
|
# documentation for any purpose and without fee is hereby granted,
|
|
# provided that the above copyright notice appear in all copies and that
|
|
# both that copyright notice and this permission notice appear in
|
|
# supporting documentation, and that the name of Vinay Sajip
|
|
# not be used in advertising or publicity pertaining to distribution
|
|
# of the software without specific, written prior permission.
|
|
# VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING
|
|
# ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
|
|
# VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR
|
|
# ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER
|
|
# IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
|
|
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
|
|
|
"""Test harness for the logging module. Run all tests.
|
|
|
|
Copyright (C) 2001-2012 Vinay Sajip. All Rights Reserved.
|
|
"""
|
|
|
|
import logging
|
|
import logging.handlers
|
|
import logging.config
|
|
|
|
import codecs
|
|
import datetime
|
|
import pickle
|
|
import io
|
|
import gc
|
|
import json
|
|
import os
|
|
import queue
|
|
import random
|
|
import re
|
|
import select
|
|
import socket
|
|
from socketserver import ThreadingTCPServer, StreamRequestHandler
|
|
import struct
|
|
import sys
|
|
import tempfile
|
|
from test.support import captured_stdout, run_with_locale, run_unittest
|
|
from test.support import TestHandler, Matcher
|
|
import textwrap
|
|
import time
|
|
import unittest
|
|
import warnings
|
|
import weakref
|
|
try:
|
|
import threading
|
|
except ImportError:
|
|
threading = None
|
|
|
|
|
|
class BaseTest(unittest.TestCase):

    """Base class for logging tests.

    setUp() snapshots the logging module's global registries (handlers,
    loggers, level names) and attaches a StreamHandler writing to an
    in-memory stream to the root logger; tearDown() removes the handlers
    again and restores the snapshot.
    """

    log_format = "%(name)s -> %(levelname)s: %(message)s"
    expected_log_pat = r"^([\w.]+) -> ([\w]+): ([\d]+)$"
    message_num = 0

    def setUp(self):
        """Setup the default logging stream to an internal StringIO instance,
        so that we can examine log output as we want."""
        logger_dict = logging.getLogger().manager.loggerDict
        logging._acquireLock()
        try:
            self.saved_handlers = logging._handlers.copy()
            self.saved_handler_list = logging._handlerList[:]
            self.saved_loggers = saved_loggers = logger_dict.copy()
            self.saved_level_names = logging._levelNames.copy()
            # Entries without a 'disabled' attribute are recorded as None
            # and skipped when the state is restored in tearDown().
            self.logger_states = {
                name: getattr(logger, 'disabled', None)
                for name, logger in saved_loggers.items()
            }
        finally:
            logging._releaseLock()

        # Set two unused loggers: one non-ASCII and one Unicode.
        # This is to test correct operation when sorting existing
        # loggers in the configuration code. See issue 8201.
        self.logger1 = logging.getLogger("\xab\xd7\xbb")
        self.logger2 = logging.getLogger("\u013f\u00d6\u0047")

        self.root_logger = logging.getLogger("")
        self.original_logging_level = self.root_logger.getEffectiveLevel()

        self.stream = io.StringIO()
        self.root_logger.setLevel(logging.DEBUG)
        self.root_hdlr = logging.StreamHandler(self.stream)
        self.root_formatter = logging.Formatter(self.log_format)
        self.root_hdlr.setFormatter(self.root_formatter)
        # Neither helper logger may see a handler before ours is installed;
        # otherwise an earlier test leaked configuration.
        for logger in (self.logger1, self.logger2):
            if logger.hasHandlers():
                hlist = logger.handlers + self.root_logger.handlers
                raise AssertionError('Unexpected handlers: %s' % hlist)
        self.root_logger.addHandler(self.root_hdlr)
        self.assertTrue(self.logger1.hasHandlers())
        self.assertTrue(self.logger2.hasHandlers())

    def tearDown(self):
        """Remove our logging stream, and restore the original logging
        level."""
        self.stream.close()
        self.root_logger.removeHandler(self.root_hdlr)
        # Drop (and close) any handler a test left on the root logger.
        while self.root_logger.handlers:
            h = self.root_logger.handlers[0]
            self.root_logger.removeHandler(h)
            h.close()
        self.root_logger.setLevel(self.original_logging_level)
        logging._acquireLock()
        try:
            logging._levelNames.clear()
            logging._levelNames.update(self.saved_level_names)
            logging._handlers.clear()
            logging._handlers.update(self.saved_handlers)
            logging._handlerList[:] = self.saved_handler_list
            loggerDict = logging.getLogger().manager.loggerDict
            loggerDict.clear()
            loggerDict.update(self.saved_loggers)
            for name, disabled in self.logger_states.items():
                if disabled is not None:
                    self.saved_loggers[name].disabled = disabled
        finally:
            logging._releaseLock()

    def assert_log_lines(self, expected_values, stream=None):
        """Match the collected log lines against the regular expression
        self.expected_log_pat, and compare the extracted group values to
        the expected_values list of tuples.

        *stream* defaults to self.stream and must provide getvalue() and
        read(), as io.StringIO does.
        """
        stream = stream or self.stream
        pat = re.compile(self.expected_log_pat)
        # io.StringIO has no reset() method, so read the whole buffer
        # directly instead of relying on an AttributeError to get here.
        actual_lines = stream.getvalue().splitlines()
        self.assertEqual(len(actual_lines), len(expected_values),
                         '%s vs. %s' % (actual_lines, expected_values))
        for actual, expected in zip(actual_lines, expected_values):
            match = pat.search(actual)
            if not match:
                self.fail("Log line does not match expected pattern:\n" +
                          actual)
            self.assertEqual(tuple(match.groups()), expected)
        s = stream.read()
        if s:
            self.fail("Remaining output at end of log stream:\n" + s)

    def next_message(self):
        """Generate a message consisting solely of an auto-incrementing
        integer."""
        self.message_num += 1
        return "%d" % self.message_num
|
|
|
|
|
|
class BuiltinLevelsTest(BaseTest):
    """Test builtin levels and their inheritance.

    Every test logs auto-numbered messages, so the numbers in the expected
    output identify exactly which calls got through.
    """

    def test_flat(self):
        # Logging levels in a flat logger namespace.
        m = self.next_message

        ERR = logging.getLogger("ERR")
        ERR.setLevel(logging.ERROR)
        INF = logging.LoggerAdapter(logging.getLogger("INF"), {})
        INF.setLevel(logging.INFO)
        DEB = logging.getLogger("DEB")
        DEB.setLevel(logging.DEBUG)

        # These should log.
        ERR.log(logging.CRITICAL, m())
        ERR.error(m())

        INF.log(logging.CRITICAL, m())
        INF.error(m())
        # warning() is used throughout: warn() is a deprecated alias.
        INF.warning(m())
        INF.info(m())

        DEB.log(logging.CRITICAL, m())
        DEB.error(m())
        DEB.warning(m())
        DEB.info(m())
        DEB.debug(m())

        # These should not log.
        ERR.warning(m())
        ERR.info(m())
        ERR.debug(m())

        INF.debug(m())

        self.assert_log_lines([
            ('ERR', 'CRITICAL', '1'),
            ('ERR', 'ERROR', '2'),
            ('INF', 'CRITICAL', '3'),
            ('INF', 'ERROR', '4'),
            ('INF', 'WARNING', '5'),
            ('INF', 'INFO', '6'),
            ('DEB', 'CRITICAL', '7'),
            ('DEB', 'ERROR', '8'),
            ('DEB', 'WARNING', '9'),
            ('DEB', 'INFO', '10'),
            ('DEB', 'DEBUG', '11'),
        ])

    def test_nested_explicit(self):
        # Logging levels in a nested namespace, all explicitly set.
        m = self.next_message

        INF = logging.getLogger("INF")
        INF.setLevel(logging.INFO)
        INF_ERR = logging.getLogger("INF.ERR")
        INF_ERR.setLevel(logging.ERROR)

        # These should log.
        INF_ERR.log(logging.CRITICAL, m())
        INF_ERR.error(m())

        # These should not log.
        INF_ERR.warning(m())
        INF_ERR.info(m())
        INF_ERR.debug(m())

        self.assert_log_lines([
            ('INF.ERR', 'CRITICAL', '1'),
            ('INF.ERR', 'ERROR', '2'),
        ])

    def test_nested_inherited(self):
        # Logging levels in a nested namespace, inherited from parent loggers.
        m = self.next_message

        INF = logging.getLogger("INF")
        INF.setLevel(logging.INFO)
        INF_ERR = logging.getLogger("INF.ERR")
        INF_ERR.setLevel(logging.ERROR)
        INF_UNDEF = logging.getLogger("INF.UNDEF")
        INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF")
        # Created but never logged to; kept so the logger is registered.
        UNDEF = logging.getLogger("UNDEF")

        # These should log.
        INF_UNDEF.log(logging.CRITICAL, m())
        INF_UNDEF.error(m())
        INF_UNDEF.warning(m())
        INF_UNDEF.info(m())
        INF_ERR_UNDEF.log(logging.CRITICAL, m())
        INF_ERR_UNDEF.error(m())

        # These should not log.
        INF_UNDEF.debug(m())
        INF_ERR_UNDEF.warning(m())
        INF_ERR_UNDEF.info(m())
        INF_ERR_UNDEF.debug(m())

        self.assert_log_lines([
            ('INF.UNDEF', 'CRITICAL', '1'),
            ('INF.UNDEF', 'ERROR', '2'),
            ('INF.UNDEF', 'WARNING', '3'),
            ('INF.UNDEF', 'INFO', '4'),
            ('INF.ERR.UNDEF', 'CRITICAL', '5'),
            ('INF.ERR.UNDEF', 'ERROR', '6'),
        ])

    def test_nested_with_virtual_parent(self):
        # Logging levels when some parent does not exist yet.
        m = self.next_message

        INF = logging.getLogger("INF")
        # The grandchild is requested before its direct parent exists.
        GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF")
        CHILD = logging.getLogger("INF.BADPARENT")
        INF.setLevel(logging.INFO)

        # These should log.
        GRANDCHILD.log(logging.FATAL, m())
        GRANDCHILD.info(m())
        CHILD.log(logging.FATAL, m())
        CHILD.info(m())

        # These should not log.
        GRANDCHILD.debug(m())
        CHILD.debug(m())

        self.assert_log_lines([
            ('INF.BADPARENT.UNDEF', 'CRITICAL', '1'),
            ('INF.BADPARENT.UNDEF', 'INFO', '2'),
            ('INF.BADPARENT', 'CRITICAL', '3'),
            ('INF.BADPARENT', 'INFO', '4'),
        ])

    def test_invalid_name(self):
        # A non-string logger name (here the builtin function any) is
        # rejected with TypeError.
        self.assertRaises(TypeError, logging.getLogger, any)
|
|
|
|
class BasicFilterTest(BaseTest):

    """Test the bundled Filter class."""

    def _check_spam_eggs_only(self, filter_):
        """Install *filter_* on the first root handler, log one message to
        each of four 'spam' loggers, and check that only the 'spam.eggs'
        ones are emitted.  The filter is removed again afterwards."""
        root_handler = self.root_logger.handlers[0]
        try:
            root_handler.addFilter(filter_)
            loggers = [
                logging.getLogger(name)
                for name in ("spam", "spam.eggs",
                             "spam.eggs.fish", "spam.bakedbeans")
            ]
            # Messages are numbered 1-4 in logger order; 2 and 3 are good.
            for logger in loggers:
                logger.info(self.next_message())

            self.assert_log_lines([
                ('spam.eggs', 'INFO', '2'),
                ('spam.eggs.fish', 'INFO', '3'),
            ])
        finally:
            root_handler.removeFilter(filter_)

    def test_filter(self):
        # Only messages satisfying the specified criteria pass through the
        # filter.
        self._check_spam_eggs_only(logging.Filter("spam.eggs"))

    def test_callable_filter(self):
        # Only messages satisfying the specified criteria pass through the
        # filter.

        def filterfunc(record):
            # Accept records whose first two name components are spam.eggs.
            leading = record.name.split('.')[:2]
            return '.'.join(leading) == 'spam.eggs'

        self._check_spam_eggs_only(filterfunc)
|
|
|
|
|
|
#
|
|
# First, we define our levels. There can be as many as you want - the only
|
|
# limitations are that they should be integers, the lowest should be > 0 and
|
|
# larger values mean less information being logged. If you need specific
|
|
# level values which do not fit into these limitations, you can use a
|
|
# mapping dictionary to convert between your application levels and the
|
|
# logging system.
|
|
#
|
|
# Ten consecutive custom levels, listed from least to most severe.
BORING = 111
CHATTERBOX = 112
GARRULOUS = 113
TALKATIVE = 114
VERBOSE = 115
SOCIABLE = 116
EFFUSIVE = 117
TERSE = 118
TACITURN = 119
SILENT = 120

# Every custom level, BORING through SILENT inclusive.
LEVEL_RANGE = range(BORING, SILENT + 1)

#
# Next, we define names for our levels. You don't need to do this - in which
# case the system will use "Level n" to denote the text for the level.
#
my_logging_levels = {
    SILENT: 'Silent',
    TACITURN: 'Taciturn',
    TERSE: 'Terse',
    EFFUSIVE: 'Effusive',
    SOCIABLE: 'Sociable',
    VERBOSE: 'Verbose',
    TALKATIVE: 'Talkative',
    GARRULOUS: 'Garrulous',
    CHATTERBOX: 'Chatterbox',
    BORING: 'Boring',
}
|
|
|
|
class GarrulousFilter(logging.Filter):

    """A filter which blocks garrulous messages."""

    def filter(self, record):
        """Return False for records logged at GARRULOUS, True otherwise."""
        is_garrulous = record.levelno == GARRULOUS
        return not is_garrulous
|
|
|
|
class VerySpecificFilter(logging.Filter):

    """A filter which blocks sociable and taciturn messages."""

    def filter(self, record):
        """Return False for SOCIABLE and TACITURN records, True otherwise."""
        blocked_levels = [SOCIABLE, TACITURN]
        return record.levelno not in blocked_levels
|
|
|
|
|
|
class CustomLevelsAndFiltersTest(BaseTest):

    """Test various filtering possibilities with custom logging levels."""

    # Skip the logger name group.
    expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$"

    def setUp(self):
        """Run the base setup, then register a name for each custom level."""
        BaseTest.setUp(self)
        for level, level_name in my_logging_levels.items():
            logging.addLevelName(level, level_name)

    def log_at_all_levels(self, logger):
        """Log one auto-numbered message to *logger* at each level in
        LEVEL_RANGE, lowest level first."""
        for level in LEVEL_RANGE:
            message = self.next_message()
            logger.log(level, message)

    def test_logger_filter(self):
        # Filter at logger level.
        root = self.root_logger
        root.setLevel(VERBOSE)
        # Levels >= 'Verbose' are good.
        self.log_at_all_levels(root)
        expected = [
            ('Verbose', '5'),
            ('Sociable', '6'),
            ('Effusive', '7'),
            ('Terse', '8'),
            ('Taciturn', '9'),
            ('Silent', '10'),
        ]
        self.assert_log_lines(expected)

    def test_handler_filter(self):
        # Filter at handler level.
        self.root_logger.handlers[0].setLevel(SOCIABLE)
        try:
            # Levels >= 'Sociable' are good.
            self.log_at_all_levels(self.root_logger)
            expected = [
                ('Sociable', '6'),
                ('Effusive', '7'),
                ('Terse', '8'),
                ('Taciturn', '9'),
                ('Silent', '10'),
            ]
            self.assert_log_lines(expected)
        finally:
            # Put the level back on whichever handler is first by now.
            self.root_logger.handlers[0].setLevel(logging.NOTSET)

    def test_specific_filters(self):
        # Set a specific filter object on the handler, and then add another
        # filter object on the logger itself.
        root = self.root_logger
        handler = root.handlers[0]
        specific_filter = None
        garr = GarrulousFilter()
        handler.addFilter(garr)
        try:
            self.log_at_all_levels(root)
            # Notice how 'Garrulous' (message 3) is missing.
            first_lines = [
                ('Boring', '1'),
                ('Chatterbox', '2'),
                ('Talkative', '4'),
                ('Verbose', '5'),
                ('Sociable', '6'),
                ('Effusive', '7'),
                ('Terse', '8'),
                ('Taciturn', '9'),
                ('Silent', '10'),
            ]
            self.assert_log_lines(first_lines)

            specific_filter = VerySpecificFilter()
            root.addFilter(specific_filter)
            self.log_at_all_levels(root)
            # Not only 'Garrulous' is still missing, but also 'Sociable'
            # and 'Taciturn'.
            second_lines = [
                ('Boring', '11'),
                ('Chatterbox', '12'),
                ('Talkative', '14'),
                ('Verbose', '15'),
                ('Effusive', '17'),
                ('Terse', '18'),
                ('Silent', '20'),
            ]
            self.assert_log_lines(first_lines + second_lines)
        finally:
            # The logger-level filter exists only if the first half passed.
            if specific_filter:
                root.removeFilter(specific_filter)
            handler.removeFilter(garr)
|
|
|
|
|
|
class MemoryHandlerTest(BaseTest):

    """Tests for the MemoryHandler."""

    # Do not bother with a logger name group.
    expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$"

    def setUp(self):
        """Attach a MemoryHandler (capacity 10, flush level WARNING) that
        targets the root stream handler to a non-propagating 'mem' logger."""
        BaseTest.setUp(self)
        self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING,
                                                       self.root_hdlr)
        self.mem_logger = logging.getLogger('mem')
        # Without this, records would also reach the root handler directly.
        self.mem_logger.propagate = 0
        self.mem_logger.addHandler(self.mem_hdlr)

    def tearDown(self):
        """Close the memory handler, then run the base teardown."""
        self.mem_hdlr.close()
        BaseTest.tearDown(self)

    def test_flush(self):
        # The memory handler flushes to its target handler based on specific
        # criteria (message count and message level).
        self.mem_logger.debug(self.next_message())
        self.assert_log_lines([])
        self.mem_logger.info(self.next_message())
        self.assert_log_lines([])
        # This will flush because the level is >= logging.WARNING
        # (warning() rather than the deprecated warn() alias).
        self.mem_logger.warning(self.next_message())
        lines = [
            ('DEBUG', '1'),
            ('INFO', '2'),
            ('WARNING', '3'),
        ]
        self.assert_log_lines(lines)
        # n is the number of the first message in each batch of ten.
        for n in (4, 14):
            for i in range(9):
                self.mem_logger.debug(self.next_message())
            self.assert_log_lines(lines)

            # This will flush because it's the 10th message since the last
            # flush.
            self.mem_logger.debug(self.next_message())
            lines = lines + [('DEBUG', str(i)) for i in range(n, n + 10)]
            self.assert_log_lines(lines)

        # A single message after the last flush stays buffered.
        self.mem_logger.debug(self.next_message())
        self.assert_log_lines(lines)
|
|
|
|
|
|
class ExceptionFormatter(logging.Formatter):
    """A special exception formatter."""

    def formatException(self, ei):
        """Return 'Got a [<name>]', where <name> is the __name__ of the
        first item of the exc-info tuple *ei*."""
        exc_type = ei[0]
        return "Got a [%s]" % exc_type.__name__
|
|
|
|
|
|
class ConfigFileTest(BaseTest):

    """Reading logging config from a .ini-style config file."""

    expected_log_pat = r"^([\w]+) \+\+ ([\w]+)$"

    # The config texts below are indented with the class body;
    # apply_config() dedents them before they are parsed.

    # config0 is a standard configuration.
    config0 = """
    [loggers]
    keys=root

    [handlers]
    keys=hand1

    [formatters]
    keys=form1

    [logger_root]
    level=WARNING
    handlers=hand1

    [handler_hand1]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stdout,)

    [formatter_form1]
    format=%(levelname)s ++ %(message)s
    datefmt=
    """

    # config1 adds a little to the standard configuration.
    config1 = """
    [loggers]
    keys=root,parser

    [handlers]
    keys=hand1

    [formatters]
    keys=form1

    [logger_root]
    level=WARNING
    handlers=

    [logger_parser]
    level=DEBUG
    handlers=hand1
    propagate=1
    qualname=compiler.parser

    [handler_hand1]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stdout,)

    [formatter_form1]
    format=%(levelname)s ++ %(message)s
    datefmt=
    """

    # config1a moves the handler to the root.
    config1a = """
    [loggers]
    keys=root,parser

    [handlers]
    keys=hand1

    [formatters]
    keys=form1

    [logger_root]
    level=WARNING
    handlers=hand1

    [logger_parser]
    level=DEBUG
    handlers=
    propagate=1
    qualname=compiler.parser

    [handler_hand1]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stdout,)

    [formatter_form1]
    format=%(levelname)s ++ %(message)s
    datefmt=
    """

    # config2 has a subtle configuration error that should be reported
    config2 = config1.replace("sys.stdout", "sys.stbout")

    # config3 has a less subtle configuration error
    config3 = config1.replace("formatter=form1", "formatter=misspelled_name")

    # config4 specifies a custom formatter class to be loaded
    config4 = """
    [loggers]
    keys=root

    [handlers]
    keys=hand1

    [formatters]
    keys=form1

    [logger_root]
    level=NOTSET
    handlers=hand1

    [handler_hand1]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stdout,)

    [formatter_form1]
    class=""" + __name__ + """.ExceptionFormatter
    format=%(levelname)s:%(name)s:%(message)s
    datefmt=
    """

    # config5 specifies a custom handler class to be loaded
    config5 = config1.replace('class=StreamHandler', 'class=logging.StreamHandler')

    # config6 uses ', ' delimiters in the handlers and formatters sections
    config6 = """
    [loggers]
    keys=root,parser

    [handlers]
    keys=hand1, hand2

    [formatters]
    keys=form1, form2

    [logger_root]
    level=WARNING
    handlers=

    [logger_parser]
    level=DEBUG
    handlers=hand1
    propagate=1
    qualname=compiler.parser

    [handler_hand1]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stdout,)

    [handler_hand2]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stderr,)

    [formatter_form1]
    format=%(levelname)s ++ %(message)s
    datefmt=

    [formatter_form2]
    format=%(message)s
    datefmt=
    """

    # config7 adds a compiler logger.
    config7 = """
    [loggers]
    keys=root,parser,compiler

    [handlers]
    keys=hand1

    [formatters]
    keys=form1

    [logger_root]
    level=WARNING
    handlers=hand1

    [logger_compiler]
    level=DEBUG
    handlers=
    propagate=1
    qualname=compiler

    [logger_parser]
    level=DEBUG
    handlers=
    propagate=1
    qualname=compiler.parser

    [handler_hand1]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stdout,)

    [formatter_form1]
    format=%(levelname)s ++ %(message)s
    datefmt=
    """

    def apply_config(self, conf):
        """Dedent the ini text *conf* and load it with
        logging.config.fileConfig() from an in-memory file."""
        ini_text = textwrap.dedent(conf)
        logging.config.fileConfig(io.StringIO(ini_text))

    def test_config0_ok(self):
        # A simple config file which overrides the default settings.
        with captured_stdout() as output:
            self.apply_config(self.config0)
            root = logging.getLogger()
            # Won't output anything
            root.info(self.next_message())
            # Outputs a message
            root.error(self.next_message())
            expected = [('ERROR', '2')]
            self.assert_log_lines(expected, stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])

    def test_config1_ok(self, config=config1):
        # A config file defining a sub-parser as well.
        with captured_stdout() as output:
            self.apply_config(config)
            parser_logger = logging.getLogger("compiler.parser")
            # Both will output a message
            parser_logger.info(self.next_message())
            parser_logger.error(self.next_message())
            expected = [
                ('INFO', '1'),
                ('ERROR', '2'),
            ]
            self.assert_log_lines(expected, stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])

    def test_config2_failure(self):
        # config2 passes sys.stbout as the handler argument; applying it
        # must raise.
        self.assertRaises(Exception, self.apply_config, self.config2)

    def test_config3_failure(self):
        # config3 names a formatter that is not defined; applying it must
        # raise.
        self.assertRaises(Exception, self.apply_config, self.config3)

    def test_config4_ok(self):
        # A config file specifying a custom formatter class.
        with captured_stdout() as output:
            self.apply_config(self.config4)
            logger = logging.getLogger()
            try:
                raise RuntimeError()
            except RuntimeError:
                logging.exception("just testing")
            sys.stdout.seek(0)
            expected_text = "ERROR:root:just testing\nGot a [RuntimeError]\n"
            self.assertEqual(output.getvalue(), expected_text)
            # Original logger output is empty
            self.assert_log_lines([])

    def test_config5_ok(self):
        # Same scenario as config1, with the handler class given as
        # logging.StreamHandler.
        self.test_config1_ok(config=self.config5)

    def test_config6_ok(self):
        # Same scenario as config1, with ', ' separated key lists.
        self.test_config1_ok(config=self.config6)

    def test_config7_ok(self):
        with captured_stdout() as output:
            self.apply_config(self.config1a)
            parser_logger = logging.getLogger("compiler.parser")
            # See issue #11424. compiler-hyphenated sorts
            # between compiler and compiler.xyz and this
            # was preventing compiler.xyz from being included
            # in the child loggers of compiler because of an
            # overzealous loop termination condition.
            hyphenated = logging.getLogger('compiler-hyphenated')
            # All will output a message
            parser_logger.info(self.next_message())
            parser_logger.error(self.next_message())
            hyphenated.critical(self.next_message())
            expected = [
                ('INFO', '1'),
                ('ERROR', '2'),
                ('CRITICAL', '3'),
            ]
            self.assert_log_lines(expected, stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])
        with captured_stdout() as output:
            self.apply_config(self.config7)
            parser_logger = logging.getLogger("compiler.parser")
            self.assertFalse(parser_logger.disabled)
            # Both will output a message
            parser_logger.info(self.next_message())
            parser_logger.error(self.next_message())
            lexer_logger = logging.getLogger("compiler.lexer")
            # Both will output a message
            lexer_logger.info(self.next_message())
            lexer_logger.error(self.next_message())
            # Will not appear
            hyphenated.critical(self.next_message())
            expected = [
                ('INFO', '4'),
                ('ERROR', '5'),
                ('INFO', '6'),
                ('ERROR', '7'),
            ]
            self.assert_log_lines(expected, stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])
|
|
|
|
class LogRecordStreamHandler(StreamRequestHandler):

    """Handler for a streaming logging request. It saves the log message in the
    TCP server's 'log_output' attribute."""

    TCP_LOG_END = "!!!END!!!"

    def handle(self):
        """Handle multiple requests - each expected to be of 4-byte length,
        followed by the LogRecord in pickle format. Logs the record
        according to whatever policy is configured locally.

        Returns when the peer closes the connection; a record that is cut
        off by the close is discarded.
        """
        while True:
            header = self._recv_exactly(4)
            if header is None:
                break
            slen = struct.unpack(">L", header)[0]
            payload = self._recv_exactly(slen)
            if payload is None:
                # The connection closed in the middle of a record.
                break
            obj = self.unpickle(payload)
            record = logging.makeLogRecord(obj)
            self.handle_log_record(record)

    def _recv_exactly(self, size):
        """Read exactly *size* bytes from the connection.

        Returns the bytes read, or None if the connection is closed before
        *size* bytes have arrived.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = self.connection.recv(remaining)
            if not chunk:
                # recv() returning nothing means the peer closed the
                # connection; retrying would loop forever.
                return None
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def unpickle(self, data):
        """Return the object pickled in *data*."""
        # NOTE(review): pickle.loads() on bytes read from a socket is only
        # acceptable because this receiver is test scaffolding; never
        # expose it to untrusted peers.
        return pickle.loads(data)

    def handle_log_record(self, record):
        """Append the record's message to the server's log_output, or set
        the server's abort flag if the message contains TCP_LOG_END."""
        # If the end-of-messages sentinel is seen, tell the server to
        # terminate.
        if self.TCP_LOG_END in record.msg:
            self.server.abort = True
            return
        self.server.log_output += record.msg + "\n"
|
|
|
|
|
|
class LogRecordSocketReceiver(ThreadingTCPServer):

    """A simple-minded TCP socket-based logging receiver suitable for test
    purposes."""

    allow_reuse_address = 1
    # Class-level default for the text accumulated by the request handler.
    log_output = ""

    def __init__(self, host='localhost',
                 port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
                 handler=LogRecordStreamHandler):
        """Create the server on (host, port), handling requests with
        *handler*, and initialise the stop flag and 'finished' event."""
        address = (host, port)
        ThreadingTCPServer.__init__(self, address, handler)
        self.abort = False
        self.timeout = 0.1
        self.finished = threading.Event()

    def serve_until_stopped(self):
        """Handle requests until self.abort becomes true, then set the
        'finished' event and close the listening socket."""
        while True:
            if self.abort:
                break
            # Wait at most self.timeout so the abort flag is re-checked.
            readable, _, _ = select.select([self.socket.fileno()], [], [],
                                           self.timeout)
            if readable:
                self.handle_request()
        # Notify the main thread that we're about to exit
        self.finished.set()
        # close the listen socket
        self.server_close()
|
|
|
|
|
|
@unittest.skipUnless(threading, 'Threading required for this test.')
class SocketHandlerTest(BaseTest):

    """Test for SocketHandler objects."""

    def setUp(self):
        """Set up a TCP server to receive log messages, and a SocketHandler
        pointing to that server's address and port."""
        BaseTest.setUp(self)
        # port=0: the actual port is read back from the bound socket.
        self.tcpserver = LogRecordSocketReceiver(port=0)
        self.port = self.tcpserver.socket.getsockname()[1]
        server_thread = threading.Thread(
            target=self.tcpserver.serve_until_stopped)
        self.threads = [server_thread]
        for worker in self.threads:
            worker.start()

        self.sock_hdlr = logging.handlers.SocketHandler('localhost', self.port)
        self.sock_hdlr.setFormatter(self.root_formatter)
        # Swap the first root handler for the socket handler.
        first_handler = self.root_logger.handlers[0]
        self.root_logger.removeHandler(first_handler)
        self.root_logger.addHandler(self.sock_hdlr)

    def tearDown(self):
        """Shutdown the TCP server."""
        try:
            self.tcpserver.abort = True
            del self.tcpserver
            self.root_logger.removeHandler(self.sock_hdlr)
            self.sock_hdlr.close()
            for worker in self.threads:
                worker.join(2.0)
        finally:
            BaseTest.tearDown(self)

    def get_output(self):
        """Get the log output as received by the TCP server."""
        # Signal the TCP receiver and wait for it to terminate.
        sentinel = LogRecordStreamHandler.TCP_LOG_END
        self.root_logger.critical(sentinel)
        self.tcpserver.finished.wait(2.0)
        return self.tcpserver.log_output

    def test_output(self):
        # The log message sent to the SocketHandler is properly received.
        tcp_logger = logging.getLogger("tcp")
        tcp_logger.error("spam")
        tcp_logger.debug("eggs")
        received = self.get_output()
        self.assertEqual(received, "spam\neggs\n")
|
|
|
|
|
|
class MemoryTest(BaseTest):

    """Test memory persistence of logger objects."""

    def setUp(self):
        """Create a dict to remember potentially destroyed objects."""
        BaseTest.setUp(self)
        self._survivors = {}

    def _watch_for_survival(self, *args):
        """Watch the given objects for survival, by creating weakrefs to
        them."""
        # Keys are (id, repr) pairs so a dead object can still be reported.
        self._survivors.update(
            ((id(obj), repr(obj)), weakref.ref(obj)) for obj in args)

    def _assertTruesurvival(self):
        """Assert that all objects watched for survival have survived."""
        # Trigger cycle breaking.
        gc.collect()
        dead = [repr_
                for (id_, repr_), ref in self._survivors.items()
                if ref() is None]
        if not dead:
            return
        self.fail("%d objects should have survived "
                  "but have been destroyed: %s" % (len(dead), ", ".join(dead)))

    def test_persistent_loggers(self):
        # Logger objects are persistent and retain their configuration, even
        # if visible references are destroyed.
        self.root_logger.setLevel(logging.INFO)
        foo = logging.getLogger("foo")
        self._watch_for_survival(foo)
        foo.setLevel(logging.DEBUG)
        # Message 1 goes to the root logger at DEBUG and is dropped;
        # message 2 goes through foo and is emitted.
        self.root_logger.debug(self.next_message())
        foo.debug(self.next_message())
        expected = [('foo', 'DEBUG', '2')]
        self.assert_log_lines(expected)
        del foo
        # foo has survived.
        self._assertTruesurvival()
        # foo has retained its settings.
        bar = logging.getLogger("foo")
        bar.debug(self.next_message())
        expected = expected + [('foo', 'DEBUG', '3')]
        self.assert_log_lines(expected)
|
|
|
|
|
|
class EncodingTest(BaseTest):
    """Check that handlers write text using the requested encoding."""

    def test_encoding_plain_file(self):
        # A FileHandler opened with encoding="utf-8" round-trips non-ASCII
        # text through a temporary file.
        log = logging.getLogger("test")
        fd, fn = tempfile.mkstemp(".log", "test_logging-1-")
        # Only the name is needed; the FileHandler opens the file itself.
        os.close(fd)
        # the non-ascii data we write to the log.
        data = "foo\x80"
        try:
            handler = logging.FileHandler(fn, encoding="utf-8")
            log.addHandler(handler)
            try:
                # write non-ascii data to the log.
                log.warning(data)
            finally:
                log.removeHandler(handler)
                handler.close()
            # check we wrote exactly those bytes, ignoring trailing \n etc
            with open(fn, encoding="utf-8") as f:
                written = f.read()
                self.assertEqual(written.rstrip(), data)
        finally:
            if os.path.isfile(fn):
                os.remove(fn)

    def test_encoding_cyrillic_unicode(self):
        log = logging.getLogger("test")
        # Get a message in Unicode: Do svidanya in Cyrillic (meaning goodbye)
        message = '\u0434\u043e \u0441\u0432\u0438\u0434\u0430\u043d\u0438\u044f'
        # Ensure it's written in a Cyrillic encoding
        writer_class = codecs.getwriter('cp1251')
        writer_class.encoding = 'cp1251'
        raw = io.BytesIO()
        handler = logging.StreamHandler(writer_class(raw, 'strict'))
        log.addHandler(handler)
        try:
            log.warning(message)
        finally:
            log.removeHandler(handler)
            handler.close()
        # check we wrote exactly those bytes, ignoring trailing \n etc
        encoded = raw.getvalue()
        # Compare against what the data should be when encoded in CP-1251
        self.assertEqual(encoded,
                         b'\xe4\xee \xf1\xe2\xe8\xe4\xe0\xed\xe8\xff\n')
|
|
|
|
|
|
class WarningsTest(BaseTest):

    """Test that logging.captureWarnings() routes warnings to logging."""

    def test_warnings(self):
        with warnings.catch_warnings():
            logging.captureWarnings(True)
            try:
                warnings.filterwarnings("always", category=UserWarning)
                file = io.StringIO()
                h = logging.StreamHandler(file)
                logger = logging.getLogger("py.warnings")
                logger.addHandler(h)
                try:
                    warnings.warn("I'm warning you...")
                    s = file.getvalue()
                finally:
                    # Detach the handler even if the warning machinery
                    # raises, so it cannot leak into later tests.
                    logger.removeHandler(h)
                    h.close()
                # assertIn shows the captured text when the check fails.
                self.assertIn("UserWarning: I'm warning you...\n", s)

                # See if an explicit file uses the original implementation
                file = io.StringIO()
                warnings.showwarning("Explicit", UserWarning, "dummy.py", 42,
                                     file, "Dummy line")
                s = file.getvalue()
                file.close()
                # The source line is indented by two spaces in the
                # formatted warning.
                self.assertEqual(s,
                        "dummy.py:42: UserWarning: Explicit\n  Dummy line\n")
            finally:
                logging.captureWarnings(False)
|
|
|
|
|
|
def formatFunc(format, datefmt=None):
    """Factory returning a logging.Formatter for *format* and *datefmt*.

    Referenced by the dictConfig test configurations as a '()' callable.
    """
    formatter = logging.Formatter(format, datefmt)
    return formatter
|
|
|
|
def handlerFunc():
    """Factory returning a logging.StreamHandler built with no arguments.

    Referenced by the dictConfig test configurations as a '()' callable.
    """
    handler = logging.StreamHandler()
    return handler
|
|
|
|
class CustomHandler(logging.StreamHandler):
    """StreamHandler subclass with no overrides.

    The dictConfig test configurations load it by dotted name.
    """
|
|
|
|
class ConfigDictTest(BaseTest):

    """Reading logging config from a dictionary."""

    # Pattern for the "<LEVEL> ++ <message>" lines produced by 'form1'.
    expected_log_pat = r"^([\w]+) \+\+ ([\w]+)$"

    # config0 is a standard configuration.
    config0 = {
        'version': 1,
        'formatters': {
            'form1' : {
                'format' : '%(levelname)s ++ %(message)s',
            },
        },
        'handlers' : {
            'hand1' : {
                'class' : 'logging.StreamHandler',
                'formatter' : 'form1',
                'level' : 'NOTSET',
                'stream' : 'ext://sys.stdout',
            },
        },
        'root' : {
            'level' : 'WARNING',
            'handlers' : ['hand1'],
        },
    }

    # config1 adds a little to the standard configuration.
    config1 = {
        'version': 1,
        'formatters': {
            'form1' : {
                'format' : '%(levelname)s ++ %(message)s',
            },
        },
        'handlers' : {
            'hand1' : {
                'class' : 'logging.StreamHandler',
                'formatter' : 'form1',
                'level' : 'NOTSET',
                'stream' : 'ext://sys.stdout',
            },
        },
        'loggers' : {
            'compiler.parser' : {
                'level' : 'DEBUG',
                'handlers' : ['hand1'],
            },
        },
        'root' : {
            'level' : 'WARNING',
        },
    }

    # config1a moves the handler to the root. Used with config8a
    config1a = {
        'version': 1,
        'formatters': {
            'form1' : {
                'format' : '%(levelname)s ++ %(message)s',
            },
        },
        'handlers' : {
            'hand1' : {
                'class' : 'logging.StreamHandler',
                'formatter' : 'form1',
                'level' : 'NOTSET',
                'stream' : 'ext://sys.stdout',
            },
        },
        'loggers' : {
            'compiler.parser' : {
                'level' : 'DEBUG',
            },
        },
        'root' : {
            'level' : 'WARNING',
            'handlers' : ['hand1'],
        },
    }

    # config2 has a subtle configuration error that should be reported
    # (the handler's stream refers to 'sys.stdbout').
    config2 = {
        'version': 1,
        'formatters': {
            'form1' : {
                'format' : '%(levelname)s ++ %(message)s',
            },
        },
        'handlers' : {
            'hand1' : {
                'class' : 'logging.StreamHandler',
                'formatter' : 'form1',
                'level' : 'NOTSET',
                'stream' : 'ext://sys.stdbout',
            },
        },
        'loggers' : {
            'compiler.parser' : {
                'level' : 'DEBUG',
                'handlers' : ['hand1'],
            },
        },
        'root' : {
            'level' : 'WARNING',
        },
    }

    # As config1 but with a misspelt level on a handler
    config2a = {
        'version': 1,
        'formatters': {
            'form1' : {
                'format' : '%(levelname)s ++ %(message)s',
            },
        },
        'handlers' : {
            'hand1' : {
                'class' : 'logging.StreamHandler',
                'formatter' : 'form1',
                'level' : 'NTOSET',
                'stream' : 'ext://sys.stdout',
            },
        },
        'loggers' : {
            'compiler.parser' : {
                'level' : 'DEBUG',
                'handlers' : ['hand1'],
            },
        },
        'root' : {
            'level' : 'WARNING',
        },
    }


    # As config1 but with a misspelt level on a logger
    config2b = {
        'version': 1,
        'formatters': {
            'form1' : {
                'format' : '%(levelname)s ++ %(message)s',
            },
        },
        'handlers' : {
            'hand1' : {
                'class' : 'logging.StreamHandler',
                'formatter' : 'form1',
                'level' : 'NOTSET',
                'stream' : 'ext://sys.stdout',
            },
        },
        'loggers' : {
            'compiler.parser' : {
                'level' : 'DEBUG',
                'handlers' : ['hand1'],
            },
        },
        'root' : {
            'level' : 'WRANING',
        },
    }

    # config3 has a less subtle configuration error
    # (the handler names a formatter that is not defined).
    config3 = {
        'version': 1,
        'formatters': {
            'form1' : {
                'format' : '%(levelname)s ++ %(message)s',
            },
        },
        'handlers' : {
            'hand1' : {
                'class' : 'logging.StreamHandler',
                'formatter' : 'misspelled_name',
                'level' : 'NOTSET',
                'stream' : 'ext://sys.stdout',
            },
        },
        'loggers' : {
            'compiler.parser' : {
                'level' : 'DEBUG',
                'handlers' : ['hand1'],
            },
        },
        'root' : {
            'level' : 'WARNING',
        },
    }

    # config4 specifies a custom formatter class to be loaded
    config4 = {
        'version': 1,
        'formatters': {
            'form1' : {
                '()' : __name__ + '.ExceptionFormatter',
                'format' : '%(levelname)s:%(name)s:%(message)s',
            },
        },
        'handlers' : {
            'hand1' : {
                'class' : 'logging.StreamHandler',
                'formatter' : 'form1',
                'level' : 'NOTSET',
                'stream' : 'ext://sys.stdout',
            },
        },
        'root' : {
            'level' : 'NOTSET',
            'handlers' : ['hand1'],
        },
    }

    # As config4 but using an actual callable rather than a string
    config4a = {
        'version': 1,
        'formatters': {
            'form1' : {
                '()' : ExceptionFormatter,
                'format' : '%(levelname)s:%(name)s:%(message)s',
            },
            'form2' : {
                '()' : __name__ + '.formatFunc',
                'format' : '%(levelname)s:%(name)s:%(message)s',
            },
            'form3' : {
                '()' : formatFunc,
                'format' : '%(levelname)s:%(name)s:%(message)s',
            },
        },
        'handlers' : {
            'hand1' : {
                'class' : 'logging.StreamHandler',
                'formatter' : 'form1',
                'level' : 'NOTSET',
                'stream' : 'ext://sys.stdout',
            },
            'hand2' : {
                '()' : handlerFunc,
            },
        },
        'root' : {
            'level' : 'NOTSET',
            'handlers' : ['hand1'],
        },
    }

    # config5 specifies a custom handler class to be loaded
    config5 = {
        'version': 1,
        'formatters': {
            'form1' : {
                'format' : '%(levelname)s ++ %(message)s',
            },
        },
        'handlers' : {
            'hand1' : {
                'class' : __name__ + '.CustomHandler',
                'formatter' : 'form1',
                'level' : 'NOTSET',
                'stream' : 'ext://sys.stdout',
            },
        },
        'loggers' : {
            'compiler.parser' : {
                'level' : 'DEBUG',
                'handlers' : ['hand1'],
            },
        },
        'root' : {
            'level' : 'WARNING',
        },
    }

    # config6 specifies a custom handler class to be loaded
    # but has bad arguments
    config6 = {
        'version': 1,
        'formatters': {
            'form1' : {
                'format' : '%(levelname)s ++ %(message)s',
            },
        },
        'handlers' : {
            'hand1' : {
                'class' : __name__ + '.CustomHandler',
                'formatter' : 'form1',
                'level' : 'NOTSET',
                'stream' : 'ext://sys.stdout',
                '9' : 'invalid parameter name',
            },
        },
        'loggers' : {
            'compiler.parser' : {
                'level' : 'DEBUG',
                'handlers' : ['hand1'],
            },
        },
        'root' : {
            'level' : 'WARNING',
        },
    }

    # config7 does not define compiler.parser but defines compiler.lexer
    # so compiler.parser should be disabled after applying it
    config7 = {
        'version': 1,
        'formatters': {
            'form1' : {
                'format' : '%(levelname)s ++ %(message)s',
            },
        },
        'handlers' : {
            'hand1' : {
                'class' : 'logging.StreamHandler',
                'formatter' : 'form1',
                'level' : 'NOTSET',
                'stream' : 'ext://sys.stdout',
            },
        },
        'loggers' : {
            'compiler.lexer' : {
                'level' : 'DEBUG',
                'handlers' : ['hand1'],
            },
        },
        'root' : {
            'level' : 'WARNING',
        },
    }

    # config8 defines both compiler and compiler.lexer
    # so compiler.parser should not be disabled (since
    # compiler is defined)
    config8 = {
        'version': 1,
        'disable_existing_loggers' : False,
        'formatters': {
            'form1' : {
                'format' : '%(levelname)s ++ %(message)s',
            },
        },
        'handlers' : {
            'hand1' : {
                'class' : 'logging.StreamHandler',
                'formatter' : 'form1',
                'level' : 'NOTSET',
                'stream' : 'ext://sys.stdout',
            },
        },
        'loggers' : {
            'compiler' : {
                'level' : 'DEBUG',
                'handlers' : ['hand1'],
            },
            'compiler.lexer' : {
            },
        },
        'root' : {
            'level' : 'WARNING',
        },
    }

    # config8a disables existing loggers
    config8a = {
        'version': 1,
        'disable_existing_loggers' : True,
        'formatters': {
            'form1' : {
                'format' : '%(levelname)s ++ %(message)s',
            },
        },
        'handlers' : {
            'hand1' : {
                'class' : 'logging.StreamHandler',
                'formatter' : 'form1',
                'level' : 'NOTSET',
                'stream' : 'ext://sys.stdout',
            },
        },
        'loggers' : {
            'compiler' : {
                'level' : 'DEBUG',
                'handlers' : ['hand1'],
            },
            'compiler.lexer' : {
            },
        },
        'root' : {
            'level' : 'WARNING',
        },
    }

    # config9 sets both the handler and the compiler.parser logger to
    # WARNING; it is the starting point for the incremental configs below.
    config9 = {
        'version': 1,
        'formatters': {
            'form1' : {
                'format' : '%(levelname)s ++ %(message)s',
            },
        },
        'handlers' : {
            'hand1' : {
                'class' : 'logging.StreamHandler',
                'formatter' : 'form1',
                'level' : 'WARNING',
                'stream' : 'ext://sys.stdout',
            },
        },
        'loggers' : {
            'compiler.parser' : {
                'level' : 'WARNING',
                'handlers' : ['hand1'],
            },
        },
        'root' : {
            'level' : 'NOTSET',
        },
    }

    # config9a is incremental: the logger drops to INFO while the handler
    # stays at WARNING.
    config9a = {
        'version': 1,
        'incremental' : True,
        'handlers' : {
            'hand1' : {
                'level' : 'WARNING',
            },
        },
        'loggers' : {
            'compiler.parser' : {
                'level' : 'INFO',
            },
        },
    }

    # config9b is incremental: the handler drops to INFO as well.
    config9b = {
        'version': 1,
        'incremental' : True,
        'handlers' : {
            'hand1' : {
                'level' : 'INFO',
            },
        },
        'loggers' : {
            'compiler.parser' : {
                'level' : 'INFO',
            },
        },
    }

    # As config1 but with a filter added
    config10 = {
        'version': 1,
        'formatters': {
            'form1' : {
                'format' : '%(levelname)s ++ %(message)s',
            },
        },
        'filters' : {
            'filt1' : {
                'name' : 'compiler.parser',
            },
        },
        'handlers' : {
            'hand1' : {
                'class' : 'logging.StreamHandler',
                'formatter' : 'form1',
                'level' : 'NOTSET',
                'stream' : 'ext://sys.stdout',
                'filters' : ['filt1'],
            },
        },
        'loggers' : {
            'compiler.parser' : {
                'level' : 'DEBUG',
                'filters' : ['filt1'],
            },
        },
        'root' : {
            'level' : 'WARNING',
            'handlers' : ['hand1'],
        },
    }

    # As config1 but using cfg:// references
    config11 = {
        'version': 1,
        'true_formatters': {
            'form1' : {
                'format' : '%(levelname)s ++ %(message)s',
            },
        },
        'handler_configs': {
            'hand1' : {
                'class' : 'logging.StreamHandler',
                'formatter' : 'form1',
                'level' : 'NOTSET',
                'stream' : 'ext://sys.stdout',
            },
        },
        'formatters' : 'cfg://true_formatters',
        'handlers' : {
            'hand1' : 'cfg://handler_configs[hand1]',
        },
        'loggers' : {
            'compiler.parser' : {
                'level' : 'DEBUG',
                'handlers' : ['hand1'],
            },
        },
        'root' : {
            'level' : 'WARNING',
        },
    }

    # As config11 but missing the version key
    config12 = {
        'true_formatters': {
            'form1' : {
                'format' : '%(levelname)s ++ %(message)s',
            },
        },
        'handler_configs': {
            'hand1' : {
                'class' : 'logging.StreamHandler',
                'formatter' : 'form1',
                'level' : 'NOTSET',
                'stream' : 'ext://sys.stdout',
            },
        },
        'formatters' : 'cfg://true_formatters',
        'handlers' : {
            'hand1' : 'cfg://handler_configs[hand1]',
        },
        'loggers' : {
            'compiler.parser' : {
                'level' : 'DEBUG',
                'handlers' : ['hand1'],
            },
        },
        'root' : {
            'level' : 'WARNING',
        },
    }

    # As config11 but using an unsupported version
    config13 = {
        'version': 2,
        'true_formatters': {
            'form1' : {
                'format' : '%(levelname)s ++ %(message)s',
            },
        },
        'handler_configs': {
            'hand1' : {
                'class' : 'logging.StreamHandler',
                'formatter' : 'form1',
                'level' : 'NOTSET',
                'stream' : 'ext://sys.stdout',
            },
        },
        'formatters' : 'cfg://true_formatters',
        'handlers' : {
            'hand1' : 'cfg://handler_configs[hand1]',
        },
        'loggers' : {
            'compiler.parser' : {
                'level' : 'DEBUG',
                'handlers' : ['hand1'],
            },
        },
        'root' : {
            'level' : 'WARNING',
        },
    }

    def apply_config(self, conf):
        """Apply the configuration dict *conf* via logging.config.dictConfig."""
        logging.config.dictConfig(conf)

    def test_config0_ok(self):
        # A simple config which overrides the default settings.
        with captured_stdout() as output:
            self.apply_config(self.config0)
            logger = logging.getLogger()
            # Won't output anything
            logger.info(self.next_message())
            # Outputs a message
            logger.error(self.next_message())
            self.assert_log_lines([
                ('ERROR', '2'),
            ], stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])

    # The 'config' default lets other tests reuse this body with a
    # different configuration dict.
    def test_config1_ok(self, config=config1):
        # A config defining a sub-parser as well.
        with captured_stdout() as output:
            self.apply_config(config)
            logger = logging.getLogger("compiler.parser")
            # Both will output a message
            logger.info(self.next_message())
            logger.error(self.next_message())
            self.assert_log_lines([
                ('INFO', '1'),
                ('ERROR', '2'),
            ], stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])

    def test_config2_failure(self):
        # The handler's stream reference (sys.stdbout) cannot be resolved.
        self.assertRaises(Exception, self.apply_config, self.config2)

    def test_config2a_failure(self):
        # The handler's level name is misspelt.
        self.assertRaises(Exception, self.apply_config, self.config2a)

    def test_config2b_failure(self):
        # The root logger's level name is misspelt.
        self.assertRaises(Exception, self.apply_config, self.config2b)

    def test_config3_failure(self):
        # The handler refers to a formatter name that is not defined.
        self.assertRaises(Exception, self.apply_config, self.config3)

    def test_config4_ok(self):
        # A config specifying a custom formatter class.
        with captured_stdout() as output:
            self.apply_config(self.config4)
            #logger = logging.getLogger()
            try:
                raise RuntimeError()
            except RuntimeError:
                logging.exception("just testing")
            sys.stdout.seek(0)
            self.assertEqual(output.getvalue(),
                "ERROR:root:just testing\nGot a [RuntimeError]\n")
            # Original logger output is empty
            self.assert_log_lines([])

    def test_config4a_ok(self):
        # As test_config4_ok, but the factories are given as callables.
        with captured_stdout() as output:
            self.apply_config(self.config4a)
            #logger = logging.getLogger()
            try:
                raise RuntimeError()
            except RuntimeError:
                logging.exception("just testing")
            sys.stdout.seek(0)
            self.assertEqual(output.getvalue(),
                "ERROR:root:just testing\nGot a [RuntimeError]\n")
            # Original logger output is empty
            self.assert_log_lines([])

    def test_config5_ok(self):
        # Same expectations as config1, using the custom handler class.
        self.test_config1_ok(config=self.config5)

    def test_config6_failure(self):
        # The handler config carries an invalid parameter name ('9').
        self.assertRaises(Exception, self.apply_config, self.config6)

    def test_config7_ok(self):
        with captured_stdout() as output:
            self.apply_config(self.config1)
            logger = logging.getLogger("compiler.parser")
            # Both will output a message
            logger.info(self.next_message())
            logger.error(self.next_message())
            self.assert_log_lines([
                ('INFO', '1'),
                ('ERROR', '2'),
            ], stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])
        with captured_stdout() as output:
            self.apply_config(self.config7)
            # config7 does not mention compiler.parser, so it is disabled.
            logger = logging.getLogger("compiler.parser")
            self.assertTrue(logger.disabled)
            logger = logging.getLogger("compiler.lexer")
            # Both will output a message
            logger.info(self.next_message())
            logger.error(self.next_message())
            self.assert_log_lines([
                ('INFO', '3'),
                ('ERROR', '4'),
            ], stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])

    #Same as test_config_7_ok but don't disable old loggers.
    def test_config_8_ok(self):
        with captured_stdout() as output:
            self.apply_config(self.config1)
            logger = logging.getLogger("compiler.parser")
            # All will output a message
            logger.info(self.next_message())
            logger.error(self.next_message())
            self.assert_log_lines([
                ('INFO', '1'),
                ('ERROR', '2'),
            ], stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])
        with captured_stdout() as output:
            self.apply_config(self.config8)
            logger = logging.getLogger("compiler.parser")
            self.assertFalse(logger.disabled)
            # Both will output a message
            logger.info(self.next_message())
            logger.error(self.next_message())
            logger = logging.getLogger("compiler.lexer")
            # Both will output a message
            logger.info(self.next_message())
            logger.error(self.next_message())
            self.assert_log_lines([
                ('INFO', '3'),
                ('ERROR', '4'),
                ('INFO', '5'),
                ('ERROR', '6'),
            ], stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])

    def test_config_8a_ok(self):
        with captured_stdout() as output:
            self.apply_config(self.config1a)
            logger = logging.getLogger("compiler.parser")
            # See issue #11424. compiler-hyphenated sorts
            # between compiler and compiler.xyz and this
            # was preventing compiler.xyz from being included
            # in the child loggers of compiler because of an
            # overzealous loop termination condition.
            hyphenated = logging.getLogger('compiler-hyphenated')
            # All will output a message
            logger.info(self.next_message())
            logger.error(self.next_message())
            hyphenated.critical(self.next_message())
            self.assert_log_lines([
                ('INFO', '1'),
                ('ERROR', '2'),
                ('CRITICAL', '3'),
            ], stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])
        with captured_stdout() as output:
            self.apply_config(self.config8a)
            logger = logging.getLogger("compiler.parser")
            self.assertFalse(logger.disabled)
            # Both will output a message
            logger.info(self.next_message())
            logger.error(self.next_message())
            logger = logging.getLogger("compiler.lexer")
            # Both will output a message
            logger.info(self.next_message())
            logger.error(self.next_message())
            # Will not appear
            hyphenated.critical(self.next_message())
            self.assert_log_lines([
                ('INFO', '4'),
                ('ERROR', '5'),
                ('INFO', '6'),
                ('ERROR', '7'),
            ], stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])

    def test_config_9_ok(self):
        with captured_stdout() as output:
            self.apply_config(self.config9)
            logger = logging.getLogger("compiler.parser")
            #Nothing will be output since both handler and logger are set to WARNING
            logger.info(self.next_message())
            self.assert_log_lines([], stream=output)
            self.apply_config(self.config9a)
            #Nothing will be output since the handler is still set to WARNING
            logger.info(self.next_message())
            self.assert_log_lines([], stream=output)
            self.apply_config(self.config9b)
            #Message should now be output
            logger.info(self.next_message())
            self.assert_log_lines([
                ('INFO', '3'),
            ], stream=output)

    def test_config_10_ok(self):
        with captured_stdout() as output:
            self.apply_config(self.config10)
            logger = logging.getLogger("compiler.parser")
            logger.warning(self.next_message())
            logger = logging.getLogger('compiler')
            #Not output, because filtered
            logger.warning(self.next_message())
            logger = logging.getLogger('compiler.lexer')
            #Not output, because filtered
            logger.warning(self.next_message())
            logger = logging.getLogger("compiler.parser.codegen")
            #Output, as not filtered
            logger.error(self.next_message())
            self.assert_log_lines([
                ('WARNING', '1'),
                ('ERROR', '4'),
            ], stream=output)

    def test_config11_ok(self):
        # Same expectations as config1, using cfg:// references.
        self.test_config1_ok(self.config11)

    def test_config12_failure(self):
        # The 'version' key is missing.
        self.assertRaises(Exception, self.apply_config, self.config12)

    def test_config13_failure(self):
        # The 'version' value (2) is not supported.
        self.assertRaises(Exception, self.apply_config, self.config13)

    @unittest.skipUnless(threading, 'listen() needs threading to work')
    def setup_via_listener(self, text):
        """Send *text* to a logging.config.listen() server over TCP.

        The text is UTF-8 encoded and sent to localhost on the port the
        listener reports, prefixed with its length packed as a 4-byte
        big-endian unsigned integer ('>L').  The listener is stopped and
        its thread joined (2 second timeout) even if sending fails.
        """
        text = text.encode("utf-8")
        # Ask for a randomly assigned port (by using port 0)
        t = logging.config.listen(0)
        t.start()
        t.ready.wait()
        # Now get the port allocated
        port = t.port
        t.ready.clear()
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(2.0)
            sock.connect(('localhost', port))

            slen = struct.pack('>L', len(text))
            s = slen + text
            # send() may transmit only part of the buffer, so loop until
            # every byte has gone out.
            sentsofar = 0
            left = len(s)
            while left > 0:
                sent = sock.send(s[sentsofar:])
                sentsofar += sent
                left -= sent
            sock.close()
        finally:
            # NOTE(review): presumably 'ready' is set again once the
            # listener has handled the payload -- confirm in logging.config.
            t.ready.wait(2.0)
            logging.config.stopListening()
            t.join(2.0)

    def test_listen_config_10_ok(self):
        # As test_config_10_ok, but config10 is delivered as JSON through
        # the listener.
        with captured_stdout() as output:
            self.setup_via_listener(json.dumps(self.config10))
            logger = logging.getLogger("compiler.parser")
            logger.warning(self.next_message())
            logger = logging.getLogger('compiler')
            #Not output, because filtered
            logger.warning(self.next_message())
            logger = logging.getLogger('compiler.lexer')
            #Not output, because filtered
            logger.warning(self.next_message())
            logger = logging.getLogger("compiler.parser.codegen")
            #Output, as not filtered
            logger.error(self.next_message())
            self.assert_log_lines([
                ('WARNING', '1'),
                ('ERROR', '4'),
            ], stream=output)

    def test_listen_config_1_ok(self):
        # Delivers ConfigFileTest.config1 (dedented text) through the
        # listener.
        with captured_stdout() as output:
            self.setup_via_listener(textwrap.dedent(ConfigFileTest.config1))
            logger = logging.getLogger("compiler.parser")
            # Both will output a message
            logger.info(self.next_message())
            logger.error(self.next_message())
            self.assert_log_lines([
                ('INFO', '1'),
                ('ERROR', '2'),
            ], stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])
|
|
|
|
|
|
class ManagerTest(BaseTest):
    """Check that a Manager hands out loggers of its own logger class."""

    def test_manager_loggerclass(self):
        captured = []

        class RecordingLogger(logging.Logger):
            # Record the message instead of dispatching it to handlers.
            def _log(self, level, msg, args, exc_info=None, extra=None):
                captured.append(msg)

        manager = logging.Manager(None)
        # Only logging.Logger subclasses are acceptable logger classes.
        with self.assertRaises(TypeError):
            manager.setLoggerClass(int)
        manager.setLoggerClass(RecordingLogger)
        manager.getLogger('test').warning('should appear in logged')
        # The module-level call goes through the default manager.
        logging.warning('should not appear in logged')

        self.assertEqual(captured, ['should appear in logged'])
|
|
|
|
|
|
class ChildLoggerTest(BaseTest):
    """Check that Logger.getChild() returns the usual named loggers."""

    def test_child_loggers(self):
        get = logging.getLogger
        root = get()
        parent = get('abc')
        # Created but otherwise unused, as in the original test.
        get('def.ghi')
        # Children of the root logger are plain top-level names.
        child = root.getChild('xyz')
        nested = root.getChild('uvw.xyz')
        self.assertIs(child, get('xyz'))
        self.assertIs(nested, get('uvw.xyz'))
        # Children of a named logger are prefixed with the parent's name.
        child = parent.getChild('def')
        grandchild = child.getChild('ghi')
        dotted = parent.getChild('def.ghi')
        self.assertIs(child, get('abc.def'))
        self.assertIs(grandchild, get('abc.def.ghi'))
        self.assertIs(grandchild, dotted)
|
|
|
|
|
|
class DerivedLogRecord(logging.LogRecord):
    """LogRecord subclass with no overrides, used as a record factory."""
|
|
|
|
class LogRecordFactoryTest(BaseTest):
    """Check that logging.setLogRecordFactory() controls the record type."""

    def setUp(self):
        class CheckingFilter(logging.Filter):
            # Accepts only records whose exact type is the given class.
            def __init__(self, cls):
                self.cls = cls

            def filter(self, record):
                actual = type(record)
                if actual is self.cls:
                    return True
                raise TypeError('Unexpected LogRecord type %s, expected %s'
                                % (actual, self.cls))

        BaseTest.setUp(self)
        self.filter = CheckingFilter(DerivedLogRecord)
        self.root_logger.addFilter(self.filter)
        # Remembered so tearDown can put the factory back.
        self.orig_factory = logging.getLogRecordFactory()

    def tearDown(self):
        self.root_logger.removeFilter(self.filter)
        BaseTest.tearDown(self)
        logging.setLogRecordFactory(self.orig_factory)

    def test_logrecord_class(self):
        # With the original factory the checking filter rejects the record.
        first = self.next_message()
        with self.assertRaises(TypeError):
            self.root_logger.warning(first)
        # After switching factories the record passes and is logged.
        logging.setLogRecordFactory(DerivedLogRecord)
        self.root_logger.error(self.next_message())
        expected = [('root', 'ERROR', '2')]
        self.assert_log_lines(expected)
|
|
|
|
|
|
class QueueHandlerTest(BaseTest):
    """Tests for logging.handlers.QueueHandler and QueueListener."""

    # Do not bother with a logger name group.
    expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$"

    def setUp(self):
        BaseTest.setUp(self)
        self.queue = queue.Queue(-1)
        self.que_hdlr = logging.handlers.QueueHandler(self.queue)
        # A non-propagating logger at WARNING that feeds only the queue.
        que_logger = logging.getLogger('que')
        self.que_logger = que_logger
        que_logger.propagate = False
        que_logger.setLevel(logging.WARNING)
        que_logger.addHandler(self.que_hdlr)

    def tearDown(self):
        self.que_hdlr.close()
        BaseTest.tearDown(self)

    def test_queue_handler(self):
        # Below the logger's level: nothing may reach the queue.
        for emit in (self.que_logger.debug, self.que_logger.info):
            emit(self.next_message())
            with self.assertRaises(queue.Empty):
                self.queue.get_nowait()
        text = self.next_message()
        self.que_logger.warning(text)
        record = self.queue.get_nowait()
        self.assertIsInstance(record, logging.LogRecord)
        self.assertEqual(record.name, self.que_logger.name)
        self.assertEqual((record.msg, record.args), (text, None))

    @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'),
                         'logging.handlers.QueueListener required for this test')
    def test_queue_listener(self):
        handler = TestHandler(Matcher())
        listener = logging.handlers.QueueListener(self.queue, handler)
        listener.start()
        try:
            for emit in (self.que_logger.warning,
                         self.que_logger.error,
                         self.que_logger.critical):
                emit(self.next_message())
        finally:
            listener.stop()
        # Messages are numbered 1..3 in the order they were logged.
        levels = (logging.WARNING, logging.ERROR, logging.CRITICAL)
        for number, levelno in enumerate(levels, 1):
            self.assertTrue(handler.matches(levelno=levelno,
                                            message=str(number)))
|
|
|
|
|
|
class FormatterTest(unittest.TestCase):
|
|
def setUp(self):
|
|
self.common = {
|
|
'name': 'formatter.test',
|
|
'level': logging.DEBUG,
|
|
'pathname': os.path.join('path', 'to', 'dummy.ext'),
|
|
'lineno': 42,
|
|
'exc_info': None,
|
|
'func': None,
|
|
'msg': 'Message with %d %s',
|
|
'args': (2, 'placeholders'),
|
|
}
|
|
self.variants = {
|
|
}
|
|
|
|
def get_record(self, name=None):
|
|
result = dict(self.common)
|
|
if name is not None:
|
|
result.update(self.variants[name])
|
|
return logging.makeLogRecord(result)
|
|
|
|
def test_percent(self):
|
|
# Test %-formatting
|
|
r = self.get_record()
|
|
f = logging.Formatter('${%(message)s}')
|
|
self.assertEqual(f.format(r), '${Message with 2 placeholders}')
|
|
f = logging.Formatter('%(random)s')
|
|
self.assertRaises(KeyError, f.format, r)
|
|
self.assertFalse(f.usesTime())
|
|
f = logging.Formatter('%(asctime)s')
|
|
self.assertTrue(f.usesTime())
|
|
f = logging.Formatter('%(asctime)-15s')
|
|
self.assertTrue(f.usesTime())
|
|
f = logging.Formatter('asctime')
|
|
self.assertFalse(f.usesTime())
|
|
|
|
def test_braces(self):
|
|
# Test {}-formatting
|
|
r = self.get_record()
|
|
f = logging.Formatter('$%{message}%$', style='{')
|
|
self.assertEqual(f.format(r), '$%Message with 2 placeholders%$')
|
|
f = logging.Formatter('{random}', style='{')
|
|
self.assertRaises(KeyError, f.format, r)
|
|
self.assertFalse(f.usesTime())
|
|
f = logging.Formatter('{asctime}', style='{')
|
|
self.assertTrue(f.usesTime())
|
|
f = logging.Formatter('{asctime!s:15}', style='{')
|
|
self.assertTrue(f.usesTime())
|
|
f = logging.Formatter('{asctime:15}', style='{')
|
|
self.assertTrue(f.usesTime())
|
|
f = logging.Formatter('asctime', style='{')
|
|
self.assertFalse(f.usesTime())
|
|
|
|
def test_dollars(self):
|
|
# Test $-formatting
|
|
r = self.get_record()
|
|
f = logging.Formatter('$message', style='$')
|
|
self.assertEqual(f.format(r), 'Message with 2 placeholders')
|
|
f = logging.Formatter('$$%${message}%$$', style='$')
|
|
self.assertEqual(f.format(r), '$%Message with 2 placeholders%$')
|
|
f = logging.Formatter('${random}', style='$')
|
|
self.assertRaises(KeyError, f.format, r)
|
|
self.assertFalse(f.usesTime())
|
|
f = logging.Formatter('${asctime}', style='$')
|
|
self.assertTrue(f.usesTime())
|
|
f = logging.Formatter('${asctime', style='$')
|
|
self.assertFalse(f.usesTime())
|
|
f = logging.Formatter('$asctime', style='$')
|
|
self.assertTrue(f.usesTime())
|
|
f = logging.Formatter('asctime', style='$')
|
|
self.assertFalse(f.usesTime())
|
|
|
|
class LastResortTest(BaseTest):
    """Check what reaches stderr when no handler is configured."""

    def test_last_resort(self):
        # Test the last resort handler
        root = self.root_logger
        root.removeHandler(self.root_hdlr)
        saved = (sys.stderr, logging.lastResort, logging.raiseExceptions)
        text = 'This is your final chance!'

        def warn_and_capture():
            # Log one warning with a fresh stderr and return what was written.
            sys.stderr = buf = io.StringIO()
            root.warning(text)
            return buf.getvalue()

        try:
            self.assertEqual(warn_and_capture(),
                             'This is your final chance!\n')
            #No handlers and no last resort, so 'No handlers' message
            logging.lastResort = None
            self.assertEqual(warn_and_capture(),
                             'No handlers could be found for logger "root"\n')
            # 'No handlers' message only printed once
            self.assertEqual(warn_and_capture(), '')
            root.manager.emittedNoHandlerWarning = False
            #If raiseExceptions is False, no message is printed
            logging.raiseExceptions = False
            self.assertEqual(warn_and_capture(), '')
        finally:
            # Put back everything this test replaced.
            sys.stderr = saved[0]
            root.addHandler(self.root_hdlr)
            logging.lastResort = saved[1]
            logging.raiseExceptions = saved[2]
|
|
|
|
|
|
class BaseFileTest(BaseTest):
    "Base class for handler tests that write log files"

    def setUp(self):
        """Create an empty temporary log file and remember its path."""
        BaseTest.setUp(self)
        fd, self.fn = tempfile.mkstemp(".log", "test_logging-2-")
        # Only the path is needed; handlers open the file themselves.
        os.close(fd)
        # Files registered through assertLogFile(), removed in tearDown().
        self.rmfiles = []

    def tearDown(self):
        """Delete every registered log file and the temporary file."""
        for fn in self.rmfiles:
            os.unlink(fn)
        # self.fn may already be gone if it was registered in rmfiles.
        if os.path.exists(self.fn):
            os.unlink(self.fn)
        BaseTest.tearDown(self)

    def assertLogFile(self, filename):
        "Assert a log file is there and register it for deletion"
        # Interpolate the path so a failure names the missing file.
        self.assertTrue(os.path.exists(filename),
                        msg="Log file %r does not exist" % filename)
        self.rmfiles.append(filename)
|
|
|
|
|
|
class RotatingFileHandlerTest(BaseFileTest):
    """Tests for logging.handlers.RotatingFileHandler.

    Each handler is closed in a ``finally`` clause so that a failing
    assertion does not leave the log file open when tearDown() unlinks it.
    """

    def next_rec(self):
        """Return a DEBUG LogRecord carrying the next numbered message."""
        return logging.LogRecord('n', logging.DEBUG, 'p', 1,
                                 self.next_message(), None, None, None)

    def test_should_not_rollover(self):
        # If maxbytes is zero rollover never occurs
        rh = logging.handlers.RotatingFileHandler(self.fn, maxBytes=0)
        try:
            self.assertFalse(rh.shouldRollover(None))
        finally:
            rh.close()

    def test_should_rollover(self):
        # With maxBytes=1 a record must trigger a rollover.
        rh = logging.handlers.RotatingFileHandler(self.fn, maxBytes=1)
        try:
            self.assertTrue(rh.shouldRollover(self.next_rec()))
        finally:
            rh.close()

    def test_file_created(self):
        # checks that the file is created and assumes it was created
        # by us
        rh = logging.handlers.RotatingFileHandler(self.fn)
        try:
            rh.emit(self.next_rec())
            self.assertLogFile(self.fn)
        finally:
            rh.close()

    def test_rollover_filenames(self):
        # With backupCount=2 the backups are named <fn>.1 and <fn>.2 and
        # no <fn>.3 is created.
        rh = logging.handlers.RotatingFileHandler(
            self.fn, backupCount=2, maxBytes=1)
        try:
            rh.emit(self.next_rec())
            self.assertLogFile(self.fn)
            rh.emit(self.next_rec())
            self.assertLogFile(self.fn + ".1")
            rh.emit(self.next_rec())
            self.assertLogFile(self.fn + ".2")
            self.assertFalse(os.path.exists(self.fn + ".3"))
        finally:
            rh.close()
|
|
|
|
class TimedRotatingFileHandlerTest(BaseFileTest):
    """Container for TimedRotatingFileHandler tests.

    The test methods are attached with setattr() after the class body.
    """
|
|
|
|
def secs(**kw):
    """Return the whole seconds in a timedelta built from keyword arguments.

    The keywords are passed straight to datetime.timedelta; the result is
    floor-divided by one second, so it is an int.
    """
    one_second = datetime.timedelta(seconds=1)
    span = datetime.timedelta(**kw)
    return span // one_second
|
|
|
|
# Generate one test_compute_rollover_<when> method per rollover setting and
# attach it to TimedRotatingFileHandlerTest.  'exp' is the expected rollover
# time, in seconds, for a handler asked at time 0.0 with utc=True.
for when, exp in (('S', 1),
                  ('M', 60),
                  ('H', 60 * 60),
                  ('D', 60 * 60 * 24),
                  ('MIDNIGHT', 60 * 60 * 24),
                  # current time (epoch start) is a Thursday, W0 means Monday
                  ('W0', secs(days=4, hours=24)),
                 ):
    # 'when' and 'exp' are bound as defaults so each generated method keeps
    # its own values rather than the loop's final ones.
    def test_compute_rollover(self, when=when, exp=exp):
        rh = logging.handlers.TimedRotatingFileHandler(
            self.fn, when=when, interval=1, backupCount=0, utc=True)
        # Close the handler even when the assertion fails, so the log file
        # is not left open.
        try:
            currentTime = 0.0
            actual = rh.computeRollover(currentTime)
            if exp != actual:
                # Failures occur on some systems for MIDNIGHT and W0.
                # Print detailed calculation for MIDNIGHT so we can try to see
                # what's going on
                if when == 'MIDNIGHT':
                    try:
                        if rh.utc:
                            t = time.gmtime(currentTime)
                        else:
                            t = time.localtime(currentTime)
                        currentHour = t[3]
                        currentMinute = t[4]
                        currentSecond = t[5]
                        # r is the number of seconds left between now and midnight
                        r = logging.handlers._MIDNIGHT - ((currentHour * 60 +
                                                           currentMinute) * 60 +
                                                          currentSecond)
                        result = currentTime + r
                        print('t: %s (%s)' % (t, rh.utc), file=sys.stderr)
                        print('currentHour: %s' % currentHour, file=sys.stderr)
                        print('currentMinute: %s' % currentMinute, file=sys.stderr)
                        print('currentSecond: %s' % currentSecond, file=sys.stderr)
                        print('r: %s' % r, file=sys.stderr)
                        print('result: %s' % result, file=sys.stderr)
                    except Exception as exc:
                        # Diagnostics are best-effort; never mask the real
                        # assertion below.
                        print('exception in diagnostic code: %s' % exc, file=sys.stderr)
            self.assertEqual(exp, actual)
        finally:
            rh.close()
    setattr(TimedRotatingFileHandlerTest, "test_compute_rollover_%s" % when, test_compute_rollover)
|
|
|
|
class HandlerTest(BaseTest):

    @unittest.skipIf(os.name == 'nt', 'WatchedFileHandler not appropriate for Windows.')
    @unittest.skipUnless(threading, 'Threading required for this test.')
    def test_race(self):
        # Issue #14632 refers.
        # A background thread repeatedly deletes the log file while this
        # thread keeps logging to it through a WatchedFileHandler.  There
        # are no assertions: the test passes if handle() never raises.
        def keep_deleting(path, attempts):
            for _ in range(attempts):
                try:
                    os.unlink(path)
                except OSError:
                    # Nothing to delete right now; try again later.
                    pass
                time.sleep(0.004 * random.randint(0, 4))

        del_count = 500
        log_count = 500

        # Run once with the file opened up front and once with delay=True.
        for delay in (False, True):
            handle, log_path = tempfile.mkstemp('.log', 'test_logging-3-')
            os.close(handle)
            remover = threading.Thread(target=keep_deleting,
                                       args=(log_path, del_count))
            remover.daemon = True
            remover.start()
            watcher = logging.handlers.WatchedFileHandler(log_path, delay=delay)
            watcher.setFormatter(
                logging.Formatter('%(asctime)s: %(levelname)s: %(message)s'))
            try:
                for _ in range(log_count):
                    time.sleep(0.005)
                    record = logging.makeLogRecord({'msg': 'testing' })
                    watcher.handle(record)
            finally:
                # Wait for the remover before closing and cleaning up.
                remover.join()
                watcher.close()
                if os.path.exists(log_path):
                    os.unlink(log_path)
|
|
|
|
|
|
# Run the whole suite under the platform-dependent default locale (the
# empty string for LC_ALL).  The reason for this is not recorded; the
# current locale is saved first and restored at the end.
@run_with_locale('LC_ALL', '')
def test_main():
    # Test classes, in the order they are handed to run_unittest().
    test_classes = (
        BuiltinLevelsTest, BasicFilterTest,
        CustomLevelsAndFiltersTest, MemoryHandlerTest,
        ConfigFileTest, SocketHandlerTest, MemoryTest,
        EncodingTest, WarningsTest, ConfigDictTest, ManagerTest,
        FormatterTest,
        LogRecordFactoryTest, ChildLoggerTest, QueueHandlerTest,
        RotatingFileHandlerTest,
        LastResortTest,
        TimedRotatingFileHandlerTest, HandlerTest,
    )
    run_unittest(*test_classes)


if __name__ == "__main__":
    test_main()
|