mirror of https://github.com/python/cpython
Issue #23799: Added test.support.start_threads() for running and cleaning up
multiple threads.
This commit is contained in:
parent
8218bd4caf
commit
263dcd20a3
|
@ -6,6 +6,7 @@ if __name__ != 'test.support':
|
||||||
import collections.abc
|
import collections.abc
|
||||||
import contextlib
|
import contextlib
|
||||||
import errno
|
import errno
|
||||||
|
import faulthandler
|
||||||
import fnmatch
|
import fnmatch
|
||||||
import functools
|
import functools
|
||||||
import gc
|
import gc
|
||||||
|
@ -96,7 +97,7 @@ __all__ = [
|
||||||
# logging
|
# logging
|
||||||
"TestHandler",
|
"TestHandler",
|
||||||
# threads
|
# threads
|
||||||
"threading_setup", "threading_cleanup",
|
"threading_setup", "threading_cleanup", "reap_threads", "start_threads",
|
||||||
# miscellaneous
|
# miscellaneous
|
||||||
"check_warnings", "EnvironmentVarGuard", "run_with_locale", "swap_item",
|
"check_warnings", "EnvironmentVarGuard", "run_with_locale", "swap_item",
|
||||||
"swap_attr", "Matcher", "set_memlimit", "SuppressCrashReport", "sortdict",
|
"swap_attr", "Matcher", "set_memlimit", "SuppressCrashReport", "sortdict",
|
||||||
|
@ -1940,6 +1941,42 @@ def reap_children():
|
||||||
except:
|
except:
|
||||||
break
|
break
|
||||||
|
|
||||||
|
@contextlib.contextmanager
|
||||||
|
def start_threads(threads, unlock=None):
|
||||||
|
threads = list(threads)
|
||||||
|
started = []
|
||||||
|
try:
|
||||||
|
try:
|
||||||
|
for t in threads:
|
||||||
|
t.start()
|
||||||
|
started.append(t)
|
||||||
|
except:
|
||||||
|
if verbose:
|
||||||
|
print("Can't start %d threads, only %d threads started" %
|
||||||
|
(len(threads), len(started)))
|
||||||
|
raise
|
||||||
|
yield
|
||||||
|
finally:
|
||||||
|
try:
|
||||||
|
if unlock:
|
||||||
|
unlock()
|
||||||
|
endtime = starttime = time.time()
|
||||||
|
for timeout in range(1, 16):
|
||||||
|
endtime += 60
|
||||||
|
for t in started:
|
||||||
|
t.join(max(endtime - time.time(), 0.01))
|
||||||
|
started = [t for t in started if t.isAlive()]
|
||||||
|
if not started:
|
||||||
|
break
|
||||||
|
if verbose:
|
||||||
|
print('Unable to join %d threads during a period of '
|
||||||
|
'%d minutes' % (len(started), timeout))
|
||||||
|
finally:
|
||||||
|
started = [t for t in started if t.isAlive()]
|
||||||
|
if started:
|
||||||
|
faulthandler.dump_traceback(sys.stdout)
|
||||||
|
raise AssertionError('Unable to join %d threads' % len(started))
|
||||||
|
|
||||||
@contextlib.contextmanager
|
@contextlib.contextmanager
|
||||||
def swap_attr(obj, attr, new_val):
|
def swap_attr(obj, attr, new_val):
|
||||||
"""Temporary swap out an attribute with a new object.
|
"""Temporary swap out an attribute with a new object.
|
||||||
|
|
|
@ -493,10 +493,8 @@ class BZ2FileTest(BaseTest):
|
||||||
for i in range(5):
|
for i in range(5):
|
||||||
f.write(data)
|
f.write(data)
|
||||||
threads = [threading.Thread(target=comp) for i in range(nthreads)]
|
threads = [threading.Thread(target=comp) for i in range(nthreads)]
|
||||||
for t in threads:
|
with support.start_threads(threads):
|
||||||
t.start()
|
pass
|
||||||
for t in threads:
|
|
||||||
t.join()
|
|
||||||
|
|
||||||
def testWithoutThreading(self):
|
def testWithoutThreading(self):
|
||||||
module = support.import_fresh_module("bz2", blocked=("threading",))
|
module = support.import_fresh_module("bz2", blocked=("threading",))
|
||||||
|
|
|
@ -202,15 +202,11 @@ class TestPendingCalls(unittest.TestCase):
|
||||||
context.lock = threading.Lock()
|
context.lock = threading.Lock()
|
||||||
context.event = threading.Event()
|
context.event = threading.Event()
|
||||||
|
|
||||||
for i in range(context.nThreads):
|
threads = [threading.Thread(target=self.pendingcalls_thread,
|
||||||
t = threading.Thread(target=self.pendingcalls_thread, args = (context,))
|
args=(context,))
|
||||||
t.start()
|
for i in range(context.nThreads)]
|
||||||
threads.append(t)
|
with support.start_threads(threads):
|
||||||
|
self.pendingcalls_wait(context.l, n, context)
|
||||||
self.pendingcalls_wait(context.l, n, context)
|
|
||||||
|
|
||||||
for t in threads:
|
|
||||||
t.join()
|
|
||||||
|
|
||||||
def pendingcalls_thread(self, context):
|
def pendingcalls_thread(self, context):
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
import unittest
|
import unittest
|
||||||
from test.support import (verbose, refcount_test, run_unittest,
|
from test.support import (verbose, refcount_test, run_unittest,
|
||||||
strip_python_stderr, cpython_only)
|
strip_python_stderr, cpython_only, start_threads)
|
||||||
from test.script_helper import assert_python_ok, make_script, temp_dir
|
from test.script_helper import assert_python_ok, make_script, temp_dir
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
|
@ -397,19 +397,13 @@ class GCTests(unittest.TestCase):
|
||||||
old_switchinterval = sys.getswitchinterval()
|
old_switchinterval = sys.getswitchinterval()
|
||||||
sys.setswitchinterval(1e-5)
|
sys.setswitchinterval(1e-5)
|
||||||
try:
|
try:
|
||||||
exit = False
|
exit = []
|
||||||
threads = []
|
threads = []
|
||||||
for i in range(N_THREADS):
|
for i in range(N_THREADS):
|
||||||
t = threading.Thread(target=run_thread)
|
t = threading.Thread(target=run_thread)
|
||||||
threads.append(t)
|
threads.append(t)
|
||||||
try:
|
with start_threads(threads, lambda: exit.append(1)):
|
||||||
for t in threads:
|
|
||||||
t.start()
|
|
||||||
finally:
|
|
||||||
time.sleep(1.0)
|
time.sleep(1.0)
|
||||||
exit = True
|
|
||||||
for t in threads:
|
|
||||||
t.join()
|
|
||||||
finally:
|
finally:
|
||||||
sys.setswitchinterval(old_switchinterval)
|
sys.setswitchinterval(old_switchinterval)
|
||||||
gc.collect()
|
gc.collect()
|
||||||
|
|
|
@ -1070,11 +1070,8 @@ class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
|
||||||
errors.append(e)
|
errors.append(e)
|
||||||
raise
|
raise
|
||||||
threads = [threading.Thread(target=f) for x in range(20)]
|
threads = [threading.Thread(target=f) for x in range(20)]
|
||||||
for t in threads:
|
with support.start_threads(threads):
|
||||||
t.start()
|
time.sleep(0.02) # yield
|
||||||
time.sleep(0.02) # yield
|
|
||||||
for t in threads:
|
|
||||||
t.join()
|
|
||||||
self.assertFalse(errors,
|
self.assertFalse(errors,
|
||||||
"the following exceptions were caught: %r" % errors)
|
"the following exceptions were caught: %r" % errors)
|
||||||
s = b''.join(results)
|
s = b''.join(results)
|
||||||
|
@ -1393,11 +1390,8 @@ class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
|
||||||
errors.append(e)
|
errors.append(e)
|
||||||
raise
|
raise
|
||||||
threads = [threading.Thread(target=f) for x in range(20)]
|
threads = [threading.Thread(target=f) for x in range(20)]
|
||||||
for t in threads:
|
with support.start_threads(threads):
|
||||||
t.start()
|
time.sleep(0.02) # yield
|
||||||
time.sleep(0.02) # yield
|
|
||||||
for t in threads:
|
|
||||||
t.join()
|
|
||||||
self.assertFalse(errors,
|
self.assertFalse(errors,
|
||||||
"the following exceptions were caught: %r" % errors)
|
"the following exceptions were caught: %r" % errors)
|
||||||
bufio.close()
|
bufio.close()
|
||||||
|
@ -2691,14 +2685,10 @@ class TextIOWrapperTest(unittest.TestCase):
|
||||||
text = "Thread%03d\n" % n
|
text = "Thread%03d\n" % n
|
||||||
event.wait()
|
event.wait()
|
||||||
f.write(text)
|
f.write(text)
|
||||||
threads = [threading.Thread(target=lambda n=x: run(n))
|
threads = [threading.Thread(target=run, args=(x,))
|
||||||
for x in range(20)]
|
for x in range(20)]
|
||||||
for t in threads:
|
with support.start_threads(threads, event.set):
|
||||||
t.start()
|
time.sleep(0.02)
|
||||||
time.sleep(0.02)
|
|
||||||
event.set()
|
|
||||||
for t in threads:
|
|
||||||
t.join()
|
|
||||||
with self.open(support.TESTFN) as f:
|
with self.open(support.TESTFN) as f:
|
||||||
content = f.read()
|
content = f.read()
|
||||||
for n in range(20):
|
for n in range(20):
|
||||||
|
@ -3402,11 +3392,11 @@ class SignalsTest(unittest.TestCase):
|
||||||
# handlers, which in this case will invoke alarm_interrupt().
|
# handlers, which in this case will invoke alarm_interrupt().
|
||||||
signal.alarm(1)
|
signal.alarm(1)
|
||||||
try:
|
try:
|
||||||
self.assertRaises(ZeroDivisionError,
|
with self.assertRaises(ZeroDivisionError):
|
||||||
wio.write, item * (support.PIPE_MAX_SIZE // len(item) + 1))
|
wio.write(item * (support.PIPE_MAX_SIZE // len(item) + 1))
|
||||||
finally:
|
finally:
|
||||||
signal.alarm(0)
|
signal.alarm(0)
|
||||||
t.join()
|
t.join()
|
||||||
# We got one byte, get another one and check that it isn't a
|
# We got one byte, get another one and check that it isn't a
|
||||||
# repeat of the first one.
|
# repeat of the first one.
|
||||||
read_results.append(os.read(r, 1))
|
read_results.append(os.read(r, 1))
|
||||||
|
|
|
@ -14,7 +14,7 @@ import shutil
|
||||||
import unittest
|
import unittest
|
||||||
from test.support import (
|
from test.support import (
|
||||||
verbose, import_module, run_unittest, TESTFN, reap_threads,
|
verbose, import_module, run_unittest, TESTFN, reap_threads,
|
||||||
forget, unlink, rmtree)
|
forget, unlink, rmtree, start_threads)
|
||||||
threading = import_module('threading')
|
threading = import_module('threading')
|
||||||
|
|
||||||
def task(N, done, done_tasks, errors):
|
def task(N, done, done_tasks, errors):
|
||||||
|
@ -115,10 +115,10 @@ class ThreadedImportTests(unittest.TestCase):
|
||||||
errors = []
|
errors = []
|
||||||
done_tasks = []
|
done_tasks = []
|
||||||
done.clear()
|
done.clear()
|
||||||
for i in range(N):
|
with start_threads(threading.Thread(target=task,
|
||||||
t = threading.Thread(target=task,
|
args=(N, done, done_tasks, errors,))
|
||||||
args=(N, done, done_tasks, errors,))
|
for i in range(N)):
|
||||||
t.start()
|
pass
|
||||||
self.assertTrue(done.wait(60))
|
self.assertTrue(done.wait(60))
|
||||||
self.assertFalse(errors)
|
self.assertFalse(errors)
|
||||||
if verbose:
|
if verbose:
|
||||||
|
|
|
@ -18,7 +18,7 @@ FILES_PER_THREAD = 50
|
||||||
|
|
||||||
import tempfile
|
import tempfile
|
||||||
|
|
||||||
from test.support import threading_setup, threading_cleanup, run_unittest, import_module
|
from test.support import start_threads, import_module
|
||||||
threading = import_module('threading')
|
threading = import_module('threading')
|
||||||
import unittest
|
import unittest
|
||||||
import io
|
import io
|
||||||
|
@ -46,33 +46,17 @@ class TempFileGreedy(threading.Thread):
|
||||||
|
|
||||||
class ThreadedTempFileTest(unittest.TestCase):
|
class ThreadedTempFileTest(unittest.TestCase):
|
||||||
def test_main(self):
|
def test_main(self):
|
||||||
threads = []
|
threads = [TempFileGreedy() for i in range(NUM_THREADS)]
|
||||||
thread_info = threading_setup()
|
with start_threads(threads, startEvent.set):
|
||||||
|
pass
|
||||||
for i in range(NUM_THREADS):
|
ok = sum(t.ok_count for t in threads)
|
||||||
t = TempFileGreedy()
|
errors = [str(t.name) + str(t.errors.getvalue())
|
||||||
threads.append(t)
|
for t in threads if t.error_count]
|
||||||
t.start()
|
|
||||||
|
|
||||||
startEvent.set()
|
|
||||||
|
|
||||||
ok = 0
|
|
||||||
errors = []
|
|
||||||
for t in threads:
|
|
||||||
t.join()
|
|
||||||
ok += t.ok_count
|
|
||||||
if t.error_count:
|
|
||||||
errors.append(str(t.name) + str(t.errors.getvalue()))
|
|
||||||
|
|
||||||
threading_cleanup(*thread_info)
|
|
||||||
|
|
||||||
msg = "Errors: errors %d ok %d\n%s" % (len(errors), ok,
|
msg = "Errors: errors %d ok %d\n%s" % (len(errors), ok,
|
||||||
'\n'.join(errors))
|
'\n'.join(errors))
|
||||||
self.assertEqual(errors, [], msg)
|
self.assertEqual(errors, [], msg)
|
||||||
self.assertEqual(ok, NUM_THREADS * FILES_PER_THREAD)
|
self.assertEqual(ok, NUM_THREADS * FILES_PER_THREAD)
|
||||||
|
|
||||||
def test_main():
|
|
||||||
run_unittest(ThreadedTempFileTest)
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
test_main()
|
unittest.main()
|
||||||
|
|
|
@ -64,14 +64,9 @@ class BaseLocalTest:
|
||||||
# Simply check that the variable is correctly set
|
# Simply check that the variable is correctly set
|
||||||
self.assertEqual(local.x, i)
|
self.assertEqual(local.x, i)
|
||||||
|
|
||||||
threads= []
|
with support.start_threads(threading.Thread(target=f, args=(i,))
|
||||||
for i in range(10):
|
for i in range(10)):
|
||||||
t = threading.Thread(target=f, args=(i,))
|
pass
|
||||||
t.start()
|
|
||||||
threads.append(t)
|
|
||||||
|
|
||||||
for t in threads:
|
|
||||||
t.join()
|
|
||||||
|
|
||||||
def test_derived_cycle_dealloc(self):
|
def test_derived_cycle_dealloc(self):
|
||||||
# http://bugs.python.org/issue6990
|
# http://bugs.python.org/issue6990
|
||||||
|
|
|
@ -137,6 +137,9 @@ Library
|
||||||
Tests
|
Tests
|
||||||
-----
|
-----
|
||||||
|
|
||||||
|
- Issue #23799: Added test.support.start_threads() for running and
|
||||||
|
cleaning up multiple threads.
|
||||||
|
|
||||||
- Issue #22390: test.regrtest now emits a warning if temporary files or
|
- Issue #22390: test.regrtest now emits a warning if temporary files or
|
||||||
directories are left after running a test.
|
directories are left after running a test.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue