gh-113538: Revert "gh-113538: Add asycio.Server.{close,abort}_clients (#114432)" (#116632)

Revert "gh-113538: Add asycio.Server.{close,abort}_clients (#114432)"

Reason: The new test doesn't always pass:
https://github.com/python/cpython/pull/116423#issuecomment-1989425489

This reverts commit 1d0d49a7e8.
This commit is contained in:
Guido van Rossum 2024-03-11 17:31:49 -07:00 committed by GitHub
parent 2b67fc57f6
commit ba13215eb1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 20 additions and 152 deletions

View File

@ -1641,31 +1641,6 @@ Do not instantiate the :class:`Server` class directly.
coroutine to wait until the server is closed (and no more coroutine to wait until the server is closed (and no more
connections are active). connections are active).
.. method:: close_clients()
Close all existing incoming client connections.
Calls :meth:`~asyncio.BaseTransport.close` on all associated
transports.
:meth:`close` should be called before :meth:`close_clients` when
closing the server to avoid races with new clients connecting.
.. versionadded:: 3.13
.. method:: abort_clients()
Close all existing incoming client connections immediately,
without waiting for pending operations to complete.
Calls :meth:`~asyncio.WriteTransport.abort` on all associated
transports.
:meth:`close` should be called before :meth:`abort_clients` when
closing the server to avoid races with new clients connecting.
.. versionadded:: 3.13
.. method:: get_loop() .. method:: get_loop()
Return the event loop associated with the server object. Return the event loop associated with the server object.

View File

@ -270,11 +270,6 @@ asyncio
the buffer size. the buffer size.
(Contributed by Jamie Phan in :gh:`115199`.) (Contributed by Jamie Phan in :gh:`115199`.)
* Add :meth:`asyncio.Server.close_clients` and
:meth:`asyncio.Server.abort_clients` methods which allow to more
forcefully close an asyncio server.
(Contributed by Pierre Ossman in :gh:`113538`.)
base64 base64
--- ---

View File

@ -279,9 +279,7 @@ class Server(events.AbstractServer):
ssl_handshake_timeout, ssl_shutdown_timeout=None): ssl_handshake_timeout, ssl_shutdown_timeout=None):
self._loop = loop self._loop = loop
self._sockets = sockets self._sockets = sockets
# Weak references so we don't break Transport's ability to self._active_count = 0
# detect abandoned transports
self._clients = weakref.WeakSet()
self._waiters = [] self._waiters = []
self._protocol_factory = protocol_factory self._protocol_factory = protocol_factory
self._backlog = backlog self._backlog = backlog
@ -294,13 +292,14 @@ class Server(events.AbstractServer):
def __repr__(self): def __repr__(self):
return f'<{self.__class__.__name__} sockets={self.sockets!r}>' return f'<{self.__class__.__name__} sockets={self.sockets!r}>'
def _attach(self, transport): def _attach(self):
assert self._sockets is not None assert self._sockets is not None
self._clients.add(transport) self._active_count += 1
def _detach(self, transport): def _detach(self):
self._clients.discard(transport) assert self._active_count > 0
if len(self._clients) == 0 and self._sockets is None: self._active_count -= 1
if self._active_count == 0 and self._sockets is None:
self._wakeup() self._wakeup()
def _wakeup(self): def _wakeup(self):
@ -349,17 +348,9 @@ class Server(events.AbstractServer):
self._serving_forever_fut.cancel() self._serving_forever_fut.cancel()
self._serving_forever_fut = None self._serving_forever_fut = None
if len(self._clients) == 0: if self._active_count == 0:
self._wakeup() self._wakeup()
def close_clients(self):
for transport in self._clients.copy():
transport.close()
def abort_clients(self):
for transport in self._clients.copy():
transport.abort()
async def start_serving(self): async def start_serving(self):
self._start_serving() self._start_serving()
# Skip one loop iteration so that all 'loop.add_reader' # Skip one loop iteration so that all 'loop.add_reader'

View File

