Source code for studies.query_transformer
from __future__ import annotations
import logging
import time
from functools import reduce
from threading import Lock
from typing import Any, ClassVar, cast
from dae.effect_annotation.effect import EffectTypesMixin
from dae.gene_scores.gene_scores import GeneScoresDb
from dae.gpf_instance.gpf_instance import GPFInstance
from dae.person_filters import make_pedigree_filter, make_pheno_filter
from dae.person_filters.person_filters import make_pheno_filter_beta
from dae.person_sets import PSCQuery
from dae.person_sets.person_sets import (
AttributeQueriesUnsupportedException,
)
from dae.query_variants.attribute_queries import (
update_attribute_query_with_compounds,
)
from dae.query_variants.sql.schema2.sql_query_builder import (
TagsQuery,
)
from dae.studies.study import GenotypeDataStudy
from dae.utils.regions import Region
from dae.variants.attributes import Inheritance, Zygosity
from dae.variants.core import Allele
from studies.study_wrapper import (
QueryTransformerProtocol,
WDAEStudy,
)
logger = logging.getLogger(__name__)
# Module-wide QueryTransformer instance; stays None until the first call to
# get_or_create_query_transformer() fills it in.
_QUERY_TRANSFORMER: QueryTransformer | None = None
# Held by get_or_create_query_transformer() while it reads or creates the
# instance above.
_QUERY_TRANSFORMER_LOCK = Lock()
[docs]
class QueryTransformer(QueryTransformerProtocol):
    """Transform genotype data query WEB parameters into query variants."""

    # Web parameter name -> name expected by the variants query. Applied as
    # the final step of transform_kwargs() to every key still present.
    # NOTE(review): "regionS" (capital S) does not match the "regions" key
    # that transform_kwargs() reads directly - confirm the spelling is
    # intentional before relying on this entry.
    FILTER_RENAMES_MAP: ClassVar[dict[str, str]] = {
        "familyIds": "family_ids",
        "personIds": "person_ids",
        "genders": "sexes",
        "geneSymbols": "genes",
        "variantTypes": "variant_type",
        "effectTypes": "effect_types",
        "regionS": "regions",
    }
def __init__(
    self, gene_scores_db: GeneScoresDb,
    chromosomes: list[str], chr_prefix: str,
) -> None:
    """Keep hold of everything the transformation steps need.

    Args:
        gene_scores_db: consulted by name when a gene-score filter has to
            be turned into a list of genes.
        chromosomes: chromosome names that requested regions are checked
            against.
        chr_prefix: chromosome prefix consulted when a region's chromosome
            is not among ``chromosomes``.
    """
    self.effect_types_mixin = EffectTypesMixin()
    self.chr_prefix = chr_prefix
    self.chromosomes = chromosomes
    self.gene_scores_db = gene_scores_db
def _transform_genomic_scores_continuous(
self, genomic_scores: list[dict],
) -> list[tuple[str, tuple[int | None, int | None]]]:
return [
(score["score"], (score["rangeStart"], score["rangeEnd"]))
for score in genomic_scores
if score["histogramType"] == "continuous"
]
def _transform_genomic_scores_categorical(
self, genomic_scores: list[dict],
) -> list[tuple[str, list[str | None]]]:
return [
(score["score"], score["values"])
for score in genomic_scores
if score["histogramType"] == "categorical"
]
def _transform_gene_scores(self, gene_scores: dict) -> list[str] | None:
if not self.gene_scores_db:
return None
scores_name = gene_scores.get("score")
range_start = gene_scores.get("rangeStart")
range_end = gene_scores.get("rangeEnd")
values = gene_scores.get("values")
if scores_name and scores_name in self.gene_scores_db:
score_desc = self.gene_scores_db[
scores_name
]
score = self.gene_scores_db.get_gene_score(
score_desc.resource_id,
)
if score is None:
return None
genes = score.get_genes(
scores_name, range_start, range_end, values)
return list(genes)
return None
def _transform_min_max_alt_frequency(
self, min_value: float | None, max_value: float | None,
) -> tuple[str, tuple[float, float]] | None:
value_range = (min_value, max_value)
if value_range == (None, None):
return None
result_range: tuple[float, float]
if value_range[0] is None:
assert value_range[1] is not None
result_range = (float("-inf"), value_range[1])
elif value_range[1] is None:
assert value_range[0] is not None
result_range = (value_range[0], float("inf"))
else:
assert value_range[0] is not None
assert value_range[1] is not None
result_range = cast(tuple[float, float], value_range)
value = "af_allele_freq"
return (value, result_range)
@staticmethod
def _transform_present_in_child_and_parent_roles(
kwargs: dict[str, Any],
) -> str | None:
present_in_child = None
present_in_parent = None
if "presentInChild" in kwargs:
present_in_child = kwargs.pop("presentInChild")
if "presentInParent" in kwargs:
present_in_parent = kwargs.pop("presentInParent")
roles_query = [present_in_child, present_in_parent]
result = [role for role in roles_query if role is not None]
if len(result) == 2:
return f"({result[0]}) and ({result[1]})"
if len(result) == 1:
return cast(str, result[0])
return None
@staticmethod
def _transform_present_in_child_and_parent_inheritance(
    present_in_child: set[str],
    present_in_parent: set[str],
) -> str:
    """Derive an inheritance query from the child/parent selections.

    * child is exactly ``{"neither"}`` and parent is not: mendelian or
      missing;
    * parent is exactly ``{"neither"}`` and child is not: denovo;
    * anything else: denovo, mendelian, missing, omission or unknown.

    The result is formatted as ``any([...])``.
    """
    child_is_neither = present_in_child == {"neither"}
    parent_is_neither = present_in_parent == {"neither"}

    if child_is_neither and not parent_is_neither:
        allowed = [Inheritance.mendelian, Inheritance.missing]
    elif parent_is_neither and not child_is_neither:
        allowed = [Inheritance.denovo]
    else:
        allowed = [
            Inheritance.denovo,
            Inheritance.mendelian,
            Inheritance.missing,
            Inheritance.omission,
            Inheritance.unknown,
        ]

    joined = ",".join(str(kind) for kind in allowed)
    return f"any([{joined}])"
@staticmethod
def _transform_present_in_child_and_parent_frequency(
_present_in_child: set[str], _present_in_parent: set[str],
rarity: dict,
frequency_filter: list[
tuple[str, tuple[float | None, float | None]]],
) -> tuple[str | None, Any]:
ultra_rare = rarity.get("ultraRare")
ultra_rare = bool(ultra_rare)
if ultra_rare:
return ("ultra_rare", True)
max_alt_freq = rarity.get("maxFreq")
min_alt_freq = rarity.get("minFreq")
if min_alt_freq is not None or max_alt_freq is not None:
frequency_filter.append(
("af_allele_freq", (min_alt_freq, max_alt_freq)),
)
return ("frequency_filter", frequency_filter)
return (None, None)
@staticmethod
def _present_in_child_to_roles(
present_in_child: set[str],
) -> str | None:
roles_query = []
if "proband only" in present_in_child:
roles_query.append("prb and not sib")
if "sibling only" in present_in_child:
roles_query.append("sib and not prb")
if "proband and sibling" in present_in_child:
roles_query.append("prb and sib")
if "neither" in present_in_child:
roles_query.append("not prb and not sib")
if (len(roles_query) == 4) or len(roles_query) == 0:
return None
if len(roles_query) == 1:
return roles_query[0]
return " or ".join(f"( {r} )" for r in roles_query)
@staticmethod
def _present_in_parent_to_roles(
present_in_parent: set[str],
) -> str | None:
roles_query = []
if "mother only" in present_in_parent:
roles_query.append("mom and not dad")
if "father only" in present_in_parent:
roles_query.append("dad and not mom")
if "mother and father" in present_in_parent:
roles_query.append("mom and dad")
if "neither" in present_in_parent:
roles_query.append("not mom and not dad")
if (len(roles_query) == 4) or len(roles_query) == 0:
return None
if len(roles_query) == 1:
return roles_query[0]
return " or ".join(f"( {r} )" for r in roles_query)
def _transform_filters_to_ids(
    self, filters: list[dict],
    study_wrapper: WDAEStudy,
) -> set[str]:
    """Apply every filter to the study families and intersect the IDs.

    A filter whose ``"from"`` entry is ``"phenodb"`` is built with
    ``make_pheno_filter`` against the study's phenotype data; any other
    filter is built with ``make_pedigree_filter``. The optional ``"role"``
    entry of each filter is passed on to ``apply``.

    Raises:
        TypeError: if ``filters`` is empty, because ``reduce`` is called
            without an initial value.
    """
    id_sets = []
    for conf in filters:
        role = conf.get("role")
        if conf["from"] == "phenodb":
            person_filter = make_pheno_filter(
                conf, study_wrapper.phenotype_data)
        else:
            person_filter = make_pedigree_filter(conf)
        id_sets.append(person_filter.apply(study_wrapper.families, role))
    return reduce(set.intersection, id_sets)
def _transform_pheno_filters_to_ids(
    self, filters: list[dict],
    study_wrapper: WDAEStudy,
) -> set[str]:
    """Apply every phenotype filter and intersect the resulting IDs.

    Each filter is built with ``make_pheno_filter_beta`` against the
    study's phenotype data and applied to the study families together
    with the filter's optional ``"roles"`` entry.

    Raises:
        TypeError: if ``filters`` is empty, because ``reduce`` is called
            without an initial value.
    """
    id_sets = []
    for conf in filters:
        roles = conf.get("roles")
        pheno_filter = make_pheno_filter_beta(
            conf, study_wrapper.phenotype_data)
        id_sets.append(pheno_filter.apply(study_wrapper.families, roles))
    return reduce(set.intersection, id_sets)
@staticmethod
def _add_inheritance_to_query(query: str, kwargs: dict[str, Any]) -> None:
if not query:
return
inheritance = kwargs.get("inheritance", [])
if isinstance(inheritance, list):
inheritance.append(query)
elif isinstance(inheritance, str):
inheritance = [inheritance]
inheritance.append(query)
else:
raise TypeError(f"unexpected inheritance query {inheritance}")
kwargs["inheritance"] = inheritance
[docs]
def extract_person_set_collection_query(
    self, study: WDAEStudy, kwargs: dict[str, Any],
) -> PSCQuery:
    """Pop the person set collection request and return it as a PSCQuery.

    ``personSetCollection`` is removed from ``kwargs``. When it is present
    and non-empty, its ``id`` and ``checkedValues`` form the query.
    Otherwise the first person set collection of the study is used with
    all of its person sets selected.
    """
    requested = kwargs.pop("personSetCollection", {})
    logger.debug("person set collection requested: %s", requested)

    if requested:
        return PSCQuery(requested["id"], set(requested["checkedValues"]))

    # Nothing requested: fall back to the first defined collection, which
    # is needed for a meaningful pedigree display.
    collections = study.genotype_data.person_set_collections
    default_id = next(iter(collections))
    default_collection = collections[default_id]
    return PSCQuery(default_id, set(default_collection.person_sets.keys()))
def _handle_person_set_collection(
    self, study_wrapper: WDAEStudy, kwargs: dict[str, Any],
) -> dict[str, Any]:
    """Translate the person set collection request into query arguments.

    Stores the extracted query under ``person_set_collection``. If the
    backend reports support for affected status queries, the query is
    converted into attribute queries that are merged into ``kwargs``; when
    that conversion is unsupported, the person IDs of the selected sets
    are combined with any existing ``personIds`` instead.

    Raises:
        ValueError: for non-genotype studies and for study groups.
    """
    psc_query = self.extract_person_set_collection_query(
        study_wrapper, kwargs)
    kwargs["person_set_collection"] = psc_query

    if not study_wrapper.is_genotype:
        raise ValueError(
            "Cannot handle person set collection "
            "query argument on non genotype studies.",
        )

    psc = study_wrapper.genotype_data.get_person_set_collection(
        psc_query.psc_id,
    )
    assert psc is not None

    if study_wrapper.is_group:
        raise ValueError(
            "Determining person set collection kwargs for groups "
            "is not supported!",
        )

    genotype_data = cast(GenotypeDataStudy, study_wrapper.genotype_data)

    # Backends without affected status queries are intentionally left
    # alone: handling roles and sexes for them was judged too complicated
    # for a rarely used feature and has been deferred.
    if not genotype_data.backend.has_affected_status_queries():
        return kwargs

    try:
        attribute_queries = psc.transform_ps_query_to_attribute_queries(
            psc_query,
        )
    except AttributeQueriesUnsupportedException:
        # Fall back to restricting the query by explicit person IDs.
        selected_ids = kwargs.get("personIds")
        psc_person_ids = psc.query_person_ids(psc_query)
        if psc_person_ids is not None:
            selected_ids = (
                psc_person_ids
                if selected_ids is None
                else selected_ids.intersection(psc_person_ids)
            )
        if selected_ids is not None:
            kwargs["personIds"] = selected_ids
        return kwargs

    kwargs.update(attribute_queries)
    return kwargs
