Source code for dae.genomic_resources.histogram

"""Handling of genomic scores statistics.

Currently we support only genomic scores histograms.
"""
from __future__ import annotations

import importlib
import json
import logging
import pathlib
import sys
from collections import Counter
from dataclasses import dataclass
from typing import IO, Any

import numpy as np
import yaml

from dae.genomic_resources.repository import GenomicResource
from dae.genomic_resources.statistics.base_statistic import Statistic
from dae.genomic_resources.statistics.min_max import MinMaxValue

logger = logging.getLogger(__name__)


[docs] class HistogramError(BaseException): """ Class used for histogram specific errors. Histograms should be nullified when a HistogramError occurs. """
[docs] @dataclass class NumberHistogramConfig: """Configuration class for number histograms.""" view_range: tuple[float | None, float | None] number_of_bins: int = 100 x_log_scale: bool = False y_log_scale: bool = False x_min_log: float | None = None plot_function: str | None = None
[docs] def has_view_range(self) -> bool: return self.view_range[0] is not None and \ self.view_range[1] is not None
[docs] def to_dict(self) -> dict[str, Any]: """Transform number histogram config to dict.""" result = { "type": "number", "view_range": { "min": self.view_range[0], "max": self.view_range[1], }, "number_of_bins": self.number_of_bins, "x_log_scale": self.x_log_scale, "y_log_scale": self.y_log_scale, "x_min_log": self.x_min_log, } if self.plot_function is not None: result["plot_function"] = self.plot_function return result
[docs] @staticmethod def from_dict(parsed: dict[str, Any]) -> NumberHistogramConfig: """Build a number histogram config from a parsed yaml file.""" hist_type = parsed.get("type") if hist_type != "number": logger.error( "Invalid configuration type (%s)" " for number histogram!\n%s", hist_type, parsed, ) raise TypeError( "Invalid configuration for number histogram!\n" f"{parsed}", ) yaml_range = parsed.get("view_range", {}) x_min = yaml_range.get("min", None) x_max = yaml_range.get("max", None) view_range = (x_min, x_max) number_of_bins = parsed.get("number_of_bins", 100) x_log_scale = parsed.get("x_log_scale", False) y_log_scale = parsed.get("y_log_scale", False) x_min_log = parsed.get("x_min_log") plot_function = parsed.get("plot_function") return NumberHistogramConfig( view_range, number_of_bins, x_log_scale, y_log_scale, x_min_log, plot_function=plot_function, )
[docs] @staticmethod def default_config( min_max: MinMaxValue | None, ) -> NumberHistogramConfig: """Build a number histogram config from a parsed yaml file.""" if min_max is None: view_range: tuple[float | None, float | None] = (None, None) elif min_max.min == min_max.max: view_range = (min_max.min, min_max.min + 1.0) else: view_range = (min_max.min, min_max.max) number_of_bins = 100 x_log_scale = False y_log_scale = False return NumberHistogramConfig( view_range, number_of_bins, x_log_scale, y_log_scale)
DEFAULT_DISPLAYED_VALUES_COUNT = 20
[docs] @dataclass class CategoricalHistogramConfig: """Configuration class for categorical histograms.""" displayed_values_count: int | None = 20 displayed_values_percent: float | None = None value_order: list[str | int] | None = None y_log_scale: bool = False plot_function: str | None = None enforce_type: bool = True
[docs] def to_dict(self) -> dict[str, Any]: """Transform categorical histogram config to dict.""" result: dict[str, Any] = { "type": "categorical", "value_order": self.value_order, "y_log_scale": self.y_log_scale, } if self.displayed_values_count != DEFAULT_DISPLAYED_VALUES_COUNT: result["displayed_values_count"] = self.displayed_values_count if self.displayed_values_percent is not None: result["displayed_values_percent"] = \ self.displayed_values_percent if self.plot_function is not None: result["plot_function"] = self.plot_function return result
[docs] @staticmethod def default_config() -> CategoricalHistogramConfig: return CategoricalHistogramConfig(enforce_type=False)
[docs] @staticmethod def from_dict(parsed: dict[str, Any]) -> CategoricalHistogramConfig: """Create categorical histogram config from configuratin dict.""" hist_type = parsed.get("type") if hist_type != "categorical": raise TypeError( "Invalid configuration type for categorical histogram!\n" f"{parsed}", ) displayed_values_count = parsed.get( "displayed_values_count") displayed_values_percent = parsed.get( "displayed_values_percent") if displayed_values_count is not None \ and displayed_values_percent is not None: raise ValueError( "Invalid configuration for categorical histogram: " "displayed_values_count and displayed_values_percent " "cannot be both set\n" f"{parsed}", ) if displayed_values_percent is None and displayed_values_count is None: displayed_values_count = DEFAULT_DISPLAYED_VALUES_COUNT value_order = parsed.get("value_order", []) y_log_scale = parsed.get("y_log_scale", False) plot_function = parsed.get("plot_function") return CategoricalHistogramConfig( displayed_values_count=displayed_values_count, displayed_values_percent=displayed_values_percent, value_order=value_order, y_log_scale=y_log_scale, plot_function=plot_function, enforce_type=True, )
[docs] @dataclass class NullHistogramConfig: """Configuration class for null histograms.""" reason: str
[docs] def to_dict(self) -> dict[str, Any]: return { "type": "null", "reason": self.reason, }
[docs] @staticmethod def default_config() -> NullHistogramConfig: return NullHistogramConfig("Unspecified reason")
[docs] @staticmethod def from_dict(parsed: dict[str, Any]) -> NullHistogramConfig: """Create Null histogram from configuration dict.""" hist_type = parsed.get("type") if hist_type != "null": raise TypeError( "Invalid configuration type for null histogram!\n" f"{parsed}", ) reason = parsed.get("reason", "Unspecified reason") return NullHistogramConfig( reason=reason, )
[docs] class NumberHistogram(Statistic): """Class to represent a histogram.""" type = "number_histogram" def __init__( self, config: NumberHistogramConfig, bins: np.ndarray | None = None, bars: np.ndarray | None = None): super().__init__("histogram", "Collects values for histogram.") logger.debug("number histogram config: %s", config) assert isinstance(config, NumberHistogramConfig) self.config = config self.out_of_range_values: list[float] = [] self.out_of_range_bins: list[int] = [0, 0] self.min_value: float = np.nan self.max_value: float = np.nan self.choose_bin_index = self.choose_bin_lin if self.config.x_log_scale: self.choose_bin_index = self.choose_bin_log if self.config.x_log_scale and self.config.x_min_log is None: raise ValueError( "Invalid histogram configuration, missing x_min_log", ) if self.config.view_range[0] is None or \ self.config.view_range[1] is None or \ np.isnan(self.config.view_range[0]) or \ np.isnan(self.config.view_range[1]): logger.error( "unexpected min/max value: [%s, %s]", self.config.view_range[0], self.config.view_range[1]) raise ValueError( "unexpected min/max value:" f"[{self.config.view_range[0]}, " f"{self.config.view_range[1]}]") self.view_range: tuple[float, float] = ( self.config.view_range[0], self.config.view_range[1]) if bins is not None and bars is not None: self.bins = bins self.bars = bars elif bins is None and bars is None: if self.config.x_log_scale: assert self.config.x_min_log is not None self.bins = np.array([ self.config.view_range[0], * np.logspace( np.log10(self.config.x_min_log), np.log10(self.config.view_range[1]), self.config.number_of_bins, )]) self._rstep = (self.config.number_of_bins - 1) / \ (np.log10(self.view_max()) - np.log10(self.config.x_min_log)) else: self.bins = np.linspace( self.config.view_range[0], self.config.view_range[1], self.config.number_of_bins + 1, ) self._rstep = self.config.number_of_bins / \ (self.view_max() - self.view_min()) self.bars = np.zeros(self.config.number_of_bins, dtype=np.int64) assert not np.any(np.isnan(self.bins)), ("nan bins", self.config) elif self.bins is None or self.bars is None: raise ValueError( "Cannot instantiate histogram with only bins or only bars!", )
[docs] def view_min(self) -> float: return self.view_range[0]
[docs] def view_max(self) -> float: return self.view_range[1]
[docs] def merge(self, other: Statistic) -> None: """Merge two histograms.""" assert isinstance(other, NumberHistogram) assert self.bins is not None assert self.bars is not None assert other.bins is not None assert other.bars is not None assert np.allclose(self.bins, other.bins, rtol=1e-5), \ (self.bins, other.bins) self.bars += other.bars self.out_of_range_bins[0] += other.out_of_range_bins[0] self.out_of_range_bins[1] += other.out_of_range_bins[1] if np.isnan(self.min_value): self.min_value = min(other.min_value, self.min_value) else: self.min_value = min(self.min_value, other.min_value) if np.isnan(self.max_value): self.max_value = max(other.max_value, self.max_value) else: self.max_value = max(self.max_value, other.max_value)
[docs] def values_domain(self) -> str: return f"[{self.min_value:0.3f}, {self.max_value:0.3f}]"
[docs] def add_value(self, value: float | None, count: int = 1) -> None: """Add value to the histogram.""" if value is None or np.isnan(value): return if not isinstance(value, (int, float, np.integer)): raise TypeError( "Cannot add non numerical value " f"{value} ({type(value)}) to number histogram", ) self.min_value = min(value, self.min_value) self.max_value = max(value, self.max_value) index = self.choose_bin_index(value) if index < 0: logger.warning( "out of range %s value %s", self.view_range, value) tindex = index + 2 self.out_of_range_bins[tindex] += count return self.bars[index] += count
[docs] def choose_bin_lin(self, value: float) -> int: """Compute bin index for a passed value for linear x-scale.""" if value < self.view_min(): return -2 if value > self.view_max(): return -1 index = int((value - self.view_min()) * self._rstep) return min(index, self.config.number_of_bins - 1)
[docs] def choose_bin_log(self, value: float) -> int: """Compute bin index for a passed value for log x-scale.""" assert self.config.x_log_scale assert self.config.x_min_log is not None if value < self.view_min(): return -2 if value > self.view_max(): return -1 if value < self.config.x_min_log: return 0 index = int( (np.log10(value) - np.log10(self.config.x_min_log)) * self._rstep) + 1 return min(index, self.config.number_of_bins - 1)
[docs] def to_dict(self) -> dict[str, Any]: return { "config": self.config.to_dict(), "bins": self.bins.tolist(), "bars": self.bars.tolist(), "out_of_range_bins": self.out_of_range_bins, "min_value": float(self.min_value), "max_value": float(self.max_value), }
[docs] def serialize(self) -> str: return json.dumps(self.to_dict(), indent=2)
[docs] def plot( self, outfile: IO, score_id: str, small_values_description: str | None = None, large_values_description: str | None = None, ) -> None: """Plot histogram and save it into outfile.""" # pylint: disable=import-outside-toplevel import matplotlib matplotlib.use("agg") import matplotlib.pyplot as plt width = self.bins[1:] - self.bins[:-1] fig, ax = plt.subplots() ax.bar( x=self.bins[:-1], height=self.bars, log=self.config.y_log_scale, width=width, align="edge") if self.config.x_log_scale: plt.xscale("log") if small_values_description is not None and \ large_values_description is not None: sec = ax.secondary_xaxis(location=0) sec.set_ticks( [ self.bins[0], self.bins[-1], ], labels=[ f"\n{small_values_description}", f"\n{large_values_description}", ], wrap=True, color="gray", style="italic", ) plt.xlabel(f"\n{score_id}") plt.ylabel("count") plt.grid(axis="y") plt.grid(axis="x") plt.tight_layout() plt.savefig(outfile) plt.clf() plt.close(fig)
[docs] @staticmethod def from_dict(data: dict[str, Any]) -> NumberHistogram: """Build a number histogram from a dict.""" config = NumberHistogramConfig.from_dict(data["config"]) hist = NumberHistogram( config, bins=np.array(data.get("bins")), bars=np.array(data.get("bars")), ) hist.min_value = data.get("min_value", np.nan) hist.max_value = data.get("max_value", np.nan) hist.out_of_range_bins = data.get("out_of_range_bins", [0, 0]) return hist
[docs] @staticmethod def deserialize(content: str) -> NumberHistogram: data = json.loads(content) return NumberHistogram.from_dict(data)
[docs] class HistogramStatisticMixin: """Mixin for creating statistics classes with histograms."""
[docs] @staticmethod def get_histogram_file(score_id: str) -> str: return f"histogram_{score_id}.yaml"
[docs] @staticmethod def get_histogram_image_file(score_id: str) -> str: return f"histogram_{score_id}.png"
[docs] class NullHistogram(Statistic): """Class for annulled histograms.""" type = "null_histogram" def __init__(self, config: NullHistogramConfig | None) -> None: super().__init__( "null_histogram", "Used for invalid/annulled histograms", ) if config is None: config = NullHistogramConfig.default_config() self.reason = config.reason
[docs] def add_value( self, value: Any, count: int = 1, # noqa: ARG002 ) -> None: # pylint: disable=unused-argument return
[docs] def merge(self, _other: Any) -> None: return
[docs] def to_dict(self) -> dict[str, Any]: return { "config": { "type": "null", "reason": self.reason, }, }
[docs] def values_domain(self) -> str: return "NO DOMAIN"
# pylint: disable=unused-argument
[docs] def plot(self, _outfile: IO, _score_id: str) -> None: return
[docs] def serialize(self) -> str: return json.dumps(self.to_dict(), indent=2)
[docs] @staticmethod def from_dict(data: dict[str, Any]) -> NullHistogram: """Build a null histogram from a dict.""" config = data["config"] hist_type = config.get("type") if hist_type != "null": raise TypeError( f"Invalid configuration type for null histogram!\n{data}", ) reason = config.get("reason", "") return NullHistogram(NullHistogramConfig(reason=reason))
[docs] @staticmethod def deserialize(content: str) -> NullHistogram: data = json.loads(content) return NullHistogram.from_dict(data)
[docs] class CategoricalHistogram(Statistic): """Class for categorical data histograms.""" type = "categorical_histogram" UNIQUE_VALUES_LIMIT = 100 # pylint: disable=too-few-public-methods def __init__( self, config: CategoricalHistogramConfig, counter: dict[str | int, int] | None = None, ): super().__init__( "categorical_histogram", "Collects values for categorical histogram.", ) self.config = config self.enforce_type = config.enforce_type if counter is not None: self._counter = Counter(counter) else: self._counter = Counter() self.y_log_scale = config.y_log_scale
[docs] def add_value( self, value: str | int | None, count: int = 1, ) -> None: """Add a value to the categorical histogram. Returns true if successfully added and false if failed. Will fail if too many values are accumulated. """ if value is None: return if not isinstance(value, str | int): raise TypeError( "Only string or int values can be added categorical histogram; " f"bad <{value}>", ) self._counter[value] += count if not self.enforce_type and \ len(self._counter) > CategoricalHistogram.UNIQUE_VALUES_LIMIT: raise HistogramError( f"Too many unique values {len(self._counter)} " f"for categorical histogram.", )
[docs] def merge(self, other: Statistic) -> None: """Merge with other histogram.""" assert isinstance(other, CategoricalHistogram) assert self.config == other.config # pylint: disable=protected-access self._counter += other._counter # noqa: SLF001 if not self.enforce_type and \ len(self._counter) > CategoricalHistogram.UNIQUE_VALUES_LIMIT: raise HistogramError( f"Can not merge categorical histograms; " f"too many unique values {len(self._counter)}")
@property def raw_values(self) -> dict[str | int, int]: return dict(self._counter) @property def display_values(self) -> dict[str | int, int]: """Return categorical histogram display values in order.""" values = {} if self.config.value_order: for key in self.config.value_order: values[key] = self._counter[key] if len(values) < len(self._counter): raise ValueError( "misconfigured categorical histogram value_order", f"{self.config.value_order} < {self._counter.keys()}") return values if self.config.displayed_values_percent is not None: total = sum(self._counter.values()) displayed = 0 other = 0 displayed_percent = self.config.displayed_values_percent for key, count in self._counter.most_common(): if 100.0 * displayed / total < displayed_percent: values[key] = count displayed += count else: other += count if other > 0: values["Other Values"] = other return values for key, count in self._counter.most_common( n=self.config.displayed_values_count): if key not in values: values[key] = count if self.config.displayed_values_count is not None and \ len(self._counter) > self.config.displayed_values_count: other = 0 for key, count in self._counter.items(): if key not in values: other += count if other > 0: values["Other Values"] = other return values
[docs] def values_domain(self) -> str: return ", ".join(str(k) for k in self.display_values)
[docs] def to_dict(self) -> dict[str, Any]: return { "config": self.config.to_dict(), "values": dict(self._counter), }
[docs] def serialize(self) -> str: return json.dumps(self.to_dict(), indent=2)
[docs] @staticmethod def from_dict(data: dict[str, Any]) -> CategoricalHistogram: config = CategoricalHistogramConfig.from_dict(data["config"]) return CategoricalHistogram(config, data.get("values"))
[docs] @staticmethod def deserialize(content: str) -> CategoricalHistogram: data = json.loads(content) return CategoricalHistogram.from_dict(data)
[docs] def plot( self, outfile: IO, score_id: str, small_values_description: str | None = None, large_values_description: str | None = None, ) -> None: """Plot histogram and save it into outfile.""" # pylint: disable=import-outside-toplevel import matplotlib matplotlib.use("agg") import matplotlib.pyplot as plt display_values = self.display_values values = [str(k) for k in display_values] counts = list(display_values.values()) plt.figure(figsize=(15, 10), tight_layout=True) fig, ax = plt.subplots() ax.bar( x=values, height=counts, tick_label=[str(v) for v in values], log=self.config.y_log_scale, align="center", ) if small_values_description is not None and \ large_values_description is not None: sec = ax.secondary_xaxis(location=0) sec.set_ticks( [ 0, len(values) - 1, ], labels=[ f"\n{small_values_description}", f"\n{large_values_description}", ], wrap=True, color="gray", style="italic", ) plt.xlabel(f"\n{score_id}") plt.ylabel("count") plt.tick_params(axis="x", labelrotation=90) plt.tight_layout() plt.savefig(outfile) plt.clf() plt.close(fig)
[docs] def build_histogram_config( config: dict[str, Any] | None) -> HistogramConfig | None: """Create histogram config form configuration dict.""" if config is None: return None if "histogram" in config: hist_config = config["histogram"] hist_type = hist_config["type"] else: return None if hist_type == "number": return NumberHistogramConfig.from_dict(hist_config) if hist_type == "categorical": return CategoricalHistogramConfig.from_dict(hist_config) if hist_type == "null": return NullHistogramConfig.from_dict(hist_config) return NullHistogramConfig(f"Invalid histogram configuration {config}")
[docs] def build_default_histogram_conf( value_type: str, **kwargs: Any, ) -> NumberHistogramConfig | CategoricalHistogramConfig | NullHistogramConfig: """Build default histogram config for given value type.""" if value_type in ["int", "float"]: min_max = kwargs.get("min_max") return NumberHistogramConfig.default_config(min_max) if value_type == "str": return CategoricalHistogramConfig.default_config() return NullHistogramConfig( "No histogram configured and no default config available for type" f"{value_type}", )
[docs] def build_empty_histogram( config: HistogramConfig, ) -> NumberHistogram | CategoricalHistogram | NullHistogram: """Create an empty histogram from a deserialize histogram dictionary.""" # pylint: disable=broad-except try: if isinstance(config, NumberHistogramConfig): return NumberHistogram(config) if isinstance(config, CategoricalHistogramConfig): return CategoricalHistogram(config) if isinstance(config, NullHistogramConfig): return NullHistogram(config) return NullHistogram(NullHistogramConfig( "Could not match histogram config type", )) except BaseException as err: # noqa: BLE001 logger.warning( "Failed to create empty histogram from config", exc_info=True) return NullHistogram(NullHistogramConfig( f"Failed to create empty histogram from config: {err}"))
[docs] def load_histogram( resource: GenomicResource, filename: str, ) -> Histogram: """Load and return a histogram in a resource. On an error or missing histogram, an appropriate NullHistogram is returned. """ try: with resource.open_raw_file(filename) as infile: content = infile.read() except FileNotFoundError: logger.exception( "unable to load histogram file: %s; file not found", filename) return NullHistogram(NullHistogramConfig( "Histogram file not found.", )) if filename.endswith(".yaml"): hist_data = yaml.safe_load(content) elif filename.endswith(".json"): hist_data = json.loads(content) else: logger.error( "Invalid histogram file format: %s", filename) return NullHistogram(NullHistogramConfig( "Invalid histogram file format.", )) config = hist_data["config"] hist_type = config["type"] try: if hist_type == "number": return NumberHistogram.from_dict(hist_data) if hist_type == "categorical": return CategoricalHistogram.from_dict(hist_data) if hist_type == "null": return NullHistogram.from_dict(hist_data) return NullHistogram(NullHistogramConfig("Invalid histogram type")) except BaseException: # pylint: disable=broad-except logger.exception( "Failed to deserialize histogram from %s", filename, ) return NullHistogram(NullHistogramConfig( "Failed to deserialize histogram.", ))
[docs] def save_histogram( resource: GenomicResource, filename: str, histogram: Histogram, ) -> None: """Save histogram into a resource.""" if not filename.endswith(".json"): logger.error( "Invalid histogram file format: %s; " "histograms are stored in JSON format only", filename) raise ValueError( f"Invalid histogram file format: <{filename}>; " f"histograms are stored in JSON format only", ) with resource.open_raw_file(filename, mode="wt") as outfile: outfile.write(histogram.serialize())
HistogramConfig = \ NullHistogramConfig | CategoricalHistogramConfig | NumberHistogramConfig Histogram = NullHistogram | CategoricalHistogram | NumberHistogram def _import_from_string(module_name: str, source_code: str) -> Any: spec = importlib.util.spec_from_loader(module_name, loader=None) assert spec is not None module = importlib.util.module_from_spec(spec) exec(source_code, module.__dict__) # noqa: S102 pylint: disable=exec-used sys.modules[spec.name] = module return module
[docs] def plot_histogram( res: GenomicResource, image_filename: str, hist: Histogram, score_id: str, small_values_desc: str | None = None, large_values_desc: str | None = None, ) -> None: """Plot histogram and save it into the resource.""" if isinstance(hist, NullHistogram): return if hist.config.plot_function is None: with res.open_raw_file(image_filename, mode="wb") as outfile: hist.plot( outfile, score_id, small_values_desc, large_values_desc, ) return plot_file, plot_function_name = hist.config.plot_function.split(":") with res.open_raw_file( plot_file, mode="rt", ) as srcfile: source_code = srcfile.read() plot_module_name = str( pathlib.Path(res.resource_id) / pathlib.Path(plot_file).with_suffix("")).replace("/", ".") plot_module = _import_from_string( plot_module_name, source_code) func = getattr(plot_module, plot_function_name) with res.open_raw_file( image_filename, mode="wb", ) as outfile: func( outfile, hist, score_id, small_values_desc, large_values_desc, ) return