@ -175,14 +175,6 @@ class AbstractServer:
"""Stop serving. This leaves existing connections open.""" """Stop serving. This leaves existing connections open."""
raise NotImplementedError raise NotImplementedError
def close_clients(self):
"""Close all active connections."""
raise NotImplementedError
def abort_clients(self):
"""Close all active connections immediately."""
raise NotImplementedError
def get_loop(self): def get_loop(self):
"""Get the event loop the Server object is attached to.""" """Get the event loop the Server object is attached to."""
raise NotImplementedError raise NotImplementedError

View File

@ -63,7 +63,7 @@ class _ProactorBasePipeTransport(transports._FlowControlMixin,
self._called_connection_lost = False self._called_connection_lost = False
self._eof_written = False self._eof_written = False
if self._server is not None: if self._server is not None:
self._server._attach(self) self._server._attach()
self._loop.call_soon(self._protocol.connection_made, self) self._loop.call_soon(self._protocol.connection_made, self)
if waiter is not None: if waiter is not None:
# only wake up the waiter when connection_made() has been called # only wake up the waiter when connection_made() has been called
@ -167,7 +167,7 @@ class _ProactorBasePipeTransport(transports._FlowControlMixin,
self._sock = None self._sock = None
server = self._server server = self._server
if server is not None: if server is not None:
server._detach(self) server._detach()
self._server = None self._server = None
self._called_connection_lost = True self._called_connection_lost = True

View File

@ -791,7 +791,7 @@ class _SelectorTransport(transports._FlowControlMixin,
self._paused = False # Set when pause_reading() called self._paused = False # Set when pause_reading() called
if self._server is not None: if self._server is not None:
self._server._attach(self) self._server._attach()
loop._transports[self._sock_fd] = self loop._transports[self._sock_fd] = self
def __repr__(self): def __repr__(self):
@ -868,8 +868,6 @@ class _SelectorTransport(transports._FlowControlMixin,
if self._sock is not None: if self._sock is not None:
_warn(f"unclosed transport {self!r}", ResourceWarning, source=self) _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
self._sock.close() self._sock.close()
if self._server is not None:
self._server._detach(self)
def _fatal_error(self, exc, message='Fatal error on transport'): def _fatal_error(self, exc, message='Fatal error on transport'):
# Should be called from exception handler only. # Should be called from exception handler only.
@ -908,7 +906,7 @@ class _SelectorTransport(transports._FlowControlMixin,
self._loop = None self._loop = None
server = self._server server = self._server
if server is not None: if server is not None:
server._detach(self) server._detach()
self._server = None self._server = None
def get_write_buffer_size(self): def get_write_buffer_size(self):

View File

@ -125,12 +125,8 @@ class SelectorStartServerTests(BaseStartServer, unittest.TestCase):
class TestServer2(unittest.IsolatedAsyncioTestCase): class TestServer2(unittest.IsolatedAsyncioTestCase):
async def test_wait_closed_basic(self): async def test_wait_closed_basic(self):
async def serve(rd, wr): async def serve(*args):
try: pass
await rd.read()
finally:
wr.close()
await wr.wait_closed()
srv = await asyncio.start_server(serve, socket_helper.HOSTv4, 0) srv = await asyncio.start_server(serve, socket_helper.HOSTv4, 0)
self.addCleanup(srv.close) self.addCleanup(srv.close)
@ -141,8 +137,7 @@ class TestServer2(unittest.IsolatedAsyncioTestCase):
self.assertFalse(task1.done()) self.assertFalse(task1.done())
# active count != 0, not closed: should block # active count != 0, not closed: should block
addr = srv.sockets[0].getsockname() srv._attach()
(rd, wr) = await asyncio.open_connection(addr[0], addr[1])
task2 = asyncio.create_task(srv.wait_closed()) task2 = asyncio.create_task(srv.wait_closed())
await asyncio.sleep(0) await asyncio.sleep(0)
self.assertFalse(task1.done()) self.assertFalse(task1.done())
@ -157,8 +152,7 @@ class TestServer2(unittest.IsolatedAsyncioTestCase):
self.assertFalse(task2.done()) self.assertFalse(task2.done())
self.assertFalse(task3.done()) self.assertFalse(task3.done())
wr.close() srv._detach()
await wr.wait_closed()
# active count == 0, closed: should unblock # active count == 0, closed: should unblock
await task1 await task1
await task2 await task2
@ -167,12 +161,8 @@ class TestServer2(unittest.IsolatedAsyncioTestCase):
async def test_wait_closed_race(self): async def test_wait_closed_race(self):
# Test a regression in 3.12.0, should be fixed in 3.12.1 # Test a regression in 3.12.0, should be fixed in 3.12.1
async def serve(rd, wr): async def serve(*args):
try: pass
await rd.read()
finally:
wr.close()
await wr.wait_closed()
srv = await asyncio.start_server(serve, socket_helper.HOSTv4, 0) srv = await asyncio.start_server(serve, socket_helper.HOSTv4, 0)
self.addCleanup(srv.close) self.addCleanup(srv.close)
@ -180,83 +170,13 @@ class TestServer2(unittest.IsolatedAsyncioTestCase):
task = asyncio.create_task(srv.wait_closed()) task = asyncio.create_task(srv.wait_closed())
await asyncio.sleep(0) await asyncio.sleep(0)
self.assertFalse(task.done()) self.assertFalse(task.done())
addr = srv.sockets[0].getsockname() srv._attach()
(rd, wr) = await asyncio.open_connection(addr[0], addr[1])
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
loop.call_soon(srv.close) loop.call_soon(srv.close)
loop.call_soon(wr.close) loop.call_soon(srv._detach)
await srv.wait_closed() await srv.wait_closed()
async def test_close_clients(self):
async def serve(rd, wr):
try:
await rd.read()
finally:
wr.close()
await wr.wait_closed()
srv = await asyncio.start_server(serve, socket_helper.HOSTv4, 0)
self.addCleanup(srv.close)
addr = srv.sockets[0].getsockname()
(rd, wr) = await asyncio.open_connection(addr[0], addr[1])
self.addCleanup(wr.close)
task = asyncio.create_task(srv.wait_closed())
await asyncio.sleep(0)
self.assertFalse(task.done())
srv.close()
srv.close_clients()
await asyncio.sleep(0)
await asyncio.sleep(0)
self.assertTrue(task.done())
async def test_abort_clients(self):
async def serve(rd, wr):
nonlocal s_rd, s_wr
s_rd = rd
s_wr = wr
await wr.wait_closed()
s_rd = s_wr = None
srv = await asyncio.start_server(serve, socket_helper.HOSTv4, 0)
self.addCleanup(srv.close)
addr = srv.sockets[0].getsockname()
(c_rd, c_wr) = await asyncio.open_connection(addr[0], addr[1], limit=4096)
self.addCleanup(c_wr.close)
# Limit the socket buffers so we can reliably overfill them
s_sock = s_wr.get_extra_info('socket')
s_sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 65536)
c_sock = c_wr.get_extra_info('socket')
c_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536)
# Get the reader in to a paused state by sending more than twice
# the configured limit
s_wr.write(b'a' * 4096)
s_wr.write(b'a' * 4096)
s_wr.write(b'a' * 4096)
while c_wr.transport.is_reading():
await asyncio.sleep(0)
# Get the writer in a waiting state by sending data until the
# socket buffers are full on both server and client sockets and
# the kernel stops accepting more data
s_wr.write(b'a' * c_sock.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF))
s_wr.write(b'a' * s_sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF))
self.assertNotEqual(s_wr.transport.get_write_buffer_size(), 0)
task = asyncio.create_task(srv.wait_closed())
await asyncio.sleep(0)
self.assertFalse(task.done())
srv.close()
srv.abort_clients()
await asyncio.sleep(0)
await asyncio.sleep(0)
self.assertTrue(task.done())
# Test the various corner cases of Unix server socket removal # Test the various corner cases of Unix server socket removal

View File

@ -1,3 +0,0 @@
Add :meth:`asyncio.Server.close_clients` and
:meth:`asyncio.Server.abort_clients` methods which allow to more forcefully
close an asyncio server.