Source code for dae.genomic_resources.repository

"""
Provides basic classes for genomic resources and repositories.

This module defines the core architecture for managing genomic resources
through a flexible repository system. It supports different storage backends
(local files, HTTP, S3) and provides both read-only and read-write access.

Class Hierarchy:
       +---------------------+                    +-----------------+
 +-----| GenomicResourceRepo |--------------------| GenomicResource |
 |     +---------------------+                    +-----------------+
 |        ^               ^                                    |
 |        |               |                                    |
 |        |  +-----------------------------+     +----------------------------+
 |        |  | GenomicResourceProtocolRepo | ----| ReadOnlyRepositoryProtocol |
 |        |  +-----------------------------+     +----------------------------+
 |        |                                                    ^
 |        |                                                    |
 |    +--------------------------+            +-----------------------------+
 +----| GenomicResourceGroupRepo |            | ReadWriteRepositoryProtocol |
      +--------------------------+            +-----------------------------+

Key Concepts:
    - GenomicResource: Represents a single genomic resource (e.g., a reference
      genome, score set, or gene model) with metadata and file access methods.

    - GenomicResourceRepo: Abstract base for repositories that manage
      collections of genomic resources.

    - RepositoryProtocol: Defines the storage backend interface (file system,
      HTTP, S3, etc.) for accessing resource files.

    - Manifest: Tracks files and their checksums within a resource to ensure
      data integrity and enable caching.

Resource Identifiers:
    Resources are identified by an ID and optional version suffix:
    - Simple: "hg19/gene_models/refseq"
    - Versioned: "hg19/gene_models/refseq(1.2.3)"

Configuration Files:
    Each resource contains a genomic_resource.yaml configuration file with
    metadata including type, description, and resource-specific settings.

"""
from __future__ import annotations

import abc
import copy
import enum
import hashlib
import logging
import os
import re
from collections.abc import Generator, Iterator
from dataclasses import asdict, dataclass
from typing import IO, Any, cast

import pysam
import yaml

# Module-level logger; the protocol classes below use it for debug/info output.
logger = logging.getLogger(__name__)


# Name of the per-resource configuration file; it is loaded by
# ``build_genomic_resource`` when no configuration is supplied.
GR_CONF_FILE_NAME = "genomic_resource.yaml"
# Name of the per-resource manifest file written by ``save_manifest``.
GR_MANIFEST_FILE_NAME = ".MANIFEST"
# Name of the repository contents file (see ``build_content_file``).
GR_CONTENTS_FILE_NAME = ".CONTENTS.json"
# Name of the per-resource HTML index file written by ``save_index``.
GR_INDEX_FILE_NAME = "index.html"

# NOTE(review): not referenced in the visible part of this module; presumably
# the text encoding used for resource files -- confirm against callers.
GR_ENCODING = "utf-8"

_GR_ID_TOKEN_RE = re.compile(r"[a-zA-Z0-9._-]+")


def is_gr_id_token(token: str) -> bool:
    """Tell whether ``token`` is a valid genomic resource ID token.

    A genomic resource ID token consists of one or more ASCII letters,
    digits, '.', '_' or '-' characters and nothing else.

    Args:
        token: Candidate string to validate

    Returns:
        True when the whole string matches the token pattern
    """
    return _GR_ID_TOKEN_RE.fullmatch(token) is not None
_GR_ID_WITH_VERSION_TOKEN_RE = re.compile(
    r"([a-zA-Z0-9._-]+)(?:\(([1-9]\d*(?:\.\d+)*)\))?")


def parse_gr_id_version_token(token: str) -> tuple[str, tuple[int, ...]]:
    """Split a resource ID token with optional version into its parts.

    The token is a genomic resource ID token optionally followed by a
    version suffix of the form "(3.3.2)". When the suffix is missing the
    version defaults to (0,). The empty string is accepted and yields
    an empty ID with the default version.

    Args:
        token: String like "refseq" or "refseq(1.2.3)"

    Returns:
        Tuple (id_token, version_tuple)

    Raises:
        ValueError: If the string is not a valid ID token with an
            optional version suffix
    """
    if token == "":
        return "", (0,)

    parsed = _GR_ID_WITH_VERSION_TOKEN_RE.fullmatch(token)
    if parsed is None:
        raise ValueError(
            f"unexpected value for resource ID + version: {token}")

    gr_id, raw_version = parsed.groups()
    if not raw_version:
        # no "(x.y.z)" suffix present -> default version
        return gr_id, (0,)
    return gr_id, tuple(int(part) for part in raw_version.split("."))
_RESOURCE_ID_WITH_VERSION_PATH_RE = re.compile(
    r"([a-zA-Z0-9/._-]+)(?:\(([1-9]\d*(?:\.\d+)*)\))?")


def parse_resource_id_version(
    resource_path: str,
) -> tuple[str, tuple[int, ...]]:
    """Parse a resource path into a (resource ID, version) tuple.

    The path is a resource ID (ID tokens separated by '/') optionally
    followed by a version suffix of the form "(3.3.2)". When the suffix
    is missing the version defaults to (0,). The empty string is accepted
    and yields an empty ID with the default version.

    Args:
        resource_path: String like "hg19/gene_models/refseq(1.2.3)"

    Returns:
        Tuple (resource_id, version_tuple)

    Raises:
        ValueError: If the path does not match the resource ID / version
            requirements
    """
    if resource_path == "":
        return "", (0,)

    match = _RESOURCE_ID_WITH_VERSION_PATH_RE.fullmatch(resource_path)
    if match is None:
        raise ValueError(f"unexpected resource path: {resource_path}")

    resource_id, version_string = match[1], match[2]
    if not version_string:
        # no "(x.y.z)" suffix present -> default version
        return resource_id, (0,)
    return resource_id, tuple(map(int, version_string.split(".")))
def version_string_to_suffix(version: str) -> str:
    """Build the resource ID version suffix from a version string.

    Args:
        version: Version string like "1.2.3"

    Returns:
        Empty string for the default version "0", otherwise the version
        wrapped in parentheses, e.g. "(1.2.3)"
    """
    return "" if version == "0" else f"({version})"
def version_tuple_to_string(version: tuple[int, ...]) -> str:
    """Render a version tuple as a dot-separated string.

    Args:
        version: Version tuple like (1, 2, 3)

    Returns:
        String representation like "1.2.3"
    """
    return ".".join(str(component) for component in version)
def version_tuple_to_suffix(version: tuple[int, ...]) -> str:
    """Build the resource ID version suffix from a version tuple.

    The suffix is what gets appended to a resource ID to carry its
    version. The default version (0,) is represented by no suffix at all.

    Args:
        version: Version tuple like (1, 2, 3)

    Returns:
        Empty string for version (0,), otherwise "(1.2.3)" format
    """
    if version == (0,):
        return ""
    dotted = ".".join(str(component) for component in version)
    return f"({dotted})"
VERSION_CONSTRAINT_RE = re.compile(r"(>=|=)?(\d+(?:\.\d+)*)")


def is_version_constraint_satisfied(
    version_constraint: str | None, version: tuple[int, ...],
) -> bool:
    """Check if a version matches a version constraint.

    Supports two types of constraints:
    - "=X.Y.Z": Exact match required (the version tuples must be equal)
    - ">=X.Y.Z" or "X.Y.Z": Minimum version required (default)

    For the minimum-version check, missing trailing components are treated
    as zeros, so version (2,) satisfies ">=2.0".

    Args:
        version_constraint: Constraint string like ">=1.2.0" or "=1.2.3".
            None or empty string matches any version.
        version: Version tuple to check like (1, 2, 3)

    Returns:
        True if the version satisfies the constraint

    Raises:
        ValueError: If constraint has invalid syntax or unknown operator
    """
    if not version_constraint:
        return True

    match = VERSION_CONSTRAINT_RE.fullmatch(version_constraint)
    if match is None:
        raise ValueError(
            f"Bad syntax of version constraint {version_constraint}")

    operator = match[1] or ">="
    constraint_version = tuple(map(int, match[2].split(".")))

    if operator == "=":
        return version == constraint_version

    if operator == ">=":
        # Plain tuple comparison ranks a shorter tuple below a longer one
        # with the same prefix, i.e. (2,) < (2, 0). Pad both sides with
        # zeros to a common length so "2" and "2.0" compare as equal.
        width = max(len(version), len(constraint_version))
        padded_version = tuple(version) + (0,) * (width - len(version))
        padded_constraint = \
            constraint_version + (0,) * (width - len(constraint_version))
        return padded_version >= padded_constraint

    # Not reachable with the current regular expression; kept as a guard in
    # case the pattern is extended with operators not handled above.
    raise ValueError(
        f"wrong operation {operator} in version constraint "
        f"{version_constraint}")
@dataclass(order=True)
class ManifestEntry:
    """Represents a file entry in a genomic resource manifest.

    A manifest tracks all files within a resource with their sizes and
    checksums to ensure data integrity and enable efficient caching.

    The ``order=True`` option makes the dataclass generate comparison
    methods that compare instances field by field in declaration order
    (name, size, md5); ``Manifest`` sorts its entries with them.

    Attributes:
        name: Relative path to the file within the resource
        size: File size in bytes
        md5: MD5 checksum of file content, or None if not computed
    """

    name: str
    size: int
    md5: str | None
@dataclass(order=True)
class ResourceFileState:
    """Tracks the state of a resource file in internal repository storage.

    Used for caching and synchronization to determine if files need to be
    refreshed or re-downloaded.

    The ``order=True`` option makes the dataclass generate comparison
    methods that compare instances field by field in declaration order.

    Attributes:
        filename: Relative path to the file within the resource
        size: File size in bytes
        timestamp: Last modification time as Unix timestamp
        md5: MD5 checksum of file content
    """

    filename: str
    size: int
    timestamp: float
    md5: str
class Manifest:
    """Catalog of the files that make up a genomic resource.

    For every file the manifest keeps a ``ManifestEntry`` with the file
    name, size and MD5 checksum, keyed by the file name. This is what
    enables integrity checks, caching and incremental updates.

    The manifest is typically stored in a .MANIFEST file within the
    resource directory.
    """

    def __init__(self) -> None:
        self.entries: dict[str, ManifestEntry] = {}

    @staticmethod
    def from_file_content(file_content: str) -> Manifest:
        """Build a manifest out of the raw YAML text of a manifest file.

        Args:
            file_content: YAML-formatted string containing manifest entries

        Returns:
            Manifest populated with the parsed entries; an empty document
            produces an empty manifest
        """
        parsed = yaml.safe_load(file_content)
        return Manifest.from_manifest_entries(
            [] if parsed is None else parsed)

    @staticmethod
    def from_manifest_entries(
        manifest_entries: list[dict[str, Any]],
    ) -> Manifest:
        """Build a manifest out of already parsed entry dictionaries.

        Args:
            manifest_entries: List of dicts with 'name', 'size', 'md5' keys

        Returns:
            Manifest populated with the provided entries
        """
        manifest = Manifest()
        for record in manifest_entries:
            item = ManifestEntry(
                record["name"], record["size"], record["md5"])
            manifest.entries[item.name] = item
        return manifest

    def get_files(self) -> list[tuple[str, int]]:
        """Return (filename, size) pairs for every file in the manifest."""
        files: list[tuple[str, int]] = []
        for item in self.entries.values():
            files.append((item.name, item.size))
        return files

    def __getitem__(self, name: str) -> ManifestEntry:
        return self.entries[name]

    def __contains__(self, name: str) -> bool:
        return name in self.entries

    def __iter__(self) -> Iterator[ManifestEntry]:
        # iteration order is the sort order of the entries
        ordered = sorted(self.entries.values())
        return iter(ordered)

    def __eq__(self, other: object) -> bool:
        return isinstance(other, Manifest) and self.entries == other.entries

    def __len__(self) -> int:
        return len(self.entries)

    def __repr__(self) -> str:
        return f"{self.entries!s}"

    def to_manifest_entries(self) -> list[dict[str, Any]]:
        """Serialize the manifest into a list of plain dictionaries.

        Returns:
            List of dictionaries with 'name', 'size', 'md5' keys, in the
            sort order of the entries
        """
        return list(map(asdict, sorted(self.entries.values())))

    def add(self, entry: ManifestEntry) -> None:
        """Insert an entry, replacing any entry with the same name.

        Args:
            entry: ManifestEntry to store in the manifest
        """
        self.entries[entry.name] = entry

    def update(self, entries: dict[str, ManifestEntry]) -> None:
        """Insert several entries, replacing entries with the same names.

        Args:
            entries: Dictionary mapping filenames to ManifestEntry objects;
                only its values are used
        """
        for item in entries.values():
            self.add(item)

    def names(self) -> set[str]:
        """Return the set of filenames tracked by this manifest."""
        return set(self.entries)
@dataclass
class ManifestUpdate:
    """Outcome of comparing a resource's files against its manifest.

    Used during resource synchronization to track which files need to be
    deleted or updated.

    Attributes:
        manifest: The manifest the update refers to
        entries_to_delete: Set of filenames to remove
        entries_to_update: Set of filenames that need updating
    """

    manifest: Manifest
    entries_to_delete: set[str]
    entries_to_update: set[str]

    def __bool__(self) -> bool:
        """Tell whether the update carries any change.

        Returns:
            True if there are files to delete or update
        """
        return bool(self.entries_to_delete) or bool(self.entries_to_update)
