Source code for dae.variants.family_variant

import copy
import logging
from typing import Any, cast

import numpy as np
from deprecation import deprecated

from dae.effect_annotation.effect import AlleleEffects
from dae.pedigrees.family import Family
from dae.utils.variant_utils import (
    BitmaskEnumTranslator,
    GenotypeType,
    is_all_unknown_genotype,
    is_reference_genotype,
    mat2str,
)
from dae.variants.attributes import (
    GeneticModel,
    Inheritance,
    Role,
    Sex,
    Status,
    TransmissionType,
    Zygosity,
)
from dae.variants.core import Allele
from dae.variants.variant import SummaryAllele, SummaryVariant, VariantDetails

logger = logging.getLogger(__name__)


[docs] def calculate_simple_best_state( genotype: np.ndarray, allele_count: int) -> np.ndarray: """Calculate and return the best state of a genotype.""" # Simple best state calculation # Treats every genotype as diploid (including male X non-PAR) ref: np.ndarray = 2 * np.ones(genotype.shape[1], dtype=GenotypeType) unknown = np.any(genotype == -1, axis=0) best_st = [ref] for allele_index in range(1, allele_count): alt_gt = np.zeros(genotype.shape, dtype=GenotypeType) alt_gt[genotype == allele_index] = 1 alt = cast(np.ndarray, np.sum(alt_gt, axis=0, dtype=GenotypeType)) best_st[0] = best_st[0] - alt best_st.append(alt) best_st_arr = cast(np.ndarray, np.stack(best_st, axis=0)) best_st_arr[:, unknown] = -1 return best_st_arr
[docs] class FamilyAllele(SummaryAllele): # pylint: disable=super-init-not-called,too-many-public-methods """Class representing an allele in a family.""" def __init__( self, summary_allele: SummaryAllele, family: Family, *, family_id: str | None = None, member_ids: list[str] | None = None, genotype: np.ndarray | None, best_state: np.ndarray | None, genetic_model: GeneticModel | None = None, inheritance_in_members: list[Inheritance] | None = None, ): assert isinstance(family, Family) self.family = family if family_id is None: family_id = family.family_id self._family_id = family_id if member_ids is None: member_ids = family.member_ids self._member_ids = member_ids assert self._family_id is not None assert self._member_ids is not None #: summary allele that corresponds to this allele in family variant self.summary_allele: SummaryAllele = summary_allele self.gt = genotype self._best_state = best_state if genetic_model is None: self._genetic_model = GeneticModel.autosomal else: self._genetic_model = genetic_model self._inheritance_in_members = inheritance_in_members self._variant_in_members: list[str | None] | None = None self._variant_in_roles: list[Role | None] | None = None self._variant_in_sexes: list[Sex | None] | None = None self._variant_in_statuses: list[Status | None] | None = None self._family_index: int | None = None self._family_attributes: dict = {} self._zygosity_in_status: int | None = None self._zygosity_in_roles: int | None = None self._zygosity_in_sexes: int | None = None self.matched_gene_effects: list = [] def __repr__(self) -> str: allele_repr = SummaryAllele.__repr__(self) return f"{allele_repr} {self.family_id}" @property def member_ids(self) -> list[str]: """Return list of family members IDs.""" return self._member_ids @property def member_fpids(self) -> list[tuple[str, str]]: """Return list of family members IDs.""" return [(self._family_id, pid) for pid in self.member_ids] @property def family_id(self) -> str: """Return the family ID.""" return self._family_id @property def chromosome(self) -> str: return self.summary_allele.chromosome @property def chrom(self) -> str: return self.summary_allele.chromosome @chrom.setter def chrom(self, chrom: str) -> None: self.summary_allele.chrom = chrom @property def position(self) -> int: return self.summary_allele.position @property def reference(self) -> str | None: return self.summary_allele.reference @property def alternative(self) -> str | None: return self.summary_allele.alternative @property def summary_index(self) -> int: return self.summary_allele.summary_index @summary_index.setter def summary_index(self, summary_index: int) -> None: self.summary_allele.summary_index = summary_index @property def family_index(self) -> int | None: return self._family_index @family_index.setter def family_index(self, val: int) -> None: self._family_index = val @property def allele_index(self) -> int: return self.summary_allele.allele_index @property def transmission_type(self) -> TransmissionType: return self.summary_allele.transmission_type @property def summary_attributes(self) -> dict[str, Any]: return self.summary_allele.attributes @property def family_attributes(self) -> dict[str, Any]: return self._family_attributes @property def attributes(self) -> dict[str, Any]: result = copy.deepcopy(self.summary_attributes) result.update(self.family_attributes) return result
[docs] def get_attribute(self, item: str, default_val: Any = None) -> Any: """ Return list of values from additional attributes matching given key. looks up values matching key `item` in additional attributes passed on creation of the variant. """ val = self.family_attributes.get(item) if val is not None: return val val = self.summary_allele.get_attribute(item, default_val) if val is None: return default_val return val
[docs] def has_attribute(self, item: str) -> bool: """Check if the additional variant attributes contain a given key.""" return item in self.family_attributes or \ item in self.summary_attributes
[docs] def update_attributes(self, atts: dict[str, Any]) -> None: """Update additional attributes of the variant.""" self._family_attributes.update(atts)
@property def details(self) -> VariantDetails: return self.summary_allele.details @property def effects(self) -> AlleleEffects | None: return self.summary_allele.effects @property def allele_type(self) -> Allele.Type: return self.summary_allele.allele_type @property def end_position(self) -> int | None: return self.summary_allele.end_position @property def genotype(self) -> np.ndarray: """Return the genotype of the family.""" assert self.gt is not None return self.gt.T @property def best_state(self) -> np.ndarray: """Return the best state of the variant.""" if self._best_state is None: assert self.gt is not None self._best_state = calculate_simple_best_state( self.gt, self.attributes["allele_count"], ) return self._best_state def _calc_zygosity_status(self) -> int: translator = BitmaskEnumTranslator( main_enum_type=Zygosity, partition_by_enum_type=Status, ) zygosity = 0 for idx, allele_pair in enumerate(self.genotype): if self.allele_index not in allele_pair: continue pid = self.member_ids[idx] person_status = self.family.persons[pid].status current_zygosity = Zygosity.homozygous \ if allele_pair[0] == allele_pair[1] \ else Zygosity.heterozygous if person_status == Status.unspecified: continue zygosity = translator.apply_mask( zygosity, current_zygosity.value, person_status, ) return zygosity def _calc_zygosity_sexes(self) -> int: translator = BitmaskEnumTranslator( main_enum_type=Zygosity, partition_by_enum_type=Sex, ) zygosity = 0 for idx, allele_pair in enumerate(self.genotype): if self.allele_index not in allele_pair: continue pid = self.member_ids[idx] person_sex = self.family.persons[pid].sex current_zygosity = Zygosity.homozygous \ if allele_pair[0] == allele_pair[1] \ else Zygosity.heterozygous zygosity = translator.apply_mask( zygosity, current_zygosity.value, person_sex, ) return zygosity def _calc_zygosity_roles(self) -> int: translator = BitmaskEnumTranslator( main_enum_type=Zygosity, partition_by_enum_type=Role, ) zygosity = 0 for idx, allele_pair in enumerate(self.genotype): if self.allele_index not in allele_pair: continue pid = self.member_ids[idx] person_role = self.family.persons[pid].role if person_role is None: raise ValueError(f"Person {pid} has no role!") current_zygosity = Zygosity.homozygous \ if allele_pair[0] == allele_pair[1] \ else Zygosity.heterozygous zygosity = translator.apply_mask( zygosity, current_zygosity.value, person_role, ) return zygosity @property def zygosity_in_status(self) -> int: if self._zygosity_in_status is None: self._zygosity_in_status = self._calc_zygosity_status() return self._zygosity_in_status @property def zygosity_in_roles(self) -> int: if self._zygosity_in_roles is None: self._zygosity_in_roles = self._calc_zygosity_roles() return self._zygosity_in_roles @property def zygosity_in_sexes(self) -> int: if self._zygosity_in_sexes is None: self._zygosity_in_sexes = self._calc_zygosity_sexes() return self._zygosity_in_sexes @property @deprecated(details="Replace `best_st` with `best_state`") def best_st(self) -> np.ndarray: return self.best_state @property def genetic_model(self) -> GeneticModel | None: return self._genetic_model
[docs] def gt_flatten(self) -> np.ndarray: """Return the family variant genotype flattened to a 1d array.""" assert self.gt is not None return self.gt.flatten(order="F")
@property def inheritance_in_members(self) -> list[Inheritance]: """Return list of family member inheritance.""" if self._inheritance_in_members is None: assert self.gt is not None allele_index = self.allele_index result = [] for pid in self.member_ids: if pid not in self.family.trios: result.append(Inheritance.unknown) continue trio = self.family.trios[pid] trio_index = self.family.members_index(trio) trio_gt = self.gt[:, trio_index] if np.any(trio_gt == -1): inh = Inheritance.unknown elif np.all(trio_gt != allele_index): inh = Inheritance.missing else: child = trio_gt[:, 0] parent_1 = trio_gt[:, 1] parent_2 = trio_gt[:, 2] inh = self.calc_inheritance_trio( parent_1, parent_2, child, allele_index, ) if inh != Inheritance.omission and np.all( child != allele_index, ): inh = Inheritance.missing result.append(inh) self._inheritance_in_members = result return self._inheritance_in_members @property def variant_in_members(self) -> list[str | None]: """Return set of affected by this variant family members' IDs.""" if self._variant_in_members is None: assert self.gt is not None allele_index = getattr(self, "allele_index", None) gt = np.copy(self.gt) if allele_index is not None: gt[gt != allele_index] = -1 index = cast(np.ndarray, np.any(gt == allele_index, axis=0)) assert len(index) == len(self.member_ids) self._variant_in_members = [ pid if has_variant else None for pid, has_variant in zip( self.member_ids, index, strict=True) ] return self._variant_in_members @property def allele_in_members(self) -> list[str | None]: return self.variant_in_members @property def variant_in_member_fpids(self) -> list[tuple[str, str]]: """Return list of person with the variant.""" return [ (self.family_id, pid) for pid in self.variant_in_members if pid is not None] @property def variant_in_roles(self) -> list[Role | None]: """ Return list of roles that have affected by this variant members. Returns None if none found. """ if self._variant_in_roles is None: self._variant_in_roles = [ self.family.persons[pid].role if pid is not None else None for pid in self.variant_in_members ] return self._variant_in_roles @property def allele_in_roles(self) -> list[Role | None]: return self.variant_in_roles @property def variant_in_statuses(self) -> list[Status | None]: """Return list of statuses (or 'None') of the members with variant.""" if self._variant_in_statuses is None: self._variant_in_statuses = [ self.family.persons[pid].status if pid is not None else None for pid in self.variant_in_members ] return self._variant_in_statuses @property def allele_in_statuses(self) -> list[Status | None]: return self.variant_in_statuses @property def variant_in_sexes(self) -> list[Sex | None]: """Return list of sexes that are affected by this variant in family.""" if self._variant_in_sexes is None: self._variant_in_sexes = [ self.family.persons[pid].sex if pid is not None else None for pid in self.variant_in_members ] return self._variant_in_sexes @property def allele_in_sexes(self) -> list[Sex | None]: return self.variant_in_sexes
[docs] @staticmethod def check_mendelian_trio( parent_1: np.ndarray, parent_2: np.ndarray, child: np.ndarray, allele_index: int) -> bool: """Check if the inheritance type for a trio family is `mendelian`. :param parent_1: genotype of the first parent (pair of allele indexes). :param parent_2: genotype of the second parent. :param child: genotype of the child. :return: True, when the inheritance is mendelian. """ # pylint: disable=invalid-name ai = allele_index if ai not in set(child): return False m1: bool = ( child[0] == parent_1[0] == ai or child[0] == parent_1[1] == ai ) or ( child[1] == parent_2[0] == ai or child[1] == parent_2[1] == ai ) m2: bool = ( child[0] == parent_2[0] == ai or child[0] == parent_2[1] == ai ) or ( child[1] == parent_1[0] == ai or child[1] == parent_1[1] == ai ) return m1 or m2
[docs] @staticmethod def check_denovo_trio( parent_1: np.ndarray, parent_2: np.ndarray, child: np.ndarray, allele_index: int) -> bool: """ Check if the inheritance type for a trio family is `denovo`. :param parent_1: genotype of the first parent (pair of allele indexes). :param parent_2: genotype of the second parent. :param child: genotype of the child. :return: True, when the inheritance is mendelian. """ new_alleles = set(child).difference(set(parent_1) | set(parent_2)) return allele_index in new_alleles
[docs] @staticmethod def check_omission_trio( parent_1: np.ndarray, parent_2: np.ndarray, child: np.ndarray, allele_index: int) -> bool: """Check if the inheritance type for a trio family is `omission`. :param parent_1: genotype of the first parent (pair of allele indexes). :param parent_2: genotype of the second parent. :param child: genotype of the child. :return: True, when the inheritance is mendelian. """ if allele_index not in set(parent_1) | set(parent_2): return False child_alleles = set(child) if allele_index in child_alleles: return False p1res = False p2res = False if parent_1[0] == parent_1[1] == allele_index: p1res = not bool(parent_1[0] in child_alleles) if parent_2[0] == parent_2[1] == allele_index: p2res = not bool(parent_2[0] in child_alleles) return p1res or p2res
[docs] @classmethod def calc_inheritance_trio( cls, parent_1: np.ndarray, parent_2: np.ndarray, child: np.ndarray, allele_index: int) -> Inheritance: """Calculate the inheritance type of a trio family. :param parent_1: genotype of the first parent (pair of allele indexes). :param parent_2: genotype of the second parent. :param child: genotype of the child. :return: inheritance type as :class:`variants.attributes.Inheritance` of the trio family. """ if cls.check_mendelian_trio(parent_1, parent_2, child, allele_index): return Inheritance.mendelian if cls.check_denovo_trio(parent_1, parent_2, child, allele_index): return Inheritance.denovo if cls.check_omission_trio(parent_1, parent_2, child, allele_index): return Inheritance.omission return Inheritance.other
[docs] class FamilyVariant(SummaryVariant): # pylint: disable=super-init-not-called,too-many-public-methods """Class representing a variant in a family.""" def __init__( self, summary_variant: SummaryVariant, family: Family, *, family_id: str | None = None, member_ids: list[str] | None = None, genotype: np.ndarray | None = None, best_state: np.ndarray | None = None, inheritance_in_members: dict[int, list[Inheritance]] | None = None, ): assert family is not None assert isinstance(family, Family) self.family = family if family_id is None: family_id = family.family_id self._family_id = family_id if member_ids is None: member_ids = family.member_ids self._member_ids = member_ids assert self._family_id is not None assert self._member_ids is not None self.summary_variant = summary_variant self.summary_alleles = self.summary_variant.alleles self.gt = genotype self._genetic_model: GeneticModel | None = None self._family_alleles: list[FamilyAllele] | None = None self._best_state = best_state self._zygosity_in_status: int | None = None self._zygosity_in_roles: int | None = None self._zygosity_in_sexes: int | None = None self._fvuid: str | None = None if inheritance_in_members is None: self._inheritance_in_members = {} else: self._inheritance_in_members = inheritance_in_members def _build_family_alleles(self) -> None: assert self._family_alleles is None alleles = [ FamilyAllele( self.summary_variant.alleles[0], self.family, genotype=self.gt, best_state=self._best_state, inheritance_in_members=self._inheritance_in_members.get(0), ), ] # pylint: disable=invalid-name assert self.gt is not None for ai in self.calc_alt_alleles(self.gt): summary_allele: SummaryAllele | None = None for allele in self.summary_variant.alt_alleles: if allele.allele_index == ai: summary_allele = allele break if summary_allele is None: continue inheritance = self._inheritance_in_members.get(ai) allele = FamilyAllele( summary_allele, self.family, family_id=self.family_id, member_ids=self.member_ids, genotype=self.gt, best_state=self._best_state, inheritance_in_members=inheritance, ) alleles.append(allele) self._family_alleles = alleles @property def member_ids(self) -> list[str]: """Return list of family members IDs.""" return self._member_ids @property def member_fpids(self) -> list[tuple[str, str]]: """Return list of family members IDs.""" return [(self.family_id, pid) for pid in self.member_ids] @property def family_id(self) -> str: """Return the family ID.""" return self._family_id @property def fvuid(self) -> str: """Construct and return the family variant unique identifier.""" if self._fvuid is None: self._fvuid = ( f"{self.family_id}.{self.location}" f".{self.reference}.{self.alternative}" ) return self._fvuid @property def chromosome(self) -> str: return self.summary_variant.chromosome @property def chrom(self) -> str: return self.summary_variant.chromosome @chrom.setter def chrom(self, chrom: str) -> None: self.summary_variant.chrom = chrom @property def position(self) -> int: return self.summary_variant.position @property def reference(self) -> str | None: return self.summary_variant.reference @property def end_position(self) -> int | None: return self.summary_variant.end_position @property def allele_count(self) -> int: return self.summary_variant.allele_count @property def summary_index(self) -> int: return self.summary_variant.summary_index @summary_index.setter def summary_index(self, summary_index: int) -> None: self.summary_variant.summary_index = summary_index @property def family_index(self) -> int | None: return cast(FamilyAllele, self.ref_allele).family_index @family_index.setter def family_index(self, val: int) -> None: for allele in self.family_alleles: allele.family_index = val @property def allele_indexes(self) -> list[int]: return [a.allele_index for a in self.alleles] @property def family_allele_indexes(self) -> list[int]: return list(range(len(self.alleles))) @property def alleles(self) -> list[SummaryAllele]: if self._family_alleles is None: self._build_family_alleles() assert self._family_alleles is not None return cast(list[SummaryAllele], self._family_alleles) @property def family_alleles(self) -> list[FamilyAllele]: if self._family_alleles is None: self._build_family_alleles() return cast(list[FamilyAllele], self._family_alleles) @property def family_alt_alleles(self) -> list[FamilyAllele]: return self.family_alleles[1:]
[docs] def gt_flatten(self) -> np.ndarray: """Return genotype of the family variant flattened to a 1d array.""" assert self.gt is not None return self.gt.flatten(order="F")
[docs] def is_reference(self) -> bool: """Return True if all known alleles in the variant are `reference`.""" assert self.gt is not None return is_reference_genotype(self.gt)
[docs] def is_unknown(self) -> bool: """Return True if all alleles in the variant are `unknown`.""" assert self.gt is not None return is_all_unknown_genotype(self.gt)
def __repr__(self) -> str: assert self.gt is not None output = SummaryVariant.__repr__(self) return f"{output} {self.family_id} {mat2str(self.gt)}" @property def genotype(self) -> list[list[int]]: """Return genotype using summary variant allele indexes.""" assert self.gt is not None return [list(self.gt[:, m]) for m in range(self.gt.shape[1])] @property def family_genotype(self) -> list[list[int]]: """Return family genotype using family variant indexes.""" assert self.gt is not None gt2fgt = zip( self.allele_indexes, self.family_allele_indexes, strict=True, ) fgt = np.zeros(shape=self.gt.shape, dtype=np.int8) # pylint: disable=invalid-name for gi, fgi in gt2fgt: fgt[self.gt == gi] = fgi return [list(fgt[:, m]) for m in range(fgt.shape[1])] @property def genetic_model(self) -> GeneticModel: if self._genetic_model is None: self._genetic_model = GeneticModel.autosomal assert self._genetic_model is not None return self._genetic_model @property def best_state(self) -> np.ndarray: """Return best state of the variant.""" if self._best_state is None: assert self.gt is not None self._best_state = calculate_simple_best_state( self.gt, self.allele_count, ) assert self._best_state is not None return self._best_state @property def family_best_state(self) -> np.ndarray: return self.best_state[self.allele_indexes, :] @property @deprecated(details="Replace usage of `best_st` with `best_state`") def best_st(self) -> np.ndarray: return self.best_state
[docs] @staticmethod def calc_alt_alleles(gt: np.ndarray) -> list[int]: """ Return relevant for the given genotype alternative allele indexes. :param gt: genotype as `np.array`. :return: list of all alternative allele indexes present into genotype passed. """ return sorted(set(gt.flatten()).difference({0}))
[docs] @staticmethod def calc_alleles(gt: np.ndarray) -> list[int]: """ Return allele indexes that are relevant for the given genotype. :param gt: genotype as `np.array`. :return: list of all allele indexes present into genotype passed. """ return sorted(set(gt.flatten()).difference({-1}))
@property def variant_in_members(self) -> set[str]: """Return list of members with the variant.""" members: set[str] = set() for allele in self.alt_alleles: members = members.union(filter( None, cast(FamilyAllele, allele).variant_in_members)) return members def _serialize_inheritance_in_members( self, ) -> dict[str, list[int]]: result = {} for allele in self.family_alleles: result[str(allele.allele_index)] = [ inh.value for inh in allele.inheritance_in_members] return result @property def zygosity_in_status(self) -> int: """Calculate and cache zygosity based on alternative alleles.""" if self._zygosity_in_status is None: zygosity = 0 for fa_zygosity in [ fa.zygosity_in_status for fa in self.family_alt_alleles ]: zygosity = zygosity | fa_zygosity self._zygosity_in_status = zygosity return self._zygosity_in_status @property def zygosity_in_roles(self) -> int: """Calculate and cache zygosity based on alternative alleles.""" if self._zygosity_in_roles is None: zygosity = 0 for fa_zygosity in [ fa.zygosity_in_roles for fa in self.family_alt_alleles ]: zygosity = zygosity | fa_zygosity self._zygosity_in_roles = zygosity return self._zygosity_in_roles @property def zygosity_in_sexes(self) -> int: """Calculate and cache zygosity based on alternative alleles.""" if self._zygosity_in_sexes is None: zygosity = 0 for fa_zygosity in [ fa.zygosity_in_sexes for fa in self.family_alt_alleles ]: zygosity = zygosity | fa_zygosity self._zygosity_in_sexes = zygosity return self._zygosity_in_sexes
[docs] def to_record(self) -> dict[str, Any]: # type: ignore assert self.gt is not None return { "family_id": self.family_id, "summary_index": self.summary_index, "family_index": self.family_index, "genotype": self.gt.tolist(), "best_state": self.best_state.tolist(), "inheritance_in_members": self._serialize_inheritance_in_members(), "family_variant_attributes": [ allele.family_attributes for allele in self.family_alt_alleles ], }