(Merge 3.4) asyncio, Tulip issue 137: In debug mode, save traceback where

Future, Task and Handle objects are created. Pass the traceback to
call_exception_handler() in the 'source_traceback' key.

The traceback is truncated to hide internal calls in asyncio, show only the
traceback from user code.

Add tests for the new source_traceback, and a test for the 'Future/Task
exception was never retrieved' log.
This commit is contained in:
Victor Stinner 2014-06-27 13:55:28 +02:00
commit eb39199f3d
8 changed files with 180 additions and 26 deletions

View File

@ -21,6 +21,7 @@ import inspect
import logging import logging
import socket import socket
import subprocess import subprocess
import traceback
import time import time
import os import os
import sys import sys
@ -290,7 +291,10 @@ class BaseEventLoop(events.AbstractEventLoop):
Any positional arguments after the callback will be passed to Any positional arguments after the callback will be passed to
the callback when it is called. the callback when it is called.
""" """
return self.call_at(self.time() + delay, callback, *args) timer = self.call_at(self.time() + delay, callback, *args)
if timer._source_traceback:
del timer._source_traceback[-1]
return timer
def call_at(self, when, callback, *args): def call_at(self, when, callback, *args):
"""Like call_later(), but uses an absolute time.""" """Like call_later(), but uses an absolute time."""
@ -299,6 +303,8 @@ class BaseEventLoop(events.AbstractEventLoop):
if self._debug: if self._debug:
self._assert_is_current_event_loop() self._assert_is_current_event_loop()
timer = events.TimerHandle(when, callback, args, self) timer = events.TimerHandle(when, callback, args, self)
if timer._source_traceback:
del timer._source_traceback[-1]
heapq.heappush(self._scheduled, timer) heapq.heappush(self._scheduled, timer)
return timer return timer
@ -312,7 +318,10 @@ class BaseEventLoop(events.AbstractEventLoop):
Any positional arguments after the callback will be passed to Any positional arguments after the callback will be passed to
the callback when it is called. the callback when it is called.
""" """
return self._call_soon(callback, args, check_loop=True) handle = self._call_soon(callback, args, check_loop=True)
if handle._source_traceback:
del handle._source_traceback[-1]
return handle
def _call_soon(self, callback, args, check_loop): def _call_soon(self, callback, args, check_loop):
if tasks.iscoroutinefunction(callback): if tasks.iscoroutinefunction(callback):
@ -320,6 +329,8 @@ class BaseEventLoop(events.AbstractEventLoop):
if self._debug and check_loop: if self._debug and check_loop:
self._assert_is_current_event_loop() self._assert_is_current_event_loop()
handle = events.Handle(callback, args, self) handle = events.Handle(callback, args, self)
if handle._source_traceback:
del handle._source_traceback[-1]
self._ready.append(handle) self._ready.append(handle)
return handle return handle
@ -344,6 +355,8 @@ class BaseEventLoop(events.AbstractEventLoop):
def call_soon_threadsafe(self, callback, *args): def call_soon_threadsafe(self, callback, *args):
"""Like call_soon(), but thread safe.""" """Like call_soon(), but thread safe."""
handle = self._call_soon(callback, args, check_loop=False) handle = self._call_soon(callback, args, check_loop=False)
if handle._source_traceback:
del handle._source_traceback[-1]
self._write_to_self() self._write_to_self()
return handle return handle
@ -757,7 +770,14 @@ class BaseEventLoop(events.AbstractEventLoop):
for key in sorted(context): for key in sorted(context):
if key in {'message', 'exception'}: if key in {'message', 'exception'}:
continue continue
log_lines.append('{}: {!r}'.format(key, context[key])) value = context[key]
if key == 'source_traceback':
tb = ''.join(traceback.format_list(value))
value = 'Object created at (most recent call last):\n'
value += tb.rstrip()
else:
value = repr(value)
log_lines.append('{}: {}'.format(key, value))
logger.error('\n'.join(log_lines), exc_info=exc_info) logger.error('\n'.join(log_lines), exc_info=exc_info)

View File