class GenomicResource:
    """Represents a single genomic resource with metadata and file access.

    A genomic resource is a versioned collection of data files with a
    configuration file (genomic_resource.yaml) that defines its type,
    description, and resource-specific settings.

    Common resource types include:
    - genome: Reference genome sequences
    - gene_models: Gene annotations and transcript models
    - position_score: Position-based genomic scores
    - allele_score: Variant effect scores
    - gene_score: Gene-level scores

    Two resources compare equal when their ID, version, configuration and
    manifest are equal; the protocol is not part of the comparison.

    Attributes:
        resource_id: Unique identifier like "hg19/gene_models/refseq"
        version: Version tuple like (1, 2, 3)
        config: Configuration dictionary from genomic_resource.yaml
        proto: Repository protocol for accessing resource files
    """

    def __init__(
        self,
        resource_id: str,
        version: tuple[int, ...],
        protocol: RepositoryProtocol,
        config: dict[str, Any] | None = None,
        manifest: Manifest | None = None,
    ):
        self.resource_id = resource_id
        self.version: tuple[int, ...] = version
        self.config = config
        self.proto = protocol
        self._manifest: Manifest | None = manifest

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, GenomicResource):
            return False
        return self.resource_id == other.resource_id and \
            self.version == other.version and \
            self.config == other.config and \
            self._manifest == other._manifest

    def __hash__(self) -> int:
        # The hash must only depend on attributes that __eq__ compares,
        # otherwise equal resources served by different protocols would
        # hash differently and break set/dict lookups.
        return hash((self.resource_id, tuple(self.version)))

    def _get_meta(self) -> dict[str, Any]:
        """Return the 'meta' section of the config or an empty dict."""
        meta = self.get_config().get("meta")
        return meta if meta else {}

    def invalidate(self) -> None:
        """Clean up cached attributes like manifest, etc."""
        self._manifest = None

    def get_id(self) -> str:
        """Return genomic resource ID."""
        return self.resource_id

    def get_full_id(self) -> str:
        """Return genomic resource ID with version.

        The version is appended as "(X.Y.Z)" unless it is the default
        version "0".
        """
        version = ""
        if self.get_version_str() != "0":
            version = f"({self.get_version_str()})"
        return f"{self.resource_id}{version}"

    def get_config(self) -> dict[str, Any]:
        """Return the resource configuration.

        Raises:
            ValueError: If the resource has no configuration
        """
        if self.config is None:
            raise ValueError(
                f"use of unconfigured genomic resource: {self.resource_id}")
        return self.config

    def get_description(self) -> str:
        """Return the 'meta.description' of the resource or empty string.

        Raises:
            ValueError: If the resource has no configuration
        """
        description = self._get_meta().get("description")
        return str(description) if description else ""

    def get_summary(self) -> str | None:
        """Return the 'meta.summary' of the resource.

        Falls back to the resource description when no summary is set.

        Raises:
            ValueError: If the resource has no configuration
        """
        summary = self._get_meta().get("summary")
        if summary:
            return str(summary)
        return self.get_description()

    def get_repo_url(self) -> str:
        """Return repository's URL."""
        return self.proto.get_url()

    def get_repo_public_url(self) -> str:
        """Return repository's public URL."""
        return self.proto.get_public_url()

    def get_public_url(self) -> str:
        """Return the public URL of the resource."""
        return f"{self.get_repo_public_url()}/{self.get_full_id()}"

    def get_url(self) -> str:
        """Return the URL of the resource."""
        return f"{self.get_repo_url()}/{self.get_full_id()}"

    def get_labels(self) -> dict[str, Any]:
        """Return the 'meta.labels' of the resource or an empty dict.

        Raises:
            ValueError: If the resource has no configuration
        """
        labels = self._get_meta().get("labels")
        if labels:
            return cast(dict[str, Any], labels)
        return {}

    def get_type(self) -> str:
        """Return resource type as defined in 'genomic_resource.yaml'.

        Resources without an explicit type are reported as "Basic".

        Raises:
            ValueError: If the resource has no configuration
        """
        config_type = self.get_config().get("type")
        if config_type is None:
            return "Basic"
        return cast(str, config_type)

    def get_version_str(self) -> str:
        """Return version string of the form '3.1'."""
        return version_tuple_to_string(self.version)

    def get_genomic_resource_id_version(self) -> str:
        """Return a string combining resource ID and version.

        Returns a string of the form aa/bb/cc(3.2) for a genomic resource
        with id aa/bb/cc and version 3.2.
        If the version is 0 the string will be aa/bb/cc.
        """
        return f"{self.resource_id}{version_tuple_to_suffix(self.version)}"

    def file_exists(self, filename: str) -> bool:
        """Check if filename exists in this resource."""
        return self.proto.file_exists(self, filename)

    def get_manifest(self) -> Manifest:
        """Return the resource manifest.

        The manifest is requested from the protocol on first use and
        cached until ``invalidate`` is called.
        """
        if self._manifest is None:
            self._manifest = self.proto.get_manifest(self)
        return self._manifest

    def get_file_url(self, filename: str) -> str:
        """Return the URL of a file in the resource."""
        return self.proto.get_resource_file_url(self, filename)

    def get_file_content(
        self, filename: str, *,
        uncompress: bool = True,
        mode: str = "t",
    ) -> Any:
        """Return the content of file in a resource."""
        return self.proto.get_file_content(
            self, filename, uncompress=uncompress, mode=mode)

    def open_raw_file(
        self, filename: str,
        mode: str = "rt",
        **kwargs: str | bool | None,
    ) -> IO:
        """Open a file in the resource and return a file-like object."""
        return self.proto.open_raw_file(
            self, filename, mode, **kwargs)

    def open_tabix_file(
        self, filename: str,
        index_filename: str | None = None,
    ) -> pysam.TabixFile:
        """Open a tabix file and return a pysam.TabixFile."""
        return self.proto.open_tabix_file(self, filename, index_filename)

    def open_vcf_file(
        self, filename: str,
        index_filename: str | None = None,
    ) -> pysam.VariantFile:
        """Open a vcf file and return a pysam.VariantFile."""
        return self.proto.open_vcf_file(self, filename, index_filename)

    def open_bigwig_file(self, filename: str) -> Any:
        """Open a bigwig file and return it."""
        return self.proto.open_bigwig_file(self, filename)
class Mode(enum.Enum):
    """Enumeration of repository protocol access modes.

    The value is what the ``mode()`` method of a repository protocol
    returns.

    Attributes:
        READONLY: Protocol supports only read operations
        READWRITE: Protocol supports both read and write operations
    """

    READONLY = 1
    READWRITE = 2
