cpython/Lib/test/test_perf_profiler.py

533 lines
17 KiB
Python
Raw Normal View History

import unittest
import string
import subprocess
import sys
import sysconfig
import os
import pathlib
from test import support
from test.support.script_helper import (
make_script,
assert_python_failure,
assert_python_ok,
)
from test.support.os_helper import temp_dir
# The tests in this module run child processes, so the whole module is
# skipped when subprocess support is unavailable.
if not support.has_subprocess_support:
    raise unittest.SkipTest("test module requires subprocess")
if support.check_sanitizer(address=True, memory=True, ub=True):
    # gh-109580: Skip the test because it crashes randomly if Python is
    # built with ASAN. MSAN and UBSAN builds are skipped as well.
    raise unittest.SkipTest("test crash randomly on ASAN/MSAN/UBSAN build")
def supports_trampoline_profiling():
    """Report whether this build has the perf trampoline compiled in.

    Reads the ``PY_HAVE_PERF_TRAMPOLINE`` build variable through sysconfig
    and returns True only when it is set and its integer value is 1.
    """
    flag = sysconfig.get_config_var("PY_HAVE_PERF_TRAMPOLINE")
    return bool(flag) and int(flag) == 1
# Skip the whole module when the build reports no perf trampoline support.
if not supports_trampoline_profiling():
    raise unittest.SkipTest("perf trampoline profiling not supported")
