GH-73991: Support preserving metadata in `pathlib.Path.copy()` (#120806)

Add *preserve_metadata* keyword-only argument to `pathlib.Path.copy()`, defaulting to false. When set to true, we copy timestamps, permissions, extended attributes and flags where available, like `shutil.copystat()`. The argument has no effect on Windows, where metadata is always copied.

Internally (in the pathlib ABCs), path types gain `_readable_metadata` and `_writable_metadata` attributes. These sets of strings describe what kinds of metadata can be retrieved and stored. We take an intersection of `source._readable_metadata` and `target._writable_metadata` to minimise reads/writes. A new `_read_metadata()` method accepts a set of metadata keys and returns a dict with those keys, and a new `_write_metadata()` method accepts a dict of metadata. We *might* make these public in future, but it's hard to justify while the ABCs are still private.
This commit is contained in:
Barney Gale 2024-07-06 17:18:39 +01:00 committed by GitHub
parent 6239d41527
commit 88fc0655d4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 187 additions and 11 deletions

View File

@ -1539,7 +1539,7 @@ Creating files and directories
Copying, renaming and deleting
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. method:: Path.copy(target, *, follow_symlinks=True)
.. method:: Path.copy(target, *, follow_symlinks=True, preserve_metadata=False)
Copy the contents of this file to the *target* file. If *target* specifies
a file that already exists, it will be replaced.
@ -1548,11 +1548,11 @@ Copying, renaming and deleting
will be created as a symbolic link. If *follow_symlinks* is true and this
file is a symbolic link, *target* will be a copy of the symlink target.
.. note::
This method uses operating system functionality to copy file content
efficiently. The OS might also copy some metadata, such as file
permissions. After the copy is complete, users may wish to call
:meth:`Path.chmod` to set the permissions of the target file.
If *preserve_metadata* is false (the default), only the file data is
guaranteed to be copied. Set *preserve_metadata* to true to ensure that the
file mode (permissions), flags, last access and modification times, and
extended attributes are copied where supported. This argument has no effect
on Windows, where metadata is always preserved when copying.
.. versionadded:: 3.14

View File

@ -781,7 +781,32 @@ class PathBase(PurePathBase):
"""
raise UnsupportedOperation(self._unsupported_msg('mkdir()'))
def copy(self, target, follow_symlinks=True):
# Metadata keys supported by this path type: sets of strings naming the
# kinds of metadata that can be read from, and written to, paths of this
# type. Both are empty here, so _copy_metadata() transfers nothing unless a
# subclass overrides them.
_readable_metadata = _writable_metadata = frozenset()
def _read_metadata(self, keys=None, *, follow_symlinks=True):
    """
    Return metadata for this path as a dict with string keys.

    *keys* is the set of metadata kinds the caller wants. This base
    implementation supports none of them and always raises
    UnsupportedOperation.
    """
    message = self._unsupported_msg('_read_metadata()')
    raise UnsupportedOperation(message)
def _write_metadata(self, metadata, *, follow_symlinks=True):
    """
    Apply the given dict of metadata (string keys) to this path.

    This base implementation cannot store any metadata and always raises
    UnsupportedOperation.
    """
    message = self._unsupported_msg('_write_metadata()')
    raise UnsupportedOperation(message)
def _copy_metadata(self, target, *, follow_symlinks=True):
    """
    Transfer metadata (permissions, timestamps, etc) from this path onto
    *target*.

    Only the kinds of metadata that this path can read *and* the target can
    write are touched; when there are none, neither path is accessed.
    """
    shared = self._readable_metadata & target._writable_metadata
    if not shared:
        return
    target._write_metadata(
        self._read_metadata(shared, follow_symlinks=follow_symlinks),
        follow_symlinks=follow_symlinks)
def copy(self, target, *, follow_symlinks=True, preserve_metadata=False):
"""
Copy the contents of this file to the given target. If this file is a
symlink and follow_symlinks is false, a symlink will be created at the
@ -793,6 +818,8 @@ class PathBase(PurePathBase):
raise OSError(f"{self!r} and {target!r} are the same file")
if not follow_symlinks and self.is_symlink():
target.symlink_to(self.readlink())
if preserve_metadata:
self._copy_metadata(target, follow_symlinks=False)
return
with self.open('rb') as source_f:
try:
@ -805,6 +832,8 @@ class PathBase(PurePathBase):
f'Directory does not exist: {target}') from e
else:
raise
if preserve_metadata:
self._copy_metadata(target)
def copytree(self, target, *, follow_symlinks=True, dirs_exist_ok=False,
ignore=None, on_error=None):

View File

