diff --git a/Doc/library/queue.rst b/Doc/library/queue.rst index f2a6dbf589f..fce23313c7d 100644 --- a/Doc/library/queue.rst +++ b/Doc/library/queue.rst @@ -245,8 +245,10 @@ them down. queue is empty. Set *immediate* to true to make :meth:`~Queue.get` raise immediately instead. - All blocked callers of :meth:`~Queue.put` will be unblocked. If *immediate* - is true, also unblock callers of :meth:`~Queue.get` and :meth:`~Queue.join`. + All blocked callers of :meth:`~Queue.put` and :meth:`~Queue.get` will be + unblocked. If *immediate* is true, a task will be marked as done for each + remaining item in the queue, which may unblock callers of + :meth:`~Queue.join`. .. versionadded:: 3.13 diff --git a/Lib/queue.py b/Lib/queue.py index 387ce542587..25beb46e30d 100644 --- a/Lib/queue.py +++ b/Lib/queue.py @@ -239,8 +239,9 @@ class Queue: By default, gets will only raise once the queue is empty. Set 'immediate' to True to make gets raise immediately instead. - All blocked callers of put() will be unblocked, and also get() - and join() if 'immediate'. + All blocked callers of put() and get() will be unblocked. If + 'immediate', a task is marked as done for each item remaining in + the queue, which may unblock callers of join(). ''' with self.mutex: self.is_shutdown = True @@ -249,9 +250,10 @@ class Queue: self._get() if self.unfinished_tasks > 0: self.unfinished_tasks -= 1 - self.not_empty.notify_all() # release all blocked threads in `join()` self.all_tasks_done.notify_all() + # All getters need to re-check queue-empty to raise ShutDown + self.not_empty.notify_all() self.not_full.notify_all() # Override these methods to implement other queue organizations diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index c4d10110132..d5927fbf391 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -636,6 +636,23 @@ class BaseQueueTestMixin(BlockingTestMixin): self.assertEqual(results, [True]*len(thrds)) + def test_shutdown_pending_get(self): + def get(): + try: + results.append(q.get()) + except Exception as e: + results.append(e) + + q = self.type2test() + results = [] + get_thread = threading.Thread(target=get) + get_thread.start() + q.shutdown(immediate=False) + get_thread.join(timeout=10.0) + self.assertFalse(get_thread.is_alive()) + self.assertEqual(len(results), 1) + self.assertIsInstance(results[0], self.queue.ShutDown) + class QueueTest(BaseQueueTestMixin):