459 lines
13 KiB
Python
459 lines
13 KiB
Python
import fnmatch
|
|
import glob
|
|
import os
|
|
import os.path
|
|
import shutil
|
|
import stat
|
|
|
|
from .iterutil import iter_many
|
|
|
|
|
|
USE_CWD = object()
|
|
|
|
|
|
C_SOURCE_SUFFIXES = ('.c', '.h')
|
|
|
|
|
|
def create_backup(old, backup=None):
|
|
if isinstance(old, str):
|
|
filename = old
|
|
else:
|
|
filename = getattr(old, 'name', None)
|
|
if not filename:
|
|
return None
|
|
if not backup or backup is True:
|
|
backup = f'{filename}.bak'
|
|
try:
|
|
shutil.copyfile(filename, backup)
|
|
except FileNotFoundError as exc:
|
|
if exc.filename != filename:
|
|
raise # re-raise
|
|
backup = None
|
|
return backup
|
|
|
|
|
|
##################################
|
|
# filenames
|
|
|
|
def fix_filename(filename, relroot=USE_CWD, *,
|
|
fixroot=True,
|
|
_badprefix=f'..{os.path.sep}',
|
|
):
|
|
"""Return a normalized, absolute-path copy of the given filename."""
|
|
if not relroot or relroot is USE_CWD:
|
|
return os.path.abspath(filename)
|
|
if fixroot:
|
|
relroot = os.path.abspath(relroot)
|
|
return _fix_filename(filename, relroot)
|
|
|
|
|
|
def _fix_filename(filename, relroot, *,
|
|
_badprefix=f'..{os.path.sep}',
|
|
):
|
|
orig = filename
|
|
|
|
# First we normalize.
|
|
filename = os.path.normpath(filename)
|
|
if filename.startswith(_badprefix):
|
|
raise ValueError(f'bad filename {orig!r} (resolves beyond relative root')
|
|
|
|
# Now make sure it is absolute (relative to relroot).
|
|
if not os.path.isabs(filename):
|
|
filename = os.path.join(relroot, filename)
|
|
else:
|
|
relpath = os.path.relpath(filename, relroot)
|
|
if os.path.join(relroot, relpath) != filename:
|
|
raise ValueError(f'expected {relroot!r} as lroot, got {orig!r}')
|
|
|
|
return filename
|
|
|
|
|
|
def fix_filenames(filenames, relroot=USE_CWD):
|
|
if not relroot or relroot is USE_CWD:
|
|
filenames = (os.path.abspath(v) for v in filenames)
|
|
else:
|
|
relroot = os.path.abspath(relroot)
|
|
filenames = (_fix_filename(v, relroot) for v in filenames)
|
|
return filenames, relroot
|
|
|
|
|
|
def format_filename(filename, relroot=USE_CWD, *,
|
|
fixroot=True,
|
|
normalize=True,
|
|
_badprefix=f'..{os.path.sep}',
|
|
):
|
|
"""Return a consistent relative-path representation of the filename."""
|
|
orig = filename
|
|
if normalize:
|
|
filename = os.path.normpath(filename)
|
|
if relroot is None:
|
|
# Otherwise leave it as-is.
|
|
return filename
|
|
elif relroot is USE_CWD:
|
|
# Make it relative to CWD.
|
|
filename = os.path.relpath(filename)
|
|
else:
|
|
# Make it relative to "relroot".
|
|
if fixroot:
|
|
relroot = os.path.abspath(relroot)
|
|
elif not relroot:
|
|
raise ValueError('missing relroot')
|
|
filename = os.path.relpath(filename, relroot)
|
|
if filename.startswith(_badprefix):
|
|
raise ValueError(f'bad filename {orig!r} (resolves beyond relative root')
|
|
return filename
|
|
|
|
|
|
##################################
|
|
# find files
|
|
|
|
def match_glob(filename, pattern):
|
|
if fnmatch.fnmatch(filename, pattern):
|
|
return True
|
|
|
|
# fnmatch doesn't handle ** quite right. It will not match the
|
|
# following:
|
|
#
|
|
# ('x/spam.py', 'x/**/*.py')
|
|
# ('spam.py', '**/*.py')
|
|
#
|
|
# though it *will* match the following:
|
|
#
|
|
# ('x/y/spam.py', 'x/**/*.py')
|
|
# ('x/spam.py', '**/*.py')
|
|
|
|
if '**/' not in pattern:
|
|
return False
|
|
|
|
# We only accommodate the single-"**" case.
|
|
return fnmatch.fnmatch(filename, pattern.replace('**/', '', 1))
|
|
|
|
|
|
def process_filenames(filenames, *,
|
|
start=None,
|
|
include=None,
|
|
exclude=None,
|
|
relroot=USE_CWD,
|
|
):
|
|
if relroot and relroot is not USE_CWD:
|
|
relroot = os.path.abspath(relroot)
|
|
if start:
|
|
start = fix_filename(start, relroot, fixroot=False)
|
|
if include:
|
|
include = set(fix_filename(v, relroot, fixroot=False)
|
|
for v in include)
|
|
if exclude:
|
|
exclude = set(fix_filename(v, relroot, fixroot=False)
|
|
for v in exclude)
|
|
|
|
onempty = Exception('no filenames provided')
|
|
for filename, solo in iter_many(filenames, onempty):
|
|
filename = fix_filename(filename, relroot, fixroot=False)
|
|
relfile = format_filename(filename, relroot, fixroot=False, normalize=False)
|
|
check, start = _get_check(filename, start, include, exclude)
|
|
yield filename, relfile, check, solo
|
|
|
|
|
|
def expand_filenames(filenames):
|
|
for filename in filenames:
|
|
# XXX Do we need to use glob.escape (a la commit 9355868458, GH-20994)?
|
|
if '**/' in filename:
|
|
yield from glob.glob(filename.replace('**/', ''))
|
|
yield from glob.glob(filename)
|
|
|
|
|
|
def _get_check(filename, start, include, exclude):
|
|
if start and filename != start:
|
|
return (lambda: '<skipped>'), start
|
|
else:
|
|
def check():
|
|
if _is_excluded(filename, exclude, include):
|
|
return '<excluded>'
|
|
return None
|
|
return check, None
|
|
|
|
|
|
def _is_excluded(filename, exclude, include):
|
|
if include:
|
|
for included in include:
|
|
if match_glob(filename, included):
|
|
return False
|
|
return True
|
|
elif exclude:
|
|
for excluded in exclude:
|
|
if match_glob(filename, excluded):
|
|
return True
|
|
return False
|
|
else:
|
|
return False
|
|
|
|
|
|
def _walk_tree(root, *,
|
|
_walk=os.walk,
|
|
):
|
|
# A wrapper around os.walk that resolves the filenames.
|
|
for parent, _, names in _walk(root):
|
|
for name in names:
|
|
yield os.path.join(parent, name)
|
|
|
|
|
|
def walk_tree(root, *,
|
|
suffix=None,
|
|
walk=_walk_tree,
|
|
):
|
|
"""Yield each file in the tree under the given directory name.
|
|
|
|
If "suffix" is provided then only files with that suffix will
|
|
be included.
|
|
"""
|
|
if suffix and not isinstance(suffix, str):
|
|
raise ValueError('suffix must be a string')
|
|
|
|
for filename in walk(root):
|
|
if suffix and not filename.endswith(suffix):
|
|
continue
|
|
yield filename
|
|
|
|
|
|
def glob_tree(root, *,
|
|
suffix=None,
|
|
_glob=glob.iglob,
|
|
):
|
|
"""Yield each file in the tree under the given directory name.
|
|
|
|
If "suffix" is provided then only files with that suffix will
|
|
be included.
|
|
"""
|
|
suffix = suffix or ''
|
|
if not isinstance(suffix, str):
|
|
raise ValueError('suffix must be a string')
|
|
|
|
for filename in _glob(f'{root}/*{suffix}'):
|
|
yield filename
|
|
for filename in _glob(f'{root}/**/*{suffix}'):
|
|
yield filename
|
|
|
|
|
|
def iter_files(root, suffix=None, relparent=None, *,
|
|
get_files=os.walk,
|
|
_glob=glob_tree,
|
|
_walk=walk_tree,
|
|
):
|
|
"""Yield each file in the tree under the given directory name.
|
|
|
|
If "root" is a non-string iterable then do the same for each of
|
|
those trees.
|
|
|
|
If "suffix" is provided then only files with that suffix will
|
|
be included.
|
|
|
|
if "relparent" is provided then it is used to resolve each
|
|
filename as a relative path.
|
|
"""
|
|
if not isinstance(root, str):
|
|
roots = root
|
|
for root in roots:
|
|
yield from iter_files(root, suffix, relparent,
|
|
get_files=get_files,
|
|
_glob=_glob, _walk=_walk)
|
|
return
|
|
|
|
# Use the right "walk" function.
|
|
if get_files in (glob.glob, glob.iglob, glob_tree):
|
|
get_files = _glob
|
|
else:
|
|
_files = _walk_tree if get_files in (os.walk, walk_tree) else get_files
|
|
get_files = (lambda *a, **k: _walk(*a, walk=_files, **k))
|
|
|
|
# Handle a single suffix.
|
|
if suffix and not isinstance(suffix, str):
|
|
filenames = get_files(root)
|
|
suffix = tuple(suffix)
|
|
else:
|
|
filenames = get_files(root, suffix=suffix)
|
|
suffix = None
|
|
|
|
for filename in filenames:
|
|
if suffix and not isinstance(suffix, str): # multiple suffixes
|
|
if not filename.endswith(suffix):
|
|
continue
|
|
if relparent:
|
|
filename = os.path.relpath(filename, relparent)
|
|
yield filename
|
|
|
|
|
|
def iter_files_by_suffix(root, suffixes, relparent=None, *,
|
|
walk=walk_tree,
|
|
_iter_files=iter_files,
|
|
):
|
|
"""Yield each file in the tree that has the given suffixes.
|
|
|
|
Unlike iter_files(), the results are in the original suffix order.
|
|
"""
|
|
if isinstance(suffixes, str):
|
|
suffixes = [suffixes]
|
|
# XXX Ignore repeated suffixes?
|
|
for suffix in suffixes:
|
|
yield from _iter_files(root, suffix, relparent)
|
|
|
|
|
|
##################################
|
|
# file info
|
|
|
|
# XXX posix-only?
|
|
|
|
S_IRANY = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
|
|
S_IWANY = stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
|
|
S_IXANY = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
|
|
|
|
|
|
def is_readable(file, *, user=None, check=False):
|
|
filename, st, mode = _get_file_info(file)
|
|
if check:
|
|
try:
|
|
okay = _check_file(filename, S_IRANY)
|
|
except NotImplementedError:
|
|
okay = NotImplemented
|
|
if okay is not NotImplemented:
|
|
return okay
|
|
# Fall back to checking the mode.
|
|
return _check_mode(st, mode, S_IRANY, user)
|
|
|
|
|
|
def is_writable(file, *, user=None, check=False):
|
|
filename, st, mode = _get_file_info(file)
|
|
if check:
|
|
try:
|
|
okay = _check_file(filename, S_IWANY)
|
|
except NotImplementedError:
|
|
okay = NotImplemented
|
|
if okay is not NotImplemented:
|
|
return okay
|
|
# Fall back to checking the mode.
|
|
return _check_mode(st, mode, S_IWANY, user)
|
|
|
|
|
|
def is_executable(file, *, user=None, check=False):
|
|
filename, st, mode = _get_file_info(file)
|
|
if check:
|
|
try:
|
|
okay = _check_file(filename, S_IXANY)
|
|
except NotImplementedError:
|
|
okay = NotImplemented
|
|
if okay is not NotImplemented:
|
|
return okay
|
|
# Fall back to checking the mode.
|
|
return _check_mode(st, mode, S_IXANY, user)
|
|
|
|
|
|
def _get_file_info(file):
|
|
filename = st = mode = None
|
|
if isinstance(file, int):
|
|
mode = file
|
|
elif isinstance(file, os.stat_result):
|
|
st = file
|
|
else:
|
|
if isinstance(file, str):
|
|
filename = file
|
|
elif hasattr(file, 'name') and os.path.exists(file.name):
|
|
filename = file.name
|
|
else:
|
|
raise NotImplementedError(file)
|
|
st = os.stat(filename)
|
|
return filename, st, mode or st.st_mode
|
|
|
|
|
|
def _check_file(filename, check):
|
|
if not isinstance(filename, str):
|
|
raise Exception(f'filename required to check file, got {filename}')
|
|
if check & S_IRANY:
|
|
flags = os.O_RDONLY
|
|
elif check & S_IWANY:
|
|
flags = os.O_WRONLY
|
|
elif check & S_IXANY:
|
|
# We can worry about S_IXANY later
|
|
return NotImplemented
|
|
else:
|
|
raise NotImplementedError(check)
|
|
|
|
try:
|
|
fd = os.open(filename, flags)
|
|
except PermissionError:
|
|
return False
|
|
# We do not ignore other exceptions.
|
|
else:
|
|
os.close(fd)
|
|
return True
|
|
|
|
|
|
def _get_user_info(user):
|
|
import pwd
|
|
username = uid = gid = groups = None
|
|
if user is None:
|
|
uid = os.geteuid()
|
|
#username = os.getlogin()
|
|
username = pwd.getpwuid(uid)[0]
|
|
gid = os.getgid()
|
|
groups = os.getgroups()
|
|
else:
|
|
if isinstance(user, int):
|
|
uid = user
|
|
entry = pwd.getpwuid(uid)
|
|
username = entry.pw_name
|
|
elif isinstance(user, str):
|
|
username = user
|
|
entry = pwd.getpwnam(username)
|
|
uid = entry.pw_uid
|
|
else:
|
|
raise NotImplementedError(user)
|
|
gid = entry.pw_gid
|
|
os.getgrouplist(username, gid)
|
|
return username, uid, gid, groups
|
|
|
|
|
|
def _check_mode(st, mode, check, user):
|
|
orig = check
|
|
_, uid, gid, groups = _get_user_info(user)
|
|
if check & S_IRANY:
|
|
check -= S_IRANY
|
|
matched = False
|
|
if mode & stat.S_IRUSR:
|
|
if st.st_uid == uid:
|
|
matched = True
|
|
if mode & stat.S_IRGRP:
|
|
if st.st_uid == gid or st.st_uid in groups:
|
|
matched = True
|
|
if mode & stat.S_IROTH:
|
|
matched = True
|
|
if not matched:
|
|
return False
|
|
if check & S_IWANY:
|
|
check -= S_IWANY
|
|
matched = False
|
|
if mode & stat.S_IWUSR:
|
|
if st.st_uid == uid:
|
|
matched = True
|
|
if mode & stat.S_IWGRP:
|
|
if st.st_uid == gid or st.st_uid in groups:
|
|
matched = True
|
|
if mode & stat.S_IWOTH:
|
|
matched = True
|
|
if not matched:
|
|
return False
|
|
if check & S_IXANY:
|
|
check -= S_IXANY
|
|
matched = False
|
|
if mode & stat.S_IXUSR:
|
|
if st.st_uid == uid:
|
|
matched = True
|
|
if mode & stat.S_IXGRP:
|
|
if st.st_uid == gid or st.st_uid in groups:
|
|
matched = True
|
|
if mode & stat.S_IXOTH:
|
|
matched = True
|
|
if not matched:
|
|
return False
|
|
if check:
|
|
raise NotImplementedError((orig, check))
|
|
return True
|