Source code for dae.studies.study

"""Classes to represent genotype data."""
from __future__ import annotations

import functools
import logging
import os
import time
from abc import ABC, abstractmethod
from collections.abc import Generator, Iterable
from contextlib import closing
from os.path import basename, exists
from typing import Any, cast

from box import Box

from dae.common_reports.common_report import CommonReport
from dae.common_reports.denovo_report import DenovoReport
from dae.common_reports.family_report import FamiliesReport
from dae.common_reports.people_counter import PeopleReport
from dae.effect_annotation.effect import expand_effect_types
from dae.pedigrees.families_data import FamiliesData
from dae.person_sets import (
    PersonSetCollection,
    PersonSetCollectionConfig,
    PSCQuery,
    parse_person_set_collections_study_config,
)
from dae.query_variants.query_runners import QueryResult
from dae.utils.regions import Region
from dae.variants.attributes import Role
from dae.variants.family_variant import FamilyVariant
from dae.variants.variant import SummaryVariant

logger = logging.getLogger(__name__)


[docs] class GenotypeData(ABC): # pylint: disable=too-many-public-methods """Abstract base class for genotype data.""" def __init__(self, config: Box, studies: list[GenotypeData]): self.config = config self.studies = studies self._description: str | None = None self._person_set_collections: dict[str, PersonSetCollection] = {} self._parents: set[str] = set() self._executor = None self.is_remote = False self.config_dir: str = self.config["work_dir"]
[docs] def close(self) -> None: logger.error("base genotype data close() called for %s", self.study_id)
@property def study_id(self) -> str: return cast(str, self.config["id"]) @property def name(self) -> str: name = self.config.get("name") if name: return cast(str, name) return self.study_id @property def description(self) -> str | None: """Load and return description of a genotype data.""" if self._description is None: if basename(self.config.description_file) != "description.md" and \ not exists(self.config.description_file): # If a non-default description file was given, assert it exists raise ValueError( f"missing description file {self.config.description_file}") if exists(self.config.description_file): # the default description file may not yet exist with open( self.config.description_file, mode="rt", encoding="utf8") as infile: self._description = infile.read() return self._description @description.setter def description(self, input_text: str) -> None: self._description = None with open( self.config.description_file, mode="wt", encoding="utf8") as outfile: outfile.write(input_text) @property def year(self) -> str | None: return cast(str, self.config.get("year")) @property def pub_med(self) -> str | None: return cast(str, self.config.get("pub_med")) @property def phenotype(self) -> str | None: return cast(str, self.config.get("phenotype")) @property def has_denovo(self) -> bool: return cast(bool, self.config.get("has_denovo")) @property def has_transmitted(self) -> bool: return cast(bool, self.config.get("has_transmitted")) @property def has_cnv(self) -> bool: return cast(bool, self.config.get("has_cnv")) @property def has_complex(self) -> bool: return cast(bool, self.config.get("has_complex")) @property def study_type(self) -> str: return cast(str, self.config.get("study_type")) @property def parents(self) -> set[str]: return self._parents @property def person_set_collections(self) -> dict[str, PersonSetCollection]: return self._person_set_collections
[docs] def add_parent(self, genotype_data_id: str) -> None: self._parents.add(genotype_data_id)
@property @abstractmethod def is_group(self) -> bool: return False
[docs] @abstractmethod def get_studies_ids( self, *, leaves: bool = True, ) -> list[str]: pass
[docs] def get_leaf_children(self) -> list[GenotypeDataStudy]: """Return list of genotype studies children of this group.""" if not self.is_group: return [] result = [] seen = set() for study in self.studies: if not study.is_group: if study.study_id in seen: continue result.append(cast(GenotypeDataStudy, study)) seen.add(study.study_id) else: children = study.get_leaf_children() for child in children: if child.study_id in seen: continue result.append(child) seen.add(child.study_id) return result
def _get_query_leaf_studies( self, study_filters: Iterable[str] | None, ) -> list[GenotypeDataStudy]: leafs = [] logger.debug("find leaf studies started...") start = time.time() if self.is_group: leafs = self.get_leaf_children() else: leafs = [cast(GenotypeDataStudy, self)] elapsed = time.time() - start logger.debug("leaf studies found in %.2f sec", elapsed) logger.debug("leaf children: %s", [st.study_id for st in leafs]) logger.debug("study_filters: %s", study_filters) if study_filters is not None: leafs = [st for st in leafs if st.study_id in study_filters] logger.debug("studies to query: %s", [st.study_id for st in leafs]) return leafs
[docs] def query_result_variants( self, *, regions: list[Region] | None = None, genes: list[str] | None = None, effect_types: list[str] | None = None, family_ids: Iterable[str] | None = None, person_ids: Iterable[str] | None = None, person_set_collection: PSCQuery | None = None, inheritance: str | list[str] | None = None, roles: str | None = None, sexes: str | None = None, variant_type: str | None = None, real_attr_filter: list[tuple] | None = None, ultra_rare: bool | None = None, frequency_filter: list[tuple] | None = None, return_reference: bool | None = None, return_unknown: bool | None = None, limit: int | None = None, study_filters: Iterable[str] | None = None, pedigree_fields: list[str] | None = None, **kwargs: Any, ) -> QueryResult | None: """Build a query result.""" # pylint: disable=too-many-locals,too-many-arguments del pedigree_fields # Unused argument if person_ids is not None and not person_ids: return None if effect_types: effect_types = expand_effect_types(effect_types) def adapt_study_variants( study_name: str, study_phenotype: str, v: FamilyVariant, ) -> FamilyVariant: if v is None: return None for allele in v.alleles: if allele.get_attribute("study_name") is None: allele.update_attributes( {"study_name": study_name}) if allele.get_attribute("study_phenotype") is None: allele.update_attributes( {"study_phenotype": study_phenotype}) return v runners = [] logger.debug("query leaf studies...") for genotype_study in self._get_query_leaf_studies(study_filters): person_sets_query = None query_person_ids = set(person_ids) \ if person_ids is not None else None psc_query = person_set_collection if psc_query is not None: assert isinstance(psc_query, PSCQuery) psc = genotype_study.get_person_set_collection(psc_query.psc_id) if psc is None: # this study does not have requested person set collection continue assert psc is not None psc_person_ids = psc.query_person_ids(psc_query) if psc_person_ids is not None: if len(psc_person_ids) == 0: # no persons matching the query continue if query_person_ids is not None: query_person_ids = query_person_ids & psc_person_ids else: query_person_ids = psc_person_ids if query_person_ids is not None and len(query_person_ids) == 0: logger.debug( "Study %s can't match any person to filter %s... " "skipping", genotype_study.study_id, psc_query) continue runner = genotype_study.backend\ .build_family_variants_query_runner( regions=regions, genes=genes, effect_types=effect_types, family_ids=family_ids, person_ids=query_person_ids, inheritance=inheritance, roles=roles, sexes=sexes, variant_type=variant_type, real_attr_filter=real_attr_filter, ultra_rare=ultra_rare, frequency_filter=frequency_filter, return_reference=return_reference, return_unknown=return_unknown, limit=limit, pedigree_fields=person_sets_query, **kwargs, ) runner.set_study_id(genotype_study.study_id) if runner is None: logger.debug( "study %s has no varants... skipping", genotype_study.study_id) continue runner.study_id = genotype_study.study_id logger.debug("runner created") study_name = genotype_study.name study_phenotype = genotype_study.study_phenotype runner.adapt(functools.partial( adapt_study_variants, study_name, study_phenotype)) runners.append(runner) logger.debug("runners: %s", len(runners)) if len(runners) == 0: return None return QueryResult(runners, limit=limit)
[docs] def query_variants( # pylint: disable=too-many-locals,too-many-arguments self, *, regions: list[Region] | None = None, genes: list[str] | None = None, effect_types: list[str] | None = None, family_ids: Iterable[str] | None = None, person_ids: Iterable[str] | None = None, person_set_collection: PSCQuery | None = None, inheritance: str | list[str] | None = None, roles: str | None = None, sexes: str | None = None, variant_type: str | None = None, real_attr_filter: list[tuple] | None = None, ultra_rare: bool | None = None, frequency_filter: list[tuple] | None = None, return_reference: bool | None = None, return_unknown: bool | None = None, limit: int | None = None, study_filters: Iterable[str] | None = None, pedigree_fields: list[str] | None = None, unique_family_variants: bool = True, **kwargs: Any, ) -> Generator[FamilyVariant, None, None]: """Query and return generator containing variants.""" result = self.query_result_variants( regions=regions, genes=genes, effect_types=effect_types, family_ids=family_ids, person_ids=person_ids, person_set_collection=person_set_collection, inheritance=inheritance, roles=roles, sexes=sexes, variant_type=variant_type, real_attr_filter=real_attr_filter, ultra_rare=ultra_rare, frequency_filter=frequency_filter, return_reference=return_reference, return_unknown=return_unknown, limit=limit, study_filters=study_filters, pedigree_fields=pedigree_fields, **kwargs, ) if result is None: return started = time.time() try: result.start() with closing(result) as result: seen = set() for v in result: if v is None: continue if unique_family_variants and v.fvuid in seen: continue seen.add(v.fvuid) yield v if limit and len(seen) >= limit: break except GeneratorExit: logger.info("generator closed") except Exception: # pylint: disable=broad-except logger.exception("unexpected exception:") finally: elapsed = time.time() - started logger.info( "processing study %s elapsed: %.3f", self.study_id, elapsed) logger.debug("[DONE] executor closed...")
[docs] def query_result_summary_variants( self, *, regions: list[Region] | None = None, genes: list[str] | None = None, effect_types: list[str] | None = None, variant_type: str | None = None, real_attr_filter: list[tuple] | None = None, ultra_rare: bool | None = None, frequency_filter: list[tuple] | None = None, return_reference: bool | None = None, return_unknown: bool | None = None, limit: int | None = None, study_filters: list[str] | None = None, **kwargs: Any, ) -> QueryResult | None: # pylint: disable=too-many-locals,too-many-arguments,unused-argument """Build a query result for summary variants only.""" logger.info("summary query - study_filters: %s", study_filters) logger.info( "study %s children: %s", self.study_id, self.get_leaf_children()) if effect_types: effect_types = expand_effect_types(effect_types) runners = [] for genotype_study in self._get_query_leaf_studies(study_filters): # pylint: disable=no-member,protected-access runner = genotype_study.backend \ .build_summary_variants_query_runner( regions=regions, genes=genes, effect_types=effect_types, variant_type=variant_type, real_attr_filter=real_attr_filter, ultra_rare=ultra_rare, frequency_filter=frequency_filter, return_reference=return_reference, return_unknown=return_unknown, limit=limit, **kwargs, ) runner.study_id = genotype_study.study_id runners.append(runner) if len(runners) == 0: return None return QueryResult(runners)
[docs] def query_summary_variants( self, *, regions: list[Region] | None = None, genes: list[str] | None = None, effect_types: list[str] | None = None, variant_type: str | None = None, real_attr_filter: list[tuple] | None = None, ultra_rare: bool | None = None, frequency_filter: list[tuple] | None = None, return_reference: bool | None = None, return_unknown: bool | None = None, limit: int | None = None, study_filters: list[str] | None = None, **kwargs: Any, ) -> Generator[SummaryVariant, None, None]: """Query and return generator containing summary variants.""" # pylint: disable=too-many-locals,too-many-arguments result = self.query_result_summary_variants( regions=regions, genes=genes, effect_types=effect_types, variant_type=variant_type, real_attr_filter=real_attr_filter, ultra_rare=ultra_rare, frequency_filter=frequency_filter, return_reference=return_reference, return_unknown=return_unknown, limit=limit, study_filters=study_filters, **kwargs) try: if result is None: return started = time.time() variants: dict[str, Any] = {} with closing(result) as result: result.start() for v in result: if v is None: continue if v.svuid in variants: existing = variants[v.svuid] fv_count = existing.get_attribute( "family_variants_count")[0] if fv_count is None: continue fv_count += v.get_attribute("family_variants_count")[0] seen_in_status = existing.get_attribute( "seen_in_status")[0] seen_in_status = \ seen_in_status | \ v.get_attribute("seen_in_status")[0] seen_as_denovo = existing.get_attribute( "seen_as_denovo")[0] seen_as_denovo = \ seen_as_denovo or \ v.get_attribute("seen_as_denovo")[0] new_attributes = { "family_variants_count": [fv_count], "seen_in_status": [seen_in_status], "seen_as_denovo": [seen_as_denovo], } v.update_attributes(new_attributes) variants[v.svuid] = v if limit and len(variants) >= limit: break elapsed = time.time() - started logger.info( "processing study %s elapsed: %.3f", self.study_id, elapsed) yield from variants.values() finally: logger.debug("[DONE] executor closed...")
@property @abstractmethod def families(self) -> FamiliesData: pass @abstractmethod def _build_person_set_collection( self, psc_config: PersonSetCollectionConfig, families: FamiliesData, ) -> PersonSetCollection: pass def _build_person_set_collections( self, study_config: dict[str, Any] | None, families: FamiliesData, ) -> dict[str, PersonSetCollection]: if study_config is None: return {} if "person_set_collections" not in study_config: return {} pscs_config = parse_person_set_collections_study_config(study_config) result = {} for psc_id, psc_config in pscs_config.items(): result[psc_id] = self._build_person_set_collection( psc_config, families) return result
[docs] def get_person_set_collection( self, person_set_collection_id: str, ) -> PersonSetCollection | None: if person_set_collection_id is None: return None return self.person_set_collections.get(person_set_collection_id)
[docs] def build_report(self) -> CommonReport: """Generate common report JSON from genotpye data study.""" config = self.config.common_report assert config.enabled, self.study_id start = time.time() if config.selected_person_set_collections.family_report: families_report_collections = [ self.person_set_collections[collection_id] for collection_id in config.selected_person_set_collections.family_report ] else: families_report_collections = \ list(self.person_set_collections.values()) families_report = FamiliesReport.from_genotype_study( self, families_report_collections, ) people_report = PeopleReport.from_person_set_collections( families_report_collections, ) elapsed = time.time() - start logger.info( "COMMON REPORTS family report build in %.2f sec", elapsed, ) start = time.time() if config.selected_person_set_collections.denovo_report: denovo_report_collections = [ self.person_set_collections[collection_id] for collection_id in config.selected_person_set_collections.denovo_report ] else: denovo_report_collections = \ list(self.person_set_collections.values()) denovo_report = DenovoReport.from_genotype_study( self, denovo_report_collections, ) elapsed = time.time() - start logger.info( "COMMON REPORTS denovo report build in %.2f sec", elapsed, ) person_sets_config = \ self.config.person_set_collections assert person_sets_config.selected_person_set_collections \ is not None, config collection = self.get_person_set_collection( person_sets_config.selected_person_set_collections[0], ) phenotype: list[str] = [] assert collection is not None for person_set in collection.person_sets.values(): if len(person_set.persons) > 0: phenotype += person_set.values # noqa: PD011 study_type = ( ",".join(self.study_type) if self.study_type else None ) number_of_probands = 0 number_of_siblings = 0 for family in self.families.values(): for person in family.members_in_order: if not person.is_child(): continue if person.role == Role.prb: number_of_probands += 1 if person.role == Role.sib: number_of_siblings += 1 return CommonReport({ "id": self.study_id, "people_report": people_report.to_dict(), "families_report": families_report.to_dict(full=True), "denovo_report": ( denovo_report.to_dict() ), "study_name": self.name, "phenotype": phenotype, "study_type": study_type, "study_year": self.year, "pub_med": self.pub_med, "families": len(self.families.values()), "number_of_probands": number_of_probands, "number_of_siblings": number_of_siblings, "denovo": self.has_denovo, "transmitted": self.has_transmitted, "study_description": self.description, })
[docs] def build_and_save( self, *, force: bool = False, ) -> CommonReport | None: """Build a common report for a study, saves it and returns the report. If the common reports are disabled for the study, the function skips building the report and returns None. If the report already exists the default behavior is to skip building the report. You can force building the report by passing `force=True` to the function. """ if not self.config.common_report.enabled: return None report_filename = self.config.common_report.file_path try: if os.path.exists(report_filename) and not force: return CommonReport.load(report_filename) except Exception: # noqa: BLE001 logger.warning( "unable to load common report for %s", self.study_id, exc_info=True) report = self.build_report() report.save(report_filename) return report
[docs] def get_common_report(self) -> CommonReport | None: """Return a study's common report.""" if not self.config.common_report.enabled: return None report = CommonReport.load(self.config.common_report.file_path) if report is None: report = self.build_and_save() return report
[docs] class GenotypeDataGroup(GenotypeData): """ Represents a group of genotype data classes. Queries to this object will be sent to all child data. """ def __init__( self, config: Box, studies: Iterable[GenotypeData], ): super().__init__( config, list(studies), ) self._families: FamiliesData self.rebuild_families() self._executor = None self.is_remote = False for study in self.studies: study.add_parent(self.study_id) @property def is_group(self) -> bool: return True @property def families(self) -> FamiliesData: return self._families
[docs] def get_studies_ids( self, *, leaves: bool = True, ) -> list[str]: result = {self.study_id} if not leaves: result = result.union([st.study_id for st in self.studies]) return list(result) for study in self.studies: result = result.union(study.get_studies_ids()) return list(result)
[docs] def rebuild_families(self) -> None: """Construct genotype group families data from child studies.""" logger.info( "building combined families from studies: %s", [st.study_id for st in self.studies]) if len(self.studies) == 1: self._families = self.studies[0].families self._person_set_collections = self._build_person_set_collections( self.config, self._families, ) return logger.info( "combining families from study %s and from study %s", self.studies[0].study_id, self.studies[1].study_id) result = FamiliesData.combine( self.studies[0].families, self.studies[1].families) if len(self.studies) > 2: for sind in range(2, len(self.studies)): logger.debug( "processing study (%s): %s", sind, self.studies[sind].study_id) logger.info( "combining families from studies (%s) %s with families " "from study %s", sind, [st.study_id for st in self.studies[:sind]], self.studies[sind].study_id) result = FamiliesData.combine( result, self.studies[sind].families, forced=True) self._families = result pscs = self._build_person_set_collections( self.config, result, ) self._person_set_collections = pscs
def _build_person_set_collection( self, psc_config: PersonSetCollectionConfig, families: FamiliesData, ) -> PersonSetCollection: psc_id = psc_config.id studies_psc = [] for study in self.studies: study_psc = study.get_person_set_collection(psc_id) if study_psc is None: raise ValueError( f"person set collection {psc_id} " f"not found in study {study.study_id}") studies_psc.append(study_psc) psc = PersonSetCollection.combine(studies_psc, families) for fpid, person in families.real_persons.items(): person_set_value = psc.get_person_set_of_person(fpid) assert person_set_value is not None person.set_attr(psc_id, person_set_value.id) return psc
[docs] class GenotypeDataStudy(GenotypeData): """Represents a singular genotype data study.""" def __init__(self, config: Box, backend: Any): super().__init__(config, [self]) self._backend = backend self. _person_set_collections = self._build_person_set_collections( self.config, self.families, ) self.is_remote = False @property def backend(self) -> Any: return self._backend @property def study_phenotype(self) -> str: return cast(str, self.config.get("study_phenotype", "-")) @property def is_group(self) -> bool: return False
[docs] def get_studies_ids( self, *, leaves: bool = True, # noqa: ARG002 ) -> list[str]: return [self.study_id]
@property def families(self) -> FamiliesData: return cast(FamiliesData, self._backend.families) def _build_person_set_collection( self, psc_config: PersonSetCollectionConfig, families: FamiliesData, ) -> PersonSetCollection: psc = PersonSetCollection.from_families(psc_config, self.families) for fpid, person in families.real_persons.items(): person_set_value = psc.get_person_set_of_person(fpid) assert person_set_value is not None person.set_attr(psc.id, person_set_value.id) return psc