Module paroxython.parse_program
Search a program for the features specified in spec.md.
Expand source code Browse GitHub
from collections import defaultdict
from os.path import dirname
from pathlib import Path
from time import perf_counter
from typing import Callable, Dict, Iterator, List, Tuple
import regex # type: ignore
from .derived_labels_db import DerivedLabelsDatabase
from .flatten_ast import ast, flatten_ast
from .goodies import print_fail
from .user_types import Label, LabelName, Labels, LabelsSpans, Program, Query, Source, Span
# Per-identifier overrides for the documentation generator. This is presumably pdoc's
# `__pdoc__` convention, in which a string value replaces the docstring of the named
# identifier and `True` forces an otherwise hidden identifier (such as a dunder method)
# to be documented — TODO confirm against the pdoc version in use.
__pdoc__ = {
    "ProgramParser": "",
    "ProgramParser.__init__": True,
    "ProgramParser.__call__": True,
    "foobar": True,  # NOTE(review): no `foobar` is visible in this module; confirm this key is intentional.
}
def find_all_features(
    specifications: str,
    find_all: Callable = regex.compile(
        r"(?ms)"
        r"^\#{4}\s+Feature\s+`(.+?)`"  # capture the label's pattern
        r".+?\#{5}\s+Specification"  # ensure the sequel is in the Specification section
        r".+?```(.*?)\n+(.*?)\n?```"  # capture the language and the specification
    ).findall,
) -> Iterator[Tuple[LabelName, str, str]]:
    """Extract all the tuples defining an algorithmic feature in a specification text.

    Args:
        specifications: The text to search (normally, the contents of `spec.md`).
        find_all: A function finding all the feature-defining triples of the text.
            The default is the `findall` method of a pattern compiled once, when the
            function is defined. Not to be explicitly provided.

    Returns:
        Whatever `find_all` returns for `specifications`. With the default, these
        are the matching triples, in order of appearance, of the form:

        - label name pattern (e.g., `"try"` or `"try_raise|try_except"`),
        - language (the text which immediately follows the opening code fence),
        - specification (the contents of the fenced block).
    """
    triples = find_all(specifications)
    return triples
def pos_to_span(pos: List[str]) -> Span:
    """Convert a list of positions (as captured in a specification string) into one span.

    Args:
        pos: A non-empty list of strings of the form `f"{line_number}:{path}"`.

    Returns:
        A `Span` built from:

        - the line number extracted from the first string,
        - the line number extracted from the last string (the same as the previous
          one if there is only one string),
        - the path extracted from the first string.

        The strings in between, and the path of the last string, are ignored.
    """
    first, last = pos[0], pos[-1]
    # Each end must split into exactly two parts around the colon.
    start_line, path = first.split(":")
    end_line, _unused_path = last.split(":")
    return Span(int(start_line), int(end_line), path)
# Default location of the specification file: `resources/spec.md`, in the directory of this module.
DEFAULT_SPEC_PATH = Path(dirname(__file__), "resources", "spec.md")
def get_bindings(
    label_prefix: LabelName, captures: Dict[str, List[str]]
) -> Iterator[Tuple[LabelName, Span]]:
    """Analyze the captures of one match of a feature, and bind names with spans.

    Args:
        label_prefix: The name of a feature specified by a regular expression.
        captures: A dictionary associating each named group with the list of all
            its captures. Must contain a non-empty entry `"POS"`, and may contain
            an entry `"SUFFIX"`. All other entries are ignored.

    Yields:
        At least one binding `(label name, span)`:

        - without suffix, a single binding of `label_prefix` with the span delimited
          by the first and last positions;
        - with as many suffixes as positions, one binding per pair, each one named
          `f"{label_prefix}:{suffix}"` and spanning its own position only;
        - otherwise, one binding per suffix, all of them sharing the span delimited
          by the first and last positions.
    """
    suffixes = captures.get("SUFFIX")
    if not suffixes:
        # There is no "SUFFIX" key or its value is `[]`.
        yield (label_prefix, pos_to_span(captures["POS"]))
        return
    positions = captures["POS"]
    if len(positions) != len(suffixes):
        # When there are more or less matched positions than suffixes, interpret the
        # positions as defining a unique span, and associate it with each suffix.
        shared_span = pos_to_span(positions)
        for suffix in suffixes:
            yield (LabelName(f"{label_prefix}:{suffix}"), shared_span)
        return
    # When there are as many matched positions as suffixes, associate them in parallel.
    for (suffix, position) in zip(suffixes, positions):
        yield (LabelName(f"{label_prefix}:{suffix}"), pos_to_span([position]))
