Issue #20556: Used specific assert methods in threading tests.

This commit is contained in:
Serhiy Storchaka 2016-03-14 10:40:14 +02:00
commit c877658d1f
3 changed files with 22 additions and 23 deletions

View File

@ -24,14 +24,14 @@ class LockTests(unittest.TestCase):
def test_initlock(self): def test_initlock(self):
#Make sure locks start locked #Make sure locks start locked
self.assertTrue(not self.lock.locked(), self.assertFalse(self.lock.locked(),
"Lock object is not initialized unlocked.") "Lock object is not initialized unlocked.")
def test_release(self): def test_release(self):
# Test self.lock.release() # Test self.lock.release()
self.lock.acquire() self.lock.acquire()
self.lock.release() self.lock.release()
self.assertTrue(not self.lock.locked(), self.assertFalse(self.lock.locked(),
"Lock object did not release properly.") "Lock object did not release properly.")
def test_improper_release(self): def test_improper_release(self):
@ -46,7 +46,7 @@ class LockTests(unittest.TestCase):
def test_cond_acquire_fail(self): def test_cond_acquire_fail(self):
#Test acquiring locked lock returns False #Test acquiring locked lock returns False
self.lock.acquire(0) self.lock.acquire(0)
self.assertTrue(not self.lock.acquire(0), self.assertFalse(self.lock.acquire(0),
"Conditional acquiring of a locked lock incorrectly " "Conditional acquiring of a locked lock incorrectly "
"succeeded.") "succeeded.")
@ -58,9 +58,9 @@ class LockTests(unittest.TestCase):
def test_uncond_acquire_return_val(self): def test_uncond_acquire_return_val(self):
#Make sure that an unconditional locking returns True. #Make sure that an unconditional locking returns True.
self.assertTrue(self.lock.acquire(1) is True, self.assertIs(self.lock.acquire(1), True,
"Unconditional locking did not return True.") "Unconditional locking did not return True.")
self.assertTrue(self.lock.acquire() is True) self.assertIs(self.lock.acquire(), True)
def test_uncond_acquire_blocking(self): def test_uncond_acquire_blocking(self):
#Make sure that unconditional acquiring of a locked lock blocks. #Make sure that unconditional acquiring of a locked lock blocks.
@ -80,7 +80,7 @@ class LockTests(unittest.TestCase):
end_time = int(time.time()) end_time = int(time.time())
if support.verbose: if support.verbose:
print("done") print("done")
self.assertTrue((end_time - start_time) >= DELAY, self.assertGreaterEqual(end_time - start_time, DELAY,
"Blocking by unconditional acquiring failed.") "Blocking by unconditional acquiring failed.")
class MiscTests(unittest.TestCase): class MiscTests(unittest.TestCase):
@ -94,7 +94,7 @@ class MiscTests(unittest.TestCase):
#Test sanity of _thread.get_ident() #Test sanity of _thread.get_ident()
self.assertIsInstance(_thread.get_ident(), int, self.assertIsInstance(_thread.get_ident(), int,
"_thread.get_ident() returned a non-integer") "_thread.get_ident() returned a non-integer")
self.assertTrue(_thread.get_ident() != 0, self.assertNotEqual(_thread.get_ident(), 0,
"_thread.get_ident() returned 0") "_thread.get_ident() returned 0")
def test_LockType(self): def test_LockType(self):
@ -164,7 +164,7 @@ class ThreadTests(unittest.TestCase):
time.sleep(DELAY) time.sleep(DELAY)
if support.verbose: if support.verbose:
print('done') print('done')
self.assertTrue(testing_queue.qsize() == thread_count, self.assertEqual(testing_queue.qsize(), thread_count,
"Not all %s threads executed properly after %s sec." % "Not all %s threads executed properly after %s sec." %
(thread_count, DELAY)) (thread_count, DELAY))

View File

@ -59,7 +59,7 @@ class TestThread(threading.Thread):
self.nrunning.inc() self.nrunning.inc()
if verbose: if verbose:
print(self.nrunning.get(), 'tasks are running') print(self.nrunning.get(), 'tasks are running')
self.testcase.assertTrue(self.nrunning.get() <= 3) self.testcase.assertLessEqual(self.nrunning.get(), 3)
time.sleep(delay) time.sleep(delay)
if verbose: if verbose:
@ -67,7 +67,7 @@ class TestThread(threading.Thread):
with self.mutex: with self.mutex:
self.nrunning.dec() self.nrunning.dec()
self.testcase.assertTrue(self.nrunning.get() >= 0) self.testcase.assertGreaterEqual(self.nrunning.get(), 0)
if verbose: if verbose:
print('%s is finished. %d tasks are running' % print('%s is finished. %d tasks are running' %
(self.name, self.nrunning.get())) (self.name, self.nrunning.get()))
@ -101,26 +101,25 @@ class ThreadTests(BaseTestCase):
for i in range(NUMTASKS): for i in range(NUMTASKS):
t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning) t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning)
threads.append(t) threads.append(t)
self.assertEqual(t.ident, None) self.assertIsNone(t.ident)
self.assertTrue(re.match('<TestThread\(.*, initial\)>', repr(t))) self.assertRegex(repr(t), r'^<TestThread\(.*, initial\)>$')
t.start() t.start()
if verbose: if verbose:
print('waiting for all tasks to complete') print('waiting for all tasks to complete')
for t in threads: for t in threads:
t.join() t.join()
self.assertTrue(not t.is_alive()) self.assertFalse(t.is_alive())
self.assertNotEqual(t.ident, 0) self.assertNotEqual(t.ident, 0)
self.assertFalse(t.ident is None) self.assertIsNotNone(t.ident)
self.assertTrue(re.match('<TestThread\(.*, stopped -?\d+\)>', self.assertRegex(repr(t), r'^<TestThread\(.*, stopped -?\d+\)>$')
repr(t)))
if verbose: if verbose:
print('all tasks done') print('all tasks done')
self.assertEqual(numrunning.get(), 0) self.assertEqual(numrunning.get(), 0)
def test_ident_of_no_threading_threads(self): def test_ident_of_no_threading_threads(self):
# The ident still must work for the main thread and dummy threads. # The ident still must work for the main thread and dummy threads.
self.assertFalse(threading.currentThread().ident is None) self.assertIsNotNone(threading.currentThread().ident)
def f(): def f():
ident.append(threading.currentThread().ident) ident.append(threading.currentThread().ident)
done.set() done.set()
@ -128,7 +127,7 @@ class ThreadTests(BaseTestCase):
ident = [] ident = []
_thread.start_new_thread(f, ()) _thread.start_new_thread(f, ())
done.wait() done.wait()
self.assertFalse(ident[0] is None) self.assertIsNotNone(ident[0])
# Kill the "immortal" _DummyThread # Kill the "immortal" _DummyThread
del threading._active[ident[0]] del threading._active[ident[0]]
@ -245,7 +244,7 @@ class ThreadTests(BaseTestCase):
self.assertTrue(ret) self.assertTrue(ret)
if verbose: if verbose:
print(" verifying worker hasn't exited") print(" verifying worker hasn't exited")
self.assertTrue(not t.finished) self.assertFalse(t.finished)
if verbose: if verbose:
print(" attempting to raise asynch exception in worker") print(" attempting to raise asynch exception in worker")
result = set_async_exc(ctypes.c_long(t.id), exception) result = set_async_exc(ctypes.c_long(t.id), exception)
@ -416,9 +415,9 @@ class ThreadTests(BaseTestCase):
def test_repr_daemon(self): def test_repr_daemon(self):
t = threading.Thread() t = threading.Thread()
self.assertFalse('daemon' in repr(t)) self.assertNotIn('daemon', repr(t))
t.daemon = True t.daemon = True
self.assertTrue('daemon' in repr(t)) self.assertIn('daemon', repr(t))
def test_deamon_param(self): def test_deamon_param(self):
t = threading.Thread() t = threading.Thread()
@ -570,7 +569,7 @@ class ThreadTests(BaseTestCase):
tstate_lock.release() tstate_lock.release()
self.assertFalse(t.is_alive()) self.assertFalse(t.is_alive())
# And verify the thread disposed of _tstate_lock. # And verify the thread disposed of _tstate_lock.
self.assertTrue(t._tstate_lock is None) self.assertIsNone(t._tstate_lock)
def test_repr_stopped(self): def test_repr_stopped(self):
# Verify that "stopped" shows up in repr(Thread) appropriately. # Verify that "stopped" shows up in repr(Thread) appropriately.

View File

@ -189,7 +189,7 @@ class BaseLocalTest:
wr = weakref.ref(x) wr = weakref.ref(x)
del x del x
gc.collect() gc.collect()
self.assertIs(wr(), None) self.assertIsNone(wr())
class ThreadLocalTest(unittest.TestCase, BaseLocalTest): class ThreadLocalTest(unittest.TestCase, BaseLocalTest):