From bd8c629eb54860df775f0072f4cf5fbd23dededb Mon Sep 17 00:00:00 2001 From: Serhiy Storchaka Date: Wed, 1 Apr 2015 12:56:39 +0300 Subject: [PATCH] Issue #23799: Added test.test_support.start_threads() for running and cleaning up multiple threads. --- Lib/test/test_bz2.py | 12 +++++------ Lib/test/test_capi.py | 32 +++++++++++++--------------- Lib/test/test_gc.py | 12 +++-------- Lib/test/test_io.py | 32 +++++++++++----------------- Lib/test/test_support.py | 35 ++++++++++++++++++++++++++++++- Lib/test/test_threadedtempfile.py | 27 +++++++----------------- Lib/test/test_threading_local.py | 19 +++++++---------- Misc/NEWS | 3 +++ 8 files changed, 85 insertions(+), 87 deletions(-) diff --git a/Lib/test/test_bz2.py b/Lib/test/test_bz2.py index e2c222ad8a8..bf5eb2055be 100644 --- a/Lib/test/test_bz2.py +++ b/Lib/test/test_bz2.py @@ -1,4 +1,4 @@ -from test import test_support +from test import test_support as support from test.test_support import TESTFN, _4G, bigmemtest, import_module, findfile import unittest @@ -306,10 +306,8 @@ class BZ2FileTest(BaseTest): for i in range(5): f.write(data) threads = [threading.Thread(target=comp) for i in range(nthreads)] - for t in threads: - t.start() - for t in threads: - t.join() + with support.start_threads(threads): + pass def testMixedIterationReads(self): # Issue #8397: mixed iteration and reads should be forbidden. @@ -482,13 +480,13 @@ class FuncTest(BaseTest): self.assertEqual(text.strip("a"), "") def test_main(): - test_support.run_unittest( + support.run_unittest( BZ2FileTest, BZ2CompressorTest, BZ2DecompressorTest, FuncTest ) - test_support.reap_children() + support.reap_children() if __name__ == '__main__': test_main() diff --git a/Lib/test/test_capi.py b/Lib/test/test_capi.py index a2cb5c790f2..2029265e1cd 100644 --- a/Lib/test/test_capi.py +++ b/Lib/test/test_capi.py @@ -6,7 +6,7 @@ import sys import time import random import unittest -from test import test_support +from test import test_support as support try: import thread import threading @@ -14,7 +14,7 @@ except ImportError: thread = None threading = None # Skip this test if the _testcapi module isn't available. -_testcapi = test_support.import_module('_testcapi') +_testcapi = support.import_module('_testcapi') @unittest.skipUnless(threading, 'Threading required for this test.') @@ -42,7 +42,7 @@ class TestPendingCalls(unittest.TestCase): #this busy loop is where we expect to be interrupted to #run our callbacks. Note that callbacks are only run on the #main thread - if False and test_support.verbose: + if False and support.verbose: print "(%i)"%(len(l),), for i in xrange(1000): a = i*i @@ -51,7 +51,7 @@ class TestPendingCalls(unittest.TestCase): count += 1 self.assertTrue(count < 10000, "timeout waiting for %i callbacks, got %i"%(n, len(l))) - if False and test_support.verbose: + if False and support.verbose: print "(%i)"%(len(l),) def test_pendingcalls_threaded(self): @@ -67,15 +67,11 @@ class TestPendingCalls(unittest.TestCase): context.lock = threading.Lock() context.event = threading.Event() - for i in range(context.nThreads): - t = threading.Thread(target=self.pendingcalls_thread, args = (context,)) - t.start() - threads.append(t) - - self.pendingcalls_wait(context.l, n, context) - - for t in threads: - t.join() + threads = [threading.Thread(target=self.pendingcalls_thread, + args=(context,)) + for i in range(context.nThreads)] + with support.start_threads(threads): + self.pendingcalls_wait(context.l, n, context) def pendingcalls_thread(self, context): try: @@ -84,7 +80,7 @@ class TestPendingCalls(unittest.TestCase): with context.lock: context.nFinished += 1 nFinished = context.nFinished - if False and test_support.verbose: + if False and support.verbose: print "finished threads: ", nFinished if nFinished == context.nThreads: context.event.set() @@ -103,7 +99,7 @@ class TestPendingCalls(unittest.TestCase): @unittest.skipUnless(threading and thread, 'Threading required for this test.') class TestThreadState(unittest.TestCase): - @test_support.reap_threads + @support.reap_threads def test_thread_state(self): # some extra thread-state tests driven via _testcapi def target(): @@ -129,14 +125,14 @@ def test_main(): for name in dir(_testcapi): if name.startswith('test_'): test = getattr(_testcapi, name) - if test_support.verbose: + if support.verbose: print "internal", name try: test() except _testcapi.error: - raise test_support.TestFailed, sys.exc_info()[1] + raise support.TestFailed, sys.exc_info()[1] - test_support.run_unittest(TestPendingCalls, TestThreadState) + support.run_unittest(TestPendingCalls, TestThreadState) if __name__ == "__main__": test_main() diff --git a/Lib/test/test_gc.py b/Lib/test/test_gc.py index 5746c39a4cc..ed01c9802fc 100644 --- a/Lib/test/test_gc.py +++ b/Lib/test/test_gc.py @@ -1,5 +1,5 @@ import unittest -from test.test_support import verbose, run_unittest +from test.test_support import verbose, run_unittest, start_threads import sys import time import gc @@ -352,19 +352,13 @@ class GCTests(unittest.TestCase): old_checkinterval = sys.getcheckinterval() sys.setcheckinterval(3) try: - exit = False + exit = [] threads = [] for i in range(N_THREADS): t = threading.Thread(target=run_thread) threads.append(t) - try: - for t in threads: - t.start() - finally: + with start_threads(threads, lambda: exit.append(1)): time.sleep(1.0) - exit = True - for t in threads: - t.join() finally: sys.setcheckinterval(old_checkinterval) gc.collect() diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index fc68e4dbf83..2914a805b52 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -985,11 +985,8 @@ class BufferedReaderTest(unittest.TestCase, CommonBufferedTests): errors.append(e) raise threads = [threading.Thread(target=f) for x in range(20)] - for t in threads: - t.start() - time.sleep(0.02) # yield - for t in threads: - t.join() + with support.start_threads(threads): + time.sleep(0.02) # yield self.assertFalse(errors, "the following exceptions were caught: %r" % errors) s = b''.join(results) @@ -1299,11 +1296,8 @@ class BufferedWriterTest(unittest.TestCase, CommonBufferedTests): errors.append(e) raise threads = [threading.Thread(target=f) for x in range(20)] - for t in threads: - t.start() - time.sleep(0.02) # yield - for t in threads: - t.join() + with support.start_threads(threads): + time.sleep(0.02) # yield self.assertFalse(errors, "the following exceptions were caught: %r" % errors) bufio.close() @@ -2555,14 +2549,10 @@ class TextIOWrapperTest(unittest.TestCase): text = "Thread%03d\n" % n event.wait() f.write(text) - threads = [threading.Thread(target=lambda n=x: run(n)) + threads = [threading.Thread(target=run, args=(x,)) for x in range(20)] - for t in threads: - t.start() - time.sleep(0.02) - event.set() - for t in threads: - t.join() + with support.start_threads(threads, event.set): + time.sleep(0.02) with self.open(support.TESTFN) as f: content = f.read() for n in range(20): @@ -3042,9 +3032,11 @@ class SignalsTest(unittest.TestCase): # return with a successful (partial) result rather than an EINTR. # The buffered IO layer must check for pending signal # handlers, which in this case will invoke alarm_interrupt(). - self.assertRaises(ZeroDivisionError, - wio.write, item * (support.PIPE_MAX_SIZE // len(item) + 1)) - t.join() + try: + with self.assertRaises(ZeroDivisionError): + wio.write(item * (support.PIPE_MAX_SIZE // len(item) + 1)) + finally: + t.join() # We got one byte, get another one and check that it isn't a # repeat of the first one. read_results.append(os.read(r, 1)) diff --git a/Lib/test/test_support.py b/Lib/test/test_support.py index 1fbd5b444ba..75563cbb21f 100644 --- a/Lib/test/test_support.py +++ b/Lib/test/test_support.py @@ -37,7 +37,7 @@ __all__ = ["Error", "TestFailed", "ResourceDenied", "import_module", "captured_stdout", "TransientResource", "transient_internet", "run_with_locale", "set_memlimit", "bigmemtest", "bigaddrspacetest", "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup", - "threading_cleanup", "reap_children", "cpython_only", + "threading_cleanup", "reap_threads", "start_threads", "cpython_only", "check_impl_detail", "get_attribute", "py3k_bytes", "import_fresh_module", "threading_cleanup", "reap_children", "strip_python_stderr", "IPV6_ENABLED"] @@ -1508,6 +1508,39 @@ def reap_children(): except: break +@contextlib.contextmanager +def start_threads(threads, unlock=None): + threads = list(threads) + started = [] + try: + try: + for t in threads: + t.start() + started.append(t) + except: + if verbose: + print("Can't start %d threads, only %d threads started" % + (len(threads), len(started))) + raise + yield + finally: + if unlock: + unlock() + endtime = starttime = time.time() + for timeout in range(1, 16): + endtime += 60 + for t in started: + t.join(max(endtime - time.time(), 0.01)) + started = [t for t in started if t.isAlive()] + if not started: + break + if verbose: + print('Unable to join %d threads during a period of ' + '%d minutes' % (len(started), timeout)) + started = [t for t in started if t.isAlive()] + if started: + raise AssertionError('Unable to join %d threads' % len(started)) + @contextlib.contextmanager def swap_attr(obj, attr, new_val): """Temporary swap out an attribute with a new object. diff --git a/Lib/test/test_threadedtempfile.py b/Lib/test/test_threadedtempfile.py index 81d9687be25..c2c30dec7cf 100644 --- a/Lib/test/test_threadedtempfile.py +++ b/Lib/test/test_threadedtempfile.py @@ -18,7 +18,7 @@ FILES_PER_THREAD = 50 import tempfile -from test.test_support import threading_setup, threading_cleanup, run_unittest, import_module +from test.test_support import start_threads, run_unittest, import_module threading = import_module('threading') import unittest import StringIO @@ -46,25 +46,12 @@ class TempFileGreedy(threading.Thread): class ThreadedTempFileTest(unittest.TestCase): def test_main(self): - threads = [] - thread_info = threading_setup() - - for i in range(NUM_THREADS): - t = TempFileGreedy() - threads.append(t) - t.start() - - startEvent.set() - - ok = 0 - errors = [] - for t in threads: - t.join() - ok += t.ok_count - if t.error_count: - errors.append(str(t.getName()) + str(t.errors.getvalue())) - - threading_cleanup(*thread_info) + threads = [TempFileGreedy() for i in range(NUM_THREADS)] + with start_threads(threads, startEvent.set): + pass + ok = sum(t.ok_count for t in threads) + errors = [str(t.getName()) + str(t.errors.getvalue()) + for t in threads if t.error_count] msg = "Errors: errors %d ok %d\n%s" % (len(errors), ok, '\n'.join(errors)) diff --git a/Lib/test/test_threading_local.py b/Lib/test/test_threading_local.py index 4c9f2961ae8..b161315c6fb 100644 --- a/Lib/test/test_threading_local.py +++ b/Lib/test/test_threading_local.py @@ -1,12 +1,12 @@ import unittest from doctest import DocTestSuite -from test import test_support +from test import test_support as support import weakref import gc # Modules under test -_thread = test_support.import_module('thread') -threading = test_support.import_module('threading') +_thread = support.import_module('thread') +threading = support.import_module('threading') import _threading_local @@ -63,14 +63,9 @@ class BaseLocalTest: # Simply check that the variable is correctly set self.assertEqual(local.x, i) - threads= [] - for i in range(10): - t = threading.Thread(target=f, args=(i,)) - t.start() - threads.append(t) - - for t in threads: - t.join() + with support.start_threads(threading.Thread(target=f, args=(i,)) + for i in range(10)): + pass def test_derived_cycle_dealloc(self): # http://bugs.python.org/issue6990 @@ -228,7 +223,7 @@ def test_main(): setUp=setUp, tearDown=tearDown) ) - test_support.run_unittest(suite) + support.run_unittest(suite) if __name__ == '__main__': test_main() diff --git a/Misc/NEWS b/Misc/NEWS index 9c3cd3d1862..410f0b0e172 100644 --- a/Misc/NEWS +++ b/Misc/NEWS @@ -182,6 +182,9 @@ Tools/Demos Tests ----- +- Issue #23799: Added test.test_support.start_threads() for running and + cleaning up multiple threads. + - Issue #22390: test.regrtest now emits a warning if temporary files or directories are left after running a test.