bpo-38453: Ensure ntpath.realpath correctly resolves relative paths (GH-16967)

Ensure isabs() is always True for \\?\ prefixed paths
Avoid unnecessary usage of readlink() to avoid resolving broken links incorrectly
Ensure shutil tests run in test directory
This commit is contained in:
Steve Dower 2019-11-15 09:49:21 -08:00 committed by GitHub
parent b22030073b
commit abde52cd8e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 106 additions and 45 deletions

View File

@ -61,6 +61,14 @@ def normcase(s):
def isabs(s): def isabs(s):
"""Test whether a path is absolute""" """Test whether a path is absolute"""
s = os.fspath(s) s = os.fspath(s)
# Paths beginning with \\?\ are always absolute, but do not
# necessarily contain a drive.
if isinstance(s, bytes):
if s.replace(b'/', b'\\').startswith(b'\\\\?\\'):
return True
else:
if s.replace('/', '\\').startswith('\\\\?\\'):
return True
s = splitdrive(s)[1] s = splitdrive(s)[1]
return len(s) > 0 and s[0] in _get_bothseps(s) return len(s) > 0 and s[0] in _get_bothseps(s)
@ -526,10 +534,7 @@ except ImportError:
# realpath is a no-op on systems without _getfinalpathname support. # realpath is a no-op on systems without _getfinalpathname support.
realpath = abspath realpath = abspath
else: else:
def _readlink_deep(path, seen=None): def _readlink_deep(path):
if seen is None:
seen = set()
# These error codes indicate that we should stop reading links and # These error codes indicate that we should stop reading links and
# return the path we currently have. # return the path we currently have.
# 1: ERROR_INVALID_FUNCTION # 1: ERROR_INVALID_FUNCTION
@ -546,10 +551,22 @@ else:
# 4393: ERROR_REPARSE_TAG_INVALID # 4393: ERROR_REPARSE_TAG_INVALID
allowed_winerror = 1, 2, 3, 5, 21, 32, 50, 67, 87, 4390, 4392, 4393 allowed_winerror = 1, 2, 3, 5, 21, 32, 50, 67, 87, 4390, 4392, 4393
seen = set()
while normcase(path) not in seen: while normcase(path) not in seen:
seen.add(normcase(path)) seen.add(normcase(path))
try: try:
old_path = path
path = _nt_readlink(path) path = _nt_readlink(path)
# Links may be relative, so resolve them against their
# own location
if not isabs(path):
# If it's something other than a symlink, we don't know
# what it's actually going to be resolved against, so
# just return the old path.
if not islink(old_path):
path = old_path
break
path = normpath(join(dirname(old_path), path))
except OSError as ex: except OSError as ex:
if ex.winerror in allowed_winerror: if ex.winerror in allowed_winerror:
break break
@ -579,23 +596,31 @@ else:
# Non-strict algorithm is to find as much of the target directory # Non-strict algorithm is to find as much of the target directory
# as we can and join the rest. # as we can and join the rest.
tail = '' tail = ''
seen = set()
while path: while path:
try: try:
path = _readlink_deep(path, seen)
path = _getfinalpathname(path) path = _getfinalpathname(path)
return join(path, tail) if tail else path return join(path, tail) if tail else path
except OSError as ex: except OSError as ex:
if ex.winerror not in allowed_winerror: if ex.winerror not in allowed_winerror:
raise raise
try:
# The OS could not resolve this path fully, so we attempt
# to follow the link ourselves. If we succeed, join the tail
# and return.
new_path = _readlink_deep(path)
if new_path != path:
return join(new_path, tail) if tail else new_path
except OSError:
# If we fail to readlink(), let's keep traversing
pass
path, name = split(path) path, name = split(path)
# TODO (bpo-38186): Request the real file name from the directory # TODO (bpo-38186): Request the real file name from the directory
# entry using FindFirstFileW. For now, we will return the path # entry using FindFirstFileW. For now, we will return the path
# as best we have it # as best we have it
if path and not name: if path and not name:
return abspath(path + tail) return path + tail
tail = join(name, tail) if tail else name tail = join(name, tail) if tail else name
return abspath(tail) return tail
def realpath(path): def realpath(path):
path = normpath(path) path = normpath(path)
@ -604,12 +629,20 @@ else:
unc_prefix = b'\\\\?\\UNC\\' unc_prefix = b'\\\\?\\UNC\\'
new_unc_prefix = b'\\\\' new_unc_prefix = b'\\\\'
cwd = os.getcwdb() cwd = os.getcwdb()
# bpo-38081: Special case for realpath(b'nul')
if normcase(path) == normcase(os.fsencode(devnull)):
return b'\\\\.\\NUL'
else: else:
prefix = '\\\\?\\' prefix = '\\\\?\\'
unc_prefix = '\\\\?\\UNC\\' unc_prefix = '\\\\?\\UNC\\'
new_unc_prefix = '\\\\' new_unc_prefix = '\\\\'
cwd = os.getcwd() cwd = os.getcwd()
# bpo-38081: Special case for realpath('nul')
if normcase(path) == normcase(devnull):
return '\\\\.\\NUL'
had_prefix = path.startswith(prefix) had_prefix = path.startswith(prefix)
if not had_prefix and not isabs(path):
path = join(cwd, path)
try: try:
path = _getfinalpathname(path) path = _getfinalpathname(path)
initial_winerror = 0 initial_winerror = 0