def _transform_regions(self, regions: list[str]) -> list[Region]:
    """Parse region strings and align chromosome names with the genome.

    A region whose chromosome is not among ``self.chromosomes`` gets its
    name adjusted according to ``self.chr_prefix``:

    * prefix ``"chr"``: the prefix is added unless already present;
    * prefix ``""``: one leading ``"chr"`` is removed.

    Every parsed region is returned, including those whose chromosome is
    still unknown after the adjustment.
    """
    parsed = [Region.from_str(region) for region in regions]
    prefix = self.chr_prefix
    known = self.chromosomes

    for region in parsed:
        if region.chrom in known:
            continue
        if prefix == "chr":
            # Do not produce "chrchr..." for names that already carry it.
            if not region.chrom.startswith(prefix):
                region.chrom = f"{prefix}{region.chrom}"
        elif prefix == "":
            # str.lstrip("chr") would strip any leading 'c', 'h' or 'r'
            # characters; only the literal prefix must go.
            region.chrom = region.chrom.removeprefix("chr")
    return parsed
def _apply_zygosity(self, kwargs: dict[str, Any]) -> dict[str, Any]:
    """Fold zygosity arguments into the queries they qualify.

    For each (query key, zygosity key) pair below, when both keys are in
    ``kwargs`` the zygosity is popped, validated against the ``Zygosity``
    member names and combined into the query through
    ``update_attribute_query_with_compounds``. Only the status zygosity is
    lower-cased before validation.

    Raises:
        ValueError: if a zygosity is not a string or not a valid name.
    """
    valid_zygosities = [v.name for v in Zygosity]

    # (query key, zygosity key, label used in messages, lower-case first)
    rules = (
        ("presentInChild", "zygosityInChild", "child", False),
        ("presentInParent", "zygosityInParent", "parent", False),
        ("genders", "zygosityInSexes", "sexes", False),
        ("status", "zygosityInStatus", "status", True),
    )

    for query_key, zygosity_key, label, lowercase in rules:
        if query_key not in kwargs or zygosity_key not in kwargs:
            continue

        zygosity = kwargs.pop(zygosity_key)
        if not isinstance(zygosity, str):
            raise ValueError(
                f"Invalid zygosity in {label} argument - not a string.",
            )
        if lowercase:
            zygosity = zygosity.lower()
        if zygosity not in valid_zygosities:
            raise ValueError(
                f"Invalid zygosity in {label} {zygosity}, "
                f"expected one of {valid_zygosities}",
            )
        kwargs[query_key] = update_attribute_query_with_compounds(
            kwargs[query_key], zygosity,
        )

    return kwargs