class ReadOnlyRepositoryProtocol(abc.ABC):
    """Abstract base class for read-only repository storage protocols.

    A protocol defines how to access genomic resources from a specific
    storage backend (local filesystem, HTTP server, S3 bucket, etc.).
    Read-only protocols can retrieve resources but cannot modify them.

    Subclasses must implement methods for:
    - Listing available resources
    - Opening resource files
    - Loading manifests

    Attributes:
        proto_id: Unique identifier for this protocol instance
        url: Base URL or path to the repository root
        CHUNK_SIZE: Default buffer size for file operations (32KB)
    """

    CHUNK_SIZE = 32768

    def __init__(self, proto_id: str, url: str):
        self.proto_id = proto_id
        self.url = url

    def mode(self) -> Mode:
        """Return repository protocol mode.

        Returns:
            Mode.READONLY for this base class
        """
        return Mode.READONLY

    def get_id(self) -> str:
        """Return the repository protocol identifier."""
        return self.proto_id

    @abc.abstractmethod
    def get_url(self) -> str:
        """Return the base URL of the repository.

        Returns:
            URL or path string pointing to repository root
        """

    @abc.abstractmethod
    def get_public_url(self) -> str:
        """Return the public base URL of the repository.

        Returns:
            URL or path string pointing to a public repository root
        """

    @abc.abstractmethod
    def invalidate(self) -> None:
        """Invalidate internal cache of repository protocol."""

    @abc.abstractmethod
    def get_all_resources(self) -> Generator[GenomicResource, None, None]:
        """Return generator for all resources in the repository."""

    def find_resource(
        self, resource_id: str,
        version_constraint: str | None = None,
    ) -> GenomicResource | None:
        """Return requested resource or None if not found.

        Among the resources with the requested ID whose version satisfies
        the constraint, the one with the highest version is returned.
        """
        candidates = [
            res for res in self.get_all_resources()
            if res.resource_id == resource_id
            and is_version_constraint_satisfied(
                version_constraint, res.version)
        ]
        return max(
            candidates, key=lambda res: res.version, default=None)

    def get_resource(
        self, resource_id: str,
        version_constraint: str | None = None,
    ) -> GenomicResource:
        """Return requested resource or raise an exception if not found.

        Raises:
            FileNotFoundError: If no matching resource exists
        """
        found = self.find_resource(resource_id, version_constraint)
        if found is not None:
            return found
        raise FileNotFoundError(
            f"resource <{resource_id}> ({version_constraint}) not found")

    def load_yaml(self, resource: GenomicResource, filename: str) -> Any:
        """Return parsed YAML file; an empty document yields an empty dict."""
        text = self.get_file_content(
            resource, filename, uncompress=True)
        parsed = yaml.safe_load(text)
        return {} if parsed is None else parsed

    def get_file_content(
        self, resource: GenomicResource, filename: str, *,
        uncompress: bool = True, mode: str = "t",
    ) -> Any:
        """Return content of a file in given resource.

        Args:
            resource: Resource that owns the file
            filename: Name of the file within the resource
            uncompress: Passed through to ``open_raw_file``
            mode: "t" or "b"; prefixed with "r" to form the open mode
        """
        open_mode = f"r{mode}"
        with self.open_raw_file(
                resource, filename,
                mode=open_mode, uncompress=uncompress) as stream:
            content = stream.read()
        return content

    def get_resource_url(self, resource: GenomicResource) -> str:
        """Return url of the specified resource."""
        resource_dir = resource.get_genomic_resource_id_version()
        return os.path.join(self.url, resource_dir)

    def get_resource_file_url(
        self, resource: GenomicResource, filename: str,
    ) -> str:
        """Return url of a file in the resource."""
        base_url = self.get_resource_url(resource)
        return os.path.join(base_url, filename)

    @abc.abstractmethod
    def load_manifest(self, resource: GenomicResource) -> Manifest:
        """Load resource manifest."""

    @abc.abstractmethod
    def file_exists(self, resource: GenomicResource, filename: str) -> bool:
        """Check if given file exists in given resource."""

    @abc.abstractmethod
    def open_raw_file(
        self, resource: GenomicResource, filename: str,
        mode: str = "rt",
        **kwargs: str | bool | None,
    ) -> IO:
        """Open file in a resource and return a file-like object."""

    @abc.abstractmethod
    def open_tabix_file(
        self, resource: GenomicResource, filename: str,
        index_filename: str | None = None,
    ) -> pysam.TabixFile:
        """Open a tabix file in a resource and return a pysam tabix file.

        Not all repositories support this method. Repositories that do
        not support this method raise an exception.
        """

    @abc.abstractmethod
    def open_vcf_file(
        self, resource: GenomicResource, filename: str,
        index_filename: str | None = None,
    ) -> pysam.VariantFile:
        """Open a vcf file in a resource and return a pysam VariantFile.

        Not all repositories support this method. Repositories that do
        not support this method raise an exception.
        """

    @abc.abstractmethod
    def open_bigwig_file(
        self, resource: GenomicResource, filename: str,
    ) -> Any:
        """Open a bigwig file in a resource and return it.

        Not all repositories support this method. Repositories that do
        not support this method raise an exception.
        """

    def compute_md5_sum(
        self, resource: GenomicResource, filename: str,
    ) -> str:
        """Compute a md5 hash for a file in the resource.

        The file is read in chunks of ``CHUNK_SIZE`` bytes.
        """
        logger.debug(
            "compute md5sum for %s in %s", filename, resource.resource_id)

        digest = hashlib.md5()  # noqa S324
        with self.open_raw_file(resource, filename, "rb") as stream:
            while block := stream.read(self.CHUNK_SIZE):
                digest.update(block)
        return digest.hexdigest()

    def get_manifest(self, resource: GenomicResource) -> Manifest:
        """Load and return a resource manifest."""
        return self.load_manifest(resource)

    def build_genomic_resource(
        self, resource_id: str,
        version: tuple[int, ...],
        config: dict | None = None,
        manifest: Manifest | None = None,
    ) -> GenomicResource:
        """Build a genomic resource instance using this protocol.

        Args:
            resource_id: Resource identifier like "hg19/gene_models/refseq"
            version: Version tuple like (1, 2, 3)
            config: Optional pre-loaded configuration dict. If missing or
                empty, it is loaded from genomic_resource.yaml
            manifest: Optional pre-loaded manifest

        Returns:
            GenomicResource instance configured with this protocol
        """
        if not config:
            # a bare resource is enough to locate its configuration file
            bare = GenomicResource(resource_id, version, self)
            config = self.load_yaml(bare, GR_CONF_FILE_NAME)

        return GenomicResource(
            resource_id, version, self, config, manifest)
