pathlib ABCs: remove duplicate `realpath()` implementation. (#119178)

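Add private `posixpath._realpath()` function, which is a generic version of `realpath()` that can be parameterised with string tokens (`sep`, `curdir`, `pardir`) and query functions (`getcwd`, `lstat`, `readlink`). Also add support for limiting the number of symlink traversals via a new `maxlinks` argument: when it is not `None`, symlink loops are detected by counting traversals rather than by tracking already-seen paths.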

In the private `pathlib._abc.PathBase` class, call `posixpath._realpath()` and remove our re-implementation of the same algorithm.

No change to any public APIs, either in `posixpath` or `pathlib`.

Co-authored-by: Nice Zombies <nineteendo19d0@gmail.com>
This commit is contained in:
Barney Gale 2024-06-05 18:54:50 +01:00 committed by GitHub
parent 14e3c7071b
commit e83ce850f4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 56 additions and 69 deletions

View File

@ -12,8 +12,8 @@ resemble pathlib's PurePath and Path respectively.
""" """
import functools import functools
import posixpath
from glob import _Globber, _no_recurse_symlinks from glob import _Globber, _no_recurse_symlinks
from errno import ENOTDIR, ELOOP
from stat import S_ISDIR, S_ISLNK, S_ISREG, S_ISSOCK, S_ISBLK, S_ISCHR, S_ISFIFO from stat import S_ISDIR, S_ISLNK, S_ISREG, S_ISSOCK, S_ISBLK, S_ISCHR, S_ISFIFO
@ -696,65 +696,34 @@ class PathBase(PurePathBase):
""" """
if self._resolving: if self._resolving:
return self return self
path_root, parts = self._stack
path = self.with_segments(path_root)
try:
path = path.absolute()
except UnsupportedOperation:
path_tail = []
else:
path_root, path_tail = path._stack
path_tail.reverse()
# If the user has *not* overridden the `readlink()` method, then symlinks are unsupported def getcwd():
# and (in non-strict mode) we can improve performance by not calling `stat()`. return str(self.with_segments().absolute())
querying = strict or getattr(self.readlink, '_supported', True)
link_count = 0 if strict or getattr(self.readlink, '_supported', True):
while parts: def lstat(path_str):
part = parts.pop() path = self.with_segments(path_str)
if not part or part == '.':
continue
if part == '..':
if not path_tail:
if path_root:
# Delete '..' segment immediately following root
continue
elif path_tail[-1] != '..':
# Delete '..' segment and its predecessor
path_tail.pop()
continue
path_tail.append(part)
if querying and part != '..':
path = self.with_segments(path_root + self.parser.sep.join(path_tail))
path._resolving = True path._resolving = True
try: return path.lstat()
st = path.stat(follow_symlinks=False)
if S_ISLNK(st.st_mode): def readlink(path_str):
# Like Linux and macOS, raise OSError(errno.ELOOP) if too many symlinks are path = self.with_segments(path_str)
# encountered during resolution. path._resolving = True
link_count += 1 return str(path.readlink())
if link_count >= self._max_symlinks: else:
raise OSError(ELOOP, "Too many symbolic links in path", self._raw_path) # If the user has *not* overridden the `readlink()` method, then
target_root, target_parts = path.readlink()._stack # symlinks are unsupported and (in non-strict mode) we can improve
# If the symlink target is absolute (like '/etc/hosts'), set the current # performance by not calling `path.lstat()`.
# path to its uppermost parent (like '/'). def skip(path_str):
if target_root: # This exception will be internally consumed by `_realpath()`.
path_root = target_root raise OSError("Operation skipped.")
path_tail.clear()
else: lstat = readlink = skip
path_tail.pop()
# Add the symlink target's reversed tail parts (like ['hosts', 'etc']) to return self.with_segments(posixpath._realpath(
# the stack of unresolved path parts. str(self), strict, self.parser.sep,
parts.extend(target_parts) getcwd=getcwd, lstat=lstat, readlink=readlink,
continue maxlinks=self._max_symlinks))
elif parts and not S_ISDIR(st.st_mode):
raise NotADirectoryError(ENOTDIR, "Not a directory", self._raw_path)
except OSError:
if strict:
raise
else:
querying = False
return self.with_segments(path_root + self.parser.sep.join(path_tail))
def symlink_to(self, target, target_is_directory=False): def symlink_to(self, target, target_is_directory=False):
""" """

View File

@ -22,6 +22,7 @@ defpath = '/bin:/usr/bin'
altsep = None altsep = None
devnull = '/dev/null' devnull = '/dev/null'
import errno
import os import os
import sys import sys
import stat import stat
@ -401,7 +402,10 @@ symbolic links encountered in the path."""
curdir = '.' curdir = '.'
pardir = '..' pardir = '..'
getcwd = os.getcwd getcwd = os.getcwd
return _realpath(filename, strict, sep, curdir, pardir, getcwd)
def _realpath(filename, strict=False, sep=sep, curdir=curdir, pardir=pardir,
getcwd=os.getcwd, lstat=os.lstat, readlink=os.readlink, maxlinks=None):
# The stack of unresolved path parts. When popped, a special value of None # The stack of unresolved path parts. When popped, a special value of None
# indicates that a symlink target has been resolved, and that the original # indicates that a symlink target has been resolved, and that the original
# symlink path can be retrieved by popping again. The [::-1] slice is a # symlink path can be retrieved by popping again. The [::-1] slice is a
@ -418,6 +422,10 @@ symbolic links encountered in the path."""
# the same links. # the same links.
seen = {} seen = {}
# Number of symlinks traversed. When the number of traversals is limited
# by *maxlinks*, this is used instead of *seen* to detect symlink loops.
link_count = 0
while rest: while rest:
name = rest.pop() name = rest.pop()
if name is None: if name is None:
@ -436,11 +444,19 @@ symbolic links encountered in the path."""
else: else:
newpath = path + sep + name newpath = path + sep + name
try: try:
st = os.lstat(newpath) st = lstat(newpath)
if not stat.S_ISLNK(st.st_mode): if not stat.S_ISLNK(st.st_mode):
path = newpath path = newpath
continue continue
if newpath in seen: elif maxlinks is not None:
link_count += 1
if link_count > maxlinks:
if strict:
raise OSError(errno.ELOOP, os.strerror(errno.ELOOP),
newpath)
path = newpath
continue
elif newpath in seen:
# Already seen this path # Already seen this path
path = seen[newpath] path = seen[newpath]
if path is not None: if path is not None:
@ -448,26 +464,28 @@ symbolic links encountered in the path."""
continue continue
# The symlink is not resolved, so we must have a symlink loop. # The symlink is not resolved, so we must have a symlink loop.
if strict: if strict:
# Raise OSError(errno.ELOOP) raise OSError(errno.ELOOP, os.strerror(errno.ELOOP),
os.stat(newpath) newpath)
path = newpath path = newpath
continue continue
target = os.readlink(newpath) target = readlink(newpath)
except OSError: except OSError:
if strict: if strict:
raise raise
path = newpath path = newpath
continue continue
# Resolve the symbolic link # Resolve the symbolic link
seen[newpath] = None # not resolved symlink
if target.startswith(sep): if target.startswith(sep):
# Symlink target is absolute; reset resolved path. # Symlink target is absolute; reset resolved path.
path = sep path = sep
# Push the symlink path onto the stack, and signal its specialness by if maxlinks is None:
# also pushing None. When these entries are popped, we'll record the # Mark this symlink as seen but not fully resolved.
# fully-resolved symlink target in the 'seen' mapping. seen[newpath] = None
rest.append(newpath) # Push the symlink path onto the stack, and signal its specialness
rest.append(None) # by also pushing None. When these entries are popped, we'll
# record the fully-resolved symlink target in the 'seen' mapping.
rest.append(newpath)
rest.append(None)
# Push the unresolved symlink target parts onto the stack. # Push the unresolved symlink target parts onto the stack.
rest.extend(target.split(sep)[::-1]) rest.extend(target.split(sep)[::-1])