mirror of https://github.com/python/cpython
389 lines
11 KiB
Python
389 lines
11 KiB
Python
|
import fnmatch
|
||
|
import glob
|
||
|
import os
|
||
|
import os.path
|
||
|
import shutil
|
||
|
import stat
|
||
|
|
||
|
from .iterutil import iter_many
|
||
|
|
||
|
|
||
|
# Filename suffixes that identify C source and header files.
C_SOURCE_SUFFIXES = ('.c', '.h')
|
||
|
|
||
|
|
||
|
def create_backup(old, backup=None):
    """Copy a file to a backup location and return the backup's filename.

    "old" is either a filename or an object with a "name" attribute
    (e.g. an open file).  If no filename can be determined then nothing
    is copied and None is returned.

    "backup" is the destination filename.  If it is not provided (or is
    True) then "<filename>.bak" is used.

    None is also returned if the original file does not exist.  A
    FileNotFoundError about any other path (e.g. the destination)
    is propagated.
    """
    filename = old if isinstance(old, str) else getattr(old, 'name', None)
    if not filename:
        return None

    if backup is True or not backup:
        backup = f'{filename}.bak'

    try:
        shutil.copyfile(filename, backup)
    except FileNotFoundError as exc:
        if exc.filename == filename:
            # There was nothing to back up.
            return None
        raise  # re-raise
    return backup
|
||
|
|
||
|
|
||
|
##################################
|
||
|
# find files
|
||
|
|
||
|
def match_glob(filename, pattern):
    """Return True if the filename matches the glob pattern.

    This is fnmatch.fnmatch() plus a fallback for "**/": if the plain
    match fails then the first "**/" is dropped from the pattern and
    the match is tried once more.
    """
    if fnmatch.fnmatch(filename, pattern):
        return True

    # fnmatch doesn't handle ** quite right.  It will not match the
    # following:
    #
    #  ('x/spam.py', 'x/**/*.py')
    #  ('spam.py', '**/*.py')
    #
    # though it *will* match the following:
    #
    #  ('x/y/spam.py', 'x/**/*.py')
    #  ('x/spam.py', '**/*.py')

    before, found, after = pattern.partition('**/')
    if not found:
        return False

    # We only accommodate the single-"**" case.
    return fnmatch.fnmatch(filename, before + after)
|
||
|
|
||
|
|
||
|
def iter_filenames(filenames, *,
                   start=None,
                   include=None,
                   exclude=None,
                   ):
    """Yield (filename, check, solo) for each of the given filenames.

    "check" is the callable produced by _get_check() for the filename.
    While "start" is set, every filename other than "start" gets a
    check that reports '<skipped>'; once "start" is reached it is
    cleared and later filenames are checked against "include" and
    "exclude".

    "solo" is the flag that iter_many() pairs with each filename.
    NOTE(review): the hand-rolled loop that iter_many() replaced here
    yielded False for a lone filename and True otherwise; confirm that
    iter_many() uses the same sense.

    The 'no filenames provided' exception is handed to iter_many() as
    its "onempty" argument.
    """
    pairs = iter_many(filenames, Exception('no filenames provided'))
    for filename, solo in pairs:
        check, start = _get_check(filename, start, include, exclude)
        yield filename, check, solo
|
||
|
|
||
|
|
||
|
def expand_filenames(filenames):
    """Yield the files matching each of the given glob patterns.

    For a pattern containing "**/", the matches for the pattern with
    every "**/" removed are yielded first, followed by the matches for
    the pattern as given.
    """
    for pattern in filenames:
        # XXX Do we need to use glob.escape (a la commit 9355868458, GH-20994)?
        candidates = [pattern]
        if '**/' in pattern:
            candidates.insert(0, pattern.replace('**/', ''))
        for candidate in candidates:
            yield from glob.glob(candidate)
