diff --git a/Doc/library/logging.config.rst b/Doc/library/logging.config.rst index dfbf0b1cf2f..0ddbc1a5f88 100644 --- a/Doc/library/logging.config.rst +++ b/Doc/library/logging.config.rst @@ -753,9 +753,12 @@ The ``queue`` and ``listener`` keys are optional. If the ``queue`` key is present, the corresponding value can be one of the following: -* An actual instance of :class:`queue.Queue` or a subclass thereof. This is of course - only possible if you are constructing or modifying the configuration dictionary in - code. +* An object implementing the :class:`queue.Queue` public API. For instance, + this may be an actual instance of :class:`queue.Queue` or a subclass thereof, + or a proxy obtained by :meth:`multiprocessing.managers.SyncManager.Queue`. + + This is of course only possible if you are constructing or modifying + the configuration dictionary in code. * A string that resolves to a callable which, when called with no arguments, returns the :class:`queue.Queue` instance to use. That callable could be a diff --git a/Lib/logging/config.py b/Lib/logging/config.py index 95e129ae988..3781cb1aeb9 100644 --- a/Lib/logging/config.py +++ b/Lib/logging/config.py @@ -497,6 +497,33 @@ class BaseConfigurator(object): value = tuple(value) return value +def _is_queue_like_object(obj): + """Check that *obj* implements the Queue API.""" + if isinstance(obj, queue.Queue): + return True + # defer importing multiprocessing as much as possible + from multiprocessing.queues import Queue as MPQueue + if isinstance(obj, MPQueue): + return True + # Depending on the multiprocessing start context, we cannot create + # a multiprocessing.managers.BaseManager instance 'mm' to get the + # runtime type of mm.Queue() or mm.JoinableQueue() (see gh-119819). + # + # Since we only need an object implementing the Queue API, we only + # do a protocol check, but we do not use typing.runtime_checkable() + # and typing.Protocol to reduce import time (see gh-121723). + # + # Ideally, we would have wanted to simply use strict type checking + # instead of a protocol-based type checking since the latter does + # not check the method signatures. + queue_interface = [ + 'empty', 'full', 'get', 'get_nowait', + 'put', 'put_nowait', 'join', 'qsize', + 'task_done', + ] + return all(callable(getattr(obj, method, None)) + for method in queue_interface) + class DictConfigurator(BaseConfigurator): """ Configure logging using a dictionary-like object to describe the @@ -791,32 +818,8 @@ class DictConfigurator(BaseConfigurator): if '()' not in qspec: raise TypeError('Invalid queue specifier %r' % qspec) config['queue'] = self.configure_custom(dict(qspec)) - else: - from multiprocessing.queues import Queue as MPQueue - - if not isinstance(qspec, (queue.Queue, MPQueue)): - # Safely check if 'qspec' is an instance of Manager.Queue - # / Manager.JoinableQueue - - from multiprocessing import Manager as MM - from multiprocessing.managers import BaseProxy - - # if it's not an instance of BaseProxy, it also can't be - # an instance of Manager.Queue / Manager.JoinableQueue - if isinstance(qspec, BaseProxy): - # Sometimes manager or queue creation might fail - # (e.g. see issue gh-120868). In that case, any - # exception during the creation of these queues will - # propagate up to the caller and be wrapped in a - # `ValueError`, whose cause will indicate the details of - # the failure. - mm = MM() - proxy_queue = mm.Queue() - proxy_joinable_queue = mm.JoinableQueue() - if not isinstance(qspec, (type(proxy_queue), type(proxy_joinable_queue))): - raise TypeError('Invalid queue specifier %r' % qspec) - else: - raise TypeError('Invalid queue specifier %r' % qspec) + elif not _is_queue_like_object(qspec): + raise TypeError('Invalid queue specifier %r' % qspec) if 'listener' in config: lspec = config['listener'] diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py index 6d688d4b81b..49523756e11 100644 --- a/Lib/test/test_logging.py +++ b/Lib/test/test_logging.py @@ -2368,6 +2368,26 @@ class CustomListener(logging.handlers.QueueListener): class CustomQueue(queue.Queue): pass +class CustomQueueProtocol: + def __init__(self, maxsize=0): + self.queue = queue.Queue(maxsize) + + def __getattr__(self, attribute): + queue = object.__getattribute__(self, 'queue') + return getattr(queue, attribute) + +class CustomQueueFakeProtocol(CustomQueueProtocol): + # An object implementing the Queue API (incorrect signatures). + # The object will be considered a valid queue class since we + # do not check the signatures (only callability of methods) + # but will NOT be usable in production since a TypeError will + # be raised due to a missing argument. + def empty(self, x): + pass + +class CustomQueueWrongProtocol(CustomQueueProtocol): + empty = None + def queueMaker(): return queue.Queue() @@ -3901,18 +3921,16 @@ class ConfigDictTest(BaseTest): @threading_helper.requires_working_threading() @support.requires_subprocess() def test_config_queue_handler(self): - q = CustomQueue() - dq = { - '()': __name__ + '.CustomQueue', - 'maxsize': 10 - } + qs = [CustomQueue(), CustomQueueProtocol()] + dqs = [{'()': f'{__name__}.{cls}', 'maxsize': 10} + for cls in ['CustomQueue', 'CustomQueueProtocol']] dl = { '()': __name__ + '.listenerMaker', 'arg1': None, 'arg2': None, 'respect_handler_level': True } - qvalues = (None, __name__ + '.queueMaker', __name__ + '.CustomQueue', dq, q) + qvalues = (None, __name__ + '.queueMaker', __name__ + '.CustomQueue', *dqs, *qs) lvalues = (None, __name__ + '.CustomListener', dl, CustomListener) for qspec, lspec in itertools.product(qvalues, lvalues): self.do_queuehandler_configuration(qspec, lspec) @@ -3932,15 +3950,21 @@ class ConfigDictTest(BaseTest): @support.requires_subprocess() @patch("multiprocessing.Manager") def test_config_queue_handler_does_not_create_multiprocessing_manager(self, manager): - # gh-120868 + # gh-120868, gh-121723 from multiprocessing import Queue as MQ q1 = {"()": "queue.Queue", "maxsize": -1} q2 = MQ() q3 = queue.Queue() + # CustomQueueFakeProtocol passes the checks but will not be usable + # since the signatures are incompatible. Checking the Queue API + # without testing the type of the actual queue is a trade-off + # between usability and the work we need to do in order to safely + # check that the queue object correctly implements the API. + q4 = CustomQueueFakeProtocol() - for qspec in (q1, q2, q3): + for qspec in (q1, q2, q3, q4): self.apply_config( { "version": 1, @@ -3956,21 +3980,62 @@ class ConfigDictTest(BaseTest): @patch("multiprocessing.Manager") def test_config_queue_handler_invalid_config_does_not_create_multiprocessing_manager(self, manager): - # gh-120868 + # gh-120868, gh-121723 - with self.assertRaises(ValueError): - self.apply_config( - { - "version": 1, - "handlers": { - "queue_listener": { - "class": "logging.handlers.QueueHandler", - "queue": object(), + for qspec in [object(), CustomQueueWrongProtocol()]: + with self.assertRaises(ValueError): + self.apply_config( + { + "version": 1, + "handlers": { + "queue_listener": { + "class": "logging.handlers.QueueHandler", + "queue": qspec, + }, }, - }, + } + ) + manager.assert_not_called() + + @skip_if_tsan_fork + @support.requires_subprocess() + @unittest.skipUnless(support.Py_DEBUG, "requires a debug build for testing" + "assertions in multiprocessing") + def test_config_queue_handler_multiprocessing_context(self): + # regression test for gh-121723 + if support.MS_WINDOWS: + start_methods = ['spawn'] + else: + start_methods = ['spawn', 'fork', 'forkserver'] + for start_method in start_methods: + with self.subTest(start_method=start_method): + ctx = multiprocessing.get_context(start_method) + with ctx.Manager() as manager: + q = manager.Queue() + records = [] + # use 1 process and 1 task per child to put 1 record + with ctx.Pool(1, initializer=self._mpinit_issue121723, + initargs=(q, "text"), maxtasksperchild=1): + records.append(q.get(timeout=60)) + self.assertTrue(q.empty()) + self.assertEqual(len(records), 1) + + @staticmethod + def _mpinit_issue121723(qspec, message_to_log): + # static method for pickling support + logging.config.dictConfig({ + 'version': 1, + 'disable_existing_loggers': True, + 'handlers': { + 'log_to_parent': { + 'class': 'logging.handlers.QueueHandler', + 'queue': qspec } - ) - manager.assert_not_called() + }, + 'root': {'handlers': ['log_to_parent'], 'level': 'DEBUG'} + }) + # log a message (this creates a record put in the queue) + logging.getLogger().info(message_to_log) @skip_if_tsan_fork @support.requires_subprocess() diff --git a/Misc/NEWS.d/next/Library/2024-07-23-10-59-38.gh-issue-121723.iJEf7e.rst b/Misc/NEWS.d/next/Library/2024-07-23-10-59-38.gh-issue-121723.iJEf7e.rst new file mode 100644 index 00000000000..cabb4024fb1 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2024-07-23-10-59-38.gh-issue-121723.iJEf7e.rst @@ -0,0 +1,3 @@ +Make :func:`logging.config.dictConfig` accept any object implementing the +Queue public API. See the :ref:`queue configuration ` +section for details. Patch by Bénédikt Tran.