# Source code for dae.annotation.annotation_config

from __future__ import annotations

import copy
import fnmatch
import logging
from collections.abc import Iterator, Mapping
from dataclasses import dataclass, field
from textwrap import dedent
from typing import Any, TypedDict

import yaml

from dae.genomic_resources.repository import (
    GenomicResource,
    GenomicResourceRepo,
)

logger = logging.getLogger(__name__)


class RawPreamble(TypedDict):
    """Raw (unparsed) preamble section of an annotation pipeline config."""

    # short summary text
    summary: str
    # longer description text
    description: str
    # id of the input reference genome
    input_reference_genome: str
    # arbitrary metadata dictionary
    metadata: dict[str, Any]
# Type alias for the raw annotators section of a pipeline configuration:
# a list of dictionaries with string keys.
RawAnnotatorsConfig = list[dict[str, Any]]
class RawFullConfig(TypedDict):
    """Raw pipeline configuration in full form: a preamble plus annotators."""

    # raw preamble section
    preamble: RawPreamble
    # raw annotators section
    annotators: RawAnnotatorsConfig
# A raw pipeline configuration is either a bare annotators list or the full
# form that also carries a preamble.
RawPipelineConfig = RawAnnotatorsConfig | RawFullConfig
class AnnotationConfigurationError(ValueError):
    """Error raised for an invalid annotation pipeline configuration."""
class ParamsUsageMonitor(Mapping):
    """Class to monitor usage of annotator parameters.

    Wraps a copy of a parameters dictionary and records every key that is
    looked up through ``__getitem__``. Iteration is deliberately disabled
    and raises ``ValueError``.
    """

    def __init__(self, data: dict[str, Any]):
        # Shallow copy, so later changes to the caller's dict are not seen.
        self._data = dict(data)
        self._used_keys: set[str] = set()

    @staticmethod
    def _freeze(value: Any) -> Any:
        """Return a hashable stand-in for ``value``.

        Dicts, lists/tuples and sets are converted recursively into
        frozensets/tuples; any other value is returned unchanged. Values
        that compare equal produce stand-ins with equal hashes, which keeps
        ``__hash__`` consistent with ``__eq__``.
        """
        if isinstance(value, dict):
            return frozenset(
                (key, ParamsUsageMonitor._freeze(item))
                for key, item in value.items()
            )
        if isinstance(value, (list, tuple)):
            return tuple(ParamsUsageMonitor._freeze(item) for item in value)
        if isinstance(value, (set, frozenset)):
            return frozenset(
                ParamsUsageMonitor._freeze(item) for item in value)
        return value

    def __hash__(self) -> int:
        # Parameter values can be unhashable containers (lists, dicts), so
        # hash a frozen view of the data instead of the raw items.
        return hash(ParamsUsageMonitor._freeze(self._data))

    def __getitem__(self, key: str) -> Any:
        # The key is recorded before the lookup, so a lookup that fails
        # with KeyError is recorded as well.
        self._used_keys.add(key)
        return self._data[key]

    def __len__(self) -> int:
        return len(self._data)

    def __iter__(self) -> Iterator:
        raise ValueError("Should not iterate a parameter dictionary.")

    def __repr__(self) -> str:
        return self._data.__repr__()

    def __eq__(self, other: Any) -> bool:
        # Only the wrapped data is compared; usage tracking is ignored.
        if not isinstance(other, ParamsUsageMonitor):
            return False
        return self._data == other._data

    def get_used_keys(self) -> set[str]:
        """Return the set of keys that were looked up so far."""
        return self._used_keys

    def get_unused_keys(self) -> set[str]:
        """Return the keys present in the data that were never looked up."""
        return set(self._data.keys()) - self._used_keys
