asyncio: sync with Tulip

* Tulip issue #184: Log subprocess events in debug mode

  - Log stdin, stdout and stderr transports and protocols
  - Log process identifier (pid)
  - Log connection of pipes
  - Log process exit
  - Log Process.communicate() tasks: feed stdin, read stdout and stderr
  - Add __repr__() method to many classes related to subprocesses


* Add BaseSubprocessTransport._pid attribute. Store the pid so it is still
  accessible after the process exited. It's more convinient for debug.

* create_connection(): add the socket in the "connected to" debug log

* Clean up some docstrings and comments. Remove unused unimplemented
  _read_from_self().
This commit is contained in:
Victor Stinner 2014-07-14 18:33:40 +02:00
parent b1ebfdddb3
commit acdb782a83
6 changed files with 181 additions and 43 deletions

View File

@ -1,7 +1,7 @@
"""Base implementation of event loop.
The event loop can be broken up into a multiplexer (the part
responsible for notifying us of IO events) and the event loop proper,
responsible for notifying us of I/O events) and the event loop proper,
which wraps a multiplexer with functionality for scheduling callbacks,
immediately or at a given time in the future.
@ -50,6 +50,15 @@ def _format_handle(handle):
return str(handle)
def _format_pipe(fd):
if fd == subprocess.PIPE:
return '<pipe>'
elif fd == subprocess.STDOUT:
return '<stdout>'
else:
return repr(fd)
class _StopError(BaseException):
"""Raised to stop the event loop."""
@ -70,7 +79,7 @@ def _check_resolved_address(sock, address):
type_mask |= socket.SOCK_NONBLOCK
if hasattr(socket, 'SOCK_CLOEXEC'):
type_mask |= socket.SOCK_CLOEXEC
# Use getaddrinfo(AI_NUMERICHOST) to ensure that the address is
# Use getaddrinfo(flags=AI_NUMERICHOST) to ensure that the address is
# already resolved.
try:
socket.getaddrinfo(host, port,
@ -158,7 +167,8 @@ class BaseEventLoop(events.AbstractEventLoop):
def create_task(self, coro):
"""Schedule a coroutine object.
Return a task object."""
Return a task object.
"""
task = tasks.Task(coro, loop=self)
if task._source_traceback:
del task._source_traceback[-1]
@ -197,12 +207,13 @@ class BaseEventLoop(events.AbstractEventLoop):
"""Create subprocess transport."""
raise NotImplementedError
def _read_from_self(self):
"""XXX"""
raise NotImplementedError
def _write_to_self(self):
"""XXX"""
"""Write a byte to self-pipe, to wake up the event loop.
This may be called from a different thread.
The subclass is responsible for implementing the self-pipe.
"""
raise NotImplementedError
def _process_events(self, event_list):
@ -233,7 +244,7 @@ class BaseEventLoop(events.AbstractEventLoop):
If the argument is a coroutine, it is wrapped in a Task.
XXX TBD: It would be disastrous to call run_until_complete()
WARNING: It would be disastrous to call run_until_complete()
with the same coroutine twice -- it would wrap it in two
different Tasks and that can't be good.
@ -261,7 +272,7 @@ class BaseEventLoop(events.AbstractEventLoop):
Every callback scheduled before stop() is called will run.
Callback scheduled after stop() is called won't. However,
those callbacks will run if run() is called again later.
those callbacks will run if run_*() is called again later.
"""
self.call_soon(_raise_stop_error)
@ -274,7 +285,7 @@ class BaseEventLoop(events.AbstractEventLoop):
The event loop must not be running.
"""
if self._running:
raise RuntimeError("cannot close a running event loop")
raise RuntimeError("Cannot close a running event loop")
if self._closed:
return
if self._debug:
@ -292,11 +303,16 @@ class BaseEventLoop(events.AbstractEventLoop):
return self._closed
def is_running(self):
"""Returns running status of event loop."""
"""Returns True if the event loop is running."""
return self._running
def time(self):
"""Return the time according to the event loop's clock."""
"""Return the time according to the event loop's clock.
This is a float expressed in seconds since an epoch, but the
epoch, precision, accuracy and drift are unspecified and may
differ per event loop.
"""
return time.monotonic()
def call_later(self, delay, callback, *args):
@ -306,7 +322,7 @@ class BaseEventLoop(events.AbstractEventLoop):
can be used to cancel the call.
The delay can be an int or float, expressed in seconds. It is
always a relative time.
always relative to the current time.
Each callback will be called exactly once. If two callbacks
are scheduled for exactly the same time, it undefined which
@ -321,7 +337,10 @@ class BaseEventLoop(events.AbstractEventLoop):
return timer
def call_at(self, when, callback, *args):
"""Like call_later(), but uses an absolute time."""
"""Like call_later(), but uses an absolute time.
Absolute time corresponds to the event loop's time() method.
"""
if coroutines.iscoroutinefunction(callback):
raise TypeError("coroutines cannot be used with call_at()")
if self._debug:
@ -335,7 +354,7 @@ class BaseEventLoop(events.AbstractEventLoop):
def call_soon(self, callback, *args):
"""Arrange for a callback to be called as soon as possible.
This operates as a FIFO queue, callbacks are called in the
This operates as a FIFO queue: callbacks are called in the
order in which they are registered. Each callback will be
called exactly once.
@ -361,7 +380,7 @@ class BaseEventLoop(events.AbstractEventLoop):
def _assert_is_current_event_loop(self):
"""Asserts that this event loop is the current event loop.
Non-threadsafe methods of this class make this assumption and will
Non-thread-safe methods of this class make this assumption and will
likely behave incorrectly when the assumption is violated.
Should only be called when (self._debug == True). The caller is
@ -373,11 +392,11 @@ class BaseEventLoop(events.AbstractEventLoop):
return
if current is not self:
raise RuntimeError(
"non-threadsafe operation invoked on an event loop other "
"Non-thread-safe operation invoked on an event loop other "
"than the current one")
def call_soon_threadsafe(self, callback, *args):
"""Like call_soon(), but thread safe."""
"""Like call_soon(), but thread-safe."""
handle = self._call_soon(callback, args, check_loop=False)
if handle._source_traceback:
del handle._source_traceback[-1]
@ -386,7 +405,7 @@ class BaseEventLoop(events.AbstractEventLoop):
def run_in_executor(self, executor, callback, *args):
if coroutines.iscoroutinefunction(callback):
raise TypeError("coroutines cannot be used with run_in_executor()")
raise TypeError("Coroutines cannot be used with run_in_executor()")
if isinstance(callback, events.Handle):
assert not args
assert not isinstance(callback, events.TimerHandle)
@ -416,13 +435,13 @@ class BaseEventLoop(events.AbstractEventLoop):
if flags:
msg.append('flags=%r' % flags)
msg = ', '.join(msg)
logger.debug('Get addresss info %s', msg)
logger.debug('Get address info %s', msg)
t0 = self.time()
addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
dt = self.time() - t0
msg = ('Getting addresss info %s took %.3f ms: %r'
msg = ('Getting address info %s took %.3f ms: %r'
% (msg, dt * 1e3, addrinfo))
if dt >= self.slow_callback_duration:
logger.info(msg)
@ -559,8 +578,8 @@ class BaseEventLoop(events.AbstractEventLoop):
transport, protocol = yield from self._create_connection_transport(
sock, protocol_factory, ssl, server_hostname)
if self._debug:
logger.debug("connected to %s:%r: (%r, %r)",
host, port, transport, protocol)
logger.debug("%r connected to %s:%r: (%r, %r)",
sock, host, port, transport, protocol)
return transport, protocol
@coroutine
@ -589,7 +608,7 @@ class BaseEventLoop(events.AbstractEventLoop):
raise ValueError('unexpected address family')
addr_pairs_info = (((family, proto), (None, None)),)
else:
# join addresss by (family, protocol)
# join address by (family, protocol)
addr_infos = collections.OrderedDict()
for idx, addr in ((0, local_addr), (1, remote_addr)):
if addr is not None:
@ -674,7 +693,7 @@ class BaseEventLoop(events.AbstractEventLoop):
reuse_address=None):
"""Create a TCP server bound to host and port.
Return an Server object which can be used to stop the service.
Return a Server object which can be used to stop the service.
This method is a coroutine.
"""
@ -731,8 +750,7 @@ class BaseEventLoop(events.AbstractEventLoop):
sock.close()
else:
if sock is None:
raise ValueError(
'host and port was not specified and no sock specified')
raise ValueError('Neither host/port nor sock were specified')
sockets = [sock]
server = Server(self, sockets)
@ -750,6 +768,9 @@ class BaseEventLoop(events.AbstractEventLoop):
waiter = futures.Future(loop=self)
transport = self._make_read_pipe_transport(pipe, protocol, waiter)
yield from waiter
if self._debug:
logger.debug('Read pipe %r connected: (%r, %r)',
pipe.fileno(), transport, protocol)
return transport, protocol
@coroutine
@ -758,8 +779,24 @@ class BaseEventLoop(events.AbstractEventLoop):
waiter = futures.Future(loop=self)
transport = self._make_write_pipe_transport(pipe, protocol, waiter)
yield from waiter
if self._debug:
logger.debug('Write pipe %r connected: (%r, %r)',
pipe.fileno(), transport, protocol)
return transport, protocol
def _log_subprocess(self, msg, stdin, stdout, stderr):
info = [msg]
if stdin is not None:
info.append('stdin=%s' % _format_pipe(stdin))
if stdout is not None and stderr == subprocess.STDOUT:
info.append('stdout=stderr=%s' % _format_pipe(stdout))
else:
if stdout is not None:
info.append('stdout=%s' % _format_pipe(stdout))
if stderr is not None:
info.append('stderr=%s' % _format_pipe(stderr))
logger.debug(' '.join(info))
@coroutine
def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
@ -774,8 +811,15 @@ class BaseEventLoop(events.AbstractEventLoop):
if bufsize != 0:
raise ValueError("bufsize must be 0")
protocol = protocol_factory()
if self._debug:
# don't log parameters: they may contain sensitive information
# (password) and may be too long
debug_log = 'run shell command %r' % cmd
self._log_subprocess(debug_log, stdin, stdout, stderr)
transport = yield from self._make_subprocess_transport(
protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
if self._debug:
logger.info('%s: %r' % (debug_log, transport))
return transport, protocol
@coroutine
@ -796,9 +840,16 @@ class BaseEventLoop(events.AbstractEventLoop):
"a bytes or text string, not %s"
% type(arg).__name__)
protocol = protocol_factory()
if self._debug:
# don't log parameters: they may contain sensitive information
# (password) and may be too long
debug_log = 'execute program %r' % program
self._log_subprocess(debug_log, stdin, stdout, stderr)
transport = yield from self._make_subprocess_transport(
protocol, popen_args, False, stdin, stdout, stderr,
bufsize, **kwargs)
if self._debug:
logger.info('%s: %r' % (debug_log, transport))
return transport, protocol
def set_exception_handler(self, handler):
@ -808,7 +859,7 @@ class BaseEventLoop(events.AbstractEventLoop):
be set.
If handler is a callable object, it should have a
matching signature to '(loop, context)', where 'loop'
signature matching '(loop, context)', where 'loop'
will be a reference to the active event loop, 'context'
will be a dict object (see `call_exception_handler()`
documentation for details about context).
@ -825,7 +876,7 @@ class BaseEventLoop(events.AbstractEventLoop):
handler is set, and can be called by a custom exception
handler that wants to defer to the default behavior.
context parameter has the same meaning as in
The context parameter has the same meaning as in
`call_exception_handler()`.
"""
message = context.get('message')
@ -854,10 +905,10 @@ class BaseEventLoop(events.AbstractEventLoop):
logger.error('\n'.join(log_lines), exc_info=exc_info)
def call_exception_handler(self, context):
"""Call the current event loop exception handler.
"""Call the current event loop's exception handler.
The context argument is a dict containing the following keys:
context is a dict object containing the following keys
(new keys maybe introduced later):
- 'message': Error message;
- 'exception' (optional): Exception object;
- 'future' (optional): Future instance;
@ -866,8 +917,10 @@ class BaseEventLoop(events.AbstractEventLoop):
- 'transport' (optional): Transport instance;
- 'socket' (optional): Socket instance.
Note: this method should not be overloaded in subclassed
event loops. For any custom exception handling, use
New keys maybe introduced in the future.
Note: do not overload this method in an event loop subclass.
For custom exception handling, use the
`set_exception_handler()` method.
"""
if self._exception_handler is None:
@ -892,7 +945,7 @@ class BaseEventLoop(events.AbstractEventLoop):
'context': context,
})
except Exception:
# Guard 'default_exception_handler' in case it's
# Guard 'default_exception_handler' in case it is
# overloaded.
logger.error('Exception in default exception handler '
'while handling an unexpected error '
@ -900,7 +953,7 @@ class BaseEventLoop(events.AbstractEventLoop):
exc_info=True)
def _add_callback(self, handle):
"""Add a Handle to ready or scheduled."""
"""Add a Handle to _scheduled (TimerHandle) or _ready."""
assert isinstance(handle, events.Handle), 'A Handle is required here'
if handle._cancelled:
return
@ -971,7 +1024,7 @@ class BaseEventLoop(events.AbstractEventLoop):
# Note: We run all currently scheduled callbacks, but not any
# callbacks scheduled by callbacks run this time around --
# they will be run the next time (after another I/O poll).
# Use an idiom that is threadsafe without using locks.
# Use an idiom that is thread-safe without using locks.
ntodo = len(self._ready)
for i in range(ntodo):
handle = self._ready.popleft()