@ -11,6 +11,7 @@ __all__ = ['AbstractEventLoopPolicy',
import functools import functools
import inspect import inspect
import subprocess import subprocess
import traceback
import threading import threading
import socket import socket
import sys import sys
@ -66,7 +67,8 @@ def _format_callback(func, args, suffix=''):
class Handle: class Handle:
"""Object returned by callback registration methods.""" """Object returned by callback registration methods."""
__slots__ = ['_callback', '_args', '_cancelled', '_loop', '__weakref__'] __slots__ = ('_callback', '_args', '_cancelled', '_loop',
'_source_traceback', '__weakref__')
def __init__(self, callback, args, loop): def __init__(self, callback, args, loop):
assert not isinstance(callback, Handle), 'A Handle is not a callback' assert not isinstance(callback, Handle), 'A Handle is not a callback'
@ -74,6 +76,10 @@ class Handle:
self._callback = callback self._callback = callback
self._args = args self._args = args
self._cancelled = False self._cancelled = False
if self._loop.get_debug():
self._source_traceback = traceback.extract_stack(sys._getframe(1))
else:
self._source_traceback = None
def __repr__(self): def __repr__(self):
info = [] info = []
@ -91,11 +97,14 @@ class Handle:
except Exception as exc: except Exception as exc:
cb = _format_callback(self._callback, self._args) cb = _format_callback(self._callback, self._args)
msg = 'Exception in callback {}'.format(cb) msg = 'Exception in callback {}'.format(cb)
self._loop.call_exception_handler({ context = {
'message': msg, 'message': msg,
'exception': exc, 'exception': exc,
'handle': self, 'handle': self,
}) }
if self._source_traceback:
context['source_traceback'] = self._source_traceback
self._loop.call_exception_handler(context)
self = None # Needed to break cycles when an exception occurs. self = None # Needed to break cycles when an exception occurs.
@ -107,7 +116,8 @@ class TimerHandle(Handle):
def __init__(self, when, callback, args, loop): def __init__(self, when, callback, args, loop):
assert when is not None assert when is not None
super().__init__(callback, args, loop) super().__init__(callback, args, loop)
if self._source_traceback:
del self._source_traceback[-1]
self._when = when self._when = when
def __repr__(self): def __repr__(self):

View File

@ -82,10 +82,11 @@ class _TracebackLogger:
in a discussion about closing files when they are collected. in a discussion about closing files when they are collected.
""" """
__slots__ = ['exc', 'tb', 'loop'] __slots__ = ('loop', 'source_traceback', 'exc', 'tb')
def __init__(self, exc, loop): def __init__(self, future, exc):
self.loop = loop self.loop = future._loop
self.source_traceback = future._source_traceback
self.exc = exc self.exc = exc
self.tb = None self.tb = None
@ -102,11 +103,12 @@ class _TracebackLogger:
def __del__(self): def __del__(self):
if self.tb: if self.tb:
msg = 'Future/Task exception was never retrieved:\n{tb}' msg = 'Future/Task exception was never retrieved'
context = { if self.source_traceback:
'message': msg.format(tb=''.join(self.tb)), msg += '\nFuture/Task created at (most recent call last):\n'
} msg += ''.join(traceback.format_list(self.source_traceback))
self.loop.call_exception_handler(context) msg += ''.join(self.tb).rstrip()
self.loop.call_exception_handler({'message': msg})
class Future: class Future:
@ -149,6 +151,10 @@ class Future:
else: else:
self._loop = loop self._loop = loop
self._callbacks = [] self._callbacks = []
if self._loop.get_debug():
self._source_traceback = traceback.extract_stack(sys._getframe(1))
else:
self._source_traceback = None
def _format_callbacks(self): def _format_callbacks(self):
cb = self._callbacks cb = self._callbacks
@ -196,10 +202,13 @@ class Future:
return return
exc = self._exception exc = self._exception
context = { context = {
'message': 'Future/Task exception was never retrieved', 'message': ('%s exception was never retrieved'
% self.__class__.__name__),
'exception': exc, 'exception': exc,
'future': self, 'future': self,
} }
if self._source_traceback:
context['source_traceback'] = self._source_traceback
self._loop.call_exception_handler(context) self._loop.call_exception_handler(context)
def cancel(self): def cancel(self):
@ -335,7 +344,7 @@ class Future:
if _PY34: if _PY34:
self._log_traceback = True self._log_traceback = True
else: else:
self._tb_logger = _TracebackLogger(exception, self._loop) self._tb_logger = _TracebackLogger(self, exception)
# Arrange for the logger to be activated after all callbacks # Arrange for the logger to be activated after all callbacks
# have had a chance to call result() or exception(). # have had a chance to call result() or exception().
self._loop.call_soon(self._tb_logger.activate) self._loop.call_soon(self._tb_logger.activate)

View File

@ -195,6 +195,8 @@ class Task(futures.Future):
def __init__(self, coro, *, loop=None): def __init__(self, coro, *, loop=None):
assert iscoroutine(coro), repr(coro) # Not a coroutine function! assert iscoroutine(coro), repr(coro) # Not a coroutine function!
super().__init__(loop=loop) super().__init__(loop=loop)
if self._source_traceback:
del self._source_traceback[-1]
self._coro = iter(coro) # Use the iterator just in case. self._coro = iter(coro) # Use the iterator just in case.
self._fut_waiter = None self._fut_waiter = None
self._must_cancel = False self._must_cancel = False
@ -207,10 +209,13 @@ class Task(futures.Future):
if _PY34: if _PY34:
def __del__(self): def __del__(self):
if self._state == futures._PENDING: if self._state == futures._PENDING:
self._loop.call_exception_handler({ context = {
'task': self, 'task': self,
'message': 'Task was destroyed but it is pending!', 'message': 'Task was destroyed but it is pending!',
}) }
if self._source_traceback:
context['source_traceback'] = self._source_traceback
self._loop.call_exception_handler(context)
futures.Future.__del__(self) futures.Future.__del__(self)
def __repr__(self): def __repr__(self):
@ -620,7 +625,10 @@ def async(coro_or_future, *, loop=None):
raise ValueError('loop argument must agree with Future') raise ValueError('loop argument must agree with Future')
return coro_or_future return coro_or_future
elif iscoroutine(coro_or_future): elif iscoroutine(coro_or_future):
return Task(coro_or_future, loop=loop) task = Task(coro_or_future, loop=loop)
if task._source_traceback:
del task._source_traceback[-1]
return task
else: else:
raise TypeError('A Future or coroutine is required') raise TypeError('A Future or coroutine is required')

View File

@ -406,19 +406,22 @@ class BaseEventLoopTests(test_utils.TestCase):
1/0 1/0
def run_loop(): def run_loop():
self.loop.call_soon(zero_error) handle = self.loop.call_soon(zero_error)
self.loop._run_once() self.loop._run_once()
return handle
self.loop.set_debug(True)
self.loop._process_events = mock.Mock() self.loop._process_events = mock.Mock()
mock_handler = mock.Mock() mock_handler = mock.Mock()
self.loop.set_exception_handler(mock_handler) self.loop.set_exception_handler(mock_handler)
run_loop() handle = run_loop()
mock_handler.assert_called_with(self.loop, { mock_handler.assert_called_with(self.loop, {
'exception': MOCK_ANY, 'exception': MOCK_ANY,
'message': test_utils.MockPattern( 'message': test_utils.MockPattern(
'Exception in callback.*zero_error'), 'Exception in callback.*zero_error'),
'handle': MOCK_ANY, 'handle': handle,
'source_traceback': handle._source_traceback,
}) })
mock_handler.reset_mock() mock_handler.reset_mock()

View File

@ -1751,10 +1751,11 @@ def noop(*args):
pass pass
class HandleTests(unittest.TestCase): class HandleTests(test_utils.TestCase):
def setUp(self): def setUp(self):
self.loop = None self.loop = mock.Mock()
self.loop.get_debug.return_value = True
def test_handle(self): def test_handle(self):
def callback(*args): def callback(*args):
@ -1789,7 +1790,8 @@ class HandleTests(unittest.TestCase):
self.loop.call_exception_handler.assert_called_with({ self.loop.call_exception_handler.assert_called_with({
'message': test_utils.MockPattern('Exception in callback.*'), 'message': test_utils.MockPattern('Exception in callback.*'),
'exception': mock.ANY, 'exception': mock.ANY,
'handle': h 'handle': h,
'source_traceback': h._source_traceback,
}) })
def test_handle_weakref(self): def test_handle_weakref(self):
@ -1837,6 +1839,35 @@ class HandleTests(unittest.TestCase):
% (cb_regex, re.escape(filename), lineno)) % (cb_regex, re.escape(filename), lineno))
self.assertRegex(repr(h), regex) self.assertRegex(repr(h), regex)
def test_handle_source_traceback(self):
loop = asyncio.get_event_loop_policy().new_event_loop()
loop.set_debug(True)
self.set_event_loop(loop)
def check_source_traceback(h):
lineno = sys._getframe(1).f_lineno - 1
self.assertIsInstance(h._source_traceback, list)
self.assertEqual(h._source_traceback[-1][:3],
(__file__,
lineno,
'test_handle_source_traceback'))
# call_soon
h = loop.call_soon(noop)
check_source_traceback(h)
# call_soon_threadsafe
h = loop.call_soon_threadsafe(noop)
check_source_traceback(h)
# call_later
h = loop.call_later(0, noop)
check_source_traceback(h)
# call_at
h = loop.call_later(0, noop)
check_source_traceback(h)
class TimerTests(unittest.TestCase): class TimerTests(unittest.TestCase):

