# Source code for dae.effect_annotation.annotator

from __future__ import annotations

import logging

from dae.annotation.annotatable import Annotatable
from dae.genomic_resources.gene_models import GeneModels, TranscriptModel
from dae.genomic_resources.reference_genome import ReferenceGenome
from dae.utils.regions import Region

from .annotation_request import (
    AnnotationRequest,
    NegativeStrandAnnotationRequest,
    PositiveStrandAnnotationRequest,
)
from .effect import AnnotationEffect, EffectFactory
from .effect_checkers.coding import CodingEffectChecker
from .effect_checkers.effect_checker import EffectChecker
from .effect_checkers.frame_shift import FrameShiftEffectChecker
from .effect_checkers.intron import IntronicEffectChecker
from .effect_checkers.promoter import PromoterEffectChecker
from .effect_checkers.protein_change import ProteinChangeEffectChecker
from .effect_checkers.splice_site import SpliceSiteEffectChecker
from .effect_checkers.start_loss import StartLossEffectChecker
from .effect_checkers.stop_loss import StopLossEffectChecker
from .effect_checkers.utr import UTREffectChecker
from .gene_codes import NuclearCode
from .variant import Variant

logger = logging.getLogger(__name__)


class AnnotationRequestFactory:
    """Build strand-specific annotation requests for a transcript."""

    @staticmethod
    def create_annotation_request(
        annotator: EffectAnnotator,
        variant: Variant,
        transcript_model: TranscriptModel,
    ) -> AnnotationRequest:
        """Return the request object matching the transcript's strand.

        A transcript whose ``strand`` equals ``"+"`` yields a
        ``PositiveStrandAnnotationRequest``; any other strand value yields a
        ``NegativeStrandAnnotationRequest``. Both are constructed from the
        annotator's reference genome, genetic code and promoter length,
        followed by the variant and the transcript model.
        """
        assert annotator.reference_genome is not None

        # Only the class differs between strands; the constructor arguments
        # are the same in both cases.
        request_class = (
            PositiveStrandAnnotationRequest
            if transcript_model.strand == "+"
            else NegativeStrandAnnotationRequest
        )
        return request_class(
            annotator.reference_genome,
            annotator.code,
            annotator.promoter_len,
            variant,
            transcript_model,
        )
