Source code for dae.effect_annotation.annotator

from __future__ import annotations

import logging

from dae.annotation.annotatable import Annotatable
from dae.genomic_resources.gene_models import GeneModels, TranscriptModel
from dae.genomic_resources.reference_genome import ReferenceGenome
from dae.utils.regions import Region

from .annotation_request import (
    AnnotationRequest,
    NegativeStrandAnnotationRequest,
    PositiveStrandAnnotationRequest,
)
from .effect import AnnotationEffect, EffectFactory
from .effect_checkers.coding import CodingEffectChecker
from .effect_checkers.effect_checker import EffectChecker
from .effect_checkers.frame_shift import FrameShiftEffectChecker
from .effect_checkers.intron import IntronicEffectChecker
from .effect_checkers.promoter import PromoterEffectChecker
from .effect_checkers.protein_change import ProteinChangeEffectChecker
from .effect_checkers.splice_site import SpliceSiteEffectChecker
from .effect_checkers.start_loss import StartLossEffectChecker
from .effect_checkers.stop_loss import StopLossEffectChecker
from .effect_checkers.utr import UTREffectChecker
from .gene_codes import NuclearCode
from .variant import Variant

logger = logging.getLogger(__name__)


[docs] class AnnotationRequestFactory: """Factory for annotation requests."""
[docs] @staticmethod def create_annotation_request( annotator: EffectAnnotator, variant: Variant, transcript_model: TranscriptModel) -> AnnotationRequest: """Create an annotation request.""" if transcript_model.strand == "+": return PositiveStrandAnnotationRequest( annotator.reference_genome, annotator.code, annotator.promoter_len, variant, transcript_model, ) return NegativeStrandAnnotationRequest( annotator.reference_genome, annotator.code, annotator.promoter_len, variant, transcript_model, )
[docs] class EffectAnnotator: """Predicts variant effect.""" def __init__( self, reference_genome: ReferenceGenome, gene_models: GeneModels, code: NuclearCode = NuclearCode(), promoter_len: int = 0): self.reference_genome = reference_genome self.gene_models = gene_models self.code = code self.promoter_len = promoter_len self.effects_checkers: list[EffectChecker] = [ PromoterEffectChecker(), CodingEffectChecker(), SpliceSiteEffectChecker(), StartLossEffectChecker(), StopLossEffectChecker(), FrameShiftEffectChecker(), ProteinChangeEffectChecker(), UTREffectChecker(), IntronicEffectChecker(), ]
[docs] def get_effect_for_transcript( self, variant: Variant, transcript_model: TranscriptModel, ) -> AnnotationEffect | None: """Calculate effect for a given transcript.""" request = AnnotationRequestFactory.create_annotation_request( self, variant, transcript_model, ) for effect_checker in self.effects_checkers: effect = effect_checker.get_effect(request) if effect is not None: return effect return None
[docs] def annotate_cnv( self, chrom: str, pos_start: int, pos_end: int, variant_type: Annotatable.Type, ) -> list[AnnotationEffect]: """Annotate a CNV variant.""" if self.gene_models.utr_models is None: raise ValueError("bad gene models") if variant_type == Annotatable.Type.LARGE_DUPLICATION: effect_type = "CNV+" elif variant_type == Annotatable.Type.LARGE_DELETION: effect_type = "CNV-" else: raise ValueError( f"unexpected variant type: {variant_type}") assert effect_type is not None return self.annotate_region( chrom, pos_start, pos_end, effect_type=effect_type)
[docs] def annotate_region( self, chrom: str, pos_start: int, pos_end: int, effect_type: str = "unknown", ) -> list[AnnotationEffect]: """Annotate a region or position.""" if self.gene_models.utr_models is None: raise ValueError("bad gene models") region = Region(chrom, pos_start, pos_end) effects = [] length = pos_end - pos_start + 1 for (start, stop), tms in \ self.gene_models.utr_models[chrom].items(): if region.intersection( Region(chrom, start, stop)): for transcript_model in tms: effect = EffectFactory.create_effect_with_tm( effect_type, transcript_model) effect.length = length effects.append(effect) if len(effects) == 0: effect = EffectFactory.create_effect(effect_type) effect.length = pos_end - pos_start + 1 effects.append(effect) return effects
[docs] def annotate(self, variant: Variant) -> list[AnnotationEffect]: """Annotate effects for a variant.""" if self.gene_models.utr_models is None: raise ValueError("bad gene models") if variant.variant_type in { Annotatable.Type.LARGE_DUPLICATION, Annotatable.Type.LARGE_DELETION}: assert variant.length is not None return self.annotate_cnv( variant.chromosome, variant.position, variant.position + variant.length, variant.variant_type) effects = [] if variant.chromosome not in self.gene_models.utr_models: effects.append(EffectFactory.create_effect("intergenic")) return effects for key in self.gene_models.utr_models[variant.chromosome]: if ( variant.position <= key[1] + self.promoter_len and variant.ref_position_last >= key[0] - self.promoter_len ): for transcript_model in self.gene_models\ .utr_models[variant.chromosome][key]: effect = self.get_effect_for_transcript( variant, transcript_model) if effect is not None: effects.append(effect) if len(effects) == 0: effects.append(EffectFactory.create_effect("intergenic")) return effects
[docs] def do_annotate_variant( self, chrom: str | None = None, pos: int | None = None, location: str | None = None, variant: str | None = None, ref: str | None = None, alt: str | None = None, length: int | None = None, seq: str | None = None, variant_type: Annotatable.Type | None = None, ) -> list[AnnotationEffect]: """Annotate effects for a variant.""" if variant in {"CNV+", "CNV-"}: if variant == "CNV+": cnv_type = Annotatable.Type.LARGE_DUPLICATION elif variant == "CNV-": cnv_type = Annotatable.Type.LARGE_DELETION else: raise ValueError( f"Unexpected CNV variant type: {variant_type}") if chrom is not None and pos is not None and length is not None: return self.annotate_cnv( chrom, pos, pos + length, cnv_type, ) if location is not None: chrom, beg_end = location.split(":") pos_beg, pos_end = beg_end.split("-") return self.annotate_cnv( chrom, int(pos_beg), int(pos_end), cnv_type) raise ValueError("unexpected CNV variant description") return self.annotate(Variant( chrom, pos, location, variant, ref, alt, length, seq, variant_type))
[docs] def annotate_allele( self, chrom: str, pos: int, ref: str, alt: str, length: int | None = None, seq: str | None = None, variant_type: str | None = None, ) -> list[AnnotationEffect]: """Annotate effects for a variant.""" variant = Variant( chrom, pos, None, None, ref, alt, length, seq, variant_type, ) return self.annotate(variant)
[docs] @staticmethod def annotate_variant( gm: GeneModels, reference_genome: ReferenceGenome, chrom: str | None = None, position: int | None = None, location: str | None = None, variant: str | None = None, ref: str | None = None, alt: str | None = None, length: int | None = None, seq: str | None = None, variant_type: Annotatable.Type | None = None, promoter_len: int = 0, ) -> list[AnnotationEffect]: """Build effect annotator and annotate a variant.""" # pylint: disable=too-many-arguments annotator = EffectAnnotator( reference_genome, gm, promoter_len=promoter_len) effects = annotator.do_annotate_variant( chrom, position, location, variant, ref, alt, length, seq, variant_type, ) desc = AnnotationEffect.effects_description(effects) logger.debug("effect: %s", desc) return effects
# @classmethod # def effect_description1(cls, E): # if E[0].effect == "unk_chr": # return ("unk_chr", "unk_chr", "unk_chr") # effect_type = [] # effect_gene = [] # effect_details = [] # D = {} # [D.setdefault( # AnnotationEffect.Severity[i.effect], []).append(i) for i in E] # set_worst_effect = False # for key in sorted(D, key=int, reverse=True): # if set_worst_effect is False: # effect_type = [D[key][0].effect] # set_worst_effect = True # if effect_type == "intergenic": # return ("intergenic", "intergenic", "intergenic") # if effect_type == "no-mutation": # return ("no-mutation", "no-mutation", "no-mutation") # G = {} # [G.setdefault(i.gene, []).append(i) for i in D[key]] # for gene in G: # for v in G[gene]: # effect_details.append(v.create_effect_details()) # if gene is not None: # gene_str = str(gene) # else: # gene_str = "" # effect_gene.append(gene_str + ":" + G[gene][0].effect) # return (effect_type, effect_gene, effect_details) # @classmethod # def effect_description(cls, E): # effect_type, effect_gene, effect_details = cls.effect_simplify(E) # if isinstance(effect_gene, list): # effect_gene = "|".join([":".join(eg) for eg in effect_gene]) # if isinstance(effect_details, list): # effect_details = "|".join( # [";".join([e for e in ed]) for ed in effect_details] # ) # return (effect_type, effect_gene, effect_details) # @classmethod # def effect_simplify(cls, E): # if E[0].effect == "unk_chr": # return ("unk_chr", "unk_chr", "unk_chr") # effect_type = "" # effect_gene = [] # effect_details = [] # D = {} # [D.setdefault(AnnotationEffect.Severity[i.effect], []).append(i) # for i in E] # set_worst_effect = False # for key in sorted(D, key=int, reverse=True): # if set_worst_effect is False: # effect_type = D[key][0].effect # set_worst_effect = True # if effect_type == "intergenic": # return ( # "intergenic", # [("intergenic", "intergenic")], # "intergenic", # ) # if effect_type == "no-mutation": # return ("no-mutation", "no-mutation", "no-mutation") # G = {} # [G.setdefault(i.gene, []).append(i) for i in D[key]] # effect_detail = [] # for gene in G: # for v in G[gene]: # effect_detail.append(v.create_effect_details()) # effect_gene.append((gene, G[gene][0].effect)) # effect_details.append(effect_detail) # return (effect_type, effect_gene, effect_details)