Issue #14082: shutil.copy2() now copies extended attributes, if possible.

Patch by Hynek Schlawack.
This commit is contained in:
Antoine Pitrou 2012-05-12 19:02:01 +02:00
parent 4d688e3275
commit 424246fbf3
6 changed files with 141 additions and 22 deletions

View File

@ -102,14 +102,14 @@ Directory and files operations
.. function:: copy2(src, dst[, symlinks=False])
Similar to :func:`shutil.copy`, but metadata is copied as well -- in fact,
this is just :func:`shutil.copy` followed by :func:`copystat`. This is
Similar to :func:`shutil.copy`, but metadata is copied as well. This is
similar to the Unix command :program:`cp -p`. If *symlinks* is true,
symbolic links won't be followed but recreated instead -- this resembles
GNU's :program:`cp -P`.
.. versionchanged:: 3.3
Added *symlinks* argument.
Added *symlinks* argument, try to copy extended file system attributes
too (currently Linux only).
.. function:: ignore_patterns(\*patterns)

View File

@ -166,6 +166,36 @@ def copystat(src, dst, symlinks=False):
else:
raise
if hasattr(os, 'listxattr'):
def _copyxattr(src, dst, symlinks=False):
"""Copy extended filesystem attributes from `src` to `dst`.
Overwrite existing attributes.
If the optional flag `symlinks` is set, symlinks won't be followed.
"""
if symlinks:
listxattr = os.llistxattr
removexattr = os.lremovexattr
setxattr = os.lsetxattr
getxattr = os.lgetxattr
else:
listxattr = os.listxattr
removexattr = os.removexattr
setxattr = os.setxattr
getxattr = os.getxattr
for attr in listxattr(src):
try:
setxattr(dst, attr, getxattr(src, attr))
except OSError as e:
if e.errno not in (errno.EPERM, errno.ENOTSUP, errno.ENODATA):
raise
else:
def _copyxattr(*args, **kwargs):
pass
def copy(src, dst, symlinks=False):
"""Copy data and mode bits ("cp src dst").
@ -193,6 +223,7 @@ def copy2(src, dst, symlinks=False):
dst = os.path.join(dst, os.path.basename(src))
copyfile(src, dst, symlinks=symlinks)
copystat(src, dst, symlinks=symlinks)
_copyxattr(src, dst, symlinks=symlinks)
def ignore_patterns(*patterns):
"""Function that can be used as copytree() ignore parameter.

View File

