bpo-22367: Update test_fcntl.py for spawn process mode (GH-17154) (GH-17253)
(cherry picked from commit 9960230f76
)
Co-authored-by: Dong-hee Na <donghee.na92@gmail.com>
This commit is contained in:
parent
bff5255664
commit
d4d79209e6
|
@ -51,6 +51,21 @@ class BadFile:
|
|||
def fileno(self):
|
||||
return self.fn
|
||||
|
||||
def try_lockf_on_other_process_fail(fname, cmd):
|
||||
f = open(fname, 'wb+')
|
||||
try:
|
||||
fcntl.lockf(f, cmd)
|
||||
except BlockingIOError:
|
||||
pass
|
||||
finally:
|
||||
f.close()
|
||||
|
||||
def try_lockf_on_other_process(fname, cmd):
|
||||
f = open(fname, 'wb+')
|
||||
fcntl.lockf(f, cmd)
|
||||
fcntl.lockf(f, fcntl.LOCK_UN)
|
||||
f.close()
|
||||
|
||||
class TestFcntl(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
|
@ -138,28 +153,23 @@ class TestFcntl(unittest.TestCase):
|
|||
self.assertRaises(ValueError, fcntl.flock, -1, fcntl.LOCK_SH)
|
||||
self.assertRaises(TypeError, fcntl.flock, 'spam', fcntl.LOCK_SH)
|
||||
|
||||
@unittest.skipIf(platform.system() == "AIX", "AIX returns PermissionError")
|
||||
def test_lockf_exclusive(self):
|
||||
self.f = open(TESTFN, 'wb+')
|
||||
cmd = fcntl.LOCK_EX | fcntl.LOCK_NB
|
||||
def try_lockf_on_other_process():
|
||||
self.assertRaises(BlockingIOError, fcntl.lockf, self.f, cmd)
|
||||
|
||||
fcntl.lockf(self.f, cmd)
|
||||
p = Process(target=try_lockf_on_other_process)
|
||||
p = Process(target=try_lockf_on_other_process_fail, args=(TESTFN, cmd))
|
||||
p.start()
|
||||
p.join()
|
||||
fcntl.lockf(self.f, fcntl.LOCK_UN)
|
||||
self.assertEqual(p.exitcode, 0)
|
||||
|
||||
@unittest.skipIf(platform.system() == "AIX", "AIX returns PermissionError")
|
||||
def test_lockf_share(self):
|
||||
self.f = open(TESTFN, 'wb+')
|
||||
cmd = fcntl.LOCK_SH | fcntl.LOCK_NB
|
||||
def try_lockf_on_other_process():
|
||||
fcntl.lockf(self.f, cmd)
|
||||
fcntl.lockf(self.f, fcntl.LOCK_UN)
|
||||
|
||||
fcntl.lockf(self.f, cmd)
|
||||
p = Process(target=try_lockf_on_other_process)
|
||||
p = Process(target=try_lockf_on_other_process, args=(TESTFN, cmd))
|
||||
p.start()
|
||||
p.join()
|
||||
fcntl.lockf(self.f, fcntl.LOCK_UN)
|
||||
|
|
Loading…
Reference in New Issue