mirror of https://github.com/python/cpython
GH-73991: Make `pathlib.Path.delete()` private. (#123315)
Per feedback from Paul Moore on GH-123158, it's better to defer making `Path.delete()` public than ship it with under-designed error handling capabilities. We leave a remnant `_delete()` method, which is used by `move()`. Any functionality not needed by `move()` is deleted.
This commit is contained in:
parent
a1ddaaef58
commit
033d537cd4
|
@ -1657,7 +1657,7 @@ Copying, moving and deleting
|
|||
.. method:: Path.unlink(missing_ok=False)
|
||||
|
||||
Remove this file or symbolic link. If the path points to a directory,
|
||||
use :func:`Path.rmdir` or :func:`Path.delete` instead.
|
||||
use :func:`Path.rmdir` instead.
|
||||
|
||||
If *missing_ok* is false (the default), :exc:`FileNotFoundError` is
|
||||
raised if the path does not exist.
|
||||
|
@ -1671,42 +1671,7 @@ Copying, moving and deleting
|
|||
|
||||
.. method:: Path.rmdir()
|
||||
|
||||
Remove this directory. The directory must be empty; use
|
||||
:meth:`Path.delete` to remove a non-empty directory.
|
||||
|
||||
|
||||
.. method:: Path.delete(ignore_errors=False, on_error=None)
|
||||
|
||||
Delete this file or directory. If this path refers to a non-empty
|
||||
directory, its files and sub-directories are deleted recursively.
|
||||
|
||||
If *ignore_errors* is true, errors resulting from failed deletions will be
|
||||
ignored. If *ignore_errors* is false or omitted, and a callable is given as
|
||||
the optional *on_error* argument, it will be called with one argument of
|
||||
type :exc:`OSError` each time an exception is raised. The callable can
|
||||
handle the error to continue the deletion process or re-raise it to stop.
|
||||
Note that the filename is available as the :attr:`~OSError.filename`
|
||||
attribute of the exception object. If neither *ignore_errors* nor
|
||||
*on_error* are supplied, exceptions are propagated to the caller.
|
||||
|
||||
.. note::
|
||||
|
||||
When deleting non-empty directories on platforms that lack the necessary
|
||||
file descriptor-based functions, the :meth:`~Path.delete` implementation
|
||||
is susceptible to a symlink attack: given proper timing and
|
||||
circumstances, attackers can manipulate symlinks on the filesystem to
|
||||
delete files they would not be able to access otherwise. Applications
|
||||
can use the :data:`~Path.delete.avoids_symlink_attacks` method attribute
|
||||
to determine whether the implementation is immune to this attack.
|
||||
|
||||
.. attribute:: delete.avoids_symlink_attacks
|
||||
|
||||
Indicates whether the current platform and implementation provides a
|
||||
symlink attack resistant version of :meth:`~Path.delete`. Currently
|
||||
this is only true for platforms supporting fd-based directory access
|
||||
functions.
|
||||
|
||||
.. versionadded:: 3.14
|
||||
Remove this directory. The directory must be empty.
|
||||
|
||||
|
||||
Permissions and ownership
|
||||
|
|
|
@ -185,14 +185,13 @@ os
|
|||
pathlib
|
||||
-------
|
||||
|
||||
* Add methods to :class:`pathlib.Path` to recursively copy, move, or remove
|
||||
files and directories:
|
||||
* Add methods to :class:`pathlib.Path` to recursively copy or move files and
|
||||
directories:
|
||||
|
||||
* :meth:`~pathlib.Path.copy` copies a file or directory tree to a destination.
|
||||
* :meth:`~pathlib.Path.copy_into` copies *into* a destination directory.
|
||||
* :meth:`~pathlib.Path.move` moves a file or directory tree to a destination.
|
||||
* :meth:`~pathlib.Path.move_into` moves *into* a destination directory.
|
||||
* :meth:`~pathlib.Path.delete` removes a file or directory tree.
|
||||
|
||||
(Contributed by Barney Gale in :gh:`73991`.)
|
||||
|
||||
|
|
|
@ -962,7 +962,7 @@ class PathBase(PurePathBase):
|
|||
if err.errno != EXDEV:
|
||||
raise
|
||||
target = self.copy(target, follow_symlinks=False, preserve_metadata=True)
|
||||
self.delete()
|
||||
self._delete()
|
||||
return target
|
||||
|
||||
def move_into(self, target_dir):
|
||||
|
@ -1004,47 +1004,29 @@ class PathBase(PurePathBase):
|
|||
"""
|
||||
raise UnsupportedOperation(self._unsupported_msg('rmdir()'))
|
||||
|
||||
def delete(self, ignore_errors=False, on_error=None):
|
||||
def _delete(self):
|
||||
"""
|
||||
Delete this file or directory (including all sub-directories).
|
||||
|
||||
If *ignore_errors* is true, exceptions raised from scanning the
|
||||
filesystem and removing files and directories are ignored. Otherwise,
|
||||
if *on_error* is set, it will be called to handle the error. If
|
||||
neither *ignore_errors* nor *on_error* are set, exceptions are
|
||||
propagated to the caller.
|
||||
"""
|
||||
if ignore_errors:
|
||||
def on_error(err):
|
||||
pass
|
||||
elif on_error is None:
|
||||
def on_error(err):
|
||||
raise err
|
||||
if self.is_dir(follow_symlinks=False):
|
||||
results = self.walk(
|
||||
on_error=on_error,
|
||||
top_down=False, # So we rmdir() empty directories.
|
||||
follow_symlinks=False)
|
||||
for dirpath, dirnames, filenames in results:
|
||||
for name in filenames:
|
||||
try:
|
||||
dirpath.joinpath(name).unlink()
|
||||
except OSError as err:
|
||||
on_error(err)
|
||||
for name in dirnames:
|
||||
try:
|
||||
dirpath.joinpath(name).rmdir()
|
||||
except OSError as err:
|
||||
on_error(err)
|
||||
delete_self = self.rmdir
|
||||
if self.is_symlink() or self.is_junction():
|
||||
self.unlink()
|
||||
elif self.is_dir():
|
||||
self._rmtree()
|
||||
else:
|
||||
delete_self = self.unlink
|
||||
try:
|
||||
delete_self()
|
||||
except OSError as err:
|
||||
err.filename = str(self)
|
||||
on_error(err)
|
||||
delete.avoids_symlink_attacks = False
|
||||
self.unlink()
|
||||
|
||||
def _rmtree(self):
|
||||
def on_error(err):
|
||||
raise err
|
||||
results = self.walk(
|
||||
on_error=on_error,
|
||||
top_down=False, # So we rmdir() empty directories.
|
||||
follow_symlinks=False)
|
||||
for dirpath, _, filenames in results:
|
||||
for filename in filenames:
|
||||
filepath = dirpath / filename
|
||||
filepath.unlink()
|
||||
dirpath.rmdir()
|
||||
|
||||
def owner(self, *, follow_symlinks=True):
|
||||
"""
|
||||
|
|
|
@ -824,34 +824,7 @@ class Path(PathBase, PurePath):
|
|||
"""
|
||||
os.rmdir(self)
|
||||
|
||||
def delete(self, ignore_errors=False, on_error=None):
|
||||
"""
|
||||
Delete this file or directory (including all sub-directories).
|
||||
|
||||
If *ignore_errors* is true, exceptions raised from scanning the
|
||||
filesystem and removing files and directories are ignored. Otherwise,
|
||||
if *on_error* is set, it will be called to handle the error. If
|
||||
neither *ignore_errors* nor *on_error* are set, exceptions are
|
||||
propagated to the caller.
|
||||
"""
|
||||
if self.is_dir(follow_symlinks=False):
|
||||
onexc = None
|
||||
if on_error:
|
||||
def onexc(func, filename, err):
|
||||
err.filename = filename
|
||||
on_error(err)
|
||||
shutil.rmtree(str(self), ignore_errors, onexc=onexc)
|
||||
else:
|
||||
try:
|
||||
self.unlink()
|
||||
except OSError as err:
|
||||
if not ignore_errors:
|
||||
if on_error:
|
||||
on_error(err)
|
||||
else:
|
||||
raise
|
||||
|
||||
delete.avoids_symlink_attacks = shutil.rmtree.avoids_symlink_attacks
|
||||
_rmtree = shutil.rmtree
|
||||
|
||||
def rename(self, target):
|
||||
"""
|
||||
|
|
|
@ -41,9 +41,6 @@ root_in_posix = False
|
|||
if hasattr(os, 'geteuid'):
|
||||
root_in_posix = (os.geteuid() == 0)
|
||||
|
||||
delete_use_fd_functions = (
|
||||
{os.open, os.stat, os.unlink, os.rmdir} <= os.supports_dir_fd and
|
||||
os.listdir in os.supports_fd and os.stat in os.supports_follow_symlinks)
|
||||
|
||||
def patch_replace(old_test):
|
||||
def new_replace(self, target):
|
||||
|
@ -776,11 +773,6 @@ class PathTest(test_pathlib_abc.DummyPathTest, PurePathTest):
|
|||
target = base / 'copyE'
|
||||
self.assertRaises(PermissionError, source.copy, target)
|
||||
self.assertFalse(target.exists())
|
||||
errors = []
|
||||
source.copy(target, on_error=errors.append)
|
||||
self.assertEqual(len(errors), 1)
|
||||
self.assertIsInstance(errors[0], PermissionError)
|
||||
self.assertFalse(target.exists())
|
||||
|
||||
def test_copy_dir_preserve_metadata(self):
|
||||
base = self.cls(self.base)
|
||||
|
@ -980,27 +972,6 @@ class PathTest(test_pathlib_abc.DummyPathTest, PurePathTest):
|
|||
self.assertEqual(expected_gid, gid_2)
|
||||
self.assertEqual(expected_name, link.group(follow_symlinks=False))
|
||||
|
||||
def test_delete_uses_safe_fd_version_if_available(self):
|
||||
if delete_use_fd_functions:
|
||||
self.assertTrue(self.cls.delete.avoids_symlink_attacks)
|
||||
d = self.cls(self.base, 'a')
|
||||
d.mkdir()
|
||||
try:
|
||||
real_open = os.open
|
||||
|
||||
class Called(Exception):
|
||||
pass
|
||||
|
||||
def _raiser(*args, **kwargs):
|
||||
raise Called
|
||||
|
||||
os.open = _raiser
|
||||
self.assertRaises(Called, d.delete)
|
||||
finally:
|
||||
os.open = real_open
|
||||
else:
|
||||
self.assertFalse(self.cls.delete.avoids_symlink_attacks)
|
||||
|
||||
@unittest.skipIf(sys.platform[:6] == 'cygwin',
|
||||
"This test can't be run on Cygwin (issue #1071513).")
|
||||
@os_helper.skip_if_dac_override
|
||||
|
@ -1022,10 +993,7 @@ class PathTest(test_pathlib_abc.DummyPathTest, PurePathTest):
|
|||
child_dir_path.chmod(new_mode)
|
||||
tmp.chmod(new_mode)
|
||||
|
||||
errors = []
|
||||
tmp.delete(on_error=errors.append)
|
||||
# Test whether onerror has actually been called.
|
||||
self.assertEqual(len(errors), 3)
|
||||
self.assertRaises(PermissionError, tmp._delete)
|
||||
finally:
|
||||
tmp.chmod(old_dir_mode)
|
||||
child_file_path.chmod(old_child_file_mode)
|
||||
|
@ -1050,7 +1018,7 @@ class PathTest(test_pathlib_abc.DummyPathTest, PurePathTest):
|
|||
link3 = dir1 / 'link3'
|
||||
_winapi.CreateJunction(str(file1), str(link3))
|
||||
# make sure junctions are removed but not followed
|
||||
dir1.delete()
|
||||
dir1._delete()
|
||||
self.assertFalse(dir1.exists())
|
||||
self.assertTrue(dir3.exists())
|
||||
self.assertTrue(file1.exists())
|
||||
|
@ -1060,76 +1028,16 @@ class PathTest(test_pathlib_abc.DummyPathTest, PurePathTest):
|
|||
import _winapi
|
||||
tmp = self.cls(self.base, 'delete')
|
||||
tmp.mkdir()
|
||||
try:
|
||||
src = tmp / 'cheese'
|
||||
dst = tmp / 'shop'
|
||||
src.mkdir()
|
||||
spam = src / 'spam'
|
||||
spam.write_text('')
|
||||
_winapi.CreateJunction(str(src), str(dst))
|
||||
self.assertRaises(OSError, dst.delete)
|
||||
dst.delete(ignore_errors=True)
|
||||
finally:
|
||||
tmp.delete(ignore_errors=True)
|
||||
|
||||
@needs_windows
|
||||
def test_delete_outer_junction_on_error(self):
|
||||
import _winapi
|
||||
tmp = self.cls(self.base, 'delete')
|
||||
tmp.mkdir()
|
||||
dir_ = tmp / 'dir'
|
||||
dir_.mkdir()
|
||||
link = tmp / 'link'
|
||||
_winapi.CreateJunction(str(dir_), str(link))
|
||||
try:
|
||||
self.assertRaises(OSError, link.delete)
|
||||
self.assertTrue(dir_.exists())
|
||||
self.assertTrue(link.exists(follow_symlinks=False))
|
||||
errors = []
|
||||
|
||||
def on_error(error):
|
||||
errors.append(error)
|
||||
|
||||
link.delete(on_error=on_error)
|
||||
self.assertEqual(len(errors), 1)
|
||||
self.assertIsInstance(errors[0], OSError)
|
||||
self.assertEqual(errors[0].filename, str(link))
|
||||
finally:
|
||||
os.unlink(str(link))
|
||||
|
||||
@unittest.skipUnless(delete_use_fd_functions, "requires safe delete")
|
||||
def test_delete_fails_on_close(self):
|
||||
# Test that the error handler is called for failed os.close() and that
|
||||
# os.close() is only called once for a file descriptor.
|
||||
tmp = self.cls(self.base, 'delete')
|
||||
tmp.mkdir()
|
||||
dir1 = tmp / 'dir1'
|
||||
dir1.mkdir()
|
||||
dir2 = dir1 / 'dir2'
|
||||
dir2.mkdir()
|
||||
|
||||
def close(fd):
|
||||
orig_close(fd)
|
||||
nonlocal close_count
|
||||
close_count += 1
|
||||
raise OSError
|
||||
|
||||
close_count = 0
|
||||
with swap_attr(os, 'close', close) as orig_close:
|
||||
with self.assertRaises(OSError):
|
||||
dir1.delete()
|
||||
self.assertTrue(dir2.is_dir())
|
||||
self.assertEqual(close_count, 2)
|
||||
|
||||
close_count = 0
|
||||
errors = []
|
||||
|
||||
with swap_attr(os, 'close', close) as orig_close:
|
||||
dir1.delete(on_error=errors.append)
|
||||
self.assertEqual(len(errors), 2)
|
||||
self.assertEqual(errors[0].filename, str(dir2))
|
||||
self.assertEqual(errors[1].filename, str(dir1))
|
||||
self.assertEqual(close_count, 2)
|
||||
src = tmp / 'cheese'
|
||||
dst = tmp / 'shop'
|
||||
src.mkdir()
|
||||
spam = src / 'spam'
|
||||
spam.write_text('')
|
||||
_winapi.CreateJunction(str(src), str(dst))
|
||||
dst._delete()
|
||||
self.assertFalse(dst.exists())
|
||||
self.assertTrue(spam.exists())
|
||||
self.assertTrue(src.exists())
|
||||
|
||||
@unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
|
||||
@unittest.skipIf(sys.platform == "vxworks",
|
||||
|
@ -1137,79 +1045,22 @@ class PathTest(test_pathlib_abc.DummyPathTest, PurePathTest):
|
|||
def test_delete_on_named_pipe(self):
|
||||
p = self.cls(self.base, 'pipe')
|
||||
os.mkfifo(p)
|
||||
p.delete()
|
||||
p._delete()
|
||||
self.assertFalse(p.exists())
|
||||
|
||||
p = self.cls(self.base, 'dir')
|
||||
p.mkdir()
|
||||
os.mkfifo(p / 'mypipe')
|
||||
p.delete()
|
||||
p._delete()
|
||||
self.assertFalse(p.exists())
|
||||
|
||||
@unittest.skipIf(sys.platform[:6] == 'cygwin',
|
||||
"This test can't be run on Cygwin (issue #1071513).")
|
||||
@os_helper.skip_if_dac_override
|
||||
@os_helper.skip_unless_working_chmod
|
||||
def test_delete_deleted_race_condition(self):
|
||||
# bpo-37260
|
||||
#
|
||||
# Test that a file or a directory deleted after it is enumerated
|
||||
# by scandir() but before unlink() or rmdr() is called doesn't
|
||||
# generate any errors.
|
||||
def on_error(exc):
|
||||
assert exc.filename
|
||||
if not isinstance(exc, PermissionError):
|
||||
raise
|
||||
# Make the parent and the children writeable.
|
||||
for p, mode in zip(paths, old_modes):
|
||||
p.chmod(mode)
|
||||
# Remove other dirs except one.
|
||||
keep = next(p for p in dirs if str(p) != exc.filename)
|
||||
for p in dirs:
|
||||
if p != keep:
|
||||
p.rmdir()
|
||||
# Remove other files except one.
|
||||
keep = next(p for p in files if str(p) != exc.filename)
|
||||
for p in files:
|
||||
if p != keep:
|
||||
p.unlink()
|
||||
|
||||
tmp = self.cls(self.base, 'delete')
|
||||
tmp.mkdir()
|
||||
paths = [tmp] + [tmp / f'child{i}' for i in range(6)]
|
||||
dirs = paths[1::2]
|
||||
files = paths[2::2]
|
||||
for path in dirs:
|
||||
path.mkdir()
|
||||
for path in files:
|
||||
path.write_text('')
|
||||
|
||||
old_modes = [path.stat().st_mode for path in paths]
|
||||
|
||||
# Make the parent and the children non-writeable.
|
||||
new_mode = stat.S_IREAD | stat.S_IEXEC
|
||||
for path in reversed(paths):
|
||||
path.chmod(new_mode)
|
||||
|
||||
try:
|
||||
tmp.delete(on_error=on_error)
|
||||
except:
|
||||
# Test failed, so cleanup artifacts.
|
||||
for path, mode in zip(paths, old_modes):
|
||||
try:
|
||||
path.chmod(mode)
|
||||
except OSError:
|
||||
pass
|
||||
tmp.delete()
|
||||
raise
|
||||
|
||||
def test_delete_does_not_choke_on_failing_lstat(self):
|
||||
try:
|
||||
orig_lstat = os.lstat
|
||||
tmp = self.cls(self.base, 'delete')
|
||||
|
||||
def raiser(fn, *args, **kwargs):
|
||||
if fn != str(tmp):
|
||||
if fn != tmp:
|
||||
raise OSError()
|
||||
else:
|
||||
return orig_lstat(fn)
|
||||
|
@ -1219,7 +1070,7 @@ class PathTest(test_pathlib_abc.DummyPathTest, PurePathTest):
|
|||
tmp.mkdir()
|
||||
foo = tmp / 'foo'
|
||||
foo.write_text('')
|
||||
tmp.delete()
|
||||
tmp._delete()
|
||||
finally:
|
||||
os.lstat = orig_lstat
|
||||
|
||||
|
|
|
@ -2884,20 +2884,20 @@ class DummyPathTest(DummyPurePathTest):
|
|||
|
||||
def test_delete_file(self):
|
||||
p = self.cls(self.base) / 'fileA'
|
||||
p.delete()
|
||||
p._delete()
|
||||
self.assertFileNotFound(p.stat)
|
||||
self.assertFileNotFound(p.unlink)
|
||||
|
||||
def test_delete_dir(self):
|
||||
base = self.cls(self.base)
|
||||
base.joinpath('dirA').delete()
|
||||
base.joinpath('dirA')._delete()
|
||||
self.assertRaises(FileNotFoundError, base.joinpath('dirA').stat)
|
||||
self.assertRaises(FileNotFoundError, base.joinpath('dirA', 'linkC').lstat)
|
||||
base.joinpath('dirB').delete()
|
||||
base.joinpath('dirB')._delete()
|
||||
self.assertRaises(FileNotFoundError, base.joinpath('dirB').stat)
|
||||
self.assertRaises(FileNotFoundError, base.joinpath('dirB', 'fileB').stat)
|
||||
self.assertRaises(FileNotFoundError, base.joinpath('dirB', 'linkD').lstat)
|
||||
base.joinpath('dirC').delete()
|
||||
base.joinpath('dirC')._delete()
|
||||
self.assertRaises(FileNotFoundError, base.joinpath('dirC').stat)
|
||||
self.assertRaises(FileNotFoundError, base.joinpath('dirC', 'dirD').stat)
|
||||
self.assertRaises(FileNotFoundError, base.joinpath('dirC', 'dirD', 'fileD').stat)
|
||||
|
@ -2912,7 +2912,7 @@ class DummyPathTest(DummyPurePathTest):
|
|||
dir_.mkdir()
|
||||
link = tmp / 'link'
|
||||
link.symlink_to(dir_)
|
||||
link.delete()
|
||||
link._delete()
|
||||
self.assertTrue(dir_.exists())
|
||||
self.assertFalse(link.exists(follow_symlinks=False))
|
||||
|
||||
|
@ -2934,7 +2934,7 @@ class DummyPathTest(DummyPurePathTest):
|
|||
link3 = dir1 / 'link3'
|
||||
link3.symlink_to(file1)
|
||||
# make sure symlinks are removed but not followed
|
||||
dir1.delete()
|
||||
dir1._delete()
|
||||
self.assertFalse(dir1.exists())
|
||||
self.assertTrue(dir3.exists())
|
||||
self.assertTrue(file1.exists())
|
||||
|
@ -2944,15 +2944,7 @@ class DummyPathTest(DummyPurePathTest):
|
|||
tmp.mkdir()
|
||||
# filename is guaranteed not to exist
|
||||
filename = tmp / 'foo'
|
||||
self.assertRaises(FileNotFoundError, filename.delete)
|
||||
# test that ignore_errors option is honored
|
||||
filename.delete(ignore_errors=True)
|
||||
# test on_error
|
||||
errors = []
|
||||
filename.delete(on_error=errors.append)
|
||||
self.assertEqual(len(errors), 1)
|
||||
self.assertIsInstance(errors[0], FileNotFoundError)
|
||||
self.assertEqual(errors[0].filename, str(filename))
|
||||
self.assertRaises(FileNotFoundError, filename._delete)
|
||||
|
||||
def setUpWalk(self):
|
||||
# Build:
|
||||
|
|
|
@ -1 +0,0 @@
|
|||
Add :meth:`pathlib.Path.delete`, which recursively removes a file or directory.
|
Loading…
Reference in New Issue