cpython/Doc/library/asyncio-queue.rst

241 lines
6.6 KiB
ReStructuredText
Raw Normal View History

2015-02-25 08:55:43 -04:00
.. currentmodule:: asyncio
.. _asyncio-queues:
======
2015-02-25 08:55:43 -04:00
Queues
======
**Source code:** :source:`Lib/asyncio/queues.py`
------------------------------------------------
asyncio queues are designed to be similar to classes of the
:mod:`queue` module. Although asyncio queues are not thread-safe,
they are designed to be used specifically in async/await code.
2015-02-25 08:55:43 -04:00
Note that methods of asyncio queues don't have a *timeout* parameter;
use :func:`asyncio.wait_for` function to do queue operations with a
timeout.
2015-02-25 08:55:43 -04:00
See also the `Examples`_ section below.
2015-02-25 08:55:43 -04:00
Queue
=====
2015-02-25 08:55:43 -04:00
.. class:: Queue(maxsize=0)
2015-02-25 08:55:43 -04:00
A first in, first out (FIFO) queue.
2015-02-25 08:55:43 -04:00
If *maxsize* is less than or equal to zero, the queue size is
infinite. If it is an integer greater than ``0``, then
``await put()`` blocks when the queue reaches *maxsize*
until an item is removed by :meth:`get`.
2015-02-25 08:55:43 -04:00
Unlike the standard library threading :mod:`queue`, the size of
the queue is always known and can be returned by calling the
:meth:`qsize` method.
2015-02-25 08:55:43 -04:00
.. versionchanged:: 3.10
Removed the *loop* parameter.
This class is :ref:`not thread safe <asyncio-multithreading>`.
.. attribute:: maxsize
Number of items allowed in the queue.
2015-02-25 08:55:43 -04:00
.. method:: empty()
Return ``True`` if the queue is empty, ``False`` otherwise.
.. method:: full()
Return ``True`` if there are :attr:`maxsize` items in the queue.
If the queue was initialized with ``maxsize=0`` (the default),
then :meth:`full()` never returns ``True``.
2015-02-25 08:55:43 -04:00
.. coroutinemethod:: get()
Remove and return an item from the queue. If queue is empty,
wait until an item is available.
2015-02-25 08:55:43 -04:00
Raises :exc:`QueueShutDown` if the queue has been shut down and
is empty, or if the queue has been shut down immediately.
2015-02-25 08:55:43 -04:00
.. method:: get_nowait()
Return an item if one is immediately available, else raise
:exc:`QueueEmpty`.
.. coroutinemethod:: join()
Block until all items in the queue have been received and processed.
2015-02-25 08:55:43 -04:00
The count of unfinished tasks goes up whenever an item is added
to the queue. The count goes down whenever a consumer coroutine calls
:meth:`task_done` to indicate that the item was retrieved and all
work on it is complete. When the count of unfinished tasks drops
to zero, :meth:`join` unblocks.
2015-02-25 08:55:43 -04:00
.. coroutinemethod:: put(item)
Put an item into the queue. If the queue is full, wait until a
free slot is available before adding the item.
2015-02-25 08:55:43 -04:00
Raises :exc:`QueueShutDown` if the queue has been shut down.
2015-02-25 08:55:43 -04:00
.. method:: put_nowait(item)
Put an item into the queue without blocking.
If no free slot is immediately available, raise :exc:`QueueFull`.
.. method:: qsize()
Return the number of items in the queue.
2015-02-25 08:55:43 -04:00
.. method:: shutdown(immediate=False)
Shut down the queue, making :meth:`~Queue.get` and :meth:`~Queue.put`
raise :exc:`QueueShutDown`.
By default, :meth:`~Queue.get` on a shut down queue will only
raise once the queue is empty. Set *immediate* to true to make
:meth:`~Queue.get` raise immediately instead.
All blocked callers of :meth:`~Queue.put` and :meth:`~Queue.get`
will be unblocked. If *immediate* is true, a task will be marked
as done for each remaining item in the queue, which may unblock
callers of :meth:`~Queue.join`.
.. versionadded:: 3.13
2015-02-25 08:55:43 -04:00
.. method:: task_done()
Indicate that a formerly enqueued task is complete.
Used by queue consumers. For each :meth:`~Queue.get` used to
fetch a task, a subsequent call to :meth:`task_done` tells the
queue that the processing on the task is complete.
2015-02-25 08:55:43 -04:00
If a :meth:`join` is currently blocking, it will resume when all
items have been processed (meaning that a :meth:`task_done`
call was received for every item that had been :meth:`~Queue.put`
into the queue).
2015-02-25 08:55:43 -04:00
``shutdown(immediate=True)`` calls :meth:`task_done` for each
remaining item in the queue.
Raises :exc:`ValueError` if called more times than there were
items placed in the queue.
2015-02-25 08:55:43 -04:00
Priority Queue
==============
2015-02-25 08:55:43 -04:00
.. class:: PriorityQueue
A variant of :class:`Queue`; retrieves entries in priority order
(lowest first).
2015-02-25 08:55:43 -04:00
Entries are typically tuples of the form
``(priority_number, data)``.
2015-02-25 08:55:43 -04:00
LIFO Queue
==========
2015-02-25 08:55:43 -04:00
.. class:: LifoQueue
A variant of :class:`Queue` that retrieves most recently added
entries first (last in, first out).
2015-02-25 08:55:43 -04:00
Exceptions
==========
2015-02-25 08:55:43 -04:00
.. exception:: QueueEmpty
This exception is raised when the :meth:`~Queue.get_nowait` method
is called on an empty queue.
2015-02-25 08:55:43 -04:00
.. exception:: QueueFull
Exception raised when the :meth:`~Queue.put_nowait` method is called
on a queue that has reached its *maxsize*.
.. exception:: QueueShutDown
Exception raised when :meth:`~Queue.put` or :meth:`~Queue.get` is
called on a queue which has been shut down.
.. versionadded:: 3.13
Examples
========
.. _asyncio_example_queue_dist:
Queues can be used to distribute workload between several
concurrent tasks::
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())