# pylint: disable=too-many-lines
from __future__ import annotations
import json
import logging
import math
import mimetypes
import os
from abc import ABC, abstractmethod
from collections.abc import Generator, Iterable, Sequence
from functools import cached_property
from itertools import chain, islice
from pathlib import Path
from typing import Any, cast
import duckdb
import pandas as pd
from dae.common_reports.common_report import CommonReport
from dae.common_reports.family_report import FamiliesReport
from dae.common_reports.people_counter import PeopleReport
from dae.genomic_resources.histogram import (
CategoricalHistogram,
CategoricalHistogramConfig,
Histogram,
HistogramConfig,
NullHistogram,
NumberHistogram,
NumberHistogramConfig,
)
from dae.pedigrees.families_data import FamiliesData
from dae.pedigrees.family import Person
from dae.pedigrees.loader import FamiliesLoader
from dae.person_sets.person_sets import (
PersonSetCollection,
PersonSetCollectionConfig,
parse_person_set_collections_study_config,
)
from dae.pheno.browser import PhenoBrowser
from dae.pheno.common import IMPORT_METADATA_TABLE, ImportManifest, MeasureType
from dae.pheno.db import PhenoDb
from dae.studies.study import CommonStudyMixin
from dae.utils.helpers import isnan
from dae.variants.attributes import Role, Sex, Status
# Module-level logger; the logger name follows this module's import path.
logger = logging.getLogger(__name__)
[docs]
def get_pheno_db_dir(dae_config: dict | None) -> str:
    """Return the directory where phenotype data configurations are located.

    Resolution order:

    * without a configuration, ``$DAE_DB_DIR/pheno`` is returned (an unset
      ``DAE_DB_DIR`` is treated as an empty string);
    * when the configuration has a ``phenotype_data`` section with a
      non-None ``dir`` value, that value is returned unchanged;
    * otherwise ``<conf_dir>/pheno`` is returned.

    Raises `KeyError` when the fallback is needed and the configuration
    has no ``conf_dir`` entry.
    """
    if dae_config is None:
        return os.path.join(os.environ.get("DAE_DB_DIR", ""), "pheno")

    phenotype_data = dae_config.get("phenotype_data")
    # A section lacking the "dir" entry is treated like a missing section
    # instead of failing with KeyError.
    pheno_data_dir = (
        None if phenotype_data is None else phenotype_data.get("dir")
    )
    if pheno_data_dir is None:
        return os.path.join(dae_config["conf_dir"], "pheno")
    return pheno_data_dir
[docs]
def get_pheno_browser_images_dir(dae_config: dict | None = None) -> Path:
    """Locate the images directory of the pheno DB.

    With a configuration, the first available of the following is used:
    the ``phenotype_images`` entry as is, then ``<cache_path>/images``.
    In every other case (including no configuration at all) the result is
    the ``images`` subdirectory of the phenotype data directory.
    """
    if dae_config is not None:
        explicit_dir = dae_config.get("phenotype_images")
        if explicit_dir is not None:
            return Path(explicit_dir)

        cache_root = dae_config.get("cache_path")
        if cache_root is not None:
            return Path(cache_root, "images")

    return Path(get_pheno_db_dir(dae_config), "images")
[docs]
class Instrument:
    """
    A phenotype instrument: a named container of measures.

    Common fields are:

    * `instrument_name`

    * `measures` -- dictionary of all measures in the instrument
    """

    def __init__(self, name: str) -> None:
        # A new instrument starts without measures; they are added later
        # by whoever builds the instrument.
        self.measures: dict[str, Measure] = {}
        self.instrument_name = name

    def __repr__(self) -> str:
        measures_count = len(self.measures)
        return f"Instrument({self.instrument_name}, {measures_count})"
