77 lines
2.3 KiB
Python
77 lines
2.3 KiB
Python
import functools
|
|
import inspect
|
|
import reprlib
|
|
import sys
|
|
import traceback
|
|
|
|
from . import constants
|
|
|
|
|
|
def _get_function_source(func):
    """Return ``(filename, first_line_number)`` for *func*, or None.

    Wrapper layers are removed with inspect.unwrap() first.  A plain Python
    function is located through its code object.  A functools.partial or
    functools.partialmethod is resolved by recursing into the callable it
    wraps.  Any other kind of object yields None.
    """
    func = inspect.unwrap(func)
    if inspect.isfunction(func):
        code = func.__code__
        return (code.co_filename, code.co_firstlineno)
    # Both partial flavours expose the wrapped callable as ``.func``.
    if isinstance(func, (functools.partial, functools.partialmethod)):
        return _get_function_source(func.func)
    return None
|
|
|
|
|
|
def _format_callback_source(func, args):
    """Return the textual form of a callback call plus its source location.

    The call itself is rendered by _format_callback() with *args* and no
    keyword arguments.  When _get_function_source() can locate *func*,
    ``' at <filename>:<lineno>'`` is appended to the result.
    """
    func_repr = _format_callback(func, args, None)
    source = _get_function_source(func)
    if source:
        filename, lineno = source
        func_repr += f' at {filename}:{lineno}'
    return func_repr
|
|
|
|
|
|
def _format_args_and_kwargs(args, kwargs):
    """Format function arguments and keyword arguments.

    Returns a parenthesised, comma-separated string: positional values
    first, then ``name=value`` pairs.  Either *args* or *kwargs* may be
    empty or None, in which case it contributes nothing.

    Special case for a single parameter: ('hello',) is formatted as ('hello').
    """
    # use reprlib to limit the length of the output
    items = []
    if args:
        items.extend(reprlib.repr(arg) for arg in args)
    if kwargs:
        items.extend(f'{k}={reprlib.repr(v)}' for k, v in kwargs.items())
    return f"({', '.join(items)})"
|
|
|
|
|
|
def _format_callback(func, args, kwargs, suffix=''):
    """Return a call-like string such as ``name(arg, key=value)`` for *func*.

    A functools.partial is unwrapped recursively: the arguments given here
    are formatted and prepended to *suffix*, and the partial's own stored
    arguments are then rendered against the wrapped callable, so each layer
    shows up as its own parenthesised group after the name.

    The name is the first non-empty value among ``__qualname__``,
    ``__name__`` and ``repr(func)``.
    """
    if isinstance(func, functools.partial):
        suffix = _format_args_and_kwargs(args, kwargs) + suffix
        return _format_callback(func.func, func.args, func.keywords, suffix)

    # A missing attribute and an empty one are treated alike: move on to
    # the next candidate.
    func_repr = (getattr(func, '__qualname__', None)
                 or getattr(func, '__name__', None)
                 or repr(func))

    func_repr += _format_args_and_kwargs(args, kwargs)
    if suffix:
        func_repr += suffix
    return func_repr
|
|
|
|
|
|
def extract_stack(f=None, limit=None):
    """Replacement for traceback.extract_stack() that only does the
    necessary work for asyncio debug mode.

    *f* is the frame to start from; when None, the caller's frame is used.
    *limit* caps the number of frames collected; when None,
    constants.DEBUG_STACK_DEPTH is used.

    Returns a traceback.StackSummary whose entries run from the outermost
    collected frame to the starting frame.  Source lines are not looked up
    eagerly (``lookup_lines=False``).
    """
    if f is None:
        # Skip this function's own frame and start at the caller.
        f = sys._getframe().f_back
    if limit is None:
        # Limit the amount of work to a reasonable amount, as extract_stack()
        # can be called for each coroutine and future in debug mode.
        limit = constants.DEBUG_STACK_DEPTH
    stack = traceback.StackSummary.extract(traceback.walk_stack(f),
                                           limit=limit,
                                           lookup_lines=False)
    # walk_stack() yields the starting frame first; flip so it comes last.
    stack.reverse()
    return stack
|