Source code for dae.annotation.annotation_config

from __future__ import annotations

import copy
import fnmatch
import logging
import textwrap
from collections.abc import Callable, Iterator, Mapping
from dataclasses import dataclass, field
from textwrap import dedent
from typing import Any, TypedDict

import yaml
from lark import Lark, Token, Tree

from dae.genomic_resources.repository import (
    GenomicResource,
    GenomicResourceRepo,
)

logger = logging.getLogger(__name__)


[docs] class RawPreamble(TypedDict): summary: str description: str input_reference_genome: str metadata: dict[str, Any]
RawAnnotatorsConfig = list[dict[str, Any]]
[docs] class RawFullConfig(TypedDict): preamble: RawPreamble annotators: RawAnnotatorsConfig
RawPipelineConfig = RawAnnotatorsConfig | RawFullConfig
[docs] @dataclass class ErrorMark: """Marks an error position in a file.""" row: int column: int
[docs] class AnnotationConfigurationError(Exception): """Exception raised for errors in the annotation configuration.""" error_mark: ErrorMark | None message: str | None def __init__( self, message: str | None, other_error: Exception | None = None, error_mark: ErrorMark | None = None, ): super().__init__(message) self.message = message self.other_error = other_error self.error_mark = error_mark def __str__(self) -> str: message = self.message if self.other_error is not None: message = f"{message} {self.other_error}" mark = None if self.error_mark is not None: mark = ( f"At line {self.error_mark.row}, " f"column {self.error_mark.column}." ) result = "" if message is not None and mark is not None: result = f"{message}: {mark}" elif message is not None: result = message elif mark is not None: result = mark return result
[docs] class ParamsUsageMonitor(Mapping): """Class to monitor usage of annotator parameters.""" def __init__(self, data: dict[str, Any]): self._data = dict(data) self._used_keys: set[str] = set() def __hash__(self) -> int: return hash(tuple(sorted(self._data.items()))) def __getitem__(self, key: str) -> Any: self._used_keys.add(key) return self._data[key] def __len__(self) -> int: return len(self._data) def __iter__(self) -> Iterator: raise ValueError("Should not iterate a parameter dictionary.") def __repr__(self) -> str: return self._data.__repr__() def __eq__(self, other: Any) -> bool: if not isinstance(other, ParamsUsageMonitor): return False return self._data == other._data
[docs] def get_used_keys(self) -> set[str]: return self._used_keys
[docs] def get_unused_keys(self) -> set[str]: return set(self._data.keys()) - self._used_keys
[docs] @dataclass(init=False, eq=True, unsafe_hash=True) class AttributeInfo: """Defines annotation attribute configuration.""" def __init__(self, name: str, source: str, *, internal: bool | None, parameters: ParamsUsageMonitor | dict[str, Any], _type: str = "str", description: str = "", documentation: str | None = None): self.name = name self.source = source self.internal = internal if isinstance(parameters, ParamsUsageMonitor): self.parameters = parameters else: self.parameters = ParamsUsageMonitor(parameters) self.type = _type self.description = description self._documentation = documentation name: str source: str internal: bool | None parameters: ParamsUsageMonitor type: str = "str" # str, int, float, annotatable, or object description: str = "" # interpreted as md _documentation: str | None = None @property def documentation(self) -> str: if self._documentation is None: return self.description return self._documentation
[docs] @staticmethod def create( source: str, name: str | None = None, *, internal: bool = False, ) -> AttributeInfo: """Create an AttributeInfo instance.""" if name is None: name = source return AttributeInfo( name, source, internal=internal, parameters={}, )
[docs] @dataclass(init=False) class AnnotatorInfo: """Defines annotator configuration.""" def __init__(self, _type: str, attributes: list[AttributeInfo], parameters: ParamsUsageMonitor | dict[str, Any], documentation: str = "", resources: list[GenomicResource] | None = None, annotator_id: str = "N/A"): self.type = _type self.annotator_id = f"{annotator_id}" self.attributes = attributes self.documentation = documentation if isinstance(parameters, ParamsUsageMonitor): self.parameters = parameters else: self.parameters = ParamsUsageMonitor(parameters) if resources is None: self.resources = [] else: self.resources = resources annotator_id: str = field(compare=False, hash=None) type: str attributes: list[AttributeInfo] parameters: ParamsUsageMonitor documentation: str = "" resources: list[GenomicResource] = field(default_factory=list) def __hash__(self) -> int: attrs_hash = "".join(str(hash(attr)) for attr in self.attributes) resources_hash = "".join(str(hash(res)) for res in self.resources) params_hash = "".join(str(hash(self.parameters))) return hash(f"{self.type}{attrs_hash}{resources_hash}{params_hash}")
[docs] def to_dict(self) -> dict[str, Any]: """Convert annotator info to a configuration dictionary.""" result = { **self.parameters._data, # noqa: SLF001 "attributes": [ { "name": attr.name, "source": attr.source, "internal": attr.internal, "type": attr.type, } for attr in self.attributes ], } return { self.type: result, }
[docs] @dataclass class AnnotationPreamble: summary: str description: str input_reference_genome: str input_reference_genome_res: GenomicResource | None metadata: dict[str, Any]
[docs] class AnnotationConfigParser: """Parser for annotation configuration.""" WILDCARD_LIMIT = 500 ANNOTATION_CONFIG_GRAMMAR = textwrap.dedent(""" ?start: resource_id [filter] ?resource_id: (resource_name | wildcard) wildcard: /[\\w\\d\\/_*]+/ filter: "[" (equals | and_)+ "]" and_: operation "and" operation equals: (name"=\\""value"\\"") | (name"='"value"'") in: ("\\""value"\\"" " in " name) | ("'"value"'" " in " name) resource_name: /[\\w\\d\\/_\\-!@#$%^<>+]+/ ?name: /[\\w\\d\\/_\\-!@#$%^<>+*]+/ ?value: /[\\w\\d\\/ _\\-!@#$%^<>+*]+/ ?operation: equals | in | and_ %ignore " " """)
[docs] @staticmethod def match_labels_query( query: dict[str, Callable[[str], bool]], resource_labels: dict[str, str], ) -> bool: """Check if the labels query for a wildcard matches.""" for k, v in query.items(): if k not in resource_labels: return False if not v(resource_labels[k]): return False return True
[docs] @staticmethod def build_labels_query( node: Any, labels_query: dict[str, Any] | None = None, ) -> dict[str, Callable[[str], bool]]: """Build labels query from parsed tree node.""" if labels_query is None: labels_query = {} for child in node.children: if child.data.value == "equals": key = child.children[0].value value = child.children[1].value labels_query[key] = \ lambda x, v=value: x == v or fnmatch.fnmatch(x, v) elif child.data.value == "in": key = child.children[0].value value = child.children[1].value labels_query[key] = lambda x, v=value: v in x elif child.data.value == "and_": AnnotationConfigParser.build_labels_query( child, labels_query) else: raise AnnotationConfigurationError( f"Unsupported label query operation: {child.data}", ) return labels_query
[docs] @staticmethod def query_resources( annotator_type: str, resource_id: str, grr: GenomicResourceRepo, ) -> list[str]: """Collect resources matching a given query.""" labels_query: dict[str, Callable[[str], bool]] = {} config_parser = Lark(AnnotationConfigParser.ANNOTATION_CONFIG_GRAMMAR) tree = config_parser.parse(resource_id) assert len(tree.children) == 2 resource_id_node = tree.children[0] assert isinstance(resource_id_node, Tree) assert isinstance(resource_id_node.children[0], Token) parsed_id = resource_id_node.children[0].value if tree.children[1] is not None: labels_query = AnnotationConfigParser.build_labels_query( tree.children[1]) def match(resource: GenomicResource) -> bool: return ( resource.get_type() == annotator_type and fnmatch.fnmatch(resource.get_id(), parsed_id) and AnnotationConfigParser.match_labels_query( labels_query, resource.get_labels())) selected_resources: set[str] = set() result: list[str] = [] for resource in grr.get_all_resources(): if resource.get_id() in selected_resources: continue if match(resource): selected_resources.add(resource.resource_id) result.append(resource.resource_id) if len(result) > AnnotationConfigParser.WILDCARD_LIMIT: raise AnnotationConfigurationError( f"Too many resources ({len(result)}/" f"{AnnotationConfigParser.WILDCARD_LIMIT}) " "match the wildcard " f"'{parsed_id}' for annotator '{annotator_type}'.", ) if len(result) == 0: raise AnnotationConfigurationError( f"No resources match the wildcard '{parsed_id}' " f"for annotator type '{annotator_type}'.", ) return result
[docs] @staticmethod def has_wildcard(string: str) -> bool: """Ascertain whether a string contains a valid wildcard.""" # Check if at least one wildcard symbol is present # in the resource id itself, since '*' can also be used # in the label query as well (within square bracket) return "*" in string \ and ("[" not in string or string.index("*") < string.index("["))
[docs] @staticmethod def parse_minimal(raw: str, idx: int) -> AnnotatorInfo: """Parse a minimal-form annotation config.""" return AnnotatorInfo(raw, [], {}, annotator_id=f"A{idx}")
[docs] @staticmethod def parse_short( raw: dict[str, Any], idx: int, grr: GenomicResourceRepo | None = None, ) -> list[AnnotatorInfo]: """Parse a short-form annotation config.""" ann_type, ann_details = next(iter(raw.items())) if AnnotationConfigParser.has_wildcard(ann_details): assert grr is not None matching_resources = AnnotationConfigParser.query_resources( ann_type, ann_details, grr, ) return [ AnnotatorInfo( ann_type, [], {"resource_id": resource}, annotator_id=f"A{idx}_{resource}", ) for resource in matching_resources ] return [ AnnotatorInfo( ann_type, [], {"resource_id": ann_details}, annotator_id=f"A{idx}", ), ]
[docs] @staticmethod def parse_complete( raw: dict[str, Any], idx: int, grr: GenomicResourceRepo | None = None, ) -> list[AnnotatorInfo]: """Parse a full-form annotation config.""" ann_type, ann_details = next(iter(raw.items())) attributes = [] if "attributes" in ann_details: attributes = AnnotationConfigParser.parse_raw_attributes( ann_details["attributes"], ) parameters = { k: v for k, v in ann_details.items() if k != "attributes"} if "resource_id" in parameters \ and AnnotationConfigParser.has_wildcard(parameters["resource_id"]): assert grr is not None matching_resources = AnnotationConfigParser.query_resources( ann_type, parameters.pop("resource_id"), grr, ) return [ AnnotatorInfo(ann_type, attributes, {"resource_id": resource, **parameters}, annotator_id=f"A{idx}_{resource}") for resource in matching_resources ] return [AnnotatorInfo( ann_type, attributes, parameters, annotator_id=f"A{idx}")]
@staticmethod def _parse_preamble( raw: RawPreamble, grr: GenomicResourceRepo | None = None, ) -> AnnotationPreamble | None: """Parse the preamble section of a pipeline config, if present.""" if not set(raw.keys()) <= { "summary", "description", "input_reference_genome", "metadata", }: raise AnnotationConfigurationError("Invalid preamble keys") if not isinstance(raw.get("summary", ""), str): raise TypeError("preamble summary must be a string!") if not isinstance(raw.get("description", ""), str): raise TypeError("preamble description must be a string!") if not isinstance(raw.get("input_reference_genome", ""), str): raise TypeError("preamble reference genome id must be a string!") if not isinstance(raw.get("metadata", {}), dict): raise TypeError("preamble metadata must be a dictionary!") genome_id = raw.get("input_reference_genome", "") genome = None if genome_id != "" and grr is not None: genome = grr.get_resource(genome_id) return AnnotationPreamble( raw.get("summary", ""), raw.get("description", ""), genome_id, genome, raw.get("metadata", {}), )
[docs] @staticmethod def parse_raw( pipeline_raw_config: RawPipelineConfig | None, grr: GenomicResourceRepo | None = None, ) -> tuple[AnnotationPreamble | None, list[AnnotatorInfo]]: """Parse raw dictionary annotation pipeline configuration.""" if pipeline_raw_config is None: logger.warning("empty annotation pipeline configuration") return None, [] if isinstance(pipeline_raw_config, dict): annotators = pipeline_raw_config["annotators"] preamble = AnnotationConfigParser._parse_preamble( pipeline_raw_config["preamble"], grr, ) elif isinstance(pipeline_raw_config, list): annotators = pipeline_raw_config preamble = None else: raise AnnotationConfigurationError( "Raw pipeline configuration is not a list or dict.", ) result = [] for idx, raw_cfg in enumerate(annotators): if isinstance(raw_cfg, str): # the minimal annotator configuration form result.append( AnnotationConfigParser.parse_minimal(raw_cfg, idx), ) continue if isinstance(raw_cfg, dict): ann_details = next(iter(raw_cfg.values())) if isinstance(ann_details, str): # the short annotator configuation form result.extend(AnnotationConfigParser.parse_short( raw_cfg, idx, grr, )) continue if isinstance(ann_details, dict): # the complete annotator configuration form result.extend(AnnotationConfigParser.parse_complete( raw_cfg, idx, grr, )) continue raise AnnotationConfigurationError(dedent(f""" Incorrect annotator configuation form: {raw_cfg}. The allowed forms are: * minimal - <annotator type> * short - <annotator type>: <resource_id_pattern> * complete without attributes - <annotator type>: <param1>: <value1> ... * complete with attributes - <annotator type>: <param1>: <value1> ... attributes: - <att1 config> .... """)) return preamble, result
[docs] @staticmethod def parse_str( content: str, source_file_name: str | None = None, grr: GenomicResourceRepo | None = None, ) -> tuple[AnnotationPreamble | None, list[AnnotatorInfo]]: """Parse annotation pipeline configuration string.""" try: pipeline_raw_config = yaml.safe_load(content) except yaml.MarkedYAMLError as error: error_mark = None if error.problem_mark is not None: error_mark = ErrorMark( error.problem_mark.line + 1, error.problem_mark.column + 1, ) if source_file_name is None: raise AnnotationConfigurationError( f"The pipeline configuration {content} " f"is an invalid yaml string.", error_mark=error_mark, ) from error raise AnnotationConfigurationError( f"The pipeline configuration file {source_file_name} " f"is an invalid yaml file.", error_mark=error_mark, ) from error if pipeline_raw_config is None or len(pipeline_raw_config) == 0: raise AnnotationConfigurationError( "The annotation pipeline configuration is empty.", ) return AnnotationConfigParser.parse_raw(pipeline_raw_config, grr=grr)
[docs] @staticmethod def parse_raw_attribute_config( raw_attribute_config: dict[str, Any]) -> AttributeInfo: """Parse annotation attribute raw configuration.""" attribute_config = copy.deepcopy(raw_attribute_config) if "destination" in attribute_config: logger.warning( "usage of 'destination' in annotators attribute configuration " "is deprecated; use 'name' instead") name = attribute_config.get("destination") attribute_config.pop("destination") attribute_config["name"] = name name = attribute_config.get("name") source = attribute_config.get("source") if name is None and source is None: raise ValueError(f"The raw attribute configuraion " f"{attribute_config} has neigther " "name nor source.") name = name or source source = source or name attr_type = attribute_config.get("type", "str") internal = attribute_config.get("internal") if internal is not None: internal = bool(internal) assert source is not None if not isinstance(name, str): message = ("The name for in an attribute " f"config {attribute_config} should be a string") raise TypeError(message) parameters = {k: v for k, v in attribute_config.items() if k not in ["name", "source", "internal"]} return AttributeInfo( name, source, internal=internal, parameters=parameters, _type=attr_type, )
[docs] @staticmethod def parse_raw_attributes( raw_attributes_config: Any) -> list[AttributeInfo]: """Parse annotator pipeline attribute configuration.""" if not isinstance(raw_attributes_config, list): message = "The attributes parameters should be a list." raise TypeError(message) attribute_config = [] for raw_attribute_config in raw_attributes_config: if isinstance(raw_attribute_config, str): raw_attribute_config = {"name": raw_attribute_config} attribute_config.append( AnnotationConfigParser.parse_raw_attribute_config( raw_attribute_config)) return attribute_config