@ -17,7 +17,8 @@ try:
except ImportError:
grp = None
from ._os import UnsupportedOperation, copyfile
from ._os import (UnsupportedOperation, copyfile, file_metadata_keys,
read_file_metadata, write_file_metadata)
from ._abc import PurePathBase, PathBase
@ -781,8 +782,12 @@ class Path(PathBase, PurePath):
if not exist_ok or not self.is_dir():
raise
# Local filesystem paths can read and write every kind of metadata listed
# in file_metadata_keys; the helpers imported from ._os do the actual work.
_readable_metadata = _writable_metadata = file_metadata_keys
_read_metadata = read_file_metadata
_write_metadata = write_file_metadata
if copyfile:
def copy(self, target, follow_symlinks=True):
def copy(self, target, *, follow_symlinks=True, preserve_metadata=False):
"""
Copy the contents of this file to the given target. If this file is a
symlink and follow_symlinks is false, a symlink will be created at the
@ -799,7 +804,8 @@ class Path(PathBase, PurePath):
return
except UnsupportedOperation:
pass # Fall through to generic code.
PathBase.copy(self, target, follow_symlinks=follow_symlinks)
PathBase.copy(self, target, follow_symlinks=follow_symlinks,
preserve_metadata=preserve_metadata)
def chmod(self, mode, *, follow_symlinks=True):
"""

View File

