From 5dbb48aaac0ff74648b355ebdde222856004b1ef Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Fri, 1 Jun 2018 15:51:02 +0200 Subject: [PATCH] [3.6] bpo-31234: Add test.support.wait_threads_exit() (GH-3578) (GH-7315) * bpo-31234: Add test.support.wait_threads_exit() (GH-3578) Use _thread.count() to wait until threads exit. The new context manager prevents the "dangling thread" warning. (cherry picked from commit ff40ecda73178dfcad24e26240d684356ef20793) * bpo-31234: Try to fix lock_tests warning (#3557) Try to fix the "Warning -- threading_cleanup() failed to cleanup 1 threads" warning in test.lock_tests: wait a little bit longer to give time to the threads to complete. Warning seen on test_thread and test_importlib. (cherry picked from commit 096ae3373abac2c8b3a26a3fe33cc8bd4cbccd4e) --- Lib/test/lock_tests.py | 26 +++++++--- Lib/test/support/__init__.py | 36 +++++++++++++ Lib/test/test_socket.py | 4 ++ Lib/test/test_thread.py | 93 +++++++++++++++++----------------- Lib/test/test_threading.py | 14 ++--- Lib/test/test_threadsignals.py | 92 ++++++++++++++++++--------------- 6 files changed, 163 insertions(+), 102 deletions(-) diff --git a/Lib/test/lock_tests.py b/Lib/test/lock_tests.py index c1775702487..5b1f033c6f8 100644 --- a/Lib/test/lock_tests.py +++ b/Lib/test/lock_tests.py @@ -31,6 +31,9 @@ class Bunch(object): self.started = [] self.finished = [] self._can_exit = not wait_before_exit + self.wait_thread = support.wait_threads_exit() + self.wait_thread.__enter__() + def task(): tid = threading.get_ident() self.started.append(tid) @@ -40,6 +43,7 @@ class Bunch(object): self.finished.append(tid) while not self._can_exit: _wait() + try: for i in range(n): start_new_thread(task, ()) @@ -54,6 +58,8 @@ class Bunch(object): def wait_for_finished(self): while len(self.finished) < self.n: _wait() + # Wait for threads exit + self.wait_thread.__exit__(None, None, None) def do_finish(self): self._can_exit = True @@ -220,20 +226,23 @@ class LockTests(BaseLockTests): # Lock needs to be released before re-acquiring. lock = self.locktype() phase = [] + def f(): lock.acquire() phase.append(None) lock.acquire() phase.append(None) - start_new_thread(f, ()) - while len(phase) == 0: + + with support.wait_threads_exit(): + start_new_thread(f, ()) + while len(phase) == 0: + _wait() _wait() - _wait() - self.assertEqual(len(phase), 1) - lock.release() - while len(phase) == 1: - _wait() - self.assertEqual(len(phase), 2) + self.assertEqual(len(phase), 1) + lock.release() + while len(phase) == 1: + _wait() + self.assertEqual(len(phase), 2) def test_different_thread(self): # Lock can be released from a different thread. @@ -304,6 +313,7 @@ class RLockTests(BaseLockTests): self.assertRaises(RuntimeError, lock.release) finally: b.do_finish() + b.wait_for_finished() def test__is_owned(self): lock = self.locktype() diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py index 3def27b12ed..8f1aeee72f1 100644 --- a/Lib/test/support/__init__.py +++ b/Lib/test/support/__init__.py @@ -2112,6 +2112,42 @@ def reap_threads(func): threading_cleanup(*key) return decorator + +@contextlib.contextmanager +def wait_threads_exit(timeout=60.0): + """ + bpo-31234: Context manager to wait until all threads created in the with + statement exit. + + Use _thread.count() to check if threads exited. Indirectly, wait until + threads exit the internal t_bootstrap() C function of the _thread module. + + threading_setup() and threading_cleanup() are designed to emit a warning + if a test leaves running threads in the background. This context manager + is designed to cleanup threads started by the _thread.start_new_thread() + which doesn't allow to wait for thread exit, whereas thread.Thread has a + join() method. + """ + old_count = _thread._count() + try: + yield + finally: + start_time = time.monotonic() + deadline = start_time + timeout + while True: + count = _thread._count() + if count <= old_count: + break + if time.monotonic() > deadline: + dt = time.monotonic() - start_time + msg = (f"wait_threads() failed to cleanup {count - old_count} " + f"threads after {dt:.1f} seconds " + f"(count: {count}, old count: {old_count})") + raise AssertionError(msg) + time.sleep(0.010) + gc_collect() + + def reap_children(): """Use this function at the end of test_main() whenever sub-processes are started. This will help ensure that no extra children (zombies) diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py index 2d2ec04fc09..e41ddfd8afd 100644 --- a/Lib/test/test_socket.py +++ b/Lib/test/test_socket.py @@ -245,6 +245,9 @@ class ThreadableTest: self.server_ready.set() def _setUp(self): + self.wait_threads = support.wait_threads_exit() + self.wait_threads.__enter__() + self.server_ready = threading.Event() self.client_ready = threading.Event() self.done = threading.Event() @@ -271,6 +274,7 @@ class ThreadableTest: def _tearDown(self): self.__tearDown() self.done.wait() + self.wait_threads.__exit__(None, None, None) if self.queue.qsize(): exc = self.queue.get() diff --git a/Lib/test/test_thread.py b/Lib/test/test_thread.py index 2dd1593eaa0..52f6c798b87 100644 --- a/Lib/test/test_thread.py +++ b/Lib/test/test_thread.py @@ -59,12 +59,13 @@ class ThreadRunningTests(BasicThreadTest): self.done_mutex.release() def test_starting_threads(self): - # Basic test for thread creation. - for i in range(NUMTASKS): - self.newtask() - verbose_print("waiting for tasks to complete...") - self.done_mutex.acquire() - verbose_print("all tasks done") + with support.wait_threads_exit(): + # Basic test for thread creation. + for i in range(NUMTASKS): + self.newtask() + verbose_print("waiting for tasks to complete...") + self.done_mutex.acquire() + verbose_print("all tasks done") def test_stack_size(self): # Various stack size tests. @@ -94,12 +95,13 @@ class ThreadRunningTests(BasicThreadTest): verbose_print("trying stack_size = (%d)" % tss) self.next_ident = 0 self.created = 0 - for i in range(NUMTASKS): - self.newtask() + with support.wait_threads_exit(): + for i in range(NUMTASKS): + self.newtask() - verbose_print("waiting for all tasks to complete") - self.done_mutex.acquire() - verbose_print("all tasks done") + verbose_print("waiting for all tasks to complete") + self.done_mutex.acquire() + verbose_print("all tasks done") thread.stack_size(0) @@ -109,25 +111,28 @@ class ThreadRunningTests(BasicThreadTest): mut = thread.allocate_lock() mut.acquire() started = [] + def task(): started.append(None) mut.acquire() mut.release() - thread.start_new_thread(task, ()) - while not started: - time.sleep(POLL_SLEEP) - self.assertEqual(thread._count(), orig + 1) - # Allow the task to finish. - mut.release() - # The only reliable way to be sure that the thread ended from the - # interpreter's point of view is to wait for the function object to be - # destroyed. - done = [] - wr = weakref.ref(task, lambda _: done.append(None)) - del task - while not done: - time.sleep(POLL_SLEEP) - self.assertEqual(thread._count(), orig) + + with support.wait_threads_exit(): + thread.start_new_thread(task, ()) + while not started: + time.sleep(POLL_SLEEP) + self.assertEqual(thread._count(), orig + 1) + # Allow the task to finish. + mut.release() + # The only reliable way to be sure that the thread ended from the + # interpreter's point of view is to wait for the function object to be + # destroyed. + done = [] + wr = weakref.ref(task, lambda _: done.append(None)) + del task + while not done: + time.sleep(POLL_SLEEP) + self.assertEqual(thread._count(), orig) def test_save_exception_state_on_error(self): # See issue #14474 @@ -140,16 +145,14 @@ class ThreadRunningTests(BasicThreadTest): except ValueError: pass real_write(self, *args) - c = thread._count() started = thread.allocate_lock() with support.captured_output("stderr") as stderr: real_write = stderr.write stderr.write = mywrite started.acquire() - thread.start_new_thread(task, ()) - started.acquire() - while thread._count() > c: - time.sleep(POLL_SLEEP) + with support.wait_threads_exit(): + thread.start_new_thread(task, ()) + started.acquire() self.assertIn("Traceback", stderr.getvalue()) @@ -181,13 +184,14 @@ class Barrier: class BarrierTest(BasicThreadTest): def test_barrier(self): - self.bar = Barrier(NUMTASKS) - self.running = NUMTASKS - for i in range(NUMTASKS): - thread.start_new_thread(self.task2, (i,)) - verbose_print("waiting for tasks to end") - self.done_mutex.acquire() - verbose_print("tasks done") + with support.wait_threads_exit(): + self.bar = Barrier(NUMTASKS) + self.running = NUMTASKS + for i in range(NUMTASKS): + thread.start_new_thread(self.task2, (i,)) + verbose_print("waiting for tasks to end") + self.done_mutex.acquire() + verbose_print("tasks done") def task2(self, ident): for i in range(NUMTRIPS): @@ -225,11 +229,10 @@ class TestForkInThread(unittest.TestCase): @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork') @support.reap_threads def test_forkinthread(self): - running = True status = "not set" def thread1(): - nonlocal running, status + nonlocal status # fork in a thread pid = os.fork() @@ -244,13 +247,11 @@ class TestForkInThread(unittest.TestCase): # parent os.close(self.write_fd) pid, status = os.waitpid(pid, 0) - running = False - thread.start_new_thread(thread1, ()) - self.assertEqual(os.read(self.read_fd, 2), b"OK", - "Unable to fork() in thread") - while running: - time.sleep(POLL_SLEEP) + with support.wait_threads_exit(): + thread.start_new_thread(thread1, ()) + self.assertEqual(os.read(self.read_fd, 2), b"OK", + "Unable to fork() in thread") self.assertEqual(status, 0) def tearDown(self): diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index b42314fdbb2..43ef22b24d4 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -125,9 +125,10 @@ class ThreadTests(BaseTestCase): done.set() done = threading.Event() ident = [] - _thread.start_new_thread(f, ()) - done.wait() - self.assertIsNotNone(ident[0]) + with support.wait_threads_exit(): + tid = _thread.start_new_thread(f, ()) + done.wait() + self.assertEqual(ident[0], tid) # Kill the "immortal" _DummyThread del threading._active[ident[0]] @@ -165,9 +166,10 @@ class ThreadTests(BaseTestCase): mutex = threading.Lock() mutex.acquire() - tid = _thread.start_new_thread(f, (mutex,)) - # Wait for the thread to finish. - mutex.acquire() + with support.wait_threads_exit(): + tid = _thread.start_new_thread(f, (mutex,)) + # Wait for the thread to finish. + mutex.acquire() self.assertIn(tid, threading._active) self.assertIsInstance(threading._active[tid], threading._DummyThread) #Issue 29376 diff --git a/Lib/test/test_threadsignals.py b/Lib/test/test_threadsignals.py index 7d4d8c4106f..99b60cd9e6b 100644 --- a/Lib/test/test_threadsignals.py +++ b/Lib/test/test_threadsignals.py @@ -4,8 +4,8 @@ import unittest import signal import os import sys -from test.support import run_unittest, import_module -thread = import_module('_thread') +from test import support +thread = support.import_module('_thread') import time if (sys.platform[:3] == 'win'): @@ -39,13 +39,15 @@ def send_signals(): class ThreadSignals(unittest.TestCase): def test_signals(self): - # Test signal handling semantics of threads. - # We spawn a thread, have the thread send two signals, and - # wait for it to finish. Check that we got both signals - # and that they were run by the main thread. - signalled_all.acquire() - self.spawnSignallingThread() - signalled_all.acquire() + with support.wait_threads_exit(): + # Test signal handling semantics of threads. + # We spawn a thread, have the thread send two signals, and + # wait for it to finish. Check that we got both signals + # and that they were run by the main thread. + signalled_all.acquire() + self.spawnSignallingThread() + signalled_all.acquire() + # the signals that we asked the kernel to send # will come back, but we don't know when. # (it might even be after the thread exits @@ -118,17 +120,19 @@ class ThreadSignals(unittest.TestCase): # thread. def other_thread(): rlock.acquire() - thread.start_new_thread(other_thread, ()) - # Wait until we can't acquire it without blocking... - while rlock.acquire(blocking=False): - rlock.release() - time.sleep(0.01) - signal.alarm(1) - t1 = time.time() - self.assertRaises(KeyboardInterrupt, rlock.acquire, timeout=5) - dt = time.time() - t1 - # See rationale above in test_lock_acquire_interruption - self.assertLess(dt, 3.0) + + with support.wait_threads_exit(): + thread.start_new_thread(other_thread, ()) + # Wait until we can't acquire it without blocking... + while rlock.acquire(blocking=False): + rlock.release() + time.sleep(0.01) + signal.alarm(1) + t1 = time.time() + self.assertRaises(KeyboardInterrupt, rlock.acquire, timeout=5) + dt = time.time() - t1 + # See rationale above in test_lock_acquire_interruption + self.assertLess(dt, 3.0) finally: signal.alarm(0) signal.signal(signal.SIGALRM, oldalrm) @@ -137,6 +141,7 @@ class ThreadSignals(unittest.TestCase): self.sig_recvd = False def my_handler(signal, frame): self.sig_recvd = True + old_handler = signal.signal(signal.SIGUSR1, my_handler) try: def other_thread(): @@ -151,14 +156,16 @@ class ThreadSignals(unittest.TestCase): # the lock acquisition. Then we'll let it run. time.sleep(0.5) lock.release() - thread.start_new_thread(other_thread, ()) - # Wait until we can't acquire it without blocking... - while lock.acquire(blocking=False): - lock.release() - time.sleep(0.01) - result = lock.acquire() # Block while we receive a signal. - self.assertTrue(self.sig_recvd) - self.assertTrue(result) + + with support.wait_threads_exit(): + thread.start_new_thread(other_thread, ()) + # Wait until we can't acquire it without blocking... + while lock.acquire(blocking=False): + lock.release() + time.sleep(0.01) + result = lock.acquire() # Block while we receive a signal. + self.assertTrue(self.sig_recvd) + self.assertTrue(result) finally: signal.signal(signal.SIGUSR1, old_handler) @@ -197,19 +204,20 @@ class ThreadSignals(unittest.TestCase): os.kill(process_pid, signal.SIGUSR1) done.release() - # Send the signals from the non-main thread, since the main thread - # is the only one that can process signals. - thread.start_new_thread(send_signals, ()) - timed_acquire() - # Wait for thread to finish - done.acquire() - # This allows for some timing and scheduling imprecision - self.assertLess(self.end - self.start, 2.0) - self.assertGreater(self.end - self.start, 0.3) - # If the signal is received several times before PyErr_CheckSignals() - # is called, the handler will get called less than 40 times. Just - # check it's been called at least once. - self.assertGreater(self.sigs_recvd, 0) + with support.wait_threads_exit(): + # Send the signals from the non-main thread, since the main thread + # is the only one that can process signals. + thread.start_new_thread(send_signals, ()) + timed_acquire() + # Wait for thread to finish + done.acquire() + # This allows for some timing and scheduling imprecision + self.assertLess(self.end - self.start, 2.0) + self.assertGreater(self.end - self.start, 0.3) + # If the signal is received several times before PyErr_CheckSignals() + # is called, the handler will get called less than 40 times. Just + # check it's been called at least once. + self.assertGreater(self.sigs_recvd, 0) finally: signal.signal(signal.SIGUSR1, old_handler) @@ -223,7 +231,7 @@ def test_main(): oldsigs = registerSignals(handle_signals, handle_signals, handle_signals) try: - run_unittest(ThreadSignals) + support.run_unittest(ThreadSignals) finally: registerSignals(*oldsigs)