class TestPerfTrampoline(unittest.TestCase):
    """Tests for the perf map files produced by the perf trampoline.

    Most tests run a short script in a child interpreter and then inspect
    the ``/tmp/perf-<pid>.map`` file that the child is expected to leave
    behind. The remaining tests exercise the ``sys`` trampoline API through
    ``assert_python_ok`` / ``assert_python_failure``.
    """

    def setUp(self):
        super().setUp()
        # Remember the map files that already exist so that tearDown() only
        # removes the ones that appeared while the test was running.
        self.perf_files = set(pathlib.Path("/tmp/").glob("perf-*.map"))

    def tearDown(self) -> None:
        super().tearDown()
        files_to_delete = (
            set(pathlib.Path("/tmp/").glob("perf-*.map")) - self.perf_files
        )
        for file in files_to_delete:
            # Another process cleaning the same /tmp pattern may already have
            # removed the file between the glob above and this call.
            file.unlink(missing_ok=True)

    def _run_script(self, code, *interpreter_args):
        """Run *code* as a script in a child interpreter.

        The script is written into a temporary directory and executed with
        ``sys.executable`` plus *interpreter_args*; ``PYTHON_JIT=0`` is added
        to the inherited environment.

        Returns a ``(script, process, stdout, stderr)`` tuple: the path the
        script was run from, the finished ``Popen`` object (for ``pid`` and
        ``returncode``) and the captured text output. The temporary
        directory context has already been exited when this returns; callers
        only use ``script`` as a string to build expected symbol names.
        """
        with temp_dir() as script_dir:
            script = make_script(script_dir, "perftest", code)
            env = {**os.environ, "PYTHON_JIT": "0"}
            with subprocess.Popen(
                [sys.executable, *interpreter_args, script],
                text=True,
                stderr=subprocess.PIPE,
                stdout=subprocess.PIPE,
                env=env,
            ) as process:
                stdout, stderr = process.communicate()
        return script, process, stdout, stderr

    def test_trampoline_works(self):
        """With -Xperf every called function gets a well-formed map entry."""
        code = """if 1:
            def foo():
                pass
            def bar():
                foo()
            def baz():
                bar()
            baz()
            """
        script, process, stdout, stderr = self._run_script(code, "-Xperf")
        self.assertEqual(stderr, "")
        self.assertEqual(stdout, "")

        perf_file = pathlib.Path(f"/tmp/perf-{process.pid}.map")
        self.assertTrue(perf_file.exists())
        perf_lines = perf_file.read_text().splitlines()
        expected_symbols = [
            f"py::foo:{script}",
            f"py::bar:{script}",
            f"py::baz:{script}",
        ]
        for expected_symbol in expected_symbols:
            perf_line = next(
                (line for line in perf_lines if expected_symbol in line), None
            )
            self.assertIsNotNone(
                perf_line, f"Could not find {expected_symbol} in perf file"
            )
            # The first space-separated field of a map line is the address.
            perf_addr = perf_line.split(" ")[0]
            self.assertFalse(
                perf_addr.startswith("0x"), "Address should not be prefixed with 0x"
            )
            self.assertTrue(
                set(perf_addr).issubset(string.hexdigits),
                "Address should contain only hex characters",
            )

    def test_trampoline_works_with_forks(self):
        """Parent and forked child each write their own perf map file."""
        code = """if 1:
            import os, sys
            def foo_fork():
                pass
            def bar_fork():
                foo_fork()
            def baz_fork():
                bar_fork()
            def foo():
                pid = os.fork()
                if pid == 0:
                    print(os.getpid())
                    baz_fork()
                else:
                    _, status = os.waitpid(-1, 0)
                    sys.exit(status)
            def bar():
                foo()
            def baz():
                bar()
            baz()
            """
        script, process, stdout, stderr = self._run_script(code, "-Xperf")
        self.assertEqual(process.returncode, 0)
        self.assertEqual(stderr, "")

        # The child prints its own pid; that is the only expected stdout.
        child_pid = int(stdout.strip())
        perf_file = pathlib.Path(f"/tmp/perf-{process.pid}.map")
        perf_child_file = pathlib.Path(f"/tmp/perf-{child_pid}.map")
        self.assertTrue(perf_file.exists())
        self.assertTrue(perf_child_file.exists())

        perf_file_contents = perf_file.read_text()
        self.assertIn(f"py::foo:{script}", perf_file_contents)
        self.assertIn(f"py::bar:{script}", perf_file_contents)
        self.assertIn(f"py::baz:{script}", perf_file_contents)

        child_perf_file_contents = perf_child_file.read_text()
        self.assertIn(f"py::foo_fork:{script}", child_perf_file_contents)
        self.assertIn(f"py::bar_fork:{script}", child_perf_file_contents)
        self.assertIn(f"py::baz_fork:{script}", child_perf_file_contents)

    def test_sys_api(self):
        """Functions first called while the trampoline is off get no entry."""
        code = """if 1:
            import sys
            def foo():
                pass
            def spam():
                pass
            def bar():
                sys.deactivate_stack_trampoline()
                foo()
                sys.activate_stack_trampoline("perf")
                spam()
            def baz():
                bar()
            sys.activate_stack_trampoline("perf")
            baz()
            """
        # No -Xperf here: the script activates the trampoline itself.
        script, process, stdout, stderr = self._run_script(code)
        self.assertEqual(stderr, "")
        self.assertEqual(stdout, "")

        perf_file = pathlib.Path(f"/tmp/perf-{process.pid}.map")
        self.assertTrue(perf_file.exists())
        perf_file_contents = perf_file.read_text()
        self.assertNotIn(f"py::foo:{script}", perf_file_contents)
        self.assertIn(f"py::spam:{script}", perf_file_contents)
        self.assertIn(f"py::bar:{script}", perf_file_contents)
        self.assertIn(f"py::baz:{script}", perf_file_contents)

    def test_sys_api_with_existing_trampoline(self):
        """Activating the "perf" backend twice in a row must succeed."""
        code = """if 1:
            import sys
            sys.activate_stack_trampoline("perf")
            sys.activate_stack_trampoline("perf")
            """
        assert_python_ok("-c", code)

    def test_sys_api_with_invalid_trampoline(self):
        """An unknown backend name fails and is reported on stderr."""
        code = """if 1:
            import sys
            sys.activate_stack_trampoline("invalid")
            """
        _, _, err = assert_python_failure("-c", code)
        self.assertIn("invalid backend: invalid", err.decode())

    def test_sys_api_get_status(self):
        """is_stack_trampoline_active() follows activate/deactivate calls."""
        code = """if 1:
            import sys
            sys.activate_stack_trampoline("perf")
            assert sys.is_stack_trampoline_active() is True
            sys.deactivate_stack_trampoline()
            assert sys.is_stack_trampoline_active() is False
            """
        assert_python_ok("-c", code)
def is_unwinding_reliable_with_frame_pointers():
    """Report whether the core C flags contain ``no-omit-frame-pointer``.

    Returns False when the ``PY_CORE_CFLAGS`` build variable is missing or
    empty.
    """
    core_cflags = sysconfig.get_config_var("PY_CORE_CFLAGS") or ""
    return "no-omit-frame-pointer" in core_cflags
