Source code for studies.response_transformer
import itertools
import logging
import math
import traceback
from collections.abc import Callable, Generator, Iterable, Iterator
from functools import partial
from typing import (
Any,
ClassVar,
cast,
)
from dae.effect_annotation.effect import (
gd2str,
ge2str,
gene_effect_get_genes,
gene_effect_get_genes_worst,
gene_effect_get_worst_effect,
)
from dae.pedigrees.families_data import FamiliesData
from dae.pedigrees.family import Person
from dae.person_sets import PersonSetCollection
from dae.utils.dae_utils import join_line, split_iterable
from dae.utils.variant_utils import fgt2str, mat2str
from dae.variants.attributes import Inheritance, Role
from dae.variants.family_variant import FamilyAllele, FamilyVariant
from dae.variants.variant import SummaryVariant, VariantDesc
logger = logging.getLogger(__name__)
[docs]
def members_in_order_get_family_structure(mio: list[Person]) -> str:
return ";".join([
f"{p.role.name}:{p.sex.short()}:{p.status.name}" # type: ignore
for p in mio])
[docs]
class ResponseTransformer:
"""Helper class to transform genotype browser response."""
STREAMING_CHUNK_SIZE = 20
SPECIAL_ATTRS: ClassVar[dict[str, Callable]] = {
"family":
lambda v: [v.family_id],
"location":
lambda v: v.cshl_location,
"variant":
lambda v: VariantDesc.combine([
aa.details.variant_desc for aa in v.alt_alleles]),
"position":
lambda v: [aa.position for aa in v.alt_alleles],
"reference":
lambda v: [aa.reference for aa in v.alt_alleles],
"alternative":
lambda v: [aa.alternative for aa in v.alt_alleles],
"genotype":
lambda v: [fgt2str(v.family_genotype)],
"best_st":
lambda v: [mat2str(v.family_best_state)],
"family_person_attributes":
lambda v: [members_in_order_get_family_structure(
v.members_in_order)],
"family_structure":
lambda v: [members_in_order_get_family_structure(
v.members_in_order)],
"family_person_ids":
lambda v: [";".join([m.person_id for m in v.members_in_order])],
"carrier_person_ids":
lambda v: [
";".join(m for m in aa.variant_in_members if m is not None)
for aa in v.alt_alleles
],
"carrier_person_attributes":
lambda v: [
members_in_order_get_family_structure([
m for m in aa.variant_in_members_objects if m is not None])
for aa in v.alt_alleles
],
"inheritance_type":
lambda v: [
"denovo"
if Inheritance.denovo in aa.inheritance_in_members
else "-"
if {Inheritance.possible_denovo, Inheritance.possible_omission}
& set(aa.inheritance_in_members)
else "mendelian"
for aa in v.alt_alleles
],
"is_denovo":
lambda v: [
Inheritance.denovo in aa.inheritance_in_members
for aa in v.alt_alleles
],
"effects":
lambda v: [ge2str(e) for e in v.effects],
"raw_effects":
lambda v: [repr(e) for e in v.effects],
"genes":
lambda v: [gene_effect_get_genes_worst(e) for e in v.effects],
"worst_effect":
lambda v: [gene_effect_get_worst_effect(e) for e in v.effects],
"effect_details":
lambda v: [gd2str(e) for e in v.effects],
"full_effect_details":
lambda v: (
[v.family_id]
+ v.cshl_location
+ [gd2str(e) for e in v.effects]
+ [ge2str(e) for e in v.effects]
),
"seen_in_affected":
lambda v: bool(v.get_attribute("seen_in_status") in {2, 3}),
"seen_in_unaffected":
lambda v: bool(v.get_attribute("seen_in_status") in {1, 3}),
}
PHENOTYPE_ATTRS: ClassVar[dict[str, Callable]] = {
"family_phenotypes":
lambda v, phenotype_person_sets:
[
":".join([
phenotype_person_sets.get_person_set_of_person(mid).name
for mid in v.members_fpids]),
],
"carrier_phenotypes":
lambda v, phenotype_person_sets:
[
":".join([ # type: ignore
phenotype_person_sets.get_person_set_of_person(mid).name
for mid in filter(None, aa.variant_in_members_fpid)])
for aa in v.alt_alleles
],
}
def __init__(self, study_wrapper: Any) -> None:
# pylint: disable=import-outside-toplevel
from studies.study_wrapper import StudyWrapper
self.study_wrapper = cast(StudyWrapper, study_wrapper)
self._pheno_columns = study_wrapper.config_columns.phenotype
self._pheno_values: dict[str, Any] | None = None
self.gene_scores_dicts = {}
if not study_wrapper.is_remote \
and self.study_wrapper.gene_scores_db is not None:
gene_scores_db = self.study_wrapper.gene_scores_db
for score_id, score_desc in gene_scores_db.score_descs.items():
gene_score = gene_scores_db.get_gene_score(
score_desc.resource_id,
)
if gene_score is None:
continue
self.gene_scores_dicts[score_id] = \
gene_score._to_dict(score_id) # noqa: SLF001
self._get_all_pheno_values()
@property
def families(self) -> FamiliesData:
return self.study_wrapper.families
def _get_all_pheno_values(
self,
) -> dict | None:
if self._pheno_values is not None:
return self._pheno_values
if not self.study_wrapper.phenotype_data \
or not self.study_wrapper.config_columns.phenotype:
return None
pheno_values = {}
for column in self.study_wrapper.config_columns.phenotype.values():
assert column.role
result = {}
column_values_iter = self.study_wrapper\
.phenotype_data.get_people_measure_values(
[column.source], roles=[Role.from_name(column.role)])
for column_value in column_values_iter:
result[column_value["family_id"]] = column_value[column.source]
pheno_column_name = f"{column.source}.{column.role}"
pheno_values[pheno_column_name] = result
self._pheno_values = pheno_values
return self._pheno_values
@staticmethod
def _get_pheno_values_for_variant(
variant: FamilyVariant,
pheno_column_values: dict | None,
) -> dict[str, str] | None:
if not pheno_column_values:
return None
pheno_values = {}
for pheno_column_name in pheno_column_values:
family_id = variant.family_id
pheno_value = pheno_column_values[pheno_column_name].get(family_id)
pheno_values[pheno_column_name] = pheno_value
return pheno_values
def _get_gene_scores_values(
self, allele: FamilyAllele, default: str | None = None,
) -> dict[str, Any]:
if not self.study_wrapper.gene_score_column_sources:
return {}
if allele.effects is None:
return {}
genes = gene_effect_get_genes(allele.effects).split(";")
gene = genes[0]
gene_scores_values = {}
for gwc in self.study_wrapper.gene_score_column_sources:
if gwc not in self.gene_scores_dicts:
continue
if gene != "":
gene_scores_values[gwc] = self.gene_scores_dicts[gwc].get(
gene, default,
)
else:
gene_scores_values[gwc] = default
return gene_scores_values
@staticmethod
def _get_wdae_member(
member: Person,
person_set_collection: PersonSetCollection,
best_st: str | int) -> list:
return [
member.family_id,
member.person_id,
member.mom_id or "0",
member.dad_id or "0",
member.sex.short(),
str(member.role),
PersonSetCollection.get_person_color(
member, person_set_collection,
),
member.layout,
(member.generated or member.not_sequenced),
best_st,
0,
]
def _generate_pedigree(
self, variant: FamilyVariant, collection_id: str,
) -> list:
result = []
person_set_collection = self.study_wrapper.get_person_set_collection(
collection_id,
)
genotype = variant.family_genotype
missing_members = set()
for index, member in enumerate(variant.members_in_order):
try:
result.append(
ResponseTransformer._get_wdae_member(
member, person_set_collection,
"/".join([
str(v) for v in filter(
lambda g: g != 0, genotype[index],
)],
),
),
)
except IndexError:
missing_members.add(member.person_id)
logger.exception(
"problems generating pedigree: %s, %s, %s",
genotype, index, member)
result.extend([
ResponseTransformer._get_wdae_member(
member, person_set_collection, 0)
for member in variant.family.full_members
if (member.generated or member.not_sequenced) or (
member.person_id in missing_members)
])
return result
def _add_additional_columns_summary(
self, variants_iterable: Generator[SummaryVariant, None, None],
) -> Generator[SummaryVariant, None, None]:
for variants_chunk in split_iterable(
variants_iterable, self.STREAMING_CHUNK_SIZE):
for variant in variants_chunk:
for allele in variant.alt_alleles:
gene_scores_values = self._get_gene_scores_values(allele)
allele.update_attributes(gene_scores_values)
yield variant
[docs]
def build_variant_row(
self, v: SummaryVariant | FamilyVariant,
column_descs: list[dict], **kwargs: str,
) -> list:
"""Construct response row for a variant."""
# pylint: disable=too-many-branches
row_variant: list[Any] = []
for col_desc in column_descs:
try:
col_source = col_desc["source"]
col_format = col_desc.get("format")
col_role = col_desc.get("role")
if col_format is None:
# pylint: disable=unused-argument
def col_formatter(
val: Any,
col_format: str | None, # noqa: ARG001
) -> str:
if val is None:
return "-"
return str(val)
else:
def col_formatter(
val: Any,
col_format: str | None,
) -> str:
# pylint: disable=broad-except
if val is None:
return "-"
try:
assert col_format is not None
return str(col_format % val)
except Exception: # noqa: BLE001
logging.warning(
"error formatting variant: %s (%s) (%s)",
v, col_format, val, exc_info=True)
if math.isnan(val):
return "-"
return str(val)
if col_role is not None:
col_source = f"{col_source}.{col_role}"
if col_source == "pedigree":
assert isinstance(v, FamilyVariant)
psc_id = kwargs["person_set_collection"]
row_variant.append(self._generate_pedigree(
v, psc_id,
))
elif col_source in self.PHENOTYPE_ATTRS:
phenotype_person_sets = \
self.study_wrapper.person_set_collections.get(
"phenotype",
)
if phenotype_person_sets is None:
row_variant.append("-")
else:
fn_format = self.PHENOTYPE_ATTRS[col_source]
row_variant.append(
",".join(fn_format(v, phenotype_person_sets)))
elif col_source == "study_phenotype":
row_variant.append(
self.study_wrapper.config.study_phenotype,
)
else:
if col_source in self.SPECIAL_ATTRS:
attribute = self.SPECIAL_ATTRS[col_source](v)
else:
attribute = v.get_attribute(col_source)
if kwargs.get("reduceAlleles", True) and \
all(a == attribute[0] for a in attribute):
attribute = [attribute[0]]
attribute = list(
map(
partial(col_formatter, col_format=col_format),
attribute))
row_variant.append(attribute)
except (AttributeError, KeyError, Exception):
logging.exception("error build variant: %s", v)
traceback.print_stack()
row_variant.append([""])
raise
return row_variant
@staticmethod
def _gene_view_summary_download_variants_iterator(
variants: Iterable[SummaryVariant], frequency_column: str,
) -> Generator[list, None, None]:
for v in variants:
for aa in v.alt_alleles:
yield [
aa.cshl_location,
aa.position,
aa.end_position,
aa.chrom,
aa.get_attribute(frequency_column),
gene_effect_get_worst_effect(aa.effects),
aa.cshl_variant,
aa.get_attribute("family_variants_count"),
aa.get_attribute("seen_as_denovo"),
aa.get_attribute("seen_in_status") in {2, 3},
aa.get_attribute("seen_in_status") in {1, 3},
]
[docs]
@staticmethod
def transform_gene_view_summary_variant(
variant: SummaryVariant, frequency_column: str,
) -> Generator[dict[str, Any], None, None]:
"""Transform gene view summary response into dicts."""
out: dict[str, Any] = {
"svuid": variant.svuid,
}
alleles = [
{
"location": aa.cshl_location,
"position": aa.position,
"end_position": aa.end_position,
"chrom": aa.chrom,
"frequency": aa.get_attribute(frequency_column),
"effect": gene_effect_get_worst_effect(aa.effects),
"variant": aa.cshl_variant,
"family_variants_count":
aa.get_attribute("family_variants_count"),
"is_denovo": aa.get_attribute("seen_as_denovo"),
"seen_in_affected":
aa.get_attribute("seen_in_status") in {2, 3},
"seen_in_unaffected":
aa.get_attribute("seen_in_status") in {1, 3},
}
for aa in variant.alt_alleles
]
out["alleles"] = alleles
yield out
[docs]
def transform_gene_view_summary_variant_download(
self,
variants: Iterable[SummaryVariant],
frequency_column: str,
summary_variant_ids: set[str],
) -> Iterator[str]:
"""Transform gene view summary response into rows."""
columns = [
"location",
"position",
"end_position",
"chrom",
"frequency",
"effect",
"variant",
"family_variants_count",
"is_denovo",
"seen_in_affected",
"seen_in_unaffected",
]
rows = filter(
lambda sa: f"{sa[0]}:{sa[6]}" in summary_variant_ids,
self._gene_view_summary_download_variants_iterator(
variants, frequency_column,
),
)
return map(join_line, itertools.chain([columns], rows))
[docs]
def variant_transformer(self) -> Callable[[FamilyVariant], FamilyVariant]:
"""Build and return a variant transformer function."""
assert not self.study_wrapper.is_remote
pheno_column_values = self._get_all_pheno_values()
def transformer(variant: FamilyVariant) -> FamilyVariant:
pheno_values = self._get_pheno_values_for_variant(
variant, pheno_column_values,
)
for allele in variant.alt_alleles:
fallele = cast(FamilyAllele, allele)
gene_scores_values = self._get_gene_scores_values(
fallele,
)
fallele.update_attributes(gene_scores_values)
if pheno_values:
fallele.update_attributes(pheno_values)
return variant
return transformer
[docs]
def transform_summary_variants(
self, variants_iterable: Generator[SummaryVariant, None, None],
) -> Generator[list, None, None]:
for v in self._add_additional_columns_summary(variants_iterable):
yield self.build_variant_row(
v, self.study_wrapper.summary_preview_descs,
)