Bug #1048941: shutil.rmtree error handling was always broken
Rewrite rmtree again, this time without os.walk(). Error handling had been broken since Python 2.3, and the os.walk() version inherited this.
This commit is contained in:
parent
57341c37c9
commit
ef5ffc4765
|
@ -127,39 +127,45 @@ def copytree(src, dst, symlinks=False):
|
|||
if errors:
|
||||
raise Error, errors
|
||||
|
||||
def _raise_err(err):
|
||||
raise err
|
||||
|
||||
def rmtree(path, ignore_errors=False, onerror=None):
|
||||
"""Recursively delete a directory tree.
|
||||
|
||||
If ignore_errors is set, errors are ignored; otherwise, if
|
||||
onerror is set, it is called to handle the error; otherwise, an
|
||||
exception is raised.
|
||||
If ignore_errors is set, errors are ignored; otherwise, if onerror
|
||||
is set, it is called to handle the error with arguments (func,
|
||||
path, exc_info) where func is os.listdir, os.remove, or os.rmdir;
|
||||
path is the argument to that function that caused it to fail; and
|
||||
exc_info is a tuple returned by sys.exc_info(). If ignore_errors
|
||||
is false and onerror is None, an exception is raised.
|
||||
|
||||
"""
|
||||
# This strange way of calling functions is necessary to keep the onerror
|
||||
# argument working. Maybe sys._getframe hackery would work as well, but
|
||||
# this is simple.
|
||||
func = os.listdir
|
||||
arg = path
|
||||
try:
|
||||
for (dirpath, dirnames, filenames) in os.walk(path, topdown=False,
|
||||
onerror=_raise_err):
|
||||
for filename in filenames:
|
||||
func = os.remove
|
||||
arg = os.path.join(dirpath, filename)
|
||||
func(arg)
|
||||
func = os.rmdir
|
||||
arg = dirpath
|
||||
func(arg)
|
||||
except OSError:
|
||||
exc = sys.exc_info()
|
||||
if ignore_errors:
|
||||
def onerror(*args):
|
||||
pass
|
||||
elif onerror is not None:
|
||||
onerror(func, arg, exc)
|
||||
elif onerror is None:
|
||||
def onerror(*args):
|
||||
raise
|
||||
names = []
|
||||
try:
|
||||
names = os.listdir(path)
|
||||
except os.error, err:
|
||||
onerror(os.listdir, path, sys.exc_info())
|
||||
for name in names:
|
||||
fullname = os.path.join(path, name)
|
||||
try:
|
||||
mode = os.lstat(fullname).st_mode
|
||||
except os.error:
|
||||
mode = 0
|
||||
if stat.S_ISDIR(mode):
|
||||
rmtree(fullname, ignore_errors, onerror)
|
||||
else:
|
||||
raise exc[0], (exc[1][0], exc[1][1] + ' removing '+arg)
|
||||
try:
|
||||
os.remove(fullname)
|
||||
except os.error, err:
|
||||
onerror(os.remove, fullname, sys.exc_info())
|
||||
try:
|
||||
os.rmdir(path)
|
||||
except os.error:
|
||||
onerror(os.rmdir, path, sys.exc_info())
|
||||
|
||||
def move(src, dst):
|
||||
"""Recursively move a file or directory to another location.
|
||||
|
|
|
@ -1,8 +1,10 @@
|
|||
|
||||
# Copyright (C) 2003 Python Software Foundation
|
||||
|
||||
import unittest
|
||||
import shutil
|
||||
import tempfile
|
||||
import stat
|
||||
import os
|
||||
import os.path
|
||||
from test import test_support
|
||||
|
@ -13,8 +15,32 @@ class TestShutil(unittest.TestCase):
|
|||
# filename is guaranteed not to exist
|
||||
filename = tempfile.mktemp()
|
||||
self.assertRaises(OSError, shutil.rmtree, filename)
|
||||
self.assertEqual(shutil.rmtree(filename, True), None)
|
||||
shutil.rmtree(filename, False, lambda func, arg, exc: None)
|
||||
|
||||
if hasattr(os, 'chmod'):
|
||||
def test_on_error(self):
|
||||
self.errorState = 0
|
||||
os.mkdir(TESTFN)
|
||||
f = open(os.path.join(TESTFN, 'a'), 'w')
|
||||
f.close()
|
||||
# Make TESTFN unwritable.
|
||||
os.chmod(TESTFN, stat.S_IRUSR)
|
||||
|
||||
shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
|
||||
|
||||
# Make TESTFN writable again.
|
||||
os.chmod(TESTFN, stat.S_IRWXU)
|
||||
shutil.rmtree(TESTFN)
|
||||
|
||||
def check_args_to_onerror(self, func, arg, exc):
|
||||
if self.errorState == 0:
|
||||
self.assertEqual(func, os.remove)
|
||||
self.assertEqual(arg, os.path.join(TESTFN, 'a'))
|
||||
self.assertEqual(exc[0], OSError)
|
||||
self.errorState = 1
|
||||
else:
|
||||
self.assertEqual(func, os.rmdir)
|
||||
self.assertEqual(arg, TESTFN)
|
||||
self.assertEqual(exc[0], OSError)
|
||||
|
||||
def test_rmtree_dont_delete_file(self):
|
||||
# When called on a file instead of a directory, don't delete it.
|
||||
|
@ -63,7 +89,6 @@ class TestShutil(unittest.TestCase):
|
|||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
def test_main():
|
||||
test_support.run_unittest(TestShutil)
|
||||
|
||||
|
|
Loading…
Reference in New Issue