import re import sys import textwrap import os import unittest from dataclasses import dataclass from functools import cache from test import support from test.support.script_helper import run_python_until_end _strace_binary = "/usr/bin/strace" _syscall_regex = re.compile( r"(?P[^(]*)\((?P[^)]*)\)\s*[=]\s*(?P.+)") _returncode_regex = re.compile( br"\+\+\+ exited with (?P\d+) \+\+\+") @dataclass class StraceEvent: syscall: str args: list[str] returncode: str @dataclass class StraceResult: strace_returncode: int python_returncode: int """The event messages generated by strace. This is very similar to the stderr strace produces with returncode marker section removed.""" event_bytes: bytes stdout: bytes stderr: bytes def events(self): """Parse event_bytes data into system calls for easier processing. This assumes the program under inspection doesn't print any non-utf8 strings which would mix into the strace output.""" decoded_events = self.event_bytes.decode('utf-8') matches = [ _syscall_regex.match(event) for event in decoded_events.splitlines() ] return [ StraceEvent(match["syscall"], [arg.strip() for arg in (match["args"].split(","))], match["returncode"]) for match in matches if match ] def sections(self): """Find all "MARK " writes and use them to make groups of events. This is useful to avoid variable / overhead events, like those at interpreter startup or when opening a file so a test can verify just the small case under study.""" current_section = "__startup" sections = {current_section: []} for event in self.events(): if event.syscall == 'write' and len( event.args) > 2 and event.args[1].startswith("\"MARK "): # Found a new section, don't include the write in the section # but all events until next mark should be in that section current_section = event.args[1].split( " ", 1)[1].removesuffix('\\n"') if current_section not in sections: sections[current_section] = list() else: sections[current_section].append(event) return sections def _filter_memory_call(call): # mmap can operate on a fd or "MAP_ANONYMOUS" which gives a block of memory. # Ignore "MAP_ANONYMOUS + the "MAP_ANON" alias. if call.syscall == "mmap" and "MAP_ANON" in call.args[3]: return True if call.syscall in ("munmap", "mprotect"): return True return False def filter_memory(syscalls): """Filter out memory allocation calls from File I/O calls. Some calls (mmap, munmap, etc) can be used on files or to just get a block of memory. Use this function to filter out the memory related calls from other calls.""" return [call for call in syscalls if not _filter_memory_call(call)] @support.requires_subprocess() def strace_python(code, strace_flags, check=True): """Run strace and return the trace. Sets strace_returncode and python_returncode to `-1` on error.""" res = None def _make_error(reason, details): return StraceResult( strace_returncode=-1, python_returncode=-1, event_bytes=f"error({reason},details={details}) = -1".encode('utf-8'), stdout=res.out if res else b"", stderr=res.err if res else b"") # Run strace, and get out the raw text try: res, cmd_line = run_python_until_end( "-c", textwrap.dedent(code), __run_using_command=[_strace_binary] + strace_flags, ) except OSError as err: return _make_error("Caught OSError", err) if check and res.rc: res.fail(cmd_line) # Get out program returncode stripped = res.err.strip() output = stripped.rsplit(b"\n", 1) if len(output) != 2: return _make_error("Expected strace events and exit code line", stripped[-50:]) returncode_match = _returncode_regex.match(output[1]) if not returncode_match: return _make_error("Expected to find returncode in last line.", output[1][:50]) python_returncode = int(returncode_match["returncode"]) if check and python_returncode: res.fail(cmd_line) return StraceResult(strace_returncode=res.rc, python_returncode=python_returncode, event_bytes=output[0], stdout=res.out, stderr=res.err) def get_events(code, strace_flags, prelude, cleanup): # NOTE: The flush is currently required to prevent the prints from getting # buffered and done all at once at exit prelude = textwrap.dedent(prelude) code = textwrap.dedent(code) cleanup = textwrap.dedent(cleanup) to_run = f""" print("MARK prelude", flush=True) {prelude} print("MARK code", flush=True) {code} print("MARK cleanup", flush=True) {cleanup} print("MARK __shutdown", flush=True) """ trace = strace_python(to_run, strace_flags) all_sections = trace.sections() return all_sections['code'] def get_syscalls(code, strace_flags, prelude="", cleanup="", ignore_memory=True): """Get the syscalls which a given chunk of python code generates""" events = get_events(code, strace_flags, prelude=prelude, cleanup=cleanup) if ignore_memory: events = filter_memory(events) return [ev.syscall for ev in events] # Moderately expensive (spawns a subprocess), so share results when possible. @cache def _can_strace(): res = strace_python("import sys; sys.exit(0)", [], check=False) assert res.events(), "Should have parsed multiple calls" return res.strace_returncode == 0 and res.python_returncode == 0 def requires_strace(): if sys.platform != "linux": return unittest.skip("Linux only, requires strace.") if "LD_PRELOAD" in os.environ: # Distribution packaging (ex. Debian `fakeroot` and Gentoo `sandbox`) # use LD_PRELOAD to intercept system calls, which changes the overall # set of system calls which breaks tests expecting a specific set of # system calls). return unittest.skip("Not supported when LD_PRELOAD is intercepting system calls.") if support.check_sanitizer(address=True, memory=True): return unittest.skip("LeakSanitizer does not work under ptrace (strace, gdb, etc)") return unittest.skipUnless(_can_strace(), "Requires working strace") __all__ = ["filter_memory", "get_events", "get_syscalls", "requires_strace", "strace_python", "StraceEvent", "StraceResult"]