From d61910c598876788c9b4bf0e116370bbfc5a2f85 Mon Sep 17 00:00:00 2001 From: Vinay Sajip Date: Thu, 8 Sep 2016 01:13:39 +0100 Subject: [PATCH] Fixes #27930: improved QueueListener behaviour. --- Lib/logging/handlers.py | 21 ++-------- Lib/test/test_logging.py | 90 ++++++++++++++++++++++++++++++++++++++-- Misc/NEWS | 3 ++ 3 files changed, 93 insertions(+), 21 deletions(-) diff --git a/Lib/logging/handlers.py b/Lib/logging/handlers.py index c9f8217b06b..c39a56ff066 100644 --- a/Lib/logging/handlers.py +++ b/Lib/logging/handlers.py @@ -1,4 +1,4 @@ -# Copyright 2001-2015 by Vinay Sajip. All Rights Reserved. +# Copyright 2001-2016 by Vinay Sajip. All Rights Reserved. # # Permission to use, copy, modify, and distribute this software and its # documentation for any purpose and without fee is hereby granted, @@ -18,7 +18,7 @@ Additional handlers for the logging package for Python. The core package is based on PEP 282 and comments thereto in comp.lang.python. -Copyright (C) 2001-2015 Vinay Sajip. All Rights Reserved. +Copyright (C) 2001-2016 Vinay Sajip. All Rights Reserved. To use, simply 'import logging.handlers' and log away! """ @@ -1366,7 +1366,6 @@ if threading: """ self.queue = queue self.handlers = handlers - self._stop = threading.Event() self._thread = None self.respect_handler_level = respect_handler_level @@ -1387,7 +1386,7 @@ if threading: LogRecords to process. """ self._thread = t = threading.Thread(target=self._monitor) - t.setDaemon(True) + t.daemon = True t.start() def prepare(self , record): @@ -1426,20 +1425,9 @@ if threading: """ q = self.queue has_task_done = hasattr(q, 'task_done') - while not self._stop.isSet(): - try: - record = self.dequeue(True) - if record is self._sentinel: - break - self.handle(record) - if has_task_done: - q.task_done() - except queue.Empty: - pass - # There might still be records in the queue. while True: try: - record = self.dequeue(False) + record = self.dequeue(True) if record is self._sentinel: break self.handle(record) @@ -1466,7 +1454,6 @@ if threading: Note that if you don't call this before your application exits, there may be some records still left on the queue, which won't be processed. """ - self._stop.set() self.enqueue_sentinel() self._thread.join() self._thread = None diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py index cb6a6282b13..85344de5735 100644 --- a/Lib/test/test_logging.py +++ b/Lib/test/test_logging.py @@ -1,4 +1,4 @@ -# Copyright 2001-2014 by Vinay Sajip. All Rights Reserved. +# Copyright 2001-2016 by Vinay Sajip. All Rights Reserved. # # Permission to use, copy, modify, and distribute this software and its # documentation for any purpose and without fee is hereby granted, @@ -16,7 +16,7 @@ """Test harness for the logging module. Run all tests. -Copyright (C) 2001-2014 Vinay Sajip. All Rights Reserved. +Copyright (C) 2001-2016 Vinay Sajip. All Rights Reserved. """ import logging @@ -3022,6 +3022,84 @@ class QueueHandlerTest(BaseTest): self.assertFalse(handler.matches(levelno=logging.ERROR, message='5')) self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='6')) +if hasattr(logging.handlers, 'QueueListener'): + import multiprocessing + from unittest.mock import patch + + class QueueListenerTest(BaseTest): + """ + Tests based on patch submitted for issue #27930. Ensure that + QueueListener handles all log messages. + """ + + repeat = 20 + + @staticmethod + def setup_and_log(log_queue, ident): + """ + Creates a logger with a QueueHandler that logs to a queue read by a + QueueListener. Starts the listener, logs five messages, and stops + the listener. + """ + logger = logging.getLogger('test_logger_with_id_%s' % ident) + logger.setLevel(logging.DEBUG) + handler = logging.handlers.QueueHandler(log_queue) + logger.addHandler(handler) + listener = logging.handlers.QueueListener(log_queue) + listener.start() + + logger.info('one') + logger.info('two') + logger.info('three') + logger.info('four') + logger.info('five') + + listener.stop() + logger.removeHandler(handler) + handler.close() + + @patch.object(logging.handlers.QueueListener, 'handle') + def test_handle_called_with_queue_queue(self, mock_handle): + for i in range(self.repeat): + log_queue = queue.Queue() + self.setup_and_log(log_queue, '%s_%s' % (self.id(), i)) + self.assertEqual(mock_handle.call_count, 5 * self.repeat, + 'correct number of handled log messages') + + @patch.object(logging.handlers.QueueListener, 'handle') + def test_handle_called_with_mp_queue(self, mock_handle): + for i in range(self.repeat): + log_queue = multiprocessing.Queue() + self.setup_and_log(log_queue, '%s_%s' % (self.id(), i)) + self.assertEqual(mock_handle.call_count, 5 * self.repeat, + 'correct number of handled log messages') + + @staticmethod + def get_all_from_queue(log_queue): + try: + while True: + yield log_queue.get_nowait() + except queue.Empty: + return [] + + def test_no_messages_in_queue_after_stop(self): + """ + Five messages are logged then the QueueListener is stopped. This + test then gets everything off the queue. Failure of this test + indicates that messages were not registered on the queue until + _after_ the QueueListener stopped. + """ + for i in range(self.repeat): + queue = multiprocessing.Queue() + self.setup_and_log(queue, '%s_%s' %(self.id(), i)) + # time.sleep(1) + items = list(self.get_all_from_queue(queue)) + expected = [[], [logging.handlers.QueueListener._sentinel]] + self.assertIn(items, expected, + 'Found unexpected messages in queue: %s' % ( + [m.msg if isinstance(m, logging.LogRecord) + else m for m in items])) + ZERO = datetime.timedelta(0) @@ -4166,7 +4244,7 @@ class NTEventLogHandlerTest(BaseTest): # first and restore it at the end. @support.run_with_locale('LC_ALL', '') def test_main(): - support.run_unittest( + tests = [ BuiltinLevelsTest, BasicFilterTest, CustomLevelsAndFiltersTest, HandlerTest, MemoryHandlerTest, ConfigFileTest, SocketHandlerTest, DatagramHandlerTest, MemoryTest, EncodingTest, WarningsTest, @@ -4177,7 +4255,11 @@ def test_main(): RotatingFileHandlerTest, LastResortTest, LogRecordTest, ExceptionTest, SysLogHandlerTest, HTTPHandlerTest, NTEventLogHandlerTest, TimedRotatingFileHandlerTest, - UnixSocketHandlerTest, UnixDatagramHandlerTest, UnixSysLogHandlerTest) + UnixSocketHandlerTest, UnixDatagramHandlerTest, UnixSysLogHandlerTest, + ] + if hasattr(logging.handlers, 'QueueListener'): + tests.append(QueueListenerTest) + support.run_unittest(*tests) if __name__ == "__main__": test_main() diff --git a/Misc/NEWS b/Misc/NEWS index d142675eaf5..9398b8d440b 100644 --- a/Misc/NEWS +++ b/Misc/NEWS @@ -218,6 +218,9 @@ Library - Issue #27392: Add loop.connect_accepted_socket(). Patch by Jim Fulton. +- Issue #27930: Improved behaviour of logging.handlers.QueueListener. + Thanks to Paulo Andrade and Petr Viktorin for the analysis and patch. + IDLE ----