[docs]
class Measure:
    """
    Measure objects represent phenotype measures.

    Common fields are:

    * `instrument_name`

    * `measure_name`

    * `measure_id` - formed by `instrument_name`.`measure_name`

    * `measure_type` - one of 'continuous', 'ordinal', 'categorical'

    * `value_type` - one of 'float', 'str', 'int'

    * `histogram_type` - one of 'number', 'categorical'

    * `histogram_config` - one of HistogramConfig or None

    * `description`

    * `min_value` - for 'continuous' and 'ordinal' measures

    * `max_value` - for 'continuous' and 'ordinal' measures

    * `values_domain` - string that represents the values
    """

    def __init__(self, measure_id: str, name: str) -> None:
        self.measure_id = measure_id
        # `name` and `measure_name` both start out as the given name.
        self.name: str = name
        self.measure_name: str = name
        self.measure_type: MeasureType = MeasureType.other
        self.value_type: type = str
        self.histogram_type: type[Histogram] = NullHistogram
        self.histogram_config: HistogramConfig | None = None
        self.values_domain: str | None = None
        self.instrument_name: str | None = None
        self.description: str | None = None
        self.default_filter = None
        self.min_value = None
        self.max_value = None

    def __repr__(self) -> str:
        return (
            f"Measure({self.measure_id}, "
            f"{self.measure_type}, {self.values_domain})"
        )

    @property
    def domain(self) -> Sequence[str | float]:
        """Return measure values domain.

        `values_domain` is stripped of square brackets and spaces and
        split on commas. For continuous and ordinal measures the items
        are converted to floats; otherwise they are returned as strings.
        Without a `values_domain` the result is an empty list.
        """
        domain_list: Sequence[str | float] = []
        if self.values_domain is not None:
            domain = (
                self.values_domain.replace("[", "")
                .replace("]", "")
                .replace(" ", "")
            )
            domain_list = domain.split(",")
            if self.measure_type in (
                MeasureType.continuous,
                MeasureType.ordinal,
            ):
                return list(map(float, domain_list))
        return domain_list

    @classmethod
    def from_record(cls, row: dict[str, Any]) -> Measure:
        """Create `Measure` object from pandas data frame row.

        `row` must provide `measure_id`, `measure_name`, `instrument_name`,
        `measure_type`, `value_type`, `histogram_type`, `description` and
        `default_filter`; `histogram_config` is required for number and
        categorical histograms. `values_domain`, `min_value` and
        `max_value` are optional.

        The instance is created through `cls`, so subclasses get
        instances of their own type.
        """
        assert row["measure_type"] is not None

        mes = cls(row["measure_id"], row["measure_name"])
        mes.instrument_name = row["instrument_name"]
        mes.measure_type = MeasureType(row["measure_type"])

        # Unknown value type names keep the constructor's default.
        value_types: dict[str, type] = {
            "str": str,
            "float": float,
            "int": int,
        }
        mes.value_type = value_types.get(row["value_type"], mes.value_type)

        config_cls: Any = None
        if row["histogram_type"] == "NumberHistogram":
            mes.histogram_type = NumberHistogram
            config_cls = NumberHistogramConfig
        elif row["histogram_type"] == "CategoricalHistogram":
            mes.histogram_type = CategoricalHistogram
            config_cls = CategoricalHistogramConfig
        else:
            mes.histogram_type = NullHistogram

        # The histogram configuration is stored as a JSON string and is
        # only looked up for histogram types that have a configuration.
        mes.histogram_config = None
        if config_cls is not None and row["histogram_config"] is not None:
            mes.histogram_config = config_cls.from_dict(
                json.loads(row["histogram_config"]),
            )

        mes.description = row["description"]
        mes.default_filter = row["default_filter"]
        mes.values_domain = row.get("values_domain")
        mes.min_value = row.get("min_value")
        mes.max_value = row.get("max_value")
        return mes

    @staticmethod
    def _none_if_nan(value: Any) -> Any:
        """Map both None and NaN to None; return other values unchanged."""
        if value is None or math.isnan(value):
            return None
        return value

    def to_json(self) -> dict[str, Any]:
        """Return measure description in JSON friendly format.

        Types are reported by name. NaN `min_value`/`max_value` are
        reported as None. `histogramConfig` holds the `histogram_config`
        attribute itself, without any conversion.
        """
        return {
            "measureName": self.measure_name,
            "measureId": self.measure_id,
            "instrumentName": self.instrument_name,
            "measureType": self.measure_type.name,
            "valueType": self.value_type.__name__,
            "histogramType": self.histogram_type.__name__,
            "histogramConfig": self.histogram_config,
            "description": self.description,
            "defaultFilter": self.default_filter,
            "valuesDomain": self.values_domain,
            "minValue": self._none_if_nan(self.min_value),
            "maxValue": self._none_if_nan(self.max_value),
        }
