cpython/Tools/cases_generator/generate_cases.py

1260 lines
47 KiB
Python

"""Generate the main interpreter switch.
Reads the instruction definitions from bytecodes.c.
Writes the cases to generated_cases.c.h, which is #included in ceval.c.
"""
import argparse
import contextlib
import dataclasses
import os
import posixpath
import re
import sys
import typing
import lexer as lx
import parser
from parser import StackEffect
HERE = os.path.dirname(__file__)
ROOT = os.path.join(HERE, "../..")
THIS = os.path.relpath(__file__, ROOT).replace(os.path.sep, posixpath.sep)
DEFAULT_INPUT = os.path.relpath(os.path.join(ROOT, "Python/bytecodes.c"))
DEFAULT_OUTPUT = os.path.relpath(os.path.join(ROOT, "Python/generated_cases.c.h"))
DEFAULT_METADATA_OUTPUT = os.path.relpath(
os.path.join(ROOT, "Python/opcode_metadata.h")
)
BEGIN_MARKER = "// BEGIN BYTECODES //"
END_MARKER = "// END BYTECODES //"
RE_PREDICTED = (
r"^\s*(?:PREDICT\(|GO_TO_INSTRUCTION\(|DEOPT_IF\(.*?,\s*)(\w+)\);\s*(?://.*)?$"
)
UNUSED = "unused"
BITS_PER_CODE_UNIT = 16
arg_parser = argparse.ArgumentParser(
description="Generate the code for the interpreter switch.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
arg_parser.add_argument(
"-o", "--output", type=str, help="Generated code", default=DEFAULT_OUTPUT
)
arg_parser.add_argument(
"-m", "--metadata", type=str, help="Generated metadata", default=DEFAULT_METADATA_OUTPUT
)
arg_parser.add_argument(
"-l", "--emit-line-directives", help="Emit #line directives", action="store_true"
)
arg_parser.add_argument(
"input", nargs=argparse.REMAINDER, help="Instruction definition file(s)"
)
def effect_size(effect: StackEffect) -> tuple[int, str]:
"""Return the 'size' impact of a stack effect.
Returns a tuple (numeric, symbolic) where:
- numeric is an int giving the statically analyzable size of the effect
- symbolic is a string representing a variable effect (e.g. 'oparg*2')
At most one of these will be non-zero / non-empty.
"""
if effect.size:
assert not effect.cond, "Array effects cannot have a condition"
return 0, effect.size
elif effect.cond:
return 0, f"{maybe_parenthesize(effect.cond)} ? 1 : 0"
else:
return 1, ""
def maybe_parenthesize(sym: str) -> str:
"""Add parentheses around a string if it contains an operator.
An exception is made for '*' which is common and harmless
in the context where the symbolic size is used.
"""
if re.match(r"^[\s\w*]+$", sym):
return sym
else:
return f"({sym})"
def list_effect_size(effects: list[StackEffect]) -> tuple[int, str]:
numeric = 0
symbolic: list[str] = []
for effect in effects:
diff, sym = effect_size(effect)
numeric += diff
if sym:
symbolic.append(maybe_parenthesize(sym))
return numeric, " + ".join(symbolic)
def string_effect_size(arg: tuple[int, str]) -> str:
numeric, symbolic = arg
if numeric and symbolic:
return f"{numeric} + {symbolic}"
elif symbolic:
return symbolic
else:
return str(numeric)
class Formatter:
"""Wraps an output stream with the ability to indent etc."""
stream: typing.TextIO
prefix: str
emit_line_directives: bool = False
lineno: int # Next line number, 1-based
filename: str # Slightly improved stream.filename
nominal_lineno: int
nominal_filename: str
def __init__(
self, stream: typing.TextIO, indent: int, emit_line_directives: bool = False
) -> None:
self.stream = stream
self.prefix = " " * indent
self.emit_line_directives = emit_line_directives
self.lineno = 1
filename = os.path.relpath(self.stream.name, ROOT)
# Make filename more user-friendly and less platform-specific
filename = filename.replace("\\", "/")
if filename.startswith("./"):
filename = filename[2:]
if filename.endswith(".new"):
filename = filename[:-4]
self.filename = filename
self.nominal_lineno = 1
self.nominal_filename = filename
def write_raw(self, s: str) -> None:
self.stream.write(s)
newlines = s.count("\n")
self.lineno += newlines
self.nominal_lineno += newlines
def emit(self, arg: str) -> None:
if arg:
self.write_raw(f"{self.prefix}{arg}\n")
else:
self.write_raw("\n")
def set_lineno(self, lineno: int, filename: str) -> None:
if self.emit_line_directives:
if lineno != self.nominal_lineno or filename != self.nominal_filename:
self.emit(f'#line {lineno} "{filename}"')
self.nominal_lineno = lineno
self.nominal_filename = filename
def reset_lineno(self) -> None:
if self.lineno != self.nominal_lineno or self.filename != self.nominal_filename:
self.set_lineno(self.lineno + 1, self.filename)
@contextlib.contextmanager
def indent(self):
self.prefix += " "
yield
self.prefix = self.prefix[:-4]
@contextlib.contextmanager
def block(self, head: str):
if head:
self.emit(head + " {")
else:
self.emit("{")
with self.indent():
yield
self.emit("}")
def stack_adjust(
self,
diff: int,
input_effects: list[StackEffect],
output_effects: list[StackEffect],
):
# TODO: Get rid of 'diff' parameter
shrink, isym = list_effect_size(input_effects)
grow, osym = list_effect_size(output_effects)
diff += grow - shrink
if isym and isym != osym:
self.emit(f"STACK_SHRINK({isym});")
if diff < 0:
self.emit(f"STACK_SHRINK({-diff});")
if diff > 0:
self.emit(f"STACK_GROW({diff});")
if osym and osym != isym:
self.emit(f"STACK_GROW({osym});")
def declare(self, dst: StackEffect, src: StackEffect | None):
if dst.name == UNUSED:
return
typ = f"{dst.type}" if dst.type else "PyObject *"
if src:
cast = self.cast(dst, src)
init = f" = {cast}{src.name}"
elif dst.cond:
init = " = NULL"
else:
init = ""
sepa = "" if typ.endswith("*") else " "
self.emit(f"{typ}{sepa}{dst.name}{init};")
def assign(self, dst: StackEffect, src: StackEffect):
if src.name == UNUSED:
return
if src.size:
# Don't write sized arrays -- it's up to the user code.
return
cast = self.cast(dst, src)
if re.match(r"^REG\(oparg(\d+)\)$", dst.name):
self.emit(f"Py_XSETREF({dst.name}, {cast}{src.name});")
else:
stmt = f"{dst.name} = {cast}{src.name};"
if src.cond:
stmt = f"if ({src.cond}) {{ {stmt} }}"
self.emit(stmt)
def cast(self, dst: StackEffect, src: StackEffect) -> str:
return f"({dst.type or 'PyObject *'})" if src.type != dst.type else ""
@dataclasses.dataclass
class Instruction:
"""An instruction with additional data and code."""
# Parts of the underlying instruction definition
inst: parser.InstDef
kind: typing.Literal["inst", "op", "legacy"] # Legacy means no (input -- output)
name: str
block: parser.Block
block_text: list[str] # Block.text, less curlies, less PREDICT() calls
predictions: list[str] # Prediction targets (instruction names)
block_line: int # First line of block in original code
# Computed by constructor
always_exits: bool
cache_offset: int
cache_effects: list[parser.CacheEffect]
input_effects: list[StackEffect]
output_effects: list[StackEffect]
unmoved_names: frozenset[str]
instr_fmt: str
# Set later
family: parser.Family | None = None
predicted: bool = False
def __init__(self, inst: parser.InstDef):
self.inst = inst
self.kind = inst.kind
self.name = inst.name
self.block = inst.block
self.block_text, self.check_eval_breaker, self.predictions, self.block_line = \
extract_block_text(self.block)
self.always_exits = always_exits(self.block_text)
self.cache_effects = [
effect for effect in inst.inputs if isinstance(effect, parser.CacheEffect)
]
self.cache_offset = sum(c.size for c in self.cache_effects)
self.input_effects = [
effect for effect in inst.inputs if isinstance(effect, StackEffect)
]
self.output_effects = inst.outputs # For consistency/completeness
unmoved_names: set[str] = set()
for ieffect, oeffect in zip(self.input_effects, self.output_effects):
if ieffect.name == oeffect.name:
unmoved_names.add(ieffect.name)
else:
break
self.unmoved_names = frozenset(unmoved_names)
if variable_used(inst, "oparg"):
fmt = "IB"
else:
fmt = "IX"
cache = "C"
for ce in self.cache_effects:
for _ in range(ce.size):
fmt += cache
cache = "0"
self.instr_fmt = fmt
def write(self, out: Formatter) -> None:
"""Write one instruction, sans prologue and epilogue."""
# Write a static assertion that a family's cache size is correct
if family := self.family:
if self.name == family.members[0]:
if cache_size := family.size:
out.emit(
f"static_assert({cache_size} == "
f'{self.cache_offset}, "incorrect cache size");'
)
# Write input stack effect variable declarations and initializations
ieffects = list(reversed(self.input_effects))
for i, ieffect in enumerate(ieffects):
isize = string_effect_size(
list_effect_size([ieff for ieff in ieffects[: i + 1]])
)
if ieffect.size:
src = StackEffect(f"(stack_pointer - {maybe_parenthesize(isize)})", "PyObject **")
elif ieffect.cond:
src = StackEffect(f"({ieffect.cond}) ? stack_pointer[-{maybe_parenthesize(isize)}] : NULL", "")
else:
src = StackEffect(f"stack_pointer[-{maybe_parenthesize(isize)}]", "")
out.declare(ieffect, src)
# Write output stack effect variable declarations
isize = string_effect_size(list_effect_size(self.input_effects))
input_names = {ieffect.name for ieffect in self.input_effects}
for i, oeffect in enumerate(self.output_effects):
if oeffect.name not in input_names:
if oeffect.size:
osize = string_effect_size(
list_effect_size([oeff for oeff in self.output_effects[:i]])
)
offset = "stack_pointer"
if isize != osize:
if isize != "0":
offset += f" - ({isize})"
if osize != "0":
offset += f" + {osize}"
src = StackEffect(offset, "PyObject **")
out.declare(oeffect, src)
else:
out.declare(oeffect, None)
# out.emit(f"next_instr += OPSIZE({self.inst.name}) - 1;")
self.write_body(out, 0)
# Skip the rest if the block always exits
if self.always_exits:
return
# Write net stack growth/shrinkage
out.stack_adjust(
0,
[ieff for ieff in self.input_effects],
[oeff for oeff in self.output_effects],
)
# Write output stack effect assignments
oeffects = list(reversed(self.output_effects))
for i, oeffect in enumerate(oeffects):
if oeffect.name in self.unmoved_names:
continue
osize = string_effect_size(
list_effect_size([oeff for oeff in oeffects[: i + 1]])
)
if oeffect.size:
dst = StackEffect(f"stack_pointer - {maybe_parenthesize(osize)}", "PyObject **")
else:
dst = StackEffect(f"stack_pointer[-{maybe_parenthesize(osize)}]", "")
out.assign(dst, oeffect)
# Write cache effect
if self.cache_offset:
out.emit(f"next_instr += {self.cache_offset};")
def write_body(self, out: Formatter, dedent: int, cache_adjust: int = 0) -> None:
"""Write the instruction body."""
# Write cache effect variable declarations and initializations
cache_offset = cache_adjust
for ceffect in self.cache_effects:
if ceffect.name != UNUSED:
bits = ceffect.size * BITS_PER_CODE_UNIT
if bits == 64:
# NOTE: We assume that 64-bit data in the cache
# is always an object pointer.
# If this becomes false, we need a way to specify
# syntactically what type the cache data is.
typ = "PyObject *"
func = "read_obj"
else:
typ = f"uint{bits}_t "
func = f"read_u{bits}"
out.emit(
f"{typ}{ceffect.name} = {func}(&next_instr[{cache_offset}].cache);"
)
cache_offset += ceffect.size
assert cache_offset == self.cache_offset + cache_adjust
# Write the body, substituting a goto for ERROR_IF() and other stuff
assert dedent <= 0
extra = " " * -dedent
names_to_skip = self.unmoved_names | frozenset({UNUSED, "null"})
offset = 0
context = self.block.context
assert context != None
filename = context.owner.filename
for line in self.block_text:
out.set_lineno(self.block_line + offset, filename)
offset += 1
if m := re.match(r"(\s*)ERROR_IF\((.+), (\w+)\);\s*(?://.*)?$", line):
space, cond, label = m.groups()
space = extra + space
# ERROR_IF() must pop the inputs from the stack.
# The code block is responsible for DECREF()ing them.
# NOTE: If the label doesn't exist, just add it to ceval.c.
# Don't pop common input/output effects at the bottom!
# These aren't DECREF'ed so they can stay.
ieffs = list(self.input_effects)
oeffs = list(self.output_effects)
while ieffs and oeffs and ieffs[0] == oeffs[0]:
ieffs.pop(0)
oeffs.pop(0)
ninputs, symbolic = list_effect_size(ieffs)
if ninputs:
label = f"pop_{ninputs}_{label}"
if symbolic:
out.write_raw(
f"{space}if ({cond}) {{ STACK_SHRINK({symbolic}); goto {label}; }}\n"
)
else:
out.write_raw(f"{space}if ({cond}) goto {label};\n")
elif m := re.match(r"(\s*)DECREF_INPUTS\(\);\s*(?://.*)?$", line):
out.reset_lineno()
space = extra + m.group(1)
for ieff in self.input_effects:
if ieff.name in names_to_skip:
continue
if ieff.size:
out.write_raw(
f"{space}for (int _i = {ieff.size}; --_i >= 0;) {{\n"
)
out.write_raw(f"{space} Py_DECREF({ieff.name}[_i]);\n")
out.write_raw(f"{space}}}\n")
else:
decref = "XDECREF" if ieff.cond else "DECREF"
out.write_raw(f"{space}Py_{decref}({ieff.name});\n")
else:
out.write_raw(extra + line)
out.reset_lineno()
InstructionOrCacheEffect = Instruction | parser.CacheEffect
StackEffectMapping = list[tuple[StackEffect, StackEffect]]
@dataclasses.dataclass
class Component:
instr: Instruction
input_mapping: StackEffectMapping
output_mapping: StackEffectMapping
def write_body(self, out: Formatter, cache_adjust: int) -> None:
with out.block(""):
input_names = {ieffect.name for _, ieffect in self.input_mapping}
for var, ieffect in self.input_mapping:
out.declare(ieffect, var)
for _, oeffect in self.output_mapping:
if oeffect.name not in input_names:
out.declare(oeffect, None)
self.instr.write_body(out, dedent=-4, cache_adjust=cache_adjust)
for var, oeffect in self.output_mapping:
out.assign(var, oeffect)
@dataclasses.dataclass
class SuperOrMacroInstruction:
"""Common fields for super- and macro instructions."""
name: str
stack: list[StackEffect]
initial_sp: int
final_sp: int
instr_fmt: str
@dataclasses.dataclass
class SuperInstruction(SuperOrMacroInstruction):
"""A super-instruction."""
super: parser.Super
parts: list[Component]
@dataclasses.dataclass
class MacroInstruction(SuperOrMacroInstruction):
"""A macro instruction."""
macro: parser.Macro
parts: list[Component | parser.CacheEffect]
@dataclasses.dataclass
class OverriddenInstructionPlaceHolder:
name: str
AnyInstruction = Instruction | SuperInstruction | MacroInstruction
INSTR_FMT_PREFIX = "INSTR_FMT_"
class Analyzer:
"""Parse input, analyze it, and write to output."""
input_filenames: list[str]
output_filename: str
metadata_filename: str
errors: int = 0
emit_line_directives: bool = False
def __init__(self, input_filenames: list[str], output_filename: str, metadata_filename: str):
"""Read the input file."""
self.input_filenames = input_filenames
self.output_filename = output_filename
self.metadata_filename = metadata_filename
def error(self, msg: str, node: parser.Node) -> None:
lineno = 0
filename = "<unknown file>"
if context := node.context:
filename = context.owner.filename
# Use line number of first non-comment in the node
for token in context.owner.tokens[context.begin : context.end]:
lineno = token.line
if token.kind != "COMMENT":
break
print(f"{filename}:{lineno}: {msg}", file=sys.stderr)
self.errors += 1
everything: list[
parser.InstDef | parser.Super | parser.Macro | OverriddenInstructionPlaceHolder
]
instrs: dict[str, Instruction] # Includes ops
supers: dict[str, parser.Super]
super_instrs: dict[str, SuperInstruction]
macros: dict[str, parser.Macro]
macro_instrs: dict[str, MacroInstruction]
families: dict[str, parser.Family]
def parse(self) -> None:
"""Parse the source text.
We only want the parser to see the stuff between the
begin and end markers.
"""
self.everything = []
self.instrs = {}
self.supers = {}
self.macros = {}
self.families = {}
instrs_idx: dict[str, int] = dict()
for filename in self.input_filenames:
self.parse_file(filename, instrs_idx)
files = " + ".join(self.input_filenames)
print(
f"Read {len(self.instrs)} instructions/ops, "
f"{len(self.supers)} supers, {len(self.macros)} macros, "
f"and {len(self.families)} families from {files}",
file=sys.stderr,
)
def parse_file(self, filename: str, instrs_idx: dict[str, int]) -> None:
with open(filename) as file:
src = file.read()
filename = os.path.relpath(filename, ROOT)
# Make filename more user-friendly and less platform-specific
filename = filename.replace("\\", "/")
if filename.startswith("./"):
filename = filename[2:]
psr = parser.Parser(src, filename=filename)
# Skip until begin marker
while tkn := psr.next(raw=True):
if tkn.text == BEGIN_MARKER:
break
else:
raise psr.make_syntax_error(
f"Couldn't find {BEGIN_MARKER!r} in {psr.filename}"
)
start = psr.getpos()
# Find end marker, then delete everything after it
while tkn := psr.next(raw=True):
if tkn.text == END_MARKER:
break
del psr.tokens[psr.getpos() - 1 :]
# Parse from start
psr.setpos(start)
thing: parser.InstDef | parser.Super | parser.Macro | parser.Family | None
thing_first_token = psr.peek()
while thing := psr.definition():
match thing:
case parser.InstDef(name=name):
if name in self.instrs:
if not thing.override:
raise psr.make_syntax_error(
f"Duplicate definition of '{name}' @ {thing.context} "
f"previous definition @ {self.instrs[name].inst.context}",
thing_first_token,
)
self.everything[instrs_idx[name]] = OverriddenInstructionPlaceHolder(name=name)
if name not in self.instrs and thing.override:
raise psr.make_syntax_error(
f"Definition of '{name}' @ {thing.context} is supposed to be "
"an override but no previous definition exists.",
thing_first_token,
)
self.instrs[name] = Instruction(thing)
instrs_idx[name] = len(self.everything)
self.everything.append(thing)
case parser.Super(name):
self.supers[name] = thing
self.everything.append(thing)
case parser.Macro(name):
self.macros[name] = thing
self.everything.append(thing)
case parser.Family(name):
self.families[name] = thing
case _:
typing.assert_never(thing)
if not psr.eof():
raise psr.make_syntax_error(f"Extra stuff at the end of {filename}")
def analyze(self) -> None:
"""Analyze the inputs.
Raises SystemExit if there is an error.
"""
self.find_predictions()
self.analyze_supers_and_macros()
self.map_families()
self.check_families()
def find_predictions(self) -> None:
"""Find the instructions that need PREDICTED() labels."""
for instr in self.instrs.values():
targets = set(instr.predictions)
for line in instr.block_text:
if m := re.match(RE_PREDICTED, line):
targets.add(m.group(1))
for target in targets:
if target_instr := self.instrs.get(target):
target_instr.predicted = True
else:
self.error(
f"Unknown instruction {target!r} predicted in {instr.name!r}",
instr.inst, # TODO: Use better location
)
def map_families(self) -> None:
"""Link instruction names back to their family, if they have one."""
for family in self.families.values():
for member in family.members:
if member_instr := self.instrs.get(member):
if member_instr.family not in (family, None):
self.error(
f"Instruction {member} is a member of multiple families "
f"({member_instr.family.name}, {family.name}).",
family,
)
else:
member_instr.family = family
elif member_macro := self.macro_instrs.get(member):
for part in member_macro.parts:
if isinstance(part, Component):
if part.instr.family not in (family, None):
self.error(
f"Component {part.instr.name} of macro {member} "
f"is a member of multiple families "
f"({part.instr.family.name}, {family.name}).",
family,
)
else:
part.instr.family = family
else:
self.error(
f"Unknown instruction {member!r} referenced in family {family.name!r}",
family,
)
def check_families(self) -> None:
"""Check each family:
- Must have at least 2 members
- All members must be known instructions
- All members must have the same cache, input and output effects
"""
for family in self.families.values():
if len(family.members) < 2:
self.error(f"Family {family.name!r} has insufficient members", family)
members = [
member
for member in family.members
if member in self.instrs or member in self.macro_instrs
]
if members != family.members:
unknown = set(family.members) - set(members)
self.error(
f"Family {family.name!r} has unknown members: {unknown}", family
)
if len(members) < 2:
continue
expected_effects = self.effect_counts(members[0])
for member in members[1:]:
member_effects = self.effect_counts(member)
if member_effects != expected_effects:
self.error(
f"Family {family.name!r} has inconsistent "
f"(cache, input, output) effects:\n"
f" {family.members[0]} = {expected_effects}; "
f"{member} = {member_effects}",
family,
)
def effect_counts(self, name: str) -> tuple[int, int, int]:
if instr := self.instrs.get(name):
cache = instr.cache_offset
input = len(instr.input_effects)
output = len(instr.output_effects)
elif macro := self.macro_instrs.get(name):
cache, input, output = 0, 0, 0
for part in macro.parts:
if isinstance(part, Component):
cache += part.instr.cache_offset
# A component may pop what the previous component pushed,
# so we offset the input/output counts by that.
delta_i = len(part.instr.input_effects)
delta_o = len(part.instr.output_effects)
offset = min(delta_i, output)
input += delta_i - offset
output += delta_o - offset
else:
assert isinstance(part, parser.CacheEffect), part
cache += part.size
else:
assert False, f"Unknown instruction {name!r}"
return cache, input, output
def analyze_supers_and_macros(self) -> None:
"""Analyze each super- and macro instruction."""
self.super_instrs = {}
self.macro_instrs = {}
for name, super in self.supers.items():
self.super_instrs[name] = self.analyze_super(super)
for name, macro in self.macros.items():
self.macro_instrs[name] = self.analyze_macro(macro)
def analyze_super(self, super: parser.Super) -> SuperInstruction:
components = self.check_super_components(super)
stack, initial_sp = self.stack_analysis(components)
sp = initial_sp
parts: list[Component] = []
format = ""
for instr in components:
part, sp = self.analyze_instruction(instr, stack, sp)
parts.append(part)
format += instr.instr_fmt
final_sp = sp
return SuperInstruction(
super.name, stack, initial_sp, final_sp, format, super, parts
)
def analyze_macro(self, macro: parser.Macro) -> MacroInstruction:
components = self.check_macro_components(macro)
stack, initial_sp = self.stack_analysis(components)
sp = initial_sp
parts: list[Component | parser.CacheEffect] = []
format = "IB"
cache = "C"
for component in components:
match component:
case parser.CacheEffect() as ceffect:
parts.append(ceffect)
for _ in range(ceffect.size):
format += cache
cache = "0"
case Instruction() as instr:
part, sp = self.analyze_instruction(instr, stack, sp)
parts.append(part)
for ce in instr.cache_effects:
for _ in range(ce.size):
format += cache
cache = "0"
case _:
typing.assert_never(component)
final_sp = sp
return MacroInstruction(
macro.name, stack, initial_sp, final_sp, format, macro, parts
)
def analyze_instruction(
self, instr: Instruction, stack: list[StackEffect], sp: int
) -> tuple[Component, int]:
input_mapping: StackEffectMapping = []
for ieffect in reversed(instr.input_effects):
sp -= 1
input_mapping.append((stack[sp], ieffect))
output_mapping: StackEffectMapping = []
for oeffect in instr.output_effects:
output_mapping.append((stack[sp], oeffect))
sp += 1
return Component(instr, input_mapping, output_mapping), sp
def check_super_components(self, super: parser.Super) -> list[Instruction]:
components: list[Instruction] = []
for op in super.ops:
if op.name not in self.instrs:
self.error(f"Unknown instruction {op.name!r}", super)
else:
components.append(self.instrs[op.name])
return components
def check_macro_components(
self, macro: parser.Macro
) -> list[InstructionOrCacheEffect]:
components: list[InstructionOrCacheEffect] = []
for uop in macro.uops:
match uop:
case parser.OpName(name):
if name not in self.instrs:
self.error(f"Unknown instruction {name!r}", macro)
components.append(self.instrs[name])
case parser.CacheEffect():
components.append(uop)
case _:
typing.assert_never(uop)
return components
def stack_analysis(
self, components: typing.Iterable[InstructionOrCacheEffect]
) -> tuple[list[StackEffect], int]:
"""Analyze a super-instruction or macro.
Ignore cache effects.
Return the list of variable names and the initial stack pointer.
"""
lowest = current = highest = 0
for thing in components:
match thing:
case Instruction() as instr:
if any(
eff.size for eff in instr.input_effects + instr.output_effects
):
# TODO: Eventually this will be needed, at least for macros.
self.error(
f"Instruction {instr.name!r} has variable-sized stack effect, "
"which are not supported in super- or macro instructions",
instr.inst, # TODO: Pass name+location of super/macro
)
current -= len(instr.input_effects)
lowest = min(lowest, current)
current += len(instr.output_effects)
highest = max(highest, current)
case parser.CacheEffect():
pass
case _:
typing.assert_never(thing)
# At this point, 'current' is the net stack effect,
# and 'lowest' and 'highest' are the extremes.
# Note that 'lowest' may be negative.
# TODO: Reverse the numbering.
stack = [
StackEffect(f"_tmp_{i+1}", "") for i in reversed(range(highest - lowest))
]
return stack, -lowest
def get_stack_effect_info(
self, thing: parser.InstDef | parser.Super | parser.Macro
) -> tuple[AnyInstruction | None, str, str]:
def effect_str(effects: list[StackEffect]) -> str:
if getattr(thing, "kind", None) == "legacy":
return str(-1)
n_effect, sym_effect = list_effect_size(effects)
if sym_effect:
return f"{sym_effect} + {n_effect}" if n_effect else sym_effect
return str(n_effect)
instr: AnyInstruction | None
match thing:
case parser.InstDef():
if thing.kind != "op":
instr = self.instrs[thing.name]
popped = effect_str(instr.input_effects)
pushed = effect_str(instr.output_effects)
else:
instr = None
popped = ""
pushed = ""
case parser.Super():
instr = self.super_instrs[thing.name]
popped = "+".join(
effect_str(comp.instr.input_effects) for comp in instr.parts
)
pushed = "+".join(
effect_str(comp.instr.output_effects) for comp in instr.parts
)
case parser.Macro():
instr = self.macro_instrs[thing.name]
parts = [comp for comp in instr.parts if isinstance(comp, Component)]
popped = "+".join(
effect_str(comp.instr.input_effects) for comp in parts
)
pushed = "+".join(
effect_str(comp.instr.output_effects) for comp in parts
)
case _:
typing.assert_never(thing)
return instr, popped, pushed
def write_stack_effect_functions(self) -> None:
popped_data: list[tuple[AnyInstruction, str]] = []
pushed_data: list[tuple[AnyInstruction, str]] = []
for thing in self.everything:
if isinstance(thing, OverriddenInstructionPlaceHolder):
continue
instr, popped, pushed = self.get_stack_effect_info(thing)
if instr is not None:
popped_data.append((instr, popped))
pushed_data.append((instr, pushed))
def write_function(
direction: str, data: list[tuple[AnyInstruction, str]]
) -> None:
self.out.emit("")
self.out.emit("#ifndef NEED_OPCODE_METADATA")
self.out.emit(f"extern int _PyOpcode_num_{direction}(int opcode, int oparg, bool jump);")
self.out.emit("#else")
self.out.emit("int")
self.out.emit(f"_PyOpcode_num_{direction}(int opcode, int oparg, bool jump) {{")
self.out.emit(" switch(opcode) {")
for instr, effect in data:
self.out.emit(f" case {instr.name}:")
self.out.emit(f" return {effect};")
self.out.emit(" default:")
self.out.emit(" return -1;")
self.out.emit(" }")
self.out.emit("}")
self.out.emit("#endif")
write_function("popped", popped_data)
write_function("pushed", pushed_data)
self.out.emit("")
def from_source_files(self) -> str:
paths = "\n// ".join(
os.path.relpath(filename, ROOT).replace(os.path.sep, posixpath.sep)
for filename in self.input_filenames
)
return f"// from:\n// {paths}\n"
def write_metadata(self) -> None:
"""Write instruction metadata to output file."""
# Compute the set of all instruction formats.
all_formats: set[str] = set()
for thing in self.everything:
match thing:
case OverriddenInstructionPlaceHolder():
continue
case parser.InstDef():
format = self.instrs[thing.name].instr_fmt
case parser.Super():
format = self.super_instrs[thing.name].instr_fmt
case parser.Macro():
format = self.macro_instrs[thing.name].instr_fmt
case _:
typing.assert_never(thing)
all_formats.add(format)
# Turn it into a list of enum definitions.
format_enums = [INSTR_FMT_PREFIX + format for format in sorted(all_formats)]
with open(self.metadata_filename, "w") as f:
# Create formatter
self.out = Formatter(f, 0)
# Write provenance header
self.out.write_raw(f"// This file is generated by {THIS}\n")
self.out.write_raw(self.from_source_files())
self.out.write_raw(f"// Do not edit!\n")
self.write_stack_effect_functions()
# Write type definitions
self.out.emit(f"enum InstructionFormat {{ {', '.join(format_enums)} }};")
self.out.emit("struct opcode_metadata {")
with self.out.indent():
self.out.emit("bool valid_entry;")
self.out.emit("enum InstructionFormat instr_format;")
self.out.emit("};")
self.out.emit("")
# Write metadata array declaration
self.out.emit("#ifndef NEED_OPCODE_METADATA")
self.out.emit("extern const struct opcode_metadata _PyOpcode_opcode_metadata[256];")
self.out.emit("#else")
self.out.emit("const struct opcode_metadata _PyOpcode_opcode_metadata[256] = {")
# Write metadata for each instruction
for thing in self.everything:
match thing:
case OverriddenInstructionPlaceHolder():
continue
case parser.InstDef():
if thing.kind != "op":
self.write_metadata_for_inst(self.instrs[thing.name])
case parser.Super():
self.write_metadata_for_super(self.super_instrs[thing.name])
case parser.Macro():
self.write_metadata_for_macro(self.macro_instrs[thing.name])
case _:
typing.assert_never(thing)
# Write end of array
self.out.emit("};")
self.out.emit("#endif")
def write_metadata_for_inst(self, instr: Instruction) -> None:
"""Write metadata for a single instruction."""
self.out.emit(
f" [{instr.name}] = {{ true, {INSTR_FMT_PREFIX}{instr.instr_fmt} }},"
)
def write_metadata_for_super(self, sup: SuperInstruction) -> None:
"""Write metadata for a super-instruction."""
self.out.emit(
f" [{sup.name}] = {{ true, {INSTR_FMT_PREFIX}{sup.instr_fmt} }},"
)
def write_metadata_for_macro(self, mac: MacroInstruction) -> None:
"""Write metadata for a macro-instruction."""
self.out.emit(
f" [{mac.name}] = {{ true, {INSTR_FMT_PREFIX}{mac.instr_fmt} }},"
)
def write_instructions(self) -> None:
"""Write instructions to output file."""
with open(self.output_filename, "w") as f:
# Create formatter
self.out = Formatter(f, 8, self.emit_line_directives)
# Write provenance header
self.out.write_raw(f"// This file is generated by {THIS}\n")
self.out.write_raw(self.from_source_files())
self.out.write_raw(f"// Do not edit!\n")
# Write and count instructions of all kinds
n_instrs = 0
n_supers = 0
n_macros = 0
for thing in self.everything:
match thing:
case OverriddenInstructionPlaceHolder():
self.write_overridden_instr_place_holder(thing)
case parser.InstDef():
if thing.kind != "op":
n_instrs += 1
self.write_instr(self.instrs[thing.name])
case parser.Super():
n_supers += 1
self.write_super(self.super_instrs[thing.name])
case parser.Macro():
n_macros += 1
self.write_macro(self.macro_instrs[thing.name])
case _:
typing.assert_never(thing)
print(
f"Wrote {n_instrs} instructions, {n_supers} supers, "
f"and {n_macros} macros to {self.output_filename}",
file=sys.stderr,
)
def write_overridden_instr_place_holder(self,
place_holder: OverriddenInstructionPlaceHolder) -> None:
self.out.emit("")
self.out.emit(
f"// TARGET({place_holder.name}) overridden by later definition")
def write_instr(self, instr: Instruction) -> None:
name = instr.name
self.out.emit("")
if instr.inst.override:
self.out.emit("// Override")
with self.out.block(f"TARGET({name})"):
if instr.predicted:
self.out.emit(f"PREDICTED({name});")
instr.write(self.out)
if not instr.always_exits:
for prediction in instr.predictions:
self.out.emit(f"PREDICT({prediction});")
if instr.check_eval_breaker:
self.out.emit("CHECK_EVAL_BREAKER();")
self.out.emit(f"DISPATCH();")
def write_super(self, sup: SuperInstruction) -> None:
"""Write code for a super-instruction."""
with self.wrap_super_or_macro(sup):
first = True
for comp in sup.parts:
if not first:
self.out.emit("oparg = (next_instr++)->op.arg;")
# self.out.emit("next_instr += OPSIZE(opcode) - 1;")
first = False
comp.write_body(self.out, 0)
if comp.instr.cache_offset:
self.out.emit(f"next_instr += {comp.instr.cache_offset};")
def write_macro(self, mac: MacroInstruction) -> None:
"""Write code for a macro instruction."""
last_instr: Instruction | None = None
with self.wrap_super_or_macro(mac):
cache_adjust = 0
for part in mac.parts:
match part:
case parser.CacheEffect(size=size):
cache_adjust += size
case Component() as comp:
last_instr = comp.instr
comp.write_body(self.out, cache_adjust)
cache_adjust += comp.instr.cache_offset
if cache_adjust:
self.out.emit(f"next_instr += {cache_adjust};")
if (
last_instr
and (family := last_instr.family)
and mac.name == family.members[0]
and (cache_size := family.size)
):
self.out.emit(
f"static_assert({cache_size} == "
f'{cache_adjust}, "incorrect cache size");'
)
@contextlib.contextmanager
def wrap_super_or_macro(self, up: SuperOrMacroInstruction):
"""Shared boilerplate for super- and macro instructions."""
# TODO: Somewhere (where?) make it so that if one instruction
# has an output that is input to another, and the variable names
# and types match and don't conflict with other instructions,
# that variable is declared with the right name and type in the
# outer block, rather than trusting the compiler to optimize it.
self.out.emit("")
with self.out.block(f"TARGET({up.name})"):
for i, var in reversed(list(enumerate(up.stack))):
src = None
if i < up.initial_sp:
src = StackEffect(f"stack_pointer[-{up.initial_sp - i}]", "")
self.out.declare(var, src)
yield
# TODO: Use slices of up.stack instead of numeric values
self.out.stack_adjust(up.final_sp - up.initial_sp, [], [])
for i, var in enumerate(reversed(up.stack[: up.final_sp]), 1):
dst = StackEffect(f"stack_pointer[-{i}]", "")
self.out.assign(dst, var)
self.out.emit(f"DISPATCH();")
def extract_block_text(block: parser.Block) -> tuple[list[str], bool, list[str], int]:
# Get lines of text with proper dedent
blocklines = block.text.splitlines(True)
first_token: lx.Token = block.tokens[0] # IndexError means the context is broken
block_line = first_token.begin[0]
# Remove blank lines from both ends
while blocklines and not blocklines[0].strip():
blocklines.pop(0)
block_line += 1
while blocklines and not blocklines[-1].strip():
blocklines.pop()
# Remove leading and trailing braces
assert blocklines and blocklines[0].strip() == "{"
assert blocklines and blocklines[-1].strip() == "}"
blocklines.pop()
blocklines.pop(0)
block_line += 1
# Remove trailing blank lines
while blocklines and not blocklines[-1].strip():
blocklines.pop()
# Separate CHECK_EVAL_BREAKER() macro from end
check_eval_breaker = \
blocklines != [] and blocklines[-1].strip() == "CHECK_EVAL_BREAKER();"
if check_eval_breaker:
del blocklines[-1]
# Separate PREDICT(...) macros from end
predictions: list[str] = []
while blocklines and (
m := re.match(r"^\s*PREDICT\((\w+)\);\s*(?://.*)?$", blocklines[-1])
):
predictions.insert(0, m.group(1))
blocklines.pop()
return blocklines, check_eval_breaker, predictions, block_line
def always_exits(lines: list[str]) -> bool:
"""Determine whether a block always ends in a return/goto/etc."""
if not lines:
return False
line = lines[-1].rstrip()
# Indent must match exactly (TODO: Do something better)
if line[:12] != " " * 12:
return False
line = line[12:]
return line.startswith(
(
"goto ",
"return ",
"DISPATCH",
"GO_TO_",
"Py_UNREACHABLE()",
"ERROR_IF(true, ",
)
)
def variable_used(node: parser.Node, name: str) -> bool:
"""Determine whether a variable with a given name is used in a node."""
return any(
token.kind == "IDENTIFIER" and token.text == name for token in node.tokens
)
def main():
"""Parse command line, parse input, analyze, write output."""
args = arg_parser.parse_args() # Prints message and sys.exit(2) on error
if len(args.input) == 0:
args.input.append(DEFAULT_INPUT)
a = Analyzer(args.input, args.output, args.metadata) # Raises OSError if input unreadable
if args.emit_line_directives:
a.emit_line_directives = True
a.parse() # Raises SyntaxError on failure
a.analyze() # Prints messages and sets a.errors on failure
if a.errors:
sys.exit(f"Found {a.errors} errors")
a.write_instructions() # Raises OSError if output can't be written
a.write_metadata()
if __name__ == "__main__":
main()