View File

@ -2,8 +2,10 @@
import concurrent.futures import concurrent.futures
import re import re
import sys
import threading import threading
import unittest import unittest
from test import support
from unittest import mock from unittest import mock
import asyncio import asyncio
@ -284,6 +286,63 @@ class FutureTests(test_utils.TestCase):
self.assertEqual(f1.result(), 42) self.assertEqual(f1.result(), 42)
self.assertTrue(f2.cancelled()) self.assertTrue(f2.cancelled())
def test_future_source_traceback(self):
self.loop.set_debug(True)
future = asyncio.Future(loop=self.loop)
lineno = sys._getframe().f_lineno - 1
self.assertIsInstance(future._source_traceback, list)
self.assertEqual(future._source_traceback[-1][:3],
(__file__,
lineno,
'test_future_source_traceback'))
@mock.patch('asyncio.base_events.logger')
def test_future_exception_never_retrieved(self, m_log):
self.loop.set_debug(True)
def memroy_error():
try:
raise MemoryError()
except BaseException as exc:
return exc
exc = memroy_error()
future = asyncio.Future(loop=self.loop)
source_traceback = future._source_traceback
future.set_exception(exc)
future = None
test_utils.run_briefly(self.loop)
support.gc_collect()
if sys.version_info >= (3, 4):
frame = source_traceback[-1]
regex = (r'^Future exception was never retrieved\n'
r'future: <Future finished exception=MemoryError\(\)>\n'
r'source_traceback: Object created at \(most recent call last\):\n'
r' File'
r'.*\n'
r' File "%s", line %s, in test_future_exception_never_retrieved\n'
r' future = asyncio\.Future\(loop=self\.loop\)$'
% (frame[0], frame[1]))
exc_info = (type(exc), exc, exc.__traceback__)
m_log.error.assert_called_once_with(mock.ANY, exc_info=exc_info)
else:
frame = source_traceback[-1]
regex = (r'^Future/Task exception was never retrieved\n'
r'Future/Task created at \(most recent call last\):\n'
r' File'
r'.*\n'
r' File "%s", line %s, in test_future_exception_never_retrieved\n'
r' future = asyncio\.Future\(loop=self\.loop\)\n'
r'Traceback \(most recent call last\):\n'
r'.*\n'
r'MemoryError$'
% (frame[0], frame[1]))
m_log.error.assert_called_once_with(mock.ANY, exc_info=False)
message = m_log.error.call_args[0][0]
self.assertRegex(message, re.compile(regex, re.DOTALL))
class FutureDoneCallbackTests(test_utils.TestCase): class FutureDoneCallbackTests(test_utils.TestCase):

