"""Classes and helpers for variant annotation effects."""
from __future__ import annotations
import itertools
from collections.abc import Iterable
from typing import Any, ClassVar
from dae.effect_annotation.annotation_request import AnnotationRequest
from dae.genomic_resources.gene_models import TranscriptModel
[docs]
class AnnotationEffect: # pylint: disable=too-many-instance-attributes
"""Class to represent variant effect."""
SEVERITY: ClassVar[dict[str, int]] = {
"CNV+": 35,
"CNV-": 35,
"tRNA:ANTICODON": 30,
"all": 24,
"splice-site": 23,
"frame-shift": 22,
"nonsense": 21,
"no-frame-shift-newStop": 20,
"noStart": 19,
"noEnd": 18,
"missense": 17,
"no-frame-shift": 16,
"CDS": 15,
"synonymous": 14,
"coding_unknown": 13,
"regulatory": 12,
"3'UTR": 11,
"5'UTR": 10,
"intron": 9,
"non-coding": 8,
"5'UTR-intron": 7,
"3'UTR-intron": 6,
"promoter": 5,
"non-coding-intron": 4,
"unknown": 3,
"intergenic": 2,
"no-mutation": 1,
}
def __init__(self, effect_name: str) -> None:
self.effect: str = effect_name
self.gene: str | None = None
self.transcript_id: str | None = None
self.strand: str | None = None
self.prot_pos: int | None = None
self.non_coding_pos: int | None = None
self.prot_length: int | None = None
self.length: int | None = None
self.which_intron: int | None = None
self.how_many_introns: int | None = None
self.dist_from_coding: int | None = None
self.aa_change: str | None = None
self.dist_from_acceptor: int | None = None
self.dist_from_donor: int | None = None
self.intron_length: int | None = None
# pylint: disable=invalid-name
self.mRNA_length: int | None = None
self.mRNA_position: int | None = None
self.ref_aa: list[str] | None = None
self.alt_aa: list[str] | None = None
self.dist_from_5utr = None
def __repr__(self) -> str:
return (
f"Effect gene:{self.gene} trID:{self.transcript_id} "
f"strand:{self.strand} effect:{self.effect} "
f"protein pos:{self.prot_pos}/{self.prot_length} "
f"aa: {self.aa_change} "
f"len: {self.length}"
)
[docs]
def create_effect_details(self) -> str:
"""Build effect details."""
eff_data = [
(
[
"CNV+",
"CNV-",
],
str(self.length),
),
(
[
"noStart",
"noEnd",
"CDS",
"all",
],
str(self.prot_length),
),
(
[
"intron",
"5'UTR-intron",
"3'UTR-intron",
"non-coding-intron",
],
str(self.which_intron) + "/" + str(self.how_many_introns),
),
(
[
"intron",
"5'UTR-intron",
"3'UTR-intron",
"non-coding-intron",
],
"[" + str(self.dist_from_coding) + "]",
),
(
[
"no-frame-shift",
"no-frame-shift-newStop",
"frame-shift",
"splice-site",
"synonymous",
"missense",
"nonsense",
"coding_unknown",
],
str(self.prot_pos) + "/" + str(self.prot_length),
),
(
[
"no-frame-shift",
"no-frame-shift-newStop",
"missense",
"nonsense",
"coding_unknown",
],
"(" + str(self.aa_change) + ")",
),
(["no-mutation"], "no-mutation"),
(["5'UTR", "3'UTR"], str(self.dist_from_coding)),
(
[
"non-coding",
"unknown",
"tRNA:ANTICODON",
],
str(self.length),
),
(["promoter"], str(self.dist_from_5utr)),
]
return "".join(
[data for cond, data in eff_data if self.effect in cond],
)
[docs]
@classmethod
def effect_severity(cls, effect: AnnotationEffect) -> int:
return AnnotationEffect.SEVERITY[effect.effect]
[docs]
@classmethod
def sort_effects(
cls, effects: list[AnnotationEffect]) -> list[AnnotationEffect]:
return sorted(
effects, key=lambda v: - cls.effect_severity(v))
[docs]
@classmethod
def worst_effect(cls, effects: list[AnnotationEffect]) -> str:
sorted_effects = cls.sort_effects(effects)
return sorted_effects[0].effect
[docs]
@classmethod
def gene_effects(cls, effects: list[AnnotationEffect]) -> list[list[str]]:
"""Build parallel lists of genes and effects in that genes.
Consider deprecating this.
"""
sorted_effects = cls.sort_effects(effects)
worst_effect = sorted_effects[0].effect
if worst_effect == "intergenic":
return [["intergenic"], ["intergenic"]]
if worst_effect == "no-mutation":
return [["no-mutation"], ["no-mutation"]]
result = []
for _severity, severity_effects in itertools.groupby(
sorted_effects, cls.effect_severity,
):
for gene, gene_effects in itertools.groupby(
severity_effects, lambda e: e.gene,
):
result.append((gene, next(gene_effects).effect))
return [[str(r[0]) for r in result], [str(r[1]) for r in result]]
[docs]
@classmethod
def transcript_effects(
cls, effects: list[AnnotationEffect],
) -> tuple[list[str], list[str | None], list[str | None], list[str]]:
"""Build parallel lists of transcripts, genes, effects and details.
Consider deprecating this.
"""
worst_effect = cls.worst_effect(effects)
if worst_effect == "intergenic":
return (
["intergenic"],
["intergenic"],
["intergenic"],
["intergenic"],
)
if worst_effect == "no-mutation":
return (
["no-mutation"],
["no-mutation"],
["no-mutation"],
["no-mutation"],
)
transcripts = []
genes = []
gene_effects = []
details = []
for effect in effects:
transcripts.append(effect.transcript_id)
genes.append(effect.gene)
gene_effects.append(effect.effect)
details.append(effect.create_effect_details())
return (transcripts, genes, gene_effects, details) # type: ignore
[docs]
@classmethod
def simplify_effects(
cls, effects: list[AnnotationEffect],
) -> tuple[
str,
list[tuple[str, str]],
list[tuple[str, str | None, str | None, str]],
]:
"""Simplify effects.
Consider deprecating this.
"""
if effects[0].effect == "unk_chr":
return (
"unk_chr",
[("unk_chr", "unk_chr")],
[("unk_chr", "unk_chr", "unk_chr", "unk_chr")],
)
gene_effects = cls.gene_effects(effects)
transcript_effects = cls.transcript_effects(effects)
return (
cls.worst_effect(effects),
list(zip(*gene_effects, strict=True)),
list(zip(*transcript_effects, strict=True)))
[docs]
@classmethod
def filter_gene_effects(
cls, effects: list[AnnotationEffect],
effect_type: str,
) -> list[list[str]]:
"""Filter and map genes to effects by a group or effect type."""
gene_effects = zip(*cls.gene_effects(effects), strict=True)
if effect_type in EffectTypesMixin.EFFECT_GROUPS:
result = [
gene_effect
for gene_effect in gene_effects
if gene_effect[1] in EffectTypesMixin.EFFECT_GROUPS[effect_type]
]
elif effect_type in EffectTypesMixin.EFFECT_TYPES:
result = [
gene_effect
for gene_effect in gene_effects
if gene_effect[1] == effect_type
]
else:
raise ValueError(
f"{effect_type} does not match any effect type or group",
)
return [[str(r[0]) for r in result], [str(r[1]) for r in result]]
[docs]
@classmethod
def effects_description(
cls, effects: list[AnnotationEffect],
) -> tuple[str, str, str]:
"""Build effects description.
Condider deprecating this.
"""
worst_effect, gene_effects, transcript_effects = \
cls.simplify_effects(effects)
gene_effects_out = "|".join([f"{g}:{e}" for g, e in gene_effects])
transcript_effects_out = "|".join(
[f"{t}:{g}:{e}:{d}" for t, g, e, d in transcript_effects],
)
return (worst_effect, gene_effects_out, transcript_effects_out)
[docs]
class EffectFactory:
"""Factory class for build annotation effects."""
[docs]
@classmethod
def create_effect(cls, effect_name: str) -> AnnotationEffect:
return AnnotationEffect(effect_name)
[docs]
@classmethod
def create_effect_with_tm(
cls, effect_name: str, transcript_model: TranscriptModel,
) -> AnnotationEffect:
"""Create effect with transcript model."""
effect = cls.create_effect(effect_name)
effect.gene = transcript_model.gene
effect.strand = transcript_model.strand
try:
effect.transcript_id = transcript_model.tr_name
except AttributeError:
effect.transcript_id = transcript_model.tr_id
return effect
[docs]
@classmethod
def create_effect_with_request(
cls, effect_name: str, request: AnnotationRequest,
) -> AnnotationEffect:
"""Create effect with annotation request."""
effect = cls.create_effect_with_tm(
effect_name, request.transcript_model,
)
effect.mRNA_length = request.get_exonic_length()
effect.mRNA_position = request.get_exonic_position()
return effect
[docs]
@classmethod
def create_effect_with_prot_length(
cls, effect_name: str, request: AnnotationRequest,
) -> AnnotationEffect:
effect = cls.create_effect_with_request(effect_name, request)
effect.prot_length = request.get_protein_length()
return effect
[docs]
@classmethod
def create_effect_with_prot_pos(
cls, effect_name: str, request: AnnotationRequest,
) -> AnnotationEffect:
"""Create effect with protein change position."""
effect = cls.create_effect_with_prot_length(effect_name, request)
start_prot, _ = request.get_protein_position()
effect.prot_pos = start_prot
return effect
[docs]
@classmethod
def create_effect_with_aa_change(
cls, effect_name: str, request: AnnotationRequest,
) -> AnnotationEffect:
"""Create effect with amino acid change."""
effect = cls.create_effect_with_prot_pos(effect_name, request)
ref_aa, alt_aa = request.get_amino_acids()
assert ref_aa is not None
assert alt_aa is not None
effect.aa_change = f"{''.join(ref_aa)}->{''.join(alt_aa)}"
effect.ref_aa = ref_aa
effect.alt_aa = alt_aa
return effect
[docs]
@classmethod
def create_intronic_non_coding_effect(
cls, effect_type: str,
request: AnnotationRequest,
start: int, end: int, index: int,
) -> AnnotationEffect:
"""Create intronic non coding effect."""
effect = cls.create_effect_with_prot_length(effect_type, request)
dist_left = request.variant.position - start - 1
dist_right = end - request.variant.ref_position_last
effect.dist_from_coding = min(dist_left, dist_right)
effect.how_many_introns = len(request.transcript_model.exons) - 1
effect.intron_length = end - start - 1
if request.transcript_model.strand == "+":
effect.dist_from_acceptor = dist_right
effect.dist_from_donor = dist_left
effect.which_intron = index
else:
effect.dist_from_acceptor = dist_left
effect.dist_from_donor = dist_right
effect.which_intron = effect.how_many_introns - index + 1
return effect
[docs]
@classmethod
def create_intronic_effect(
cls, effect_type: str,
request: AnnotationRequest,
start: int, end: int, index: int,
) -> AnnotationEffect:
"""Create intronic effect."""
effect = cls.create_intronic_non_coding_effect(
effect_type, request, start, end, index,
)
if request.transcript_model.strand == "+":
effect.prot_pos = request.get_protein_position_for_pos(end)
else:
effect.prot_pos = request.get_protein_position_for_pos(start)
return effect
[docs]
class EffectGene:
"""Combine gene and effect and that gene."""
def __init__(
self, symbol: str | None = None,
effect: str | None = None,
) -> None:
self.symbol = symbol
self.effect = effect
[docs]
@staticmethod
def from_string(data: str) -> EffectGene | None:
"""Deserialize effect gene."""
if not data:
return None
parts = [p.strip() for p in data.split(":")]
if len(parts) == 2:
return EffectGene(parts[0], parts[1])
if len(parts) == 1:
return EffectGene(symbol=None, effect=parts[0])
raise ValueError(f"unexpected effect gene format: {data}")
def __repr__(self) -> str:
if self.symbol is None and self.effect is None:
return "(no effect)"
if self.symbol is None:
assert self.effect is not None
return self.effect
return f"{self.symbol}:{self.effect}"
def __str__(self) -> str:
return self.__repr__()
def __eq__(self, other: Any) -> bool:
assert isinstance(other, EffectGene)
return self.symbol == other.symbol and self.effect == other.effect
def __hash__(self) -> int:
return hash((self.symbol, self.effect))
[docs]
@classmethod
def from_gene_effects(
cls, gene_effects: list[tuple[str, str]],
) -> list[EffectGene]:
"""Build effect gene objects from list of gene, effect tuples."""
result = []
for symbol, effect in gene_effects:
result.append(cls.from_tuple((symbol, effect)))
return result
[docs]
@classmethod
def from_tuple(cls, gene_effect: tuple[str, str]) -> EffectGene:
(symbol, effect) = tuple(gene_effect)
return EffectGene(symbol, effect)
[docs]
class EffectTranscript:
"""Defines effect transcript."""
def __init__(
self, transcript_id: str,
gene: str | None = None,
effect: str | None = None,
details: str | None = None,
) -> None:
self.transcript_id = transcript_id
self.gene = gene
self.effect = effect
self.details = details
[docs]
@staticmethod
def from_string(data: str) -> EffectTranscript | None:
"""Deserialize effect transcript/details."""
if not data:
return None
parts = [p.strip() for p in data.split(":")]
if len(parts) == 4:
return EffectTranscript(
parts[0], gene=parts[1], effect=parts[2], details=parts[3])
raise ValueError(
f"unexpected effect details format: {data}")
def __repr__(self) -> str:
return f"{self.transcript_id}:{self.gene}:{self.effect}:{self.details}"
def __str__(self) -> str:
return self.__repr__()
def __eq__(self, other: Any) -> bool:
assert isinstance(other, EffectTranscript)
return (
self.transcript_id == other.transcript_id
and self.details == other.details
)
[docs]
@classmethod
def from_tuple(
cls, transcript_tuple: tuple[str, str | None, str | None, str],
) -> EffectTranscript:
(transcript_id, gene, effect, details) = tuple(transcript_tuple)
assert transcript_id is not None, transcript_tuple
return EffectTranscript(
transcript_id, gene=gene, effect=effect, details=details)
[docs]
@classmethod
def from_effect_transcripts(
cls, effect_transcripts: list[tuple[str, str]],
) -> dict[str, EffectTranscript]:
"""Build effect transcripts."""
result = {}
for transcript_id, details in effect_transcripts:
parts = [p.strip() for p in details.split(":")]
if transcript_id is None:
result["intergenic"] = EffectTranscript(
"intergenic", "intergenic", "intergenic", "intergenic")
elif len(parts) == 4:
result[transcript_id] = EffectTranscript(
parts[0], gene=parts[1], effect=parts[2], details=parts[3])
else:
result[transcript_id] = EffectTranscript.from_tuple(
(transcript_id, None, None, details),
)
return result
[docs]
class AlleleEffects:
"""Class for allele effect used in alleles."""
def __init__(
self, worst_effect: str,
gene_effects: list[EffectGene],
effect_transcripts: dict[str, EffectTranscript],
) -> None:
self.worst_effect = worst_effect
self.genes = gene_effects
self.transcripts = effect_transcripts
self._effect_types: list | None = None
self.all_effects: list[AnnotationEffect] | None = None
@property
def worst(self) -> str:
return self.worst_effect
def __repr__(self) -> str:
effects = "|".join([str(g) for g in self.genes])
transcripts = "|".join([str(t) for t in self.transcripts.values()])
return f"{self.worst}!{effects}!{transcripts}"
def __str__(self) -> str:
return repr(self)
def __eq__(self, other: Any) -> bool:
if not isinstance(other, AlleleEffects):
return False
assert isinstance(other, AlleleEffects)
return (
self.worst == other.worst
and self.genes == other.genes
and self.transcripts == other.transcripts
)
@property
def types(self) -> list[str]:
if self._effect_types is None:
self._effect_types = [eg.effect for eg in self.genes]
return self._effect_types
[docs]
@classmethod
def from_simplified_effects(
cls, effect_type: str,
effect_genes: list[tuple[str, str]],
transcripts: list[tuple[str, str]],
) -> AlleleEffects | None:
"""Build allele effects from simplified effects."""
if effect_type is None:
return None
effect_genes_out = EffectGene.from_gene_effects(effect_genes)
transcripts_out = EffectTranscript.from_effect_transcripts(transcripts)
return AlleleEffects(effect_type, effect_genes_out, transcripts_out)
[docs]
@staticmethod
def from_string(data: str | None) -> AlleleEffects | None:
"""Build allele effect from string."""
if data is None:
return None
parts = data.split("!")
assert len(parts) == 3, parts
worst = parts[0].strip()
effect_genes = list(filter(None, [
EffectGene.from_string(eg.strip()) for eg in parts[1].split("|")
]))
transcripts_list: list[EffectTranscript] = list(filter(None, [
EffectTranscript.from_string(et.strip())
for et in parts[2].split("|")
]))
transcripts = {t.transcript_id: t for t in transcripts_list}
if not effect_genes and not transcripts:
return None
return AlleleEffects(worst, effect_genes, transcripts)
[docs]
@classmethod
def from_effects(cls, effects: list[AnnotationEffect]) -> AlleleEffects:
"""Build allele effects from list of annotation effect."""
worst_effect, gene_effects, transcript_effects = \
AnnotationEffect.simplify_effects(effects)
gene_effects_out = list(itertools.starmap(EffectGene, gene_effects))
transcript_effects_out = {
t: EffectTranscript(t, g, e, d)
for t, g, e, d in transcript_effects
}
result = AlleleEffects(
worst_effect, gene_effects_out, transcript_effects_out)
result.all_effects = effects
return result
[docs]
class EffectTypesMixin:
"""Helper function to work with variant effects."""
EFFECT_TYPES: ClassVar[list[str]] = [
"3'UTR",
"3'UTR-intron",
"5'UTR",
"5'UTR-intron",
"frame-shift",
"intergenic",
"intron",
"missense",
"no-frame-shift",
"no-frame-shift-newStop",
"noEnd",
"noStart",
"non-coding",
"non-coding-intron",
"nonsense",
"splice-site",
"synonymous",
"CDS",
"CNV+",
"CNV-",
# "cnv+",
# "cnv-",
# "nonsynonymous"
]
EFFECT_TYPES_MAPPING: ClassVar[dict[str, list[str]]] = {
"Nonsense": ["nonsense"],
"Frame-shift": ["frame-shift"],
"Splice-site": ["splice-site"],
"Missense": ["missense"],
"No-frame-shift": ["no-frame-shift"],
"No-frame-shift-newStop": ["no-frame-shift-newStop"],
"noStart": ["noStart"],
"noEnd": ["noEnd"],
"Synonymous": ["synonymous"],
"Non coding": ["non-coding"],
"Intron": ["intron", "non-coding-intron"],
"Intergenic": ["intergenic"],
"3'-UTR": ["3'UTR", "3'UTR-intron"],
"5'-UTR": ["5'UTR", "5'UTR-intron"],
"CNV": ["CNV"],
"CNV+": ["CNV+"],
"CNV-": ["CNV-"],
"Nonsynonymous": ["nonsynonymous"],
}
EFFECT_TYPES_UI_NAMING: ClassVar[dict[str, str]] = {
"nonsense": "Nonsense",
"frame-shift": "Frame-shift",
"splice-site": "Splice-site",
"missense": "Missense",
"no-frame-shift": "No-frame-shift",
"no-frame-shift-newStop": "No-frame-shift-newStop",
"synonymous": "Synonymous",
"non-coding": "Non coding",
"intron": "Intron",
"non-coding-intron": "Intron",
}
EFFECT_GROUPS: ClassVar[dict[str, list[str]]] = {
"coding": [
"nonsense",
"frame-shift",
"splice-site",
"no-frame-shift-newStop",
"missense",
"no-frame-shift",
"noStart",
"noEnd",
"synonymous",
],
"noncoding": [
"non coding",
"intron",
"intergenic",
"3'UTR",
"5'UTR",
],
"CNV": ["CNV+", "CNV-"],
"LGDs": [
"frame-shift",
"nonsense",
"splice-site",
"no-frame-shift-newStop",
],
"nonsynonymous": [
"nonsense",
"frame-shift",
"splice-site",
"no-frame-shift-newStop",
"missense",
"no-frame-shift",
"noStart",
"noEnd",
],
"UTRs": [
"3'UTR",
"5'UTR",
"3'UTR-intron",
"5'UTR-intron",
],
}
[docs]
@classmethod
def build_effect_types_groups(cls, effect_types: list[str]) -> list[str]:
"""Expand effect groups into effect types."""
etl = [
cls.EFFECT_GROUPS.get(et, [et])
for et in effect_types
]
return list(itertools.chain.from_iterable(etl))
[docs]
@classmethod
def build_effect_types_list(cls, effect_types: list[str]) -> list[str]:
"""Fix naming of effect types coming from the UI."""
etl = [
cls.EFFECT_TYPES_MAPPING.get(et, [et])
for et in effect_types
]
return list(itertools.chain.from_iterable(etl))
[docs]
@classmethod
def build_effect_types(
cls, effect_types: str | Iterable[str], *,
safe: bool = True,
) -> list[str]:
"""Build list of effect types."""
if isinstance(effect_types, str):
effect_types = effect_types.split(",")
etl = [et.strip() for et in effect_types]
etl = cls.build_effect_types_list(etl)
etl = cls.build_effect_types_groups(etl)
if safe:
assert all(et in cls.EFFECT_TYPES for et in etl), etl
else:
etl = [et for et in etl if et in cls.EFFECT_TYPES]
return etl
[docs]
@classmethod
def build_effect_types_naming(
cls, effect_types: str | Iterable[str], *,
safe: bool = True,
) -> list[str]:
"""Build list of effect types appropriate for the UI."""
if isinstance(effect_types, str):
effect_types = effect_types.split(",")
assert isinstance(effect_types, list)
if safe:
assert all(
et in cls.EFFECT_TYPES
or et in cls.EFFECT_TYPES_MAPPING
for et in effect_types
)
return [cls.EFFECT_TYPES_UI_NAMING.get(et, et) for et in effect_types]
[docs]
@classmethod
def get_effect_types(
cls, *, safe: bool = True, **kwargs: Any,
) -> list[str] | None:
"""Process effect types from kwargs."""
effect_types = kwargs.get("effectTypes", None)
if effect_types is None:
return None
return cls.build_effect_types(effect_types, safe=safe)
[docs]
def expand_effect_types(
effect_groups: str | Iterable[str] | Iterable[Iterable[str]],
) -> list[str]:
"""Expand effect type groups into list of effect types."""
if isinstance(effect_groups, str):
effect_groups = [effect_groups]
effects = []
for effect_group in effect_groups:
if isinstance(effect_group, str):
effect_group = [effect_group]
for effect in effect_group:
assert isinstance(effect, str)
if effect in EffectTypesMixin.EFFECT_GROUPS:
effects += EffectTypesMixin.EFFECT_GROUPS[effect]
else:
effects.append(effect)
result = []
for effect in effects:
if effect not in EffectTypesMixin.EFFECT_TYPES_MAPPING:
result.append(effect)
else:
result.extend(EffectTypesMixin.EFFECT_TYPES_MAPPING[effect])
return result
[docs]
def ge2str(eff: AlleleEffects) -> str:
return "|".join([f"{g.symbol}:{g.effect}" for g in eff.genes])
[docs]
def gd2str(eff: AlleleEffects) -> str:
return "|".join(
[
str(t) for t in eff.transcripts.values()
],
)
[docs]
def gene_effect_get_worst_effect(
gene_effects: AlleleEffects | None,
) -> str:
if gene_effects is None:
return ""
return ",".join([gene_effects.worst])
[docs]
def gene_effect_get_genes_worst(
gene_effects: AlleleEffects | None,
) -> str:
"""Return comma separted list of genes."""
if gene_effects is None:
return ""
genes_set: set[str] = {
x.symbol for x in gene_effects.genes
if x.symbol is not None and x.effect == gene_effects.worst
}
genes = sorted(genes_set)
return ";".join(genes)
[docs]
def gene_effect_get_genes(
gene_effects: AlleleEffects | None,
) -> str:
"""Return comma separted list of genes."""
if gene_effects is None:
return ""
genes_set: set[str] = {
x.symbol for x in gene_effects.genes if x.symbol is not None
}
genes = sorted(genes_set)
return ";".join(genes)