Source code for dae.annotation.liftover_annotator

"""Provides a lift over annotator and helpers."""
import abc
import logging
import textwrap
from typing import Any, Callable

from dae.annotation.annotation_config import AnnotatorInfo, AttributeInfo
from dae.annotation.annotation_pipeline import (
    AnnotationPipeline,
    Annotator,
)
from dae.genomic_resources.liftover_chain import (
    LiftoverChain,
    build_liftover_chain_from_resource,
)
from dae.genomic_resources.reference_genome import (
    ReferenceGenome,
    build_reference_genome_from_resource,
)
from dae.genomic_resources.variant_utils import (
    maximally_extend_variant,
    normalize_variant,
)
from dae.utils.variant_utils import reverse_complement

from .annotatable import Annotatable, CNVAllele, Position, Region, VCFAllele
from .annotator_base import AnnotatorBase

logger = logging.getLogger(__name__)


[docs] def build_liftover_annotator(pipeline: AnnotationPipeline, info: AnnotatorInfo) -> Annotator: """Create a liftover annotator.""" chain_resource_id = info.parameters.get("chain") if chain_resource_id is None: raise ValueError(f"The {info} requires a 'chain' parameter.") chain_resource = pipeline.repository.get_resource(chain_resource_id) if chain_resource is None: raise ValueError(f"The {info} points to a resource " f"{chain_resource_id} that is unavailable.") chain = build_liftover_chain_from_resource(chain_resource) resource_id = info.parameters.get("target_genome", chain.target_genome_id) if resource_id is None: raise ValueError( f"The {info} requires a 'target_genome' parameter.") resource = pipeline.repository.get_resource(resource_id) if resource is None: raise ValueError(f"The {info} points to a resource " f"{resource_id} that is " "unavailable.") target_genome = build_reference_genome_from_resource(resource) resource_id = info.parameters.get("source_genome", chain.source_genome_id) if resource_id is None: raise ValueError( f"The {info} requires a 'source_genome' parameter.") resource = pipeline.repository.get_resource(resource_id) if resource is None: raise ValueError(f"The {info} points to a resource " f"{resource_id} that is " "unavailable.") source_genome = build_reference_genome_from_resource(resource) if info.type in {"liftover_annotator", "bcf_liftover_annotator"}: return BcfLiftoverAnnotator( pipeline, info, chain, source_genome, target_genome, ) if info.type == "basic_liftover_annotator": return BasicLiftoverAnnotator( pipeline, info, chain, source_genome, target_genome, ) raise ValueError(f"Unsupported liftover annotator type: {info.type}")
[docs] class AbstractLiftoverAnnotator(AnnotatorBase): """Liftovver annotator class.""" def __init__( self, pipeline: AnnotationPipeline | None, info: AnnotatorInfo, chain: LiftoverChain, source_genome: ReferenceGenome, target_genome: ReferenceGenome, ): info.documentation += textwrap.dedent(""" Annotator to lift over a variant from one reference genome to another. <a href="https://iossifovlab.com/gpfuserdocs/administration/annotation.html#liftover-annotator" target="_blank">More info</a> """) # noqa info.resources += [ chain.resource, source_genome.resource, target_genome.resource] if not info.attributes: info.attributes = [AttributeInfo( "liftover_annotatable", "liftover_annotatable", True, # noqa FTB003 {}, )] super().__init__(pipeline, info, { "liftover_annotatable": ("annotatable", "Lifted over allele."), }) self.chain = chain self.source_genome = source_genome self.target_genome = target_genome
[docs] def close(self) -> None: self.target_genome.close() self.chain.close() super().close()
[docs] def open(self) -> Annotator: self.source_genome.open() self.target_genome.open() self.chain.open() return super().open()
def _do_annotate(self, annotatable: Annotatable, _: dict[str, Any]) \ -> dict[str, Any]: assert annotatable is not None if annotatable.type == Annotatable.Type.POSITION: value = self.liftover_position(annotatable) elif annotatable.type == Annotatable.Type.REGION: value = self.liftover_region(annotatable) elif annotatable.type in { Annotatable.Type.LARGE_DELETION, Annotatable.Type.LARGE_DUPLICATION}: value = self.liftover_cnv(annotatable) else: assert isinstance(annotatable, VCFAllele) value = self.liftover_allele(annotatable) if value is None: logger.debug("unable to liftover allele: %s", annotatable) return {"liftover_annotatable": value}
[docs] def liftover_allele(self, allele: VCFAllele) -> VCFAllele | None: """Liftover an allele.""" if not isinstance(allele, VCFAllele): return None try: lo_allele = self._internal_liftover_allele(allele) if lo_allele is None: return None lo_chrom, lo_pos, lo_ref, lo_alt = lo_allele result = VCFAllele(lo_chrom, lo_pos, lo_ref, lo_alt) except BaseException as ex: # noqa BLE001 pylint: disable=broad-except logger.warning( "problem in variant %s liftover: %s", allele, ex, exc_info=True) return None return result
@abc.abstractmethod def _internal_liftover_allele( self, allele: VCFAllele, ) -> tuple[str, int, str, str] | None: """Liftover an allele.""" raise NotImplementedError
[docs] def liftover_position( self, position: Annotatable, ) -> Annotatable | None: """Liftover position annotatable.""" assert isinstance(position, Position) lo_coord = self.chain.convert_coordinate( position.chrom, position.position, ) if lo_coord is None: return None return Position(lo_coord[0], lo_coord[1])
def _do_liftover_region( self, region: Annotatable, ) -> Annotatable | None: """Liftover region annotatable.""" assert isinstance(region, (Region, CNVAllele)) lo_start = self.chain.convert_coordinate( region.chrom, region.position, ) lo_end = self.chain.convert_coordinate( region.chrom, region.end_position, ) if lo_start is None or lo_end is None: logger.error( "unable to liftover start or end of the region: %s", region) return None if lo_start[0] != lo_end[0]: logger.error( "lifted over region spans multiple chroms: %s -> (%s, %s)", region, lo_start[0], lo_end[0]) return None return Region( lo_start[0], min(lo_start[1], lo_end[1]), max(lo_start[1], lo_end[1]))
[docs] def liftover_region(self, region: Annotatable) -> Annotatable | None: """Liftover region annotatable.""" assert isinstance(region, Region) return self._do_liftover_region(region)
[docs] def liftover_cnv(self, cnv_allele: Annotatable) -> Annotatable | None: """Liftover CNV allele annotatable.""" assert isinstance(cnv_allele, CNVAllele) region = self._do_liftover_region(cnv_allele) if region is None: return None return CNVAllele( region.chrom, region.pos, region.pos_end, cnv_allele.type)
def _liftover_snp_simple( lo_coordinates: tuple[str, int, str, int], nref: str, nalt: str, target_genome: ReferenceGenome, ) -> tuple[str, int, str, str] | None: tchrom, tpos, tstrand, _ = lo_coordinates tseq = target_genome.get_sequence(tchrom, tpos, tpos) if tstrand == "-": tseq = reverse_complement(tseq) if tseq == nref: tref = tseq talt = nalt elif tseq == nalt: tref = tseq talt = nref else: return None nchrom, npos, nref, nalts = normalize_variant( tchrom, tpos, tref, [talt], target_genome) return nchrom, npos, nref, nalts[0] _LO_LENGTH_CHANGE_CUTOFF = 10 def _liftover_sequence( chrom: str, pos: int, ref: str, alt: str, liftover_chain: LiftoverChain, target_genome: ReferenceGenome, ) -> tuple[str, int, str, str] | None: slen = len(ref) anchor_5prime = pos anchor_3prime = pos + slen - 1 lo_anchor_5prime = liftover_chain.convert_coordinate(chrom, anchor_5prime) lo_anchor_3prime = liftover_chain.convert_coordinate(chrom, anchor_3prime) if lo_anchor_5prime is None or lo_anchor_3prime is None: logger.debug( "liftover anchors not found: %s:%d-%d", chrom, anchor_5prime, anchor_3prime, ) return None # check if the anchors are on the same chromosome and strand if lo_anchor_5prime[0] != lo_anchor_3prime[0] or \ lo_anchor_5prime[2] != lo_anchor_3prime[2]: logger.debug( "liftover anchors are on different chromosomes or strands: " "%s:%d-%d %s:%d %s:%d", chrom, anchor_5prime, anchor_3prime, lo_anchor_5prime[0], lo_anchor_5prime[1], lo_anchor_3prime[0], lo_anchor_3prime[1], ) return None tchrom = lo_anchor_5prime[0] tpos = min(lo_anchor_5prime[1], lo_anchor_3prime[1]) tend = max(lo_anchor_5prime[1], lo_anchor_3prime[1]) if lo_anchor_5prime[2] == "-": assert lo_anchor_3prime[2] == "-" ref = reverse_complement(ref) alt = reverse_complement(alt) tlen = tend - tpos + 1 if tlen > _LO_LENGTH_CHANGE_CUTOFF * slen: logger.debug( "liftover allele length changed too much: %s:%d %s>%s; " "source length: %d, target length: %d", chrom, pos, ref, alt, slen, tlen) return None tseq = target_genome.get_sequence(tchrom, tpos, tend) if tseq == ref: tref = ref talt = alt return tchrom, tpos, tref, talt if tseq == alt: tref = tseq talt = ref return tchrom, tpos, tref, talt return None
[docs] def bcf_liftover_allele( chrom: str, pos: int, ref: str, alt: str, liftover_chain: LiftoverChain, source_genome: ReferenceGenome, target_genome: ReferenceGenome, ) -> tuple[str, int, str, str] | None: """Liftover a variant.""" nchrom, npos, nref, nalts = normalize_variant( chrom, pos, ref, [alt], source_genome) assert len(nalts) == 1 if len(nref) == 1 and len(nalts[0]) == 1: # liftover substitution lo_coordinates = liftover_chain.convert_coordinate(nchrom, npos) if lo_coordinates is not None: return _liftover_snp_simple( lo_coordinates, nref, nalts[0], target_genome) mchrom, mpos, mref, malts = maximally_extend_variant( nchrom, npos, nref, nalts, source_genome) malt = malts[0] lo_variant = _liftover_sequence( mchrom, mpos, mref, malt, liftover_chain, target_genome) if lo_variant is None: lo_variant = _liftover_sequence( mchrom, mpos, malt, mref, liftover_chain, target_genome) if lo_variant is None: return None tchrom, tpos, tref, talt = lo_variant nchrom, npos, nref, nalts = normalize_variant( tchrom, tpos, tref, [talt], target_genome) return nchrom, npos, nref, nalts[0]
[docs] def basic_liftover_allele( chrom: str, pos: int, ref: str, alt: str, liftover_chain: LiftoverChain, source_genome: ReferenceGenome, target_genome: ReferenceGenome, ) -> tuple[str, int, str, str] | None: """Basic liftover an allele.""" nchrom, npos, nref, nalts = normalize_variant( chrom, pos, ref, [alt], source_genome) nalt = nalts[0] lo_coordinates = liftover_chain.convert_coordinate( nchrom, npos, ) if lo_coordinates is None: return None lo_chrom, lo_pos, lo_strand, _ = lo_coordinates if lo_strand == "+": pass elif lo_strand == "-": lo_pos -= len(nref) lo_ref = target_genome.get_sequence( lo_chrom, lo_pos, lo_pos + len(ref) - 1) if lo_ref is None: logger.warning( "can't find genomic sequence for %s:%s", lo_chrom, lo_pos) return None lo_alt = nalt if lo_strand == "-": lo_alt = reverse_complement(nalt) if lo_ref == lo_alt: logger.warning( "allele %s:%d %s>%s mapped to no variant: %s:%d %s>%s", chrom, pos, ref, alt, lo_chrom, lo_pos, lo_ref, lo_alt) return None return lo_chrom, lo_pos, lo_ref, lo_alt
def _liftover_variant( chrom: str, pos: int, ref: str, alts: list[str], liftover_chain: LiftoverChain, source_genome: ReferenceGenome, target_genome: ReferenceGenome, liftover_allele_func: Callable[ [str, int, str, str, LiftoverChain, ReferenceGenome, ReferenceGenome], tuple[str, int, str, str] | None, ], ) -> tuple[str, int, str, list[str]] | None: """Liftover a variant.""" lo_alleles: list[tuple[str, int, str, str]] = [] for alt in alts: lo_allele = liftover_allele_func( chrom, pos, ref, alt, liftover_chain, source_genome, target_genome, ) if lo_allele is None: return None lo_alleles.append(lo_allele) if not all( chrom == lo_alleles[0][0] for chrom, _, _, _ in lo_alleles): return None if not all( pos == lo_alleles[0][1] for _, pos, _, _ in lo_alleles): return None max_ref = max(len(ref) for _, _, ref, _ in lo_alleles) r_alleles: list[tuple[str, int, str, str]] = [] for allele in lo_alleles: if len(allele[2]) == max_ref: r_alleles.append(allele) continue chrom, pos, ref, alt = allele fill_start = pos + len(ref) fill_end = pos + max_ref - 1 fill_seq = target_genome.get_sequence(chrom, fill_start, fill_end) fill_ref = f"{ref}{fill_seq}" fill_alt = f"{alt}{fill_seq}" assert len(fill_ref) == max_ref r_alleles.append((chrom, pos, fill_ref, fill_alt)) assert all(ref == r_alleles[0][2] for _, _, ref, _ in r_alleles) chrom, pos, ref, _ = r_alleles[0] return chrom, pos, ref, [alt for _, _, _, alt in r_alleles]
[docs] def bcf_liftover_variant( chrom: str, pos: int, ref: str, alts: list[str], liftover_chain: LiftoverChain, source_genome: ReferenceGenome, target_genome: ReferenceGenome, ) -> tuple[str, int, str, list[str]] | None: """BCF liftover variant utility function.""" return _liftover_variant( chrom, pos, ref, alts, liftover_chain, source_genome, target_genome, bcf_liftover_allele, )
[docs] def basic_liftover_variant( chrom: str, pos: int, ref: str, alts: list[str], liftover_chain: LiftoverChain, source_genome: ReferenceGenome, target_genome: ReferenceGenome, ) -> tuple[str, int, str, list[str]] | None: """Basic liftover variant utility function.""" return _liftover_variant( chrom, pos, ref, alts, liftover_chain, source_genome, target_genome, basic_liftover_allele, )
[docs] class BasicLiftoverAnnotator(AbstractLiftoverAnnotator): """Basic liftover annotator class.""" def _internal_liftover_allele( self, allele: VCFAllele, ) -> tuple[str, int, str, str] | None: """Liftover an allele.""" return basic_liftover_allele( allele.chrom, allele.position, allele.ref, allele.alt, self.chain, self.source_genome, self.target_genome, )
[docs] class BcfLiftoverAnnotator(AbstractLiftoverAnnotator): """BCF tools liftover re-implementation annotator class.""" def _internal_liftover_allele( self, allele: VCFAllele, ) -> tuple[str, int, str, str] | None: """Liftover an allele.""" return bcf_liftover_allele( allele.chrom, allele.position, allele.ref, allele.alt, self.chain, self.source_genome, self.target_genome, )