GH-73991: Add `pathlib.Path.copy()` (#119058)

Add a `Path.copy()` method that copies the content of one file to another.

This method is similar to `shutil.copyfile()` but differs in the following ways:

- Uses `fcntl.FICLONE` where available (see GH-81338)
- Uses `os.copy_file_range` where available (see GH-81340)
- Uses `_winapi.CopyFile2` where available, even though this copies more metadata than the other implementations. This makes `WindowsPath.copy()` more similar to `shutil.copy2()`.

The method is presently _less_ specified than the `shutil` functions to allow OS-specific optimizations that might copy more or less metadata.

Incorporates code from GH-81338 and GH-93152.

Co-authored-by: Eryk Sun <eryksun@gmail.com>
This commit is contained in:
Barney Gale 2024-06-14 17:15:49 +01:00 committed by GitHub
parent 2bacc2343c
commit 7c38097add
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 271 additions and 2 deletions

View File

@ -1429,8 +1429,22 @@ Creating files and directories
available. In previous versions, :exc:`NotImplementedError` was raised. available. In previous versions, :exc:`NotImplementedError` was raised.
Renaming and deleting Copying, renaming and deleting
^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. method:: Path.copy(target)
Copy the contents of this file to the *target* file. If *target* specifies
a file that already exists, it will be replaced.
.. note::
This method uses operating system functionality to copy file content
efficiently. The OS might also copy some metadata, such as file
permissions. After the copy is complete, users may wish to call
:meth:`Path.chmod` to set the permissions of the target file.
.. versionadded:: 3.14
.. method:: Path.rename(target) .. method:: Path.rename(target)

View File

@ -100,6 +100,13 @@ os
by :func:`os.unsetenv`, or made outside Python in the same process. by :func:`os.unsetenv`, or made outside Python in the same process.
(Contributed by Victor Stinner in :gh:`120057`.) (Contributed by Victor Stinner in :gh:`120057`.)
pathlib
-------
* Add :meth:`pathlib.Path.copy`, which copies the content of one file to
another, like :func:`shutil.copyfile`.
(Contributed by Barney Gale in :gh:`73991`.)
symtable symtable
-------- --------

View File

@ -16,6 +16,7 @@ import operator
import posixpath import posixpath
from glob import _GlobberBase, _no_recurse_symlinks from glob import _GlobberBase, _no_recurse_symlinks
from stat import S_ISDIR, S_ISLNK, S_ISREG, S_ISSOCK, S_ISBLK, S_ISCHR, S_ISFIFO from stat import S_ISDIR, S_ISLNK, S_ISREG, S_ISSOCK, S_ISBLK, S_ISCHR, S_ISFIFO
from ._os import copyfileobj
__all__ = ["UnsupportedOperation"] __all__ = ["UnsupportedOperation"]
@ -563,6 +564,15 @@ class PathBase(PurePathBase):
return (st.st_ino == other_st.st_ino and return (st.st_ino == other_st.st_ino and
st.st_dev == other_st.st_dev) st.st_dev == other_st.st_dev)
def _samefile_safe(self, other_path):
"""
Like samefile(), but returns False rather than raising OSError.
"""
try:
return self.samefile(other_path)
except (OSError, ValueError):
return False
def open(self, mode='r', buffering=-1, encoding=None, def open(self, mode='r', buffering=-1, encoding=None,
errors=None, newline=None): errors=None, newline=None):
""" """
@ -780,6 +790,26 @@ class PathBase(PurePathBase):
""" """
raise UnsupportedOperation(self._unsupported_msg('mkdir()')) raise UnsupportedOperation(self._unsupported_msg('mkdir()'))
def copy(self, target):
"""
Copy the contents of this file to the given target.
"""
if not isinstance(target, PathBase):
target = self.with_segments(target)
if self._samefile_safe(target):
raise OSError(f"{self!r} and {target!r} are the same file")
with self.open('rb') as source_f:
try:
with target.open('wb') as target_f:
copyfileobj(source_f, target_f)
except IsADirectoryError as e:
if not target.exists():
# Raise a less confusing exception.
raise FileNotFoundError(
f'Directory does not exist: {target}') from e
else:
raise
def rename(self, target): def rename(self, target):
""" """
Rename this path to the target path. Rename this path to the target path.

View File

@ -18,6 +18,7 @@ except ImportError:
grp = None grp = None
from ._abc import UnsupportedOperation, PurePathBase, PathBase from ._abc import UnsupportedOperation, PurePathBase, PathBase
from ._os import copyfile
__all__ = [ __all__ = [
@ -780,6 +781,21 @@ class Path(PathBase, PurePath):
if not exist_ok or not self.is_dir(): if not exist_ok or not self.is_dir():
raise raise
if copyfile:
def copy(self, target):
"""
Copy the contents of this file to the given target.
"""
try:
target = os.fspath(target)
except TypeError:
if isinstance(target, PathBase):
# Target is an instance of PathBase but not os.PathLike.
# Use generic implementation from PathBase.
return PathBase.copy(self, target)
raise
copyfile(os.fspath(self), target)
def chmod(self, mode, *, follow_symlinks=True): def chmod(self, mode, *, follow_symlinks=True):
""" """
Change the permissions of the path, like os.chmod(). Change the permissions of the path, like os.chmod().

138
Lib/pathlib/_os.py Normal file
View File

@ -0,0 +1,138 @@
"""
Low-level OS functionality wrappers used by pathlib.
"""
from errno import EBADF, EOPNOTSUPP, ETXTBSY, EXDEV
import os
import sys
try:
import fcntl
except ImportError:
fcntl = None
try:
import posix
except ImportError:
posix = None
try:
import _winapi
except ImportError:
_winapi = None
def get_copy_blocksize(infd):
"""Determine blocksize for fastcopying on Linux.
Hopefully the whole file will be copied in a single call.
The copying itself should be performed in a loop 'till EOF is
reached (0 return) so a blocksize smaller or bigger than the actual
file size should not make any difference, also in case the file
content changes while being copied.
"""
try:
blocksize = max(os.fstat(infd).st_size, 2 ** 23) # min 8 MiB
except OSError:
blocksize = 2 ** 27 # 128 MiB
# On 32-bit architectures truncate to 1 GiB to avoid OverflowError,
# see gh-82500.
if sys.maxsize < 2 ** 32:
blocksize = min(blocksize, 2 ** 30)
return blocksize
if fcntl and hasattr(fcntl, 'FICLONE'):
def clonefd(source_fd, target_fd):
"""
Perform a lightweight copy of two files, where the data blocks are
copied only when modified. This is known as Copy on Write (CoW),
instantaneous copy or reflink.
"""
fcntl.ioctl(target_fd, fcntl.FICLONE, source_fd)
else:
clonefd = None
if posix and hasattr(posix, '_fcopyfile'):
def copyfd(source_fd, target_fd):
"""
Copy a regular file content using high-performance fcopyfile(3)
syscall (macOS).
"""
posix._fcopyfile(source_fd, target_fd, posix._COPYFILE_DATA)
elif hasattr(os, 'copy_file_range'):
def copyfd(source_fd, target_fd):
"""
Copy data from one regular mmap-like fd to another by using a
high-performance copy_file_range(2) syscall that gives filesystems
an opportunity to implement the use of reflinks or server-side
copy.
This should work on Linux >= 4.5 only.
"""
blocksize = get_copy_blocksize(source_fd)
offset = 0
while True:
sent = os.copy_file_range(source_fd, target_fd, blocksize,
offset_dst=offset)
if sent == 0:
break # EOF
offset += sent
elif hasattr(os, 'sendfile'):
def copyfd(source_fd, target_fd):
"""Copy data from one regular mmap-like fd to another by using
high-performance sendfile(2) syscall.
This should work on Linux >= 2.6.33 only.
"""
blocksize = get_copy_blocksize(source_fd)
offset = 0
while True:
sent = os.sendfile(target_fd, source_fd, offset, blocksize)
if sent == 0:
break # EOF
offset += sent
else:
copyfd = None
if _winapi and hasattr(_winapi, 'CopyFile2'):
def copyfile(source, target):
"""
Copy from one file to another using CopyFile2 (Windows only).
"""
_winapi.CopyFile2(source, target, 0)
else:
copyfile = None
def copyfileobj(source_f, target_f):
"""
Copy data from file-like object source_f to file-like object target_f.
"""
try:
source_fd = source_f.fileno()
target_fd = target_f.fileno()
except Exception:
pass # Fall through to generic code.
else:
try:
# Use OS copy-on-write where available.
if clonefd:
try:
clonefd(source_fd, target_fd)
return
except OSError as err:
if err.errno not in (EBADF, EOPNOTSUPP, ETXTBSY, EXDEV):
raise err
# Use OS copy where available.
if copyfd:
copyfd(source_fd, target_fd)
return
except OSError as err:
# Produce more useful error messages.
err.filename = source_f.name
err.filename2 = target_f.name
raise err
# Last resort: copy with fileobj read() and write().
read_source = source_f.read
write_target = target_f.write
while buf := read_source(1024 * 1024):
write_target(buf)

View File

@ -1696,6 +1696,68 @@ class DummyPathTest(DummyPurePathTest):
self.assertEqual((p / 'fileA').read_bytes(), self.assertEqual((p / 'fileA').read_bytes(),
b'abcde' + os_linesep_byte + b'fghlk' + os_linesep_byte + b'\rmnopq') b'abcde' + os_linesep_byte + b'fghlk' + os_linesep_byte + b'\rmnopq')
def test_copy_file(self):
base = self.cls(self.base)
source = base / 'fileA'
target = base / 'copyA'
source.copy(target)
self.assertTrue(target.exists())
self.assertEqual(source.read_text(), target.read_text())
def test_copy_directory(self):
base = self.cls(self.base)
source = base / 'dirA'
target = base / 'copyA'
with self.assertRaises(OSError):
source.copy(target)
@needs_symlinks
def test_copy_symlink(self):
base = self.cls(self.base)
source = base / 'linkA'
target = base / 'copyA'
source.copy(target)
self.assertTrue(target.exists())
self.assertFalse(target.is_symlink())
self.assertEqual(source.read_text(), target.read_text())
def test_copy_to_existing_file(self):
base = self.cls(self.base)
source = base / 'fileA'
target = base / 'dirB' / 'fileB'
source.copy(target)
self.assertTrue(target.exists())
self.assertEqual(source.read_text(), target.read_text())
def test_copy_to_existing_directory(self):
base = self.cls(self.base)
source = base / 'fileA'
target = base / 'dirA'
with self.assertRaises(OSError):
source.copy(target)
@needs_symlinks
def test_copy_to_existing_symlink(self):
base = self.cls(self.base)
source = base / 'dirB' / 'fileB'
target = base / 'linkA'
real_target = base / 'fileA'
source.copy(target)
self.assertTrue(target.exists())
self.assertTrue(target.is_symlink())
self.assertTrue(real_target.exists())
self.assertFalse(real_target.is_symlink())
self.assertEqual(source.read_text(), real_target.read_text())
def test_copy_empty(self):
base = self.cls(self.base)
source = base / 'empty'
target = base / 'copyA'
source.write_bytes(b'')
source.copy(target)
self.assertTrue(target.exists())
self.assertEqual(target.read_bytes(), b'')
def test_iterdir(self): def test_iterdir(self):
P = self.cls P = self.cls
p = P(self.base) p = P(self.base)

View File

@ -0,0 +1,2 @@
Add :meth:`pathlib.Path.copy`, which copies the content of one file to another,
like :func:`shutil.copyfile`.