from __future__ import annotations
import logging
import time
from collections.abc import Iterable
from copy import deepcopy
from typing import Any, cast
import numpy as np
from dae.effect_annotation.effect import EffectTypesMixin
from dae.person_sets import PersonSet, PersonSetCollection
from dae.variants.family_variant import FamilyAllele, FamilyVariant
logger = logging.getLogger(__name__)
[docs]
class EffectCell:  # pylint: disable=too-many-instance-attributes
    """Class representing a cell in the denovo report table.

    A cell pairs one person set with one effect (passed through
    ``EffectTypesMixin.get_effect_types`` to obtain the effect types it
    matches) and accumulates the variants counted for that pair.
    """

    def __init__(self, person_set: PersonSet, effect: str) -> None:
        """Create an empty cell for ``person_set`` and ``effect``.

        Raises:
            AssertionError: if ``person_set`` contains no persons.
        """
        assert len(person_set.persons) > 0

        self.person_set = person_set
        self.effect = effect

        expanded_effect_types = \
            EffectTypesMixin().get_effect_types(effectTypes=effect)
        self.effect_types = set()
        if expanded_effect_types is not None:
            self.effect_types = set(expanded_effect_types)

        self.observed_variants_ids: set[str] = set()
        self.observed_people_with_event: set[str] = set()

        # Raw keys of the persons mapping; kept as-is for existing readers.
        self.person_set_persons = set(self.person_set.persons.keys())
        # Person ids of every member of the set. The keys are indexed with
        # [1] to get the person id (presumably (family_id, person_id)
        # pairs -- confirm against PersonSet). Comparing the raw keys with
        # ``variant_in_members`` (person ids) could never match, which
        # silenced the "denovo variant not in child" warning.
        self.person_set_person_ids: set[str] = {
            key[1] for key in self.person_set.persons
        }

        self.person_set_children = {
            p.person_id for p in self.person_set.get_children()
        }
        if len(self.person_set_children) == 0:
            # No children in the set: treat every member as a child so the
            # per-child rates below have a non-zero denominator.
            self.person_set_children = set(self.person_set_person_ids)

        logger.info(
            "DENOVO REPORTS: persons set %s children %s",
            self.person_set,
            len(self.person_set_children),
        )

    @property
    def number_of_observed_events(self) -> int:
        """Return the number of distinct variants counted in the cell."""
        return len(self.observed_variants_ids)

    @property
    def number_of_children_with_event(self) -> int:
        """Return the number of distinct children with a counted variant."""
        return len(self.observed_people_with_event)

    @property
    def observed_rate_per_child(self) -> int | float:
        """Return counted events divided by the number of children."""
        if self.number_of_observed_events == 0:
            return 0
        return self.number_of_observed_events / len(self.person_set_children)

    @property
    def percent_of_children_with_events(self) -> int | float:
        """Return the share of children with an event.

        Despite the name the value is a fraction, not multiplied by 100.
        """
        if self.number_of_children_with_event == 0:
            return 0
        return self.number_of_children_with_event / len(
            self.person_set_children,
        )

    @property
    def column_name(self) -> str:
        """Return the column title: person set name and children count."""
        return f"{self.person_set.name} ({len(self.person_set_children)})"

    def to_dict(self) -> dict[str, int | float | str]:
        """Return the cell statistics as a plain dictionary."""
        return {
            "number_of_observed_events":
            self.number_of_observed_events,
            "number_of_children_with_event":
            self.number_of_children_with_event,
            "observed_rate_per_child":
            self.observed_rate_per_child,
            "percent_of_children_with_events":
            self.percent_of_children_with_events,
            "column":
            self.column_name,
        }

    def count_variant(
        self, family_variant: FamilyVariant,
        family_allele: FamilyAllele,
    ) -> None:
        """Count given variant in the cell data.

        The allele is counted only when it is carried by at least one child
        of the person set and its effect types overlap the cell's effect
        types. A carrier that belongs to the person set but is not one of
        its children is reported with a warning and not counted.
        """
        carriers = set(family_allele.variant_in_members)

        if not carriers & self.person_set_children:
            variant_in_members = carriers & self.person_set_person_ids
            if variant_in_members:
                logger.warning(
                    "denovo variant not in child: %s; %s; "
                    "person set: %s; "
                    "mismatched persons: %s",
                    family_allele,
                    family_allele.variant_in_members,
                    self.person_set.id,
                    variant_in_members,
                )
            return

        if not family_allele.effects:
            return
        if not set(family_allele.effects.types) & self.effect_types:
            return

        self.observed_variants_ids.add(family_variant.fvuid)
        # filter(None, ...) drops the falsy placeholders that
        # ``variant_in_members`` may contain for non-carriers.
        self.observed_people_with_event.update(
            set(filter(None, carriers)) & self.person_set_children)

    def is_empty(self) -> bool:
        """Return True when nothing has been counted in the cell."""
        return (
            self.number_of_observed_events == 0
            and self.number_of_children_with_event == 0
            and self.observed_rate_per_child == 0
            and self.percent_of_children_with_events == 0
        )
[docs]
class EffectRow:
    """Class representing a row in the denovo report table.

    A row holds one ``EffectCell`` per person set, all for the same effect.
    """

    def __init__(self, effect: str, person_sets: list[PersonSet]) -> None:
        self.person_sets = person_sets
        self.effect_type = effect
        self.row = self._build_row()

    def to_dict(self) -> dict[str, Any]:
        """Return the effect and its serialized cells as a dictionary."""
        serialized_cells = []
        for cell in self.row:
            serialized_cells.append(cell.to_dict())
        return {"effect_type": self.effect_type, "row": serialized_cells}

    def _build_row(self) -> list[EffectCell]:
        """Create one cell per person set, in person set order."""
        cells = []
        for person_set in self.person_sets:
            cells.append(EffectCell(person_set, self.effect_type))
        return cells

    def count_variant(self, fv: FamilyVariant) -> None:
        """Offer every alternative allele of ``fv`` to every cell."""
        for cell in self.row:
            for alt_allele in fv.alt_alleles:
                cell.count_variant(fv, cast(FamilyAllele, alt_allele))

    def is_row_empty(self) -> bool:
        """Return True when no cell of the row has counted anything."""
        return not any(not cell.is_empty() for cell in self.row)

    def get_empty(self) -> list[bool]:
        """Return the emptiness flag of each cell, in column order."""
        flags = []
        for cell in self.row:
            flags.append(cell.is_empty())
        return flags

    def remove_elements(self, indexes: list[int]) -> None:
        """Drop the cells at ``indexes``; each of them must be empty."""
        # Highest position first so earlier removals do not shift the rest.
        for position in sorted(indexes, reverse=True):
            assert self.row[position].is_empty()
            del self.row[position]