|
||
|
|
||
|
|
||
|
def _get_check(filename, start, include, exclude):
|
||
|
if start and filename != start:
|
||
|
return (lambda: '<skipped>'), start
|
||
|
else:
|
||
|
def check():
|
||
|
if _is_excluded(filename, exclude, include):
|
||
|
return '<excluded>'
|
||
|
return None
|
||
|
return check, None
|
||
|
|
||
|
|
||
|
def _is_excluded(filename, exclude, include):
|
||
|
if include:
|
||
|
for included in include:
|
||
|
if match_glob(filename, included):
|
||
|
return False
|
||
|
return True
|
||
|
elif exclude:
|
||
|
for excluded in exclude:
|
||
|
if match_glob(filename, excluded):
|
||
|
return True
|
||
|
return False
|
||
|
else:
|
||
|
return False
|
||
|
|
||
|
|
||
|
def _walk_tree(root, *,
|
||
|
_walk=os.walk,
|
||
|
):
|
||
|
# A wrapper around os.walk that resolves the filenames.
|
||
|
for parent, _, names in _walk(root):
|
||
|
for name in names:
|
||
|
yield os.path.join(parent, name)
|
||
|
|
||
|
|
||
|
def walk_tree(root, *,
              suffix=None,
              walk=_walk_tree,
              ):
    """Yield each file in the tree under the given directory name.

    The filenames come from "walk(root)".

    If "suffix" is provided then it must be a string and only files
    with that suffix will be included.  Since this is a generator,
    the ValueError for a bad suffix is raised when iteration starts.
    """
    if suffix and not isinstance(suffix, str):
        raise ValueError('suffix must be a string')

    if not suffix:
        # Nothing to filter on.
        yield from walk(root)
        return

    for filename in walk(root):
        if filename.endswith(suffix):
            yield filename
|
||
|
|
||
|
|
||
|
def glob_tree(root, *,
              suffix=None,
              _glob=glob.iglob,
              ):
    """Yield the glob matches found under the given directory name.

    Two patterns are tried in order: "<root>/*<suffix>" and then
    "<root>/**/*<suffix>".

    If "suffix" is provided then it must be a string and only names
    with that suffix will be included.  Since this is a generator,
    the ValueError for a bad suffix is raised when iteration starts.

    NOTE(review): "_glob" is called without recursive=True, so with
    the default glob.iglob the "**" component acts like "*" and only
    one level of subdirectories is searched.  Confirm that is intended.
    """
    if not suffix:
        suffix = ''
    if not isinstance(suffix, str):
        raise ValueError('suffix must be a string')

    patterns = (f'{root}/*{suffix}', f'{root}/**/*{suffix}')
    for pattern in patterns:
        yield from _glob(pattern)
|
||
|
|
||
|
|
||
|
def iter_files(root, suffix=None, relparent=None, *,
               get_files=os.walk,
               _glob=glob_tree,
               _walk=walk_tree,
               ):
    """Yield each file in the tree under the given directory name.

    If "root" is a non-string iterable then do the same for each of
    those trees.

    If "suffix" is provided then only files with that suffix will
    be included.  A non-string "suffix" is treated as an iterable
    of suffixes, any of which may match.

    If "relparent" is provided then each filename is yielded as
    a path relative to it.

    "get_files" selects how the tree is listed: one of the glob
    functions selects "_glob"; anything else is run through "_walk"
    (with os.walk and walk_tree both mapped to _walk_tree).
    """
    if not isinstance(root, str):
        for each in root:
            yield from iter_files(each, suffix, relparent,
                                  get_files=get_files,
                                  _glob=_glob, _walk=_walk)
        return

    # Use the right "walk" function.
    if get_files in (glob.glob, glob.iglob, glob_tree):
        list_files = _glob
    else:
        if get_files in (os.walk, walk_tree):
            walker = _walk_tree
        else:
            walker = get_files

        def list_files(*args, **kwargs):
            return _walk(*args, walk=walker, **kwargs)

    if suffix and not isinstance(suffix, str):
        # Multiple suffixes: list everything and filter below.
        filenames = list_files(root)
        suffixes = tuple(suffix)
    else:
        # A single suffix (or none): let the lister do the filtering.
        filenames = list_files(root, suffix=suffix)
        suffixes = None

    for filename in filenames:
        if suffixes and not filename.endswith(suffixes):
            continue
        if relparent:
            filename = os.path.relpath(filename, relparent)
        yield filename
