Source code for dae.pheno.registry

from __future__ import annotations

import logging
from pathlib import Path
from threading import Lock

from box import Box

from dae.configuration.gpf_config_parser import GPFConfigParser
from dae.configuration.schemas.phenotype_data import pheno_conf_schema
from dae.pheno.pheno_data import PhenotypeData, PhenotypeGroup, PhenotypeStudy

logger = logging.getLogger(__name__)


[docs] class PhenoRegistry: """Class to register phenotype data.""" CACHE_LOCK = Lock() def __init__(self) -> None: self._cache: dict[str, PhenotypeData] = {} def _register_study(self, study: PhenotypeData) -> None: if study.pheno_id in self._cache: raise ValueError( f"Pheno ID {study.pheno_id} already loaded.", ) self._cache[study.pheno_id] = study
[docs] def register_phenotype_data( self, phenotype_data: PhenotypeData, *, lock: bool = True, ) -> None: """Register a phenotype data study.""" if lock: with self.CACHE_LOCK: self._register_study(phenotype_data) else: self._register_study(phenotype_data)
[docs] def has_phenotype_data(self, data_id: str) -> bool: with self.CACHE_LOCK: return data_id in self._cache
[docs] def get_phenotype_data(self, data_id: str) -> PhenotypeData: with self.CACHE_LOCK: return self._cache[data_id]
[docs] def get_phenotype_data_config(self, data_id: str) -> Box | None: with self.CACHE_LOCK: return self._cache[data_id].config
[docs] def get_phenotype_data_ids(self) -> list[str]: return list(self._cache.keys())
[docs] def get_all_phenotype_data(self) -> list[PhenotypeData]: return list(self._cache.values())
[docs] def get_or_load( self, pheno_id: str, pheno_configurations: dict[str, dict], ) -> PhenotypeData: """Return a phenotype data from the cache and load it if necessary.""" if pheno_id in self._cache: return self._cache[pheno_id] config = pheno_configurations[pheno_id] if config["type"] == "study": study = PhenotypeStudy(config["name"], config["dbfile"], config) self.register_phenotype_data(study, lock=False) return self._cache[pheno_id] if config["type"] == "group": children = [self.get_or_load(child, pheno_configurations) for child in config["children"]] group = PhenotypeGroup(config["name"], config, children) self.register_phenotype_data(group, lock=False) return self._cache[pheno_id] raise ValueError(f"Invalid type '{config['type']}'" f" in config for {pheno_id}")
[docs] @staticmethod def from_directory(pheno_data_dir: Path) -> PhenoRegistry: """Create a registry with all phenotype studies in a directory.""" registry = PhenoRegistry() logger.info("pheno registry created: %s", id(registry)) pheno_configs = [ Path(c) for c in GPFConfigParser.collect_directory_configs( str(pheno_data_dir), ) ] configurations: dict[str, dict] = {} with PhenoRegistry.CACHE_LOCK: for conf_path in pheno_configs: logger.info("collecting phenotype data config: %s", conf_path) config = GPFConfigParser.load_config( str(conf_path), pheno_conf_schema) configurations[config["name"]] = config for pheno_id in configurations: logger.info("loading phenotype data config: %s", pheno_id) registry.get_or_load(pheno_id, configurations) return registry