asyncio: sync with Tulip

* Tulip issue #183: log socket events in debug mode

  - Log most important socket events: socket connected, new client, connection
    reset or closed by peer (EOF), etc.
  - Log time elapsed in DNS resolution (getaddrinfo)
  - Log pause/resume reading
  - Log time of SSL handshake
  - Log SSL handshake errors
  - Add a __repr__() method to many classes

* Fix ProactorEventLoop() in debug mode. ProactorEventLoop._make_self_pipe()
  doesn't call call_soon() directly because it checks for the current loop
  which fails, because the method is called to build the event loop.

* Cleanup _ProactorReadPipeTransport constructor. Not need to set again
  _read_fut attribute to None, it is already done in the base class.
This commit is contained in:
Victor Stinner 2014-07-12 03:11:53 +02:00
parent 8ebeb03740
commit e912e652f8
7 changed files with 219 additions and 29 deletions

View File

@ -94,6 +94,9 @@ class Server(events.AbstractServer):
self._active_count = 0
self._waiters = []
def __repr__(self):
return '<%s sockets=%r>' % (self.__class__.__name__, self.sockets)
def _attach(self):
assert self.sockets is not None
self._active_count += 1
@ -110,8 +113,6 @@ class Server(events.AbstractServer):
return
self.sockets = None
for sock in sockets:
# closing sockets will call asynchronously the _detach() method
# which calls _wakeup() for the last socket
self._loop._stop_serving(sock)
if self._active_count == 0:
self._wakeup()
@ -276,6 +277,8 @@ class BaseEventLoop(events.AbstractEventLoop):
raise RuntimeError("cannot close a running event loop")
if self._closed:
return
if self._debug:
logger.debug("Close %r", self)
self._closed = True
self._ready.clear()
self._scheduled.clear()
@ -402,10 +405,39 @@ class BaseEventLoop(events.AbstractEventLoop):
def set_default_executor(self, executor):
self._default_executor = executor
def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
msg = ["%s:%r" % (host, port)]
if family:
msg.append('family=%r' % family)
if type:
msg.append('type=%r' % type)
if proto:
msg.append('proto=%r' % proto)
if flags:
msg.append('flags=%r' % flags)
msg = ', '.join(msg)
logger.debug('Get addresss info %s', msg)
t0 = self.time()
addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
dt = self.time() - t0
msg = ('Getting addresss info %s took %.3f ms: %r'
% (msg, dt * 1e3, addrinfo))
if dt >= self.slow_callback_duration:
logger.info(msg)
else:
logger.debug(msg)
return addrinfo
def getaddrinfo(self, host, port, *,
family=0, type=0, proto=0, flags=0):
return self.run_in_executor(None, socket.getaddrinfo,
host, port, family, type, proto, flags)
if self._debug:
return self.run_in_executor(None, self._getaddrinfo_debug,
host, port, family, type, proto, flags)
else:
return self.run_in_executor(None, socket.getaddrinfo,
host, port, family, type, proto, flags)
def getnameinfo(self, sockaddr, flags=0):
return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
@ -492,6 +524,8 @@ class BaseEventLoop(events.AbstractEventLoop):
sock.close()
sock = None
continue
if self._debug:
logger.debug("connect %r to %r", sock, address)
yield from self.sock_connect(sock, address)
except OSError as exc:
if sock is not None:
@ -524,6 +558,9 @@ class BaseEventLoop(events.AbstractEventLoop):
transport, protocol = yield from self._create_connection_transport(
sock, protocol_factory, ssl, server_hostname)
if self._debug:
logger.debug("connected to %s:%r: (%r, %r)",
host, port, transport, protocol)
return transport, protocol
@coroutine
@ -614,6 +651,15 @@ class BaseEventLoop(events.AbstractEventLoop):
waiter = futures.Future(loop=self)
transport = self._make_datagram_transport(sock, protocol, r_addr,
waiter)
if self._debug:
if local_addr:
logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
"created: (%r, %r)",
local_addr, remote_addr, transport, protocol)
else:
logger.debug("Datagram endpoint remote_addr=%r created: "
"(%r, %r)",
remote_addr, transport, protocol)
yield from waiter
return transport, protocol
@ -694,6 +740,8 @@ class BaseEventLoop(events.AbstractEventLoop):
sock.listen(backlog)
sock.setblocking(False)
self._start_serving(protocol_factory, sock, ssl, server)
if self._debug:
logger.info("%r is serving", server)
return server
@coroutine

