From e912e652f85bfd92d7209aa0cb23e5d3975a8d72 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Sat, 12 Jul 2014 03:11:53 +0200 Subject: [PATCH] asyncio: sync with Tulip * Tulip issue #183: log socket events in debug mode - Log most important socket events: socket connected, new client, connection reset or closed by peer (EOF), etc. - Log time elapsed in DNS resolution (getaddrinfo) - Log pause/resume reading - Log time of SSL handshake - Log SSL handshake errors - Add a __repr__() method to many classes * Fix ProactorEventLoop() in debug mode. ProactorEventLoop._make_self_pipe() doesn't call call_soon() directly because it checks for the current loop which fails, because the method is called to build the event loop. * Cleanup _ProactorReadPipeTransport constructor. Not need to set again _read_fut attribute to None, it is already done in the base class. --- Lib/asyncio/base_events.py | 56 +++++++++++- Lib/asyncio/proactor_events.py | 36 +++++++- Lib/asyncio/selector_events.py | 89 ++++++++++++++++--- Lib/asyncio/unix_events.py | 36 ++++++++ Lib/asyncio/windows_events.py | 12 +++ Lib/test/test_asyncio/test_proactor_events.py | 7 +- Lib/test/test_asyncio/test_selector_events.py | 12 +-- 7 files changed, 219 insertions(+), 29 deletions(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index cab44627e26..e5683fd15bc 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -94,6 +94,9 @@ class Server(events.AbstractServer): self._active_count = 0 self._waiters = [] + def __repr__(self): + return '<%s sockets=%r>' % (self.__class__.__name__, self.sockets) + def _attach(self): assert self.sockets is not None self._active_count += 1 @@ -110,8 +113,6 @@ class Server(events.AbstractServer): return self.sockets = None for sock in sockets: - # closing sockets will call asynchronously the _detach() method - # which calls _wakeup() for the last socket self._loop._stop_serving(sock) if self._active_count == 0: self._wakeup() @@ -276,6 +277,8 @@ class BaseEventLoop(events.AbstractEventLoop): raise RuntimeError("cannot close a running event loop") if self._closed: return + if self._debug: + logger.debug("Close %r", self) self._closed = True self._ready.clear() self._scheduled.clear() @@ -402,10 +405,39 @@ class BaseEventLoop(events.AbstractEventLoop): def set_default_executor(self, executor): self._default_executor = executor + def _getaddrinfo_debug(self, host, port, family, type, proto, flags): + msg = ["%s:%r" % (host, port)] + if family: + msg.append('family=%r' % family) + if type: + msg.append('type=%r' % type) + if proto: + msg.append('proto=%r' % proto) + if flags: + msg.append('flags=%r' % flags) + msg = ', '.join(msg) + logger.debug('Get addresss info %s', msg) + + t0 = self.time() + addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags) + dt = self.time() - t0 + + msg = ('Getting addresss info %s took %.3f ms: %r' + % (msg, dt * 1e3, addrinfo)) + if dt >= self.slow_callback_duration: + logger.info(msg) + else: + logger.debug(msg) + return addrinfo + def getaddrinfo(self, host, port, *, family=0, type=0, proto=0, flags=0): - return self.run_in_executor(None, socket.getaddrinfo, - host, port, family, type, proto, flags) + if self._debug: + return self.run_in_executor(None, self._getaddrinfo_debug, + host, port, family, type, proto, flags) + else: + return self.run_in_executor(None, socket.getaddrinfo, + host, port, family, type, proto, flags) def getnameinfo(self, sockaddr, flags=0): return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags) @@ -492,6 +524,8 @@ class BaseEventLoop(events.AbstractEventLoop): sock.close() sock = None continue + if self._debug: + logger.debug("connect %r to %r", sock, address) yield from self.sock_connect(sock, address) except OSError as exc: if sock is not None: @@ -524,6 +558,9 @@ class BaseEventLoop(events.AbstractEventLoop): transport, protocol = yield from self._create_connection_transport( sock, protocol_factory, ssl, server_hostname) + if self._debug: + logger.debug("connected to %s:%r: (%r, %r)", + host, port, transport, protocol) return transport, protocol @coroutine @@ -614,6 +651,15 @@ class BaseEventLoop(events.AbstractEventLoop): waiter = futures.Future(loop=self) transport = self._make_datagram_transport(sock, protocol, r_addr, waiter) + if self._debug: + if local_addr: + logger.info("Datagram endpoint local_addr=%r remote_addr=%r " + "created: (%r, %r)", + local_addr, remote_addr, transport, protocol) + else: + logger.debug("Datagram endpoint remote_addr=%r created: " + "(%r, %r)", + remote_addr, transport, protocol) yield from waiter return transport, protocol @@ -694,6 +740,8 @@ class BaseEventLoop(events.AbstractEventLoop): sock.listen(backlog) sock.setblocking(False) self._start_serving(protocol_factory, sock, ssl, server) + if self._debug: + logger.info("%r is serving", server) return server @coroutine diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index d0b601d7c0b..d09e9faa1b5 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -41,6 +41,23 @@ class _ProactorBasePipeTransport(transports._FlowControlMixin, # wait until protocol.connection_made() has been called self._loop.call_soon(waiter._set_result_unless_cancelled, None) + def __repr__(self): + info = [self.__class__.__name__, 'fd=%s' % self._sock.fileno()] + if self._read_fut is not None: + ov = "pending" if self._read_fut.ov.pending else "completed" + info.append('read=%s' % ov) + if self._write_fut is not None: + if self._write_fut.ov.pending: + info.append("write=pending=%s" % self._pending_write) + else: + info.append("write=completed") + if self._buffer: + bufsize = len(self._buffer) + info.append('write_bufsize=%s' % bufsize) + if self._eof_written: + info.append('EOF written') + return '<%s>' % ' '.join(info) + def _set_extra(self, sock): self._extra['pipe'] = sock @@ -55,7 +72,10 @@ class _ProactorBasePipeTransport(transports._FlowControlMixin, self._read_fut.cancel() def _fatal_error(self, exc, message='Fatal error on pipe transport'): - if not isinstance(exc, (BrokenPipeError, ConnectionResetError)): + if isinstance(exc, (BrokenPipeError, ConnectionResetError)): + if self._loop.get_debug(): + logger.debug("%r: %s", self, message, exc_info=True) + else: self._loop.call_exception_handler({ 'message': message, 'exception': exc, @@ -108,7 +128,6 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, def __init__(self, loop, sock, protocol, waiter=None, extra=None, server=None): super().__init__(loop, sock, protocol, waiter, extra, server) - self._read_fut = None self._paused = False self._loop.call_soon(self._loop_reading) @@ -118,6 +137,8 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, if self._paused: raise RuntimeError('Already paused') self._paused = True + if self._loop.get_debug(): + logger.debug("%r pauses reading", self) def resume_reading(self): if not self._paused: @@ -126,6 +147,8 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, if self._closing: return self._loop.call_soon(self._loop_reading, self._read_fut) + if self._loop.get_debug(): + logger.debug("%r resumes reading", self) def _loop_reading(self, fut=None): if self._paused: @@ -166,6 +189,8 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, if data: self._protocol.data_received(data) elif data is not None: + if self._loop.get_debug(): + logger.debug("%r received EOF", self) keep_open = self._protocol.eof_received() if not keep_open: self.close() @@ -401,7 +426,9 @@ class BaseProactorEventLoop(base_events.BaseEventLoop): self._ssock.setblocking(False) self._csock.setblocking(False) self._internal_fds += 1 - self.call_soon(self._loop_self_reading) + # don't check the current loop because _make_self_pipe() is called + # from the event loop constructor + self._call_soon(self._loop_self_reading, (), check_loop=False) def _loop_self_reading(self, f=None): try: @@ -426,6 +453,9 @@ class BaseProactorEventLoop(base_events.BaseEventLoop): try: if f is not None: conn, addr = f.result() + if self._debug: + logger.debug("%r got a new connection from %r: %r", + server, addr, conn) protocol = protocol_factory() self._make_socket_transport( conn, protocol, diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index b965046870c..d79c0801b24 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -23,6 +23,17 @@ from . import transports from .log import logger +def _test_selector_event(selector, fd, event): + # Test if the selector is monitoring 'event' events + # for the file descriptor 'fd'. + try: + key = selector.get_key(fd) + except KeyError: + return False + else: + return bool(key.events & event) + + class BaseSelectorEventLoop(base_events.BaseEventLoop): """Selector event loop. @@ -116,6 +127,9 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): sslcontext=None, server=None): try: conn, addr = sock.accept() + if self._debug: + logger.debug("%r got a new connection from %r: %r", + server, addr, conn) conn.setblocking(False) except (BlockingIOError, InterruptedError, ConnectionAbortedError): pass # False alarm. @@ -419,6 +433,26 @@ class _SelectorTransport(transports._FlowControlMixin, if self._server is not None: self._server._attach() + def __repr__(self): + info = [self.__class__.__name__, 'fd=%s' % self._sock_fd] + polling = _test_selector_event(self._loop._selector, + self._sock_fd, selectors.EVENT_READ) + if polling: + info.append('read=polling') + else: + info.append('read=idle') + + polling = _test_selector_event(self._loop._selector, + self._sock_fd, selectors.EVENT_WRITE) + if polling: + state = 'polling' + else: + state = 'idle' + + bufsize = self.get_write_buffer_size() + info.append('write=<%s, bufsize=%s>' % (state, bufsize)) + return '<%s>' % ' '.join(info) + def abort(self): self._force_close(None) @@ -433,7 +467,10 @@ class _SelectorTransport(transports._FlowControlMixin, def _fatal_error(self, exc, message='Fatal error on transport'): # Should be called from exception handler only. - if not isinstance(exc, (BrokenPipeError, ConnectionResetError)): + if isinstance(exc, (BrokenPipeError, ConnectionResetError)): + if self._loop.get_debug(): + logger.debug("%r: %s", self, message, exc_info=True) + else: self._loop.call_exception_handler({ 'message': message, 'exception': exc, @@ -492,6 +529,8 @@ class _SelectorSocketTransport(_SelectorTransport): raise RuntimeError('Already paused') self._paused = True self._loop.remove_reader(self._sock_fd) + if self._loop.get_debug(): + logger.debug("%r pauses reading", self) def resume_reading(self): if not self._paused: @@ -500,6 +539,8 @@ class _SelectorSocketTransport(_SelectorTransport): if self._closing: return self._loop.add_reader(self._sock_fd, self._read_ready) + if self._loop.get_debug(): + logger.debug("%r resumes reading", self) def _read_ready(self): try: @@ -512,6 +553,8 @@ class _SelectorSocketTransport(_SelectorTransport): if data: self._protocol.data_received(data) else: + if self._loop.get_debug(): + logger.debug("%r received EOF", self) keep_open = self._protocol.eof_received() if keep_open: # We're keeping the connection open so the @@ -638,31 +681,37 @@ class _SelectorSslTransport(_SelectorTransport): # SSL-specific extra info. (peercert is set later) self._extra.update(sslcontext=sslcontext) - self._on_handshake() + if self._loop.get_debug(): + logger.debug("%r starts SSL handshake", self) + start_time = self._loop.time() + else: + start_time = None + self._on_handshake(start_time) - def _on_handshake(self): + def _on_handshake(self, start_time): try: self._sock.do_handshake() except ssl.SSLWantReadError: - self._loop.add_reader(self._sock_fd, self._on_handshake) + self._loop.add_reader(self._sock_fd, + self._on_handshake, start_time) return except ssl.SSLWantWriteError: - self._loop.add_writer(self._sock_fd, self._on_handshake) - return - except Exception as exc: - self._loop.remove_reader(self._sock_fd) - self._loop.remove_writer(self._sock_fd) - self._sock.close() - if self._waiter is not None: - self._waiter.set_exception(exc) + self._loop.add_writer(self._sock_fd, + self._on_handshake, start_time) return except BaseException as exc: + if self._loop.get_debug(): + logger.warning("%r: SSL handshake failed", + self, exc_info=True) self._loop.remove_reader(self._sock_fd) self._loop.remove_writer(self._sock_fd) self._sock.close() if self._waiter is not None: self._waiter.set_exception(exc) - raise + if isinstance(exc, Exception): + return + else: + raise self._loop.remove_reader(self._sock_fd) self._loop.remove_writer(self._sock_fd) @@ -676,6 +725,10 @@ class _SelectorSslTransport(_SelectorTransport): try: ssl.match_hostname(peercert, self._server_hostname) except Exception as exc: + if self._loop.get_debug(): + logger.warning("%r: SSL handshake failed " + "on matching the hostname", + self, exc_info=True) self._sock.close() if self._waiter is not None: self._waiter.set_exception(exc) @@ -696,6 +749,10 @@ class _SelectorSslTransport(_SelectorTransport): self._loop.call_soon(self._waiter._set_result_unless_cancelled, None) + if self._loop.get_debug(): + dt = self._loop.time() - start_time + logger.debug("%r: SSL handshake took %.1f ms", self, dt * 1e3) + def pause_reading(self): # XXX This is a bit icky, given the comment at the top of # _read_ready(). Is it possible to evoke a deadlock? I don't @@ -709,6 +766,8 @@ class _SelectorSslTransport(_SelectorTransport): raise RuntimeError('Already paused') self._paused = True self._loop.remove_reader(self._sock_fd) + if self._loop.get_debug(): + logger.debug("%r pauses reading", self) def resume_reading(self): if not self._paused: @@ -717,6 +776,8 @@ class _SelectorSslTransport(_SelectorTransport): if self._closing: return self._loop.add_reader(self._sock_fd, self._read_ready) + if self._loop.get_debug(): + logger.debug("%r resumes reading", self) def _read_ready(self): if self._write_wants_read: @@ -741,6 +802,8 @@ class _SelectorSslTransport(_SelectorTransport): self._protocol.data_received(data) else: try: + if self._loop.get_debug(): + logger.debug("%r received EOF", self) keep_open = self._protocol.eof_received() if keep_open: logger.warning('returning true from eof_received() ' diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 764e719d41f..09b875ce4cf 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -16,6 +16,7 @@ from . import base_subprocess from . import constants from . import events from . import selector_events +from . import selectors from . import transports from .coroutines import coroutine from .log import logger @@ -272,6 +273,20 @@ class _UnixReadPipeTransport(transports.ReadTransport): # wait until protocol.connection_made() has been called self._loop.call_soon(waiter._set_result_unless_cancelled, None) + def __repr__(self): + info = [self.__class__.__name__, 'fd=%s' % self._fileno] + if self._pipe is not None: + polling = selector_events._test_selector_event( + self._loop._selector, + self._fileno, selectors.EVENT_READ) + if polling: + info.append('polling') + else: + info.append('idle') + else: + info.append('closed') + return '<%s>' % ' '.join(info) + def _read_ready(self): try: data = os.read(self._fileno, self.max_size) @@ -283,6 +298,8 @@ class _UnixReadPipeTransport(transports.ReadTransport): if data: self._protocol.data_received(data) else: + if self._loop.get_debug(): + logger.info("%r was closed by peer", self) self._closing = True self._loop.remove_reader(self._fileno) self._loop.call_soon(self._protocol.eof_received) @@ -357,11 +374,30 @@ class _UnixWritePipeTransport(transports._FlowControlMixin, # wait until protocol.connection_made() has been called self._loop.call_soon(waiter._set_result_unless_cancelled, None) + def __repr__(self): + info = [self.__class__.__name__, 'fd=%s' % self._fileno] + if self._pipe is not None: + polling = selector_events._test_selector_event( + self._loop._selector, + self._fileno, selectors.EVENT_WRITE) + if polling: + info.append('polling') + else: + info.append('idle') + + bufsize = self.get_write_buffer_size() + info.append('bufsize=%s' % bufsize) + else: + info.append('closed') + return '<%s>' % ' '.join(info) + def get_write_buffer_size(self): return sum(len(data) for data in self._buffer) def _read_ready(self): # Pipe was closed by peer. + if self._loop.get_debug(): + logger.info("%r was closed by peer", self) if self._buffer: self._close(BrokenPipeError()) else: diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index 93b71b2a13e..9d86c96bc15 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -40,6 +40,18 @@ class _OverlappedFuture(futures.Future): super().__init__(loop=loop) self.ov = ov + def __repr__(self): + info = [self._state.lower()] + if self.ov.pending: + info.append('overlapped=pending') + else: + info.append('overlapped=completed') + if self._state == futures._FINISHED: + info.append(self._format_result()) + if self._callbacks: + info.append(self._format_callbacks()) + return '<%s %s>' % (self.__class__.__name__, ' '.join(info)) + def cancel(self): try: self.ov.cancel() diff --git a/Lib/test/test_asyncio/test_proactor_events.py b/Lib/test/test_asyncio/test_proactor_events.py index ddfceae14a6..4bb4f0b369c 100644 --- a/Lib/test/test_asyncio/test_proactor_events.py +++ b/Lib/test/test_asyncio/test_proactor_events.py @@ -358,16 +358,17 @@ class BaseProactorEventLoopTests(test_utils.TestCase): self.loop = EventLoop(self.proactor) self.set_event_loop(self.loop, cleanup=False) - @mock.patch.object(BaseProactorEventLoop, 'call_soon') + @mock.patch.object(BaseProactorEventLoop, '_call_soon') @mock.patch.object(BaseProactorEventLoop, '_socketpair') - def test_ctor(self, socketpair, call_soon): + def test_ctor(self, socketpair, _call_soon): ssock, csock = socketpair.return_value = ( mock.Mock(), mock.Mock()) loop = BaseProactorEventLoop(self.proactor) self.assertIs(loop._ssock, ssock) self.assertIs(loop._csock, csock) self.assertEqual(loop._internal_fds, 1) - call_soon.assert_called_with(loop._loop_self_reading) + _call_soon.assert_called_with(loop._loop_self_reading, (), + check_loop=False) def test_close_self_pipe(self): self.loop._close_self_pipe() diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py index 35efab97f01..51869316007 100644 --- a/Lib/test/test_asyncio/test_selector_events.py +++ b/Lib/test/test_asyncio/test_selector_events.py @@ -1092,15 +1092,15 @@ class SelectorSslTransportTests(test_utils.TestCase): self.sslsock.do_handshake.side_effect = ssl.SSLWantReadError transport = _SelectorSslTransport( self.loop, self.sock, self.protocol, self.sslcontext) - transport._on_handshake() - self.loop.assert_reader(1, transport._on_handshake) + transport._on_handshake(None) + self.loop.assert_reader(1, transport._on_handshake, None) def test_on_handshake_writer_retry(self): self.sslsock.do_handshake.side_effect = ssl.SSLWantWriteError transport = _SelectorSslTransport( self.loop, self.sock, self.protocol, self.sslcontext) - transport._on_handshake() - self.loop.assert_writer(1, transport._on_handshake) + transport._on_handshake(None) + self.loop.assert_writer(1, transport._on_handshake, None) def test_on_handshake_exc(self): exc = ValueError() @@ -1108,7 +1108,7 @@ class SelectorSslTransportTests(test_utils.TestCase): transport = _SelectorSslTransport( self.loop, self.sock, self.protocol, self.sslcontext) transport._waiter = asyncio.Future(loop=self.loop) - transport._on_handshake() + transport._on_handshake(None) self.assertTrue(self.sslsock.close.called) self.assertTrue(transport._waiter.done()) self.assertIs(exc, transport._waiter.exception()) @@ -1119,7 +1119,7 @@ class SelectorSslTransportTests(test_utils.TestCase): transport._waiter = asyncio.Future(loop=self.loop) exc = BaseException() self.sslsock.do_handshake.side_effect = exc - self.assertRaises(BaseException, transport._on_handshake) + self.assertRaises(BaseException, transport._on_handshake, None) self.assertTrue(self.sslsock.close.called) self.assertTrue(transport._waiter.done()) self.assertIs(exc, transport._waiter.exception())