# pylint: disable=too-many-lines
from __future__ import annotations
import logging
import math
import mimetypes
import os
from abc import ABC, abstractmethod
from collections.abc import Generator, Iterable, Sequence
from itertools import chain, islice
from pathlib import Path
from typing import Any, cast
import pandas as pd
from box import Box
from dae.pheno.common import MeasureType
from dae.pheno.db import PhenoDb
from dae.utils.helpers import isnan
from dae.variants.attributes import Role
logger = logging.getLogger(__name__)
[docs]
def get_pheno_db_dir(dae_config: Box | None) -> str:
    """Return the directory where phenotype data configurations are located.

    Resolution order:

    * no configuration given -- ``$DAE_DB_DIR/pheno`` (an unset variable is
      treated as an empty string);
    * configuration with an explicit ``phenotype_data.dir`` -- that value;
    * otherwise -- ``<conf_dir>/pheno``.
    """
    if dae_config is None:
        db_root = os.environ.get("DAE_DB_DIR", "")
        return os.path.join(db_root, "pheno")

    pheno_section = dae_config.phenotype_data
    if pheno_section is not None and pheno_section.dir is not None:
        return pheno_section.dir

    # No explicit phenotype directory configured: fall back to the
    # conventional "pheno" folder next to the configuration.
    return os.path.join(dae_config.conf_dir, "pheno")
[docs]
def get_pheno_browser_images_dir(dae_config: Box | None = None) -> str:
    """Get images directory for pheno DB.

    The base directory is taken from the ``DAE_PHENODB_DIR`` environment
    variable when it is set, otherwise from `get_pheno_db_dir`. The
    returned path is ``<base>/images``; if it does not exist an error is
    logged, but the path is returned regardless.
    """
    # The fallback is resolved up front (even when the environment variable
    # is set) to keep the evaluation order of the lookup unchanged.
    fallback_dir = get_pheno_db_dir(dae_config)
    base_dir = os.environ.get("DAE_PHENODB_DIR", fallback_dir)

    images_dir = os.path.join(base_dir, "images")
    images_dir_found = os.path.exists(images_dir)
    if not images_dir_found:
        logger.error(
            "Pheno images path %s does not exist!", images_dir,
        )
    return images_dir
[docs]
class Instrument:
    """
    Container for a single phenotype instrument.

    Attributes:

    * `instrument_name` -- name of the instrument
    * `measures` -- dictionary of all measures in the instrument
    """

    def __init__(self, name: str) -> None:
        self.measures: dict[str, Measure] = {}
        self.instrument_name = name

    def __repr__(self) -> str:
        measures_count = len(self.measures)
        return f"Instrument({self.instrument_name}, {measures_count})"
[docs]
class Measure:
    """
    Measure objects represent phenotype measures.

    Common fields are:

    * `instrument_name`
    * `measure_name`
    * `measure_id` - formed by `instrument_name`.`measure_name`
    * `measure_type` - one of 'continuous', 'ordinal', 'categorical'
    * `description`
    * `min_value` - for 'continuous' and 'ordinal' measures
    * `max_value` - for 'continuous' and 'ordinal' measures
    * `values_domain` - string that represents the values
    """

    def __init__(self, measure_id: str, name: str) -> None:
        self.measure_id = measure_id
        self.name: str = name
        self.measure_name: str = name
        self.measure_type: MeasureType = MeasureType.other
        self.values_domain: str | None = None
        self.instrument_name: str | None = None
        self.description: str | None = None
        self.default_filter = None
        self.min_value = None
        self.max_value = None

    def __repr__(self) -> str:
        return (
            f"Measure({self.measure_id}, "
            f"{self.measure_type}, {self.values_domain})"
        )

    @property
    def domain(self) -> Sequence[str | float]:
        """Return measure values domain.

        The `values_domain` string is stripped of square brackets and
        spaces and split on commas. For continuous and ordinal measures
        the items are converted to floats; otherwise they are returned as
        strings. A missing or empty domain yields an empty list.
        """
        tokens: list[str] = []
        if self.values_domain is not None:
            stripped = (
                self.values_domain.replace("[", "")
                .replace("]", "")
                .replace(" ", "")
            )
            # "".split(",") would produce [""], which cannot be converted
            # to float and is not a real domain value either.
            if stripped:
                tokens = stripped.split(",")

        if self.measure_type in (
            MeasureType.continuous,
            MeasureType.ordinal,
        ):
            return [float(token) for token in tokens]
        return tokens

    @classmethod
    def from_record(cls, row: dict[str, Any]) -> Measure:
        """Create `Measure` object from pandas data frame row.

        `row` must provide `measure_id`, `measure_name`, `instrument_name`,
        `measure_type`, `description` and `default_filter`;
        `values_domain`, `min_value` and `max_value` are optional.
        """
        assert row["measure_type"] is not None

        mes = Measure(row["measure_id"], row["measure_name"])
        mes.instrument_name = row["instrument_name"]
        mes.measure_name = row["measure_name"]
        mes.measure_type = MeasureType(row["measure_type"])
        mes.description = row["description"]
        mes.default_filter = row["default_filter"]
        mes.values_domain = row.get("values_domain")
        mes.min_value = row.get("min_value")
        mes.max_value = row.get("max_value")
        return mes

    @classmethod
    def from_json(cls, json: dict[str, Any]) -> Measure:
        """Create `Measure` object from a JSON representation.

        Expects the camelCase keys produced by `to_json`; `valuesDomain`,
        `minValue` and `maxValue` are optional.
        """
        assert json["measureType"] is not None

        mes = Measure(json["measureId"], json["measureName"])
        mes.instrument_name = json["instrumentName"]
        mes.measure_name = json["measureName"]
        mes.measure_type = MeasureType.from_str(json["measureType"])
        mes.description = json["description"]
        mes.default_filter = json["defaultFilter"]
        mes.values_domain = json.get("valuesDomain")
        mes.min_value = json.get("minValue")
        mes.max_value = json.get("maxValue")
        return mes

    @staticmethod
    def _nan_to_none(value: Any) -> Any:
        """Return `value`, or None when it is None or NaN."""
        if value is None or math.isnan(value):
            return None
        return value

    def to_json(self) -> dict[str, Any]:
        """Return measure description in JSON friendly format.

        NaN `min_value`/`max_value` are reported as None.
        """
        return {
            "measureName": self.measure_name,
            "measureId": self.measure_id,
            "instrumentName": self.instrument_name,
            "measureType": self.measure_type.name,
            "description": self.description,
            "defaultFilter": self.default_filter,
            "valuesDomain": self.values_domain,
            "minValue": self._nan_to_none(self.min_value),
            "maxValue": self._nan_to_none(self.max_value),
        }
[docs]
class PhenotypeData(ABC):
    """Base class for all phenotype data studies and datasets."""

    def __init__(self, pheno_id: str, config: Box | None) -> None:
        self._pheno_id: str = pheno_id
        self.config = config
        self._measures: dict[str, Measure] = {}
        self._instruments: dict[str, Instrument] = {}

    @property
    def pheno_id(self) -> str:
        """Return the ID of this phenotype data."""
        return self._pheno_id

    @property
    def measures(self) -> dict[str, Measure]:
        """Return all measures keyed by measure ID."""
        return self._measures

    @property
    def instruments(self) -> dict[str, Instrument]:
        """Return all instruments keyed by instrument name."""
        return self._instruments

    def get_instruments(self) -> list[str]:
        """Return the names of all instruments as a list."""
        # Build a real list: a dict keys view is neither a list nor
        # JSON-serialisable, and it would change with the dictionary.
        return list(self.instruments)

    @abstractmethod
    def get_regressions(self) -> dict[str, Any]:
        """Return the regressions available for this phenotype data."""

    @abstractmethod
    def get_measures_info(self) -> dict[str, Any]:
        """Return general information about the measures."""

    @abstractmethod
    def search_measures(
        self,
        instrument: str | None,
        search_term: str | None,
        page: int | None = None,
        sort_by: str | None = None,
        order_by: str | None = None,
    ) -> Generator[dict[str, Any], None, None]:
        """Yield measures in the DB according to filters."""

    @abstractmethod
    def count_measures(
        self,
        instrument: str | None,
        search_term: str | None,
        page: int | None = None,
    ) -> int:
        """Count measures in the DB according to filters."""

    def has_measure(self, measure_id: str) -> bool:
        """Check if phenotype DB contains a measure by ID."""
        return measure_id in self._measures

    def get_measure(self, measure_id: str) -> Measure:
        """Return a measure by measure_id."""
        assert measure_id in self._measures, measure_id
        return self._measures[measure_id]

    def get_image(self, image_path: str) -> tuple[bytes, str]:
        """Return binary image data with mimetype.

        `image_path` is resolved relative to the pheno browser images
        directory. Raises ValueError when the mimetype cannot be guessed
        from the file name.
        """
        base_image_dir = Path(get_pheno_browser_images_dir())
        full_image_path = base_image_dir / image_path
        image_data = full_image_path.read_bytes()

        mimetype = mimetypes.guess_type(full_image_path)[0]
        if mimetype is None:
            raise ValueError(
                f"Cannot guess image mimetype of {full_image_path}",
            )
        return image_data, mimetype

    def get_measures(
        self,
        instrument_name: str | None = None,
        measure_type: MeasureType | None = None,
    ) -> dict[str, Measure]:
        """
        Return a dictionary of measures objects.

        `instrument_name` -- an instrument name which measures should be
        returned. If not specified measures from all instruments are
        returned.

        `measure_type` -- a type ('continuous', 'ordinal' or 'categorical')
        of measures that should be returned. If not specified all
        type of measures are returned.
        """
        instruments = self.instruments
        if instrument_name is not None:
            assert instrument_name in self.instruments
            instruments = {
                instrument_name: self.instruments[instrument_name],
            }

        if measure_type is not None:
            assert isinstance(measure_type, MeasureType)

        return {
            measure.measure_id: measure
            for instrument in instruments.values()
            for measure in instrument.measures.values()
            if measure_type is None or measure.measure_type == measure_type
        }

    def get_measure_description(self, measure_id: str) -> dict[str, Any]:
        """Construct and return a measure description.

        `min_value` and `max_value` are included only when they are set
        and are not NaN.
        """
        measure = self.measures[measure_id]

        out = {
            "instrument_name": measure.instrument_name,
            "measure_name": measure.measure_name,
            "measure_type": measure.measure_type.name,
            "values_domain": measure.domain,
        }
        if not (measure.min_value is None or math.isnan(measure.min_value)):
            out["min_value"] = measure.min_value
        if not (measure.max_value is None or math.isnan(measure.max_value)):
            out["max_value"] = measure.max_value
        return out

    def get_instrument_measures(self, instrument_name: str) -> list[str]:
        """Return measure IDs for given instrument."""
        assert instrument_name in self.instruments
        instrument = self.instruments[instrument_name]
        return [
            measure.measure_id for measure in instrument.measures.values()
        ]

    @abstractmethod
    def get_people_measure_values(
        self,
        measure_ids: list[str],
        person_ids: list[str] | None = None,
        family_ids: list[str] | None = None,
        roles: list[Role] | None = None,
    ) -> Generator[dict[str, Any], None, None]:
        """
        Collect and format the values of the given measures in dict format.

        Yields a dict representing every row.

        `measure_ids` -- list of measure ids which values should be returned.

        `person_ids` -- list of person IDs to filter result. Only data for
        individuals with person_id in the list `person_ids` are returned.

        `family_ids` -- list of family IDs to filter result. Only data for
        individuals that are members of any of the specified `family_ids`
        are returned.

        `roles` -- list of roles of individuals to select measure value for.
        If not specified value for individuals in all roles are returned.
        """
        raise NotImplementedError

    def get_people_measure_values_df(
        self,
        measure_ids: list[str],
        person_ids: list[str] | None = None,
        family_ids: list[str] | None = None,
        roles: list[Role] | None = None,
    ) -> pd.DataFrame:
        """
        Collect and format the values of the given measures in a dataframe.

        `measure_ids` -- list of measure ids which values should be returned.

        `person_ids` -- list of person IDs to filter result. Only data for
        individuals with person_id in the list `person_ids` are returned.

        `family_ids` -- list of family IDs to filter result. Only data for
        individuals that are members of any of the specified `family_ids`
        are returned.

        `roles` -- list of roles of individuals to select measure value for.
        If not specified value for individuals in all roles are returned.
        """
        raise NotImplementedError
[docs]
class PhenotypeStudy(PhenotypeData):
    """
    Main class for accessing phenotype database in DAE.

    Creating an instance opens the phenotype database file and loads all
    instruments and measures from it.

    Common fields of this class are:

    * `instruments` -- dictionary of all instruments
    * `measures` -- dictionary of all measures
    """

    def __init__(
        self, pheno_id: str, dbfile: str,
        config: Box | None = None, *, read_only: bool = True,
    ) -> None:
        super().__init__(pheno_id, config)

        self.db = PhenoDb(dbfile, read_only=read_only)
        self.config = config

        all_measures_df = self._get_measures_df()
        self._instruments = self._load_instruments(all_measures_df)
        logger.warning("phenotype study %s fully loaded", pheno_id)

    def _get_measures_df(
        self,
        instrument: str | None = None,
        measure_type: MeasureType | None = None,
    ) -> pd.DataFrame:
        """
        Return data frame containing measures information.

        `instrument` -- an instrument name which measures should be
        returned. If not specified measures of all instruments are returned.

        `measure_type` -- a type ('continuous', 'ordinal' or 'categorical')
        of measures that should be returned. If not specified all
        type of measures are returned.

        Each row in the returned data frame represents given measure; the
        frame itself is produced by the underlying phenotype DB.
        """
        if instrument is not None:
            assert instrument in self.instruments
        if measure_type is not None:
            assert isinstance(measure_type, MeasureType)
        return self.db.get_measures_df(instrument, measure_type)

    def _load_instruments(self, df: pd.DataFrame) -> dict[str, Instrument]:
        """Build instruments from the measures data frame.

        As a side effect every created measure is also registered in
        `self._measures` under its measure ID.
        """
        loaded: dict[str, Instrument] = {}

        for current_name in sorted(df.instrument_name.unique()):
            instrument = Instrument(current_name)
            instrument_rows = df[df.instrument_name == current_name]

            # Within an instrument measures are keyed by their short name;
            # the study-wide registry is keyed by the full measure ID.
            by_name: dict[str, Measure] = {}
            for record in instrument_rows.to_dict("records"):
                loaded_measure = Measure.from_record(record)
                by_name[loaded_measure.measure_name] = loaded_measure
                self._measures[loaded_measure.measure_id] = loaded_measure

            instrument.measures = by_name
            loaded[instrument.instrument_name] = instrument

        return loaded

    def _build_default_filter_clause(
        self, measure: Measure, default_filter: str,
    ) -> str | None:
        """Build the filter clause for a measure's default filter.

        `default_filter` is one of "skip", "apply" or "invert". Returns
        None when the filter is skipped or the measure has no default
        filter; raises ValueError for any other mode.
        """
        if default_filter == "skip":
            return None

        condition = measure.default_filter
        if condition is None:
            return None

        if default_filter == "invert":
            return f"NOT (value {condition})"
        if default_filter == "apply":
            return f"value {condition}"

        raise ValueError(
            f"bad default_filter value: {default_filter}",
        )

    def get_people_measure_values(
        self,
        measure_ids: list[str],
        person_ids: list[str] | None = None,
        family_ids: list[str] | None = None,
        roles: list[Role] | None = None,
    ) -> Generator[dict[str, Any], None, None]:
        """Yield measure value rows as produced by the phenotype DB."""
        rows = self.db.get_people_measure_values(
            measure_ids, person_ids, family_ids, roles,
        )
        yield from rows

    def get_people_measure_values_df(
        self,
        measure_ids: list[str],
        person_ids: list[str] | None = None,
        family_ids: list[str] | None = None,
        roles: list[Role] | None = None,
    ) -> pd.DataFrame:
        """Return measure values data frame from the phenotype DB."""
        values_df = self.db.get_people_measure_values_df(
            measure_ids, person_ids, family_ids, roles,
        )
        return values_df

    def get_regressions(self) -> dict[str, Any]:
        """Return regression display names with their IDs."""
        return self.db.regression_display_names_with_ids

    def get_regression_ids(self) -> list[str]:
        """Return the IDs of the regressions stored in the phenotype DB."""
        return self.db.regression_ids

    def _get_pheno_images_base_url(self) -> str | None:
        """Return the configured browser images URL, if any."""
        if self.config is None:
            return None
        return self.config.get("browser_images_url")

    def get_measures_info(self) -> dict[str, Any]:
        """Return images base URL, descriptions flag and regression names."""
        info: dict[str, Any] = {}
        info["base_image_url"] = self._get_pheno_images_base_url()
        info["has_descriptions"] = self.db.has_descriptions
        info["regression_names"] = self.db.regression_display_names
        return info

    def search_measures(
        self,
        instrument: str | None,
        search_term: str | None,
        page: int | None = None,
        sort_by: str | None = None,
        order_by: str | None = None,
    ) -> Generator[dict[str, Any], None, None]:
        """Yield measures in the DB according to filters.

        Every yielded item is ``{"measure": <measure dict>}``. The flat
        ``<regression id>_*`` fields of each measure are moved into a
        list of dictionaries stored under ``"regressions"``.
        """
        found = self.db.search_measures(
            instrument,
            search_term,
            page,
            sort_by,
            order_by,
        )

        moved_fields = (
            "figure_regression",
            "figure_regression_small",
            "pvalue_regression_male",
            "pvalue_regression_female",
        )

        for measure in found:
            if measure["values_domain"] is None:
                measure["values_domain"] = ""
            measure["measure_type"] = \
                cast(MeasureType, measure["measure_type"]).name

            regressions: list[dict[str, Any]] = []
            measure["regressions"] = regressions

            for reg_id in self.get_regression_ids():
                regression: dict[str, Any] = {
                    "regression_id": reg_id,
                    "measure_id": measure["measure_id"],
                }

                # NaN p-values are replaced by the string "NaN" before
                # being moved into the regression dictionary.
                for sex in ("male", "female"):
                    pvalue_key = f"{reg_id}_pvalue_regression_{sex}"
                    if isnan(measure[pvalue_key]):
                        measure[pvalue_key] = "NaN"

                for field in moved_fields:
                    regression[field] = measure.pop(f"{reg_id}_{field}")

                regressions.append(regression)

            yield {
                "measure": measure,
            }

    def count_measures(
        self,
        instrument: str | None,
        search_term: str | None,
        page: int | None = None,
    ) -> int:
        """Count measures in the DB according to filters."""
        total = self.db.count_measures(
            instrument,
            search_term,
            page,
        )
        return total
