# Source code for dae.pheno.registry

from __future__ import annotations

import logging
from pathlib import Path
from threading import Lock

from dae.configuration.gpf_config_parser import GPFConfigParser
from dae.configuration.schemas.phenotype_data import pheno_conf_schema
from dae.pheno.pheno_data import PhenotypeData, PhenotypeGroup
from dae.pheno.storage import PhenotypeStorageRegistry

# Module-level logger named after this module; PhenoRegistry uses it to report
# phenotype study configurations that fail to register.
logger = logging.getLogger(__name__)


class PhenoRegistry:
    """
    Class for managing runtime instances of phenotype data.

    Requires a PhenotypeStorageRegistry to function.

    The registry has 2 main operations, register and get. Registering
    requires a study configuration and makes the registry aware of a
    phenotype study's existence, making it loadable. Getting a phenotype
    data requires the ID and will perform a load if necessary.

    Both operations are synchronized and use a mutex to prevent faulty
    reads or duplicate loads of a phenotype data. Most public methods take
    a keyword-only ``lock`` flag; passing ``lock=False`` skips acquiring
    the mutex.
    """

    # Class attribute: one lock object shared by every PhenoRegistry instance.
    # It is a plain (non-reentrant) Lock, so code that already holds it must
    # call the other methods with ``lock=False``.
    CACHE_LOCK = Lock()

    def __init__(
        self,
        storage_registry: PhenotypeStorageRegistry,
        configurations: list[dict] | None = None,
        browser_cache_path: Path | None = None,
    ) -> None:
        """
        Create a registry and register the given study configurations.

        Args:
            storage_registry: Registry used to resolve phenotype storages.
            configurations: Optional study configurations to register.
                A configuration whose registration raises ``ValueError`` is
                logged and skipped instead of aborting construction.
            browser_cache_path: Optional directory for browser caches; it is
                created (with parents) when given.
        """
        self._study_configs: dict[str, dict] = {}
        self.browser_cache_path = browser_cache_path
        self._cache: dict[str, PhenotypeData] = {}
        self._storage_registry = storage_registry

        if self.browser_cache_path is not None:
            self.browser_cache_path.mkdir(parents=True, exist_ok=True)

        if configurations is not None:
            for configuration in configurations:
                try:
                    self.register_study_config(configuration)
                except ValueError:
                    logger.exception(
                        "Failure while registering "
                        "phenotype study configuration",
                    )

    @staticmethod
    def load_configurations(pheno_data_dir: str) -> list[dict]:
        """
        Load every phenotype data configuration found in a directory.

        The files are collected with
        ``GPFConfigParser.collect_directory_configs`` and each one is loaded
        with ``GPFConfigParser.load_config_dict`` using ``pheno_conf_schema``.
        """
        config_files = GPFConfigParser.collect_directory_configs(
            pheno_data_dir,
        )
        return [
            GPFConfigParser.load_config_dict(file, pheno_conf_schema)
            for file in config_files
        ]

    def register_study_config(
        self, study_config: dict, *, lock: bool = True,
    ) -> None:
        """
        Register a configuration as a loadable phenotype data.

        Configurations whose ``"enabled"`` entry is falsy are ignored.

        Args:
            study_config: The study configuration; ``"enabled"`` and ``"id"``
                are read, ``"phenotype_storage"`` is optional.
            lock: Acquire ``CACHE_LOCK`` around the write when True.

        Raises:
            ValueError: If the configuration names a phenotype storage that
                is not present in the storage registry.
        """
        if not study_config["enabled"]:
            return

        study_id = study_config["id"]
        storage_config = study_config.get("phenotype_storage")
        if storage_config is not None:
            storage_id = storage_config["id"]
            if storage_id not in self._storage_registry:
                raise ValueError(
                    f"Cannot register '{study_id}', storage '{storage_id}' "
                    "not present in storage registry!",
                )

        # Store exactly once: under the lock when requested, bare otherwise.
        # A second write outside the lock would defeat the synchronization.
        if lock:
            with self.CACHE_LOCK:
                self._study_configs[study_id] = study_config
        else:
            self._study_configs[study_id] = study_config

    def has_phenotype_data(self, data_id: str, *, lock: bool = True) -> bool:
        """Return True if a configuration is registered under ``data_id``."""
        if lock:
            with self.CACHE_LOCK:
                return data_id in self._study_configs
        return data_id in self._study_configs

    def get_phenotype_data_config(self, data_id: str) -> dict | None:
        """
        Return the registered configuration for ``data_id``.

        Always acquires ``CACHE_LOCK``.

        Raises:
            KeyError: If no configuration is registered under ``data_id``.
        """
        with self.CACHE_LOCK:
            return self._study_configs[data_id]

    def get_phenotype_data_ids(self, *, lock: bool = True) -> list[str]:
        """Return a list with the IDs of all registered phenotype data."""
        if lock:
            with self.CACHE_LOCK:
                return list(self._study_configs)
        return list(self._study_configs)

    def get_phenotype_data(
        self, data_id: str, *, lock: bool = True,
    ) -> PhenotypeData:
        """
        Return an instance of phenotype data from the registry.

        If the phenotype data hasn't been loaded yet, load and cache it.
        """
        return self._get_or_load(data_id, lock=lock)

    def get_all_phenotype_data(
        self, *, lock: bool = True,
    ) -> list[PhenotypeData]:
        """Return all registered phenotype data, loading where necessary."""
        # Iterate over a snapshot of the IDs so that a registration happening
        # while studies are being loaded cannot invalidate the iteration.
        return [
            self._get_or_load(pheno_id, lock=lock)
            for pheno_id in self.get_phenotype_data_ids(lock=lock)
        ]

    def _get_or_load(
        self, pheno_id: str, *, lock: bool = True,
    ) -> PhenotypeData:
        """Return a phenotype data from the cache and load it if necessary."""
        if lock:
            # Hold the lock across both the cache check and the load, then
            # run the unlocked path (CACHE_LOCK is not reentrant).
            with self.CACHE_LOCK:
                return self._get_or_load(pheno_id, lock=False)

        if pheno_id in self._cache:
            return self._cache[pheno_id]
        return self._load(pheno_id)

    def _load(self, pheno_id: str) -> PhenotypeData:
        """
        Build the phenotype data for ``pheno_id`` and set its cache path.

        Dispatches on the configuration's ``"type"`` ("study" or "group").
        Does not take the lock itself.

        Raises:
            KeyError: If ``pheno_id`` has no registered configuration.
            ValueError: If the configuration's type is not supported.
        """
        config = self._study_configs[pheno_id]
        config_type = config["type"]
        if config_type == "study":
            pheno_data = self._load_study(config)
        elif config_type == "group":
            pheno_data = self._load_group(config)
        else:
            raise ValueError(f"Invalid type '{config['type']}'"
                             f" in config for {pheno_id}")

        pheno_data.cache_path = self._make_cache_path_for(config)
        return pheno_data

    def _make_cache_path_for(self, study_config: dict) -> Path | None:
        """
        Return the per-study directory under the browser cache path.

        The directory is created if missing. Returns None when the registry
        has no browser cache path.
        """
        if self.browser_cache_path is None:
            return None
        path = self.browser_cache_path / study_config["id"]
        path.mkdir(parents=True, exist_ok=True)
        return path

    def _load_study(self, study_config: dict) -> PhenotypeData:
        """
        Build a phenotype study from its storage and put it in the cache.

        Uses the storage named in the configuration's ``"phenotype_storage"``
        entry, or the registry's default phenotype storage when absent.
        """
        pheno_id = study_config["id"]

        study_storage_config = study_config.get("phenotype_storage")
        if study_storage_config is not None:
            study_storage = self._storage_registry.get_phenotype_storage(
                study_storage_config["id"],
            )
        else:
            study_storage = \
                self._storage_registry.get_default_phenotype_storage()

        self._cache[pheno_id] = study_storage.build_phenotype_study(
            study_config, browser_cache_path=self.browser_cache_path)
        return self._cache[pheno_id]

    def _load_group(self, study_config: dict) -> PhenotypeData:
        """
        Build a phenotype group from its children and put it in the cache.

        Raises:
            ValueError: If any child listed in the configuration is not
                registered.
        """
        pheno_id = study_config["id"]

        missing_children = set(study_config["children"]).difference(
            set(self._study_configs),
        )
        if missing_children:
            raise ValueError(
                f"Cannot load group {pheno_id}; the following child studies "
                f"{missing_children} are not registered",
            )

        # Children are loaded without locking: when this runs as part of a
        # locked load, the caller already holds the non-reentrant lock.
        children = [
            self._get_or_load(child_id, lock=False)
            for child_id in study_config["children"]
        ]
        self._cache[pheno_id] = PhenotypeGroup(
            pheno_id,
            study_config,
            children,
            cache_path=self.browser_cache_path,
        )
        return self._cache[pheno_id]