Source code for dae.genomic_resources.implementations.genomic_scores_impl

from __future__ import annotations

import json
import logging
from collections.abc import Iterable
from typing import Any, ClassVar, cast

import numpy as np
from jinja2 import Template

from dae.genomic_resources.genomic_position_table import (
    TabixGenomicPositionTable,
)
from dae.genomic_resources.genomic_position_table.table_bigwig import (
    BigWigTable,
)
from dae.genomic_resources.genomic_position_table.table_inmemory import (
    InmemoryGenomicPositionTable,
)
from dae.genomic_resources.genomic_scores import (
    GenomicScore,
    build_score_from_resource,
)
from dae.genomic_resources.histogram import (
    CategoricalHistogramConfig,
    Histogram,
    HistogramConfig,
    HistogramError,
    NullHistogram,
    NullHistogramConfig,
    NumberHistogramConfig,
    build_default_histogram_conf,
    build_empty_histogram,
    plot_histogram,
)
from dae.genomic_resources.reference_genome import (
    ReferenceGenome,
    build_reference_genome_from_resource,
)
from dae.genomic_resources.repository import (
    GenomicResource,
    GenomicResourceRepo,
)
from dae.genomic_resources.resource_implementation import (
    GenomicResourceImplementation,
    InfoImplementationMixin,
)
from dae.genomic_resources.statistics.min_max import MinMaxValue
from dae.task_graph.graph import Task, TaskGraph
from dae.utils.regions import (
    Region,
    get_chromosome_length_tabix,
    split_into_regions,
)

logger = logging.getLogger(__name__)


