import argparse
import csv
import glob
import gzip
import json
import logging
import os
import shutil
import sys
import textwrap
import time
import traceback
from collections import defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import Any, TextIO, cast
import duckdb
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import sqlglot
import yaml
from pydantic import BaseModel
from dae.genomic_resources.histogram import (
CategoricalHistogram,
CategoricalHistogramConfig,
HistogramConfig,
NumberHistogram,
NumberHistogramConfig,
)
from dae.gpf_instance.gpf_instance import GPFInstance
from dae.pedigrees.family import ALL_FAMILY_TAG_LABELS
from dae.pedigrees.loader import (
FamiliesLoader,
)
from dae.pheno.common import (
IMPORT_METADATA_TABLE,
DataDictionaryConfig,
ImportManifest,
InferenceConfig,
InstrumentConfig,
MeasureDescriptionsConfig,
MeasureHistogramConfigs,
MeasureType,
PhenoImportConfig,
RegressionMeasure,
StudyConfig,
)
from dae.pheno.db import safe_db_name
from dae.pheno.pheno_data import (
get_pheno_db_dir,
)
from dae.pheno.prepare.measure_classifier import (
InferenceReport,
determine_histogram_type,
inference_reference_impl,
)
from dae.task_graph.cli_tools import TaskCache, TaskGraphCli
from dae.task_graph.executor import task_graph_run_with_results
from dae.task_graph.graph import TaskGraph
from dae.utils.sql_utils import to_duckdb_transpile
from dae.variants.attributes import Status
logger = logging.getLogger(__name__)
# ruff: noqa: S608
[docs]
@dataclass
class ImportInstrument:
files: list[Path]
name: str
delimiter: str
person_column: str
[docs]
def pheno_cli_parser() -> argparse.ArgumentParser:
"""Construct argument parser for phenotype import tool."""
parser = argparse.ArgumentParser(
description="phenotype database import tool",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument(
"-v",
"--verbose",
dest="verbose",
action="count",
help="Set the verbosity level. [default: %(default)s]",
)
parser.add_argument(
"-i",
"--instruments",
dest="instruments",
help="The directory where all instruments are located.",
metavar="<instruments dir>",
)
parser.add_argument(
"--tab-separated",
dest="tab_separated",
action="store_true",
help="Flag for whether the instrument files are tab separated.",
)
parser.add_argument(
"--skip-pheno-common",
dest="skip_pheno_common",
action="store_true",
help="Flag for skipping the building of the pheno common instrument.",
)
parser.add_argument(
"--inference-config",
dest="inference_config",
help="Measure classification type inference configuration",
)
parser.add_argument(
"-p",
"--pedigree",
dest="pedigree",
help="The pedigree file where families descriptions are located.",
metavar="<pedigree file>",
)
parser.add_argument(
"--data-dictionary",
dest="data_dictionary",
help="The tab separated file that contains descriptions of measures.",
metavar="<data dictionary file>",
)
parser.add_argument(
"-o",
"--output",
dest="output",
help="The output directory.",
default="./output",
)
parser.add_argument(
"--pheno-id",
dest="pheno_name",
help="output pheno database name.",
)
parser.add_argument(
"--regression",
help="absolute path to a regression configuration file",
)
parser.add_argument(
"--person-column",
dest="person_column",
default="person_id",
help="The column in instruments files containing the person ID.",
metavar="<person column>",
)
TaskGraphCli.add_arguments(parser, use_commands=False)
return parser
[docs]
def generate_phenotype_data_config(
pheno_name: str,
storage_id: str | None,
overrides: StudyConfig | None,
) -> str:
"""Construct phenotype data configuration from command line arguments."""
config: dict[str, Any] = {
"type": "study",
"id": pheno_name,
"name": pheno_name,
"browser_images_url": "static/images/",
}
if storage_id is not None:
config["phenotype_storage"] = {
"id": storage_id,
"db": f"{pheno_name}/{pheno_name}.db",
}
if overrides is None:
return yaml.dump(config)
overrides_dict = overrides.model_dump()
if overrides.common_report is None:
del overrides_dict["common_report"]
if overrides.person_set_collections is None:
del overrides_dict["person_set_collections"]
config.update(overrides_dict)
return yaml.dump(config)
[docs]
def main(argv: list[str] | None = None) -> int:
"""Run phenotype import tool."""
if argv is None:
argv = sys.argv
if not argv[0].endswith("import_phenotypes"):
logger.warning(
"%s tool is deprecated! Use import_phenotypes.",
argv[0],
)
argv = sys.argv[1:]
# Setup argument parser
parser = pheno_cli_parser()
args = parser.parse_args(argv)
if args.instruments is None:
raise ValueError("missing instruments directory parameter")
if args.pedigree is None:
raise ValueError("missing pedigree filename")
if args.pheno_name is None:
raise ValueError("missing pheno db name")
try:
import_config = transform_cli_args(args)
gpf_instance = get_gpf_instance(import_config)
import_pheno_data(import_config, gpf_instance, args)
except KeyboardInterrupt:
return 0
except ValueError as e:
traceback.print_exc()
program_name = "pheno_import.py"
indent = len(program_name) * " "
sys.stderr.write(program_name + ": " + repr(e) + "\n")
sys.stderr.write(indent + " for help use --help")
return 2
return 0
[docs]
def get_output_parquet_files_dir(import_config: PhenoImportConfig) -> Path:
parquet_dir = Path(import_config.work_dir) / "parquet"
parquet_dir.mkdir(exist_ok=True)
return parquet_dir
[docs]
def get_gpf_instance(config: PhenoImportConfig) -> GPFInstance | None:
"""Return a GPF instance for an import config if it can be found."""
if config.gpf_instance is not None:
return GPFInstance.build(config.gpf_instance.path)
try:
return GPFInstance.build()
except ValueError:
logger.exception("Cannot build GPF instance")
return None
[docs]
def determine_destination(
gpf_instance: GPFInstance | None, config: PhenoImportConfig,
) -> tuple[str | None, Path | None, Path | None]:
"""Determine where output should be placed based on configuration."""
destination_storage_id = None
data_copy_destination = None
config_copy_destination = None
if config.destination is not None and \
config.destination.storage_dir is not None:
data_copy_destination = Path(
config.destination.storage_dir,
config.id,
f"{config.id}.db",
)
config_copy_destination = Path(
config.destination.storage_dir,
config.id,
f"{config.id}.yaml",
)
elif gpf_instance is not None:
if config.destination is not None:
destination_storage_id = config.destination.storage_id
if destination_storage_id is None:
raise ValueError(
"Cannot use gpf instance and "
"destination without a storage id",
)
if destination_storage_id not in gpf_instance.phenotype_storages:
raise ValueError(
f"Phenotype storage {destination_storage_id} not found in"
"instance!",
)
storage = gpf_instance.phenotype_storages.get_phenotype_storage(
destination_storage_id,
)
else:
storage = \
gpf_instance.phenotype_storages.get_default_phenotype_storage()
pheno_dir = get_pheno_db_dir(gpf_instance.dae_config)
config_copy_destination = Path(
pheno_dir,
config.id,
f"{config.id}.yaml",
)
data_copy_destination = \
storage.base_dir / config.id / f"{config.id}.db"
return (
destination_storage_id,
data_copy_destination,
config_copy_destination,
)
[docs]
def import_pheno_data( # pylint: disable=R0912
config: PhenoImportConfig,
gpf_instance: GPFInstance | None = None,
task_graph_args: argparse.Namespace | None = None,
) -> None:
"""Import pheno data into DuckDB."""
os.makedirs(config.work_dir, exist_ok=True)
if task_graph_args is None:
task_graph_args = argparse.Namespace(
jobs=1,
force=True,
no_cache=True,
task_status_dir=None,
dask_cluster_name=None,
dask_cluster_config_file=None,
)
destination_storage_id, data_copy_destination, config_copy_destination = \
determine_destination(gpf_instance, config)
pheno_db_filename = os.path.join(config.work_dir, f"{config.id}.db")
if os.path.exists(pheno_db_filename):
if not task_graph_args.force:
print(
f"Pheno DB already exists at {pheno_db_filename}!\n"
"Use --force or specify another directory.",
)
sys.exit(1)
os.remove(pheno_db_filename)
# INFERENCE CONFIGS
inference_configs: dict[str, Any] = {}
if config.inference_config is not None:
if isinstance(config.inference_config, str):
inference_configs = load_inference_configs(
config.input_dir, config.inference_config,
)
else:
inference_configs = config.inference_config
add_pheno_common_inference(inference_configs)
histogram_configs: MeasureHistogramConfigs | None = None
if config.histogram_configs is not None:
if isinstance(config.histogram_configs, str):
histogram_configs = load_histogram_configs(
config.input_dir, config.histogram_configs,
)
else:
histogram_configs = config.histogram_configs
connection = duckdb.connect(pheno_db_filename)
create_tables(connection)
print("READING PEDIGREE")
start = time.time()
ped_df = read_pedigree(connection, config.input_dir, config.pedigree)
print(f"DONE {time.time() - start}")
print("READING COLUMNS FROM INSTRUMENT FILE")
start = time.time()
instruments = collect_instruments(config)
if not config.skip_pedigree_measures:
instruments.append(ImportInstrument(
[Path(config.input_dir, config.pedigree).absolute()],
"pheno_common",
"\t",
config.person_column,
))
instrument_measure_names = read_instrument_measure_names(instruments)
print(f"DONE {time.time() - start}")
print("READING AND CLASSIFYING INSTRUMENTS")
start = time.time()
task_graph = TaskGraph()
task_cache = TaskCache.create(
force=task_graph_args.force,
cache_dir=task_graph_args.task_status_dir,
no_cache=task_graph_args.no_cache,
)
descriptions = load_descriptions(config.input_dir, config.data_dictionary)
create_import_tasks(
task_graph, instruments,
instrument_measure_names,
inference_configs,
histogram_configs,
config, descriptions,
)
instrument_pq_files = handle_measure_inference_tasks(
task_graph, task_cache, task_graph_args,
)
print(f"DONE {time.time() - start}")
start = time.time()
print("WRITING RESULTS")
write_results(connection, instrument_pq_files, ped_df)
print(f"DONE {time.time() - start}")
ImportManifest.create_table(connection, IMPORT_METADATA_TABLE)
ImportManifest.write_to_db(connection, IMPORT_METADATA_TABLE, config)
connection.close()
regressions = None
regression_config = None
overrides = config.study_config
if overrides is not None:
regression_config = overrides.regressions
if regression_config is not None:
if isinstance(regression_config, str):
reg_file = Path(config.input_dir, regression_config)
with reg_file.open("r") as file:
regs = yaml.safe_load(file)
if not isinstance(regs, dict):
raise TypeError(
"Invalid regressions file, should be a dictionary",
)
regressions = {
reg_id: RegressionMeasure.model_validate(reg)
for reg_id, reg in regs["regression"].items()
}
else:
regressions = regression_config
overrides.regressions = regressions
output_config = generate_phenotype_data_config(
config.id, destination_storage_id, overrides)
pheno_conf_path = Path(config.work_dir, f"{config.id}.yaml")
pheno_conf_path.write_text(output_config)
if config_copy_destination:
config_copy_destination.parent.mkdir(parents=True, exist_ok=True)
try:
shutil.copy(pheno_conf_path, config_copy_destination)
except shutil.SameFileError:
logger.warning(
"Tried to copy pheno config to where it already is %s",
str(config_copy_destination),
)
if data_copy_destination:
data_copy_destination.parent.mkdir(parents=True, exist_ok=True)
try:
shutil.copy(pheno_db_filename, data_copy_destination)
except shutil.SameFileError:
logger.warning(
"Tried to copy pheno data to where it already is %s",
str(data_copy_destination),
)
[docs]
def handle_measure_inference_tasks(
task_graph: TaskGraph,
task_cache: TaskCache,
task_graph_args: argparse.Namespace,
) -> dict[str, tuple[Path, Path]]:
"""Read the output of the measure inference tasks into dictionaries."""
instrument_pq_files: dict[str, tuple[Path, Path]] = {}
with TaskGraphCli.create_executor(
task_cache, **vars(task_graph_args),
) as xtor:
try:
task_results = task_graph_run_with_results(task_graph, xtor)
for (
instrument_name, values_filepath, reports_filepath,
) in task_results:
instrument_pq_files[instrument_name] = (
values_filepath, reports_filepath,
)
except Exception:
logger.exception("Failed to classify measure")
return instrument_pq_files
[docs]
def collect_instruments(
import_config: PhenoImportConfig,
) -> list[ImportInstrument]:
"""Collect all instrument files for a given import config."""
all_instruments: dict[str, ImportInstrument] = {}
def handle_path(raw_path: str):
path = Path(import_config.input_dir, raw_path)
matched_paths: list[Path] = []
if "*" in raw_path:
matched_paths.extend(
Path(res).absolute()
for res in glob.glob(str(path), recursive=True))
elif path.is_file():
matched_paths.append(path.absolute())
elif path.is_dir():
for root, _, filenames in os.walk(path):
matched_paths.extend(Path(root, filename).absolute()
for filename in filenames)
matched_paths = list(filter(lambda m: m.suffixes[0] in (".csv", ".txt"),
matched_paths))
for match in matched_paths:
logger.debug("instrument matched: %s", match.name)
instrument_name = match.name.split(".")[0]
if instrument_name not in all_instruments:
all_instruments[instrument_name] = ImportInstrument(
[],
instrument_name,
import_config.delimiter,
import_config.person_column,
)
all_instruments[instrument_name].files.append(match)
def handle_conf(conf: InstrumentConfig):
path = Path(import_config.input_dir, conf.path).absolute()
instrument = conf.instrument \
if conf.instrument is not None \
else path.name.split(".")[0]
delimiter = conf.delimiter \
if conf.delimiter is not None \
else import_config.delimiter
person_column = conf.person_column \
if conf.person_column is not None \
else import_config.person_column
if conf.instrument not in all_instruments:
all_instruments[instrument] = ImportInstrument(
[], instrument, delimiter, person_column,
)
all_instruments[instrument].files.append(path)
for config in import_config.instrument_files:
if isinstance(config, str):
handle_path(config)
elif isinstance(config, InstrumentConfig):
handle_conf(config)
else:
raise TypeError(f"Encountered invalid type while processing"
f"instrument configurations - {type(config)}")
return list(all_instruments.values())
def _transform_value(val: str | bool) -> str | None: # noqa: FBT001
if val == "":
return None
if val == "True":
return "1.0"
if val == "False":
return "0.0"
if isinstance(val, bool):
return str(int(val))
return val
[docs]
class MeasureReport(BaseModel):
measure_name: str
instrument_name: str
db_name: str
measure_type: MeasureType
inference_report: InferenceReport
[docs]
def read_and_classify_measure(
instrument: ImportInstrument,
measure_names: list[str],
descriptions: dict[str, str],
import_config: PhenoImportConfig,
db_names: list[str],
inf_configs: list[InferenceConfig],
hist_configs: MeasureHistogramConfigs | None,
) -> tuple[str, Path, Path]:
"""Read a measure's values and classify from an instrument file."""
person_id_column = import_config.person_column
transformed_measures = defaultdict(dict)
seen_person_ids = set()
person_ids = []
if instrument.name == "pheno_common":
person_id_column = "personId"
for instrument_filepath in instrument.files:
delimiter = instrument.delimiter
if instrument.name == "pheno_common":
delimiter = "\t"
with open_file(instrument_filepath) as csvfile:
reader = csv.DictReader(
filter(lambda x: x.strip() != "", csvfile),
delimiter=delimiter,
)
for row in reader:
person_id = row[person_id_column]
if person_id not in seen_person_ids:
person_ids.append(person_id)
seen_person_ids.add(person_id)
for measure_name in measure_names:
transformed_measures[measure_name][person_id] = \
_transform_value(row[measure_name])
output_table, reports = infer_measures(
instrument,
person_ids,
measure_names,
db_names,
inf_configs,
transformed_measures,
)
output_base_dir = get_output_parquet_files_dir(import_config)
values_parquet_dir = output_base_dir / "values"
values_parquet_dir.mkdir(exist_ok=True)
reports_parquet_dir = output_base_dir / "reports"
reports_parquet_dir.mkdir(exist_ok=True)
out_file = values_parquet_dir / f"{instrument.name}.parquet"
out_file = write_to_parquet(
instrument.name, out_file, reports, output_table,
)
reports_out_file = reports_parquet_dir / f"{instrument.name}.parquet"
reports_out_file = write_reports_to_parquet(
reports_out_file,
reports,
descriptions,
hist_configs,
)
return (instrument.name, out_file, reports_out_file)
[docs]
def infer_measures(
instrument: ImportInstrument,
person_ids: list[str],
measure_names: list[str],
db_names: list[str],
inf_configs: list[InferenceConfig],
measure_person_values: dict[str, dict[str, Any]],
) -> tuple[dict[str, list[Any]], dict[str, MeasureReport]]:
"""Perform inference for measure values of an instrument."""
output_table: dict[str, list[Any]] = {"person_id": person_ids}
reports = {}
m_zip = zip(measure_names,
db_names,
inf_configs,
strict=True)
for (m_name, m_dbname, m_infconf) in m_zip:
m_values = list(measure_person_values[m_name].values())
values, inference_report = inference_reference_impl(
m_values, m_infconf,
)
inference_report.histogram_type = \
determine_histogram_type(inference_report, m_infconf)
m_type: MeasureType = MeasureType.raw
if inference_report.histogram_type == NumberHistogram:
m_type = MeasureType.continuous
elif inference_report.histogram_type == CategoricalHistogram:
if inference_report.value_type is str:
m_type = MeasureType.categorical
else:
m_type = MeasureType.ordinal
report = MeasureReport.model_validate({
"measure_name": instrument.name.strip(),
"instrument_name": m_name.strip(),
"db_name": m_dbname,
"measure_type": m_type,
"inference_report": inference_report,
})
report.instrument_name = instrument.name
report.measure_name = m_name
report.db_name = m_dbname
output_table[m_dbname] = [
values[idx] for idx, _ in enumerate(measure_person_values[m_name])
]
reports[m_name] = report
return output_table, reports
[docs]
def write_to_parquet(
instrument_name: str,
filepath: Path,
reports: dict[str, MeasureReport],
values_table: dict[str, list[Any]],
) -> Path:
"""Write inferred instrument measure values to parquet file."""
column_order = []
string_cols = set()
fields = []
fields.append(
pa.field(
"person_id",
pa.string(),
),
)
for report in reports.values():
if report.instrument_name != instrument_name:
raise ValueError(
f"Failed to create instrument table for {instrument_name}, ",
f"measure {report.measure_name} is of a different instrument: "
f"{report.instrument_name}",
)
val_type = report.inference_report.value_type
if val_type is int:
col_dtype = pa.int32()
elif val_type is float:
col_dtype = pa.float32()
elif val_type is str:
col_dtype = pa.string()
string_cols.add(report.measure_name)
else:
raise ValueError(f"Unsupported value type {val_type}")
column_order.append(report.measure_name)
fields.append(pa.field(report.db_name, col_dtype))
schema = pa.schema(
fields,
)
writer = pq.ParquetWriter(
filepath, schema,
)
batch = pa.RecordBatch.from_pydict(values_table, schema) # type: ignore
table = pa.Table.from_batches([batch], schema)
writer.write_table(table)
return filepath
[docs]
def add_pheno_common_inference(
config: dict[str, Any],
) -> None:
"""Add pedigree columns as skipped columns to the inference config."""
default_cols = [
"familyId",
"personId",
"momId",
"dadId",
"sex",
"status",
"role",
"sample_id",
"layout",
"generated",
"proband",
"not_sequenced",
"missing",
"member_index",
]
default_cols.extend(ALL_FAMILY_TAG_LABELS)
default_cols.append("tag_family_type_full")
for col in default_cols:
entry = f"pheno_common.{col}"
config[entry] = {"skip": True}
[docs]
def load_inference_configs(
input_dir: str,
inference_config_filepath: str | None,
) -> dict[str, Any]:
"""Load import inference configuration file."""
if inference_config_filepath:
return cast(dict[str, Any], yaml.safe_load(
Path(input_dir, inference_config_filepath).read_text(),
))
return {}
[docs]
def load_histogram_configs(
input_dir: str,
histogram_config_filepath: str | None,
) -> MeasureHistogramConfigs | None:
"""Load import histogram configuration file."""
if histogram_config_filepath:
return MeasureHistogramConfigs.model_validate(yaml.safe_load(
Path(input_dir, histogram_config_filepath).read_text(),
))
return None
[docs]
def create_tables(connection: duckdb.DuckDBPyConnection) -> None:
"""Create phenotype data tables in DuckDB."""
create_instrument = sqlglot.parse(textwrap.dedent(
"""
CREATE TABLE instrument(
instrument_name VARCHAR NOT NULL PRIMARY KEY,
table_name VARCHAR NOT NULL,
);
CREATE UNIQUE INDEX instrument_instrument_name_idx
ON instrument (instrument_name);
""",
))
create_measure = sqlglot.parse(textwrap.dedent(
"""
CREATE TABLE measure(
measure_id VARCHAR NOT NULL UNIQUE PRIMARY KEY,
db_column_name VARCHAR NOT NULL,
measure_name VARCHAR NOT NULL,
instrument_name VARCHAR NOT NULL,
description VARCHAR,
measure_type INT,
value_type VARCHAR,
histogram_type VARCHAR,
histogram_config VARCHAR,
individuals INT,
default_filter VARCHAR,
min_value FLOAT,
max_value FLOAT,
values_domain VARCHAR,
rank INT,
);
CREATE UNIQUE INDEX measure_measure_id_idx
ON measure (measure_id);
CREATE INDEX measure_measure_name_idx
ON measure (measure_name);
CREATE INDEX measure_measure_type_idx
ON measure (measure_type);
""",
))
create_family = sqlglot.parse(textwrap.dedent(
"""
CREATE TABLE family(
family_id VARCHAR NOT NULL UNIQUE PRIMARY KEY,
);
CREATE UNIQUE INDEX family_family_id_idx
ON family (family_id);
""",
))
create_person = sqlglot.parse(textwrap.dedent(
f"""
CREATE TABLE person(
family_id VARCHAR NOT NULL,
person_id VARCHAR NOT NULL,
role INT NOT NULL,
status INT NOT NULL DEFAULT {Status.unaffected.value},
sex INT NOT NULL,
sample_id VARCHAR,
dad_id VARCHAR,
mom_id VARCHAR,
layout VARCHAR,
generated BOOLEAN DEFAULT false,
not_sequenced BOOLEAN DEFAULT false,
tag_nuclear_family BOOLEAN DEFAULT false,
tag_quad_family BOOLEAN DEFAULT false,
tag_trio_family BOOLEAN DEFAULT false,
tag_simplex_family BOOLEAN DEFAULT false,
tag_multiplex_family BOOLEAN DEFAULT false,
tag_control_family BOOLEAN DEFAULT false,
tag_affected_dad_family BOOLEAN DEFAULT false,
tag_affected_mom_family BOOLEAN DEFAULT false,
tag_affected_prb_family BOOLEAN DEFAULT false,
tag_affected_sib_family BOOLEAN DEFAULT false,
tag_unaffected_dad_family BOOLEAN DEFAULT false,
tag_unaffected_mom_family BOOLEAN DEFAULT false,
tag_unaffected_prb_family BOOLEAN DEFAULT false,
tag_unaffected_sib_family BOOLEAN DEFAULT false,
tag_male_prb_family BOOLEAN DEFAULT false,
tag_female_prb_family BOOLEAN DEFAULT false,
tag_missing_mom_family BOOLEAN DEFAULT false,
tag_missing_dad_family BOOLEAN DEFAULT false,
member_index INT NOT NULL,
PRIMARY KEY (family_id, person_id)
);
CREATE UNIQUE INDEX person_person_id_idx
ON person (person_id);
""",
))
queries = [
*create_instrument,
*create_measure,
*create_family,
*create_person,
]
with connection.cursor() as cursor:
for query in queries:
cursor.execute(to_duckdb_transpile(query))
[docs]
def open_file(filepath: Path) -> TextIO:
if ".gz" in filepath.suffixes:
return gzip.open(filepath, "rt", encoding="utf-8-sig")
return filepath.open()
[docs]
def read_instrument_measure_names(
instruments: list[ImportInstrument],
) -> dict[str, list[str]]:
"""Read the headers of all the instrument files."""
instrument_measure_names = {}
for instrument in instruments:
file_to_read = instrument.files[0]
with open_file(file_to_read) as csvfile:
reader = filter(
lambda line: len(line) != 0,
csv.reader(csvfile, delimiter=instrument.delimiter),
)
header = next(reader)
instrument_measure_names[instrument.name] = [
col for col in header[1:]
if col != instrument.person_column
]
return instrument_measure_names
[docs]
def read_pedigree(
connection: duckdb.DuckDBPyConnection,
input_dir: str,
pedigree_filepath: str,
) -> pd.DataFrame:
"""
Read a pedigree file into a pandas DataFrame
Also imports the pedigree data into the database.
"""
initial_ped_df = FamiliesLoader.flexible_pedigree_read(
Path(input_dir, pedigree_filepath), enums_as_values=True,
)
families = FamiliesLoader.build_families_data_from_pedigree(
initial_ped_df, {"ped_layout_mode": "generate"},
)
columns: list[str] = [
"family_id",
"person_id",
"role",
"status",
"sex",
"sample_id",
"dad_id",
"mom_id",
"layout",
"generated",
"not_sequenced",
"tag_nuclear_family",
"tag_quad_family",
"tag_trio_family",
"tag_simplex_family",
"tag_multiplex_family",
"tag_control_family",
"tag_affected_dad_family",
"tag_affected_mom_family",
"tag_affected_prb_family",
"tag_affected_sib_family",
"tag_unaffected_dad_family",
"tag_unaffected_mom_family",
"tag_unaffected_prb_family",
"tag_unaffected_sib_family",
"tag_male_prb_family",
"tag_female_prb_family",
"tag_missing_mom_family",
"tag_missing_dad_family",
"member_index",
]
ped_df = families.ped_df
ped_df["role"] = ped_df["role"].apply(lambda x: x.value)
ped_df["status"] = ped_df["status"].apply(lambda x: x.value)
ped_df["sex"] = ped_df["sex"].apply(lambda x: x.value)
with connection.cursor() as cursor:
cursor.execute(
"INSERT INTO family "
"SELECT DISTINCT family_id FROM ped_df",
)
cursor.execute(
f"INSERT INTO person "
f"SELECT {', '.join(columns)} FROM ped_df ",
)
return ped_df
[docs]
def write_results(
connection: duckdb.DuckDBPyConnection,
instrument_pq_files: dict[str, tuple[Path, Path]],
ped_df: pd.DataFrame,
) -> None:
"""Write imported data into duckdb as measure value tables."""
assert ped_df is not None
with connection.cursor() as cursor:
for instrument_name, pq_files in instrument_pq_files.items():
values_file, reports_file = pq_files
table_name = safe_db_name(f"{instrument_name}_measure_values")
cursor.execute(
textwrap.dedent(f"""
INSERT INTO instrument VALUES
('{instrument_name}', '{table_name}')
"""),
)
query = (
"INSERT INTO measure "
f"SELECT * from read_parquet('{reports_file!s}')"
)
cursor.execute(query)
query = (
f"CREATE TABLE {table_name} AS ("
"SELECT "
"ped_df.person_id, ped_df.family_id, "
"ped_df.role, ped_df.status, ped_df.sex, "
"pq_table.* EXCLUDE (person_id) "
"FROM ped_df "
f"RIGHT JOIN read_parquet('{values_file!s}') as pq_table ON "
"ped_df.person_id = pq_table.person_id "
"WHERE ped_df.person_id IS NOT NULL"
")"
)
connection.execute(query)
[docs]
def load_description_file(
input_dir: str,
config: DataDictionaryConfig,
) -> dict[str, str]:
"""Load measure descriptions for single data dictionary."""
out = {}
abspath = Path(input_dir, config.path).absolute()
assert abspath.exists(), abspath
with open_file(abspath) as csvfile:
reader = csv.DictReader(csvfile, delimiter=config.delimiter)
for row in reader:
instrument_name = config.instrument or row[config.instrument_column]
measure_name = row[config.measure_column]
measure_id = f"{instrument_name}.{measure_name}"
out[measure_id] = row[config.description_column]
return out
[docs]
def load_descriptions(
input_dir: str,
config: MeasureDescriptionsConfig | None,
) -> dict[str, str]:
"""Load measure descriptions from given configuration."""
descriptions: dict[str, str] = {}
if not config:
return descriptions
if config.files is not None:
for datadictfile in config.files:
descriptions.update(load_description_file(input_dir, datadictfile))
if config.dictionary is not None:
for instrument, m_descs in config.dictionary.items():
for measure, description in m_descs.items():
measure_id = f"{instrument}.{measure}"
descriptions[measure_id] = description
return descriptions
[docs]
def merge_inference_configs(
inference_configs: dict[str, Any],
instrument_name: str,
measure_name: str,
) -> InferenceConfig:
"""Merge configs by order of specificity"""
inference_config = InferenceConfig()
current_config = inference_config.model_dump()
if "*.*" in inference_configs:
update_config = inference_configs["*.*"]
current_config.update(update_config)
if f"{instrument_name}.*" in inference_configs:
update_config = inference_configs[f"{instrument_name}.*"]
current_config.update(update_config)
if f"*.{measure_name}" in inference_configs:
update_config = inference_configs[f"*.{measure_name}"]
current_config.update(update_config)
if f"{instrument_name}.{measure_name}" in inference_configs:
update_config = inference_configs[
f"{instrument_name}.{measure_name}"
]
current_config.update(update_config)
return InferenceConfig.model_validate(current_config)
[docs]
def merge_histogram_configs(
histogram_configs: MeasureHistogramConfigs | None,
measure_report: MeasureReport,
) -> HistogramConfig | None:
"""Merge configs by order of specificity"""
if measure_report.inference_report.histogram_type is \
NumberHistogram:
histogram_config = NumberHistogramConfig.default_config(None)
if histogram_configs is None:
return None
configs_dict = histogram_configs.number_config
elif measure_report.inference_report.histogram_type is \
CategoricalHistogram:
histogram_config = CategoricalHistogramConfig.default_config()
if histogram_configs is None:
return None
configs_dict = histogram_configs.categorical_config
else:
return None
instrument_name = measure_report.instrument_name
measure_name = measure_report.measure_name
current_config = None
if "*.*" in configs_dict:
update_config = configs_dict["*.*"]
if current_config is None:
current_config = histogram_config.to_dict()
current_config.update(update_config)
if f"{instrument_name}.*" in configs_dict:
update_config = configs_dict[f"{instrument_name}.*"]
if current_config is None:
current_config = histogram_config.to_dict()
current_config.update(update_config)
if f"*.{measure_name}" in configs_dict:
update_config = configs_dict[f"*.{measure_name}"]
if current_config is None:
current_config = histogram_config.to_dict()
current_config.update(update_config)
if f"{instrument_name}.{measure_name}" in configs_dict:
update_config = configs_dict[
f"{instrument_name}.{measure_name}"
]
if current_config is None:
current_config = histogram_config.to_dict()
current_config.update(update_config)
if current_config is None:
return current_config
if measure_report.inference_report.histogram_type is \
NumberHistogram:
return NumberHistogramConfig.from_dict(current_config)
return CategoricalHistogramConfig.from_dict(current_config)
[docs]
def create_import_tasks(
task_graph: TaskGraph,
instruments: list[ImportInstrument],
instrument_measure_names: dict[str, list[str]],
inference_configs: dict[str, Any],
histogram_configs: MeasureHistogramConfigs | None,
import_config: PhenoImportConfig,
descriptions: dict[str, str],
) -> None:
"""Add measure tasks for importing pheno data."""
for instrument in instruments:
seen_col_names: dict[str, int] = defaultdict(int)
table_column_names = [
"person_id", "family_id", "role", "status", "sex",
]
measure_names = instrument_measure_names[instrument.name]
group_measure_names = []
group_db_names = []
group_inf_configs = []
for measure_name in measure_names:
inference_config = merge_inference_configs(
inference_configs, instrument.name, measure_name,
)
if inference_config.skip:
continue
db_name = safe_db_name(measure_name)
if db_name in table_column_names:
seen_col_names[measure_name] += 1
db_name = f"{measure_name}_{seen_col_names[measure_name]}"
else:
seen_col_names[measure_name] += 1
table_column_names.append(db_name)
group_measure_names.append(measure_name)
group_db_names.append(db_name)
group_inf_configs.append(inference_config)
task_graph.create_task(
f"{instrument.name}_read_and_classify",
read_and_classify_measure,
[
instrument,
group_measure_names,
descriptions,
import_config,
group_db_names,
group_inf_configs,
histogram_configs,
],
[],
)
[docs]
def write_reports_to_parquet(
output_file: Path,
reports: dict[str, MeasureReport],
descriptions: dict[str, str],
hist_configs: MeasureHistogramConfigs | None,
) -> Path:
"""Write inferred instrument measure values to parquet file."""
fields = [
pa.field(
"measure_id",
pa.string(),
),
pa.field(
"db_column_name",
pa.string(),
),
pa.field(
"measure_name",
pa.string(),
),
pa.field(
"instrument_name",
pa.string(),
),
pa.field(
"description",
pa.string(),
),
pa.field(
"measure_type",
pa.int32(),
),
pa.field(
"value_type",
pa.string(),
),
pa.field(
"histogram_type",
pa.string(),
),
pa.field(
"histogram_config",
pa.string(),
),
pa.field(
"individuals",
pa.int32(),
),
pa.field(
"default_filter",
pa.string(),
),
pa.field(
"min_value",
pa.float64(),
),
pa.field(
"max_value",
pa.float64(),
),
pa.field(
"values_domain",
pa.string(),
),
pa.field(
"rank",
pa.int32(),
),
]
batch_values = {
"measure_id": [],
"db_column_name": [],
"measure_name": [],
"instrument_name": [],
"description": [],
"measure_type": [],
"value_type": [],
"histogram_type": [],
"histogram_config": [],
"individuals": [],
"default_filter": [],
"min_value": [],
"max_value": [],
"values_domain": [],
"rank": [],
}
for report in reports.values():
histogram_config = merge_histogram_configs(
hist_configs,
report,
)
if histogram_config is None:
histogram_config_json = None
else:
histogram_config_json = json.dumps(histogram_config.to_dict())
m_id = f"{report.instrument_name}.{report.measure_name}"
value_type = report.inference_report.value_type.__name__
histogram_type = report.inference_report.histogram_type.__name__
values_domain = report.inference_report.values_domain.replace(
"'", "''",
)
desc = descriptions.get(m_id, "")
batch_values["measure_id"].append(m_id)
batch_values["db_column_name"].append(report.db_name)
batch_values["measure_name"].append(report.measure_name)
batch_values["instrument_name"].append(report.instrument_name)
batch_values["description"].append(desc)
batch_values["measure_type"].append(report.measure_type.value)
batch_values["value_type"].append(value_type)
batch_values["histogram_type"].append(histogram_type)
batch_values["histogram_config"].append(histogram_config_json)
batch_values["individuals"].append(
report.inference_report.count_with_values)
batch_values["default_filter"].append("")
batch_values["min_value"].append(report.inference_report.min_value)
batch_values["max_value"].append(report.inference_report.max_value)
batch_values["values_domain"].append(values_domain)
batch_values["rank"].append(
report.inference_report.count_unique_values)
schema = pa.schema(
fields,
)
writer = pq.ParquetWriter(
output_file, schema,
)
batch = pa.RecordBatch.from_pydict(batch_values, schema) # type: ignore
table = pa.Table.from_batches([batch], schema)
writer.write_table(table)
return output_file
if __name__ == "__main__":
logger.warning(
"%s tool is deprecated! Use import_phenotypes.",
sys.argv[0],
)
main(sys.argv[1:])