From 7d9c68513d112823a9a6cdc7453b998b2c24eb4c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Janek=20Nouvertn=C3=A9?= Date: Thu, 27 Jun 2024 09:09:01 +0200 Subject: [PATCH] gh-120868: Fix breaking change in `logging.config` when using `QueueHandler` (GH-120872) --- Lib/logging/config.py | 53 +++++++++++++++++++++++++++------------- Lib/test/test_logging.py | 44 +++++++++++++++++++++++++++++++++ Misc/ACKS | 1 + 3 files changed, 81 insertions(+), 17 deletions(-) diff --git a/Lib/logging/config.py b/Lib/logging/config.py index d2f23e53f35..95e129ae988 100644 --- a/Lib/logging/config.py +++ b/Lib/logging/config.py @@ -780,25 +780,44 @@ class DictConfigurator(BaseConfigurator): # if 'handlers' not in config: # raise ValueError('No handlers specified for a QueueHandler') if 'queue' in config: - from multiprocessing.queues import Queue as MPQueue - from multiprocessing import Manager as MM - proxy_queue = MM().Queue() - proxy_joinable_queue = MM().JoinableQueue() qspec = config['queue'] - if not isinstance(qspec, (queue.Queue, MPQueue, - type(proxy_queue), type(proxy_joinable_queue))): - if isinstance(qspec, str): - q = self.resolve(qspec) - if not callable(q): - raise TypeError('Invalid queue specifier %r' % qspec) - q = q() - elif isinstance(qspec, dict): - if '()' not in qspec: - raise TypeError('Invalid queue specifier %r' % qspec) - q = self.configure_custom(dict(qspec)) - else: + + if isinstance(qspec, str): + q = self.resolve(qspec) + if not callable(q): raise TypeError('Invalid queue specifier %r' % qspec) - config['queue'] = q + config['queue'] = q() + elif isinstance(qspec, dict): + if '()' not in qspec: + raise TypeError('Invalid queue specifier %r' % qspec) + config['queue'] = self.configure_custom(dict(qspec)) + else: + from multiprocessing.queues import Queue as MPQueue + + if not isinstance(qspec, (queue.Queue, MPQueue)): + # Safely check if 'qspec' is an instance of Manager.Queue + # / Manager.JoinableQueue + + from multiprocessing import Manager as MM + from multiprocessing.managers import BaseProxy + + # if it's not an instance of BaseProxy, it also can't be + # an instance of Manager.Queue / Manager.JoinableQueue + if isinstance(qspec, BaseProxy): + # Sometimes manager or queue creation might fail + # (e.g. see issue gh-120868). In that case, any + # exception during the creation of these queues will + # propagate up to the caller and be wrapped in a + # `ValueError`, whose cause will indicate the details of + # the failure. + mm = MM() + proxy_queue = mm.Queue() + proxy_joinable_queue = mm.JoinableQueue() + if not isinstance(qspec, (type(proxy_queue), type(proxy_joinable_queue))): + raise TypeError('Invalid queue specifier %r' % qspec) + else: + raise TypeError('Invalid queue specifier %r' % qspec) + if 'listener' in config: lspec = config['listener'] if isinstance(lspec, type): diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py index ddb9481807b..e6daea2333b 100644 --- a/Lib/test/test_logging.py +++ b/Lib/test/test_logging.py @@ -3928,6 +3928,50 @@ class ConfigDictTest(BaseTest): msg = str(ctx.exception) self.assertEqual(msg, "Unable to configure handler 'ah'") + @threading_helper.requires_working_threading() + @support.requires_subprocess() + @patch("multiprocessing.Manager") + def test_config_queue_handler_does_not_create_multiprocessing_manager(self, manager): + # gh-120868 + + from multiprocessing import Queue as MQ + + q1 = {"()": "queue.Queue", "maxsize": -1} + q2 = MQ() + q3 = queue.Queue() + + for qspec in (q1, q2, q3): + self.apply_config( + { + "version": 1, + "handlers": { + "queue_listener": { + "class": "logging.handlers.QueueHandler", + "queue": qspec, + }, + }, + } + ) + manager.assert_not_called() + + @patch("multiprocessing.Manager") + def test_config_queue_handler_invalid_config_does_not_create_multiprocessing_manager(self, manager): + # gh-120868 + + with self.assertRaises(ValueError): + self.apply_config( + { + "version": 1, + "handlers": { + "queue_listener": { + "class": "logging.handlers.QueueHandler", + "queue": object(), + }, + }, + } + ) + manager.assert_not_called() + @support.requires_subprocess() def test_multiprocessing_queues(self): # See gh-119819 diff --git a/Misc/ACKS b/Misc/ACKS index a406fca8744..53258dbfd9f 100644 --- a/Misc/ACKS +++ b/Misc/ACKS @@ -1318,6 +1318,7 @@ Hrvoje Nikšić Gregory Nofi Jesse Noller Bill Noon +Janek Nouvertné Stefan Norberg Tim Northover Joe Norton