Merge: Use shared testing facilities in test_threading
This commit is contained in:
commit
b596dc7c86
|
@ -1,18 +1,17 @@
|
|||
# Very rudimentary test of threading module
|
||||
|
||||
import test.support
|
||||
from test.support import verbose, strip_python_stderr
|
||||
from test.support import verbose, strip_python_stderr, import_module
|
||||
import random
|
||||
import re
|
||||
import sys
|
||||
_thread = test.support.import_module('_thread')
|
||||
threading = test.support.import_module('threading')
|
||||
_thread = import_module('_thread')
|
||||
threading = import_module('threading')
|
||||
import time
|
||||
import unittest
|
||||
import weakref
|
||||
import os
|
||||
import subprocess
|
||||
from test.script_helper import assert_python_ok
|
||||
from test.script_helper import assert_python_ok, assert_python_failure
|
||||
|
||||
from test import lock_tests
|
||||
|
||||
|
@ -163,10 +162,7 @@ class ThreadTests(BaseTestCase):
|
|||
# PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
|
||||
# exposed at the Python level. This test relies on ctypes to get at it.
|
||||
def test_PyThreadState_SetAsyncExc(self):
|
||||
try:
|
||||
import ctypes
|
||||
except ImportError:
|
||||
raise unittest.SkipTest("cannot import ctypes")
|
||||
ctypes = import_module("ctypes")
|
||||
|
||||
set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc
|
||||
|
||||
|
@ -269,12 +265,9 @@ class ThreadTests(BaseTestCase):
|
|||
# Issue 1402: the PyGILState_Ensure / _Release functions may be called
|
||||
# very late on python exit: on deallocation of a running thread for
|
||||
# example.
|
||||
try:
|
||||
import ctypes
|
||||
except ImportError:
|
||||
raise unittest.SkipTest("cannot import ctypes")
|
||||
import_module("ctypes")
|
||||
|
||||
rc = subprocess.call([sys.executable, "-c", """if 1:
|
||||
rc, out, err = assert_python_failure("-c", """if 1:
|
||||
import ctypes, sys, time, _thread
|
||||
|
||||
# This lock is used as a simple event variable.
|
||||
|
@ -298,13 +291,13 @@ class ThreadTests(BaseTestCase):
|
|||
_thread.start_new_thread(waitingThread, ())
|
||||
ready.acquire() # Be sure the other thread is waiting.
|
||||
sys.exit(42)
|
||||
"""])
|
||||
""")
|
||||
self.assertEqual(rc, 42)
|
||||
|
||||
def test_finalize_with_trace(self):
|
||||
# Issue1733757
|
||||
# Avoid a deadlock when sys.settrace steps into threading._shutdown
|
||||
p = subprocess.Popen([sys.executable, "-c", """if 1:
|
||||
assert_python_ok("-c", """if 1:
|
||||
import sys, threading
|
||||
|
||||
# A deadlock-killer, to prevent the
|
||||
|
@ -324,21 +317,12 @@ class ThreadTests(BaseTestCase):
|
|||
return func
|
||||
|
||||
sys.settrace(func)
|
||||
"""],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE)
|
||||
self.addCleanup(p.stdout.close)
|
||||
self.addCleanup(p.stderr.close)
|
||||
stdout, stderr = p.communicate()
|
||||
rc = p.returncode
|
||||
self.assertFalse(rc == 2, "interpreted was blocked")
|
||||
self.assertTrue(rc == 0,
|
||||
"Unexpected error: " + ascii(stderr))
|
||||
""")
|
||||
|
||||
def test_join_nondaemon_on_shutdown(self):
|
||||
# Issue 1722344
|
||||
# Raising SystemExit skipped threading._shutdown
|
||||
p = subprocess.Popen([sys.executable, "-c", """if 1:
|
||||
rc, out, err = assert_python_ok("-c", """if 1:
|
||||
import threading
|
||||
from time import sleep
|
||||
|
||||
|
@ -350,16 +334,10 @@ class ThreadTests(BaseTestCase):
|
|||
|
||||
threading.Thread(target=child).start()
|
||||
raise SystemExit
|
||||
"""],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE)
|
||||
self.addCleanup(p.stdout.close)
|
||||
self.addCleanup(p.stderr.close)
|
||||
stdout, stderr = p.communicate()
|
||||
self.assertEqual(stdout.strip(),
|
||||
""")
|
||||
self.assertEqual(out.strip(),
|
||||
b"Woke up, sleep function is: <built-in function sleep>")
|
||||
stderr = strip_python_stderr(stderr)
|
||||
self.assertEqual(stderr, b"")
|
||||
self.assertEqual(err, b"")
|
||||
|
||||
def test_enumerate_after_join(self):
|
||||
# Try hard to trigger #1703448: a thread is still returned in
|
||||
|
@ -452,13 +430,9 @@ class ThreadJoinOnShutdown(BaseTestCase):
|
|||
sys.stdout.flush()
|
||||
\n""" + script
|
||||
|
||||
p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE)
|
||||
rc = p.wait()
|
||||
data = p.stdout.read().decode().replace('\r', '')
|
||||
p.stdout.close()
|
||||
rc, out, err = assert_python_ok("-c", script)
|
||||
data = out.decode().replace('\r', '')
|
||||
self.assertEqual(data, "end of main\nend of thread\n")
|
||||
self.assertFalse(rc == 2, "interpreter was blocked")
|
||||
self.assertTrue(rc == 0, "Unexpected error")
|
||||
|
||||
def test_1_join_on_shutdown(self):
|
||||
# The usual case: on exit, wait for a non-daemon thread
|
||||
|
@ -518,11 +492,8 @@ class ThreadJoinOnShutdown(BaseTestCase):
|
|||
self._run_and_join(script)
|
||||
|
||||
def assertScriptHasOutput(self, script, expected_output):
|
||||
p = subprocess.Popen([sys.executable, "-c", script],
|
||||
stdout=subprocess.PIPE)
|
||||
stdout, stderr = p.communicate()
|
||||
data = stdout.decode().replace('\r', '')
|
||||
self.assertEqual(p.returncode, 0, "Unexpected error")
|
||||
rc, out, err = assert_python_ok("-c", script)
|
||||
data = out.decode().replace('\r', '')
|
||||
self.assertEqual(data, expected_output)
|
||||
|
||||
@unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
|
||||
|
|
Loading…
Reference in New Issue