[merge from 3.5] Issue11551 - Increase the test coverage of _dummy_thread module to 100%.
Initial patch contributed by Denver Coneybeare.
This commit is contained in:
commit
9dda0c9569
|
@ -1,19 +1,13 @@
|
||||||
"""Generic thread tests.
|
|
||||||
|
|
||||||
Meant to be used by dummy_thread and thread. To allow for different modules
|
|
||||||
to be used, test_main() can be called with the module to use as the thread
|
|
||||||
implementation as its sole argument.
|
|
||||||
|
|
||||||
"""
|
|
||||||
import _dummy_thread as _thread
|
import _dummy_thread as _thread
|
||||||
import time
|
import time
|
||||||
import queue
|
import queue
|
||||||
import random
|
import random
|
||||||
import unittest
|
import unittest
|
||||||
from test import support
|
from test import support
|
||||||
|
from unittest import mock
|
||||||
|
|
||||||
|
DELAY = 0
|
||||||
|
|
||||||
DELAY = 0 # Set > 0 when testing a module other than _dummy_thread, such as
|
|
||||||
# the '_thread' module.
|
|
||||||
|
|
||||||
class LockTests(unittest.TestCase):
|
class LockTests(unittest.TestCase):
|
||||||
"""Test lock objects."""
|
"""Test lock objects."""
|
||||||
|
@ -34,6 +28,12 @@ class LockTests(unittest.TestCase):
|
||||||
self.assertFalse(self.lock.locked(),
|
self.assertFalse(self.lock.locked(),
|
||||||
"Lock object did not release properly.")
|
"Lock object did not release properly.")
|
||||||
|
|
||||||
|
def test_LockType_context_manager(self):
|
||||||
|
with _thread.LockType():
|
||||||
|
pass
|
||||||
|
self.assertFalse(self.lock.locked(),
|
||||||
|
"Acquired Lock was not released")
|
||||||
|
|
||||||
def test_improper_release(self):
|
def test_improper_release(self):
|
||||||
#Make sure release of an unlocked thread raises RuntimeError
|
#Make sure release of an unlocked thread raises RuntimeError
|
||||||
self.assertRaises(RuntimeError, self.lock.release)
|
self.assertRaises(RuntimeError, self.lock.release)
|
||||||
|
@ -83,39 +83,72 @@ class LockTests(unittest.TestCase):
|
||||||
self.assertGreaterEqual(end_time - start_time, DELAY,
|
self.assertGreaterEqual(end_time - start_time, DELAY,
|
||||||
"Blocking by unconditional acquiring failed.")
|
"Blocking by unconditional acquiring failed.")
|
||||||
|
|
||||||
|
@mock.patch('time.sleep')
|
||||||
|
def test_acquire_timeout(self, mock_sleep):
|
||||||
|
"""Test invoking acquire() with a positive timeout when the lock is
|
||||||
|
already acquired. Ensure that time.sleep() is invoked with the given
|
||||||
|
timeout and that False is returned."""
|
||||||
|
|
||||||
|
self.lock.acquire()
|
||||||
|
retval = self.lock.acquire(waitflag=0, timeout=1)
|
||||||
|
self.assertTrue(mock_sleep.called)
|
||||||
|
mock_sleep.assert_called_once_with(1)
|
||||||
|
self.assertEqual(retval, False)
|
||||||
|
|
||||||
|
def test_lock_representation(self):
|
||||||
|
self.lock.acquire()
|
||||||
|
self.assertIn("locked", repr(self.lock))
|
||||||
|
self.lock.release()
|
||||||
|
self.assertIn("unlocked", repr(self.lock))
|
||||||
|
|
||||||
|
|
||||||
class MiscTests(unittest.TestCase):
|
class MiscTests(unittest.TestCase):
|
||||||
"""Miscellaneous tests."""
|
"""Miscellaneous tests."""
|
||||||
|
|
||||||
def test_exit(self):
|
def test_exit(self):
|
||||||
#Make sure _thread.exit() raises SystemExit
|
|
||||||
self.assertRaises(SystemExit, _thread.exit)
|
self.assertRaises(SystemExit, _thread.exit)
|
||||||
|
|
||||||
def test_ident(self):
|
def test_ident(self):
|
||||||
#Test sanity of _thread.get_ident()
|
|
||||||
self.assertIsInstance(_thread.get_ident(), int,
|
self.assertIsInstance(_thread.get_ident(), int,
|
||||||
"_thread.get_ident() returned a non-integer")
|
"_thread.get_ident() returned a non-integer")
|
||||||
self.assertNotEqual(_thread.get_ident(), 0,
|
self.assertNotEqual(_thread.get_ident(), 0,
|
||||||
"_thread.get_ident() returned 0")
|
"_thread.get_ident() returned 0")
|
||||||
|
|
||||||
def test_LockType(self):
|
def test_LockType(self):
|
||||||
#Make sure _thread.LockType is the same type as _thread.allocate_locke()
|
|
||||||
self.assertIsInstance(_thread.allocate_lock(), _thread.LockType,
|
self.assertIsInstance(_thread.allocate_lock(), _thread.LockType,
|
||||||
"_thread.LockType is not an instance of what "
|
"_thread.LockType is not an instance of what "
|
||||||
"is returned by _thread.allocate_lock()")
|
"is returned by _thread.allocate_lock()")
|
||||||
|
|
||||||
|
def test_set_sentinel(self):
|
||||||
|
self.assertIsInstance(_thread._set_sentinel(), _thread.LockType,
|
||||||
|
"_thread._set_sentinel() did not return a "
|
||||||
|
"LockType instance.")
|
||||||
|
|
||||||
def test_interrupt_main(self):
|
def test_interrupt_main(self):
|
||||||
#Calling start_new_thread with a function that executes interrupt_main
|
#Calling start_new_thread with a function that executes interrupt_main
|
||||||
# should raise KeyboardInterrupt upon completion.
|
# should raise KeyboardInterrupt upon completion.
|
||||||
def call_interrupt():
|
def call_interrupt():
|
||||||
_thread.interrupt_main()
|
_thread.interrupt_main()
|
||||||
self.assertRaises(KeyboardInterrupt, _thread.start_new_thread,
|
|
||||||
call_interrupt, tuple())
|
self.assertRaises(KeyboardInterrupt,
|
||||||
|
_thread.start_new_thread,
|
||||||
|
call_interrupt,
|
||||||
|
tuple())
|
||||||
|
|
||||||
def test_interrupt_in_main(self):
|
def test_interrupt_in_main(self):
|
||||||
# Make sure that if interrupt_main is called in main threat that
|
|
||||||
# KeyboardInterrupt is raised instantly.
|
|
||||||
self.assertRaises(KeyboardInterrupt, _thread.interrupt_main)
|
self.assertRaises(KeyboardInterrupt, _thread.interrupt_main)
|
||||||
|
|
||||||
|
def test_stack_size_None(self):
|
||||||
|
retval = _thread.stack_size(None)
|
||||||
|
self.assertEqual(retval, 0)
|
||||||
|
|
||||||
|
def test_stack_size_not_None(self):
|
||||||
|
with self.assertRaises(_thread.error) as cm:
|
||||||
|
_thread.stack_size("")
|
||||||
|
self.assertEqual(cm.exception.args[0],
|
||||||
|
"setting thread stack size not supported")
|
||||||
|
|
||||||
|
|
||||||
class ThreadTests(unittest.TestCase):
|
class ThreadTests(unittest.TestCase):
|
||||||
"""Test thread creation."""
|
"""Test thread creation."""
|
||||||
|
|
||||||
|
@ -129,31 +162,43 @@ class ThreadTests(unittest.TestCase):
|
||||||
_thread.start_new_thread(arg_tester, (testing_queue, True, True))
|
_thread.start_new_thread(arg_tester, (testing_queue, True, True))
|
||||||
result = testing_queue.get()
|
result = testing_queue.get()
|
||||||
self.assertTrue(result[0] and result[1],
|
self.assertTrue(result[0] and result[1],
|
||||||
"Argument passing for thread creation using tuple failed")
|
"Argument passing for thread creation "
|
||||||
_thread.start_new_thread(arg_tester, tuple(), {'queue':testing_queue,
|
"using tuple failed")
|
||||||
'arg1':True, 'arg2':True})
|
|
||||||
|
_thread.start_new_thread(
|
||||||
|
arg_tester,
|
||||||
|
tuple(),
|
||||||
|
{'queue':testing_queue, 'arg1':True, 'arg2':True})
|
||||||
|
|
||||||
result = testing_queue.get()
|
result = testing_queue.get()
|
||||||
self.assertTrue(result[0] and result[1],
|
self.assertTrue(result[0] and result[1],
|
||||||
"Argument passing for thread creation using kwargs failed")
|
"Argument passing for thread creation "
|
||||||
_thread.start_new_thread(arg_tester, (testing_queue, True), {'arg2':True})
|
"using kwargs failed")
|
||||||
|
|
||||||
|
_thread.start_new_thread(
|
||||||
|
arg_tester,
|
||||||
|
(testing_queue, True),
|
||||||
|
{'arg2':True})
|
||||||
|
|
||||||
result = testing_queue.get()
|
result = testing_queue.get()
|
||||||
self.assertTrue(result[0] and result[1],
|
self.assertTrue(result[0] and result[1],
|
||||||
"Argument passing for thread creation using both tuple"
|
"Argument passing for thread creation using both tuple"
|
||||||
" and kwargs failed")
|
" and kwargs failed")
|
||||||
|
|
||||||
def test_multi_creation(self):
|
def test_multi_thread_creation(self):
|
||||||
#Make sure multiple threads can be created.
|
|
||||||
def queue_mark(queue, delay):
|
def queue_mark(queue, delay):
|
||||||
"""Wait for ``delay`` seconds and then put something into ``queue``"""
|
|
||||||
time.sleep(delay)
|
time.sleep(delay)
|
||||||
queue.put(_thread.get_ident())
|
queue.put(_thread.get_ident())
|
||||||
|
|
||||||
thread_count = 5
|
thread_count = 5
|
||||||
testing_queue = queue.Queue(thread_count)
|
testing_queue = queue.Queue(thread_count)
|
||||||
|
|
||||||
if support.verbose:
|
if support.verbose:
|
||||||
print()
|
print()
|
||||||
print("*** Testing multiple thread creation "\
|
print("*** Testing multiple thread creation "
|
||||||
"(will take approx. %s to %s sec.) ***" % (DELAY, thread_count))
|
"(will take approx. %s to %s sec.) ***" % (
|
||||||
|
DELAY, thread_count))
|
||||||
|
|
||||||
for count in range(thread_count):
|
for count in range(thread_count):
|
||||||
if DELAY:
|
if DELAY:
|
||||||
local_delay = round(random.random(), 1)
|
local_delay = round(random.random(), 1)
|
||||||
|
@ -165,18 +210,47 @@ class ThreadTests(unittest.TestCase):
|
||||||
if support.verbose:
|
if support.verbose:
|
||||||
print('done')
|
print('done')
|
||||||
self.assertEqual(testing_queue.qsize(), thread_count,
|
self.assertEqual(testing_queue.qsize(), thread_count,
|
||||||
"Not all %s threads executed properly after %s sec." %
|
"Not all %s threads executed properly "
|
||||||
(thread_count, DELAY))
|
"after %s sec." % (thread_count, DELAY))
|
||||||
|
|
||||||
def test_main(imported_module=None):
|
def test_args_not_tuple(self):
|
||||||
global _thread, DELAY
|
"""
|
||||||
if imported_module:
|
Test invoking start_new_thread() with a non-tuple value for "args".
|
||||||
_thread = imported_module
|
Expect TypeError with a meaningful error message to be raised.
|
||||||
DELAY = 2
|
"""
|
||||||
if support.verbose:
|
with self.assertRaises(TypeError) as cm:
|
||||||
print()
|
_thread.start_new_thread(mock.Mock(), [])
|
||||||
print("*** Using %s as _thread module ***" % _thread)
|
self.assertEqual(cm.exception.args[0], "2nd arg must be a tuple")
|
||||||
support.run_unittest(LockTests, MiscTests, ThreadTests)
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
def test_kwargs_not_dict(self):
|
||||||
test_main()
|
"""
|
||||||
|
Test invoking start_new_thread() with a non-dict value for "kwargs".
|
||||||
|
Expect TypeError with a meaningful error message to be raised.
|
||||||
|
"""
|
||||||
|
with self.assertRaises(TypeError) as cm:
|
||||||
|
_thread.start_new_thread(mock.Mock(), tuple(), kwargs=[])
|
||||||
|
self.assertEqual(cm.exception.args[0], "3rd arg must be a dict")
|
||||||
|
|
||||||
|
def test_SystemExit(self):
|
||||||
|
"""
|
||||||
|
Test invoking start_new_thread() with a function that raises
|
||||||
|
SystemExit.
|
||||||
|
The exception should be discarded.
|
||||||
|
"""
|
||||||
|
func = mock.Mock(side_effect=SystemExit())
|
||||||
|
try:
|
||||||
|
_thread.start_new_thread(func, tuple())
|
||||||
|
except SystemExit:
|
||||||
|
self.fail("start_new_thread raised SystemExit.")
|
||||||
|
|
||||||
|
@mock.patch('traceback.print_exc')
|
||||||
|
def test_RaiseException(self, mock_print_exc):
|
||||||
|
"""
|
||||||
|
Test invoking start_new_thread() with a function that raises exception.
|
||||||
|
|
||||||
|
The exception should be discarded and the traceback should be printed
|
||||||
|
via traceback.print_exc()
|
||||||
|
"""
|
||||||
|
func = mock.Mock(side_effect=Exception)
|
||||||
|
_thread.start_new_thread(func, tuple())
|
||||||
|
self.assertTrue(mock_print_exc.called)
|
||||||
|
|
Loading…
Reference in New Issue