Source code for dae.annotation.cnv_collection_annotator

import textwrap
from collections.abc import Callable
from typing import Any

from lark import Lark, Token, Tree

from dae.annotation.annotatable import Annotatable
from dae.annotation.annotation_config import AnnotatorInfo, AttributeInfo
from dae.annotation.annotation_pipeline import (
    AnnotationPipeline,
    Annotator,
    AttributeDesc,
)
from dae.genomic_resources.aggregators import build_aggregator
from dae.genomic_resources.genomic_scores import CNV, CnvCollection


def build_cnv_collection_annotator(
    pipeline: AnnotationPipeline,
    info: AnnotatorInfo,
) -> Annotator:
    """Build a CNV collection annotator.

    Factory function that forwards ``pipeline`` and ``info`` unchanged to
    the ``CnvCollectionAnnotator`` constructor and returns the new instance.
    Any exception raised by that constructor propagates to the caller.
    """
    return CnvCollectionAnnotator(pipeline, info)
class CnvCollectionAnnotator(Annotator):
    """Annotator backed by a CNV collection genomic resource.

    For every annotatable the annotator fetches the CNVs of the collection
    in the annotatable's region, optionally keeps only those accepted by a
    user supplied ``cnv_filter`` expression, and reports:

    * ``count`` - the number of (remaining) CNVs;
    * ``attribute.<score id>`` - the aggregated value of a CNV attribute
      over the (remaining) CNVs.

    Annotator parameters read from ``info.parameters``:

    * ``resource_id`` (required) - ID of the CNV collection resource;
    * ``cnv_filter`` (optional) - expression in the language described by
      ``CNV_FILTER_GRAMMAR``, e.g. ``status == "affected" and size > 1000``
      (the attribute names here are only illustrative).
    """

    # Lark grammar of the ``cnv_filter`` mini language: comparisons between
    # CNV attribute names (bare words), quoted words and numbers, optionally
    # combined with "and" / "or".
    CNV_FILTER_GRAMMAR = textwrap.dedent("""
        ?start: filter | and_ | or

        and_: filter "and" filter
        or: filter "or" filter

        ?filter: subject operator subject | or | and_

        ?subject: variable | value

        value: "\\"" word "\\"" | number

        variable: word

        operator: equals | greater_than | less_than | in

        equals: "=="
        greater_than: ">"
        less_than: "<"
        in: "in"

        word: /[a-zA-Z!@#$%^&*()_+]+/

        number: /[0-9\\.]+/

        %ignore " "
    """)

    def __init__(self, pipeline: AnnotationPipeline, info: AnnotatorInfo):
        """Configure the annotator from the pipeline and annotator info.

        Raises:
            ValueError: when the ``resource_id`` parameter is missing or an
                attribute refers to a source this annotator cannot produce.
        """
        resource_id = info.parameters.get("resource_id")
        if resource_id is None:
            raise ValueError(f"Can't create {info.type}: "
                             "no resource_id parameter.")
        resource = pipeline.repository.get_resource(resource_id)
        self.cnv_collection = CnvCollection(resource)
        info.resources.append(resource)

        self.filter_parser = Lark(self.CNV_FILTER_GRAMMAR)
        self.cnv_filter: Callable[[CNV], bool] | None = None
        cnv_filter_str = info.parameters.get("cnv_filter")
        if cnv_filter_str is not None:
            assert isinstance(cnv_filter_str, str)
            # The grammar ignores spaces only, so fold newlines and tabs
            # of multi-line configuration values into spaces.
            cnv_filter_str = cnv_filter_str.replace(
                "\n", " ").replace("\t", " ").strip()
            self.cnv_filter = self._build_cnv_filter_func(
                self.filter_parser.parse(cnv_filter_str))

        # Without explicitly configured attributes only "count" is produced.
        if not info.attributes:
            info.attributes = [AttributeInfo(
                "count", "count", internal=False, parameters={})]

        # Output attribute name -> (attribute description, aggregator);
        # holds only the "attribute.*" sources, never "count".
        self.cnv_attributes: dict[str, tuple[AttributeDesc, Any]] = {}
        all_attributes = self.get_all_attribute_descriptions()
        for attribute_def in info.attributes:
            if attribute_def.source not in all_attributes:
                raise ValueError(f"The source {attribute_def.source} "
                                 " is not supported for the annotator "
                                 f"{info.type}")
            attribute = all_attributes[attribute_def.source]
            attribute_def.value_type = attribute.type
            attribute_def.description = attribute.description
            if not attribute_def.source.startswith("attribute."):
                continue

            attribute_name = attribute_def.source[len("attribute."):]
            res_attribute_def = self.cnv_collection.get_score_definition(
                attribute_name)
            assert res_attribute_def is not None

            # An aggregator configured on the attribute overrides the
            # default one coming from the score definition.
            if "aggregator" in attribute_def.parameters:
                aggregator = attribute_def.parameters["aggregator"]
            else:
                aggregator = attribute.params["aggregator"]

            attribute_def._documentation = f"""
                {attribute_def.description}

                small values: {res_attribute_def.small_values_desc},
                large_values: {res_attribute_def.large_values_desc}
                aggregator: {aggregator}
            """  # noqa: SLF001
            self.cnv_attributes[attribute_def.name] = \
                (attribute, aggregator)

        super().__init__(pipeline, info)

    def get_all_attribute_descriptions(self) -> dict[str, AttributeDesc]:
        """Return every attribute this annotator can produce, by source.

        The result always contains ``"count"`` plus one
        ``"attribute.<score id>"`` entry per score definition of the CNV
        collection; the latter carry the score's allele aggregator as the
        default ``"aggregator"`` parameter.
        """
        attributes = {
            "count": AttributeDesc(
                source="count",
                type="int",
                description="The number of CNVs overlapping with the "
                            "annotatable.",
            ),
        }
        for score_id, score_def in \
                self.cnv_collection.score_definitions.items():
            source = f"attribute.{score_id}"
            attributes[source] = AttributeDesc(
                source=source,
                type=score_def.value_type,
                description=score_def.desc,
                params={"aggregator": score_def.allele_aggregator},
            )
        return attributes

    @staticmethod
    def _build_operand_accessor(operand: Any) -> Callable[[CNV], Any]:
        """Turn one side of a comparison into a function of a CNV.

        A ``variable`` operand reads the named entry of ``cnv.attributes``
        (``None`` when absent); a ``value`` operand yields a constant,
        converted to ``float`` when it was parsed as a number.
        """
        assert isinstance(operand, Tree)
        assert isinstance(operand.data, Token)
        leaf = operand.children[0]
        assert isinstance(leaf, Tree)
        assert isinstance(leaf.data, Token)
        token = leaf.children[0]
        assert isinstance(token, Token)

        if operand.data.value == "variable":
            assert leaf.data.value == "word"
            attribute_name = token.value
            return lambda cnv: cnv.attributes.get(attribute_name)

        constant: Any = token.value
        if leaf.data.value == "number":
            constant = float(constant)
        return lambda _cnv: constant

    @classmethod
    def _build_cnv_filter_func(
        cls, tree: Tree,
    ) -> Callable[[CNV], bool]:
        """Compile a parsed ``cnv_filter`` expression into a predicate.

        Args:
            tree: parse tree produced by a Lark parser built from
                ``CNV_FILTER_GRAMMAR``.

        Returns:
            A callable taking a CNV and returning the outcome of the
            expression for it.

        Raises:
            ValueError: when the tree contains an unknown operator.
        """
        if tree.data == "and_" or tree.data == "or":
            lhs, rhs = tree.children[0], tree.children[1]
            assert isinstance(lhs, Tree)
            assert isinstance(rhs, Tree)
            left_func = cls._build_cnv_filter_func(lhs)
            right_func = cls._build_cnv_filter_func(rhs)
            if tree.data == "and_":
                return lambda cnv: left_func(cnv) and right_func(cnv)
            return lambda cnv: left_func(cnv) or right_func(cnv)

        # Plain comparison: children are <subject> <operator> <subject>.
        left_accessor = cls._build_operand_accessor(tree.children[0])

        operator_node = tree.children[1]
        assert isinstance(operator_node, Tree)
        assert isinstance(operator_node.children[0], Tree)
        assert isinstance(operator_node.children[0].data, Token)
        operator = operator_node.children[0].data.value

        right_accessor = cls._build_operand_accessor(tree.children[2])

        comparators: dict[str, Callable[[Any, Any], bool]] = {
            "equals": lambda lhs, rhs: lhs == rhs,
            "greater_than": lambda lhs, rhs: lhs > rhs,
            "less_than": lambda lhs, rhs: lhs < rhs,
            "in": lambda lhs, rhs: lhs in rhs,
        }
        compare = comparators.get(operator)
        if compare is None:
            raise ValueError(f"Unsupported operator {operator}")
        return lambda cnv: compare(left_accessor(cnv), right_accessor(cnv))

    def open(self) -> Annotator:
        """Open the CNV collection, then open the annotator itself."""
        self.cnv_collection.open()
        return super().open()

    def close(self) -> None:
        """Close the CNV collection, then close the annotator itself."""
        self.cnv_collection.close()
        super().close()

    def annotate(
        self, annotatable: Annotatable | None,
        context: dict[str, Any],  # noqa: ARG002
    ) -> dict[str, Any]:
        """Annotate ``annotatable`` with the configured CNV attributes.

        Returns the annotator's empty result when ``annotatable`` is
        ``None``; otherwise a dict keyed by output attribute name holding
        the CNV count and/or the aggregated CNV attribute values.
        """
        if annotatable is None:
            return self._empty_result()

        cnvs = self.cnv_collection.fetch_cnvs(
            annotatable.chrom, annotatable.pos, annotatable.pos_end)
        if self.cnv_filter:
            cnvs = [cnv for cnv in cnvs if self.cnv_filter(cnv)]

        # Fresh aggregators on every call, so no state leaks between
        # annotatables.
        aggregators = {
            name: build_aggregator(aggregator)
            for name, (_, aggregator) in self.cnv_attributes.items()
        }
        for cnv in cnvs:
            for name, (attribute, _) in self.cnv_attributes.items():
                # NOTE(review): descriptors are created with ``source=``
                # only; this presumes AttributeDesc exposes the same value
                # as ``name`` - confirm against AttributeDesc.
                assert attribute.name is not None
                assert attribute.name.startswith("attribute.")
                attribute_name = attribute.name[len("attribute."):]
                aggregators[name].add(cnv.attributes[attribute_name])

        ret = {}
        for attribute_config in self._info.attributes:
            if attribute_config.name in self.cnv_attributes:
                ret[attribute_config.name] = \
                    aggregators[attribute_config.name].get_final()
            elif attribute_config.source == "count":
                ret[attribute_config.name] = len(cnvs)
        return ret