View File

@ -286,14 +286,16 @@ class TestNtpath(NtpathTestCase):
ABSTFN + r"\missing") ABSTFN + r"\missing")
self.assertPathEqual(ntpath.realpath(r"broken\foo"), self.assertPathEqual(ntpath.realpath(r"broken\foo"),
ABSTFN + r"\missing\foo") ABSTFN + r"\missing\foo")
# bpo-38453: We no longer recursively resolve segments of relative
# symlinks that the OS cannot resolve.
self.assertPathEqual(ntpath.realpath(r"broken1"), self.assertPathEqual(ntpath.realpath(r"broken1"),
ABSTFN + r"\missing\bar") ABSTFN + r"\broken\bar")
self.assertPathEqual(ntpath.realpath(r"broken1\baz"), self.assertPathEqual(ntpath.realpath(r"broken1\baz"),
ABSTFN + r"\missing\bar\baz") ABSTFN + r"\broken\bar\baz")
self.assertPathEqual(ntpath.realpath("broken2"), self.assertPathEqual(ntpath.realpath("broken2"),
ABSTFN + r"\missing") ABSTFN + r"\self\self\missing")
self.assertPathEqual(ntpath.realpath("broken3"), self.assertPathEqual(ntpath.realpath("broken3"),
ABSTFN + r"\missing") ABSTFN + r"\subdir\parent\subdir\parent\missing")
self.assertPathEqual(ntpath.realpath("broken4"), self.assertPathEqual(ntpath.realpath("broken4"),
ABSTFN + r"\missing") ABSTFN + r"\missing")
self.assertPathEqual(ntpath.realpath("broken5"), self.assertPathEqual(ntpath.realpath("broken5"),
@ -304,13 +306,13 @@ class TestNtpath(NtpathTestCase):
self.assertPathEqual(ntpath.realpath(rb"broken\foo"), self.assertPathEqual(ntpath.realpath(rb"broken\foo"),
os.fsencode(ABSTFN + r"\missing\foo")) os.fsencode(ABSTFN + r"\missing\foo"))
self.assertPathEqual(ntpath.realpath(rb"broken1"), self.assertPathEqual(ntpath.realpath(rb"broken1"),
os.fsencode(ABSTFN + r"\missing\bar")) os.fsencode(ABSTFN + r"\broken\bar"))
self.assertPathEqual(ntpath.realpath(rb"broken1\baz"), self.assertPathEqual(ntpath.realpath(rb"broken1\baz"),
os.fsencode(ABSTFN + r"\missing\bar\baz")) os.fsencode(ABSTFN + r"\broken\bar\baz"))
self.assertPathEqual(ntpath.realpath(b"broken2"), self.assertPathEqual(ntpath.realpath(b"broken2"),
os.fsencode(ABSTFN + r"\missing")) os.fsencode(ABSTFN + r"\self\self\missing"))
self.assertPathEqual(ntpath.realpath(rb"broken3"), self.assertPathEqual(ntpath.realpath(rb"broken3"),
os.fsencode(ABSTFN + r"\missing")) os.fsencode(ABSTFN + r"\subdir\parent\subdir\parent\missing"))
self.assertPathEqual(ntpath.realpath(b"broken4"), self.assertPathEqual(ntpath.realpath(b"broken4"),
os.fsencode(ABSTFN + r"\missing")) os.fsencode(ABSTFN + r"\missing"))
self.assertPathEqual(ntpath.realpath(b"broken5"), self.assertPathEqual(ntpath.realpath(b"broken5"),
@ -319,8 +321,8 @@ class TestNtpath(NtpathTestCase):
@support.skip_unless_symlink @support.skip_unless_symlink
@unittest.skipUnless(HAVE_GETFINALPATHNAME, 'need _getfinalpathname') @unittest.skipUnless(HAVE_GETFINALPATHNAME, 'need _getfinalpathname')
def test_realpath_symlink_loops(self): def test_realpath_symlink_loops(self):
# Bug #930024, return the path unchanged if we get into an infinite # Symlink loops are non-deterministic as to which path is returned, but
# symlink loop. # it will always be the fully resolved path of one member of the cycle
ABSTFN = ntpath.abspath(support.TESTFN) ABSTFN = ntpath.abspath(support.TESTFN)
self.addCleanup(support.unlink, ABSTFN) self.addCleanup(support.unlink, ABSTFN)
self.addCleanup(support.unlink, ABSTFN + "1") self.addCleanup(support.unlink, ABSTFN + "1")
@ -332,8 +334,6 @@ class TestNtpath(NtpathTestCase):
os.symlink(ABSTFN, ABSTFN) os.symlink(ABSTFN, ABSTFN)
self.assertPathEqual(ntpath.realpath(ABSTFN), ABSTFN) self.assertPathEqual(ntpath.realpath(ABSTFN), ABSTFN)
# cycles are non-deterministic as to which path is returned, but
# it will always be the fully resolved path of one member of the cycle
os.symlink(ABSTFN + "1", ABSTFN + "2") os.symlink(ABSTFN + "1", ABSTFN + "2")
os.symlink(ABSTFN + "2", ABSTFN + "1") os.symlink(ABSTFN + "2", ABSTFN + "1")
expected = (ABSTFN + "1", ABSTFN + "2") expected = (ABSTFN + "1", ABSTFN + "2")
@ -402,6 +402,34 @@ class TestNtpath(NtpathTestCase):
def test_realpath_nul(self): def test_realpath_nul(self):
tester("ntpath.realpath('NUL')", r'\\.\NUL') tester("ntpath.realpath('NUL')", r'\\.\NUL')
@unittest.skipUnless(HAVE_GETFINALPATHNAME, 'need _getfinalpathname')
def test_realpath_cwd(self):
ABSTFN = ntpath.abspath(support.TESTFN)
support.unlink(ABSTFN)
support.rmtree(ABSTFN)
os.mkdir(ABSTFN)
self.addCleanup(support.rmtree, ABSTFN)
test_dir_long = ntpath.join(ABSTFN, "MyVeryLongDirectoryName")
test_dir_short = ntpath.join(ABSTFN, "MYVERY~1")
test_file_long = ntpath.join(test_dir_long, "file.txt")
test_file_short = ntpath.join(test_dir_short, "file.txt")
os.mkdir(test_dir_long)
with open(test_file_long, "wb") as f:
f.write(b"content")
self.assertPathEqual(test_file_long, ntpath.realpath(test_file_short))
with support.change_cwd(test_dir_long):
self.assertPathEqual(test_file_long, ntpath.realpath("file.txt"))
with support.change_cwd(test_dir_long.lower()):
self.assertPathEqual(test_file_long, ntpath.realpath("file.txt"))
with support.change_cwd(test_dir_short):
self.assertPathEqual(test_file_long, ntpath.realpath("file.txt"))
def test_expandvars(self): def test_expandvars(self):
with support.EnvironmentVarGuard() as env: with support.EnvironmentVarGuard() as env:
env.clear() env.clear()

View File

@ -123,12 +123,12 @@ def supports_file2file_sendfile():
srcname = None srcname = None
dstname = None dstname = None
try: try:
with tempfile.NamedTemporaryFile("wb", delete=False) as f: with tempfile.NamedTemporaryFile("wb", dir=os.getcwd(), delete=False) as f:
srcname = f.name srcname = f.name
f.write(b"0123456789") f.write(b"0123456789")
with open(srcname, "rb") as src: with open(srcname, "rb") as src:
with tempfile.NamedTemporaryFile("wb", delete=False) as dst: with tempfile.NamedTemporaryFile("wb", dir=os.getcwd(), delete=False) as dst:
dstname = dst.name dstname = dst.name
infd = src.fileno() infd = src.fileno()
outfd = dst.fileno() outfd = dst.fileno()
@ -162,12 +162,12 @@ def _maxdataOK():
class BaseTest: class BaseTest:
def mkdtemp(self): def mkdtemp(self, prefix=None):
"""Create a temporary directory that will be cleaned up. """Create a temporary directory that will be cleaned up.
Returns the path of the directory. Returns the path of the directory.
""" """
d = tempfile.mkdtemp() d = tempfile.mkdtemp(prefix=prefix, dir=os.getcwd())
self.addCleanup(support.rmtree, d) self.addCleanup(support.rmtree, d)
return d return d
@ -231,6 +231,7 @@ class TestRmTree(BaseTest, unittest.TestCase):
os.mkdir(dir_) os.mkdir(dir_)
link = os.path.join(tmp, 'link') link = os.path.join(tmp, 'link')
_winapi.CreateJunction(dir_, link) _winapi.CreateJunction(dir_, link)
self.addCleanup(support.unlink, link)
self.assertRaises(OSError, shutil.rmtree, link) self.assertRaises(OSError, shutil.rmtree, link)
self.assertTrue(os.path.exists(dir_)) self.assertTrue(os.path.exists(dir_))
self.assertTrue(os.path.lexists(link)) self.assertTrue(os.path.lexists(link))
@ -267,7 +268,7 @@ class TestRmTree(BaseTest, unittest.TestCase):
def test_rmtree_errors(self): def test_rmtree_errors(self):
# filename is guaranteed not to exist # filename is guaranteed not to exist
filename = tempfile.mktemp() filename = tempfile.mktemp(dir=self.mkdtemp())
self.assertRaises(FileNotFoundError, shutil.rmtree, filename) self.assertRaises(FileNotFoundError, shutil.rmtree, filename)
# test that ignore_errors option is honored # test that ignore_errors option is honored
shutil.rmtree(filename, ignore_errors=True) shutil.rmtree(filename, ignore_errors=True)
@ -401,7 +402,7 @@ class TestRmTree(BaseTest, unittest.TestCase):
def test_rmtree_dont_delete_file(self): def test_rmtree_dont_delete_file(self):
# When called on a file instead of a directory, don't delete it. # When called on a file instead of a directory, don't delete it.
handle, path = tempfile.mkstemp() handle, path = tempfile.mkstemp(dir=self.mkdtemp())
os.close(handle) os.close(handle)
self.assertRaises(NotADirectoryError, shutil.rmtree, path) self.assertRaises(NotADirectoryError, shutil.rmtree, path)
os.remove(path) os.remove(path)
@ -438,8 +439,8 @@ class TestRmTree(BaseTest, unittest.TestCase):
class TestCopyTree(BaseTest, unittest.TestCase): class TestCopyTree(BaseTest, unittest.TestCase):
def test_copytree_simple(self): def test_copytree_simple(self):
src_dir = tempfile.mkdtemp() src_dir = self.mkdtemp()
dst_dir = os.path.join(tempfile.mkdtemp(), 'destination') dst_dir = os.path.join(self.mkdtemp(), 'destination')
self.addCleanup(shutil.rmtree, src_dir) self.addCleanup(shutil.rmtree, src_dir)
self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir)) self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
write_file((src_dir, 'test.txt'), '123') write_file((src_dir, 'test.txt'), '123')
@ -457,8 +458,8 @@ class TestCopyTree(BaseTest, unittest.TestCase):
self.assertEqual(actual, '456') self.assertEqual(actual, '456')
def test_copytree_dirs_exist_ok(self): def test_copytree_dirs_exist_ok(self):
src_dir = tempfile.mkdtemp() src_dir = self.mkdtemp()
dst_dir = tempfile.mkdtemp() dst_dir = self.mkdtemp()
self.addCleanup(shutil.rmtree, src_dir) self.addCleanup(shutil.rmtree, src_dir)
self.addCleanup(shutil.rmtree, dst_dir) self.addCleanup(shutil.rmtree, dst_dir)
@ -517,9 +518,9 @@ class TestCopyTree(BaseTest, unittest.TestCase):
# creating data # creating data
join = os.path.join join = os.path.join
exists = os.path.exists exists = os.path.exists
src_dir = tempfile.mkdtemp() src_dir = self.mkdtemp()
try: try:
dst_dir = join(tempfile.mkdtemp(), 'destination') dst_dir = join(self.mkdtemp(), 'destination')
write_file((src_dir, 'test.txt'), '123') write_file((src_dir, 'test.txt'), '123')
write_file((src_dir, 'test.tmp'), '123') write_file((src_dir, 'test.tmp'), '123')
os.mkdir(join(src_dir, 'test_dir')) os.mkdir(join(src_dir, 'test_dir'))
@ -579,7 +580,7 @@ class TestCopyTree(BaseTest, unittest.TestCase):
shutil.rmtree(os.path.dirname(dst_dir)) shutil.rmtree(os.path.dirname(dst_dir))
def test_copytree_retains_permissions(self): def test_copytree_retains_permissions(self):
tmp_dir = tempfile.mkdtemp() tmp_dir = self.mkdtemp()
src_dir = os.path.join(tmp_dir, 'source') src_dir = os.path.join(tmp_dir, 'source')
os.mkdir(src_dir) os.mkdir(src_dir)
dst_dir = os.path.join(tmp_dir, 'destination') dst_dir = os.path.join(tmp_dir, 'destination')
@ -591,6 +592,7 @@ class TestCopyTree(BaseTest, unittest.TestCase):
write_file((src_dir, 'restrictive.txt'), '456') write_file((src_dir, 'restrictive.txt'), '456')
os.chmod(os.path.join(src_dir, 'restrictive.txt'), 0o600) os.chmod(os.path.join(src_dir, 'restrictive.txt'), 0o600)
restrictive_subdir = tempfile.mkdtemp(dir=src_dir) restrictive_subdir = tempfile.mkdtemp(dir=src_dir)
self.addCleanup(support.rmtree, restrictive_subdir)
os.chmod(restrictive_subdir, 0o600) os.chmod(restrictive_subdir, 0o600)
shutil.copytree(src_dir, dst_dir) shutil.copytree(src_dir, dst_dir)
@ -609,8 +611,8 @@ class TestCopyTree(BaseTest, unittest.TestCase):
# When copying to VFAT, copystat() raises OSError. On Windows, the # When copying to VFAT, copystat() raises OSError. On Windows, the
# exception object has a meaningful 'winerror' attribute, but not # exception object has a meaningful 'winerror' attribute, but not
# on other operating systems. Do not assume 'winerror' is set. # on other operating systems. Do not assume 'winerror' is set.
src_dir = tempfile.mkdtemp() src_dir = self.mkdtemp()
dst_dir = os.path.join(tempfile.mkdtemp(), 'destination') dst_dir = os.path.join(self.mkdtemp(), 'destination')
self.addCleanup(shutil.rmtree, src_dir) self.addCleanup(shutil.rmtree, src_dir)
self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir)) self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
@ -628,10 +630,8 @@ class TestCopyTree(BaseTest, unittest.TestCase):
self.assertEqual(b, os.path.join(dst, 'foo')) self.assertEqual(b, os.path.join(dst, 'foo'))
flag = [] flag = []
src = tempfile.mkdtemp() src = self.mkdtemp()
self.addCleanup(support.rmtree, src) dst = tempfile.mktemp(dir=self.mkdtemp())
dst = tempfile.mktemp()
self.addCleanup(support.rmtree, dst)
with open(os.path.join(src, 'foo'), 'w') as f: with open(os.path.join(src, 'foo'), 'w') as f:
f.close() f.close()
shutil.copytree(src, dst, copy_function=custom_cpfun) shutil.copytree(src, dst, copy_function=custom_cpfun)
@ -1624,8 +1624,7 @@ class TestMisc(BaseTest, unittest.TestCase):
class TestWhich(BaseTest, unittest.TestCase): class TestWhich(BaseTest, unittest.TestCase):
def setUp(self): def setUp(self):
self.temp_dir = tempfile.mkdtemp(prefix="Tmp") self.temp_dir = self.mkdtemp(prefix="Tmp")
self.addCleanup(support.rmtree, self.temp_dir)
# Give the temp_file an ".exe" suffix for all. # Give the temp_file an ".exe" suffix for all.
# It's needed on Windows and not harmful on other platforms. # It's needed on Windows and not harmful on other platforms.
self.temp_file = tempfile.NamedTemporaryFile(dir=self.temp_dir, self.temp_file = tempfile.NamedTemporaryFile(dir=self.temp_dir,
@ -1857,7 +1856,7 @@ class TestMove(BaseTest, unittest.TestCase):
def test_move_dir(self): def test_move_dir(self):
# Move a dir to another location on the same filesystem. # Move a dir to another location on the same filesystem.
dst_dir = tempfile.mktemp() dst_dir = tempfile.mktemp(dir=self.mkdtemp())
try: try:
self._check_move_dir(self.src_dir, dst_dir, dst_dir) self._check_move_dir(self.src_dir, dst_dir, dst_dir)
finally: finally:
@ -2156,7 +2155,7 @@ class TestCopyFileObj(unittest.TestCase):
# If file size < 1 MiB memoryview() length must be equal to # If file size < 1 MiB memoryview() length must be equal to
# the actual file size. # the actual file size.
with tempfile.NamedTemporaryFile(delete=False) as f: with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f:
f.write(b'foo') f.write(b'foo')
fname = f.name fname = f.name
self.addCleanup(support.unlink, fname) self.addCleanup(support.unlink, fname)
@ -2165,7 +2164,7 @@ class TestCopyFileObj(unittest.TestCase):
self.assertEqual(m.call_args[0][2], 3) self.assertEqual(m.call_args[0][2], 3)
# Empty files should not rely on readinto() variant. # Empty files should not rely on readinto() variant.
with tempfile.NamedTemporaryFile(delete=False) as f: with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f:
pass pass
fname = f.name fname = f.name
self.addCleanup(support.unlink, fname) self.addCleanup(support.unlink, fname)
@ -2231,7 +2230,7 @@ class _ZeroCopyFileTest(object):
self.assertEqual(read_file(TESTFN, binary=True), self.FILEDATA) self.assertEqual(read_file(TESTFN, binary=True), self.FILEDATA)
def test_non_existent_src(self): def test_non_existent_src(self):
name = tempfile.mktemp() name = tempfile.mktemp(dir=os.getcwd())
with self.assertRaises(FileNotFoundError) as cm: with self.assertRaises(FileNotFoundError) as cm:
shutil.copyfile(name, "new") shutil.copyfile(name, "new")
self.assertEqual(cm.exception.filename, name) self.assertEqual(cm.exception.filename, name)

View File

@ -0,0 +1 @@
Ensure ntpath.realpath() correctly resolves relative paths.