gh-117531: Unblock getters after non-immediate queue shutdown (#117532)

(This is a small tweak of the original gh-104750 which added shutdown.)
This commit is contained in:
Laurie O 2024-04-11 01:01:42 +10:00 committed by GitHub
parent dfcae4379f
commit 6bc0b33a91
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 26 additions and 5 deletions

View File

@ -245,8 +245,10 @@ them down.
queue is empty. Set *immediate* to true to make :meth:`~Queue.get` raise
immediately instead.
All blocked callers of :meth:`~Queue.put` will be unblocked. If *immediate*
is true, also unblock callers of :meth:`~Queue.get` and :meth:`~Queue.join`.
All blocked callers of :meth:`~Queue.put` and :meth:`~Queue.get` will be
unblocked. If *immediate* is true, a task will be marked as done for each
remaining item in the queue, which may unblock callers of
:meth:`~Queue.join`.
.. versionadded:: 3.13

View File

@ -239,8 +239,9 @@ class Queue:
By default, gets will only raise once the queue is empty. Set
'immediate' to True to make gets raise immediately instead.
All blocked callers of put() will be unblocked, and also get()
and join() if 'immediate'.
All blocked callers of put() and get() will be unblocked. If
'immediate', a task is marked as done for each item remaining in
the queue, which may unblock callers of join().
'''
with self.mutex:
self.is_shutdown = True
@ -249,9 +250,10 @@ class Queue:
self._get()
if self.unfinished_tasks > 0:
self.unfinished_tasks -= 1
self.not_empty.notify_all()
# release all blocked threads in `join()`
self.all_tasks_done.notify_all()
# All getters need to re-check queue-empty to raise ShutDown
self.not_empty.notify_all()
self.not_full.notify_all()
# Override these methods to implement other queue organizations

View File

@ -636,6 +636,23 @@ class BaseQueueTestMixin(BlockingTestMixin):
self.assertEqual(results, [True]*len(thrds))
def test_shutdown_pending_get(self):
def get():
try:
results.append(q.get())
except Exception as e:
results.append(e)
q = self.type2test()
results = []
get_thread = threading.Thread(target=get)
get_thread.start()
q.shutdown(immediate=False)
get_thread.join(timeout=10.0)
self.assertFalse(get_thread.is_alive())
self.assertEqual(len(results), 1)
self.assertIsInstance(results[0], self.queue.ShutDown)
class QueueTest(BaseQueueTestMixin):