from __future__ import annotations
import argparse
import glob
import logging
import os
import sys
from collections.abc import Iterable
from typing import Any, cast
import toml
import yaml
from dae.gpf_instance.adjustments.adjust_command import (
AdjustmentsCommand,
)
from dae.gpf_instance.adjustments.adjust_duckdb_storage import (
AdjustDuckDbStorageCommand,
)
from dae.gpf_instance.adjustments.adjust_impala_storage import (
AdjustImpalaStorageCommand,
)
from dae.utils.verbosity_configuration import VerbosityConfiguration
logger = logging.getLogger("gpf_instance_adjustments")
class InstanceIdCommand(AdjustmentsCommand):
"""Adjusts GPF instance ID."""
def __init__(self, instance_dir: str, instance_id: str) -> None:
super().__init__(instance_dir)
self.instance_id = instance_id
def execute(self) -> None:
self.config["instance_id"] = self.instance_id
logger.info(
"replacing instance id with %s", self.instance_id)
class StudyConfigsAdjustmentCommand(AdjustmentsCommand):
"""Command to adjust study configs."""
def _execute_studies(self, config_format: str = "toml") -> None:
study_configs_dir = os.path.join(self.instance_dir, "studies")
if config_format == "toml":
pattern = os.path.join(study_configs_dir, "**/*.conf")
elif config_format == "yaml":
pattern = os.path.join(study_configs_dir, "**/*.yaml")
else:
raise ValueError(f"unknown config format {config_format}")
config_filenames = glob.glob(pattern, recursive=True)
for config_filename in config_filenames:
logger.info("processing study %s", config_filename)
with open(config_filename, "rt", encoding="utf8") as infile:
if config_format == "toml":
study_config = toml.loads(infile.read())
elif config_format == "yaml":
study_config = yaml.safe_load(infile.read())
else:
raise ValueError(f"unknown config format {config_format}")
study_id = study_config["id"]
result_config = self.adjust_study(
study_id, cast(dict[str, Any], study_config))
with open(config_filename, "w", encoding="utf8") as outfile:
if config_format == "toml":
outfile.write(toml.dumps(result_config))
elif config_format == "yaml":
outfile.write(
yaml.safe_dump(result_config, sort_keys=False))
def _execute_datasets(self, config_format: str = "toml") -> None:
study_configs_dir = os.path.join(self.instance_dir, "datasets")
if config_format == "toml":
pattern = os.path.join(study_configs_dir, "**/*.conf")
elif config_format == "yaml":
pattern = os.path.join(study_configs_dir, "**/*.yaml")
else:
raise ValueError(f"unknown config format {config_format}")
config_filenames = glob.glob(pattern, recursive=True)
for config_filename in config_filenames:
logger.info("processing study %s", config_filename)
with open(config_filename, "rt", encoding="utf8") as infile:
if config_format == "toml":
dataset_config = toml.loads(infile.read())
elif config_format == "yaml":
dataset_config = yaml.safe_load(infile.read())
else:
raise ValueError(f"unknown config format {config_format}")
dataset_id = dataset_config["id"]
result_config = self.adjust_dataset(
dataset_id, cast(dict[str, Any], dataset_config))
with open(config_filename, "w", encoding="utf8") as outfile:
if config_format == "toml":
outfile.write(toml.dumps(result_config))
elif config_format == "yaml":
outfile.write(
yaml.safe_dump(result_config, sort_keys=False))
def adjust_study(
self, _study_id: str,
study_config: dict[str, Any],
) -> dict[str, Any]:
return study_config
def adjust_dataset(
self, _dataset_id: str, dataset_config: dict[str, Any],
) -> dict[str, Any]:
return dataset_config
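# Sketch of how a custom adjustment could build on this class (the subclass
# name and the "tag" field are hypothetical and shown only to illustrate the
# adjust_study() hook and the _execute_studies()/_execute_datasets() walkers):
#
#     class TagStudies(StudyConfigsAdjustmentCommand):
#         def adjust_study(
#             self, _study_id: str, study_config: dict[str, Any],
#         ) -> dict[str, Any]:
#             study_config["tag"] = "reviewed"  # hypothetical field
#             return study_config
#
#         def execute(self) -> None:
#             self._execute_studies(config_format="toml")
#             self._execute_studies(config_format="yaml")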
class DefaultGenotypeStorage(StudyConfigsAdjustmentCommand):
"""Adjust default genotype storage."""
def __init__(self, instance_dir: str, storage_id: str) -> None:
super().__init__(instance_dir)
self.storage_id = storage_id
def execute(self) -> None:
genotype_storage_config = self.config["genotype_storage"]
default_storage = genotype_storage_config["default"]
storages = genotype_storage_config["storages"]
storage_ids = set(storages.keys())
if default_storage not in storage_ids:
logger.error(
"GPF instance misconfigured; "
"current default genotype storage %s not found "
"in the list of storages: %s",
default_storage, storage_ids)
raise ValueError(default_storage)
if self.storage_id not in storage_ids:
logger.error(
"bad storage for GPF instance; "
"passed genotype storage %s not found "
"in the list of configured storages: %s",
self.storage_id, storage_ids)
raise ValueError(self.storage_id)
genotype_storage_config["default"] = self.storage_id
logger.info(
"replacing default storage id with %s", self.storage_id)
self._execute_studies()
def adjust_study(
self, _study_id: str,
study_config: dict[str, Any],
) -> dict[str, Any]:
genotype_storage = study_config.get("genotype_storage")
if genotype_storage is not None and \
genotype_storage.get("id") is None:
genotype_storage["id"] = self.storage_id
return study_config
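# Illustrative use of DefaultGenotypeStorage (a sketch; the path and storage
# id are hypothetical, and the id must already appear among the instance's
# configured storages, otherwise execute() raises ValueError):
#
#     with DefaultGenotypeStorage("/data/gpf_instance", "duckdb_storage") as cmd:
#         cmd.execute()
#
# Besides switching the instance-level default, adjust_study() stamps the new
# storage id onto study configs whose genotype_storage section has no
# explicit id.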
class EnableDisableStudies(StudyConfigsAdjustmentCommand):
"""Enable or disable collection of studies."""
def __init__(
self, instance_dir: str,
study_ids: Iterable[str], *,
enabled: bool = False,
) -> None:
super().__init__(instance_dir)
self.study_ids = study_ids
self.enabled = enabled
def _msg(self) -> str:
msg = "disable"
if self.enabled:
msg = "enable"
return msg
def execute(self) -> None:
logger.info(
"going to %s following studies: %s", self._msg(), self.study_ids)
self._execute_studies(config_format="toml")
self._execute_studies(config_format="yaml")
self._execute_datasets(config_format="toml")
self._execute_datasets(config_format="yaml")
gpfjs = self.config.get("gpfjs")
if gpfjs is not None:
visible_datasets = gpfjs.get("visible_datasets")
if visible_datasets:
if self.enabled:
result = visible_datasets
for study_id in self.study_ids:
if study_id not in result:
result.append(study_id)
else:
result = []
for study_id in visible_datasets:
if study_id in self.study_ids:
continue
result.append(study_id)
gpfjs["visible_datasets"] = result
def adjust_study(
self, study_id: str,
study_config: dict[str, Any],
) -> dict[str, Any]:
if study_id in self.study_ids:
logger.info("study %s %s", study_id, self._msg())
study_config["enabled"] = self.enabled
return study_config
def adjust_dataset(
self, dataset_id: str,
dataset_config: dict[str, Any],
) -> dict[str, Any]:
if dataset_id in self.study_ids:
logger.info("dataset %s %s", dataset_id, self._msg())
dataset_config["enabled"] = self.enabled
studies = dataset_config["studies"]
result = []
for study_id in studies:
if study_id in self.study_ids:
logger.info(
"removing %s from dataset %s", study_id, dataset_id)
continue
result.append(study_id)
dataset_config["studies"] = result
return dataset_config
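# Illustrative use of EnableDisableStudies (a sketch with hypothetical study
# ids). Disabling a study marks it enabled=False, drops it from the studies
# list of every dataset, and removes it from the gpfjs visible_datasets list:
#
#     with EnableDisableStudies(
#             "/data/gpf_instance", {"study_1", "study_2"},
#             enabled=False) as cmd:
#         cmd.execute()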
def cli(argv: list[str] | None = None) -> None:
"""Handle cli invocation."""
argv = argv or sys.argv[1:]
parser = argparse.ArgumentParser(
description="adjustments in GPF instance configuration")
VerbosityConfiguration.set_arguments(parser)
parser.add_argument("-i", "--instance", type=str, default=None)
subparsers = parser.add_subparsers(dest="command",
help="Command to execute")
parser_instance_id = subparsers.add_parser(
"id", help="change the GPF instance ID")
parser_instance_id.add_argument(
"instance_id", type=str,
help="new GPF instance ID")
parser_impala_storage = subparsers.add_parser(
"impala-storage", help="adjust the GPF instance impala storage")
AdjustImpalaStorageCommand.add_arguments(parser_impala_storage)
parser_duckdb_storage = subparsers.add_parser(
"duckdb-storage", help="adjust the GPF instance DuckDb storage")
AdjustDuckDbStorageCommand.add_arguments(parser_duckdb_storage)
parser_genotype_storage = subparsers.add_parser(
"storage", help="change the GPF default genotype storage")
parser_genotype_storage.add_argument(
"storage_id", type=str,
help="new GPF default genotype storage")
parser_disable_studies = subparsers.add_parser(
"disable-studies", help="disable studies from GPF instance")
parser_disable_studies.add_argument(
"study_id", type=str, nargs="+",
help="study IDs to disable")
parser_enable_studies = subparsers.add_parser(
"enable-studies", help="enable studies from GPF instance")
parser_enable_studies.add_argument(
"study_id", type=str, nargs="+",
help="study IDs to enable")
args = parser.parse_args(argv)
instance_dir = args.instance
if instance_dir is None:
instance_dir = os.environ.get("DAE_DB_DIR")
if instance_dir is None:
logger.error("can't identify GPF instance to work with")
sys.exit(1)
VerbosityConfiguration.set(args)
if args.command == "id":
with InstanceIdCommand(instance_dir, args.instance_id) as cmd:
cmd.execute()
elif args.command == "impala-storage":
with AdjustImpalaStorageCommand(
instance_dir, **vars(args)) as cmd:
cmd.execute()
elif args.command == "storage":
with DefaultGenotypeStorage(instance_dir, args.storage_id) as cmd:
cmd.execute()
elif args.command == "duckdb-storage":
with AdjustDuckDbStorageCommand(
instance_dir, **vars(args)) as cmd:
cmd.execute()
elif args.command == "disable-studies":
with EnableDisableStudies(
instance_dir, set(args.study_id), enabled=False) as cmd:
cmd.execute()
elif args.command == "enable-studies":
with EnableDisableStudies(
instance_dir, set(args.study_id), enabled=True) as cmd:
cmd.execute()
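# Example command-line invocations (sketches; the console-script name is
# assumed from the module logger and may differ in a given installation, and
# the instance path and ids are hypothetical). When -i/--instance is omitted,
# the instance directory is read from the DAE_DB_DIR environment variable:
#
#     gpf_instance_adjustments -i /data/gpf_instance id my_new_instance
#     gpf_instance_adjustments -i /data/gpf_instance storage duckdb_storage
#     gpf_instance_adjustments -i /data/gpf_instance disable-studies study_1 study_2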