[docs]
class PhenotypeGroup(PhenotypeData):
    """Represents a group of phenotype data studies or groups."""

    def __init__(
        self, pheno_id: str, children: list[PhenotypeData],
    ) -> None:
        super().__init__(pheno_id, None)
        self.children = children
        instruments, measures = self._merge_instruments(
            [ph.instruments for ph in self.children])
        self._instruments.update(instruments)
        self._measures.update(measures)

    @staticmethod
    def _merge_instruments(
        phenos_instruments: Iterable[dict[str, Instrument]],
    ) -> tuple[dict[str, Instrument], dict[str, Measure]]:
        """Merge the instruments of several phenotype data objects.

        Returns the merged instruments and a dictionary of all measures
        keyed by ``<instrument name>.<measure name>``. Instruments sharing
        a name are combined into one; the same measure appearing twice
        raises ValueError.
        """
        group_instruments: dict[str, Instrument] = {}
        group_measures: dict[str, Measure] = {}

        for pheno_instruments in phenos_instruments:
            for instrument_name, instrument in pheno_instruments.items():
                group_instrument = group_instruments.get(instrument_name)
                if group_instrument is None:
                    group_instrument = Instrument(instrument_name)
                    group_instruments[instrument_name] = group_instrument

                for measure in instrument.measures.values():
                    # Use the measure's own short name, not the child's
                    # dictionary key: a child group already keys its
                    # instrument measures by the full name, which would
                    # otherwise yield "<instrument>.<instrument>.<name>".
                    full_name = f"{instrument_name}.{measure.measure_name}"
                    if full_name in group_measures:
                        raise ValueError(
                            f"{full_name} measure duplication!",
                        )
                    group_instrument.measures[full_name] = measure
                    group_measures[full_name] = measure

        return group_instruments, group_measures

    def get_regressions(self) -> dict[str, Any]:
        """Return the regressions of all children merged together.

        On key collisions the value from the later child wins.
        """
        res: dict[str, Any] = {}
        for pheno in self.children:
            res.update(pheno.get_regressions())
        return res

    def get_measures_info(self) -> dict[str, Any]:
        """Return measures info combined from all children.

        `base_image_url` is the first non-None URL among the children,
        `has_descriptions` is true when any child has descriptions and
        `regression_names` is the union of the children's names.
        """
        result: dict[str, Any] = {
            "base_image_url": None,
            "has_descriptions": False,
            "regression_names": {},
        }
        for pheno in self.children:
            measures_info = pheno.get_measures_info()
            if result["base_image_url"] is None:
                result["base_image_url"] = measures_info["base_image_url"]
            result["has_descriptions"] = \
                result["has_descriptions"] or measures_info["has_descriptions"]
            result["regression_names"].update(
                measures_info["regression_names"],
            )
        return result

    def search_measures(
        self,
        instrument: str | None,
        search_term: str | None,
        page: int | None = None,
        sort_by: str | None = None,
        order_by: str | None = None,
    ) -> Generator[dict[str, Any], None, None]:
        """Yield measures found in the children according to filters.

        The children are searched in order with the same arguments and
        at most 1001 results are yielded in total.
        """
        generators = [
            pheno.search_measures(
                instrument,
                search_term,
                page,
                sort_by,
                order_by,
            )
            for pheno in self.children
        ]
        yield from islice(chain.from_iterable(generators), 1001)

    def count_measures(
        self,
        instrument: str | None,
        search_term: str | None,
        page: int | None = None,
    ) -> int:
        """Return the sum of the children's measure counts."""
        return sum(
            pheno.count_measures(
                instrument,
                search_term,
                page,
            )
            for pheno in self.children
        )

    def get_people_measure_values(
        self,
        measure_ids: list[str],
        person_ids: list[str] | None = None,
        family_ids: list[str] | None = None,
        roles: list[Role] | None = None,
    ) -> Generator[dict[str, Any], None, None]:
        """Return the children's measure value rows chained together.

        Each child is queried only for those of `measure_ids` it
        contains; children containing none of them are skipped.
        """
        generators = []
        for child in self.children:
            measures_in_child = [
                measure_id for measure_id in measure_ids
                if child.has_measure(measure_id)
            ]
            if measures_in_child:
                generators.append(child.get_people_measure_values(
                    measures_in_child,
                    person_ids,
                    family_ids,
                    roles,
                ))

        return cast(
            Generator[dict[str, Any], None, None],
            chain.from_iterable(generators),
        )

    def get_people_measure_values_df(
        self,
        measure_ids: list[str],
        person_ids: list[str] | None = None,
        family_ids: list[str] | None = None,
        roles: list[Role] | None = None,
    ) -> pd.DataFrame:
        """Return the children's measure value data frames concatenated.

        Each child is queried only for those of `measure_ids` it
        contains. An empty data frame is returned when no child contains
        any of the requested measures.
        """
        dfs: list[pd.DataFrame] = []
        for child in self.children:
            measures_in_child = [
                measure_id for measure_id in measure_ids
                if child.has_measure(measure_id)
            ]
            if measures_in_child:
                dfs.append(child.get_people_measure_values_df(
                    measures_in_child,
                    person_ids,
                    family_ids,
                    roles,
                ))

        if not dfs:
            return pd.DataFrame()
        return pd.concat(dfs, ignore_index=True)