# Contains code from https://github.com/MagicStack/uvloop/tree/v0.16.0 # SPDX-License-Identifier: PSF-2.0 AND (MIT OR Apache-2.0) # SPDX-FileCopyrightText: Copyright (c) 2015-2021 MagicStack Inc. http://magic.io import asyncio import contextlib import gc import logging import select import socket import sys import tempfile import threading import time import weakref import unittest try: import ssl except ImportError: ssl = None from test import support from test.test_asyncio import utils as test_utils MACOS = (sys.platform == 'darwin') BUF_MULTIPLIER = 1024 if not MACOS else 64 def tearDownModule(): asyncio.set_event_loop_policy(None) class MyBaseProto(asyncio.Protocol): connected = None done = None def __init__(self, loop=None): self.transport = None self.state = 'INITIAL' self.nbytes = 0 if loop is not None: self.connected = asyncio.Future(loop=loop) self.done = asyncio.Future(loop=loop) def connection_made(self, transport): self.transport = transport assert self.state == 'INITIAL', self.state self.state = 'CONNECTED' if self.connected: self.connected.set_result(None) def data_received(self, data): assert self.state == 'CONNECTED', self.state self.nbytes += len(data) def eof_received(self): assert self.state == 'CONNECTED', self.state self.state = 'EOF' def connection_lost(self, exc): assert self.state in ('CONNECTED', 'EOF'), self.state self.state = 'CLOSED' if self.done: self.done.set_result(None) class MessageOutFilter(logging.Filter): def __init__(self, msg): self.msg = msg def filter(self, record): if self.msg in record.msg: return False return True @unittest.skipIf(ssl is None, 'No ssl module') class TestSSL(test_utils.TestCase): PAYLOAD_SIZE = 1024 * 100 TIMEOUT = support.LONG_TIMEOUT def setUp(self): super().setUp() self.loop = asyncio.new_event_loop() self.set_event_loop(self.loop) self.addCleanup(self.loop.close) def tearDown(self): # just in case if we have transport close callbacks if not self.loop.is_closed(): test_utils.run_briefly(self.loop) self.doCleanups() support.gc_collect() super().tearDown() def tcp_server(self, server_prog, *, family=socket.AF_INET, addr=None, timeout=support.SHORT_TIMEOUT, backlog=1, max_clients=10): if addr is None: if family == getattr(socket, "AF_UNIX", None): with tempfile.NamedTemporaryFile() as tmp: addr = tmp.name else: addr = ('127.0.0.1', 0) sock = socket.socket(family, socket.SOCK_STREAM) if timeout is None: raise RuntimeError('timeout is required') if timeout <= 0: raise RuntimeError('only blocking sockets are supported') sock.settimeout(timeout) try: sock.bind(addr) sock.listen(backlog) except OSError as ex: sock.close() raise ex return TestThreadedServer( self, sock, server_prog, timeout, max_clients) def tcp_client(self, client_prog, family=socket.AF_INET, timeout=support.SHORT_TIMEOUT): sock = socket.socket(family, socket.SOCK_STREAM) if timeout is None: raise RuntimeError('timeout is required') if timeout <= 0: raise RuntimeError('only blocking sockets are supported') sock.settimeout(timeout) return TestThreadedClient( self, sock, client_prog, timeout) def unix_server(self, *args, **kwargs): return self.tcp_server(*args, family=socket.AF_UNIX, **kwargs) def unix_client(self, *args, **kwargs): return self.tcp_client(*args, family=socket.AF_UNIX, **kwargs) def _create_server_ssl_context(self, certfile, keyfile=None): sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) sslcontext.options |= ssl.OP_NO_SSLv2 sslcontext.load_cert_chain(certfile, keyfile) return sslcontext def _create_client_ssl_context(self, *, disable_verify=True): sslcontext = ssl.create_default_context() sslcontext.check_hostname = False if disable_verify: sslcontext.verify_mode = ssl.CERT_NONE return sslcontext @contextlib.contextmanager def _silence_eof_received_warning(self): # TODO This warning has to be fixed in asyncio. logger = logging.getLogger('asyncio') filter = MessageOutFilter('has no effect when using ssl') logger.addFilter(filter) try: yield finally: logger.removeFilter(filter) def _abort_socket_test(self, ex): try: self.loop.stop() finally: self.fail(ex) def new_loop(self): return asyncio.new_event_loop() def new_policy(self): return asyncio.DefaultEventLoopPolicy() async def wait_closed(self, obj): if not isinstance(obj, asyncio.StreamWriter): return try: await obj.wait_closed() except (BrokenPipeError, ConnectionError): pass def test_create_server_ssl_1(self): CNT = 0 # number of clients that were successful TOTAL_CNT = 25 # total number of clients that test will create TIMEOUT = support.LONG_TIMEOUT # timeout for this test A_DATA = b'A' * 1024 * BUF_MULTIPLIER B_DATA = b'B' * 1024 * BUF_MULTIPLIER sslctx = self._create_server_ssl_context( test_utils.ONLYCERT, test_utils.ONLYKEY ) client_sslctx = self._create_client_ssl_context() clients = [] async def handle_client(reader, writer): nonlocal CNT data = await reader.readexactly(len(A_DATA)) self.assertEqual(data, A_DATA) writer.write(b'OK') data = await reader.readexactly(len(B_DATA)) self.assertEqual(data, B_DATA) writer.writelines([b'SP', bytearray(b'A'), memoryview(b'M')]) await writer.drain() writer.close() CNT += 1 async def test_client(addr): fut = asyncio.Future() def prog(sock): try: sock.starttls(client_sslctx) sock.connect(addr) sock.send(A_DATA) data = sock.recv_all(2) self.assertEqual(data, b'OK') sock.send(B_DATA) data = sock.recv_all(4) self.assertEqual(data, b'SPAM') sock.close() except Exception as ex: self.loop.call_soon_threadsafe(fut.set_exception, ex) else: self.loop.call_soon_threadsafe(fut.set_result, None) client = self.tcp_client(prog) client.start() clients.append(client) await fut async def start_server(): extras = {} extras = dict(ssl_handshake_timeout=support.SHORT_TIMEOUT) srv = await asyncio.start_server( handle_client, '127.0.0.1', 0, family=socket.AF_INET, ssl=sslctx, **extras) try: srv_socks = srv.sockets self.assertTrue(srv_socks) addr = srv_socks[0].getsockname() tasks = [] for _ in range(TOTAL_CNT): tasks.append(test_client(addr)) await asyncio.wait_for(asyncio.gather(*tasks), TIMEOUT) finally: self.loop.call_soon(srv.close) await srv.wait_closed() with self._silence_eof_received_warning(): self.loop.run_until_complete(start_server()) self.assertEqual(CNT, TOTAL_CNT) for client in clients: client.stop() def test_create_connection_ssl_1(self): self.loop.set_exception_handler(None) CNT = 0 TOTAL_CNT = 25 A_DATA = b'A' * 1024 * BUF_MULTIPLIER B_DATA = b'B' * 1024 * BUF_MULTIPLIER sslctx = self._create_server_ssl_context( test_utils.ONLYCERT, test_utils.ONLYKEY ) client_sslctx = self._create_client_ssl_context() def server(sock): sock.starttls( sslctx, server_side=True) data = sock.recv_all(len(A_DATA)) self.assertEqual(data, A_DATA) sock.send(b'OK') data = sock.recv_all(len(B_DATA)) self.assertEqual(data, B_DATA) sock.send(b'SPAM') sock.close() async def client(addr): extras = {} extras = dict(ssl_handshake_timeout=support.SHORT_TIMEOUT) reader, writer = await asyncio.open_connection( *addr, ssl=client_sslctx, server_hostname='', **extras) writer.write(A_DATA) self.assertEqual(await reader.readexactly(2), b'OK') writer.write(B_DATA) self.assertEqual(await reader.readexactly(4), b'SPAM') nonlocal CNT CNT += 1 writer.close() await self.wait_closed(writer) async def client_sock(addr): sock = socket.socket() sock.connect(addr) reader, writer = await asyncio.open_connection( sock=sock, ssl=client_sslctx, server_hostname='') writer.write(A_DATA) self.assertEqual(await reader.readexactly(2), b'OK') writer.write(B_DATA) self.assertEqual(await reader.readexactly(4), b'SPAM') nonlocal CNT CNT += 1 writer.close() await self.wait_closed(writer) sock.close() def run(coro): nonlocal CNT CNT = 0 async def _gather(*tasks): # trampoline return await asyncio.gather(*tasks) with self.tcp_server(server, max_clients=TOTAL_CNT, backlog=TOTAL_CNT) as srv: tasks = [] for _ in range(TOTAL_CNT): tasks.append(coro(srv.addr)) self.loop.run_until_complete(_gather(*tasks)) self.assertEqual(CNT, TOTAL_CNT) with self._silence_eof_received_warning(): run(client) with self._silence_eof_received_warning(): run(client_sock) def test_create_connection_ssl_slow_handshake(self): client_sslctx = self._create_client_ssl_context() # silence error logger self.loop.set_exception_handler(lambda *args: None) def server(sock): try: sock.recv_all(1024 * 1024) except ConnectionAbortedError: pass finally: sock.close() async def client(addr): reader, writer = await asyncio.open_connection( *addr, ssl=client_sslctx, server_hostname='', ssl_handshake_timeout=1.0) writer.close() await self.wait_closed(writer) with self.tcp_server(server, max_clients=1, backlog=1) as srv: with self.assertRaisesRegex( ConnectionAbortedError, r'SSL handshake.*is taking longer'): self.loop.run_until_complete(client(srv.addr)) def test_create_connection_ssl_failed_certificate(self): # silence error logger self.loop.set_exception_handler(lambda *args: None) sslctx = self._create_server_ssl_context( test_utils.ONLYCERT, test_utils.ONLYKEY ) client_sslctx = self._create_client_ssl_context(disable_verify=False) def server(sock): try: sock.starttls( sslctx, server_side=True) sock.connect() except (ssl.SSLError, OSError): pass finally: sock.close() async def client(addr): reader, writer = await asyncio.open_connection( *addr, ssl=client_sslctx, server_hostname='', ssl_handshake_timeout=support.SHORT_TIMEOUT) writer.close() await self.wait_closed(writer) with self.tcp_server(server, max_clients=1, backlog=1) as srv: with self.assertRaises(ssl.SSLCertVerificationError): self.loop.run_until_complete(client(srv.addr)) def test_ssl_handshake_timeout(self): # bpo-29970: Check that a connection is aborted if handshake is not # completed in timeout period, instead of remaining open indefinitely client_sslctx = test_utils.simple_client_sslcontext() # silence error logger messages = [] self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) server_side_aborted = False def server(sock): nonlocal server_side_aborted try: sock.recv_all(1024 * 1024) except ConnectionAbortedError: server_side_aborted = True finally: sock.close() async def client(addr): await asyncio.wait_for( self.loop.create_connection( asyncio.Protocol, *addr, ssl=client_sslctx, server_hostname='', ssl_handshake_timeout=10.0), 0.5) with self.tcp_server(server, max_clients=1, backlog=1) as srv: with self.assertRaises(asyncio.TimeoutError): self.loop.run_until_complete(client(srv.addr)) self.assertTrue(server_side_aborted) # Python issue #23197: cancelling a handshake must not raise an # exception or log an error, even if the handshake failed self.assertEqual(messages, []) def test_ssl_handshake_connection_lost(self): # #246: make sure that no connection_lost() is called before # connection_made() is called first client_sslctx = test_utils.simple_client_sslcontext() # silence error logger self.loop.set_exception_handler(lambda loop, ctx: None) connection_made_called = False connection_lost_called = False def server(sock): sock.recv(1024) # break the connection during handshake sock.close() class ClientProto(asyncio.Protocol): def connection_made(self, transport): nonlocal connection_made_called connection_made_called = True def connection_lost(self, exc): nonlocal connection_lost_called connection_lost_called = True async def client(addr): await self.loop.create_connection( ClientProto, *addr, ssl=client_sslctx, server_hostname=''), with self.tcp_server(server, max_clients=1, backlog=1) as srv: with self.assertRaises(ConnectionResetError): self.loop.run_until_complete(client(srv.addr)) if connection_lost_called: if connection_made_called: self.fail("unexpected call to connection_lost()") else: self.fail("unexpected call to connection_lost() without" "calling connection_made()") elif connection_made_called: self.fail("unexpected call to connection_made()") def test_ssl_connect_accepted_socket(self): proto = ssl.PROTOCOL_TLS_SERVER server_context = ssl.SSLContext(proto) server_context.load_cert_chain(test_utils.ONLYCERT, test_utils.ONLYKEY) if hasattr(server_context, 'check_hostname'): server_context.check_hostname = False server_context.verify_mode = ssl.CERT_NONE client_context = ssl.SSLContext(proto) if hasattr(server_context, 'check_hostname'): client_context.check_hostname = False client_context.verify_mode = ssl.CERT_NONE def test_connect_accepted_socket(self, server_ssl=None, client_ssl=None): loop = self.loop class MyProto(MyBaseProto): def connection_lost(self, exc): super().connection_lost(exc) loop.call_soon(loop.stop) def data_received(self, data): super().data_received(data) self.transport.write(expected_response) lsock = socket.socket(socket.AF_INET) lsock.bind(('127.0.0.1', 0)) lsock.listen(1) addr = lsock.getsockname() message = b'test data' response = None expected_response = b'roger' def client(): nonlocal response try: csock = socket.socket(socket.AF_INET) if client_ssl is not None: csock = client_ssl.wrap_socket(csock) csock.connect(addr) csock.sendall(message) response = csock.recv(99) csock.close() except Exception as exc: print( "Failure in client thread in test_connect_accepted_socket", exc) thread = threading.Thread(target=client, daemon=True) thread.start() conn, _ = lsock.accept() proto = MyProto(loop=loop) proto.loop = loop extras = {} if server_ssl: extras = dict(ssl_handshake_timeout=support.SHORT_TIMEOUT) f = loop.create_task( loop.connect_accepted_socket( (lambda: proto), conn, ssl=server_ssl, **extras)) loop.run_forever() conn.close() lsock.close() thread.join(1) self.assertFalse(thread.is_alive()) self.assertEqual(proto.state, 'CLOSED') self.assertEqual(proto.nbytes, len(message)) self.assertEqual(response, expected_response) tr, _ = f.result() if server_ssl: self.assertIn('SSL', tr.__class__.__name__) tr.close() # let it close self.loop.run_until_complete(asyncio.sleep(0.1)) def test_start_tls_client_corrupted_ssl(self): self.loop.set_exception_handler(lambda loop, ctx: None) sslctx = test_utils.simple_server_sslcontext() client_sslctx = test_utils.simple_client_sslcontext() def server(sock): orig_sock = sock.dup() try: sock.starttls( sslctx, server_side=True) sock.sendall(b'A\n') sock.recv_all(1) orig_sock.send(b'please corrupt the SSL connection') except ssl.SSLError: pass finally: sock.close() orig_sock.close() async def client(addr): reader, writer = await asyncio.open_connection( *addr, ssl=client_sslctx, server_hostname='') self.assertEqual(await reader.readline(), b'A\n') writer.write(b'B') with self.assertRaises(ssl.SSLError): await reader.readline() writer.close() try: await self.wait_closed(writer) except ssl.SSLError: pass return 'OK' with self.tcp_server(server, max_clients=1, backlog=1) as srv: res = self.loop.run_until_complete(client(srv.addr)) self.assertEqual(res, 'OK') def test_start_tls_client_reg_proto_1(self): HELLO_MSG = b'1' * self.PAYLOAD_SIZE server_context = test_utils.simple_server_sslcontext() client_context = test_utils.simple_client_sslcontext() def serve(sock): sock.settimeout(self.TIMEOUT) data = sock.recv_all(len(HELLO_MSG)) self.assertEqual(len(data), len(HELLO_MSG)) sock.starttls(server_context, server_side=True) sock.sendall(b'O') data = sock.recv_all(len(HELLO_MSG)) self.assertEqual(len(data), len(HELLO_MSG)) sock.unwrap() sock.close() class ClientProto(asyncio.Protocol): def __init__(self, on_data, on_eof): self.on_data = on_data self.on_eof = on_eof self.con_made_cnt = 0 def connection_made(proto, tr): proto.con_made_cnt += 1 # Ensure connection_made gets called only once. self.assertEqual(proto.con_made_cnt, 1) def data_received(self, data): self.on_data.set_result(data) def eof_received(self): self.on_eof.set_result(True) async def client(addr): await asyncio.sleep(0.5) on_data = self.loop.create_future() on_eof = self.loop.create_future() tr, proto = await self.loop.create_connection( lambda: ClientProto(on_data, on_eof), *addr) tr.write(HELLO_MSG) new_tr = await self.loop.start_tls(tr, proto, client_context) self.assertEqual(await on_data, b'O') new_tr.write(HELLO_MSG) await on_eof new_tr.close() with self.tcp_server(serve, timeout=self.TIMEOUT) as srv: self.loop.run_until_complete( asyncio.wait_for(client(srv.addr), timeout=support.SHORT_TIMEOUT)) def test_create_connection_memory_leak(self): HELLO_MSG = b'1' * self.PAYLOAD_SIZE server_context = self._create_server_ssl_context( test_utils.ONLYCERT, test_utils.ONLYKEY) client_context = self._create_client_ssl_context() def serve(sock): sock.settimeout(self.TIMEOUT) sock.starttls(server_context, server_side=True) sock.sendall(b'O') data = sock.recv_all(len(HELLO_MSG)) self.assertEqual(len(data), len(HELLO_MSG)) sock.unwrap() sock.close() class ClientProto(asyncio.Protocol): def __init__(self, on_data, on_eof): self.on_data = on_data self.on_eof = on_eof self.con_made_cnt = 0 def connection_made(proto, tr): # XXX: We assume user stores the transport in protocol proto.tr = tr proto.con_made_cnt += 1 # Ensure connection_made gets called only once. self.assertEqual(proto.con_made_cnt, 1) def data_received(self, data): self.on_data.set_result(data) def eof_received(self): self.on_eof.set_result(True) async def client(addr): await asyncio.sleep(0.5) on_data = self.loop.create_future() on_eof = self.loop.create_future() tr, proto = await self.loop.create_connection( lambda: ClientProto(on_data, on_eof), *addr, ssl=client_context) self.assertEqual(await on_data, b'O') tr.write(HELLO_MSG) await on_eof tr.close() with self.tcp_server(serve, timeout=self.TIMEOUT) as srv: self.loop.run_until_complete( asyncio.wait_for(client(srv.addr), timeout=support.SHORT_TIMEOUT)) # No garbage is left for SSL client from loop.create_connection, even # if user stores the SSLTransport in corresponding protocol instance client_context = weakref.ref(client_context) self.assertIsNone(client_context()) def test_start_tls_client_buf_proto_1(self): HELLO_MSG = b'1' * self.PAYLOAD_SIZE server_context = test_utils.simple_server_sslcontext() client_context = test_utils.simple_client_sslcontext() client_con_made_calls = 0 def serve(sock): sock.settimeout(self.TIMEOUT) data = sock.recv_all(len(HELLO_MSG)) self.assertEqual(len(data), len(HELLO_MSG)) sock.starttls(server_context, server_side=True) sock.sendall(b'O') data = sock.recv_all(len(HELLO_MSG)) self.assertEqual(len(data), len(HELLO_MSG)) sock.sendall(b'2') data = sock.recv_all(len(HELLO_MSG)) self.assertEqual(len(data), len(HELLO_MSG)) sock.unwrap() sock.close() class ClientProtoFirst(asyncio.BufferedProtocol): def __init__(self, on_data): self.on_data = on_data self.buf = bytearray(1) def connection_made(self, tr): nonlocal client_con_made_calls client_con_made_calls += 1 def get_buffer(self, sizehint): return self.buf def buffer_updated(self, nsize): assert nsize == 1 self.on_data.set_result(bytes(self.buf[:nsize])) def eof_received(self): pass class ClientProtoSecond(asyncio.Protocol): def __init__(self, on_data, on_eof): self.on_data = on_data self.on_eof = on_eof self.con_made_cnt = 0 def connection_made(self, tr): nonlocal client_con_made_calls client_con_made_calls += 1 def data_received(self, data): self.on_data.set_result(data) def eof_received(self): self.on_eof.set_result(True) async def client(addr): await asyncio.sleep(0.5) on_data1 = self.loop.create_future() on_data2 = self.loop.create_future() on_eof = self.loop.create_future() tr, proto = await self.loop.create_connection( lambda: ClientProtoFirst(on_data1), *addr) tr.write(HELLO_MSG) new_tr = await self.loop.start_tls(tr, proto, client_context) self.assertEqual(await on_data1, b'O') new_tr.write(HELLO_MSG) new_tr.set_protocol(ClientProtoSecond(on_data2, on_eof)) self.assertEqual(await on_data2, b'2') new_tr.write(HELLO_MSG) await on_eof new_tr.close() # connection_made() should be called only once -- when # we establish connection for the first time. Start TLS # doesn't call connection_made() on application protocols. self.assertEqual(client_con_made_calls, 1) with self.tcp_server(serve, timeout=self.TIMEOUT) as srv: self.loop.run_until_complete( asyncio.wait_for(client(srv.addr), timeout=self.TIMEOUT)) def test_start_tls_slow_client_cancel(self): HELLO_MSG = b'1' * self.PAYLOAD_SIZE client_context = test_utils.simple_client_sslcontext() server_waits_on_handshake = self.loop.create_future() def serve(sock): sock.settimeout(self.TIMEOUT) data = sock.recv_all(len(HELLO_MSG)) self.assertEqual(len(data), len(HELLO_MSG)) try: self.loop.call_soon_threadsafe( server_waits_on_handshake.set_result, None) data = sock.recv_all(1024 * 1024) except ConnectionAbortedError: pass finally: sock.close() class ClientProto(asyncio.Protocol): def __init__(self, on_data, on_eof): self.on_data = on_data self.on_eof = on_eof self.con_made_cnt = 0 def connection_made(proto, tr): proto.con_made_cnt += 1 # Ensure connection_made gets called only once. self.assertEqual(proto.con_made_cnt, 1) def data_received(self, data): self.on_data.set_result(data) def eof_received(self): self.on_eof.set_result(True) async def client(addr): await asyncio.sleep(0.5) on_data = self.loop.create_future() on_eof = self.loop.create_future() tr, proto = await self.loop.create_connection( lambda: ClientProto(on_data, on_eof), *addr) tr.write(HELLO_MSG) await server_waits_on_handshake with self.assertRaises(asyncio.TimeoutError): await asyncio.wait_for( self.loop.start_tls(tr, proto, client_context), 0.5) with self.tcp_server(serve, timeout=self.TIMEOUT) as srv: self.loop.run_until_complete( asyncio.wait_for(client(srv.addr), timeout=support.SHORT_TIMEOUT)) def test_start_tls_server_1(self): HELLO_MSG = b'1' * self.PAYLOAD_SIZE server_context = test_utils.simple_server_sslcontext() client_context = test_utils.simple_client_sslcontext() def client(sock, addr): sock.settimeout(self.TIMEOUT) sock.connect(addr) data = sock.recv_all(len(HELLO_MSG)) self.assertEqual(len(data), len(HELLO_MSG)) sock.starttls(client_context) sock.sendall(HELLO_MSG) sock.unwrap() sock.close() class ServerProto(asyncio.Protocol): def __init__(self, on_con, on_eof, on_con_lost): self.on_con = on_con self.on_eof = on_eof self.on_con_lost = on_con_lost self.data = b'' def connection_made(self, tr): self.on_con.set_result(tr) def data_received(self, data): self.data += data def eof_received(self): self.on_eof.set_result(1) def connection_lost(self, exc): if exc is None: self.on_con_lost.set_result(None) else: self.on_con_lost.set_exception(exc) async def main(proto, on_con, on_eof, on_con_lost): tr = await on_con tr.write(HELLO_MSG) self.assertEqual(proto.data, b'') new_tr = await self.loop.start_tls( tr, proto, server_context, server_side=True, ssl_handshake_timeout=self.TIMEOUT) await on_eof await on_con_lost self.assertEqual(proto.data, HELLO_MSG) new_tr.close() async def run_main(): on_con = self.loop.create_future() on_eof = self.loop.create_future() on_con_lost = self.loop.create_future() proto = ServerProto(on_con, on_eof, on_con_lost) server = await self.loop.create_server( lambda: proto, '127.0.0.1', 0) addr = server.sockets[0].getsockname() with self.tcp_client(lambda sock: client(sock, addr), timeout=self.TIMEOUT): await asyncio.wait_for( main(proto, on_con, on_eof, on_con_lost), timeout=self.TIMEOUT) server.close() await server.wait_closed() self.loop.run_until_complete(run_main()) def test_create_server_ssl_over_ssl(self): CNT = 0 # number of clients that were successful TOTAL_CNT = 25 # total number of clients that test will create TIMEOUT = support.LONG_TIMEOUT # timeout for this test A_DATA = b'A' * 1024 * BUF_MULTIPLIER B_DATA = b'B' * 1024 * BUF_MULTIPLIER sslctx_1 = self._create_server_ssl_context( test_utils.ONLYCERT, test_utils.ONLYKEY) client_sslctx_1 = self._create_client_ssl_context() sslctx_2 = self._create_server_ssl_context( test_utils.ONLYCERT, test_utils.ONLYKEY) client_sslctx_2 = self._create_client_ssl_context() clients = [] async def handle_client(reader, writer): nonlocal CNT data = await reader.readexactly(len(A_DATA)) self.assertEqual(data, A_DATA) writer.write(b'OK') data = await reader.readexactly(len(B_DATA)) self.assertEqual(data, B_DATA) writer.writelines([b'SP', bytearray(b'A'), memoryview(b'M')]) await writer.drain() writer.close() CNT += 1 class ServerProtocol(asyncio.StreamReaderProtocol): def connection_made(self, transport): super_ = super() transport.pause_reading() fut = self._loop.create_task(self._loop.start_tls( transport, self, sslctx_2, server_side=True)) def cb(_): try: tr = fut.result() except Exception as ex: super_.connection_lost(ex) else: super_.connection_made(tr) fut.add_done_callback(cb) def server_protocol_factory(): reader = asyncio.StreamReader() protocol = ServerProtocol(reader, handle_client) return protocol async def test_client(addr): fut = asyncio.Future() def prog(sock): try: sock.connect(addr) sock.starttls(client_sslctx_1) # because wrap_socket() doesn't work correctly on # SSLSocket, we have to do the 2nd level SSL manually incoming = ssl.MemoryBIO() outgoing = ssl.MemoryBIO() sslobj = client_sslctx_2.wrap_bio(incoming, outgoing) def do(func, *args): while True: try: rv = func(*args) break except ssl.SSLWantReadError: if outgoing.pending: sock.send(outgoing.read()) incoming.write(sock.recv(65536)) if outgoing.pending: sock.send(outgoing.read()) return rv do(sslobj.do_handshake) do(sslobj.write, A_DATA) data = do(sslobj.read, 2) self.assertEqual(data, b'OK') do(sslobj.write, B_DATA) data = b'' while True: chunk = do(sslobj.read, 4) if not chunk: break data += chunk self.assertEqual(data, b'SPAM') do(sslobj.unwrap) sock.close() except Exception as ex: self.loop.call_soon_threadsafe(fut.set_exception, ex) sock.close() else: self.loop.call_soon_threadsafe(fut.set_result, None) client = self.tcp_client(prog) client.start() clients.append(client) await fut async def start_server(): extras = {} srv = await self.loop.create_server( server_protocol_factory, '127.0.0.1', 0, family=socket.AF_INET, ssl=sslctx_1, **extras) try: srv_socks = srv.sockets self.assertTrue(srv_socks) addr = srv_socks[0].getsockname() tasks = [] for _ in range(TOTAL_CNT): tasks.append(test_client(addr)) await asyncio.wait_for(asyncio.gather(*tasks), TIMEOUT) finally: self.loop.call_soon(srv.close) await srv.wait_closed() with self._silence_eof_received_warning(): self.loop.run_until_complete(start_server()) self.assertEqual(CNT, TOTAL_CNT) for client in clients: client.stop() def test_shutdown_cleanly(self): CNT = 0 TOTAL_CNT = 25 A_DATA = b'A' * 1024 * BUF_MULTIPLIER sslctx = self._create_server_ssl_context( test_utils.ONLYCERT, test_utils.ONLYKEY) client_sslctx = self._create_client_ssl_context() def server(sock): sock.starttls( sslctx, server_side=True) data = sock.recv_all(len(A_DATA)) self.assertEqual(data, A_DATA) sock.send(b'OK') sock.unwrap() sock.close() async def client(addr): extras = {} extras = dict(ssl_handshake_timeout=support.SHORT_TIMEOUT) reader, writer = await asyncio.open_connection( *addr, ssl=client_sslctx, server_hostname='', **extras) writer.write(A_DATA) self.assertEqual(await reader.readexactly(2), b'OK') self.assertEqual(await reader.read(), b'') nonlocal CNT CNT += 1 writer.close() await self.wait_closed(writer) def run(coro): nonlocal CNT CNT = 0 async def _gather(*tasks): return await asyncio.gather(*tasks) with self.tcp_server(server, max_clients=TOTAL_CNT, backlog=TOTAL_CNT) as srv: tasks = [] for _ in range(TOTAL_CNT): tasks.append(coro(srv.addr)) self.loop.run_until_complete( _gather(*tasks)) self.assertEqual(CNT, TOTAL_CNT) with self._silence_eof_received_warning(): run(client) def test_flush_before_shutdown(self): CHUNK = 1024 * 128 SIZE = 32 sslctx = self._create_server_ssl_context( test_utils.ONLYCERT, test_utils.ONLYKEY) client_sslctx = self._create_client_ssl_context() future = None def server(sock): sock.starttls(sslctx, server_side=True) self.assertEqual(sock.recv_all(4), b'ping') sock.send(b'pong') time.sleep(0.5) # hopefully stuck the TCP buffer data = sock.recv_all(CHUNK * SIZE) self.assertEqual(len(data), CHUNK * SIZE) sock.close() def run(meth): def wrapper(sock): try: meth(sock) except Exception as ex: self.loop.call_soon_threadsafe(future.set_exception, ex) else: self.loop.call_soon_threadsafe(future.set_result, None) return wrapper async def client(addr): nonlocal future future = self.loop.create_future() reader, writer = await asyncio.open_connection( *addr, ssl=client_sslctx, server_hostname='') sslprotocol = writer.transport._ssl_protocol writer.write(b'ping') data = await reader.readexactly(4) self.assertEqual(data, b'pong') sslprotocol.pause_writing() for _ in range(SIZE): writer.write(b'x' * CHUNK) writer.close() sslprotocol.resume_writing() await self.wait_closed(writer) try: data = await reader.read() self.assertEqual(data, b'') except ConnectionResetError: pass await future with self.tcp_server(run(server)) as srv: self.loop.run_until_complete(client(srv.addr)) def test_remote_shutdown_receives_trailing_data(self): CHUNK = 1024 * 128 SIZE = 32 sslctx = self._create_server_ssl_context( test_utils.ONLYCERT, test_utils.ONLYKEY ) client_sslctx = self._create_client_ssl_context() future = None def server(sock): incoming = ssl.MemoryBIO() outgoing = ssl.MemoryBIO() sslobj = sslctx.wrap_bio(incoming, outgoing, server_side=True) while True: try: sslobj.do_handshake() except ssl.SSLWantReadError: if outgoing.pending: sock.send(outgoing.read()) incoming.write(sock.recv(16384)) else: if outgoing.pending: sock.send(outgoing.read()) break while True: try: data = sslobj.read(4) except ssl.SSLWantReadError: incoming.write(sock.recv(16384)) else: break self.assertEqual(data, b'ping') sslobj.write(b'pong') sock.send(outgoing.read()) time.sleep(0.2) # wait for the peer to fill its backlog # send close_notify but don't wait for response with self.assertRaises(ssl.SSLWantReadError): sslobj.unwrap() sock.send(outgoing.read()) # should receive all data data_len = 0 while True: try: chunk = len(sslobj.read(16384)) data_len += chunk except ssl.SSLWantReadError: incoming.write(sock.recv(16384)) except ssl.SSLZeroReturnError: break self.assertEqual(data_len, CHUNK * SIZE) # verify that close_notify is received sslobj.unwrap() sock.close() def eof_server(sock): sock.starttls(sslctx, server_side=True) self.assertEqual(sock.recv_all(4), b'ping') sock.send(b'pong') time.sleep(0.2) # wait for the peer to fill its backlog # send EOF sock.shutdown(socket.SHUT_WR) # should receive all data data = sock.recv_all(CHUNK * SIZE) self.assertEqual(len(data), CHUNK * SIZE) sock.close() async def client(addr): nonlocal future future = self.loop.create_future() reader, writer = await asyncio.open_connection( *addr, ssl=client_sslctx, server_hostname='') writer.write(b'ping') data = await reader.readexactly(4) self.assertEqual(data, b'pong') # fill write backlog in a hacky way - renegotiation won't help for _ in range(SIZE): writer.transport._test__append_write_backlog(b'x' * CHUNK) try: data = await reader.read() self.assertEqual(data, b'') except (BrokenPipeError, ConnectionResetError): pass await future writer.close() await self.wait_closed(writer) def run(meth): def wrapper(sock): try: meth(sock) except Exception as ex: self.loop.call_soon_threadsafe(future.set_exception, ex) else: self.loop.call_soon_threadsafe(future.set_result, None) return wrapper with self.tcp_server(run(server)) as srv: self.loop.run_until_complete(client(srv.addr)) with self.tcp_server(run(eof_server)) as srv: self.loop.run_until_complete(client(srv.addr)) def test_connect_timeout_warning(self): s = socket.socket(socket.AF_INET) s.bind(('127.0.0.1', 0)) addr = s.getsockname() async def test(): try: await asyncio.wait_for( self.loop.create_connection(asyncio.Protocol, *addr, ssl=True), 0.1) except (ConnectionRefusedError, asyncio.TimeoutError): pass else: self.fail('TimeoutError is not raised') with s: try: with self.assertWarns(ResourceWarning) as cm: self.loop.run_until_complete(test()) gc.collect() gc.collect() gc.collect() except AssertionError as e: self.assertEqual(str(e), 'ResourceWarning not triggered') else: self.fail('Unexpected ResourceWarning: {}'.format(cm.warning)) def test_handshake_timeout_handler_leak(self): s = socket.socket(socket.AF_INET) s.bind(('127.0.0.1', 0)) s.listen(1) addr = s.getsockname() async def test(ctx): try: await asyncio.wait_for( self.loop.create_connection(asyncio.Protocol, *addr, ssl=ctx), 0.1) except (ConnectionRefusedError, asyncio.TimeoutError): pass else: self.fail('TimeoutError is not raised') with s: ctx = ssl.create_default_context() self.loop.run_until_complete(test(ctx)) ctx = weakref.ref(ctx) # SSLProtocol should be DECREF to 0 self.assertIsNone(ctx()) def test_shutdown_timeout_handler_leak(self): loop = self.loop def server(sock): sslctx = self._create_server_ssl_context( test_utils.ONLYCERT, test_utils.ONLYKEY ) sock = sslctx.wrap_socket(sock, server_side=True) sock.recv(32) sock.close() class Protocol(asyncio.Protocol): def __init__(self): self.fut = asyncio.Future(loop=loop) def connection_lost(self, exc): self.fut.set_result(None) async def client(addr, ctx): tr, pr = await loop.create_connection(Protocol, *addr, ssl=ctx) tr.close() await pr.fut with self.tcp_server(server) as srv: ctx = self._create_client_ssl_context() loop.run_until_complete(client(srv.addr, ctx)) ctx = weakref.ref(ctx) # asyncio has no shutdown timeout, but it ends up with a circular # reference loop - not ideal (introduces gc glitches), but at least # not leaking gc.collect() gc.collect() gc.collect() # SSLProtocol should be DECREF to 0 self.assertIsNone(ctx()) def test_shutdown_timeout_handler_not_set(self): loop = self.loop eof = asyncio.Event() extra = None def server(sock): sslctx = self._create_server_ssl_context( test_utils.ONLYCERT, test_utils.ONLYKEY ) sock = sslctx.wrap_socket(sock, server_side=True) sock.send(b'hello') assert sock.recv(1024) == b'world' sock.send(b'extra bytes') # sending EOF here sock.shutdown(socket.SHUT_WR) loop.call_soon_threadsafe(eof.set) # make sure we have enough time to reproduce the issue assert sock.recv(1024) == b'' sock.close() class Protocol(asyncio.Protocol): def __init__(self): self.fut = asyncio.Future(loop=loop) self.transport = None def connection_made(self, transport): self.transport = transport def data_received(self, data): if data == b'hello': self.transport.write(b'world') # pause reading would make incoming data stay in the sslobj self.transport.pause_reading() else: nonlocal extra extra = data def connection_lost(self, exc): if exc is None: self.fut.set_result(None) else: self.fut.set_exception(exc) async def client(addr): ctx = self._create_client_ssl_context() tr, pr = await loop.create_connection(Protocol, *addr, ssl=ctx) await eof.wait() tr.resume_reading() await pr.fut tr.close() assert extra == b'extra bytes' with self.tcp_server(server) as srv: loop.run_until_complete(client(srv.addr)) ############################################################################### # Socket Testing Utilities ############################################################################### class TestSocketWrapper: def __init__(self, sock): self.__sock = sock def recv_all(self, n): buf = b'' while len(buf) < n: data = self.recv(n - len(buf)) if data == b'': raise ConnectionAbortedError buf += data return buf def starttls(self, ssl_context, *, server_side=False, server_hostname=None, do_handshake_on_connect=True): assert isinstance(ssl_context, ssl.SSLContext) ssl_sock = ssl_context.wrap_socket( self.__sock, server_side=server_side, server_hostname=server_hostname, do_handshake_on_connect=do_handshake_on_connect) if server_side: ssl_sock.do_handshake() self.__sock.close() self.__sock = ssl_sock def __getattr__(self, name): return getattr(self.__sock, name) def __repr__(self): return '<{} {!r}>'.format(type(self).__name__, self.__sock) class SocketThread(threading.Thread): def stop(self): self._active = False self.join() def __enter__(self): self.start() return self def __exit__(self, *exc): self.stop() class TestThreadedClient(SocketThread): def __init__(self, test, sock, prog, timeout): threading.Thread.__init__(self, None, None, 'test-client') self.daemon = True self._timeout = timeout self._sock = sock self._active = True self._prog = prog self._test = test def run(self): try: self._prog(TestSocketWrapper(self._sock)) except (KeyboardInterrupt, SystemExit): raise except BaseException as ex: self._test._abort_socket_test(ex) class TestThreadedServer(SocketThread): def __init__(self, test, sock, prog, timeout, max_clients): threading.Thread.__init__(self, None, None, 'test-server') self.daemon = True self._clients = 0 self._finished_clients = 0 self._max_clients = max_clients self._timeout = timeout self._sock = sock self._active = True self._prog = prog self._s1, self._s2 = socket.socketpair() self._s1.setblocking(False) self._test = test def stop(self): try: if self._s2 and self._s2.fileno() != -1: try: self._s2.send(b'stop') except OSError: pass finally: super().stop() def run(self): try: with self._sock: self._sock.setblocking(False) self._run() finally: self._s1.close() self._s2.close() def _run(self): while self._active: if self._clients >= self._max_clients: return r, w, x = select.select( [self._sock, self._s1], [], [], self._timeout) if self._s1 in r: return if self._sock in r: try: conn, addr = self._sock.accept() except BlockingIOError: continue except socket.timeout: if not self._active: return else: raise else: self._clients += 1 conn.settimeout(self._timeout) try: with conn: self._handle_client(conn) except (KeyboardInterrupt, SystemExit): raise except BaseException as ex: self._active = False try: raise finally: self._test._abort_socket_test(ex) def _handle_client(self, sock): self._prog(TestSocketWrapper(sock)) @property def addr(self): return self._sock.getsockname()