@ -1696,6 +1696,35 @@ def skip_unless_symlink(test):
msg = "Requires functional symlink implementation"
return test if ok else unittest.skip(msg)(test)
_can_xattr = None
def can_xattr():
global _can_xattr
if _can_xattr is not None:
return _can_xattr
if not hasattr(os, "setxattr"):
can = False
else:
try:
with open(TESTFN, "wb") as fp:
try:
os.fsetxattr(fp.fileno(), b"user.test", b"")
# Kernels < 2.6.39 don't respect setxattr flags.
kernel_version = platform.release()
m = re.match("2.6.(\d{1,2})", kernel_version)
can = m is None or int(m.group(1)) >= 39
except OSError:
can = False
finally:
unlink(TESTFN)
_can_xattr = can
return can
def skip_unless_xattr(test):
"""Skip decorator for tests that require functional extended attributes"""
ok = can_xattr()
msg = "no non-broken extended attribute support"
return test if ok else unittest.skip(msg)(test)
def patch(test_instance, object_to_patch, attr_name, new_value):
"""Override 'object_to_patch'.'attr_name' with 'new_value'.

View File

@ -1810,25 +1810,7 @@ class TestSendfile(unittest.TestCase):
raise
def supports_extended_attributes():
if not hasattr(os, "setxattr"):
return False
try:
with open(support.TESTFN, "wb") as fp:
try:
os.fsetxattr(fp.fileno(), b"user.test", b"")
except OSError:
return False
finally:
support.unlink(support.TESTFN)
# Kernels < 2.6.39 don't respect setxattr flags.
kernel_version = platform.release()
m = re.match("2.6.(\d{1,2})", kernel_version)
return m is None or int(m.group(1)) >= 39
@unittest.skipUnless(supports_extended_attributes(),
"no non-broken extended attribute support")
@support.skip_unless_xattr
class ExtendedAttributeTests(unittest.TestCase):
def tearDown(self):

View File

@ -311,6 +311,67 @@ class TestShutil(unittest.TestCase):
finally:
os.chflags = old_chflags
@support.skip_unless_xattr
def test_copyxattr(self):
tmp_dir = self.mkdtemp()
src = os.path.join(tmp_dir, 'foo')
write_file(src, 'foo')
dst = os.path.join(tmp_dir, 'bar')
write_file(dst, 'bar')
# no xattr == no problem
shutil._copyxattr(src, dst)
# common case
os.setxattr(src, 'user.foo', b'42')
os.setxattr(src, 'user.bar', b'43')
shutil._copyxattr(src, dst)
self.assertEqual(os.listxattr(src), os.listxattr(dst))
self.assertEqual(
os.getxattr(src, 'user.foo'),
os.getxattr(dst, 'user.foo'))
# check errors don't affect other attrs
os.remove(dst)
write_file(dst, 'bar')
os_error = OSError(errno.EPERM, 'EPERM')
def _raise_on_user_foo(fname, attr, val):
if attr == 'user.foo':
raise os_error
else:
orig_setxattr(fname, attr, val)
try:
orig_setxattr = os.setxattr
os.setxattr = _raise_on_user_foo
shutil._copyxattr(src, dst)
self.assertEqual(['user.bar'], os.listxattr(dst))
finally:
os.setxattr = orig_setxattr
@support.skip_unless_symlink
@support.skip_unless_xattr
@unittest.skipUnless(hasattr(os, 'geteuid') and os.geteuid() == 0,
'root privileges required')
def test_copyxattr_symlinks(self):
# On Linux, it's only possible to access non-user xattr for symlinks;
# which in turn require root privileges. This test should be expanded
# as soon as other platforms gain support for extended attributes.
tmp_dir = self.mkdtemp()
src = os.path.join(tmp_dir, 'foo')
src_link = os.path.join(tmp_dir, 'baz')
write_file(src, 'foo')
os.symlink(src, src_link)
os.setxattr(src, 'trusted.foo', b'42')
os.lsetxattr(src_link, 'trusted.foo', b'43')
dst = os.path.join(tmp_dir, 'bar')
dst_link = os.path.join(tmp_dir, 'qux')
write_file(dst, 'bar')
os.symlink(dst, dst_link)
shutil._copyxattr(src_link, dst_link, symlinks=True)
self.assertEqual(os.lgetxattr(dst_link, 'trusted.foo'), b'43')
self.assertRaises(OSError, os.getxattr, dst, 'trusted.foo')
shutil._copyxattr(src_link, dst, symlinks=True)
self.assertEqual(os.getxattr(dst, 'trusted.foo'), b'43')
@support.skip_unless_symlink
def test_copy_symlinks(self):
tmp_dir = self.mkdtemp()
@ -369,6 +430,19 @@ class TestShutil(unittest.TestCase):
if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
self.assertEqual(src_link_stat.st_flags, dst_stat.st_flags)
@support.skip_unless_xattr
def test_copy2_xattr(self):
tmp_dir = self.mkdtemp()
src = os.path.join(tmp_dir, 'foo')
dst = os.path.join(tmp_dir, 'bar')
write_file(src, 'foo')
os.setxattr(src, 'user.foo', b'42')
shutil.copy2(src, dst)
self.assertEqual(
os.getxattr(src, 'user.foo'),
os.getxattr(dst, 'user.foo'))
os.remove(dst)
@support.skip_unless_symlink
def test_copyfile_symlinks(self):
tmp_dir = self.mkdtemp()

View File

@ -23,6 +23,9 @@ Core and Builtins
Library
-------
- Issue #14082: shutil.copy2() now copies extended attributes, if possible.
Patch by Hynek Schlawack.
- Issue #13959: Make importlib.abc.FileLoader.load_module()/get_filename() and
importlib.machinery.ExtensionFileLoader.load_module() have their single
argument be optional. Allows for the replacement (and thus deprecation) of