"""
Provides basic classes for genomic resources and repositories.
This module defines the core architecture for managing genomic resources
through a flexible repository system. It supports different storage backends
(local files, HTTP, S3) and provides both read-only and read-write access.
Class Hierarchy:
+---------------------+ +-----------------+
+-----| GenomicResourceRepo |--------------------| GenomicResource |
| +---------------------+ +-----------------+
| ^ ^ |
| | | |
| | +-----------------------------+ +----------------------------+
| | | GenomicResourceProtocolRepo | ----| ReadOnlyRepositoryProtocol |
| | +-----------------------------+ +----------------------------+
| | ^
| | |
| +--------------------------+ +-----------------------------+
+----| GenomicResourceGroupRepo | | ReadWriteRepositoryProtocol |
+--------------------------+ +-----------------------------+
Key Concepts:
- GenomicResource: Represents a single genomic resource (e.g., a reference
genome, score set, or gene model) with metadata and file access methods.
- GenomicResourceRepo: Abstract base for repositories that manage
collections of genomic resources.
- RepositoryProtocol: Defines the storage backend interface (file system,
HTTP, S3, etc.) for accessing resource files.
- Manifest: Tracks files and their checksums within a resource to ensure
data integrity and enable caching.
Resource Identifiers:
Resources are identified by an ID and optional version suffix:
- Simple: "hg19/gene_models/refseq"
- Versioned: "hg19/gene_models/refseq(1.2.3)"
Configuration Files:
Each resource contains a genomic_resource.yaml configuration file with
metadata including type, description, and resource-specific settings.
"""
from __future__ import annotations
import abc
import copy
import enum
import hashlib
import logging
import os
import re
from collections.abc import Generator, Iterator
from dataclasses import asdict, dataclass
from typing import IO, Any, cast
import pysam
import yaml
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Name of the per-resource YAML configuration file.
GR_CONF_FILE_NAME = "genomic_resource.yaml"
# Name of the per-resource manifest file (written by save_manifest()).
GR_MANIFEST_FILE_NAME = ".MANIFEST"
# Name of the repository contents file (see build_content_file()).
GR_CONTENTS_FILE_NAME = ".CONTENTS.json"
# Name of the HTML index file written by save_index().
GR_INDEX_FILE_NAME = "index.html"
# Text encoding constant; not referenced in this part of the file --
# presumably used by protocol implementations elsewhere (TODO confirm).
GR_ENCODING = "utf-8"
_GR_ID_TOKEN_RE = re.compile(r"[a-zA-Z0-9._-]+")


def is_gr_id_token(token: str) -> bool:
    """Tell whether ``token`` is a valid genomic resource ID token.

    A valid token is a non-empty string consisting only of ASCII letters,
    digits, '.', '_' and '-'.

    Args:
        token: Candidate string to check.

    Returns:
        True if the whole string matches the token pattern.
    """
    return _GR_ID_TOKEN_RE.fullmatch(token) is not None
_GR_ID_WITH_VERSION_TOKEN_RE = re.compile(
    r"([a-zA-Z0-9._-]+)(?:\(([1-9]\d*(?:\.\d+)*)\))?")


def parse_gr_id_version_token(token: str) -> tuple[str, tuple[int, ...]]:
    """Split a resource ID token with optional version into its parts.

    The token is a genomic resource ID token optionally followed by a
    version suffix of the form "(3.3.2)". When no suffix is present the
    version defaults to (0,).

    Args:
        token: String like "refseq" or "refseq(1.2.3)". The empty string
            is accepted and yields ("", (0,)).

    Returns:
        Tuple (token, version) where version is a tuple of ints.

    Raises:
        ValueError: If the string does not match the expected pattern.
    """
    if token == "":
        return "", (0, )

    parsed = _GR_ID_WITH_VERSION_TOKEN_RE.fullmatch(token)
    if parsed is None:
        raise ValueError(
            f"unexpected value for resource ID + version: {token}")

    id_part, raw_version = parsed.groups()
    if not raw_version:
        return id_part, (0,)
    return id_part, tuple(int(part) for part in raw_version.split("."))
_RESOURCE_ID_WITH_VERSION_PATH_RE = re.compile(
    r"([a-zA-Z0-9/._-]+)(?:\(([1-9]\d*(?:\.\d+)*)\))?")


def parse_resource_id_version(
        resource_path: str) -> tuple[str, tuple[int, ...]]:
    """Split a resource path with optional version into (id, version).

    The path may contain '/' separators and may end with a version suffix
    of the form "(3.3.2)". When no suffix is present the version defaults
    to (0,).

    Args:
        resource_path: String like "hg19/gene_models/refseq" or
            "hg19/gene_models/refseq(1.2.3)". The empty string is accepted
            and yields ("", (0,)).

    Returns:
        Tuple (resource_id, version) where version is a tuple of ints.

    Raises:
        ValueError: If the path does not match the expected pattern.
    """
    if resource_path == "":
        return "", (0,)

    parsed = _RESOURCE_ID_WITH_VERSION_PATH_RE.fullmatch(resource_path)
    if parsed is None:
        raise ValueError(f"unexpeced resource path: {resource_path}")

    id_part, raw_version = parsed.groups()
    if not raw_version:
        return id_part, (0,)
    return id_part, tuple(int(part) for part in raw_version.split("."))