@dataclass(init=False, eq=True, unsafe_hash=True)
class AttributeInfo:
    """Defines annotation attribute configuration.

    The ``documentation`` property falls back to ``description`` when no
    explicit documentation was supplied.
    """

    name: str
    source: str
    internal: bool
    parameters: ParamsUsageMonitor
    type: str = "str"  # str, int, float, annotatable, or object
    description: str = ""  # interpreted as md
    _documentation: str | None = None

    def __init__(
        self,
        name: str,
        source: str,
        *,
        internal: bool,
        parameters: ParamsUsageMonitor | dict[str, Any],
        _type: str = "str",
        description: str = "",
        documentation: str | None = None,
    ):
        self.name = name
        self.source = source
        self.internal = internal
        # Plain dictionaries are wrapped so parameter usage can be tracked.
        self.parameters = (
            parameters
            if isinstance(parameters, ParamsUsageMonitor)
            else ParamsUsageMonitor(parameters)
        )
        self.type = _type
        self.description = description
        self._documentation = documentation

    @property
    def documentation(self) -> str:
        """Return the explicit documentation, or the description if unset."""
        if self._documentation is not None:
            return self._documentation
        return self.description

    @staticmethod
    def create(
        source: str,
        name: str | None = None,
        *,
        internal: bool = False,
    ) -> AttributeInfo:
        """Create an AttributeInfo instance.

        When ``name`` is not given, ``source`` is used as the name. The
        created attribute has empty parameters.
        """
        attribute_name = source if name is None else name
        return AttributeInfo(
            attribute_name, source, internal=internal, parameters={},
        )
@dataclass(init=False)
class AnnotatorInfo:
    """Defines annotator configuration.

    ``annotator_id`` is excluded from comparison, and the custom hash is
    built from the type, attributes, resources and parameters only.
    """

    annotator_id: str = field(compare=False, hash=None)
    type: str
    attributes: list[AttributeInfo]
    parameters: ParamsUsageMonitor
    documentation: str = ""
    resources: list[GenomicResource] = field(default_factory=list)

    def __init__(
        self,
        _type: str,
        attributes: list[AttributeInfo],
        parameters: ParamsUsageMonitor | dict[str, Any],
        documentation: str = "",
        resources: list[GenomicResource] | None = None,
        annotator_id: str = "N/A",
    ):
        self.type = _type
        self.annotator_id = f"{annotator_id}"
        self.attributes = attributes
        self.documentation = documentation
        # Plain dictionaries are wrapped so parameter usage can be tracked.
        self.parameters = (
            parameters
            if isinstance(parameters, ParamsUsageMonitor)
            else ParamsUsageMonitor(parameters)
        )
        self.resources = [] if resources is None else resources

    def __hash__(self) -> int:
        # Concatenate the textual hashes of all parts and hash the result.
        pieces = [str(hash(attribute)) for attribute in self.attributes]
        pieces.extend(str(hash(resource)) for resource in self.resources)
        pieces.append(str(hash(self.parameters)))
        return hash(f"{self.type}" + "".join(pieces))
@dataclass
class AnnotationPreamble:
    """Parsed preamble of an annotation pipeline configuration."""

    # short summary text
    summary: str
    # longer description text
    description: str
    # id of the input reference genome
    input_reference_genome: str
    # genomic resource for the reference genome, or None when not available
    input_reference_genome_res: GenomicResource | None
    # arbitrary metadata dictionary
    metadata: dict[str, Any]