class ReadWriteRepositoryProtocol(ReadOnlyRepositoryProtocol):
    """Abstract base class for read-write repository storage protocols.

    Extends ReadOnlyRepositoryProtocol with write capabilities including:
    - Creating and updating resources
    - Managing manifests
    - File upload and deletion
    - Resource versioning

    This protocol type is used for local repositories and writable remote
    storage backends where resources can be modified or created.
    """

    # pylint: disable=too-many-public-methods

    def mode(self) -> Mode:
        """Return repository protocol mode.

        Returns:
            Mode.READWRITE for this protocol type
        """
        return Mode.READWRITE

    @abc.abstractmethod
    def collect_all_resources(self) -> Generator[GenomicResource, None, None]:
        """Scan repository and yield all resources.

        Returns:
            Generator yielding GenomicResource instances for each
            resource found in the repository
        """

    @abc.abstractmethod
    def collect_resource_entries(self, resource: GenomicResource) -> Manifest:
        """Scan resource directory and build manifest from files found.

        Args:
            resource: Resource to scan

        Returns:
            Manifest containing entries for all files in the resource
        """

    def _update_manifest_entry_and_state(
        self, resource: GenomicResource,
        entry: ManifestEntry,
        prebuild_entries: dict[str, ManifestEntry],
    ) -> None:
        """Fill in size and md5 of ``entry`` and sync the stored file state.

        The stored state is reused when the file's timestamp and size still
        match it; otherwise the state is rebuilt. Size and md5 from a
        prebuilt entry, when available, are used instead of recomputing.
        """
        pre_state = self.load_resource_file_state(resource, entry.name)

        size = None
        md5 = None
        if entry.name in prebuild_entries:
            ready_entry = prebuild_entries[entry.name]
            size = ready_entry.size
            md5 = ready_entry.md5

        if pre_state is None:
            # no stored state yet: build and store it
            state = self.build_resource_file_state(
                resource, entry.name,
                size=size,
                md5=md5)
            self.save_resource_file_state(resource, state)
        elif entry.name in prebuild_entries:
            # prebuilt values take precedence over the stored state
            state = self.build_resource_file_state(
                resource, entry.name,
                size=size,
                md5=md5)
        else:
            timestamp = self.get_resource_file_timestamp(resource, entry.name)
            size = self.get_resource_file_size(resource, entry.name)
            # timestamps are compared with a small tolerance
            if abs(timestamp - pre_state.timestamp) <= 1e-2 \
                    and size == pre_state.size:
                state = pre_state
            else:
                state = self.build_resource_file_state(
                    resource, entry.name,
                    size=size,
                    md5=md5)
                self.save_resource_file_state(resource, state)

        entry.md5 = state.md5
        entry.size = state.size

    def build_manifest(
        self, resource: GenomicResource,
        prebuild_entries: dict[str, ManifestEntry] | None = None,
    ) -> Manifest:
        """Build full manifest for the resource."""
        if prebuild_entries is None:
            prebuild_entries = {}

        manifest = Manifest()
        for entry in self.collect_resource_entries(resource):
            self._update_manifest_entry_and_state(
                resource, entry, prebuild_entries)
            manifest.add(entry)
        return manifest

    def check_update_manifest(
        self, resource: GenomicResource,
        prebuild_entries: dict[str, ManifestEntry] | None = None,
    ) -> ManifestUpdate:
        """Check if the resource manifest needs update.

        Compares the files found in the resource with the stored manifest
        and the stored per-file states.

        Returns:
            ManifestUpdate holding the collected manifest, the names
            present in the stored manifest but no longer collected, and
            the names whose manifest entry has to be refreshed
        """
        try:
            current_manifest = self.load_manifest(resource)
        except FileNotFoundError:
            # no stored manifest: every collected file counts as new
            current_manifest = Manifest()

        manifest = Manifest()
        entries_to_update: set[str] = set()
        for entry in self.collect_resource_entries(resource):
            manifest.add(entry)
            state = self.load_resource_file_state(resource, entry.name)
            if state is None:
                md5 = None
                size = None
                if prebuild_entries and entry.name in prebuild_entries:
                    md5 = prebuild_entries[entry.name].md5
                    size = prebuild_entries[entry.name].size
                state = self.build_resource_file_state(
                    resource, entry.name, md5=md5, size=size)
                self.save_resource_file_state(resource, state)

            if state.filename not in current_manifest:
                entries_to_update.add(entry.name)
                continue

            file_timestamp = self.get_resource_file_timestamp(
                resource, entry.name)
            file_size = self.get_resource_file_size(
                resource, entry.name)

            if state.timestamp != file_timestamp or \
                    state.size != file_size:
                # stored state is stale: rebuild it and compare checksums
                md5 = None
                if prebuild_entries and entry.name in prebuild_entries:
                    md5 = prebuild_entries[entry.name].md5
                state = self.build_resource_file_state(
                    resource, entry.name, md5=md5)
                if state.md5 == current_manifest[entry.name].md5:
                    # content unchanged: only the stored state is refreshed
                    self.save_resource_file_state(resource, state)
                else:
                    entries_to_update.add(entry.name)
                    continue

            if state.md5 != current_manifest[entry.name].md5:
                entries_to_update.add(entry.name)
                continue

            entry.md5 = state.md5
            entry.size = state.size

        if prebuild_entries is not None:
            manifest.update(prebuild_entries)

        entries_to_delete = current_manifest.names() - manifest.names()
        return ManifestUpdate(manifest, entries_to_delete, entries_to_update)

    def update_manifest(
        self, resource: GenomicResource,
        prebuild_entries: dict[str, ManifestEntry] | None = None,
    ) -> Manifest:
        """Update or create full manifest for the resource."""
        manifest_update = self.check_update_manifest(
            resource, prebuild_entries)
        if not bool(manifest_update):
            return manifest_update.manifest

        manifest = manifest_update.manifest
        if prebuild_entries is None:
            prebuild_entries = {}

        for filename in manifest_update.entries_to_update:
            entry = manifest[filename]
            self._update_manifest_entry_and_state(
                resource, entry, prebuild_entries)
        return manifest

    def save_manifest(
        self, resource: GenomicResource, manifest: Manifest,
    ) -> None:
        """Save manifest into genomic resource's directory.

        The cached attributes of the resource are invalidated afterwards.
        """
        logger.debug(
            "save manifest of resource %s from %s", resource.resource_id,
            self.proto_id)
        with self.open_raw_file(
                resource, GR_MANIFEST_FILE_NAME, "wt") as outfile:
            yaml.dump(manifest.to_manifest_entries(), outfile)
        resource.invalidate()

    def save_index(self, resource: GenomicResource, contents: str) -> None:
        """Save an index HTML file into the genomic resource's directory."""
        with self.open_raw_file(resource, GR_INDEX_FILE_NAME, "wt") as outfile:
            outfile.write(contents)

    def get_manifest(self, resource: GenomicResource) -> Manifest:
        """Load the resource manifest; build it when it cannot be found."""
        try:
            manifest = self.load_manifest(resource)
        except FileNotFoundError:
            manifest = self.build_manifest(resource)
        return manifest

    @abc.abstractmethod
    def get_resource_file_timestamp(
        self, resource: GenomicResource, filename: str,
    ) -> float:
        """Return the timestamp of a resource file as a float.

        The value is compared against ``ResourceFileState.timestamp``.
        NOTE(review): presumably seconds since the epoch -- confirm
        against the concrete protocol implementations.
        """

    @abc.abstractmethod
    def get_resource_file_size(
        self, resource: GenomicResource, filename: str,
    ) -> int:
        """Return the size of a resource file."""

    def build_resource_file_state(
        self, resource: GenomicResource,
        filename: str,
        **kwargs: str | float | int | None,
    ) -> ResourceFileState:
        """Build resource file state.

        Args:
            resource: Resource that owns the file
            filename: Name of the file within the resource
            **kwargs: Optional precomputed 'md5', 'timestamp' and 'size';
                any value that is missing or None is computed from the
                file

        Raises:
            ValueError: If the file does not exist in the resource
        """
        if not self.file_exists(resource, filename):
            raise ValueError(
                f"can't build resource state for not existing resource file "
                f"{resource.resource_id} > {filename}")

        md5 = kwargs.get("md5")
        timestamp = kwargs.get("timestamp")
        size = kwargs.get("size")

        if md5 is None:
            md5 = self.compute_md5_sum(resource, filename)

        if timestamp is None:
            timestamp = self.get_resource_file_timestamp(resource, filename)

        if size is None:
            size = self.get_resource_file_size(resource, filename)

        return ResourceFileState(
            filename,
            cast(int, size),
            cast(float, timestamp),
            cast(str, md5))

    @abc.abstractmethod
    def save_resource_file_state(
        self, resource: GenomicResource, state: ResourceFileState,
    ) -> None:
        """Save resource file state into internal GRR state."""

    @abc.abstractmethod
    def load_resource_file_state(
        self, resource: GenomicResource,
        filename: str,
    ) -> ResourceFileState | None:
        """Load resource file state from internal GRR state.

        If the specified resource file has no internal state returns None.
        """

    @abc.abstractmethod
    def delete_resource_file(
        self, resource: GenomicResource, filename: str,
    ) -> None:
        """Delete a resource file and its internal state."""

    @abc.abstractmethod
    def copy_resource_file(
        self,
        remote_resource: GenomicResource,
        dest_resource: GenomicResource,
        filename: str,
    ) -> ResourceFileState | None:
        """Copy a remote resource file into local repository."""

    @abc.abstractmethod
    def update_resource_file(
        self, remote_resource: GenomicResource,
        dest_resource: GenomicResource,
        filename: str,
    ) -> ResourceFileState | None:
        """Update a resource file into repository if needed."""

    def get_or_create_resource(
        self, resource_id: str, version: tuple[int, ...],
    ) -> GenomicResource:
        """Return a resource with specified ID and version.

        If the resource is not found create an empty resource.
        """
        resource = self.find_resource(
            resource_id=resource_id,
            version_constraint=f"={version_tuple_to_string(version)}")
        if resource is None:
            logger.info(
                "resource %s (%s) not found in %s; creating...",
                resource_id, version, self.get_id())
            resource = GenomicResource(
                resource_id, version, self)
        return resource

    def copy_resource(
        self, remote_resource: GenomicResource,
    ) -> GenomicResource:
        """Copy a remote resource into repository.

        Local files missing from the remote manifest are deleted, every
        file of the remote manifest is copied and the remote manifest is
        saved locally.
        """
        local_resource = self.get_or_create_resource(
            remote_resource.resource_id, remote_resource.version)

        remote_manifest = remote_resource.get_manifest()
        local_manifest = self.get_manifest(local_resource)
        filenames_to_delete = local_manifest.names() - remote_manifest.names()

        for filename in filenames_to_delete:
            self.delete_resource_file(local_resource, filename)

        for manifest_entry in remote_manifest:
            self.copy_resource_file(
                remote_resource, local_resource, manifest_entry.name)

        self.save_manifest(local_resource, remote_resource.get_manifest())
        self.invalidate()

        return self.get_resource(
            resource_id=remote_resource.resource_id,
            version_constraint=f"={remote_resource.get_version_str()}")

    def update_resource(
        self, remote_resource: GenomicResource,
        files_to_copy: set[str] | None = None,
    ) -> GenomicResource:
        """Copy a remote resource into repository.

        Allows copying of a subset of files from the resource via
        files_to_copy. If files_to_copy is None, copies all files. The
        resource configuration file is always included. The set passed in
        by the caller is not modified.
        """
        local_resource = self.get_or_create_resource(
            remote_resource.resource_id, remote_resource.version)

        remote_manifest = remote_resource.get_manifest()
        local_manifest = self.get_manifest(local_resource)
        filenames_to_delete = local_manifest.names() - remote_manifest.names()

        if files_to_copy is None:
            selected = {entry.name for entry in remote_manifest}
        else:
            # Work on a copy: adding the config file name to the caller's
            # set would leak a side effect out of this method.
            selected = set(files_to_copy)
            selected.add(GR_CONF_FILE_NAME)  # config is always required

        for filename in filenames_to_delete:
            self.delete_resource_file(local_resource, filename)

        for filename in selected:
            self.update_resource_file(
                remote_resource, local_resource, filename)

        if local_manifest != remote_manifest:
            self.save_manifest(local_resource, remote_resource.get_manifest())
        self.invalidate()

        return self.get_resource(
            resource_id=remote_resource.resource_id,
            version_constraint=f"={remote_resource.get_version_str()}")

    @abc.abstractmethod
    def build_content_file(self) -> list[dict[str, Any]]:
        """Build the content of the repository (i.e '.CONTENTS.json' file)."""