[docs]
def version_string_to_suffix(version: str) -> str:
    """Build the resource ID version suffix for a version string.

    Args:
        version: Dotted version string like "1.2.3".

    Returns:
        Empty string for the default version "0", otherwise the version
        wrapped in parentheses, e.g. "(1.2.3)".
    """
    return "" if version == "0" else f"({version})"
[docs]
def version_tuple_to_string(version: tuple[int, ...]) -> str:
    """Render a version tuple as a dotted string.

    Args:
        version: Version tuple like (1, 2, 3).

    Returns:
        The components joined with '.', e.g. "1.2.3".
    """
    return ".".join(str(component) for component in version)
[docs]
def version_tuple_to_suffix(version: tuple[int, ...]) -> str:
    """Build the resource ID version suffix for a version tuple.

    The suffix is what gets appended to a resource ID to form a versioned
    ID. The default version (0,) yields no suffix at all.

    Args:
        version: Version tuple like (1, 2, 3).

    Returns:
        Empty string for (0,), otherwise a string like "(1.2.3)".
    """
    if version == (0,):
        return ""
    dotted = ".".join(str(component) for component in version)
    return f"({dotted})"
VERSION_CONSTRAINT_RE = re.compile(r"(>=|=)?(\d+(?:\.\d+)*)")


def is_version_constraint_satisfied(
        version_constraint: str | None, version: tuple[int, ...]) -> bool:
    """Decide whether ``version`` satisfies ``version_constraint``.

    Two kinds of constraint are understood:
        - "=X.Y.Z": the version must be exactly X.Y.Z
        - ">=X.Y.Z" or bare "X.Y.Z": the version must be at least X.Y.Z

    Versions are compared as tuples of ints.

    Args:
        version_constraint: Constraint string such as ">=1.2.0" or
            "=1.2.3". None or an empty string accepts every version.
        version: Version tuple to test, e.g. (1, 2, 3).

    Returns:
        True when the version satisfies the constraint.

    Raises:
        ValueError: If the constraint string cannot be parsed.
    """
    if not version_constraint:
        return True

    parsed = VERSION_CONSTRAINT_RE.fullmatch(version_constraint)
    if parsed is None:
        raise ValueError(
            f"Bad syntax of version constraint {version_constraint}")

    # A missing operator means "at least this version".
    operator = parsed[1] or ">="
    required = tuple(int(part) for part in parsed[2].split("."))

    if operator == ">=":
        return version >= required
    if operator == "=":
        return version == required

    # Defensive guard: the pattern above only admits '>=' and '='.
    raise ValueError(
        f"wrong operation {operator} in version constraint "
        f"{version_constraint}")
[docs]
@dataclass(order=True)
class ManifestEntry:
    """Single file record inside a genomic resource manifest.

    Instances are orderable; comparison follows field order
    (name, then size, then md5).
    """

    # Relative path of the file within the resource.
    name: str
    # File size in bytes.
    size: int
    # MD5 checksum of the file content, or None when not computed.
    md5: str | None
[docs]
@dataclass(order=True)
class ResourceFileState:
    """Recorded state of a resource file in internal repository storage.

    The state is compared against the file's current size and timestamp
    to decide whether the file's checksum must be recomputed.
    """

    # Relative path of the file within the resource.
    filename: str
    # File size in bytes.
    size: int
    # Last modification time as a Unix timestamp.
    timestamp: float
    # MD5 checksum of the file content.
    md5: str
[docs]
class Manifest:
    """Catalog of the files that make up a genomic resource.

    Each file is described by a ManifestEntry (name, size, MD5 checksum)
    and entries are keyed by file name. The manifest is what gets stored
    in the resource's .MANIFEST file.
    """

    def __init__(self) -> None:
        self.entries: dict[str, ManifestEntry] = {}

    @staticmethod
    def from_file_content(file_content: str) -> Manifest:
        """Build a manifest from the raw YAML text of a manifest file.

        Args:
            file_content: YAML-formatted string with the manifest entries.
                Empty content produces an empty manifest.

        Returns:
            Manifest populated from the parsed content.
        """
        parsed = yaml.safe_load(file_content)
        return Manifest.from_manifest_entries(
            [] if parsed is None else parsed)

    @staticmethod
    def from_manifest_entries(
            manifest_entries: list[dict[str, Any]]) -> Manifest:
        """Build a manifest from already-parsed entry dictionaries.

        Args:
            manifest_entries: List of dicts with 'name', 'size' and 'md5'
                keys.

        Returns:
            Manifest holding one entry per dictionary.
        """
        manifest = Manifest()
        for record in manifest_entries:
            manifest.add(
                ManifestEntry(record["name"], record["size"], record["md5"]))
        return manifest

    def get_files(self) -> list[tuple[str, int]]:
        """Return (filename, size) pairs for every file in the manifest."""
        files = []
        for item in self.entries.values():
            files.append((item.name, item.size))
        return files

    def __getitem__(self, name: str) -> ManifestEntry:
        return self.entries[name]

    def __contains__(self, name: str) -> bool:
        return name in self.entries

    def __iter__(self) -> Iterator[ManifestEntry]:
        # Iteration order is the sort order of the entries.
        ordered = sorted(self.entries.values())
        return iter(ordered)

    def __eq__(self, other: object) -> bool:
        return isinstance(other, Manifest) and self.entries == other.entries

    def __len__(self) -> int:
        return len(self.entries)

    def __repr__(self) -> str:
        return f"{self.entries}"

    def to_manifest_entries(self) -> list[dict[str, Any]]:
        """Convert the manifest into a list of plain dictionaries.

        Returns:
            One dict per entry with 'name', 'size' and 'md5' keys, in
            sorted entry order.
        """
        return list(map(asdict, sorted(self.entries.values())))

    def add(self, entry: ManifestEntry) -> None:
        """Insert ``entry``, replacing any entry with the same name."""
        self.entries[entry.name] = entry

    def update(self, entries: dict[str, ManifestEntry]) -> None:
        """Insert every entry from ``entries``.

        Only the dictionary values are used; each entry is stored under
        its own ``name``.
        """
        for item in entries.values():
            self.add(item)

    def names(self) -> set[str]:
        """Return the set of file names tracked by this manifest."""
        return set(self.entries)
