bpo-46426: Improve tests for the dir_fd argument (GH-30668)

Ensure that directory file descriptors refer to directories different
from the current directory, and that src_dir_fd and dst_dir_fd refer
to different directories.

Add context manager open_dir_fd() in test.support.os_helper.
This commit is contained in:
Serhiy Storchaka 2022-01-21 09:54:50 +02:00 committed by GitHub
parent 40fcd16889
commit 54610bb448
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 208 additions and 230 deletions

View File

@ -455,6 +455,17 @@ def create_empty_file(filename):
os.close(fd)
@contextlib.contextmanager
def open_dir_fd(path):
"""Open a file descriptor to a directory."""
assert os.path.isdir(path)
dir_fd = os.open(path, os.O_RDONLY)
try:
yield dir_fd
finally:
os.close(dir_fd)
def fs_is_case_insensitive(directory):
"""Detects if the file system for the specified directory
is case-insensitive."""

View File

@ -848,12 +848,9 @@ class UtimeTests(unittest.TestCase):
def test_utime_dir_fd(self):
def set_time(filename, ns):
dirname, name = os.path.split(filename)
dirfd = os.open(dirname, os.O_RDONLY)
try:
with os_helper.open_dir_fd(dirname) as dirfd:
# pass dir_fd to test utimensat(timespec) or futimesat(timeval)
os.utime(name, dir_fd=dirfd, ns=ns)
finally:
os.close(dirfd)
self._test_utime(set_time)
def test_utime_directory(self):
@ -4341,8 +4338,7 @@ class TestScandir(unittest.TestCase):
os.symlink('file.txt', os.path.join(self.path, 'link'))
expected_names.append('link')
fd = os.open(self.path, os.O_RDONLY)
try:
with os_helper.open_dir_fd(self.path) as fd:
with os.scandir(fd) as it:
entries = list(it)
names = [entry.name for entry in entries]
@ -4357,8 +4353,6 @@ class TestScandir(unittest.TestCase):
self.assertEqual(entry.stat(), st)
st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
self.assertEqual(entry.stat(follow_symlinks=False), st)
finally:
os.close(fd)
def test_empty_path(self):
self.assertRaises(FileNotFoundError, os.scandir, '')

View File

@ -21,6 +21,7 @@ import tempfile
import unittest
import warnings
import textwrap
from contextlib import contextmanager
_DUMMY_SYMLINK = os.path.join(tempfile.gettempdir(),
os_helper.TESTFN + '-dummy-symlink')
@ -1081,187 +1082,6 @@ class PosixTester(unittest.TestCase):
symdiff = idg_groups.symmetric_difference(posix.getgroups())
self.assertTrue(not symdiff or symdiff == {posix.getegid()})
# tests for the posix *at functions follow
@unittest.skipUnless(os.access in os.supports_dir_fd, "test needs dir_fd support for os.access()")
def test_access_dir_fd(self):
f = posix.open(posix.getcwd(), posix.O_RDONLY)
try:
self.assertTrue(posix.access(os_helper.TESTFN, os.R_OK, dir_fd=f))
finally:
posix.close(f)
@unittest.skipUnless(os.chmod in os.supports_dir_fd, "test needs dir_fd support in os.chmod()")
def test_chmod_dir_fd(self):
os.chmod(os_helper.TESTFN, stat.S_IRUSR)
f = posix.open(posix.getcwd(), posix.O_RDONLY)
try:
posix.chmod(os_helper.TESTFN, stat.S_IRUSR | stat.S_IWUSR, dir_fd=f)
s = posix.stat(os_helper.TESTFN)
self.assertEqual(s[0] & stat.S_IRWXU, stat.S_IRUSR | stat.S_IWUSR)
finally:
posix.close(f)
@unittest.skipUnless(hasattr(os, 'chown') and (os.chown in os.supports_dir_fd),
"test needs dir_fd support in os.chown()")
def test_chown_dir_fd(self):
os_helper.unlink(os_helper.TESTFN)
os_helper.create_empty_file(os_helper.TESTFN)
f = posix.open(posix.getcwd(), posix.O_RDONLY)
try:
posix.chown(os_helper.TESTFN, os.getuid(), os.getgid(), dir_fd=f)
finally:
posix.close(f)
@unittest.skipUnless(os.stat in os.supports_dir_fd, "test needs dir_fd support in os.stat()")
def test_stat_dir_fd(self):
os_helper.unlink(os_helper.TESTFN)
with open(os_helper.TESTFN, 'w') as outfile:
outfile.write("testline\n")
f = posix.open(posix.getcwd(), posix.O_RDONLY)
try:
s1 = posix.stat(os_helper.TESTFN)
s2 = posix.stat(os_helper.TESTFN, dir_fd=f)
self.assertEqual(s1, s2)
s2 = posix.stat(os_helper.TESTFN, dir_fd=None)
self.assertEqual(s1, s2)
self.assertRaisesRegex(TypeError, 'should be integer or None, not',
posix.stat, os_helper.TESTFN, dir_fd=posix.getcwd())
self.assertRaisesRegex(TypeError, 'should be integer or None, not',
posix.stat, os_helper.TESTFN, dir_fd=float(f))
self.assertRaises(OverflowError,
posix.stat, os_helper.TESTFN, dir_fd=10**20)
finally:
posix.close(f)
@unittest.skipUnless(os.utime in os.supports_dir_fd, "test needs dir_fd support in os.utime()")
def test_utime_dir_fd(self):
f = posix.open(posix.getcwd(), posix.O_RDONLY)
try:
now = time.time()
posix.utime(os_helper.TESTFN, None, dir_fd=f)
posix.utime(os_helper.TESTFN, dir_fd=f)
self.assertRaises(TypeError, posix.utime, os_helper.TESTFN,
now, dir_fd=f)
self.assertRaises(TypeError, posix.utime, os_helper.TESTFN,
(None, None), dir_fd=f)
self.assertRaises(TypeError, posix.utime, os_helper.TESTFN,
(now, None), dir_fd=f)
self.assertRaises(TypeError, posix.utime, os_helper.TESTFN,
(None, now), dir_fd=f)
self.assertRaises(TypeError, posix.utime, os_helper.TESTFN,
(now, "x"), dir_fd=f)
posix.utime(os_helper.TESTFN, (int(now), int(now)), dir_fd=f)
posix.utime(os_helper.TESTFN, (now, now), dir_fd=f)
posix.utime(os_helper.TESTFN,
(int(now), int((now - int(now)) * 1e9)), dir_fd=f)
posix.utime(os_helper.TESTFN, dir_fd=f,
times=(int(now), int((now - int(now)) * 1e9)))
# try dir_fd and follow_symlinks together
if os.utime in os.supports_follow_symlinks:
try:
posix.utime(os_helper.TESTFN, follow_symlinks=False,
dir_fd=f)
except ValueError:
# whoops! using both together not supported on this platform.
pass
finally:
posix.close(f)
@unittest.skipUnless(os.link in os.supports_dir_fd, "test needs dir_fd support in os.link()")
def test_link_dir_fd(self):
f = posix.open(posix.getcwd(), posix.O_RDONLY)
try:
posix.link(os_helper.TESTFN, os_helper.TESTFN + 'link',
src_dir_fd=f, dst_dir_fd=f)
except PermissionError as e:
self.skipTest('posix.link(): %s' % e)
else:
# should have same inodes
self.assertEqual(posix.stat(os_helper.TESTFN)[1],
posix.stat(os_helper.TESTFN + 'link')[1])
finally:
posix.close(f)
os_helper.unlink(os_helper.TESTFN + 'link')
@unittest.skipUnless(os.mkdir in os.supports_dir_fd, "test needs dir_fd support in os.mkdir()")
def test_mkdir_dir_fd(self):
f = posix.open(posix.getcwd(), posix.O_RDONLY)
try:
posix.mkdir(os_helper.TESTFN + 'dir', dir_fd=f)
posix.stat(os_helper.TESTFN + 'dir') # should not raise exception
finally:
posix.close(f)
os_helper.rmtree(os_helper.TESTFN + 'dir')
@unittest.skipUnless(hasattr(os, 'mknod')
and (os.mknod in os.supports_dir_fd)
and hasattr(stat, 'S_IFIFO'),
"test requires both stat.S_IFIFO and dir_fd support for os.mknod()")
def test_mknod_dir_fd(self):
# Test using mknodat() to create a FIFO (the only use specified
# by POSIX).
os_helper.unlink(os_helper.TESTFN)
mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
f = posix.open(posix.getcwd(), posix.O_RDONLY)
try:
posix.mknod(os_helper.TESTFN, mode, 0, dir_fd=f)
except OSError as e:
# Some old systems don't allow unprivileged users to use
# mknod(), or only support creating device nodes.
self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES))
else:
self.assertTrue(stat.S_ISFIFO(posix.stat(os_helper.TESTFN).st_mode))
finally:
posix.close(f)
@unittest.skipUnless(os.open in os.supports_dir_fd, "test needs dir_fd support in os.open()")
def test_open_dir_fd(self):
os_helper.unlink(os_helper.TESTFN)
with open(os_helper.TESTFN, 'w') as outfile:
outfile.write("testline\n")
a = posix.open(posix.getcwd(), posix.O_RDONLY)
b = posix.open(os_helper.TESTFN, posix.O_RDONLY, dir_fd=a)
try:
res = posix.read(b, 9).decode(encoding="utf-8")
self.assertEqual("testline\n", res)
finally:
posix.close(a)
posix.close(b)
@unittest.skipUnless(hasattr(os, 'readlink') and (os.readlink in os.supports_dir_fd),
"test needs dir_fd support in os.readlink()")
def test_readlink_dir_fd(self):
os.symlink(os_helper.TESTFN, os_helper.TESTFN + 'link')
f = posix.open(posix.getcwd(), posix.O_RDONLY)
try:
self.assertEqual(posix.readlink(os_helper.TESTFN + 'link'),
posix.readlink(os_helper.TESTFN + 'link', dir_fd=f))
finally:
os_helper.unlink(os_helper.TESTFN + 'link')
posix.close(f)
@unittest.skipUnless(os.rename in os.supports_dir_fd, "test needs dir_fd support in os.rename()")
def test_rename_dir_fd(self):
os_helper.unlink(os_helper.TESTFN)
os_helper.create_empty_file(os_helper.TESTFN + 'ren')
f = posix.open(posix.getcwd(), posix.O_RDONLY)
try:
posix.rename(os_helper.TESTFN + 'ren', os_helper.TESTFN, src_dir_fd=f, dst_dir_fd=f)
except:
posix.rename(os_helper.TESTFN + 'ren', os_helper.TESTFN)
raise
else:
posix.stat(os_helper.TESTFN) # should not raise exception
finally:
posix.close(f)
@unittest.skipUnless(hasattr(signal, 'SIGCHLD'), 'CLD_XXXX be placed in si_code for a SIGCHLD signal')
@unittest.skipUnless(hasattr(os, 'waitid_result'), "test needs os.waitid_result")
def test_cld_xxxx_constants(self):
@ -1272,47 +1092,6 @@ class PosixTester(unittest.TestCase):
os.CLD_STOPPED
os.CLD_CONTINUED
@unittest.skipUnless(os.symlink in os.supports_dir_fd, "test needs dir_fd support in os.symlink()")
def test_symlink_dir_fd(self):
f = posix.open(posix.getcwd(), posix.O_RDONLY)
try:
posix.symlink(os_helper.TESTFN, os_helper.TESTFN + 'link',
dir_fd=f)
self.assertEqual(posix.readlink(os_helper.TESTFN + 'link'),
os_helper.TESTFN)
finally:
posix.close(f)
os_helper.unlink(os_helper.TESTFN + 'link')
@unittest.skipUnless(os.unlink in os.supports_dir_fd, "test needs dir_fd support in os.unlink()")
def test_unlink_dir_fd(self):
f = posix.open(posix.getcwd(), posix.O_RDONLY)
os_helper.create_empty_file(os_helper.TESTFN + 'del')
posix.stat(os_helper.TESTFN + 'del') # should not raise exception
try:
posix.unlink(os_helper.TESTFN + 'del', dir_fd=f)
except:
os_helper.unlink(os_helper.TESTFN + 'del')
raise
else:
self.assertRaises(OSError, posix.stat, os_helper.TESTFN + 'link')
finally:
posix.close(f)
@unittest.skipUnless(os.mkfifo in os.supports_dir_fd, "test needs dir_fd support in os.mkfifo()")
def test_mkfifo_dir_fd(self):
os_helper.unlink(os_helper.TESTFN)
f = posix.open(posix.getcwd(), posix.O_RDONLY)
try:
try:
posix.mkfifo(os_helper.TESTFN,
stat.S_IRUSR | stat.S_IWUSR, dir_fd=f)
except PermissionError as e:
self.skipTest('posix.mkfifo(): %s' % e)
self.assertTrue(stat.S_ISFIFO(posix.stat(os_helper.TESTFN).st_mode))
finally:
posix.close(f)
requires_sched_h = unittest.skipUnless(hasattr(posix, 'sched_yield'),
"don't have scheduling support")
requires_sched_affinity = unittest.skipUnless(hasattr(posix, 'sched_setaffinity'),
@ -1519,6 +1298,200 @@ class PosixTester(unittest.TestCase):
self.assertEqual(cm.exception.errno, errno.EINVAL)
os.close(os.pidfd_open(os.getpid(), 0))
# tests for the posix *at functions follow
class TestPosixDirFd(unittest.TestCase):
count = 0
@contextmanager
def prepare(self):
TestPosixDirFd.count += 1
name = f'{os_helper.TESTFN}_{self.count}'
base_dir = f'{os_helper.TESTFN}_{self.count}base'
posix.mkdir(base_dir)
self.addCleanup(posix.rmdir, base_dir)
fullname = os.path.join(base_dir, name)
assert not os.path.exists(fullname)
with os_helper.open_dir_fd(base_dir) as dir_fd:
yield (dir_fd, name, fullname)
@contextmanager
def prepare_file(self):
with self.prepare() as (dir_fd, name, fullname):
os_helper.create_empty_file(fullname)
self.addCleanup(posix.unlink, fullname)
yield (dir_fd, name, fullname)
@unittest.skipUnless(os.access in os.supports_dir_fd, "test needs dir_fd support for os.access()")
def test_access_dir_fd(self):
with self.prepare_file() as (dir_fd, name, fullname):
self.assertTrue(posix.access(name, os.R_OK, dir_fd=dir_fd))
@unittest.skipUnless(os.chmod in os.supports_dir_fd, "test needs dir_fd support in os.chmod()")
def test_chmod_dir_fd(self):
with self.prepare_file() as (dir_fd, name, fullname):
posix.chmod(fullname, stat.S_IRUSR)
posix.chmod(name, stat.S_IRUSR | stat.S_IWUSR, dir_fd=dir_fd)
s = posix.stat(fullname)
self.assertEqual(s.st_mode & stat.S_IRWXU,
stat.S_IRUSR | stat.S_IWUSR)
@unittest.skipUnless(hasattr(os, 'chown') and (os.chown in os.supports_dir_fd),
"test needs dir_fd support in os.chown()")
def test_chown_dir_fd(self):
with self.prepare_file() as (dir_fd, name, fullname):
posix.chown(name, os.getuid(), os.getgid(), dir_fd=dir_fd)
@unittest.skipUnless(os.stat in os.supports_dir_fd, "test needs dir_fd support in os.stat()")
def test_stat_dir_fd(self):
with self.prepare() as (dir_fd, name, fullname):
with open(fullname, 'w') as outfile:
outfile.write("testline\n")
self.addCleanup(posix.unlink, fullname)
s1 = posix.stat(fullname)
s2 = posix.stat(name, dir_fd=dir_fd)
self.assertEqual(s1, s2)
s2 = posix.stat(fullname, dir_fd=None)
self.assertEqual(s1, s2)
self.assertRaisesRegex(TypeError, 'should be integer or None, not',
posix.stat, name, dir_fd=posix.getcwd())
self.assertRaisesRegex(TypeError, 'should be integer or None, not',
posix.stat, name, dir_fd=float(dir_fd))
self.assertRaises(OverflowError,
posix.stat, name, dir_fd=10**20)
@unittest.skipUnless(os.utime in os.supports_dir_fd, "test needs dir_fd support in os.utime()")
def test_utime_dir_fd(self):
with self.prepare_file() as (dir_fd, name, fullname):
now = time.time()
posix.utime(name, None, dir_fd=dir_fd)
posix.utime(name, dir_fd=dir_fd)
self.assertRaises(TypeError, posix.utime, name,
now, dir_fd=dir_fd)
self.assertRaises(TypeError, posix.utime, name,
(None, None), dir_fd=dir_fd)
self.assertRaises(TypeError, posix.utime, name,
(now, None), dir_fd=dir_fd)
self.assertRaises(TypeError, posix.utime, name,
(None, now), dir_fd=dir_fd)
self.assertRaises(TypeError, posix.utime, name,
(now, "x"), dir_fd=dir_fd)
posix.utime(name, (int(now), int(now)), dir_fd=dir_fd)
posix.utime(name, (now, now), dir_fd=dir_fd)
posix.utime(name,
(int(now), int((now - int(now)) * 1e9)), dir_fd=dir_fd)
posix.utime(name, dir_fd=dir_fd,
times=(int(now), int((now - int(now)) * 1e9)))
# try dir_fd and follow_symlinks together
if os.utime in os.supports_follow_symlinks:
try:
posix.utime(name, follow_symlinks=False, dir_fd=dir_fd)
except ValueError:
# whoops! using both together not supported on this platform.
pass
@unittest.skipUnless(os.link in os.supports_dir_fd, "test needs dir_fd support in os.link()")
def test_link_dir_fd(self):
with self.prepare_file() as (dir_fd, name, fullname), \
self.prepare() as (dir_fd2, linkname, fulllinkname):
try:
posix.link(name, linkname, src_dir_fd=dir_fd, dst_dir_fd=dir_fd2)
except PermissionError as e:
self.skipTest('posix.link(): %s' % e)
self.addCleanup(posix.unlink, fulllinkname)
# should have same inodes
self.assertEqual(posix.stat(fullname)[1],
posix.stat(fulllinkname)[1])
@unittest.skipUnless(os.mkdir in os.supports_dir_fd, "test needs dir_fd support in os.mkdir()")
def test_mkdir_dir_fd(self):
with self.prepare() as (dir_fd, name, fullname):
posix.mkdir(name, dir_fd=dir_fd)
self.addCleanup(posix.rmdir, fullname)
posix.stat(fullname) # should not raise exception
@unittest.skipUnless(hasattr(os, 'mknod')
and (os.mknod in os.supports_dir_fd)
and hasattr(stat, 'S_IFIFO'),
"test requires both stat.S_IFIFO and dir_fd support for os.mknod()")
def test_mknod_dir_fd(self):
# Test using mknodat() to create a FIFO (the only use specified
# by POSIX).
with self.prepare() as (dir_fd, name, fullname):
mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
try:
posix.mknod(name, mode, 0, dir_fd=dir_fd)
except OSError as e:
# Some old systems don't allow unprivileged users to use
# mknod(), or only support creating device nodes.
self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES))
else:
self.addCleanup(posix.unlink, fullname)
self.assertTrue(stat.S_ISFIFO(posix.stat(fullname).st_mode))
@unittest.skipUnless(os.open in os.supports_dir_fd, "test needs dir_fd support in os.open()")
def test_open_dir_fd(self):
with self.prepare() as (dir_fd, name, fullname):
with open(fullname, 'wb') as outfile:
outfile.write(b"testline\n")
self.addCleanup(posix.unlink, fullname)
fd = posix.open(name, posix.O_RDONLY, dir_fd=dir_fd)
try:
res = posix.read(fd, 9)
self.assertEqual(b"testline\n", res)
finally:
posix.close(fd)
@unittest.skipUnless(hasattr(os, 'readlink') and (os.readlink in os.supports_dir_fd),
"test needs dir_fd support in os.readlink()")
def test_readlink_dir_fd(self):
with self.prepare() as (dir_fd, name, fullname):
os.symlink('symlink', fullname)
self.addCleanup(posix.unlink, fullname)
self.assertEqual(posix.readlink(name, dir_fd=dir_fd), 'symlink')
@unittest.skipUnless(os.rename in os.supports_dir_fd, "test needs dir_fd support in os.rename()")
def test_rename_dir_fd(self):
with self.prepare_file() as (dir_fd, name, fullname), \
self.prepare() as (dir_fd2, name2, fullname2):
posix.rename(name, name2,
src_dir_fd=dir_fd, dst_dir_fd=dir_fd2)
posix.stat(fullname2) # should not raise exception
posix.rename(fullname2, fullname)
@unittest.skipUnless(os.symlink in os.supports_dir_fd, "test needs dir_fd support in os.symlink()")
def test_symlink_dir_fd(self):
with self.prepare() as (dir_fd, name, fullname):
posix.symlink('symlink', name, dir_fd=dir_fd)
self.addCleanup(posix.unlink, fullname)
self.assertEqual(posix.readlink(fullname), 'symlink')
@unittest.skipUnless(os.unlink in os.supports_dir_fd, "test needs dir_fd support in os.unlink()")
def test_unlink_dir_fd(self):
with self.prepare() as (dir_fd, name, fullname):
os_helper.create_empty_file(fullname)
posix.stat(fullname) # should not raise exception
try:
posix.unlink(name, dir_fd=dir_fd)
self.assertRaises(OSError, posix.stat, fullname)
except:
self.addCleanup(posix.unlink, fullname)
raise
@unittest.skipUnless(os.mkfifo in os.supports_dir_fd, "test needs dir_fd support in os.mkfifo()")
def test_mkfifo_dir_fd(self):
with self.prepare() as (dir_fd, name, fullname):
try:
posix.mkfifo(name, stat.S_IRUSR | stat.S_IWUSR, dir_fd=dir_fd)
except PermissionError as e:
self.skipTest('posix.mkfifo(): %s' % e)
self.addCleanup(posix.unlink, fullname)
self.assertTrue(stat.S_ISFIFO(posix.stat(fullname).st_mode))
class PosixGroupsTester(unittest.TestCase):
def setUp(self):