Source code for dae.annotation.reannotate_instance

import argparse
import pathlib
from typing import Any, cast

from dae.annotation.annotate_utils import AnnotationTool
from dae.annotation.annotation_factory import load_pipeline_from_yaml
from dae.annotation.context import CLIAnnotationContext
from dae.duckdb_storage.duckdb2_variants import DuckDb2Variants
from dae.duckdb_storage.duckdb_genotype_storage import (
    DuckDbParquetStorage,
)
from dae.duckdb_storage.duckdb_storage_helpers import (
    PARQUET_SCAN,
)
from dae.gpf_instance.gpf_instance import GPFInstance
from dae.parquet_storage.storage import ParquetLoaderVariants
from dae.schema2_storage.schema2_import_storage import Schema2ImportStorage
from dae.studies.study import GenotypeData, GenotypeDataStudy
from dae.task_graph.cli_tools import TaskGraphCli
from dae.utils.verbosity_configuration import VerbosityConfiguration


class ReannotateInstanceTool(AnnotationTool):
    """Annotation tool to reannotate the configured GPF instance."""

    def get_argument_parser(self) -> argparse.ArgumentParser:
        """Construct and configure argument parser."""
        parser = argparse.ArgumentParser(
            description="Reannotate instance",
            formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        )
        parser.add_argument(
            "-r", "--region-size", default=300_000_000, type=int,
            help="region size to parallelize by",
        )
        parser.add_argument(
            "-w", "--work-dir",
            help="Directory to store intermediate output files in",
            default="annotate_schema2_output")
        parser.add_argument(
            "-n", "--dry-run", action="store_true",
            help="Output which studies will be reannotated"
                 " without carrying out the reannotation.",
        )
        parser.add_argument(
            "-i", "--full-reannotation",
            help="Ignore any previous annotation and run"
                 " a full reannotation.",
            action="store_true",
        )
        CLIAnnotationContext.add_context_arguments(parser)
        TaskGraphCli.add_arguments(parser)
        VerbosityConfiguration.set_arguments(parser)
        return parser
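
    # Hypothetical invocations exercising the flags above (an assumption,
    # not part of this module; the installed console-script name may differ
    # and running the module directly requires a __main__ guard):
    #
    #     python -m dae.annotation.reannotate_instance --dry-run
    #     python -m dae.annotation.reannotate_instance -r 100000000 \
    #         --full-reannotation
    #
    # "--dry-run" lists the studies that would be reannotated and exits;
    # "--full-reannotation" discards any previous annotation.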

    @staticmethod
    def _get_genotype_storage(
        study: GenotypeData,
        gpf_instance: GPFInstance,
    ) -> Any:
        """Return the genotype storage a study is configured to use."""
        if study.config.genotype_storage is None:
            genotype_storage_id = None
        else:
            genotype_storage_id = study.config.genotype_storage.id
        if genotype_storage_id is None:
            # Fall back to the instance's default storage when the study
            # does not name one explicitly.
            genotype_storage = \
                gpf_instance.genotype_storages.get_default_genotype_storage()
        else:
            genotype_storage = \
                gpf_instance.genotype_storages.get_genotype_storage(
                    genotype_storage_id)
        return genotype_storage

    @staticmethod
    def _is_study_annotation_up_to_date(
        study_id: str,
        gpf_instance: GPFInstance,
    ) -> bool:
        """Compare a study's stored annotation to the instance pipeline."""
        canonical = set(gpf_instance.get_annotation_pipeline().get_info())
        study = gpf_instance.get_genotype_data(study_id)
        assert study is not None
        assert not study.is_group
        study = cast(GenotypeDataStudy, study)
        storage = gpf_instance.genotype_storages.get_genotype_storage(
            study.config.genotype_storage.id,
        )
        backend = study.backend
        # Fetch the raw YAML of the pipeline the study was annotated with.
        if storage.storage_type == "duckdb_parquet":
            raw = cast(DuckDb2Variants, backend).fetch_annotation()
        elif storage.storage_type == "parquet":
            meta = cast(ParquetLoaderVariants, backend).loader.meta
            raw = meta["annotation_pipeline"]
        else:
            raise ValueError(f"Invalid storage type {storage.storage_type}")
        raw = raw.strip()
        annotation = load_pipeline_from_yaml(raw, gpf_instance.grr)
        new_infos = set(annotation.get_info())
        return new_infos == canonical

    @staticmethod
    def _is_reannotatable(
        study: GenotypeData,
        gpf_instance: GPFInstance,
    ) -> bool:
        """Check that a study's storage supports reannotation and needs it."""
        genotype_storage = ReannotateInstanceTool._get_genotype_storage(
            study, gpf_instance)
        annotation_ok = \
            ReannotateInstanceTool._is_study_annotation_up_to_date(
                study.study_id, gpf_instance)
        return genotype_storage.storage_type in {"duckdb_parquet", "parquet"} \
            and not annotation_ok

    @staticmethod
    def _get_parquet_dir(
        study: GenotypeData,
        gpf_instance: GPFInstance,
    ) -> str:
        """Return the root directory of a study's Parquet data."""
        genotype_storage = ReannotateInstanceTool._get_genotype_storage(
            study, gpf_instance)
        if not isinstance(genotype_storage, DuckDbParquetStorage):
            raise NotImplementedError
        summary_path = \
            genotype_storage.build_study_layout(study.config).summary
        if summary_path is None:
            raise ValueError(f"No summary data in study {study.study_id}!")
        match = PARQUET_SCAN.fullmatch(summary_path)
        if match is None:
            raise ValueError(f"Invalid path to summary data {summary_path}!")
        return str(
            pathlib.Path(match.groupdict()["parquet_path"]).parent.parent)

    def work(self) -> None:
        """Reannotate every out-of-date study in the instance."""
        if self.gpf_instance is None:
            raise ValueError("No configured GPF instance to work with!")
        reannotatable_data: list[GenotypeData] = [
            study for study in self.gpf_instance.get_all_genotype_data()
            if self._is_reannotatable(study, self.gpf_instance)
        ]
        if not reannotatable_data:
            print("Nothing to be done.")
            return

        print("Studies to be reannotated:")
        for study in reannotatable_data:
            print(f"-> {study.study_id}")

        if self.args.dry_run:
            return

        for study in reannotatable_data:
            print(f"Processing {study.study_id}...")
            study_dir = self._get_parquet_dir(study, self.gpf_instance)
            graph = Schema2ImportStorage.generate_reannotate_task_graph(
                self.gpf_instance, study_dir,
                self.args.region_size, self.args.allow_repeated_attributes,
                full_reannotation=self.args.full_reannotation,
            )
            # Task-graph arguments added by TaskGraphCli.add_arguments
            # are forwarded via vars(self.args).
            TaskGraphCli.process_graph(graph, **vars(self.args))


def cli(
    raw_args: list[str] | None = None,
    gpf_instance: GPFInstance | None = None,
) -> None:
    """Entry point for the instance reannotation tool."""
    if gpf_instance is None:
        gpf_instance = GPFInstance.build()
    tool = ReannotateInstanceTool(raw_args, gpf_instance)
    tool.run()
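

A minimal usage sketch (not part of the module above; assumes a GPF instance
is discoverable, e.g. via the DAE_DB_DIR environment variable, and that at
least one study lives in a "duckdb_parquet" or "parquet" storage):

    from dae.annotation.reannotate_instance import cli

    # Dry run: only report which studies have out-of-date annotation.
    cli(["--dry-run"])

    # Full reannotation with a smaller parallelization region
    # (flag values are illustrative).
    cli(["--full-reannotation", "-r", "100000000"])

Passing gpf_instance explicitly, as cli() allows, avoids the GPFInstance.build()
call and is useful where a prebuilt instance (e.g. a test fixture) is available.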