Issue #20455: asyncio: write a new write pipe transport class for proactor (on
Windows) instead of using the "duplex" pipe transport. The new class uses a simpler overlapped read to be notified when the pipe is closed. So the protocol doesn't need to implement eof_received(): connection_lost() is called instead. _UnixWritePipeTransport has the same approach.
This commit is contained in:
parent
61b3c9bacc
commit
b60e9ca69d
|
@ -205,7 +205,7 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
|
|||
self.close()
|
||||
|
||||
|
||||
class _ProactorWritePipeTransport(_ProactorBasePipeTransport,
|
||||
class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
|
||||
transports.WriteTransport):
|
||||
"""Transport for write pipes."""
|
||||
|
||||
|
@ -286,8 +286,27 @@ class _ProactorWritePipeTransport(_ProactorBasePipeTransport,
|
|||
self._force_close(None)
|
||||
|
||||
|
||||
class _ProactorWritePipeTransport(_ProactorBaseWritePipeTransport):
|
||||
def __init__(self, *args, **kw):
|
||||
super().__init__(*args, **kw)
|
||||
self._read_fut = self._loop._proactor.recv(self._sock, 16)
|
||||
self._read_fut.add_done_callback(self._pipe_closed)
|
||||
|
||||
def _pipe_closed(self, fut):
|
||||
if fut.cancelled():
|
||||
# the transport has been closed
|
||||
return
|
||||
assert fut is self._read_fut, (fut, self._read_fut)
|
||||
self._read_fut = None
|
||||
assert fut.result() == b''
|
||||
if self._write_fut is not None:
|
||||
self._force_close(exc)
|
||||
else:
|
||||
self.close()
|
||||
|
||||
|
||||
class _ProactorDuplexPipeTransport(_ProactorReadPipeTransport,
|
||||
_ProactorWritePipeTransport,
|
||||
_ProactorBaseWritePipeTransport,
|
||||
transports.Transport):
|
||||
"""Transport for duplex pipes."""
|
||||
|
||||
|
@ -299,7 +318,7 @@ class _ProactorDuplexPipeTransport(_ProactorReadPipeTransport,
|
|||
|
||||
|
||||
class _ProactorSocketTransport(_ProactorReadPipeTransport,
|
||||
_ProactorWritePipeTransport,
|
||||
_ProactorBaseWritePipeTransport,
|
||||
transports.Transport):
|
||||
"""Transport for connected sockets."""
|
||||
|
||||
|
@ -353,15 +372,10 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
|
|||
return _ProactorReadPipeTransport(self, sock, protocol, waiter, extra)
|
||||
|
||||
def _make_write_pipe_transport(self, sock, protocol, waiter=None,
|
||||
extra=None, check_for_hangup=True):
|
||||
if check_for_hangup:
|
||||
# We want connection_lost() to be called when other end closes
|
||||
return _ProactorDuplexPipeTransport(self,
|
||||
sock, protocol, waiter, extra)
|
||||
else:
|
||||
# If other end closes we may not notice for a long time
|
||||
return _ProactorWritePipeTransport(self, sock, protocol, waiter,
|
||||
extra)
|
||||
extra=None):
|
||||
# We want connection_lost() to be called when other end closes
|
||||
return _ProactorWritePipeTransport(self,
|
||||
sock, protocol, waiter, extra)
|
||||
|
||||
def close(self):
|
||||
if self._proactor is not None:
|
||||
|
|
Loading…
Reference in New Issue