View File

@ -1546,6 +1546,7 @@ class TaskTests(test_utils.TestCase):
raise Exception("code never reached") raise Exception("code never reached")
mock_handler = mock.Mock() mock_handler = mock.Mock()
self.loop.set_debug(True)
self.loop.set_exception_handler(mock_handler) self.loop.set_exception_handler(mock_handler)
# schedule the task # schedule the task
@ -1560,6 +1561,7 @@ class TaskTests(test_utils.TestCase):
# remove the future used in kill_me(), and references to the task # remove the future used in kill_me(), and references to the task
del coro.gi_frame.f_locals['future'] del coro.gi_frame.f_locals['future']
coro = None coro = None
source_traceback = task._source_traceback
task = None task = None
# no more reference to kill_me() task: the task is destroyed by the GC # no more reference to kill_me() task: the task is destroyed by the GC
@ -1570,6 +1572,7 @@ class TaskTests(test_utils.TestCase):
mock_handler.assert_called_with(self.loop, { mock_handler.assert_called_with(self.loop, {
'message': 'Task was destroyed but it is pending!', 'message': 'Task was destroyed but it is pending!',
'task': mock.ANY, 'task': mock.ANY,
'source_traceback': source_traceback,
}) })
mock_handler.reset_mock() mock_handler.reset_mock()
@ -1604,6 +1607,17 @@ class TaskTests(test_utils.TestCase):
self.assertRegex(message, re.compile(regex, re.DOTALL)) self.assertRegex(message, re.compile(regex, re.DOTALL))
def test_task_source_traceback(self):
self.loop.set_debug(True)
task = asyncio.Task(coroutine_function(), loop=self.loop)
lineno = sys._getframe().f_lineno - 1
self.assertIsInstance(task._source_traceback, list)
self.assertEqual(task._source_traceback[-1][:3],
(__file__,
lineno,
'test_task_source_traceback'))
class GatherTestsBase: class GatherTestsBase: