Source code for dae.tools.simple_study_import

#!/usr/bin/env python

import argparse
import logging
import os
import sys
import time

from box import Box

from dae.annotation.annotation_pipeline import AnnotationPipeline
from dae.annotation.effect_annotator import EffectAnnotatorAdapter
from dae.common_reports import generate_common_report
from dae.gene_sets import generate_denovo_gene_sets
from dae.gpf_instance.gpf_instance import GPFInstance
from dae.import_tools.cli import run_with_project
from dae.import_tools.import_tools import ImportProject
from dae.pedigrees.loader import FamiliesLoader
from dae.utils.verbosity_configuration import VerbosityConfiguration
from dae.variants_loaders.cnv.loader import CNVLoader
from dae.variants_loaders.dae.loader import DaeTransmittedLoader, DenovoLoader
from dae.variants_loaders.raw.loader import (
    AnnotationPipelineDecorator,
    EffectAnnotationDecorator,
    VariantsLoader,
)
from dae.variants_loaders.vcf.loader import VcfLoader

logger = logging.getLogger("simple_study_import")


def cli_arguments(
    dae_config: Box,
    argv: list[str] | None = None,
) -> argparse.Namespace:
    """Build the CLI parser for the simple study import and parse *argv*.

    The parser combines the tool's own options (study id, input files,
    output directory, genotype storage, ...) with the options registered by
    the verbosity configuration, the families loader and the de novo, VCF,
    DAE transmitted and CNV variants loaders.

    Args:
        dae_config: GPF instance configuration. When it is truthy and has a
            truthy ``genotype_storage`` section, that section's ``default``
            value becomes the default of ``--genotype-storage``.
        argv: Command line arguments to parse. Only ``None`` falls back to
            ``sys.argv[1:]``; an explicitly passed empty list is parsed
            as is.

    Returns:
        The namespace produced by ``ArgumentParser.parse_args``.
    """
    default_genotype_storage_id = None
    if dae_config and dae_config.genotype_storage:
        default_genotype_storage_id = dae_config.genotype_storage.default

    parser = argparse.ArgumentParser(
        description="simple import of new study data",
        # Several loaders register options on this same parser;
        # "resolve" lets a later registration override an earlier one that
        # uses the same option string instead of raising an error.
        conflict_handler="resolve",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    VerbosityConfiguration.set_arguments(parser)
    FamiliesLoader.cli_arguments(parser)

    parser.add_argument(
        "--id", "--study-id",
        type=str,
        metavar="<study ID>",
        dest="id",
        help="Unique study ID to use. "
        "If not specified the basename of the family pedigree file is used "
        "for study ID",
    )

    parser.add_argument(
        "--vcf-files",
        type=str,
        nargs="+",
        metavar="<VCF filename>",
        help="VCF file to import",
    )

    parser.add_argument(
        "--denovo-file",
        type=str,
        metavar="<de Novo variants filename>",
        help="DAE denovo variants file",
    )

    parser.add_argument(
        "--cnv-file",
        type=str,
        metavar="<CNV variants filename>",
        help="CNV variants file",
    )

    parser.add_argument(
        "--dae-summary-file",
        type=str,
        metavar="<summary filename>",
        help="DAE transmitted summary variants file to import",
    )

    parser.add_argument(
        "-o", "--out",
        type=str,
        default=None,
        dest="output",
        metavar="<output directory>",
        help="output directory for storing intermediate parquet files. "
        'If none specified, "parquet/" directory inside GPF instance '
        "study directory is used [default: %(default)s]",
    )

    parser.add_argument(
        "--skip-reports",
        help="skip running report generation [default: %(default)s]",
        default=False,
        action="store_true",
    )

    parser.add_argument(
        "--genotype-storage", "--gs",
        type=str,
        metavar="<genotype storage id>",
        dest="genotype_storage",
        help="Id of defined in DAE.conf genotype storage "
        "[default: %(default)s]",
        default=default_genotype_storage_id,
        action="store",
    )

    parser.add_argument(
        "--add-chrom-prefix",
        type=str,
        default=None,
        help="Add specified prefix to each chromosome name in "
        "variants file",
    )

    # Only the loader options are added here; the input files themselves
    # are covered by the dedicated arguments declared above.
    DenovoLoader.cli_arguments(parser, options_only=True)
    VcfLoader.cli_arguments(parser, options_only=True)
    DaeTransmittedLoader.cli_arguments(parser, options_only=True)
    CNVLoader.cli_arguments(parser, options_only=True)

    # ``argv or sys.argv[1:]`` would also replace an explicit empty list
    # with the process command line; test for None so that callers can
    # really parse "no arguments".
    if argv is None:
        argv = sys.argv[1:]
    return parser.parse_args(argv)
def _decorate_loader(
    variants_loader: VariantsLoader,
    effect_annotator: EffectAnnotatorAdapter,
    annotation_pipeline: AnnotationPipeline,
) -> VariantsLoader:
    """Wrap a variants loader with the annotation decorators.

    The loader is always wrapped in an ``EffectAnnotationDecorator``; when
    an annotation pipeline is supplied, the result is wrapped once more in
    an ``AnnotationPipelineDecorator``.
    """
    with_effects = EffectAnnotationDecorator(
        variants_loader, effect_annotator)  # type: ignore

    # Without a pipeline the effect-annotated loader is the final result.
    if annotation_pipeline is None:
        return with_effects

    return AnnotationPipelineDecorator(with_effects, annotation_pipeline)
def build_import_project(
    args: argparse.Namespace,
    gpf_instance: GPFInstance,
) -> ImportProject:
    """Translate parsed CLI arguments into an ``ImportProject``.

    An import project configuration dictionary is assembled from *args*
    (GPF instance path, destination storage, study id, optional work
    directory and the input sections) and handed to
    ``ImportProject.build_from_config``.
    """
    config = {
        "gpf_instance": {
            "path": gpf_instance.dae_config.conf_dir,
        },
        "destination": {
            "storage_id": args.genotype_storage,
        },
    }

    # Without an explicit id, use the pedigree file name minus its extension.
    if args.id is None:
        pedigree_basename = os.path.basename(args.families)
        config["id"] = os.path.splitext(pedigree_basename)[0]
    else:
        config["id"] = args.id

    if args.output is not None:
        config["processing_config"] = {"work_dir": args.output}

    inputs = {}
    config["input"] = inputs

    pedigree_files, pedigree_params = \
        FamiliesLoader.parse_cli_arguments(args)
    pedigree_section = ImportProject.del_loader_prefix(
        pedigree_params, "ped_")
    pedigree_section["file"] = pedigree_files[0]
    inputs["pedigree"] = pedigree_section

    # (CLI attribute, loader class, input section / prefix stem,
    #  whether the parsed file value must be wrapped into a list)
    variant_sources = (
        ("denovo_file", DenovoLoader, "denovo", True),
        ("cnv_file", CNVLoader, "cnv", True),
        ("vcf_files", VcfLoader, "vcf", False),
        ("dae_summary_file", DaeTransmittedLoader, "dae", True),
    )
    for attr_name, loader_cls, section, wrap_in_list in variant_sources:
        if getattr(args, attr_name) is None:
            continue
        parsed_files, loader_params = loader_cls.parse_cli_arguments(args)
        section_config = ImportProject.del_loader_prefix(
            loader_params, f"{section}_")
        section_config["files"] = \
            [parsed_files] if wrap_in_list else parsed_files
        inputs[section] = section_config

    return ImportProject.build_from_config(config, gpf_instance=gpf_instance)
def main(
    argv: list[str] | None = None,
    gpf_instance: GPFInstance | None = None,
) -> None:
    """Import a study from the command line and build its reports.

    Parses the arguments, builds and runs the import project and, unless
    ``--skip-reports`` was given, generates the common report and the
    de Novo gene sets for the imported study.

    Raises:
        ValueError: if no GPF instance was passed and one could not be
            built.
    """
    # pylint: disable=too-many-locals,too-many-branches,too-many-statements
    if gpf_instance is None:
        try:
            gpf_instance = GPFInstance.build()
            dae_config = gpf_instance.dae_config
        except Exception as ex:  # pylint: disable=broad-except
            logger.exception("GPF not configured correctly")
            raise ValueError("unable to find configured GPF instance") from ex
    else:
        dae_config = gpf_instance.dae_config

    if argv is None:
        argv = sys.argv[1:]
    args = cli_arguments(dae_config, argv)
    VerbosityConfiguration.set(args)

    logging.getLogger("impala").setLevel(logging.WARNING)

    import_project = build_import_project(args, gpf_instance=gpf_instance)
    run_with_project(import_project)

    if args.skip_reports:
        return

    # Reload the instance before reporting; presumably this makes the
    # freshly imported study visible to the report generators.
    gpf_instance.reload()
    report_argv = ["--studies", import_project.study_id]

    logger.info("generating common reports...")
    started = time.time()
    generate_common_report.main(report_argv, gpf_instance)
    elapsed = time.time() - started
    logger.info("DONE: generating common reports in %.2f sec", elapsed)

    logger.info("generating de Novo gene sets...")
    started = time.time()
    generate_denovo_gene_sets.main(
        gpf_instance=gpf_instance, argv=report_argv)
    elapsed = time.time() - started
    logger.info("DONE: generating de Novo gene sets in %.2f sec", elapsed)
# Script entry point: forward the command line (without the program name).
if __name__ == "__main__":
    main(argv=sys.argv[1:])