|
||
|
|
||
|
|
||
|
def iter_files_by_suffix(root, suffixes, relparent=None, *,
                         walk=walk_tree,
                         _iter_files=iter_files,
                         ):
    """Yield each file in the tree that has the given suffixes.

    Unlike iter_files(), the results are in the original suffix order.

    "suffixes" may be a single string or an iterable of strings.  The
    tree is listed once per suffix, via "_iter_files".

    "walk" is the function used to list the tree.  It is passed along
    to "_iter_files" as "get_files", but only when it is not the
    default, so the default call is exactly what it always was.
    """
    if isinstance(suffixes, str):
        suffixes = [suffixes]

    # "walk" used to be silently ignored.  iter_files() already maps
    # walk_tree onto its own default behavior, so there is nothing to
    # forward unless the caller supplied something else.
    extra = {} if walk is walk_tree else {'get_files': walk}

    # XXX Ignore repeated suffixes?
    for suffix in suffixes:
        yield from _iter_files(root, suffix, relparent, **extra)
|
||
|
|
||
|
|
||
|
##################################
|
||
|
# file info
|
||
|
|
||
|
# XXX posix-only?
|
||
|
|
||
|
# Each mask combines the user, group, and other bits for one kind
# of access.

# read
S_IRANY = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
# write
S_IWANY = stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
# execute
S_IXANY = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
|
||
|
|
||
|
|
||
|
def is_readable(file, *, user=None, check=False):
    """Return True if the file is readable by the user.

    "file" may be anything that _get_file_info() accepts.  "user" is
    passed through to _check_mode().

    If "check" is true then the file is actually probed with
    _check_file() first.  The permission bits are only consulted if
    that probe is not implemented (or "check" is false).
    """
    filename, st, mode = _get_file_info(file)
    if not check:
        return _check_mode(st, mode, S_IRANY, user)

    try:
        probed = _check_file(filename, S_IRANY)
    except NotImplementedError:
        probed = NotImplemented
    if probed is NotImplemented:
        # Fall back to checking the mode.
        return _check_mode(st, mode, S_IRANY, user)
    return probed
|
||
|
|
||
|
|
||
|
def is_writable(file, *, user=None, check=False):
    """Return True if the file is writable by the user.

    "file" may be anything that _get_file_info() accepts.  "user" is
    passed through to _check_mode().

    If "check" is true then the file is actually probed with
    _check_file() first.  The permission bits are only consulted if
    that probe is not implemented (or "check" is false).
    """
    filename, st, mode = _get_file_info(file)
    if not check:
        return _check_mode(st, mode, S_IWANY, user)

    try:
        probed = _check_file(filename, S_IWANY)
    except NotImplementedError:
        probed = NotImplemented
    if probed is NotImplemented:
        # Fall back to checking the mode.
        return _check_mode(st, mode, S_IWANY, user)
    return probed
|
||
|
|
||
|
|
||
|
def is_executable(file, *, user=None, check=False):
    """Return True if the file is executable by the user.

    "file" may be anything that _get_file_info() accepts.  "user" is
    passed through to _check_mode().

    If "check" is true then _check_file() is consulted first.  The
    permission bits are only consulted if that probe is not
    implemented (or "check" is false).
    """
    filename, st, mode = _get_file_info(file)
    if not check:
        return _check_mode(st, mode, S_IXANY, user)

    try:
        probed = _check_file(filename, S_IXANY)
    except NotImplementedError:
        probed = NotImplemented
    if probed is NotImplemented:
        # Fall back to checking the mode.
        return _check_mode(st, mode, S_IXANY, user)
    return probed