[docs]
@dataclass
class ManifestUpdate:
    """Pending changes computed when checking a resource manifest.

    An instance is truthy when there is at least one file to delete or
    to update.
    """

    # The manifest built from the resource's current files.
    manifest: Manifest
    # File names to remove.
    entries_to_delete: set[str]
    # File names whose entries need updating.
    entries_to_update: set[str]

    def __bool__(self) -> bool:
        """Return True when any file must be deleted or updated."""
        if self.entries_to_delete:
            return True
        return bool(self.entries_to_update)
[docs]
class GenomicResource:
    """Represents a single genomic resource with metadata and file access.

    A genomic resource is a versioned collection of data files with a
    configuration file (genomic_resource.yaml) that defines its type,
    description, and resource-specific settings.

    Common resource types include:
        - genome: Reference genome sequences
        - gene_models: Gene annotations and transcript models
        - position_score: Position-based genomic scores
        - allele_score: Variant effect scores
        - gene_score: Gene-level scores

    Attributes:
        resource_id: Unique identifier like "hg19/gene_models/refseq"
        version: Version tuple like (1, 2, 3)
        config: Configuration dictionary from genomic_resource.yaml
        proto: Repository protocol for accessing resource files
    """

    def __init__(
            self, resource_id: str, version: tuple[int, ...],
            protocol: RepositoryProtocol,
            config: dict[str, Any] | None = None,
            manifest: Manifest | None = None):
        self.resource_id = resource_id
        self.version: tuple[int, ...] = version
        self.config = config
        self.proto = protocol
        # Manifest cache; filled on first get_manifest() call when None.
        self._manifest: Manifest | None = manifest

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, GenomicResource):
            return False
        return (
            self.resource_id == other.resource_id
            and self.version == other.version
            and self.config == other.config
            and self._manifest == other._manifest
        )

    def __hash__(self) -> int:
        # Hash only on fields that __eq__ compares. The protocol URL is
        # not part of equality, so including it here would let two equal
        # resources hash differently and break set/dict lookups.
        return hash((self.resource_id, tuple(self.version)))

    def _get_meta(self) -> dict[str, Any]:
        """Return the 'meta' section of the config, or {} when absent."""
        meta = self.get_config().get("meta")
        return meta if meta else {}

    def invalidate(self) -> None:
        """Drop cached attributes (currently the cached manifest)."""
        self._manifest = None

    def get_id(self) -> str:
        """Return genomic resource ID."""
        return self.resource_id

    def get_full_id(self) -> str:
        """Return genomic resource ID with version suffix.

        The suffix has the form "(1.2.3)" and is omitted for version 0.
        """
        version_str = self.get_version_str()
        suffix = "" if version_str == "0" else f"({version_str})"
        return f"{self.resource_id}{suffix}"

    def get_config(self) -> dict[str, Any]:
        """Return the resource configuration.

        Raises:
            ValueError: If the resource has no configuration.
        """
        if self.config is None:
            raise ValueError(
                f"use of unconfigured genomic resource: {self.resource_id}")
        return self.config

    def get_description(self) -> str:
        """Return the 'meta.description' config value, or "" if unset.

        Raises:
            ValueError: If the resource has no configuration.
        """
        description = self._get_meta().get("description")
        return str(description) if description else ""

    def get_summary(self) -> str | None:
        """Return 'meta.summary', falling back to the description.

        Raises:
            ValueError: If the resource has no configuration.
        """
        summary = self._get_meta().get("summary")
        if summary:
            return str(summary)
        return self.get_description()

    def get_repo_url(self) -> str:
        """Return repository's URL."""
        return self.proto.get_url()

    def get_repo_public_url(self) -> str:
        """Return repository's public URL."""
        return self.proto.get_public_url()

    def get_public_url(self) -> str:
        """Return the repository's public URL joined with the full ID."""
        return f"{self.get_repo_public_url()}/{self.get_full_id()}"

    def get_url(self) -> str:
        """Return the repository's URL joined with the full ID."""
        return f"{self.get_repo_url()}/{self.get_full_id()}"

    def get_labels(self) -> dict[str, Any]:
        """Return the 'meta.labels' config value, or {} if unset.

        Raises:
            ValueError: If the resource has no configuration.
        """
        labels = self._get_meta().get("labels")
        if labels:
            return cast(dict[str, Any], labels)
        return {}

    def get_type(self) -> str:
        """Return resource type as defined in 'genomic_resource.yaml'.

        Returns "Basic" when the configuration has no 'type' entry.

        Raises:
            ValueError: If the resource has no configuration.
        """
        config_type = self.get_config().get("type")
        if config_type is None:
            return "Basic"
        return cast(str, config_type)

    def get_version_str(self) -> str:
        """Return version string of the form '3.1'."""
        return version_tuple_to_string(self.version)

    def get_genomic_resource_id_version(self) -> str:
        """Return a string combining resource ID and version.

        Returns a string of the form aa/bb/cc(3.2) for a genomic resource
        with id aa/bb/cc and version 3.2.
        If the version is 0 the string will be aa/bb/cc.
        """
        return f"{self.resource_id}{version_tuple_to_suffix(self.version)}"

    def file_exists(self, filename: str) -> bool:
        """Check if filename exists in this resource."""
        return self.proto.file_exists(self, filename)

    def get_manifest(self) -> Manifest:
        """Return the resource manifest, fetching it via the protocol once.

        The result is cached until invalidate() is called.
        """
        if self._manifest is None:
            self._manifest = self.proto.get_manifest(self)
        return self._manifest

    def get_file_url(self, filename: str) -> str:
        """Return the URL of a file in this resource."""
        return self.proto.get_resource_file_url(self, filename)

    def get_file_content(
        self, filename: str,
        *,
        uncompress: bool = True,
        mode: str = "t",
    ) -> Any:
        """Return the content of file in a resource."""
        return self.proto.get_file_content(
            self, filename, uncompress=uncompress, mode=mode)

    def open_raw_file(
            self, filename: str, mode: str = "rt",
            **kwargs: str | bool | None) -> IO:
        """Open a file in the resource and return a file-like object."""
        return self.proto.open_raw_file(
            self, filename, mode, **kwargs)

    def open_tabix_file(
            self, filename: str,
            index_filename: str | None = None) -> pysam.TabixFile:
        """Open a tabix file and return a pysam.TabixFile."""
        return self.proto.open_tabix_file(self, filename, index_filename)

    def open_vcf_file(
            self, filename: str,
            index_filename: str | None = None) -> pysam.VariantFile:
        """Open a vcf file and return a pysam.VariantFile."""
        return self.proto.open_vcf_file(self, filename, index_filename)

    def open_bigwig_file(self, filename: str) -> Any:
        """Open a bigwig file and return it."""
        return self.proto.open_bigwig_file(self, filename)
