from __future__ import annotations
import abc
import argparse
import glob
import logging
import os
import sys
from collections.abc import Iterable
from types import TracebackType
from typing import Any, cast
import toml
import yaml
from dae.utils.verbosity_configuration import VerbosityConfiguration
logger = logging.getLogger("gpf_instance_adjustments")
class AdjustmentsCommand(abc.ABC):
"""Abstract class for adjusting an GPF instance config."""
def __init__(self, instance_dir: str) -> None:
self.instance_dir = instance_dir
self.filename = os.path.join(instance_dir, "gpf_instance.yaml")
if not os.path.exists(self.filename):
logger.error(
"%s is not a GPF instance; "
"gpf_instance.yaml (%s) not found",
instance_dir, self.filename)
raise ValueError(instance_dir)
with open(self.filename, "rt", encoding="utf8") as infile:
self.config = yaml.safe_load(infile.read())
@abc.abstractmethod
def execute(self) -> None:
"""Execute adjustment command."""
def close(self) -> None:
"""Save adjusted config."""
with open(self.filename, "w", encoding="utf8") as outfile:
outfile.write(yaml.safe_dump(self.config, sort_keys=False))
def __enter__(self) -> AdjustmentsCommand:
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
self.close()
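# Illustrative usage (a sketch, not exercised by this module): concrete
# commands are meant to be used as context managers, so that close() writes
# the adjusted gpf_instance.yaml back on exit, mirroring what cli() does
# below, e.g.:
#
#     with InstanceIdCommand("/data/gpf-instance", "my_instance") as cmd:
#         cmd.execute()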
class InstanceIdCommand(AdjustmentsCommand):
"""Adjusts GPF instance ID."""
def __init__(self, instance_dir: str, instance_id: str) -> None:
super().__init__(instance_dir)
self.instance_id = instance_id
def execute(self) -> None:
variables = self.config["vars"]
variables["instance_id"] = self.instance_id
logger.info(
"replacing instance id with %s", self.instance_id)
class AdjustImpalaStorageCommand(AdjustmentsCommand):
"""Adjusts impala storage."""
def __init__(
        self, instance_dir: str, storage_id: str, read_only: bool | None,
        hdfs_host: str | None, impala_hosts: list[str] | None,
) -> None:
super().__init__(instance_dir)
self.storage_id = storage_id
self.read_only = read_only
self.hdfs_host = hdfs_host
self.impala_hosts = impala_hosts
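    # Shape of the storage entry execute() expects, inferred from the keys it
    # touches (an illustrative sketch, not a complete schema):
    #
    #     genotype_storage:
    #       storages:
    #         - id: <storage_id>
    #           storage_type: impala
    #           read_only: false
    #           hdfs:
    #             host: <hdfs host>
    #           impala:
    #             hosts: [<impala host>, ...]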
def execute(self) -> None:
storages = self.config["genotype_storage"]["storages"]
storage = None
for current in storages:
if current["id"] == self.storage_id:
storage = current
break
if storage is None:
logger.error(
"unable to find storage (%s) in instance at %s",
self.storage_id, self.instance_dir)
raise ValueError(f"unable to find storage {self.storage_id}")
if storage.get("storage_type") != "impala":
logger.error(
"storage %s is not Impala", self.storage_id)
raise ValueError(f"storage {self.storage_id} is not Impala")
if self.read_only is not None:
storage["read_only"] = self.read_only
if self.hdfs_host is not None:
storage["hdfs"]["host"] = self.hdfs_host
if self.impala_hosts is not None:
storage["impala"]["hosts"] = self.impala_hosts
class AdjustDuckDbStorageCommand(AdjustmentsCommand):
"""Adjusts impala storage."""
def __init__(
self, instance_dir: str, storage_id: str,
read_only: str,
) -> None:
super().__init__(instance_dir)
self.storage_id = storage_id
        # bool("false") is truthy, so interpret the flag from its string value
        self.read_only = read_only.lower() != "false"
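    # execute() expects a genotype_storage.storages entry whose storage_type
    # is "duckdb" or "duckdb2" and only toggles its read_only flag.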
def execute(self) -> None:
storages = self.config["genotype_storage"]["storages"]
storage = None
for current in storages:
if current["id"] == self.storage_id:
storage = current
break
if storage is None:
logger.error(
"unable to find storage (%s) in instance at %s",
self.storage_id, self.instance_dir)
raise ValueError(f"unable to find storage {self.storage_id}")
if storage.get("storage_type") not in set(["duckdb", "duckdb2"]):
logger.error(
"storage %s is not DuckDb", self.storage_id)
raise ValueError(f"storage {self.storage_id} is not DuckDb")
storage["read_only"] = self.read_only
class StudyConfigsAdjustmentCommand(AdjustmentsCommand):
"""Command to adjust study configs."""
def _execute_studies(self, config_format: str = "toml") -> None:
study_configs_dir = os.path.join(self.instance_dir, "studies")
if config_format == "toml":
pattern = os.path.join(study_configs_dir, "**/*.conf")
elif config_format == "yaml":
pattern = os.path.join(study_configs_dir, "**/*.yaml")
config_filenames = glob.glob(pattern, recursive=True)
for config_filename in config_filenames:
logger.info("processing study %s", config_filename)
with open(config_filename, "rt", encoding="utf8") as infile:
if config_format == "toml":
study_config = toml.loads(infile.read())
elif config_format == "yaml":
study_config = yaml.safe_load(infile.read())
study_id = study_config["id"]
result_config = self.adjust_study(
study_id, cast(dict[str, Any], study_config))
with open(config_filename, "w", encoding="utf8") as outfile:
if config_format == "toml":
outfile.write(toml.dumps(result_config))
elif config_format == "yaml":
outfile.write(
yaml.safe_dump(result_config, sort_keys=False))
def _execute_datasets(self, config_format: str = "toml") -> None:
study_configs_dir = os.path.join(self.instance_dir, "datasets")
if config_format == "toml":
pattern = os.path.join(study_configs_dir, "**/*.conf")
elif config_format == "yaml":
pattern = os.path.join(study_configs_dir, "**/*.yaml")
config_filenames = glob.glob(pattern, recursive=True)
for config_filename in config_filenames:
logger.info("processing study %s", config_filename)
with open(config_filename, "rt", encoding="utf8") as infile:
if config_format == "toml":
dataset_config = toml.loads(infile.read())
elif config_format == "yaml":
dataset_config = yaml.safe_load(infile.read())
dataset_id = dataset_config["id"]
result_config = self.adjust_dataset(
dataset_id, cast(dict[str, Any], dataset_config))
with open(config_filename, "w", encoding="utf8") as outfile:
if config_format == "toml":
outfile.write(toml.dumps(result_config))
elif config_format == "yaml":
outfile.write(
yaml.safe_dump(result_config, sort_keys=False))
def adjust_study(
self, _study_id: str,
study_config: dict[str, Any],
) -> dict[str, Any]:
return study_config
def adjust_dataset(
self, _dataset_id: str, dataset_config: dict[str, Any],
) -> dict[str, Any]:
return dataset_config
class DefaultGenotypeStorage(StudyConfigsAdjustmentCommand):
"""Adjust default genotype storage."""
def __init__(self, instance_dir: str, storage_id: str) -> None:
super().__init__(instance_dir)
self.storage_id = storage_id
def execute(self) -> None:
genotype_storage_config = self.config["genotype_storage"]
default_storage = genotype_storage_config["default"]
storages = genotype_storage_config["storages"]
storage_ids = set(map(lambda s: s["id"], storages))
if default_storage not in storage_ids:
logger.error(
"GPF instance misconfigured; "
"current default genotype storage %s not found "
"in the list of storages: %s",
default_storage, storage_ids)
raise ValueError(default_storage)
        if self.storage_id not in storage_ids:
            logger.error(
                "bad storage for GPF instance; "
                "passed genotype storage %s not found "
                "in the list of configured storages: %s",
                self.storage_id, storage_ids)
            raise ValueError(self.storage_id)
genotype_storage_config["default"] = self.storage_id
logger.info(
"replacing default storage id with %s", self.storage_id)
self._execute_studies()
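    # adjust_study() below writes the new storage id into study configs that
    # have a genotype_storage section without an explicit id.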
def adjust_study(
self, _study_id: str,
study_config: dict[str, Any],
) -> dict[str, Any]:
genotype_storage = study_config.get("genotype_storage")
        if (genotype_storage is not None
                and genotype_storage.get("id") is None):
genotype_storage["id"] = self.storage_id
return study_config
class EnableDisableStudies(StudyConfigsAdjustmentCommand):
"""Enable or disable collection of studies."""
def __init__(
self, instance_dir: str,
study_ids: Iterable[str],
enabled: bool = False,
) -> None:
super().__init__(instance_dir)
self.study_ids = study_ids
self.enabled = enabled
def _msg(self) -> str:
msg = "disable"
if self.enabled:
msg = "enable"
return msg
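    # execute() rewrites study and dataset configs (both TOML and YAML) via
    # the adjust_* hooks below and also updates gpfjs.visible_datasets in
    # gpf_instance.yaml.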
def execute(self) -> None:
logger.info(
"going to %s following studies: %s", self._msg(), self.study_ids)
self._execute_studies(config_format="toml")
self._execute_studies(config_format="yaml")
self._execute_datasets(config_format="toml")
self._execute_datasets(config_format="yaml")
gpfjs = self.config.get("gpfjs")
if gpfjs is not None:
visible_datasets = gpfjs.get("visible_datasets")
if visible_datasets:
if self.enabled:
result = visible_datasets
for study_id in self.study_ids:
if study_id not in result:
result.append(study_id)
else:
result = []
for study_id in visible_datasets:
if study_id in self.study_ids:
continue
result.append(study_id)
gpfjs["visible_datasets"] = result
def adjust_study(
self, study_id: str,
study_config: dict[str, Any],
) -> dict[str, Any]:
if study_id in self.study_ids:
logger.info("study %s %s", study_id, self._msg())
study_config["enabled"] = self.enabled
return study_config
def adjust_dataset(
self, dataset_id: str,
dataset_config: dict[str, Any],
) -> dict[str, Any]:
if dataset_id in self.study_ids:
logger.info("dataset %s %s", dataset_id, self._msg())
dataset_config["enabled"] = self.enabled
studies = dataset_config["studies"]
result = []
for study_id in studies:
if study_id in self.study_ids:
logger.info(
"removing %s from dataset %s", study_id, dataset_id)
continue
result.append(study_id)
dataset_config["studies"] = result
return dataset_config
def cli(argv: list[str] | None = None) -> None:
"""Handle cli invocation."""
    if argv is None:
        argv = sys.argv[1:]
parser = argparse.ArgumentParser(
description="adjustments in GPF instance configuration")
VerbosityConfiguration.set_arguments(parser)
parser.add_argument("-i", "--instance", type=str, default=None)
subparsers = parser.add_subparsers(dest="command",
help="Command to execute")
parser_instance_id = subparsers.add_parser(
"id", help="change the GPF instance ID")
parser_instance_id.add_argument(
"instance_id", type=str,
help="new GPF instance ID")
parser_impala_storage = subparsers.add_parser(
"impala-storage", help="adjust the GPF instance impala storage")
parser_impala_storage.add_argument(
"storage_id", type=str,
help="impala storage ID")
parser_impala_storage.add_argument(
"--read-only", type=str, default="true",
help="read-only flag for impala storage")
parser_impala_storage.add_argument(
"--impala-hosts", type=str, nargs="+",
help="list of impala hosts")
parser_impala_storage.add_argument(
"--hdfs-host", type=str,
help="HDFS host")
parser_duckdb_storage = subparsers.add_parser(
"duckdb-storage", help="adjust the GPF instance DuckDb storage")
parser_duckdb_storage.add_argument(
"storage_id", type=str,
help="DuckDb storage ID")
parser_duckdb_storage.add_argument(
"--read-only", type=str, default="true",
help="DuckDb storage read only flag")
parser_genotype_storage = subparsers.add_parser(
"storage", help="change the GPF default genotype storage")
parser_genotype_storage.add_argument(
"storage_id", type=str,
help="new GPF default genotype storage")
parser_disable_studies = subparsers.add_parser(
"disable-studies", help="disable studies from GPF instance")
parser_disable_studies.add_argument(
"study_id", type=str, nargs="+",
help="study IDs to disable")
parser_enable_studies = subparsers.add_parser(
"enable-studies", help="enable studies from GPF instance")
parser_enable_studies.add_argument(
"study_id", type=str, nargs="+",
help="study IDs to enable")
args = parser.parse_args(argv)
instance_dir = args.instance
if instance_dir is None:
instance_dir = os.environ.get("DAE_DB_DIR")
if instance_dir is None:
logger.error("can't identify GPF instance to work with")
sys.exit(1)
VerbosityConfiguration.set(args)
if args.command == "id":
with InstanceIdCommand(instance_dir, args.instance_id) as cmd:
cmd.execute()
elif args.command == "impala-storage":
read_only = args.read_only.lower() != "false"
with AdjustImpalaStorageCommand(
instance_dir, args.storage_id, read_only,
args.hdfs_host, args.impala_hosts) as cmd:
cmd.execute()
elif args.command == "storage":
with DefaultGenotypeStorage(instance_dir, args.storage_id) as cmd:
cmd.execute()
elif args.command == "duckdb-storage":
with AdjustDuckDbStorageCommand(
instance_dir, args.storage_id, args.read_only) as cmd:
cmd.execute()
elif args.command == "disable-studies":
with EnableDisableStudies(
instance_dir, set(args.study_id), enabled=False) as cmd:
cmd.execute()
elif args.command == "enable-studies":
with EnableDisableStudies(
instance_dir, set(args.study_id), enabled=True) as cmd:
cmd.execute()
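# Illustrative invocations (a sketch; the installed console-script name, if
# any, is not defined in this module, so cli() is called directly):
#
#     cli(["-i", "/data/gpf-instance", "id", "new_instance_id"])
#     cli(["-i", "/data/gpf-instance", "duckdb-storage", "storage1",
#          "--read-only", "false"])
#     cli(["-i", "/data/gpf-instance", "disable-studies", "study_1", "study_2"])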