View File

@ -4,6 +4,7 @@ import subprocess
from . import protocols
from . import transports
from .coroutines import coroutine
from .log import logger
class BaseSubprocessTransport(transports.SubprocessTransport):
@ -14,6 +15,7 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
super().__init__(extra)
self._protocol = protocol
self._loop = loop
self._pid = None
self._pipes = {}
if stdin == subprocess.PIPE:
@ -27,7 +29,36 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
self._returncode = None
self._start(args=args, shell=shell, stdin=stdin, stdout=stdout,
stderr=stderr, bufsize=bufsize, **kwargs)
self._pid = self._proc.pid
self._extra['subprocess'] = self._proc
if self._loop.get_debug():
if isinstance(args, (bytes, str)):
program = args
else:
program = args[0]
logger.debug('process %r created: pid %s',
program, self._pid)
def __repr__(self):
info = [self.__class__.__name__, 'pid=%s' % self._pid]
if self._returncode is not None:
info.append('returncode=%s' % self._returncode)
stdin = self._pipes.get(0)
if stdin is not None:
info.append('stdin=%s' % stdin.pipe)
stdout = self._pipes.get(1)
stderr = self._pipes.get(2)
if stdout is not None and stderr is stdout:
info.append('stdout=stderr=%s' % stdout.pipe)
else:
if stdout is not None:
info.append('stdout=%s' % stdout.pipe)
if stderr is not None:
info.append('stderr=%s' % stderr.pipe)
return '<%s>' % ' '.join(info)
def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
raise NotImplementedError
@ -45,7 +76,7 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
self.terminate()
def get_pid(self):
return self._proc.pid
return self._pid
def get_returncode(self):
return self._returncode
@ -108,6 +139,9 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
def _process_exited(self, returncode):
assert returncode is not None, returncode
assert self._returncode is None, self._returncode
if self._loop.get_debug():
logger.info('%r exited with return code %r',
self, returncode)
self._returncode = returncode
self._call(self._protocol.process_exited)
self._try_finish()
@ -141,6 +175,10 @@ class WriteSubprocessPipeProto(protocols.BaseProtocol):
def connection_made(self, transport):
self.pipe = transport
def __repr__(self):
return ('<%s fd=%s pipe=%r>'
% (self.__class__.__name__, self.fd, self.pipe))
def connection_lost(self, exc):
self.disconnected = True
self.proc._pipe_connection_lost(self.fd, exc)

