diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py index b317e3362ee..17484fe5499 100644 --- a/Lib/test/test_os.py +++ b/Lib/test/test_os.py @@ -99,6 +99,11 @@ def bytes_filename_warn(expected): yield +def create_file(filename, content=b'content'): + with open(filename, "xb", 0) as fp: + fp.write(content) + + # Tests creating TESTFN class FileTests(unittest.TestCase): def setUp(self): @@ -157,9 +162,8 @@ class FileTests(unittest.TestCase): "needs INT_MAX < PY_SSIZE_T_MAX") @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False) def test_large_read(self, size): - with open(support.TESTFN, "wb") as fp: - fp.write(b'test') self.addCleanup(support.unlink, support.TESTFN) + create_file(support.TESTFN, b'test') # Issue #21932: Make sure that os.read() does not raise an # OverflowError for size larger than INT_MAX @@ -216,11 +220,12 @@ class FileTests(unittest.TestCase): def test_replace(self): TESTFN2 = support.TESTFN + ".2" - with open(support.TESTFN, 'w') as f: - f.write("1") - with open(TESTFN2, 'w') as f: - f.write("2") - self.addCleanup(os.unlink, TESTFN2) + self.addCleanup(support.unlink, support.TESTFN) + self.addCleanup(support.unlink, TESTFN2) + + create_file(support.TESTFN, b"1") + create_file(TESTFN2, b"2") + os.replace(support.TESTFN, TESTFN2) self.assertRaises(FileNotFoundError, os.stat, support.TESTFN) with open(TESTFN2, 'r') as f: @@ -245,8 +250,7 @@ class StatAttributeTests(unittest.TestCase): def setUp(self): self.fname = support.TESTFN self.addCleanup(support.unlink, self.fname) - with open(self.fname, 'wb') as fp: - fp.write(b"ABC") + create_file(self.fname, b"ABC") @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()') def check_stat_attributes(self, fname): @@ -455,8 +459,7 @@ class UtimeTests(unittest.TestCase): self.addCleanup(support.rmtree, self.dirname) os.mkdir(self.dirname) - with open(self.fname, 'wb') as fp: - fp.write(b"ABC") + create_file(self.fname) def restore_float_times(state): with ignore_deprecation_warnings('stat_float_times'): @@ -555,7 +558,7 @@ class UtimeTests(unittest.TestCase): "fd support for utime required for this test.") def test_utime_fd(self): def set_time(filename, ns): - with open(filename, 'wb') as fp: + with open(filename, 'wb', 0) as fp: # use a file descriptor to test futimens(timespec) # or futimes(timeval) os.utime(fp.fileno(), ns=ns) @@ -1213,8 +1216,7 @@ class RemoveDirsTests(unittest.TestCase): os.mkdir(dira) dirb = os.path.join(dira, 'dirb') os.mkdir(dirb) - with open(os.path.join(dira, 'file.txt'), 'w') as f: - f.write('text') + create_file(os.path.join(dira, 'file.txt')) os.removedirs(dirb) self.assertFalse(os.path.exists(dirb)) self.assertTrue(os.path.exists(dira)) @@ -1225,8 +1227,7 @@ class RemoveDirsTests(unittest.TestCase): os.mkdir(dira) dirb = os.path.join(dira, 'dirb') os.mkdir(dirb) - with open(os.path.join(dirb, 'file.txt'), 'w') as f: - f.write('text') + create_file(os.path.join(dirb, 'file.txt')) with self.assertRaises(OSError): os.removedirs(dirb) self.assertTrue(os.path.exists(dirb)) @@ -1236,7 +1237,7 @@ class RemoveDirsTests(unittest.TestCase): class DevNullTests(unittest.TestCase): def test_devnull(self): - with open(os.devnull, 'wb') as f: + with open(os.devnull, 'wb', 0) as f: f.write(b'hello') f.close() with open(os.devnull, 'rb') as f: @@ -1323,9 +1324,9 @@ class URandomFDTests(unittest.TestCase): def test_urandom_fd_reopened(self): # Issue #21207: urandom() should detect its fd to /dev/urandom # changed to something else, and reopen it. - with open(support.TESTFN, 'wb') as f: - f.write(b"x" * 256) - self.addCleanup(os.unlink, support.TESTFN) + self.addCleanup(support.unlink, support.TESTFN) + create_file(support.TESTFN, b"x" * 256) + code = """if 1: import os import sys @@ -1464,12 +1465,10 @@ class Win32ErrorTests(unittest.TestCase): self.assertRaises(OSError, os.chdir, support.TESTFN) def test_mkdir(self): - f = open(support.TESTFN, "w") - try: + self.addCleanup(support.unlink, support.TESTFN) + + with open(support.TESTFN, "w") as f: self.assertRaises(OSError, os.mkdir, support.TESTFN) - finally: - f.close() - os.unlink(support.TESTFN) def test_utime(self): self.assertRaises(OSError, os.utime, support.TESTFN, None) @@ -1988,42 +1987,36 @@ class Win32SymlinkTests(unittest.TestCase): level1 = os.path.abspath(support.TESTFN) level2 = os.path.join(level1, "level2") level3 = os.path.join(level2, "level3") + self.addCleanup(support.rmtree, level1) + + os.mkdir(level1) + os.mkdir(level2) + os.mkdir(level3) + + file1 = os.path.abspath(os.path.join(level1, "file1")) + create_file(file1) + + orig_dir = os.getcwd() try: - os.mkdir(level1) - os.mkdir(level2) - os.mkdir(level3) + os.chdir(level2) + link = os.path.join(level2, "link") + os.symlink(os.path.relpath(file1), "link") + self.assertIn("link", os.listdir(os.getcwd())) - file1 = os.path.abspath(os.path.join(level1, "file1")) + # Check os.stat calls from the same dir as the link + self.assertEqual(os.stat(file1), os.stat("link")) - with open(file1, "w") as f: - f.write("file1") + # Check os.stat calls from a dir below the link + os.chdir(level1) + self.assertEqual(os.stat(file1), + os.stat(os.path.relpath(link))) - orig_dir = os.getcwd() - try: - os.chdir(level2) - link = os.path.join(level2, "link") - os.symlink(os.path.relpath(file1), "link") - self.assertIn("link", os.listdir(os.getcwd())) - - # Check os.stat calls from the same dir as the link - self.assertEqual(os.stat(file1), os.stat("link")) - - # Check os.stat calls from a dir below the link - os.chdir(level1) - self.assertEqual(os.stat(file1), - os.stat(os.path.relpath(link))) - - # Check os.stat calls from a dir above the link - os.chdir(level3) - self.assertEqual(os.stat(file1), - os.stat(os.path.relpath(link))) - finally: - os.chdir(orig_dir) - except OSError as err: - self.fail(err) + # Check os.stat calls from a dir above the link + os.chdir(level3) + self.assertEqual(os.stat(file1), + os.stat(os.path.relpath(link))) finally: - os.remove(file1) - shutil.rmtree(level1) + os.chdir(orig_dir) @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") @@ -2159,8 +2152,8 @@ class ProgramPriorityTests(unittest.TestCase): try: new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid()) if base >= 19 and new_prio <= 19: - raise unittest.SkipTest( - "unable to reliably test setpriority at current nice level of %s" % base) + raise unittest.SkipTest("unable to reliably test setpriority " + "at current nice level of %s" % base) else: self.assertEqual(new_prio, base + 1) finally: @@ -2270,8 +2263,7 @@ class TestSendfile(unittest.TestCase): @classmethod def setUpClass(cls): cls.key = support.threading_setup() - with open(support.TESTFN, "wb") as f: - f.write(cls.DATA) + create_file(support.TESTFN, cls.DATA) @classmethod def tearDownClass(cls): @@ -2421,10 +2413,11 @@ class TestSendfile(unittest.TestCase): def test_trailers(self): TESTFN2 = support.TESTFN + "2" file_data = b"abcdef" - with open(TESTFN2, 'wb') as f: - f.write(file_data) - with open(TESTFN2, 'rb')as f: - self.addCleanup(os.remove, TESTFN2) + + self.addCleanup(support.unlink, TESTFN2) + create_file(TESTFN2, file_data) + + with open(TESTFN2, 'rb') as f: os.sendfile(self.sockno, f.fileno(), 0, len(file_data), trailers=[b"1234"]) self.client.close() @@ -2447,8 +2440,9 @@ class TestSendfile(unittest.TestCase): def supports_extended_attributes(): if not hasattr(os, "setxattr"): return False + try: - with open(support.TESTFN, "wb") as fp: + with open(support.TESTFN, "xb", 0) as fp: try: os.setxattr(fp.fileno(), b"user.test", b"") except OSError: @@ -2465,17 +2459,18 @@ def supports_extended_attributes(): @support.requires_linux_version(2, 6, 39) class ExtendedAttributeTests(unittest.TestCase): - def tearDown(self): - support.unlink(support.TESTFN) - def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs): fn = support.TESTFN - open(fn, "wb").close() + self.addCleanup(support.unlink, fn) + create_file(fn) + with self.assertRaises(OSError) as cm: getxattr(fn, s("user.test"), **kwargs) self.assertEqual(cm.exception.errno, errno.ENODATA) + init_xattr = listxattr(fn) self.assertIsInstance(init_xattr, list) + setxattr(fn, s("user.test"), b"", **kwargs) xattr = set(init_xattr) xattr.add("user.test") @@ -2483,19 +2478,24 @@ class ExtendedAttributeTests(unittest.TestCase): self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"") setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs) self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello") + with self.assertRaises(OSError) as cm: setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs) self.assertEqual(cm.exception.errno, errno.EEXIST) + with self.assertRaises(OSError) as cm: setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs) self.assertEqual(cm.exception.errno, errno.ENODATA) + setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs) xattr.add("user.test2") self.assertEqual(set(listxattr(fn)), xattr) removexattr(fn, s("user.test"), **kwargs) + with self.assertRaises(OSError) as cm: getxattr(fn, s("user.test"), **kwargs) self.assertEqual(cm.exception.errno, errno.ENODATA) + xattr.remove("user.test") self.assertEqual(set(listxattr(fn)), xattr) self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo") @@ -2508,11 +2508,11 @@ class ExtendedAttributeTests(unittest.TestCase): self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many)) def _check_xattrs(self, *args, **kwargs): - def make_bytes(s): - return bytes(s, "ascii") self._check_xattrs_str(str, *args, **kwargs) support.unlink(support.TESTFN) - self._check_xattrs_str(make_bytes, *args, **kwargs) + + self._check_xattrs_str(os.fsencode, *args, **kwargs) + support.unlink(support.TESTFN) def test_simple(self): self._check_xattrs(os.getxattr, os.setxattr, os.removexattr, @@ -2527,10 +2527,10 @@ class ExtendedAttributeTests(unittest.TestCase): with open(path, "rb") as fp: return os.getxattr(fp.fileno(), *args) def setxattr(path, *args): - with open(path, "wb") as fp: + with open(path, "wb", 0) as fp: os.setxattr(fp.fileno(), *args) def removexattr(path, *args): - with open(path, "wb") as fp: + with open(path, "wb", 0) as fp: os.removexattr(fp.fileno(), *args) def listxattr(path, *args): with open(path, "rb") as fp: @@ -2844,8 +2844,7 @@ class TestScandir(unittest.TestCase): def create_file(self, name="file.txt"): filename = os.path.join(self.path, name) - with open(filename, "wb") as fp: - fp.write(b'python') + create_file(filename, b'python') return filename def get_entries(self, names):