View File

@ -41,6 +41,23 @@ class _ProactorBasePipeTransport(transports._FlowControlMixin,
# wait until protocol.connection_made() has been called
self._loop.call_soon(waiter._set_result_unless_cancelled, None)
def __repr__(self):
info = [self.__class__.__name__, 'fd=%s' % self._sock.fileno()]
if self._read_fut is not None:
ov = "pending" if self._read_fut.ov.pending else "completed"
info.append('read=%s' % ov)
if self._write_fut is not None:
if self._write_fut.ov.pending:
info.append("write=pending=%s" % self._pending_write)
else:
info.append("write=completed")
if self._buffer:
bufsize = len(self._buffer)
info.append('write_bufsize=%s' % bufsize)
if self._eof_written:
info.append('EOF written')
return '<%s>' % ' '.join(info)
def _set_extra(self, sock):
self._extra['pipe'] = sock
@ -55,7 +72,10 @@ class _ProactorBasePipeTransport(transports._FlowControlMixin,
self._read_fut.cancel()
def _fatal_error(self, exc, message='Fatal error on pipe transport'):
if not isinstance(exc, (BrokenPipeError, ConnectionResetError)):
if isinstance(exc, (BrokenPipeError, ConnectionResetError)):
if self._loop.get_debug():
logger.debug("%r: %s", self, message, exc_info=True)
else:
self._loop.call_exception_handler({
'message': message,
'exception': exc,
@ -108,7 +128,6 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
def __init__(self, loop, sock, protocol, waiter=None,
extra=None, server=None):
super().__init__(loop, sock, protocol, waiter, extra, server)
self._read_fut = None
self._paused = False
self._loop.call_soon(self._loop_reading)
@ -118,6 +137,8 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
if self._paused:
raise RuntimeError('Already paused')
self._paused = True
if self._loop.get_debug():
logger.debug("%r pauses reading", self)
def resume_reading(self):
if not self._paused:
@ -126,6 +147,8 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
if self._closing:
return
self._loop.call_soon(self._loop_reading, self._read_fut)
if self._loop.get_debug():
logger.debug("%r resumes reading", self)
def _loop_reading(self, fut=None):
if self._paused:
@ -166,6 +189,8 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
if data:
self._protocol.data_received(data)
elif data is not None:
if self._loop.get_debug():
logger.debug("%r received EOF", self)
keep_open = self._protocol.eof_received()
if not keep_open:
self.close()
@ -401,7 +426,9 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
self._ssock.setblocking(False)
self._csock.setblocking(False)
self._internal_fds += 1
self.call_soon(self._loop_self_reading)
# don't check the current loop because _make_self_pipe() is called
# from the event loop constructor
self._call_soon(self._loop_self_reading, (), check_loop=False)
def _loop_self_reading(self, f=None):
try:
@ -426,6 +453,9 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
try:
if f is not None:
conn, addr = f.result()
if self._debug:
logger.debug("%r got a new connection from %r: %r",
server, addr, conn)
protocol = protocol_factory()
self._make_socket_transport(
conn, protocol,

View File

@ -23,6 +23,17 @@ from . import transports
from .log import logger
def _test_selector_event(selector, fd, event):
# Test if the selector is monitoring 'event' events
# for the file descriptor 'fd'.
try:
key = selector.get_key(fd)
except KeyError:
return False
else:
return bool(key.events & event)
class BaseSelectorEventLoop(base_events.BaseEventLoop):
"""Selector event loop.
@ -116,6 +127,9 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
sslcontext=None, server=None):
try:
conn, addr = sock.accept()
if self._debug:
logger.debug("%r got a new connection from %r: %r",
server, addr, conn)
conn.setblocking(False)
except (BlockingIOError, InterruptedError, ConnectionAbortedError):
pass # False alarm.
@ -419,6 +433,26 @@ class _SelectorTransport(transports._FlowControlMixin,
if self._server is not None:
self._server._attach()
def __repr__(self):
info = [self.__class__.__name__, 'fd=%s' % self._sock_fd]
polling = _test_selector_event(self._loop._selector,
self._sock_fd, selectors.EVENT_READ)
if polling:
info.append('read=polling')
else:
info.append('read=idle')
polling = _test_selector_event(self._loop._selector,
self._sock_fd, selectors.EVENT_WRITE)
if polling:
state = 'polling'
else:
state = 'idle'
bufsize = self.get_write_buffer_size()
info.append('write=<%s, bufsize=%s>' % (state, bufsize))
return '<%s>' % ' '.join(info)
def abort(self):
self._force_close(None)
@ -433,7 +467,10 @@ class _SelectorTransport(transports._FlowControlMixin,
def _fatal_error(self, exc, message='Fatal error on transport'):
# Should be called from exception handler only.
if not isinstance(exc, (BrokenPipeError, ConnectionResetError)):
if isinstance(exc, (BrokenPipeError, ConnectionResetError)):
if self._loop.get_debug():
logger.debug("%r: %s", self, message, exc_info=True)
else:
self._loop.call_exception_handler({
'message': message,
'exception': exc,
@ -492,6 +529,8 @@ class _SelectorSocketTransport(_SelectorTransport):
raise RuntimeError('Already paused')
self._paused = True
self._loop.remove_reader(self._sock_fd)
if self._loop.get_debug():
logger.debug("%r pauses reading", self)
def resume_reading(self):
if not self._paused:
@ -500,6 +539,8 @@ class _SelectorSocketTransport(_SelectorTransport):
if self._closing:
return
self._loop.add_reader(self._sock_fd, self._read_ready)
if self._loop.get_debug():
logger.debug("%r resumes reading", self)
def _read_ready(self):
try:
@ -512,6 +553,8 @@ class _SelectorSocketTransport(_SelectorTransport):
if data:
self._protocol.data_received(data)
else:
if self._loop.get_debug():
logger.debug("%r received EOF", self)
keep_open = self._protocol.eof_received()
if keep_open:
# We're keeping the connection open so the
@ -638,31 +681,37 @@ class _SelectorSslTransport(_SelectorTransport):
# SSL-specific extra info. (peercert is set later)
self._extra.update(sslcontext=sslcontext)
self._on_handshake()
if self._loop.get_debug():
logger.debug("%r starts SSL handshake", self)
start_time = self._loop.time()
else:
start_time = None
self._on_handshake(start_time)
def _on_handshake(self):
def _on_handshake(self, start_time):
try:
self._sock.do_handshake()
except ssl.SSLWantReadError:
self._loop.add_reader(self._sock_fd, self._on_handshake)
self._loop.add_reader(self._sock_fd,
self._on_handshake, start_time)
return
except ssl.SSLWantWriteError:
self._loop.add_writer(self._sock_fd, self._on_handshake)
return
except Exception as exc:
self._loop.remove_reader(self._sock_fd)
self._loop.remove_writer(self._sock_fd)
self._sock.close()
if self._waiter is not None:
self._waiter.set_exception(exc)
self._loop.add_writer(self._sock_fd,
self._on_handshake, start_time)
return
except BaseException as exc:
if self._loop.get_debug():
logger.warning("%r: SSL handshake failed",
self, exc_info=True)
self._loop.remove_reader(self._sock_fd)
self._loop.remove_writer(self._sock_fd)
self._sock.close()
if self._waiter is not None:
self._waiter.set_exception(exc)
raise
if isinstance(exc, Exception):
return
else:
raise
self._loop.remove_reader(self._sock_fd)
self._loop.remove_writer(self._sock_fd)
@ -676,6 +725,10 @@ class _SelectorSslTransport(_SelectorTransport):
try:
ssl.match_hostname(peercert, self._server_hostname)
except Exception as exc:
if self._loop.get_debug():
logger.warning("%r: SSL handshake failed "
"on matching the hostname",
self, exc_info=True)
self._sock.close()
if self._waiter is not None:
self._waiter.set_exception(exc)
@ -696,6 +749,10 @@ class _SelectorSslTransport(_SelectorTransport):
self._loop.call_soon(self._waiter._set_result_unless_cancelled,
None)
if self._loop.get_debug():
dt = self._loop.time() - start_time
logger.debug("%r: SSL handshake took %.1f ms", self, dt * 1e3)
def pause_reading(self):
# XXX This is a bit icky, given the comment at the top of
# _read_ready(). Is it possible to evoke a deadlock? I don't
@ -709,6 +766,8 @@ class _SelectorSslTransport(_SelectorTransport):
raise RuntimeError('Already paused')
self._paused = True
self._loop.remove_reader(self._sock_fd)
if self._loop.get_debug():
logger.debug("%r pauses reading", self)
def resume_reading(self):
if not self._paused:
@ -717,6 +776,8 @@ class _SelectorSslTransport(_SelectorTransport):
if self._closing:
return
self._loop.add_reader(self._sock_fd, self._read_ready)
if self._loop.get_debug():
logger.debug("%r resumes reading", self)
def _read_ready(self):
if self._write_wants_read:
@ -741,6 +802,8 @@ class _SelectorSslTransport(_SelectorTransport):
self._protocol.data_received(data)
else:
try:
if self._loop.get_debug():
logger.debug("%r received EOF", self)
keep_open = self._protocol.eof_received()
if keep_open:
logger.warning('returning true from eof_received() '

View File

@ -16,6 +16,7 @@ from . import base_subprocess
from . import constants
from . import events
from . import selector_events
from . import selectors
from . import transports
from .coroutines import coroutine
from .log import logger
@ -272,6 +273,20 @@ class _UnixReadPipeTransport(transports.ReadTransport):
# wait until protocol.connection_made() has been called
self._loop.call_soon(waiter._set_result_unless_cancelled, None)
def __repr__(self):
info = [self.__class__.__name__, 'fd=%s' % self._fileno]
if self._pipe is not None:
polling = selector_events._test_selector_event(
self._loop._selector,
self._fileno, selectors.EVENT_READ)
if polling:
info.append('polling')
else:
info.append('idle')
else:
info.append('closed')
return '<%s>' % ' '.join(info)
def _read_ready(self):
try:
data = os.read(self._fileno, self.max_size)
@ -283,6 +298,8 @@ class _UnixReadPipeTransport(transports.ReadTransport):
if data:
self._protocol.data_received(data)
else:
if self._loop.get_debug():
logger.info("%r was closed by peer", self)
self._closing = True
self._loop.remove_reader(self._fileno)
self._loop.call_soon(self._protocol.eof_received)
@ -357,11 +374,30 @@ class _UnixWritePipeTransport(transports._FlowControlMixin,
# wait until protocol.connection_made() has been called
self._loop.call_soon(waiter._set_result_unless_cancelled, None)
def __repr__(self):
info = [self.__class__.__name__, 'fd=%s' % self._fileno]
if self._pipe is not None:
polling = selector_events._test_selector_event(
self._loop._selector,
self._fileno, selectors.EVENT_WRITE)
if polling:
info.append('polling')
else:
info.append('idle')
bufsize = self.get_write_buffer_size()
info.append('bufsize=%s' % bufsize)
else:
info.append('closed')
return '<%s>' % ' '.join(info)
def get_write_buffer_size(self):
return sum(len(data) for data in self._buffer)
def _read_ready(self):
# Pipe was closed by peer.
if self._loop.get_debug():
logger.info("%r was closed by peer", self)
if self._buffer:
self._close(BrokenPipeError())
else:

View File

@ -40,6 +40,18 @@ class _OverlappedFuture(futures.Future):
super().__init__(loop=loop)
self.ov = ov
def __repr__(self):
info = [self._state.lower()]
if self.ov.pending:
info.append('overlapped=pending')
else:
info.append('overlapped=completed')
if self._state == futures._FINISHED:
info.append(self._format_result())
if self._callbacks:
info.append(self._format_callbacks())
return '<%s %s>' % (self.__class__.__name__, ' '.join(info))
def cancel(self):
try:
self.ov.cancel()

View File

@ -358,16 +358,17 @@ class BaseProactorEventLoopTests(test_utils.TestCase):
self.loop = EventLoop(self.proactor)
self.set_event_loop(self.loop, cleanup=False)
@mock.patch.object(BaseProactorEventLoop, 'call_soon')
@mock.patch.object(BaseProactorEventLoop, '_call_soon')
@mock.patch.object(BaseProactorEventLoop, '_socketpair')
def test_ctor(self, socketpair, call_soon):
def test_ctor(self, socketpair, _call_soon):
ssock, csock = socketpair.return_value = (
mock.Mock(), mock.Mock())
loop = BaseProactorEventLoop(self.proactor)
self.assertIs(loop._ssock, ssock)
self.assertIs(loop._csock, csock)
self.assertEqual(loop._internal_fds, 1)
call_soon.assert_called_with(loop._loop_self_reading)
_call_soon.assert_called_with(loop._loop_self_reading, (),
check_loop=False)
def test_close_self_pipe(self):
self.loop._close_self_pipe()

View File

@ -1092,15 +1092,15 @@ class SelectorSslTransportTests(test_utils.TestCase):
self.sslsock.do_handshake.side_effect = ssl.SSLWantReadError
transport = _SelectorSslTransport(
self.loop, self.sock, self.protocol, self.sslcontext)
transport._on_handshake()
self.loop.assert_reader(1, transport._on_handshake)
transport._on_handshake(None)
self.loop.assert_reader(1, transport._on_handshake, None)
def test_on_handshake_writer_retry(self):
self.sslsock.do_handshake.side_effect = ssl.SSLWantWriteError
transport = _SelectorSslTransport(
self.loop, self.sock, self.protocol, self.sslcontext)
transport._on_handshake()
self.loop.assert_writer(1, transport._on_handshake)
transport._on_handshake(None)
self.loop.assert_writer(1, transport._on_handshake, None)
def test_on_handshake_exc(self):
exc = ValueError()
@ -1108,7 +1108,7 @@ class SelectorSslTransportTests(test_utils.TestCase):
transport = _SelectorSslTransport(
self.loop, self.sock, self.protocol, self.sslcontext)
transport._waiter = asyncio.Future(loop=self.loop)
transport._on_handshake()
transport._on_handshake(None)
self.assertTrue(self.sslsock.close.called)
self.assertTrue(transport._waiter.done())
self.assertIs(exc, transport._waiter.exception())
@ -1119,7 +1119,7 @@ class SelectorSslTransportTests(test_utils.TestCase):
transport._waiter = asyncio.Future(loop=self.loop)
exc = BaseException()
self.sslsock.do_handshake.side_effect = exc
self.assertRaises(BaseException, transport._on_handshake)
self.assertRaises(BaseException, transport._on_handshake, None)
self.assertTrue(self.sslsock.close.called)
self.assertTrue(transport._waiter.done())
self.assertIs(exc, transport._waiter.exception())