Source code for dae.annotation.annotate_utils
import argparse
import os
import sys
from abc import abstractmethod
from pathlib import Path
from typing import Any

from pysam import TabixFile

from dae.annotation.annotation_config import RawAnnotatorsConfig
from dae.annotation.annotation_factory import (
    build_annotation_pipeline,
    load_pipeline_from_yaml,
)
from dae.annotation.annotation_pipeline import (
    AnnotationPipeline,
    FullReannotationPipeline,
    ReannotationPipeline,
)
from dae.annotation.context import CLIAnnotationContext
from dae.genomic_resources.cached_repository import cache_resources
from dae.genomic_resources.genomic_context import get_genomic_context
from dae.genomic_resources.repository_factory import (
    build_genomic_resource_repository,
)
from dae.gpf_instance.gpf_instance import GPFInstance
from dae.task_graph import TaskGraphCli
from dae.task_graph.graph import TaskGraph
from dae.utils.regions import (
    Region,
    get_chromosome_length_tabix,
    split_into_regions,
)
from dae.utils.verbosity_configuration import VerbosityConfiguration

PART_FILENAME = "{in_file}_annotation_{chrom}_{pos_beg}_{pos_end}.gz"
def produce_regions(
    pysam_file: TabixFile, region_size: int,
) -> list[Region]:
    """Given a region size, produce contig regions to annotate by."""
    contig_lengths: dict[str, int] = {}
    for contig in map(str, pysam_file.contigs):
        length = get_chromosome_length_tabix(pysam_file, contig)
        if length is None:
            raise ValueError(f"unable to find length of contig '{contig}'")
        contig_lengths[contig] = length

    regions: list[Region] = []
    for contig, length in contig_lengths.items():
        regions.extend(split_into_regions(contig, length, region_size))
    return regions
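
# Illustrative usage (a sketch; "variants.vcf.gz" is a hypothetical
# tabix-indexed file): split every contig of the input into ~1 Mbp regions
# that can be annotated independently.
#
#     tabix_file = TabixFile("variants.vcf.gz")
#     regions = produce_regions(tabix_file, region_size=1_000_000)
#     tabix_file.close()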
def produce_partfile_paths(
    input_file_path: str, regions: list[Region], work_dir: str,
) -> list[str]:
    """Produce a list of file paths for output region part files."""
    filenames = []
    for region in regions:
        pos_beg = region.start if region.start is not None else "_"
        pos_end = region.stop if region.stop is not None else "_"
        filenames.append(os.path.join(work_dir, PART_FILENAME.format(
            in_file=os.path.basename(input_file_path),
            chrom=region.chrom, pos_beg=pos_beg, pos_end=pos_end,
        )))
    return filenames
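
# Illustrative usage (a sketch; assumes Region takes positional
# (chrom, start, stop)): a single region on chr1 yields the path
# "out/input.vcf.gz_annotation_chr1_1_1000000.gz".
#
#     paths = produce_partfile_paths(
#         "input.vcf.gz", [Region("chr1", 1, 1_000_000)], "out")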
def stringify(value: Any, *, vcf: bool = False) -> str:
    """Format the value to a string for human-readable output."""
    if value is None:
        return "." if vcf else ""
    if isinstance(value, float):
        return f"{value:.3g}"
    if isinstance(value, bool):
        return "yes" if value else ""
    return str(value)
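
# Illustrative behaviour: None becomes "." in VCF mode and "" otherwise,
# floats are shortened to three significant digits, and booleans render as
# "yes" or "" (the bool branch is reachable because bool is not a float
# subtype, so True/False fall past the float check).
#
#     stringify(None, vcf=True)   # "."
#     stringify(0.123456)         # "0.123"
#     stringify(True)             # "yes"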
class AnnotationTool:
    """Base class for annotation tools. Format-agnostic."""

    def __init__(
        self,
        raw_args: list[str] | None = None,
        gpf_instance: GPFInstance | None = None,
    ) -> None:
        if not raw_args:
            raw_args = sys.argv[1:]

        parser = self.get_argument_parser()
        self.args = parser.parse_args(raw_args)
        VerbosityConfiguration.set(self.args)
        CLIAnnotationContext.register(self.args)

        self.gpf_instance = gpf_instance
        self.context = get_genomic_context()
        self.pipeline = CLIAnnotationContext.get_pipeline(self.context)
        grr = CLIAnnotationContext.get_genomic_resources_repository(
            self.context)
        if grr is None:
            raise ValueError("No valid GRR configured. Aborting.")
        self.grr = grr

        self.task_graph = TaskGraph()
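
    # __init__ and run() call get_argument_parser() and work(), which are
    # not part of this excerpt. A minimal sketch of the abstract hooks that
    # concrete tools are expected to implement (docstrings are illustrative):
    @abstractmethod
    def get_argument_parser(self) -> argparse.ArgumentParser:
        """Construct the tool's command-line argument parser."""

    @abstractmethod
    def work(self) -> None:
        """Build the tool's annotation tasks into the task graph."""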
    def setup_work_dir(self) -> None:
        """Create the work directory and set task status/log paths in it."""
        if not os.path.exists(self.args.work_dir):
            os.mkdir(self.args.work_dir)
        self.args.task_status_dir = os.path.join(
            self.args.work_dir, ".task-status")
        self.args.log_dir = os.path.join(self.args.work_dir, ".task-log")
    @staticmethod
    def produce_annotation_pipeline(
        pipeline_config: RawAnnotatorsConfig,
        pipeline_config_old: str | None,
        grr_definition: dict | None,
        *,
        allow_repeated_attributes: bool,
        work_dir: Path | None = None,
        full_reannotation: bool = False,
    ) -> AnnotationPipeline:
        """Produce an annotation or reannotation pipeline."""
        grr = build_genomic_resource_repository(definition=grr_definition)
        pipeline = build_annotation_pipeline(
            pipeline_config, grr,
            allow_repeated_attributes=allow_repeated_attributes,
            work_dir=work_dir,
        )
        if pipeline_config_old is not None:
            pipeline_old = load_pipeline_from_yaml(pipeline_config_old, grr)
            if full_reannotation:
                pipeline = FullReannotationPipeline(pipeline, pipeline_old)
            else:
                pipeline = ReannotationPipeline(pipeline, pipeline_old)
        return pipeline
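
    # Illustrative call (a sketch; the argument values are hypothetical):
    # build a fresh pipeline, or a ReannotationPipeline when the previous
    # pipeline's YAML text is supplied; full_reannotation=True forces a
    # FullReannotationPipeline instead.
    #
    #     pipeline = AnnotationTool.produce_annotation_pipeline(
    #         pipeline_config=raw_annotators_config,
    #         pipeline_config_old=old_pipeline_yaml,  # None skips reannotation
    #         grr_definition=None,                    # use the default GRR
    #         allow_repeated_attributes=False,
    #     )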
    def run(self) -> None:
        """Construct annotation tasks and process them."""
        # Is this too eager? What if a reannotation pipeline is created
        # inside work() and the only caching that must be done is far
        # smaller than the entire new annotation config suggests?
        resource_ids: set[str] = {
            res.resource_id
            for annotator in self.pipeline.annotators
            for res in annotator.resources
        }
        cache_resources(self.grr, resource_ids)

        self.setup_work_dir()

        if hasattr(self.args, "input"):
            self.task_graph.input_files.append(self.args.input)
        if hasattr(self.args, "pipeline"):
            self.task_graph.input_files.append(self.args.pipeline)
        if hasattr(self.args, "reannotate") and self.args.reannotate:
            self.task_graph.input_files.append(self.args.reannotate)

        self.work()
        TaskGraphCli.process_graph(self.task_graph, **vars(self.args))
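
# A minimal sketch of a concrete tool (all names below are hypothetical, and
# a real parser would also need the verbosity and task-graph options consumed
# in __init__ and run()): subclasses supply the argument parser and the task
# construction; run() then caches resources, prepares the work directory, and
# executes the task graph.
#
#     class MyAnnotateTool(AnnotationTool):
#         def get_argument_parser(self) -> argparse.ArgumentParser:
#             parser = argparse.ArgumentParser(description="annotate a file")
#             parser.add_argument("input")
#             parser.add_argument("-w", "--work-dir", dest="work_dir",
#                                 default="annotate_work")
#             return parser
#
#         def work(self) -> None:
#             ...  # add annotation tasks to self.task_graph
#
#     MyAnnotateTool().run()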