mirror of https://github.com/python/cpython
bpo-43012: remove `pathlib._Accessor` (GH-25701)
Per Pitrou: > The original intent for the “accessor” thing was to have a variant that did all accesses under a filesystem tree in a race condition-free way using openat and friends. It turned out to be much too hairy to actually implement, so was entirely abandoned, but the accessor abstraction was left there. https://discuss.python.org/t/make-pathlib-extensible/3428/2 Accessors are: - Lacking any internal purpose - '_NormalAccessor' is the only implementation - Lacking any firm conceptual difference to `Path` objects themselves (inc. subclasses) - Non-public, i.e. underscore prefixed - '_Accessor' and '_NormalAccessor' - Unofficially used to implement customized `Path` objects, but once once [bpo-24132]() is addressed there will be a supported route for that. This patch preserves all existing behaviour.
This commit is contained in:
parent
187930f74c
commit
08f8301b21
171
Lib/pathlib.py
171
Lib/pathlib.py
|
@ -275,93 +275,6 @@ _windows_flavour = _WindowsFlavour()
|
|||
_posix_flavour = _PosixFlavour()
|
||||
|
||||
|
||||
class _Accessor:
|
||||
"""An accessor implements a particular (system-specific or not) way of
|
||||
accessing paths on the filesystem."""
|
||||
|
||||
|
||||
class _NormalAccessor(_Accessor):
|
||||
|
||||
stat = os.stat
|
||||
|
||||
open = io.open
|
||||
|
||||
listdir = os.listdir
|
||||
|
||||
scandir = os.scandir
|
||||
|
||||
chmod = os.chmod
|
||||
|
||||
mkdir = os.mkdir
|
||||
|
||||
unlink = os.unlink
|
||||
|
||||
if hasattr(os, "link"):
|
||||
link = os.link
|
||||
else:
|
||||
def link(self, src, dst):
|
||||
raise NotImplementedError("os.link() not available on this system")
|
||||
|
||||
rmdir = os.rmdir
|
||||
|
||||
rename = os.rename
|
||||
|
||||
replace = os.replace
|
||||
|
||||
if hasattr(os, "symlink"):
|
||||
symlink = os.symlink
|
||||
else:
|
||||
def symlink(self, src, dst, target_is_directory=False):
|
||||
raise NotImplementedError("os.symlink() not available on this system")
|
||||
|
||||
def touch(self, path, mode=0o666, exist_ok=True):
|
||||
if exist_ok:
|
||||
# First try to bump modification time
|
||||
# Implementation note: GNU touch uses the UTIME_NOW option of
|
||||
# the utimensat() / futimens() functions.
|
||||
try:
|
||||
os.utime(path, None)
|
||||
except OSError:
|
||||
# Avoid exception chaining
|
||||
pass
|
||||
else:
|
||||
return
|
||||
flags = os.O_CREAT | os.O_WRONLY
|
||||
if not exist_ok:
|
||||
flags |= os.O_EXCL
|
||||
fd = os.open(path, flags, mode)
|
||||
os.close(fd)
|
||||
|
||||
if hasattr(os, "readlink"):
|
||||
readlink = os.readlink
|
||||
else:
|
||||
def readlink(self, path):
|
||||
raise NotImplementedError("os.readlink() not available on this system")
|
||||
|
||||
def owner(self, path):
|
||||
try:
|
||||
import pwd
|
||||
return pwd.getpwuid(self.stat(path).st_uid).pw_name
|
||||
except ImportError:
|
||||
raise NotImplementedError("Path.owner() is unsupported on this system")
|
||||
|
||||
def group(self, path):
|
||||
try:
|
||||
import grp
|
||||
return grp.getgrgid(self.stat(path).st_gid).gr_name
|
||||
except ImportError:
|
||||
raise NotImplementedError("Path.group() is unsupported on this system")
|
||||
|
||||
getcwd = os.getcwd
|
||||
|
||||
expanduser = staticmethod(os.path.expanduser)
|
||||
|
||||
realpath = staticmethod(os.path.realpath)
|
||||
|
||||
|
||||
_normal_accessor = _NormalAccessor()
|
||||
|
||||
|
||||
#
|
||||
# Globbing helpers
|
||||
#
|
||||
|
@ -402,7 +315,7 @@ class _Selector:
|
|||
path_cls = type(parent_path)
|
||||
is_dir = path_cls.is_dir
|
||||
exists = path_cls.exists
|
||||
scandir = parent_path._accessor.scandir
|
||||
scandir = path_cls._scandir
|
||||
if not is_dir(parent_path):
|
||||
return iter([])
|
||||
return self._select_from(parent_path, is_dir, exists, scandir)
|
||||
|
@ -949,7 +862,6 @@ class Path(PurePath):
|
|||
object. You can also instantiate a PosixPath or WindowsPath directly,
|
||||
but cannot instantiate a WindowsPath on a POSIX system or vice versa.
|
||||
"""
|
||||
_accessor = _normal_accessor
|
||||
__slots__ = ()
|
||||
|
||||
def __new__(cls, *args, **kwargs):
|
||||
|
@ -988,7 +900,7 @@ class Path(PurePath):
|
|||
"""Return a new path pointing to the current working directory
|
||||
(as returned by os.getcwd()).
|
||||
"""
|
||||
return cls(cls._accessor.getcwd())
|
||||
return cls(os.getcwd())
|
||||
|
||||
@classmethod
|
||||
def home(cls):
|
||||
|
@ -1005,16 +917,22 @@ class Path(PurePath):
|
|||
try:
|
||||
other_st = other_path.stat()
|
||||
except AttributeError:
|
||||
other_st = self._accessor.stat(other_path)
|
||||
other_st = self.__class__(other_path).stat()
|
||||
return os.path.samestat(st, other_st)
|
||||
|
||||
def iterdir(self):
|
||||
"""Iterate over the files in this directory. Does not yield any
|
||||
result for the special paths '.' and '..'.
|
||||
"""
|
||||
for name in self._accessor.listdir(self):
|
||||
for name in os.listdir(self):
|
||||
yield self._make_child_relpath(name)
|
||||
|
||||
def _scandir(self):
|
||||
# bpo-24132: a future version of pathlib will support subclassing of
|
||||
# pathlib.Path to customize how the filesystem is accessed. This
|
||||
# includes scandir(), which is used to implement glob().
|
||||
return os.scandir(self)
|
||||
|
||||
def glob(self, pattern):
|
||||
"""Iterate over this subtree and yield all existing files (of any
|
||||
kind, including directories) matching the given relative pattern.
|
||||
|
@ -1050,7 +968,7 @@ class Path(PurePath):
|
|||
"""
|
||||
if self.is_absolute():
|
||||
return self
|
||||
return self._from_parts([self._accessor.getcwd()] + self._parts)
|
||||
return self._from_parts([self.cwd()] + self._parts)
|
||||
|
||||
def resolve(self, strict=False):
|
||||
"""
|
||||
|
@ -1064,7 +982,7 @@ class Path(PurePath):
|
|||
raise RuntimeError("Symlink loop from %r" % e.filename)
|
||||
|
||||
try:
|
||||
s = self._accessor.realpath(self, strict=strict)
|
||||
s = os.path.realpath(self, strict=strict)
|
||||
except OSError as e:
|
||||
check_eloop(e)
|
||||
raise
|
||||
|
@ -1084,19 +1002,28 @@ class Path(PurePath):
|
|||
Return the result of the stat() system call on this path, like
|
||||
os.stat() does.
|
||||
"""
|
||||
return self._accessor.stat(self, follow_symlinks=follow_symlinks)
|
||||
return os.stat(self, follow_symlinks=follow_symlinks)
|
||||
|
||||
def owner(self):
|
||||
"""
|
||||
Return the login name of the file owner.
|
||||
"""
|
||||
return self._accessor.owner(self)
|
||||
try:
|
||||
import pwd
|
||||
return pwd.getpwuid(self.stat().st_uid).pw_name
|
||||
except ImportError:
|
||||
raise NotImplementedError("Path.owner() is unsupported on this system")
|
||||
|
||||
def group(self):
|
||||
"""
|
||||
Return the group name of the file gid.
|
||||
"""
|
||||
return self._accessor.group(self)
|
||||
|
||||
try:
|
||||
import grp
|
||||
return grp.getgrgid(self.stat().st_gid).gr_name
|
||||
except ImportError:
|
||||
raise NotImplementedError("Path.group() is unsupported on this system")
|
||||
|
||||
def open(self, mode='r', buffering=-1, encoding=None,
|
||||
errors=None, newline=None):
|
||||
|
@ -1106,8 +1033,7 @@ class Path(PurePath):
|
|||
"""
|
||||
if "b" not in mode:
|
||||
encoding = io.text_encoding(encoding)
|
||||
return self._accessor.open(self, mode, buffering, encoding, errors,
|
||||
newline)
|
||||
return io.open(self, mode, buffering, encoding, errors, newline)
|
||||
|
||||
def read_bytes(self):
|
||||
"""
|
||||
|
@ -1148,21 +1074,38 @@ class Path(PurePath):
|
|||
"""
|
||||
Return the path to which the symbolic link points.
|
||||
"""
|
||||
path = self._accessor.readlink(self)
|
||||
return self._from_parts((path,))
|
||||
if not hasattr(os, "readlink"):
|
||||
raise NotImplementedError("os.readlink() not available on this system")
|
||||
return self._from_parts((os.readlink(self),))
|
||||
|
||||
def touch(self, mode=0o666, exist_ok=True):
|
||||
"""
|
||||
Create this file with the given access mode, if it doesn't exist.
|
||||
"""
|
||||
self._accessor.touch(self, mode, exist_ok)
|
||||
|
||||
if exist_ok:
|
||||
# First try to bump modification time
|
||||
# Implementation note: GNU touch uses the UTIME_NOW option of
|
||||
# the utimensat() / futimens() functions.
|
||||
try:
|
||||
os.utime(self, None)
|
||||
except OSError:
|
||||
# Avoid exception chaining
|
||||
pass
|
||||
else:
|
||||
return
|
||||
flags = os.O_CREAT | os.O_WRONLY
|
||||
if not exist_ok:
|
||||
flags |= os.O_EXCL
|
||||
fd = os.open(self, flags, mode)
|
||||
os.close(fd)
|
||||
|
||||
def mkdir(self, mode=0o777, parents=False, exist_ok=False):
|
||||
"""
|
||||
Create a new directory at this given path.
|
||||
"""
|
||||
try:
|
||||
self._accessor.mkdir(self, mode)
|
||||
os.mkdir(self, mode)
|
||||
except FileNotFoundError:
|
||||
if not parents or self.parent == self:
|
||||
raise
|
||||
|
@ -1178,7 +1121,7 @@ class Path(PurePath):
|
|||
"""
|
||||
Change the permissions of the path, like os.chmod().
|
||||
"""
|
||||
self._accessor.chmod(self, mode, follow_symlinks=follow_symlinks)
|
||||
os.chmod(self, mode, follow_symlinks=follow_symlinks)
|
||||
|
||||
def lchmod(self, mode):
|
||||
"""
|
||||
|
@ -1193,7 +1136,7 @@ class Path(PurePath):
|
|||
If the path is a directory, use rmdir() instead.
|
||||
"""
|
||||
try:
|
||||
self._accessor.unlink(self)
|
||||
os.unlink(self)
|
||||
except FileNotFoundError:
|
||||
if not missing_ok:
|
||||
raise
|
||||
|
@ -1202,7 +1145,7 @@ class Path(PurePath):
|
|||
"""
|
||||
Remove this directory. The directory must be empty.
|
||||
"""
|
||||
self._accessor.rmdir(self)
|
||||
os.rmdir(self)
|
||||
|
||||
def lstat(self):
|
||||
"""
|
||||
|
@ -1221,7 +1164,7 @@ class Path(PurePath):
|
|||
|
||||
Returns the new Path instance pointing to the target path.
|
||||
"""
|
||||
self._accessor.rename(self, target)
|
||||
os.rename(self, target)
|
||||
return self.__class__(target)
|
||||
|
||||
def replace(self, target):
|
||||
|
@ -1234,7 +1177,7 @@ class Path(PurePath):
|
|||
|
||||
Returns the new Path instance pointing to the target path.
|
||||
"""
|
||||
self._accessor.replace(self, target)
|
||||
os.replace(self, target)
|
||||
return self.__class__(target)
|
||||
|
||||
def symlink_to(self, target, target_is_directory=False):
|
||||
|
@ -1242,7 +1185,9 @@ class Path(PurePath):
|
|||
Make this path a symlink pointing to the target path.
|
||||
Note the order of arguments (link, target) is the reverse of os.symlink.
|
||||
"""
|
||||
self._accessor.symlink(target, self, target_is_directory)
|
||||
if not hasattr(os, "symlink"):
|
||||
raise NotImplementedError("os.symlink() not available on this system")
|
||||
os.symlink(target, self, target_is_directory)
|
||||
|
||||
def hardlink_to(self, target):
|
||||
"""
|
||||
|
@ -1250,7 +1195,9 @@ class Path(PurePath):
|
|||
|
||||
Note the order of arguments (self, target) is the reverse of os.link's.
|
||||
"""
|
||||
self._accessor.link(target, self)
|
||||
if not hasattr(os, "link"):
|
||||
raise NotImplementedError("os.link() not available on this system")
|
||||
os.link(target, self)
|
||||
|
||||
def link_to(self, target):
|
||||
"""
|
||||
|
@ -1268,7 +1215,7 @@ class Path(PurePath):
|
|||
"for removal in Python 3.12. "
|
||||
"Use pathlib.Path.hardlink_to() instead.",
|
||||
DeprecationWarning, stacklevel=2)
|
||||
self._accessor.link(self, target)
|
||||
self.__class__(target).hardlink_to(self)
|
||||
|
||||
# Convenience functions for querying the stat results
|
||||
|
||||
|
@ -1425,7 +1372,7 @@ class Path(PurePath):
|
|||
"""
|
||||
if (not (self._drv or self._root) and
|
||||
self._parts and self._parts[0][:1] == '~'):
|
||||
homedir = self._accessor.expanduser(self._parts[0])
|
||||
homedir = os.path.expanduser(self._parts[0])
|
||||
if homedir[:1] == "~":
|
||||
raise RuntimeError("Could not determine home directory.")
|
||||
return self._from_parts([homedir] + self._parts[1:])
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
import contextlib
|
||||
import collections.abc
|
||||
import io
|
||||
import os
|
||||
|
@ -1459,7 +1460,7 @@ class _BasePathTest(object):
|
|||
def test_absolute_common(self):
|
||||
P = self.cls
|
||||
|
||||
with mock.patch("pathlib._normal_accessor.getcwd") as getcwd:
|
||||
with mock.patch("os.getcwd") as getcwd:
|
||||
getcwd.return_value = BASE
|
||||
|
||||
# Simple relative paths.
|
||||
|
@ -1738,21 +1739,18 @@ class _BasePathTest(object):
|
|||
# Patching is needed to avoid relying on the filesystem
|
||||
# to return the order of the files as the error will not
|
||||
# happen if the symlink is the last item.
|
||||
real_scandir = os.scandir
|
||||
def my_scandir(path):
|
||||
with real_scandir(path) as scandir_it:
|
||||
entries = list(scandir_it)
|
||||
entries.sort(key=lambda entry: entry.name)
|
||||
return contextlib.nullcontext(entries)
|
||||
|
||||
with mock.patch("os.scandir") as scandir:
|
||||
scandir.return_value = sorted(os.scandir(base))
|
||||
with mock.patch("os.scandir", my_scandir):
|
||||
self.assertEqual(len(set(base.glob("*"))), 3)
|
||||
|
||||
subdir.mkdir()
|
||||
|
||||
with mock.patch("os.scandir") as scandir:
|
||||
scandir.return_value = sorted(os.scandir(base))
|
||||
self.assertEqual(len(set(base.glob("*"))), 4)
|
||||
|
||||
subdir.chmod(000)
|
||||
|
||||
with mock.patch("os.scandir") as scandir:
|
||||
scandir.return_value = sorted(os.scandir(base))
|
||||
self.assertEqual(len(set(base.glob("*"))), 4)
|
||||
|
||||
def _check_resolve(self, p, expected, strict=True):
|
||||
|
@ -2199,6 +2197,7 @@ class _BasePathTest(object):
|
|||
p = self.cls(BASE, 'dirCPC%d' % pattern_num)
|
||||
self.assertFalse(p.exists())
|
||||
|
||||
real_mkdir = os.mkdir
|
||||
def my_mkdir(path, mode=0o777):
|
||||
path = str(path)
|
||||
# Emulate another process that would create the directory
|
||||
|
@ -2207,15 +2206,15 @@ class _BasePathTest(object):
|
|||
# function is called at most 5 times (dirCPC/dir1/dir2,
|
||||
# dirCPC/dir1, dirCPC, dirCPC/dir1, dirCPC/dir1/dir2).
|
||||
if pattern.pop():
|
||||
os.mkdir(path, mode) # From another process.
|
||||
real_mkdir(path, mode) # From another process.
|
||||
concurrently_created.add(path)
|
||||
os.mkdir(path, mode) # Our real call.
|
||||
real_mkdir(path, mode) # Our real call.
|
||||
|
||||
pattern = [bool(pattern_num & (1 << n)) for n in range(5)]
|
||||
concurrently_created = set()
|
||||
p12 = p / 'dir1' / 'dir2'
|
||||
try:
|
||||
with mock.patch("pathlib._normal_accessor.mkdir", my_mkdir):
|
||||
with mock.patch("os.mkdir", my_mkdir):
|
||||
p12.mkdir(parents=True, exist_ok=False)
|
||||
except FileExistsError:
|
||||
self.assertIn(str(p12), concurrently_created)
|
||||
|
@ -2676,7 +2675,7 @@ class WindowsPathTest(_BasePathTest, unittest.TestCase):
|
|||
self.assertEqual(str(P(share + 'a\\b').absolute()), share + 'a\\b')
|
||||
|
||||
# UNC relative paths.
|
||||
with mock.patch("pathlib._normal_accessor.getcwd") as getcwd:
|
||||
with mock.patch("os.getcwd") as getcwd:
|
||||
getcwd.return_value = share
|
||||
|
||||
self.assertEqual(str(P().absolute()), share)
|
||||
|
|
|
@ -0,0 +1,2 @@
|
|||
The pathlib module's obsolete and internal ``_Accessor`` class has been
|
||||
removed to prepare the terrain for upcoming enhancements to the module.
|
Loading…
Reference in New Issue