# Source code for dae.annotation.annotate_utils

import argparse
import os
import sys
from abc import abstractmethod
from pathlib import Path

from pysam import TabixFile

from dae.annotation.annotation_config import RawAnnotatorsConfig
from dae.annotation.annotation_factory import (
    build_annotation_pipeline,
    load_pipeline_from_yaml,
)
from dae.annotation.annotation_pipeline import (
    AnnotationPipeline,
    ReannotationPipeline,
)
from dae.annotation.context import CLIAnnotationContext
from dae.genomic_resources.cached_repository import cache_resources
from dae.genomic_resources.genomic_context import get_genomic_context
from dae.genomic_resources.repository_factory import (
    build_genomic_resource_repository,
)
from dae.gpf_instance.gpf_instance import GPFInstance
from dae.task_graph import TaskGraphCli
from dae.task_graph.graph import TaskGraph
from dae.utils.regions import get_chromosome_length_tabix
from dae.utils.verbosity_configuration import VerbosityConfiguration

PART_FILENAME = "{in_file}_annotation_{chrom}_{pos_beg}_{pos_end}.gz"


def produce_regions(
    pysam_file: TabixFile,
    region_size: int,
) -> list[tuple[str, int, int]]:
    """Given a region size, produce contig regions to annotate by.

    Each contig of *pysam_file* is split into half-open windows of
    ``region_size`` starting at position 1; the final window may extend
    past the contig end.

    Raises ValueError when a contig's length cannot be determined.
    """
    regions: list[tuple[str, int, int]] = []
    for contig in map(str, pysam_file.contigs):
        length = get_chromosome_length_tabix(pysam_file, contig)
        if length is None:
            raise ValueError(f"unable to find length of contig '{contig}'")
        regions.extend(
            (contig, window_start, window_start + region_size)
            for window_start in range(1, length, region_size)
        )
    return regions
def produce_partfile_paths(
    input_file_path: str,
    regions: list[tuple[str, int, int]],
    work_dir: str,
) -> list[str]:
    """Produce a list of file paths for output region part files.

    One path per region, built from ``PART_FILENAME``; missing begin/end
    positions are rendered as ``"_"``.
    """
    # Hoisted out of the loop: the input basename is region-independent.
    in_file = os.path.basename(input_file_path)
    paths: list[str] = []
    for region in regions:
        # Tolerate shorter tuples (chrom only, or chrom + begin).
        begin = region[1] if len(region) > 1 else "_"
        end = region[2] if len(region) > 2 else "_"
        part_name = PART_FILENAME.format(
            in_file=in_file,
            chrom=region[0],
            pos_beg=begin,
            pos_end=end,
        )
        paths.append(os.path.join(work_dir, part_name))
    return paths
class AnnotationTool:
    """Base class for annotation tools. Format-agnostic."""

    def __init__(
        self,
        raw_args: list[str] | None = None,
        gpf_instance: GPFInstance | None = None,
    ) -> None:
        """Parse CLI arguments and set up the pipeline, GRR and task graph.

        Falls back to ``sys.argv[1:]`` when *raw_args* is empty or None.

        Raises:
            ValueError: when no valid genomic resources repository (GRR)
                can be obtained from the CLI annotation context.
        """
        if not raw_args:
            raw_args = sys.argv[1:]

        parser = self.get_argument_parser()
        self.args = parser.parse_args(raw_args)
        VerbosityConfiguration.set(self.args)
        CLIAnnotationContext.register(self.args)

        self.gpf_instance = gpf_instance
        self.context = get_genomic_context()
        self.pipeline = CLIAnnotationContext.get_pipeline(self.context)
        grr = \
            CLIAnnotationContext.get_genomic_resources_repository(self.context)
        if grr is None:
            raise ValueError("No valid GRR configured. Aborting.")
        self.grr = grr

        self.task_graph = TaskGraph()

        # makedirs with exist_ok=True is race-free and handles nested work
        # directories, unlike the exists()/mkdir() pair it replaces.
        os.makedirs(self.args.work_dir, exist_ok=True)

        self.args.task_status_dir = os.path.join(
            self.args.work_dir, ".task-status")
        self.args.log_dir = os.path.join(self.args.work_dir, ".task-log")

    @staticmethod
    def _produce_annotation_pipeline(
        pipeline_config: RawAnnotatorsConfig,
        pipeline_config_old: str | None,
        grr_definition: dict | None,
        *,
        allow_repeated_attributes: bool,
        work_dir: Path | None = None,
    ) -> AnnotationPipeline:
        """Build an annotation pipeline from a raw config.

        When *pipeline_config_old* (a YAML string) is given, the result is
        wrapped in a ReannotationPipeline against the old pipeline.
        """
        grr = build_genomic_resource_repository(definition=grr_definition)
        pipeline = build_annotation_pipeline(
            pipeline_config,
            grr,
            allow_repeated_attributes=allow_repeated_attributes,
            work_dir=work_dir,
        )
        if pipeline_config_old is not None:
            pipeline_old = load_pipeline_from_yaml(pipeline_config_old, grr)
            pipeline = ReannotationPipeline(pipeline, pipeline_old)
        return pipeline

    @abstractmethod
    def get_argument_parser(self) -> argparse.ArgumentParser:
        """Return the CLI argument parser for the concrete tool."""

    @abstractmethod
    def work(self) -> None:
        """Construct the tool-specific annotation tasks on the task graph."""

    def run(self) -> None:
        """Construct annotation tasks and process them."""
        # FIXME Is this too eager? What if a reannotation pipeline is created
        # inside work() and the only caching that must be done is far smaller
        # than the entire new annotation config suggests?
        resource_ids: set[str] = {
            res.resource_id
            for annotator in self.pipeline.annotators
            for res in annotator.resources
        }
        cache_resources(self.grr, resource_ids)

        self.task_graph.input_files.append(self.args.input)
        self.task_graph.input_files.append(self.args.pipeline)
        # "reannotate" is optional on the args namespace; getattr replaces
        # the hasattr-and-access pair.
        reannotate = getattr(self.args, "reannotate", None)
        if reannotate:
            self.task_graph.input_files.append(reannotate)

        self.work()
        TaskGraphCli.process_graph(self.task_graph, **vars(self.args))