import os import unittest import random from test import support from test.support import threading_helper import _thread as thread import time import warnings import weakref from test import lock_tests threading_helper.requires_working_threading(module=True) NUMTASKS = 10 NUMTRIPS = 3 _print_mutex = thread.allocate_lock() def verbose_print(arg): """Helper function for printing out debugging output.""" if support.verbose: with _print_mutex: print(arg) class BasicThreadTest(unittest.TestCase): def setUp(self): self.done_mutex = thread.allocate_lock() self.done_mutex.acquire() self.running_mutex = thread.allocate_lock() self.random_mutex = thread.allocate_lock() self.created = 0 self.running = 0 self.next_ident = 0 key = threading_helper.threading_setup() self.addCleanup(threading_helper.threading_cleanup, *key) class ThreadRunningTests(BasicThreadTest): def newtask(self): with self.running_mutex: self.next_ident += 1 verbose_print("creating task %s" % self.next_ident) thread.start_new_thread(self.task, (self.next_ident,)) self.created += 1 self.running += 1 def task(self, ident): with self.random_mutex: delay = random.random() / 10000.0 verbose_print("task %s will run for %sus" % (ident, round(delay*1e6))) time.sleep(delay) verbose_print("task %s done" % ident) with self.running_mutex: self.running -= 1 if self.created == NUMTASKS and self.running == 0: self.done_mutex.release() def test_starting_threads(self): with threading_helper.wait_threads_exit(): # Basic test for thread creation. for i in range(NUMTASKS): self.newtask() verbose_print("waiting for tasks to complete...") self.done_mutex.acquire() verbose_print("all tasks done") def test_stack_size(self): # Various stack size tests. self.assertEqual(thread.stack_size(), 0, "initial stack size is not 0") thread.stack_size(0) self.assertEqual(thread.stack_size(), 0, "stack_size not reset to default") @unittest.skipIf(os.name not in ("nt", "posix"), 'test meant for nt and posix') def test_nt_and_posix_stack_size(self): try: thread.stack_size(4096) except ValueError: verbose_print("caught expected ValueError setting " "stack_size(4096)") except thread.error: self.skipTest("platform does not support changing thread stack " "size") fail_msg = "stack_size(%d) failed - should succeed" for tss in (262144, 0x100000, 0): thread.stack_size(tss) self.assertEqual(thread.stack_size(), tss, fail_msg % tss) verbose_print("successfully set stack_size(%d)" % tss) for tss in (262144, 0x100000): verbose_print("trying stack_size = (%d)" % tss) self.next_ident = 0 self.created = 0 with threading_helper.wait_threads_exit(): for i in range(NUMTASKS): self.newtask() verbose_print("waiting for all tasks to complete") self.done_mutex.acquire() verbose_print("all tasks done") thread.stack_size(0) def test__count(self): # Test the _count() function. orig = thread._count() mut = thread.allocate_lock() mut.acquire() started = [] def task(): started.append(None) mut.acquire() mut.release() with threading_helper.wait_threads_exit(): thread.start_new_thread(task, ()) for _ in support.sleeping_retry(support.LONG_TIMEOUT): if started: break self.assertEqual(thread._count(), orig + 1) # Allow the task to finish. mut.release() # The only reliable way to be sure that the thread ended from the # interpreter's point of view is to wait for the function object to # be destroyed. done = [] wr = weakref.ref(task, lambda _: done.append(None)) del task for _ in support.sleeping_retry(support.LONG_TIMEOUT): if done: break support.gc_collect() # For PyPy or other GCs. self.assertEqual(thread._count(), orig) def test_unraisable_exception(self): def task(): started.release() raise ValueError("task failed") started = thread.allocate_lock() with support.catch_unraisable_exception() as cm: with threading_helper.wait_threads_exit(): started.acquire() thread.start_new_thread(task, ()) started.acquire() self.assertEqual(str(cm.unraisable.exc_value), "task failed") self.assertIsNone(cm.unraisable.object) self.assertEqual(cm.unraisable.err_msg, f"Exception ignored in thread started by {task!r}") self.assertIsNotNone(cm.unraisable.exc_traceback) def test_join_thread(self): finished = [] def task(): time.sleep(0.05) finished.append(thread.get_ident()) with threading_helper.wait_threads_exit(): handle = thread.start_joinable_thread(task) handle.join() self.assertEqual(len(finished), 1) self.assertEqual(handle.ident, finished[0]) def test_join_thread_already_exited(self): def task(): pass with threading_helper.wait_threads_exit(): handle = thread.start_joinable_thread(task) time.sleep(0.05) handle.join() def test_join_several_times(self): def task(): pass with threading_helper.wait_threads_exit(): handle = thread.start_joinable_thread(task) handle.join() # Subsequent join() calls should succeed handle.join() def test_joinable_not_joined(self): handle_destroyed = thread.allocate_lock() handle_destroyed.acquire() def task(): handle_destroyed.acquire() with threading_helper.wait_threads_exit(): handle = thread.start_joinable_thread(task) del handle handle_destroyed.release() def test_join_from_self(self): errors = [] handles = [] start_joinable_thread_returned = thread.allocate_lock() start_joinable_thread_returned.acquire() task_tried_to_join = thread.allocate_lock() task_tried_to_join.acquire() def task(): start_joinable_thread_returned.acquire() try: handles[0].join() except Exception as e: errors.append(e) finally: task_tried_to_join.release() with threading_helper.wait_threads_exit(): handle = thread.start_joinable_thread(task) handles.append(handle) start_joinable_thread_returned.release() # Can still join after joining failed in other thread task_tried_to_join.acquire() handle.join() assert len(errors) == 1 with self.assertRaisesRegex(RuntimeError, "Cannot join current thread"): raise errors[0] def test_join_then_self_join(self): # make sure we can't deadlock in the following scenario with # threads t0 and t1 (see comment in `ThreadHandle_join()` for more # details): # # - t0 joins t1 # - t1 self joins def make_lock(): lock = thread.allocate_lock() lock.acquire() return lock error = None self_joiner_handle = None self_joiner_started = make_lock() self_joiner_barrier = make_lock() def self_joiner(): nonlocal error self_joiner_started.release() self_joiner_barrier.acquire() try: self_joiner_handle.join() except Exception as e: error = e joiner_started = make_lock() def joiner(): joiner_started.release() self_joiner_handle.join() with threading_helper.wait_threads_exit(): self_joiner_handle = thread.start_joinable_thread(self_joiner) # Wait for the self-joining thread to start self_joiner_started.acquire() # Start the thread that joins the self-joiner joiner_handle = thread.start_joinable_thread(joiner) # Wait for the joiner to start joiner_started.acquire() # Not great, but I don't think there's a deterministic way to make # sure that the self-joining thread has been joined. time.sleep(0.1) # Unblock the self-joiner self_joiner_barrier.release() self_joiner_handle.join() joiner_handle.join() with self.assertRaisesRegex(RuntimeError, "Cannot join current thread"): raise error def test_join_with_timeout(self): lock = thread.allocate_lock() lock.acquire() def thr(): lock.acquire() with threading_helper.wait_threads_exit(): handle = thread.start_joinable_thread(thr) handle.join(0.1) self.assertFalse(handle.is_done()) lock.release() handle.join() self.assertTrue(handle.is_done()) def test_join_unstarted(self): handle = thread._ThreadHandle() with self.assertRaisesRegex(RuntimeError, "thread not started"): handle.join() def test_set_done_unstarted(self): handle = thread._ThreadHandle() with self.assertRaisesRegex(RuntimeError, "thread not started"): handle._set_done() def test_start_duplicate_handle(self): lock = thread.allocate_lock() lock.acquire() def func(): lock.acquire() handle = thread._ThreadHandle() with threading_helper.wait_threads_exit(): thread.start_joinable_thread(func, handle=handle) with self.assertRaisesRegex(RuntimeError, "thread already started"): thread.start_joinable_thread(func, handle=handle) lock.release() handle.join() def test_start_with_none_handle(self): def func(): pass with threading_helper.wait_threads_exit(): handle = thread.start_joinable_thread(func, handle=None) handle.join() class Barrier: def __init__(self, num_threads): self.num_threads = num_threads self.waiting = 0 self.checkin_mutex = thread.allocate_lock() self.checkout_mutex = thread.allocate_lock() self.checkout_mutex.acquire() def enter(self): self.checkin_mutex.acquire() self.waiting = self.waiting + 1 if self.waiting == self.num_threads: self.waiting = self.num_threads - 1 self.checkout_mutex.release() return self.checkin_mutex.release() self.checkout_mutex.acquire() self.waiting = self.waiting - 1 if self.waiting == 0: self.checkin_mutex.release() return self.checkout_mutex.release() class BarrierTest(BasicThreadTest): def test_barrier(self): with threading_helper.wait_threads_exit(): self.bar = Barrier(NUMTASKS) self.running = NUMTASKS for i in range(NUMTASKS): thread.start_new_thread(self.task2, (i,)) verbose_print("waiting for tasks to end") self.done_mutex.acquire() verbose_print("tasks done") def task2(self, ident): for i in range(NUMTRIPS): if ident == 0: # give it a good chance to enter the next # barrier before the others are all out # of the current one delay = 0 else: with self.random_mutex: delay = random.random() / 10000.0 verbose_print("task %s will run for %sus" % (ident, round(delay * 1e6))) time.sleep(delay) verbose_print("task %s entering %s" % (ident, i)) self.bar.enter() verbose_print("task %s leaving barrier" % ident) with self.running_mutex: self.running -= 1 # Must release mutex before releasing done, else the main thread can # exit and set mutex to None as part of global teardown; then # mutex.release() raises AttributeError. finished = self.running == 0 if finished: self.done_mutex.release() class LockTests(lock_tests.LockTests): locktype = thread.allocate_lock class TestForkInThread(unittest.TestCase): def setUp(self): self.read_fd, self.write_fd = os.pipe() @support.requires_fork() @threading_helper.reap_threads def test_forkinthread(self): pid = None def fork_thread(read_fd, write_fd): nonlocal pid # Ignore the warning about fork with threads. with warnings.catch_warnings(category=DeprecationWarning, action="ignore"): # fork in a thread (DANGER, undefined per POSIX) if (pid := os.fork()): # parent process return # child process try: os.close(read_fd) os.write(write_fd, b"OK") finally: os._exit(0) with threading_helper.wait_threads_exit(): thread.start_new_thread(fork_thread, (self.read_fd, self.write_fd)) self.assertEqual(os.read(self.read_fd, 2), b"OK") os.close(self.write_fd) self.assertIsNotNone(pid) support.wait_process(pid, exitcode=0) def tearDown(self): try: os.close(self.read_fd) except OSError: pass try: os.close(self.write_fd) except OSError: pass if __name__ == "__main__": unittest.main()