class ProgramParser:
    """Callable object searching a program for the features of a specification file."""

    def __init__(self, spec_path: Path = DEFAULT_SPEC_PATH) -> None:
        """Collect the features specified at `spec_path`.

        Some features are searched for with a regular expression, some with an SQL
        query. Distribute them in their respective dictionary. Compile the regular
        expressions and pre-bind their `finditer` method. Create an instance of the
        database of derived labels.

        Args:
            spec_path: The path of the specification file. Defaults to
                `DEFAULT_SPEC_PATH`.

        Note:
            A feature name appearing several times, or a non-empty specification
            whose language is neither `"re"` nor `"sql"`, is reported through
            `print_fail()` (whose effect is defined in `goodies`, not here).
        """
        self.spec_path = spec_path
        # Decode explicitly as UTF-8 instead of relying on the locale of the platform.
        text = self.spec_path.read_text(encoding="utf-8")
        # Each value is the `finditer` method of a compiled pattern, not the pattern itself.
        self.features: Dict[LabelName, Callable] = {}
        self.queries: Dict[LabelName, Query] = {}
        # Cumulated search time, by feature name, over all the calls of this parser.
        self.times: Dict[LabelName, float] = {LabelName("TOTAL"): 0.0}
        # Flat AST of the last successfully parsed program. Defined here so that it can be
        # read even when no program has been successfully parsed yet.
        self.flat_ast: str = ""
        for (label_name, language, specification) in find_all_features(text):
            if label_name in self.features:  # pragma: no cover
                print_fail(f"Duplicated name '{label_name}'!")
            self.times[label_name] = 0.0
            if language == "re":
                self.features[label_name] = regex.compile(f"(?mx){specification}").finditer
            elif language == "sql":
                self.queries[label_name] = Query(specification)
            elif specification.strip() != "":  # pragma: no cover
                print_fail(f"Unknow language '{language}' for '{label_name}'!")
        self.derived_labels_database = DerivedLabelsDatabase()

    @staticmethod
    def _consume_deletion(program: Program, label_name: LabelName, span: Span) -> bool:
        """Tell whether a binding is scheduled for deletion, and consume its schedule if so.

        The lookup ignores the path of the span. When it succeeds, the entry is removed
        from `program.deletion`, so that it cancels one occurrence only.
        """
        try:
            program.deletion[label_name].remove(Span(span.start, span.end))  # (no path)
        except (KeyError, ValueError):
            # Either no deletion is scheduled for this name, or not at this span.
            return False
        return True

    def __call__(self, program: Program, yield_failed_matches: bool = False) -> Labels:
        """Parse a given program and return its labels with their spans.

        Args:
            program: A `Program` object, with its fields `source`, `addition` and
                `deletion` populated. Its `deletion` lists are consumed (mutated) as
                the scheduled deletions are applied.
            yield_failed_matches: If `True`, add to the returned labels those which
                have not been found in the program. Each one is associated with an
                empty list of spans. Defaults to `False`.

        Returns:
            A sorted list of labels, associating each label name with its spans. When
            the program cannot be parsed or is empty, a list consisting of a single
            label whose name is `"ast_construction:"` followed by the error name.

        Note:
            The derived features are computed in a single pass over the queries, in
            the order of the specification file: a query can only use the labels
            found or derived before it.
        """
        # If the program can be parsed and is nonempty, flatten its AST.
        try:
            tree = ast.parse(program.source)
        except (SyntaxError, ValueError) as exception:
            return [
                Label(
                    LabelName(f"ast_construction:{type(exception).__name__}"),
                    [Span(1, program.source.count("\n") + 1)],
                )
            ]
        # The next comment prevents a (wrong?) mypy error: "AST" has no attribute "body"
        if not tree.body:  # type: ignore
            return [Label(LabelName("ast_construction:EmptyProgramError"), [Span(0, 0)])]
        self.flat_ast = flatten_ast(tree)

        # Search the flat AST for every feature which is specified by a regular expression.
        labels: LabelsSpans = defaultdict(list)
        for (label_prefix, finditer) in self.features.items():
            start = perf_counter()
            failed_match = True
            for match in finditer(self.flat_ast, overlapped=True):
                failed_match = False
                for (label_name, span) in get_bindings(label_prefix, match.capturesdict()):
                    # Unless this binding is scheduled for deletion, actually bind the name
                    # with the span.
                    if not self._consume_deletion(program, label_name, span):
                        labels[label_name].append(span)
            elapsed = perf_counter() - start
            self.times[label_prefix] += elapsed
            self.times[LabelName("TOTAL")] += elapsed
            if yield_failed_matches and failed_match:
                labels[label_prefix] = []

        # Update the matching features with those featured for addition.
        for (label_name, spans) in program.addition.items():
            labels[label_name].extend(spans)
            labels[label_name].sort()

        # Now, use the features found so far to derive all those specified by an SQL query.
        result = [Label(*item) for item in labels.items()]
        self.derived_labels_database.create(result)  # The search is seeded with the known features.
        for (label_name, query) in self.queries.items():
            # Try to derive, from the DB current state, some label names matching `label_name`.
            start = perf_counter()
            derived_labels = self.derived_labels_database.read(query)
            elapsed = perf_counter() - start
            self.times[label_name] += elapsed
            self.times[LabelName("TOTAL")] += elapsed
            # Suppress the labels scheduled for deletion.
            labels.clear()  # reuse the dictionary associating each label name to its spans
            for label in derived_labels:
                for span in label.spans:
                    if not self._consume_deletion(program, label.name, span):
                        labels[label.name].append(span)
            derived_labels = [Label(*item) for item in labels.items()]
            if derived_labels:
                # Bingo! update the DB state and the result with the new labels.
                self.derived_labels_database.update(derived_labels)
                result.extend(derived_labels)
            elif yield_failed_matches:
                result.append(Label(label_name, []))

        # Empty the DB to make it ready for the next program.
        self.derived_labels_database.delete()
        return sorted(result)

    def print_performances(self) -> None:  # pragma: no cover
        """Print a report of the elapsed times by features, sorted by decreasing time."""
        result = sorted(self.times.items(), key=lambda item: item[1], reverse=True)
        print()
        print("Elapsed times by features (in seconds)")
        print("--------------------------------------")
        for (name, seconds) in result:
            print(f"{seconds:8.4f}\t {name}")
        print()
