Source code for dae.annotation.annotate_vcf

from __future__ import annotations

import argparse
import logging
import os
import sys
from contextlib import closing
from pathlib import Path

from pysam import (
    TabixFile,
    VariantFile,
)

from dae.annotation.annotate_utils import (
    AnnotationTool,
    produce_partfile_paths,
    produce_regions,
    produce_tabix_index,
)
from dae.annotation.annotation_config import (
    RawAnnotatorsConfig,
)
from dae.annotation.context import CLIAnnotationContext
from dae.annotation.format_handlers import VCFFormat
from dae.gpf_instance.gpf_instance import GPFInstance
from dae.task_graph import TaskGraphCli
from dae.utils.fs_utils import tabix_index_filename
from dae.utils.verbosity_configuration import VerbosityConfiguration

logger = logging.getLogger("annotate_vcf")


[docs] def combine( args, input_file_path: str, pipeline_config: RawAnnotatorsConfig, pipeline_config_old: str | None, grr_definition: dict | None, partfile_paths: list[str], output_file_path: str, ) -> None: """Combine annotated region parts into a single VCF file.""" output_handler = VCFFormat( pipeline_config, pipeline_config_old, vars(args), grr_definition, None, input_file_path, output_file_path.rstrip(".gz"), ) output_handler.open() assert output_handler.output_file is not None for partfile_path in partfile_paths: with VariantFile(partfile_path) as partfile: for rec in partfile.fetch(): output_handler.output_file.write(rec) output_handler.close() for partfile_path in partfile_paths: os.remove(partfile_path)
[docs] class AnnotateVCFTool(AnnotationTool): """Annotation tool for the VCF file format.""" def __init__( self, raw_args: list[str] | None = None, gpf_instance: GPFInstance | None = None, ): super().__init__(raw_args, gpf_instance) self.output = None
[docs] def get_argument_parser(self) -> argparse.ArgumentParser: """Construct and configure argument parser.""" parser = argparse.ArgumentParser( description="Annotate VCF", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser.add_argument("input", default="-", nargs="?", help="the input vcf file") parser.add_argument("-r", "--region-size", default=300_000_000, type=int, help="region size to parallelize by") parser.add_argument("-w", "--work-dir", help="Directory to store intermediate output files", default="annotate_vcf_output") parser.add_argument("-o", "--output", help="Filename of the output VCF result", default=None) parser.add_argument("--reannotate", default=None, help="Old pipeline config to reannotate over") parser.add_argument( "-i", "--full-reannotation", help="Ignore any previous annotation and run " " a full reannotation.", action="store_true", ) parser.add_argument( "--batch-size", type=int, default=0, # 0 = annotate iteratively, no batches help="Annotate in batches of", ) CLIAnnotationContext.add_context_arguments(parser) TaskGraphCli.add_arguments(parser) VerbosityConfiguration.set_arguments(parser) return parser
[docs] def prepare_for_annotation(self) -> None: if self.args.output: self.output = self.args.output else: self.output = os.path.basename( self.args.input).split(".")[0] + "_annotated.vcf"
[docs] def add_tasks_to_graph(self) -> None: assert self.grr is not None assert self.output is not None pipeline_config_old = None if self.args.reannotate: pipeline_config_old = Path(self.args.reannotate).read_text() if not tabix_index_filename(self.args.input): handler = VCFFormat( self.pipeline.raw, pipeline_config_old, vars(self.args), self.grr.definition, None, self.args.input, self.output, ) self.task_graph.create_task( "all_variants_annotate", AnnotateVCFTool.annotate, [handler, self.args.batch_size > 0], [], ) else: with closing(TabixFile(self.args.input)) as pysam_file: regions = produce_regions(pysam_file, self.args.region_size) file_paths = produce_partfile_paths( self.args.input, regions, self.args.work_dir) region_tasks = [] for index, (region, file_path) in enumerate( zip(regions, file_paths, strict=True), ): handler = VCFFormat( self.pipeline.raw, pipeline_config_old, vars(self.args), self.grr.definition, region, self.args.input, file_path, ) assert self.grr is not None region_tasks.append(self.task_graph.create_task( f"part-{index}", AnnotateVCFTool.annotate, [handler, self.args.batch_size > 0], [], )) assert self.grr is not None combine_task = self.task_graph.create_task( "combine", combine, [self.args, self.args.input, self.pipeline.raw, pipeline_config_old, self.grr.definition, file_paths, self.output], region_tasks, ) self.task_graph.create_task( "compress_and_tabix", produce_tabix_index, [self.output], [combine_task])
[docs] def cli(raw_args: list[str] | None = None) -> None: tool = AnnotateVCFTool(raw_args) tool.run()
if __name__ == "__main__": cli(sys.argv[1:])