def perf_command_works():
    """Return True when a usable ``perf`` can record this interpreter.

    Two checks are made: ``perf --help`` must run and mention "perf.data",
    and a trial ``perf record`` of a one-line Python program must succeed
    with the program's "hello" visible in the combined output. Any failure
    to launch or a non-zero exit makes the function return False.
    """
    launch_errors = (subprocess.SubprocessError, OSError)
    try:
        help_text = subprocess.check_output(["perf", "--help"], text=True)
    except launch_errors:
        return False

    # perf version does not return a version number on Fedora. Use presence
    # of "perf.data" in help as indicator that it's perf from Linux tools.
    if "perf.data" not in help_text:
        return False

    # Check that we can run a simple perf run
    with temp_dir() as script_dir:
        record_cmd = (
            "perf",
            "record",
            "-g",
            "--call-graph=fp",
            "-o",
            script_dir + "/perf_output.perf",
            "--",
            sys.executable,
            "-c",
            'print("hello")',
        )
        try:
            # stderr is folded into stdout, so the check below sees both.
            combined_output = subprocess.check_output(
                record_cmd,
                cwd=script_dir,
                text=True,
                stderr=subprocess.STDOUT,
                env={**os.environ, "PYTHON_JIT": "0"},
            )
        except launch_errors:
            return False
        hello_seen = "hello" in combined_output
    return hello_seen
def run_perf(cwd, *args, use_jit=False, **env_vars):
    """Record a command under ``perf record`` and return ``perf script`` output.

    Parameters:
        cwd: directory in which the ``perf_output.perf`` recording (and,
            with *use_jit*, the intermediate ``jit_output.dump``) is written.
        *args: the command line to profile, appended after ``--``.
        use_jit: when false, record with ``--call-graph=fp``. When true,
            record with ``--call-graph=dwarf,65528 -F99 -k1`` and pass the
            recording through ``perf inject -j`` before reading it back.
        **env_vars: extra environment variables for every perf invocation.
            ``PYTHON_JIT`` is always forced to ``"0"`` afterwards.

    Returns:
        A ``(stdout, stderr)`` tuple of the ``perf script`` output, decoded
        as UTF-8 with replacement of undecodable bytes.

    Raises:
        ValueError: if ``perf record`` or ``perf inject`` exits non-zero;
            its stderr is printed to ``sys.stderr`` first.
        subprocess.CalledProcessError: if ``perf script`` exits non-zero.
    """
    env = os.environ.copy()
    env.update(env_vars)
    env["PYTHON_JIT"] = "0"
    output_file = os.path.join(cwd, "perf_output.perf")

    if use_jit:
        record_options = ("--call-graph=dwarf,65528", "-F99", "-k1")
    else:
        record_options = ("--call-graph=fp",)
    record_cmd = ("perf", "record", "-g", *record_options, "-o", output_file, "--")
    proc = subprocess.run(
        record_cmd + args,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        env=env,
    )
    if proc.returncode:
        print(proc.stderr.decode("utf-8", "replace"), file=sys.stderr)
        raise ValueError(f"Perf failed with return code {proc.returncode}")

    if use_jit:
        jit_output_file = os.path.join(cwd, "jit_output.dump")
        inject_cmd = ("perf", "inject", "-j", "-i", output_file, "-o", jit_output_file)
        proc = subprocess.run(
            inject_cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE, env=env
        )
        if proc.returncode:
            print(proc.stderr.decode("utf-8", "replace"), file=sys.stderr)
            raise ValueError(f"Perf failed with return code {proc.returncode}")
        # Move the ``perf inject`` result over the raw recording so that the
        # ``perf script`` call below reads the injected data.
        os.rename(jit_output_file, output_file)

    proc = subprocess.run(
        ("perf", "script", "-i", output_file),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        env=env,
        check=True,
    )
    return (
        proc.stdout.decode("utf-8", "replace"),
        proc.stderr.decode("utf-8", "replace"),
    )
