2024-02-28 06:17:34 -04:00
|
|
|
import unittest
|
|
|
|
import os
|
|
|
|
import textwrap
|
|
|
|
import importlib
|
|
|
|
import sys
|
|
|
|
from test.support import os_helper, SHORT_TIMEOUT
|
|
|
|
from test.support.script_helper import make_script
|
|
|
|
|
|
|
|
import subprocess
|
|
|
|
|
|
|
|
PROCESS_VM_READV_SUPPORTED = False
|
|
|
|
|
|
|
|
try:
|
|
|
|
from _testexternalinspection import PROCESS_VM_READV_SUPPORTED
|
|
|
|
from _testexternalinspection import get_stack_trace
|
|
|
|
except ImportError:
|
2024-04-03 10:11:36 -03:00
|
|
|
raise unittest.SkipTest("Test only runs when _testexternalinspection is available")
|
2024-02-28 06:17:34 -04:00
|
|
|
|
|
|
|
def _make_test_script(script_dir, script_basename, source):
|
|
|
|
to_return = make_script(script_dir, script_basename, source)
|
|
|
|
importlib.invalidate_caches()
|
|
|
|
return to_return
|
|
|
|
|
|
|
|
class TestGetStackTrace(unittest.TestCase):
|
|
|
|
|
|
|
|
@unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux", "Test only runs on Linux and MacOS")
|
|
|
|
@unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, "Test only runs on Linux with process_vm_readv support")
|
|
|
|
def test_remote_stack_trace(self):
|
|
|
|
# Spawn a process with some realistic Python code
|
|
|
|
script = textwrap.dedent("""\
|
|
|
|
import time, sys, os
|
|
|
|
def bar():
|
|
|
|
for x in range(100):
|
|
|
|
if x == 50:
|
|
|
|
baz()
|
|
|
|
def baz():
|
|
|
|
foo()
|
|
|
|
|
|
|
|
def foo():
|
|
|
|
fifo = sys.argv[1]
|
|
|
|
with open(sys.argv[1], "w") as fifo:
|
|
|
|
fifo.write("ready")
|
|
|
|
time.sleep(1000)
|
|
|
|
|
|
|
|
bar()
|
|
|
|
""")
|
|
|
|
stack_trace = None
|
|
|
|
with os_helper.temp_dir() as work_dir:
|
|
|
|
script_dir = os.path.join(work_dir, "script_pkg")
|
|
|
|
os.mkdir(script_dir)
|
|
|
|
fifo = f"{work_dir}/the_fifo"
|
|
|
|
os.mkfifo(fifo)
|
|
|
|
script_name = _make_test_script(script_dir, 'script', script)
|
|
|
|
try:
|
|
|
|
p = subprocess.Popen([sys.executable, script_name, str(fifo)])
|
|
|
|
with open(fifo, "r") as fifo_file:
|
|
|
|
response = fifo_file.read()
|
|
|
|
self.assertEqual(response, "ready")
|
|
|
|
stack_trace = get_stack_trace(p.pid)
|
|
|
|
except PermissionError:
|
|
|
|
self.skipTest("Insufficient permissions to read the stack trace")
|
|
|
|
finally:
|
|
|
|
os.remove(fifo)
|
|
|
|
p.kill()
|
|
|
|
p.terminate()
|
|
|
|
p.wait(timeout=SHORT_TIMEOUT)
|
|
|
|
|
|
|
|
|
|
|
|
expected_stack_trace = [
|
|
|
|
'foo',
|
|
|
|
'baz',
|
|
|
|
'bar',
|
|
|
|
'<module>'
|
|
|
|
]
|
|
|
|
self.assertEqual(stack_trace, expected_stack_trace)
|
|
|
|
|
|
|
|
@unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux", "Test only runs on Linux and MacOS")
|
|
|
|
@unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, "Test only runs on Linux with process_vm_readv support")
|
|
|
|
def test_self_trace(self):
|
|
|
|
stack_trace = get_stack_trace(os.getpid())
|
|
|
|
self.assertEqual(stack_trace[0], "test_self_trace")
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
unittest.main()
|