Source code for dae.annotation.annotate_columns

from __future__ import annotations

import argparse
import gzip
import logging
import os
import sys
from collections.abc import Generator, Iterable
from contextlib import closing
from pathlib import Path
from typing import Any

from pysam import TabixFile, tabix_index

from dae.annotation.annotatable import Annotatable
from dae.annotation.annotate_utils import (
    AnnotationTool,
    produce_partfile_paths,
    produce_regions,
    stringify,
)
from dae.annotation.annotation_config import (
    RawAnnotatorsConfig,
    RawPipelineConfig,
)
from dae.annotation.annotation_factory import build_annotation_pipeline
from dae.annotation.annotation_pipeline import (
    AnnotationPipeline,
    ReannotationPipeline,
)
from dae.annotation.context import CLIAnnotationContext
from dae.annotation.record_to_annotatable import (
    RecordToAnnotable,
    RecordToCNVAllele,
    RecordToPosition,
    RecordToRegion,
    RecordToVcfAllele,
    add_record_to_annotable_arguments,
    build_record_to_annotatable,
)
from dae.genomic_resources.cli import VerbosityConfiguration
from dae.genomic_resources.reference_genome import (
    ReferenceGenome,
    build_reference_genome_from_resource,
)
from dae.genomic_resources.repository_factory import (
    build_genomic_resource_repository,
)
from dae.task_graph import TaskGraphCli
from dae.utils.fs_utils import tabix_index_filename
from dae.utils.regions import Region

logger = logging.getLogger("annotate_columns")


def read_input(
    args: Any,
    region: Region | None = None,
) -> tuple[Any, Any, list[str]]:
    """Return a file object, line iterator and list of header columns.

    Handles differences between tabixed and non-tabixed input files.
    """
    if args.input.endswith(".gz"):
        tabix_file = TabixFile(args.input)
        region_tuple: tuple = (region.chrom, region.start, region.stop) \
            if region is not None else ()
        with gzip.open(args.input, "rt") as in_file_raw:
            header = in_file_raw.readline() \
                .strip("\r\n") \
                .split(args.input_separator)
        return closing(tabix_file), tabix_file.fetch(*region_tuple), header

    # pylint: disable=consider-using-with
    text_file = open(args.input, "rt")  # noqa: SIM115
    header = text_file.readline().strip("\r\n").split(args.input_separator)
    return text_file, text_file, header
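
# Sketch of the contract (column names are illustrative): for a plain-text
# input whose first line is "chrom<TAB>pos<TAB>ref<TAB>alt", read_input()
# returns the open file object twice (the object serves as its own line
# iterator) together with ["chrom", "pos", "ref", "alt"]; for ".gz" inputs
# the line iterator comes from TabixFile.fetch() over the requested region.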


def produce_tabix_index(
    filepath: str,
    args: Any,
    header: list[str],
    ref_genome: ReferenceGenome | None,
) -> None:
    """Produce a tabix index file for the given variants file."""
    record_to_annotatable = build_record_to_annotatable(
        vars(args), set(header), ref_genome)
    line_skip = 0 if header[0].startswith("#") else 1
    if isinstance(record_to_annotatable,
                  (RecordToRegion, RecordToCNVAllele)):
        seq_col = header.index(record_to_annotatable.chrom_col)
        start_col = header.index(record_to_annotatable.pos_beg_col)
        end_col = header.index(record_to_annotatable.pos_end_col)
    elif isinstance(record_to_annotatable, RecordToVcfAllele):
        seq_col = header.index(record_to_annotatable.chrom_col)
        start_col = header.index(record_to_annotatable.pos_col)
        end_col = start_col
    elif isinstance(record_to_annotatable, RecordToPosition):
        seq_col = header.index(record_to_annotatable.chrom_column)
        start_col = header.index(record_to_annotatable.pos_column)
        end_col = start_col
    else:
        raise TypeError(
            "Could not generate tabix index: record"
            f" {type(record_to_annotatable)} is of unsupported type.")
    tabix_index(filepath,
                seq_col=seq_col, start_col=start_col, end_col=end_col,
                line_skip=line_skip, force=True)
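
# For a region-style header such as ["chrom", "pos_begin", "pos_end"]
# (illustrative names) this resolves to seq_col=0, start_col=1, end_col=2;
# VCF-allele and position styles reuse the single position column for both
# start_col and end_col, and line_skip=1 drops the header line unless it
# already starts with "#".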


def combine(
    args: Any,
    pipeline_config: RawPipelineConfig,
    grr_definition: dict | None,
    ref_genome_id: str | None,
    partfile_paths: list[str],
    out_file_path: str,
) -> None:
    """Combine annotated region parts into a single output file."""
    grr = build_genomic_resource_repository(definition=grr_definition)
    pipeline = build_annotation_pipeline(
        pipeline_config, grr,
        allow_repeated_attributes=args.allow_repeated_attributes,
        work_dir=Path(args.work_dir),
    )
    if ref_genome_id is not None:
        genome = build_reference_genome_from_resource(
            grr.get_resource(ref_genome_id))
    else:
        genome = None

    annotation_attributes = [
        attr.name for attr in pipeline.get_attributes()
        if not attr.internal
    ]

    with gzip.open(args.input, "rt") as in_file_raw:
        hcs = in_file_raw.readline().strip("\r\n").split(args.input_separator)
    header = args.output_separator.join(hcs + annotation_attributes)

    with open(out_file_path, "wt") as out_file:
        out_file.write(header + "\n")
        for partfile_path in partfile_paths:
            with gzip.open(partfile_path, "rt") as partfile:
                partfile.readline()  # skip header
                out_file.write(partfile.read())

    for partfile_path in partfile_paths:
        os.remove(partfile_path)

    produce_tabix_index(out_file_path, args, hcs, genome)
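
# The combined header is the input header plus the pipeline's non-internal
# attributes, e.g. "chrom<TAB>pos" + ["score"] -> "chrom<TAB>pos<TAB>score"
# (names illustrative); each part file contributes only its body, since its
# own header line is skipped before copying.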


class AnnotateColumnsTool(AnnotationTool):
    """Annotation tool for TSV-style text files."""

    def get_argument_parser(self) -> argparse.ArgumentParser:
        """Configure argument parser."""
        parser = argparse.ArgumentParser(
            description="Annotate columns",
            formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        )
        parser.add_argument(
            "input", default="-", nargs="?",
            help="the input column file")
        parser.add_argument(
            "-r", "--region-size", default=300_000_000,
            type=int, help="region size to parallelize by")
        parser.add_argument(
            "-w", "--work-dir",
            help="Directory to store intermediate output files in",
            default="annotate_columns_output")
        parser.add_argument(
            "-o", "--output",
            help="Filename of the output result",
            default=None)
        parser.add_argument(
            "-in-sep", "--input-separator", default="\t",
            help="The column separator in the input")
        parser.add_argument(
            "-out-sep", "--output-separator", default="\t",
            help="The column separator in the output")
        parser.add_argument(
            "--reannotate", default=None,
            help="Old pipeline config to reannotate over")
        parser.add_argument(
            "--batch-mode", default=False,
            action="store_true",
        )
        CLIAnnotationContext.add_context_arguments(parser)
        add_record_to_annotable_arguments(parser)
        TaskGraphCli.add_arguments(parser)
        VerbosityConfiguration.set_arguments(parser)
        return parser
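
    # Example invocation (illustrative; the annotation-config positional
    # argument and the task-graph flags are registered by
    # CLIAnnotationContext.add_context_arguments and TaskGraphCli):
    #
    #   annotate_columns input.tsv.gz pipeline.yaml -o annotated.tsv.gz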

    @staticmethod
    def annotate(
        args: argparse.Namespace,
        pipeline_config: RawAnnotatorsConfig,
        grr_definition: dict | None,
        ref_genome_id: str | None,
        out_file_path: str,
        region: Region | None = None,
        compress_output: bool = False,  # noqa: FBT001 FBT002
    ) -> None:
        """Annotate a variants file with a given pipeline configuration."""
        # pylint: disable=too-many-locals,too-many-branches
        # TODO Insisting on having the pipeline config passed in args
        # prevents the finding of a default annotation config. Consider fixing
        pipeline_config_old = None
        if args.reannotate:
            pipeline_config_old = Path(args.reannotate).read_text()

        pipeline = AnnotateColumnsTool.produce_annotation_pipeline(
            pipeline_config, pipeline_config_old,
            grr_definition,
            allow_repeated_attributes=args.allow_repeated_attributes,
            work_dir=Path(args.work_dir),
        )
        grr = pipeline.repository

        ref_genome = None
        if ref_genome_id:
            res = grr.find_resource(ref_genome_id)
            if res is not None:
                ref_genome = build_reference_genome_from_resource(res).open()

        in_file, line_iterator, header_columns = read_input(args, region)
        record_to_annotatable = build_record_to_annotatable(
            vars(args), set(header_columns),
            ref_genome=ref_genome)
        annotation_columns = [
            attr.name for attr in pipeline.get_attributes()
            if not attr.internal]

        if compress_output:
            out_file = gzip.open(out_file_path, "wt")
        else:
            out_file = open(out_file_path, "wt")  # noqa: SIM115

        if region is None:
            batch_work_dir = None
        else:
            chrom = region.chrom
            pos_beg = region.start if region.start is not None else "_"
            pos_end = region.stop if region.stop is not None else "_"
            batch_work_dir = f"{chrom}_{pos_beg}_{pos_end}"

        pipeline.open()
        with pipeline, in_file, out_file:
            if isinstance(pipeline, ReannotationPipeline):
                old_annotation_columns = {
                    attr.name
                    for attr in pipeline.pipeline_old.get_attributes()
                    if not attr.internal
                }
                new_header = [
                    col for col in header_columns
                    if col not in old_annotation_columns
                ]
            else:
                new_header = list(header_columns)
            new_header = new_header + annotation_columns

            out_file.write(args.output_separator.join(new_header) + "\n")

            if args.batch_mode:
                values = AnnotateColumnsTool.batch_annotate(
                    args, pipeline, line_iterator,
                    header_columns, record_to_annotatable,
                    batch_work_dir=batch_work_dir,
                )
            else:
                values = AnnotateColumnsTool.single_annotate(
                    args, pipeline, line_iterator,
                    header_columns, record_to_annotatable,
                )
            for val in values:
                out_file.write(args.output_separator.join(val) + "\n")
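
    # A region such as Region("chr1", 1, 300_000_000) yields the per-part
    # work directory name "chr1_1_300000000"; open-ended coordinates fall
    # back to "_" placeholders.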

    @staticmethod
    def batch_annotate(
        args: argparse.Namespace,
        pipeline: AnnotationPipeline,
        line_iterator: Iterable,
        header_columns: list[str],
        record_to_annotatable: RecordToAnnotable,
        batch_work_dir: str | None = None,
    ) -> Generator[list[str], None, None]:
        """Annotate given lines as a batch."""
        errors = []
        annotation_columns = [
            attr.name for attr in pipeline.get_attributes()
            if not attr.internal]
        records = []
        annotatables: list[Annotatable | None] = []
        for lnum, line in enumerate(line_iterator):
            try:
                columns = line.strip("\n\r").split(args.input_separator)
                record = dict(zip(header_columns, columns, strict=True))
                # Build the annotatable before appending so records and
                # annotatables stay the same length if the build fails.
                annotatable = record_to_annotatable.build(record)
                records.append(record)
                annotatables.append(annotatable)
            except Exception as ex:  # pylint: disable=broad-except
                logger.exception(
                    "unexpected input data format at line %s: %s",
                    lnum, line)
                errors.append((lnum, line, str(ex)))

        if len(annotatables) == 0:
            return

        annotations = []
        try:
            if isinstance(pipeline, ReannotationPipeline):
                annotations = pipeline.batch_annotate(
                    annotatables, records,
                    batch_work_dir=batch_work_dir,
                )
            else:
                annotations = pipeline.batch_annotate(
                    annotatables,
                    batch_work_dir=batch_work_dir,
                )
        except Exception as ex:  # pylint: disable=broad-except
            logger.exception("Error during batch annotation")
            errors.append((-1, -1, str(ex)))
            # Nothing to yield when the batch call fails; clear records so
            # the strict zip below stays consistent.
            records = []

        for record, annotation in zip(records, annotations, strict=True):
            for col in annotation_columns:
                record[col] = annotation[col]
            yield [stringify(val) for val in record.values()]

        if len(errors) > 0:
            logger.error("there were errors during the import")
            for lnum, line, error in errors:
                logger.error("line %s: %s", lnum, line)
                logger.error("\t%s", error)
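
    # batch_annotate() trades memory for throughput: all records and
    # annotatables are materialized up front and handed to the pipeline in
    # one call, while single_annotate() below streams line by line and can
    # skip malformed records individually.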

    @staticmethod
    def single_annotate(
        args: argparse.Namespace,
        pipeline: AnnotationPipeline,
        line_iterator: Iterable,
        header_columns: list[str],
        record_to_annotatable: RecordToAnnotable,
    ) -> Generator[list[str], None, None]:
        """Annotate given lines one by one."""
        errors = []
        annotation_columns = [
            attr.name for attr in pipeline.get_attributes()
            if not attr.internal]
        for lnum, line in enumerate(line_iterator):
            try:
                columns = line.strip("\n\r").split(args.input_separator)
                record = dict(zip(header_columns, columns, strict=True))
                if isinstance(pipeline, ReannotationPipeline):
                    for col in pipeline.attributes_deleted:
                        del record[col]
                    annotation = pipeline.annotate(
                        record_to_annotatable.build(record), record,
                    )
                else:
                    annotation = pipeline.annotate(
                        record_to_annotatable.build(record),
                    )
                for col in annotation_columns:
                    record[col] = annotation[col]
                yield [stringify(val) for val in record.values()]
            except Exception as ex:  # pylint: disable=broad-except
                logger.exception(
                    "unexpected input data format at line %s: %s",
                    lnum, line)
                errors.append((lnum, line, str(ex)))

        if len(errors) > 0:
            logger.error("there were errors during the import")
            for lnum, line, error in errors:
                logger.error("line %s: %s", lnum, line)
                logger.error("\t%s", error)

    def work(self) -> None:
        if self.args.output:
            output = self.args.output
        else:
            # str.removesuffix strips the literal ".gz" ending; rstrip(".gz")
            # would treat it as a character set and could eat stem characters.
            input_name = self.args.input.removesuffix(".gz")
            if "." in input_name:
                idx = input_name.find(".")
                output = f"{input_name[:idx]}_annotated{input_name[idx:]}"
            else:
                output = f"{input_name}_annotated"

        ref_genome = self.context.get_reference_genome()
        ref_genome_id = ref_genome.resource_id \
            if ref_genome is not None else None

        if tabix_index_filename(self.args.input):
            with closing(TabixFile(self.args.input)) as pysam_file:
                regions = produce_regions(pysam_file, self.args.region_size)
            file_paths = produce_partfile_paths(
                self.args.input, regions, self.args.work_dir)
            region_tasks = []
            for index, (region, path) in enumerate(
                zip(regions, file_paths, strict=True),
            ):
                region_tasks.append(self.task_graph.create_task(
                    f"part-{index}",
                    AnnotateColumnsTool.annotate,
                    [self.args, self.pipeline.raw, self.grr.definition,
                     ref_genome_id, path, region, True],
                    []))

            self.task_graph.create_task(
                "combine",
                combine,
                [self.args, self.pipeline.raw, self.grr.definition,
                 ref_genome_id, file_paths, output],
                region_tasks)
        else:
            self.task_graph.create_task(
                "annotate_all",
                AnnotateColumnsTool.annotate,
                [self.args, self.pipeline.raw, self.grr.definition,
                 ref_genome_id, output, None, output.endswith(".gz")],
                [])
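

# Task graph shape: for tabix-indexed input, one "part-<index>" annotation
# task per region feeds a final "combine" task; otherwise a single
# "annotate_all" task writes the output directly, compressing it when the
# output name ends in ".gz".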


def cli(raw_args: list[str] | None = None) -> None:
    tool = AnnotateColumnsTool(raw_args)
    tool.run()


if __name__ == "__main__":
    cli(sys.argv[1:])