[docs]
class Mode(enum.Enum):
    """Access modes a repository protocol can report via ``mode()``."""

    # Protocol supports only read operations.
    READONLY = 1
    # Protocol supports both read and write operations.
    READWRITE = 2
[docs]
class ReadOnlyRepositoryProtocol(abc.ABC):
    """Abstract base class for read-only repository storage protocols.

    A protocol defines how to access genomic resources from a specific
    storage backend (local filesystem, HTTP server, S3 bucket, etc.).
    Read-only protocols can retrieve resources but cannot modify them.

    Subclasses must implement methods for:
        - Listing available resources
        - Opening resource files
        - Loading manifests

    Attributes:
        proto_id: Unique identifier for this protocol instance
        url: Base URL or path to the repository root
        CHUNK_SIZE: Default buffer size for file operations (32KB)
    """

    CHUNK_SIZE = 32768

    def __init__(self, proto_id: str, url: str):
        self.proto_id = proto_id
        self.url = url

    def mode(self) -> Mode:
        """Return repository protocol mode.

        Returns:
            Mode.READONLY for this base class
        """
        return Mode.READONLY

    def get_id(self) -> str:
        """Return the repository protocol identifier.

        Returns:
            Protocol ID string
        """
        return self.proto_id

    @abc.abstractmethod
    def get_url(self) -> str:
        """Return the base URL of the repository.

        Returns:
            URL or path string pointing to repository root
        """

    @abc.abstractmethod
    def get_public_url(self) -> str:
        """Return the public base URL of the repository.

        Returns:
            URL or path string pointing to a public repository root
        """

    @abc.abstractmethod
    def invalidate(self) -> None:
        """Invalidate internal cache of repository protocol."""

    @abc.abstractmethod
    def get_all_resources(self) -> Generator[GenomicResource, None, None]:
        """Return generator for all resources in the repository."""

    def find_resource(
        self, resource_id: str,
        version_constraint: str | None = None,
    ) -> GenomicResource | None:
        """Return requested resource or None if not found.

        Among the resources with the given ID whose version satisfies the
        constraint, the one with the highest version is returned.

        Raises:
            ValueError: If the version constraint has invalid syntax.
        """
        matching_resources = [
            res for res in self.get_all_resources()
            if res.resource_id == resource_id
            and is_version_constraint_satisfied(
                version_constraint, res.version)
        ]
        if not matching_resources:
            return None
        return max(matching_resources, key=lambda res: res.version)

    def get_resource(
            self, resource_id: str,
            version_constraint: str | None = None) -> GenomicResource:
        """Return requested resource or raise an exception if not found.

        Raises:
            FileNotFoundError: If no matching resource exists.
        """
        resource = self.find_resource(resource_id, version_constraint)
        if resource is None:
            raise FileNotFoundError(
                f"resource <{resource_id}> ({version_constraint}) not found")
        return resource

    def load_yaml(self, resource: GenomicResource, filename: str) -> Any:
        """Return parsed YAML file; an empty document yields {}."""
        content = self.get_file_content(
            resource, filename, uncompress=True)
        result = yaml.safe_load(content)
        if result is None:
            return {}
        return result

    def get_file_content(
        self, resource: GenomicResource,
        filename: str,
        *,
        uncompress: bool = True,
        mode: str = "t",
    ) -> Any:
        """Return content of a file in given resource.

        Args:
            resource: Resource that owns the file.
            filename: Name of the file within the resource.
            uncompress: Forwarded to open_raw_file().
            mode: Suffix appended to "r" to form the open mode
                ("t" -> "rt", "b" -> "rb").
        """
        with self.open_raw_file(
                resource, filename, mode=f"r{mode}",
                uncompress=uncompress) as infile:
            return infile.read()

    def get_resource_url(self, resource: GenomicResource) -> str:
        """Return url of the specified resource."""
        return os.path.join(
            self.url,
            resource.get_genomic_resource_id_version())

    def get_resource_file_url(
            self, resource: GenomicResource, filename: str) -> str:
        """Return url of a file in the resource."""
        return os.path.join(
            self.get_resource_url(resource), filename)

    @abc.abstractmethod
    def load_manifest(self, resource: GenomicResource) -> Manifest:
        """Load resource manifest."""

    @abc.abstractmethod
    def file_exists(self, resource: GenomicResource, filename: str) -> bool:
        """Check if given file exists in given resource."""

    @abc.abstractmethod
    def open_raw_file(
            self, resource: GenomicResource, filename: str,
            mode: str = "rt", **kwargs: str | bool | None) -> IO:
        """Open file in a resource and return a file-like object."""

    @abc.abstractmethod
    def open_tabix_file(
            self, resource: GenomicResource, filename: str,
            index_filename: str | None = None) -> pysam.TabixFile:
        """Open a tabix file in a resource and return a pysam tabix file.

        Not all repositories support this method. Repositories that do
        not support this method raise an exception.
        """

    @abc.abstractmethod
    def open_vcf_file(
            self, resource: GenomicResource, filename: str,
            index_filename: str | None = None) -> pysam.VariantFile:
        """Open a vcf file in a resource and return a pysam VariantFile.

        Not all repositories support this method. Repositories that do
        not support this method raise an exception.
        """

    @abc.abstractmethod
    def open_bigwig_file(
        self, resource: GenomicResource, filename: str,
    ) -> Any:
        """Open a bigwig file in a resource and return it.

        Not all repositories support this method. Repositories that do
        not support this method raise an exception.
        """

    def compute_md5_sum(self, resource: GenomicResource, filename: str) -> str:
        """Compute the MD5 hex digest of a file in the resource.

        The file is read in CHUNK_SIZE pieces so it never has to fit in
        memory at once.
        """
        logger.debug(
            "compute md5sum for %s in %s", filename, resource.resource_id)
        # MD5 is used here purely as a content checksum, not for security;
        # saying so keeps this working on FIPS-restricted Python builds.
        md5_hash = hashlib.md5(usedforsecurity=False)
        with self.open_raw_file(resource, filename, "rb") as infile:
            while chunk := infile.read(self.CHUNK_SIZE):
                md5_hash.update(chunk)
        return md5_hash.hexdigest()

    def get_manifest(self, resource: GenomicResource) -> Manifest:
        """Load and return a resource manifest."""
        return self.load_manifest(resource)

    def build_genomic_resource(
            self, resource_id: str, version: tuple[int, ...],
            config: dict | None = None,
            manifest: Manifest | None = None) -> GenomicResource:
        """Build a genomic resource instance using this protocol.

        Args:
            resource_id: Resource identifier like "hg19/gene_models/refseq"
            version: Version tuple like (1, 2, 3)
            config: Optional pre-loaded configuration dict. If None (or
                empty), it is loaded from genomic_resource.yaml
            manifest: Optional pre-loaded manifest. If None, will load
                when first accessed

        Returns:
            GenomicResource instance configured with this protocol
        """
        if not config:
            # A throwaway, unconfigured resource is enough to locate and
            # read the configuration file through this protocol.
            res = GenomicResource(resource_id, version, self)
            config = self.load_yaml(res, GR_CONF_FILE_NAME)
        return GenomicResource(
            resource_id, version, self, config, manifest)