class AnnotationConfigParser:
    """Parser for annotation configuration.

    Turns raw (YAML-loaded) pipeline configurations into an optional
    ``AnnotationPreamble`` and a list of ``AnnotatorInfo`` objects.
    """

    @staticmethod
    def match_labels_query(
        query: dict[str, str], resource_labels: dict[str, str],
    ) -> bool:
        """Check if the labels query for a wildcard matches.

        Every key of ``query`` must be present in ``resource_labels`` and
        the resource's label value must match the query value, which is
        treated as an ``fnmatch`` pattern. An empty query always matches.
        """
        return all(
            key in resource_labels
            and fnmatch.fnmatch(resource_labels[key], pattern)
            for key, pattern in query.items()
        )

    @staticmethod
    def _split_labels_query(wildcard: str) -> tuple[str, dict[str, str]]:
        """Split ``<id pattern>[k1=v1 and k2=v2]`` into pattern and labels.

        A wildcard that does not end with ``]`` has no labels query and is
        returned unchanged with an empty query.

        Raises:
            AnnotationConfigurationError: if the labels query is malformed.
        """
        if not wildcard.endswith("]"):
            return wildcard, {}

        # The labels query starts at the FIRST "[" (the same position
        # has_wildcard looks at), so label patterns may contain brackets.
        pattern, bracket, raw_labels = wildcard.partition("[")
        if not bracket:
            raise AnnotationConfigurationError(
                f"Malformed resource wildcard '{wildcard}': "
                "closing ']' without an opening '['")

        labels_query: dict[str, str] = {}
        # Drop only the final "]" that closes the labels query.
        for label in raw_labels.removesuffix("]").split(" and "):
            # Split on the first "=" only, so values may contain "=".
            key, separator, value = label.partition("=")
            if not separator:
                raise AnnotationConfigurationError(
                    f"Malformed label query '{label}' in resource "
                    f"wildcard '{wildcard}': expected <label>=<pattern>")
            labels_query[key] = value
        return pattern, labels_query

    @staticmethod
    def query_resources(
        annotator_type: str, wildcard: str, grr: GenomicResourceRepo,
    ) -> list[str]:
        """Collect resources matching a given query.

        A resource matches when its type equals ``annotator_type``, its id
        matches the id pattern of ``wildcard`` and its labels satisfy the
        optional ``[label=pattern and ...]`` query. Returns the ids of the
        matching resources.

        Raises:
            AnnotationConfigurationError: if the labels query is malformed.
        """
        pattern, labels_query = \
            AnnotationConfigParser._split_labels_query(wildcard)
        return [
            resource.get_id()
            for resource in grr.get_all_resources()
            if resource.get_type() == annotator_type
            and fnmatch.fnmatch(resource.get_id(), pattern)
            and AnnotationConfigParser.match_labels_query(
                labels_query, resource.get_labels())
        ]

    @staticmethod
    def has_wildcard(string: str) -> bool:
        """Ascertain whether a string contains a valid wildcard."""
        # Check if at least one wildcard symbol is present
        # in the resource id itself, since '*' can also be used
        # in the label query as well (within square bracket)
        return "*" in string \
            and ("[" not in string or string.index("*") < string.index("["))

    @staticmethod
    def _expand_wildcard(
        annotator_type: str, wildcard: str,
        grr: GenomicResourceRepo | None,
    ) -> list[str]:
        """Resolve a resource id wildcard into matching resource ids.

        Raises:
            AnnotationConfigurationError: if no repository was supplied.
        """
        if grr is None:
            # Explicit error instead of an assert, which is stripped by -O.
            raise AnnotationConfigurationError(
                f"Cannot resolve resource wildcard '{wildcard}' for "
                f"annotator '{annotator_type}' without a genomic "
                "resource repository")
        return AnnotationConfigParser.query_resources(
            annotator_type, wildcard, grr)

    @staticmethod
    def parse_minimal(raw: str, idx: int) -> AnnotatorInfo:
        """Parse a minimal-form annotation config.

        The raw string is the annotator type; no attributes or parameters.
        """
        return AnnotatorInfo(raw, [], {}, annotator_id=f"A{idx}")

    @staticmethod
    def parse_short(
        raw: dict[str, Any], idx: int,
        grr: GenomicResourceRepo | None = None,
    ) -> list[AnnotatorInfo]:
        """Parse a short-form annotation config.

        The single entry of ``raw`` maps the annotator type to a resource
        id or a resource id wildcard. A wildcard yields one annotator per
        matching resource.

        Raises:
            AnnotationConfigurationError: if a wildcard is used and ``grr``
                is None.
        """
        ann_type, ann_details = next(iter(raw.items()))
        if not AnnotationConfigParser.has_wildcard(ann_details):
            return [
                AnnotatorInfo(
                    ann_type, [], {"resource_id": ann_details},
                    annotator_id=f"A{idx}",
                ),
            ]

        matching_resources = AnnotationConfigParser._expand_wildcard(
            ann_type, ann_details, grr)
        return [
            AnnotatorInfo(
                ann_type, [], {"resource_id": resource},
                annotator_id=f"A{idx}_{resource}",
            )
            for resource in matching_resources
        ]

    @staticmethod
    def parse_complete(
        raw: dict[str, Any], idx: int,
        grr: GenomicResourceRepo | None = None,
    ) -> list[AnnotatorInfo]:
        """Parse a full-form annotation config.

        The single entry of ``raw`` maps the annotator type to a dictionary
        of parameters with an optional ``attributes`` list. A wildcard in
        ``resource_id`` yields one annotator per matching resource.

        Raises:
            AnnotationConfigurationError: if a wildcard is used and ``grr``
                is None.
        """
        ann_type, ann_details = next(iter(raw.items()))

        attributes = []
        if "attributes" in ann_details:
            attributes = AnnotationConfigParser.parse_raw_attributes(
                ann_details["attributes"],
            )
        # Everything except the attributes is an annotator parameter.
        parameters = {
            key: value for key, value in ann_details.items()
            if key != "attributes"
        }

        if "resource_id" in parameters \
                and AnnotationConfigParser.has_wildcard(
                    parameters["resource_id"]):
            matching_resources = AnnotationConfigParser._expand_wildcard(
                ann_type, parameters.pop("resource_id"), grr)
            return [
                AnnotatorInfo(
                    ann_type, attributes,
                    {"resource_id": resource, **parameters},
                    annotator_id=f"A{idx}_{resource}",
                )
                for resource in matching_resources
            ]

        return [
            AnnotatorInfo(
                ann_type, attributes, parameters, annotator_id=f"A{idx}"),
        ]

    @staticmethod
    def _parse_preamble(
        raw: RawPreamble,
        grr: GenomicResourceRepo | None = None,
    ) -> AnnotationPreamble | None:
        """Parse the preamble section of a pipeline config, if present.

        Missing keys default to an empty string (or an empty dictionary for
        the metadata). The reference genome resource is looked up only when
        a genome id is given and ``grr`` is supplied.

        Raises:
            AnnotationConfigurationError: on unknown preamble keys.
            TypeError: when a preamble value has the wrong type.
        """
        allowed_keys = {
            "summary", "description", "input_reference_genome", "metadata",
        }
        if not set(raw.keys()) <= allowed_keys:
            raise AnnotationConfigurationError("Invalid preamble keys")

        summary = raw.get("summary", "")
        description = raw.get("description", "")
        genome_id = raw.get("input_reference_genome", "")
        metadata = raw.get("metadata", {})

        if not isinstance(summary, str):
            raise TypeError("preamble summary must be a string!")
        if not isinstance(description, str):
            raise TypeError("preamble description must be a string!")
        if not isinstance(genome_id, str):
            raise TypeError("preamble reference genome id must be a string!")
        if not isinstance(metadata, dict):
            raise TypeError("preamble metadata must be a dictionary!")

        genome = None
        if genome_id != "" and grr is not None:
            genome = grr.get_resource(genome_id)

        return AnnotationPreamble(
            summary, description, genome_id, genome, metadata,
        )

    @staticmethod
    def parse_raw(
        pipeline_raw_config: RawPipelineConfig | None,
        grr: GenomicResourceRepo | None = None,
    ) -> tuple[AnnotationPreamble | None, list[AnnotatorInfo]]:
        """Parse raw dictionary annotation pipeline configuration.

        Accepts either a bare list of annotator configs or a dictionary
        with ``preamble`` and ``annotators`` entries. ``None`` is treated
        as an empty pipeline.

        Raises:
            AnnotationConfigurationError: if the configuration or one of
                the annotator entries has an unsupported form.
        """
        if pipeline_raw_config is None:
            logger.warning("empty annotation pipeline configuration")
            return None, []

        if isinstance(pipeline_raw_config, dict):
            annotators = pipeline_raw_config["annotators"]
            preamble = AnnotationConfigParser._parse_preamble(
                pipeline_raw_config["preamble"], grr,
            )
        elif isinstance(pipeline_raw_config, list):
            annotators = pipeline_raw_config
            preamble = None
        else:
            raise AnnotationConfigurationError(
                "The pipeline configuration must be a list or a dictionary, "
                f"got {type(pipeline_raw_config).__name__}")

        result = []
        for idx, raw_cfg in enumerate(annotators):
            if isinstance(raw_cfg, str):
                # the minimal annotator configuration form
                result.append(
                    AnnotationConfigParser.parse_minimal(raw_cfg, idx),
                )
                continue
            if isinstance(raw_cfg, dict):
                # An empty dictionary has no details; default to None so it
                # is reported as an incorrect form below instead of leaking
                # StopIteration to the caller.
                ann_details = next(iter(raw_cfg.values()), None)
                if isinstance(ann_details, str):
                    # the short annotator configuation form
                    result.extend(AnnotationConfigParser.parse_short(
                        raw_cfg, idx, grr,
                    ))
                    continue
                if isinstance(ann_details, dict):
                    # the complete annotator configuration form
                    result.extend(AnnotationConfigParser.parse_complete(
                        raw_cfg, idx, grr,
                    ))
                    continue
            raise AnnotationConfigurationError(dedent(f"""
                Incorrect annotator configuation form: {raw_cfg}.
                The allowed forms are:
                    * minimal
                        - <annotator type>
                    * short
                        - <annotator type>: <resource_id_pattern>
                    * complete without attributes
                        - <annotator type>:
                            <param1>: <value1>
                            ...
                    * complete with attributes
                        - <annotator type>:
                            <param1>: <value1>
                            ...
                            attributes:
                            - <att1 config>
                            ....
            """))
        return preamble, result

    @staticmethod
    def parse_str(
        content: str,
        source_file_name: str | None = None,
        grr: GenomicResourceRepo | None = None,
    ) -> tuple[AnnotationPreamble | None, list[AnnotatorInfo]]:
        """Parse annotation pipeline configuration string.

        ``content`` is loaded with ``yaml.safe_load`` and handed over to
        ``parse_raw``. ``source_file_name`` is only used in error messages.

        Raises:
            AnnotationConfigurationError: if ``content`` is not valid YAML.
        """
        try:
            pipeline_raw_config = yaml.safe_load(content)
        except yaml.YAMLError as error:
            if source_file_name is None:
                raise AnnotationConfigurationError(
                    f"The pipeline configuration {content} is an invalid yaml "
                    "string.", error) from error
            raise AnnotationConfigurationError(
                f"The pipeline configuration file {source_file_name} is "
                "an invalid yaml file.", error) from error

        return AnnotationConfigParser.parse_raw(pipeline_raw_config, grr=grr)

    @staticmethod
    def parse_raw_attribute_config(
        raw_attribute_config: dict[str, Any],
    ) -> AttributeInfo:
        """Parse annotation attribute raw configuration.

        ``name`` and ``source`` default to each other when only one is
        given; the deprecated ``destination`` key is accepted in place of
        ``name``. Every other key except ``internal`` becomes an attribute
        parameter.

        Raises:
            ValueError: if neither a name nor a source is configured.
            TypeError: if the resulting name is not a string.
        """
        # Work on a copy so the caller's configuration is never modified.
        attribute_config = copy.deepcopy(raw_attribute_config)
        if "destination" in attribute_config:
            logger.warning(
                "usage of 'destination' in annotators attribute configuration "
                "is deprecated; use 'name' instead")
            attribute_config["name"] = attribute_config.pop("destination")

        name = attribute_config.get("name")
        source = attribute_config.get("source")
        if name is None and source is None:
            raise ValueError(f"The raw attribute configuraion "
                             f"{attribute_config} has neigther "
                             "name nor source.")
        name = name or source
        source = source or name
        internal = bool(attribute_config.get("internal", False))

        assert source is not None
        if not isinstance(name, str):
            message = ("The name for in an attribute "
                       f"config {attribute_config} should be a string")
            raise TypeError(message)

        parameters = {
            key: value for key, value in attribute_config.items()
            if key not in ["name", "source", "internal"]
        }
        return AttributeInfo(
            name, source, internal=internal, parameters=parameters)

    @staticmethod
    def parse_raw_attributes(
        raw_attributes_config: Any,
    ) -> list[AttributeInfo]:
        """Parse annotator pipeline attribute configuration.

        Each entry is either an attribute name or a raw attribute
        configuration dictionary.

        Raises:
            TypeError: if ``raw_attributes_config`` is not a list.
        """
        if not isinstance(raw_attributes_config, list):
            message = "The attributes parameters should be a list."
            raise TypeError(message)

        return [
            AnnotationConfigParser.parse_raw_attribute_config(
                # A bare string is shorthand for {"name": <string>}.
                {"name": raw_attribute} if isinstance(raw_attribute, str)
                else raw_attribute)
            for raw_attribute in raw_attributes_config
        ]