@ -2,7 +2,7 @@
Low-level OS functionality wrappers used by pathlib.
"""
from errno import EBADF, EOPNOTSUPP, ETXTBSY, EXDEV
from errno import *
import os
import stat
import sys
@ -178,3 +178,100 @@ def copyfileobj(source_f, target_f):
write_target = target_f.write
while buf := read_source(1024 * 1024):
write_target(buf)
# Kinds of metadata supported by the operating system. 'mode' and
# 'times_ns' are always present; 'flags' and 'xattrs' are included only
# when the os module exposes the corresponding functionality.
file_metadata_keys = frozenset(
    {'mode', 'times_ns'}
    | ({'flags'} if hasattr(os.stat_result, 'st_flags') else set())
    | ({'xattrs'} if hasattr(os, 'listxattr') else set())
)
def read_file_metadata(path, keys=None, *, follow_symlinks=True):
    """
    Returns local path metadata as a dict with string keys.

    *keys* is a set drawn from file_metadata_keys naming the kinds of
    metadata to read; when omitted, every kind in file_metadata_keys is
    read. The result may lack 'xattrs' even when requested, if listing or
    reading extended attributes fails with one of the tolerated errors.
    Any other OSError is propagated.
    """
    if keys is None:
        keys = file_metadata_keys
    assert keys.issubset(file_metadata_keys)
    result = {}
    # Filled in on first use, so a single stat() call serves 'mode',
    # 'times_ns' and 'flags' alike, and no stat() happens at all when only
    # 'xattrs' (or nothing) was requested.
    st = None
    for key in keys:
        if key == 'xattrs':
            try:
                result['xattrs'] = [
                    (attr, os.getxattr(path, attr, follow_symlinks=follow_symlinks))
                    for attr in os.listxattr(path, follow_symlinks=follow_symlinks)]
            except OSError as err:
                # Best effort: these errnos are swallowed and the key is
                # simply left out of the result.
                if err.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
                    raise
            continue
        if st is None:
            st = os.stat(path, follow_symlinks=follow_symlinks)
        if key == 'mode':
            # Permission bits only; the file-type bits are stripped.
            result['mode'] = stat.S_IMODE(st.st_mode)
        elif key == 'times_ns':
            # (atime, mtime) in nanoseconds, the shape os.utime(ns=...) takes.
            result['times_ns'] = st.st_atime_ns, st.st_mtime_ns
        elif key == 'flags':
            result['flags'] = st.st_flags
    return result
def write_file_metadata(path, metadata, *, follow_symlinks=True):
    """
    Applies the given dict of metadata (string keys) to a local path.

    Recognised keys are those in file_metadata_keys. Entries that are
    missing or None are skipped. Values are applied in a fixed order:
    timestamps, extended attributes, mode, then flags.
    """
    assert file_metadata_keys.issuperset(metadata.keys())

    def _noop(*args, ns=None, follow_symlinks=None):
        pass

    def resolve(name):
        # Use the real os function when it exists; when symlinks must not
        # be followed, additionally require that the function supports
        # follow_symlinks. Otherwise fall back to doing nothing.
        func = getattr(os, name, _noop)
        if follow_symlinks or func in os.supports_follow_symlinks:
            return func
        return _noop

    timestamps = metadata.get('times_ns')
    if timestamps is not None:
        set_times = resolve("utime")
        set_times(path, ns=timestamps, follow_symlinks=follow_symlinks)

    # Extended attributes go first: once the file has (potentially) been
    # chmod()'ed read-only, setxattr() would fail with EACCES.
    extended = metadata.get('xattrs')
    if extended is not None:
        for name, value in extended:
            try:
                os.setxattr(path, name, value, follow_symlinks=follow_symlinks)
            except OSError as exc:
                # Best effort: these errnos are tolerated per attribute.
                if exc.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
                    raise

    permissions = metadata.get('mode')
    if permissions is not None:
        set_mode = resolve("chmod")
        try:
            set_mode(path, permissions, follow_symlinks=follow_symlinks)
        except NotImplementedError:
            # Presumably raised when follow_symlinks=False cannot be
            # honoured for chmod on this platform (the earlier comment here
            # named lchown()/fchownat(), which looks like a slip -- the
            # guarded call is chmod). We are out of options for changing
            # the symlink's mode, so give up and suppress the error, which
            # is what shutil always did in this circumstance.
            pass

    file_flags = metadata.get('flags')
    if file_flags is not None:
        set_flags = resolve("chflags")
        try:
            set_flags(path, file_flags, follow_symlinks=follow_symlinks)
        except OSError as exc:
            # Flags are unsupported on the target: tolerate and move on.
            if exc.errno not in (EOPNOTSUPP, ENOTSUP):
                raise

View File

@ -653,6 +653,50 @@ class PathTest(test_pathlib_abc.DummyPathTest, PurePathTest):
self.assertIsInstance(f, io.RawIOBase)
self.assertEqual(f.read().strip(), b"this is file A")
def test_copy_file_preserve_metadata(self):
    # Copying a regular file with preserve_metadata=True should carry the
    # source's xattrs, mode and flags over to the copy, for whichever of
    # those this platform provides.
    root = self.cls(self.base)
    src = root / 'fileA'
    # Give the source some non-default metadata to be preserved.
    if hasattr(os, 'setxattr'):
        os.setxattr(src, b'user.foo', b'42')
    if hasattr(os, 'chmod'):
        os.chmod(src, stat.S_IRWXU | stat.S_IRWXO)
    if hasattr(os, 'chflags') and hasattr(stat, 'UF_NODUMP'):
        os.chflags(src, stat.UF_NODUMP)
    src_stat = src.stat()
    dst = root / 'copyA'
    src.copy(dst, preserve_metadata=True)
    self.assertTrue(dst.exists())
    self.assertEqual(src.read_text(), dst.read_text())
    dst_stat = dst.stat()
    # Timestamps on the copy must not be older than the source's.
    for field in ('st_atime', 'st_mtime'):
        self.assertLessEqual(getattr(src_stat, field),
                             getattr(dst_stat, field))
    if hasattr(os, 'getxattr'):
        self.assertEqual(os.getxattr(dst, b'user.foo'), b'42')
    self.assertEqual(src_stat.st_mode, dst_stat.st_mode)
    if hasattr(src_stat, 'st_flags'):
        self.assertEqual(src_stat.st_flags, dst_stat.st_flags)
@needs_symlinks
def test_copy_link_preserve_metadata(self):
    # Copying a symlink with follow_symlinks=False and
    # preserve_metadata=True should recreate the link and carry the link's
    # own mode and flags over, for whichever of those this platform
    # provides.
    root = self.cls(self.base)
    src = root / 'linkA'
    # Give the link itself (not its target) some non-default metadata.
    if hasattr(os, 'lchmod'):
        os.lchmod(src, stat.S_IRWXU | stat.S_IRWXO)
    if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
        os.lchflags(src, stat.UF_NODUMP)
    src_stat = src.lstat()
    dst = root / 'copyA'
    src.copy(dst, follow_symlinks=False, preserve_metadata=True)
    self.assertTrue(dst.exists())
    self.assertTrue(dst.is_symlink())
    self.assertEqual(src.readlink(), dst.readlink())
    dst_stat = dst.lstat()
    # Timestamps on the copy must not be older than the source's.
    for field in ('st_atime', 'st_mtime'):
        self.assertLessEqual(getattr(src_stat, field),
                             getattr(dst_stat, field))
    self.assertEqual(src_stat.st_mode, dst_stat.st_mode)
    if hasattr(src_stat, 'st_flags'):
        self.assertEqual(src_stat.st_flags, dst_stat.st_flags)
@unittest.skipIf(sys.platform == "win32" or sys.platform == "wasi", "directories are always readable on Windows and WASI")
@unittest.skipIf(root_in_posix, "test fails with root privilege")
def test_copytree_no_read_permission(self):