# Source code for studies.query_transformer
import logging
import time
from functools import reduce
from typing import Any, ClassVar, cast
from dae.effect_annotation.effect import EffectTypesMixin
from dae.pedigrees.family import FamilyTag
from dae.pedigrees.family_tag_builder import check_family_tags_query
from dae.person_filters import make_pedigree_filter, make_pheno_filter
from dae.person_sets import PSCQuery
from dae.query_variants.attributes_query import role_query
from dae.utils.regions import Region
from dae.variants.attributes import Inheritance
logger = logging.getLogger(__name__)
# [docs]
class QueryTransformer:
"""Transform genotype data query WEB parameters into query variants parameters."""
# Maps web-style (camelCase) query keys to the names they are renamed to
# by the final loop of ``transform_kwargs``.
FILTER_RENAMES_MAP: ClassVar[dict[str, str]] = {
"familyIds": "family_ids",
"personIds": "person_ids",
"genders": "sexes",
"geneSymbols": "genes",
"variantTypes": "variant_type",
"effectTypes": "effect_types",
# NOTE(review): "regionS" (capital S) does not match the "regions" key that
# ``transform_kwargs`` handles; this looks like a typo or a dead entry —
# confirm against the web client before changing it.
"regionS": "regions",
}
def __init__(self, study_wrapper):  # type: ignore
    """Bind the transformer to a study wrapper.

    Keeps a reference to the wrapper and to its ``gpf_instance`` and
    creates the ``EffectTypesMixin`` used to build effect type filters.
    """
    self.gpf_instance = study_wrapper.gpf_instance
    self.effect_types_mixin = EffectTypesMixin()
    self.study_wrapper = study_wrapper
def _transform_genomic_scores(
self, genomic_scores: list[dict],
) -> list[tuple[str, tuple[int | None, int | None]]]:
return [
(score["metric"], (score["rangeStart"], score["rangeEnd"]))
for score in genomic_scores
]
def _transform_gene_scores(self, gene_scores: dict) -> list[str] | None:
if not self.study_wrapper.gene_scores_db:
return None
scores_name = gene_scores.get("score")
range_start = gene_scores.get("rangeStart")
range_end = gene_scores.get("rangeEnd")
values = gene_scores.get("values")
if scores_name and scores_name in self.study_wrapper.gene_scores_db:
score_desc = self.study_wrapper.gene_scores_db[
scores_name
]
score = self.study_wrapper.gene_scores_db.get_gene_score(
score_desc.resource_id,
)
genes = score.get_genes(scores_name, range_start, range_end, values)
return list(genes)
return None
def _transform_min_max_alt_frequency(
self, min_value: float | None, max_value: float | None,
) -> tuple[str, tuple[float, float]] | None:
value_range = (min_value, max_value)
if value_range == (None, None):
return None
result_range: tuple[float, float]
if value_range[0] is None:
assert value_range[1] is not None
result_range = (float("-inf"), value_range[1])
elif value_range[1] is None:
assert value_range[0] is not None
result_range = (value_range[0], float("inf"))
else:
assert value_range[0] is not None
assert value_range[1] is not None
result_range = cast(tuple[float, float], value_range)
value = "af_allele_freq"
return (value, result_range)
@staticmethod
def _transform_present_in_child_and_parent_roles(
present_in_child: set[str],
present_in_parent: set[str],
) -> str | None:
roles_query = [
QueryTransformer._present_in_child_to_roles(present_in_child),
QueryTransformer._present_in_parent_to_roles(present_in_parent),
]
result = [role for role in roles_query if role is not None]
if len(result) == 2:
return f"({result[0]}) and ({result[1]})"
if len(result) == 1:
return result[0]
return None
@staticmethod
def _transform_present_in_child_and_parent_inheritance(
    present_in_child: set[str],
    present_in_parent: set[str], *,
    show_all_unknown: bool = False,
) -> str:
    """Build the ``any(...)`` inheritance query for the two selections.

    * child is exactly ``{"neither"}`` and parent is not:
      mendelian and missing;
    * parent is exactly ``{"neither"}`` and child is not: denovo;
    * any other combination: denovo, mendelian, missing and omission.

    ``Inheritance.unknown`` is appended when ``show_all_unknown`` is set.
    """
    neither = {"neither"}
    child_is_neither = present_in_child == neither
    parent_is_neither = present_in_parent == neither

    if child_is_neither and not parent_is_neither:
        selected = [Inheritance.mendelian, Inheritance.missing]
    elif parent_is_neither and not child_is_neither:
        selected = [Inheritance.denovo]
    else:
        selected = [
            Inheritance.denovo,
            Inheritance.mendelian,
            Inheritance.missing,
            Inheritance.omission,
        ]

    if show_all_unknown:
        selected.append(Inheritance.unknown)

    joined = ",".join(str(inh) for inh in selected)
    return f"any({joined})"
@staticmethod
def _transform_present_in_child_and_parent_frequency(
_present_in_child: set[str], _present_in_parent: set[str],
rarity: dict,
frequency_filter: list[
tuple[str, tuple[float | None, float | None]]],
) -> tuple[str | None, Any]:
ultra_rare = rarity.get("ultraRare")
ultra_rare = bool(ultra_rare)
if ultra_rare:
return ("ultra_rare", True)
max_alt_freq = rarity.get("maxFreq")
min_alt_freq = rarity.get("minFreq")
if min_alt_freq is not None or max_alt_freq is not None:
frequency_filter.append(
("af_allele_freq", (min_alt_freq, max_alt_freq)),
)
return ("frequency_filter", frequency_filter)
return (None, None)
@staticmethod
def _present_in_child_to_roles(
present_in_child: set[str],
) -> str | None:
roles_query = []
if "proband only" in present_in_child:
roles_query.append("prb and not sib")
if "sibling only" in present_in_child:
roles_query.append("sib and not prb")
if "proband and sibling" in present_in_child:
roles_query.append("prb and sib")
if "neither" in present_in_child:
roles_query.append("not prb and not sib")
if len(roles_query) == 4 or len(roles_query) == 0:
return None
if len(roles_query) == 1:
return roles_query[0]
return " or ".join(f"( {r} )" for r in roles_query)
@staticmethod
def _present_in_parent_to_roles(
present_in_parent: set[str],
) -> str | None:
roles_query = []
if "mother only" in present_in_parent:
roles_query.append("mom and not dad")
if "father only" in present_in_parent:
roles_query.append("dad and not mom")
if "mother and father" in present_in_parent:
roles_query.append("mom and dad")
if "neither" in present_in_parent:
roles_query.append("not mom and not dad")
if len(roles_query) == 4 or len(roles_query) == 0:
return None
if len(roles_query) == 1:
return roles_query[0]
return " or ".join(f"( {r} )" for r in roles_query)
def _transform_filters_to_ids(self, filters: list[dict]) -> set[str]:
    """Apply every filter to the study families and intersect the results.

    Filters whose ``from`` entry is ``"phenodb"`` are built with
    ``make_pheno_filter`` on the wrapper's phenotype data, all others
    with ``make_pedigree_filter``. The optional ``role`` entry of each
    filter is forwarded to ``apply``.

    ``filters`` must not be empty: ``reduce`` has no initial value.
    """
    id_sets = []
    for conf in filters:
        roles = conf.get("role")
        if conf["from"] == "phenodb":
            built_filter = make_pheno_filter(
                conf, self.study_wrapper.phenotype_data,
            )
        else:
            built_filter = make_pedigree_filter(conf)
        id_sets.append(
            built_filter.apply(self.study_wrapper.families, roles),
        )
    return reduce(set.intersection, id_sets)