|
||
|
|
||
|
|
||
|
def _get_file_info(file):
|
||
|
filename = st = mode = None
|
||
|
if isinstance(file, int):
|
||
|
mode = file
|
||
|
elif isinstance(file, os.stat_result):
|
||
|
st = file
|
||
|
else:
|
||
|
if isinstance(file, str):
|
||
|
filename = file
|
||
|
elif hasattr(file, 'name') and os.path.exists(file.name):
|
||
|
filename = file.name
|
||
|
else:
|
||
|
raise NotImplementedError(file)
|
||
|
st = os.stat(filename)
|
||
|
return filename, st, mode or st.st_mode
|
||
|
|
||
|
|
||
|
def _check_file(filename, check):
    """Probe the file by actually opening it.

    "check" is a set of permission bits.  If it includes any read bit
    then the file is opened read-only; otherwise, if it includes any
    write bit, the file is opened write-only.  The result is False if
    opening fails with PermissionError and True if it succeeds.

    NotImplemented is returned (not raised) for an execute-only check.
    NotImplementedError is raised if "check" has none of the
    read/write/execute bits set.
    """
    if not isinstance(filename, str):
        raise Exception(f'filename required to check file, got {filename}')

    for mask, flags in ((S_IRANY, os.O_RDONLY), (S_IWANY, os.O_WRONLY)):
        if check & mask:
            break
    else:
        if check & S_IXANY:
            # We can worry about S_IXANY later
            return NotImplemented
        raise NotImplementedError(check)

    try:
        fd = os.open(filename, flags)
    except PermissionError:
        return False
    # We do not ignore other exceptions.
    os.close(fd)
    return True
|
||
|
|
||
|
|
||
|
def _get_user_info(user):
|
||
|
import pwd
|
||
|
username = uid = gid = groups = None
|
||
|
if user is None:
|
||
|
uid = os.geteuid()
|
||
|
#username = os.getlogin()
|
||
|
username = pwd.getpwuid(uid)[0]
|
||
|
gid = os.getgid()
|
||
|
groups = os.getgroups()
|
||
|
else:
|
||
|
if isinstance(user, int):
|
||
|
uid = user
|
||
|
entry = pwd.getpwuid(uid)
|
||
|
username = entry.pw_name
|
||
|
elif isinstance(user, str):
|
||
|
username = user
|
||
|
entry = pwd.getpwnam(username)
|
||
|
uid = entry.pw_uid
|
||
|
else:
|
||
|
raise NotImplementedError(user)
|
||
|
gid = entry.pw_gid
|
||
|
os.getgrouplist(username, gid)
|
||
|
return username, uid, gid, groups
|
||
|
|
||
|
|
||
|
def _check_mode(st, mode, check, user):
    """Return True if the mode grants the user the requested access.

    "check" is some combination of S_IRANY, S_IWANY, and S_IXANY.  For
    each kind of access requested, at least one of the following must
    hold or the result is False:

    * the "other" bit is set in "mode"
    * the "user" bit is set and the user owns the file (st.st_uid)
    * the "group" bit is set and the file's group (st.st_gid) is the
      user's primary group or one of the user's other groups

    "user" is resolved with _get_user_info().

    NotImplementedError is raised if "check" contains any bits other
    than the read/write/execute ones.

    Note that "st" is only consulted when the "other" bit does not
    already grant access.  With a mode-only input ("st" is None),
    deciding on the user or group bits is not possible and fails
    with AttributeError.
    """
    orig = check
    _, uid, gid, groups = _get_user_info(user)
    # Tolerate a missing group list.
    groups = groups or ()

    kinds = (
        (S_IRANY, stat.S_IRUSR, stat.S_IRGRP, stat.S_IROTH),
        (S_IWANY, stat.S_IWUSR, stat.S_IWGRP, stat.S_IWOTH),
        (S_IXANY, stat.S_IXUSR, stat.S_IXGRP, stat.S_IXOTH),
    )
    for anybits, usrbit, grpbit, othbit in kinds:
        if not check & anybits:
            continue
        # Clear the bits with a mask rather than by subtracting, so a
        # partial request (e.g. only stat.S_IRUSR) cannot corrupt the
        # bits that are still to be processed.
        check &= ~anybits
        if mode & othbit:
            continue
        if mode & usrbit and st.st_uid == uid:
            continue
        # Group access depends on the file's group, not its owner.
        if mode & grpbit and (st.st_gid == gid or st.st_gid in groups):
            continue
        return False

    if check:
        raise NotImplementedError((orig, check))
    return True
|