[docs]
class PhenotypeData(ABC, CommonStudyMixin):
    """Base class for all phenotype data studies and datasets.

    Subclasses supply the storage specific parts (persons, measure
    values, families, person set collections, import manifests). This
    class implements the shared handling of measures, instruments, the
    phenotype browser and common reports.
    """

    def __init__(
        self,
        pheno_id: str,
        config: dict | None = None,
        cache_path: Path | None = None,
    ) -> None:
        self._pheno_id: str = pheno_id
        # A missing configuration is normalised to an empty dict, so
        # `self.config` is never None afterwards.
        self.config = config if config is not None else {}
        self.name = self.config.get("name", pheno_id)
        self._measures: dict[str, Measure] = {}
        self._instruments: dict[str, Instrument] = {}
        # Created lazily by the `browser` property.
        self._browser: PhenoBrowser | None = None
        # Cache files live in a subdirectory named after the pheno ID.
        self.cache_path = cache_path / self.pheno_id if cache_path else None
        self._description: str | None = None
        self.parents: set[str] = set()

    @property
    def is_group(self) -> bool:
        return False

    @cached_property
    def families(self) -> FamiliesData:
        raise NotImplementedError

    @cached_property
    def person_set_collections(self) -> dict[str, PersonSetCollection]:
        raise NotImplementedError

    @property
    def pheno_id(self) -> str:
        return self._pheno_id

    @abstractmethod
    def generate_import_manifests(self) -> list[ImportManifest]:
        """Collect all manifests in a phenotype data instance."""
        raise NotImplementedError

    @staticmethod
    def create_browser(
        pheno_data: PhenotypeData,
        *,
        read_only: bool = True,
    ) -> PhenoBrowser:
        """Load pheno browser from pheno configuration.

        The browser DB file is `<pheno_id>_browser.db`, located in the
        cache path of `pheno_data` or, without a cache path, in the
        configured `conf_dir`.

        When the file does not exist, `FileNotFoundError` is raised in
        read-only mode; otherwise a new DB file with the browser tables
        is created first.
        """
        db_dir = pheno_data.cache_path or Path(pheno_data.config["conf_dir"])
        browser_dbfile = db_dir / f"{pheno_data.pheno_id}_browser.db"

        if not browser_dbfile.exists():
            if read_only:
                raise FileNotFoundError(
                    f"Browser DB file {browser_dbfile!s} not found.",
                )
            conn = duckdb.connect(browser_dbfile, read_only=False)
            try:
                conn.checkpoint()
                PhenoBrowser.create_browser_tables(conn)
            finally:
                # Close the connection even when table creation fails, so
                # the DB file is not left held open.
                conn.close()

        browser = PhenoBrowser(
            str(browser_dbfile),
            read_only=read_only,
        )
        # The result is not used here; the check logs warnings when the
        # browser looks outdated.
        pheno_data.is_browser_outdated(browser)
        return browser

    def is_browser_outdated(self, browser: PhenoBrowser) -> bool:
        """Check if a rebuild is required according to manifests.

        The browser is reported as outdated when it has no manifests,
        when its manifest IDs differ from those generated by this
        phenotype data, or when any browser manifest is older than the
        matching generated one. Each case logs a warning.
        """
        browser_manifests = {
            manifest.import_config.id: manifest
            for manifest in ImportManifest.from_table(
                browser.connection, IMPORT_METADATA_TABLE,
            )
        }
        if not browser_manifests:
            logger.warning(
                "No manifests found in browser; either fresh or legacy",
            )
            return True

        pheno_data_manifests = {
            manifest.import_config.id: manifest
            for manifest in self.generate_import_manifests()
        }
        if set(browser_manifests) != set(pheno_data_manifests):
            logger.warning("Manifest count mismatch between input and browser")
            return True

        is_outdated = False
        # All manifests are checked (no early exit) so that every outdated
        # one gets logged.
        for pheno_id, pheno_manifest in pheno_data_manifests.items():
            if browser_manifests[pheno_id].is_older_than(pheno_manifest):
                logger.warning("Browser manifest outdated for %s", pheno_id)
                is_outdated = True
        return is_outdated

    @property
    def browser(self) -> PhenoBrowser | None:
        """Get or create pheno browser for phenotype data.

        Returns None (after logging the error) when the browser DB file
        is missing; the creation is retried on the next access.
        """
        if self._browser is None:
            try:
                self._browser = PhenotypeData.create_browser(self)
            except FileNotFoundError:
                logger.exception(
                    "Could not create browser for %s", self.pheno_id)
        return self._browser

    @property
    def measures(self) -> dict[str, Measure]:
        return self._measures

    @property
    def instruments(self) -> dict[str, Instrument]:
        return self._instruments

    def get_instruments(self) -> list[str]:
        """Return the names of all instruments."""
        return list(self.instruments.keys())

    @abstractmethod
    def get_regressions(self) -> dict[str, Any]:
        raise NotImplementedError

    @abstractmethod
    def get_measures_info(self) -> dict[str, Any]:
        raise NotImplementedError

    @abstractmethod
    def get_persons_df(self) -> pd.DataFrame:
        raise NotImplementedError

    def get_persons(self) -> dict[str, Person]:
        """Return individuals data from phenotype database.

        The result maps person IDs to `Person` objects built from the
        rows of `get_persons_df()`.
        """
        persons = {}
        for row in self.get_persons_df().to_dict("records"):
            person_id = row["person_id"]
            # Raw values are converted to the matching enums before the
            # person is constructed.
            row["role"] = Role.from_value(row["role"])
            row["sex"] = Sex.from_value(row["sex"])
            row["status"] = Status.from_value(row["status"])
            persons[person_id] = Person(**row)  # type: ignore
        return persons

    def get_person_roles(self) -> list[str]:
        """Return individuals distinct role data from phenotype database.

        The role names are returned sorted.
        """
        df = self.get_persons_df()
        distinct_roles = {
            Role.from_value(row["role"]).name
            for row in df.to_dict("records")
        }
        return sorted(distinct_roles)

    @abstractmethod
    def search_measures(
        self,
        instrument: str | None,
        search_term: str | None,
        page: int | None = None,
        sort_by: str | None = None,
        order_by: str | None = None,
    ) -> Generator[dict[str, Any], None, None]:
        """Yield measures in the DB according to filters."""
        raise NotImplementedError

    @abstractmethod
    def count_measures(
        self,
        instrument: str | None,
        search_term: str | None,
        page: int | None = None,
    ) -> int:
        """Count measures in the DB according to filters."""
        raise NotImplementedError

    def has_measure(self, measure_id: str) -> bool:
        """Check if phenotype DB contains a measure by ID."""
        return measure_id in self._measures

    def get_measure(self, measure_id: str) -> Measure:
        """Return a measure by measure_id.

        The measure must exist; this is checked with an assertion.
        """
        assert measure_id in self._measures, measure_id
        return self._measures[measure_id]

    def get_image(self, image_path: str) -> tuple[bytes, str]:
        """Return binary image data with mimetype.

        `image_path` is resolved against the pheno browser images
        directory. Raises `ValueError` when the mimetype cannot be
        guessed from the file name.
        """
        # NOTE(review): `image_path` is joined to the base directory as
        # is. If it can originate from an untrusted request, an absolute
        # path or `..` components would escape the images directory --
        # confirm that callers sanitise it.
        base_image_dir = Path(get_pheno_browser_images_dir())
        full_image_path = base_image_dir / image_path
        image_data = full_image_path.read_bytes()

        mimetype = mimetypes.guess_type(full_image_path)[0]
        if mimetype is None:
            raise ValueError(
                f"Cannot guess image mimetype of {full_image_path}",
            )
        return image_data, mimetype

    def get_measures(
        self,
        instrument_name: str | None = None,
        measure_type: MeasureType | None = None,
    ) -> dict[str, Measure]:
        """
        Return a dictionary of measures objects.

        `instrument_name` -- an instrument name which measures should be
        returned. If not specified measures of all instruments are
        returned.

        `measure_type` -- a type ('continuous', 'ordinal' or 'categorical')
        of measures that should be returned. If not specified all
        type of measures are returned.

        The result is keyed by measure ID.
        """
        instruments = self.instruments
        if instrument_name is not None:
            assert instrument_name in self.instruments
            instruments = {
                instrument_name: self.instruments[instrument_name],
            }
        if measure_type is not None:
            assert isinstance(measure_type, MeasureType)

        return {
            measure.measure_id: measure
            for instrument in instruments.values()
            for measure in instrument.measures.values()
            if measure_type is None or measure.measure_type == measure_type
        }

    def get_measure_description(self, measure_id: str) -> dict[str, Any]:
        """Construct and return a measure description.

        `min_value` and `max_value` are included only when they are set
        and are not NaN.
        """
        measure = self.measures[measure_id]

        out = {
            "instrument_name": measure.instrument_name,
            "measure_name": measure.measure_name,
            "measure_type": measure.measure_type.name,
            "values_domain": measure.domain,
        }
        if not (measure.min_value is None or math.isnan(measure.min_value)):
            out["min_value"] = measure.min_value
        if not (measure.max_value is None or math.isnan(measure.max_value)):
            out["max_value"] = measure.max_value
        return out

    def get_instrument_measures(self, instrument_name: str) -> list[str]:
        """Return the IDs of the measures of the given instrument."""
        assert instrument_name in self.instruments
        instrument = self.instruments[instrument_name]
        return [m.measure_id for m in instrument.measures.values()]

    @abstractmethod
    def get_people_measure_values(
        self,
        measure_ids: list[str],
        person_ids: list[str] | None = None,
        family_ids: list[str] | None = None,
        roles: list[Role] | None = None,
    ) -> Generator[dict[str, Any], None, None]:
        """
        Collect and format the values of the given measures in dict format.

        Yields a dict representing every row.

        `measure_ids` -- list of measure ids which values should be returned.

        `person_ids` -- list of person IDs to filter result. Only data for
        individuals with person_id in the list `person_ids` are returned.

        `family_ids` -- list of family IDs to filter result. Only data for
        individuals that are members of any of the specified `family_ids`
        are returned.

        `roles` -- list of roles of individuals to select measure value for.
        If not specified value for individuals in all roles are returned.
        """
        raise NotImplementedError

    def get_people_measure_values_df(
        self,
        measure_ids: list[str],
        person_ids: list[str] | None = None,
        family_ids: list[str] | None = None,
        roles: list[Role] | None = None,
    ) -> pd.DataFrame:
        """
        Collect and format the values of the given measures in a dataframe.

        `measure_ids` -- list of measure ids which values should be returned.

        `person_ids` -- list of person IDs to filter result. Only data for
        individuals with person_id in the list `person_ids` are returned.

        `family_ids` -- list of family IDs to filter result. Only data for
        individuals that are members of any of the specified `family_ids`
        are returned.

        `roles` -- list of roles of individuals to select measure value for.
        If not specified value for individuals in all roles are returned.
        """
        raise NotImplementedError

    @abstractmethod
    def get_children_ids(self, *, leaves: bool = True) -> list[str]:
        """Return all phenotype studies' ids in the group."""
        raise NotImplementedError

    @abstractmethod
    def _build_person_set_collection(
        self, psc_config: PersonSetCollectionConfig,
        families: FamiliesData,
    ) -> PersonSetCollection:
        raise NotImplementedError

    def _build_person_set_collections(
        self,
        pheno_config: dict[str, Any] | None,
        families: FamiliesData,
    ) -> dict[str, PersonSetCollection]:
        """Build every person set collection configured in `pheno_config`.

        Returns an empty dict when there is no configuration or it has no
        `person_set_collections` section.
        """
        if pheno_config is None:
            return {}
        if "person_set_collections" not in pheno_config:
            return {}

        pscs_config = parse_person_set_collections_study_config(pheno_config)
        return {
            psc_id: self._build_person_set_collection(psc_config, families)
            for psc_id, psc_config in pscs_config.items()
        }

    def get_person_set_collection(
        self, person_set_collection_id: str | None,
    ) -> PersonSetCollection | None:
        """Return a person set collection by ID; None when ID is unknown."""
        if person_set_collection_id is None:
            return None
        return self.person_set_collections.get(person_set_collection_id)

    def build_report(self) -> CommonReport:
        """Generate the common report for this phenotype data.

        Requires the `common_report` configuration section to be enabled
        (checked with an assertion) and a `person_set_collections`
        section with at least one selected collection.
        """
        config = self.config["common_report"]
        assert config["enabled"], self.pheno_id

        # Use the collections explicitly selected for the family report,
        # or all collections when nothing is selected.
        selected = config.get("selected_person_set_collections")
        if selected and selected.get("family_report"):
            families_report_collections = [
                self.person_set_collections[collection_id]
                for collection_id in selected["family_report"]
            ]
        else:
            families_report_collections = \
                list(self.person_set_collections.values())

        families_report = FamiliesReport.from_study(
            self,
            families_report_collections,
        )
        people_report = PeopleReport.from_person_set_collections(
            families_report_collections,
        )

        # The "phenotype" entry collects the values of the non-empty
        # person sets of the first selected collection.
        person_sets_config = self.config["person_set_collections"]
        collection = self.get_person_set_collection(
            person_sets_config["selected_person_set_collections"][0],
        )
        phenotype: list[str] = []
        assert collection is not None
        for person_set in collection.person_sets.values():
            if len(person_set.persons) > 0:
                phenotype += person_set.values  # noqa: PD011

        # Probands and siblings are counted only among members that are
        # children in their family.
        number_of_probands = 0
        number_of_siblings = 0
        for family in self.families.values():
            for person in family.members_in_order:
                if not family.member_is_child(person.person_id):
                    continue
                if person.role == Role.prb:
                    number_of_probands += 1
                if person.role == Role.sib:
                    number_of_siblings += 1

        return CommonReport({
            "id": self.pheno_id,
            "people_report": people_report.to_dict(),
            "families_report": families_report.to_dict(full=True),
            "denovo_report": None,
            "study_name": self.name,
            "phenotype": phenotype,
            "study_type": None,
            "study_year": None,
            "pub_med": None,
            "families": len(self.families.values()),
            "number_of_probands": number_of_probands,
            "number_of_siblings": number_of_siblings,
            "denovo": False,
            "transmitted": False,
            "study_description": "placeholder description",
        })

    def build_and_save(
        self,
        *,
        force: bool = False,
    ) -> CommonReport | None:
        """Build a common report for a study, saves it and returns the report.

        If the common reports are disabled for the study, the function skips
        building the report and returns None.

        If the report already exists the default behavior is to skip building
        the report. You can force building the report by
        passing `force=True` to the function.
        """
        report_config = self.config["common_report"]
        if not report_config["enabled"]:
            return None

        report_filename = report_config["file_path"]
        if os.path.exists(report_filename) and not force:
            return CommonReport.load(report_filename)

        report = self.build_report()
        report.save(report_filename)
        return report

    def get_common_report(self) -> CommonReport | None:
        """Return a study's common report.

        Returns None when common reports are disabled. When loading the
        stored report yields None, the report is built and saved.
        """
        if not self.config["common_report"]["enabled"]:
            return None

        report = CommonReport.load(self.config["common_report"]["file_path"])
        if report is None:
            report = self.build_and_save()
        return report
[docs]
class PhenotypeStudy(PhenotypeData):
    """
    Main class for accessing phenotype database in DAE.

    The phenotype database is opened and its instruments and measures
    are loaded when an instance is created.

    Common fields of this class are:

    * `db` -- the underlying phenotype database

    * `instruments` -- dictionary of all instruments

    * `measures` -- dictionary of all measures
    """

    def __init__(
        self,
        pheno_id: str,
        dbfile: str,
        config: dict | None = None,
        *,
        read_only: bool = True,
        cache_path: Path | None = None,
    ) -> None:
        super().__init__(pheno_id, config, cache_path=cache_path)

        self.db = PhenoDb(dbfile, read_only=read_only)
        # Loading the instruments also fills `self._measures`.
        self._instruments = self._load_instruments()
        logger.info("phenotype study %s fully loaded", pheno_id)

    def generate_import_manifests(
        self,
    ) -> list[ImportManifest]:
        """Return a one-item list with the first manifest of the study DB."""
        return [
            ImportManifest.from_table(
                self.db.connection, IMPORT_METADATA_TABLE,
            )[0],
        ]

    @cached_property
    def families(self) -> FamiliesData:
        return FamiliesLoader.build_families_data_from_pedigree(
            self.get_persons_df(),
        )

    @cached_property
    def person_set_collections(self) -> dict[str, PersonSetCollection]:
        return self._build_person_set_collections(
            self.config,
            self.families,
        )

    def _load_instruments(self) -> dict[str, Instrument]:
        """Build the instruments from the measures table of the DB.

        Instruments are created in sorted name order. Inside an
        instrument measures are keyed by measure name, while
        `self._measures` is keyed by measure ID.
        """
        df = self.db.get_measures_df()
        instruments = {}

        for instrument_name in sorted(df.instrument_name.unique()):
            instrument = Instrument(instrument_name)
            measures = {}
            measures_df = df[df.instrument_name == instrument_name]

            for row in measures_df.to_dict("records"):
                measure = Measure.from_record(row)
                measures[measure.measure_name] = measure
                self._measures[measure.measure_id] = measure

            instrument.measures = measures
            instruments[instrument.instrument_name] = instrument

        return instruments

    def get_people_measure_values(
        self,
        measure_ids: list[str],
        person_ids: list[str] | None = None,
        family_ids: list[str] | None = None,
        roles: list[Role] | None = None,
    ) -> Generator[dict[str, Any], None, None]:
        """Yield the rows of measure values produced by the study DB."""
        yield from self.db.get_people_measure_values(
            measure_ids, person_ids, family_ids, roles,
        )

    def get_people_measure_values_df(
        self,
        measure_ids: list[str],
        person_ids: list[str] | None = None,
        family_ids: list[str] | None = None,
        roles: list[Role] | None = None,
    ) -> pd.DataFrame:
        """Return the measure values produced by the study DB as a frame."""
        return self.db.get_people_measure_values_df(
            measure_ids, person_ids, family_ids, roles,
        )

    def get_regressions(self) -> dict[str, Any]:
        """Return the browser's regressions; empty without a browser."""
        browser = self.browser
        if browser is None:
            return {}
        return browser.regression_display_names_with_ids

    def _get_pheno_images_base_url(self) -> str | None:
        if self.config is None:
            return None
        return cast(str | None, self.config.get("browser_images_url"))

    def get_measures_info(self) -> dict[str, Any]:
        """Return the image base URL, description flag and regressions.

        Without a browser `has_descriptions` and `regression_names` are
        empty dicts.
        """
        browser = self.browser
        if browser is None:
            return {
                "base_image_url": self._get_pheno_images_base_url(),
                "has_descriptions": {},
                "regression_names": {},
            }
        return {
            "base_image_url": self._get_pheno_images_base_url(),
            "has_descriptions": browser.has_descriptions,
            "regression_names": browser.regression_display_names,
        }

    def search_measures(
        self,
        instrument: str | None,
        search_term: str | None,
        page: int | None = None,
        sort_by: str | None = None,
        order_by: str | None = None,
    ) -> Generator[dict[str, Any], None, None]:
        """Yield the measures found by the browser, one dict per measure.

        Every yielded dict has a single `measure` key. The flat
        `<regression_id>_*` entries of a browser record are regrouped
        into a `regressions` list with one entry per regression. Nothing
        is yielded without a browser.
        """
        browser = self.browser
        if browser is None:
            return

        measures = browser.search_measures(
            instrument,
            search_term,
            page,
            sort_by,
            order_by,
        )
        regression_fields = (
            "figure_regression",
            "figure_regression_small",
            "pvalue_regression_male",
            "pvalue_regression_female",
        )
        for measure in measures:
            if measure["values_domain"] is None:
                measure["values_domain"] = ""
            measure["measure_type"] = \
                cast(MeasureType, measure["measure_type"]).name

            measure["regressions"] = []
            for reg_id in browser.regression_ids:
                reg = {
                    "regression_id": reg_id,
                    "measure_id": measure["measure_id"],
                }
                # NaN p-values are replaced by the string "NaN" before
                # they are moved into the regression entry.
                for sex in ("male", "female"):
                    pvalue_key = f"{reg_id}_pvalue_regression_{sex}"
                    if isnan(measure[pvalue_key]):
                        measure[pvalue_key] = "NaN"

                for field in regression_fields:
                    reg[field] = measure.pop(f"{reg_id}_{field}")
                measure["regressions"].append(reg)

            yield {
                "measure": measure,
            }

    def count_measures(
        self,
        instrument: str | None,
        search_term: str | None,
        page: int | None = None,
    ) -> int:
        """Return the browser's measure count; 0 without a browser."""
        browser = self.browser
        if browser is None:
            return 0
        return browser.count_measures(
            instrument,
            search_term,
            page,
        )

    def get_children_ids(
        self, *, leaves: bool = True,  # noqa: ARG002
    ) -> list[str]:
        """Return a list holding only this study's own ID."""
        return [self.pheno_id]

    def get_persons_df(self) -> pd.DataFrame:
        return self.db.get_persons_df()

    def _build_person_set_collection(
        self,
        psc_config: PersonSetCollectionConfig,
        families: FamiliesData,
    ) -> PersonSetCollection:
        """Build a person set collection and tag the persons with it.

        Every real person of `families` gets an attribute named after the
        collection ID holding the ID of the person set it belongs to.
        """
        # The collection is built from the same `families` whose persons
        # are annotated below, so the two can not get out of sync.
        psc = PersonSetCollection.from_families(psc_config, families)
        for fpid, person in families.real_persons.items():
            person_set_value = psc.get_person_set_of_person(fpid)
            assert person_set_value is not None
            person.set_attr(psc.id, person_set_value.id)
        return psc