class GenomicResourceRepo(abc.ABC):
    """Abstract base class for genomic resource repositories.

    A repository manages a collection of genomic resources, providing
    methods to discover, retrieve, and (for writable repos) create
    resources.

    Repositories can be:
    - Protocol-based: Direct access to a single storage backend
    - Group: Aggregates multiple child repositories
    - Cached: Wraps another repository with local caching

    All repositories support resource lookup with optional version
    constraints:
        repo.get_resource("hg19/genome")  # Latest version
        repo.get_resource("hg19/genome", ">=2.0")  # Version 2.0 or higher
        repo.get_resource("hg19/genome", "=2.1")  # Exact version 2.1

    Attributes:
        repo_id: Unique identifier for this repository
        definition: Configuration dict used to create this repository
    """

    def __init__(self, repo_id: str):
        self._repo_id: str = repo_id
        self._definition: dict[str, Any] | None = None

    def close(self) -> None:
        """Release any resources held by this repository."""

    @property
    def definition(self) -> dict[str, Any] | None:
        """Get a copy of the repository configuration definition.

        Returns:
            Deep copy of definition dict, or None if not set
        """
        # Test against None rather than truthiness: an empty definition
        # must be copied too, otherwise callers would get a reference to
        # the internal dict and could modify it.
        if self._definition is None:
            return None
        return copy.deepcopy(self._definition)

    @definition.setter
    def definition(self, value: dict[str, Any]) -> None:
        """Set the repository configuration definition.

        Args:
            value: Configuration dict to store (will be deep copied)
        """
        self._definition = copy.deepcopy(value)

    @abc.abstractmethod
    def invalidate(self) -> None:
        """Clear cached state and force reload on next access.

        Implementations should clear any cached resource lists, metadata,
        or file contents to ensure fresh data is loaded.
        """

    @property
    def repo_id(self) -> str:
        """Get the repository identifier.

        Returns:
            Repository ID string
        """
        return self._repo_id

    @abc.abstractmethod
    def get_resource(
        self, resource_id: str,
        version_constraint: str | None = None,
        repository_id: str | None = None,
    ) -> GenomicResource:
        """Return one resource with id equal to resource_id.

        If resource is not found, exception is raised.
        """

    @abc.abstractmethod
    def find_resource(
        self, resource_id: str,
        version_constraint: str | None = None,
        repository_id: str | None = None,
    ) -> GenomicResource | None:
        """Return one resource with id equal to resource_id.

        If resource is not found, None is returned.
        """

    @abc.abstractmethod
    def get_all_resources(self) -> Generator[GenomicResource, None, None]:
        """Return a generator over all resources in the repository."""
