Reenable the manager tests with Amaury's threading fix

This commit is contained in:
Jesse Noller 2008-07-02 16:44:09 +00:00
parent c060b0e7eb
commit 146b7ab818
1 changed files with 13 additions and 17 deletions

View File

@ -960,7 +960,6 @@ class _TestContainers(BaseTestCase):
def sqr(x, wait=0.0): def sqr(x, wait=0.0):
time.sleep(wait) time.sleep(wait)
return x*x return x*x
"""
class _TestPool(BaseTestCase): class _TestPool(BaseTestCase):
def test_apply(self): def test_apply(self):
@ -1030,7 +1029,6 @@ class _TestPool(BaseTestCase):
join = TimingWrapper(self.pool.join) join = TimingWrapper(self.pool.join)
join() join()
self.assertTrue(join.elapsed < 0.2) self.assertTrue(join.elapsed < 0.2)
"""
# #
# Test that manager has expected number of shared objects left # Test that manager has expected number of shared objects left
# #
@ -1333,7 +1331,6 @@ class _TestConnection(BaseTestCase):
self.assertRaises(ValueError, a.send_bytes, msg, 4, -1) self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
"""
class _TestListenerClient(BaseTestCase): class _TestListenerClient(BaseTestCase):
ALLOWED_TYPES = ('processes', 'threads') ALLOWED_TYPES = ('processes', 'threads')
@ -1353,7 +1350,6 @@ class _TestListenerClient(BaseTestCase):
self.assertEqual(conn.recv(), 'hello') self.assertEqual(conn.recv(), 'hello')
p.join() p.join()
l.close() l.close()
"""
# #
# Test of sending connection and socket objects between processes # Test of sending connection and socket objects between processes
# #
@ -1769,28 +1765,28 @@ def test_main(run=None):
multiprocessing.get_logger().setLevel(LOG_LEVEL) multiprocessing.get_logger().setLevel(LOG_LEVEL)
#ProcessesMixin.pool = multiprocessing.Pool(4) ProcessesMixin.pool = multiprocessing.Pool(4)
#ThreadsMixin.pool = multiprocessing.dummy.Pool(4) ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
#ManagerMixin.manager.__init__() ManagerMixin.manager.__init__()
#ManagerMixin.manager.start() ManagerMixin.manager.start()
#ManagerMixin.pool = ManagerMixin.manager.Pool(4) ManagerMixin.pool = ManagerMixin.manager.Pool(4)
testcases = ( testcases = (
sorted(testcases_processes.values(), key=lambda tc:tc.__name__) #+ sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
#sorted(testcases_threads.values(), key=lambda tc:tc.__name__) + sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
#sorted(testcases_manager.values(), key=lambda tc:tc.__name__) sorted(testcases_manager.values(), key=lambda tc:tc.__name__)
) )
loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases) suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
run(suite) run(suite)
#ThreadsMixin.pool.terminate() ThreadsMixin.pool.terminate()
#ProcessesMixin.pool.terminate() ProcessesMixin.pool.terminate()
#ManagerMixin.pool.terminate() ManagerMixin.pool.terminate()
#ManagerMixin.manager.shutdown() ManagerMixin.manager.shutdown()
#del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
def main(): def main():
test_main(unittest.TextTestRunner(verbosity=2).run) test_main(unittest.TextTestRunner(verbosity=2).run)