mirror of https://github.com/python/cpython
582 lines
17 KiB
ReStructuredText
582 lines
17 KiB
ReStructuredText
.. currentmodule:: asyncio
|
|
|
|
.. _asyncio-streams:
|
|
|
|
=======
|
|
Streams
|
|
=======
|
|
|
|
**Source code:** :source:`Lib/asyncio/streams.py`
|
|
|
|
-------------------------------------------------
|
|
|
|
Streams are high-level async/await-ready primitives to work with
|
|
network connections. Streams allow sending and receiving data without
|
|
using callbacks or low-level protocols and transports.
|
|
|
|
.. _asyncio_example_stream:
|
|
|
|
Here is an example of a TCP echo client written using asyncio
|
|
streams::
|
|
|
|
import asyncio
|
|
|
|
async def tcp_echo_client(message):
|
|
reader, writer = await asyncio.open_connection(
|
|
'127.0.0.1', 8888)
|
|
|
|
print(f'Send: {message!r}')
|
|
writer.write(message.encode())
|
|
await writer.drain()
|
|
|
|
data = await reader.read(100)
|
|
print(f'Received: {data.decode()!r}')
|
|
|
|
print('Close the connection')
|
|
writer.close()
|
|
await writer.wait_closed()
|
|
|
|
asyncio.run(tcp_echo_client('Hello World!'))
|
|
|
|
|
|
See also the `Examples`_ section below.
|
|
|
|
|
|
.. rubric:: Stream Functions
|
|
|
|
The following top-level asyncio functions can be used to create
|
|
and work with streams:
|
|
|
|
|
|
.. coroutinefunction:: open_connection(host=None, port=None, *, \
|
|
limit=None, ssl=None, family=0, proto=0, \
|
|
flags=0, sock=None, local_addr=None, \
|
|
server_hostname=None, ssl_handshake_timeout=None, \
|
|
ssl_shutdown_timeout=None, \
|
|
happy_eyeballs_delay=None, interleave=None)
|
|
|
|
Establish a network connection and return a pair of
|
|
``(reader, writer)`` objects.
|
|
|
|
The returned *reader* and *writer* objects are instances of
|
|
:class:`StreamReader` and :class:`StreamWriter` classes.
|
|
|
|
*limit* determines the buffer size limit used by the
|
|
returned :class:`StreamReader` instance. By default the *limit*
|
|
is set to 64 KiB.
|
|
|
|
The rest of the arguments are passed directly to
|
|
:meth:`loop.create_connection`.
|
|
|
|
.. note::
|
|
|
|
The *sock* argument transfers ownership of the socket to the
|
|
:class:`StreamWriter` created. To close the socket, call its
|
|
:meth:`~asyncio.StreamWriter.close` method.
|
|
|
|
.. versionchanged:: 3.7
|
|
Added the *ssl_handshake_timeout* parameter.
|
|
|
|
.. versionchanged:: 3.8
|
|
Added the *happy_eyeballs_delay* and *interleave* parameters.
|
|
|
|
.. versionchanged:: 3.10
|
|
Removed the *loop* parameter.
|
|
|
|
.. versionchanged:: 3.11
|
|
Added the *ssl_shutdown_timeout* parameter.
|
|
|
|
|
|
.. coroutinefunction:: start_server(client_connected_cb, host=None, \
|
|
port=None, *, limit=None, \
|
|
family=socket.AF_UNSPEC, \
|
|
flags=socket.AI_PASSIVE, sock=None, \
|
|
backlog=100, ssl=None, reuse_address=None, \
|
|
reuse_port=None, ssl_handshake_timeout=None, \
|
|
ssl_shutdown_timeout=None, start_serving=True)
|
|
|
|
Start a socket server.
|
|
|
|
The *client_connected_cb* callback is called whenever a new client
|
|
connection is established. It receives a ``(reader, writer)`` pair
|
|
as two arguments, instances of the :class:`StreamReader` and
|
|
:class:`StreamWriter` classes.
|
|
|
|
*client_connected_cb* can be a plain callable or a
|
|
:ref:`coroutine function <coroutine>`; if it is a coroutine function,
|
|
it will be automatically scheduled as a :class:`Task`.
|
|
|
|
*limit* determines the buffer size limit used by the
|
|
returned :class:`StreamReader` instance. By default the *limit*
|
|
is set to 64 KiB.
|
|
|
|
The rest of the arguments are passed directly to
|
|
:meth:`loop.create_server`.
|
|
|
|
.. note::
|
|
|
|
The *sock* argument transfers ownership of the socket to the
|
|
server created. To close the socket, call the server's
|
|
:meth:`~asyncio.Server.close` method.
|
|
|
|
.. versionchanged:: 3.7
|
|
Added the *ssl_handshake_timeout* and *start_serving* parameters.
|
|
|
|
.. versionchanged:: 3.10
|
|
Removed the *loop* parameter.
|
|
|
|
.. versionchanged:: 3.11
|
|
Added the *ssl_shutdown_timeout* parameter.
|
|
|
|
|
|
.. rubric:: Unix Sockets
|
|
|
|
.. coroutinefunction:: open_unix_connection(path=None, *, limit=None, \
|
|
ssl=None, sock=None, server_hostname=None, \
|
|
ssl_handshake_timeout=None, ssl_shutdown_timeout=None)
|
|
|
|
Establish a Unix socket connection and return a pair of
|
|
``(reader, writer)``.
|
|
|
|
Similar to :func:`open_connection` but operates on Unix sockets.
|
|
|
|
See also the documentation of :meth:`loop.create_unix_connection`.
|
|
|
|
.. note::
|
|
|
|
The *sock* argument transfers ownership of the socket to the
|
|
:class:`StreamWriter` created. To close the socket, call its
|
|
:meth:`~asyncio.StreamWriter.close` method.
|
|
|
|
.. availability:: Unix.
|
|
|
|
.. versionchanged:: 3.7
|
|
Added the *ssl_handshake_timeout* parameter.
|
|
The *path* parameter can now be a :term:`path-like object`
|
|
|
|
.. versionchanged:: 3.10
|
|
Removed the *loop* parameter.
|
|
|
|
.. versionchanged:: 3.11
|
|
Added the *ssl_shutdown_timeout* parameter.
|
|
|
|
|
|
.. coroutinefunction:: start_unix_server(client_connected_cb, path=None, \
|
|
*, limit=None, sock=None, backlog=100, ssl=None, \
|
|
ssl_handshake_timeout=None, \
|
|
ssl_shutdown_timeout=None, start_serving=True)
|
|
|
|
Start a Unix socket server.
|
|
|
|
Similar to :func:`start_server` but works with Unix sockets.
|
|
|
|
See also the documentation of :meth:`loop.create_unix_server`.
|
|
|
|
.. note::
|
|
|
|
The *sock* argument transfers ownership of the socket to the
|
|
server created. To close the socket, call the server's
|
|
:meth:`~asyncio.Server.close` method.
|
|
|
|
.. availability:: Unix.
|
|
|
|
.. versionchanged:: 3.7
|
|
Added the *ssl_handshake_timeout* and *start_serving* parameters.
|
|
The *path* parameter can now be a :term:`path-like object`.
|
|
|
|
.. versionchanged:: 3.10
|
|
Removed the *loop* parameter.
|
|
|
|
.. versionchanged:: 3.11
|
|
Added the *ssl_shutdown_timeout* parameter.
|
|
|
|
|
|
StreamReader
|
|
============
|
|
|
|
.. class:: StreamReader
|
|
|
|
Represents a reader object that provides APIs to read data
|
|
from the IO stream. As an :term:`asynchronous iterable`, the
|
|
object supports the :keyword:`async for` statement.
|
|
|
|
It is not recommended to instantiate *StreamReader* objects
|
|
directly; use :func:`open_connection` and :func:`start_server`
|
|
instead.
|
|
|
|
.. method:: feed_eof()
|
|
|
|
Acknowledge the EOF.
|
|
|
|
.. coroutinemethod:: read(n=-1)
|
|
|
|
Read up to *n* bytes from the stream.
|
|
|
|
If *n* is not provided or set to ``-1``,
|
|
read until EOF, then return all read :class:`bytes`.
|
|
If EOF was received and the internal buffer is empty,
|
|
return an empty ``bytes`` object.
|
|
|
|
If *n* is ``0``, return an empty ``bytes`` object immediately.
|
|
|
|
If *n* is positive, return at most *n* available ``bytes``
|
|
as soon as at least 1 byte is available in the internal buffer.
|
|
If EOF is received before any byte is read, return an empty
|
|
``bytes`` object.
|
|
|
|
.. coroutinemethod:: readline()
|
|
|
|
Read one line, where "line" is a sequence of bytes
|
|
ending with ``\n``.
|
|
|
|
If EOF is received and ``\n`` was not found, the method
|
|
returns partially read data.
|
|
|
|
If EOF is received and the internal buffer is empty,
|
|
return an empty ``bytes`` object.
|
|
|
|
.. coroutinemethod:: readexactly(n)
|
|
|
|
Read exactly *n* bytes.
|
|
|
|
Raise an :exc:`IncompleteReadError` if EOF is reached before *n*
|
|
can be read. Use the :attr:`IncompleteReadError.partial`
|
|
attribute to get the partially read data.
|
|
|
|
.. coroutinemethod:: readuntil(separator=b'\n')
|
|
|
|
Read data from the stream until *separator* is found.
|
|
|
|
On success, the data and separator will be removed from the
|
|
internal buffer (consumed). Returned data will include the
|
|
separator at the end.
|
|
|
|
If the amount of data read exceeds the configured stream limit, a
|
|
:exc:`LimitOverrunError` exception is raised, and the data
|
|
is left in the internal buffer and can be read again.
|
|
|
|
If EOF is reached before the complete separator is found,
|
|
an :exc:`IncompleteReadError` exception is raised, and the internal
|
|
buffer is reset. The :attr:`IncompleteReadError.partial` attribute
|
|
may contain a portion of the separator.
|
|
|
|
The *separator* may also be a tuple of separators. In this
|
|
case the return value will be the shortest possible that has any
|
|
separator as the suffix. For the purposes of :exc:`LimitOverrunError`,
|
|
the shortest possible separator is considered to be the one that
|
|
matched.
|
|
|
|
.. versionadded:: 3.5.2
|
|
|
|
.. versionchanged:: 3.13
|
|
|
|
The *separator* parameter may now be a :class:`tuple` of
|
|
separators.
|
|
|
|
.. method:: at_eof()
|
|
|
|
Return ``True`` if the buffer is empty and :meth:`feed_eof`
|
|
was called.
|
|
|
|
|
|
StreamWriter
|
|
============
|
|
|
|
.. class:: StreamWriter
|
|
|
|
Represents a writer object that provides APIs to write data
|
|
to the IO stream.
|
|
|
|
It is not recommended to instantiate *StreamWriter* objects
|
|
directly; use :func:`open_connection` and :func:`start_server`
|
|
instead.
|
|
|
|
.. method:: write(data)
|
|
|
|
The method attempts to write the *data* to the underlying socket immediately.
|
|
If that fails, the data is queued in an internal write buffer until it can be
|
|
sent.
|
|
|
|
The method should be used along with the ``drain()`` method::
|
|
|
|
stream.write(data)
|
|
await stream.drain()
|
|
|
|
.. method:: writelines(data)
|
|
|
|
The method writes a list (or any iterable) of bytes to the underlying socket
|
|
immediately.
|
|
If that fails, the data is queued in an internal write buffer until it can be
|
|
sent.
|
|
|
|
The method should be used along with the ``drain()`` method::
|
|
|
|
stream.writelines(lines)
|
|
await stream.drain()
|
|
|
|
.. method:: close()
|
|
|
|
The method closes the stream and the underlying socket.
|
|
|
|
The method should be used, though not mandatory,
|
|
along with the ``wait_closed()`` method::
|
|
|
|
stream.close()
|
|
await stream.wait_closed()
|
|
|
|
.. method:: can_write_eof()
|
|
|
|
Return ``True`` if the underlying transport supports
|
|
the :meth:`write_eof` method, ``False`` otherwise.
|
|
|
|
.. method:: write_eof()
|
|
|
|
Close the write end of the stream after the buffered write
|
|
data is flushed.
|
|
|
|
.. attribute:: transport
|
|
|
|
Return the underlying asyncio transport.
|
|
|
|
.. method:: get_extra_info(name, default=None)
|
|
|
|
Access optional transport information; see
|
|
:meth:`BaseTransport.get_extra_info` for details.
|
|
|
|
.. coroutinemethod:: drain()
|
|
|
|
Wait until it is appropriate to resume writing to the stream.
|
|
Example::
|
|
|
|
writer.write(data)
|
|
await writer.drain()
|
|
|
|
This is a flow control method that interacts with the underlying
|
|
IO write buffer. When the size of the buffer reaches
|
|
the high watermark, *drain()* blocks until the size of the
|
|
buffer is drained down to the low watermark and writing can
|
|
be resumed. When there is nothing to wait for, the :meth:`drain`
|
|
returns immediately.
|
|
|
|
.. coroutinemethod:: start_tls(sslcontext, *, server_hostname=None, \
|
|
ssl_handshake_timeout=None, ssl_shutdown_timeout=None)
|
|
|
|
Upgrade an existing stream-based connection to TLS.
|
|
|
|
Parameters:
|
|
|
|
* *sslcontext*: a configured instance of :class:`~ssl.SSLContext`.
|
|
|
|
* *server_hostname*: sets or overrides the host name that the target
|
|
server's certificate will be matched against.
|
|
|
|
* *ssl_handshake_timeout* is the time in seconds to wait for the TLS
|
|
handshake to complete before aborting the connection. ``60.0`` seconds
|
|
if ``None`` (default).
|
|
|
|
* *ssl_shutdown_timeout* is the time in seconds to wait for the SSL shutdown
|
|
to complete before aborting the connection. ``30.0`` seconds if ``None``
|
|
(default).
|
|
|
|
.. versionadded:: 3.11
|
|
|
|
.. versionchanged:: 3.12
|
|
Added the *ssl_shutdown_timeout* parameter.
|
|
|
|
|
|
.. method:: is_closing()
|
|
|
|
Return ``True`` if the stream is closed or in the process of
|
|
being closed.
|
|
|
|
.. versionadded:: 3.7
|
|
|
|
.. coroutinemethod:: wait_closed()
|
|
|
|
Wait until the stream is closed.
|
|
|
|
Should be called after :meth:`close` to wait until the underlying
|
|
connection is closed, ensuring that all data has been flushed
|
|
before e.g. exiting the program.
|
|
|
|
.. versionadded:: 3.7
|
|
|
|
|
|
Examples
|
|
========
|
|
|
|
.. _asyncio-tcp-echo-client-streams:
|
|
|
|
TCP echo client using streams
|
|
-----------------------------
|
|
|
|
TCP echo client using the :func:`asyncio.open_connection` function::
|
|
|
|
import asyncio
|
|
|
|
async def tcp_echo_client(message):
|
|
reader, writer = await asyncio.open_connection(
|
|
'127.0.0.1', 8888)
|
|
|
|
print(f'Send: {message!r}')
|
|
writer.write(message.encode())
|
|
await writer.drain()
|
|
|
|
data = await reader.read(100)
|
|
print(f'Received: {data.decode()!r}')
|
|
|
|
print('Close the connection')
|
|
writer.close()
|
|
await writer.wait_closed()
|
|
|
|
asyncio.run(tcp_echo_client('Hello World!'))
|
|
|
|
|
|
.. seealso::
|
|
|
|
The :ref:`TCP echo client protocol <asyncio_example_tcp_echo_client_protocol>`
|
|
example uses the low-level :meth:`loop.create_connection` method.
|
|
|
|
|
|
.. _asyncio-tcp-echo-server-streams:
|
|
|
|
TCP echo server using streams
|
|
-----------------------------
|
|
|
|
TCP echo server using the :func:`asyncio.start_server` function::
|
|
|
|
import asyncio
|
|
|
|
async def handle_echo(reader, writer):
|
|
data = await reader.read(100)
|
|
message = data.decode()
|
|
addr = writer.get_extra_info('peername')
|
|
|
|
print(f"Received {message!r} from {addr!r}")
|
|
|
|
print(f"Send: {message!r}")
|
|
writer.write(data)
|
|
await writer.drain()
|
|
|
|
print("Close the connection")
|
|
writer.close()
|
|
await writer.wait_closed()
|
|
|
|
async def main():
|
|
server = await asyncio.start_server(
|
|
handle_echo, '127.0.0.1', 8888)
|
|
|
|
addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
|
|
print(f'Serving on {addrs}')
|
|
|
|
async with server:
|
|
await server.serve_forever()
|
|
|
|
asyncio.run(main())
|
|
|
|
|
|
.. seealso::
|
|
|
|
The :ref:`TCP echo server protocol <asyncio_example_tcp_echo_server_protocol>`
|
|
example uses the :meth:`loop.create_server` method.
|
|
|
|
|
|
Get HTTP headers
|
|
----------------
|
|
|
|
Simple example querying HTTP headers of the URL passed on the command line::
|
|
|
|
import asyncio
|
|
import urllib.parse
|
|
import sys
|
|
|
|
async def print_http_headers(url):
|
|
url = urllib.parse.urlsplit(url)
|
|
if url.scheme == 'https':
|
|
reader, writer = await asyncio.open_connection(
|
|
url.hostname, 443, ssl=True)
|
|
else:
|
|
reader, writer = await asyncio.open_connection(
|
|
url.hostname, 80)
|
|
|
|
query = (
|
|
f"HEAD {url.path or '/'} HTTP/1.0\r\n"
|
|
f"Host: {url.hostname}\r\n"
|
|
f"\r\n"
|
|
)
|
|
|
|
writer.write(query.encode('latin-1'))
|
|
while True:
|
|
line = await reader.readline()
|
|
if not line:
|
|
break
|
|
|
|
line = line.decode('latin1').rstrip()
|
|
if line:
|
|
print(f'HTTP header> {line}')
|
|
|
|
# Ignore the body, close the socket
|
|
writer.close()
|
|
await writer.wait_closed()
|
|
|
|
url = sys.argv[1]
|
|
asyncio.run(print_http_headers(url))
|
|
|
|
|
|
Usage::
|
|
|
|
python example.py http://example.com/path/page.html
|
|
|
|
or with HTTPS::
|
|
|
|
python example.py https://example.com/path/page.html
|
|
|
|
|
|
.. _asyncio_example_create_connection-streams:
|
|
|
|
Register an open socket to wait for data using streams
|
|
------------------------------------------------------
|
|
|
|
Coroutine waiting until a socket receives data using the
|
|
:func:`open_connection` function::
|
|
|
|
import asyncio
|
|
import socket
|
|
|
|
async def wait_for_data():
|
|
# Get a reference to the current event loop because
|
|
# we want to access low-level APIs.
|
|
loop = asyncio.get_running_loop()
|
|
|
|
# Create a pair of connected sockets.
|
|
rsock, wsock = socket.socketpair()
|
|
|
|
# Register the open socket to wait for data.
|
|
reader, writer = await asyncio.open_connection(sock=rsock)
|
|
|
|
# Simulate the reception of data from the network
|
|
loop.call_soon(wsock.send, 'abc'.encode())
|
|
|
|
# Wait for data
|
|
data = await reader.read(100)
|
|
|
|
# Got data, we are done: close the socket
|
|
print("Received:", data.decode())
|
|
writer.close()
|
|
await writer.wait_closed()
|
|
|
|
# Close the second socket
|
|
wsock.close()
|
|
|
|
asyncio.run(wait_for_data())
|
|
|
|
.. seealso::
|
|
|
|
The :ref:`register an open socket to wait for data using a protocol
|
|
<asyncio_example_create_connection>` example uses a low-level protocol and
|
|
the :meth:`loop.create_connection` method.
|
|
|
|
The :ref:`watch a file descriptor for read events
|
|
<asyncio_example_watch_fd>` example uses the low-level
|
|
:meth:`loop.add_reader` method to watch a file descriptor.
|