Add test for multiprocessing.Conditon.wait() and changset 3baeb5e13dd2

This commit is contained in:
Richard Oudkerk 2012-06-05 13:15:29 +01:00
parent f86a5e8a93
commit 9844993cde
1 changed files with 28 additions and 0 deletions

View File

@ -956,6 +956,34 @@ class _TestCondition(BaseTestCase):
p.join(5) p.join(5)
self.assertTrue(success.value) self.assertTrue(success.value)
@classmethod
def _test_wait_result(cls, c, pid):
with c:
c.notify()
time.sleep(1)
if pid is not None:
os.kill(pid, signal.SIGINT)
def test_wait_result(self):
if isinstance(self, ProcessesMixin) and sys.platform != 'win32':
pid = os.getpid()
else:
pid = None
c = self.Condition()
with c:
self.assertFalse(c.wait(0))
self.assertFalse(c.wait(0.1))
p = self.Process(target=self._test_wait_result, args=(c, pid))
p.start()
self.assertTrue(c.wait(10))
if pid is not None:
self.assertRaises(KeyboardInterrupt, c.wait, 10)
p.join()
class _TestEvent(BaseTestCase): class _TestEvent(BaseTestCase):