from test.test_importlib import util as test_util

init = test_util.import_importlib('importlib')

import sys
import threading
import unittest
import weakref

from test import support
from test.support import threading_helper
from test import lock_tests


threading_helper.requires_working_threading(module=True)


class ModuleLockAsRLockTests:
    """Mixin pairing ``lock_tests.RLockTests`` with ``_ModuleLock``.

    The concrete test classes are produced by ``test_util.test_both`` below,
    which is given the ``LockType`` mapping.  Attributes assigned ``None``
    replace inherited tests for features noted as unsupported.
    """

    @classmethod
    def locktype(cls):
        """Return a new lock of the type under test, named "some_lock"."""
        return cls.LockType("some_lock")

    # _is_owned() is not supported.
    test__is_owned = None
    # acquire(blocking=False) is not supported.
    test_try_acquire = None
    test_try_acquire_contended = None
    # The `with` statement is not supported.
    test_with = None
    # acquire(timeout=...) is not supported.
    test_timeout = None
    # _release_save() is not supported.
    test_release_save_unacquired = None
    # The lock status is not part of the repr.
    test_repr = None
    test_locked_repr = None

    def tearDown(self):
        # Empty the _blocking_on container of every importlib module in
        # ``init`` once a test has finished.
        for importlib_module in init.values():
            importlib_module._bootstrap._blocking_on.clear()


# For each entry of ``init``: the _ModuleLock class of that importlib module.
LOCK_TYPES = {
    kind: importlib_module._bootstrap._ModuleLock
    for kind, importlib_module in init.items()
}

(Frozen_ModuleLockAsRLockTests,
 Source_ModuleLockAsRLockTests
 ) = test_util.test_both(ModuleLockAsRLockTests, lock_tests.RLockTests,
                         LockType=LOCK_TYPES)


class DeadlockAvoidanceTests:
    """Exercise deadlock detection of the lock type under test.

    ``LockType`` and ``DeadlockError`` are supplied through
    ``test_util.test_both`` below.
    """

    def setUp(self):
        # Remember the current switch interval and install a very small one;
        # tearDown() puts the remembered value back.
        try:
            self.old_switchinterval = sys.getswitchinterval()
            support.setswitchinterval(1e-6)
        except AttributeError:
            # Nothing to restore in tearDown().
            self.old_switchinterval = None

    def tearDown(self):
        if self.old_switchinterval is None:
            return
        sys.setswitchinterval(self.old_switchinterval)

    def run_deadlock_avoidance_test(self, create_deadlock):
        """Run worker threads that each acquire two neighbouring locks.

        Ten locks are arranged in a ring and every worker takes one
        (lock, successor) pair.  When *create_deadlock* is true there are as
        many workers as locks; otherwise there is one worker fewer.

        Return a list holding one ``(first_acquired, second_acquired)``
        tuple of booleans per worker.
        """
        lock_count = 10
        ring = [self.LockType(str(index)) for index in range(lock_count)]
        # Pair each lock with the next one, the last wrapping to the first.
        pairs = list(zip(ring, ring[1:] + ring[:1]))
        thread_count = lock_count if create_deadlock else lock_count - 1
        barrier = threading.Barrier(thread_count)
        results = []

        def try_acquire(lock):
            """Acquire *lock*.

            Return True on success, False if DeadlockError was raised."""
            try:
                lock.acquire()
            except self.DeadlockError:
                return False
            return True

        def worker():
            first, second = pairs.pop()
            got_first = try_acquire(first)
            # All workers attempt their first lock before any of them goes
            # on to its second one.
            barrier.wait()
            got_second = try_acquire(second)
            results.append((got_first, got_second))
            # Release in reverse order of acquisition.
            if got_second:
                second.release()
            if got_first:
                first.release()

        lock_tests.Bunch(worker, thread_count).wait_for_finished()
        self.assertEqual(len(results), thread_count)
        return results

    def test_deadlock(self):
        outcomes = self.run_deadlock_avoidance_test(True)
        # At least one of the threads detected a potential deadlock on its
        # second acquire() call.  It may be several of them, because the
        # deadlock avoidance mechanism is conservative.
        detected = outcomes.count((True, False))
        self.assertGreaterEqual(detected, 1)
        # Every other thread must have acquired both of its locks.
        self.assertEqual(outcomes.count((True, True)),
                         len(outcomes) - detected)

    def test_no_deadlock(self):
        outcomes = self.run_deadlock_avoidance_test(False)
        # No thread may report a deadlock; all of them get both locks.
        self.assertEqual(outcomes.count((True, False)), 0)
        self.assertEqual(outcomes.count((True, True)), len(outcomes))


# For each entry of ``init``: the _DeadlockError class of that importlib
# module.
DEADLOCK_ERRORS = {
    kind: importlib_module._bootstrap._DeadlockError
    for kind, importlib_module in init.items()
}

(Frozen_DeadlockAvoidanceTests,
 Source_DeadlockAvoidanceTests
 ) = test_util.test_both(DeadlockAvoidanceTests,
                         LockType=LOCK_TYPES,
                         DeadlockError=DEADLOCK_ERRORS)


class LifetimeTests:
    """Check that entries of ``_module_locks`` go away with their locks."""

    @property
    def bootstrap(self):
        """The ``_bootstrap`` module of the importlib under test."""
        return self.init._bootstrap

    def test_lock_lifetime(self):
        name = "xyzzy"
        bootstrap = self.bootstrap
        self.assertNotIn(name, bootstrap._module_locks)
        lock = bootstrap._get_module_lock(name)
        self.assertIn(name, bootstrap._module_locks)
        lock_ref = weakref.ref(lock)
        # Drop the only strong reference held here; after a collection the
        # registry entry and the lock itself must both be gone.
        del lock
        support.gc_collect()
        self.assertNotIn(name, bootstrap._module_locks)
        self.assertIsNone(lock_ref())

    def test_all_locks(self):
        support.gc_collect()
        module_locks = self.bootstrap._module_locks
        # The registry itself is passed as the failure message.
        self.assertEqual(0, len(module_locks), module_locks)


(Frozen_LifetimeTests,
 Source_LifetimeTests
 ) = test_util.test_both(LifetimeTests, init=init)


def setUpModule():
    # Record the threading state now and register a module cleanup that
    # passes it back to threading_cleanup().
    saved_state = threading_helper.threading_setup()
    unittest.addModuleCleanup(threading_helper.threading_cleanup,
                              *saved_state)


if __name__ == '__main__':
    unittest.main()