[docs]
def get_unique_family_variants(self, kwargs: dict[str, Any]) -> bool:
if "uniqueFamilyVariants" not in kwargs:
return False
return bool(kwargs["uniqueFamilyVariants"])
[docs]
def transform_kwargs(
    self, study: WDAEStudy, **kwargs: Any,
) -> dict[str, Any]:
    """
    Transform WEB query variants params into genotype data params.

    Requires a study wrapper to handle study context specific arguments,
    such as person set collections and phenotype filters.

    Always returns the transformed keyword dictionary. The top-level
    dictionary is private to this call because it is collected from
    ``**kwargs``; nested values are still shared with the caller.
    """
    # flake8: noqa: C901
    # pylint: disable=too-many-locals,too-many-branches,too-many-statements
    start = time.time()
    logger.debug("kwargs in study wrapper: %s", kwargs)
    if kwargs.get("personIds"):
        # Temporarily transform to set for easier combining of person IDs.
        kwargs["personIds"] = set(kwargs["personIds"])
    # Added to the inheritance queries of every request.
    self._add_inheritance_to_query(
        "not possible_denovo and not possible_omission",
        kwargs,
    )
    if kwargs.get("personSetCollection"):
        kwargs = self._handle_person_set_collection(study, kwargs)
    # The default "True" is a non-empty string, hence truthy: without a
    # tagIntersection argument tags_or_mode is False.
    kwargs["tags_query"] = TagsQuery(
        selected_family_tags=kwargs.get("selectedFamilyTags"),
        deselected_family_tags=kwargs.get("deselectedFamilyTags"),
        tags_or_mode=not bool(kwargs.get("tagIntersection", "True")),
    )
    if "querySummary" in kwargs:
        kwargs["query_summary"] = kwargs["querySummary"]
        del kwargs["querySummary"]
    if "uniqueFamilyVariants" in kwargs:
        kwargs["unique_family_variants"] = self.get_unique_family_variants(
            kwargs)
        del kwargs["uniqueFamilyVariants"]
    if "regions" in kwargs:
        kwargs["regions"] = self._transform_regions(kwargs["regions"])
    # The raw selections are kept as sets because the inheritance query
    # further down is derived from them, while the kwargs entries are
    # replaced by role expressions.
    present_in_child = set()
    present_in_parent = set()
    rarity = None
    if "presentInChild" in kwargs:
        present_in_child = set(kwargs["presentInChild"])
        kwargs["presentInChild"] = self._present_in_child_to_roles(
            present_in_child,
        )
    if "presentInParent" in kwargs:
        # presentInParent is a mapping carrying the selection itself and
        # an optional "rarity" entry.
        present_in_parent = \
            set(kwargs["presentInParent"]["presentInParent"])
        rarity = kwargs["presentInParent"].get("rarity", None)
        kwargs["presentInParent"] = self._present_in_parent_to_roles(
            present_in_parent,
        )
    if present_in_parent != {"neither"} and rarity is not None:
        frequency_filter = kwargs.get("frequency_filter", [])
        # Yields either ("ultra_rare", True), an updated frequency filter
        # or (None, None) when rarity carries nothing usable.
        arg, val = \
            self._transform_present_in_child_and_parent_frequency(
                present_in_child, present_in_parent,
                rarity, frequency_filter,
            )
        if arg is not None:
            kwargs[arg] = val
    if kwargs.get("inheritanceTypeFilter"):
        inheritance_types = set(kwargs["inheritanceTypeFilter"])
        # Asking for mendelian or missing also admits unknown.
        if inheritance_types & {"mendelian", "missing"}:
            inheritance_types.add("unknown")
        query = f"any([{','.join(inheritance_types)}])"
        self._add_inheritance_to_query(query, kwargs)
        kwargs.pop("inheritanceTypeFilter")
    else:
        # No explicit filter: derive one from the child/parent selections.
        inheritance = \
            self._transform_present_in_child_and_parent_inheritance(
                present_in_child, present_in_parent)
        self._add_inheritance_to_query(inheritance, kwargs)
    if "genomicScores" in kwargs:
        genomic_scores = kwargs.pop("genomicScores", [])
        if "real_attr_filter" not in kwargs:
            kwargs["real_attr_filter"] = []
        kwargs["real_attr_filter"].extend(
            self._transform_genomic_scores_continuous(genomic_scores))
        if "categorical_attr_filter" not in kwargs:
            kwargs["categorical_attr_filter"] = []
        kwargs["categorical_attr_filter"].extend(
            self._transform_genomic_scores_categorical(genomic_scores))
    if "frequencyScores" in kwargs:
        frequency_scores = kwargs.pop("frequencyScores", [])
        if "frequency_filter" not in kwargs:
            kwargs["frequency_filter"] = []
        kwargs["frequency_filter"].extend(
            self._transform_genomic_scores_continuous(frequency_scores))
    if "geneScores" in kwargs:
        gene_scores = kwargs.pop("geneScores", {})
        genes = self._transform_gene_scores(gene_scores)
        if genes is not None:
            if "genes" not in kwargs:
                kwargs["genes"] = []
            kwargs["genes"] += genes
    if "genders" in kwargs:
        sexes = set(kwargs["genders"])
        # Selecting all three sexes means no restriction at all.
        if sexes != {"female", "male", "unspecified"}:
            sexes = {f"{sex}" for sex in sexes}
            sexes_query = f"any([{','.join(sexes)}])"
            kwargs["genders"] = sexes_query
        else:
            kwargs["genders"] = None
    if "variantTypes" in kwargs:
        variant_types = {
            Allele.DISPLAY_NAME_TYPE[vt.lower()]
            for vt in kwargs["variantTypes"]
        }
        # Selecting every variant type means no restriction at all.
        if variant_types != {
            "small_insertion",
            "small_deletion",
            "substitution",
            "cnv",
            "complex",
        }:
            if "cnv" in variant_types:
                # "cnv" is expanded into its two explicit forms.
                variant_types.remove("cnv")
                variant_types.add("cnv+")
                variant_types.add("cnv-")
            variant_types_query = f"{' or '.join(variant_types)}"
            kwargs["variantTypes"] = variant_types_query
        else:
            del kwargs["variantTypes"]
    if "effectTypes" in kwargs:
        kwargs["effectTypes"] = self.effect_types_mixin.build_effect_types(
            kwargs["effectTypes"],
        )
    # The four filter sections below share one shape: compute the matching
    # IDs, then intersect them with IDs already in kwargs or store them.
    # NOTE(review): the existing IDs are tested for truthiness, so an
    # empty collection is replaced by the filter result rather than
    # intersected with it - confirm this is intended.
    if kwargs.get("personFilters"):
        person_filters = kwargs.pop("personFilters")
        if person_filters:
            matching_person_ids = self._transform_filters_to_ids(
                person_filters,
                study,
            )
            if matching_person_ids is not None and kwargs.get("personIds"):
                kwargs["personIds"] = set.intersection(
                    matching_person_ids, set(kwargs.pop("personIds")),
                )
            else:
                kwargs["personIds"] = matching_person_ids
    if kwargs.get("personFiltersBeta"):
        person_filters = kwargs.pop("personFiltersBeta")
        if person_filters:
            matching_person_ids = self._transform_pheno_filters_to_ids(
                person_filters,
                study,
            )
            if matching_person_ids is not None and kwargs.get("personIds"):
                kwargs["personIds"] = set.intersection(
                    matching_person_ids, set(kwargs.pop("personIds")),
                )
            else:
                kwargs["personIds"] = matching_person_ids
    if kwargs.get("familyFilters"):
        family_filters = kwargs.pop("familyFilters")
        if family_filters:
            matching_family_ids = self._transform_filters_to_ids(
                family_filters,
                study,
            )
            if matching_family_ids is not None and kwargs.get("familyIds"):
                kwargs["familyIds"] = set.intersection(
                    matching_family_ids, set(kwargs.pop("familyIds")),
                )
            else:
                kwargs["familyIds"] = matching_family_ids
    if kwargs.get("familyPhenoFilters"):
        family_filters = kwargs.pop("familyPhenoFilters")
        if family_filters:
            matching_family_ids = self._transform_pheno_filters_to_ids(
                family_filters,
                study,
            )
            if matching_family_ids is not None and kwargs.get("familyIds"):
                kwargs["familyIds"] = set.intersection(
                    matching_family_ids, set(kwargs.pop("familyIds")),
                )
            else:
                kwargs["familyIds"] = matching_family_ids
    if kwargs.get("personIds"):
        # Undo the temporary set conversion done at the top.
        kwargs["personIds"] = list(kwargs["personIds"])
    if "affectedStatus" in kwargs:
        statuses = kwargs.pop("affectedStatus")
        kwargs["affected_status"] = [
            status.lower() for status in statuses
        ]
    # Zygosity must be applied before presentInChild / presentInParent
    # are popped and merged into the single "roles" query below.
    self._apply_zygosity(kwargs)
    kwargs["roles"] = self._transform_present_in_child_and_parent_roles(
        kwargs,
    )
    # Rename the remaining web keys. Iterate over a snapshot of the keys
    # because the dictionary is modified inside the loop.
    # NOTE(review): renaming "geneSymbols" to "genes" overwrites a "genes"
    # list built from geneScores above - confirm whether the two should be
    # combined instead.
    for key in list(kwargs.keys()):
        if key in self.FILTER_RENAMES_MAP:
            kwargs[self.FILTER_RENAMES_MAP[key]] = kwargs[key]
            kwargs.pop(key)
    elapsed = time.time() - start
    logger.debug("transform kwargs took %.2f sec", elapsed)
    return kwargs
[docs]
def make_query_transformer(gpf_instance: GPFInstance) -> QueryTransformer:
    """Build a new QueryTransformer from a GPF instance.

    Uses the instance's gene scores database together with the chromosome
    names and chromosome prefix of its reference genome.
    """
    gene_scores_db = gpf_instance.gene_scores_db
    chromosomes = gpf_instance.reference_genome.chromosomes
    chrom_prefix = gpf_instance.reference_genome.chrom_prefix
    return QueryTransformer(gene_scores_db, chromosomes, chrom_prefix)
[docs]
def get_or_create_query_transformer(
    gpf_instance: GPFInstance,
) -> QueryTransformer:
    """Get or create query transformer singleton instance.

    The first call builds the transformer from ``gpf_instance`` and stores
    it in the module-level singleton; every later call returns that same
    object and ignores its argument. The check and the creation both
    happen while holding the module lock.
    """
    global _QUERY_TRANSFORMER  # pylint: disable=global-statement
    with _QUERY_TRANSFORMER_LOCK:
        if _QUERY_TRANSFORMER is None:
            # Reuse the module's factory so the constructor wiring lives
            # in one place only.
            _QUERY_TRANSFORMER = make_query_transformer(gpf_instance)
        return _QUERY_TRANSFORMER