Source code for dae.variants.family_variant

import copy
import logging
from typing import Any, cast

import numpy as np
from deprecation import deprecated

from dae.effect_annotation.effect import AlleleEffects
from dae.pedigrees.family import Family, Person
from dae.utils.variant_utils import (
    GenotypeType,
    is_all_unknown_genotype,
    is_reference_genotype,
    mat2str,
)
from dae.variants.attributes import (
    GeneticModel,
    Inheritance,
    Role,
    Sex,
    Status,
    TransmissionType,
)
from dae.variants.core import Allele
from dae.variants.variant import SummaryAllele, SummaryVariant, VariantDetails

logger = logging.getLogger(__name__)


[docs] def calculate_simple_best_state( genotype: np.ndarray, allele_count: int) -> np.ndarray: """Calculate and return the best state of a genotype.""" # Simple best state calculation # Treats every genotype as diploid (including male X non-PAR) ref: np.ndarray = 2 * np.ones(genotype.shape[1], dtype=GenotypeType) unknown = np.any(genotype == -1, axis=0) best_st = [ref] for allele_index in range(1, allele_count): alt_gt = np.zeros(genotype.shape, dtype=GenotypeType) alt_gt[genotype == allele_index] = 1 alt = np.sum(alt_gt, axis=0, dtype=GenotypeType) best_st[0] = best_st[0] - alt best_st.append(alt) best_st_arr = cast(np.ndarray, np.stack(best_st, axis=0)) best_st_arr[:, unknown] = -1 return best_st_arr
[docs] class FamilyDelegate: """Delegate for handling families.""" def __init__(self, family: Family): self.family = family @property def members_in_order(self) -> list[Person]: """ Return the members from the pedigree file in order. Return list of the members of the family in the order specified from the pedigree file. Each element of the returned list is an object of type :class:`variants.family.Person`. """ return self.family.members_in_order @property def members_ids(self) -> list[str]: """Return list of family members IDs.""" return self.family.members_ids @property def members_fpids(self) -> list[tuple[str, str]]: """Return list of family members IDs.""" return [m.fpid for m in self.family.members_in_order] @property def family_id(self) -> str: """Return the family ID.""" return self.family.family_id
[docs] class FamilyAllele(SummaryAllele, FamilyDelegate): # pylint: disable=super-init-not-called,too-many-public-methods """Class representing an allele in a family.""" def __init__( self, summary_allele: SummaryAllele, family: Family, genotype: np.ndarray | None, best_state: np.ndarray | None, genetic_model: GeneticModel | None = None, inheritance_in_members: list[Inheritance] | None = None, ): assert isinstance(family, Family) FamilyDelegate.__init__(self, family) #: summary allele that corresponds to this allele in family variant self.summary_allele: SummaryAllele = summary_allele self.gt = genotype self._best_state = best_state if genetic_model is None: self._genetic_model = GeneticModel.autosomal else: self._genetic_model = genetic_model self._inheritance_in_members = inheritance_in_members self._variant_in_members: list[str | None] | None = None self._variant_in_members_objects: list[Person] | None = None self._variant_in_roles: list[Role | None] | None = None self._variant_in_sexes: list[Sex | None] | None = None self._variant_in_statuses: list[Status | None] | None = None self._family_index: int | None = None self._family_attributes: dict = {} self.matched_gene_effects: list = [] def __repr__(self) -> str: allele_repr = SummaryAllele.__repr__(self) return f"{allele_repr} {self.family_id}" @property def chromosome(self) -> str: return self.summary_allele.chromosome @property def chrom(self) -> str: return self.summary_allele.chromosome @property def position(self) -> int: return self.summary_allele.position @property def reference(self) -> str | None: return self.summary_allele.reference @property def alternative(self) -> str | None: return self.summary_allele.alternative @property def summary_index(self) -> int: return self.summary_allele.summary_index @summary_index.setter def summary_index(self, val: int) -> None: self.summary_allele.summary_index = val @property def family_index(self) -> int | None: return self._family_index @family_index.setter def family_index(self, val: int) -> None: self._family_index = val @property def allele_index(self) -> int: return self.summary_allele.allele_index @property def transmission_type(self) -> TransmissionType: return self.summary_allele.transmission_type @property def summary_attributes(self) -> dict[str, Any]: return self.summary_allele.attributes @property def family_attributes(self) -> dict[str, Any]: return self._family_attributes @property def attributes(self) -> dict[str, Any]: result = copy.deepcopy(self.summary_attributes) result.update(self.family_attributes) return result
[docs] def get_attribute(self, item: str, default_val: Any = None) -> Any: """ Return list of values from additional attributes matching given key. looks up values matching key `item` in additional attributes passed on creation of the variant. """ val = self.family_attributes.get(item) if val is not None: return val val = self.summary_allele.get_attribute(item, default_val) if val is None: return default_val return val
[docs] def has_attribute(self, item: str) -> bool: """Check if the additional variant attributes contain a given key.""" return item in self.family_attributes or \ item in self.summary_attributes
[docs] def update_attributes(self, atts: dict[str, Any]) -> None: """Update additional attributes of the variant.""" self._family_attributes.update(atts)
@property def details(self) -> VariantDetails: return self.summary_allele.details @property def effects(self) -> AlleleEffects | None: return self.summary_allele.effects @property def allele_type(self) -> Allele.Type: return self.summary_allele.allele_type @property def end_position(self) -> int | None: return self.summary_allele.end_position @property def genotype(self) -> np.ndarray: """Return the genotype of the family.""" assert self.gt is not None return self.gt.T @property def best_state(self) -> np.ndarray: """Return the best state of the variant.""" if self._best_state is None: assert self.gt is not None self._best_state = calculate_simple_best_state( self.gt, self.attributes["allele_count"], ) return self._best_state @property @deprecated(details="Replace `best_st` with `best_state`") def best_st(self) -> np.ndarray: return self.best_state @property def genetic_model(self) -> GeneticModel | None: return self._genetic_model
[docs] def gt_flatten(self) -> np.ndarray: """Return the family variant genotype flattened to a 1d array.""" assert self.gt is not None return self.gt.flatten(order="F")
@property def inheritance_in_members(self) -> list[Inheritance]: """Return list of family member inheritance.""" if self._inheritance_in_members is None: assert self.gt is not None allele_index = self.allele_index result = [] for pid in self.members_ids: if pid not in self.family.trios: result.append(Inheritance.unknown) continue trio = self.family.trios[pid] trio_index = self.family.members_index(trio) trio_gt = self.gt[:, trio_index] if np.any(trio_gt == -1): inh = Inheritance.unknown elif np.all(trio_gt != allele_index): inh = Inheritance.missing else: child = trio_gt[:, 0] parent_1 = trio_gt[:, 1] parent_2 = trio_gt[:, 2] inh = self.calc_inheritance_trio( parent_1, parent_2, child, allele_index, ) if inh != Inheritance.omission and np.all( child != allele_index, ): inh = Inheritance.missing result.append(inh) self._inheritance_in_members = result return self._inheritance_in_members @property def variant_in_members(self) -> list[str | None]: """Return set of affected by this variant family members' IDs.""" if self._variant_in_members is None: assert self.gt is not None allele_index = getattr(self, "allele_index", None) gt = np.copy(self.gt) if allele_index is not None: gt[gt != allele_index] = -1 index = np.any(gt == allele_index, axis=0) assert len(index) == len(self.members_in_order) self._variant_in_members = [ m.person_id if has_variant else None for m, has_variant in zip( self.members_in_order, index, strict=True) ] return self._variant_in_members @property def allele_in_members(self) -> list[str | None]: return self.variant_in_members @property def variant_in_members_objects(self) -> list[Person]: """Return list of person with the variant.""" if self._variant_in_members_objects is None: variant_in_members = set(filter(None, self.variant_in_members)) self._variant_in_members_objects = [ member for member in self.family.members_in_order if member.person_id in variant_in_members ] return self._variant_in_members_objects @property def variant_in_members_fpid(self) -> list[tuple[str, str]]: """Return list of person with the variant.""" return [p.fpid for p in self.variant_in_members_objects] @property def variant_in_roles(self) -> list[Role | None]: """ Return list of roles that have affected by this variant members. Returns None if none found. """ if self._variant_in_roles is None: self._variant_in_roles = [ self.family.persons[pid].role if pid is not None else None for pid in self.variant_in_members ] return self._variant_in_roles @property def allele_in_roles(self) -> list[Role | None]: return self.variant_in_roles @property def variant_in_statuses(self) -> list[Status | None]: """Return list of statuses (or 'None') of the members with variant.""" if self._variant_in_statuses is None: self._variant_in_statuses = [ self.family.persons[pid].status if pid is not None else None for pid in self.variant_in_members ] return self._variant_in_statuses @property def allele_in_statuses(self) -> list[Status | None]: return self.variant_in_statuses @property def variant_in_sexes(self) -> list[Sex | None]: """Return list of sexes that are affected by this variant in family.""" if self._variant_in_sexes is None: self._variant_in_sexes = [ self.family.persons[pid].sex if pid is not None else None for pid in self.variant_in_members ] return self._variant_in_sexes @property def allele_in_sexes(self) -> list[Sex | None]: return self.variant_in_sexes
[docs] @staticmethod def check_mendelian_trio( parent_1: np.ndarray, parent_2: np.ndarray, child: np.ndarray, allele_index: int) -> bool: """Check if the inheritance type for a trio family is `mendelian`. :param parent_1: genotype of the first parent (pair of allele indexes). :param parent_2: genotype of the second parent. :param child: genotype of the child. :return: True, when the inheritance is mendelian. """ # pylint: disable=invalid-name ai = allele_index if ai not in set(child): return False m1: bool = ( child[0] == parent_1[0] == ai or child[0] == parent_1[1] == ai ) or ( child[1] == parent_2[0] == ai or child[1] == parent_2[1] == ai ) m2: bool = ( child[0] == parent_2[0] == ai or child[0] == parent_2[1] == ai ) or ( child[1] == parent_1[0] == ai or child[1] == parent_1[1] == ai ) return m1 or m2
[docs] @staticmethod def check_denovo_trio( parent_1: np.ndarray, parent_2: np.ndarray, child: np.ndarray, allele_index: int) -> bool: """ Check if the inheritance type for a trio family is `denovo`. :param parent_1: genotype of the first parent (pair of allele indexes). :param parent_2: genotype of the second parent. :param child: genotype of the child. :return: True, when the inheritance is mendelian. """ new_alleles = set(child).difference(set(parent_1) | set(parent_2)) return allele_index in new_alleles
[docs] @staticmethod def check_omission_trio( parent_1: np.ndarray, parent_2: np.ndarray, child: np.ndarray, allele_index: int) -> bool: """Check if the inheritance type for a trio family is `omission`. :param parent_1: genotype of the first parent (pair of allele indexes). :param parent_2: genotype of the second parent. :param child: genotype of the child. :return: True, when the inheritance is mendelian. """ if allele_index not in set(parent_1) | set(parent_2): return False child_alleles = set(child) if allele_index in child_alleles: return False p1res = False p2res = False if parent_1[0] == parent_1[1] == allele_index: p1res = not bool(parent_1[0] in child_alleles) if parent_2[0] == parent_2[1] == allele_index: p2res = not bool(parent_2[0] in child_alleles) return p1res or p2res
[docs] @classmethod def calc_inheritance_trio( cls, parent_1: np.ndarray, parent_2: np.ndarray, child: np.ndarray, allele_index: int) -> Inheritance: """Calculate the inheritance type of a trio family. :param parent_1: genotype of the first parent (pair of allele indexes). :param parent_2: genotype of the second parent. :param child: genotype of the child. :return: inheritance type as :class:`variants.attributes.Inheritance` of the trio family. """ if cls.check_mendelian_trio(parent_1, parent_2, child, allele_index): return Inheritance.mendelian if cls.check_denovo_trio(parent_1, parent_2, child, allele_index): return Inheritance.denovo if cls.check_omission_trio(parent_1, parent_2, child, allele_index): return Inheritance.omission return Inheritance.other
[docs] class FamilyVariant(SummaryVariant, FamilyDelegate): # pylint: disable=super-init-not-called,too-many-public-methods """Class representing a variant in a family.""" def __init__( self, summary_variant: SummaryVariant, family: Family, genotype: np.ndarray | None, best_state: np.ndarray | None, inheritance_in_members: dict[int, list[Inheritance]] | None = None, ): assert family is not None assert isinstance(family, Family) FamilyDelegate.__init__(self, family) self.summary_variant = summary_variant self.summary_alleles = self.summary_variant.alleles self.gt = genotype self._genetic_model: GeneticModel | None = None self._family_alleles: list[FamilyAllele] | None = None self._best_state = best_state self._fvuid: str | None = None if inheritance_in_members is None: self._inheritance_in_members = {} else: self._inheritance_in_members = inheritance_in_members def _build_family_alleles(self) -> None: assert self._family_alleles is None alleles = [ FamilyAllele( self.summary_variant.alleles[0], self.family, self.gt, self._best_state, inheritance_in_members=self._inheritance_in_members.get(0), ), ] # pylint: disable=invalid-name assert self.gt is not None for ai in self.calc_alt_alleles(self.gt): summary_allele: SummaryAllele | None = None for allele in self.summary_variant.alt_alleles: if allele.allele_index == ai: summary_allele = allele break if summary_allele is None: continue inheritance = self._inheritance_in_members.get(ai) allele = FamilyAllele( summary_allele, self.family, self.gt, self._best_state, inheritance_in_members=inheritance, ) alleles.append(allele) self._family_alleles = alleles @property def fvuid(self) -> str: """Construct and return the family variant unique identifier.""" if self._fvuid is None: self._fvuid = ( f"{self.family_id}.{self.location}" f".{self.reference}.{self.alternative}" ) return self._fvuid @property def chromosome(self) -> str: return self.summary_variant.chromosome @property def chrom(self) -> str: return self.summary_variant.chromosome @property def position(self) -> int: return self.summary_variant.position @property def reference(self) -> str | None: return self.summary_variant.reference @property def end_position(self) -> int | None: return self.summary_variant.end_position @property def allele_count(self) -> int: return self.summary_variant.allele_count @property def summary_index(self) -> int: return self.summary_variant.summary_index @summary_index.setter def summary_index(self, summary_index: int) -> None: self.summary_variant.summary_index = summary_index @property def family_index(self) -> int | None: return cast(FamilyAllele, self.ref_allele).family_index @family_index.setter def family_index(self, val: int) -> None: for allele in self.family_alleles: allele.family_index = val @property def allele_indexes(self) -> list[int]: return [a.allele_index for a in self.alleles] @property def family_allele_indexes(self) -> list[int]: return list(range(len(self.alleles))) @property def alleles(self) -> list[SummaryAllele]: if self._family_alleles is None: self._build_family_alleles() assert self._family_alleles is not None return cast(list[SummaryAllele], self._family_alleles) @property def family_alleles(self) -> list[FamilyAllele]: if self._family_alleles is None: self._build_family_alleles() return cast(list[FamilyAllele], self._family_alleles) @property def family_alt_alleles(self) -> list[FamilyAllele]: return self.family_alleles[1:]
[docs] def gt_flatten(self) -> np.ndarray: """Return genotype of the family variant flattened to a 1d array.""" assert self.gt is not None return self.gt.flatten(order="F")
[docs] def is_reference(self) -> bool: """Return True if all known alleles in the variant are `reference`.""" assert self.gt is not None return is_reference_genotype(self.gt)
[docs] def is_unknown(self) -> bool: """Return True if all alleles in the variant are `unknown`.""" assert self.gt is not None return is_all_unknown_genotype(self.gt)
def __repr__(self) -> str: assert self.gt is not None output = SummaryVariant.__repr__(self) return f"{output} {self.family_id} {mat2str(self.gt)}" @property def genotype(self) -> list[list[int]]: """Return genotype using summary variant allele indexes.""" assert self.gt is not None return [list(self.gt[:, m]) for m in range(self.gt.shape[1])] @property def family_genotype(self) -> list[list[int]]: """Return family genotype using family variant indexes.""" assert self.gt is not None gt2fgt = zip( self.allele_indexes, self.family_allele_indexes, strict=True, ) fgt = np.zeros(shape=self.gt.shape, dtype=np.int8) # pylint: disable=invalid-name for gi, fgi in gt2fgt: fgt[self.gt == gi] = fgi return [list(fgt[:, m]) for m in range(fgt.shape[1])] @property def genetic_model(self) -> GeneticModel: if self._genetic_model is None: self._genetic_model = GeneticModel.autosomal assert self._genetic_model is not None return self._genetic_model @property def best_state(self) -> np.ndarray: """Return best state of the variant.""" if self._best_state is None: assert self.gt is not None self._best_state = calculate_simple_best_state( self.gt, self.allele_count, ) assert self._best_state is not None return self._best_state @property def family_best_state(self) -> np.ndarray: return self.best_state[self.allele_indexes, :] @property # type: ignore @deprecated(details="Replace usage of `best_st` with `best_state`") def best_st(self): return self.best_state
[docs] @staticmethod def calc_alt_alleles(gt: np.ndarray) -> list[int]: """ Return relevant for the given genotype alternative allele indexes. :param gt: genotype as `np.array`. :return: list of all alternative allele indexes present into genotype passed. """ return sorted(set(gt.flatten()).difference({0}))
[docs] @staticmethod def calc_alleles(gt: np.ndarray) -> list[int]: """ Return allele indexes that are relevant for the given genotype. :param gt: genotype as `np.array`. :return: list of all allele indexes present into genotype passed. """ return sorted(set(gt.flatten()).difference({-1}))
@property def variant_in_members(self) -> set[str]: """Return list of members with the variant.""" members: set[str] = set() for allele in self.alt_alleles: members = members.union(filter( None, cast(FamilyAllele, allele).variant_in_members)) return members def _serialize_inheritance_in_members( self, ) -> dict[int, list[int]]: result = {} for allele in self.family_alleles: result[allele.allele_index] = [ inh.value for inh in allele.inheritance_in_members] return result
[docs] def to_record(self) -> dict[str, Any]: # type: ignore assert self.gt is not None return { "family_id": self.family_id, "summary_index": self.summary_index, "family_index": self.family_index, "genotype": self.gt.tolist(), "best_state": self.best_state.tolist(), "inheritance_in_members": self._serialize_inheritance_in_members(), "family_variant_attributes": [ allele.family_attributes for allele in self.family_alt_alleles ], }