From 7367d0883891a0d446ef2467e31ba3a1fe81bd07 Mon Sep 17 00:00:00 2001 From: Vinay Sajip Date: Mon, 2 May 2011 13:17:27 +0100 Subject: [PATCH] Added tests to improve coverage. --- Lib/test/test_logging.py | 447 +++++++++++++++++++++++++++++---------- 1 file changed, 339 insertions(+), 108 deletions(-) diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py index 54ed734c83e..42757cb05bd 100644 --- a/Lib/test/test_logging.py +++ b/Lib/test/test_logging.py @@ -33,6 +33,7 @@ import errno import pickle import io import gc +from http.server import HTTPServer, BaseHTTPRequestHandler import json import os import queue @@ -40,7 +41,8 @@ import re import select import smtpd import socket -from socketserver import ThreadingTCPServer, StreamRequestHandler +from socketserver import (ThreadingUDPServer, DatagramRequestHandler, + ThreadingTCPServer, StreamRequestHandler) import struct import sys import tempfile @@ -49,6 +51,7 @@ from test.support import TestHandler, Matcher import textwrap import time import unittest +from urllib.parse import urlparse, parse_qs import warnings import weakref try: @@ -596,9 +599,8 @@ class StreamHandlerTest(BaseTest): logging.raiseExceptions = old_raise sys.stderr = old_stderr - -TEST_SMTP_PORT = 9025 - +# -- The following section could be moved into a server_helper.py module +# -- if it proves to be of wider utility than just test_logging class TestSMTPChannel(smtpd.SMTPChannel): def __init__(self, server, conn, addr, sockmap): @@ -642,6 +644,7 @@ class TestSMTPServer(smtpd.SMTPServer): # try to re-use a server port if possible self.set_reuse_addr() self.bind(addr) + self.port = sock.getsockname()[1] self.listen(5) except: self.close() @@ -666,23 +669,162 @@ class TestSMTPServer(smtpd.SMTPServer): def serve_forever(self, poll_interval): asyncore.loop(poll_interval, map=self.sockmap) - def stop(self): + def stop(self, timeout=None): self.close() - self._thread.join() + self._thread.join(timeout) self._thread = None +class ControlMixin(object): + """ + This mixin is used to start a server on a separate thread, and + shut it down programmatically. Request handling is simplified - instead + of needing to derive a suitable RequestHandler subclass, you just + provide a callable which will be passed each received request to be + processed. + + :param handler: A handler callable which will be called with a + single parameter - the request - in order to + process the request. This handler is called on the + server thread, effectively meaning that requests are + processed serially. While not quite Web scale ;-), + this should be fine for testing applications. + :param poll_interval: The polling interval in seconds. + """ + def __init__(self, handler, poll_interval): + self._thread = None + self.poll_interval = poll_interval + self._handler = handler + self.ready = threading.Event() + + def start(self): + """ + Create a daemon thread to run the server, and start it. + """ + self._thread = t = threading.Thread(target=self.serve_forever, + args=(self.poll_interval,)) + t.setDaemon(True) + t.start() + + def serve_forever(self, poll_interval): + self.ready.set() + super(ControlMixin, self).serve_forever(poll_interval) + + def stop(self, timeout=None): + """ + Tell the server thread to stop, and wait for it to do so. + """ + self.shutdown() + if self._thread is not None: + self._thread.join(timeout) + self._thread = None + self.server_close() + self.ready.clear() + +class TestHTTPServer(ControlMixin, HTTPServer): + """ + An HTTP server which is controllable using :class:`ControlMixin`. + + :param addr: A tuple with the IP address and port to listen on. + :param handler: A handler callable which will be called with a + single parameter - the request - in order to + process the request. + :param poll_interval: The polling interval in seconds. + """ + def __init__(self, addr, handler, poll_interval=0.5, log=False): + class DelegatingHTTPRequestHandler(BaseHTTPRequestHandler): + def __getattr__(self, name, default=None): + if name.startswith('do_'): + return self.process_request + raise AttributeError(name) + + def process_request(self): + self.server._handler(self) + + def log_message(self, format, *args): + if log: + super(DelegatingHTTPRequestHandler, + self).log_message(format, *args) + HTTPServer.__init__(self, addr, DelegatingHTTPRequestHandler) + ControlMixin.__init__(self, handler, poll_interval) + +class TestTCPServer(ControlMixin, ThreadingTCPServer): + """ + A TCP server which is controllable using :class:`ControlMixin`. + + :param addr: A tuple with the IP address and port to listen on. + :param handler: A handler callable which will be called with a single + parameter - the request - in order to process the request. + :param poll_interval: The polling interval in seconds. + :bind_and_activate: If True (the default), binds the server and starts it + listening. If False, you need to call + :meth:`server_bind` and :meth:`server_activate` at + some later time before calling :meth:`start`, so that + the server will set up the socket and listen on it. + """ + + allow_reuse_address = True + + def __init__(self, addr, handler, poll_interval=0.5, + bind_and_activate=True): + class DelegatingTCPRequestHandler(StreamRequestHandler): + + def handle(self): + self.server._handler(self) + ThreadingTCPServer.__init__(self, addr, DelegatingTCPRequestHandler, + bind_and_activate) + ControlMixin.__init__(self, handler, poll_interval) + + def server_bind(self): + super(TestTCPServer, self).server_bind() + self.port = self.socket.getsockname()[1] + +class TestUDPServer(ControlMixin, ThreadingUDPServer): + """ + A UDP server which is controllable using :class:`ControlMixin`. + + :param addr: A tuple with the IP address and port to listen on. + :param handler: A handler callable which will be called with a + single parameter - the request - in order to + process the request. + :param poll_interval: The polling interval for shutdown requests, + in seconds. + :bind_and_activate: If True (the default), binds the server and + starts it listening. If False, you need to + call :meth:`server_bind` and + :meth:`server_activate` at some later time + before calling :meth:`start`, so that the server will + set up the socket and listen on it. + """ + def __init__(self, addr, handler, poll_interval=0.5, bind_and_activate=True): + class DelegatingUDPRequestHandler(DatagramRequestHandler): + + def handle(self): + self.server._handler(self) + ThreadingUDPServer.__init__(self, addr, DelegatingUDPRequestHandler, + bind_and_activate) + ControlMixin.__init__(self, handler, poll_interval) + + def server_bind(self): + super(TestUDPServer, self).server_bind() + self.port = self.socket.getsockname()[1] + + +# - end of server_helper section class SMTPHandlerTest(BaseTest): def test_basic(self): - addr = ('localhost', TEST_SMTP_PORT) + sockmap = {} + server = TestSMTPServer(('localhost', 0), self.process_message, 0.001, + sockmap) + server.start() + addr = ('localhost', server.port) h = logging.handlers.SMTPHandler(addr, 'me', 'you', 'Log') self.assertEqual(h.toaddrs, ['you']) self.messages = [] - sockmap = {} - server = TestSMTPServer(addr, self.process_message, 0.001, sockmap) - server.start() r = logging.makeLogRecord({'msg': 'Hello'}) + self.handled = threading.Event() h.handle(r) + self.handled.wait() server.stop() self.assertEqual(len(self.messages), 1) peer, mailfrom, rcpttos, data = self.messages[0] @@ -694,7 +836,7 @@ class SMTPHandlerTest(BaseTest): def process_message(self, *args): self.messages.append(args) - + self.handled.set() class MemoryHandlerTest(BaseTest): @@ -1068,68 +1210,6 @@ class ConfigFileTest(BaseTest): # Original logger output is empty. self.assert_log_lines([]) -class LogRecordStreamHandler(StreamRequestHandler): - - """Handler for a streaming logging request. It saves the log message in the - TCP server's 'log_output' attribute.""" - - TCP_LOG_END = "!!!END!!!" - - def handle(self): - """Handle multiple requests - each expected to be of 4-byte length, - followed by the LogRecord in pickle format. Logs the record - according to whatever policy is configured locally.""" - while True: - chunk = self.connection.recv(4) - if len(chunk) < 4: - break - slen = struct.unpack(">L", chunk)[0] - chunk = self.connection.recv(slen) - while len(chunk) < slen: - chunk = chunk + self.connection.recv(slen - len(chunk)) - obj = self.unpickle(chunk) - record = logging.makeLogRecord(obj) - self.handle_log_record(record) - - def unpickle(self, data): - return pickle.loads(data) - - def handle_log_record(self, record): - # If the end-of-messages sentinel is seen, tell the server to - # terminate. - if self.TCP_LOG_END in record.msg: - self.server.abort = 1 - return - self.server.log_output += record.msg + "\n" - - -class LogRecordSocketReceiver(ThreadingTCPServer): - - """A simple-minded TCP socket-based logging receiver suitable for test - purposes.""" - - allow_reuse_address = 1 - log_output = "" - - def __init__(self, host='localhost', - port=logging.handlers.DEFAULT_TCP_LOGGING_PORT, - handler=LogRecordStreamHandler): - ThreadingTCPServer.__init__(self, (host, port), handler) - self.abort = False - self.timeout = 0.1 - self.finished = threading.Event() - - def serve_until_stopped(self): - while not self.abort: - rd, wr, ex = select.select([self.socket.fileno()], [], [], - self.timeout) - if rd: - self.handle_request() - # Notify the main thread that we're about to exit - self.finished.set() - # close the listen socket - self.server_close() - @unittest.skipUnless(threading, 'Threading required for this test.') class SocketHandlerTest(BaseTest): @@ -1140,51 +1220,54 @@ class SocketHandlerTest(BaseTest): """Set up a TCP server to receive log messages, and a SocketHandler pointing to that server's address and port.""" BaseTest.setUp(self) - self.tcpserver = LogRecordSocketReceiver(port=0) - self.port = self.tcpserver.socket.getsockname()[1] - self.threads = [ - threading.Thread(target=self.tcpserver.serve_until_stopped)] - for thread in self.threads: - thread.start() - - self.sock_hdlr = logging.handlers.SocketHandler('localhost', self.port) - self.sock_hdlr.setFormatter(self.root_formatter) + addr = ('localhost', 0) + self.server = server = TestTCPServer(addr, self.handle_socket, + 0.01) + server.start() + server.ready.wait() + self.sock_hdlr = logging.handlers.SocketHandler('localhost', + server.port) + self.log_output = '' self.root_logger.removeHandler(self.root_logger.handlers[0]) self.root_logger.addHandler(self.sock_hdlr) + self.handled = threading.Semaphore(0) def tearDown(self): """Shutdown the TCP server.""" try: - if hasattr(self, 'tcpserver'): - self.tcpserver.abort = True - del self.tcpserver + self.server.stop(2.0) self.root_logger.removeHandler(self.sock_hdlr) self.sock_hdlr.close() - for thread in self.threads: - thread.join(2.0) finally: BaseTest.tearDown(self) - def get_output(self): - """Get the log output as received by the TCP server.""" - # Signal the TCP receiver and wait for it to terminate. - self.root_logger.critical(LogRecordStreamHandler.TCP_LOG_END) - self.tcpserver.finished.wait(2.0) - return self.tcpserver.log_output + def handle_socket(self, request): + conn = request.connection + while True: + chunk = conn.recv(4) + if len(chunk) < 4: + break + slen = struct.unpack(">L", chunk)[0] + chunk = conn.recv(slen) + while len(chunk) < slen: + chunk = chunk + conn.recv(slen - len(chunk)) + obj = pickle.loads(chunk) + record = logging.makeLogRecord(obj) + self.log_output += record.msg + '\n' + self.handled.release() def test_output(self): # The log message sent to the SocketHandler is properly received. logger = logging.getLogger("tcp") logger.error("spam") + self.handled.acquire() logger.debug("eggs") - self.assertEqual(self.get_output(), "spam\neggs\n") + self.handled.acquire() + self.assertEqual(self.log_output, "spam\neggs\n") def test_noserver(self): # Kill the server - self.tcpserver.abort = True - del self.tcpserver - for thread in self.threads: - thread.join(2.0) + self.server.stop(2.0) #The logging call should try to connect, which should fail try: raise RuntimeError('Deliberate mistake') @@ -1196,6 +1279,143 @@ class SocketHandlerTest(BaseTest): time.sleep(self.sock_hdlr.retryTime - now + 0.001) self.root_logger.error('Nor this') + +@unittest.skipUnless(threading, 'Threading required for this test.') +class DatagramHandlerTest(BaseTest): + + """Test for DatagramHandler.""" + + def setUp(self): + """Set up a UDP server to receive log messages, and a DatagramHandler + pointing to that server's address and port.""" + BaseTest.setUp(self) + addr = ('localhost', 0) + self.server = server = TestUDPServer(addr, self.handle_datagram, + 0.01) + server.start() + server.ready.wait() + self.sock_hdlr = logging.handlers.DatagramHandler('localhost', + server.port) + self.log_output = '' + self.root_logger.removeHandler(self.root_logger.handlers[0]) + self.root_logger.addHandler(self.sock_hdlr) + self.handled = threading.Event() + + def tearDown(self): + """Shutdown the UDP server.""" + try: + self.server.stop(2.0) + self.root_logger.removeHandler(self.sock_hdlr) + self.sock_hdlr.close() + finally: + BaseTest.tearDown(self) + + def handle_datagram(self, request): + slen = struct.pack('>L', 0) # length of prefix + packet = request.packet[len(slen):] + obj = pickle.loads(packet) + record = logging.makeLogRecord(obj) + self.log_output += record.msg + '\n' + self.handled.set() + + def test_output(self): + # The log message sent to the DatagramHandler is properly received. + logger = logging.getLogger("udp") + logger.error("spam") + self.handled.wait() + self.assertEqual(self.log_output, "spam\n") + + +@unittest.skipUnless(threading, 'Threading required for this test.') +class SysLogHandlerTest(BaseTest): + + """Test for SysLogHandler using UDP.""" + + def setUp(self): + """Set up a UDP server to receive log messages, and a SysLogHandler + pointing to that server's address and port.""" + BaseTest.setUp(self) + addr = ('localhost', 0) + self.server = server = TestUDPServer(addr, self.handle_datagram, + 0.01) + server.start() + server.ready.wait() + self.sl_hdlr = logging.handlers.SysLogHandler(('localhost', + server.port)) + self.log_output = '' + self.root_logger.removeHandler(self.root_logger.handlers[0]) + self.root_logger.addHandler(self.sl_hdlr) + self.handled = threading.Event() + + def tearDown(self): + """Shutdown the UDP server.""" + try: + self.server.stop(2.0) + self.root_logger.removeHandler(self.sl_hdlr) + self.sl_hdlr.close() + finally: + BaseTest.tearDown(self) + + def handle_datagram(self, request): + self.log_output = request.packet + self.handled.set() + + def test_output(self): + # The log message sent to the SysLogHandler is properly received. + logger = logging.getLogger("slh") + logger.error("sp\xe4m") + self.handled.wait() + self.assertEqual(self.log_output, b'<11>\xef\xbb\xbfsp\xc3\xa4m\x00') + + +@unittest.skipUnless(threading, 'Threading required for this test.') +class HTTPHandlerTest(BaseTest): + + """Test for HTTPHandler.""" + + def setUp(self): + """Set up an HTTP server to receive log messages, and a HTTPHandler + pointing to that server's address and port.""" + BaseTest.setUp(self) + addr = ('localhost', 0) + self.server = server = TestHTTPServer(addr, self.handle_request, + 0.01) + server.start() + server.ready.wait() + host = 'localhost:%d' % server.server_port + self.h_hdlr = logging.handlers.HTTPHandler(host, '/frob') + self.log_data = None + self.root_logger.removeHandler(self.root_logger.handlers[0]) + self.root_logger.addHandler(self.h_hdlr) + self.handled = threading.Event() + + def tearDown(self): + """Shutdown the UDP server.""" + try: + self.server.stop(2.0) + self.root_logger.removeHandler(self.h_hdlr) + self.h_hdlr.close() + finally: + BaseTest.tearDown(self) + + def handle_request(self, request): + self.log_data = urlparse(request.path) + request.send_response(200) + self.handled.set() + + def test_output(self): + # The log message sent to the SysLogHandler is properly received. + logger = logging.getLogger("http") + msg = "sp\xe4m" + logger.error(msg) + self.handled.wait() + self.assertEqual(self.log_data.path, '/frob') + d = parse_qs(self.log_data.query) + self.assertEqual(d['name'], ['http']) + self.assertEqual(d['funcName'], ['test_output']) + self.assertEqual(d['msg'], [msg]) + + class MemoryTest(BaseTest): """Test memory persistence of logger objects.""" @@ -3129,7 +3349,7 @@ class BaseFileTest(BaseTest): def assertLogFile(self, filename): "Assert a log file is there and register it for deletion" self.assertTrue(os.path.exists(filename), - msg="Log file %r does not exist") + msg="Log file %r does not exist" % filename) self.rmfiles.append(filename) @@ -3181,8 +3401,18 @@ class RotatingFileHandlerTest(BaseFileTest): rh.close() class TimedRotatingFileHandlerTest(BaseFileTest): - # test methods added below - pass + # other test methods added below + def test_rollover(self): + fh = logging.handlers.TimedRotatingFileHandler(self.fn, 'S') + r = logging.makeLogRecord({'msg': 'testing'}) + fh.emit(r) + self.assertLogFile(self.fn) + time.sleep(1.0) + fh.emit(r) + now = datetime.datetime.now() + prevsec = now - datetime.timedelta(seconds=1) + suffix = prevsec.strftime(".%Y-%m-%d_%H-%M-%S") + self.assertLogFile(self.fn + suffix) def secs(**kw): return datetime.timedelta(**kw) // datetime.timedelta(seconds=1) @@ -3238,14 +3468,15 @@ for when, exp in (('S', 1), def test_main(): run_unittest(BuiltinLevelsTest, BasicFilterTest, CustomLevelsAndFiltersTest, HandlerTest, MemoryHandlerTest, - ConfigFileTest, SocketHandlerTest, MemoryTest, - EncodingTest, WarningsTest, ConfigDictTest, ManagerTest, - FormatterTest, BufferingFormatterTest, StreamHandlerTest, - LogRecordFactoryTest, ChildLoggerTest, QueueHandlerTest, - ShutdownTest, ModuleLevelMiscTest, BasicConfigTest, - LoggerAdapterTest, LoggerTest, SMTPHandlerTest, - FileHandlerTest, RotatingFileHandlerTest, + ConfigFileTest, SocketHandlerTest, DatagramHandlerTest, + MemoryTest, EncodingTest, WarningsTest, ConfigDictTest, + ManagerTest, FormatterTest, BufferingFormatterTest, + StreamHandlerTest, LogRecordFactoryTest, ChildLoggerTest, + QueueHandlerTest, ShutdownTest, ModuleLevelMiscTest, + BasicConfigTest, LoggerAdapterTest, LoggerTest, + SMTPHandlerTest, FileHandlerTest, RotatingFileHandlerTest, LastResortTest, LogRecordTest, ExceptionTest, + SysLogHandlerTest, HTTPHandlerTest, TimedRotatingFileHandlerTest )