class EffectAnnotator:
    """Predicts variant effect.

    The annotator holds a reference genome, gene models, a genetic code and
    a promoter length. For each candidate transcript the configured effect
    checkers are consulted in list order and the first checker that returns
    an effect decides the result for that transcript.
    """

    def __init__(
        self,
        reference_genome: ReferenceGenome,
        gene_models: GeneModels,
        code: NuclearCode | None = None,
        promoter_len: int = 0,
    ) -> None:
        """Store the resources and build the ordered list of checkers.

        Args:
            reference_genome: genome passed on to every annotation request.
            gene_models: gene models whose ``utr_models`` are searched.
            code: genetic code; a fresh ``NuclearCode()`` when ``None``.
            promoter_len: number of positions by which transcript windows
                are widened on both sides when selecting transcripts in
                :meth:`annotate`; also passed to annotation requests.
        """
        if code is None:
            code = NuclearCode()
        self.reference_genome = reference_genome
        self.gene_models = gene_models
        self.code = code
        self.promoter_len = promoter_len
        # Order matters: get_effect_for_transcript returns the first hit.
        self.effects_checkers: list[EffectChecker] = [
            PromoterEffectChecker(),
            CodingEffectChecker(),
            SpliceSiteEffectChecker(),
            StartLossEffectChecker(),
            StopLossEffectChecker(),
            FrameShiftEffectChecker(),
            ProteinChangeEffectChecker(),
            UTREffectChecker(),
            IntronicEffectChecker(),
        ]

    def close(self) -> None:
        """Drop the references to the reference genome and gene models."""
        self.reference_genome = None  # type: ignore
        self.gene_models = None  # type: ignore

    def get_effect_for_transcript(
        self,
        variant: Variant,
        transcript_model: TranscriptModel,
    ) -> AnnotationEffect | None:
        """Calculate effect for a given transcript.

        Returns the effect produced by the first checker in
        ``self.effects_checkers`` that yields one, or ``None`` when no
        checker does.
        """
        request = AnnotationRequestFactory.create_annotation_request(
            self, variant, transcript_model,
        )
        for effect_checker in self.effects_checkers:
            effect = effect_checker.get_effect(request)
            if effect is not None:
                return effect
        return None

    def annotate_cnv(
        self,
        chrom: str,
        pos_start: int,
        pos_end: int,
        variant_type: Annotatable.Type,
    ) -> list[AnnotationEffect]:
        """Annotate a CNV variant.

        ``LARGE_DUPLICATION`` is annotated with effect type ``"CNV+"`` and
        ``LARGE_DELETION`` with ``"CNV-"``; the region itself is handled by
        :meth:`annotate_region`.

        Raises:
            ValueError: if the gene models have no ``utr_models`` or the
                variant type is not one of the two CNV types.
        """
        assert self.gene_models is not None
        if self.gene_models.utr_models is None:
            raise ValueError("bad gene models")

        if variant_type == Annotatable.Type.LARGE_DUPLICATION:
            effect_type = "CNV+"
        elif variant_type == Annotatable.Type.LARGE_DELETION:
            effect_type = "CNV-"
        else:
            raise ValueError(
                f"unexpected variant type: {variant_type}")
        return self.annotate_region(
            chrom, pos_start, pos_end, effect_type=effect_type)

    def annotate_region(
        self,
        chrom: str,
        pos_start: int,
        pos_end: int,
        effect_type: str = "unknown",
    ) -> list[AnnotationEffect]:
        """Annotate a region or position.

        One effect of type ``effect_type`` is created for every transcript
        whose ``utr_models`` span intersects ``chrom:pos_start-pos_end``.
        When nothing intersects (including when ``chrom`` is not present in
        the gene models) a single effect without a transcript is returned.
        Every returned effect has ``length`` set to
        ``pos_end - pos_start + 1``.

        Raises:
            ValueError: if the gene models have no ``utr_models``.
        """
        assert self.gene_models is not None
        if self.gene_models.utr_models is None:
            raise ValueError("bad gene models")

        region = Region(chrom, pos_start, pos_end)
        length = pos_end - pos_start + 1
        effects: list[AnnotationEffect] = []

        # .get() instead of indexing: a chromosome missing from the gene
        # models must neither raise KeyError nor add an empty entry to the
        # mapping; it falls through to the transcript-less effect below.
        transcripts_by_span = self.gene_models.utr_models.get(chrom, {})
        for (start, stop), tms in transcripts_by_span.items():
            if not region.intersection(Region(chrom, start, stop)):
                continue
            for transcript_model in tms:
                effect = EffectFactory.create_effect_with_tm(
                    effect_type, transcript_model)
                effect.length = length
                effects.append(effect)

        if not effects:
            effect = EffectFactory.create_effect(effect_type)
            effect.length = length
            effects.append(effect)

        return effects

    def annotate(self, variant: Variant) -> list[AnnotationEffect]:
        """Annotate effects for a variant.

        Large duplications and deletions are delegated to
        :meth:`annotate_cnv` using ``position`` and
        ``position + length`` as the region bounds. For every other
        variant, each transcript whose span, widened by ``promoter_len`` on
        both sides, overlaps the variant's reference positions is checked
        with :meth:`get_effect_for_transcript`. A single ``"intergenic"``
        effect is returned when the chromosome is unknown or no transcript
        produced an effect.

        Raises:
            ValueError: if the gene models have no ``utr_models``.
        """
        assert self.gene_models is not None
        utr_models = self.gene_models.utr_models
        if utr_models is None:
            raise ValueError("bad gene models")

        if variant.variant_type in {
                Annotatable.Type.LARGE_DUPLICATION,
                Annotatable.Type.LARGE_DELETION}:
            assert variant.length is not None
            return self.annotate_cnv(
                variant.chromosome,
                variant.position,
                variant.position + variant.length,
                variant.variant_type)

        if variant.chromosome not in utr_models:
            return [EffectFactory.create_effect("intergenic")]

        assert variant.ref_position_last is not None
        effects: list[AnnotationEffect] = []
        for (start, stop), tms in utr_models[variant.chromosome].items():
            overlaps = (
                variant.position <= stop + self.promoter_len
                and variant.ref_position_last >= start - self.promoter_len
            )
            if not overlaps:
                continue
            for transcript_model in tms:
                effect = self.get_effect_for_transcript(
                    variant, transcript_model)
                if effect is not None:
                    effects.append(effect)

        if not effects:
            effects.append(EffectFactory.create_effect("intergenic"))
        return effects

    def do_annotate_variant(
        self, *,
        chrom: str | None = None,
        pos: int | None = None,
        location: str | None = None,
        variant: str | None = None,
        ref: str | None = None,
        alt: str | None = None,
        length: int | None = None,
        seq: str | None = None,
        variant_type: Annotatable.Type | None = None,
    ) -> list[AnnotationEffect]:
        """Annotate effects for a variant.

        When ``variant`` is ``"CNV+"`` or ``"CNV-"`` the region is taken
        from ``chrom``/``pos``/``length`` if all three are given, otherwise
        from ``location`` formatted as ``"<chrom>:<begin>-<end>"``, and the
        call is delegated to :meth:`annotate_cnv`. Any other description is
        wrapped in a ``Variant`` and passed to :meth:`annotate`.

        Raises:
            ValueError: for a CNV given without a usable region
                description.
        """
        if variant in {"CNV+", "CNV-"}:
            # Membership was just checked, so only two values are possible.
            cnv_type = (
                Annotatable.Type.LARGE_DUPLICATION
                if variant == "CNV+"
                else Annotatable.Type.LARGE_DELETION
            )
            if chrom is not None and pos is not None and length is not None:
                return self.annotate_cnv(
                    chrom, pos, pos + length, cnv_type,
                )
            if location is not None:
                chrom, beg_end = location.split(":")
                pos_beg, pos_end = beg_end.split("-")
                return self.annotate_cnv(
                    chrom, int(pos_beg), int(pos_end), cnv_type)
            raise ValueError("unexpected CNV variant description")

        return self.annotate(Variant(
            chrom, pos, loc=location, var=variant, ref=ref, alt=alt,
            length=length, seq=seq, variant_type=variant_type))

    def annotate_allele(
        self, *,
        chrom: str,
        pos: int,
        ref: str,
        alt: str,
        length: int | None = None,
        seq: str | None = None,
        variant_type: Annotatable.Type | None = None,
    ) -> list[AnnotationEffect]:
        """Annotate effects for an allele given by position, ref and alt.

        Builds a ``Variant`` from the arguments and passes it to
        :meth:`annotate`.
        """
        variant = Variant(
            chrom, pos, ref=ref, alt=alt, length=length, seq=seq,
            variant_type=variant_type,
        )
        return self.annotate(variant)

    @staticmethod
    def annotate_variant(
        gm: GeneModels,
        reference_genome: ReferenceGenome, *,
        chrom: str | None = None,
        pos: int | None = None,
        location: str | None = None,
        variant: str | None = None,
        ref: str | None = None,
        alt: str | None = None,
        length: int | None = None,
        seq: str | None = None,
        variant_type: Annotatable.Type | None = None,
        promoter_len: int = 0,
    ) -> list[AnnotationEffect]:
        """Build effect annotator and annotate a variant.

        Creates a temporary ``EffectAnnotator`` from ``gm``,
        ``reference_genome`` and ``promoter_len``, forwards the remaining
        keyword arguments to :meth:`do_annotate_variant`, logs a
        description of the resulting effects at debug level and returns
        them.
        """
        # pylint: disable=too-many-arguments
        annotator = EffectAnnotator(
            reference_genome, gm, promoter_len=promoter_len)
        effects = annotator.do_annotate_variant(
            chrom=chrom,
            pos=pos,
            location=location,
            variant=variant,
            ref=ref,
            alt=alt,
            length=length,
            seq=seq,
            variant_type=variant_type,
        )
        desc = AnnotationEffect.effects_description(effects)
        logger.debug("effect: %s", desc)
        return effects