diff --git a/Doc/library/asyncio-eventloop.rst b/Doc/library/asyncio-eventloop.rst index f763fd5f036..8f7974be66e 100644 --- a/Doc/library/asyncio-eventloop.rst +++ b/Doc/library/asyncio-eventloop.rst @@ -1625,8 +1625,7 @@ Wait until a file descriptor received some data using the :meth:`loop.create_connection` method. * Another similar :ref:`example ` - using the high-level :func:`asyncio.open_connection` function - and streams. + using the high-level :func:`asyncio.connect` function and streams. .. _asyncio_example_unix_signals: diff --git a/Doc/library/asyncio-protocol.rst b/Doc/library/asyncio-protocol.rst index f08738dd62b..3e5a4dd8b89 100644 --- a/Doc/library/asyncio-protocol.rst +++ b/Doc/library/asyncio-protocol.rst @@ -810,7 +810,7 @@ data, and waits until the connection is closed:: .. seealso:: The :ref:`TCP echo client using streams ` - example uses the high-level :func:`asyncio.open_connection` function. + example uses the high-level :func:`asyncio.connect` function. .. _asyncio-udp-echo-server-protocol: @@ -977,7 +977,7 @@ Wait until a socket receives data using the The :ref:`register an open socket to wait for data using streams ` example uses high-level streams - created by the :func:`open_connection` function in a coroutine. + created by the :func:`asyncio.connect` function in a coroutine. .. _asyncio_example_subprocess_proto: diff --git a/Doc/library/asyncio-stream.rst b/Doc/library/asyncio-stream.rst index 28ca5d5f339..dfe520de56b 100644 --- a/Doc/library/asyncio-stream.rst +++ b/Doc/library/asyncio-stream.rst @@ -18,17 +18,12 @@ streams:: import asyncio async def tcp_echo_client(message): - reader, writer = await asyncio.open_connection( - '127.0.0.1', 8888) + async with asyncio.connect('127.0.0.1', 8888) as stream: + print(f'Send: {message!r}') + await stream.write(message.encode()) - print(f'Send: {message!r}') - await writer.write(message.encode()) - - data = await reader.read(100) - print(f'Received: {data.decode()!r}') - - print('Close the connection') - await writer.close() + data = await stream.read(100) + print(f'Received: {data.decode()!r}') asyncio.run(tcp_echo_client('Hello World!')) @@ -42,6 +37,32 @@ The following top-level asyncio functions can be used to create and work with streams: +.. coroutinefunction:: connect(host=None, port=None, \*, \ + limit=2**16, ssl=None, family=0, \ + proto=0, flags=0, sock=None, local_addr=None, \ + server_hostname=None, ssl_handshake_timeout=None, \ + happy_eyeballs_delay=None, interleave=None) + + Connect to TCP socket on *host* : *port* address and return a :class:`Stream` + object of mode :attr:`StreamMode.READWRITE`. + + + *limit* determines the buffer size limit used by the returned :class:`Stream` + instance. By default the *limit* is set to 64 KiB. + + The rest of the arguments are passed directly to :meth:`loop.create_connection`. + + The function can be used with ``await`` to get a connected stream:: + + stream = await asyncio.connect('127.0.0.1', 8888) + + The function can also be used as an async context manager:: + + async with asyncio.connect('127.0.0.1', 8888) as stream: + ... + + .. versionadded:: 3.8 + .. coroutinefunction:: open_connection(host=None, port=None, \*, \ loop=None, limit=None, ssl=None, family=0, \ proto=0, flags=0, sock=None, local_addr=None, \ @@ -69,10 +90,10 @@ and work with streams: .. deprecated-removed:: 3.8 3.10 - `open_connection()` is deprecated in favor of `connect()`. + `open_connection()` is deprecated in favor of :func:`connect`. .. coroutinefunction:: start_server(client_connected_cb, host=None, \ - port=None, \*, loop=None, limit=None, \ + port=None, \*, loop=None, limit=2**16, \ family=socket.AF_UNSPEC, \ flags=socket.AI_PASSIVE, sock=None, \ backlog=100, ssl=None, reuse_address=None, \ @@ -106,11 +127,58 @@ and work with streams: .. deprecated-removed:: 3.8 3.10 - `start_server()` is deprecated if favor of `StreamServer()` + `start_server()` is deprecated if favor of :class:`StreamServer` +.. coroutinefunction:: connect_read_pipe(pipe, *, limit=2**16) + + Takes a :term:`file-like object ` *pipe* to return a + :class:`Stream` object of the mode :attr:`StreamMode.READ` that has + similar API of :class:`StreamReader`. It can also be used as an async context manager. + + *limit* determines the buffer size limit used by the returned :class:`Stream` + instance. By default the limit is set to 64 KiB. + + .. versionadded:: 3.8 + +.. coroutinefunction:: connect_write_pipe(pipe, *, limit=2**16) + + Takes a :term:`file-like object ` *pipe* to return a + :class:`Stream` object of the mode :attr:`StreamMode.WRITE` that has + similar API of :class:`StreamWriter`. It can also be used as an async context manager. + + *limit* determines the buffer size limit used by the returned :class:`Stream` + instance. By default the limit is set to 64 KiB. + + .. versionadded:: 3.8 .. rubric:: Unix Sockets +.. function:: connect_unix(path=None, *, limit=2**16, ssl=None, \ + sock=None, server_hostname=None, \ + ssl_handshake_timeout=None) + + Establish a Unix socket connection to socket with *path* address and + return an awaitable :class:`Stream` object of the mode :attr:`StreamMode.READWRITE` + that can be used as a reader and a writer. + + *limit* determines the buffer size limit used by the returned :class:`Stream` + instance. By default the *limit* is set to 64 KiB. + + The rest of the arguments are passed directly to :meth:`loop.create_unix_connection`. + + The function can be used with ``await`` to get a connected stream:: + + stream = await asyncio.connect_unix('/tmp/example.sock') + + The function can also be used as an async context manager:: + + async with asyncio.connect_unix('/tmp/example.sock') as stream: + ... + + .. availability:: Unix. + + .. versionadded:: 3.8 + .. coroutinefunction:: open_unix_connection(path=None, \*, loop=None, \ limit=None, ssl=None, sock=None, \ server_hostname=None, ssl_handshake_timeout=None) @@ -134,7 +202,7 @@ and work with streams: .. deprecated-removed:: 3.8 3.10 - `open_unix_connection()` is deprecated if favor of `connect_unix()`. + ``open_unix_connection()`` is deprecated if favor of :func:`connect_unix`. .. coroutinefunction:: start_unix_server(client_connected_cb, path=None, \ @@ -160,11 +228,176 @@ and work with streams: .. deprecated-removed:: 3.8 3.10 - `start_unix_server()` is deprecated in favor of `UnixStreamServer()`. + ``start_unix_server()`` is deprecated in favor of :class:`UnixStreamServer`. --------- +StreamServer +============ + +.. class:: StreamServer(client_connected_cb, /, host=None, port=None, *, \ + limit=2**16, family=socket.AF_UNSPEC, \ + flags=socket.AI_PASSIVE, sock=None, backlog=100, \ + ssl=None, reuse_address=None, reuse_port=None, \ + ssl_handshake_timeout=None, shutdown_timeout=60) + + The *client_connected_cb* callback is called whenever a new client + connection is established. It receives a :class:`Stream` object of the + mode :attr:`StreamMode.READWRITE`. + + *client_connected_cb* can be a plain callable or a + :ref:`coroutine function `; if it is a coroutine function, + it will be automatically scheduled as a :class:`Task`. + + *limit* determines the buffer size limit used by the + returned :class:`Stream` instance. By default the *limit* + is set to 64 KiB. + + The rest of the arguments are passed directly to + :meth:`loop.create_server`. + + .. coroutinemethod:: start_serving() + + Binds to the given host and port to start the server. + + .. coroutinemethod:: serve_forever() + + Start accepting connections until the coroutine is cancelled. + Cancellation of ``serve_forever`` task causes the server + to be closed. + + This method can be called if the server is already accepting + connections. Only one ``serve_forever`` task can exist per + one *Server* object. + + .. method:: is_serving() + + Returns ``True`` if the server is bound and currently serving. + + .. method:: bind() + + Bind the server to the given *host* and *port*. This method is + automatically called during ``__aenter__`` when :class:`StreamServer` is + used as an async context manager. + + .. method:: is_bound() + + Return ``True`` if the server is bound. + + .. coroutinemethod:: abort() + + Closes the connection and cancels all pending tasks. + + .. coroutinemethod:: close() + + Closes the connection. This method is automatically called during + ``__aexit__`` when :class:`StreamServer` is used as an async context + manager. + + .. attribute:: sockets + + Returns a tuple of socket objects the server is bound to. + + .. versionadded:: 3.8 + + +UnixStreamServer +================ + +.. class:: UnixStreamServer(client_connected_cb, /, path=None, *, \ + limit=2**16, sock=None, backlog=100, \ + ssl=None, ssl_handshake_timeout=None, shutdown_timeout=60) + + The *client_connected_cb* callback is called whenever a new client + connection is established. It receives a :class:`Stream` object of the + mode :attr:`StreamMode.READWRITE`. + + *client_connected_cb* can be a plain callable or a + :ref:`coroutine function `; if it is a coroutine function, + it will be automatically scheduled as a :class:`Task`. + + *limit* determines the buffer size limit used by the + returned :class:`Stream` instance. By default the *limit* + is set to 64 KiB. + + The rest of the arguments are passed directly to + :meth:`loop.create_unix_server`. + + .. coroutinemethod:: start_serving() + + Binds to the given host and port to start the server. + + .. method:: is_serving() + + Returns ``True`` if the server is bound and currently serving. + + .. method:: bind() + + Bind the server to the given *host* and *port*. This method is + automatically called during ``__aenter__`` when :class:`UnixStreamServer` is + used as an async context manager. + + .. method:: is_bound() + + Return ``True`` if the server is bound. + + .. coroutinemethod:: abort() + + Closes the connection and cancels all pending tasks. + + .. coroutinemethod:: close() + + Closes the connection. This method is automatically called during + ``__aexit__`` when :class:`UnixStreamServer` is used as an async context + manager. + + .. attribute:: sockets + + Returns a tuple of socket objects the server is bound to. + + .. availability:: Unix. + + .. versionadded:: 3.8 + +Stream +====== + +.. class:: Stream + + Represents a Stream object that provides APIs to read and write data + to the IO stream . It includes the API provided by :class:`StreamReader` + and :class:`StreamWriter`. + + Do not instantiate *Stream* objects directly; use API like :func:`connect` + and :class:`StreamServer` instead. + + .. versionadded:: 3.8 + + +StreamMode +========== + +.. class:: StreamMode + + A subclass of :class:`enum.Flag` that defines a set of values that can be + used to determine the ``mode`` of :class:`Stream` objects. + + .. data:: READ + + The stream object is readable and provides the API of :class:`StreamReader`. + + .. data:: WRITE + + The stream object is writeable and provides the API of :class:`StreamWriter`. + + .. data:: READWRITE + + The stream object is readable and writeable and provides the API of both + :class:`StreamReader` and :class:`StreamWriter`. + + .. versionadded:: 3.8 + StreamReader ============ @@ -366,22 +599,17 @@ Examples TCP echo client using streams ----------------------------- -TCP echo client using the :func:`asyncio.open_connection` function:: +TCP echo client using the :func:`asyncio.connect` function:: import asyncio async def tcp_echo_client(message): - reader, writer = await asyncio.open_connection( - '127.0.0.1', 8888) + async with asyncio.connect('127.0.0.1', 8888) as stream: + print(f'Send: {message!r}') + await stream.write(message.encode()) - print(f'Send: {message!r}') - writer.write(message.encode()) - - data = await reader.read(100) - print(f'Received: {data.decode()!r}') - - print('Close the connection') - writer.close() + data = await stream.read(100) + print(f'Received: {data.decode()!r}') asyncio.run(tcp_echo_client('Hello World!')) @@ -397,32 +625,28 @@ TCP echo client using the :func:`asyncio.open_connection` function:: TCP echo server using streams ----------------------------- -TCP echo server using the :func:`asyncio.start_server` function:: +TCP echo server using the :class:`asyncio.StreamServer` class:: import asyncio - async def handle_echo(reader, writer): - data = await reader.read(100) + async def handle_echo(stream): + data = await stream.read(100) message = data.decode() - addr = writer.get_extra_info('peername') + addr = stream.get_extra_info('peername') print(f"Received {message!r} from {addr!r}") print(f"Send: {message!r}") - writer.write(data) - await writer.drain() + await stream.write(data) print("Close the connection") - writer.close() + await stream.close() async def main(): - server = await asyncio.start_server( - handle_echo, '127.0.0.1', 8888) - - addr = server.sockets[0].getsockname() - print(f'Serving on {addr}') - - async with server: + async with asyncio.StreamServer( + handle_echo, '127.0.0.1', 8888) as server: + addr = server.sockets[0].getsockname() + print(f'Serving on {addr}') await server.serve_forever() asyncio.run(main()) @@ -446,11 +670,9 @@ Simple example querying HTTP headers of the URL passed on the command line:: async def print_http_headers(url): url = urllib.parse.urlsplit(url) if url.scheme == 'https': - reader, writer = await asyncio.open_connection( - url.hostname, 443, ssl=True) + stream = await asyncio.connect(url.hostname, 443, ssl=True) else: - reader, writer = await asyncio.open_connection( - url.hostname, 80) + stream = await asyncio.connect(url.hostname, 80) query = ( f"HEAD {url.path or '/'} HTTP/1.0\r\n" @@ -458,18 +680,14 @@ Simple example querying HTTP headers of the URL passed on the command line:: f"\r\n" ) - writer.write(query.encode('latin-1')) - while True: - line = await reader.readline() - if not line: - break - + stream.write(query.encode('latin-1')) + while (line := await stream.readline()): line = line.decode('latin1').rstrip() if line: print(f'HTTP header> {line}') # Ignore the body, close the socket - writer.close() + await stream.close() url = sys.argv[1] asyncio.run(print_http_headers(url)) @@ -490,7 +708,7 @@ Register an open socket to wait for data using streams ------------------------------------------------------ Coroutine waiting until a socket receives data using the -:func:`open_connection` function:: +:func:`asyncio.connect` function:: import asyncio import socket @@ -504,17 +722,15 @@ Coroutine waiting until a socket receives data using the rsock, wsock = socket.socketpair() # Register the open socket to wait for data. - reader, writer = await asyncio.open_connection(sock=rsock) + async with asyncio.connect(sock=rsock) as stream: + # Simulate the reception of data from the network + loop.call_soon(wsock.send, 'abc'.encode()) - # Simulate the reception of data from the network - loop.call_soon(wsock.send, 'abc'.encode()) + # Wait for data + data = await stream.read(100) - # Wait for data - data = await reader.read(100) - - # Got data, we are done: close the socket - print("Received:", data.decode()) - writer.close() + # Got data, we are done: close the socket + print("Received:", data.decode()) # Close the second socket wsock.close()