Source code for dae.annotation.reannotate_instance

import argparse
import pathlib
from typing import Any

from dae.annotation.annotate_utils import AnnotationTool
from dae.annotation.context import CLIAnnotationContext
from dae.duckdb_storage.duckdb_genotype_storage import (
    DuckDbParquetStorage,
)
from dae.duckdb_storage.duckdb_storage_helpers import (
    PARQUET_SCAN,
)
from dae.gpf_instance.gpf_instance import GPFInstance
from dae.schema2_storage.schema2_import_storage import Schema2ImportStorage
from dae.studies.study import GenotypeData
from dae.task_graph.cli_tools import TaskGraphCli
from dae.utils.verbosity_configuration import VerbosityConfiguration


class ReannotateInstanceTool(AnnotationTool):
    """Annotation tool to reannotate the configured GPF instance.

    The tool collects every genotype data entry of the GPF instance whose
    genotype storage type is ``duckdb_parquet`` or ``parquet``, prints the
    list and, unless ``--dry-run`` is given, builds and processes a
    reannotation task graph for each of them.
    """

    def get_argument_parser(self) -> argparse.ArgumentParser:
        """Construct and configure argument parser.

        Returns:
            A parser holding the tool-specific options (region size, work
            directory, dry run, full reannotation) plus the arguments
            contributed by ``CLIAnnotationContext``, ``TaskGraphCli`` and
            ``VerbosityConfiguration``.
        """
        parser = argparse.ArgumentParser(
            description="Reannotate instance",
            formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        )
        parser.add_argument(
            "-r", "--region-size", default=300_000_000,
            type=int, help="region size to parallelize by",
        )
        parser.add_argument(
            "-w", "--work-dir",
            help="Directory to store intermediate output files in",
            default="annotate_schema2_output")
        parser.add_argument(
            "-n", "--dry-run",
            action="store_true",
            help="Output which studies will be reannotated"
            " without carrying out the reannotation.",
        )
        # TODO Implement --full-reannotation after it is  # noqa: FIX002
        # implemented in annotate_schema2_parquet
        #
        # The option is a boolean switch: without ``store_true`` argparse
        # would demand a value and swallow the next command line token.
        parser.add_argument(
            "-i", "--full-reannotation",
            action="store_true",
            help="Ignore any previous annotation and run "
            "a full reannotation.",
        )

        CLIAnnotationContext.add_context_arguments(parser)
        TaskGraphCli.add_arguments(parser)
        VerbosityConfiguration.set_arguments(parser)
        return parser

    @staticmethod
    def _get_genotype_storage(
        study: GenotypeData,
        gpf_instance: GPFInstance,
    ) -> Any:
        """Return the genotype storage that holds the given study.

        The storage id is read from ``study.config.genotype_storage.id``;
        when the study has no ``genotype_storage`` section, or its id is
        ``None``, the instance's default genotype storage is returned.
        """
        if study.config.genotype_storage is None:
            genotype_storage_id = None
        else:
            genotype_storage_id = study.config.genotype_storage.id

        if genotype_storage_id is None:
            genotype_storage = \
                gpf_instance.genotype_storages.get_default_genotype_storage()
        else:
            genotype_storage = \
                gpf_instance.genotype_storages.get_genotype_storage(
                    genotype_storage_id)
        return genotype_storage

    @staticmethod
    def _is_reannotatable(
        study: GenotypeData,
        gpf_instance: GPFInstance,
    ) -> bool:
        """Check whether the study's storage type allows reannotation.

        Returns ``True`` when the storage type is ``duckdb_parquet`` or
        ``parquet``.
        """
        # NOTE(review): "parquet" is accepted here, but _get_parquet_dir
        # raises NotImplementedError for any storage that is not a
        # DuckDbParquetStorage - confirm whether the two should agree.
        genotype_storage = ReannotateInstanceTool._get_genotype_storage(
            study, gpf_instance)
        return genotype_storage.storage_type in {"duckdb_parquet", "parquet"}

    @staticmethod
    def _get_parquet_dir(
        study: GenotypeData,
        gpf_instance: GPFInstance,
    ) -> str:
        """Return the study directory derived from its summary data path.

        The summary entry of the study layout is matched against
        ``PARQUET_SCAN`` and the result is the grandparent directory of the
        captured ``parquet_path``.

        Raises:
            NotImplementedError: the study's storage is not a
                ``DuckDbParquetStorage``.
            ValueError: the study layout has no summary entry, or the
                summary entry does not match ``PARQUET_SCAN``.
        """
        genotype_storage = ReannotateInstanceTool._get_genotype_storage(
            study, gpf_instance)
        if not isinstance(genotype_storage, DuckDbParquetStorage):
            raise NotImplementedError
        summary_path = genotype_storage.build_study_layout(study.config).summary
        if summary_path is None:
            raise ValueError(f"No summary data in study {study.study_id}!")
        match = PARQUET_SCAN.fullmatch(summary_path)
        if match is None:
            raise ValueError(f"Invalid path to summary data {summary_path}!")
        # Two levels above the captured parquet path.
        return str(
            pathlib.Path(match.groupdict()["parquet_path"]).parent.parent)

    def work(self) -> None:
        """List the reannotatable studies and reannotate each of them.

        The study ids are always printed. With ``--dry-run`` the method
        returns right after the listing; otherwise a reannotation task
        graph is generated and processed for every listed study, one study
        at a time.

        Raises:
            ValueError: no GPF instance is configured; also propagated from
                ``_get_parquet_dir``.
            NotImplementedError: propagated from ``_get_parquet_dir``.
        """
        if self.gpf_instance is None:
            raise ValueError("No configured GPF instance to work with!")

        reannotatable_data: list[GenotypeData] = [
            study
            for study in self.gpf_instance.get_all_genotype_data()
            if self._is_reannotatable(study, self.gpf_instance)
        ]

        # TODO When constructing reannotatable_data, maybe for  # noqa: FIX002
        # each study we could check the contents of the constructed
        # ReannotationPipeline? so for studies with empty ReannotationPipelines
        # (i.e. no reannotation needed) they would not be displayed
        print("Studies to be reannotated:")
        for study in reannotatable_data:
            print(f"-> {study.study_id}")

        if self.args.dry_run:
            return

        for study in reannotatable_data:
            print(f"Processing {study.study_id}...")
            study_dir = self._get_parquet_dir(study, self.gpf_instance)
            graph = Schema2ImportStorage.generate_reannotate_task_graph(
                self.gpf_instance,
                study_dir,
                self.args.region_size,
                self.args.allow_repeated_attributes,
            )
            # Every parsed command line argument is forwarded as a keyword.
            TaskGraphCli.process_graph(graph, **vars(self.args))
def cli(
    raw_args: list[str] | None = None,
    gpf_instance: GPFInstance | None = None,
) -> None:
    """Run the instance reannotation tool from the command line.

    Args:
        raw_args: Command line arguments handed to the tool as-is;
            ``None`` is passed through unchanged.
        gpf_instance: GPF instance to reannotate. When ``None``, one is
            created with ``GPFInstance.build()``.
    """
    instance = GPFInstance.build() if gpf_instance is None else gpf_instance
    ReannotateInstanceTool(raw_args, instance).run()