[docs]
class PhenotypeGroup(PhenotypeData):
    """Represents a group of phenotype data studies or groups.

    The instruments and measures of the group are the merged instruments
    and measures of its children.
    """

    # Maximum number of measures yielded by a single `search_measures`
    # call across all children.
    _SEARCH_RESULTS_LIMIT = 1001

    def __init__(
        self,
        pheno_id: str,
        config: dict | None,
        children: list[PhenotypeData],
        cache_path: Path | None = None,
    ) -> None:
        super().__init__(pheno_id, config, cache_path=cache_path)
        self.children = children
        instruments, measures = self._merge_instruments(
            [ph.instruments for ph in self.children],
        )
        self._instruments.update(instruments)
        self._measures.update(measures)

        # Register this group as a parent of each of its children.
        for child in self.children:
            child.parents.add(self.pheno_id)

    @property
    def is_group(self) -> bool:
        return True

    def get_leaves(self) -> list[PhenotypeStudy]:
        """Return all phenotype studies in the group.

        Children whose configured `type` is not "study" are treated as
        groups and are expanded recursively.
        """
        leaves: list[PhenotypeStudy] = []
        for child in self.children:
            if child.config["type"] == "study":
                leaves.append(cast(PhenotypeStudy, child))
            else:
                leaves.extend(cast(PhenotypeGroup, child).get_leaves())
        return leaves

    def generate_import_manifests(
        self,
    ) -> list[ImportManifest]:
        """Return the first manifest of every leaf study's DB."""
        return [
            ImportManifest.from_table(
                leaf.db.connection, IMPORT_METADATA_TABLE,
            )[0]
            for leaf in self.get_leaves()
        ]

    def get_children_ids(self, *, leaves: bool = True) -> list[str]:
        """Return the IDs of the leaf studies or of the direct children."""
        studies = self.get_leaves() if leaves else self.children
        return [data.pheno_id for data in studies]

    @staticmethod
    def _merge_instruments(
        phenos_instruments: Iterable[dict[str, Instrument]],
    ) -> tuple[dict[str, Instrument], dict[str, Measure]]:
        """Merge the instruments of several phenotype data objects.

        Instruments with the same name are merged into one. Returns the
        merged instruments by name and all measures keyed by
        `<instrument name>.<measure name>`.

        Raises `ValueError` when a measure appears more than once.
        """
        group_instruments: dict[str, Instrument] = {}
        group_measures: dict[str, Measure] = {}

        for pheno_instruments in phenos_instruments:
            for instrument_name, instrument in pheno_instruments.items():
                group_instrument = group_instruments.get(instrument_name)
                if group_instrument is None:
                    group_instrument = Instrument(instrument_name)

                for name, measure in instrument.measures.items():
                    full_name = f"{instrument_name}.{name}"
                    if full_name in group_measures:
                        raise ValueError(
                            f"{full_name} measure duplication!",
                        )
                    group_instrument.measures[name] = measure
                    group_measures[full_name] = measure

                group_instruments[instrument_name] = group_instrument

        return group_instruments, group_measures

    def get_regressions(self) -> dict[str, Any]:
        """Return the regressions of all children merged into one dict."""
        res: dict[str, Any] = {}
        for pheno in self.children:
            res.update(pheno.get_regressions())
        return res

    def get_measures_info(self) -> dict[str, Any]:
        """Combine the measures info of all children.

        `base_image_url` is the first non-None URL among the children,
        `has_descriptions` is true when any child reports descriptions
        and `regression_names` is the union of the children's names.
        """
        result: dict[str, Any] = {
            "base_image_url": None,
            "has_descriptions": False,
            "regression_names": {},
        }
        for pheno in self.children:
            measures_info = pheno.get_measures_info()
            if result["base_image_url"] is None:
                result["base_image_url"] = measures_info["base_image_url"]
            # Coerce to bool: a child without a browser reports an empty
            # dict, which would otherwise replace the False flag.
            result["has_descriptions"] = bool(
                result["has_descriptions"]
                or measures_info["has_descriptions"],
            )
            cast(dict, result["regression_names"]).update(
                measures_info["regression_names"],
            )
        return result

    def search_measures(
        self,
        instrument: str | None,
        search_term: str | None,
        page: int | None = None,
        sort_by: str | None = None,
        order_by: str | None = None,
    ) -> Generator[dict[str, Any], None, None]:
        """Yield the search results of all children, child after child.

        The same arguments are passed to every child and the total number
        of yielded measures is capped.
        """
        generators = [
            pheno.search_measures(
                instrument,
                search_term,
                page,
                sort_by,
                order_by,
            )
            for pheno in self.children
        ]
        yield from islice(
            chain.from_iterable(generators),
            self._SEARCH_RESULTS_LIMIT,
        )

    def count_measures(
        self,
        instrument: str | None,
        search_term: str | None,
        page: int | None = None,
    ) -> int:
        """Return the sum of the measure counts of all children."""
        return sum(
            pheno.count_measures(
                instrument,
                search_term,
                page,
            )
            for pheno in self.children
        )

    def _children_with_measures(
        self, measure_ids: list[str],
    ) -> list[tuple[PhenotypeData, list[str]]]:
        """Pair each child with the requested measures it contains.

        Children that contain none of the measures are left out.
        """
        selected = []
        for child in self.children:
            measures_in_child = list(
                filter(child.has_measure, measure_ids))
            if len(measures_in_child) > 0:
                selected.append((child, measures_in_child))
        return selected

    def get_people_measure_values(
        self,
        measure_ids: list[str],
        person_ids: list[str] | None = None,
        family_ids: list[str] | None = None,
        roles: list[Role] | None = None,
    ) -> Generator[dict[str, Any], None, None]:
        """Chain the measure value rows of the children.

        Every child is asked only for the measures it contains; children
        without any of the measures are skipped.
        """
        generators = [
            child.get_people_measure_values(
                measures_in_child,
                person_ids,
                family_ids,
                roles,
            )
            for child, measures_in_child
            in self._children_with_measures(measure_ids)
        ]
        return cast(
            Generator[dict[str, Any], None, None],
            chain.from_iterable(generators),
        )

    def get_people_measure_values_df(
        self,
        measure_ids: list[str],
        person_ids: list[str] | None = None,
        family_ids: list[str] | None = None,
        roles: list[Role] | None = None,
    ) -> pd.DataFrame:
        """Join the measure value frames of the children on `person_id`.

        Every child is asked only for the measures it contains. The
        frames are inner joined, so only persons present in every
        contributing child remain. When no child contains any of the
        measures an empty data frame is returned, in line with
        `get_people_measure_values`, which yields nothing in that case.
        """
        measures_dfs: list[tuple[list[str], pd.DataFrame]] = [
            (
                measures_in_child,
                child.get_people_measure_values_df(
                    measures_in_child,
                    person_ids,
                    family_ids,
                    roles,
                ),
            )
            for child, measures_in_child
            in self._children_with_measures(measure_ids)
        ]
        if not measures_dfs:
            return pd.DataFrame()

        out_df = measures_dfs[0][1]
        for measures, df in measures_dfs[1:]:
            # Only the measure columns of the later frames are added; the
            # other columns come from the first frame.
            out_df = out_df.join(
                df.set_index("person_id")[measures],
                on="person_id",
                how="inner",
            )
        return out_df

    def get_persons_df(self) -> pd.DataFrame:
        raise NotImplementedError

    def get_person_roles(self) -> list[str]:
        """Return the sorted distinct role names over all leaf studies."""
        distinct_roles: set[str] = set()
        for leaf in self.get_leaves():
            df = leaf.get_persons_df()
            distinct_roles.update(
                Role.from_value(row["role"]).name
                for row in df.to_dict("records")
            )
        return sorted(distinct_roles)

    def _build_person_set_collection(
        self,
        psc_config: PersonSetCollectionConfig,
        families: FamiliesData,
    ) -> PersonSetCollection:
        raise NotImplementedError