[docs]
class DenovoReportTable:
    """Class representing a denovo report table JSON."""

    def __init__(
        self, json: dict[str, Any],
    ) -> None:
        """Load the table from its dictionary form.

        Raises:
            KeyError: if any of the keys "rows", "group_name", "columns",
                "effect_groups" or "effect_types" is missing.
        """
        self.rows = json["rows"]
        self.group_name = json["group_name"]
        self.columns = json["columns"]
        self.effect_groups = json["effect_groups"]
        self.effect_types = json["effect_types"]

    @staticmethod
    def from_variants(  # pylint: disable=too-many-locals
        denovo_variants: Iterable[FamilyVariant],
        effect_groups: list[str],
        effect_types: list[str],
        person_set_collection: PersonSetCollection,
    ) -> DenovoReportTable:
        """Construct a denovo report table from variants.

        One row is built per effect (groups first, then types) and one
        column per person set that has persons. After all variants are
        counted, columns that are empty in every row and rows that are
        empty in every remaining column are dropped; dropped effects are
        also removed from the returned effect group/type lists. The
        ``effect_groups`` and ``effect_types`` arguments are not modified.
        """
        person_sets = [
            person_set
            for person_set in person_set_collection.person_sets.values()
            if len(person_set.persons) > 0
        ]

        # Work on copies so the caller's lists stay intact.
        effect_groups = list(effect_groups)
        effect_types = list(effect_types)

        effect_rows = [
            EffectRow(effect, person_sets)
            for effect in effect_groups + effect_types
        ]

        for fv in denovo_variants:
            for effect_row in effect_rows:
                effect_row.count_variant(fv)

        # A column is empty when it is empty in every row. Checking by
        # column index keeps this well defined with zero rows (every column
        # is then vacuously empty); transposing an empty matrix lost the
        # column count and left ``person_sets`` populated, which made the
        # column title lookup below fail with KeyError.
        empty_flags = [effect_row.get_empty() for effect_row in effect_rows]
        empty_column_indexes = [
            index
            for index in range(len(person_sets))
            if all(flags[index] for flags in empty_flags)
        ]

        # Remove from the end so the remaining indexes stay valid.
        for index in reversed(empty_column_indexes):
            person_sets.pop(index)

        for effect_row in effect_rows:
            effect_row.remove_elements(empty_column_indexes)
            if effect_row.is_row_empty():
                if effect_row.effect_type in effect_groups:
                    effect_groups.remove(effect_row.effect_type)
                elif effect_row.effect_type in effect_types:
                    effect_types.remove(effect_row.effect_type)

        rows = [
            effect_row for effect_row in effect_rows
            if not effect_row.is_row_empty()
        ]

        # Children count per person set; every row must agree on it.
        column_children = {}
        for row in rows:
            assert len(row.row) == len(person_sets)
            for cell in row.row:
                children_count = len(cell.person_set_children)
                person_set_id = cell.person_set.id
                if person_set_id not in column_children:
                    column_children[person_set_id] = children_count
                else:
                    assert column_children[person_set_id] == children_count

        columns = [
            f"{person_set.name} ({column_children[person_set.id]})"
            for person_set in person_sets
        ]

        return DenovoReportTable({
            "rows": [r.to_dict() for r in rows],
            "group_name": person_set_collection.name,
            "columns": columns,
            "effect_groups": effect_groups,
            "effect_types": effect_types,
        })

    def to_dict(self) -> dict[str, Any]:
        """Return the table in the dictionary form the constructor takes."""
        return {
            "rows": self.rows,
            "group_name": self.group_name,
            "columns": self.columns,
            "effect_groups": self.effect_groups,
            "effect_types": self.effect_types,
        }

    def is_empty(self) -> bool:
        """Return whether the table does not have a single counted variant."""

        def _is_row_empty(row: dict[str, Any]) -> bool:
            for cell in row["row"]:
                if cell["number_of_observed_events"] > 0 \
                        or cell["number_of_children_with_event"] > 0 \
                        or cell["observed_rate_per_child"] > 0 \
                        or cell["percent_of_children_with_events"] > 0:
                    return False
            return True

        return all(_is_row_empty(row) for row in self.rows)
[docs]
class DenovoReport:
    """Class representing a denovo report JSON."""

    def __init__(self, json: dict[str, Any]) -> None:
        # ``None`` is tolerated here and produces a report without tables.
        self.tables = (
            []
            if json is None
            else [DenovoReportTable(entry) for entry in json["tables"]]
        )

    @staticmethod
    def from_genotype_study(
        genotype_data: Any,
        person_set_collections: list[PersonSetCollection],
    ) -> DenovoReport:
        """Create a denovo report JSON from a genotype data study.

        One table is built per person set collection from the study's
        variants queried with ``inheritance=["denovo"]``; tables without
        any counted variant are left out. When the study config has
        ``has_denovo`` unset, no query is made and the report has no tables.
        """
        common_report = genotype_data.config.common_report
        groups = common_report.effect_groups
        types = common_report.effect_types

        logger.info(
            "DENOVO REPORTS: person set collections %s",
            person_set_collections)

        started_at = time.time()
        tables = []
        if genotype_data.config.has_denovo:
            for collection in person_set_collections:
                # Variants are queried again for each collection.
                table = DenovoReportTable.from_variants(
                    genotype_data.query_variants(
                        limit=None, inheritance=["denovo"],
                    ),
                    deepcopy(groups),
                    deepcopy(types),
                    collection,
                )
                if table.is_empty():
                    continue
                tables.append(table)

        logger.info(
            "DENOVO REPORTS build in %.2f sec", time.time() - started_at,
        )

        return DenovoReport({
            "tables": [table.to_dict() for table in tables],
        })

    def to_dict(self) -> dict[str, Any]:
        """Return the report as a dictionary with a "tables" list."""
        serialized = []
        for table in self.tables:
            serialized.append(table.to_dict())
        return {"tables": serialized}

    def is_empty(self) -> bool:
        """Return True when the report has no tables."""
        return not len(self.tables)