Source code for dae.annotation.cnv_collection_annotator

from typing import Any

from dae.annotation.annotatable import Annotatable
from dae.annotation.annotation_config import AnnotatorInfo, AttributeInfo
from dae.annotation.annotation_pipeline import (
    AnnotationPipeline,
    Annotator,
)
from dae.genomic_resources.aggregators import build_aggregator
from dae.genomic_resources.cnv_collection import CnvCollection


[docs] def build_cnv_collection_annotator(pipeline: AnnotationPipeline, info: AnnotatorInfo) -> Annotator: return CnvCollectionAnnotator(pipeline, info)
[docs] class CnvCollectionAnnotator(Annotator): """Simple effect annotator class.""" def __init__(self, pipeline: AnnotationPipeline, info: AnnotatorInfo): cnv_collection_resrouce_id = info.parameters.get("resource_id") if cnv_collection_resrouce_id is None: raise ValueError(f"Can't create {info.type}: " "no resrouce_id parameter.") resource = pipeline.repository.get_resource(cnv_collection_resrouce_id) self.cnv_collection = CnvCollection(resource) info.resources.append(resource) self.cnv_filter = None cnv_filter_str = info.parameters.get("cnv_filter") if cnv_filter_str is not None: try: # pylint: disable=eval-used self.cnv_filter = eval(f"lambda cnv: { cnv_filter_str }") except Exception as error: raise ValueError( f"The cnv_filter |{cnv_filter_str}| is " f"sytactically invalid.", error) from error if not info.attributes: info.attributes = [AttributeInfo("count", "count", False, {})] source_type_desc = { "count": ("int", "The number of CNVs overlapping with " "the annotatable."), } self.cnv_attributes = {} for attribute_def in info.attributes: if attribute_def.source.startswith("attribute."): attribute = attribute_def.source[len("attribute."):] if attribute not in self.cnv_collection.score_defs: raise ValueError(f"The attribute {attribute} is not " "supported for the cnvs in the" "cnv_collection " f"{cnv_collection_resrouce_id}") res_attribute_def = self.cnv_collection.score_defs[attribute] if "aggregator" in attribute_def.parameters: aggregator = attribute_def.parameters["aggregator"] else: aggregator = res_attribute_def.allele_aggregator attribute_def.type = res_attribute_def.value_type attribute_def.description = res_attribute_def.desc attribute_def._documentation = f""" {attribute_def.description} small values: {res_attribute_def.small_values_desc}, large_values: {res_attribute_def.large_values_desc} aggregator: {aggregator} """ self.cnv_attributes[attribute_def.name] = \ (attribute, aggregator) elif attribute_def.source in source_type_desc: att_type, att_desc = source_type_desc[attribute_def.source] attribute_def.type = att_type attribute_def.description = att_desc else: raise ValueError(f"The source {attribute_def.source} " " is not supported for the annotator " f"{info.type}") super().__init__(pipeline, info)
[docs] def open(self) -> Annotator: self.cnv_collection.open() return super().open()
[docs] def close(self) -> None: self.cnv_collection.close() super().close()
[docs] def annotate( self, annotatable: Annotatable | None, _: dict[str, Any], ) -> dict[str, Any]: if annotatable is None: return self._empty_result() cnvs = self.cnv_collection.fetch_cnvs( annotatable.chrom, annotatable.pos, annotatable.pos_end) if self.cnv_filter: cnvs = [cnv for cnv in cnvs if self.cnv_filter(cnv)] aggregators = {name: build_aggregator(aggregator) for name, (_, aggregator) in self.cnv_attributes.items()} for cnv in cnvs: for name, (attribute, _) in self.cnv_attributes.items(): aggregators[name].add(cnv.attributes[attribute]) ret = {} for attribute_config in self._info.attributes: if attribute_config.name in self.cnv_attributes: ret[attribute_config.name] = \ aggregators[attribute_config.name].get_final() elif attribute_config.source == "count": ret[attribute_config.name] = len(cnvs) return ret