class TestPerfProfilerMixin:
    """Profiler tests shared by the concrete perf test classes.

    Subclasses provide ``run_perf()``; the tests here only look at the
    ``perf script`` text it returns.
    """

    def run_perf(self, script_dir, script, activate_trampoline=True):
        """Profile *script* and return ``(stdout, stderr)`` of ``perf script``.

        Must be overridden. The signature matches the overrides in the
        subclasses and the calls made by the tests below, so a missing
        override is reported as NotImplementedError rather than TypeError.
        """
        raise NotImplementedError()

    def test_python_calls_appear_in_the_stack_if_perf_activated(self):
        """With the trampoline on, the Python functions show up by name."""
        with temp_dir() as script_dir:
            code = """if 1:
                def foo(n):
                    x = 0
                    for i in range(n):
                        x += i
                def bar(n):
                    foo(n)
                def baz(n):
                    bar(n)
                baz(10000000)
                """
            script = make_script(script_dir, "perftest", code)
            stdout, stderr = self.run_perf(script_dir, script)
            self.assertEqual(stderr, "")

            self.assertIn(f"py::foo:{script}", stdout)
            self.assertIn(f"py::bar:{script}", stdout)
            self.assertIn(f"py::baz:{script}", stdout)

    def test_python_calls_do_not_appear_in_the_stack_if_perf_deactivated(self):
        """With the trampoline off, none of the Python functions are named."""
        with temp_dir() as script_dir:
            code = """if 1:
                def foo(n):
                    x = 0
                    for i in range(n):
                        x += i
                def bar(n):
                    foo(n)
                def baz(n):
                    bar(n)
                baz(10000000)
                """
            script = make_script(script_dir, "perftest", code)
            stdout, stderr = self.run_perf(
                script_dir, script, activate_trampoline=False
            )
            self.assertEqual(stderr, "")

            self.assertNotIn(f"py::foo:{script}", stdout)
            self.assertNotIn(f"py::bar:{script}", stdout)
            self.assertNotIn(f"py::baz:{script}", stdout)
@unittest.skipUnless(perf_command_works(), "perf command doesn't work")
@unittest.skipUnless(
    is_unwinding_reliable_with_frame_pointers(),
    "Unwinding is unreliable with frame pointers",
)
class TestPerfProfiler(unittest.TestCase, TestPerfProfilerMixin):
    """Run the shared profiler tests through ``-Xperf``.

    Skipped unless ``perf`` works and the build keeps frame pointers.
    """

    def run_perf(self, script_dir, script, activate_trampoline=True):
        """Profile *script*, passing ``-Xperf`` only if *activate_trampoline*."""
        if activate_trampoline:
            return run_perf(script_dir, sys.executable, "-Xperf", script)
        return run_perf(script_dir, sys.executable, script)

    def setUp(self):
        super().setUp()
        # Remember the map files that already exist so that tearDown() only
        # removes the ones that appeared while the test was running.
        self.perf_files = set(pathlib.Path("/tmp/").glob("perf-*.map"))

    def tearDown(self) -> None:
        super().tearDown()
        files_to_delete = (
            set(pathlib.Path("/tmp/").glob("perf-*.map")) - self.perf_files
        )
        for file in files_to_delete:
            # Another process cleaning the same /tmp pattern may already have
            # removed the file between the glob above and this call.
            file.unlink(missing_ok=True)

    def test_pre_fork_compile(self):
        """Trampolines compiled before fork() appear in both map files."""
        code = """if 1:
            import sys
            import os
            import sysconfig
            from _testinternalcapi import (
                compile_perf_trampoline_entry,
                perf_trampoline_set_persist_after_fork,
            )
            def foo_fork():
                pass
            def bar_fork():
                foo_fork()
            def foo():
                import time; time.sleep(1)
            def bar():
                foo()
            def compile_trampolines_for_all_functions():
                perf_trampoline_set_persist_after_fork(1)
                for _, obj in globals().items():
                    if callable(obj) and hasattr(obj, '__code__'):
                        compile_perf_trampoline_entry(obj.__code__)
            if __name__ == "__main__":
                compile_trampolines_for_all_functions()
                pid = os.fork()
                if pid == 0:
                    print(os.getpid())
                    bar_fork()
                else:
                    bar()
            """
        with temp_dir() as script_dir:
            script = make_script(script_dir, "perftest", code)
            env = {**os.environ, "PYTHON_JIT": "0"}
            with subprocess.Popen(
                [sys.executable, "-Xperf", script],
                text=True,
                stderr=subprocess.PIPE,
                stdout=subprocess.PIPE,
                env=env,
            ) as process:
                stdout, stderr = process.communicate()

            self.assertEqual(process.returncode, 0)
            self.assertNotIn("Error:", stderr)

            # The child prints its own pid; that is the only expected stdout.
            child_pid = int(stdout.strip())
            perf_file = pathlib.Path(f"/tmp/perf-{process.pid}.map")
            perf_child_file = pathlib.Path(f"/tmp/perf-{child_pid}.map")
            self.assertTrue(perf_file.exists())
            self.assertTrue(perf_child_file.exists())

            perf_file_contents = perf_file.read_text()
            self.assertIn(f"py::foo:{script}", perf_file_contents)
            self.assertIn(f"py::bar:{script}", perf_file_contents)
            self.assertIn(f"py::foo_fork:{script}", perf_file_contents)
            self.assertIn(f"py::bar_fork:{script}", perf_file_contents)

            child_perf_file_contents = perf_child_file.read_text()
            self.assertIn(f"py::foo_fork:{script}", child_perf_file_contents)
            self.assertIn(f"py::bar_fork:{script}", child_perf_file_contents)

            # Pre-compiled perf-map entries of a forked process must be
            # identical in both the parent and child perf-map files.
            for line in perf_file_contents.split("\n"):
                if (
                    f"py::foo_fork:{script}" in line
                    or f"py::bar_fork:{script}" in line
                ):
                    self.assertIn(line, child_perf_file_contents)