if __name__ == "__main__":
    # Take an individual source, print its features and write its flat AST.
    from .goodies import couple_to_string

    source_path = Path("sandbox/source.py")
    text = source_path.read_text().strip()
    if text.startswith("1 "):
        # Strip the first (up to four) characters of every line — presumably the gutter of
        # a numbered listing pasted as is.
        text = regex.sub(r"(?m)^.{1,4}", "", text)
    # Echo the source with its own line numbers, to ease the reading of the spans.
    for (number, line) in enumerate(text.split("\n"), 1):
        print(f"{number:<4}{line}")
    program = Program(source=Source(text), labels=[], taxa=[], addition={}, deletion={})
    print()

    parse = ProgramParser()
    # One Markdown table row per label: its name, then its spans.
    rows = [
        f"| `{name}` | {', '.join(map(couple_to_string, spans))} |"
        for (name, spans) in sorted(parse(program, yield_failed_matches=False))
    ]
    print("\n".join(rows))
    Path("sandbox/flat_ast.txt").write_text(parse.flat_ast)
Functions
def find_all_features(specifications: str,
find_all: Callable = <built-in method findall of regex>
) ‑> Iterator[Tuple[LabelName, str, str]]-
Iterate on all tuples defining an algorithmic feature in the given specification text.
Args
specifications
: str
- Normally, the contents of spec.md.
find_all
: Callable, optional
- A function finding all feature-defining triples of the text. Not to be explicitly provided.
Returns
Iterator[Tuple[LabelName, str, str]]
-
An iterator yielding all matching triples of the form:
- label name pattern (e.g., "try" or "try_raise|try_except"),
- language (currently, "re" or "sql"),
- specification (respectively, a regular expression pattern or an SQL query).
Expand source code Browse GitHub
def find_all_features( specifications: str, find_all: Callable = regex.compile( r"(?ms)" r"^\#{4}\s+Feature\s+`(.+?)`" # capture the label's pattern r".+?\#{5}\s+Specification" # ensure the sequel is in the Specification section r".+?```(.*?)\n+(.*?)\n?```" # capture the language and the specification ).findall, ) -> Iterator[Tuple[LabelName, str, str]]: return find_all(specifications)
def pos_to_span(pos: List[str]) ‑> Span
-
Convert a list of positions (as captured in a specification string) into a single span.
Args
pos
:List[str]
- A non-empty list of strings of the form
f"{line_number}:{path}"
Returns
Span
-
A named tuple consisting of:
- start: the line number extracted from the first string;
- end: the line number extracted from the last string (the same as start if there is only one string);
- path: the path extracted from the first string.
Note that all other substrings, if any, are ignored.
Examples
>>> pos_to_span(["1:path_1"])
Span(1, 1, "path_1")
>>> pos_to_span(["1:path_1", "2:path_2"])
Span(1, 2, "path_1")
>>> pos_to_span(["1:path_1", "2:path_2", "3:path_3", "4:path_4"])
Span(1, 4, "path_1")
Expand source code Browse GitHub
def pos_to_span(pos: List[str]) -> Span: (start, path) = pos[0].split(":") (end, _) = pos[-1].split(":") return Span(int(start), int(end), path)
def get_bindings(label_prefix: LabelName,
captures: Dict[str, List[str]]
) ‑> Iterator[Tuple[LabelName, Span]]-
Analyze the matches of a feature, and bind the corresponding names and spans.
Args
label_prefix
: LabelName
- The name of a feature specified by a regular expression. Note that such a name is literal: contrary to those defined by an SQL query, it cannot include the alternation meta-character ("|").
captures
: Dict[str, List[str]]
- A dictionary of the named groups and lists of all the captures of those groups, as matched by the function regex.finditer(). Contains a non-empty entry "POS" and, optionally, an entry "SUFFIX". All other entries are ignored.
Yields
Iterator[Tuple[LabelName, Span]]
- At least one binding between a label name and a span.
Examples
The simplest cases happen when there are one or several "POS", but no "SUFFIX":
>>> get_bindings("null_operation", {"POS": ["4:1-3-1-5-1-"]})
("null_operation", Span(start=4, end=4, path="1-3-1-5-1-"))
>>> get_bindings("while", {"POS": ["11:2-", "16:2-1-4-2-1-2-"]})
("while", Span(start=11, end=16, path="2-"))
>>> get_bindings("divisibility_test", {
...     "POS": ["15:3-5-1-2-1-0-"],
...     "SUFFIX": []
... })
("divisibility_test", Span(start=15, end=15, path="3-5-1-2-1-0-"))
When there are as many "POS" as "SUFFIX", they are bound pairwise. The resulting spans cannot span multiple lines:
>>> get_bindings("literal", {
...     "POS": ["16:3-5-1-2-1-1-1-0-"],
...     "SUFFIX": ["False"],
... })
("literal:False", Span(start=16, end=16, path="3-5-1-2-1-1-1-0-"))
>>> get_bindings("if_test_atom", {
...     "POS": ["31:5-2-1-0-0-0-", "31:5-2-1-0-0-2-1-1-", "31:5-2-1-0-2-1-"],
...     "SUFFIX": ["b", "a", "greatest"],
... })
("if_test_atom:b", Span(start=31, end=31, path="5-2-1-0-0-0-"))
("if_test_atom:a", Span(start=31, end=31, path="5-2-1-0-0-2-1-1-"))
("if_test_atom:greatest", Span(start=31, end=31, path="5-2-1-0-2-1-"))
In all other cases, "POS" is interpreted as delimiting only one occurrence (which can span multiple lines), and is associated repeatedly with the different suffixes:
>>> get_bindings("import_module", {"POS": ["1:1-"], "SUFFIX": ["m1", "m2", "m3"]})
("import_module:m1", Span(start=1, end=1, path="1-"))
("import_module:m2", Span(start=1, end=1, path="1-"))
("import_module:m3", Span(start=1, end=1, path="1-"))
>>> get_bindings("function_decorator", {
...     "POS": ["1:1-", "1:1-2-1-", "2:1-2-2-", "3:1-2-3-", "5:1-5-1-"],
...     "SUFFIX": ["bizz", "foo", "bar"],
... })
("function_decorator:bizz", Span(start=1, end=5, path="1-"))
("function_decorator:foo", Span(start=1, end=5, path="1-"))
("function_decorator:bar", Span(start=1, end=5, path="1-"))
Warning
Consider the problem of detecting the feature for in the following program:
for (i, j) in seq:
    pass
With such a multi-line feature, it is normally enough to capture two positions, namely those of its start (line 1) and its end (line 2). However, in this case we want to suffix for with the names of the two iteration variables (for:i and for:j). If we devise the regular expression specification without special care:
^(.*)/_type=For
\n(?:\1.+\n)*?\1/_pos=(?P<POS>.+)
(
    \n(?:\1.+\n)*?\1/target(/.+)?/id=(?P<SUFFIX>.+)
)+
\n(?:\1.+\n)*
\1/.*/_pos=(?P<POS>.+)
… as many positions as there are suffixes will be passed to the present function:
>>> get_bindings("for", {'POS': ['1:1-', '2:1-2-1-'], 'SUFFIX': ['i', 'j']})
("for:i", Span(start=1, end=1, path="1-"))
("for:j", Span(start=2, end=2, path="1-2-1-"))
… which will result in two (wrong) bindings, each one reduced to a single line, instead of two bindings spanning the whole loop. The workaround implemented in spec.md consists in capturing as many supplementary positions as iteration variables:
^(.*)/_type=For
\n(?:\1.+\n)*?\1/_pos=(?P<POS>.+)
(
    \n(?:\1.+\n)*?\1/(?P<_1>target(/.+)?)/_pos=(?P<POS>.+)
    \n(?:\1.+\n)*?\1/(?P=_1)
    /id=(?P<SUFFIX>.+)
)+
\n(?:\1.+\n)*
\1/.*/_pos=(?P<POS>.+)
So, there are now two more positions in the captured dictionary. Since they are not at the ends of the list, they are ignored by pos_to_span(), and the result is now correct:
>>> get_bindings("for", {
...     'POS': ['1:1-', '1:1-0-0-1-', '1:1-0-0-2-', '2:1-2-1-'],
...     '_1': ['target/elts/1', 'target/elts/2'], # ignored at this level
...     'SUFFIX': ['i', 'j']
... })
("for:i", Span(start=1, end=2, path="1-"))
("for:j", Span(start=1, end=2, path="1-"))
Expand source code Browse GitHub
def get_bindings( label_prefix: LabelName, captures: Dict[str, List[str]] ) -> Iterator[Tuple[LabelName, Span]]: if captures.get("SUFFIX"): # There is a "SUFFIX" key and its value is not `[]`. if len(captures["POS"]) == len(captures["SUFFIX"]): # When there are as many matched positions as suffixes, associate them in parallel. for (suffix, pos) in zip(captures["SUFFIX"], captures["POS"]): yield (LabelName(f"{label_prefix}:{suffix}"), pos_to_span([pos])) else: # When there are more or less matched positions than suffixes, interpret the positions # as defining a unique span, and associate it with each suffix. span = pos_to_span(captures["POS"]) for suffix in captures["SUFFIX"]: yield (LabelName(f"{label_prefix}:{suffix}"), span) else: # There is no "SUFFIX" key or its value is `[]`. yield (label_prefix, pos_to_span(captures["POS"]))
Classes
class ProgramParser
-
Expand source code Browse GitHub
class ProgramParser: def __init__(self, spec_path: Path = DEFAULT_SPEC_PATH) -> None: self.spec_path = spec_path text = self.spec_path.read_text() self.features: Dict[LabelName, regex.Pattern] = {} self.queries: Dict[LabelName, Query] = {} self.times: Dict[LabelName, float] = {LabelName("TOTAL"): 0.0} for (label_name, language, specification) in find_all_features(text): if label_name in self.features: # pragma: no cover print_fail(f"Duplicated name '{label_name}'!") self.times[label_name] = 0.0 if language == "re": self.features[label_name] = regex.compile(f"(?mx){specification}").finditer elif language == "sql": self.queries[label_name] = Query(specification) elif specification.strip() != "": # pragma: no cover print_fail(f"Unknow language '{language}' for '{label_name}'!") self.derived_labels_database = DerivedLabelsDatabase() def __call__(self, program: Program, yield_failed_matches: bool = False) -> Labels: # If the program can be parsed and is nonempty, flatten its AST. try: tree = ast.parse(program.source) except (SyntaxError, ValueError) as exception: return [ Label( LabelName(f"ast_construction:{type(exception).__name__}"), [Span(1, program.source.count("\n") + 1)], ) ] # The next comment prevents a (wrong?) mypy error: "AST" has no attribute "body" if not tree.body: # type: ignore return [Label(LabelName("ast_construction:EmptyProgramError"), [Span(0, 0)])] self.flat_ast = flatten_ast(tree) # Search the flat AST for every feature which is specified by a regular expression. labels: LabelsSpans = defaultdict(list) for (label_prefix, finditer) in self.features.items(): start = perf_counter() failed_match = True for match in finditer(self.flat_ast, overlapped=True): failed_match = False for (label_name, span) in get_bindings(label_prefix, match.capturesdict()): try: # Unless this binding (without its path) is scheduled for deletion... program.deletion[label_name].remove(Span(span.start, span.end)) # (no path) except (KeyError, ValueError): # ... 
actually bind the name with the span. labels[label_name].append(span) elapsed = perf_counter() - start self.times[label_prefix] += elapsed self.times[LabelName("TOTAL")] += elapsed if yield_failed_matches and failed_match: labels[label_prefix] = [] # Update the matching features with those featured for addition. for (label_name, spans) in program.addition.items(): labels[label_name].extend(spans) labels[label_name].sort() # Now, use the features found so far to derive all those specified by an SQL query. result = [Label(*item) for item in labels.items()] self.derived_labels_database.create(result) # The search is seeded with the know features. for (label_name, query) in self.queries.items(): # Try to derive, from the DB current state, some label names matching `label_name`. start = perf_counter() derived_labels = self.derived_labels_database.read(query) elapsed = perf_counter() - start self.times[label_name] += elapsed self.times[LabelName("TOTAL")] += elapsed # Suppress the labels scheduled for deletion labels.clear() # reuse the dictionary associating each label name to its spans for label in derived_labels: for span in label.spans: try: # Unless this binding (without its path) is scheduled for deletion... program.deletion[label.name].remove(Span(span.start, span.end)) except (KeyError, ValueError): # ... actually bind the name with the span. labels[label.name].append(span) derived_labels = [Label(*item) for item in labels.items()] if derived_labels: # Bingo! update the DB state and the result with the new labels. self.derived_labels_database.update(derived_labels) result.extend(derived_labels) elif yield_failed_matches: result.append(Label(label_name, [])) # Empty the DB to make it ready for the next program. 
self.derived_labels_database.delete() return sorted(result) def print_performances(self): # pragma: no cover result = sorted(self.times.items(), key=lambda item: item[1], reverse=True) print() print("Elapsed times by features (in seconds)") print("--------------------------------------") for (name, seconds) in result: print(f"{seconds:8.4f}\t {name}") print()
Methods
def __init__(self,
spec_path: pathlib.Path = PosixPath('./paroxython/resources/spec.md')
) ‑> NoneType-
Collect the features specified at spec_path.
Args
spec_path
: Path, optional
- The path of the specification file. Defaults to spec.md.
Raises
ValueError
- Raised when a given feature appears multiple times in the specification file, or when the language is neither "re" nor "sql" while the specification is non-empty.
Description
Some features are searched for with a regular expression, some with an SQL query. Distribute them in their respective dictionary. Compile the regular expressions and pre-bind the appropriate method. Create an instance of the database of derived labels.
Expand source code Browse GitHub
def __init__(self, spec_path: Path = DEFAULT_SPEC_PATH) -> None: self.spec_path = spec_path text = self.spec_path.read_text() self.features: Dict[LabelName, regex.Pattern] = {} self.queries: Dict[LabelName, Query] = {} self.times: Dict[LabelName, float] = {LabelName("TOTAL"): 0.0} for (label_name, language, specification) in find_all_features(text): if label_name in self.features: # pragma: no cover print_fail(f"Duplicated name '{label_name}'!") self.times[label_name] = 0.0 if language == "re": self.features[label_name] = regex.compile(f"(?mx){specification}").finditer elif language == "sql": self.queries[label_name] = Query(specification) elif specification.strip() != "": # pragma: no cover print_fail(f"Unknow language '{language}' for '{label_name}'!") self.derived_labels_database = DerivedLabelsDatabase()
def __call__(self,
program: Program,
yield_failed_matches: bool = False
) ‑> List[Label]-
Parse a given program and return its labels with their spans.
Args
program
: Program
- A Program object, with its fields source, addition and deletion populated.
yield_failed_matches
: bool, optional
- If True, add to the returned labels those which have not been found in the program. Each one is associated with an empty list of spans. For testing purposes only. Defaults to False.
Returns
Labels
- A sorted list of labels, associating each label name with its spans.
Description
- Parse the given program. If this fails or the resulting AST is empty, return a list consisting of a single label, whose name is "ast_construction:" followed by the error name. Otherwise, flatten the AST.
- Search it for every feature which is specified by a regular expression. Take care to avoid adding those features which are scheduled for deletion at the same span.
- Update the matching features with those featured for addition.
- Use the features found so far to derive (directly and indirectly) all those specified by an SQL query.
- Return the found features.
Note
The computation of the derived features is done in a single pass, which requires them to be in a correct dependency order. This order is checked by helpers/reformat_spec.py, which is executed each time the tests are launched.
Expand source code Browse GitHub
def __call__(self, program: Program, yield_failed_matches: bool = False) -> Labels: # If the program can be parsed and is nonempty, flatten its AST. try: tree = ast.parse(program.source) except (SyntaxError, ValueError) as exception: return [ Label( LabelName(f"ast_construction:{type(exception).__name__}"), [Span(1, program.source.count("\n") + 1)], ) ] # The next comment prevents a (wrong?) mypy error: "AST" has no attribute "body" if not tree.body: # type: ignore return [Label(LabelName("ast_construction:EmptyProgramError"), [Span(0, 0)])] self.flat_ast = flatten_ast(tree) # Search the flat AST for every feature which is specified by a regular expression. labels: LabelsSpans = defaultdict(list) for (label_prefix, finditer) in self.features.items(): start = perf_counter() failed_match = True for match in finditer(self.flat_ast, overlapped=True): failed_match = False for (label_name, span) in get_bindings(label_prefix, match.capturesdict()): try: # Unless this binding (without its path) is scheduled for deletion... program.deletion[label_name].remove(Span(span.start, span.end)) # (no path) except (KeyError, ValueError): # ... actually bind the name with the span. labels[label_name].append(span) elapsed = perf_counter() - start self.times[label_prefix] += elapsed self.times[LabelName("TOTAL")] += elapsed if yield_failed_matches and failed_match: labels[label_prefix] = [] # Update the matching features with those featured for addition. for (label_name, spans) in program.addition.items(): labels[label_name].extend(spans) labels[label_name].sort() # Now, use the features found so far to derive all those specified by an SQL query. result = [Label(*item) for item in labels.items()] self.derived_labels_database.create(result) # The search is seeded with the know features. for (label_name, query) in self.queries.items(): # Try to derive, from the DB current state, some label names matching `label_name`. 
start = perf_counter() derived_labels = self.derived_labels_database.read(query) elapsed = perf_counter() - start self.times[label_name] += elapsed self.times[LabelName("TOTAL")] += elapsed # Suppress the labels scheduled for deletion labels.clear() # reuse the dictionary associating each label name to its spans for label in derived_labels: for span in label.spans: try: # Unless this binding (without its path) is scheduled for deletion... program.deletion[label.name].remove(Span(span.start, span.end)) except (KeyError, ValueError): # ... actually bind the name with the span. labels[label.name].append(span) derived_labels = [Label(*item) for item in labels.items()] if derived_labels: # Bingo! update the DB state and the result with the new labels. self.derived_labels_database.update(derived_labels) result.extend(derived_labels) elif yield_failed_matches: result.append(Label(label_name, [])) # Empty the DB to make it ready for the next program. self.derived_labels_database.delete() return sorted(result)
def print_performances(self)
-
Print a TSV report of elapsed times by features, sorted by decreasing time.
Expand source code Browse GitHub
def print_performances(self): # pragma: no cover result = sorted(self.times.items(), key=lambda item: item[1], reverse=True) print() print("Elapsed times by features (in seconds)") print("--------------------------------------") for (name, seconds) in result: print(f"{seconds:8.4f}\t {name}") print()