Source code for dae.tools.annotate_variants

#!/usr/bin/env python

import argparse
import os.path
import sys
import time
from typing import Any

import pysam

from dae.effect_annotation.annotator import EffectAnnotator
from dae.effect_annotation.effect import AnnotationEffect
from dae.genomic_resources.gene_models import (
    GeneModels,
    build_gene_models_from_file,
)
from dae.genomic_resources.reference_genome import (
    ReferenceGenome,
    build_reference_genome_from_file,
)


[docs] def cli_genome_options( parser: argparse.ArgumentParser) -> argparse.ArgumentParser: """Create parser options for reference genome and gene models.""" genome_group = parser.add_argument_group("genome specification") genome_group.add_argument( "--gene-models-id", "-T", help="gene models ID <RefSeq, CCDS, knownGene>", ) genome_group.add_argument( "--gene-models-filename", "--Traw", help="outside gene models file path", ) genome_group.add_argument( "--gene-models-fileformat", "--TrawFormat", help="outside gene models format (refseq, ccds, knowngene)", action="store", ) genome_group.add_argument( "--gene-mapping-filename", "-I", help="geneIDs mapping file", default=None, action="store", ) genome_group.add_argument( "--genome-id", "-G", help="genome ID <GATK_ResourceBundle_5777_b37_phiX174, hg19> ", action="store", ) genome_group.add_argument( "--genome-filename", "--Graw", help="outside genome file name", action="store", ) genome_group.add_argument( "--promoter-len", "-P", help="promoter length", default=0, type=int, dest="promoter_len", ) return parser
[docs] def parse_cli_genome_options( args: argparse.Namespace, ) -> tuple[ReferenceGenome | None, GeneModels | None]: """Parse reference genome and gene models options.""" genomic_sequence = None gene_models = None if args.gene_models_filename: gene_models = build_gene_models_from_file( args.gene_models_filename, file_format=args.gene_models_fileformat, gene_mapping_file_name=args.gene_mapping_filename, ) if args.genome_filename: genomic_sequence = build_reference_genome_from_file( args.genome_filename) genomic_sequence = genomic_sequence.open() if gene_models and genomic_sequence: return genomic_sequence, gene_models if genomic_sequence is None or gene_models is None: return None, None return None, None
[docs] def cli_variants_options(parser: argparse.ArgumentParser) -> None: """Configure parser for variant specifying options.""" location_group = parser.add_argument_group("variants location") location_group.add_argument( "--chrom", "-c", help="chromosome column number/name", action="store", ) location_group.add_argument( "--pos", "-p", help="position column number/name", action="store", ) location_group.add_argument( "--location", "-x", help="location (chr:pos) column number/name", action="store", ) variants_group = parser.add_argument_group("variants specification") variants_group.add_argument( "--variant", "-v", help="variant column number/name", action="store", ) variants_group.add_argument( "--ref", "-r", help="reference allele column number/name", action="store", ) variants_group.add_argument( "--alt", "-a", help="alternative allele column number/name", action="store", ) parser.add_argument( "--no-header", "-H", help="no header in the input file", default=False, action="store_true", )
# variants_group.add_argument( # "-t", help="type of mutation column number/name", action="store" # ) # variants_group.add_argument( # "-q", help="seq column number/name", action="store" # ) # variants_group.add_argument( # "-l", help="length column number/name", action="store" # )
[docs] def parse_cli_variants_options(args: argparse.Namespace) -> dict[str, Any]: """Parse variant definition options.""" columns = {} if args.location is None: if args.chrom is None and args.pos is None: # default is location columns["loc"] = "location" else: assert args.chrom is not None and args.pos is not None columns["chrom"] = args.chrom columns["position"] = args.pos else: assert args.chrom is None and args.pos is None columns["loc"] = args.location if args.variant is None: if args.ref is None and args.alt is None: # default is variant columns["var"] = "variant" else: assert args.ref is not None and args.alt is not None columns["ref"] = args.ref columns["alt"] = args.alt else: assert args.ref is None and args.alt is None columns["var"] = args.variant return columns
[docs] def cli(argv: list[str] | None = None) -> None: """Annotate variants main function.""" # pylint: disable=too-many-branches,too-many-statements if argv is None: argv = sys.argv[1:] parser = argparse.ArgumentParser( description="variants effect annotator", conflict_handler="resolve", formatter_class=argparse.RawDescriptionHelpFormatter, ) cli_genome_options(parser) cli_variants_options(parser) parser.add_argument( "input_filename", nargs="?", help="input variants file name", ) parser.add_argument( "output_filename", nargs="?", help="output file name (default: stdout)", ) args = parser.parse_args(argv) genomic_sequence, gene_models = parse_cli_genome_options(args) assert genomic_sequence is not None assert gene_models is not None annotator = EffectAnnotator( genomic_sequence, gene_models, promoter_len=args.promoter_len, ) variant_columns = parse_cli_variants_options(args) if args.input_filename == "-" or args.input_filename is None: infile = sys.stdin else: assert os.path.exists(args.input_filename), args.input_filename # pylint: disable=consider-using-with infile = open(args.input_filename, "r") if args.output_filename is None: outfile = sys.stdout else: # pylint: disable=consider-using-with outfile = open(args.output_filename, "w") start = time.time() header = None assert not args.no_header, args if args.no_header: assert False for key, value in variant_columns.items(): variant_columns[key] = int(value) else: line = infile.readline().strip() header = [c.strip() for c in line.split("\t")] for key, value in variant_columns.items(): assert value in header variant_columns[key] = header.index(value) header.extend(["effectType", "effectGene", "effectDetails"]) print("header:", header, variant_columns, file=sys.stderr) print("\t".join(header), file=sys.stderr) print("\t".join(header), file=outfile) counter = 0 for counter, line in enumerate(infile): if line[0] == "#": continue columns = [c.strip() for c in line.split("\t")] variant = { key: columns[value] for key, value in variant_columns.items() } effects = annotator.do_annotate_variant(**variant) desc = AnnotationEffect.effects_description(effects) columns.extend(desc) print("\t".join(columns), file=sys.stderr) print("\t".join(columns), file=outfile) if (counter + 1) % 1000 == 0: elapsed = time.time() - start print( f"processed {counter + 1} lines in {elapsed:0.2f} sec", file=sys.stderr, ) infile.close() if args.output_filename: outfile.close() elapsed = time.time() - start print(80 * "=", file=sys.stderr) print( f"DONE: {counter + 1} variants in {elapsed:0.2f} sec", file=sys.stderr, ) print(80 * "=", file=sys.stderr)
[docs] def cli_vcf(argv: list[str] | None = None) -> None: """Annotate variants main function for annotating VCF file.""" # pylint: disable=too-many-branches,too-many-statements,too-many-locals if argv is None: argv = sys.argv[1:] parser = argparse.ArgumentParser( description="VCF variants effect annotator", conflict_handler="resolve", formatter_class=argparse.RawDescriptionHelpFormatter, ) cli_genome_options(parser) parser.add_argument("input_filename", help="input VCF variants file name") parser.add_argument( "output_filename", nargs="?", help="output file name (default: stdout)", ) args = parser.parse_args(argv) genomic_sequence, gene_models = parse_cli_genome_options(args) assert genomic_sequence is not None assert gene_models is not None annotator = EffectAnnotator( genomic_sequence, gene_models, promoter_len=args.promoter_len, ) assert os.path.exists(args.input_filename), args.input_filename # pylint: disable=no-member infile = pysam.VariantFile(args.input_filename) if args.output_filename is None: outfile = sys.stdout else: # pylint: disable=consider-using-with outfile = open(args.output_filename, "w") start = time.time() # Transfer VCF header header = infile.header header.add_meta( "variant_effect_annotation", "GPF variant effects annotation", ) header.add_meta( "variant_effect_annotation_command", f"'{' '.join(sys.argv)}'", ) header.info.add("ET", ".", "String", "effected type") header.info.add("EG", ".", "String", "effected gene") header.info.add("ED", ".", "String", "effect details") print(str(header), file=outfile, end="") counter = 0 for counter, variant in enumerate(infile): effect_types = [] effect_genes = [] effect_details = [] egs = "" eds = "" if variant.alts is not None: for alt in variant.alts: effects = annotator.do_annotate_variant( chrom=variant.chrom, pos=variant.pos, ref=variant.ref, alt=alt, ) ets, egs, eds = AnnotationEffect.effects_description(effects) eds = eds.replace(";", "|") effect_types.append(ets) effect_genes.append(egs) effect_details.append(eds) variant.info["ET"] = ",".join(effect_types) variant.info["EG"] = egs variant.info["ED"] = eds print(str(variant), file=outfile, end="") if (counter + 1) % 1000 == 0: elapsed = time.time() - start print( f"processed {counter + 1} variants in {elapsed:0.2f} sec", file=sys.stderr, ) infile.close() if args.output_filename: outfile.close() elapsed = time.time() - start print(80 * "=", file=sys.stderr) print( f"DONE: {counter + 1} variants in {elapsed:0.2f} sec", file=sys.stderr, ) print(80 * "=", file=sys.stderr)
if __name__ == "__main__": cli(sys.argv[1:])