Source code for dae.enrichment_tool.gene_weights_background

from __future__ import annotations

import logging
from collections.abc import Iterable
from functools import cached_property
from typing import Any, cast

import pandas as pd
from scipy import stats

from dae.enrichment_tool.base_enrichment_background import (
    BaseEnrichmentBackground,
    BaseEnrichmentResourceBackground,
)
from dae.enrichment_tool.event_counters import (
    EnrichmentResult,
    EnrichmentSingleResult,
    EventCountersResult,
)
from dae.gene_scores.implementations.gene_scores_impl import (
    build_gene_score_from_resource,
)
from dae.genomic_resources.repository import GenomicResource

logger = logging.getLogger(__name__)


[docs] class GeneWeightsEnrichmentBackground(BaseEnrichmentResourceBackground): """Provides class for gene weights enrichment background model.""" def __init__(self, resource: GenomicResource): if resource.get_type() != "gene_weights_enrichment_background": raise ValueError( f"unexpected enrichment background resource type: " f"<{resource.get_type()}> for resource " f"<{resource.resource_id}>") super().__init__(resource) self._total: float | None = None self._gene_weights: dict[str, float] | None = None
[docs] def is_loaded(self) -> bool: return self._total is not None and self._gene_weights is not None
[docs] def load(self) -> None: """Load enrichment background model.""" if self.is_loaded(): logger.info( "loading already loaded enrichment background model: %s", self.resource.resource_id) return filename = self.config["filename"] compression = False if filename.endswith(".gz"): compression = True with self.resource.open_raw_file( filename, mode="rt", compression=compression) as infile: df = pd.read_csv(infile, sep="\t") self._gene_weights = {} for row in df.iterrows(): self._gene_weights[row[1]["gene"]] = \ float(row[1]["gene_weight"]) self._total = float(df.gene_weight.sum())
[docs] def genes_weight(self, genes: Iterable[str]) -> float: assert self._gene_weights is not None result = 0.0 for gene in genes: result += self._gene_weights.get(gene.upper(), 0.0) return result
[docs] def genes_prob(self, genes: Iterable[str]) -> float: assert self._total is not None return self.genes_weight(genes) / self._total
[docs] @staticmethod def calc_expected_observed_pvalue( events_prob: float, events_count: int, observed: int, ) -> tuple[float, float]: """Calculate expected event count and binomtest p-value.""" expected = events_count * events_prob if observed == 0: return expected, 1.0 assert observed >= 1 binom = stats.binomtest(observed, events_count, events_prob) return expected, binom.pvalue
[docs] def calc_enrichment_test( self, events_counts: EventCountersResult, overlapped_counts: EventCountersResult, gene_set: Iterable[str], **kwargs: Any, # noqa: ARG002 ) -> EnrichmentResult: """Calculate enrichment statistics.""" gene_syms = {gs.upper() for gs in gene_set} events_prob = self.genes_prob(gene_syms) expected, pvalue = self.calc_expected_observed_pvalue( events_prob, events_counts.all, overlapped_counts.all) all_result = EnrichmentSingleResult( "all", events_counts.all, overlapped_counts.all, expected, pvalue) expected, pvalue = self.calc_expected_observed_pvalue( events_prob, events_counts.rec, overlapped_counts.rec) rec_result = EnrichmentSingleResult( "rec", events_counts.rec, overlapped_counts.rec, expected, pvalue, overlapped_counts.rec_genes) expected, pvalue = self.calc_expected_observed_pvalue( events_prob, events_counts.male, overlapped_counts.male) male_result = EnrichmentSingleResult( "male", events_counts.male, overlapped_counts.male, expected, pvalue) expected, pvalue = self.calc_expected_observed_pvalue( events_prob, events_counts.female, overlapped_counts.female) female_result = EnrichmentSingleResult( "female", events_counts.female, overlapped_counts.female, expected, pvalue) expected, pvalue = self.calc_expected_observed_pvalue( events_prob, events_counts.unspecified, overlapped_counts.unspecified) unspecified_result = EnrichmentSingleResult( "unspecified", events_counts.unspecified, overlapped_counts.unspecified, expected, pvalue) return EnrichmentResult( all_result, rec_result, male_result, female_result, unspecified_result, )
[docs] class GeneScoreEnrichmentBackground(BaseEnrichmentBackground): """Provides class for gene weights enrichment background model.""" def __init__(self, resource: GenomicResource): super().__init__(resource) if resource.get_type() != "gene_score": raise ValueError( f"unexpected enrichment background resource type: " f"<{resource.get_type()}> for resource " f"<{resource.resource_id}>") self.resource = resource self.gene_score = build_gene_score_from_resource(self.resource) if len(self.gene_score.score_definitions) != 1: raise ValueError( f"gene score is expected to have one score included; " f"found {len(self.gene_score.score_definitions)} scores") self._total: float | None = None self._gene_weights: dict[str, float] | None = None @property def filename(self) -> str: return cast(str, self.gene_score.config["filename"]) @cached_property def score_id(self) -> str: score_ids = self.gene_score.get_all_scores() assert len(score_ids) == 1 return score_ids[0] @cached_property def name(self) -> str: return self.gene_score.score_definitions[self.score_id].name @property def resource_id(self) -> str: return self.resource.resource_id @property def background_id(self) -> str: return self.resource.resource_id @property def background_type(self) -> str: return self.resource.get_type()
[docs] def is_loaded(self) -> bool: return self._total is not None and self._gene_weights is not None
[docs] def load(self) -> None: """Load enrichment background model.""" if self.is_loaded(): logger.info( "loading already loaded enrichment background model: %s", self.resource.resource_id) return self._gene_weights = {} score_id = self.score_id for row in self.gene_score.df.iterrows(): self._gene_weights[row[1]["gene"]] = \ float(row[1][score_id]) self._total = float(self.gene_score.df[score_id].sum())
[docs] def genes_weight(self, genes: Iterable[str]) -> float: assert self._gene_weights is not None result = 0.0 for gene in genes: result += self._gene_weights.get(gene.upper(), 0.0) return result
[docs] def genes_prob(self, genes: Iterable[str]) -> float: assert self._total is not None return self.genes_weight(genes) / self._total
[docs] @staticmethod def calc_expected_observed_pvalue( events_prob: float, events_count: int, observed: int, ) -> tuple[float, float]: """Calculate expected event count and binomtest p-value.""" expected = events_count * events_prob if observed == 0: return expected, 1.0 assert observed >= 1 binom = stats.binomtest(observed, events_count, events_prob) return expected, binom.pvalue
[docs] def calc_enrichment_test( self, events_counts: EventCountersResult, overlapped_counts: EventCountersResult, gene_set: Iterable[str], **kwargs: Any, # noqa: ARG002 ) -> EnrichmentResult: """Calculate enrichment statistics.""" gene_syms = {gs.upper() for gs in gene_set} events_prob = self.genes_prob(gene_syms) expected, pvalue = self.calc_expected_observed_pvalue( events_prob, events_counts.all, overlapped_counts.all) all_result = EnrichmentSingleResult( "all", events_counts.all, overlapped_counts.all, expected, pvalue) expected, pvalue = self.calc_expected_observed_pvalue( events_prob, events_counts.rec, overlapped_counts.rec) rec_result = EnrichmentSingleResult( "rec", events_counts.rec, overlapped_counts.rec, expected, pvalue, overlapped_counts.rec_genes) expected, pvalue = self.calc_expected_observed_pvalue( events_prob, events_counts.male, overlapped_counts.male) male_result = EnrichmentSingleResult( "male", events_counts.male, overlapped_counts.male, expected, pvalue) expected, pvalue = self.calc_expected_observed_pvalue( events_prob, events_counts.female, overlapped_counts.female) female_result = EnrichmentSingleResult( "female", events_counts.female, overlapped_counts.female, expected, pvalue) expected, pvalue = self.calc_expected_observed_pvalue( events_prob, events_counts.unspecified, overlapped_counts.unspecified) unspecified_result = EnrichmentSingleResult( "unspecified", events_counts.unspecified, overlapped_counts.unspecified, expected, pvalue) return EnrichmentResult( all_result, rec_result, male_result, female_result, unspecified_result, )