"""Utilities shared by annotation CLI tools (dae.annotation.annotate_utils)."""
import argparse
import os
import sys
from abc import abstractmethod
from pathlib import Path
from pysam import TabixFile
from dae.annotation.annotation_config import RawAnnotatorsConfig
from dae.annotation.annotation_factory import (
build_annotation_pipeline,
load_pipeline_from_yaml,
)
from dae.annotation.annotation_pipeline import (
AnnotationPipeline,
ReannotationPipeline,
)
from dae.annotation.context import CLIAnnotationContext
from dae.genomic_resources.cached_repository import cache_resources
from dae.genomic_resources.genomic_context import get_genomic_context
from dae.genomic_resources.repository_factory import (
build_genomic_resource_repository,
)
from dae.gpf_instance.gpf_instance import GPFInstance
from dae.task_graph import TaskGraphCli
from dae.task_graph.graph import TaskGraph
from dae.utils.regions import get_chromosome_length_tabix
from dae.utils.verbosity_configuration import VerbosityConfiguration
# Filename template for per-region output part files:
# "<input basename>_annotation_<chrom>_<pos_beg>_<pos_end>.gz"
PART_FILENAME = "{in_file}_annotation_{chrom}_{pos_beg}_{pos_end}.gz"
def produce_regions(
    pysam_file: TabixFile, region_size: int,
) -> list[tuple[str, int, int]]:
    """Given a region size, produce contig regions to annotate by.

    Splits every contig of *pysam_file* into consecutive 1-based regions
    of *region_size* positions each.

    :param pysam_file: open tabix-indexed file whose contigs are split.
    :param region_size: maximum number of positions per region.
    :returns: list of ``(contig, start, end)`` tuples.
    :raises ValueError: if a contig's length cannot be determined.
    """
    contig_lengths: dict[str, int] = {}
    for contig in map(str, pysam_file.contigs):
        length = get_chromosome_length_tabix(pysam_file, contig)
        if length is None:
            raise ValueError(f"unable to find length of contig '{contig}'")
        contig_lengths[contig] = length
    # NOTE: the last region of a contig may extend past the contig end;
    # consumers fetching by region are expected to tolerate this.
    return [
        (contig, start, start + region_size)
        for contig, length in contig_lengths.items()
        for start in range(1, length, region_size)
    ]
def produce_partfile_paths(
    input_file_path: str, regions: list[tuple[str, int, int]], work_dir: str,
) -> list[str]:
    """Produce a list of file paths for output region part files.

    :param input_file_path: path of the annotated input file; only its
        basename is embedded in each part filename.
    :param regions: ``(contig, start, end)`` tuples as produced by
        :func:`produce_regions`.
    :param work_dir: directory the part files are placed in.
    :returns: one path per region, built from ``PART_FILENAME``.
    """
    filenames = []
    for region in regions:
        # Tolerate short region tuples; "_" marks a missing bound.
        pos_beg = region[1] if len(region) > 1 else "_"
        pos_end = region[2] if len(region) > 2 else "_"
        filenames.append(os.path.join(work_dir, PART_FILENAME.format(
            in_file=os.path.basename(input_file_path),
            chrom=region[0], pos_beg=pos_beg, pos_end=pos_end,
        )))
    return filenames
class AnnotationTool:
    """Base class for annotation tools. Format-agnostic."""
def __init__(
self,
raw_args: list[str] | None = None,
gpf_instance: GPFInstance | None = None,
) -> None:
if not raw_args:
raw_args = sys.argv[1:]
parser = self.get_argument_parser()
self.args = parser.parse_args(raw_args)
VerbosityConfiguration.set(self.args)
CLIAnnotationContext.register(self.args)
self.gpf_instance = gpf_instance
self.context = get_genomic_context()
self.pipeline = CLIAnnotationContext.get_pipeline(self.context)
grr = \
CLIAnnotationContext.get_genomic_resources_repository(self.context)
if grr is None:
raise ValueError("No valid GRR configured. Aborting.")
self.grr = grr
self.task_graph = TaskGraph()
if not os.path.exists(self.args.work_dir):
os.mkdir(self.args.work_dir)
self.args.task_status_dir = os.path.join(
self.args.work_dir, ".task-status")
self.args.log_dir = os.path.join(self.args.work_dir, ".task-log")
@staticmethod
def _produce_annotation_pipeline(
pipeline_config: RawAnnotatorsConfig,
pipeline_config_old: str | None,
grr_definition: dict | None,
*,
allow_repeated_attributes: bool,
work_dir: Path | None = None,
) -> AnnotationPipeline:
grr = build_genomic_resource_repository(definition=grr_definition)
pipeline = build_annotation_pipeline(
pipeline_config, grr,
allow_repeated_attributes=allow_repeated_attributes,
work_dir=work_dir,
)
if pipeline_config_old is not None:
pipeline_old = load_pipeline_from_yaml(pipeline_config_old, grr)
pipeline = ReannotationPipeline(pipeline, pipeline_old)
return pipeline
[docs]
def run(self) -> None:
"""Construct annotation tasks and process them."""
# FIXME Is this too eager? What if a reannotation pipeline is created
# inside work() and the only caching that must be done is far smaller
# than the entire new annotation config suggests?
resource_ids: set[str] = {
res.resource_id
for annotator in self.pipeline.annotators
for res in annotator.resources
}
cache_resources(self.grr, resource_ids)
self.task_graph.input_files.append(self.args.input)
self.task_graph.input_files.append(self.args.pipeline)
if hasattr(self.args, "reannotate") and self.args.reannotate:
self.task_graph.input_files.append(self.args.reannotate)
self.work()
TaskGraphCli.process_graph(self.task_graph, **vars(self.args))