"""Classes to represent genotype data."""
from __future__ import annotations
import functools
import logging
import os
import time
from abc import ABC, abstractmethod
from collections.abc import Generator, Iterable
from contextlib import closing
from os.path import basename, exists
from typing import Any, cast
from box import Box
from dae.common_reports.common_report import CommonReport
from dae.common_reports.denovo_report import DenovoReport
from dae.common_reports.family_report import FamiliesReport
from dae.common_reports.people_counter import PeopleReport
from dae.effect_annotation.effect import expand_effect_types
from dae.pedigrees.families_data import FamiliesData
from dae.person_sets import (
PersonSetCollection,
PersonSetCollectionConfig,
PSCQuery,
parse_person_set_collections_study_config,
)
from dae.query_variants.query_runners import QueryResult
from dae.utils.regions import Region
from dae.variants.attributes import Role
from dae.variants.family_variant import FamilyVariant
from dae.variants.variant import SummaryVariant
logger = logging.getLogger(__name__)
[docs]
class GenotypeData(ABC): # pylint: disable=too-many-public-methods
"""Abstract base class for genotype data."""
def __init__(self, config: Box, studies: list[GenotypeData]):
self.config = config
self.studies = studies
self._description: str | None = None
self._person_set_collections: dict[str, PersonSetCollection] = {}
self._parents: set[str] = set()
self._executor = None
self.is_remote = False
self.config_dir: str = self.config["work_dir"]
[docs]
def close(self) -> None:
logger.error("base genotype data close() called for %s", self.study_id)
@property
def study_id(self) -> str:
return cast(str, self.config["id"])
@property
def name(self) -> str:
name = self.config.get("name")
if name:
return cast(str, name)
return self.study_id
@property
def description(self) -> str | None:
"""Load and return description of a genotype data."""
if self._description is None:
if basename(self.config.description_file) != "description.md" and \
not exists(self.config.description_file):
# If a non-default description file was given, assert it exists
raise ValueError(
f"missing description file {self.config.description_file}")
if exists(self.config.description_file):
# the default description file may not yet exist
with open(
self.config.description_file,
mode="rt", encoding="utf8") as infile:
self._description = infile.read()
return self._description
@description.setter
def description(self, input_text: str) -> None:
self._description = None
with open(
self.config.description_file,
mode="wt", encoding="utf8") as outfile:
outfile.write(input_text)
@property
def year(self) -> str | None:
return cast(str, self.config.get("year"))
@property
def pub_med(self) -> str | None:
return cast(str, self.config.get("pub_med"))
@property
def phenotype(self) -> str | None:
return cast(str, self.config.get("phenotype"))
@property
def has_denovo(self) -> bool:
return cast(bool, self.config.get("has_denovo"))
@property
def has_transmitted(self) -> bool:
return cast(bool, self.config.get("has_transmitted"))
@property
def has_cnv(self) -> bool:
return cast(bool, self.config.get("has_cnv"))
@property
def has_complex(self) -> bool:
return cast(bool, self.config.get("has_complex"))
@property
def study_type(self) -> str:
return cast(str, self.config.get("study_type"))
@property
def parents(self) -> set[str]:
return self._parents
@property
def person_set_collections(self) -> dict[str, PersonSetCollection]:
return self._person_set_collections
[docs]
def add_parent(self, genotype_data_id: str) -> None:
self._parents.add(genotype_data_id)
@property
@abstractmethod
def is_group(self) -> bool:
return False
[docs]
@abstractmethod
def get_studies_ids(
self, *,
leaves: bool = True,
) -> list[str]:
pass
[docs]
def get_leaf_children(self) -> list[GenotypeDataStudy]:
"""Return list of genotype studies children of this group."""
if not self.is_group:
return []
result = []
seen = set()
for study in self.studies:
if not study.is_group:
if study.study_id in seen:
continue
result.append(cast(GenotypeDataStudy, study))
seen.add(study.study_id)
else:
children = study.get_leaf_children()
for child in children:
if child.study_id in seen:
continue
result.append(child)
seen.add(child.study_id)
return result
def _get_query_leaf_studies(
self, study_filters: Iterable[str] | None,
) -> list[GenotypeDataStudy]:
leafs = []
logger.debug("find leaf studies started...")
start = time.time()
if self.is_group:
leafs = self.get_leaf_children()
else:
leafs = [cast(GenotypeDataStudy, self)]
elapsed = time.time() - start
logger.debug("leaf studies found in %.2f sec", elapsed)
logger.debug("leaf children: %s", [st.study_id for st in leafs])
logger.debug("study_filters: %s", study_filters)
if study_filters is not None:
leafs = [st for st in leafs if st.study_id in study_filters]
logger.debug("studies to query: %s", [st.study_id for st in leafs])
return leafs
[docs]
def query_result_variants(
self, *,
regions: list[Region] | None = None,
genes: list[str] | None = None,
effect_types: list[str] | None = None,
family_ids: Iterable[str] | None = None,
person_ids: Iterable[str] | None = None,
person_set_collection: PSCQuery | None = None,
inheritance: str | list[str] | None = None,
roles: str | None = None,
sexes: str | None = None,
variant_type: str | None = None,
real_attr_filter: list[tuple] | None = None,
ultra_rare: bool | None = None,
frequency_filter: list[tuple] | None = None,
return_reference: bool | None = None,
return_unknown: bool | None = None,
limit: int | None = None,
study_filters: Iterable[str] | None = None,
pedigree_fields: list[str] | None = None,
**kwargs: Any,
) -> QueryResult | None:
"""Build a query result."""
# pylint: disable=too-many-locals,too-many-arguments
del pedigree_fields # Unused argument
if person_ids is not None and not person_ids:
return None
if effect_types:
effect_types = expand_effect_types(effect_types)
def adapt_study_variants(
study_name: str,
study_phenotype: str,
v: FamilyVariant,
) -> FamilyVariant:
if v is None:
return None
for allele in v.alleles:
if allele.get_attribute("study_name") is None:
allele.update_attributes(
{"study_name": study_name})
if allele.get_attribute("study_phenotype") is None:
allele.update_attributes(
{"study_phenotype": study_phenotype})
return v
runners = []
logger.debug("query leaf studies...")
for genotype_study in self._get_query_leaf_studies(study_filters):
person_sets_query = None
query_person_ids = set(person_ids) \
if person_ids is not None else None
psc_query = person_set_collection
if psc_query is not None:
assert isinstance(psc_query, PSCQuery)
psc = genotype_study.get_person_set_collection(psc_query.psc_id)
if psc is None:
# this study does not have requested person set collection
continue
assert psc is not None
psc_person_ids = psc.query_person_ids(psc_query)
if psc_person_ids is not None:
if len(psc_person_ids) == 0:
# no persons matching the query
continue
if query_person_ids is not None:
query_person_ids = query_person_ids & psc_person_ids
else:
query_person_ids = psc_person_ids
if query_person_ids is not None and len(query_person_ids) == 0:
logger.debug(
"Study %s can't match any person to filter %s... "
"skipping",
genotype_study.study_id,
psc_query)
continue
runner = genotype_study.backend\
.build_family_variants_query_runner(
regions=regions,
genes=genes,
effect_types=effect_types,
family_ids=family_ids,
person_ids=query_person_ids,
inheritance=inheritance,
roles=roles,
sexes=sexes,
variant_type=variant_type,
real_attr_filter=real_attr_filter,
ultra_rare=ultra_rare,
frequency_filter=frequency_filter,
return_reference=return_reference,
return_unknown=return_unknown,
limit=limit,
pedigree_fields=person_sets_query,
**kwargs,
)
runner.set_study_id(genotype_study.study_id)
if runner is None:
logger.debug(
"study %s has no varants... skipping",
genotype_study.study_id)
continue
runner.study_id = genotype_study.study_id
logger.debug("runner created")
study_name = genotype_study.name
study_phenotype = genotype_study.study_phenotype
runner.adapt(functools.partial(
adapt_study_variants, study_name, study_phenotype))
runners.append(runner)
logger.debug("runners: %s", len(runners))
if len(runners) == 0:
return None
return QueryResult(runners, limit=limit)
[docs]
def query_variants( # pylint: disable=too-many-locals,too-many-arguments
self, *,
regions: list[Region] | None = None,
genes: list[str] | None = None,
effect_types: list[str] | None = None,
family_ids: Iterable[str] | None = None,
person_ids: Iterable[str] | None = None,
person_set_collection: PSCQuery | None = None,
inheritance: str | list[str] | None = None,
roles: str | None = None,
sexes: str | None = None,
variant_type: str | None = None,
real_attr_filter: list[tuple] | None = None,
ultra_rare: bool | None = None,
frequency_filter: list[tuple] | None = None,
return_reference: bool | None = None,
return_unknown: bool | None = None,
limit: int | None = None,
study_filters: Iterable[str] | None = None,
pedigree_fields: list[str] | None = None,
unique_family_variants: bool = True,
**kwargs: Any,
) -> Generator[FamilyVariant, None, None]:
"""Query and return generator containing variants."""
result = self.query_result_variants(
regions=regions,
genes=genes,
effect_types=effect_types,
family_ids=family_ids,
person_ids=person_ids,
person_set_collection=person_set_collection,
inheritance=inheritance,
roles=roles,
sexes=sexes,
variant_type=variant_type,
real_attr_filter=real_attr_filter,
ultra_rare=ultra_rare,
frequency_filter=frequency_filter,
return_reference=return_reference,
return_unknown=return_unknown,
limit=limit,
study_filters=study_filters,
pedigree_fields=pedigree_fields,
**kwargs,
)
if result is None:
return
started = time.time()
try:
result.start()
with closing(result) as result:
seen = set()
for v in result:
if v is None:
continue
if unique_family_variants and v.fvuid in seen:
continue
seen.add(v.fvuid)
yield v
if limit and len(seen) >= limit:
break
except GeneratorExit:
logger.info("generator closed")
except Exception: # pylint: disable=broad-except
logger.exception("unexpected exception:")
finally:
elapsed = time.time() - started
logger.info(
"processing study %s elapsed: %.3f", self.study_id, elapsed)
logger.debug("[DONE] executor closed...")
[docs]
def query_result_summary_variants(
self, *,
regions: list[Region] | None = None,
genes: list[str] | None = None,
effect_types: list[str] | None = None,
variant_type: str | None = None,
real_attr_filter: list[tuple] | None = None,
ultra_rare: bool | None = None,
frequency_filter: list[tuple] | None = None,
return_reference: bool | None = None,
return_unknown: bool | None = None,
limit: int | None = None,
study_filters: list[str] | None = None,
**kwargs: Any,
) -> QueryResult | None:
# pylint: disable=too-many-locals,too-many-arguments,unused-argument
"""Build a query result for summary variants only."""
logger.info("summary query - study_filters: %s", study_filters)
logger.info(
"study %s children: %s", self.study_id, self.get_leaf_children())
if effect_types:
effect_types = expand_effect_types(effect_types)
runners = []
for genotype_study in self._get_query_leaf_studies(study_filters):
# pylint: disable=no-member,protected-access
runner = genotype_study.backend \
.build_summary_variants_query_runner(
regions=regions,
genes=genes,
effect_types=effect_types,
variant_type=variant_type,
real_attr_filter=real_attr_filter,
ultra_rare=ultra_rare,
frequency_filter=frequency_filter,
return_reference=return_reference,
return_unknown=return_unknown,
limit=limit,
**kwargs,
)
runner.study_id = genotype_study.study_id
runners.append(runner)
if len(runners) == 0:
return None
return QueryResult(runners)
[docs]
def query_summary_variants(
self, *,
regions: list[Region] | None = None,
genes: list[str] | None = None,
effect_types: list[str] | None = None,
variant_type: str | None = None,
real_attr_filter: list[tuple] | None = None,
ultra_rare: bool | None = None,
frequency_filter: list[tuple] | None = None,
return_reference: bool | None = None,
return_unknown: bool | None = None,
limit: int | None = None,
study_filters: list[str] | None = None,
**kwargs: Any,
) -> Generator[SummaryVariant, None, None]:
"""Query and return generator containing summary variants."""
# pylint: disable=too-many-locals,too-many-arguments
result = self.query_result_summary_variants(
regions=regions,
genes=genes,
effect_types=effect_types,
variant_type=variant_type,
real_attr_filter=real_attr_filter,
ultra_rare=ultra_rare,
frequency_filter=frequency_filter,
return_reference=return_reference,
return_unknown=return_unknown,
limit=limit,
study_filters=study_filters,
**kwargs)
try:
if result is None:
return
started = time.time()
variants: dict[str, Any] = {}
with closing(result) as result:
result.start()
for v in result:
if v is None:
continue
if v.svuid in variants:
existing = variants[v.svuid]
fv_count = existing.get_attribute(
"family_variants_count")[0]
if fv_count is None:
continue
fv_count += v.get_attribute("family_variants_count")[0]
seen_in_status = existing.get_attribute(
"seen_in_status")[0]
seen_in_status = \
seen_in_status | \
v.get_attribute("seen_in_status")[0]
seen_as_denovo = existing.get_attribute(
"seen_as_denovo")[0]
seen_as_denovo = \
seen_as_denovo or \
v.get_attribute("seen_as_denovo")[0]
new_attributes = {
"family_variants_count": [fv_count],
"seen_in_status": [seen_in_status],
"seen_as_denovo": [seen_as_denovo],
}
v.update_attributes(new_attributes)
variants[v.svuid] = v
if limit and len(variants) >= limit:
break
elapsed = time.time() - started
logger.info(
"processing study %s elapsed: %.3f",
self.study_id, elapsed)
yield from variants.values()
finally:
logger.debug("[DONE] executor closed...")
@property
@abstractmethod
def families(self) -> FamiliesData:
pass
@abstractmethod
def _build_person_set_collection(
self, psc_config: PersonSetCollectionConfig,
families: FamiliesData,
) -> PersonSetCollection:
pass
def _build_person_set_collections(
self, study_config: dict[str, Any] | None,
families: FamiliesData,
) -> dict[str, PersonSetCollection]:
if study_config is None:
return {}
if "person_set_collections" not in study_config:
return {}
pscs_config = parse_person_set_collections_study_config(study_config)
result = {}
for psc_id, psc_config in pscs_config.items():
result[psc_id] = self._build_person_set_collection(
psc_config, families)
return result
[docs]
def get_person_set_collection(
self, person_set_collection_id: str,
) -> PersonSetCollection | None:
if person_set_collection_id is None:
return None
return self.person_set_collections.get(person_set_collection_id)
[docs]
def build_report(self) -> CommonReport:
"""Generate common report JSON from genotpye data study."""
config = self.config.common_report
assert config.enabled, self.study_id
start = time.time()
if config.selected_person_set_collections.family_report:
families_report_collections = [
self.person_set_collections[collection_id]
for collection_id in
config.selected_person_set_collections.family_report
]
else:
families_report_collections = \
list(self.person_set_collections.values())
families_report = FamiliesReport.from_genotype_study(
self,
families_report_collections,
)
people_report = PeopleReport.from_person_set_collections(
families_report_collections,
)
elapsed = time.time() - start
logger.info(
"COMMON REPORTS family report build in %.2f sec", elapsed,
)
start = time.time()
if config.selected_person_set_collections.denovo_report:
denovo_report_collections = [
self.person_set_collections[collection_id]
for collection_id in
config.selected_person_set_collections.denovo_report
]
else:
denovo_report_collections = \
list(self.person_set_collections.values())
denovo_report = DenovoReport.from_genotype_study(
self,
denovo_report_collections,
)
elapsed = time.time() - start
logger.info(
"COMMON REPORTS denovo report build in %.2f sec", elapsed,
)
person_sets_config = \
self.config.person_set_collections
assert person_sets_config.selected_person_set_collections \
is not None, config
collection = self.get_person_set_collection(
person_sets_config.selected_person_set_collections[0],
)
phenotype: list[str] = []
assert collection is not None
for person_set in collection.person_sets.values():
if len(person_set.persons) > 0:
phenotype += person_set.values # noqa: PD011
study_type = (
",".join(self.study_type)
if self.study_type
else None
)
number_of_probands = 0
number_of_siblings = 0
for family in self.families.values():
for person in family.members_in_order:
if not person.is_child():
continue
if person.role == Role.prb:
number_of_probands += 1
if person.role == Role.sib:
number_of_siblings += 1
return CommonReport({
"id": self.study_id,
"people_report": people_report.to_dict(),
"families_report": families_report.to_dict(full=True),
"denovo_report": (
denovo_report.to_dict()
),
"study_name": self.name,
"phenotype": phenotype,
"study_type": study_type,
"study_year": self.year,
"pub_med": self.pub_med,
"families": len(self.families.values()),
"number_of_probands": number_of_probands,
"number_of_siblings": number_of_siblings,
"denovo": self.has_denovo,
"transmitted": self.has_transmitted,
"study_description": self.description,
})
[docs]
def build_and_save(
self,
*,
force: bool = False,
) -> CommonReport | None:
"""Build a common report for a study, saves it and returns the report.
If the common reports are disabled for the study, the function skips
building the report and returns None.
If the report already exists the default behavior is to skip building
the report. You can force building the report by
passing `force=True` to the function.
"""
if not self.config.common_report.enabled:
return None
report_filename = self.config.common_report.file_path
try:
if os.path.exists(report_filename) and not force:
return CommonReport.load(report_filename)
except Exception: # noqa: BLE001
logger.warning(
"unable to load common report for %s", self.study_id,
exc_info=True)
report = self.build_report()
report.save(report_filename)
return report
[docs]
def get_common_report(self) -> CommonReport | None:
"""Return a study's common report."""
if not self.config.common_report.enabled:
return None
report = CommonReport.load(self.config.common_report.file_path)
if report is None:
report = self.build_and_save()
return report
[docs]
class GenotypeDataGroup(GenotypeData):
"""
Represents a group of genotype data classes.
Queries to this object will be sent to all child data.
"""
def __init__(
self, config: Box,
studies: Iterable[GenotypeData],
):
super().__init__(
config, list(studies),
)
self._families: FamiliesData
self.rebuild_families()
self._executor = None
self.is_remote = False
for study in self.studies:
study.add_parent(self.study_id)
@property
def is_group(self) -> bool:
return True
@property
def families(self) -> FamiliesData:
return self._families
[docs]
def get_studies_ids(
self, *,
leaves: bool = True,
) -> list[str]:
result = {self.study_id}
if not leaves:
result = result.union([st.study_id for st in self.studies])
return list(result)
for study in self.studies:
result = result.union(study.get_studies_ids())
return list(result)
[docs]
def rebuild_families(self) -> None:
"""Construct genotype group families data from child studies."""
logger.info(
"building combined families from studies: %s",
[st.study_id for st in self.studies])
if len(self.studies) == 1:
self._families = self.studies[0].families
self._person_set_collections = self._build_person_set_collections(
self.config,
self._families,
)
return
logger.info(
"combining families from study %s and from study %s",
self.studies[0].study_id, self.studies[1].study_id)
result = FamiliesData.combine(
self.studies[0].families,
self.studies[1].families)
if len(self.studies) > 2:
for sind in range(2, len(self.studies)):
logger.debug(
"processing study (%s): %s",
sind, self.studies[sind].study_id)
logger.info(
"combining families from studies (%s) %s with families "
"from study %s",
sind, [st.study_id for st in self.studies[:sind]],
self.studies[sind].study_id)
result = FamiliesData.combine(
result,
self.studies[sind].families,
forced=True)
self._families = result
pscs = self._build_person_set_collections(
self.config,
result,
)
self._person_set_collections = pscs
def _build_person_set_collection(
self, psc_config: PersonSetCollectionConfig,
families: FamiliesData,
) -> PersonSetCollection:
psc_id = psc_config.id
studies_psc = []
for study in self.studies:
study_psc = study.get_person_set_collection(psc_id)
if study_psc is None:
raise ValueError(
f"person set collection {psc_id} "
f"not found in study {study.study_id}")
studies_psc.append(study_psc)
psc = PersonSetCollection.combine(studies_psc, families)
for fpid, person in families.real_persons.items():
person_set_value = psc.get_person_set_of_person(fpid)
assert person_set_value is not None
person.set_attr(psc_id, person_set_value.id)
return psc
[docs]
class GenotypeDataStudy(GenotypeData):
"""Represents a singular genotype data study."""
def __init__(self, config: Box, backend: Any):
super().__init__(config, [self])
self._backend = backend
self. _person_set_collections = self._build_person_set_collections(
self.config,
self.families,
)
self.is_remote = False
@property
def backend(self) -> Any:
return self._backend
@property
def study_phenotype(self) -> str:
return cast(str, self.config.get("study_phenotype", "-"))
@property
def is_group(self) -> bool:
return False
[docs]
def get_studies_ids(
self, *,
leaves: bool = True, # noqa: ARG002
) -> list[str]:
return [self.study_id]
@property
def families(self) -> FamiliesData:
return cast(FamiliesData, self._backend.families)
def _build_person_set_collection(
self, psc_config: PersonSetCollectionConfig,
families: FamiliesData,
) -> PersonSetCollection:
psc = PersonSetCollection.from_families(psc_config, self.families)
for fpid, person in families.real_persons.items():
person_set_value = psc.get_person_set_of_person(fpid)
assert person_set_value is not None
person.set_attr(psc.id, person_set_value.id)
return psc