[docs]
class ReadWriteRepositoryProtocol(ReadOnlyRepositoryProtocol):
"""Abstract base class for read-write repository storage protocols.
Extends ReadOnlyRepositoryProtocol with write capabilities including:
- Creating and updating resources
- Managing manifests
- File upload and deletion
- Resource versioning
This protocol type is used for local repositories and writable remote
storage backends where resources can be modified or created.
"""
# pylint: disable=too-many-public-methods
[docs]
def mode(self) -> Mode:
"""Return repository protocol mode.
Returns:
Mode.READWRITE for this protocol type
"""
return Mode.READWRITE
[docs]
@abc.abstractmethod
def collect_all_resources(self) -> Generator[GenomicResource, None, None]:
"""Scan repository and yield all resources.
Returns:
Generator yielding GenomicResource instances for each
resource found in the repository
"""
[docs]
@abc.abstractmethod
def collect_resource_entries(self, resource: GenomicResource) -> Manifest:
"""Scan resource directory and build manifest from files found.
Args:
resource: Resource to scan
Returns:
Manifest containing entries for all files in the resource
"""
def _update_manifest_entry_and_state(
self, resource: GenomicResource, entry: ManifestEntry,
prebuild_entries: dict[str, ManifestEntry]) -> None:
pre_state = self.load_resource_file_state(resource, entry.name)
size = None
md5 = None
if entry.name in prebuild_entries:
ready_entry = prebuild_entries[entry.name]
size = ready_entry.size
md5 = ready_entry.md5
if pre_state is None:
state = self.build_resource_file_state(
resource, entry.name, size=size, md5=md5)
self.save_resource_file_state(resource, state)
elif entry.name in prebuild_entries:
state = self.build_resource_file_state(
resource, entry.name, size=size, md5=md5)
else:
timestamp = self.get_resource_file_timestamp(resource, entry.name)
size = self.get_resource_file_size(resource, entry.name)
if abs(timestamp - pre_state.timestamp) <= 1e-2 \
and size == pre_state.size:
state = pre_state
else:
state = self.build_resource_file_state(
resource, entry.name, size=size, md5=md5)
self.save_resource_file_state(resource, state)
entry.md5 = state.md5
entry.size = state.size
[docs]
def build_manifest(
self, resource: GenomicResource,
prebuild_entries: dict[str, ManifestEntry] | None = None,
) -> Manifest:
"""Build full manifest for the resource."""
if prebuild_entries is None:
prebuild_entries = {}
manifest = Manifest()
for entry in self.collect_resource_entries(resource):
self._update_manifest_entry_and_state(
resource, entry, prebuild_entries)
manifest.add(entry)
return manifest
[docs]
def check_update_manifest(
self, resource: GenomicResource,
prebuild_entries: dict[str, ManifestEntry] | None = None,
) -> ManifestUpdate:
"""Check if the resource manifest needs update."""
try:
current_manifest = self.load_manifest(resource)
except FileNotFoundError:
current_manifest = Manifest()
manifest = Manifest()
entries_to_update = set()
for entry in self.collect_resource_entries(resource):
manifest.add(entry)
state = self.load_resource_file_state(resource, entry.name)
if state is None:
md5 = None
size = None
if prebuild_entries and entry.name in prebuild_entries:
md5 = prebuild_entries[entry.name].md5
size = prebuild_entries[entry.name].size
state = self.build_resource_file_state(
resource, entry.name,
md5=md5, size=size)
self.save_resource_file_state(resource, state)
if state.filename not in current_manifest:
entries_to_update.add(entry.name)
continue
file_timestamp = self.get_resource_file_timestamp(
resource, entry.name)
file_size = self.get_resource_file_size(
resource, entry.name)
if state.timestamp != file_timestamp or \
state.size != file_size:
md5 = None
if prebuild_entries and entry.name in prebuild_entries:
md5 = prebuild_entries[entry.name].md5
state = self.build_resource_file_state(
resource, entry.name, md5=md5)
if state.md5 == current_manifest[entry.name].md5:
self.save_resource_file_state(resource, state)
else:
entries_to_update.add(entry.name)
continue
if state.md5 != current_manifest[entry.name].md5:
entries_to_update.add(entry.name)
continue
entry.md5 = state.md5
entry.size = state.size
if prebuild_entries is not None:
manifest.update(prebuild_entries)
entries_to_delete = current_manifest.names() - manifest.names()
return ManifestUpdate(manifest, entries_to_delete, entries_to_update)
[docs]
def update_manifest(
self, resource: GenomicResource,
prebuild_entries: dict[str, ManifestEntry] | None = None,
) -> Manifest:
"""Update or create full manifest for the resource."""
manifest_update = self.check_update_manifest(
resource, prebuild_entries)
if not bool(manifest_update):
return manifest_update.manifest
manifest = manifest_update.manifest
if prebuild_entries is None:
prebuild_entries = {}
for filename in manifest_update.entries_to_update:
entry = manifest[filename]
self._update_manifest_entry_and_state(
resource, entry, prebuild_entries)
return manifest
[docs]
def save_manifest(
self, resource: GenomicResource, manifest: Manifest) -> None:
"""Save manifest into genomic resource's directory."""
logger.debug(
"save manifest of resource %s from %s", resource.resource_id,
self.proto_id)
with self.open_raw_file(
resource, GR_MANIFEST_FILE_NAME, "wt") as outfile:
yaml.dump(manifest.to_manifest_entries(), outfile)
resource.invalidate()
[docs]
def save_index(self, resource: GenomicResource, contents: str) -> None:
"""Save an index HTML file into the genomic resource's directory."""
with self.open_raw_file(resource, GR_INDEX_FILE_NAME, "wt") as outfile:
outfile.write(contents)
[docs]
def get_manifest(self, resource: GenomicResource) -> Manifest:
"""Load or build a resource manifest."""
try:
manifest = self.load_manifest(resource)
except FileNotFoundError:
manifest = self.build_manifest(resource)
return manifest
[docs]
@abc.abstractmethod
def get_resource_file_timestamp(
self, resource: GenomicResource, filename: str) -> float:
"""Return the timestamp (ISO formatted) of a resource file."""
[docs]
@abc.abstractmethod
def get_resource_file_size(
self, resource: GenomicResource, filename: str) -> int:
"""Return the size of a resource file."""
[docs]
def build_resource_file_state(
self, resource: GenomicResource,
filename: str,
**kwargs: str | float | int | None) -> ResourceFileState:
"""Build resource file state."""
if not self.file_exists(resource, filename):
raise ValueError(
f"can't build resource state for not existing resource file "
f"{resource.resource_id} > {filename}")
md5 = kwargs.get("md5")
timestamp = kwargs.get("timestamp")
size = kwargs.get("size")
if md5 is None:
md5 = self.compute_md5_sum(resource, filename)
if timestamp is None:
timestamp = self.get_resource_file_timestamp(resource, filename)
if size is None:
size = self.get_resource_file_size(resource, filename)
return ResourceFileState(
filename,
cast(int, size),
cast(float, timestamp),
cast(str, md5))
[docs]
@abc.abstractmethod
def save_resource_file_state(
self, resource: GenomicResource, state: ResourceFileState) -> None:
"""Save resource file state into internal GRR state."""
[docs]
@abc.abstractmethod
def load_resource_file_state(
self, resource: GenomicResource,
filename: str) -> ResourceFileState | None:
"""Load resource file state from internal GRR state.
If the specified resource file has no internal state returns None.
"""
[docs]
@abc.abstractmethod
def delete_resource_file(
self, resource: GenomicResource, filename: str) -> None:
"""Delete a resource file and it's internal state."""
[docs]
@abc.abstractmethod
def copy_resource_file(
self,
remote_resource: GenomicResource,
dest_resource: GenomicResource,
filename: str) -> ResourceFileState | None:
"""Copy a remote resource file into local repository."""
[docs]
@abc.abstractmethod
def update_resource_file(
self, remote_resource: GenomicResource,
dest_resource: GenomicResource,
filename: str) -> ResourceFileState | None:
"""Update a resource file into repository if needed."""
[docs]
def get_or_create_resource(
self, resource_id: str,
version: tuple[int, ...]) -> GenomicResource:
"""Return a resource with specified ID and version.
If the resource is not found create an empty resource.
"""
resource = self.find_resource(
resource_id=resource_id,
version_constraint=f"={version_tuple_to_string(version)}")
if resource is None:
logger.info(
"resource %s (%s) not found in %s; creating...",
resource_id,
version,
self.get_id())
resource = GenomicResource(
resource_id,
version,
self)
return resource
[docs]
def copy_resource(
self,
remote_resource: GenomicResource) -> GenomicResource:
"""Copy a remote resource into repository."""
local_resource = self.get_or_create_resource(
remote_resource.resource_id, remote_resource.version)
remote_manifest = remote_resource.get_manifest()
local_manifest = self.get_manifest(local_resource)
filenames_to_delete = local_manifest.names() - remote_manifest.names()
for filename in filenames_to_delete:
self.delete_resource_file(local_resource, filename)
for manifest_entry in remote_manifest:
self.copy_resource_file(
remote_resource, local_resource, manifest_entry.name)
self.save_manifest(local_resource, remote_resource.get_manifest())
self.invalidate()
return self.get_resource(
resource_id=remote_resource.resource_id,
version_constraint=f"={remote_resource.get_version_str()}")
[docs]
def update_resource(
    self,
    remote_resource: GenomicResource,
    files_to_copy: set[str] | None = None,
) -> GenomicResource:
    """Update the local copy of a remote resource.

    Allows updating a subset of files from the resource via
    files_to_copy. If files_to_copy is None, every file listed in the
    remote manifest is updated. The resource configuration file is always
    included. The collection passed as files_to_copy is never modified.

    Local files missing from the remote manifest are deleted. When the
    local and remote manifests differ, the remote manifest is saved for
    the local resource. Finally the repository is invalidated and the
    resource is looked up again and returned.
    """
    local_resource = self.get_or_create_resource(
        remote_resource.resource_id, remote_resource.version)
    remote_manifest = remote_resource.get_manifest()
    local_manifest = self.get_manifest(local_resource)
    filenames_to_delete = local_manifest.names() - remote_manifest.names()

    if files_to_copy is None:
        files_to_copy = {entry.name for entry in remote_manifest}
    else:
        # The config is always required. Build a new set instead of
        # calling .add() so the caller's argument is left untouched.
        files_to_copy = {*files_to_copy, GR_CONF_FILE_NAME}

    for filename in filenames_to_delete:
        self.delete_resource_file(local_resource, filename)
    for file in files_to_copy:
        self.update_resource_file(remote_resource, local_resource, file)

    if local_manifest != remote_manifest:
        self.save_manifest(local_resource, remote_resource.get_manifest())
    self.invalidate()

    return self.get_resource(
        resource_id=remote_resource.resource_id,
        version_constraint=f"={remote_resource.get_version_str()}")
