cpython/Tools/cases_generator/instructions.py

425 lines
14 KiB
Python
Raw Normal View History

import dataclasses
import re
import typing
from flags import InstructionFlags, variable_used_unspecialized
from formatting import (
Formatter,
UNUSED,
string_effect_size,
list_effect_size,
maybe_parenthesize,
)
import lexer as lx
import parsing
from parsing import StackEffect
BITS_PER_CODE_UNIT = 16
@dataclasses.dataclass
class ActiveCacheEffect:
"""Wraps a CacheEffect that is actually used, in context."""
effect: parsing.CacheEffect
offset: int
FORBIDDEN_NAMES_IN_UOPS = (
"resume_with_error",
"kwnames",
"next_instr",
"oparg1", # Proxy for super-instructions like LOAD_FAST_LOAD_FAST
"JUMPBY",
"DISPATCH",
"INSTRUMENTED_JUMP",
"throwflag",
"exception_unwind",
"import_from",
"import_name",
"_PyObject_CallNoArgs", # Proxy for BEFORE_WITH
)
# Interpreter tiers
TIER_ONE: typing.Final = 1 # Specializing adaptive interpreter (PEP 659)
TIER_TWO: typing.Final = 2 # Experimental tracing interpreter
Tiers: typing.TypeAlias = typing.Literal[1, 2]
@dataclasses.dataclass
class Instruction:
"""An instruction with additional data and code."""
# Parts of the underlying instruction definition
inst: parsing.InstDef
kind: typing.Literal["inst", "op"]
name: str
block: parsing.Block
block_text: list[str] # Block.text, less curlies, less PREDICT() calls
block_line: int # First line of block in original code
# Computed by constructor
always_exits: bool
cache_offset: int
cache_effects: list[parsing.CacheEffect]
input_effects: list[StackEffect]
output_effects: list[StackEffect]
unmoved_names: frozenset[str]
instr_fmt: str
instr_flags: InstructionFlags
active_caches: list[ActiveCacheEffect]
# Set later
family: parsing.Family | None = None
predicted: bool = False
def __init__(self, inst: parsing.InstDef):
self.inst = inst
self.kind = inst.kind
self.name = inst.name
self.block = inst.block
self.block_text, self.check_eval_breaker, self.block_line = extract_block_text(
self.block
)
self.always_exits = always_exits(self.block_text)
self.cache_effects = [
effect for effect in inst.inputs if isinstance(effect, parsing.CacheEffect)
]
self.cache_offset = sum(c.size for c in self.cache_effects)
self.input_effects = [
effect for effect in inst.inputs if isinstance(effect, StackEffect)
]
self.output_effects = inst.outputs # For consistency/completeness
unmoved_names: set[str] = set()
for ieffect, oeffect in zip(self.input_effects, self.output_effects):
if ieffect.name == oeffect.name:
unmoved_names.add(ieffect.name)
else:
break
self.unmoved_names = frozenset(unmoved_names)
self.instr_flags = InstructionFlags.fromInstruction(inst)
self.active_caches = []
offset = 0
for effect in self.cache_effects:
if effect.name != UNUSED:
self.active_caches.append(ActiveCacheEffect(effect, offset))
offset += effect.size
if self.instr_flags.HAS_ARG_FLAG:
fmt = "IB"
else:
fmt = "IX"
if offset:
fmt += "C" + "0" * (offset - 1)
self.instr_fmt = fmt
def is_viable_uop(self) -> bool:
"""Whether this instruction is viable as a uop."""
dprint: typing.Callable[..., None] = lambda *args, **kwargs: None
# if self.name.startswith("CALL"):
# dprint = print
if self.name == "EXIT_TRACE":
return True # This has 'return frame' but it's okay
if self.always_exits:
dprint(f"Skipping {self.name} because it always exits")
return False
if len(self.active_caches) > 1:
# print(f"Skipping {self.name} because it has >1 cache entries")
return False
res = True
for forbidden in FORBIDDEN_NAMES_IN_UOPS:
# NOTE: To disallow unspecialized uops, use
# if variable_used(self.inst, forbidden):
if variable_used_unspecialized(self.inst, forbidden):
dprint(f"Skipping {self.name} because it uses {forbidden}")
res = False
return res
def write(self, out: Formatter, tier: Tiers = TIER_ONE) -> None:
"""Write one instruction, sans prologue and epilogue."""
# Write a static assertion that a family's cache size is correct
if family := self.family:
if self.name == family.name:
if cache_size := family.size:
out.emit(
f"static_assert({cache_size} == "
f'{self.cache_offset}, "incorrect cache size");'
)
# Write input stack effect variable declarations and initializations
ieffects = list(reversed(self.input_effects))
for i, ieffect in enumerate(ieffects):
isize = string_effect_size(
list_effect_size([ieff for ieff in ieffects[: i + 1]])
)
if ieffect.size:
src = StackEffect(
f"(stack_pointer - {maybe_parenthesize(isize)})", "PyObject **"
)
elif ieffect.cond:
src = StackEffect(
f"({ieffect.cond}) ? stack_pointer[-{maybe_parenthesize(isize)}] : NULL",
"",
)
else:
src = StackEffect(f"stack_pointer[-{maybe_parenthesize(isize)}]", "")
out.declare(ieffect, src)
# Write output stack effect variable declarations
isize = string_effect_size(list_effect_size(self.input_effects))
input_names = {ieffect.name for ieffect in self.input_effects}
for i, oeffect in enumerate(self.output_effects):
if oeffect.name not in input_names:
if oeffect.size:
osize = string_effect_size(
list_effect_size([oeff for oeff in self.output_effects[:i]])
)
offset = "stack_pointer"
if isize != osize:
if isize != "0":
offset += f" - ({isize})"
if osize != "0":
offset += f" + {osize}"
src = StackEffect(offset, "PyObject **")
out.declare(oeffect, src)
else:
out.declare(oeffect, None)
# out.emit(f"next_instr += OPSIZE({self.inst.name}) - 1;")
self.write_body(out, 0, self.active_caches, tier=tier)
# Skip the rest if the block always exits
if self.always_exits:
return
# Write net stack growth/shrinkage
out.stack_adjust(
[ieff for ieff in self.input_effects],
[oeff for oeff in self.output_effects],
)
# Write output stack effect assignments
oeffects = list(reversed(self.output_effects))
for i, oeffect in enumerate(oeffects):
if oeffect.name in self.unmoved_names:
continue
osize = string_effect_size(
list_effect_size([oeff for oeff in oeffects[: i + 1]])
)
if oeffect.size:
dst = StackEffect(
f"stack_pointer - {maybe_parenthesize(osize)}", "PyObject **"
)
else:
dst = StackEffect(f"stack_pointer[-{maybe_parenthesize(osize)}]", "")
out.assign(dst, oeffect)
# Write cache effect
if tier == TIER_ONE and self.cache_offset:
out.emit(f"next_instr += {self.cache_offset};")
def write_body(
self,
out: Formatter,
dedent: int,
active_caches: list[ActiveCacheEffect],
tier: Tiers = TIER_ONE,
) -> None:
"""Write the instruction body."""
# Write cache effect variable declarations and initializations
for active in active_caches:
ceffect = active.effect
bits = ceffect.size * BITS_PER_CODE_UNIT
if bits == 64:
# NOTE: We assume that 64-bit data in the cache
# is always an object pointer.
# If this becomes false, we need a way to specify
# syntactically what type the cache data is.
typ = "PyObject *"
func = "read_obj"
else:
typ = f"uint{bits}_t "
func = f"read_u{bits}"
if tier == TIER_ONE:
out.emit(
f"{typ}{ceffect.name} = {func}(&next_instr[{active.offset}].cache);"
)
else:
out.emit(f"{typ}{ceffect.name} = ({typ.strip()})operand;")
# Write the body, substituting a goto for ERROR_IF() and other stuff
assert dedent <= 0
extra = " " * -dedent
names_to_skip = self.unmoved_names | frozenset({UNUSED, "null"})
offset = 0
context = self.block.context
assert context is not None and context.owner is not None
filename = context.owner.filename
for line in self.block_text:
out.set_lineno(self.block_line + offset, filename)
offset += 1
if m := re.match(r"(\s*)ERROR_IF\((.+), (\w+)\);\s*(?://.*)?$", line):
space, cond, label = m.groups()
space = extra + space
# ERROR_IF() must pop the inputs from the stack.
# The code block is responsible for DECREF()ing them.
# NOTE: If the label doesn't exist, just add it to ceval.c.
# Don't pop common input/output effects at the bottom!
# These aren't DECREF'ed so they can stay.
ieffs = list(self.input_effects)
oeffs = list(self.output_effects)
while ieffs and oeffs and ieffs[0] == oeffs[0]:
ieffs.pop(0)
oeffs.pop(0)
ninputs, symbolic = list_effect_size(ieffs)
if ninputs:
label = f"pop_{ninputs}_{label}"
if symbolic:
out.write_raw(
f"{space}if ({cond}) {{ STACK_SHRINK({symbolic}); goto {label}; }}\n"
)
else:
out.write_raw(f"{space}if ({cond}) goto {label};\n")
elif m := re.match(r"(\s*)DECREF_INPUTS\(\);\s*(?://.*)?$", line):
out.reset_lineno()
space = extra + m.group(1)
for ieff in self.input_effects:
if ieff.name in names_to_skip:
continue
if ieff.size:
out.write_raw(
f"{space}for (int _i = {ieff.size}; --_i >= 0;) {{\n"
)
out.write_raw(f"{space} Py_DECREF({ieff.name}[_i]);\n")
out.write_raw(f"{space}}}\n")
else:
decref = "XDECREF" if ieff.cond else "DECREF"
out.write_raw(f"{space}Py_{decref}({ieff.name});\n")
else:
out.write_raw(extra + line)
out.reset_lineno()
InstructionOrCacheEffect = Instruction | parsing.CacheEffect
StackEffectMapping = list[tuple[StackEffect, StackEffect]]
@dataclasses.dataclass
class Component:
instr: Instruction
input_mapping: StackEffectMapping
output_mapping: StackEffectMapping
active_caches: list[ActiveCacheEffect]
def write_body(self, out: Formatter) -> None:
with out.block(""):
input_names = {ieffect.name for _, ieffect in self.input_mapping}
for var, ieffect in self.input_mapping:
out.declare(ieffect, var)
for _, oeffect in self.output_mapping:
if oeffect.name not in input_names:
out.declare(oeffect, None)
self.instr.write_body(out, -4, self.active_caches)
for var, oeffect in self.output_mapping:
out.assign(var, oeffect)
MacroParts = list[Component | parsing.CacheEffect]
@dataclasses.dataclass
class MacroInstruction:
"""A macro instruction."""
name: str
stack: list[StackEffect]
initial_sp: int
final_sp: int
instr_fmt: str
instr_flags: InstructionFlags
macro: parsing.Macro
parts: MacroParts
cache_offset: int
predicted: bool = False
@dataclasses.dataclass
class PseudoInstruction:
"""A pseudo instruction."""
name: str
targets: list[Instruction]
instr_fmt: str
instr_flags: InstructionFlags
@dataclasses.dataclass
class OverriddenInstructionPlaceHolder:
name: str
AnyInstruction = Instruction | MacroInstruction | PseudoInstruction
def extract_block_text(block: parsing.Block) -> tuple[list[str], bool, int]:
# Get lines of text with proper dedent
blocklines = block.text.splitlines(True)
first_token: lx.Token = block.tokens[0] # IndexError means the context is broken
block_line = first_token.begin[0]
# Remove blank lines from both ends
while blocklines and not blocklines[0].strip():
blocklines.pop(0)
block_line += 1
while blocklines and not blocklines[-1].strip():
blocklines.pop()
# Remove leading and trailing braces
assert blocklines and blocklines[0].strip() == "{"
assert blocklines and blocklines[-1].strip() == "}"
blocklines.pop()
blocklines.pop(0)
block_line += 1
# Remove trailing blank lines
while blocklines and not blocklines[-1].strip():
blocklines.pop()
# Separate CHECK_EVAL_BREAKER() macro from end
check_eval_breaker = (
blocklines != [] and blocklines[-1].strip() == "CHECK_EVAL_BREAKER();"
)
if check_eval_breaker:
del blocklines[-1]
return blocklines, check_eval_breaker, block_line
def always_exits(lines: list[str]) -> bool:
"""Determine whether a block always ends in a return/goto/etc."""
if not lines:
return False
line = lines[-1].rstrip()
# Indent must match exactly (TODO: Do something better)
if line[:12] != " " * 12:
return False
line = line[12:]
return line.startswith(
(
"goto ",
"return ",
"DISPATCH",
"GO_TO_",
"Py_UNREACHABLE()",
"ERROR_IF(true, ",
)
)