@staticmethod
def _add_inheritance_to_query(query: str, kwargs: dict[str, Any]) -> None:
if not query:
return
inheritance = kwargs.get("inheritance", [])
if isinstance(inheritance, list):
inheritance.append(query)
elif isinstance(inheritance, str):
inheritance = [inheritance]
inheritance.append(query)
else:
raise TypeError(f"unexpected inheritance query {inheritance}")
kwargs["inheritance"] = inheritance
@staticmethod
def _add_roles_to_query(
query: str | None, kwargs: dict[str, Any],
) -> None:
if not query:
return
original_roles = kwargs.get("roles")
if original_roles is not None:
if isinstance(original_roles, str):
original_roles = role_query.transform_query_string_to_tree(
original_roles,
)
kwargs["roles"] = f"{original_roles} and {query}"
else:
kwargs["roles"] = query
def _handle_person_set_collection(
    self, kwargs: dict[str, Any],
) -> dict[str, Any]:
    """Replace ``personSetCollection`` with a ``PSCQuery``.

    The web value (``id`` and ``checkedValues``) is used when present;
    otherwise the first person set collection of the genotype data is
    selected with all of its person sets. The result is stored under
    ``person_set_collection`` and the same ``kwargs`` dict is returned.
    """
    requested = kwargs.pop("personSetCollection", {})
    logger.debug("person set collection requested: %s", requested)

    if requested:
        psc_id = requested["id"]
        person_set_ids = set(requested["checkedValues"])
    else:
        # use default (first defined) person set collection
        # we need it for meaningful pedigree display
        collections = \
            self.study_wrapper.genotype_data.person_set_collections
        psc_id = next(iter(collections))
        person_set_ids = set(collections[psc_id].person_sets.keys())

    kwargs["person_set_collection"] = PSCQuery(psc_id, person_set_ids)
    return kwargs
def _transform_regions(self, regions: list[str]) -> list[Region]:
    """Parse region strings and align them with the genome's naming.

    A region whose chromosome is not among the reference genome
    chromosomes is adjusted to the genome's ``chrom_prefix``: the
    ``chr`` prefix is added when the genome uses it and removed when the
    genome uses no prefix. For any other prefix the region is left as
    parsed.

    Every parsed region is returned, including those whose chromosome is
    still unknown after the adjustment.

    Args:
        regions: region strings accepted by ``Region.from_str``.

    Returns:
        The parsed regions, in input order.
    """
    result = [Region.from_str(region) for region in regions]
    genome = self.gpf_instance.reference_genome
    chrom_prefix = genome.chrom_prefix
    chromosomes = set(genome.chromosomes)

    for region in result:
        if region.chrom in chromosomes:
            continue
        if chrom_prefix == "chr":
            region.chrom = f"{chrom_prefix}{region.chrom}"
        elif chrom_prefix == "":
            # removeprefix() drops exactly one leading "chr";
            # lstrip("chr") would strip any run of the characters
            # "c", "h" and "r" and mangle names such as "chrhs37d5".
            region.chrom = region.chrom.removeprefix("chr")
    return result
