Cleanup properly proactor event loop
* store the "self reading" future when the "self pipe" is closed (when the event loop is closed) * store "accept" futures to cancel them when we stop serving * close the "accept socket" if the "accept future" is cancelled Fix many warnings which can be seen when unit tests are run in verbose mode.
This commit is contained in:
parent
c8935fe860
commit
7de2646cdf
|
@ -330,6 +330,8 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
|
|||
logger.debug('Using proactor: %s', proactor.__class__.__name__)
|
||||
self._proactor = proactor
|
||||
self._selector = proactor # convenient alias
|
||||
self._self_reading_future = None
|
||||
self._accept_futures = {} # socket file descriptor => Future
|
||||
proactor.set_loop(self)
|
||||
self._make_self_pipe()
|
||||
|
||||
|
@ -365,6 +367,7 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
|
|||
self._proactor = None
|
||||
self._selector = None
|
||||
super().close()
|
||||
self._accept_futures.clear()
|
||||
|
||||
def sock_recv(self, sock, n):
|
||||
return self._proactor.recv(sock, n)
|
||||
|
@ -382,6 +385,9 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
|
|||
raise NotImplementedError
|
||||
|
||||
def _close_self_pipe(self):
|
||||
if self._self_reading_future is not None:
|
||||
self._self_reading_future.cancel()
|
||||
self._self_reading_future = None
|
||||
self._ssock.close()
|
||||
self._ssock = None
|
||||
self._csock.close()
|
||||
|
@ -405,6 +411,7 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
|
|||
self.close()
|
||||
raise
|
||||
else:
|
||||
self._self_reading_future = f
|
||||
f.add_done_callback(self._loop_self_reading)
|
||||
|
||||
def _write_to_self(self):
|
||||
|
@ -430,6 +437,7 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
|
|||
except futures.CancelledError:
|
||||
sock.close()
|
||||
else:
|
||||
self._accept_futures[sock.fileno()] = f
|
||||
f.add_done_callback(loop)
|
||||
|
||||
self.call_soon(loop)
|
||||
|
@ -438,5 +446,7 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
|
|||
pass # XXX hard work currently done in poll
|
||||
|
||||
def _stop_serving(self, sock):
|
||||
for future in self._accept_futures.values():
|
||||
future.cancel()
|
||||
self._proactor._stop_serving(sock)
|
||||
sock.close()
|
||||
|
|
|
@ -168,9 +168,6 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
|
|||
self.call_soon(loop)
|
||||
return [server]
|
||||
|
||||
def _stop_serving(self, server):
|
||||
server.close()
|
||||
|
||||
@tasks.coroutine
|
||||
def _make_subprocess_transport(self, protocol, args, shell,
|
||||
stdin, stdout, stderr, bufsize,
|
||||
|
@ -260,7 +257,19 @@ class IocpProactor:
|
|||
conn.settimeout(listener.gettimeout())
|
||||
return conn, conn.getpeername()
|
||||
|
||||
return self._register(ov, listener, finish_accept)
|
||||
@tasks.coroutine
|
||||
def accept_coro(future, conn):
|
||||
# Coroutine closing the accept socket if the future is cancelled
|
||||
try:
|
||||
yield from future
|
||||
except futures.CancelledError:
|
||||
conn.close()
|
||||
raise
|
||||
|
||||
future = self._register(ov, listener, finish_accept)
|
||||
coro = accept_coro(future, conn)
|
||||
tasks.async(coro, loop=self._loop)
|
||||
return future
|
||||
|
||||
def connect(self, conn, address):
|
||||
self._register_with_iocp(conn)
|
||||
|
|
Loading…
Reference in New Issue