gh-105191: Cleanup peg generator; keep only necessary files (#105197)

This commit is contained in:
Lysandros Nikolaou 2023-06-01 17:24:15 +02:00 committed by GitHub
parent c67121ac6b
commit a241003d04
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 60 additions and 16832 deletions

View File

@ -1,4 +1,2 @@
peg_extension/parse.c
data/xxl.py
venv/
@data

View File

@ -8,97 +8,40 @@ endif
VENVDIR ?= ./venv
VENVPYTHON ?= $(VENVDIR)/bin/python
CPYTHON ?= ../../Lib
MYPY ?= mypy
MYPY ?= ./venv/bin/mypy
BLACK ?= ./venv/bin/black
GRAMMAR = ../../Grammar/python.gram
TOKENS = ../../Grammar/Tokens
TESTFILE = data/cprog.py
TIMEFILE = data/xxl.py
TESTDIR = .
TESTFLAGS = --short
data/xxl.py:
$(PYTHON) -m zipfile -e data/xxl.zip data
build: peg_extension/parse.c
peg_extension/parse.c: $(GRAMMAR) $(TOKENS) pegen/*.py peg_extension/peg_extension.c ../../Parser/pegen.c ../../Parser/pegen_errors.c ../../Parser/string_parser.c ../../Parser/action_helpers.c ../../Parser/*.h pegen/grammar_parser.py
$(PYTHON) -m pegen -q c $(GRAMMAR) $(TOKENS) -o peg_extension/parse.c --compile-extension
$(VENVPYTHON) -m pegen -q c $(GRAMMAR) $(TOKENS) -o peg_extension/parse.c --compile-extension
clean:
-rm -f peg_extension/*.o peg_extension/*.so peg_extension/parse.c
-rm -f data/xxl.py
-rm -rf $(VENVDIR)
dump: peg_extension/parse.c
cat -n $(TESTFILE)
$(PYTHON) -c "from peg_extension import parse; import ast; t = parse.parse_file('$(TESTFILE)', mode=1); print(ast.dump(t))"
regen-metaparser: pegen/metagrammar.gram pegen/*.py
$(PYTHON) -m pegen -q python pegen/metagrammar.gram -o pegen/grammar_parser.py
$(VENVPYTHON) -m pegen -q python pegen/metagrammar.gram -o pegen/grammar_parser.py
# Note: These targets really depend on the generated shared object in peg_extension/parse.*.so but
# this has different names in different systems so we are abusing the implicit dependency on
# parse.c by the use of --compile-extension.
.PHONY: test
venv:
$(PYTHON) -m venv $(VENVDIR)
$(VENVPYTHON) -m pip install -U pip setuptools
$(VENVPYTHON) -m pip install -r requirements.pip
$(VENVPYTHON) -m pip install -U pip setuptools black mypy
@echo "The venv has been created in the $(VENVDIR) directory"
test: run
run: peg_extension/parse.c
$(PYTHON) -c "from peg_extension import parse; t = parse.parse_file('$(TESTFILE)'); exec(t)"
compile: peg_extension/parse.c
$(PYTHON) -c "from peg_extension import parse; t = parse.parse_file('$(TESTFILE)', mode=2)"
parse: peg_extension/parse.c
$(PYTHON) -c "from peg_extension import parse; t = parse.parse_file('$(TESTFILE)', mode=1)"
check: peg_extension/parse.c
$(PYTHON) -c "from peg_extension import parse; t = parse.parse_file('$(TESTFILE)', mode=0)"
stats: peg_extension/parse.c data/xxl.py
$(PYTHON) -c "from peg_extension import parse; t = parse.parse_file('$(TIMEFILE)', mode=0); parse.dump_memo_stats()" >@data
$(PYTHON) scripts/joinstats.py @data
time: time_compile
time_compile: venv data/xxl.py
$(VENVPYTHON) scripts/benchmark.py --target=xxl compile
time_parse: venv data/xxl.py
$(VENVPYTHON) scripts/benchmark.py --target=xxl parse
time_peg_dir: venv
$(VENVPYTHON) scripts/test_parse_directory.py \
-d $(TESTDIR) \
$(TESTFLAGS) \
--exclude "*/failset/*" \
--exclude "*/failset/**" \
--exclude "*/failset/**/*"
time_stdlib: $(CPYTHON) venv
$(VENVPYTHON) scripts/test_parse_directory.py \
-d $(CPYTHON) \
$(TESTFLAGS) \
--exclude "*/bad*"
mypy: regen-metaparser
$(MYPY) # For list of files, see mypy.ini
format-python:
black pegen scripts
$(BLACK) pegen
format: format-python
find_max_nesting:
$(PYTHON) scripts/find_max_nesting.py
format: venv format-python
tags: TAGS

View File

@ -1,11 +0,0 @@
if 1:
print("Hello " + "world")
if 0:
print("then")
print("clause")
elif 1:
pass
elif 1:
pass
else:
print("else-clause")

File diff suppressed because it is too large Load Diff

Binary file not shown.

View File

@ -50,24 +50,23 @@ def fixup_build_ext(cmd):
Taken from distutils (was part of the CPython stdlib until Python 3.11)
"""
if os.name == 'nt':
cmd.debug = sys.executable.endswith('_d.exe')
elif sysconfig.get_config_var('Py_ENABLE_SHARED'):
if os.name == "nt":
cmd.debug = sys.executable.endswith("_d.exe")
elif sysconfig.get_config_var("Py_ENABLE_SHARED"):
# To further add to the shared builds fun on Unix, we can't just add
# library_dirs to the Extension() instance because that doesn't get
# plumbed through to the final compiler command.
runshared = sysconfig.get_config_var('RUNSHARED')
runshared = sysconfig.get_config_var("RUNSHARED")
if runshared is None:
cmd.library_dirs = ['.']
cmd.library_dirs = ["."]
else:
if sys.platform == 'darwin':
if sys.platform == "darwin":
cmd.library_dirs = []
else:
name, equals, value = runshared.partition('=')
name, equals, value = runshared.partition("=")
cmd.library_dirs = [d for d in value.split(os.pathsep) if d]
def compile_c_extension(
generated_source_path: str,
build_dir: Optional[str] = None,
@ -110,7 +109,7 @@ def compile_c_extension(
if keep_asserts:
extra_compile_args.append("-UNDEBUG")
if disable_optimization:
if sys.platform == 'win32':
if sys.platform == "win32":
extra_compile_args.append("/Od")
extra_link_args.append("/LTCG:OFF")
else:
@ -153,54 +152,65 @@ def compile_c_extension(
compiler.set_library_dirs(cmd.library_dirs)
# build static lib
if library_dir:
library_filename = compiler.library_filename(extension_name,
output_dir=library_dir)
if newer_group(common_sources, library_filename, 'newer'):
if sys.platform == 'win32':
pdb = compiler.static_lib_format % (extension_name, '.pdb')
library_filename = compiler.library_filename(extension_name, output_dir=library_dir)
if newer_group(common_sources, library_filename, "newer"):
if sys.platform == "win32":
pdb = compiler.static_lib_format % (extension_name, ".pdb")
compile_opts = [f"/Fd{library_dir}\\{pdb}"]
compile_opts.extend(extra_compile_args)
else:
compile_opts = extra_compile_args
objects = compiler.compile(common_sources,
output_dir=library_dir,
debug=cmd.debug,
extra_postargs=compile_opts)
compiler.create_static_lib(objects, extension_name,
output_dir=library_dir,
debug=cmd.debug)
if sys.platform == 'win32':
objects = compiler.compile(
common_sources,
output_dir=library_dir,
debug=cmd.debug,
extra_postargs=compile_opts,
)
compiler.create_static_lib(
objects, extension_name, output_dir=library_dir, debug=cmd.debug
)
if sys.platform == "win32":
compiler.add_library_dir(library_dir)
extension.libraries = [extension_name]
elif sys.platform == 'darwin':
compiler.set_link_objects([
'-Wl,-force_load', library_filename,
])
elif sys.platform == "darwin":
compiler.set_link_objects(
[
"-Wl,-force_load",
library_filename,
]
)
else:
compiler.set_link_objects([
'-Wl,--whole-archive', library_filename, '-Wl,--no-whole-archive',
])
compiler.set_link_objects(
[
"-Wl,--whole-archive",
library_filename,
"-Wl,--no-whole-archive",
]
)
else:
extension.sources[0:0] = common_sources
# Compile the source code to object files.
ext_path = cmd.get_ext_fullpath(extension_name)
if newer_group(extension.sources, ext_path, 'newer'):
objects = compiler.compile(extension.sources,
output_dir=cmd.build_temp,
debug=cmd.debug,
extra_postargs=extra_compile_args)
if newer_group(extension.sources, ext_path, "newer"):
objects = compiler.compile(
extension.sources,
output_dir=cmd.build_temp,
debug=cmd.debug,
extra_postargs=extra_compile_args,
)
else:
objects = compiler.object_filenames(extension.sources,
output_dir=cmd.build_temp)
objects = compiler.object_filenames(extension.sources, output_dir=cmd.build_temp)
# Now link the object files together into a "shared object"
compiler.link_shared_object(
objects, ext_path,
objects,
ext_path,
libraries=cmd.get_libraries(extension),
extra_postargs=extra_link_args,
export_symbols=cmd.get_export_symbols(extension),
debug=cmd.debug,
build_temp=cmd.build_temp)
build_temp=cmd.build_temp,
)
return pathlib.Path(ext_path)

View File

@ -29,7 +29,6 @@ class ASTGrammarPrinter:
printer(self.print_nodes_recursively(rule))
def print_nodes_recursively(self, node: Rule, prefix: str = "", istail: bool = True) -> str:
children = list(self.children(node))
value = self.name(node)

View File

@ -87,7 +87,6 @@ class RuleCheckingVisitor(GrammarVisitor):
class ParserGenerator:
callmakervisitor: GrammarVisitor
def __init__(self, grammar: Grammar, tokens: Set[str], file: Optional[IO[Text]]):

View File

@ -83,7 +83,9 @@ def generate_c_parser_source(grammar: Grammar) -> str:
def generate_parser_c_extension(
grammar: Grammar, path: pathlib.PurePath, debug: bool = False,
grammar: Grammar,
path: pathlib.PurePath,
debug: bool = False,
library_dir: Optional[str] = None,
) -> Any:
"""Generate a parser c extension for the given grammar in the given path
@ -112,7 +114,7 @@ def generate_parser_c_extension(
def print_memstats() -> bool:
MiB: Final = 2 ** 20
MiB: Final = 2**20
try:
import psutil # type: ignore
except ImportError:

View File

@ -1,9 +1,8 @@
[tool.black]
line-length = 99
target_version = ['py38']
target_version = ['py311']
exclude = '''
(
/pegen/grammar_parser.py # generated file
| /test/test_data/ # test files
)
'''

View File

@ -1,2 +0,0 @@
memory-profiler==0.57.0
psutil==5.7.0

View File

@ -1 +0,0 @@
# This exists to let mypy find modules here

View File

@ -1,26 +0,0 @@
import ast
import sys
import time
from pegen.testutil import print_memstats
def main() -> None:
t0 = time.time()
for filename in sys.argv[1:]:
print(filename, end="\r")
try:
with open(filename) as file:
source = file.read()
tree = ast.parse(source, filename)
except Exception as err:
print(f"{filename}: {err.__class__.__name__}: {err}", file=sys.stderr)
tok = None
t1 = time.time()
dt = t1 - t0
print(f"Parsed in {dt:.3f} secs", file=sys.stderr)
print_memstats()
if __name__ == "__main__":
main()

View File

@ -1,104 +0,0 @@
#!/usr/bin/env python3
import argparse
import ast
import sys
import os
from time import time
try:
import memory_profiler
except ModuleNotFoundError:
print(
"Please run `make venv` to create a virtual environment and install"
" all the dependencies, before running this script."
)
sys.exit(1)
sys.path.insert(0, os.getcwd())
from scripts.test_parse_directory import parse_directory
argparser = argparse.ArgumentParser(
prog="benchmark", description="Reproduce the various pegen benchmarks"
)
argparser.add_argument(
"--target",
action="store",
choices=["xxl", "stdlib"],
default="xxl",
help="Which target to use for the benchmark (default is xxl.py)",
)
subcommands = argparser.add_subparsers(title="Benchmarks", dest="subcommand")
command_compile = subcommands.add_parser(
"compile", help="Benchmark parsing and compiling to bytecode"
)
command_parse = subcommands.add_parser("parse", help="Benchmark parsing and generating an ast.AST")
def benchmark(func):
def wrapper(*args):
times = list()
for _ in range(3):
start = time()
result = func(*args)
end = time()
times.append(end - start)
memory = memory_profiler.memory_usage((func, args))
print(f"{func.__name__}")
print(f"\tTime: {sum(times)/3:.3f} seconds on an average of 3 runs")
print(f"\tMemory: {max(memory)} MiB on an average of 3 runs")
return result
return wrapper
@benchmark
def time_compile(source):
return compile(source, "<string>", "exec")
@benchmark
def time_parse(source):
return ast.parse(source)
def run_benchmark_xxl(subcommand, source):
if subcommand == "compile":
time_compile(source)
elif subcommand == "parse":
time_parse(source)
def run_benchmark_stdlib(subcommand):
modes = {"compile": 2, "parse": 1}
for _ in range(3):
parse_directory(
"../../Lib",
verbose=False,
excluded_files=[
"*/bad*",
],
short=True,
mode=modes[subcommand],
)
def main():
args = argparser.parse_args()
subcommand = args.subcommand
target = args.target
if subcommand is None:
argparser.error("A benchmark to run is required")
if target == "xxl":
with open(os.path.join("data", "xxl.py"), "r") as f:
source = f.read()
run_benchmark_xxl(subcommand, source)
elif target == "stdlib":
run_benchmark_stdlib(subcommand)
if __name__ == "__main__":
main()

View File

@ -1,87 +0,0 @@
#!/usr/bin/env python3.8
import argparse
import os
import json
from typing import Dict, Any
from urllib.request import urlretrieve
argparser = argparse.ArgumentParser(
prog="download_pypi_packages",
description="Helper program to download PyPI packages",
)
argparser.add_argument(
"-n", "--number", type=int, default=100, help="Number of packages to download"
)
argparser.add_argument(
"-a", "--all", action="store_true", help="Download all packages listed in the json file"
)
def load_json(filename: str) -> Dict[Any, Any]:
with open(os.path.join("data", f"{filename}.json"), "r") as f:
j = json.loads(f.read())
return j
def remove_json(filename: str) -> None:
path = os.path.join("data", f"{filename}.json")
os.remove(path)
def download_package_json(package_name: str) -> None:
url = f"https://pypi.org/pypi/{package_name}/json"
urlretrieve(url, os.path.join("data", f"{package_name}.json"))
def download_package_code(name: str, package_json: Dict[Any, Any]) -> None:
source_index = -1
for idx, url_info in enumerate(package_json["urls"]):
if url_info["python_version"] == "source":
source_index = idx
break
filename = package_json["urls"][source_index]["filename"]
url = package_json["urls"][source_index]["url"]
urlretrieve(url, os.path.join("data", "pypi", filename))
def main() -> None:
args = argparser.parse_args()
number_packages = args.number
all_packages = args.all
top_pypi_packages = load_json("top-pypi-packages-365-days")
if all_packages:
top_pypi_packages = top_pypi_packages["rows"]
elif number_packages >= 0 and number_packages <= 4000:
top_pypi_packages = top_pypi_packages["rows"][:number_packages]
else:
raise AssertionError("Unknown value for NUMBER_OF_PACKAGES")
try:
os.mkdir(os.path.join("data", "pypi"))
except FileExistsError:
pass
for package in top_pypi_packages:
package_name = package["project"]
print(f"Downloading JSON Data for {package_name}... ", end="")
download_package_json(package_name)
print("Done")
package_json = load_json(package_name)
try:
print(f"Downloading and compressing package {package_name} ... ", end="")
download_package_code(package_name, package_json)
print("Done")
except (IndexError, KeyError):
print(f"Could not locate source for {package_name}")
continue
finally:
remove_json(package_name)
if __name__ == "__main__":
main()

View File

@ -1,55 +0,0 @@
#!/usr/bin/env python3.8
"""Find the maximum amount of nesting for an expression that can be parsed
without causing a parse error.
Starting at the INITIAL_NESTING_DEPTH, an expression containing n parenthesis
around a 0 is generated then tested with both the C and Python parsers. We
continue incrementing the number of parenthesis by 10 until both parsers have
failed. As soon as a single parser fails, we stop testing that parser.
The grammar file, initial nesting size, and amount by which the nested size is
incremented on each success can be controlled by changing the GRAMMAR_FILE,
INITIAL_NESTING_DEPTH, or NESTED_INCR_AMT variables.
Usage: python -m scripts.find_max_nesting
"""
import sys
import ast
GRAMMAR_FILE = "data/python.gram"
INITIAL_NESTING_DEPTH = 10
NESTED_INCR_AMT = 10
FAIL = "\033[91m"
ENDC = "\033[0m"
def check_nested_expr(nesting_depth: int) -> bool:
expr = f"{'(' * nesting_depth}0{')' * nesting_depth}"
try:
ast.parse(expr)
print(f"Nesting depth of {nesting_depth} is successful")
return True
except Exception as err:
print(f"{FAIL}(Failed with nesting depth of {nesting_depth}{ENDC}")
print(f"{FAIL}\t{err}{ENDC}")
return False
def main() -> None:
print(f"Testing {GRAMMAR_FILE} starting at nesting depth of {INITIAL_NESTING_DEPTH}...")
nesting_depth = INITIAL_NESTING_DEPTH
succeeded = True
while succeeded:
expr = f"{'(' * nesting_depth}0{')' * nesting_depth}"
if succeeded:
succeeded = check_nested_expr(nesting_depth)
nesting_depth += NESTED_INCR_AMT
sys.exit(1)
if __name__ == "__main__":
main()

View File

@ -1,119 +0,0 @@
#!/usr/bin/env python3.8
""" Convert a grammar into a dot-file suitable for use with GraphViz
For example:
Generate the GraphViz file:
# scripts/grammar_grapher.py data/python.gram > python.gv
Then generate the graph...
# twopi python.gv -Tpng > python_twopi.png
or
# dot python.gv -Tpng > python_dot.png
NOTE: The _dot_ and _twopi_ tools seem to produce the most useful results.
The _circo_ tool is the worst of the bunch. Don't even bother.
"""
import argparse
import sys
from typing import Any, List
sys.path.insert(0, ".")
from pegen.build import build_parser
from pegen.grammar import (
Alt,
Cut,
Forced,
Group,
Leaf,
Lookahead,
Rule,
NameLeaf,
NamedItem,
Opt,
Repeat,
Rhs,
)
argparser = argparse.ArgumentParser(
prog="graph_grammar",
description="Graph a grammar tree",
)
argparser.add_argument(
"-s",
"--start",
choices=["exec", "eval", "single"],
default="exec",
help="Choose the grammar's start rule (exec, eval or single)",
)
argparser.add_argument("grammar_file", help="The grammar file to graph")
def references_for_item(item: Any) -> List[Any]:
if isinstance(item, Alt):
return [_ref for _item in item.items for _ref in references_for_item(_item)]
elif isinstance(item, Cut):
return []
elif isinstance(item, Forced):
return references_for_item(item.node)
elif isinstance(item, Group):
return references_for_item(item.rhs)
elif isinstance(item, Lookahead):
return references_for_item(item.node)
elif isinstance(item, NamedItem):
return references_for_item(item.item)
# NOTE NameLeaf must be before Leaf
elif isinstance(item, NameLeaf):
if item.value == "ENDMARKER":
return []
return [item.value]
elif isinstance(item, Leaf):
return []
elif isinstance(item, Opt):
return references_for_item(item.node)
elif isinstance(item, Repeat):
return references_for_item(item.node)
elif isinstance(item, Rhs):
return [_ref for alt in item.alts for _ref in references_for_item(alt)]
elif isinstance(item, Rule):
return references_for_item(item.rhs)
else:
raise RuntimeError(f"Unknown item: {type(item)}")
def main() -> None:
args = argparser.parse_args()
try:
grammar, parser, tokenizer = build_parser(args.grammar_file)
except Exception as err:
print("ERROR: Failed to parse grammar file", file=sys.stderr)
sys.exit(1)
references = {}
for name, rule in grammar.rules.items():
references[name] = set(references_for_item(rule))
# Flatten the start node if has only a single reference
root_node = {"exec": "file", "eval": "eval", "single": "interactive"}[args.start]
print("digraph g1 {")
print('\toverlap="scale";') # Force twopi to scale the graph to avoid overlaps
print(f'\troot="{root_node}";')
print(f"\t{root_node} [color=green, shape=circle];")
for name, refs in references.items():
for ref in refs:
print(f"\t{name} -> {ref};")
print("}")
if __name__ == "__main__":
main()

View File

@ -1,66 +0,0 @@
#!/usr/bin/env python3.8
"""Produce a report about the most-memoable types.
Reads a list of statistics from stdin. Each line must be two numbers,
being a type and a count. We then read some other files and produce a
list sorted by most frequent type.
There should also be something to recognize left-recursive rules.
"""
import os
import re
import sys
from typing import Dict
reporoot = os.path.dirname(os.path.dirname(__file__))
parse_c = os.path.join(reporoot, "peg_extension", "parse.c")
class TypeMapper:
"""State used to map types to names."""
def __init__(self, filename: str) -> None:
self.table: Dict[int, str] = {}
with open(filename) as f:
for line in f:
match = re.match(r"#define (\w+)_type (\d+)", line)
if match:
name, type = match.groups()
if "left" in line.lower():
name += " // Left-recursive"
self.table[int(type)] = name
def lookup(self, type: int) -> str:
return self.table.get(type, str(type))
def main() -> None:
mapper = TypeMapper(parse_c)
table = []
filename = sys.argv[1]
with open(filename) as f:
for lineno, line in enumerate(f, 1):
line = line.strip()
if not line or line.startswith("#"):
continue
parts = line.split()
# Extra fields ignored
if len(parts) < 2:
print(f"{lineno}: bad input ({line!r})")
continue
try:
type, count = map(int, parts[:2])
except ValueError as err:
print(f"{lineno}: non-integer input ({line!r})")
continue
table.append((type, count))
table.sort(key=lambda values: -values[1])
for type, count in table:
print(f"{type:4d} {count:9d} {mapper.lookup(type)}")
if __name__ == "__main__":
main()

View File

@ -1,148 +0,0 @@
#!/usr/bin/env python3.8
import argparse
import ast
import os
import sys
import time
import tokenize
from glob import glob, escape
from pathlib import PurePath
from typing import List, Optional, Any, Tuple
sys.path.insert(0, os.getcwd())
from pegen.testutil import print_memstats
SUCCESS = "\033[92m"
FAIL = "\033[91m"
ENDC = "\033[0m"
COMPILE = 2
PARSE = 1
NOTREE = 0
argparser = argparse.ArgumentParser(
prog="test_parse_directory",
description="Helper program to test directories or files for pegen",
)
argparser.add_argument("-d", "--directory", help="Directory path containing files to test")
argparser.add_argument(
"-e", "--exclude", action="append", default=[], help="Glob(s) for matching files to exclude"
)
argparser.add_argument(
"-s", "--short", action="store_true", help="Only show errors, in a more Emacs-friendly format"
)
argparser.add_argument(
"-v", "--verbose", action="store_true", help="Display detailed errors for failures"
)
def report_status(
succeeded: bool,
file: str,
verbose: bool,
error: Optional[Exception] = None,
short: bool = False,
) -> None:
if short and succeeded:
return
if succeeded is True:
status = "OK"
COLOR = SUCCESS
else:
status = "Fail"
COLOR = FAIL
if short:
lineno = 0
offset = 0
if isinstance(error, SyntaxError):
lineno = error.lineno or 1
offset = error.offset or 1
message = error.args[0]
else:
message = f"{error.__class__.__name__}: {error}"
print(f"{file}:{lineno}:{offset}: {message}")
else:
print(f"{COLOR}{file:60} {status}{ENDC}")
if error and verbose:
print(f" {str(error.__class__.__name__)}: {error}")
def parse_file(source: str, file: str) -> Tuple[Any, float]:
t0 = time.time()
result = ast.parse(source, filename=file)
t1 = time.time()
return result, t1 - t0
def generate_time_stats(files, total_seconds) -> None:
total_files = len(files)
total_bytes = 0
total_lines = 0
for file in files:
# Count lines and bytes separately
with open(file, "rb") as f:
total_lines += sum(1 for _ in f)
total_bytes += f.tell()
print(
f"Checked {total_files:,} files, {total_lines:,} lines,",
f"{total_bytes:,} bytes in {total_seconds:,.3f} seconds.",
)
if total_seconds > 0:
print(
f"That's {total_lines / total_seconds :,.0f} lines/sec,",
f"or {total_bytes / total_seconds :,.0f} bytes/sec.",
)
def parse_directory(directory: str, verbose: bool, excluded_files: List[str], short: bool) -> int:
# For a given directory, traverse files and attempt to parse each one
# - Output success/failure for each file
errors = 0
files = []
total_seconds = 0
for file in sorted(glob(os.path.join(escape(directory), f"**/*.py"), recursive=True)):
# Only attempt to parse Python files and files that are not excluded
if any(PurePath(file).match(pattern) for pattern in excluded_files):
continue
with tokenize.open(file) as f:
source = f.read()
try:
result, dt = parse_file(source, file)
total_seconds += dt
report_status(succeeded=True, file=file, verbose=verbose, short=short)
except SyntaxError as error:
report_status(succeeded=False, file=file, verbose=verbose, error=error, short=short)
errors += 1
files.append(file)
generate_time_stats(files, total_seconds)
if short:
print_memstats()
if errors:
print(f"Encountered {errors} failures.", file=sys.stderr)
return 1
return 0
def main() -> None:
args = argparser.parse_args()
directory = args.directory
verbose = args.verbose
excluded_files = args.exclude
short = args.short
sys.exit(parse_directory(directory, verbose, excluded_files, short))
if __name__ == "__main__":
main()

View File

@ -1,92 +0,0 @@
#!/usr/bin/env python3.8
import argparse
import os
import glob
import tarfile
import zipfile
import shutil
import pathlib
import sys
from typing import Generator
sys.path.insert(0, ".")
from scripts import test_parse_directory
HERE = pathlib.Path(__file__).resolve().parent
argparser = argparse.ArgumentParser(
prog="test_pypi_packages",
description="Helper program to test parsing PyPI packages",
)
argparser.add_argument(
"-t", "--tree", action="count", help="Compare parse tree to official AST", default=0
)
def get_packages() -> Generator[str, None, None]:
all_packages = (
glob.glob("./data/pypi/*.tar.gz")
+ glob.glob("./data/pypi/*.zip")
+ glob.glob("./data/pypi/*.tgz")
)
for package in all_packages:
yield package
def extract_files(filename: str) -> None:
savedir = os.path.join("data", "pypi")
if tarfile.is_tarfile(filename):
tarfile.open(filename).extractall(savedir)
elif zipfile.is_zipfile(filename):
zipfile.ZipFile(filename).extractall(savedir)
else:
raise ValueError(f"Could not identify type of compressed file {filename}")
def find_dirname(package_name: str) -> str:
for name in os.listdir(os.path.join("data", "pypi")):
full_path = os.path.join("data", "pypi", name)
if os.path.isdir(full_path) and name in package_name:
return full_path
assert False # This is to fix mypy, should never be reached
def run_tests(dirname: str, tree: int) -> int:
return test_parse_directory.parse_directory(
dirname,
verbose=False,
excluded_files=[],
tree_arg=tree,
short=True,
mode=1 if tree else 0,
parser="pegen",
)
def main() -> None:
args = argparser.parse_args()
tree = args.tree
for package in get_packages():
print(f"Extracting files from {package}... ", end="")
try:
extract_files(package)
print("Done")
except ValueError as e:
print(e)
continue
print(f"Trying to parse all python files ... ")
dirname = find_dirname(package)
status = run_tests(dirname, tree)
if status == 0:
shutil.rmtree(dirname)
else:
print(f"Failed to parse {dirname}")
if __name__ == "__main__":
main()