Source code for dae.genotype_storage.genotype_storage

from __future__ import annotations

import abc
import functools
import logging
from typing import Any, cast

from dae.effect_annotation.effect import expand_effect_types
from dae.genomic_resources.gene_models import GeneModels
from dae.genomic_resources.reference_genome import ReferenceGenome
from dae.query_variants.base_query_variants import QueryVariantsBase
from dae.query_variants.query_runners import QueryRunner
from dae.variants.family_variant import FamilyVariant

# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)


class GenotypeStorage(abc.ABC):
    """Base class for genotype storages.

    A genotype storage is configured with a dictionary that must contain
    at least an ``"id"`` and a ``"storage_type"``. The storage keeps a cache
    of per-study query backends (``loaded_variants``) together with the
    study configurations they were built from (``study_configs``) and
    creates query runners on top of those backends.

    Concrete storages implement :meth:`get_storage_types`, :meth:`start`,
    :meth:`shutdown` and :meth:`_build_backend_internal`.
    """

    # Query arguments that ``create_runner`` reads from the query kwargs and
    # forwards unchanged to the backend's family variants runner builder.
    # ``effect_types``, ``person_ids`` and ``inheritance`` are handled
    # separately because they are normalized first.
    _FAMILY_QUERY_ARGS = (
        "regions",
        "genes",
        "family_ids",
        "roles",
        "sexes",
        "affected_statuses",
        "variant_type",
        "real_attr_filter",
        "categorical_attr_filter",
        "ultra_rare",
        "frequency_filter",
        "return_reference",
        "return_unknown",
        "limit",
        "tags_query",
    )

    def __init__(
        self,
        storage_config: dict[str, Any],
    ):
        """Validate the configuration and initialize empty study caches.

        Raises:
            ValueError: when ``storage_config`` fails validation (see
                :meth:`validate_and_normalize_config`).
        """
        self.storage_config = \
            self.validate_and_normalize_config(storage_config)
        self.storage_id = self.storage_config["id"]
        self.storage_type = cast(str, self.storage_config["storage_type"])
        self._read_only = cast(
            bool, self.storage_config.get("read_only", False))
        # Both caches are keyed by study ID and filled by ``build_backend``.
        self._study_configs: dict[str, dict[str, Any]] = {}
        self._loaded_variants: dict[str, QueryVariantsBase] = {}

    @property
    def study_configs(self) -> dict[str, dict[str, Any]]:
        """Return the study configurations registered by build_backend."""
        return self._study_configs

    @property
    def loaded_variants(self) -> dict[str, QueryVariantsBase]:
        """Return the cached query backends keyed by study ID."""
        return self._loaded_variants

    @classmethod
    def validate_and_normalize_config(cls, config: dict) -> dict:
        """Normalize and validate the genotype storage configuration.

        When validation passes returns the normalized and validated
        genotype storage configuration dict.

        When validation fails, raises ValueError.

        All genotype storage configurations are required to have:

        * "storage_type" - which storage type this configuration is used for;
        * "id" - the ID of the genotype storage instance that will be created.
        """
        if config.get("id") is None:
            raise ValueError(
                f"genotype storage without ID; 'id' is required: {config}")
        if config.get("storage_type") is None:
            raise ValueError(
                f"genotype storage without type; 'storage_type' is required: "
                f"{config}")
        if config["storage_type"] not in cls.get_storage_types():
            raise ValueError(
                f"storage configuration for <{config['storage_type']}> passed "
                f"to genotype storage class type <{cls.get_storage_types()}>")
        return config

    def is_read_only(self) -> bool:
        """Return the ``read_only`` flag from the storage configuration."""
        return self._read_only

    @property
    def read_only(self) -> bool:
        """Property form of :meth:`is_read_only`."""
        return self._read_only

    @classmethod
    @abc.abstractmethod
    def get_storage_types(cls) -> set[str]:
        """Return the genotype storage type."""

    @abc.abstractmethod
    def start(self) -> GenotypeStorage:
        """Allocate all resources needed for the genotype storage to work."""

    @abc.abstractmethod
    def shutdown(self) -> GenotypeStorage:
        """Frees all resources used by the genotype storage to work."""

    @abc.abstractmethod
    def _build_backend_internal(
        self,
        study_config: dict,
        genome: ReferenceGenome,
        gene_models: GeneModels,
    ) -> QueryVariantsBase:
        """Construct a query backend for this genotype storage."""

    def build_backend(
        self,
        study_config: dict,
        genome: ReferenceGenome,
        gene_models: GeneModels,
    ) -> None:
        """Create and cache backend for study.

        The study is identified by ``study_config["id"]``. If a backend for
        that ID is already cached, the call is a no-op and the previously
        stored configuration is kept.
        """
        study_id = study_config["id"]
        if study_id in self.loaded_variants:
            return
        self.study_configs[study_id] = study_config
        self.loaded_variants[study_id] = self._build_backend_internal(
            study_config, genome, gene_models)

    @staticmethod
    def _adapt_study_variants(
        study_name: str,
        study_phenotype: str,
        summary_variant_ids: frozenset[str] | None,
        v: FamilyVariant | None,
    ) -> FamilyVariant | None:
        """Filter a family variant and tag its alleles with study info.

        Returns None when ``v`` is None, or when ``summary_variant_ids`` is
        given and none of the alternative alleles matches one of the IDs
        (built as ``"<cshl_location>:<cshl_variant>"``). Otherwise fills in
        the ``study_name`` and ``study_phenotype`` attributes on every
        allele that does not already have them and returns ``v``.
        """
        if v is None:
            return None

        if summary_variant_ids is not None and not any(
            f"{aa.cshl_location}:{aa.cshl_variant}" in summary_variant_ids
            for aa in v.alt_alleles
        ):
            return None

        for allele in v.alleles:
            # Only fill in missing values; existing attributes are kept.
            if allele.get_attribute("study_name") is None:
                allele.update_attributes(
                    {"study_name": study_name})
            if allele.get_attribute("study_phenotype") is None:
                allele.update_attributes(
                    {"study_phenotype": study_phenotype})
        return v

    def create_runner(
        self,
        study_id: str,
        kwargs: dict[str, Any],
    ) -> QueryRunner | None:
        """Create a query runner for a study with given query kwargs.

        Returns None when:

        * ``study_filters`` is given and does not contain ``study_id``;
        * ``person_ids`` is given but empty;
        * no backend has been built for ``study_id``;
        * the backend does not produce a runner for the query.

        Otherwise the returned runner has its study ID set and an adapter
        installed that applies the ``summary_variant_ids`` filter and adds
        the study name and phenotype to the alleles of each variant.
        """
        study_filters = kwargs.get("study_filters")
        if study_filters is not None and study_id not in study_filters:
            return None

        person_ids = kwargs.get("person_ids")
        if person_ids is not None and not person_ids:
            return None

        # A single inheritance expression is accepted as a plain string.
        inheritance = kwargs.get("inheritance")
        if isinstance(inheritance, str):
            inheritance = [inheritance]

        effect_types = kwargs.get("effect_types")
        if effect_types:
            effect_types = expand_effect_types(effect_types)

        if study_id not in self.loaded_variants:
            return None
        backend = self.loaded_variants[study_id]

        passthrough = {
            name: kwargs.get(name) for name in self._FAMILY_QUERY_ARGS
        }
        runner = backend.build_family_variants_query_runner(
            effect_types=effect_types,
            person_ids=cast(list, person_ids),
            inheritance=inheritance,
            **passthrough,
        )
        if runner is None:
            logger.debug(
                "study %s has no variants... skipping", study_id)
            return None

        runner.set_study_id(study_id)
        logger.debug("runner created")

        study_config = self.study_configs[study_id]
        study_name = study_config.get("name", study_id)
        study_phenotype = study_config.get("study_phenotype", "-")

        # Freeze the IDs once so the per-variant membership test done by the
        # adapter is a hash lookup instead of a scan over the whole list.
        summary_variant_ids = kwargs.get("summary_variant_ids")
        if summary_variant_ids is not None:
            summary_variant_ids = frozenset(summary_variant_ids)

        runner.adapt(functools.partial(
            self._adapt_study_variants,
            study_name, study_phenotype, summary_variant_ids))
        return runner

    def create_summary_runner(
        self,
        study_id: str,
        kwargs: dict[str, Any],
    ) -> QueryRunner | None:
        """Create a query runner for summary variants for a given study.

        Returns None when ``study_filters`` is given and does not contain
        ``study_id``, when no backend has been built for ``study_id``, or
        when the backend does not produce a runner. All of ``kwargs`` is
        forwarded to the backend as keyword arguments.
        """
        study_filters = kwargs.get("study_filters")
        if study_filters is not None and study_id not in study_filters:
            return None

        if study_id not in self.loaded_variants:
            return None
        backend = self.loaded_variants[study_id]

        runner = backend.build_summary_variants_query_runner(**kwargs)
        if runner is None:
            return None

        runner.set_study_id(study_id)
        return runner