cpython/Lib/test/test_perf_profiler.py

435 lines
14 KiB
Python
Raw Normal View History

import unittest
import string
import subprocess
import sys
import sysconfig
import os
import pathlib
from test import support
from test.support.script_helper import (
make_script,
assert_python_failure,
assert_python_ok,
)
from test.support.os_helper import temp_dir
if not support.has_subprocess_support:
raise unittest.SkipTest("test module requires subprocess")
if support.check_sanitizer(address=True, memory=True, ub=True):
# gh-109580: Skip the test because it does crash randomly if Python is
# built with ASAN.
raise unittest.SkipTest("test crash randomly on ASAN/MSAN/UBSAN build")
def supports_trampoline_profiling():
perf_trampoline = sysconfig.get_config_var("PY_HAVE_PERF_TRAMPOLINE")
if not perf_trampoline:
return False
return int(perf_trampoline) == 1
if not supports_trampoline_profiling():
raise unittest.SkipTest("perf trampoline profiling not supported")
class TestPerfTrampoline(unittest.TestCase):
def setUp(self):
super().setUp()
self.perf_files = set(pathlib.Path("/tmp/").glob("perf-*.map"))
def tearDown(self) -> None:
super().tearDown()
files_to_delete = (
set(pathlib.Path("/tmp/").glob("perf-*.map")) - self.perf_files
)
for file in files_to_delete:
file.unlink()
def test_trampoline_works(self):
code = """if 1:
def foo():
pass
def bar():
foo()
def baz():
bar()
baz()
"""
with temp_dir() as script_dir:
script = make_script(script_dir, "perftest", code)
with subprocess.Popen(
[sys.executable, "-Xperf", script],
text=True,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
) as process:
stdout, stderr = process.communicate()
self.assertEqual(stderr, "")
self.assertEqual(stdout, "")
perf_file = pathlib.Path(f"/tmp/perf-{process.pid}.map")
self.assertTrue(perf_file.exists())
perf_file_contents = perf_file.read_text()
perf_lines = perf_file_contents.splitlines();
expected_symbols = [f"py::foo:{script}", f"py::bar:{script}", f"py::baz:{script}"]
for expected_symbol in expected_symbols:
perf_line = next((line for line in perf_lines if expected_symbol in line), None)
self.assertIsNotNone(perf_line, f"Could not find {expected_symbol} in perf file")
perf_addr = perf_line.split(" ")[0]
self.assertFalse(perf_addr.startswith("0x"), "Address should not be prefixed with 0x")
self.assertTrue(set(perf_addr).issubset(string.hexdigits), "Address should contain only hex characters")
def test_trampoline_works_with_forks(self):
code = """if 1:
import os, sys
def foo_fork():
pass
def bar_fork():
foo_fork()
def baz_fork():
bar_fork()
def foo():
pid = os.fork()
if pid == 0:
print(os.getpid())
baz_fork()
else:
_, status = os.waitpid(-1, 0)
sys.exit(status)
def bar():
foo()
def baz():
bar()
baz()
"""
with temp_dir() as script_dir:
script = make_script(script_dir, "perftest", code)
with subprocess.Popen(
[sys.executable, "-Xperf", script],
text=True,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
) as process:
stdout, stderr = process.communicate()
self.assertEqual(process.returncode, 0)
self.assertEqual(stderr, "")
child_pid = int(stdout.strip())
perf_file = pathlib.Path(f"/tmp/perf-{process.pid}.map")
perf_child_file = pathlib.Path(f"/tmp/perf-{child_pid}.map")
self.assertTrue(perf_file.exists())
self.assertTrue(perf_child_file.exists())
perf_file_contents = perf_file.read_text()
self.assertIn(f"py::foo:{script}", perf_file_contents)
self.assertIn(f"py::bar:{script}", perf_file_contents)
self.assertIn(f"py::baz:{script}", perf_file_contents)
child_perf_file_contents = perf_child_file.read_text()
self.assertIn(f"py::foo_fork:{script}", child_perf_file_contents)
self.assertIn(f"py::bar_fork:{script}", child_perf_file_contents)
self.assertIn(f"py::baz_fork:{script}", child_perf_file_contents)
def test_sys_api(self):
code = """if 1:
import sys
def foo():
pass
def spam():
pass
def bar():
sys.deactivate_stack_trampoline()
foo()
sys.activate_stack_trampoline("perf")
spam()
def baz():
bar()
sys.activate_stack_trampoline("perf")
baz()
"""
with temp_dir() as script_dir:
script = make_script(script_dir, "perftest", code)
with subprocess.Popen(
[sys.executable, script],
text=True,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
) as process:
stdout, stderr = process.communicate()
self.assertEqual(stderr, "")
self.assertEqual(stdout, "")
perf_file = pathlib.Path(f"/tmp/perf-{process.pid}.map")
self.assertTrue(perf_file.exists())
perf_file_contents = perf_file.read_text()
self.assertNotIn(f"py::foo:{script}", perf_file_contents)
self.assertIn(f"py::spam:{script}", perf_file_contents)
self.assertIn(f"py::bar:{script}", perf_file_contents)
self.assertIn(f"py::baz:{script}", perf_file_contents)
def test_sys_api_with_existing_trampoline(self):
code = """if 1:
import sys
sys.activate_stack_trampoline("perf")
sys.activate_stack_trampoline("perf")
"""
assert_python_ok("-c", code)
def test_sys_api_with_invalid_trampoline(self):
code = """if 1:
import sys
sys.activate_stack_trampoline("invalid")
"""
rc, out, err = assert_python_failure("-c", code)
self.assertIn("invalid backend: invalid", err.decode())
def test_sys_api_get_status(self):
code = """if 1:
import sys
sys.activate_stack_trampoline("perf")
assert sys.is_stack_trampoline_active() is True
sys.deactivate_stack_trampoline()
assert sys.is_stack_trampoline_active() is False
"""
assert_python_ok("-c", code)
def is_unwinding_reliable():
cflags = sysconfig.get_config_var("PY_CORE_CFLAGS")
if not cflags:
return False
return "no-omit-frame-pointer" in cflags
def perf_command_works():
try:
cmd = ["perf", "--help"]
stdout = subprocess.check_output(cmd, text=True)
except (subprocess.SubprocessError, OSError):
return False
# perf version does not return a version number on Fedora. Use presence
# of "perf.data" in help as indicator that it's perf from Linux tools.
if "perf.data" not in stdout:
return False
# Check that we can run a simple perf run
with temp_dir() as script_dir:
try:
output_file = script_dir + "/perf_output.perf"
cmd = (
"perf",
"record",
"-g",
"--call-graph=fp",
"-o",
output_file,
"--",
sys.executable,
"-c",
'print("hello")',
)
stdout = subprocess.check_output(
cmd, cwd=script_dir, text=True, stderr=subprocess.STDOUT
)
except (subprocess.SubprocessError, OSError):
return False
if "hello" not in stdout:
return False
return True
def run_perf(cwd, *args, **env_vars):
if env_vars:
env = os.environ.copy()
env.update(env_vars)
else:
env = None
output_file = cwd + "/perf_output.perf"
base_cmd = ("perf", "record", "-g", "--call-graph=fp", "-o", output_file, "--")
proc = subprocess.run(
base_cmd + args,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
)
if proc.returncode:
print(proc.stderr)
raise ValueError(f"Perf failed with return code {proc.returncode}")
base_cmd = ("perf", "script")
proc = subprocess.run(
("perf", "script", "-i", output_file),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
check=True,
)
return proc.stdout.decode("utf-8", "replace"), proc.stderr.decode(
"utf-8", "replace"
)
@unittest.skipUnless(perf_command_works(), "perf command doesn't work")
@unittest.skipUnless(is_unwinding_reliable(), "Unwinding is unreliable")
class TestPerfProfiler(unittest.TestCase):
def setUp(self):
super().setUp()
self.perf_files = set(pathlib.Path("/tmp/").glob("perf-*.map"))
def tearDown(self) -> None:
super().tearDown()
files_to_delete = (
set(pathlib.Path("/tmp/").glob("perf-*.map")) - self.perf_files
)
for file in files_to_delete:
file.unlink()
def test_python_calls_appear_in_the_stack_if_perf_activated(self):
with temp_dir() as script_dir:
code = """if 1:
def foo(n):
x = 0
for i in range(n):
x += i
def bar(n):
foo(n)
def baz(n):
bar(n)
baz(10000000)
"""
script = make_script(script_dir, "perftest", code)
stdout, stderr = run_perf(script_dir, sys.executable, "-Xperf", script)
self.assertEqual(stderr, "")
self.assertIn(f"py::foo:{script}", stdout)
self.assertIn(f"py::bar:{script}", stdout)
self.assertIn(f"py::baz:{script}", stdout)
def test_python_calls_do_not_appear_in_the_stack_if_perf_activated(self):
with temp_dir() as script_dir:
code = """if 1:
def foo(n):
x = 0
for i in range(n):
x += i
def bar(n):
foo(n)
def baz(n):
bar(n)
baz(10000000)
"""
script = make_script(script_dir, "perftest", code)
stdout, stderr = run_perf(script_dir, sys.executable, script)
self.assertEqual(stderr, "")
self.assertNotIn(f"py::foo:{script}", stdout)
self.assertNotIn(f"py::bar:{script}", stdout)
self.assertNotIn(f"py::baz:{script}", stdout)
def test_pre_fork_compile(self):
code = """if 1:
import sys
import os
import sysconfig
from _testinternalcapi import (
compile_perf_trampoline_entry,
perf_trampoline_set_persist_after_fork,
)
def foo_fork():
pass
def bar_fork():
foo_fork()
def foo():
pass
def bar():
foo()
def compile_trampolines_for_all_functions():
perf_trampoline_set_persist_after_fork(1)
for _, obj in globals().items():
if callable(obj) and hasattr(obj, '__code__'):
compile_perf_trampoline_entry(obj.__code__)
if __name__ == "__main__":
compile_trampolines_for_all_functions()
pid = os.fork()
if pid == 0:
print(os.getpid())
bar_fork()
else:
bar()
"""
with temp_dir() as script_dir:
script = make_script(script_dir, "perftest", code)
with subprocess.Popen(
[sys.executable, "-Xperf", script],
universal_newlines=True,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
) as process:
stdout, stderr = process.communicate()
self.assertEqual(process.returncode, 0)
self.assertNotIn("Error:", stderr)
child_pid = int(stdout.strip())
perf_file = pathlib.Path(f"/tmp/perf-{process.pid}.map")
perf_child_file = pathlib.Path(f"/tmp/perf-{child_pid}.map")
self.assertTrue(perf_file.exists())
self.assertTrue(perf_child_file.exists())
perf_file_contents = perf_file.read_text()
self.assertIn(f"py::foo:{script}", perf_file_contents)
self.assertIn(f"py::bar:{script}", perf_file_contents)
self.assertIn(f"py::foo_fork:{script}", perf_file_contents)
self.assertIn(f"py::bar_fork:{script}", perf_file_contents)
child_perf_file_contents = perf_child_file.read_text()
self.assertIn(f"py::foo_fork:{script}", child_perf_file_contents)
self.assertIn(f"py::bar_fork:{script}", child_perf_file_contents)
# Pre-compiled perf-map entries of a forked process must be
# identical in both the parent and child perf-map files.
perf_file_lines = perf_file_contents.split("\n")
for line in perf_file_lines:
if (
f"py::foo_fork:{script}" in line
or f"py::bar_fork:{script}" in line
):
self.assertIn(line, child_perf_file_contents)
if __name__ == "__main__":
unittest.main()