"""This contains the implementation of the three score annotators.
Genomic score annotators defined are position_score_annotator,
np_score_annotator, and allele_score_annotator.
"""
import abc
import logging
import textwrap
from collections.abc import Callable
from typing import Any, cast
from lark import Lark, Token, Tree
from gain.annotation.annotatable import Annotatable, VCFAllele
from gain.annotation.annotate_utils import stringify
from gain.annotation.annotation_config import (
AnnotationConfigParser,
AnnotationConfigurationError,
AnnotatorInfo,
Attribute,
AttributeConfig,
)
from gain.annotation.annotation_pipeline import (
AnnotationPipeline,
Annotator,
AttributeSpec,
)
from gain.annotation.annotator_base import AnnotatorBase
from gain.genomic_resources.aggregators import AggregatorSource
from gain.genomic_resources.genomic_scores import (
AlleleScore,
GenomicScore,
PositionScore,
ScoreDef,
ScoreLine,
)
from gain.genomic_resources.repository import GenomicResource
from gain.templates import get_template
logger = logging.getLogger(__name__)
[docs]
def get_genomic_resource(
pipeline: AnnotationPipeline, info: AnnotatorInfo,
resource_types: set[str]) -> GenomicResource:
"""Return genomic score resource used for given genomic score annotator."""
if "resource_id" not in info.parameters:
raise ValueError(f"The {info} has not 'resource_id' parameters")
resource_id = info.parameters["resource_id"]
resource = pipeline.repository.get_resource(resource_id)
if resource.get_type() not in resource_types:
raise ValueError(
f"The {info} requires 'resource_id' to point to a "
f"resource of type {resource_types}; "
f"resource of type <{resource.get_type()}> found.")
return resource
[docs]
class GenomicScoreAnnotatorBase(AnnotatorBase):
"""Genomic score base annotator."""
def __init__(self, pipeline: AnnotationPipeline, info: AnnotatorInfo,
score: GenomicScore):
self.score = score
self._resource_attr_params: dict[str, dict[str, Any]] = {}
info.resources.append(score.resource)
default_annotation = self.score.get_config().get("default_annotation")
if default_annotation is not None:
score_defs = self.score.score_definitions
parsed_defaults = [
AnnotationConfigParser.parse_raw_attribute_config(attr)
for attr in default_annotation
]
for parsed in parsed_defaults:
if parsed.source not in score_defs:
raise ValueError(
f"Default annotation attribute '{parsed.source}' is "
"not defined in the score resource!")
params = {
k: v for k, v in parsed.parameters.items()
if k != "description"
}
if parsed.aggregator is not None:
params["aggregator"] = parsed.aggregator
if params:
self._resource_attr_params[parsed.source] = params
if not info.attributes:
defaults_by_source = {p.source: p for p in parsed_defaults}
for source in score_defs:
if source not in defaults_by_source:
continue
parsed = defaults_by_source[source]
info.attributes.append(AttributeConfig(
name=parsed.name or parsed.source,
source=parsed.source,
internal=parsed.internal,
aggregator=parsed.aggregator,
))
super().__init__(pipeline, info)
self._region_length_cutoff = info.parameters.get(
"region_length_cutoff", 500_000)
self.simple_score_queries: list[str] = [
attr.source for attr in self._attributes
if attr.source in self.score.score_definitions]
[docs]
def open(self) -> Annotator:
self.score.open()
super().open()
return self
[docs]
def is_open(self) -> bool:
return self.score.is_open()
def _collect_score_queries(self) -> list:
return []
[docs]
def close(self) -> None:
self.score.close()
super().close()
[docs]
def get_attribute_specs(self) -> dict[str, AttributeSpec]:
has_default_annotation = \
self.score.get_config().get("default_annotation") is not None
return {
attr_source: AttributeSpec(
source=attr_def.score_id,
value_type=attr_def.value_type,
description=attr_def.desc,
is_default=not has_default_annotation,
internal_default=False,
)
for attr_source, attr_def in self.score.score_definitions.items()
}
[docs]
def get_attribute_defaults(
self, spec: AttributeSpec,
) -> dict[str, Any]:
return dict(self._resource_attr_params.get(spec.source, {}))
def _build_score_aggregator_documentation(
self, attr: Attribute,
aggregator: str,
attribute_conf_agg: AggregatorSource | None,
) -> str:
"""Collect score aggregator documentation."""
default_aggregators = {
"position_aggregator": {
"float": "mean",
"int": "mean",
"str": "list",
},
"allele_aggregator": {
"float": "max",
"int": "max",
"str": "list",
},
}
aggregators_score_def_att: \
dict[str, Callable[[ScoreDef], str | None]] = {
"position_aggregator":
lambda sc: sc.pos_aggregator,
"allele_aggregator":
lambda sc: sc.allele_aggregator,
}
if attribute_conf_agg is None:
score_def = self.score.get_score_definition(attr.source)
assert score_def is not None
value = aggregators_score_def_att[aggregator](
cast(ScoreDef, score_def))
if value is not None:
value_str = f"`{value}` [default]"
else:
value = default_aggregators[aggregator][score_def.value_type]
value_str = f"`{value}` [type default]"
else:
value_str = str(attribute_conf_agg)
return f"**{aggregator}**: {value_str}"
[docs]
def add_score_aggregator_documentation(
self, attr: Attribute,
aggregator: str,
attribute_conf_agg: AggregatorSource | None) -> None:
"""Collect score aggregator documentation."""
aggregator_doc = self._build_score_aggregator_documentation(
attr, aggregator, attribute_conf_agg)
attr._documentation = ( # noqa: SLF001
f"{attr.documentation}"
f"\n\n{aggregator_doc}")
[docs]
@abc.abstractmethod
def build_score_aggregator_documentation(
self, attr: Attribute,
) -> list[str]:
"""Construct score aggregator documentation."""
[docs]
def build_attribute_help(self, attr: Attribute) -> str:
"""Build attribute help."""
hist_url = self.score.get_histogram_image_public_url(attr.source)
score_def = self.score.get_score_definition(attr.source)
assert score_def is not None
histogram = get_template("score_histogram.jinja").render(
hist_url=hist_url,
score_def=score_def,
)
assert attr.spec is not None
data = {
"name": attr.name,
"description": attr.spec.description,
"resource_id": self.score.resource_id,
"resource_summary": self.score.resource.get_summary(),
"resource_url":
f"{self.score.resource.get_public_url()}/index.html",
"resource_type": self.score.resource.get_type(),
"histogram": histogram,
"source": attr.source,
"aggregators": self.build_score_aggregator_documentation(
attr,
),
"annotator_type": self.get_info().type,
"annotator_doc": self.get_info().documentation,
}
return get_template("genomic_score_help.jinja").render(data=data)
[docs]
def build_position_score_annotator(pipeline: AnnotationPipeline,
info: AnnotatorInfo) -> Annotator:
return PositionScoreAnnotator(pipeline, info)
[docs]
class PositionScoreAnnotator(GenomicScoreAnnotatorBase):
"""This class implements the position_score_annotator.
The position_score_annotator requires the resource_id parameter, whose
value must be an id of a genomic resource of type position_score.
The position_score resource provides a set of scores (see …) that the
position_score_annotator uses as attributes to assign to the annotatable.
The position_score_annotator recognizes one attribute level parameter
called aggregator that controls how the position scores are
aggregated for annotatables that refer to a region of the reference genome.
The deprecated name position_aggregator is still accepted.
"""
def __init__(self, pipeline: AnnotationPipeline, info: AnnotatorInfo):
resource = get_genomic_resource(pipeline, info, {"position_score"})
self.position_score = PositionScore(resource)
super().__init__(pipeline, info, self.position_score)
info.documentation += textwrap.dedent(f"""
Annotator to use with genomic scores depending on genomic position like
phastCons, phyloP, FitCons2, etc.
<a href="{self.BASE_DOC_URL}#position-score-annotator" target="_blank">More info</a>
""") # noqa
for attr, attr_config in zip(
self._attributes, self.get_info().attributes, strict=True,
):
self.add_score_aggregator_documentation(
attr, "position_aggregator", attr_config.aggregator)
[docs]
def get_attribute_defaults(
self, spec: AttributeSpec,
) -> dict[str, Any]:
defaults = super().get_attribute_defaults(spec)
if "aggregator" not in defaults:
score_def = self.position_score.get_score_definition(spec.source)
if score_def is not None and score_def.pos_aggregator is not None:
defaults["aggregator"] = score_def.pos_aggregator
return defaults
[docs]
def build_score_aggregator_documentation(
self, attr: Attribute,
) -> list[str]:
"""Collect score aggregator documentation."""
doc = self._build_score_aggregator_documentation(
attr, "position_aggregator", attr.aggregator)
return [doc]
def _fetch_raw_region_scores(
self, chrom: str, pos_begin: int, pos_end: int,
sources: list[str],
) -> dict[str, list[Any]]:
raw: dict[str, list[Any]] = {source: [] for source in sources}
fetch = self.position_score.fetch_region_values(
chrom, pos_begin, pos_end, sources,
)
for row_left, row_right, values in fetch:
if values is None:
continue
n = max(pos_begin, row_left), min(pos_end, row_right)
n_positions = n[1] - n[0] + 1
for i, source in enumerate(sources):
raw[source].extend([values[i]] * n_positions)
return raw
def _do_annotate(
self, annotatable: Annotatable,
context: dict[str, Any], # noqa: ARG002
) -> dict[str, Any]:
if annotatable.chromosome not in self.score.get_all_chromosomes():
return self._empty_result()
sources = list(dict.fromkeys(
attr.source for attr in self._attributes))
if annotatable.type == Annotatable.Type.SUBSTITUTION:
assert isinstance(annotatable, VCFAllele)
point_scores = self.position_score.fetch_scores(
annotatable.chromosome, annotatable.position, sources)
if not point_scores:
return self._empty_result()
return dict(zip(sources, point_scores, strict=True))
if len(annotatable) > self._region_length_cutoff:
return self._empty_result()
raw = self._fetch_raw_region_scores(
annotatable.chrom, annotatable.pos, annotatable.pos_end, sources)
if not any(raw.values()):
return self._empty_result()
return raw
[docs]
def build_np_score_annotator(pipeline: AnnotationPipeline,
info: AnnotatorInfo) -> Annotator:
logger.warning(
"usage of 'np_score_annotator' is deprecated, "
"use 'allele_score_annotator' instead")
return AlleleScoreAnnotator(pipeline, info)
[docs]
def build_allele_score_annotator(pipeline: AnnotationPipeline,
info: AnnotatorInfo) -> Annotator:
return AlleleScoreAnnotator(pipeline, info)
[docs]
class AlleleScoreAnnotator(GenomicScoreAnnotatorBase):
"""Annotator for allele-level genomic scores (frequencies, pathogenicity…).
Operates in one of two modes, selected by the ``mode`` parameter:
- ``allele`` (**default**): performs an exact chrom/pos/ref/alt lookup and
returns the single matching line's scores. The annotatable must be a
``VCFAllele``; other types receive an empty result.
- ``region``: iterates all allele lines that overlap the annotatable's span
and aggregates their scores. Works with any ``Annotatable``
(``VCFAllele``, ``Region``, CNV, …). An aggregator must be defined for
every score attribute, either in the attribute config or as the score's
``allele_aggregator`` default in the resource YAML.
Virtual ``allele`` attribute
----------------------------
All annotators expose a virtual attribute ``"allele"``
(``is_default=False``)
that is synthesised rather than read from the data file.
- In ``allele`` mode: returns ``["chrom:pos:ref:alt"]`` for the matched
line.
- In ``region`` mode: returns the set of ``"chrom:pos:ref:alt"`` strings
for all lines that pass the optional ``allele_filter``.
Optionally append score values to each allele string with
``include_attributes``.
``allele_filter``
-----------------
An optional annotator-level boolean expression evaluated against each
``ScoreLine`` before it is included in the result. Supported operators:
``>``, ``<``, ``==``, ``in``, ``and``, ``or``. Variables resolve via
``ScoreLine.get_score``.
"""
ALLELE_FILTER_GRAMMAR = textwrap.dedent("""
?start: filter | and_ | or
and_: filter "and" filter
or: filter "or" filter
?filter: subject operator subject | or | and_
?subject: variable | value
value: "\\"" word "\\"" | number
variable: word
operator: equals | greater_than | less_than | in
equals: "=="
greater_than: ">"
less_than: "<"
in: "in"
word: /[0-9]*[a-zA-Z_!@#$%^&*()_+][a-zA-Z0-9!@#$%^&*()_+]*/
number: /-?[0-9]+\\.?[0-9]*/
%ignore " "
""")
def __init__(self, pipeline: AnnotationPipeline, info: AnnotatorInfo):
resource = get_genomic_resource(
pipeline, info, {"np_score", "allele_score"})
self.allele_score = AlleleScore(resource)
self.filter_parser = Lark(self.ALLELE_FILTER_GRAMMAR)
self.allele_filter = None
allele_filter_str = info.parameters.get("allele_filter")
if allele_filter_str is not None:
assert isinstance(allele_filter_str, str)
cnv_filter_str = allele_filter_str.replace(
"\n", " ").replace("\t", " ").strip()
try:
self.allele_filter = self._build_allele_filter_func(
self.filter_parser.parse(cnv_filter_str))
except Exception as e:
raise AnnotationConfigurationError(
f"Error parsing cnv_filter: {e}") from e
mode = info.parameters.get("mode", "allele")
if mode not in {"allele", "region"}:
raise AnnotationConfigurationError(
f"Invalid mode '{mode}' for allele_score_annotator; "
"valid values are 'allele' and 'region'")
self.mode = mode
super().__init__(pipeline, info, self.allele_score)
info.documentation += textwrap.dedent(f"""
Annotator to use with scores that depend on allele like
variant frequencies, etc.
**Mode** (``mode`` parameter, applies to ``VCFAllele`` inputs only):
- ``allele`` (default): exact chrom/pos/ref/alt match.
- ``region``: aggregates scores for all allele lines overlapping the
annotatable's span.
Non-``VCFAllele`` annotatables always use region aggregation.
<a href="{self.BASE_DOC_URL}#allele-score-annotator" target="_blank">More info</a>
""") # noqa
self.allele_attribute = None
self.attrs_to_include = []
self.allele_score_sources: list[str] = []
for attr in self._attributes:
if attr.source == "allele":
self.attrs_to_include = attr.parameters.get(
"include_attributes", [])
if isinstance(self.attrs_to_include, str):
self.attrs_to_include = [self.attrs_to_include]
self.allele_attribute = attr
continue
self.allele_score_sources.append(attr.source)
self.add_score_aggregator_documentation(
attr, "allele_aggregator", attr.aggregator)
@classmethod
def _build_allele_filter_func(
cls, tree: Tree,
) -> Callable[[ScoreLine], bool]:
"""Recursively compile a Lark parse tree into a ScoreLine predicate."""
if tree.data == "and_":
assert isinstance(tree.children[0], Tree)
assert isinstance(tree.children[1], Tree)
left_func = cls._build_allele_filter_func(tree.children[0])
right_func = cls._build_allele_filter_func(tree.children[1])
return lambda cnv: left_func(cnv) and right_func(cnv)
if tree.data == "or":
left_func = cls._build_allele_filter_func(tree.children[0])
right_func = cls._build_allele_filter_func(tree.children[1])
return lambda cnv: left_func(cnv) or right_func(cnv)
left = tree.children[0]
assert isinstance(left, Tree)
assert isinstance(left.data, Token)
left_type = left.data.value
if left_type == "variable":
assert isinstance(left.children[0], Tree)
assert isinstance(left.children[0].data, Token)
assert left.children[0].data.value == "word"
assert isinstance(left.children[0].children[0], Token)
left_value = left.children[0].children[0].value
def left_accessor(_score: ScoreLine) -> Any:
return _score.get_score(left_value)
else:
assert isinstance(left.children[0], Tree)
assert isinstance(left.children[0].data, Token)
is_number = left.children[0].data.value == "number"
assert isinstance(left.children[0].children[0], Token)
left_value = left.children[0].children[0].value
if is_number:
left_value = float(left_value)
def left_accessor(
_score: ScoreLine,
) -> Any: # pylint: disable=unused-argument
return left_value
assert isinstance(tree.children[1], Tree)
assert isinstance(tree.children[1].children[0], Tree)
assert isinstance(tree.children[1].children[0].data, Token)
operator = tree.children[1].children[0].data.value
right = tree.children[2]
assert isinstance(right, Tree)
assert isinstance(right.data, Token)
right_type = right.data.value
if right_type == "variable":
assert isinstance(right.children[0], Tree)
assert isinstance(right.children[0].data, Token)
assert right.children[0].data.value == "word"
assert isinstance(right.children[0].children[0], Token)
right_value = right.children[0].children[0].value
def right_accessor(_score: ScoreLine) -> Any:
return _score.get_score(right_value)
else:
assert isinstance(right.children[0], Tree)
assert isinstance(right.children[0].data, Token)
is_number = right.children[0].data.value == "number"
assert isinstance(right.children[0].children[0], Token)
right_value = right.children[0].children[0].value
if is_number:
right_value = float(right_value)
def right_accessor(
_score: ScoreLine,
) -> Any: # pylint: disable=unused-argument
return right_value
if operator == "equals":
return lambda cnv: left_accessor(cnv) == right_accessor(cnv)
if operator == "greater_than":
return lambda cnv: left_accessor(cnv) > right_accessor(cnv)
if operator == "less_than":
return lambda cnv: left_accessor(cnv) < right_accessor(cnv)
if operator == "in":
return lambda cnv: left_accessor(cnv) in right_accessor(cnv)
raise ValueError(f"Unsupported operator {operator.data}")
[docs]
def get_attribute_defaults(
self, spec: AttributeSpec,
) -> dict[str, Any]:
defaults = super().get_attribute_defaults(spec)
if "aggregator" not in defaults:
score_def = self.allele_score.get_score_definition(spec.source)
if score_def is not None \
and score_def.allele_aggregator is not None:
defaults["aggregator"] = score_def.allele_aggregator
return defaults
[docs]
def get_attribute_specs(self) -> dict[str, AttributeSpec]:
"""Return score attribute specs plus the virtual ``allele``."""
result = super().get_attribute_specs()
result["allele"] = AttributeSpec(
source="allele",
value_type="list",
description="The allele in the format 'chr:pos:ref:alt'",
is_default=False,
internal_default=False,
)
return result
[docs]
def build_score_aggregator_documentation(
self, attr: Attribute,
) -> list[str]:
"""Collect score aggregator documentation."""
allele_doc = self._build_score_aggregator_documentation(
attr, "allele_aggregator", attr.aggregator,
)
return [allele_doc]
def _annotate_allele(
self, annotatable: VCFAllele,
) -> dict[str, Any]:
"""Return scores for an exact chrom/pos/ref/alt match."""
line = self.allele_score.fetch_allele_line(
annotatable.chrom,
annotatable.position,
annotatable.reference,
annotatable.alternative,
)
if line is None:
return self._empty_result()
if self.allele_filter is not None and not self.allele_filter(line):
return self._empty_result()
scores: dict[str, Any] = {
sc: line.get_score(sc)
for sc in (
self.simple_score_queries or self.allele_score.get_all_scores()
)
}
if self.allele_attribute is not None:
allele_str = (
f"{annotatable.chromosome}:{annotatable.position}"
f":{annotatable.reference}:{annotatable.alternative}"
)
if self.attrs_to_include:
attrs_str = ",".join(
stringify(scores.get(a)) for a in self.attrs_to_include)
allele_str += f":{attrs_str}"
scores[self.allele_attribute.source] = [allele_str]
return {
attr.source: scores.get(attr.source) for attr in self.attributes
}
def _annotate_region(
self, annotatable: Annotatable,
) -> dict[str, Any]:
"""Collect raw score lists for all allele lines overlapping the region.
Aggregation is handled by AnnotatorBase._apply_aggregators.
"""
raw: dict[str, list] = {
source: [] for source in self.allele_score_sources}
alleles: set[str] = set()
has_lines = False
for line in self.allele_score.fetch_lines(
annotatable.chrom, annotatable.position, annotatable.pos_end,
):
has_lines = True
if self.allele_filter is not None and not self.allele_filter(line):
continue
for source in self.allele_score_sources:
raw[source].append(line.get_score(source))
if self.allele_attribute is not None:
allele_str = f"{line.chrom}:{line.pos_begin}"
if line.ref is not None and line.alt is not None:
allele_str += f":{line.ref}:{line.alt}"
if self.attrs_to_include:
attrs_str = ",".join(
stringify(line.get_score(a))
for a in self.attrs_to_include)
allele_str += f":{attrs_str}"
alleles.add(allele_str)
if not has_lines:
return self._empty_result()
result = {
attr.source: raw.get(attr.source) for attr in self.attributes
}
if self.allele_attribute is not None:
result[self.allele_attribute.source] = list(alleles)
return result
def _do_annotate(
self, annotatable: Annotatable,
context: dict[str, Any], # noqa: ARG002
) -> dict[str, Any]:
"""Dispatch annotation based on annotatable type and mode.
For VCFAllele: mode selects between exact-match and region aggregation.
For all other annotatables: always use region aggregation.
"""
all_chroms = self.allele_score.get_all_chromosomes()
if annotatable.chromosome not in all_chroms:
return self._empty_result()
if isinstance(annotatable, VCFAllele):
if self.mode == "allele":
return self._annotate_allele(annotatable)
return self._annotate_region(annotatable)
if len(annotatable) > self._region_length_cutoff:
return self._empty_result()
return self._annotate_region(annotatable)