import argparse
import csv
import glob
import gzip
import logging
import os
import sys
import textwrap
import time
import traceback
from collections import defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import Any, TextIO, cast
import duckdb
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import sqlglot
import yaml
from pydantic import BaseModel
from sqlglot.expressions import (
Table,
insert,
select,
table_,
)
from dae.genomic_resources.histogram import (
CategoricalHistogram,
NumberHistogram,
)
from dae.pedigrees.family import ALL_FAMILY_TAG_LABELS
from dae.pedigrees.loader import (
FamiliesLoader,
)
from dae.pheno.common import (
DataDictionaryConfig,
InferenceConfig,
InstrumentConfig,
MeasureDescriptionsConfig,
MeasureType,
PhenoImportConfig,
RegressionMeasure,
)
from dae.pheno.db import safe_db_name
from dae.pheno.pheno_data import PhenotypeData, PhenotypeGroup, PhenotypeStudy
from dae.pheno.prepare.measure_classifier import (
InferenceReport,
determine_histogram_type,
inference_reference_impl,
)
from dae.task_graph.cli_tools import TaskCache, TaskGraphCli
from dae.task_graph.executor import task_graph_run_with_results
from dae.task_graph.graph import TaskGraph
from dae.utils.sql_utils import to_duckdb_transpile
from dae.variants.attributes import Status
logger = logging.getLogger(__name__)
# ruff: noqa: S608
IMPORT_METADATA_TABLE = table_("import_metadata")
@dataclass
class ImportInstrument:
files: list[Path]
name: str
delimiter: str
person_column: str
def pheno_cli_parser() -> argparse.ArgumentParser:
"""Construct argument parser for phenotype import tool."""
parser = argparse.ArgumentParser(
description="phenotype database import tool",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument(
"-v",
"--verbose",
dest="verbose",
action="count",
help="Set the verbosity level. [default: %(default)s]",
)
parser.add_argument(
"-i",
"--instruments",
dest="instruments",
help="The directory where all instruments are located.",
metavar="<instruments dir>",
)
parser.add_argument("--tab-separated",
dest="tab_separated",
action="store_true",
help="Flag for whether the instrument files are tab separated.",
)
parser.add_argument("--skip-pheno-common",
dest="skip_pheno_common",
action="store_true",
help="Flag for skipping the building of the pheno common instrument.",
)
parser.add_argument("--inference-config",
dest="inference_config",
help="Measure classification type inference configuration",
)
parser.add_argument(
"-p",
"--pedigree",
dest="pedigree",
help="The pedigree file where families descriptions are located.",
metavar="<pedigree file>",
)
parser.add_argument(
"--data-dictionary",
dest="data_dictionary",
help="The tab separated file that contains descriptions of measures.",
metavar="<data dictionary file>",
)
parser.add_argument(
"-o", "--output", dest="output",
help="The output directory.", default="./output",
)
parser.add_argument(
"--pheno-id", dest="pheno_name", help="The output pheno database name.",
)
parser.add_argument(
"--regression",
help="Absolute path to a regression configuration file.",
)
parser.add_argument(
"--person-column",
dest="person_column",
default="person_id",
help="The column in instruments files containing the person ID.",
metavar="<person column>",
)
TaskGraphCli.add_arguments(parser, use_commands=False)
return parser
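# Illustrative CLI invocation for the parser above; the paths and the study
# id are placeholders, not taken from any real import:
#
#   import_phenotypes -i instruments/ -p families.ped \
#       --pheno-id my_study -o ./output --person-column person_id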
def generate_phenotype_data_config(
pheno_name: str,
pheno_db_filename: str,
regressions: dict[str, RegressionMeasure] | None,
) -> str:
"""Construct phenotype data configuration from command line arguments."""
dbfile = os.path.join("%(wd)s", os.path.basename(pheno_db_filename))
config = {
"vars": {"wd": "."},
"type": "study",
"name": pheno_name,
"dbfile": dbfile,
"browser_images_url": "static/images/",
}
if regressions is None:
return yaml.dump(config)
config["regression"] = {
reg_id: reg_measure.model_dump()
for reg_id, reg_measure in regressions.items()
}
if regressions:
for reg in config["regression"].values():
if "measure_name" in reg and reg["measure_name"] is None:
del reg["measure_name"]
if "measure_names" in reg and reg["measure_names"] is None:
del reg["measure_names"]
return yaml.dump(config)
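# Sketch of the YAML this function emits for a study without regressions;
# "my_study" and the db filename are placeholders mirroring the dict built
# above:
#
#   vars:
#     wd: "."
#   type: study
#   name: my_study
#   dbfile: "%(wd)s/my_study.db"
#   browser_images_url: static/images/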
def main(argv: list[str] | None = None) -> int:
"""Run phenotype import tool."""
if argv is None:
argv = sys.argv
if not argv[0].endswith("import_phenotypes"):
logger.warning(
"%s tool is deprecated! Use import_phenotypes.",
argv[0],
)
argv = sys.argv[1:]
# Setup argument parser
parser = pheno_cli_parser()
args = parser.parse_args(argv)
if args.instruments is None:
raise ValueError("missing instruments directory parameter")
if args.pedigree is None:
raise ValueError("missing pedigree filename")
if args.pheno_name is None:
raise ValueError("missing pheno db name")
try:
import_config = transform_cli_args(args)
import_pheno_data(import_config, args)
except KeyboardInterrupt:
return 0
except ValueError as e:
traceback.print_exc()
program_name = "pheno_import.py"
indent = len(program_name) * " "
sys.stderr.write(program_name + ": " + repr(e) + "\n")
sys.stderr.write(indent + " for help use --help")
return 2
return 0
def get_output_parquet_files_dir(import_config: PhenoImportConfig) -> Path:
parquet_dir = Path(import_config.output_dir) / "parquet"
parquet_dir.mkdir(exist_ok=True)
return parquet_dir
def import_pheno_data(
config: PhenoImportConfig,
task_graph_args: argparse.Namespace,
*,
force: bool = False,
) -> None:
"""Import pheno data into DuckDB."""
os.makedirs(config.output_dir, exist_ok=True)
pheno_db_filename = os.path.join(config.output_dir, f"{config.id}.db")
if os.path.exists(pheno_db_filename):
if not force:
print(
f"Pheno DB already exists at {pheno_db_filename}!\n"
"Use --force or specify another directory.",
)
sys.exit(1)
os.remove(pheno_db_filename)
# INFERENCE CONFIGS
inference_configs: dict[str, Any] = {}
if config.inference_config is not None:
if isinstance(config.inference_config, str):
inference_configs = load_inference_configs(
config.input_dir, config.inference_config)
else:
inference_configs = config.inference_config
add_pheno_common_inference(inference_configs)
connection = duckdb.connect(pheno_db_filename)
create_tables(connection)
print("READING PEDIGREE")
start = time.time()
ped_df = read_pedigree(connection, config.input_dir, config.pedigree)
print(f"DONE {time.time() - start}")
print("READING COLUMNS FROM INSTRUMENT FILE")
start = time.time()
instruments = collect_instruments(config)
if not config.skip_pedigree_measures:
instruments.append(ImportInstrument(
[Path(config.input_dir, config.pedigree).absolute()],
"pheno_common",
"\t",
config.person_column,
))
instrument_measure_names = read_instrument_measure_names(instruments)
print(f"DONE {time.time() - start}")
print("READING AND CLASSIFYING INSTRUMENTS")
start = time.time()
task_graph = TaskGraph()
task_cache = TaskCache.create(
force=task_graph_args.force,
cache_dir=task_graph_args.task_status_dir,
)
descriptions = load_descriptions(config.input_dir, config.data_dictionary)
create_import_tasks(
task_graph, instruments,
instrument_measure_names,
inference_configs,
config, descriptions,
)
instrument_pq_files = handle_measure_inference_tasks(
task_graph, task_cache, task_graph_args,
)
print(f"DONE {time.time() - start}")
start = time.time()
print("WRITING RESULTS")
write_results(connection, instrument_pq_files, ped_df)
print(f"DONE {time.time() - start}")
ImportManifest.create_table(connection, IMPORT_METADATA_TABLE)
ImportManifest.write_to_db(connection, IMPORT_METADATA_TABLE, config)
connection.close()
regressions = None
regression_config = None
if config.study_config is not None:
regression_config = config.study_config.regressions
if regression_config is not None:
if isinstance(regression_config, str):
reg_file = Path(config.input_dir, regression_config)
with reg_file.open("r") as file:
regs = yaml.safe_load(file)
if not isinstance(regs, dict):
raise TypeError(
"Invalid regressions file, should be a dictionary",
)
regressions = {
reg_id: RegressionMeasure.model_validate(reg)
for reg_id, reg in regs["regression"].items()
}
else:
regressions = regression_config
output_config = generate_phenotype_data_config(
config.id, pheno_db_filename, regressions)
pheno_conf_path = Path(config.output_dir, f"{config.id}.yaml")
pheno_conf_path.write_text(output_config)
def handle_measure_inference_tasks(
task_graph: TaskGraph,
task_cache: TaskCache,
task_graph_args: argparse.Namespace,
) -> dict[str, tuple[Path, Path]]:
"""Read the output of the measure inference tasks into dictionaries."""
instrument_pq_files: dict[str, tuple[Path, Path]] = {}
with TaskGraphCli.create_executor(
task_cache, **vars(task_graph_args),
) as xtor:
try:
task_results = task_graph_run_with_results(task_graph, xtor)
for (
instrument_name, values_filepath, reports_filepath,
) in task_results:
instrument_pq_files[instrument_name] = (
values_filepath, reports_filepath,
)
except Exception:
logger.exception("Failed to classify measure")
return instrument_pq_files
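# The returned mapping has one entry per successfully processed instrument,
# e.g. (hypothetical paths):
#   {"ados": (Path("out/parquet/values/ados.parquet"),
#             Path("out/parquet/reports/ados.parquet"))}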
def collect_instruments(
import_config: PhenoImportConfig,
) -> list[ImportInstrument]:
"""Collect all instrument files for a given import config."""
all_instruments: dict[str, ImportInstrument] = {}
def handle_path(raw_path: str) -> None:
path = Path(import_config.input_dir, raw_path)
matched_paths: list[Path] = []
if "*" in raw_path:
matched_paths.extend(
Path(res).absolute()
for res in glob.glob(str(path), recursive=True))
elif path.is_file():
matched_paths.append(path.absolute())
elif path.is_dir():
for root, _, filenames in os.walk(path):
matched_paths.extend(Path(root, filename).absolute()
for filename in filenames)
matched_paths = list(filter(lambda m: m.suffixes[0] in (".csv", ".txt"),
matched_paths))
for match in matched_paths:
logger.debug("instrument matched: %s", match.name)
instrument_name = match.name.split(".")[0]
if instrument_name not in all_instruments:
all_instruments[instrument_name] = ImportInstrument(
[],
instrument_name,
import_config.delimiter,
import_config.person_column,
)
all_instruments[instrument_name].files.append(match)
def handle_conf(conf: InstrumentConfig) -> None:
path = Path(import_config.input_dir, conf.path).absolute()
instrument = conf.instrument \
if conf.instrument is not None \
else path.name.split(".")[0]
delimiter = conf.delimiter \
if conf.delimiter is not None \
else import_config.delimiter
person_column = conf.person_column \
if conf.person_column is not None \
else import_config.person_column
if instrument not in all_instruments:
all_instruments[instrument] = ImportInstrument(
[], instrument, delimiter, person_column,
)
all_instruments[instrument].files.append(path)
for config in import_config.instrument_files:
if isinstance(config, str):
handle_path(config)
elif isinstance(config, InstrumentConfig):
handle_conf(config)
else:
raise TypeError("Encountered invalid type while processing "
f"instrument configurations - {type(config)}")
return list(all_instruments.values())
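# instrument_files entries may be plain strings (a file, a directory or a
# glob, relative to input_dir) or InstrumentConfig objects; a sketch of both
# forms as they might appear in an import config YAML (paths are
# placeholders, and the mapping keys are assumed to match the
# InstrumentConfig field names used above):
#
#   instrument_files:
#     - instruments/*.csv
#     - path: extra/ados.txt
#       instrument: ados
#       delimiter: "\t"
#       person_column: personId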
def _transform_value(val: str | bool) -> str | None: # noqa: FBT001
if val == "":
return None
if val == "True":
return "1.0"
if val == "False":
return "0.0"
if isinstance(val, bool):
return str(int(val))
return val
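# _transform_value normalizes raw CSV cells before inference:
#   ""      -> None      "True" -> "1.0"      "False" -> "0.0"
#   bool True/False -> "1"/"0"; anything else is passed through unchanged.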
class MeasureReport(BaseModel):
measure_name: str
instrument_name: str
db_name: str
measure_type: MeasureType
inference_report: InferenceReport
def read_and_classify_measure(
instrument: ImportInstrument,
measure_names: list[str],
descriptions: dict[str, str],
import_config: PhenoImportConfig,
db_names: list[str],
inf_configs: list[InferenceConfig],
) -> tuple[str, Path, Path]:
"""Read a measure's values and classify from an instrument file."""
person_id_column = import_config.person_column
transformed_measures = defaultdict(dict)
seen_person_ids = set()
person_ids = []
if instrument.name == "pheno_common":
person_id_column = "personId"
for instrument_filepath in instrument.files:
delimiter = instrument.delimiter
if instrument.name == "pheno_common":
delimiter = "\t"
with open_file(instrument_filepath) as csvfile:
reader = csv.DictReader(
filter(lambda x: x.strip() != "", csvfile),
delimiter=delimiter,
)
for row in reader:
person_id = row[person_id_column]
if person_id not in seen_person_ids:
person_ids.append(person_id)
seen_person_ids.add(person_id)
for measure_name in measure_names:
transformed_measures[measure_name][person_id] = \
_transform_value(row[measure_name])
output_table, reports = infer_measures(
instrument,
person_ids,
measure_names,
db_names,
inf_configs,
transformed_measures,
)
output_base_dir = get_output_parquet_files_dir(import_config)
values_parquet_dir = output_base_dir / "values"
values_parquet_dir.mkdir(exist_ok=True)
reports_parquet_dir = output_base_dir / "reports"
reports_parquet_dir.mkdir(exist_ok=True)
out_file = values_parquet_dir / f"{instrument.name}.parquet"
out_file = write_to_parquet(
instrument.name, out_file, reports, output_table,
)
reports_out_file = reports_parquet_dir / f"{instrument.name}.parquet"
reports_out_file = write_reports_to_parquet(
reports_out_file,
reports,
descriptions,
)
return (instrument.name, out_file, reports_out_file)
def infer_measures(
instrument: ImportInstrument,
person_ids: list[str],
measure_names: list[str],
db_names: list[str],
inf_configs: list[InferenceConfig],
measure_person_values: dict[str, dict[str, Any]],
) -> tuple[dict[str, list[Any]], dict[str, MeasureReport]]:
"""Perform inference for measure values of an instrument."""
output_table: dict[str, list[Any]] = {"person_id": person_ids}
reports = {}
m_zip = zip(measure_names,
db_names,
inf_configs,
strict=True)
for (m_name, m_dbname, m_infconf) in m_zip:
m_values = list(measure_person_values[m_name].values())
values, inference_report = inference_reference_impl(
m_values, m_infconf,
)
inference_report.histogram_type = \
determine_histogram_type(inference_report, m_infconf)
m_type: MeasureType = MeasureType.raw
if inference_report.histogram_type == NumberHistogram:
m_type = MeasureType.continuous
elif inference_report.histogram_type == CategoricalHistogram:
if inference_report.value_type is str:
m_type = MeasureType.categorical
else:
m_type = MeasureType.ordinal
report = MeasureReport.model_validate({
"measure_name": m_name,
"instrument_name": instrument.name,
"db_name": m_dbname,
"measure_type": m_type,
"inference_report": inference_report,
})
output_table[m_dbname] = [
values[idx] for idx, _ in enumerate(measure_person_values[m_name])
]
reports[m_name] = report
return output_table, reports
def write_to_parquet(
instrument_name: str,
filepath: Path,
reports: dict[str, MeasureReport],
values_table: dict[str, list[Any]],
) -> Path:
"""Write inferred instrument measure values to parquet file."""
column_order = []
string_cols = set()
fields = []
fields.append(
pa.field(
"person_id",
pa.string(),
),
)
for report in reports.values():
if report.instrument_name != instrument_name:
raise ValueError(
f"Failed to create instrument table for {instrument_name}, ",
f"measure {report.measure_name} is of a different instrument: "
f"{report.instrument_name}",
)
val_type = report.inference_report.value_type
if val_type is int:
col_dtype = pa.int32()
elif val_type is float:
col_dtype = pa.float32()
elif val_type is str:
col_dtype = pa.string()
string_cols.add(report.measure_name)
else:
raise ValueError(f"Unsupported value type {val_type}")
column_order.append(report.measure_name)
fields.append(pa.field(report.db_name, col_dtype))
schema = pa.schema(
fields,
)
writer = pq.ParquetWriter(
filepath, schema,
)
batch = pa.RecordBatch.from_pydict(values_table, schema) # type: ignore
table = pa.Table.from_batches([batch], schema)
writer.write_table(table)
# close the writer explicitly so the parquet footer is flushed to disk
writer.close()
return filepath
def add_pheno_common_inference(
config: dict[str, Any],
) -> None:
"""Add pedigree columns as skipped columns to the inference config."""
default_cols = [
"familyId",
"personId",
"momId",
"dadId",
"sex",
"status",
"role",
"sample_id",
"layout",
"generated",
"proband",
"not_sequenced",
"missing",
"member_index",
]
default_cols.extend(ALL_FAMILY_TAG_LABELS)
default_cols.append("tag_family_type_full")
for col in default_cols:
entry = f"pheno_common.{col}"
config[entry] = {"skip": True}
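# Resulting entries look like:
#   config["pheno_common.familyId"] = {"skip": True}
#   config["pheno_common.member_index"] = {"skip": True}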
def load_inference_configs(
input_dir: str,
inference_config_filepath: str | None,
) -> dict[str, Any]:
"""Load import inference configuration file."""
if inference_config_filepath:
return cast(dict[str, Any], yaml.safe_load(
Path(input_dir, inference_config_filepath).read_text(),
))
return {}
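# Sketch of an inference configuration file. Keys are
# "<instrument>.<measure>" patterns (wildcards allowed on either side)
# resolved by merge_inference_configs below; values are partial
# InferenceConfig overrides. Only "skip" is shown because it is the only
# field this module references directly:
#
#   "*.*": {}
#   "pheno_common.sample_id":
#     skip: true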
class ImportManifest(BaseModel):
"""Import manifest for checking cache validity."""
unix_timestamp: float
import_config: PhenoImportConfig
def is_older_than(self, other: "ImportManifest") -> bool:
if self.unix_timestamp < other.unix_timestamp:
return True
return self.import_config != other.import_config
@staticmethod
def from_row(row: tuple[str, Any, str]) -> "ImportManifest":
timestamp = float(row[0])
import_config = PhenoImportConfig.model_validate_json(row[1])
return ImportManifest(
unix_timestamp=timestamp,
import_config=import_config,
)
@staticmethod
def from_table(
connection: duckdb.DuckDBPyConnection, table: Table,
) -> list["ImportManifest"]:
"""Read manifests from given table."""
with connection.cursor() as cursor:
table_row = cursor.execute(sqlglot.parse_one(
"SELECT * FROM information_schema.tables"
f" WHERE table_name = '{table.alias_or_name}'",
).sql()).fetchone()
if table_row is None:
return []
rows = cursor.execute(select("*").from_(table).sql()).fetchall()
return [ImportManifest.from_row(row) for row in rows]
@staticmethod
def from_phenotype_data(
pheno_data: PhenotypeData,
) -> list["ImportManifest"]:
"""Collect all manifests in a phenotype data instance."""
is_group = pheno_data.config["type"] == "group"
if is_group:
leaves = cast(PhenotypeGroup, pheno_data).get_leaves()
else:
leaves = [cast(PhenotypeStudy, pheno_data)]
return [
ImportManifest.from_table(
leaf.db.connection, IMPORT_METADATA_TABLE,
)[0]
for leaf in leaves
]
@staticmethod
def create_table(connection: duckdb.DuckDBPyConnection, table: Table) -> None:
"""Create table for recording import manifests."""
query = sqlglot.parse_one(
f"CREATE TABLE {table.alias_or_name} "
"(unix_timestamp DOUBLE, import_config VARCHAR)",
).sql()
with connection.cursor() as cursor:
cursor.execute(query)
@staticmethod
def write_to_db(
connection: duckdb.DuckDBPyConnection,
table: Table,
import_config: PhenoImportConfig,
) -> None:
"""Write manifest into DB on given table."""
config_json = import_config.model_dump_json()
timestamp = time.time()
query = insert(
f"VALUES ({timestamp}, '{config_json}')", table,
)
with connection.cursor() as cursor:
cursor.execute(to_duckdb_transpile(query)).fetchall()
def create_tables(connection: duckdb.DuckDBPyConnection) -> None:
"""Create phenotype data tables in DuckDB."""
create_instrument = sqlglot.parse(textwrap.dedent(
"""
CREATE TABLE instrument(
instrument_name VARCHAR NOT NULL PRIMARY KEY,
table_name VARCHAR NOT NULL,
);
CREATE UNIQUE INDEX instrument_instrument_name_idx
ON instrument (instrument_name);
""",
))
create_measure = sqlglot.parse(textwrap.dedent(
"""
CREATE TABLE measure(
measure_id VARCHAR NOT NULL UNIQUE PRIMARY KEY,
db_column_name VARCHAR NOT NULL,
measure_name VARCHAR NOT NULL,
instrument_name VARCHAR NOT NULL,
description VARCHAR,
measure_type INT,
value_type VARCHAR,
histogram_type VARCHAR,
individuals INT,
default_filter VARCHAR,
min_value FLOAT,
max_value FLOAT,
values_domain VARCHAR,
rank INT,
);
CREATE UNIQUE INDEX measure_measure_id_idx
ON measure (measure_id);
CREATE INDEX measure_measure_name_idx
ON measure (measure_name);
CREATE INDEX measure_measure_type_idx
ON measure (measure_type);
""",
))
create_family = sqlglot.parse(textwrap.dedent(
"""
CREATE TABLE family(
family_id VARCHAR NOT NULL UNIQUE PRIMARY KEY,
);
CREATE UNIQUE INDEX family_family_id_idx
ON family (family_id);
""",
))
create_person = sqlglot.parse(textwrap.dedent(
f"""
CREATE TABLE person(
family_id VARCHAR NOT NULL,
person_id VARCHAR NOT NULL,
role INT NOT NULL,
status INT NOT NULL DEFAULT {Status.unaffected.value},
sex INT NOT NULL,
sample_id VARCHAR,
PRIMARY KEY (family_id, person_id)
);
CREATE UNIQUE INDEX person_person_id_idx
ON person (person_id);
""",
))
queries = [
*create_instrument,
*create_measure,
*create_family,
*create_person,
]
with connection.cursor() as cursor:
for query in queries:
cursor.execute(to_duckdb_transpile(query))
def open_file(filepath: Path) -> TextIO:
if ".gz" in filepath.suffixes:
return gzip.open(filepath, "rt", encoding="utf-8-sig")
return filepath.open()
def read_instrument_measure_names(
instruments: list[ImportInstrument],
) -> dict[str, list[str]]:
"""Read the headers of all the instrument files."""
instrument_measure_names = {}
for instrument in instruments:
file_to_read = instrument.files[0]
with open_file(file_to_read) as csvfile:
reader = filter(
lambda line: len(line) != 0,
csv.reader(csvfile, delimiter=instrument.delimiter),
)
header = next(reader)
instrument_measure_names[instrument.name] = [
col for col in header[1:]
if col != instrument.person_column
]
return instrument_measure_names
def read_pedigree(
connection: duckdb.DuckDBPyConnection,
input_dir: str,
pedigree_filepath: str,
) -> pd.DataFrame:
"""
Read a pedigree file into a pandas DataFrame.
Also imports the pedigree data into the database.
"""
ped_df = FamiliesLoader.flexible_pedigree_read(
Path(input_dir, pedigree_filepath), enums_as_values=True,
)
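# The SQL below names "ped_df" directly; DuckDB resolves it through its
# replacement scan of pandas DataFrames visible in the calling Python frame.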
with connection.cursor() as cursor:
cursor.execute(
"INSERT INTO family "
"SELECT DISTINCT family_id FROM ped_df",
)
cursor.execute(
"INSERT INTO person "
"SELECT family_id, person_id, "
"role, status, sex, sample_id FROM ped_df ",
)
return ped_df
def write_results(
connection: duckdb.DuckDBPyConnection,
instrument_pq_files: dict[str, tuple[Path, Path]],
ped_df: pd.DataFrame,
) -> None:
"""Write imported data into duckdb as measure value tables."""
assert ped_df is not None
with connection.cursor() as cursor:
for instrument_name, pq_files in instrument_pq_files.items():
values_file, reports_file = pq_files
table_name = safe_db_name(f"{instrument_name}_measure_values")
cursor.execute(
textwrap.dedent(f"""
INSERT INTO instrument VALUES
('{instrument_name}', '{table_name}')
"""),
)
query = (
"INSERT INTO measure "
f"SELECT * from read_parquet('{reports_file!s}')"
)
cursor.execute(query)
query = (
f"CREATE TABLE {table_name} AS ("
"SELECT "
"ped_df.person_id, ped_df.family_id, "
"ped_df.role, ped_df.status, ped_df.sex, "
"pq_table.* EXCLUDE (person_id) "
"FROM ped_df "
f"RIGHT JOIN read_parquet('{values_file!s}') as pq_table ON "
"ped_df.person_id = pq_table.person_id "
"WHERE ped_df.person_id IS NOT NULL"
")"
)
connection.execute(query)
def load_description_file(
input_dir: str,
config: DataDictionaryConfig,
) -> dict[str, str]:
"""Load measure descriptions for single data dictionary."""
out = {}
abspath = Path(input_dir, config.path).absolute()
assert abspath.exists(), abspath
with open_file(abspath) as csvfile:
reader = csv.DictReader(csvfile, delimiter=config.delimiter)
for row in reader:
instrument_name = config.instrument or row[config.instrument_column]
measure_name = row[config.measure_column]
measure_id = f"{instrument_name}.{measure_name}"
out[measure_id] = row[config.description_column]
return out
def load_descriptions(
input_dir: str,
config: MeasureDescriptionsConfig | None,
) -> dict[str, str]:
"""Load measure descriptions from given configuration."""
descriptions: dict[str, str] = {}
if not config:
return descriptions
if config.files is not None:
for datadictfile in config.files:
descriptions.update(load_description_file(input_dir, datadictfile))
if config.dictionary is not None:
for instrument, m_descs in config.dictionary.items():
for measure, description in m_descs.items():
measure_id = f"{instrument}.{measure}"
descriptions[measure_id] = description
return descriptions
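# Sketch of a measure descriptions configuration using the inline
# "dictionary" form handled above; instrument and measure names and the
# description texts are placeholders:
#
#   dictionary:
#     ados:
#       total_score: ADOS total score
#       module: ADOS module administered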
def merge_inference_configs(
inference_configs: dict[str, Any],
instrument_name: str,
measure_name: str,
) -> InferenceConfig:
"""Merge configs by order of specificity"""
inference_config = InferenceConfig()
current_config = inference_config.model_dump()
if "*.*" in inference_configs:
update_config = inference_configs["*.*"]
current_config.update(update_config)
if f"{instrument_name}.*" in inference_configs:
update_config = inference_configs[f"{instrument_name}.*"]
current_config.update(update_config)
if f"*.{measure_name}" in inference_configs:
update_config = inference_configs[f"*.{measure_name}"]
current_config.update(update_config)
if f"{instrument_name}.{measure_name}" in inference_configs:
update_config = inference_configs[
f"{instrument_name}.{measure_name}"
]
current_config.update(update_config)
return InferenceConfig.model_validate(current_config)
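# Precedence applied above, least to most specific:
#   "*.*"  <  "<instrument>.*"  <  "*.<measure>"  <  "<instrument>.<measure>"
# e.g. for ("ados", "total_score") an "ados.total_score" entry overrides
# values set by "ados.*", "*.total_score" or "*.*".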
def create_import_tasks(
task_graph: TaskGraph,
instruments: list[ImportInstrument],
instrument_measure_names: dict[str, list[str]],
inference_configs: dict[str, Any],
import_config: PhenoImportConfig,
descriptions: dict[str, str],
) -> None:
"""Add measure tasks for importing pheno data."""
for instrument in instruments:
seen_col_names: dict[str, int] = defaultdict(int)
table_column_names = [
"person_id", "family_id", "role", "status", "sex",
]
measure_names = instrument_measure_names[instrument.name]
group_measure_names = []
group_db_names = []
group_inf_configs = []
for measure_name in measure_names:
inference_config = merge_inference_configs(
inference_configs, instrument.name, measure_name,
)
if inference_config.skip:
continue
db_name = safe_db_name(measure_name)
if db_name in table_column_names:
seen_col_names[measure_name] += 1
db_name = f"{measure_name}_{seen_col_names[measure_name]}"
else:
seen_col_names[measure_name] += 1
table_column_names.append(db_name)
group_measure_names.append(measure_name)
group_db_names.append(db_name)
group_inf_configs.append(inference_config)
task_graph.create_task(
f"{instrument.name}_read_and_classify",
read_and_classify_measure,
[
instrument,
group_measure_names,
descriptions,
import_config,
group_db_names,
group_inf_configs,
],
[],
)
def write_reports_to_parquet(
output_file: Path,
reports: dict[str, MeasureReport],
descriptions: dict[str, str],
) -> Path:
"""Write inferred instrument measure values to parquet file."""
fields = [
pa.field(
"measure_id",
pa.string(),
),
pa.field(
"db_column_name",
pa.string(),
),
pa.field(
"measure_name",
pa.string(),
),
pa.field(
"instrument_name",
pa.string(),
),
pa.field(
"description",
pa.string(),
),
pa.field(
"measure_type",
pa.int32(),
),
pa.field(
"value_type",
pa.string(),
),
pa.field(
"histogram_type",
pa.string(),
),
pa.field(
"individuals",
pa.int32(),
),
pa.field(
"default_filter",
pa.string(),
),
pa.field(
"min_value",
pa.float32(),
),
pa.field(
"max_value",
pa.float32(),
),
pa.field(
"values_domain",
pa.string(),
),
pa.field(
"rank",
pa.int32(),
),
]
batch_values = {
"measure_id": [],
"db_column_name": [],
"measure_name": [],
"instrument_name": [],
"description": [],
"measure_type": [],
"value_type": [],
"histogram_type": [],
"individuals": [],
"default_filter": [],
"min_value": [],
"max_value": [],
"values_domain": [],
"rank": [],
}
for report in reports.values():
m_id = f"{report.instrument_name}.{report.measure_name}"
value_type = report.inference_report.value_type.__name__
histogram_type = report.inference_report.histogram_type.__name__
values_domain = report.inference_report.values_domain.replace(
"'", "''",
)
desc = descriptions.get(m_id, "")
batch_values["measure_id"].append(m_id)
batch_values["db_column_name"].append(report.db_name)
batch_values["measure_name"].append(report.measure_name)
batch_values["instrument_name"].append(report.instrument_name)
batch_values["description"].append(desc)
batch_values["measure_type"].append(report.measure_type.value)
batch_values["value_type"].append(value_type)
batch_values["histogram_type"].append(histogram_type)
batch_values["individuals"].append(
report.inference_report.count_with_values)
batch_values["default_filter"].append("")
batch_values["min_value"].append(report.inference_report.min_value)
batch_values["max_value"].append(report.inference_report.max_value)
batch_values["values_domain"].append(values_domain)
batch_values["rank"].append(
report.inference_report.count_unique_values)
schema = pa.schema(
fields,
)
writer = pq.ParquetWriter(
output_file, schema,
)
batch = pa.RecordBatch.from_pydict(batch_values, schema) # type: ignore
table = pa.Table.from_batches([batch], schema)
writer.write_table(table)
# close the writer explicitly so the parquet footer is flushed to disk
writer.close()
return output_file
if __name__ == "__main__":
logger.warning(
"%s tool is deprecated! Use import_phenotypes.",
sys.argv[0],
)
main(sys.argv[1:])