"Test posix functions" from test import test_support try: import posix except ImportError: raise test_support.TestSkipped, "posix is not available" import errno import sys import time import os import pwd import shutil import sys import unittest import warnings warnings.filterwarnings('ignore', '.* potential security risk .*', RuntimeWarning) class PosixTester(unittest.TestCase): def setUp(self): # create empty file fp = open(test_support.TESTFN, 'w+') fp.close() def tearDown(self): os.unlink(test_support.TESTFN) def testNoArgFunctions(self): # test posix functions which take no arguments and have # no side-effects which we need to cleanup (e.g., fork, wait, abort) NO_ARG_FUNCTIONS = [ "ctermid", "getcwd", "getcwdu", "uname", "times", "getloadavg", "tmpnam", "getegid", "geteuid", "getgid", "getgroups", "getpid", "getpgrp", "getppid", "getuid", ] for name in NO_ARG_FUNCTIONS: posix_func = getattr(posix, name, None) if posix_func is not None: posix_func() self.assertRaises(TypeError, posix_func, 1) def test_statvfs(self): if hasattr(posix, 'statvfs'): self.assert_(posix.statvfs(os.curdir)) def test_fstatvfs(self): if hasattr(posix, 'fstatvfs'): fp = open(test_support.TESTFN) try: self.assert_(posix.fstatvfs(fp.fileno())) finally: fp.close() def test_ftruncate(self): if hasattr(posix, 'ftruncate'): fp = open(test_support.TESTFN, 'w+') try: # we need to have some data to truncate fp.write('test') fp.flush() posix.ftruncate(fp.fileno(), 0) finally: fp.close() def test_dup(self): if hasattr(posix, 'dup'): fp = open(test_support.TESTFN) try: fd = posix.dup(fp.fileno()) self.assert_(isinstance(fd, int)) os.close(fd) finally: fp.close() def test_confstr(self): if hasattr(posix, 'confstr'): self.assertRaises(ValueError, posix.confstr, "CS_garbage") self.assertEqual(len(posix.confstr("CS_PATH")) > 0, True) def test_dup2(self): if hasattr(posix, 'dup2'): fp1 = open(test_support.TESTFN) fp2 = open(test_support.TESTFN) try: posix.dup2(fp1.fileno(), fp2.fileno()) finally: fp1.close() fp2.close() def fdopen_helper(self, *args): fd = os.open(test_support.TESTFN, os.O_RDONLY) fp2 = posix.fdopen(fd, *args) fp2.close() def test_fdopen(self): if hasattr(posix, 'fdopen'): self.fdopen_helper() self.fdopen_helper('r') self.fdopen_helper('r', 100) def test_osexlock(self): if hasattr(posix, "O_EXLOCK"): fd = os.open(test_support.TESTFN, os.O_WRONLY|os.O_EXLOCK|os.O_CREAT) self.assertRaises(OSError, os.open, test_support.TESTFN, os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK) os.close(fd) if hasattr(posix, "O_SHLOCK"): fd = os.open(test_support.TESTFN, os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) self.assertRaises(OSError, os.open, test_support.TESTFN, os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK) os.close(fd) def test_osshlock(self): if hasattr(posix, "O_SHLOCK"): fd1 = os.open(test_support.TESTFN, os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) fd2 = os.open(test_support.TESTFN, os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) os.close(fd2) os.close(fd1) if hasattr(posix, "O_EXLOCK"): fd = os.open(test_support.TESTFN, os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) self.assertRaises(OSError, os.open, test_support.TESTFN, os.O_RDONLY|os.O_EXLOCK|os.O_NONBLOCK) os.close(fd) def test_fstat(self): if hasattr(posix, 'fstat'): fp = open(test_support.TESTFN) try: self.assert_(posix.fstat(fp.fileno())) finally: fp.close() def test_stat(self): if hasattr(posix, 'stat'): self.assert_(posix.stat(test_support.TESTFN)) def _test_all_chown_common(self, chown_func, first_param): """Common code for chown, fchown and lchown tests.""" if os.getuid() == 0: try: # Many linux distros have a nfsnobody user as MAX_UID-2 # that makes a good test case for signedness issues. # http://bugs.python.org/issue1747858 # This part of the test only runs when run as root. # Only scary people run their tests as root. ent = pwd.getpwnam('nfsnobody') chown_func(first_param, ent.pw_uid, ent.pw_gid) except KeyError: pass else: # non-root cannot chown to root, raises OSError self.assertRaises(OSError, chown_func, first_param, 0, 0) # test a successful chown call chown_func(first_param, os.getuid(), os.getgid()) def _test_chown(self): # raise an OSError if the file does not exist os.unlink(test_support.TESTFN) self.assertRaises(OSError, posix.chown, test_support.TESTFN, -1, -1) # re-create the file open(test_support.TESTFN, 'w').close() self._test_all_chown_common(posix.chown, test_support.TESTFN) if hasattr(posix, 'chown'): test_chown = _test_chown def _test_fchown(self): os.unlink(test_support.TESTFN) # re-create the file test_file = open(test_support.TESTFN, 'w') try: fd = test_file.fileno() self._test_all_chown_common(posix.fchown, fd) finally: test_file.close() if hasattr(posix, 'fchown'): test_fchown = _test_fchown def _test_lchown(self): os.unlink(test_support.TESTFN) # create a symlink os.symlink('/tmp/dummy-symlink-target', test_support.TESTFN) self._test_all_chown_common(posix.lchown, test_support.TESTFN) if hasattr(posix, 'lchown'): test_lchown = _test_lchown def test_chdir(self): if hasattr(posix, 'chdir'): posix.chdir(os.curdir) self.assertRaises(OSError, posix.chdir, test_support.TESTFN) def test_lsdir(self): if hasattr(posix, 'lsdir'): self.assert_(test_support.TESTFN in posix.lsdir(os.curdir)) def test_access(self): if hasattr(posix, 'access'): self.assert_(posix.access(test_support.TESTFN, os.R_OK)) def test_umask(self): if hasattr(posix, 'umask'): old_mask = posix.umask(0) self.assert_(isinstance(old_mask, int)) posix.umask(old_mask) def test_strerror(self): if hasattr(posix, 'strerror'): self.assert_(posix.strerror(0)) def test_pipe(self): if hasattr(posix, 'pipe'): reader, writer = posix.pipe() os.close(reader) os.close(writer) def test_tempnam(self): if hasattr(posix, 'tempnam'): self.assert_(posix.tempnam()) self.assert_(posix.tempnam(os.curdir)) self.assert_(posix.tempnam(os.curdir, 'blah')) def test_tmpfile(self): if hasattr(posix, 'tmpfile'): fp = posix.tmpfile() fp.close() def test_utime(self): if hasattr(posix, 'utime'): now = time.time() posix.utime(test_support.TESTFN, None) self.assertRaises(TypeError, posix.utime, test_support.TESTFN, (None, None)) self.assertRaises(TypeError, posix.utime, test_support.TESTFN, (now, None)) self.assertRaises(TypeError, posix.utime, test_support.TESTFN, (None, now)) posix.utime(test_support.TESTFN, (int(now), int(now))) posix.utime(test_support.TESTFN, (now, now)) def test_chflags(self): if hasattr(posix, 'chflags'): st = os.stat(test_support.TESTFN) if hasattr(st, 'st_flags'): posix.chflags(test_support.TESTFN, st.st_flags) def test_lchflags(self): if hasattr(posix, 'lchflags'): st = os.stat(test_support.TESTFN) if hasattr(st, 'st_flags'): posix.lchflags(test_support.TESTFN, st.st_flags) def test_getcwd_long_pathnames(self): if hasattr(posix, 'getcwd'): dirname = 'getcwd-test-directory-0123456789abcdef-01234567890abcdef' curdir = os.getcwd() base_path = os.path.abspath(test_support.TESTFN) + '.getcwd' try: os.mkdir(base_path) os.chdir(base_path) except: # Just returning nothing instead of the TestSkipped exception, # because the test results in Error in that case. # Is that ok? # raise test_support.TestSkipped, "cannot create directory for testing" return try: def _create_and_do_getcwd(dirname, current_path_length = 0): try: os.mkdir(dirname) except: raise test_support.TestSkipped, "mkdir cannot create directory sufficiently deep for getcwd test" os.chdir(dirname) try: os.getcwd() if current_path_length < 4099: _create_and_do_getcwd(dirname, current_path_length + len(dirname) + 1) except OSError as e: expected_errno = errno.ENAMETOOLONG if 'sunos' in sys.platform or 'openbsd' in sys.platform: expected_errno = errno.ERANGE # Issue 9185 self.assertEqual(e.errno, expected_errno) finally: os.chdir('..') os.rmdir(dirname) _create_and_do_getcwd(dirname) finally: os.chdir(curdir) shutil.rmtree(base_path) def test_getgroups(self): with os.popen('id -G') as idg: groups = idg.read().strip() if not groups: raise unittest.SkipTest("need working 'id -G'") # The order of groups isn't important, hence the calls # to sorted. self.assertEqual( list(sorted([int(x) for x in groups.split()])), list(sorted(posix.getgroups()))) class PosixGroupsTester(unittest.TestCase): if posix.getuid() == 0 and hasattr(posix, 'getgroups') and sys.platform != 'darwin': def setUp(self): self.saved_groups = posix.getgroups() def tearDown(self): if hasattr(posix, 'setgroups'): posix.setgroups(self.saved_groups) elif hasattr(posix, 'initgroups'): name = pwd.getpwuid(posix.getuid()).pw_name posix.initgroups(name, self.saved_groups[0]) if hasattr(posix, 'initgroups'): def test_initgroups(self): # find missing group groups = sorted(self.saved_groups) for g1,g2 in zip(groups[:-1], groups[1:]): g = g1 + 1 if g < g2: break else: g = g2 + 1 name = pwd.getpwuid(posix.getuid()).pw_name posix.initgroups(name, g) self.assertIn(g, posix.getgroups()) if hasattr(posix, 'setgroups'): def test_setgroups(self): for groups in [[0], range(16)]: posix.setgroups(groups) self.assertListEqual(groups, posix.getgroups()) def test_main(): test_support.run_unittest(PosixTester, PosixGroupsTester) if __name__ == '__main__': test_main()