import argparse
import gzip
import logging
import os
from pathlib import Path
from typing import Any

import numpy as np
from pysam import TabixFile, tabix_index

from dae.annotation.annotation_pipeline import AnnotationPipeline
from dae.annotation.genomic_context import (
    CLIAnnotationContextProvider,
    get_context_pipeline,
)
from dae.annotation.record_to_annotatable import (
    DaeAlleleRecordToAnnotatable,
    RecordToCNVAllele,
    RecordToPosition,
    RecordToRegion,
    RecordToVcfAllele,
    build_record_to_annotatable,
)
from dae.genomic_resources.cached_repository import cache_resources
from dae.genomic_resources.genomic_context import (
    GenomicContext,
    context_providers_init,
    get_genomic_context,
    register_context_provider,
)
from dae.genomic_resources.repository import GenomicResourceRepo
from dae.task_graph.graph import TaskGraph
from dae.utils.regions import (
    Region,
    get_chromosome_length_tabix,
    split_into_regions,
)

PART_FILENAME = "{in_file}_annotation_{chrom}_{pos_beg}_{pos_end}"

logger = logging.getLogger("annotate_utils")


def produce_regions(
    pysam_file: TabixFile, region_size: int,
) -> list[Region]:
    """Given a region size, produce contig regions to annotate by."""
    contig_lengths: dict[str, int] = {}
    for contig in map(str, pysam_file.contigs):
        length = get_chromosome_length_tabix(pysam_file, contig)
        if length is None:
            raise ValueError(f"unable to find length of contig '{contig}'")
        contig_lengths[contig] = length

    regions: list[Region] = []
    for contig, length in contig_lengths.items():
        regions.extend(split_into_regions(contig, length, region_size))
    return regions
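

# Example usage — a minimal sketch, assuming "variants.tsv.gz" is a
# bgzipped, tabix-indexed file (the filename and region values are
# illustrative):
#
#     with TabixFile("variants.tsv.gz") as tabix:
#         regions = produce_regions(tabix, region_size=100_000_000)
#     # e.g. [Region("chr1", 1, 100000000),
#     #       Region("chr1", 100000001, 200000000), ...]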


def produce_partfile_paths(
    input_file_path: str, regions: list[Region], work_dir: str,
) -> list[str]:
    """Produce a list of file paths for output region part files."""
    filenames = []
    for region in regions:
        pos_beg = region.start if region.start is not None else "_"
        pos_end = region.stop if region.stop is not None else "_"
        filenames.append(os.path.join(work_dir, PART_FILENAME.format(
            in_file=os.path.basename(input_file_path),
            chrom=region.chrom, pos_beg=pos_beg, pos_end=pos_end,
        )))
    return filenames
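

# For instance, given the regions produced above, the part file names
# follow the PART_FILENAME template (paths below are illustrative):
#
#     produce_partfile_paths("variants.tsv.gz", regions, "work")
#     # -> ["work/variants.tsv.gz_annotation_chr1_1_100000000", ...]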


def _read_header(filepath: str, separator: str = "\t") -> list[str]:
    """Extract the header line from a columns file."""
    if filepath.endswith(".gz"):
        file = gzip.open(filepath, "rt")  # noqa: SIM115
    else:
        file = open(filepath, "r")  # noqa: SIM115
    with file:
        header = file.readline()
    return [c.strip() for c in header.split(separator)]


def produce_tabix_index(filepath: str, args: dict | None = None) -> None:
    """Produce a tabix index file for the given variants file."""
    header = _read_header(filepath)
    line_skip = 0 if header[0].startswith("#") else 1
    header = [c.strip("#") for c in header]

    record_to_annotatable = build_record_to_annotatable(
        args if args is not None else {},
        set(header),
    )
    if isinstance(record_to_annotatable, (RecordToRegion,
                                          RecordToCNVAllele)):
        seq_col = header.index(record_to_annotatable.chrom_col)
        start_col = header.index(record_to_annotatable.pos_beg_col)
        end_col = header.index(record_to_annotatable.pos_end_col)
    elif isinstance(record_to_annotatable, RecordToVcfAllele):
        seq_col = header.index(record_to_annotatable.chrom_col)
        start_col = header.index(record_to_annotatable.pos_col)
        end_col = start_col
    elif isinstance(
            record_to_annotatable,
            (RecordToPosition, DaeAlleleRecordToAnnotatable)):
        seq_col = header.index(record_to_annotatable.chrom_column)
        start_col = header.index(record_to_annotatable.pos_column)
        end_col = start_col
    else:
        raise TypeError(
            "Could not generate tabix index: record"
            f" {type(record_to_annotatable)} is of unsupported type.")

    logger.info(
        "producing tabix index for '%s': "
        "tabix_index(%s, seq_col=%s, start_col=%s, end_col=%s, "
        "line_skip=%s, force=True)",
        filepath, filepath, seq_col, start_col, end_col, line_skip)
    tabix_index(filepath,
                seq_col=seq_col,
                start_col=start_col,
                end_col=end_col,
                line_skip=line_skip,
                force=True)
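

# Example usage — a sketch; with args=None the coordinate columns are
# resolved from the file header using the default column names that
# build_record_to_annotatable recognizes:
#
#     produce_tabix_index("annotated.tsv.gz")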


def stringify(value: Any, *, vcf: bool = False) -> str:
    """Format the value to a string for human-readable output."""
    if value is None:
        return "." if vcf else ""
    if isinstance(value, (float, np.floating)):
        return f"{value:.3g}"
    if isinstance(value, bool):
        return "yes" if value else ("." if vcf else "")
    if vcf is True and value == "":
        return "."
    return str(value)
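

# The formatting rules above, illustrated:
#
#     stringify(None)             # -> ""
#     stringify(None, vcf=True)   # -> "."
#     stringify(0.123456)         # -> "0.123" (three significant digits)
#     stringify(True)             # -> "yes"
#     stringify(False, vcf=True)  # -> "."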


def get_stuff_from_context(
    cli_args: dict[str, Any],
) -> tuple[AnnotationPipeline, GenomicContext, GenomicResourceRepo]:
    """Collect the necessary objects from the genomic context."""
    register_context_provider(CLIAnnotationContextProvider())
    context_providers_init(**cli_args)
    registered_context = get_genomic_context()
    # Maybe add a method to build a pipeline directly from a genomic
    # context: the pipeline arguments are registered as a context above,
    # where the pipeline is also written into the context, only to be
    # read back out a few lines below.
    pipeline = get_context_pipeline(registered_context)
    if pipeline is None:
        raise ValueError("no valid annotation pipeline configured")
    context = pipeline.build_pipeline_genomic_context()
    grr = context.get_genomic_resources_repository()
    if grr is None:
        raise ValueError("no valid GRR configured")
    return pipeline, context, grr
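

# Typical usage from a CLI tool — a sketch, assuming `parser` already
# carries the pipeline and GRR arguments that CLIAnnotationContextProvider
# understands:
#
#     args = vars(parser.parse_args(argv))
#     pipeline, context, grr = get_stuff_from_context(args)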


def cache_pipeline_resources(
    grr: GenomicResourceRepo,
    pipeline: AnnotationPipeline,
) -> None:
    """Cache resources that the given pipeline will use."""
    resource_ids: set[str] = {
        res.resource_id
        for annotator in pipeline.annotators
        for res in annotator.resources
    }
    cache_resources(grr, resource_ids)


def handle_default_args(args: dict[str, Any]) -> dict[str, Any]:
    """Handle default arguments for annotation command line tools."""
    if not os.path.exists(args["input"]):
        raise ValueError(f"{args['input']} does not exist!")

    output = build_output_path(args["input"], args.get("output"))
    args["output"] = output

    if args.get("work_dir") is None:
        # derive the work dir name from the output path, dropping
        # a trailing ".gz" and then the remaining extension
        path = Path(args["output"])
        if path.suffix == ".gz":
            path = path.with_suffix("")
        path = path.with_suffix("")
        args["work_dir"] = f"{path}_work"
    if not os.path.exists(args["work_dir"]):
        os.mkdir(args["work_dir"])

    if args.get("task_status_dir") is None:
        args["task_status_dir"] = os.path.join(
            args["work_dir"], ".task-status")
    if args.get("task_log_dir") is None:
        args["task_log_dir"] = os.path.join(
            args["work_dir"], ".task-log")
    return args
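

# For example — a sketch with illustrative paths — an existing input
# "variants.tsv.gz" with no explicit output yields:
#
#     args = handle_default_args({"input": "variants.tsv.gz"})
#     # args["output"]   == "variants_annotated.tsv"
#     # args["work_dir"] == "variants_annotated_work"
#     # (the work dir is created on disk if missing)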


def add_common_annotation_arguments(parser: argparse.ArgumentParser) -> None:
    """Add common arguments to an annotation command line parser."""
    parser.add_argument(
        "input", default="-", nargs="?",
        help="the input column file")
    parser.add_argument(
        "-r", "--region-size", default=300_000_000,
        type=int, help="region size to parallelize by")
    parser.add_argument(
        "-w", "--work-dir",
        help="Directory to store intermediate output files in",
        default=None)
    parser.add_argument(
        "-o", "--output",
        help="Filename of the output result",
        default=None)
    parser.add_argument(
        "--reannotate", default=None,
        help="Old pipeline config to reannotate over")
    parser.add_argument(
        "-i", "--full-reannotation",
        help="Ignore any previous annotation and run "
             "a full reannotation.",
        action="store_true",
    )
    parser.add_argument(
        "--keep-parts", "--keep-intermediate-files",
        help="Keep intermediate files after annotation.",
        action="store_true",
        default=True,
    )
    parser.add_argument(
        "--no-keep-parts", "--no-keep-intermediate-files",
        help="Remove intermediate files after annotation.",
        dest="keep_parts",
        action="store_false",
    )
    parser.add_argument(
        "--batch-size",
        type=int,
        default=0,  # 0 = annotate iteratively, no batches
        help="Size of the annotation batches; 0 annotates "
             "records one at a time, without batching",
    )
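

# Wiring these options into a tool's parser — a sketch:
#
#     parser = argparse.ArgumentParser(description="annotate columns")
#     add_common_annotation_arguments(parser)
#     args = handle_default_args(vars(parser.parse_args()))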


def build_output_path(raw_input_path: str, output_path: str | None) -> str:
    """Build an output filepath for an annotation tool's output."""
    if output_path:
        # drop a trailing ".gz" if present (removesuffix, unlike rstrip,
        # removes only the exact suffix rather than any trailing
        # '.', 'g' or 'z' characters)
        return output_path.removesuffix(".gz")
    # no output filename given, produce one from the input filename
    path = Path(raw_input_path.removesuffix(".gz"))
    # back up the suffixes
    suffixes = path.suffixes
    path = Path(path.name)
    # append '_annotated' to the filename stem
    path = path.with_stem(f"{path.stem}_annotated")
    # restore the final suffix and return
    if not suffixes:
        return str(path)
    return str(path.with_suffix(suffixes[-1]))
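

# A few input/output pairs (illustrative; note that the directory part
# of the input path is dropped):
#
#     build_output_path("data/variants.tsv.gz", None)  # -> "variants_annotated.tsv"
#     build_output_path("variants.vcf", None)          # -> "variants_annotated.vcf"
#     build_output_path("variants.tsv", "out.tsv.gz")  # -> "out.tsv"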