# pylint: disable=too-many-lines
from __future__ import annotations
import abc
import copy
import enum
import logging
from collections.abc import Callable, Generator, Iterator
from dataclasses import dataclass
from functools import lru_cache
from types import TracebackType
from typing import (
Any,
cast,
)
from urllib.parse import quote
from dae.genomic_resources.genomic_position_table import (
Line,
VCFGenomicPositionTable,
VCFLine,
build_genomic_position_table,
)
from dae.genomic_resources.genomic_position_table.line import (
BigWigLine,
LineBase,
)
from dae.genomic_resources.histogram import (
Histogram,
HistogramConfig,
NumberHistogram,
build_histogram_config,
load_histogram,
)
from dae.genomic_resources.repository import (
GenomicResource,
GenomicResourceRepo,
)
from dae.genomic_resources.repository_factory import (
build_genomic_resource_repository,
)
from dae.genomic_resources.resource_implementation import (
ResourceConfigValidationMixin,
get_base_resource_schema,
)
from .aggregators import AGGREGATOR_SCHEMA, Aggregator, build_aggregator
logger = logging.getLogger(__name__)
ScoreValue = str | int | float | bool | None
VCF_TYPE_CONVERSION_MAP = {
"Integer": "int",
"Float": "float",
"String": "str",
"Flag": "bool",
}
SCORE_TYPE_PARSERS = {
"str": str,
"float": float,
"int": int,
"bool": bool,
}
@dataclass
class ScoreDef:
"""Score configuration definition."""
score_id: str
    desc: str  # string that will be interpreted as markdown
    value_type: str  # "str", "int", "float" or "bool"
pos_aggregator: str | None # a valid aggregator type
allele_aggregator: str | None # a valid aggregator type
small_values_desc: str | None
large_values_desc: str | None
hist_conf: HistogramConfig | None
@dataclass
class _ScoreDef:
"""Private score configuration definition. Includes internals."""
# pylint: disable=too-many-instance-attributes
score_id: str
    desc: str  # string that will be interpreted as markdown
    value_type: str  # "str", "int", "float" or "bool"
pos_aggregator: str | None # a valid aggregator type
allele_aggregator: str | None # a valid aggregator type
small_values_desc: str | None
large_values_desc: str | None
hist_conf: HistogramConfig | None
col_name: str | None # internal
col_index: int | None # internal
value_parser: Any # internal
na_values: Any # internal
score_index: int | str | None = None # internal
def to_public(self) -> ScoreDef:
return ScoreDef(
self.score_id,
self.desc,
self.value_type,
self.pos_aggregator,
self.allele_aggregator,
self.small_values_desc,
self.large_values_desc,
self.hist_conf,
)
def __post_init__(self) -> None:
if self.value_type is None:
return
default_na_values = {
"str": {},
"float": {"", "nan", ".", "NA"},
"int": {"", "nan", ".", "NA"},
"bool": {},
}
default_pos_aggregators = {
"float": "mean",
"int": "mean",
"str": "concatenate",
"bool": None,
}
default_allele_aggregators = {
"float": "max",
"int": "max",
"str": "concatenate",
"bool": None,
}
if self.pos_aggregator is None:
self.pos_aggregator = default_pos_aggregators[self.value_type]
if self.allele_aggregator is None:
self.allele_aggregator = \
default_allele_aggregators[self.value_type]
if self.na_values is None:
self.na_values = default_na_values[self.value_type]
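# Illustrative sketch (not part of the public API): for a float score with no
# aggregators configured, _ScoreDef.__post_init__ fills in the defaults
# defined above, i.e. "mean" for position aggregation, "max" for allele
# aggregation and the float NA value set:
#
#     sdef = _ScoreDef(
#         score_id="phylop", desc="", value_type="float",
#         pos_aggregator=None, allele_aggregator=None,
#         small_values_desc=None, large_values_desc=None, hist_conf=None,
#         col_name="phylop", col_index=None,
#         value_parser=float, na_values=None)
#     assert sdef.pos_aggregator == "mean"
#     assert sdef.allele_aggregator == "max"
#     assert sdef.na_values == {"", "nan", ".", "NA"}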
class ScoreLine:
"""Abstraction for a genomic score line. Wraps the line adapter."""
def __init__(self, line: LineBase, score_defs: dict[str, _ScoreDef]):
assert isinstance(line, (Line, VCFLine, BigWigLine))
self.line = line
self.score_defs = score_defs
@property
def chrom(self) -> str:
return self.line.chrom
@property
def pos_begin(self) -> int:
return self.line.pos_begin
@property
def pos_end(self) -> int:
return self.line.pos_end
@property
def ref(self) -> str | None:
return self.line.ref
@property
def alt(self) -> str | None:
return self.line.alt
    def get_score(self, score_id: str) -> ScoreValue:
        """Get and parse a configured score from the line."""
        if score_id not in self.score_defs:
            logger.warning(
                "unexpected score_id %s in score", score_id)
            return None
        col_def = self.score_defs[score_id]
        key = col_def.score_index
        assert key is not None
        value: str | int | float | None = self.line.get(key)
        if value is None:
            return None
        if value in col_def.na_values:
value = None
elif col_def.value_parser is not None:
# pylint: disable=broad-except
try: # Temporary workaround for GRR generation
value = col_def.value_parser(value)
except Exception:
logger.exception(
"unable to parse value %s for score %s",
value, score_id)
value = None
return value
def get_available_scores(self) -> tuple[Any, ...]:
return tuple(self.score_defs.keys())
@dataclass
class PositionScoreQuery:
score: str
position_aggregator: str | None = None
@dataclass
class AlleleScoreQuery:
score: str
position_aggregator: str | None = None
allele_aggregator: str | None = None
@dataclass
class PositionScoreAggr:
score: str
position_aggregator: Aggregator
@dataclass
class AlleleScoreAggr:
score: str
position_aggregator: Aggregator
allele_aggregator: Aggregator
ScoreQuery = PositionScoreQuery | AlleleScoreQuery
class GenomicScore(ResourceConfigValidationMixin):
"""Base class for genomic score resources.
GenomicScore provides a unified interface for accessing and managing
genomic annotation scores stored in various formats. It serves as the
foundation for specialized score types including PositionScore (position-
based scores) and AlleleScore (variant-specific scores).
This abstract base class handles:
- Resource configuration validation and normalization
- Score definition management and parsing
- File format abstraction through GenomicPositionTable
- Histogram and statistics management
- Default annotation attribute configuration
- Context manager protocol for resource lifecycle
Score resources can be stored in multiple formats:
- Tabix-indexed files (TSV, BED)
- VCF files (particularly for allele scores)
- BigWig files (for position scores)
- In-memory tables (for testing)
Configuration Structure:
A genomic score resource requires a YAML configuration file
(genomic_resource.yaml) specifying:
- **type**: Resource type (position_score, allele_score, np_score)
- **table**: Table configuration with filename, format, and column
mappings for chrom, pos_begin, pos_end (and ref/alt for allele scores)
- **scores**: List of score definitions with id, type, name/index,
description, and optional aggregators
- **default_annotation**: Optional list specifying which scores to
include in default annotations with optional name mappings
- **histograms**: Optional histogram configurations for statistics
Score Definition:
Each score in the resource is defined with:
- **id**: Unique identifier for the score
- **type**: Data type (int, float, str, bool)
- **name/index**: Column name or index in the data file
- **desc**: Human-readable description
- **na_values**: Values to treat as missing/NA (optional)
- **hist_conf**: Histogram configuration for statistics (optional)
- **position_aggregator**: Default aggregator for positions (optional)
- **allele_aggregator**: Default aggregator for alleles (optional)
Usage Pattern:
Genomic scores follow a resource lifecycle pattern:
1. Build/retrieve the resource from a repository
2. Create a score object from the resource
3. Open the score to initialize data access
4. Query scores using fetch methods
5. Close the score to release resources
Example using context manager:
>>> from dae.genomic_resources.genomic_scores import (
... build_score_from_resource_id
... )
>>> score = build_score_from_resource_id("phastCons100way")
>>> with score.open():
... # Score is open and ready to use
... chromosomes = score.get_all_chromosomes()
... scores = score.get_all_scores()
... # Query data...
>>> # Score is automatically closed
Statistics and Histograms:
GenomicScore supports automatic statistics generation including:
- Value distribution histograms
- Min/max ranges for numeric scores
- Category frequencies for categorical scores
- Custom histogram configurations per score
Attributes:
resource (GenomicResource): The underlying genomic resource object
resource_id (str): Unique identifier for the resource
config (dict): Validated and normalized configuration dictionary
table (GenomicPositionTable): Data access abstraction layer
score_definitions (dict[str, _ScoreDef]): Mapping of score IDs to
their internal definitions including parsers and metadata
table_loaded (bool): Flag indicating if the table is currently open
Key Methods:
open(): Initialize the score resource for data access
close(): Release resources and close the data table
get_all_scores(): Get list of all available score IDs
get_all_chromosomes(): Get list of all available chromosomes
get_score_definition(): Get metadata for a specific score
get_default_annotation_attributes(): Get default annotation config
get_histogram(): Load histogram for a score (if available)
        get_score_range(): Get the value range for a numeric score
Abstract Methods:
Subclasses must implement:
- _fetch_region_values(): Core method for retrieving score values
in a genomic region, used for statistics computation
See Also:
- PositionScore: For position-based genomic scores
- AlleleScore: For variant-specific genomic scores
- GenomicResource: Base resource abstraction
- GenomicPositionTable: Table format abstraction
"""
def __init__(self, resource: GenomicResource):
self.resource = resource
self.resource_id = resource.resource_id
assert self.resource.config is not None
self.config: dict = self.resource.config
self.config = self.validate_and_normalize_schema(
self.config, resource,
)
self.config["id"] = resource.resource_id
self.table_loaded = False
self.table = build_genomic_position_table(
self.resource, self.config["table"],
)
self.score_definitions = self._build_scoredefs()
@staticmethod
def get_schema() -> dict[str, Any]:
scores_schema = {
"type": "list", "schema": {
"type": "dict",
"schema": {
"id": {"type": "string"},
"index": {"type": "integer"},
"name": {"type": "string", "excludes": "index"},
"column_index": {
"type": "integer",
"excludes": ["index", "name", "column_name"],
},
"column_name": {
"type": "string",
"excludes": ["name", "index", "column_index"],
},
"type": {"type": "string"},
"desc": {"type": "string"},
"na_values": {"type": ["string", "list"]},
"large_values_desc": {"type": "string"},
"small_values_desc": {"type": "string"},
"histogram": {"type": "dict", "schema": {
"type": {"type": "string"},
"plot_function": {"type": "string"},
"number_of_bins": {
"type": "number",
"dependencies": {"type": "number"},
},
"view_range": {"type": "dict", "schema": {
"min": {"type": "number"},
"max": {"type": "number"},
}, "dependencies": {"type": "number"}},
"x_log_scale": {
"type": "boolean",
"dependencies": {"type": "number"},
},
"y_log_scale": {
"type": "boolean",
"dependencies": {
"type": ["number", "categorical"]},
},
"x_min_log": {
"type": "number",
"dependencies": {
"type": ["number", "categorical"]},
},
"label_rotation": {
"type": "integer",
"dependencies": {"type": "categorical"},
},
"value_order": {
"type": "list",
"schema": {"type": ["string", "integer"]},
"dependencies": {"type": "categorical"},
},
"displayed_values_count": {
"type": "integer",
"dependencies": {"type": "categorical"},
},
"displayed_values_percent": {
"type": "number",
"dependencies": {"type": "categorical"},
},
"reason": {
"type": "string",
"dependencies": {"type": "null"},
},
}},
},
},
}
return {
**get_base_resource_schema(),
"table": {"type": "dict", "schema": {
"filename": {"type": "string"},
"index_filename": {"type": "string"},
"zero_based": {"type": "boolean"},
"desc": {"type": "string"},
"format": {"type": "string"},
"header_mode": {"type": "string"},
"header": {"type": ["string", "list"]},
"chrom": {"type": "dict", "schema": {
"index": {"type": "integer"},
"name": {"type": "string", "excludes": "index"},
"column_index": {
"type": "integer",
"excludes": ["index", "name", "column_name"],
},
"column_name": {
"type": "string",
"excludes": ["name", "index", "column_index"],
},
}},
"pos_begin": {"type": "dict", "schema": {
"index": {"type": "integer"},
"name": {"type": "string", "excludes": "index"},
"column_index": {
"type": "integer",
"excludes": ["index", "name", "column_name"],
},
"column_name": {
"type": "string",
"excludes": ["name", "index", "column_index"],
},
}},
"pos_end": {"type": "dict", "schema": {
"index": {"type": "integer"},
"name": {"type": "string", "excludes": "index"},
"column_index": {
"type": "integer",
"excludes": ["index", "name", "column_name"],
},
"column_name": {
"type": "string",
"excludes": ["name", "index", "column_index"],
},
}},
"chrom_mapping": {"type": "dict", "schema": {
"filename": {
"type": "string",
"excludes": ["add_prefix", "del_prefix"],
},
"add_prefix": {"type": "string"},
"del_prefix": {"type": "string", "excludes": "add_prefix"},
}},
}},
"scores": scores_schema,
"default_annotation": {
"type": ["dict", "list"], "allow_unknown": True,
},
}
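    # A genomic_resource.yaml in roughly this shape validates against the
    # schema above (illustrative sketch; the filename and score id are made
    # up, and real resources typically also configure the table's chrom and
    # position columns):
    #
    #     type: position_score
    #     table:
    #       filename: scores.tsv.gz
    #     scores:
    #       - id: phastcons
    #         type: float
    #         column_name: phastCons100way
    #     default_annotation:
    #       - source: phastcons
    #         name: phastcons100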
@staticmethod
def _parse_scoredef_config(
config: dict[str, Any],
) -> dict[str, _ScoreDef]:
"""Parse ScoreDef configuration."""
scores = {}
for score_conf in config["scores"]:
value_parser = SCORE_TYPE_PARSERS[score_conf.get("type", "float")]
col_name = score_conf.get("column_name") \
or score_conf.get("name")
col_index_str = score_conf.get("column_index") \
or score_conf.get("index")
            col_index = int(col_index_str) if col_index_str is not None \
                else None
hist_conf = build_histogram_config(score_conf)
nuc_aggregator = score_conf.get("nucleotide_aggregator")
allele_aggregator = score_conf.get("allele_aggregator")
if nuc_aggregator is not None:
logger.warning(
"Use of 'nucleotide_aggregator' is deprecated, use "
"'allele_aggregator' instead.")
assert allele_aggregator is None
allele_aggregator = nuc_aggregator
score_def = _ScoreDef(
score_id=score_conf["id"],
desc=score_conf.get("desc", ""),
value_type=score_conf.get("type"),
pos_aggregator=score_conf.get("position_aggregator"),
allele_aggregator=allele_aggregator,
small_values_desc=score_conf.get("small_values_desc"),
large_values_desc=score_conf.get("large_values_desc"),
col_name=col_name,
col_index=col_index,
hist_conf=hist_conf,
value_parser=value_parser,
na_values=score_conf.get("na_values"),
)
scores[score_conf["id"]] = score_def
return scores
def _parse_vcf_scoredefs(
self,
vcf_header_info: dict[str, Any] | None,
config_scoredefs: dict[str, _ScoreDef] | None, *,
merge: bool = False,
) -> dict[str, _ScoreDef]:
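        """Build score definitions from the VCF header INFO fields.
        Configured score definitions override the matching VCF-derived ones;
        when merge is True, INFO fields without a configuration entry are
        kept as scores as well.
        """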
def converter(val: Any) -> Any:
try:
if isinstance(val, tuple):
return "|".join(map(str, val))
except TypeError:
pass
return val
vcf_scoredefs = {}
assert vcf_header_info is not None
for key, value in vcf_header_info.items():
value_parser: Callable[[str], Any] | None = converter
if value.number in (1, "A", "R"):
value_parser = None
vcf_scoredefs[key] = _ScoreDef(
score_id=key,
col_name=key,
col_index=None,
desc=value.description or "",
value_type=VCF_TYPE_CONVERSION_MAP[value.type],
value_parser=value_parser,
na_values=(),
pos_aggregator=None,
allele_aggregator=None,
small_values_desc=None,
large_values_desc=None,
hist_conf=None,
)
if config_scoredefs is None:
return vcf_scoredefs
# allow overriding of vcf-generated scoredefs
scoredefs = {}
for score, config_scoredef in config_scoredefs.items():
vcf_scoredef = vcf_scoredefs[score]
if config_scoredef.desc:
vcf_scoredef.desc = config_scoredef.desc
if config_scoredef.value_type:
vcf_scoredef.value_type = config_scoredef.value_type
vcf_scoredef.value_parser = config_scoredef.value_parser
vcf_scoredef.na_values = config_scoredef.na_values
vcf_scoredef.hist_conf = config_scoredef.hist_conf
scoredefs[score] = vcf_scoredef
if merge:
for score, vcf_scoredef in vcf_scoredefs.items():
if score in scoredefs:
continue
scoredefs[score] = vcf_scoredef
return scoredefs
def _validate_scoredefs(self) -> None:
assert "scores" in self.config
if self.table.header_mode == "none":
assert all("name" not in score
for score in self.config["scores"]), \
("Cannot configure score columns by"
" name when header_mode is 'none'!")
else:
assert self.table.header is not None
for score in self.config["scores"]:
if "name" in score:
score["column_name"] = score["name"]
logger.debug(
"%s: Using 'name' to configure score columns is"
" outdated, use 'column_name' instead.",
self.resource.get_full_id(),
)
elif "index" in score:
score["column_index"] = score["index"]
logger.debug(
"%s: Using 'index' to configure score columns is"
" outdated, use 'column_index' instead.",
self.resource.get_full_id(),
)
if "column_name" in score:
assert score["column_name"] in self.table.header, (
score, self.table.header)
elif "column_index" in score:
assert 0 <= score["column_index"] < len(self.table.header)
else:
raise AssertionError("Either an index or name must"
" be configured for scores!")
def _build_scoredefs(self) -> dict[str, _ScoreDef]:
config_scoredefs = None
if "scores" in self.config:
config_scoredefs = self._parse_scoredef_config(self.config)
if isinstance(self.table, VCFGenomicPositionTable):
merge = bool(self.config.get("merge_vcf_scores", False))
return self._parse_vcf_scoredefs(
cast(dict[str, Any], self.table.header),
config_scoredefs,
merge=merge)
if config_scoredefs is None:
raise ValueError("No scores configured and not using a VCF")
return config_scoredefs
def get_config(self) -> dict[str, Any]:
return self.config
def get_default_annotation_attributes(self) -> list[Any]:
"""Collect default annotation attributes."""
default_annotation = self.get_config().get("default_annotation")
if not default_annotation:
return [
{"source": attr, "name": attr}
for attr in self.score_definitions
]
if not isinstance(default_annotation, list):
raise TypeError(
"The default_annotation in the "
f"{self.resource_id} resource is not a list.")
return default_annotation
def get_default_annotation_attribute(self, score_id: str) -> str | None:
"""Return default annotation attribute for a score.
Returns None if the score is not included in the default annotation.
        Returns the attribute name if configured, otherwise the score ID.
"""
attributes = self.get_default_annotation_attributes()
result = []
for attr in attributes:
if attr["source"] != score_id:
continue
dst = score_id
if "name" in attr:
dst = attr["name"]
result.append(dst)
if result:
return ",".join(result)
return None
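    # Illustrative example: given a configuration such as
    #
    #     default_annotation:
    #       - source: raw_score
    #         name: cadd_raw
    #
    # get_default_annotation_attribute("raw_score") returns "cadd_raw", while
    # a score that is absent from the list returns None.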
def get_score_definition(self, score_id: str) -> _ScoreDef | None:
return self.score_definitions.get(score_id)
def close(self) -> None:
self.table.close()
self.table_loaded = False
def is_open(self) -> bool:
return self.table_loaded
def open(self) -> GenomicScore:
"""Open genomic score resource and returns it."""
if self.is_open():
logger.info(
"opening already opened genomic score: %s",
self.resource.resource_id)
return self
self.table.open()
self.table_loaded = True
if "scores" in self.config:
self._validate_scoredefs()
if isinstance(self.table, VCFGenomicPositionTable):
for score_def in self.score_definitions.values():
assert score_def.col_name is not None
score_def.score_index = score_def.col_name
else:
for score_def in self.score_definitions.values():
if score_def.col_index is None:
assert self.table.header is not None
assert score_def.col_name is not None
score_def.score_index = self.table.header.index(
score_def.col_name)
else:
assert score_def.col_name is None
score_def.score_index = score_def.col_index
return self
def __enter__(self) -> GenomicScore:
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
if exc_type is not None:
logger.error(
"exception while working with genomic score: %s, %s, %s",
exc_type, exc_value, exc_tb)
self.close()
@staticmethod
def _line_to_begin_end(line: ScoreLine) -> tuple[int, int]:
if line.pos_end < line.pos_begin:
raise OSError(
f"The resource line {line} has a regions "
f" with end {line.pos_end} smaller that the "
f"begining {line.pos_end}.")
return line.pos_begin, line.pos_end
def _get_header(self) -> tuple[Any, ...] | None:
assert self.table is not None
return self.table.header
def _fetch_lines(
self,
chrom: str | None,
pos_begin: int | None,
pos_end: int | None,
) -> Iterator[ScoreLine]:
for line in self.table.get_records_in_region(
chrom, pos_begin, pos_end,
):
yield ScoreLine(line, self.score_definitions)
def get_all_chromosomes(self) -> list[str]:
if not self.is_open():
raise ValueError(f"genomic score <{self.resource_id}> is not open")
return self.table.get_chromosomes()
def get_all_scores(self) -> list[str]:
return list(self.score_definitions)
def _fetch_region_lines(
self,
chrom: str | None,
pos_begin: int | None,
pos_end: int | None,
scores: list[str] | None = None,
) -> Generator[
tuple[int, int, list[ScoreValue] | None, ScoreLine], None, None]:
"""Return score values in a region."""
if not self.is_open():
raise ValueError(f"genomic score <{self.resource_id}> is not open")
if chrom is not None and chrom not in self.get_all_chromosomes():
raise ValueError(
f"{chrom} is not among the available chromosomes.")
if scores is None:
scores = self.get_all_scores()
for line in self._fetch_lines(chrom, pos_begin, pos_end):
line_pos_begin, line_pos_end = self._line_to_begin_end(line)
if pos_begin is not None and line_pos_end < pos_begin:
continue
val = [line.get_score(scr_id) for scr_id in scores]
if pos_begin is not None:
left = max(pos_begin, line_pos_begin)
else:
left = line_pos_begin
if pos_end is not None:
right = min(pos_end, line_pos_end)
else:
right = line_pos_end
yield (left, right, val, line)
@abc.abstractmethod
def _fetch_region_values(
self,
chrom: str | None,
pos_begin: int | None,
pos_end: int | None,
scores: list[str] | None = None,
) -> Generator[
tuple[int, int, list[ScoreValue] | None], None, None]:
"""Return score values - either all available or in a specific region.
This method is used for calculation of score statistics.
"""
@lru_cache(maxsize=64)
def get_score_range(
self, score_id: str,
) -> tuple[float, float] | None:
"""Return the value range for a numeric score."""
if score_id not in self.get_all_scores():
raise ValueError(
f"unknown score {score_id}; "
f"available scores are {self.get_all_scores()}")
hist = self.get_score_histogram(score_id)
if isinstance(hist, NumberHistogram):
return (hist.min_value, hist.max_value)
return None
def get_histogram_filename(self, score_id: str) -> str:
"""Return the histogram filename for a genomic score."""
filename = f"statistics/histogram_{score_id}.yaml"
if filename in self.resource.get_manifest():
return filename
return f"statistics/histogram_{score_id}.json"
@lru_cache(maxsize=64)
def get_score_histogram(self, score_id: str) -> Histogram:
"""Return defined histogram for a score."""
if score_id not in self.score_definitions:
raise ValueError(
f"unexpected score ID {score_id}; available scores are: "
f"{self.score_definitions.keys()}")
hist_filename = self.get_histogram_filename(score_id)
return load_histogram(self.resource, hist_filename)
def get_histogram_image_filename(self, score_id: str) -> str:
return f"statistics/histogram_{score_id}.png"
def get_histogram_image_url(self, score_id: str) -> str | None:
return (
f"{self.resource.get_url()}/"
f"{quote(self.get_histogram_image_filename(score_id))}"
)
class PositionScore(GenomicScore):
"""Position-based genomic score resource.
A PositionScore provides scores associated with genomic positions,
where each score value applies to a specific genomic coordinate or range.
Unlike AlleleScore, PositionScore does not consider reference or
alternative alleles - scores are purely position-based.
Typical use cases include:
- Conservation scores (e.g., phastCons, phyloP)
- Mappability scores
- GC content
- Recombination rates
- Any metric that depends only on genomic position
The score data can be stored in various formats including tabix-indexed
files, BigWig files, or in-memory tables.
Example:
>>> from dae.genomic_resources.repository_factory import (
... build_genomic_resource_repository
... )
>>> repo = build_genomic_resource_repository()
>>> resource = repo.get_resource("phastCons100way")
>>> score = build_score_from_resource(resource)
>>> with score.open() as score:
... # Fetch scores at a specific position
... values = score.fetch_scores("chr1", 12345)
... # Fetch scores across a region
... for pos_begin, pos_end, scores in score.fetch_region(
... "chr1", 10000, 20000
... ):
... print(f"{pos_begin}-{pos_end}: {scores}")
... # Aggregate scores over a region
... aggs = score.fetch_scores_agg("chr1", 10000, 20000)
Attributes:
resource: The underlying GenomicResource object
resource_id: Unique identifier for the resource
config: Configuration dictionary for the score
table: GenomicPositionTable for data access
score_definitions: Dictionary mapping score IDs to their definitions
Key Methods:
fetch_scores: Get score values at a specific position
fetch_region: Iterate over score values in a genomic region
fetch_scores_agg: Aggregate score values over a region
get_region_scores: Get all scores in a region for a specific score ID
"""
@staticmethod
def get_schema() -> dict[str, Any]:
schema = copy.deepcopy(GenomicScore.get_schema())
scores_schema = schema["scores"]["schema"]["schema"]
scores_schema["position_aggregator"] = AGGREGATOR_SCHEMA
return schema
def open(self) -> PositionScore:
return cast(PositionScore, super().open())
def _fetch_region_values(
self,
chrom: str | None,
pos_begin: int | None,
pos_end: int | None,
scores: list[str] | None = None,
) -> Generator[
tuple[int, int, list[ScoreValue] | None], None, None]:
"""Return position score values in a region."""
returned_region: tuple[
int | None, int | None, list[ScoreValue] | None,
] = (None, None, None)
for left, right, val, _ in self._fetch_region_lines(
chrom, pos_begin, pos_end, scores,
):
prev_end = returned_region[1]
if prev_end and left <= prev_end:
logger.warning(
"multiple values for positions %s:%s-%s",
chrom, left, right)
raise ValueError(
f"multiple values for positions "
f"{chrom}:{left}-{right}")
returned_region = (left, right, val)
yield (left, right, val)
def fetch_region(
self, chrom: str,
pos_begin: int | None, pos_end: int | None,
scores: list[str] | None = None,
) -> Generator[
tuple[int, int, list[ScoreValue] | None], None, None]:
"""Return position score values in a region."""
yield from self._fetch_region_values(chrom, pos_begin, pos_end, scores)
def get_region_scores(
self,
chrom: str,
pos_beg: int,
pos_end: int,
score_id: str,
) -> list[ScoreValue]:
"""Return score values in a region."""
result: list[ScoreValue | None] = [None] * (pos_end - pos_beg + 1)
for b, e, v in self.fetch_region(
chrom, pos_beg, pos_end, [score_id]):
e = min(e, pos_end)
if v is None:
continue
result[b - pos_beg:e - pos_beg + 1] = [v[0]] * (e - b + 1)
return result
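    # Illustrative example (assuming an open score resource ``score`` with a
    # "phastcons" score): positions not covered by any table line stay None
    # in the result, which always has pos_end - pos_beg + 1 entries:
    #
    #     values = score.get_region_scores("chr1", 100, 104, "phastcons")
    #     assert len(values) == 5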
def fetch_scores(
self, chrom: str, position: int,
scores: list[str] | None = None,
) -> list[ScoreValue] | None:
"""Fetch score values at specific genomic position."""
if chrom not in self.get_all_chromosomes():
raise ValueError(
f"{chrom} is not among the available chromosomes.")
if scores is None:
scores = self.get_all_scores()
else:
scores = [
s.score if isinstance(s, PositionScoreQuery) else s
for s in scores]
assert all(isinstance(s, str) for s in scores)
lines = list(self._fetch_lines(chrom, position, position))
if not lines:
return None
if len(lines) > 1:
logger.warning(
"multiple values for positions %s:%s",
chrom, position)
raise ValueError(
f"multiple values ({len(lines)}) for positions "
f"{chrom}:{position}")
line = lines[0]
requested_scores = scores or self.get_all_scores()
return [line.get_score(scr) for scr in requested_scores]
def _build_scores_agg(
self, scores: list[str] | list[PositionScoreQuery],
) -> list[PositionScoreAggr]:
score_aggs = []
aggregator_type: str | None
for score in scores:
if isinstance(score, str):
aggregator_type = self.score_definitions[score].pos_aggregator
assert aggregator_type is not None
score_aggs.append(PositionScoreAggr(
score,
build_aggregator(aggregator_type),
))
continue
assert isinstance(score, PositionScoreQuery)
if score.position_aggregator is not None:
aggregator_type = score.position_aggregator
else:
aggregator_type = \
self.score_definitions[score.score].pos_aggregator
assert aggregator_type is not None
score_aggs.append(
PositionScoreAggr(
score.score,
build_aggregator(aggregator_type)),
)
return score_aggs
def fetch_scores_agg( # pylint: disable=too-many-arguments,too-many-locals
self, chrom: str, pos_begin: int, pos_end: int,
scores: list[str] | list[PositionScoreQuery] | None = None,
) -> list[Aggregator]:
"""Fetch score values in a region and aggregates them.
Case 1:
res.fetch_scores_agg("1", 10, 20) -->
all score with default aggregators
Case 2:
res.fetch_scores_agg("1", 10, 20,
non_default_aggregators={"bla":"max"}) -->
all score with default aggregators but 'bla' should use 'max'
"""
if chrom not in self.get_all_chromosomes():
raise ValueError(
f"{chrom} is not among the "
f"available chromosomes.")
if scores is None:
scores = [
PositionScoreQuery(score_id)
for score_id in self.get_all_scores()]
score_aggs = self._build_scores_agg(scores)
for line in self._fetch_lines(chrom, pos_begin, pos_end):
line_pos_begin, line_pos_end = self._line_to_begin_end(line)
for sagg in score_aggs:
val = line.get_score(sagg.score)
                left = max(pos_begin, line_pos_begin)
                right = min(pos_end, line_pos_end)
for _ in range(left, right + 1):
sagg.position_aggregator.add(val)
return [squery.position_aggregator for squery in score_aggs]
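    # Illustrative usage sketch (assuming an open PositionScore ``score``
    # with a "phastcons" score): the returned aggregators expose the
    # aggregated value through get_final(), as used elsewhere in this module:
    #
    #     aggs = score.fetch_scores_agg(
    #         "chr1", 10000, 20000,
    #         [PositionScoreQuery("phastcons", "max")])
    #     region_max = aggs[0].get_final()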
class AlleleScore(GenomicScore):
"""Allele-specific genomic score resource.
An AlleleScore provides scores that depend on specific alleles at genomic
positions. Unlike PositionScore, AlleleScore considers both the reference
and alternative alleles when computing scores. This makes it suitable for
variant-specific predictions and annotations.
AlleleScore supports two operational modes:
1. **SUBSTITUTIONS mode**: Scores are specific to nucleotide substitutions
(e.g., A>T, C>G). This mode is optimized for single nucleotide variants
and considers the directionality of the change. Used by resources like
CADD, which provide substitution-specific scores.
2. **ALLELES mode**: Scores are associated with specific alleles at
positions, without considering the reference allele. This mode supports
insertions, deletions, and more complex variants. The score depends on
the alternative allele itself rather than the substitution pattern.
Typical use cases include:
- Variant pathogenicity scores (e.g., CADD, DANN)
- Functional impact predictions (e.g., PolyPhen, SIFT scores)
- Splice site predictions
- Regulatory variant scores
- Any metric that depends on specific alleles
The score data is typically stored in VCF files or tabix-indexed tables
with reference and alternative allele columns.
Example:
>>> from dae.genomic_resources.repository_factory import (
... build_genomic_resource_repository
... )
>>> repo = build_genomic_resource_repository()
>>> resource = repo.get_resource("cadd_v1_6")
>>> score = build_score_from_resource(resource)
>>> with score.open() as score:
... # Fetch scores for a specific variant
... values = score.fetch_scores(
... "chr1", 12345, "A", "T"
... )
... # Iterate over variants in a region
... for pos, ref, alt, scores in score.fetch_region(
... "chr1", 10000, 20000
... ):
... print(f"{pos} {ref}>{alt}: {scores}")
... # Aggregate scores over a region
... from dae.genomic_resources.genomic_scores import (
... AlleleScoreQuery
... )
... queries = [
... AlleleScoreQuery(
... "cadd_raw",
... position_aggregator="mean",
... allele_aggregator="max"
... )
... ]
... aggs = score.fetch_scores_agg(
... "chr1", 10000, 20000, queries
... )
Attributes:
resource: The underlying GenomicResource object
resource_id: Unique identifier for the resource
config: Configuration dictionary for the score
table: GenomicPositionTable for data access (typically VCF)
score_definitions: Dictionary mapping score IDs to their definitions
mode: Operating mode (SUBSTITUTIONS or ALLELES)
Key Methods:
fetch_scores: Get score values for a specific variant
fetch_region: Iterate over variant scores in a genomic region
fetch_scores_agg: Aggregate scores over a region with position and
allele aggregation
substitutions_mode: Check if operating in SUBSTITUTIONS mode
alleles_mode: Check if operating in ALLELES mode
Configuration:
The resource configuration should specify:
- table.filename: Path to the data file (usually VCF)
- table.reference: Column/field containing reference alleles
- table.alternative: Column/field containing alternative alleles
- allele_score_mode: Either "substitutions" or "alleles" (optional)
- scores: List of score definitions with optional position_aggregator
and allele_aggregator specifications
"""
class Mode(enum.Enum):
"""Allele score mode."""
SUBSTITUTIONS = 1
ALLELES = 2
@staticmethod
def from_name(name: str) -> AlleleScore.Mode:
if name == "substitutions":
return AlleleScore.Mode.SUBSTITUTIONS
if name == "alleles":
return AlleleScore.Mode.ALLELES
raise ValueError(f"unknown allele mode: {name}")
def __init__(self, resource: GenomicResource):
if resource.get_type() not in {"allele_score", "np_score"}:
raise ValueError(
"The resrouce provided to AlleleScore should be of"
f"'allele_score' type, not a '{resource.get_type()}'")
if resource.get_type() == "np_score":
logger.warning(
"The resource type `np_score` is deprecated. "
"Please use `allele_score` instead for resource %s.",
resource.get_id())
super().__init__(resource)
if self.config.get("allele_score_mode") is None:
if resource.get_type() == "np_score":
self.mode = AlleleScore.Mode.SUBSTITUTIONS
elif resource.get_type() == "allele_score":
self.mode = AlleleScore.Mode.ALLELES
else:
raise ValueError(
f"unknown resource type {resource.get_type()}")
else:
self.mode = AlleleScore.Mode.from_name(
self.config.get("allele_score_mode", "substitutions"))
def substitutions_mode(self) -> bool:
"""Return True if the score is in substitutions mode."""
return self.mode == AlleleScore.Mode.SUBSTITUTIONS
def alleles_mode(self) -> bool:
"""Return True if the score is in alleles mode."""
return self.mode == AlleleScore.Mode.ALLELES
@staticmethod
def get_schema() -> dict[str, Any]:
schema = copy.deepcopy(GenomicScore.get_schema())
schema["allele_score_mode"] = {
"type": "string",
"allowed": ["substitutions", "alleles"],
}
schema["merge_vcf_scores"] = {
"type": "boolean",
"default": False,
}
schema["table"]["schema"]["reference"] = {
"type": "dict", "schema": {
"index": {"type": "integer"},
"name": {"type": "string", "excludes": "index"},
"column_index": {
"type": "integer",
"excludes": ["index", "name", "column_name"],
},
"column_name": {
"type": "string",
"excludes": ["name", "index", "column_index"],
},
},
}
schema["table"]["schema"]["alternative"] = {
"type": "dict", "schema": {
"index": {"type": "integer"},
"name": {"type": "string", "excludes": "index"},
"column_index": {
"type": "integer",
"excludes": ["index", "name", "column_name"],
},
"column_name": {
"type": "string",
"excludes": ["name", "index", "column_index"],
},
},
}
schema["table"]["schema"]["variant"] = {
"type": "dict", "schema": {
"index": {"type": "integer"},
"name": {"type": "string", "excludes": "index"},
"column_index": {
"type": "integer",
"excludes": ["index", "name", "column_name"],
},
"column_name": {
"type": "string",
"excludes": ["name", "index", "column_index"],
},
},
}
scores_schema = schema["scores"]["schema"]["schema"]
scores_schema["position_aggregator"] = AGGREGATOR_SCHEMA
scores_schema["allele_aggregator"] = AGGREGATOR_SCHEMA
scores_schema["nucleotide_aggregator"] = AGGREGATOR_SCHEMA
return schema
def open(self) -> AlleleScore:
return cast(AlleleScore, super().open())
def _fetch_region_values(
self,
chrom: str | None,
pos_begin: int | None,
pos_end: int | None,
scores: list[str] | None = None,
) -> Generator[
tuple[int, int, list[ScoreValue] | None], None, None]:
"""Return score values in a region."""
for pos, _, _, values in self.fetch_region(
chrom, pos_begin, pos_end, scores):
yield pos, pos, values
def fetch_region(
self,
chrom: str | None,
pos_begin: int | None,
pos_end: int | None,
scores: list[str] | None = None,
) -> Generator[
tuple[int, str | None, str | None, list[ScoreValue] | None],
None, None]:
"""Return position score values in a region."""
region_lines = self._fetch_region_lines(
chrom, pos_begin, pos_end, scores,
)
first_line = next(region_lines, None)
if first_line is None:
return
left, right, val, line = first_line
if left != right:
raise ValueError(
f"value for a region in allele score "
f"{chrom}:{left}-{right}")
returned_region: tuple[
int, int, list[ScoreValue] | None,
set[tuple[str | None, str | None]],
] = (left, right, val, {(line.ref, line.alt)})
yield (left, line.ref, line.alt, val)
for left, right, val, line in region_lines:
if left != right:
raise ValueError(
f"value for a region in allele score "
f"{chrom}:{left}-{right}")
returned_nucleotides = (line.ref, line.alt)
if (left, right) == (returned_region[0], returned_region[1]):
if returned_nucleotides in returned_region[3]:
logger.debug(
"multiple values for positions %s:%s-%s "
"and nucleotides %s",
chrom, left, right, returned_nucleotides)
returned_region[3].add((line.ref, line.alt))
yield (left, line.ref, line.alt, val)
continue
prev_right = returned_region[1]
if left < prev_right:
raise ValueError(
f"multiple values for positions [{left}, {prev_right}]")
returned_region = (left, right, val, {(line.ref, line.alt)})
yield (left, line.ref, line.alt, val)
def fetch_scores(
self, chrom: str, position: int,
reference: str, alternative: str,
scores: list[str] | None = None,
) -> list[ScoreValue] | None:
"""Fetch score values at specified genomic position and nucleotide."""
if chrom not in self.get_all_chromosomes():
raise ValueError(
f"{chrom} is not among the available chromosomes for "
f"NP Score resource {self.resource_id}")
lines = list(self._fetch_lines(chrom, position, position))
if not lines:
return None
selected_line = None
for line in lines:
if line.ref == reference and line.alt == alternative:
selected_line = line
break
if not selected_line:
return None
requested_scores = scores or self.get_all_scores()
return [selected_line.get_score(sc) for sc in requested_scores]
def _build_scores_agg(
self, score_queries: list[AlleleScoreQuery],
) -> list[AlleleScoreAggr]:
score_aggs = []
for squery in score_queries:
scr_def = self.score_definitions[squery.score]
if squery.position_aggregator is not None:
aggregator_type = squery.position_aggregator
else:
assert scr_def.pos_aggregator is not None
aggregator_type = scr_def.pos_aggregator
position_aggregator = build_aggregator(aggregator_type)
if squery.allele_aggregator is not None:
aggregator_type = squery.allele_aggregator
else:
assert scr_def.allele_aggregator is not None
aggregator_type = scr_def.allele_aggregator
allele_aggregator = build_aggregator(aggregator_type)
score_aggs.append(
AlleleScoreAggr(
squery.score, position_aggregator, allele_aggregator))
return score_aggs
def fetch_scores_agg(
self, chrom: str, pos_begin: int, pos_end: int,
scores: list[AlleleScoreQuery] | None = None,
) -> list[Aggregator]:
"""Fetch score values in a region and aggregates them."""
# pylint: disable=too-many-locals
if chrom not in self.get_all_chromosomes():
raise ValueError(
f"{chrom} is not among the available chromosomes for "
f"NP Score resource {self.resource_id}")
if scores is None:
scores = [
AlleleScoreQuery(score_id)
for score_id in self.get_all_scores()]
score_aggs = self._build_scores_agg(scores)
score_lines = list(self._fetch_lines(chrom, pos_begin, pos_end))
if not score_lines:
return [sagg.position_aggregator for sagg in score_aggs]
def aggregate_alleles() -> None:
for sagg in score_aggs:
sagg.position_aggregator.add(
sagg.allele_aggregator.get_final())
sagg.allele_aggregator.clear()
last_pos: int = score_lines[0].pos_begin
for line in score_lines:
if line.pos_begin != last_pos:
aggregate_alleles()
for sagg in score_aggs:
val = line.get_score(sagg.score)
                left = max(pos_begin, line.pos_begin)
                right = min(pos_end, line.pos_end)
for _ in range(left, right + 1):
sagg.allele_aggregator.add(val)
last_pos = line.pos_begin
aggregate_alleles()
return [sagg.position_aggregator for sagg in score_aggs]
@dataclass
class CNV:
"""Copy number object from a cnv_collection."""
chrom: str
pos_begin: int
pos_end: int
attributes: dict[str, Any]
@property
def size(self) -> int:
return self.pos_end - self.pos_begin
class CnvCollection(GenomicScore):
"""A collection of CNVs."""
def __init__(self, resource: GenomicResource):
if resource.get_type() != "cnv_collection":
raise ValueError(
"The resource provided to CnvCollection should be of "
f"'cnv_collection' type, not a '{resource.get_type()}'")
super().__init__(resource)
@staticmethod
def get_schema() -> dict[str, Any]:
schema = copy.deepcopy(GenomicScore.get_schema())
scores_schema = schema["scores"]["schema"]["schema"]
scores_schema["allele_aggregator"] = AGGREGATOR_SCHEMA
return schema
def open(self) -> CnvCollection:
return cast(CnvCollection, super().open())
def _fetch_region_values(
self,
chrom: str | None,
pos_begin: int | None,
pos_end: int | None,
scores: list[str] | None = None,
) -> Generator[
tuple[int, int, list[ScoreValue] | None], None, None]:
"""Return score values in a region."""
for start, stop, values, _ in self._fetch_region_lines(
chrom, pos_begin, pos_end, scores):
yield start, stop, values
def fetch_cnvs(
self, chrom: str,
start: int, stop: int,
scores: list[str] | None = None,
) -> list[CNV]:
"""Return list of CNVs that overlap with the provided region."""
if not self.is_open():
raise ValueError(f"The resource <{self.resource_id}> is not open")
cnvs: list = []
if chrom not in self.table.get_chromosomes():
return cnvs
lines = list(self._fetch_lines(chrom, start, stop))
if not lines:
return cnvs
requested_scores = scores or self.get_all_scores()
for line in lines:
attributes = {}
for score_id in requested_scores:
attributes[score_id] = line.get_score(score_id)
cnvs.append(CNV(line.chrom, line.pos_begin, line.pos_end,
attributes))
return cnvs
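    # Illustrative usage sketch (assuming an open CnvCollection ``cnvs`` with
    # a "frequency" score configured; the score id is made up): each returned
    # CNV carries the requested score values in its ``attributes`` mapping:
    #
    #     for cnv in cnvs.fetch_cnvs("chr1", 1_000_000, 2_000_000):
    #         print(cnv.chrom, cnv.pos_begin, cnv.pos_end,
    #               cnv.attributes.get("frequency"))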
def build_score_from_resource(
resource: GenomicResource,
) -> GenomicScore:
"""Build a genomic score resource and return the coresponding score."""
if resource.get_type() == "position_score":
return PositionScore(resource)
if resource.get_type() == "np_score":
logger.warning(
"The resource type `np_score` is deprecated. "
"Please use `allele_score` instead for resource %s.",
resource.get_id())
return AlleleScore(resource)
if resource.get_type() == "allele_score":
return AlleleScore(resource)
if resource.get_type() == "cnv_collection":
return CnvCollection(resource)
raise ValueError(f"Resource {resource.get_id()} is not of score type")
def build_score_from_resource_id(
resource_id: str, grr: GenomicResourceRepo | None = None,
) -> GenomicScore:
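    """Build a genomic score from a resource id.
    Uses the default genomic resource repository when none is given.
    """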
if grr is None:
grr = build_genomic_resource_repository()
return build_score_from_resource(grr.get_resource(resource_id))
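# End-to-end usage sketch (illustrative; the resource id "phastCons100way" is
# assumed to be available in the default genomic resource repository):
#
#     score = build_score_from_resource_id("phastCons100way")
#     with score.open():
#         print(score.get_all_scores())
#         print(score.get_all_chromosomes())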