[docs]
@abc.abstractmethod
def build_content_file(self) -> list[dict[str, Any]]:
    """Build the repository content listing (i.e. the '.CONTENTS.json' file)."""
[docs]
class GenomicResourceRepo(abc.ABC):
    """Abstract base class for genomic resource repositories.

    A repository manages a collection of genomic resources, providing
    methods to discover, retrieve, and (for writable repos) create resources.

    Repositories can be:
        - Protocol-based: Direct access to a single storage backend
        - Group: Aggregates multiple child repositories
        - Cached: Wraps another repository with local caching

    All repositories support resource lookup with optional version constraints:
        repo.get_resource("hg19/genome")           # Latest version
        repo.get_resource("hg19/genome", ">=2.0")  # Version 2.0 or higher
        repo.get_resource("hg19/genome", "=2.1")   # Exact version 2.1

    Attributes:
        repo_id: Unique identifier for this repository
        definition: Configuration dict used to create this repository
    """

    def __init__(self, repo_id: str):
        self._repo_id: str = repo_id
        self._definition: dict[str, Any] | None = None

    def close(self) -> None:
        """Release any resources held by this repository."""
        self._definition = None

    @property
    def definition(self) -> dict[str, Any] | None:
        """Get a copy of the repository configuration definition.

        Returns:
            Deep copy of definition dict, or None if not set
        """
        # Compare against None rather than testing truthiness: an empty
        # dict is a valid definition and must be copied too, otherwise the
        # caller would receive (and could mutate) the internal object.
        if self._definition is None:
            return None
        return copy.deepcopy(self._definition)

    @definition.setter
    def definition(self, value: dict[str, Any]) -> None:
        """Set the repository configuration definition.

        Args:
            value: Configuration dict to store (will be deep copied)
        """
        self._definition = copy.deepcopy(value)

    @abc.abstractmethod
    def invalidate(self) -> None:
        """Clear cached state and force reload on next access.

        Implementations should clear any cached resource lists, metadata,
        or file contents to ensure fresh data is loaded.
        """

    @property
    def repo_id(self) -> str:
        """Get the repository identifier.

        Returns:
            Repository ID string
        """
        return self._repo_id

    @abc.abstractmethod
    def get_resource(
        self, resource_id: str, version_constraint: str | None = None,
        repository_id: str | None = None,
    ) -> GenomicResource:
        """Return one resource with id equal to resource_id.

        If resource is not found, exception is raised.
        """

    @abc.abstractmethod
    def find_resource(
        self, resource_id: str, version_constraint: str | None = None,
        repository_id: str | None = None,
    ) -> GenomicResource | None:
        """Return one resource with id equal to resource_id.

        If resource is not found, None is returned.
        """

    @abc.abstractmethod
    def get_all_resources(self) -> Generator[GenomicResource, None, None]:
        """Return a generator over all resources in the repository."""