View File

@ -15,6 +15,7 @@ from . import events
from . import futures
from . import protocols
from .coroutines import coroutine
from .log import logger
_DEFAULT_LIMIT = 2**16
@ -153,10 +154,15 @@ class FlowControlMixin(protocols.Protocol):
def pause_writing(self):
assert not self._paused
self._paused = True
if self._loop.get_debug():
logger.debug("%r pauses writing", self)
def resume_writing(self):
assert self._paused
self._paused = False
if self._loop.get_debug():
logger.debug("%r resumes writing", self)
waiter = self._drain_waiter
if waiter is not None:
self._drain_waiter = None
@ -244,6 +250,12 @@ class StreamWriter:
self._reader = reader
self._loop = loop
def __repr__(self):
info = [self.__class__.__name__, 'transport=%r' % self._transport]
if self._reader is not None:
info.append('reader=%r' % self._reader)
return '<%s>' % ' '.join(info)
@property
def transport(self):
return self._transport

View File

@ -9,6 +9,7 @@ from . import protocols
from . import streams
from . import tasks
from .coroutines import coroutine
from .log import logger
PIPE = subprocess.PIPE
@ -28,6 +29,16 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
self._waiters = collections.deque()
self._transport = None
def __repr__(self):
info = [self.__class__.__name__]
if self.stdin is not None:
info.append('stdin=%r' % self.stdin)
if self.stdout is not None:
info.append('stdout=%r' % self.stdout)
if self.stderr is not None:
info.append('stderr=%r' % self.stderr)
return '<%s>' % ' '.join(info)
def connection_made(self, transport):
self._transport = transport
if transport.get_pipe_transport(1):
@ -91,6 +102,9 @@ class Process:
self.stderr = protocol.stderr
self.pid = transport.get_pid()
def __repr__(self):
return '<%s %s>' % (self.__class__.__name__, self.pid)
@property
def returncode(self):
return self._transport.get_returncode()
@ -126,7 +140,13 @@ class Process:
@coroutine
def _feed_stdin(self, input):
self.stdin.write(input)
if self._loop.get_debug():
logger.debug('%r communicate: feed stdin (%s bytes)',
self, len(input))
yield from self.stdin.drain()
if self._loop.get_debug():
logger.debug('%r communicate: close stdin', self)
self.stdin.close()
@coroutine
@ -141,7 +161,13 @@ class Process:
else:
assert fd == 1
stream = self.stdout
if self._loop.get_debug():
name = 'stdout' if fd == 1 else 'stderr'
logger.debug('%r communicate: read %s', self, name)
output = yield from stream.read()
if self._loop.get_debug():
name = 'stdout' if fd == 1 else 'stderr'
logger.debug('%r communicate: close %s', self, name)
transport.close()
return output