def _is_perf_version_at_least(major, minor):
    """Return True if ``perf --version`` reports at least *major*.*minor*.

    Returns False when ``perf`` cannot be run, exits with an error, or
    prints something that does not contain a version in the third field.
    This function is evaluated in a class decorator at import time, so it
    must never raise on unexpected output.
    """
    # The output of perf --version looks like "perf version 6.7-3" but
    # it can also be "perf version 5.15.143".
    try:
        output = subprocess.check_output(["perf", "--version"], text=True)
    except (subprocess.SubprocessError, OSError):
        return False

    fields = output.split()
    if len(fields) < 3:
        return False

    release = fields[2].split("-")[0]
    version = []
    for component in release.split("."):
        # Keep only the leading numeric components; a suffix such as "rc1"
        # ends the version instead of making int() raise.
        if not (component.isascii() and component.isdigit()):
            break
        version.append(int(component))
    # An empty tuple compares lower than any (major, minor).
    return tuple(version) >= (major, minor)
@unittest.skipUnless(perf_command_works(), "perf command doesn't work")
@unittest.skipUnless(
    _is_perf_version_at_least(6, 6), "perf command may not work due to a perf bug"
)
class TestPerfProfilerWithDwarf(unittest.TestCase, TestPerfProfilerMixin):
    """Run the shared profiler tests through ``-Xperf_jit``.

    Every run goes through ``run_perf(..., use_jit=True)``. Skipped unless
    ``perf`` works and reports version 6.6 or newer.
    """

    @staticmethod
    def _jit_artifacts():
        """Return the set of ``jit*.dump`` and ``jitted-*.so`` files in /tmp."""
        tmp = pathlib.Path("/tmp/")
        return set(tmp.glob("jit*.dump")) | set(tmp.glob("jitted-*.so"))

    def run_perf(self, script_dir, script, activate_trampoline=True):
        """Profile *script*, passing ``-Xperf_jit`` only if *activate_trampoline*."""
        if activate_trampoline:
            return run_perf(
                script_dir, sys.executable, "-Xperf_jit", script, use_jit=True
            )
        return run_perf(script_dir, sys.executable, script, use_jit=True)

    def setUp(self):
        super().setUp()
        # Remember the files that already exist so that tearDown() only
        # removes the ones that appeared while the test was running.
        self.perf_files = self._jit_artifacts()

    def tearDown(self) -> None:
        super().tearDown()
        files_to_delete = self._jit_artifacts() - self.perf_files
        for file in files_to_delete:
            # Another process cleaning the same /tmp patterns may already
            # have removed the file between the glob and this call.
            file.unlink(missing_ok=True)
# Run the tests when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()