[docs]
class GenomicResourceProtocolRepo(GenomicResourceRepo):
    """Repository backed by a single repository protocol.

    The repository ID is taken from the protocol, and resource lookups
    and invalidation are delegated to it.
    """

    def __init__(
        self,
        proto: ReadOnlyRepositoryProtocol | ReadWriteRepositoryProtocol,
    ):
        super().__init__(proto.get_id())
        self.proto = proto

    def close(self) -> None:
        """Invalidate the protocol, then run the base class cleanup."""
        self.invalidate()
        super().close()

    def invalidate(self) -> None:
        """Forward the invalidation request to the wrapped protocol."""
        self.proto.invalidate()

    def get_resource(
        self, resource_id: str, version_constraint: str | None = None,
        repository_id: str | None = None,
    ) -> GenomicResource:
        """Return the matching resource from the wrapped protocol.

        Raises ValueError when a repository_id is given and it differs
        from the ID of this repository.
        """
        wrong_repo = bool(repository_id) and repository_id != self.repo_id
        if wrong_repo:
            raise ValueError(
                f"can't find resource ({resource_id}, {version_constraint}: "
                f"repository {repository_id} in repository {self.repo_id}")
        return self.proto.get_resource(resource_id, version_constraint)

    def find_resource(
        self, resource_id: str, version_constraint: str | None = None,
        repository_id: str | None = None,
    ) -> GenomicResource | None:
        """Return the matching resource from the wrapped protocol, if any.

        Returns None when a repository_id is given and it differs from
        the ID of this repository.
        """
        if not repository_id or repository_id == self.repo_id:
            return self.proto.find_resource(resource_id, version_constraint)
        return None

    def get_all_resources(self) -> Generator[GenomicResource, None, None]:
        """Return the wrapped protocol's generator over all resources."""
        return self.proto.get_all_resources()
# Alias for "either kind of repository protocol": read-only or read-write.
RepositoryProtocol = ReadOnlyRepositoryProtocol | ReadWriteRepositoryProtocol