bpo-36889: Document asyncio Stream and StreamServer (GH-14203)
(cherry picked from commit 6793cce155
)
Co-authored-by: Xtreak <tir.karthi@gmail.com>
This commit is contained in:
parent
76b72f6ea2
commit
7268fd0d21
|
@ -1625,8 +1625,7 @@ Wait until a file descriptor received some data using the
|
|||
:meth:`loop.create_connection` method.
|
||||
|
||||
* Another similar :ref:`example <asyncio_example_create_connection-streams>`
|
||||
using the high-level :func:`asyncio.open_connection` function
|
||||
and streams.
|
||||
using the high-level :func:`asyncio.connect` function and streams.
|
||||
|
||||
|
||||
.. _asyncio_example_unix_signals:
|
||||
|
|
|
@ -810,7 +810,7 @@ data, and waits until the connection is closed::
|
|||
.. seealso::
|
||||
|
||||
The :ref:`TCP echo client using streams <asyncio-tcp-echo-client-streams>`
|
||||
example uses the high-level :func:`asyncio.open_connection` function.
|
||||
example uses the high-level :func:`asyncio.connect` function.
|
||||
|
||||
|
||||
.. _asyncio-udp-echo-server-protocol:
|
||||
|
@ -977,7 +977,7 @@ Wait until a socket receives data using the
|
|||
|
||||
The :ref:`register an open socket to wait for data using streams
|
||||
<asyncio_example_create_connection-streams>` example uses high-level streams
|
||||
created by the :func:`open_connection` function in a coroutine.
|
||||
created by the :func:`asyncio.connect` function in a coroutine.
|
||||
|
||||
.. _asyncio_example_subprocess_proto:
|
||||
|
||||
|
|
|
@ -18,17 +18,12 @@ streams::
|
|||
import asyncio
|
||||
|
||||
async def tcp_echo_client(message):
|
||||
reader, writer = await asyncio.open_connection(
|
||||
'127.0.0.1', 8888)
|
||||
async with asyncio.connect('127.0.0.1', 8888) as stream:
|
||||
print(f'Send: {message!r}')
|
||||
await stream.write(message.encode())
|
||||
|
||||
print(f'Send: {message!r}')
|
||||
await writer.write(message.encode())
|
||||
|
||||
data = await reader.read(100)
|
||||
print(f'Received: {data.decode()!r}')
|
||||
|
||||
print('Close the connection')
|
||||
await writer.close()
|
||||
data = await stream.read(100)
|
||||
print(f'Received: {data.decode()!r}')
|
||||
|
||||
asyncio.run(tcp_echo_client('Hello World!'))
|
||||
|
||||
|
@ -42,6 +37,32 @@ The following top-level asyncio functions can be used to create
|
|||
and work with streams:
|
||||
|
||||
|
||||
.. coroutinefunction:: connect(host=None, port=None, \*, \
|
||||
limit=2**16, ssl=None, family=0, \
|
||||
proto=0, flags=0, sock=None, local_addr=None, \
|
||||
server_hostname=None, ssl_handshake_timeout=None, \
|
||||
happy_eyeballs_delay=None, interleave=None)
|
||||
|
||||
Connect to TCP socket on *host* : *port* address and return a :class:`Stream`
|
||||
object of mode :attr:`StreamMode.READWRITE`.
|
||||
|
||||
|
||||
*limit* determines the buffer size limit used by the returned :class:`Stream`
|
||||
instance. By default the *limit* is set to 64 KiB.
|
||||
|
||||
The rest of the arguments are passed directly to :meth:`loop.create_connection`.
|
||||
|
||||
The function can be used with ``await`` to get a connected stream::
|
||||
|
||||
stream = await asyncio.connect('127.0.0.1', 8888)
|
||||
|
||||
The function can also be used as an async context manager::
|
||||
|
||||
async with asyncio.connect('127.0.0.1', 8888) as stream:
|
||||
...
|
||||
|
||||
.. versionadded:: 3.8
|
||||
|
||||
.. coroutinefunction:: open_connection(host=None, port=None, \*, \
|
||||
loop=None, limit=None, ssl=None, family=0, \
|
||||
proto=0, flags=0, sock=None, local_addr=None, \
|
||||
|
@ -69,10 +90,10 @@ and work with streams:
|
|||
|
||||
.. deprecated-removed:: 3.8 3.10
|
||||
|
||||
`open_connection()` is deprecated in favor of `connect()`.
|
||||
`open_connection()` is deprecated in favor of :func:`connect`.
|
||||
|
||||
.. coroutinefunction:: start_server(client_connected_cb, host=None, \
|
||||
port=None, \*, loop=None, limit=None, \
|
||||
port=None, \*, loop=None, limit=2**16, \
|
||||
family=socket.AF_UNSPEC, \
|
||||
flags=socket.AI_PASSIVE, sock=None, \
|
||||
backlog=100, ssl=None, reuse_address=None, \
|
||||
|
@ -106,11 +127,58 @@ and work with streams:
|
|||
|
||||
.. deprecated-removed:: 3.8 3.10
|
||||
|
||||
`start_server()` is deprecated if favor of `StreamServer()`
|
||||
`start_server()` is deprecated if favor of :class:`StreamServer`
|
||||
|
||||
.. coroutinefunction:: connect_read_pipe(pipe, *, limit=2**16)
|
||||
|
||||
Takes a :term:`file-like object <file object>` *pipe* to return a
|
||||
:class:`Stream` object of the mode :attr:`StreamMode.READ` that has
|
||||
similar API of :class:`StreamReader`. It can also be used as an async context manager.
|
||||
|
||||
*limit* determines the buffer size limit used by the returned :class:`Stream`
|
||||
instance. By default the limit is set to 64 KiB.
|
||||
|
||||
.. versionadded:: 3.8
|
||||
|
||||
.. coroutinefunction:: connect_write_pipe(pipe, *, limit=2**16)
|
||||
|
||||
Takes a :term:`file-like object <file object>` *pipe* to return a
|
||||
:class:`Stream` object of the mode :attr:`StreamMode.WRITE` that has
|
||||
similar API of :class:`StreamWriter`. It can also be used as an async context manager.
|
||||
|
||||
*limit* determines the buffer size limit used by the returned :class:`Stream`
|
||||
instance. By default the limit is set to 64 KiB.
|
||||
|
||||
.. versionadded:: 3.8
|
||||
|
||||
.. rubric:: Unix Sockets
|
||||
|
||||
.. function:: connect_unix(path=None, *, limit=2**16, ssl=None, \
|
||||
sock=None, server_hostname=None, \
|
||||
ssl_handshake_timeout=None)
|
||||
|
||||
Establish a Unix socket connection to socket with *path* address and
|
||||
return an awaitable :class:`Stream` object of the mode :attr:`StreamMode.READWRITE`
|
||||
that can be used as a reader and a writer.
|
||||
|
||||
*limit* determines the buffer size limit used by the returned :class:`Stream`
|
||||
instance. By default the *limit* is set to 64 KiB.
|
||||
|
||||
The rest of the arguments are passed directly to :meth:`loop.create_unix_connection`.
|
||||
|
||||
The function can be used with ``await`` to get a connected stream::
|
||||
|
||||
stream = await asyncio.connect_unix('/tmp/example.sock')
|
||||
|
||||
The function can also be used as an async context manager::
|
||||
|
||||
async with asyncio.connect_unix('/tmp/example.sock') as stream:
|
||||
...
|
||||
|
||||
.. availability:: Unix.
|
||||
|
||||
.. versionadded:: 3.8
|
||||
|
||||
.. coroutinefunction:: open_unix_connection(path=None, \*, loop=None, \
|
||||
limit=None, ssl=None, sock=None, \
|
||||
server_hostname=None, ssl_handshake_timeout=None)
|
||||
|
@ -134,7 +202,7 @@ and work with streams:
|
|||
|
||||
.. deprecated-removed:: 3.8 3.10
|
||||
|
||||
`open_unix_connection()` is deprecated if favor of `connect_unix()`.
|
||||
``open_unix_connection()`` is deprecated if favor of :func:`connect_unix`.
|
||||
|
||||
|
||||
.. coroutinefunction:: start_unix_server(client_connected_cb, path=None, \
|
||||
|
@ -160,11 +228,176 @@ and work with streams:
|
|||
|
||||
.. deprecated-removed:: 3.8 3.10
|
||||
|
||||
`start_unix_server()` is deprecated in favor of `UnixStreamServer()`.
|
||||
``start_unix_server()`` is deprecated in favor of :class:`UnixStreamServer`.
|
||||
|
||||
|
||||
---------
|
||||
|
||||
StreamServer
|
||||
============
|
||||
|
||||
.. class:: StreamServer(client_connected_cb, /, host=None, port=None, *, \
|
||||
limit=2**16, family=socket.AF_UNSPEC, \
|
||||
flags=socket.AI_PASSIVE, sock=None, backlog=100, \
|
||||
ssl=None, reuse_address=None, reuse_port=None, \
|
||||
ssl_handshake_timeout=None, shutdown_timeout=60)
|
||||
|
||||
The *client_connected_cb* callback is called whenever a new client
|
||||
connection is established. It receives a :class:`Stream` object of the
|
||||
mode :attr:`StreamMode.READWRITE`.
|
||||
|
||||
*client_connected_cb* can be a plain callable or a
|
||||
:ref:`coroutine function <coroutine>`; if it is a coroutine function,
|
||||
it will be automatically scheduled as a :class:`Task`.
|
||||
|
||||
*limit* determines the buffer size limit used by the
|
||||
returned :class:`Stream` instance. By default the *limit*
|
||||
is set to 64 KiB.
|
||||
|
||||
The rest of the arguments are passed directly to
|
||||
:meth:`loop.create_server`.
|
||||
|
||||
.. coroutinemethod:: start_serving()
|
||||
|
||||
Binds to the given host and port to start the server.
|
||||
|
||||
.. coroutinemethod:: serve_forever()
|
||||
|
||||
Start accepting connections until the coroutine is cancelled.
|
||||
Cancellation of ``serve_forever`` task causes the server
|
||||
to be closed.
|
||||
|
||||
This method can be called if the server is already accepting
|
||||
connections. Only one ``serve_forever`` task can exist per
|
||||
one *Server* object.
|
||||
|
||||
.. method:: is_serving()
|
||||
|
||||
Returns ``True`` if the server is bound and currently serving.
|
||||
|
||||
.. method:: bind()
|
||||
|
||||
Bind the server to the given *host* and *port*. This method is
|
||||
automatically called during ``__aenter__`` when :class:`StreamServer` is
|
||||
used as an async context manager.
|
||||
|
||||
.. method:: is_bound()
|
||||
|
||||
Return ``True`` if the server is bound.
|
||||
|
||||
.. coroutinemethod:: abort()
|
||||
|
||||
Closes the connection and cancels all pending tasks.
|
||||
|
||||
.. coroutinemethod:: close()
|
||||
|
||||
Closes the connection. This method is automatically called during
|
||||
``__aexit__`` when :class:`StreamServer` is used as an async context
|
||||
manager.
|
||||
|
||||
.. attribute:: sockets
|
||||
|
||||
Returns a tuple of socket objects the server is bound to.
|
||||
|
||||
.. versionadded:: 3.8
|
||||
|
||||
|
||||
UnixStreamServer
|
||||
================
|
||||
|
||||
.. class:: UnixStreamServer(client_connected_cb, /, path=None, *, \
|
||||
limit=2**16, sock=None, backlog=100, \
|
||||
ssl=None, ssl_handshake_timeout=None, shutdown_timeout=60)
|
||||
|
||||
The *client_connected_cb* callback is called whenever a new client
|
||||
connection is established. It receives a :class:`Stream` object of the
|
||||
mode :attr:`StreamMode.READWRITE`.
|
||||
|
||||
*client_connected_cb* can be a plain callable or a
|
||||
:ref:`coroutine function <coroutine>`; if it is a coroutine function,
|
||||
it will be automatically scheduled as a :class:`Task`.
|
||||
|
||||
*limit* determines the buffer size limit used by the
|
||||
returned :class:`Stream` instance. By default the *limit*
|
||||
is set to 64 KiB.
|
||||
|
||||
The rest of the arguments are passed directly to
|
||||
:meth:`loop.create_unix_server`.
|
||||
|
||||
.. coroutinemethod:: start_serving()
|
||||
|
||||
Binds to the given host and port to start the server.
|
||||
|
||||
.. method:: is_serving()
|
||||
|
||||
Returns ``True`` if the server is bound and currently serving.
|
||||
|
||||
.. method:: bind()
|
||||
|
||||
Bind the server to the given *host* and *port*. This method is
|
||||
automatically called during ``__aenter__`` when :class:`UnixStreamServer` is
|
||||
used as an async context manager.
|
||||
|
||||
.. method:: is_bound()
|
||||
|
||||
Return ``True`` if the server is bound.
|
||||
|
||||
.. coroutinemethod:: abort()
|
||||
|
||||
Closes the connection and cancels all pending tasks.
|
||||
|
||||
.. coroutinemethod:: close()
|
||||
|
||||
Closes the connection. This method is automatically called during
|
||||
``__aexit__`` when :class:`UnixStreamServer` is used as an async context
|
||||
manager.
|
||||
|
||||
.. attribute:: sockets
|
||||
|
||||
Returns a tuple of socket objects the server is bound to.
|
||||
|
||||
.. availability:: Unix.
|
||||
|
||||
.. versionadded:: 3.8
|
||||
|
||||
Stream
|
||||
======
|
||||
|
||||
.. class:: Stream
|
||||
|
||||
Represents a Stream object that provides APIs to read and write data
|
||||
to the IO stream . It includes the API provided by :class:`StreamReader`
|
||||
and :class:`StreamWriter`.
|
||||
|
||||
Do not instantiate *Stream* objects directly; use API like :func:`connect`
|
||||
and :class:`StreamServer` instead.
|
||||
|
||||
.. versionadded:: 3.8
|
||||
|
||||
|
||||
StreamMode
|
||||
==========
|
||||
|
||||
.. class:: StreamMode
|
||||
|
||||
A subclass of :class:`enum.Flag` that defines a set of values that can be
|
||||
used to determine the ``mode`` of :class:`Stream` objects.
|
||||
|
||||
.. data:: READ
|
||||
|
||||
The stream object is readable and provides the API of :class:`StreamReader`.
|
||||
|
||||
.. data:: WRITE
|
||||
|
||||
The stream object is writeable and provides the API of :class:`StreamWriter`.
|
||||
|
||||
.. data:: READWRITE
|
||||
|
||||
The stream object is readable and writeable and provides the API of both
|
||||
:class:`StreamReader` and :class:`StreamWriter`.
|
||||
|
||||
.. versionadded:: 3.8
|
||||
|
||||
|
||||
StreamReader
|
||||
============
|
||||
|
@ -366,22 +599,17 @@ Examples
|
|||
TCP echo client using streams
|
||||
-----------------------------
|
||||
|
||||
TCP echo client using the :func:`asyncio.open_connection` function::
|
||||
TCP echo client using the :func:`asyncio.connect` function::
|
||||
|
||||
import asyncio
|
||||
|
||||
async def tcp_echo_client(message):
|
||||
reader, writer = await asyncio.open_connection(
|
||||
'127.0.0.1', 8888)
|
||||
async with asyncio.connect('127.0.0.1', 8888) as stream:
|
||||
print(f'Send: {message!r}')
|
||||
await stream.write(message.encode())
|
||||
|
||||
print(f'Send: {message!r}')
|
||||
writer.write(message.encode())
|
||||
|
||||
data = await reader.read(100)
|
||||
print(f'Received: {data.decode()!r}')
|
||||
|
||||
print('Close the connection')
|
||||
writer.close()
|
||||
data = await stream.read(100)
|
||||
print(f'Received: {data.decode()!r}')
|
||||
|
||||
asyncio.run(tcp_echo_client('Hello World!'))
|
||||
|
||||
|
@ -397,32 +625,28 @@ TCP echo client using the :func:`asyncio.open_connection` function::
|
|||
TCP echo server using streams
|
||||
-----------------------------
|
||||
|
||||
TCP echo server using the :func:`asyncio.start_server` function::
|
||||
TCP echo server using the :class:`asyncio.StreamServer` class::
|
||||
|
||||
import asyncio
|
||||
|
||||
async def handle_echo(reader, writer):
|
||||
data = await reader.read(100)
|
||||
async def handle_echo(stream):
|
||||
data = await stream.read(100)
|
||||
message = data.decode()
|
||||
addr = writer.get_extra_info('peername')
|
||||
addr = stream.get_extra_info('peername')
|
||||
|
||||
print(f"Received {message!r} from {addr!r}")
|
||||
|
||||
print(f"Send: {message!r}")
|
||||
writer.write(data)
|
||||
await writer.drain()
|
||||
await stream.write(data)
|
||||
|
||||
print("Close the connection")
|
||||
writer.close()
|
||||
await stream.close()
|
||||
|
||||
async def main():
|
||||
server = await asyncio.start_server(
|
||||
handle_echo, '127.0.0.1', 8888)
|
||||
|
||||
addr = server.sockets[0].getsockname()
|
||||
print(f'Serving on {addr}')
|
||||
|
||||
async with server:
|
||||
async with asyncio.StreamServer(
|
||||
handle_echo, '127.0.0.1', 8888) as server:
|
||||
addr = server.sockets[0].getsockname()
|
||||
print(f'Serving on {addr}')
|
||||
await server.serve_forever()
|
||||
|
||||
asyncio.run(main())
|
||||
|
@ -446,11 +670,9 @@ Simple example querying HTTP headers of the URL passed on the command line::
|
|||
async def print_http_headers(url):
|
||||
url = urllib.parse.urlsplit(url)
|
||||
if url.scheme == 'https':
|
||||
reader, writer = await asyncio.open_connection(
|
||||
url.hostname, 443, ssl=True)
|
||||
stream = await asyncio.connect(url.hostname, 443, ssl=True)
|
||||
else:
|
||||
reader, writer = await asyncio.open_connection(
|
||||
url.hostname, 80)
|
||||
stream = await asyncio.connect(url.hostname, 80)
|
||||
|
||||
query = (
|
||||
f"HEAD {url.path or '/'} HTTP/1.0\r\n"
|
||||
|
@ -458,18 +680,14 @@ Simple example querying HTTP headers of the URL passed on the command line::
|
|||
f"\r\n"
|
||||
)
|
||||
|
||||
writer.write(query.encode('latin-1'))
|
||||
while True:
|
||||
line = await reader.readline()
|
||||
if not line:
|
||||
break
|
||||
|
||||
stream.write(query.encode('latin-1'))
|
||||
while (line := await stream.readline()):
|
||||
line = line.decode('latin1').rstrip()
|
||||
if line:
|
||||
print(f'HTTP header> {line}')
|
||||
|
||||
# Ignore the body, close the socket
|
||||
writer.close()
|
||||
await stream.close()
|
||||
|
||||
url = sys.argv[1]
|
||||
asyncio.run(print_http_headers(url))
|
||||
|
@ -490,7 +708,7 @@ Register an open socket to wait for data using streams
|
|||
------------------------------------------------------
|
||||
|
||||
Coroutine waiting until a socket receives data using the
|
||||
:func:`open_connection` function::
|
||||
:func:`asyncio.connect` function::
|
||||
|
||||
import asyncio
|
||||
import socket
|
||||
|
@ -504,17 +722,15 @@ Coroutine waiting until a socket receives data using the
|
|||
rsock, wsock = socket.socketpair()
|
||||
|
||||
# Register the open socket to wait for data.
|
||||
reader, writer = await asyncio.open_connection(sock=rsock)
|
||||
async with asyncio.connect(sock=rsock) as stream:
|
||||
# Simulate the reception of data from the network
|
||||
loop.call_soon(wsock.send, 'abc'.encode())
|
||||
|
||||
# Simulate the reception of data from the network
|
||||
loop.call_soon(wsock.send, 'abc'.encode())
|
||||
# Wait for data
|
||||
data = await stream.read(100)
|
||||
|
||||
# Wait for data
|
||||
data = await reader.read(100)
|
||||
|
||||
# Got data, we are done: close the socket
|
||||
print("Received:", data.decode())
|
||||
writer.close()
|
||||
# Got data, we are done: close the socket
|
||||
print("Received:", data.decode())
|
||||
|
||||
# Close the second socket
|
||||
wsock.close()
|
||||
|
|
Loading…
Reference in New Issue