mirror of https://github.com/python/cpython
(Merge 3.4) asyncio: sync with Tulip
* PipeServer.close() now cancels the "accept pipe" future which cancels the overlapped operation. * Fix _SelectorTransport.__repr__() if the transport was closed * Fix debug log in BaseEventLoop.create_connection(): get the socket object from the transport because SSL transport closes the old socket and creates a new SSL socket object. Remove also the _SelectorSslTransport._rawsock attribute: it contained the closed socket (not very useful) and it was not used. * Issue #22063: socket operations (sock_recv, sock_sendall, sock_connect, sock_accept) of the proactor event loop don't raise an exception in debug mode if the socket are in blocking mode. Overlapped operations also work on blocking sockets. * Fix unit tests in debug mode: mock a non-blocking socket for socket operations which now raise an exception if the socket is blocking. * _fatal_error() method of _UnixReadPipeTransport and _UnixWritePipeTransport now log all exceptions in debug mode * Don't log expected errors in unit tests * Tulip issue 200: _WaitHandleFuture._unregister_wait() now catchs and logs exceptions. * Tulip issue 200: Log errors in debug mode instead of simply ignoring them.
This commit is contained in:
commit
83b9ea4942
|
@ -578,6 +578,9 @@ class BaseEventLoop(events.AbstractEventLoop):
|
|||
transport, protocol = yield from self._create_connection_transport(
|
||||
sock, protocol_factory, ssl, server_hostname)
|
||||
if self._debug:
|
||||
# Get the socket from the transport because SSL transport closes
|
||||
# the old socket and creates a new SSL socket
|
||||
sock = transport.get_extra_info('socket')
|
||||
logger.debug("%r connected to %s:%r: (%r, %r)",
|
||||
sock, host, port, transport, protocol)
|
||||
return transport, protocol
|
||||
|
@ -725,6 +728,10 @@ class BaseEventLoop(events.AbstractEventLoop):
|
|||
sock = socket.socket(af, socktype, proto)
|
||||
except socket.error:
|
||||
# Assume it's a bad family/type/protocol combination.
|
||||
if self._debug:
|
||||
logger.warning('create_server() failed to create '
|
||||
'socket.socket(%r, %r, %r)',
|
||||
af, socktype, proto, exc_info=True)
|
||||
continue
|
||||
sockets.append(sock)
|
||||
if reuse_address:
|
||||
|
|
|
@ -172,6 +172,9 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
|
|||
except ConnectionAbortedError as exc:
|
||||
if not self._closing:
|
||||
self._fatal_error(exc, 'Fatal read error on pipe transport')
|
||||
elif self._loop.get_debug():
|
||||
logger.debug("Read error on pipe transport while closing",
|
||||
exc_info=True)
|
||||
except ConnectionResetError as exc:
|
||||
self._force_close(exc)
|
||||
except OSError as exc:
|
||||
|
@ -324,12 +327,16 @@ class _ProactorSocketTransport(_ProactorReadPipeTransport,
|
|||
try:
|
||||
self._extra['sockname'] = sock.getsockname()
|
||||
except (socket.error, AttributeError):
|
||||
pass
|
||||
if self._loop.get_debug():
|
||||
logger.warning("getsockname() failed on %r",
|
||||
sock, exc_info=True)
|
||||
if 'peername' not in self._extra:
|
||||
try:
|
||||
self._extra['peername'] = sock.getpeername()
|
||||
except (socket.error, AttributeError):
|
||||
pass
|
||||
if self._loop.get_debug():
|
||||
logger.warning("getpeername() failed on %r",
|
||||
sock, exc_info=True)
|
||||
|
||||
def can_write_eof(self):
|
||||
return True
|
||||
|
@ -385,18 +392,12 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
|
|||
self._selector = None
|
||||
|
||||
def sock_recv(self, sock, n):
|
||||
if self.get_debug() and sock.gettimeout() != 0:
|
||||
raise ValueError("the socket must be non-blocking")
|
||||
return self._proactor.recv(sock, n)
|
||||
|
||||
def sock_sendall(self, sock, data):
|
||||
if self.get_debug() and sock.gettimeout() != 0:
|
||||
raise ValueError("the socket must be non-blocking")
|
||||
return self._proactor.send(sock, data)
|
||||
|
||||
def sock_connect(self, sock, address):
|
||||
if self.get_debug() and sock.gettimeout() != 0:
|
||||
raise ValueError("the socket must be non-blocking")
|
||||
try:
|
||||
base_events._check_resolved_address(sock, address)
|
||||
except ValueError as err:
|
||||
|
@ -407,8 +408,6 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
|
|||
return self._proactor.connect(sock, address)
|
||||
|
||||
def sock_accept(self, sock):
|
||||
if self.get_debug() and sock.gettimeout() != 0:
|
||||
raise ValueError("the socket must be non-blocking")
|
||||
return self._proactor.accept(sock)
|
||||
|
||||
def _socketpair(self):
|
||||
|
@ -470,11 +469,14 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
|
|||
except OSError as exc:
|
||||
if sock.fileno() != -1:
|
||||
self.call_exception_handler({
|
||||
'message': 'Accept failed',
|
||||
'message': 'Accept failed on a socket',
|
||||
'exception': exc,
|
||||
'socket': sock,
|
||||
})
|
||||
sock.close()
|
||||
elif self._debug:
|
||||
logger.debug("Accept failed on socket %r",
|
||||
sock, exc_info=True)
|
||||
except futures.CancelledError:
|
||||
sock.close()
|
||||
else:
|
||||
|
|
|
@ -450,6 +450,8 @@ class _SelectorTransport(transports._FlowControlMixin,
|
|||
|
||||
def __repr__(self):
|
||||
info = [self.__class__.__name__, 'fd=%s' % self._sock_fd]
|
||||
# test if the transport was closed
|
||||
if self._loop is not None:
|
||||
polling = _test_selector_event(self._loop._selector,
|
||||
self._sock_fd, selectors.EVENT_READ)
|
||||
if polling:
|
||||
|
@ -689,7 +691,6 @@ class _SelectorSslTransport(_SelectorTransport):
|
|||
|
||||
self._server_hostname = server_hostname
|
||||
self._waiter = waiter
|
||||
self._rawsock = rawsock
|
||||
self._sslcontext = sslcontext
|
||||
self._paused = False
|
||||
|
||||
|
|
|
@ -417,3 +417,9 @@ def disable_logger():
|
|||
yield
|
||||
finally:
|
||||
logger.setLevel(old_level)
|
||||
|
||||
def mock_nonblocking_socket():
|
||||
"""Create a mock of a non-blocking socket."""
|
||||
sock = mock.Mock(socket.socket)
|
||||
sock.gettimeout.return_value = 0.0
|
||||
return sock
|
||||
|
|
|
@ -336,7 +336,10 @@ class _UnixReadPipeTransport(transports.ReadTransport):
|
|||
|
||||
def _fatal_error(self, exc, message='Fatal error on pipe transport'):
|
||||
# should be called by exception handler only
|
||||
if not (isinstance(exc, OSError) and exc.errno == errno.EIO):
|
||||
if (isinstance(exc, OSError) and exc.errno == errno.EIO):
|
||||
if self._loop.get_debug():
|
||||
logger.debug("%r: %s", self, message, exc_info=True)
|
||||
else:
|
||||
self._loop.call_exception_handler({
|
||||
'message': message,
|
||||
'exception': exc,
|
||||
|
@ -508,7 +511,10 @@ class _UnixWritePipeTransport(transports._FlowControlMixin,
|
|||
|
||||
def _fatal_error(self, exc, message='Fatal error on pipe transport'):
|
||||
# should be called by exception handler only
|
||||
if not isinstance(exc, (BrokenPipeError, ConnectionResetError)):
|
||||
if isinstance(exc, (BrokenPipeError, ConnectionResetError)):
|
||||
if self._loop.get_debug():
|
||||
logger.debug("%r: %s", self, message, exc_info=True)
|
||||
else:
|
||||
self._loop.call_exception_handler({
|
||||
'message': message,
|
||||
'exception': exc,
|
||||
|
@ -749,7 +755,9 @@ class SafeChildWatcher(BaseChildWatcher):
|
|||
except KeyError: # pragma: no cover
|
||||
# May happen if .remove_child_handler() is called
|
||||
# after os.waitpid() returns.
|
||||
pass
|
||||
if self._loop.get_debug():
|
||||
logger.warning("Child watcher got an unexpected pid: %r",
|
||||
pid, exc_info=True)
|
||||
else:
|
||||
callback(pid, returncode, *args)
|
||||
|
||||
|
|
|
@ -111,10 +111,17 @@ class _WaitHandleFuture(futures.Future):
|
|||
return
|
||||
try:
|
||||
_overlapped.UnregisterWait(self._wait_handle)
|
||||
except OSError as e:
|
||||
if e.winerror != _overlapped.ERROR_IO_PENDING:
|
||||
raise
|
||||
except OSError as exc:
|
||||
# ERROR_IO_PENDING is not an error, the wait was unregistered
|
||||
if exc.winerror != _overlapped.ERROR_IO_PENDING:
|
||||
context = {
|
||||
'message': 'Failed to unregister the wait handle',
|
||||
'exception': exc,
|
||||
'future': self,
|
||||
}
|
||||
if self._source_traceback:
|
||||
context['source_traceback'] = self._source_traceback
|
||||
self._loop.call_exception_handler(context)
|
||||
self._wait_handle = None
|
||||
self._iocp = None
|
||||
self._ov = None
|
||||
|
@ -145,6 +152,11 @@ class PipeServer(object):
|
|||
def __init__(self, address):
|
||||
self._address = address
|
||||
self._free_instances = weakref.WeakSet()
|
||||
# initialize the pipe attribute before calling _server_pipe_handle()
|
||||
# because this function can raise an exception and the destructor calls
|
||||
# the close() method
|
||||
self._pipe = None
|
||||
self._accept_pipe_future = None
|
||||
self._pipe = self._server_pipe_handle(True)
|
||||
|
||||
def _get_unconnected_pipe(self):
|
||||
|
@ -174,6 +186,9 @@ class PipeServer(object):
|
|||
return pipe
|
||||
|
||||
def close(self):
|
||||
if self._accept_pipe_future is not None:
|
||||
self._accept_pipe_future.cancel()
|
||||
self._accept_pipe_future = None
|
||||
# Close all instances which have not been connected to by a client.
|
||||
if self._address is not None:
|
||||
for pipe in self._free_instances:
|
||||
|
@ -216,7 +231,7 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
|
|||
def start_serving_pipe(self, protocol_factory, address):
|
||||
server = PipeServer(address)
|
||||
|
||||
def loop(f=None):
|
||||
def loop_accept_pipe(f=None):
|
||||
pipe = None
|
||||
try:
|
||||
if f:
|
||||
|
@ -237,13 +252,17 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
|
|||
'pipe': pipe,
|
||||
})
|
||||
pipe.close()
|
||||
elif self._debug:
|
||||
logger.warning("Accept pipe failed on pipe %r",
|
||||
pipe, exc_info=True)
|
||||
except futures.CancelledError:
|
||||
if pipe:
|
||||
pipe.close()
|
||||
else:
|
||||
f.add_done_callback(loop)
|
||||
server._accept_pipe_future = f
|
||||
f.add_done_callback(loop_accept_pipe)
|
||||
|
||||
self.call_soon(loop)
|
||||
self.call_soon(loop_accept_pipe)
|
||||
return [server]
|
||||
|
||||
@coroutine
|
||||
|
|
|
@ -792,6 +792,9 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase):
|
|||
class _SelectorTransportMock:
|
||||
_sock = None
|
||||
|
||||
def get_extra_info(self, key):
|
||||
return mock.Mock()
|
||||
|
||||
def close(self):
|
||||
self._sock.close()
|
||||
|
||||
|
|
|
@ -27,6 +27,7 @@ from test import support # find_unused_port, IPV6_ENABLED, TEST_HOME_DIR
|
|||
|
||||
|
||||
import asyncio
|
||||
from asyncio import proactor_events
|
||||
from asyncio import selector_events
|
||||
from asyncio import test_utils
|
||||
|
||||
|
@ -383,6 +384,7 @@ class EventLoopTestsMixin:
|
|||
self.assertEqual(read, data)
|
||||
|
||||
def _basetest_sock_client_ops(self, httpd, sock):
|
||||
if not isinstance(self.loop, proactor_events.BaseProactorEventLoop):
|
||||
# in debug mode, socket operations must fail
|
||||
# if the socket is not in blocking mode
|
||||
self.loop.set_debug(True)
|
||||
|
@ -1229,6 +1231,7 @@ class EventLoopTestsMixin:
|
|||
"Don't support pipes for Windows")
|
||||
def test_write_pipe_disconnect_on_close(self):
|
||||
rsock, wsock = test_utils.socketpair()
|
||||
rsock.setblocking(False)
|
||||
pipeobj = io.open(wsock.detach(), 'wb', 1024)
|
||||
|
||||
proto = MyWritePipeProto(loop=self.loop)
|
||||
|
@ -1366,6 +1369,7 @@ class EventLoopTestsMixin:
|
|||
for sock_type in (socket.SOCK_STREAM, socket.SOCK_DGRAM):
|
||||
sock = socket.socket(family, sock_type)
|
||||
with sock:
|
||||
sock.setblocking(False)
|
||||
connect = self.loop.sock_connect(sock, address)
|
||||
with self.assertRaises(ValueError) as cm:
|
||||
self.loop.run_until_complete(connect)
|
||||
|
|
|
@ -58,6 +58,7 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
|
|||
self.loop.remove_reader = mock.Mock()
|
||||
self.loop.remove_writer = mock.Mock()
|
||||
waiter = asyncio.Future(loop=self.loop)
|
||||
with test_utils.disable_logger():
|
||||
transport = self.loop._make_ssl_transport(
|
||||
m, asyncio.Protocol(), m, waiter)
|
||||
self.assertIsInstance(transport, _SelectorSslTransport)
|
||||
|
@ -127,6 +128,7 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
|
|||
|
||||
def test_write_to_self_tryagain(self):
|
||||
self.loop._csock.send.side_effect = BlockingIOError
|
||||
with test_utils.disable_logger():
|
||||
self.assertIsNone(self.loop._write_to_self())
|
||||
|
||||
def test_write_to_self_exception(self):
|
||||
|
@ -135,7 +137,7 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
|
|||
self.assertRaises(RuntimeError, self.loop._write_to_self)
|
||||
|
||||
def test_sock_recv(self):
|
||||
sock = mock.Mock()
|
||||
sock = test_utils.mock_nonblocking_socket()
|
||||
self.loop._sock_recv = mock.Mock()
|
||||
|
||||
f = self.loop.sock_recv(sock, 1024)
|
||||
|
@ -183,7 +185,7 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
|
|||
self.assertIs(err, f.exception())
|
||||
|
||||
def test_sock_sendall(self):
|
||||
sock = mock.Mock()
|
||||
sock = test_utils.mock_nonblocking_socket()
|
||||
self.loop._sock_sendall = mock.Mock()
|
||||
|
||||
f = self.loop.sock_sendall(sock, b'data')
|
||||
|
@ -193,7 +195,7 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
|
|||
self.loop._sock_sendall.call_args[0])
|
||||
|
||||
def test_sock_sendall_nodata(self):
|
||||
sock = mock.Mock()
|
||||
sock = test_utils.mock_nonblocking_socket()
|
||||
self.loop._sock_sendall = mock.Mock()
|
||||
|
||||
f = self.loop.sock_sendall(sock, b'')
|
||||
|
@ -295,7 +297,7 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
|
|||
self.loop.add_writer.call_args[0])
|
||||
|
||||
def test_sock_connect(self):
|
||||
sock = mock.Mock()
|
||||
sock = test_utils.mock_nonblocking_socket()
|
||||
self.loop._sock_connect = mock.Mock()
|
||||
|
||||
f = self.loop.sock_connect(sock, ('127.0.0.1', 8080))
|
||||
|
@ -361,7 +363,7 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
|
|||
self.assertIsInstance(f.exception(), OSError)
|
||||
|
||||
def test_sock_accept(self):
|
||||
sock = mock.Mock()
|
||||
sock = test_utils.mock_nonblocking_socket()
|
||||
self.loop._sock_accept = mock.Mock()
|
||||
|
||||
f = self.loop.sock_accept(sock)
|
||||
|
@ -782,6 +784,7 @@ class SelectorSocketTransportTests(test_utils.TestCase):
|
|||
transport = _SelectorSocketTransport(
|
||||
self.loop, self.sock, self.protocol)
|
||||
transport._force_close = mock.Mock()
|
||||
with test_utils.disable_logger():
|
||||
transport._read_ready()
|
||||
transport._force_close.assert_called_with(err)
|
||||
|
||||
|
@ -1219,6 +1222,7 @@ class SelectorSslTransportTests(test_utils.TestCase):
|
|||
err = self.sslsock.recv.side_effect = ConnectionResetError()
|
||||
transport = self._make_one()
|
||||
transport._force_close = mock.Mock()
|
||||
with test_utils.disable_logger():
|
||||
transport._read_ready()
|
||||
transport._force_close.assert_called_with(err)
|
||||
|
||||
|
|
|
@ -148,6 +148,7 @@ class SubprocessMixin:
|
|||
|
||||
coro = write_stdin(proc, large_data)
|
||||
# drain() must raise BrokenPipeError or ConnectionResetError
|
||||
with test_utils.disable_logger():
|
||||
self.assertRaises((BrokenPipeError, ConnectionResetError),
|
||||
self.loop.run_until_complete, coro)
|
||||
self.loop.run_until_complete(proc.wait())
|
||||
|
@ -156,6 +157,7 @@ class SubprocessMixin:
|
|||
proc, large_data = self.prepare_broken_pipe_test()
|
||||
|
||||
# communicate() must ignore BrokenPipeError when feeding stdin
|
||||
with test_utils.disable_logger():
|
||||
self.loop.run_until_complete(proc.communicate(large_data))
|
||||
self.loop.run_until_complete(proc.wait())
|
||||
|
||||
|
|
Loading…
Reference in New Issue