mirror of https://github.com/python/cpython
629 lines
19 KiB
Python
629 lines
19 KiB
Python
import argparse
|
|
import contextlib
|
|
import fnmatch
|
|
import logging
|
|
import os
|
|
import os.path
|
|
import shutil
|
|
import sys
|
|
|
|
from . import fsutil, strutil, iterutil, logging as loggingutil
|
|
|
|
|
|
_NOT_SET = object()
|
|
|
|
|
|
def get_prog(spec=None, *, absolute=False, allowsuffix=True):
|
|
if spec is None:
|
|
_, spec = _find_script()
|
|
# This is more natural for prog than __file__ would be.
|
|
filename = sys.argv[0]
|
|
elif isinstance(spec, str):
|
|
filename = os.path.normpath(spec)
|
|
spec = None
|
|
else:
|
|
filename = spec.origin
|
|
if _is_standalone(filename):
|
|
# Check if "installed".
|
|
if allowsuffix or not filename.endswith('.py'):
|
|
basename = os.path.basename(filename)
|
|
found = shutil.which(basename)
|
|
if found:
|
|
script = os.path.abspath(filename)
|
|
found = os.path.abspath(found)
|
|
if os.path.normcase(script) == os.path.normcase(found):
|
|
return basename
|
|
# It is only "standalone".
|
|
if absolute:
|
|
filename = os.path.abspath(filename)
|
|
return filename
|
|
elif spec is not None:
|
|
module = spec.name
|
|
if module.endswith('.__main__'):
|
|
module = module[:-9]
|
|
return f'{sys.executable} -m {module}'
|
|
else:
|
|
if absolute:
|
|
filename = os.path.abspath(filename)
|
|
return f'{sys.executable} {filename}'
|
|
|
|
|
|
def _find_script():
|
|
frame = sys._getframe(2)
|
|
while frame.f_globals['__name__'] != '__main__':
|
|
frame = frame.f_back
|
|
|
|
# This should match sys.argv[0].
|
|
filename = frame.f_globals['__file__']
|
|
# This will be None if -m wasn't used..
|
|
spec = frame.f_globals['__spec__']
|
|
return filename, spec
|
|
|
|
|
|
def is_installed(filename, *, allowsuffix=True):
|
|
if not allowsuffix and filename.endswith('.py'):
|
|
return False
|
|
filename = os.path.abspath(os.path.normalize(filename))
|
|
found = shutil.which(os.path.basename(filename))
|
|
if not found:
|
|
return False
|
|
if found != filename:
|
|
return False
|
|
return _is_standalone(filename)
|
|
|
|
|
|
def is_standalone(filename):
|
|
filename = os.path.abspath(os.path.normalize(filename))
|
|
return _is_standalone(filename)
|
|
|
|
|
|
def _is_standalone(filename):
|
|
return fsutil.is_executable(filename)
|
|
|
|
|
|
##################################
|
|
# logging
|
|
|
|
VERBOSITY = 3
|
|
|
|
TRACEBACK = os.environ.get('SHOW_TRACEBACK', '').strip()
|
|
TRACEBACK = bool(TRACEBACK and TRACEBACK.upper() not in ('0', 'FALSE', 'NO'))
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def configure_logger(verbosity, logger=None, **kwargs):
|
|
if logger is None:
|
|
# Configure the root logger.
|
|
logger = logging.getLogger()
|
|
loggingutil.configure_logger(logger, verbosity, **kwargs)
|
|
|
|
|
|
##################################
|
|
# selections
|
|
|
|
class UnsupportedSelectionError(Exception):
|
|
def __init__(self, values, possible):
|
|
self.values = tuple(values)
|
|
self.possible = tuple(possible)
|
|
super().__init__(f'unsupported selections {self.unique}')
|
|
|
|
@property
|
|
def unique(self):
|
|
return tuple(sorted(set(self.values)))
|
|
|
|
|
|
def normalize_selection(selected: str, *, possible=None):
|
|
if selected in (None, True, False):
|
|
return selected
|
|
elif isinstance(selected, str):
|
|
selected = [selected]
|
|
elif not selected:
|
|
return ()
|
|
|
|
unsupported = []
|
|
_selected = set()
|
|
for item in selected:
|
|
if not item:
|
|
continue
|
|
for value in item.strip().replace(',', ' ').split():
|
|
if not value:
|
|
continue
|
|
# XXX Handle subtraction (leading "-").
|
|
if possible and value not in possible and value != 'all':
|
|
unsupported.append(value)
|
|
_selected.add(value)
|
|
if unsupported:
|
|
raise UnsupportedSelectionError(unsupported, tuple(possible))
|
|
if 'all' in _selected:
|
|
return True
|
|
return frozenset(selected)
|
|
|
|
|
|
##################################
|
|
# CLI parsing helpers
|
|
|
|
class CLIArgSpec(tuple):
|
|
def __new__(cls, *args, **kwargs):
|
|
return super().__new__(cls, (args, kwargs))
|
|
|
|
def __repr__(self):
|
|
args, kwargs = self
|
|
args = [repr(arg) for arg in args]
|
|
for name, value in kwargs.items():
|
|
args.append(f'{name}={value!r}')
|
|
return f'{type(self).__name__}({", ".join(args)})'
|
|
|
|
def __call__(self, parser, *, _noop=(lambda a: None)):
|
|
self.apply(parser)
|
|
return _noop
|
|
|
|
def apply(self, parser):
|
|
args, kwargs = self
|
|
parser.add_argument(*args, **kwargs)
|
|
|
|
|
|
def apply_cli_argspecs(parser, specs):
|
|
processors = []
|
|
for spec in specs:
|
|
if callable(spec):
|
|
procs = spec(parser)
|
|
_add_procs(processors, procs)
|
|
else:
|
|
args, kwargs = spec
|
|
parser.add_argument(args, kwargs)
|
|
return processors
|
|
|
|
|
|
def _add_procs(flattened, procs):
|
|
# XXX Fail on non-empty, non-callable procs?
|
|
if not procs:
|
|
return
|
|
if callable(procs):
|
|
flattened.append(procs)
|
|
else:
|
|
#processors.extend(p for p in procs if callable(p))
|
|
for proc in procs:
|
|
_add_procs(flattened, proc)
|
|
|
|
|
|
def add_verbosity_cli(parser):
|
|
parser.add_argument('-q', '--quiet', action='count', default=0)
|
|
parser.add_argument('-v', '--verbose', action='count', default=0)
|
|
|
|
def process_args(args):
|
|
ns = vars(args)
|
|
key = 'verbosity'
|
|
if key in ns:
|
|
parser.error(f'duplicate arg {key!r}')
|
|
ns[key] = max(0, VERBOSITY + ns.pop('verbose') - ns.pop('quiet'))
|
|
return key
|
|
return process_args
|
|
|
|
|
|
def add_traceback_cli(parser):
|
|
parser.add_argument('--traceback', '--tb', action='store_true',
|
|
default=TRACEBACK)
|
|
parser.add_argument('--no-traceback', '--no-tb', dest='traceback',
|
|
action='store_const', const=False)
|
|
|
|
def process_args(args):
|
|
ns = vars(args)
|
|
key = 'traceback_cm'
|
|
if key in ns:
|
|
parser.error(f'duplicate arg {key!r}')
|
|
showtb = ns.pop('traceback')
|
|
|
|
@contextlib.contextmanager
|
|
def traceback_cm():
|
|
restore = loggingutil.hide_emit_errors()
|
|
try:
|
|
yield
|
|
except BrokenPipeError:
|
|
# It was piped to "head" or something similar.
|
|
pass
|
|
except NotImplementedError:
|
|
raise # re-raise
|
|
except Exception as exc:
|
|
if not showtb:
|
|
sys.exit(f'ERROR: {exc}')
|
|
raise # re-raise
|
|
except KeyboardInterrupt:
|
|
if not showtb:
|
|
sys.exit('\nINTERRUPTED')
|
|
raise # re-raise
|
|
except BaseException as exc:
|
|
if not showtb:
|
|
sys.exit(f'{type(exc).__name__}: {exc}')
|
|
raise # re-raise
|
|
finally:
|
|
restore()
|
|
ns[key] = traceback_cm()
|
|
return key
|
|
return process_args
|
|
|
|
|
|
def add_sepval_cli(parser, opt, dest, choices, *, sep=',', **kwargs):
|
|
# if opt is True:
|
|
# parser.add_argument(f'--{dest}', action='append', **kwargs)
|
|
# elif isinstance(opt, str) and opt.startswith('-'):
|
|
# parser.add_argument(opt, dest=dest, action='append', **kwargs)
|
|
# else:
|
|
# arg = dest if not opt else opt
|
|
# kwargs.setdefault('nargs', '+')
|
|
# parser.add_argument(arg, dest=dest, action='append', **kwargs)
|
|
if not isinstance(opt, str):
|
|
parser.error(f'opt must be a string, got {opt!r}')
|
|
elif opt.startswith('-'):
|
|
parser.add_argument(opt, dest=dest, action='append', **kwargs)
|
|
else:
|
|
kwargs.setdefault('nargs', '+')
|
|
#kwargs.setdefault('metavar', opt.upper())
|
|
parser.add_argument(opt, dest=dest, action='append', **kwargs)
|
|
|
|
def process_args(args):
|
|
ns = vars(args)
|
|
|
|
# XXX Use normalize_selection()?
|
|
if isinstance(ns[dest], str):
|
|
ns[dest] = [ns[dest]]
|
|
selections = []
|
|
for many in ns[dest] or ():
|
|
for value in many.split(sep):
|
|
if value not in choices:
|
|
parser.error(f'unknown {dest} {value!r}')
|
|
selections.append(value)
|
|
ns[dest] = selections
|
|
return process_args
|
|
|
|
|
|
def add_files_cli(parser, *, excluded=None, nargs=None):
|
|
process_files = add_file_filtering_cli(parser, excluded=excluded)
|
|
parser.add_argument('filenames', nargs=nargs or '+', metavar='FILENAME')
|
|
return [
|
|
process_files,
|
|
]
|
|
|
|
|
|
def add_file_filtering_cli(parser, *, excluded=None):
|
|
parser.add_argument('--start')
|
|
parser.add_argument('--include', action='append')
|
|
parser.add_argument('--exclude', action='append')
|
|
|
|
excluded = tuple(excluded or ())
|
|
|
|
def process_args(args):
|
|
ns = vars(args)
|
|
key = 'iter_filenames'
|
|
if key in ns:
|
|
parser.error(f'duplicate arg {key!r}')
|
|
|
|
_include = tuple(ns.pop('include') or ())
|
|
_exclude = excluded + tuple(ns.pop('exclude') or ())
|
|
kwargs = dict(
|
|
start=ns.pop('start'),
|
|
include=tuple(_parse_files(_include)),
|
|
exclude=tuple(_parse_files(_exclude)),
|
|
# We use the default for "show_header"
|
|
)
|
|
def process_filenames(filenames, relroot=None):
|
|
return fsutil.process_filenames(filenames, relroot=relroot, **kwargs)
|
|
ns[key] = process_filenames
|
|
return process_args
|
|
|
|
|
|
def _parse_files(filenames):
|
|
for filename, _ in strutil.parse_entries(filenames):
|
|
yield filename.strip()
|
|
|
|
|
|
def add_progress_cli(parser, *, threshold=VERBOSITY, **kwargs):
|
|
parser.add_argument('--progress', dest='track_progress', action='store_const', const=True)
|
|
parser.add_argument('--no-progress', dest='track_progress', action='store_false')
|
|
parser.set_defaults(track_progress=True)
|
|
|
|
def process_args(args):
|
|
if args.track_progress:
|
|
ns = vars(args)
|
|
verbosity = ns.get('verbosity', VERBOSITY)
|
|
if verbosity <= threshold:
|
|
args.track_progress = track_progress_compact
|
|
else:
|
|
args.track_progress = track_progress_flat
|
|
return process_args
|
|
|
|
|
|
def add_failure_filtering_cli(parser, pool, *, default=False):
|
|
parser.add_argument('--fail', action='append',
|
|
metavar=f'"{{all|{"|".join(sorted(pool))}}},..."')
|
|
parser.add_argument('--no-fail', dest='fail', action='store_const', const=())
|
|
|
|
def process_args(args):
|
|
ns = vars(args)
|
|
|
|
fail = ns.pop('fail')
|
|
try:
|
|
fail = normalize_selection(fail, possible=pool)
|
|
except UnsupportedSelectionError as exc:
|
|
parser.error(f'invalid --fail values: {", ".join(exc.unique)}')
|
|
else:
|
|
if fail is None:
|
|
fail = default
|
|
|
|
if fail is True:
|
|
def ignore_exc(_exc):
|
|
return False
|
|
elif fail is False:
|
|
def ignore_exc(_exc):
|
|
return True
|
|
else:
|
|
def ignore_exc(exc):
|
|
for err in fail:
|
|
if type(exc) == pool[err]:
|
|
return False
|
|
else:
|
|
return True
|
|
args.ignore_exc = ignore_exc
|
|
return process_args
|
|
|
|
|
|
def add_kind_filtering_cli(parser, *, default=None):
|
|
parser.add_argument('--kinds', action='append')
|
|
|
|
def process_args(args):
|
|
ns = vars(args)
|
|
|
|
kinds = []
|
|
for kind in ns.pop('kinds') or default or ():
|
|
kinds.extend(kind.strip().replace(',', ' ').split())
|
|
|
|
if not kinds:
|
|
match_kind = (lambda k: True)
|
|
else:
|
|
included = set()
|
|
excluded = set()
|
|
for kind in kinds:
|
|
if kind.startswith('-'):
|
|
kind = kind[1:]
|
|
excluded.add(kind)
|
|
if kind in included:
|
|
included.remove(kind)
|
|
else:
|
|
included.add(kind)
|
|
if kind in excluded:
|
|
excluded.remove(kind)
|
|
if excluded:
|
|
if included:
|
|
... # XXX fail?
|
|
def match_kind(kind, *, _excluded=excluded):
|
|
return kind not in _excluded
|
|
else:
|
|
def match_kind(kind, *, _included=included):
|
|
return kind in _included
|
|
args.match_kind = match_kind
|
|
return process_args
|
|
|
|
|
|
COMMON_CLI = [
|
|
add_verbosity_cli,
|
|
add_traceback_cli,
|
|
#add_dryrun_cli,
|
|
]
|
|
|
|
|
|
def add_commands_cli(parser, commands, *, commonspecs=COMMON_CLI, subset=None):
|
|
arg_processors = {}
|
|
if isinstance(subset, str):
|
|
cmdname = subset
|
|
try:
|
|
_, argspecs, _ = commands[cmdname]
|
|
except KeyError:
|
|
raise ValueError(f'unsupported subset {subset!r}')
|
|
parser.set_defaults(cmd=cmdname)
|
|
arg_processors[cmdname] = _add_cmd_cli(parser, commonspecs, argspecs)
|
|
else:
|
|
if subset is None:
|
|
cmdnames = subset = list(commands)
|
|
elif not subset:
|
|
raise NotImplementedError
|
|
elif isinstance(subset, set):
|
|
cmdnames = [k for k in commands if k in subset]
|
|
subset = sorted(subset)
|
|
else:
|
|
cmdnames = [n for n in subset if n in commands]
|
|
if len(cmdnames) < len(subset):
|
|
bad = tuple(n for n in subset if n not in commands)
|
|
raise ValueError(f'unsupported subset {bad}')
|
|
|
|
common = argparse.ArgumentParser(add_help=False)
|
|
common_processors = apply_cli_argspecs(common, commonspecs)
|
|
subs = parser.add_subparsers(dest='cmd')
|
|
for cmdname in cmdnames:
|
|
description, argspecs, _ = commands[cmdname]
|
|
sub = subs.add_parser(
|
|
cmdname,
|
|
description=description,
|
|
parents=[common],
|
|
)
|
|
cmd_processors = _add_cmd_cli(sub, (), argspecs)
|
|
arg_processors[cmdname] = common_processors + cmd_processors
|
|
return arg_processors
|
|
|
|
|
|
def _add_cmd_cli(parser, commonspecs, argspecs):
|
|
processors = []
|
|
argspecs = list(commonspecs or ()) + list(argspecs or ())
|
|
for argspec in argspecs:
|
|
if callable(argspec):
|
|
procs = argspec(parser)
|
|
_add_procs(processors, procs)
|
|
else:
|
|
if not argspec:
|
|
raise NotImplementedError
|
|
args = list(argspec)
|
|
if not isinstance(args[-1], str):
|
|
kwargs = args.pop()
|
|
if not isinstance(args[0], str):
|
|
try:
|
|
args, = args
|
|
except (TypeError, ValueError):
|
|
parser.error(f'invalid cmd args {argspec!r}')
|
|
else:
|
|
kwargs = {}
|
|
parser.add_argument(*args, **kwargs)
|
|
# There will be nothing to process.
|
|
return processors
|
|
|
|
|
|
def _flatten_processors(processors):
|
|
for proc in processors:
|
|
if proc is None:
|
|
continue
|
|
if callable(proc):
|
|
yield proc
|
|
else:
|
|
yield from _flatten_processors(proc)
|
|
|
|
|
|
def process_args(args, processors, *, keys=None):
|
|
processors = _flatten_processors(processors)
|
|
ns = vars(args)
|
|
extracted = {}
|
|
if keys is None:
|
|
for process_args in processors:
|
|
for key in process_args(args):
|
|
extracted[key] = ns.pop(key)
|
|
else:
|
|
remainder = set(keys)
|
|
for process_args in processors:
|
|
hanging = process_args(args)
|
|
if isinstance(hanging, str):
|
|
hanging = [hanging]
|
|
for key in hanging or ():
|
|
if key not in remainder:
|
|
raise NotImplementedError(key)
|
|
extracted[key] = ns.pop(key)
|
|
remainder.remove(key)
|
|
if remainder:
|
|
raise NotImplementedError(sorted(remainder))
|
|
return extracted
|
|
|
|
|
|
def process_args_by_key(args, processors, keys):
|
|
extracted = process_args(args, processors, keys=keys)
|
|
return [extracted[key] for key in keys]
|
|
|
|
|
|
##################################
|
|
# commands
|
|
|
|
def set_command(name, add_cli):
|
|
"""A decorator factory to set CLI info."""
|
|
def decorator(func):
|
|
if hasattr(func, '__cli__'):
|
|
raise Exception(f'already set')
|
|
func.__cli__ = (name, add_cli)
|
|
return func
|
|
return decorator
|
|
|
|
|
|
##################################
|
|
# main() helpers
|
|
|
|
def filter_filenames(filenames, process_filenames=None, relroot=fsutil.USE_CWD):
|
|
# We expect each filename to be a normalized, absolute path.
|
|
for filename, _, check, _ in _iter_filenames(filenames, process_filenames, relroot):
|
|
if (reason := check()):
|
|
logger.debug(f'{filename}: {reason}')
|
|
continue
|
|
yield filename
|
|
|
|
|
|
def main_for_filenames(filenames, process_filenames=None, relroot=fsutil.USE_CWD):
|
|
filenames, relroot = fsutil.fix_filenames(filenames, relroot=relroot)
|
|
for filename, relfile, check, show in _iter_filenames(filenames, process_filenames, relroot):
|
|
if show:
|
|
print()
|
|
print(relfile)
|
|
print('-------------------------------------------')
|
|
if (reason := check()):
|
|
print(reason)
|
|
continue
|
|
yield filename, relfile
|
|
|
|
|
|
def _iter_filenames(filenames, process, relroot):
|
|
if process is None:
|
|
yield from fsutil.process_filenames(filenames, relroot=relroot)
|
|
return
|
|
|
|
onempty = Exception('no filenames provided')
|
|
items = process(filenames, relroot=relroot)
|
|
items, peeked = iterutil.peek_and_iter(items)
|
|
if not items:
|
|
raise onempty
|
|
if isinstance(peeked, str):
|
|
if relroot and relroot is not fsutil.USE_CWD:
|
|
relroot = os.path.abspath(relroot)
|
|
check = (lambda: True)
|
|
for filename, ismany in iterutil.iter_many(items, onempty):
|
|
relfile = fsutil.format_filename(filename, relroot, fixroot=False)
|
|
yield filename, relfile, check, ismany
|
|
elif len(peeked) == 4:
|
|
yield from items
|
|
else:
|
|
raise NotImplementedError
|
|
|
|
|
|
def track_progress_compact(items, *, groups=5, **mark_kwargs):
|
|
last = os.linesep
|
|
marks = iter_marks(groups=groups, **mark_kwargs)
|
|
for item in items:
|
|
last = next(marks)
|
|
print(last, end='', flush=True)
|
|
yield item
|
|
if not last.endswith(os.linesep):
|
|
print()
|
|
|
|
|
|
def track_progress_flat(items, fmt='<{}>'):
|
|
for item in items:
|
|
print(fmt.format(item), flush=True)
|
|
yield item
|
|
|
|
|
|
def iter_marks(mark='.', *, group=5, groups=2, lines=_NOT_SET, sep=' '):
|
|
mark = mark or ''
|
|
group = group if group and group > 1 else 1
|
|
groups = groups if groups and groups > 1 else 1
|
|
|
|
sep = f'{mark}{sep}' if sep else mark
|
|
end = f'{mark}{os.linesep}'
|
|
div = os.linesep
|
|
perline = group * groups
|
|
if lines is _NOT_SET:
|
|
# By default we try to put about 100 in each line group.
|
|
perlines = 100 // perline * perline
|
|
elif not lines or lines < 0:
|
|
perlines = None
|
|
else:
|
|
perlines = perline * lines
|
|
|
|
if perline == 1:
|
|
yield end
|
|
elif group == 1:
|
|
yield sep
|
|
|
|
count = 1
|
|
while True:
|
|
if count % perline == 0:
|
|
yield end
|
|
if perlines and count % perlines == 0:
|
|
yield div
|
|
elif count % group == 0:
|
|
yield sep
|
|
else:
|
|
yield mark
|
|
count += 1
|