Module paroxython.make_db
Collect all infos pertaining to the programs, labels and taxa into a so-called tag database.
Expand source code Browse GitHub
import json
import sqlite3
from bisect import insort
from collections import defaultdict
from datetime import datetime
from functools import lru_cache
from pathlib import Path
from typing import Callable, List, Optional, Tuple, Dict
import regex # type: ignore
from .goodies import add_line_numbers, print_success
from .label_programs import labelled_programs, iterate_and_print_programs
from .map_taxonomy import Taxonomy
from .user_types import (
LabelInfos,
Labels,
LabelsPoorSpans,
ProgramInfos,
ProgramPath,
ProgramPathSet,
Programs,
ProgramToPrograms,
TaxonInfos,
Taxa,
TaxaPoorSpans,
)
__pdoc__ = {
"TagDatabase": "",
"TagDatabase.__init__": True,
}
class TagDatabase:
def __init__(self, directory: Path, ignore_timestamps: bool = False, **kwargs) -> None:
self.directory = directory
programs: Programs = labelled_programs(directory, **kwargs)
self.labels = collect_labels(programs)
map_labels_on_taxa(programs, Taxonomy(**kwargs))
self.taxa = collect_taxa(programs)
importations = compute_direct_importations(programs)
self.importations = complete_and_collect_importations(importations)
self.exportations = compute_and_collect_exportations(programs, self.importations)
get_timestamp = lambda path: str(datetime.fromtimestamp(path.stat().st_mtime))
if ignore_timestamps:
get_timestamp = lambda path: ""
self.programs_infos: ProgramInfos = {}
for program in programs:
self.programs_infos[program.path] = {
"timestamp": get_timestamp(directory / program.path),
"source": program.source,
"labels": prepared_labels(program.labels),
"taxa": prepared_taxa(program.taxa),
}
def get_json(self) -> str:
data = {
"programs": self.programs_infos,
"labels": dict(sorted(self.labels.items())),
"taxa": dict(sorted(self.taxa.items())),
"importations": dict(self.importations.items()),
"exportations": dict(self.exportations.items()),
}
text = json.dumps(data, indent=2) + "\n"
text = regex.sub(r"\s*\[\s+(\d+),\s+(\d+)\s+\](,?)\s+", r"[\1,\2]\3", text)
return text
def write_json(self, db_path: Optional[Path] = None) -> None:
db_path = db_path or self.directory.parent / f"{self.directory.name}_db.json"
db_path.write_text(self.get_json())
print_success(f"Dumped: {db_path.absolute()}")
def write_sqlite(self, db_path: Optional[Path] = None) -> None:
db_path = db_path or self.directory.parent / f"{self.directory.name}_db.sqlite"
program_rows: List[Tuple] = []
label_rows: List[Tuple] = []
taxon_rows: List[Tuple] = []
for (path, info) in self.programs_infos.items():
source = f"{path}\n\n" + add_line_numbers(info["source"])
program_rows.append((path, info["timestamp"], source))
for (label_name, spans) in info["labels"].items():
(prefix, _, suffix) = label_name.partition(":")
for span in spans:
label_rows.append(
(
label_name,
prefix,
suffix,
"-".join(map(str, span)) if span[0] != span[1] else str(span[0]),
span[0],
span[1],
path,
)
)
for (taxon_name, spans) in info["taxa"].items():
for span in spans:
taxon_rows.append(
(
taxon_name,
"-".join(map(str, span)) if span[0] != span[1] else str(span[0]),
span[0],
span[1],
path,
)
)
db_path.unlink(missing_ok=True)
connexion = sqlite3.connect(db_path)
c = connexion.cursor()
fill = lambda columns: ",".join("?" * len(columns))
program_columns = (
"program TEXT PRIMARY KEY",
"timestamp TEXT",
"source TEXT",
)
c.execute(f"CREATE TABLE program ({','.join(program_columns)})")
c.executemany(f"INSERT INTO program VALUES ({fill(program_columns)})", program_rows)
label_columns = (
# use rowid as primary key
"label TEXT",
"label_prefix TEXT",
"label_suffix TEXT",
"span TEXT",
"span_start INTEGER",
"span_end INTEGER",
"program TEXT",
)
c.execute(
f"CREATE TABLE label ({','.join(label_columns)},"
"FOREIGN KEY (program) REFERENCES program (program))"
)
c.executemany(f"INSERT INTO label VALUES ({fill(label_columns)})", label_rows)
taxon_columns = (
# use rowid as primary key
"taxon TEXT",
"span TEXT",
"span_start INTEGER",
"span_end INTEGER",
"program TEXT",
)
c.execute(
f"CREATE TABLE taxon ({','.join(taxon_columns)},"
"FOREIGN KEY (program) REFERENCES program (program))"
)
c.executemany(f"INSERT INTO taxon VALUES ({fill(taxon_columns)})", taxon_rows)
connexion.commit()
connexion.close()
print_success(f"Dumped: {db_path.absolute()}")
def collect_labels(programs: Programs) -> LabelInfos:
result: LabelInfos = defaultdict(list)
for program in programs:
for label in program.labels:
result[label.name].append(program.path)
return result
def map_labels_on_taxa(programs: Programs, taxonomy: Taxonomy) -> None:
print(f"Mapping taxonomy on {len(programs)} programs.")
for program in iterate_and_print_programs(programs):
program.taxa[:] = taxonomy.to_taxa(program.labels)
# `program` being a tuple, modifying its fields can only be done in place.
def collect_taxa(programs: Programs) -> TaxonInfos:
result: TaxonInfos = defaultdict(list)
for program in programs:
for taxon in program.taxa:
result[taxon.name].append(program.path)
return result
def compute_direct_importations(
programs: Programs, match_import: Callable = regex.compile(r"import_internally:([^:]+)").match
) -> ProgramToPrograms:
importations: Dict = {program.path: set() for program in programs}
for program in programs:
for label in program.labels:
match = match_import(label.name)
if match: # Python 3.8: use assignement-expression
importations[program.path].add(f"{match[1]}.py")
return importations
def complete_and_collect_importations(importations: ProgramToPrograms) -> ProgramToPrograms:
@lru_cache(maxsize=None)
def complete_internal_imports(program_path: ProgramPath) -> ProgramPathSet:
result: ProgramPathSet = set(importations.get(program_path, []))
for imported in list(result): # traverse a copy
result.update(complete_internal_imports(imported))
return result
completed_importations: ProgramToPrograms = {}
for program_path in list(importations.keys()):
completed_importations[program_path] = sorted(complete_internal_imports(program_path))
return completed_importations
def compute_and_collect_exportations(
programs: Programs, importations: ProgramToPrograms
) -> ProgramToPrograms:
exportations: ProgramToPrograms = {program.path: [] for program in programs}
for (importing_path, imported_paths) in importations.items():
for imported_path in imported_paths:
if importing_path not in exportations[imported_path]:
insort(exportations[imported_path], importing_path)
return exportations
def prepared_labels(labels: Labels) -> LabelsPoorSpans:
result: LabelsPoorSpans = {}
for (label_name, spans) in labels:
result[label_name] = [(span.start, span.end) for span in sorted(set(spans))]
return result
def prepared_taxa(taxa: Taxa) -> TaxaPoorSpans:
result: TaxaPoorSpans = {}
for (taxon_name, spans) in taxa:
result[taxon_name] = [(span.start, span.end) for span in sorted(spans)]
return result
if __name__ == "__main__":
# fmt:off
directories = [
# "examples/simple/programs",
# "../algo/programs",
"../Python",
]
# fmt:on
print()
for directory in directories:
db = TagDatabase(Path(directory), ignore_timestamps=True)
db.write_json()
db.write_sqlite()
print()
Functions
def collect_labels(programs: List[Program]) ‑> Dict[LabelName, List[ProgramPath]]
-
Iterate through the given programs to collect all their labels.
Args
programs
:Programs
- A list of
Program
objects, with theirname
andlabels
fields already populated.
Returns
LabelInfos
- A dictionary mapping each label name to the names of the programs featuring it.
Expand source code Browse GitHub
def collect_labels(programs: Programs) -> LabelInfos: result: LabelInfos = defaultdict(list) for program in programs: for label in program.labels: result[label.name].append(program.path) return result
def map_labels_on_taxa(programs: List[Program],
taxonomy: Taxonomy
) ‑> NoneType-
Populate the
taxa
fields of the givenprograms
with the translations of their labels.Args
programs
:Programs
- A list of
Program
objects with theirlabels
field populated. taxonomy
:Taxonomy
- A “translator” of label names into taxon names.
Expand source code Browse GitHub
def map_labels_on_taxa(programs: Programs, taxonomy: Taxonomy) -> None: print(f"Mapping taxonomy on {len(programs)} programs.") for program in iterate_and_print_programs(programs): program.taxa[:] = taxonomy.to_taxa(program.labels)
def collect_taxa(programs: List[Program]) ‑> Dict[TaxonName, List[ProgramPath]]
-
Iterate through the given programs to collect all their taxa.
Args
programs
:Programs
- A list of
Program
objects, with theirname
andtaxa
fields already populated.
Returns
TaxonInfos
- A dictionary mapping each taxon name to the names of the programs featuring it.
Expand source code Browse GitHub
def collect_taxa(programs: Programs) -> TaxonInfos: result: TaxonInfos = defaultdict(list) for program in programs: for taxon in program.taxa: result[taxon.name].append(program.path) return result
def compute_direct_importations(programs: List[Program],
match_import: Callable = <built-in method match of regex>
) ‑> Dict[ProgramPath, List[ProgramPath]]-
Associate each given labelled program to the set of its direct internal imports.
Description
Iterate through the programs, retrieve their labels starting with
"import_internally"
and collect the names of the involved programs.Args
programs
:Programs
- A list of labelled
Program
objects. match_import
:Callable
, optional- A function taking a label name and, in
the case it starts with
"import_internally:"
, returns a match object whose first group is the name of the imported program. Not to be explicitly provided.
Returns
ProgramToPrograms
- A dictionary mapping every program path to the list of the paths of the internal programs it imports directly.
Expand source code Browse GitHub
def compute_direct_importations( programs: Programs, match_import: Callable = regex.compile(r"import_internally:([^:]+)").match ) -> ProgramToPrograms: importations: Dict = {program.path: set() for program in programs} for program in programs: for label in program.labels: match = match_import(label.name) if match: # Python 3.8: use assignement-expression importations[program.path].add(f"{match[1]}.py") return importations
def complete_and_collect_importations(importations: Dict[ProgramPath, List[ProgramPath]]
) ‑> Dict[ProgramPath, List[ProgramPath]]-
Complete the direct internal imports with indirect ones.
Args
importations
:ProgramToPrograms
- A dictionary mapping every program path to the list of the paths of the internal programs it imports directly.
Returns
ProgramToPrograms
- A dictionary mapping every program path to the sorted list of the paths of the internal programs it imports either directly or indirectly.
Expand source code Browse GitHub
def complete_and_collect_importations(importations: ProgramToPrograms) -> ProgramToPrograms: @lru_cache(maxsize=None) def complete_internal_imports(program_path: ProgramPath) -> ProgramPathSet: result: ProgramPathSet = set(importations.get(program_path, [])) for imported in list(result): # traverse a copy result.update(complete_internal_imports(imported)) return result completed_importations: ProgramToPrograms = {} for program_path in list(importations.keys()): completed_importations[program_path] = sorted(complete_internal_imports(program_path)) return completed_importations
def compute_and_collect_exportations(programs: List[Program],
importations: Dict[ProgramPath, List[ProgramPath]]
) ‑> Dict[ProgramPath, List[ProgramPath]]-
Invert
importations
to constructexportations
.Args
programs
:Programs
- A list of named
Program
objects. importations
:ProgramToPrograms
- A dictionary mapping every program path to the list of the paths of the internal programs it imports either directly or indirectly.
Returns
ProgramToPrograms
- A dictionary mapping every program path to the sorted list of the paths of the internal programs it is imported by, either directly or indirectly.
Expand source code Browse GitHub
def compute_and_collect_exportations( programs: Programs, importations: ProgramToPrograms ) -> ProgramToPrograms: exportations: ProgramToPrograms = {program.path: [] for program in programs} for (importing_path, imported_paths) in importations.items(): for imported_path in imported_paths: if importing_path not in exportations[imported_path]: insort(exportations[imported_path], importing_path) return exportations
def prepared_labels(labels: List[Label]) ‑> Dict[LabelName, List[Tuple[int, int]]]
-
Prepare the labels for serialization.
Args
labels
:Labels
- The list of labels to be serialized. Each label consists in a name and a list of spans. Each span is a tuple starting by the actual range of line numbers.
Returns
LabelPoorSpans: A dictionary mapping the label names with the list of their spans, transformed into simple couples of integers.
Example
>>> prepared_labels([ ... ("name_1", [ ... Span(start=1, end=2, path="foo"), ... Span(start=1, end=2, path="foo"), # note the duplicate ... Span(start=1, end=2, path="bar"), # note the difference ... ]), ... ("name_2", [ ... Span(start=2, end=4, path="fizz"), ... Span(start=6, end=7, path="buzz"), ... ]), ... ("name_3", [Span(start=5, end=5, path="foobar")]), ... ]) { "name_1": [(1, 2), (1, 2)], # deduplicated "name_2": [(2, 4), (6, 7)], "name_3": [(5, 5)], }
Expand source code Browse GitHub
def prepared_labels(labels: Labels) -> LabelsPoorSpans: result: LabelsPoorSpans = {} for (label_name, spans) in labels: result[label_name] = [(span.start, span.end) for span in sorted(set(spans))] return result
def prepared_taxa(taxa: List[Taxon]) ‑> Dict[TaxonName, List[Tuple[int, int]]]
-
Prepare the taxa for serialization.
Args
taxa
:Taxa
- The list of taxa to be serialized. Each taxon consists in a name and a bag of spans. Each span is a tuple starting by the actual range of line numbers.
Returns
TaxonPoorSpans: A dictionary mapping the taxon names with the list of their spans, transformed into simple couples of integers.
Example
>>> prepared_taxa([ ... ("name_1", Counter({ ... Span(start=1, end=2, path="foo"): 2, # note the duplicate ... Span(start=1, end=2, path="bar"): 1, ... })), ... ("name_2", Counter({ ... Span(start=2, end=4, path="fizz"): 1, ... Span(start=6, end=7, path="buzz"): 1, ... })), ... ("name_3", Counter({Span(start=5, end=5, path="foobar"): 1})) ... ]) { "name_1": [(1, 2), (1, 2)], # deduplicated "name_2": [(2, 4), (6, 7)], "name_3": [(5, 5)], }
Expand source code Browse GitHub
def prepared_taxa(taxa: Taxa) -> TaxaPoorSpans: result: TaxaPoorSpans = {} for (taxon_name, spans) in taxa: result[taxon_name] = [(span.start, span.end) for span in sorted(spans)] return result
Classes
class TagDatabase
-
Expand source code Browse GitHub
class TagDatabase: def __init__(self, directory: Path, ignore_timestamps: bool = False, **kwargs) -> None: self.directory = directory programs: Programs = labelled_programs(directory, **kwargs) self.labels = collect_labels(programs) map_labels_on_taxa(programs, Taxonomy(**kwargs)) self.taxa = collect_taxa(programs) importations = compute_direct_importations(programs) self.importations = complete_and_collect_importations(importations) self.exportations = compute_and_collect_exportations(programs, self.importations) get_timestamp = lambda path: str(datetime.fromtimestamp(path.stat().st_mtime)) if ignore_timestamps: get_timestamp = lambda path: "" self.programs_infos: ProgramInfos = {} for program in programs: self.programs_infos[program.path] = { "timestamp": get_timestamp(directory / program.path), "source": program.source, "labels": prepared_labels(program.labels), "taxa": prepared_taxa(program.taxa), } def get_json(self) -> str: data = { "programs": self.programs_infos, "labels": dict(sorted(self.labels.items())), "taxa": dict(sorted(self.taxa.items())), "importations": dict(self.importations.items()), "exportations": dict(self.exportations.items()), } text = json.dumps(data, indent=2) + "\n" text = regex.sub(r"\s*\[\s+(\d+),\s+(\d+)\s+\](,?)\s+", r"[\1,\2]\3", text) return text def write_json(self, db_path: Optional[Path] = None) -> None: db_path = db_path or self.directory.parent / f"{self.directory.name}_db.json" db_path.write_text(self.get_json()) print_success(f"Dumped: {db_path.absolute()}") def write_sqlite(self, db_path: Optional[Path] = None) -> None: db_path = db_path or self.directory.parent / f"{self.directory.name}_db.sqlite" program_rows: List[Tuple] = [] label_rows: List[Tuple] = [] taxon_rows: List[Tuple] = [] for (path, info) in self.programs_infos.items(): source = f"{path}\n\n" + add_line_numbers(info["source"]) program_rows.append((path, info["timestamp"], source)) for (label_name, spans) in info["labels"].items(): (prefix, _, suffix) = label_name.partition(":") for span in spans: label_rows.append( ( label_name, prefix, suffix, "-".join(map(str, span)) if span[0] != span[1] else str(span[0]), span[0], span[1], path, ) ) for (taxon_name, spans) in info["taxa"].items(): for span in spans: taxon_rows.append( ( taxon_name, "-".join(map(str, span)) if span[0] != span[1] else str(span[0]), span[0], span[1], path, ) ) db_path.unlink(missing_ok=True) connexion = sqlite3.connect(db_path) c = connexion.cursor() fill = lambda columns: ",".join("?" * len(columns)) program_columns = ( "program TEXT PRIMARY KEY", "timestamp TEXT", "source TEXT", ) c.execute(f"CREATE TABLE program ({','.join(program_columns)})") c.executemany(f"INSERT INTO program VALUES ({fill(program_columns)})", program_rows) label_columns = ( # use rowid as primary key "label TEXT", "label_prefix TEXT", "label_suffix TEXT", "span TEXT", "span_start INTEGER", "span_end INTEGER", "program TEXT", ) c.execute( f"CREATE TABLE label ({','.join(label_columns)}," "FOREIGN KEY (program) REFERENCES program (program))" ) c.executemany(f"INSERT INTO label VALUES ({fill(label_columns)})", label_rows) taxon_columns = ( # use rowid as primary key "taxon TEXT", "span TEXT", "span_start INTEGER", "span_end INTEGER", "program TEXT", ) c.execute( f"CREATE TABLE taxon ({','.join(taxon_columns)}," "FOREIGN KEY (program) REFERENCES program (program))" ) c.executemany(f"INSERT INTO taxon VALUES ({fill(taxon_columns)})", taxon_rows) connexion.commit() connexion.close() print_success(f"Dumped: {db_path.absolute()}")
Methods
def __init__(self,
directory: pathlib.Path,
ignore_timestamps: bool = False,
**kwargs
) ‑> NoneType-
Construct and populate the complete database of program features.
Args
directory
:Path
- The directory to walk (by default, recursively), containing the Python programs of interest.
ignore_timestamps
:bool
, optional- Don't store the last modification dates of these
programms. Since the database snapshots generated by our tests are placed under
version control, this prevents Git from tracking irrelevant differences. Defaults
to
False
. **kwargs
-
May include the keyword arguments:
print_performances
, transmitted tolabelled_programs()
;cleanup_strategy
,skip_pattern
,glob_pattern
, transmitted tolist_programs()
throughlabelled_programs()
;taxonomy_path
, transmitted toTaxonomy
.
Description
- Create a list
programs
ofProgram
objects, labelled bylabelled_programs()
. - Create a dictionary
self.labels
mapping each label name to the names of the programs featuring it (collect_labels()
). - Map the labels to taxa (
Taxonomy
), and populate the fieldtaxa
ofprograms
(map_labels_on_taxa()
). - Create a dictionary
self.taxa
mapping each taxon name to the names of the programs featuring it (collect_taxa()
). - Create a dictionary
self.importations
mapping each program path to the paths of the programs importing it, both directly (compute_direct_importations()
) and indirectly (complete_and_collect_importations()
). - Invert the previous dictionary to create a dictionary
self.exportations
(compute_and_collect_exportations()
). - Gather in
self.programs_infos
each program timestamp, source, labels and taxa as a serialization-ready dictionary indexed by program paths.
Expand source code Browse GitHub
def __init__(self, directory: Path, ignore_timestamps: bool = False, **kwargs) -> None: self.directory = directory programs: Programs = labelled_programs(directory, **kwargs) self.labels = collect_labels(programs) map_labels_on_taxa(programs, Taxonomy(**kwargs)) self.taxa = collect_taxa(programs) importations = compute_direct_importations(programs) self.importations = complete_and_collect_importations(importations) self.exportations = compute_and_collect_exportations(programs, self.importations) get_timestamp = lambda path: str(datetime.fromtimestamp(path.stat().st_mtime)) if ignore_timestamps: get_timestamp = lambda path: "" self.programs_infos: ProgramInfos = {} for program in programs: self.programs_infos[program.path] = { "timestamp": get_timestamp(directory / program.path), "source": program.source, "labels": prepared_labels(program.labels), "taxa": prepared_taxa(program.taxa), }
def get_json(self) ‑> str
-
Dump the constructed
TagDatabase
object as a JSON string.Description
Schema and purpose in the user manual.
Example
See the JSON tag database constructed from the programs of this directory.
Note
For readability purposes, the output of
json.dumps()
is reformatted to fit each span list on a single line, e.g.:"flow/loop/exit/late": [ [ 3, 8 ], [ 6, 7 ] ],
… is unwrap as:
"flow/loop/exit/late": [[3,8],[6,7]],
Expand source code Browse GitHub
def get_json(self) -> str: data = { "programs": self.programs_infos, "labels": dict(sorted(self.labels.items())), "taxa": dict(sorted(self.taxa.items())), "importations": dict(self.importations.items()), "exportations": dict(self.exportations.items()), } text = json.dumps(data, indent=2) + "\n" text = regex.sub(r"\s*\[\s+(\d+),\s+(\d+)\s+\](,?)\s+", r"[\1,\2]\3", text) return text
def write_json(self,
db_path: Union[pathlib.Path, NoneType] = None
) ‑> NoneType-
Call
TagDatabase.get_json()
and write the result to a file.Args
db_path
:Optional[Path]
, optional- If not provided, and the directory provided to the
class constructor is
"foobar"
, falls back to"foobar_db.json"
in the same parent directory. Defaults toNone
.
Expand source code Browse GitHub
def write_json(self, db_path: Optional[Path] = None) -> None: db_path = db_path or self.directory.parent / f"{self.directory.name}_db.json" db_path.write_text(self.get_json()) print_success(f"Dumped: {db_path.absolute()}")
def write_sqlite(self,
db_path: Union[pathlib.Path, NoneType] = None
) ‑> NoneType-
Dump the constructed
TagDatabase
object as a SQLite database (experimental).Args
db_path
:Optional[Path]
, optional- The path of the SQLite database. If not provided,
and the directory provided to the class constructor is
"foobar"
, falls back to"foobar_db.sqlite"
in the same parent directory. Defaults toNone
.
Description
Schema, purpose and example queries in the user manual.
Expand source code Browse GitHub
def write_sqlite(self, db_path: Optional[Path] = None) -> None: db_path = db_path or self.directory.parent / f"{self.directory.name}_db.sqlite" program_rows: List[Tuple] = [] label_rows: List[Tuple] = [] taxon_rows: List[Tuple] = [] for (path, info) in self.programs_infos.items(): source = f"{path}\n\n" + add_line_numbers(info["source"]) program_rows.append((path, info["timestamp"], source)) for (label_name, spans) in info["labels"].items(): (prefix, _, suffix) = label_name.partition(":") for span in spans: label_rows.append( ( label_name, prefix, suffix, "-".join(map(str, span)) if span[0] != span[1] else str(span[0]), span[0], span[1], path, ) ) for (taxon_name, spans) in info["taxa"].items(): for span in spans: taxon_rows.append( ( taxon_name, "-".join(map(str, span)) if span[0] != span[1] else str(span[0]), span[0], span[1], path, ) ) db_path.unlink(missing_ok=True) connexion = sqlite3.connect(db_path) c = connexion.cursor() fill = lambda columns: ",".join("?" * len(columns)) program_columns = ( "program TEXT PRIMARY KEY", "timestamp TEXT", "source TEXT", ) c.execute(f"CREATE TABLE program ({','.join(program_columns)})") c.executemany(f"INSERT INTO program VALUES ({fill(program_columns)})", program_rows) label_columns = ( # use rowid as primary key "label TEXT", "label_prefix TEXT", "label_suffix TEXT", "span TEXT", "span_start INTEGER", "span_end INTEGER", "program TEXT", ) c.execute( f"CREATE TABLE label ({','.join(label_columns)}," "FOREIGN KEY (program) REFERENCES program (program))" ) c.executemany(f"INSERT INTO label VALUES ({fill(label_columns)})", label_rows) taxon_columns = ( # use rowid as primary key "taxon TEXT", "span TEXT", "span_start INTEGER", "span_end INTEGER", "program TEXT", ) c.execute( f"CREATE TABLE taxon ({','.join(taxon_columns)}," "FOREIGN KEY (program) REFERENCES program (program))" ) c.executemany(f"INSERT INTO taxon VALUES ({fill(taxon_columns)})", taxon_rows) connexion.commit() connexion.close() print_success(f"Dumped: {db_path.absolute()}")