[docs] class GenomicScoreImplementation( GenomicResourceImplementation, InfoImplementationMixin, ): # pylint: disable=too-many-public-methods """Genomic scores base class.""" def __init__(self, resource: GenomicResource): super().__init__(resource) self.score: GenomicScore = build_score_from_resource(resource)
[docs] def get_config_histograms(self) -> dict[str, Any]: """Collect all configurations of histograms for the genomic score.""" result: dict[str, Any] = {} for score_id, score_def in self.score.score_definitions.items(): result[score_id] = score_def.hist_conf return result
[docs] def get_template(self) -> Template: return Template(GENOMIC_SCORES_TEMPLATE)
def _get_template_data(self) -> dict[str, Any]: return {"genomic_scores": self}
[docs] def get_info(self, **kwargs: Any) -> str: # noqa: ARG002 return InfoImplementationMixin.get_info(self)
[docs] def add_statistics_build_tasks( self, task_graph: TaskGraph, **kwargs: Any, ) -> list[Task]: with self.score.open(): region_size = kwargs.get("region_size", 1_000_000) grr = kwargs.get("grr") all_min_max_scores = [] all_hist_confs: dict[str, HistogramConfig] = {} impl = build_score_implementation_from_resource(self.resource) for score_id, score_def in impl.score.score_definitions.items(): if score_def.hist_conf is not None: hist_conf = score_def.hist_conf else: hist_conf = build_default_histogram_conf( score_def.value_type) if isinstance(hist_conf, NullHistogramConfig): all_hist_confs[score_id] = hist_conf continue if isinstance(hist_conf, CategoricalHistogramConfig): all_hist_confs[score_id] = hist_conf continue assert isinstance(hist_conf, NumberHistogramConfig) if not hist_conf.has_view_range(): all_min_max_scores.append(score_id) all_hist_confs[score_id] = hist_conf min_max_task = None if all_min_max_scores: _, min_max_task = self._add_min_max_tasks( task_graph, all_min_max_scores, region_size, grr) _, _, save_task = self._add_histogram_tasks( task_graph, all_hist_confs, min_max_task, region_size, grr) return [save_task]
_REF_GENOME_CACHE: ClassVar[dict[str, Any]] = {} @property def files(self) -> set[str]: files = set() files.add(self.score.table.definition.filename) if isinstance(self.score.table, TabixGenomicPositionTable): files.add(f"{self.score.table.definition.filename}.tbi") return files @staticmethod def _get_reference_genome_cached( grr: GenomicResourceRepo | None, genome_id: str | None, ) -> ReferenceGenome | None: if genome_id is None or grr is None: return None if genome_id in GenomicScoreImplementation._REF_GENOME_CACHE: return cast( ReferenceGenome, GenomicScoreImplementation._REF_GENOME_CACHE[genome_id], ) try: ref_genome = build_reference_genome_from_resource( grr.get_resource(genome_id), ) logger.info( "Using reference genome label <%s> ", genome_id, ) except FileNotFoundError: logger.warning( "Couldn't find reference genome %s", genome_id, ) return None GenomicScoreImplementation._REF_GENOME_CACHE[genome_id] = ref_genome return ref_genome def _get_chrom_regions( self, region_size: int, grr: GenomicResourceRepo | None = None, ) -> list[Region]: regions = [] ref_genome_id = cast( str, self.resource.get_labels().get("reference_genome"), ) ref_genome = self._get_reference_genome_cached(grr, ref_genome_id) chrom_length: int | None for chrom in self.score.get_all_chromosomes(): if ref_genome is not None and chrom in ref_genome.chromosomes: chrom_length = ref_genome.get_chrom_length(chrom) else: if isinstance(self.score.table, InmemoryGenomicPositionTable): chrom_length = \ max(line.pos_end for line in self.score.table.get_records_in_region(chrom)) elif isinstance(self.score.table, BigWigTable): chrom_length = self.score.table.get_chromosome_length(chrom) else: assert isinstance(self.score.table, TabixGenomicPositionTable) assert self.score.table.pysam_file is not None fchrom = self.score.table.unmap_chromosome(chrom) if fchrom is not None: chrom_length = get_chromosome_length_tabix( self.score.table.pysam_file, fchrom) if chrom_length is None: logger.warning( "unable to find chromosome length for %s", chrom) continue regions.extend( split_into_regions( chrom, chrom_length, region_size, ), ) return regions @property def resource_id(self) -> str: return self.score.resource_id def _add_min_max_tasks( self, graph: TaskGraph, score_ids: Iterable[str], region_size: int, grr: GenomicResourceRepo | None = None, ) -> tuple[list[Task], Task]: """ Add and return calculation, merging and saving tasks for min max. The tasks are returned in a triple containing a list of calculation tasks, the merge task and the save task. """ min_max_tasks = [] regions = self._get_chrom_regions(region_size, grr) for region in regions: chrom = region.chrom start = region.start end = region.stop min_max_tasks.append(graph.create_task( f"{self.resource.get_full_id()}_calculate_min_max_{chrom}_{start}_{end}", GenomicScoreImplementation._do_min_max, [self.resource, score_ids, chrom, start, end], [], )) merge_task = graph.create_task( f"{self.resource.get_full_id()}_merge_min_max", GenomicScoreImplementation._merge_min_max, [score_ids, *min_max_tasks], min_max_tasks, ) return min_max_tasks, merge_task @staticmethod def _do_min_max( resource: GenomicResource, score_ids: list[str], chrom: str, start: int, end: int, ) -> dict[str, MinMaxValue]: impl = build_score_implementation_from_resource(resource) result = { scr_id: MinMaxValue(scr_id) for scr_id in score_ids } with impl.score.open() as score: for _left, _right, rec in score._fetch_region_values( # noqa chrom, start, end, score_ids): for score_index, score_id in enumerate(score_ids): result[score_id].add_value(rec[score_index]) # type: ignore return result @staticmethod def _merge_min_max( score_ids: list[str], *calculate_tasks: dict[str, MinMaxValue], ) -> dict[str, Any]: res: dict[str, MinMaxValue | None] = dict.fromkeys(score_ids) for score_id in score_ids: for min_max_region in calculate_tasks: if res[score_id] is None: res[score_id] = min_max_region[score_id] else: assert res[score_id] is not None res[score_id].merge( # type: ignore min_max_region[score_id]) return res @staticmethod def _update_hist_confs( all_hist_confs: dict[str, HistogramConfig], minmax_task: dict[str, MinMaxValue] | None, ) -> dict[str, HistogramConfig]: if minmax_task is None: return all_hist_confs for score_id, min_max in minmax_task.items(): hist_conf = all_hist_confs[score_id] assert isinstance(hist_conf, NumberHistogramConfig) assert not hist_conf.has_view_range() if np.isnan(min_max.min) or np.isnan(min_max.max): logger.warning( "min/max value for %s not found; " "nullify the histogram", score_id) all_hist_confs[score_id] = NullHistogramConfig( f"min/max for {score_id} not found") else: hist_conf.view_range = (min_max.min, min_max.max) logger.info("histogram configs updated: %s", all_hist_confs) return all_hist_confs def _add_histogram_tasks( self, graph: TaskGraph, all_hist_confs: dict[str, HistogramConfig], minmax_task: Task | None, region_size: int, grr: GenomicResourceRepo | None = None, ) -> tuple[list[Task], Task, Task]: """ Add histogram tasks for specific score id. The histogram tasks are dependant on the provided minmax task. """ regions = self._get_chrom_regions(region_size, grr) update_hist_confs_deps = [] if minmax_task is None else [minmax_task] update_hist_confs = graph.create_task( f"{self.resource.get_full_id()}_update_hist_confs", GenomicScoreImplementation._update_hist_confs, [all_hist_confs, minmax_task], update_hist_confs_deps, ) histogram_tasks = [] for region in regions: chrom = region.chrom start = region.start end = region.stop histogram_tasks.append(graph.create_task( f"{self.resource.get_full_id()}_calculate_histogram_" f"{chrom}_{start}_{end}", GenomicScoreImplementation._do_histogram, [self.resource, update_hist_confs, chrom, start, end], [update_hist_confs], )) merge_task = graph.create_task( f"{self.resource.get_full_id()}_merge_histograms", GenomicScoreImplementation._merge_histograms, [self.resource, update_hist_confs, *histogram_tasks], histogram_tasks, ) save_task = graph.create_task( f"{self.resource.get_full_id()}_save_histograms", GenomicScoreImplementation._save_histograms, [self.resource, merge_task], [merge_task], ) return histogram_tasks, merge_task, save_task @staticmethod def _do_histogram( resource: GenomicResource, all_hist_confs: dict[str, HistogramConfig], chrom: str, start: int, end: int, ) -> dict[str, Histogram]: impl = build_score_implementation_from_resource(resource) result: dict[str, Histogram] = {} logger.info("updated hist confs: %s", all_hist_confs) for score_id, hist_conf in all_hist_confs.items(): if isinstance(hist_conf, NullHistogramConfig): continue result[score_id] = build_empty_histogram(hist_conf) score_ids = list(result.keys()) with impl.score.open() as score: for left, right, rec in score._fetch_region_values( # noqa chrom, start, end, score_ids): for scr_index, scr_id in enumerate(score_ids): try: result[scr_id].add_value( rec[scr_index], right - left + 1) # type: ignore except TypeError as err: logger.exception( "Failed adding value %s to histogram of %s; " "%s:%s-%s", rec[scr_index] if rec else None, resource.resource_id, chrom, start, end) result[scr_id] = NullHistogram( NullHistogramConfig(str(err)), ) except HistogramError as err: logger.warning( "Histogram for %s nullified", scr_id, ) result[scr_id] = NullHistogram( NullHistogramConfig(str(err)), ) return result @staticmethod def _merge_histograms( resource: GenomicResource, all_hist_confs: dict[str, HistogramConfig], *calculated_histograms: dict[str, Any], ) -> dict[str, Histogram]: result: dict[str, Histogram] = {} for score_id, hist_conf in all_hist_confs.items(): result[score_id] = build_empty_histogram(hist_conf) for score_id, hist_conf in all_hist_confs.items(): if isinstance(hist_conf, NullHistogramConfig): continue try: for histogram_region in calculated_histograms: if score_id not in histogram_region: logger.warning( "region has no histogram for score %s in %s", score_id, resource.resource_id) continue hist = histogram_region[score_id] if isinstance(result[score_id], NullHistogram): continue if isinstance(hist, NullHistogram): result[score_id] = NullHistogram(NullHistogramConfig( f"Empty histogram for {score_id} in a region: " f"{hist.reason}")) else: result[score_id].merge(hist) except HistogramError as err: logger.exception( "Histogram for %s nullified", score_id, ) result[score_id] = NullHistogram( NullHistogramConfig(str(err))) return result @staticmethod def _save_histograms( resource: GenomicResource, merged_histograms: dict[str, Histogram], ) -> dict[str, Histogram]: impl = build_score_implementation_from_resource(resource) proto = resource.proto for score_id, score_histogram in merged_histograms.items(): with proto.open_raw_file( resource, impl.score.get_histogram_filename(score_id), mode="wt", ) as outfile: outfile.write(score_histogram.serialize()) if not isinstance(score_histogram, NullHistogram): plot_histogram( resource, impl.score.get_histogram_image_filename(score_id), score_histogram, score_id, impl.score.score_definitions[score_id].small_values_desc, impl.score.score_definitions[score_id].large_values_desc, ) return merged_histograms
[docs] def calc_info_hash(self) -> bytes: """Compute and return the info hash.""" return b"infohash"
[docs] def calc_statistics_hash(self) -> bytes: """ Compute the statistics hash. This hash is used to decide whether the resource statistics should be recomputed. """ manifest = self.resource.get_manifest() return json.dumps({ "config": { "histograms": [ hist_conf.to_dict() for hist_conf in self.get_config_histograms().values() if hist_conf is not None ], "table": { "config": self.score.table.definition, "files_md5": {file_name: manifest[file_name].md5 for file_name in sorted(self.files)}, }, }, "score_config": [ { "id": score_def.score_id, "type": score_def.value_type, "name": score_def.col_name, "index": score_def.col_index, "na_values": str(sorted(score_def.na_values)), } for score_def in self.score.score_definitions.values()], }, indent=2).encode()
[docs] def build_score_implementation_from_resource( resource: GenomicResource, ) -> GenomicScoreImplementation: return GenomicScoreImplementation(resource)
GENOMIC_SCORES_TEMPLATE = """ {% extends base %} {% block content %} <style> .modal { display: none; position: fixed; z-index: 1; padding-top: 100px; left: 0; top: 0; width: 100%; height: 100%; background-color: rgba(0,0,0,0.5); justify-content: center; } .modal-content { margin: auto; display: block; width: 80%; max-width: 700px; } .close { float: right; font-size: 40px; font-weight: bold; } .close:hover, .close:focus { color: #bbb; text-decoration: none; cursor: pointer; } </style> {% block javascript %} <script type="text/javascript"> function openModal(scoreId) { var modal = document.getElementById("modal-" + scoreId); modal.style.display = "flex"; } function closeModal(scoreId) { var modal = document.getElementById("modal-" + scoreId); modal.style.display = "none"; } </script> {% endblock %} {% set impl = data.genomic_scores %} {% set scores = impl.score %} <h1>Scores</h1> <table border="1"> <tr> <th>ID</th> <th>Type</th> <th>Default annotation</th> <th>Description</th> <th>Histogram</th> <th>Range</th> </tr> {%- for score_id, score in scores.score_definitions.items() -%} <tr class="score-definition"> <td>{{ score_id }}</td> <td>{{ score.value_type }}</td> {% set d_atts = scores.get_default_annotation_attribute(score_id) %} <td> <p>{{ d_atts }}</p> </td> <td> <div>{{ score.desc }}</div> {% if score.small_values_desc %} <div style="color: rgb(145,145,145)"> {{ "Small values desc: " + score.small_values_desc }} </div> {% endif %} {% if score.large_values_desc %} <div style="color: rgb(145,145,145)"> {{ "Large values desc: " + score.large_values_desc }} </div> {% endif %} </td> <td> {% set hist = impl.score.get_score_histogram(score_id) %} {%- if hist.type != 'null_histogram' %} {% set hist_image_file = impl.score.get_histogram_image_filename(score_id) %} <img src="{{ hist_image_file }}" alt="{{ "HISTOGRAM FOR " + score_id }}" title={{ score_id | replace(" ", "_") }} width="200" style="cursor: pointer" onclick="openModal(title)"> {%- else -%} <p>No histogram: {{ hist.reason }}</p> {%- endif -%} </td> <td> {%- if hist.type != 'null_histogram' %} {{ hist.values_domain() }} {%- else -%} NO DOMAIN {%- endif -%} </td> </tr> {%- endfor %} {%- for score_id in scores.score_definitions.keys() -%} <div id="modal-{{score_id | replace(" ", "_")}}" class="modal"> <div style="padding: 10px 20px; background-color: #fff; height: fit-content; width: fit-content;"> <span title={{score_id | replace(" ", "_")}} class="close" onclick="closeModal(title)">&times;</span> <img class="modal-content" id="histogram-{{score_id}}" src="{{ impl.score.get_histogram_image_filename(score_id) }}" alt="{{ "HISTOGRAM FOR " + score_id }}" title={{ score_id | replace(" ", "_")}} width="200"> </div> </div> {%- endfor %} </table> {% endblock %} """ # noqa: E501