Source code for dae.annotation.annotate_columns
from __future__ import annotations
import argparse
import logging
import os
import sys
from contextlib import closing
from pathlib import Path
from typing import Any
from pysam import TabixFile
from dae.annotation.annotate_utils import (
AnnotationTool,
produce_partfile_paths,
produce_regions,
produce_tabix_index,
)
from dae.annotation.annotation_config import (
RawPipelineConfig,
)
from dae.annotation.context import CLIAnnotationContext
from dae.annotation.format_handlers import ColumnsFormat
from dae.annotation.record_to_annotatable import (
add_record_to_annotable_arguments,
)
from dae.genomic_resources.cli import VerbosityConfiguration
from dae.gpf_instance.gpf_instance import GPFInstance
from dae.task_graph import TaskGraphCli
from dae.utils.fs_utils import tabix_index_filename
logger = logging.getLogger("annotate_columns")
def combine(
args: Any,
pipeline_config: RawPipelineConfig,
pipeline_config_old: str | None,
grr_definition: dict | None,
partfile_paths: list[str],
out_file_path: str,
ref_genome_id: str,
) -> None:
"""Combine annotated region parts into a single VCF file."""
output_handler = ColumnsFormat(
pipeline_config,
pipeline_config_old,
vars(args),
grr_definition,
None,
args.input,
        out_file_path.removesuffix(".gz"),
ref_genome_id,
)
output_handler.open()
assert output_handler.output_file is not None
for partfile_path in partfile_paths:
with open(partfile_path, "rt") as partfile:
partfile.readline() # Skip header of partfile
content = partfile.read().strip()
if content == "":
continue
output_handler.output_file.write(content + "\n")
output_handler.close()
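    # The parts have been merged into the final output; remove them.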
for partfile_path in partfile_paths:
os.remove(partfile_path)
class AnnotateColumnsTool(AnnotationTool):
"""Annotation tool for TSV-style text files."""
def __init__(
self,
raw_args: list[str] | None = None,
gpf_instance: GPFInstance | None = None,
) -> None:
super().__init__(raw_args, gpf_instance)
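        # Both attributes are filled in by prepare_for_annotation() before
        # the task graph is built.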
self.output = None
self.ref_genome_id = None
def get_argument_parser(self) -> argparse.ArgumentParser:
"""Configure argument parser."""
parser = argparse.ArgumentParser(
description="Annotate columns",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"input", default="-", nargs="?",
help="the input column file")
parser.add_argument(
"-r", "--region-size", default=300_000_000,
type=int, help="region size to parallelize by")
parser.add_argument(
"-w", "--work-dir",
help="Directory to store intermediate output files in",
default="annotate_columns_output")
parser.add_argument(
"-o", "--output",
help="Filename of the output result",
default=None)
parser.add_argument(
"-in-sep", "--input-separator", default="\t",
help="The column separator in the input")
parser.add_argument(
"-out-sep", "--output-separator", default="\t",
help="The column separator in the output")
parser.add_argument(
"--reannotate", default=None,
help="Old pipeline config to reannotate over")
parser.add_argument(
"-i", "--full-reannotation",
help="Ignore any previous annotation and run "
" a full reannotation.",
action="store_true",
)
parser.add_argument(
"--batch-size",
type=int,
default=0, # 0 = annotate iteratively, no batches
help="Annotate in batches of",
)
CLIAnnotationContext.add_context_arguments(parser)
add_record_to_annotable_arguments(parser)
TaskGraphCli.add_arguments(parser)
VerbosityConfiguration.set_arguments(parser)
return parser
def prepare_for_annotation(self) -> None:
if self.args.output:
self.output = self.args.output
else:
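            # No explicit output name: derive one by inserting "_annotated"
            # before the first extension of the input file name.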
            input_name = self.args.input.removesuffix(".gz")
if "." in input_name:
idx = input_name.find(".")
self.output = f"{input_name[:idx]}_annotated{input_name[idx:]}"
else:
self.output = f"{input_name}_annotated"
ref_genome = self.context.get_reference_genome()
self.ref_genome_id = ref_genome.resource_id \
if ref_genome is not None else None
def add_tasks_to_graph(self) -> None:
assert self.output is not None
pipeline_config_old = None
if self.args.reannotate:
pipeline_config_old = Path(self.args.reannotate).read_text()
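        # If the input is tabix-indexed, it can be split into genomic regions
        # and annotated in parallel; otherwise the whole file is handled by a
        # single annotation task.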
if tabix_index_filename(self.args.input):
with closing(TabixFile(self.args.input)) as pysam_file:
regions = produce_regions(pysam_file, self.args.region_size)
file_paths = produce_partfile_paths(
self.args.input, regions, self.args.work_dir)
region_tasks = []
for region, path in zip(regions, file_paths, strict=True):
task_id = f"part-{str(region).replace(':', '-')}"
handler = ColumnsFormat(
self.pipeline.raw,
pipeline_config_old,
vars(self.args),
self.grr.definition,
region,
self.args.input,
path,
self.ref_genome_id,
)
region_tasks.append(self.task_graph.create_task(
task_id,
AnnotateColumnsTool.annotate,
[handler, self.args.batch_size > 0],
[]))
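            # Combine the annotated parts only after all region tasks finish.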
combine_task = self.task_graph.create_task(
"combine",
combine,
[self.args,
self.pipeline.raw,
pipeline_config_old,
self.grr.definition,
file_paths,
self.output,
self.ref_genome_id],
region_tasks)
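            # Compress the combined output and build a tabix index for it.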
self.task_graph.create_task(
"compress_and_tabix",
produce_tabix_index,
[self.output, self.args],
[combine_task])
else:
handler = ColumnsFormat(
self.pipeline.raw,
pipeline_config_old,
vars(self.args),
self.grr.definition,
None,
self.args.input,
self.output,
self.ref_genome_id,
)
self.task_graph.create_task(
"annotate_all",
AnnotateColumnsTool.annotate,
[handler, self.args.batch_size > 0],
[])
def cli(raw_args: list[str] | None = None) -> None:
tool = AnnotateColumnsTool(raw_args)
tool.run()
if __name__ == "__main__":
cli(sys.argv[1:])
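
# A minimal usage sketch (hypothetical file names; the annotation pipeline and
# GRR are supplied through the context arguments that CLIAnnotationContext
# adds to the parser):
#
#   annotate_columns variants.tsv.gz annotation.yaml -o variants_annotated.tsv.gz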