View File

@ -565,7 +565,7 @@ class AbstractChildWatcher:
process 'pid' terminates. Specifying another callback for the same
process replaces the previous handler.
Note: callback() must be thread-safe
Note: callback() must be thread-safe.
"""
raise NotImplementedError()
@ -721,6 +721,9 @@ class SafeChildWatcher(BaseChildWatcher):
return
returncode = self._compute_returncode(status)
if self._loop.get_debug():
logger.debug('process %s exited with returncode %s',
expected_pid, returncode)
try:
callback, args = self._callbacks.pop(pid)
@ -818,8 +821,16 @@ class FastChildWatcher(BaseChildWatcher):
if self._forks:
# It may not be registered yet.
self._zombies[pid] = returncode
if self._loop.get_debug():
logger.debug('unknown process %s exited '
'with returncode %s',
pid, returncode)
continue
callback = None
else:
if self._loop.get_debug():
logger.debug('process %s exited with returncode %s',
pid, returncode)
if callback is None:
logger.warning(

View File

@ -43,8 +43,6 @@ class BaseEventLoopTests(test_utils.TestCase):
NotImplementedError, self.loop._process_events, [])
self.assertRaises(
NotImplementedError, self.loop._write_to_self)
self.assertRaises(
NotImplementedError, self.loop._read_from_self)
self.assertRaises(
NotImplementedError,
self.loop._make_read_pipe_transport, m, m)