gh-114914: Avoid keeping dead StreamWriter alive (#115661)

In some cases we might cause a StreamWriter to stay alive even when the
application has dropped all references to it. This prevents us from
doing automatical cleanup, and complaining that the StreamWriter wasn't
properly closed.

Fortunately, the extra reference was never actually used for anything so
we can just drop it.
This commit is contained in:
Pierre Ossman (ThinLinc team) 2024-02-28 02:27:44 +01:00 committed by GitHub
parent 686ec17f50
commit a355f60b03
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 31 additions and 10 deletions

View File

@ -201,7 +201,6 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
# is established.
self._strong_reader = stream_reader
self._reject_connection = False
self._stream_writer = None
self._task = None
self._transport = None
self._client_connected_cb = client_connected_cb
@ -214,10 +213,8 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
return None
return self._stream_reader_wr()
def _replace_writer(self, writer):
def _replace_transport(self, transport):
loop = self._loop
transport = writer.transport
self._stream_writer = writer
self._transport = transport
self._over_ssl = transport.get_extra_info('sslcontext') is not None
@ -239,11 +236,8 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
reader.set_transport(transport)
self._over_ssl = transport.get_extra_info('sslcontext') is not None
if self._client_connected_cb is not None:
self._stream_writer = StreamWriter(transport, self,
reader,
self._loop)
res = self._client_connected_cb(reader,
self._stream_writer)
writer = StreamWriter(transport, self, reader, self._loop)
res = self._client_connected_cb(reader, writer)
if coroutines.iscoroutine(res):
def callback(task):
if task.cancelled():
@ -405,7 +399,7 @@ class StreamWriter:
ssl_handshake_timeout=ssl_handshake_timeout,
ssl_shutdown_timeout=ssl_shutdown_timeout)
self._transport = new_transport
protocol._replace_writer(self)
protocol._replace_transport(new_transport)
def __del__(self, warnings=warnings):
if not self._transport.is_closing():

View File

@ -1130,6 +1130,31 @@ os.close(fd)
self.assertEqual(messages, [])
def test_unclosed_server_resource_warnings(self):
async def inner(rd, wr):
fut.set_result(True)
with self.assertWarns(ResourceWarning) as cm:
del wr
gc.collect()
self.assertEqual(len(cm.warnings), 1)
self.assertTrue(str(cm.warnings[0].message).startswith("unclosed <StreamWriter"))
async def outer():
srv = await asyncio.start_server(inner, socket_helper.HOSTv4, 0)
async with srv:
addr = srv.sockets[0].getsockname()
with socket.create_connection(addr):
# Give the loop some time to notice the connection
await fut
messages = []
self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
fut = self.loop.create_future()
self.loop.run_until_complete(outer())
self.assertEqual(messages, [])
def _basetest_unhandled_exceptions(self, handle_echo):
port = socket_helper.find_unused_port()

View File

@ -0,0 +1,2 @@
Fix an issue where an abandoned :class:`StreamWriter` would not be garbage
collected.