class GenomicResourceProtocolRepo(GenomicResourceRepo):
    """Repository that serves resources through a single protocol object.

    The repository id is taken from the protocol's ``get_id()`` and all
    resource lookups are delegated to the protocol.
    """

    def __init__(
        self,
        proto: ReadOnlyRepositoryProtocol | ReadWriteRepositoryProtocol,
    ):
        repo_id = proto.get_id()
        super().__init__(repo_id)
        self.proto = proto

    def close(self) -> None:
        """Invalidate the protocol, then run the base-class close."""
        self.invalidate()
        super().close()

    def invalidate(self) -> None:
        """Forward the invalidation request to the protocol."""
        self.proto.invalidate()

    def get_resource(
        self,
        resource_id: str,
        version_constraint: str | None = None,
        repository_id: str | None = None,
    ) -> GenomicResource:
        """Return the matching resource from the protocol.

        Raises:
            ValueError: if a repository_id is given and it differs from
                this repository's id.
        """
        wrong_repo = bool(repository_id) and self.repo_id != repository_id
        if not wrong_repo:
            return self.proto.get_resource(resource_id, version_constraint)
        raise ValueError(
            f"can't find resource ({resource_id}, {version_constraint}: "
            f"repository {repository_id} in repository {self.repo_id}")

    def find_resource(
        self,
        resource_id: str,
        version_constraint: str | None = None,
        repository_id: str | None = None,
    ) -> GenomicResource | None:
        """Return the matching resource from the protocol, or None.

        None is returned without consulting the protocol when a
        repository_id is given and it differs from this repository's id.
        """
        if not repository_id or self.repo_id == repository_id:
            return self.proto.find_resource(resource_id, version_constraint)
        return None

    def get_all_resources(self) -> Generator[GenomicResource, None, None]:
        """Return the protocol's generator over all of its resources."""
        all_resources = self.proto.get_all_resources()
        return all_resources
# Union alias covering both protocol flavours: read-only and read-write.
RepositoryProtocol = ReadOnlyRepositoryProtocol | ReadWriteRepositoryProtocol