# [docs]
def transform_kwargs(self, **kwargs: Any) -> dict[str, Any]:
    """Transform WEB query variants params into genotype data params.

    The web (camelCase) filters are rewritten one group at a time, in a
    fixed order, and the keys listed in ``FILTER_RENAMES_MAP`` are
    renamed at the very end.

    Family ID restrictions are cumulative: the families selected by
    family tags are intersected with explicitly requested ``familyIds``,
    and ``familyFilters`` narrow a tag selection even when that
    selection is empty.

    Args:
        **kwargs: query parameters as sent by the web client.

    Returns:
        The transformed keyword arguments.
    """
    # flake8: noqa: C901
    # pylint: disable=too-many-locals,too-many-branches,too-many-statements
    # perf_counter() is monotonic, so the logged duration cannot be
    # distorted by system clock adjustments.
    start = time.perf_counter()
    logger.debug("kwargs in study wrapper: %s", kwargs)

    self._add_inheritance_to_query(
        "not possible_denovo and not possible_omission",
        kwargs,
    )

    kwargs = self._handle_person_set_collection(kwargs)

    # --- family tags -------------------------------------------------
    tags_requested = (
        "selectedFamilyTags" in kwargs
        or "deselectedFamilyTags" in kwargs
    )
    if tags_requested:
        tag_intersection = kwargs.get("tagIntersection")
        # "or" mode is used only when tagIntersection is explicitly
        # given and falsy; a missing value means intersection.
        or_mode = tag_intersection is not None and not tag_intersection

        selected_labels = kwargs.get("selectedFamilyTags")
        include_tags: set[FamilyTag] = set()
        if isinstance(selected_labels, list):
            include_tags = {
                FamilyTag.from_label(label) for label in selected_labels
            }

        deselected_labels = kwargs.get("deselectedFamilyTags")
        exclude_tags: set[FamilyTag] = set()
        if isinstance(deselected_labels, list):
            exclude_tags = {
                FamilyTag.from_label(label)
                for label in deselected_labels
            }

        family_ids: set[str] = {
            family_id
            for family_id, family
            in self.study_wrapper.families.items()
            if check_family_tags_query(
                family, or_mode, include_tags, exclude_tags,
            )
        }

        # Explicitly requested family IDs must keep restricting the
        # query instead of being silently replaced by the tag selection.
        requested_family_ids = kwargs.get("familyIds")
        if requested_family_ids:
            family_ids &= set(requested_family_ids)
        kwargs["familyIds"] = family_ids

    # --- simple renames and regions ----------------------------------
    if "querySummary" in kwargs:
        kwargs["query_summary"] = kwargs.pop("querySummary")

    if "uniqueFamilyVariants" in kwargs:
        kwargs["unique_family_variants"] = \
            kwargs.pop("uniqueFamilyVariants")

    if "regions" in kwargs:
        kwargs["regions"] = self._transform_regions(kwargs["regions"])

    # --- present in child / parent -----------------------------------
    present_in_child: set[str] = set()
    present_in_parent: set[str] = set()
    rarity = None
    if "presentInChild" in kwargs:
        present_in_child = set(kwargs.pop("presentInChild"))
    if "presentInParent" in kwargs:
        parent_conf = kwargs.pop("presentInParent")
        present_in_parent = set(parent_conf["presentInParent"])
        rarity = parent_conf.get("rarity", None)

    # With both selections empty the roles query is None and rarity is
    # None, so the two steps below do nothing when neither filter was
    # sent.
    roles_query = self._transform_present_in_child_and_parent_roles(
        present_in_child, present_in_parent,
    )
    self._add_roles_to_query(roles_query, kwargs)

    if present_in_parent != {"neither"} and rarity is not None:
        frequency_filter = kwargs.get("frequency_filter", [])
        arg, val = \
            self._transform_present_in_child_and_parent_frequency(
                present_in_child, present_in_parent,
                rarity, frequency_filter,
            )
        if arg is not None:
            kwargs[arg] = val

    # --- inheritance -------------------------------------------------
    show_all_unknown = \
        self.study_wrapper.config.genotype_browser.show_all_unknown

    if kwargs.get("inheritanceTypeFilter"):
        inheritance_types = set(kwargs.pop("inheritanceTypeFilter"))
        if show_all_unknown \
                and inheritance_types & {"mendelian", "missing"}:
            inheritance_types.add("unknown")
        self._add_inheritance_to_query(
            f"any({','.join(inheritance_types)})", kwargs,
        )
    else:
        inheritance = \
            self._transform_present_in_child_and_parent_inheritance(
                present_in_child, present_in_parent,
                show_all_unknown=show_all_unknown)
        self._add_inheritance_to_query(inheritance, kwargs)

    # --- score filters -----------------------------------------------
    if "genomicScores" in kwargs:
        genomic_scores = kwargs.pop("genomicScores", [])
        kwargs.setdefault("real_attr_filter", []).extend(
            self._transform_genomic_scores(genomic_scores))

    if "frequencyScores" in kwargs:
        frequency_scores = kwargs.pop("frequencyScores", [])
        kwargs.setdefault("frequency_filter", []).extend(
            self._transform_genomic_scores(frequency_scores))

    if "geneScores" in kwargs:
        gene_scores = kwargs.pop("geneScores", {})
        genes = self._transform_gene_scores(gene_scores)
        if genes is not None:
            if "genes" not in kwargs:
                kwargs["genes"] = []
            # NOTE(review): a "geneSymbols" value is renamed to "genes"
            # at the end of this method and replaces this list - confirm
            # the two filters are never sent together.
            kwargs["genes"] += genes

    # --- sexes, variant types, effect types --------------------------
    if "genders" in kwargs:
        sexes = set(kwargs["genders"])
        if sexes != {"female", "male", "unspecified"}:
            kwargs["genders"] = f"any({','.join(sexes)})"
        else:
            # every sex selected: no restriction
            kwargs["genders"] = None

    if "variantTypes" in kwargs:
        variant_types = set(kwargs["variantTypes"])
        if variant_types != {"ins", "del", "sub", "CNV", "complex"}:
            if "CNV" in variant_types:
                # the web "CNV" option stands for both CNV directions
                variant_types.remove("CNV")
                variant_types.add("CNV+")
                variant_types.add("CNV-")
            kwargs["variantTypes"] = \
                f"any({','.join(variant_types)})"
        else:
            # every type selected: drop the filter altogether
            del kwargs["variantTypes"]

    if "effectTypes" in kwargs:
        kwargs["effectTypes"] = \
            self.effect_types_mixin.build_effect_types(
                kwargs["effectTypes"],
            )

    # --- study filters -----------------------------------------------
    if kwargs.get("studyFilters"):
        request = set(kwargs["studyFilters"])
        if kwargs.get("allowed_studies") is not None:
            request = request & set(kwargs.pop("allowed_studies"))
        kwargs["study_filters"] = request

        del kwargs["studyFilters"]
    elif kwargs.get("allowed_studies") is not None:
        kwargs["study_filters"] = set(kwargs.pop("allowed_studies"))

    # --- person / family filters -------------------------------------
    if "personFilters" in kwargs:
        person_filters = kwargs.pop("personFilters")
        if person_filters:
            matching_person_ids = self._transform_filters_to_ids(
                person_filters,
            )
            if matching_person_ids is not None \
                    and kwargs.get("personIds"):
                kwargs["personIds"] = set.intersection(
                    matching_person_ids, set(kwargs.pop("personIds")),
                )
            else:
                kwargs["personIds"] = matching_person_ids

    if "familyFilters" in kwargs:
        family_filters = kwargs.pop("familyFilters")
        if family_filters:
            matching_family_ids = self._transform_filters_to_ids(
                family_filters,
            )
            current_family_ids = kwargs.pop("familyIds", None)
            # A tag selection is a real restriction even when it is
            # empty ("no family matches the tags"); an empty value from
            # the web client keeps meaning "no restriction".
            restricts = bool(current_family_ids) or (
                tags_requested and current_family_ids is not None
            )
            if matching_family_ids is not None and restricts:
                kwargs["familyIds"] = set.intersection(
                    matching_family_ids, set(current_family_ids),
                )
            else:
                kwargs["familyIds"] = matching_family_ids

    if "personIds" in kwargs:
        kwargs["personIds"] = list(kwargs["personIds"])

    if "affectedStatus" in kwargs:
        statuses = kwargs.pop("affectedStatus")
        kwargs["affected_status"] = [
            status.lower() for status in statuses
        ]

    # --- final key renames -------------------------------------------
    # iterate over a snapshot: the dict is modified inside the loop
    for key in list(kwargs):
        if key in self.FILTER_RENAMES_MAP:
            kwargs[self.FILTER_RENAMES_MAP[key]] = kwargs[key]
            kwargs.pop(key)

    elapsed = time.perf_counter() - start
    logger.debug("transform kwargs took %.2f sec", elapsed)

    return kwargs