Source code for dae.pheno_tool.tool
import logging
from collections import Counter
from typing import Any, cast

import numpy as np
import pandas as pd
from scipy.stats import ttest_ind  # type: ignore

from dae.effect_annotation.effect import EffectTypesMixin, expand_effect_types
from dae.pheno.common import MeasureType
from dae.pheno.pheno_data import PhenotypeData
from dae.pheno.utils.lin_regress import LinearRegression
from dae.studies.study import GenotypeData
from dae.variants.attributes import Role, Sex
from dae.variants.family_variant import FamilyAllele

logger = logging.getLogger(__name__)


class PhenoResult:
    """Represents a result of PhenoTool calculation."""

    def __init__(self) -> None:
        self.pvalue = np.nan
        self.positive_count = np.nan
        self.positive_mean = np.nan
        self.positive_deviation = np.nan
        self.negative_count = np.nan
        self.negative_mean = np.nan
        self.negative_deviation = np.nan

    def set_positive_stats(
        self, p_count: int, p_mean: float, p_std: float,
    ) -> None:
        self.positive_count = p_count
        self.positive_mean = p_mean
        self.positive_deviation = p_std

    def set_negative_stats(
        self, n_count: int, n_mean: float, n_std: float,
    ) -> None:
        self.negative_count = n_count
        self.negative_mean = n_mean
        self.negative_deviation = n_std

    def __repr__(self) -> str:
        return (
            f"PhenoResult: pvalue={self.pvalue}; "
            f"pos={self.positive_count} "
            f"(neg={self.negative_count})"
        )
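

# Minimal usage sketch (not part of the original module): a PhenoResult is
# normally filled in by PhenoTool, but it can be populated by hand, e.g. when
# exercising downstream code. All numbers below are made up.
def _example_pheno_result() -> PhenoResult:
    result = PhenoResult()
    result.set_positive_stats(12, 101.5, 2.3)   # carriers: count, mean, CI half-width
    result.set_negative_stats(240, 98.7, 0.6)   # non-carriers
    result.pvalue = 0.04
    return result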


class PhenoToolHelper:
    """Helper class for PhenoTool.

    Collects variants and person ids from genotype data.

    Arguments of the constructor are:

    `genotype_data` -- an instance of GenotypeData (e.g. a StudyWrapper)

    `phenotype_data` -- an instance of PhenotypeData
    """

    def __init__(
        self, genotype_data: GenotypeData, phenotype_data: PhenotypeData,
    ) -> None:
        assert genotype_data

        self.genotype_data = genotype_data
        self.phenotype_data = phenotype_data
        self.effect_types_mixin = EffectTypesMixin()

    def _package_effect_type_group(
        self, group: str, variants: Any,
    ) -> dict[str, Counter]:
        # Collapse per-effect-type person counters into a single counter
        # keyed by the effect group name.
        res: dict[str, Any] = {group: Counter()}
        group_effect_types = self.effect_types_mixin.build_effect_types(group)
        for effect_type in group_effect_types:
            if effect_type not in variants:
                continue
            for person_id in variants[effect_type]:
                res[group][person_id] = 1
        return res

    def genotype_data_persons(
        self, family_ids: list[str] | None = None,
        roles: list[Role] | None = None,
    ) -> set[str]:
        """Collect person ids from genotype data."""
        if family_ids is None:
            family_ids = []
        if roles is None:
            roles = [Role.prb]
        assert isinstance(roles, list)

        persons = set()
        if not family_ids:
            family_ids = list(self.genotype_data.families.keys())

        for family_id in family_ids:
            family = self.genotype_data.families[family_id]
            for person in family.members_in_order:
                if person.role in roles:
                    persons.add(person.person_id)

        return persons

    def genotype_data_variants(
        self, data: dict, effect_groups: list[str],
    ) -> dict[str, Counter]:
        """Collect effect groups variants."""
        assert "effect_types" in data, data

        effect_types = set(data["effect_types"])
        queried_effect_types = set(expand_effect_types(effect_types))
        variants_by_effect: dict[str, Counter] = {
            effect: Counter() for effect in queried_effect_types
        }

        for variant in self.genotype_data.query_variants(**data):
            for allele in variant.matched_alleles:
                fa = cast(FamilyAllele, allele)
                assert fa.effects is not None
                for person in filter(None, fa.variant_in_members):
                    for effect in fa.effects.types:
                        if effect in queried_effect_types:
                            variants_by_effect[effect][person] = 1

        for effect_type in effect_groups:
            if effect_type not in self.effect_types_mixin.EFFECT_GROUPS:
                continue
            variants_by_effect.update(
                self._package_effect_type_group(
                    effect_type, variants_by_effect,
                ),
            )

        logger.debug("variants_by_effect: %s", list(variants_by_effect.keys()))
        variants_by_effect = {
            k: variants_by_effect[k]
            for k in effect_groups
            if k in variants_by_effect
        }
        logger.debug("variants_by_effect: %s", list(variants_by_effect.keys()))
        return variants_by_effect
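

# Usage sketch (illustrative only): the dict passed to genotype_data_variants
# is forwarded to GenotypeData.query_variants and must contain an
# "effect_types" key; the returned mapping is keyed by the requested effect
# groups/types. The "LGDs" group used here is an assumption about the
# configured effect groups.
def _example_collect_genotype_info(
    helper: PhenoToolHelper,
) -> tuple[set[str], dict[str, Counter]]:
    persons = helper.genotype_data_persons(roles=[Role.prb])
    variants = helper.genotype_data_variants(
        {"effect_types": ["LGDs"]}, ["LGDs"],
    )
    return persons, variants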


class PhenoTool:
    """Tool to estimate dependency between variants and phenotype measures.

    The constructor takes a single argument:

    `phenotype_data` -- an instance of PhenotypeData

    The measure ID, the optional person and family ID filters, and the list
    of continuous measures to normalize by are passed to `create_df` and
    `calc`.
    """

    def __init__(self, phenotype_data: PhenotypeData) -> None:
        self.phenotype_data = phenotype_data

    def create_df(
        self,
        measure_id: str,
        person_ids: list[str] | None = None,
        family_ids: list[str] | None = None,
        normalize_by: list[dict[str, str]] | None = None,
    ) -> pd.DataFrame:
        """Create a dataframe for the given measure."""
        if not self.phenotype_data.has_measure(measure_id):
            raise KeyError(
                f"measure id {measure_id} not found!",
            )
        assert self.phenotype_data.get_measure(measure_id).measure_type \
            in [MeasureType.continuous, MeasureType.ordinal]

        if normalize_by is None:
            normalize_by = []
        normalize_by = self.init_normalize_measures(measure_id, normalize_by)

        # currently filtering only for probands, expand with additional
        # options via PeopleGroup
        all_measures = [measure_id, *normalize_by]
        pheno_df = self.phenotype_data.get_people_measure_values_df(
            all_measures,
            person_ids=person_ids,
            family_ids=family_ids,
            roles=[Role.prb],
        )
        pheno_df = pheno_df.dropna()

        if not pheno_df.empty:
            pheno_df = self._normalize_df(
                pheno_df, measure_id, normalize_by,
            )
        return pheno_df
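
    # Shape of the `normalize_by` argument (sketch): each entry is a dict with
    # "instrument_name" and "measure_name" keys, e.g.
    #     [{"instrument_name": "", "measure_name": "age"}]
    # An empty instrument name falls back to the instrument of `measure_id`
    # (see get_normalize_measure_id below).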

    def init_normalize_measures(
        self, measure_id: str, normalize_by: list[dict[str, str]],
    ) -> list[str]:
        """Resolve normalization measure IDs; assert they are continuous."""
        result = list(filter(
            None,
            [
                self.get_normalize_measure_id(measure_id, normalize_measure)
                for normalize_measure in normalize_by
            ]))
        assert all(
            self.phenotype_data.get_measure(m).measure_type
            == MeasureType.continuous
            for m in result
        )
        return result

    def get_normalize_measure_id(
        self, measure_id: str, normalize_measure: dict[str, str],
    ) -> str | None:
        """Build a normalization measure ID; return None if unusable."""
        assert isinstance(normalize_measure, dict)
        assert "measure_name" in normalize_measure
        assert "instrument_name" in normalize_measure

        if not normalize_measure["instrument_name"]:
            normalize_measure["instrument_name"] = \
                measure_id.split(".")[0]

        normalize_id = ".".join(
            [
                normalize_measure["instrument_name"],
                normalize_measure["measure_name"],
            ],
        )

        if self.phenotype_data.has_measure(normalize_id) and \
                normalize_id != measure_id:
            return normalize_id
        return None

    @staticmethod
    def join_pheno_df_with_variants(
        pheno_df: pd.DataFrame, variants: Counter,
    ) -> pd.DataFrame:
        """Join phenotype dataframe with variants."""
        assert not pheno_df.empty
        assert isinstance(variants, Counter)

        persons_variants = pd.DataFrame(
            data=list(variants.items()),
            columns=["person_id", "variant_count"],
        )
        persons_variants = persons_variants.set_index("person_id")

        merged_df = pheno_df.merge(
            persons_variants, how="left", on=["person_id"],
        )
        return merged_df.fillna(0)
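
    # Example of the join (values illustrative): merging Counter({"p1": 2})
    # into a pheno_df that has a "p1" row adds a "variant_count" column with
    # value 2 for "p1"; every person without variants gets 0 via fillna(0).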

    @staticmethod
    def _normalize_df(
        df: pd.DataFrame, measure_id: str,
        normalize_by: list[str] | None = None,
    ) -> pd.DataFrame:
        if normalize_by is None:
            normalize_by = []
        assert not df.empty
        assert measure_id in df
        assert all(measure_id in df for measure_id in normalize_by)

        if not normalize_by:
            dn = pd.Series(index=df.index, data=df[measure_id].to_numpy())
            df.loc[:, "normalized"] = dn
            return df

        x = df[normalize_by].to_numpy()
        y = df[measure_id]
        fitted = LinearRegression().fit(x, y)
        resids = y - fitted.predict(x)
        dn = pd.Series(index=df.index, data=resids)
        df.loc[:, "normalized"] = dn
        return df
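
    # Sketch of the normalization step: with a non-empty `normalize_by`, the
    # "normalized" column holds the residuals of an ordinary least squares fit
    # of the measure on the normalization measures, i.e.
    #     normalized = y - LinearRegression().fit(x, y).predict(x)
    # otherwise it is simply a copy of the raw measure values.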

    @staticmethod
    def _calc_base_stats(arr: np.ndarray) -> tuple[int, float, float]:
        count = len(arr)
        if count == 0:
            mean = 0.0
            std = 0.0
        else:
            mean = float(np.mean(arr, dtype=np.float64))
            std = float(1.96 * np.std(arr, dtype=np.float64) / np.sqrt(count))
        return count, mean, std
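
    # Worked example (illustrative): for arr = [1, 2, 3, 4] this returns
    # count=4, mean=2.5 and std = 1.96 * 1.118 / 2 ~= 1.096, i.e. the reported
    # "deviation" is the half-width of a 95% confidence interval around the
    # mean, not the raw standard deviation.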

    @staticmethod
    def _calc_pv(
        positive: np.ndarray,
        negative: np.ndarray,
    ) -> float:
        if len(positive) < 2 or len(negative) < 2:
            return "NA"  # type: ignore
        tt = ttest_ind(positive, negative)
        pv = tt[1]
        return float(pv)

    @classmethod
    def _calc_stats(
        cls, data: pd.DataFrame, sex: Sex | None,
    ) -> PhenoResult:
        if len(data) == 0:
            result = PhenoResult()
            result.positive_count = 0
            result.positive_mean = 0
            result.positive_deviation = 0
            result.negative_count = 0
            result.negative_mean = 0
            result.negative_deviation = 0
            result.pvalue = "NA"  # type: ignore
            return result

        positive_index = np.logical_and(
            data["variant_count"] != 0, ~np.isnan(data["variant_count"]),
        )
        negative_index = np.array(data["variant_count"] == 0)

        if sex is None:
            sex_index = None
            positive_sex_index = positive_index
            negative_sex_index = negative_index
        else:
            sex_index = data["sex"] == sex
            positive_sex_index = np.logical_and(positive_index, sex_index)
            negative_sex_index = np.logical_and(negative_index, sex_index)

        assert not np.any(
            np.logical_and(positive_sex_index, negative_sex_index),
        )

        positive = np.array(data[positive_sex_index].normalized.values)
        negative = np.array(data[negative_sex_index].normalized.values)

        p_val = cls._calc_pv(positive, negative)

        result = PhenoResult()
        result.set_positive_stats(*PhenoTool._calc_base_stats(positive))
        result.set_negative_stats(*PhenoTool._calc_base_stats(negative))
        result.pvalue = p_val
        return result

    def calc(
        self,
        measure_id: str,
        variants: Counter, *,
        sex_split: bool = False,
        person_ids: list[str] | None = None,
        family_ids: list[str] | None = None,
        normalize_by: list[dict[str, str]] | None = None,
    ) -> dict[str, PhenoResult] | PhenoResult:
        """Perform the PhenoTool calculation.

        `variants` -- an instance of Counter, mapping person IDs to a
        number of variants

        `sex_split` -- should the result be split by sex or not. Default
        is `False`.
        """
        pheno_df = self.create_df(
            measure_id, person_ids=person_ids,
            family_ids=family_ids, normalize_by=normalize_by,
        )
        if not pheno_df.empty:
            merged_df = PhenoTool.join_pheno_df_with_variants(
                pheno_df, variants,
            )
        else:
            merged_df = pheno_df

        if not sex_split:
            return self._calc_stats(merged_df, None)

        result = {}
        for sex in [Sex.M, Sex.F]:
            p = self._calc_stats(merged_df, sex)
            result[sex.name] = p
        return result
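

# End-to-end sketch (illustrative, not part of the module): wire the helper
# and the tool together. The effect group, measure ID and normalization
# measure below are placeholders for whatever the loaded study provides.
def _example_pheno_tool_run(
    genotype_data: GenotypeData, phenotype_data: PhenotypeData,
) -> dict[str, PhenoResult] | PhenoResult:
    helper = PhenoToolHelper(genotype_data, phenotype_data)
    variants = helper.genotype_data_variants(
        {"effect_types": ["missense"]}, ["missense"],
    )
    tool = PhenoTool(phenotype_data)
    return tool.calc(
        "instrument1.measure1",  # placeholder measure ID
        variants.get("missense", Counter()),
        sex_split=True,
        normalize_by=[
            {"instrument_name": "", "measure_name": "age"},  # placeholder
        ],
    )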