Module paroxython.derived_labels_db
An in-memory database collecting the labels of a given program.
Main table
The database is not really relational. It merely consists in a table t
where each row is a
distinct occurrence of every label featured by a given program. Initially, t
is populated with
the labels found by the queries specified by a regular expression of spec.md
.
The SQL queries are then executed, and the resulting labels (known as derived) inserted into t
as they are found.
Note that the results of a given SQL query may themselves be required by a subsequent SQL query.
Example
The following simple SQL query of spec.md
finds the loops of a program. The resulting label names
(e.g., "loop:while"
) consist in a fixed prefix "loop"
and a suffix denoting the category of
the loop (either "for"
or "while"
).
SELECT "loop",
name_prefix,
span,
path
FROM t
WHERE name_prefix IN ("for", "while")
GROUP BY path
The execution of this query comes after the evaluation of all regular expressions, in particular
those collecting the features "for"
and "while"
. It simply gathers them under the prefix
"loop"
, and qualifies them by their category.
The resulting labels will later be used by other SQL queries to search for features such as
"loop_with_break"
, "loop_with_late_exit"
, "loop_with_else"
, and so on.
Subtables
For simplicity and performance reasons, a bunch of subtables are extracted on the fly from t
.
Actually, every time a query relies on a non-existing table named t_PREFIX
, this table is
dynamically created as the set of rows of t
whose value on column name_prefix
is PREFIX
.
Example
The following SQL query of spec.md
relies on the previous one to find the loops featuring a
break
statement. It is more complicated, since it suffixes "loop_with_break"
with the span of
the smallest enclosing loop. Anyway, it relies on the results of both the previous SQL query and a
trivial regular expression matching the break
statements.
SELECT "loop_with_break",
l.name_suffix,
max(l.span_start) || "-" || min(l.span_end),
max(l.path)
FROM t_loop l
JOIN t_break b ON (b.path GLOB l.path || "*-")
GROUP BY b.rowid
Instead of referring the corresponding rows in t
(which would complicate the query), two
subtables restricted to them are referred, t_loop
and t_break
. For example, if a t_loop
subtable does not exist yet, the system extracts it from t
by executing:
CREATE TABLE t_loop AS
SELECT *
FROM t
WHERE name_prefix = 'loop'
The newly created table is then available for all subsequent queries that may need it, which can significantly speed up their execution.
Note
Using views instead of tables turns out to be slightly slower. Also, for whatever mysterious reason, several queries produce an incorrect result.
Expand source code Browse GitHub
import sqlite3
from collections import defaultdict
from typing import Any, Set, Callable, Dict
import regex # type: ignore
from .goodies import couple_to_string, print_warning
from .user_types import Label, LabelName, Labels, Query, Span
class DerivedLabelsDatabase:
columns = (
# use rowid as primary key:
"name TEXT",
"name_prefix TEXT",
"name_suffix TEXT",
"span TEXT",
"span_start INT",
"span_end INT",
"path TEXT",
)
def __init__(self):
self.c = sqlite3.connect(":memory:")
self.c.create_function("regexp", narg=2, func=lambda rex, s: bool(regex.match(rex, s)))
# References:
# - https://www.sqlite.org/inmemorydb.html
# - https://www.sqlite.org/lang_expr.html#regexp
# - https://docs.python.org/3/library/sqlite3.html#sqlite3.Connection.create_function
def create(
self,
labels: Labels,
create_main_table: str = f"CREATE TABLE t ({','.join(columns)})", # never provided
) -> None:
self.c.execute(create_main_table)
self.update(labels)
self.subtables: Set[LabelName] = set()
def read(
self,
query: Query,
prerequisites: Callable = regex.compile(r"(?m)\b(?:FROM|JOIN) t_(\w+)").findall,
create_subtable: str = "CREATE TABLE t_{0} AS SELECT * FROM t WHERE name_prefix = '{0}'",
) -> Labels:
# Parse the prerequisites of the query, and create the corresponding subtables if needed.
for label_name in prerequisites(query):
if label_name not in self.subtables:
self.c.execute(create_subtable.format(label_name))
self.subtables.add(label_name)
self.c.commit()
# Execute the query and return the resulting labels, deduplicated by (label_name, path).
labels_spans: Dict[LabelName, Dict[Span, Any]] = defaultdict(dict)
try:
row_iterator = self.c.execute(query)
except Exception as exception: # pragma: no cover
print_warning(f"problem in the following query:\n\n{query}\n")
raise exception
for (name_prefix, name_suffix, span_string, path) in row_iterator:
label_name = f"{name_prefix}:{name_suffix}" if name_suffix != "" else name_prefix
span = span_string.split("-")
labels_spans[label_name][Span(int(span[0]), int(span[-1]), path)] = None
return [Label(name, list(spans)) for (name, spans) in labels_spans.items()]
def update(
self,
labels: Labels,
update_query: str = f"INSERT INTO t VALUES ({','.join('?' * len(columns))})",
) -> None:
rows = []
for (name, spans) in labels:
for span in spans:
(prefix, _, suffix) = name.partition(":") # split on the first colon
span_string = couple_to_string(span)
rows.append((name, prefix, suffix, span_string, span.start, span.end, span.path))
self.c.executemany(update_query, rows)
def delete(self) -> None:
self.c.execute("DROP TABLE t")
for label_name in self.subtables:
self.c.execute(f"DROP TABLE t_{label_name}")
def __str__(self) -> str: # pragma: no cover
rows = ["name name_prefix name_suffix span span_start span_end path".split()]
rows.extend(sorted(list(map(str, row)) for row in self.c.execute("SELECT * FROM t")))
widths = list(map(len, [max(column, key=len) for column in zip(*rows)]))
result = [""]
for row in rows:
result.append(" | ".join(s + " " * (w - len(s)) for (w, s) in zip(widths, row)))
result[1:1] = ["-+-".join("-" * w for w in widths)]
return "\n".join(result)
Classes
class DerivedLabelsDatabase
-
Expand source code Browse GitHub
class DerivedLabelsDatabase: columns = ( # use rowid as primary key: "name TEXT", "name_prefix TEXT", "name_suffix TEXT", "span TEXT", "span_start INT", "span_end INT", "path TEXT", ) def __init__(self): self.c = sqlite3.connect(":memory:") self.c.create_function("regexp", narg=2, func=lambda rex, s: bool(regex.match(rex, s))) # References: # - https://www.sqlite.org/inmemorydb.html # - https://www.sqlite.org/lang_expr.html#regexp # - https://docs.python.org/3/library/sqlite3.html#sqlite3.Connection.create_function def create( self, labels: Labels, create_main_table: str = f"CREATE TABLE t ({','.join(columns)})", # never provided ) -> None: self.c.execute(create_main_table) self.update(labels) self.subtables: Set[LabelName] = set() def read( self, query: Query, prerequisites: Callable = regex.compile(r"(?m)\b(?:FROM|JOIN) t_(\w+)").findall, create_subtable: str = "CREATE TABLE t_{0} AS SELECT * FROM t WHERE name_prefix = '{0}'", ) -> Labels: # Parse the prerequisites of the query, and create the corresponding subtables if needed. for label_name in prerequisites(query): if label_name not in self.subtables: self.c.execute(create_subtable.format(label_name)) self.subtables.add(label_name) self.c.commit() # Execute the query and return the resulting labels, deduplicated by (label_name, path). labels_spans: Dict[LabelName, Dict[Span, Any]] = defaultdict(dict) try: row_iterator = self.c.execute(query) except Exception as exception: # pragma: no cover print_warning(f"problem in the following query:\n\n{query}\n") raise exception for (name_prefix, name_suffix, span_string, path) in row_iterator: label_name = f"{name_prefix}:{name_suffix}" if name_suffix != "" else name_prefix span = span_string.split("-") labels_spans[label_name][Span(int(span[0]), int(span[-1]), path)] = None return [Label(name, list(spans)) for (name, spans) in labels_spans.items()] def update( self, labels: Labels, update_query: str = f"INSERT INTO t VALUES ({','.join('?' * len(columns))})", ) -> None: rows = [] for (name, spans) in labels: for span in spans: (prefix, _, suffix) = name.partition(":") # split on the first colon span_string = couple_to_string(span) rows.append((name, prefix, suffix, span_string, span.start, span.end, span.path)) self.c.executemany(update_query, rows) def delete(self) -> None: self.c.execute("DROP TABLE t") for label_name in self.subtables: self.c.execute(f"DROP TABLE t_{label_name}") def __str__(self) -> str: # pragma: no cover rows = ["name name_prefix name_suffix span span_start span_end path".split()] rows.extend(sorted(list(map(str, row)) for row in self.c.execute("SELECT * FROM t"))) widths = list(map(len, [max(column, key=len) for column in zip(*rows)])) result = [""] for row in rows: result.append(" | ".join(s + " " * (w - len(s)) for (w, s) in zip(widths, row))) result[1:1] = ["-+-".join("-" * w for w in widths)] return "\n".join(result)
Class variables
var columns
-
The structure of the main table
t
, consisting of the following columns:Column Type Description and example rowid
INT
Primary key (automatically generated) name
TEXT
Complete name of the label, e.g. "loop_with_early_exit:for:break"
name_prefix
TEXT
Its prefix (part before the first colon, here "loop_with_early_exit"
name_suffix
TEXT
Its suffix (may contain one or more colons), here "for:break"
span
TEXT
Numbers of the first and last lines of the feature as a hyphen-separated string, e.g. "2-8"
span_start
INT
Its first line number, here 2
span_end
INT
Its first line number, here 8
path
TEXT
String encoding the beginning of the feature as its path from the root of the AST, e.g. "3-5-3-0-1-2-2-1-"
Methods
def create(self,
labels: List[Label],
create_main_table: str = 'CREATE TABLE t (name TEXT,name_prefix TEXT,name_suffix TEXT,span TEXT,span_start INT,span_end INT,path TEXT)'
) ‑> NoneType-
Create the main table and populate it with regex-specified labels.
Args
labels
:Labels
- The label names and spans featured by a certain program. Only those
specified by a regular expression in
spec.md
are included. It is the very purpose of this module to add the remaining ones, specified by a SQL query. create_main_table
:str
, optional- The SQL query creating the table. Not to be
explicitly provided. Defaults to
f"CREATE TABLE t ({','.join(columns)})"
.
Expand source code Browse GitHub
def create( self, labels: Labels, create_main_table: str = f"CREATE TABLE t ({','.join(columns)})", # never provided ) -> None: self.c.execute(create_main_table) self.update(labels) self.subtables: Set[LabelName] = set()
def read(self,
query: Query,
prerequisites: Callable = <built-in method findall of regex>,
create_subtable: str = "CREATE TABLE t_{0} AS SELECT * FROM t WHERE name_prefix = '{0}'"
) ‑> List[Label]-
Apply the given SQL query to the currently known labels, and returns the new labels.
Args
query
:Query
- The SQL specification of a certain feature, as defined in
spec.md
. prerequisites
:Callable
, optional- A function taking an SQL query, and returning the
table names referred to in the FROM clause.
Not to be explicitly provided.
Defaults to
regex.compile(r"(?m)\b(?:FROM|JOIN) t_(\w+)").findall
. create_subtable
:str
, optional- The SQL query creating a subtable from a label prefix.
Not to be explicitly provided.
Defaults to
"CREATE TABLE t_{0} AS SELECT * FROM t WHERE name_prefix = '{0}'"
.
Returns
Labels
- The additional labels found by the given SQL query.
Note
As a side-effect, create in the database any non-existing subtable of
t
referred to as a prerequisite.Expand source code Browse GitHub
def read( self, query: Query, prerequisites: Callable = regex.compile(r"(?m)\b(?:FROM|JOIN) t_(\w+)").findall, create_subtable: str = "CREATE TABLE t_{0} AS SELECT * FROM t WHERE name_prefix = '{0}'", ) -> Labels: # Parse the prerequisites of the query, and create the corresponding subtables if needed. for label_name in prerequisites(query): if label_name not in self.subtables: self.c.execute(create_subtable.format(label_name)) self.subtables.add(label_name) self.c.commit() # Execute the query and return the resulting labels, deduplicated by (label_name, path). labels_spans: Dict[LabelName, Dict[Span, Any]] = defaultdict(dict) try: row_iterator = self.c.execute(query) except Exception as exception: # pragma: no cover print_warning(f"problem in the following query:\n\n{query}\n") raise exception for (name_prefix, name_suffix, span_string, path) in row_iterator: label_name = f"{name_prefix}:{name_suffix}" if name_suffix != "" else name_prefix span = span_string.split("-") labels_spans[label_name][Span(int(span[0]), int(span[-1]), path)] = None return [Label(name, list(spans)) for (name, spans) in labels_spans.items()]
def update(self,
labels: List[Label],
update_query: str = 'INSERT INTO t VALUES (?,?,?,?,?,?,?)'
) ‑> NoneType-
Insert a set of labels into the main table
t
.Args
labels
:Labels
- The labels to be inserted.
update_query
:str
, optional- The SQL query inserting a row in
t
. Not to be explicitly provided. Defaults tof"INSERT INTO t VALUES ({','.join('?' * len(columns))})"
.
Expand source code Browse GitHub
def update( self, labels: Labels, update_query: str = f"INSERT INTO t VALUES ({','.join('?' * len(columns))})", ) -> None: rows = [] for (name, spans) in labels: for span in spans: (prefix, _, suffix) = name.partition(":") # split on the first colon span_string = couple_to_string(span) rows.append((name, prefix, suffix, span_string, span.start, span.end, span.path)) self.c.executemany(update_query, rows)
def delete(self) ‑> NoneType
-
Empty the database after having executed all the SQL queries.
Expand source code Browse GitHub
def delete(self) -> None: self.c.execute("DROP TABLE t") for label_name in self.subtables: self.c.execute(f"DROP TABLE t_{label_name}")