GH-89812: Add `pathlib._PathBase` (#106337)

Add private `pathlib._PathBase` class. This will be used by an experimental PyPI package to incubate a `tarfile.TarPath` class.

Co-authored-by: Adam Turner <9087854+AA-Turner@users.noreply.github.com>
This commit is contained in:
Barney Gale 2023-09-30 15:45:01 +01:00 committed by GitHub
parent 0449fe999d
commit 89966a694b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 687 additions and 161 deletions

View File

@ -5,6 +5,7 @@ paths with operations that have semantics appropriate for different
operating systems.
"""
import contextlib
import fnmatch
import functools
import io
@ -15,10 +16,19 @@ import re
import sys
import warnings
from _collections_abc import Sequence
from errno import ENOENT, ENOTDIR, EBADF, ELOOP
from errno import ENOENT, ENOTDIR, EBADF, ELOOP, EINVAL
from stat import S_ISDIR, S_ISLNK, S_ISREG, S_ISSOCK, S_ISBLK, S_ISCHR, S_ISFIFO
from urllib.parse import quote_from_bytes as urlquote_from_bytes
try:
import pwd
except ImportError:
pwd = None
try:
import grp
except ImportError:
grp = None
__all__ = [
"UnsupportedOperation",
@ -30,6 +40,9 @@ __all__ = [
# Internals
#
# Maximum number of symlinks to follow in _PathBase.resolve()
_MAX_SYMLINKS = 40
# Reference for Windows paths can be found at
# https://learn.microsoft.com/en-gb/windows/win32/fileio/naming-a-file .
_WIN_RESERVED_NAMES = frozenset(
@ -292,6 +305,11 @@ class PurePath:
# The `_hash` slot stores the hash of the case-normalized string
# path. It's set when `__hash__()` is called for the first time.
'_hash',
# The '_resolving' slot stores a boolean indicating whether the path
# is being processed by `_PathBase.resolve()`. This prevents duplicate
# work from occurring when `resolve()` calls `stat()` or `readlink()`.
'_resolving',
)
pathmod = os.path
@ -331,6 +349,7 @@ class PurePath:
f"not {type(path).__name__!r}")
paths.append(path)
self._raw_paths = paths
self._resolving = False
def with_segments(self, *pathsegments):
"""Construct a new path object from any number of path-like objects.
@ -416,7 +435,7 @@ class PurePath:
return "{}({!r})".format(self.__class__.__name__, self.as_posix())
def as_uri(self):
"""Return the path as a 'file' URI."""
"""Return the path as a URI."""
if not self.is_absolute():
raise ValueError("relative path can't be expressed as a file URI")
@ -691,7 +710,9 @@ class PurePath:
tail = self._tail
if not tail:
return self
return self._from_parsed_parts(drv, root, tail[:-1])
path = self._from_parsed_parts(drv, root, tail[:-1])
path._resolving = self._resolving
return path
@property
def parents(self):
@ -776,23 +797,35 @@ class PureWindowsPath(PurePath):
# Filesystem-accessing classes
class Path(PurePath):
"""PurePath subclass that can make system calls.
class _PathBase(PurePath):
"""Base class for concrete path objects.
Path represents a filesystem path but unlike PurePath, also offers
methods to do system calls on path objects. Depending on your system,
instantiating a Path will return either a PosixPath or a WindowsPath
object. You can also instantiate a PosixPath or WindowsPath directly,
but cannot instantiate a WindowsPath on a POSIX system or vice versa.
This class provides dummy implementations for many methods that derived
classes can override selectively; the default implementations raise
UnsupportedOperation. The most basic methods, such as stat() and open(),
directly raise UnsupportedOperation; these basic methods are called by
other methods such as is_dir() and read_text().
The Path class derives this class to implement local filesystem paths.
Users may derive their own classes to implement virtual filesystem paths,
such as paths in archive files or on remote storage systems.
"""
__slots__ = ()
__bytes__ = None
__fspath__ = None # virtual paths have no local file system representation
def _unsupported(self, method_name):
msg = f"{type(self).__name__}.{method_name}() is unsupported"
if isinstance(self, Path):
msg += " on this system"
raise UnsupportedOperation(msg)
def stat(self, *, follow_symlinks=True):
"""
Return the result of the stat() system call on this path, like
os.stat() does.
"""
return os.stat(self, follow_symlinks=follow_symlinks)
self._unsupported("stat")
def lstat(self):
"""
@ -859,7 +892,21 @@ class Path(PurePath):
"""
Check if this path is a mount point
"""
return os.path.ismount(self)
# Need to exist and be a dir
if not self.exists() or not self.is_dir():
return False
try:
parent_dev = self.parent.stat().st_dev
except OSError:
return False
dev = self.stat().st_dev
if dev != parent_dev:
return True
ino = self.stat().st_ino
parent_ino = self.parent.stat().st_ino
return ino == parent_ino
def is_symlink(self):
"""
@ -880,7 +927,10 @@ class Path(PurePath):
"""
Whether this path is a junction.
"""
return os.path.isjunction(self)
# Junctions are a Windows-only feature, not present in POSIX nor the
# majority of virtual filesystems. There is no cross-platform idiom
# to check for junctions (using stat().st_mode).
return False
def is_block_device(self):
"""
@ -964,9 +1014,7 @@ class Path(PurePath):
Open the file pointed by this path and return a file object, as
the built-in open() function does.
"""
if "b" not in mode:
encoding = io.text_encoding(encoding)
return io.open(self, mode, buffering, encoding, errors, newline)
self._unsupported("open")
def read_bytes(self):
"""
@ -1009,13 +1057,12 @@ class Path(PurePath):
The children are yielded in arbitrary order, and the
special entries '.' and '..' are not included.
"""
return (self._make_child_relpath(name) for name in os.listdir(self))
self._unsupported("iterdir")
def _scandir(self):
# bpo-24132: a future version of pathlib will support subclassing of
# pathlib.Path to customize how the filesystem is accessed. This
# includes scandir(), which is used to implement glob().
return os.scandir(self)
# Emulate os.scandir(), which returns an object that can be used as a
# context manager. This method is called by walk() and glob().
return contextlib.nullcontext(self.iterdir())
def _make_child_relpath(self, name):
sep = self.pathmod.sep
@ -1144,13 +1191,13 @@ class Path(PurePath):
# blow up for a minor reason when (say) a thousand readable
# directories are still left to visit. That logic is copied here.
try:
scandir_it = path._scandir()
scandir_obj = path._scandir()
except OSError as error:
if on_error is not None:
on_error(error)
continue
with scandir_it:
with scandir_obj as scandir_it:
dirnames = []
filenames = []
for entry in scandir_it:
@ -1172,6 +1219,224 @@ class Path(PurePath):
paths += [path._make_child_relpath(d) for d in reversed(dirnames)]
def absolute(self):
"""Return an absolute version of this path
No normalization or symlink resolution is performed.
Use resolve() to resolve symlinks and remove '..' segments.
"""
self._unsupported("absolute")
@classmethod
def cwd(cls):
"""Return a new path pointing to the current working directory."""
# We call 'absolute()' rather than using 'os.getcwd()' directly to
# enable users to replace the implementation of 'absolute()' in a
# subclass and benefit from the new behaviour here. This works because
# os.path.abspath('.') == os.getcwd().
return cls().absolute()
def expanduser(self):
""" Return a new path with expanded ~ and ~user constructs
(as returned by os.path.expanduser)
"""
self._unsupported("expanduser")
@classmethod
def home(cls):
"""Return a new path pointing to expanduser('~').
"""
return cls("~").expanduser()
def readlink(self):
"""
Return the path to which the symbolic link points.
"""
self._unsupported("readlink")
readlink._supported = False
def _split_stack(self):
"""
Split the path into a 2-tuple (anchor, parts), where *anchor* is the
uppermost parent of the path (equivalent to path.parents[-1]), and
*parts* is a reversed list of parts following the anchor.
"""
return self._from_parsed_parts(self.drive, self.root, []), self._tail[::-1]
def resolve(self, strict=False):
"""
Make the path absolute, resolving all symlinks on the way and also
normalizing it.
"""
if self._resolving:
return self
try:
path = self.absolute()
except UnsupportedOperation:
path = self
# If the user has *not* overridden the `readlink()` method, then symlinks are unsupported
# and (in non-strict mode) we can improve performance by not calling `stat()`.
querying = strict or getattr(self.readlink, '_supported', True)
link_count = 0
stat_cache = {}
target_cache = {}
path, parts = path._split_stack()
while parts:
part = parts.pop()
if part == '..':
if not path._tail:
if path.root:
# Delete '..' segment immediately following root
continue
elif path._tail[-1] != '..':
# Delete '..' segment and its predecessor
path = path.parent
continue
# Join the current part onto the path.
path_parent = path
path = path._make_child_relpath(part)
if querying and part != '..':
path._resolving = True
try:
st = stat_cache.get(path)
if st is None:
st = stat_cache[path] = path.stat(follow_symlinks=False)
if S_ISLNK(st.st_mode):
# Like Linux and macOS, raise OSError(errno.ELOOP) if too many symlinks are
# encountered during resolution.
link_count += 1
if link_count >= _MAX_SYMLINKS:
raise OSError(ELOOP, "Too many symbolic links in path", str(path))
target = target_cache.get(path)
if target is None:
target = target_cache[path] = path.readlink()
target, target_parts = target._split_stack()
# If the symlink target is absolute (like '/etc/hosts'), set the current
# path to its uppermost parent (like '/'). If not, the symlink target is
# relative to the symlink parent, which we recorded earlier.
path = target if target.root else path_parent
# Add the symlink target's reversed tail parts (like ['hosts', 'etc']) to
# the stack of unresolved path parts.
parts.extend(target_parts)
elif parts and not S_ISDIR(st.st_mode):
raise NotADirectoryError(ENOTDIR, "Not a directory", str(path))
except OSError:
if strict:
raise
else:
querying = False
path._resolving = False
return path
def symlink_to(self, target, target_is_directory=False):
"""
Make this path a symlink pointing to the target path.
Note the order of arguments (link, target) is the reverse of os.symlink.
"""
self._unsupported("symlink_to")
def hardlink_to(self, target):
"""
Make this path a hard link pointing to the same file as *target*.
Note the order of arguments (self, target) is the reverse of os.link's.
"""
self._unsupported("hardlink_to")
def touch(self, mode=0o666, exist_ok=True):
"""
Create this file with the given access mode, if it doesn't exist.
"""
self._unsupported("touch")
def mkdir(self, mode=0o777, parents=False, exist_ok=False):
"""
Create a new directory at this given path.
"""
self._unsupported("mkdir")
def rename(self, target):
"""
Rename this path to the target path.
The target path may be absolute or relative. Relative paths are
interpreted relative to the current working directory, *not* the
directory of the Path object.
Returns the new Path instance pointing to the target path.
"""
self._unsupported("rename")
def replace(self, target):
"""
Rename this path to the target path, overwriting if that path exists.
The target path may be absolute or relative. Relative paths are
interpreted relative to the current working directory, *not* the
directory of the Path object.
Returns the new Path instance pointing to the target path.
"""
self._unsupported("replace")
def chmod(self, mode, *, follow_symlinks=True):
"""
Change the permissions of the path, like os.chmod().
"""
self._unsupported("chmod")
def lchmod(self, mode):
"""
Like chmod(), except if the path points to a symlink, the symlink's
permissions are changed, rather than its target's.
"""
self.chmod(mode, follow_symlinks=False)
def unlink(self, missing_ok=False):
"""
Remove this file or link.
If the path is a directory, use rmdir() instead.
"""
self._unsupported("unlink")
def rmdir(self):
"""
Remove this directory. The directory must be empty.
"""
self._unsupported("rmdir")
def owner(self):
"""
Return the login name of the file owner.
"""
self._unsupported("owner")
def group(self):
"""
Return the group name of the file gid.
"""
self._unsupported("group")
def as_uri(self):
"""Return the path as a URI."""
self._unsupported("as_uri")
class Path(_PathBase):
"""PurePath subclass that can make system calls.
Path represents a filesystem path but unlike PurePath, also offers
methods to do system calls on path objects. Depending on your system,
instantiating a Path will return either a PosixPath or a WindowsPath
object. You can also instantiate a PosixPath or WindowsPath directly,
but cannot instantiate a WindowsPath on a POSIX system or vice versa.
"""
__slots__ = ()
__bytes__ = PurePath.__bytes__
__fspath__ = PurePath.__fspath__
as_uri = PurePath.as_uri
def __init__(self, *args, **kwargs):
if kwargs:
msg = ("support for supplying keyword arguments to pathlib.PurePath "
@ -1184,27 +1449,51 @@ class Path(PurePath):
cls = WindowsPath if os.name == 'nt' else PosixPath
return object.__new__(cls)
@classmethod
def cwd(cls):
"""Return a new path pointing to the current working directory."""
# We call 'absolute()' rather than using 'os.getcwd()' directly to
# enable users to replace the implementation of 'absolute()' in a
# subclass and benefit from the new behaviour here. This works because
# os.path.abspath('.') == os.getcwd().
return cls().absolute()
@classmethod
def home(cls):
"""Return a new path pointing to the user's home directory (as
returned by os.path.expanduser('~')).
def stat(self, *, follow_symlinks=True):
"""
return cls("~").expanduser()
Return the result of the stat() system call on this path, like
os.stat() does.
"""
return os.stat(self, follow_symlinks=follow_symlinks)
def is_mount(self):
"""
Check if this path is a mount point
"""
return os.path.ismount(self)
def is_junction(self):
"""
Whether this path is a junction.
"""
return os.path.isjunction(self)
def open(self, mode='r', buffering=-1, encoding=None,
errors=None, newline=None):
"""
Open the file pointed by this path and return a file object, as
the built-in open() function does.
"""
if "b" not in mode:
encoding = io.text_encoding(encoding)
return io.open(self, mode, buffering, encoding, errors, newline)
def iterdir(self):
"""Yield path objects of the directory contents.
The children are yielded in arbitrary order, and the
special entries '.' and '..' are not included.
"""
return (self._make_child_relpath(name) for name in os.listdir(self))
def _scandir(self):
return os.scandir(self)
def absolute(self):
"""Return an absolute version of this path by prepending the current
working directory. No normalization or symlink resolution is performed.
"""Return an absolute version of this path
No normalization or symlink resolution is performed.
Use resolve() to get the canonical path to a file.
Use resolve() to resolve symlinks and remove '..' segments.
"""
if self.is_absolute():
return self
@ -1232,34 +1521,26 @@ class Path(PurePath):
return self.with_segments(os.path.realpath(self, strict=strict))
def owner(self):
"""
Return the login name of the file owner.
"""
try:
import pwd
if pwd:
def owner(self):
"""
Return the login name of the file owner.
"""
return pwd.getpwuid(self.stat().st_uid).pw_name
except ImportError:
raise UnsupportedOperation("Path.owner() is unsupported on this system")
def group(self):
"""
Return the group name of the file gid.
"""
try:
import grp
if grp:
def group(self):
"""
Return the group name of the file gid.
"""
return grp.getgrgid(self.stat().st_gid).gr_name
except ImportError:
raise UnsupportedOperation("Path.group() is unsupported on this system")
def readlink(self):
"""
Return the path to which the symbolic link points.
"""
if not hasattr(os, "readlink"):
raise UnsupportedOperation("os.readlink() not available on this system")
return self.with_segments(os.readlink(self))
if hasattr(os, "readlink"):
def readlink(self):
"""
Return the path to which the symbolic link points.
"""
return self.with_segments(os.readlink(self))
def touch(self, mode=0o666, exist_ok=True):
"""
@ -1306,13 +1587,6 @@ class Path(PurePath):
"""
os.chmod(self, mode, follow_symlinks=follow_symlinks)
def lchmod(self, mode):
"""
Like chmod(), except if the path points to a symlink, the symlink's
permissions are changed, rather than its target's.
"""
self.chmod(mode, follow_symlinks=False)
def unlink(self, missing_ok=False):
"""
Remove this file or link.
@ -1356,24 +1630,22 @@ class Path(PurePath):
os.replace(self, target)
return self.with_segments(target)
def symlink_to(self, target, target_is_directory=False):
"""
Make this path a symlink pointing to the target path.
Note the order of arguments (link, target) is the reverse of os.symlink.
"""
if not hasattr(os, "symlink"):
raise UnsupportedOperation("os.symlink() not available on this system")
os.symlink(target, self, target_is_directory)
if hasattr(os, "symlink"):
def symlink_to(self, target, target_is_directory=False):
"""
Make this path a symlink pointing to the target path.
Note the order of arguments (link, target) is the reverse of os.symlink.
"""
os.symlink(target, self, target_is_directory)
def hardlink_to(self, target):
"""
Make this path a hard link pointing to the same file as *target*.
if hasattr(os, "link"):
def hardlink_to(self, target):
"""
Make this path a hard link pointing to the same file as *target*.
Note the order of arguments (self, target) is the reverse of os.link's.
"""
if not hasattr(os, "link"):
raise UnsupportedOperation("os.link() not available on this system")
os.link(target, self)
Note the order of arguments (self, target) is the reverse of os.link's.
"""
os.link(target, self)
def expanduser(self):
""" Return a new path with expanded ~ and ~user constructs

View File

@ -1582,14 +1582,172 @@ class WindowsPathAsPureTest(PureWindowsPathTest):
#
# Tests for the concrete classes.
# Tests for the virtual classes.
#
class PathTest(unittest.TestCase):
"""Tests for the FS-accessing functionalities of the Path classes."""
class PathBaseTest(PurePathTest):
cls = pathlib._PathBase
cls = pathlib.Path
can_symlink = os_helper.can_symlink()
def test_unsupported_operation(self):
P = self.cls
p = self.cls()
e = pathlib.UnsupportedOperation
self.assertRaises(e, p.stat)
self.assertRaises(e, p.lstat)
self.assertRaises(e, p.exists)
self.assertRaises(e, p.samefile, 'foo')
self.assertRaises(e, p.is_dir)
self.assertRaises(e, p.is_file)
self.assertRaises(e, p.is_mount)
self.assertRaises(e, p.is_symlink)
self.assertRaises(e, p.is_block_device)
self.assertRaises(e, p.is_char_device)
self.assertRaises(e, p.is_fifo)
self.assertRaises(e, p.is_socket)
self.assertRaises(e, p.open)
self.assertRaises(e, p.read_bytes)
self.assertRaises(e, p.read_text)
self.assertRaises(e, p.write_bytes, b'foo')
self.assertRaises(e, p.write_text, 'foo')
self.assertRaises(e, p.iterdir)
self.assertRaises(e, p.glob, '*')
self.assertRaises(e, p.rglob, '*')
self.assertRaises(e, lambda: list(p.walk()))
self.assertRaises(e, p.absolute)
self.assertRaises(e, P.cwd)
self.assertRaises(e, p.expanduser)
self.assertRaises(e, p.home)
self.assertRaises(e, p.readlink)
self.assertRaises(e, p.symlink_to, 'foo')
self.assertRaises(e, p.hardlink_to, 'foo')
self.assertRaises(e, p.mkdir)
self.assertRaises(e, p.touch)
self.assertRaises(e, p.rename, 'foo')
self.assertRaises(e, p.replace, 'foo')
self.assertRaises(e, p.chmod, 0o755)
self.assertRaises(e, p.lchmod, 0o755)
self.assertRaises(e, p.unlink)
self.assertRaises(e, p.rmdir)
self.assertRaises(e, p.owner)
self.assertRaises(e, p.group)
self.assertRaises(e, p.as_uri)
def test_as_uri_common(self):
e = pathlib.UnsupportedOperation
self.assertRaises(e, self.cls().as_uri)
def test_fspath_common(self):
self.assertRaises(TypeError, os.fspath, self.cls())
def test_as_bytes_common(self):
self.assertRaises(TypeError, bytes, self.cls())
def test_matches_path_api(self):
our_names = {name for name in dir(self.cls) if name[0] != '_'}
path_names = {name for name in dir(pathlib.Path) if name[0] != '_'}
self.assertEqual(our_names, path_names)
for attr_name in our_names:
our_attr = getattr(self.cls, attr_name)
path_attr = getattr(pathlib.Path, attr_name)
self.assertEqual(our_attr.__doc__, path_attr.__doc__)
class DummyPathIO(io.BytesIO):
"""
Used by DummyPath to implement `open('w')`
"""
def __init__(self, files, path):
super().__init__()
self.files = files
self.path = path
def close(self):
self.files[self.path] = self.getvalue()
super().close()
class DummyPath(pathlib._PathBase):
"""
Simple implementation of PathBase that keeps files and directories in
memory.
"""
_files = {}
_directories = {}
_symlinks = {}
def stat(self, *, follow_symlinks=True):
if follow_symlinks:
path = str(self.resolve())
else:
path = str(self.parent.resolve() / self.name)
if path in self._files:
st_mode = stat.S_IFREG
elif path in self._directories:
st_mode = stat.S_IFDIR
elif path in self._symlinks:
st_mode = stat.S_IFLNK
else:
raise FileNotFoundError(errno.ENOENT, "Not found", str(self))
return os.stat_result((st_mode, hash(str(self)), 0, 0, 0, 0, 0, 0, 0, 0))
def open(self, mode='r', buffering=-1, encoding=None,
errors=None, newline=None):
if buffering != -1:
raise NotImplementedError
path_obj = self.resolve()
path = str(path_obj)
name = path_obj.name
parent = str(path_obj.parent)
if path in self._directories:
raise IsADirectoryError(errno.EISDIR, "Is a directory", path)
text = 'b' not in mode
mode = ''.join(c for c in mode if c not in 'btU')
if mode == 'r':
if path not in self._files:
raise FileNotFoundError(errno.ENOENT, "File not found", path)
stream = io.BytesIO(self._files[path])
elif mode == 'w':
if parent not in self._directories:
raise FileNotFoundError(errno.ENOENT, "File not found", parent)
stream = DummyPathIO(self._files, path)
self._files[path] = b''
self._directories[parent].add(name)
else:
raise NotImplementedError
if text:
stream = io.TextIOWrapper(stream, encoding=encoding, errors=errors, newline=newline)
return stream
def iterdir(self):
path = str(self.resolve())
if path in self._files:
raise NotADirectoryError(errno.ENOTDIR, "Not a directory", path)
elif path in self._directories:
return (self / name for name in self._directories[path])
else:
raise FileNotFoundError(errno.ENOENT, "File not found", path)
def mkdir(self, mode=0o777, parents=False, exist_ok=False):
try:
self._directories[str(self.parent)].add(self.name)
self._directories[str(self)] = set()
except KeyError:
if not parents or self.parent == self:
raise FileNotFoundError(errno.ENOENT, "File not found", str(self.parent)) from None
self.parent.mkdir(parents=True, exist_ok=True)
self.mkdir(mode, parents=False, exist_ok=exist_ok)
except FileExistsError:
if not exist_ok:
raise
class DummyPathTest(unittest.TestCase):
"""Tests for PathBase methods that use stat(), open() and iterdir()."""
cls = DummyPath
can_symlink = False
# (BASE)
# |
@ -1612,37 +1770,38 @@ class PathTest(unittest.TestCase):
#
def setUp(self):
def cleanup():
os.chmod(join('dirE'), 0o777)
os_helper.rmtree(BASE)
self.addCleanup(cleanup)
os.mkdir(BASE)
os.mkdir(join('dirA'))
os.mkdir(join('dirB'))
os.mkdir(join('dirC'))
os.mkdir(join('dirC', 'dirD'))
os.mkdir(join('dirE'))
with open(join('fileA'), 'wb') as f:
f.write(b"this is file A\n")
with open(join('dirB', 'fileB'), 'wb') as f:
f.write(b"this is file B\n")
with open(join('dirC', 'fileC'), 'wb') as f:
f.write(b"this is file C\n")
with open(join('dirC', 'novel.txt'), 'wb') as f:
f.write(b"this is a novel\n")
with open(join('dirC', 'dirD', 'fileD'), 'wb') as f:
f.write(b"this is file D\n")
os.chmod(join('dirE'), 0)
if self.can_symlink:
# Relative symlinks.
os.symlink('fileA', join('linkA'))
os.symlink('non-existing', join('brokenLink'))
os.symlink('dirB', join('linkB'), target_is_directory=True)
os.symlink(os.path.join('..', 'dirB'), join('dirA', 'linkC'), target_is_directory=True)
# This one goes upwards, creating a loop.
os.symlink(os.path.join('..', 'dirB'), join('dirB', 'linkD'), target_is_directory=True)
# Broken symlink (pointing to itself).
os.symlink('brokenLinkLoop', join('brokenLinkLoop'))
# note: this must be kept in sync with `PathTest.setUp()`
cls = self.cls
cls._files.clear()
cls._directories.clear()
cls._symlinks.clear()
join = cls.pathmod.join
cls._files.update({
join(BASE, 'fileA'): b'this is file A\n',
join(BASE, 'dirB', 'fileB'): b'this is file B\n',
join(BASE, 'dirC', 'fileC'): b'this is file C\n',
join(BASE, 'dirC', 'dirD', 'fileD'): b'this is file D\n',
join(BASE, 'dirC', 'novel.txt'): b'this is a novel\n',
})
cls._directories.update({
BASE: {'dirA', 'dirB', 'dirC', 'dirE', 'fileA'},
join(BASE, 'dirA'): set(),
join(BASE, 'dirB'): {'fileB'},
join(BASE, 'dirC'): {'dirD', 'fileC', 'novel.txt'},
join(BASE, 'dirC', 'dirD'): {'fileD'},
join(BASE, 'dirE'): {},
})
dirname = BASE
while True:
dirname, basename = cls.pathmod.split(dirname)
if not basename:
break
cls._directories[dirname] = {basename}
def tempdir(self):
path = self.cls(BASE).with_name('tmp-dirD')
path.mkdir()
return path
def assertFileNotFound(self, func, *args, **kwargs):
with self.assertRaises(FileNotFoundError) as cm:
@ -1991,9 +2150,11 @@ class PathTest(unittest.TestCase):
def test_glob_many_open_files(self):
depth = 30
P = self.cls
base = P(BASE) / 'deep'
p = P(base, *(['d']*depth))
p.mkdir(parents=True)
p = base = P(BASE) / 'deep'
p.mkdir()
for _ in range(depth):
p /= 'd'
p.mkdir()
pattern = '/'.join(['*'] * depth)
iters = [base.glob(pattern) for j in range(100)]
for it in iters:
@ -2080,6 +2241,7 @@ class PathTest(unittest.TestCase):
self.assertEqual((P / 'brokenLink').readlink(),
self.cls('non-existing'))
self.assertEqual((P / 'linkB').readlink(), self.cls('dirB'))
self.assertEqual((P / 'linkB' / 'linkD').readlink(), self.cls('../dirB'))
with self.assertRaises(OSError):
(P / 'fileA').readlink()
@ -2128,7 +2290,7 @@ class PathTest(unittest.TestCase):
self._check_resolve_relative(p, P(BASE, 'dirB', 'fileB', 'foo', 'in',
'spam'), False)
p = P(BASE, 'dirA', 'linkC', '..', 'foo', 'in', 'spam')
if os.name == 'nt':
if os.name == 'nt' and isinstance(p, pathlib.Path):
# In Windows, if linkY points to dirB, 'dirA\linkY\..'
# resolves to 'dirA' without resolving linkY first.
self._check_resolve_relative(p, P(BASE, 'dirA', 'foo', 'in',
@ -2138,9 +2300,7 @@ class PathTest(unittest.TestCase):
# resolves to 'dirB/..' first before resolving to parent of dirB.
self._check_resolve_relative(p, P(BASE, 'foo', 'in', 'spam'), False)
# Now create absolute symlinks.
d = os_helper._longpath(tempfile.mkdtemp(suffix='-dirD',
dir=os.getcwd()))
self.addCleanup(os_helper.rmtree, d)
d = self.tempdir()
P(BASE, 'dirA', 'linkX').symlink_to(d)
P(BASE, str(d), 'linkY').symlink_to(join('dirB'))
p = P(BASE, 'dirA', 'linkX', 'linkY', 'fileB')
@ -2150,7 +2310,7 @@ class PathTest(unittest.TestCase):
self._check_resolve_relative(p, P(BASE, 'dirB', 'foo', 'in', 'spam'),
False)
p = P(BASE, 'dirA', 'linkX', 'linkY', '..', 'foo', 'in', 'spam')
if os.name == 'nt':
if os.name == 'nt' and isinstance(p, pathlib.Path):
# In Windows, if linkY points to dirB, 'dirA\linkY\..'
# resolves to 'dirA' without resolving linkY first.
self._check_resolve_relative(p, P(d, 'foo', 'in', 'spam'), False)
@ -2174,6 +2334,38 @@ class PathTest(unittest.TestCase):
# Non-strict
self.assertEqual(r.resolve(strict=False), p / '3' / '4')
def _check_symlink_loop(self, *args):
path = self.cls(*args)
with self.assertRaises(OSError) as cm:
path.resolve(strict=True)
self.assertEqual(cm.exception.errno, errno.ELOOP)
def test_resolve_loop(self):
if not self.can_symlink:
self.skipTest("symlinks required")
if os.name == 'nt' and issubclass(self.cls, pathlib.Path):
self.skipTest("symlink loops work differently with concrete Windows paths")
# Loops with relative symlinks.
self.cls(BASE, 'linkX').symlink_to('linkX/inside')
self._check_symlink_loop(BASE, 'linkX')
self.cls(BASE, 'linkY').symlink_to('linkY')
self._check_symlink_loop(BASE, 'linkY')
self.cls(BASE, 'linkZ').symlink_to('linkZ/../linkZ')
self._check_symlink_loop(BASE, 'linkZ')
# Non-strict
p = self.cls(BASE, 'linkZ', 'foo')
self.assertEqual(p.resolve(strict=False), p)
# Loops with absolute symlinks.
self.cls(BASE, 'linkU').symlink_to(join('linkU/inside'))
self._check_symlink_loop(BASE, 'linkU')
self.cls(BASE, 'linkV').symlink_to(join('linkV'))
self._check_symlink_loop(BASE, 'linkV')
self.cls(BASE, 'linkW').symlink_to(join('linkW/../linkW'))
self._check_symlink_loop(BASE, 'linkW')
# Non-strict
q = self.cls(BASE, 'linkW', 'foo')
self.assertEqual(q.resolve(strict=False), q)
def test_stat(self):
statA = self.cls(BASE).joinpath('fileA').stat()
statB = self.cls(BASE).joinpath('dirB', 'fileB').stat()
@ -2382,6 +2574,10 @@ class PathTest(unittest.TestCase):
self.assertEqualNormCase(str(p), BASE)
# Resolve relative paths.
try:
self.cls().absolute()
except pathlib.UnsupportedOperation:
return
old_path = os.getcwd()
os.chdir(BASE)
try:
@ -2409,6 +2605,92 @@ class PathTest(unittest.TestCase):
def test_complex_symlinks_relative_dot_dot(self):
self._check_complex_symlinks(os.path.join('dirA', '..'))
class DummyPathWithSymlinks(DummyPath):
def readlink(self):
path = str(self.parent.resolve() / self.name)
if path in self._symlinks:
return self.with_segments(self._symlinks[path])
elif path in self._files or path in self._directories:
raise OSError(errno.EINVAL, "Not a symlink", path)
else:
raise FileNotFoundError(errno.ENOENT, "File not found", path)
def symlink_to(self, target, target_is_directory=False):
self._directories[str(self.parent)].add(self.name)
self._symlinks[str(self)] = str(target)
class DummyPathWithSymlinksTest(DummyPathTest):
cls = DummyPathWithSymlinks
can_symlink = True
def setUp(self):
super().setUp()
cls = self.cls
join = cls.pathmod.join
cls._symlinks.update({
join(BASE, 'linkA'): 'fileA',
join(BASE, 'linkB'): 'dirB',
join(BASE, 'dirA', 'linkC'): join('..', 'dirB'),
join(BASE, 'dirB', 'linkD'): join('..', 'dirB'),
join(BASE, 'brokenLink'): 'non-existing',
join(BASE, 'brokenLinkLoop'): 'brokenLinkLoop',
})
cls._directories[BASE].update({'linkA', 'linkB', 'brokenLink', 'brokenLinkLoop'})
cls._directories[join(BASE, 'dirA')].add('linkC')
cls._directories[join(BASE, 'dirB')].add('linkD')
#
# Tests for the concrete classes.
#
class PathTest(DummyPathTest):
"""Tests for the FS-accessing functionalities of the Path classes."""
cls = pathlib.Path
can_symlink = os_helper.can_symlink()
def setUp(self):
# note: this must be kept in sync with `DummyPathTest.setUp()`
def cleanup():
os.chmod(join('dirE'), 0o777)
os_helper.rmtree(BASE)
self.addCleanup(cleanup)
os.mkdir(BASE)
os.mkdir(join('dirA'))
os.mkdir(join('dirB'))
os.mkdir(join('dirC'))
os.mkdir(join('dirC', 'dirD'))
os.mkdir(join('dirE'))
with open(join('fileA'), 'wb') as f:
f.write(b"this is file A\n")
with open(join('dirB', 'fileB'), 'wb') as f:
f.write(b"this is file B\n")
with open(join('dirC', 'fileC'), 'wb') as f:
f.write(b"this is file C\n")
with open(join('dirC', 'novel.txt'), 'wb') as f:
f.write(b"this is a novel\n")
with open(join('dirC', 'dirD', 'fileD'), 'wb') as f:
f.write(b"this is file D\n")
os.chmod(join('dirE'), 0)
if self.can_symlink:
# Relative symlinks.
os.symlink('fileA', join('linkA'))
os.symlink('non-existing', join('brokenLink'))
os.symlink('dirB', join('linkB'), target_is_directory=True)
os.symlink(os.path.join('..', 'dirB'), join('dirA', 'linkC'), target_is_directory=True)
# This one goes upwards, creating a loop.
os.symlink(os.path.join('..', 'dirB'), join('dirB', 'linkD'), target_is_directory=True)
# Broken symlink (pointing to itself).
os.symlink('brokenLinkLoop', join('brokenLinkLoop'))
def tempdir(self):
d = os_helper._longpath(tempfile.mkdtemp(suffix='-dirD',
dir=os.getcwd()))
self.addCleanup(os_helper.rmtree, d)
return d
def test_concrete_class(self):
if self.cls is pathlib.Path:
expected = pathlib.WindowsPath if os.name == 'nt' else pathlib.PosixPath
@ -3178,12 +3460,6 @@ class PosixPathTest(PathTest):
self.assertEqual(str(P('//a').absolute()), '//a')
self.assertEqual(str(P('//a/b').absolute()), '//a/b')
def _check_symlink_loop(self, *args):
path = self.cls(*args)
with self.assertRaises(OSError) as cm:
path.resolve(strict=True)
self.assertEqual(cm.exception.errno, errno.ELOOP)
@unittest.skipIf(
is_emscripten or is_wasi,
"umask is not implemented on Emscripten/WASI."
@ -3230,30 +3506,6 @@ class PosixPathTest(PathTest):
st = os.stat(join('masked_new_file'))
self.assertEqual(stat.S_IMODE(st.st_mode), 0o750)
def test_resolve_loop(self):
if not self.can_symlink:
self.skipTest("symlinks required")
# Loops with relative symlinks.
os.symlink('linkX/inside', join('linkX'))
self._check_symlink_loop(BASE, 'linkX')
os.symlink('linkY', join('linkY'))
self._check_symlink_loop(BASE, 'linkY')
os.symlink('linkZ/../linkZ', join('linkZ'))
self._check_symlink_loop(BASE, 'linkZ')
# Non-strict
p = self.cls(BASE, 'linkZ', 'foo')
self.assertEqual(p.resolve(strict=False), p)
# Loops with absolute symlinks.
os.symlink(join('linkU/inside'), join('linkU'))
self._check_symlink_loop(BASE, 'linkU')
os.symlink(join('linkV'), join('linkV'))
self._check_symlink_loop(BASE, 'linkV')
os.symlink(join('linkW/../linkW'), join('linkW'))
self._check_symlink_loop(BASE, 'linkW')
# Non-strict
q = self.cls(BASE, 'linkW', 'foo')
self.assertEqual(q.resolve(strict=False), q)
def test_glob(self):
P = self.cls
p = P(BASE)

View File

@ -0,0 +1,2 @@
Add private ``pathlib._PathBase`` class, which provides experimental support
for virtual filesystems, and may be made public in a future version of Python.