bpo-40275: Avoid importing logging in test.support (GH-19601)
Import logging lazily in assertLogs() in unittest. Move TestHandler from test.support to logging_helper.
This commit is contained in:
parent
16994912c9
commit
515fce4fc4
|
@ -1410,11 +1410,6 @@ The :mod:`test.support` module defines the following classes:
|
|||
Run *test* and return the result.
|
||||
|
||||
|
||||
.. class:: TestHandler(logging.handlers.BufferingHandler)
|
||||
|
||||
Class for logging support.
|
||||
|
||||
|
||||
.. class:: FakePath(path)
|
||||
|
||||
Simple :term:`path-like object`. It implements the :meth:`__fspath__`
|
||||
|
|
|
@ -15,7 +15,6 @@ import hashlib
|
|||
import importlib
|
||||
import importlib.util
|
||||
import locale
|
||||
import logging.handlers
|
||||
import os
|
||||
import platform
|
||||
import re
|
||||
|
@ -99,8 +98,6 @@ __all__ = [
|
|||
"open_urlresource",
|
||||
# processes
|
||||
'temp_umask', "reap_children",
|
||||
# logging
|
||||
"TestHandler",
|
||||
# threads
|
||||
"threading_setup", "threading_cleanup", "reap_threads", "start_threads",
|
||||
# miscellaneous
|
||||
|
@ -2368,37 +2365,6 @@ def optim_args_from_interpreter_flags():
|
|||
optimization settings in sys.flags."""
|
||||
return subprocess._optim_args_from_interpreter_flags()
|
||||
|
||||
#============================================================
|
||||
# Support for assertions about logging.
|
||||
#============================================================
|
||||
|
||||
class TestHandler(logging.handlers.BufferingHandler):
|
||||
def __init__(self, matcher):
|
||||
# BufferingHandler takes a "capacity" argument
|
||||
# so as to know when to flush. As we're overriding
|
||||
# shouldFlush anyway, we can set a capacity of zero.
|
||||
# You can call flush() manually to clear out the
|
||||
# buffer.
|
||||
logging.handlers.BufferingHandler.__init__(self, 0)
|
||||
self.matcher = matcher
|
||||
|
||||
def shouldFlush(self):
|
||||
return False
|
||||
|
||||
def emit(self, record):
|
||||
self.format(record)
|
||||
self.buffer.append(record.__dict__)
|
||||
|
||||
def matches(self, **kwargs):
|
||||
"""
|
||||
Look for a saved dict whose keys/values match the supplied arguments.
|
||||
"""
|
||||
result = False
|
||||
for d in self.buffer:
|
||||
if self.matcher.matches(d, **kwargs):
|
||||
result = True
|
||||
break
|
||||
return result
|
||||
|
||||
class Matcher(object):
|
||||
|
||||
|
|
|
@ -0,0 +1,29 @@
|
|||
import logging.handlers
|
||||
|
||||
class TestHandler(logging.handlers.BufferingHandler):
|
||||
def __init__(self, matcher):
|
||||
# BufferingHandler takes a "capacity" argument
|
||||
# so as to know when to flush. As we're overriding
|
||||
# shouldFlush anyway, we can set a capacity of zero.
|
||||
# You can call flush() manually to clear out the
|
||||
# buffer.
|
||||
logging.handlers.BufferingHandler.__init__(self, 0)
|
||||
self.matcher = matcher
|
||||
|
||||
def shouldFlush(self):
|
||||
return False
|
||||
|
||||
def emit(self, record):
|
||||
self.format(record)
|
||||
self.buffer.append(record.__dict__)
|
||||
|
||||
def matches(self, **kwargs):
|
||||
"""
|
||||
Look for a saved dict whose keys/values match the supplied arguments.
|
||||
"""
|
||||
result = False
|
||||
for d in self.buffer:
|
||||
if self.matcher.matches(d, **kwargs):
|
||||
result = True
|
||||
break
|
||||
return result
|
|
@ -44,6 +44,7 @@ import tempfile
|
|||
from test.support.script_helper import assert_python_ok, assert_python_failure
|
||||
from test import support
|
||||
from test.support import socket_helper
|
||||
from test.support.logging_helper import TestHandler
|
||||
import textwrap
|
||||
import threading
|
||||
import time
|
||||
|
@ -3524,7 +3525,7 @@ class QueueHandlerTest(BaseTest):
|
|||
@unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'),
|
||||
'logging.handlers.QueueListener required for this test')
|
||||
def test_queue_listener(self):
|
||||
handler = support.TestHandler(support.Matcher())
|
||||
handler = TestHandler(support.Matcher())
|
||||
listener = logging.handlers.QueueListener(self.queue, handler)
|
||||
listener.start()
|
||||
try:
|
||||
|
@ -3540,7 +3541,7 @@ class QueueHandlerTest(BaseTest):
|
|||
|
||||
# Now test with respect_handler_level set
|
||||
|
||||
handler = support.TestHandler(support.Matcher())
|
||||
handler = TestHandler(support.Matcher())
|
||||
handler.setLevel(logging.CRITICAL)
|
||||
listener = logging.handlers.QueueListener(self.queue, handler,
|
||||
respect_handler_level=True)
|
||||
|
|
|
@ -0,0 +1,69 @@
|
|||
import logging
|
||||
import collections
|
||||
|
||||
from .case import _BaseTestCaseContext
|
||||
|
||||
|
||||
_LoggingWatcher = collections.namedtuple("_LoggingWatcher",
|
||||
["records", "output"])
|
||||
|
||||
class _CapturingHandler(logging.Handler):
|
||||
"""
|
||||
A logging handler capturing all (raw and formatted) logging output.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
logging.Handler.__init__(self)
|
||||
self.watcher = _LoggingWatcher([], [])
|
||||
|
||||
def flush(self):
|
||||
pass
|
||||
|
||||
def emit(self, record):
|
||||
self.watcher.records.append(record)
|
||||
msg = self.format(record)
|
||||
self.watcher.output.append(msg)
|
||||
|
||||
|
||||
class _AssertLogsContext(_BaseTestCaseContext):
|
||||
"""A context manager used to implement TestCase.assertLogs()."""
|
||||
|
||||
LOGGING_FORMAT = "%(levelname)s:%(name)s:%(message)s"
|
||||
|
||||
def __init__(self, test_case, logger_name, level):
|
||||
_BaseTestCaseContext.__init__(self, test_case)
|
||||
self.logger_name = logger_name
|
||||
if level:
|
||||
self.level = logging._nameToLevel.get(level, level)
|
||||
else:
|
||||
self.level = logging.INFO
|
||||
self.msg = None
|
||||
|
||||
def __enter__(self):
|
||||
if isinstance(self.logger_name, logging.Logger):
|
||||
logger = self.logger = self.logger_name
|
||||
else:
|
||||
logger = self.logger = logging.getLogger(self.logger_name)
|
||||
formatter = logging.Formatter(self.LOGGING_FORMAT)
|
||||
handler = _CapturingHandler()
|
||||
handler.setFormatter(formatter)
|
||||
self.watcher = handler.watcher
|
||||
self.old_handlers = logger.handlers[:]
|
||||
self.old_level = logger.level
|
||||
self.old_propagate = logger.propagate
|
||||
logger.handlers = [handler]
|
||||
logger.setLevel(self.level)
|
||||
logger.propagate = False
|
||||
return handler.watcher
|
||||
|
||||
def __exit__(self, exc_type, exc_value, tb):
|
||||
self.logger.handlers = self.old_handlers
|
||||
self.logger.propagate = self.old_propagate
|
||||
self.logger.setLevel(self.old_level)
|
||||
if exc_type is not None:
|
||||
# let unexpected exceptions pass through
|
||||
return False
|
||||
if len(self.watcher.records) == 0:
|
||||
self._raiseFailure(
|
||||
"no logs of level {} or higher triggered on {}"
|
||||
.format(logging.getLevelName(self.level), self.logger.name))
|
|
@ -3,7 +3,6 @@
|
|||
import sys
|
||||
import functools
|
||||
import difflib
|
||||
import logging
|
||||
import pprint
|
||||
import re
|
||||
import warnings
|
||||
|
@ -297,73 +296,6 @@ class _AssertWarnsContext(_AssertRaisesBaseContext):
|
|||
|
||||
|
||||
|
||||
_LoggingWatcher = collections.namedtuple("_LoggingWatcher",
|
||||
["records", "output"])
|
||||
|
||||
|
||||
class _CapturingHandler(logging.Handler):
|
||||
"""
|
||||
A logging handler capturing all (raw and formatted) logging output.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
logging.Handler.__init__(self)
|
||||
self.watcher = _LoggingWatcher([], [])
|
||||
|
||||
def flush(self):
|
||||
pass
|
||||
|
||||
def emit(self, record):
|
||||
self.watcher.records.append(record)
|
||||
msg = self.format(record)
|
||||
self.watcher.output.append(msg)
|
||||
|
||||
|
||||
|
||||
class _AssertLogsContext(_BaseTestCaseContext):
|
||||
"""A context manager used to implement TestCase.assertLogs()."""
|
||||
|
||||
LOGGING_FORMAT = "%(levelname)s:%(name)s:%(message)s"
|
||||
|
||||
def __init__(self, test_case, logger_name, level):
|
||||
_BaseTestCaseContext.__init__(self, test_case)
|
||||
self.logger_name = logger_name
|
||||
if level:
|
||||
self.level = logging._nameToLevel.get(level, level)
|
||||
else:
|
||||
self.level = logging.INFO
|
||||
self.msg = None
|
||||
|
||||
def __enter__(self):
|
||||
if isinstance(self.logger_name, logging.Logger):
|
||||
logger = self.logger = self.logger_name
|
||||
else:
|
||||
logger = self.logger = logging.getLogger(self.logger_name)
|
||||
formatter = logging.Formatter(self.LOGGING_FORMAT)
|
||||
handler = _CapturingHandler()
|
||||
handler.setFormatter(formatter)
|
||||
self.watcher = handler.watcher
|
||||
self.old_handlers = logger.handlers[:]
|
||||
self.old_level = logger.level
|
||||
self.old_propagate = logger.propagate
|
||||
logger.handlers = [handler]
|
||||
logger.setLevel(self.level)
|
||||
logger.propagate = False
|
||||
return handler.watcher
|
||||
|
||||
def __exit__(self, exc_type, exc_value, tb):
|
||||
self.logger.handlers = self.old_handlers
|
||||
self.logger.propagate = self.old_propagate
|
||||
self.logger.setLevel(self.old_level)
|
||||
if exc_type is not None:
|
||||
# let unexpected exceptions pass through
|
||||
return False
|
||||
if len(self.watcher.records) == 0:
|
||||
self._raiseFailure(
|
||||
"no logs of level {} or higher triggered on {}"
|
||||
.format(logging.getLevelName(self.level), self.logger.name))
|
||||
|
||||
|
||||
class _OrderedChainMap(collections.ChainMap):
|
||||
def __iter__(self):
|
||||
seen = set()
|
||||
|
@ -854,6 +786,8 @@ class TestCase(object):
|
|||
self.assertEqual(cm.output, ['INFO:foo:first message',
|
||||
'ERROR:foo.bar:second message'])
|
||||
"""
|
||||
# Lazy import to avoid importing logging if it is not needed.
|
||||
from ._log import _AssertLogsContext
|
||||
return _AssertLogsContext(self, logger, level)
|
||||
|
||||
def _getAssertEqualityFunc(self, first, second):
|
||||
|
|
|
@ -0,0 +1,2 @@
|
|||
The :mod:`logging` package is now imported lazily in :mod:`unittest` only
|
||||
when the :meth:`~unittest.TestCase.assertLogs` assertion is used.
|
Loading…
Reference in New Issue