import copy
import fnmatch
import logging
from collections.abc import Iterator, Mapping
from dataclasses import dataclass, field
from textwrap import dedent
from typing import Any, TypedDict
import yaml
from dae.genomic_resources.repository import (
GenomicResource,
GenomicResourceRepo,
)
logger = logging.getLogger(__name__)
[docs]
class RawPreamble(TypedDict):
summary: str
description: str
input_reference_genome: str
metadata: dict[str, Any]
RawAnnotatorsConfig = list[dict[str, Any]]
[docs]
class RawFullConfig(TypedDict):
preamble: RawPreamble
annotators: RawAnnotatorsConfig
RawPipelineConfig = RawAnnotatorsConfig | RawFullConfig
[docs]
class AnnotationConfigurationError(ValueError):
pass
[docs]
class ParamsUsageMonitor(Mapping):
"""Class to monitor usage of annotator parameters."""
def __init__(self, data: dict[str, Any]):
self._data = dict(data)
self._used_keys: set[str] = set()
def __hash__(self) -> int:
return hash(tuple(sorted(self._data.items())))
def __getitem__(self, key: str) -> Any:
self._used_keys.add(key)
return self._data[key]
def __len__(self) -> int:
return len(self._data)
def __iter__(self) -> Iterator:
raise ValueError("Should not iterate a parameter dictionary.")
def __repr__(self) -> str:
return self._data.__repr__()
def __eq__(self, other: Any) -> bool:
if not isinstance(other, ParamsUsageMonitor):
return False
return self._data == other._data
[docs]
def get_used_keys(self) -> set[str]:
return self._used_keys
[docs]
def get_unused_keys(self) -> set[str]:
return set(self._data.keys()) - self._used_keys
[docs]
@dataclass(init=False, eq=True, unsafe_hash=True)
class AttributeInfo:
"""Defines annotation attribute configuration."""
def __init__(self, name: str, source: str, internal: bool,
parameters: ParamsUsageMonitor | dict[str, Any],
_type: str = "str", description: str = "",
documentation: str | None = None):
self.name = name
self.source = source
self.internal = internal
if isinstance(parameters, ParamsUsageMonitor):
self.parameters = parameters
else:
self.parameters = ParamsUsageMonitor(parameters)
self.type = _type
self.description = description
self._documentation = documentation
name: str
source: str
internal: bool
parameters: ParamsUsageMonitor
type: str = "str" # str, int, float, annotatable, or object
description: str = "" # interpreted as md
_documentation: str | None = None
@property
def documentation(self) -> str:
if self._documentation is None:
return self.description
return self._documentation
[docs]
@dataclass(init=False)
class AnnotatorInfo:
"""Defines annotator configuration."""
def __init__(self, _type: str, attributes: list[AttributeInfo],
parameters: ParamsUsageMonitor | dict[str, Any],
documentation: str = "",
resources: list[GenomicResource] | None = None,
annotator_id: str = "N/A"):
self.type = _type
self.annotator_id = f"{annotator_id}"
self.attributes = attributes
self.documentation = documentation
if isinstance(parameters, ParamsUsageMonitor):
self.parameters = parameters
else:
self.parameters = ParamsUsageMonitor(parameters)
if resources is None:
self.resources = []
else:
self.resources = resources
annotator_id: str = field(compare=False, hash=None)
type: str
attributes: list[AttributeInfo]
parameters: ParamsUsageMonitor
documentation: str = ""
resources: list[GenomicResource] = field(default_factory=list)
def __hash__(self) -> int:
attrs_hash = "".join(str(hash(attr)) for attr in self.attributes)
resources_hash = "".join(str(hash(res)) for res in self.resources)
params_hash = "".join(str(hash(self.parameters)))
return hash(f"{self.type}{attrs_hash}{resources_hash}{params_hash}")
[docs]
@dataclass
class AnnotationPreamble:
summary: str
description: str
input_reference_genome: str
input_reference_genome_res: GenomicResource | None
metadata: dict[str, Any]
[docs]
class AnnotationConfigParser:
"""Parser for annotation configuration."""
[docs]
@staticmethod
def match_labels_query(
query: dict[str, str], resource_labels: dict[str, str],
) -> bool:
"""Check if the labels query for a wildcard matches."""
for k, v in query.items():
if k not in resource_labels \
or not fnmatch.fnmatch(resource_labels[k], v):
return False
return True
[docs]
@staticmethod
def query_resources(
annotator_type: str, wildcard: str, grr: GenomicResourceRepo,
) -> list[str]:
"""Collect resources matching a given query."""
labels_query: dict[str, str] = {}
if wildcard.endswith("]"):
assert "[" in wildcard
wildcard, raw_labels = wildcard.split("[")
labels = raw_labels.strip("]").split(" and ")
for label in labels:
k, v = label.split("=")
labels_query[k] = v
def match(resource: GenomicResource) -> bool:
return (resource.get_type() == annotator_type
and fnmatch.fnmatch(resource.get_id(), wildcard)
and AnnotationConfigParser.match_labels_query(labels_query,
resource.get_labels()))
return [resource.get_id()
for resource in grr.get_all_resources()
if match(resource)]
[docs]
@staticmethod
def has_wildcard(string: str) -> bool:
"""Ascertain whether a string contains a valid wildcard."""
# Check if at least one wildcard symbol is present
# in the resource id itself, since '*' can also be used
# in the label query as well (within square bracket)
return "*" in string \
and ("[" not in string or string.index("*") < string.index("["))
[docs]
@staticmethod
def parse_minimal(raw: str, idx: int) -> AnnotatorInfo:
"""Parse a minimal-form annotation config."""
return AnnotatorInfo(raw, [], {}, annotator_id=f"A{idx}")
[docs]
@staticmethod
def parse_short(
raw: dict[str, Any], idx: int,
grr: GenomicResourceRepo | None = None,
) -> list[AnnotatorInfo]:
"""Parse a short-form annotation config."""
ann_type, ann_details = next(iter(raw.items()))
if AnnotationConfigParser.has_wildcard(ann_details):
assert grr is not None
matching_resources = AnnotationConfigParser.query_resources(
ann_type, ann_details, grr,
)
return [
AnnotatorInfo(
ann_type, [], {"resource_id": resource},
annotator_id=f"A{idx}_{resource}",
)
for resource in matching_resources
]
return [
AnnotatorInfo(
ann_type, [], {"resource_id": ann_details},
annotator_id=f"A{idx}",
),
]
[docs]
@staticmethod
def parse_complete(
raw: dict[str, Any], idx: int,
grr: GenomicResourceRepo | None = None,
) -> list[AnnotatorInfo]:
"""Parse a full-form annotation config."""
ann_type, ann_details = next(iter(raw.items()))
attributes = []
if "attributes" in ann_details:
attributes = AnnotationConfigParser.parse_raw_attributes(
ann_details["attributes"],
)
parameters = {k: v for k, v in ann_details.items() if k != "attributes"}
if "resource_id" in parameters \
and AnnotationConfigParser.has_wildcard(parameters["resource_id"]):
assert grr is not None
matching_resources = AnnotationConfigParser.query_resources(
ann_type, parameters.pop("resource_id"), grr,
)
return [
AnnotatorInfo(ann_type, attributes,
{"resource_id": resource, **parameters},
annotator_id=f"A{idx}_{resource}")
for resource in matching_resources
]
return [AnnotatorInfo(
ann_type, attributes, parameters, annotator_id=f"A{idx}")]
@staticmethod
def _parse_preamble(
raw: RawPreamble,
grr: GenomicResourceRepo | None = None,
) -> AnnotationPreamble | None:
"""Parse the preamble section of a pipeline config, if present."""
if not set(raw.keys()) <= {
"summary", "description", "input_reference_genome", "metadata",
}:
raise AnnotationConfigurationError
if not isinstance(raw.get("summary", ""), str):
raise TypeError("preamble summary must be a string!")
if not isinstance(raw.get("description", ""), str):
raise TypeError("preamble description must be a string!")
if not isinstance(raw.get("input_reference_genome", ""), str):
raise TypeError("preamble reference genome id must be a string!")
if not isinstance(raw.get("metadata", {}), dict):
raise TypeError("preamble metadata must be a dictionary!")
genome_id = raw.get("input_reference_genome", "")
genome = None
if genome_id != "" and grr is not None:
genome = grr.get_resource(genome_id)
return AnnotationPreamble(
raw.get("summary", ""),
raw.get("description", ""),
genome_id,
genome,
raw.get("metadata", {}),
)
[docs]
@staticmethod
def parse_raw(
pipeline_raw_config: RawPipelineConfig | None,
grr: GenomicResourceRepo | None = None,
) -> tuple[AnnotationPreamble | None, list[AnnotatorInfo]]:
"""Parse raw dictionary annotation pipeline configuration."""
if pipeline_raw_config is None:
logger.warning("empty annotation pipeline configuration")
return None, []
if isinstance(pipeline_raw_config, dict):
annotators = pipeline_raw_config["annotators"]
preamble = AnnotationConfigParser._parse_preamble(
pipeline_raw_config["preamble"], grr,
)
elif isinstance(pipeline_raw_config, list):
annotators = pipeline_raw_config
preamble = None
else:
raise AnnotationConfigurationError
result = []
for idx, raw_cfg in enumerate(annotators):
if isinstance(raw_cfg, str):
# the minimal annotator configuration form
result.append(
AnnotationConfigParser.parse_minimal(raw_cfg, idx),
)
continue
if isinstance(raw_cfg, dict):
ann_details = next(iter(raw_cfg.values()))
if isinstance(ann_details, str):
# the short annotator configuation form
result.extend(AnnotationConfigParser.parse_short(
raw_cfg, idx, grr,
))
continue
if isinstance(ann_details, dict):
# the complete annotator configuration form
result.extend(AnnotationConfigParser.parse_complete(
raw_cfg, idx, grr,
))
continue
raise AnnotationConfigurationError(dedent(f"""
Incorrect annotator configuation form: {raw_cfg}.
The allowed forms are:
* minimal
- <annotator type>
* short
- <annotator type>: <resource_id_pattern>
* complete without attributes
- <annotator type>:
<param1>: <value1>
...
* complete with attributes
- <annotator type>:
<param1>: <value1>
...
attributes:
- <att1 config>
....
"""))
return preamble, result
[docs]
@staticmethod
def parse_str(
content: str, source_file_name: str | None = None,
grr: GenomicResourceRepo | None = None,
) -> tuple[AnnotationPreamble | None, list[AnnotatorInfo]]:
"""Parse annotation pipeline configuration string."""
try:
pipeline_raw_config = yaml.safe_load(content)
except yaml.YAMLError as error:
if source_file_name is None:
raise AnnotationConfigurationError(
f"The pipeline configuration {content} is an invalid yaml "
"string.", error) from error
raise AnnotationConfigurationError(
f"The pipeline configuration file {source_file_name} is "
"an invalid yaml file.", error) from error
return AnnotationConfigParser.parse_raw(pipeline_raw_config, grr=grr)
[docs]
@staticmethod
def parse_raw_attribute_config(
raw_attribute_config: dict[str, Any]) -> AttributeInfo:
"""Parse annotation attribute raw configuration."""
attribute_config = copy.deepcopy(raw_attribute_config)
if "destination" in attribute_config:
logger.warning(
"usage of 'destination' in annotators attribute configuration "
"is deprecated; use 'name' instead")
name = attribute_config.get("destination")
attribute_config.pop("destination")
attribute_config["name"] = name
name = attribute_config.get("name")
source = attribute_config.get("source")
if name is None and source is None:
raise ValueError(f"The raw attribute configuraion "
f"{attribute_config} has neigther "
"name nor source.")
name = name or source
source = source or name
internal = bool(attribute_config.get("internal", False))
assert source is not None
if not isinstance(name, str):
message = ("The name for in an attribute "
f"config {attribute_config} should be a string")
raise TypeError(message)
parameters = {k: v for k, v in attribute_config.items()
if k not in ["name", "source", "internal"]}
return AttributeInfo(name, source, internal, parameters)
[docs]
@staticmethod
def parse_raw_attributes(
raw_attributes_config: Any) -> list[AttributeInfo]:
"""Parse annotator pipeline attribute configuration."""
if not isinstance(raw_attributes_config, list):
message = "The attributes parameters should be a list."
raise TypeError(message)
attribute_config = []
for raw_attribute_config in raw_attributes_config:
if isinstance(raw_attribute_config, str):
raw_attribute_config = {"name": raw_attribute_config}
attribute_config.append(
AnnotationConfigParser.parse_raw_attribute_config(
raw_attribute_config))
return attribute_config