Source code for gain.genomic_resources.fsspec_protocol

"""Provides GRR protocols based on fsspec library."""
from __future__ import annotations

import asyncio
import copy
import datetime
import gzip
import hashlib
import json
import logging
import operator
import os
import pathlib
import tempfile
import time
from collections.abc import Callable, Generator
from contextlib import AbstractContextManager
from dataclasses import asdict
from threading import Lock
from types import TracebackType
from typing import (
    IO,
    Any,
    NamedTuple,
    cast,
)
from urllib.parse import urlparse

import apsw
import fsspec
import fsspec.exceptions
import pyBigWig
import pysam
import yaml
from filelock import FileLock
from markdown2 import markdown

from gain.genomic_resources.repository import (
    GR_CONF_FILE_NAME,
    GR_CONTENTS_FILE_NAME,
    GR_INDEX_FILE_NAME,
    GR_MANIFEST_FILE_NAME,
    GR_SQLITE_META_FILE_NAME,
    GenomicResource,
    Manifest,
    ManifestEntry,
    Mode,
    ReadOnlyRepositoryProtocol,
    ReadWriteRepositoryProtocol,
    ResourceFileState,
    is_gr_id_token,
    parse_gr_id_version_token,
)
from gain.templates import get_template
from gain.utils.helpers import convert_size

# Silence the spurious "[W::hts_idx_load3] The index file is older than the
# data file" warning that htslib emits when a tabix/VCF index has an older
# mtime than its data file. In our GRR workflow this is benign: both the
# caching protocol and DVC download index and data files in parallel, and
# the smaller index typically lands first. Level 1 (errors only) keeps real
# htslib errors visible while suppressing notices (3) and warnings (2).
pysam.set_verbosity(1)

logger = logging.getLogger(__name__)


class FileCacheVerdict(NamedTuple):
    """The lock-free classification of a single resource file.

    ``needs_download`` is True when the local copy is missing or has drifted
    from the remote manifest and must be (re)downloaded; ``size`` is the
    manifest-recorded byte size of that pending download (0 when nothing
    needs downloading). See gain#78.
    """

    # True when the file has to be (re)downloaded.
    needs_download: bool
    # Manifest byte size of the pending download; 0 when none is pending.
    size: int
# Per-file download retry policy for copy_resource_file. A single stalled or # dropped read over a slow HTTP GRR link used to abort the whole cache run # (gain#43); instead we retry the file from scratch with exponential backoff. _COPY_MAX_ATTEMPTS = 4 _COPY_BACKOFF_BASE = 5 # seconds; delays are 5s, 15s, 45s
class ChecksumMismatchError(OSError):
    """A completed download whose md5 disagrees with the manifest.

    Almost always a truncated or corrupted transfer, so it is treated as a
    retryable error by copy_resource_file rather than a hard failure.
    """
# aiohttp.ClientError joins the retryable set whenever aiohttp can be
# imported (which is always the case when an HTTP GRR is in use).
try:
    import aiohttp as _aiohttp
    _aiohttp_errors: tuple[type[BaseException], ...] = (_aiohttp.ClientError,)
except ImportError:
    _aiohttp_errors = ()

# Transient failures that justify re-downloading a file from scratch:
# fsspec's FSTimeoutError (how a stalled aiohttp read surfaces),
# ConnectionError (resets / refused connects) and ChecksumMismatchError
# (truncated transfers).
_RETRYABLE_COPY_ERRORS: tuple[type[BaseException], ...] = (
    fsspec.exceptions.FSTimeoutError,
    asyncio.TimeoutError,
    ConnectionError,
    ChecksumMismatchError,
    *_aiohttp_errors,
)


def _scan_for_resources(
    content_dict: dict,
    parent_id: list[str],
) -> Generator[tuple[str, tuple[int, ...], dict], None, None]:
    """Walk a nested content dict and yield the resources found in it.

    Each yielded item is ``(resource_id, version, resource_content)``. A dict
    node counts as a resource when its name parses as an id/version token and
    it holds a non-dict entry named ``GR_CONF_FILE_NAME``. Plain files outside
    of resources and directories with invalid id tokens are logged and
    skipped.
    """

    def holds_resource(node: Any, parsed: Any) -> bool:
        # A resource is a dict with a parsable name and a config *file*.
        if not isinstance(node, dict) or not parsed:
            return False
        if GR_CONF_FILE_NAME not in node:
            return False
        return not isinstance(node[GR_CONF_FILE_NAME], dict)

    own_token = parse_gr_id_version_token("/".join(parent_id))
    if holds_resource(content_dict, own_token):
        resource_id, version = own_token
        yield "/".join([*parent_id, resource_id]), version, content_dict
        return

    for name, content in content_dict.items():
        child_token = parse_gr_id_version_token(name)
        if holds_resource(content, child_token):
            resource_id, version = child_token
            yield "/".join([*parent_id, resource_id]), version, content
            continue

        child_id = [*parent_id, name]
        child_path = "/".join(child_id)
        if not isinstance(content, dict):
            logger.warning("file <%s> is not used.", child_path)
        elif not is_gr_id_token(name):
            logger.warning(
                "directory <%s> has a name <%s> that is not a "
                "valid Genomic Resource Id Token.", child_path, name)
        else:
            # descend into the sub-directory
            yield from _scan_for_resources(content, child_id)


def _scan_for_resource_files(
    content_dict: dict[str, Any],
    parent_dirs: list[str],
) -> Generator[tuple[str, str | bytes], None, None]:
    """Flatten a resource content dict into ``(relative_path, content)`` pairs.

    Nested dicts are treated as sub-directories; leaves must be ``str`` or
    ``bytes``, anything else raises ``TypeError``.
    """
    for path, content in content_dict.items():
        if isinstance(content, dict):
            yield from _scan_for_resource_files(
                content, [*parent_dirs, path])
            continue

        fname = "/".join([*parent_dirs, path])
        if not isinstance(content, (str, bytes)):
            logger.error(
                "unexpected content at %s: %s", fname, content)
            raise TypeError(f"unexpected content at {fname}: {content}")
        yield fname, content
def build_inmemory_protocol(
    proto_id: str,
    root_path: str,
    content: dict[str, Any],
) -> FsspecReadWriteProtocol:
    """Create a ``memory://`` protocol populated from a nested content dict.

    ``root_path`` must be absolute, otherwise ``ValueError`` is raised. Every
    resource found in ``content`` gets its files written, a state saved for
    each file and finally its manifest built and saved.
    """
    if not os.path.isabs(root_path):
        logger.error(
            "for embedded resources repository we expects an "
            "absolute path: %s", root_path)
        raise ValueError(f"not an absolute root path: {root_path}")

    proto = cast(
        FsspecReadWriteProtocol,
        build_fsspec_protocol(proto_id, f"memory://{root_path}"))

    for res_id, res_version, res_content in _scan_for_resources(content, []):
        resource = GenomicResource(res_id, res_version, proto)
        for fname, fcontent in _scan_for_resource_files(res_content, []):
            # bytes payloads need a binary handle, text payloads a text one
            mode = "wb" if isinstance(fcontent, bytes) else "wt"
            with proto.open_raw_file(resource, fname, mode) as outfile:
                outfile.write(fcontent)
            file_state = proto.build_resource_file_state(resource, fname)
            proto.save_resource_file_state(resource, file_state)
        proto.save_manifest(resource, proto.build_manifest(resource))

    return proto
class FsspecReadOnlyProtocol(ReadOnlyRepositoryProtocol):
    """Read-only genomic resources repository protocol backed by fsspec.

    Instances are registered in ``_FSSPEC_PROTOCOLS`` under their
    ``(proto_id, url)`` pair; creating the class again with a registered pair
    hands back the registered object.
    """

    def __getnewargs_ex__(self) -> tuple[tuple, dict]:
        """Return the ``(args, kwargs)`` handed to ``__new__`` on unpickling."""
        # pylint: disable=invalid-getnewargs-ex-returned
        new_kwargs: dict[str, Any] = {
            **self.kwargs,
            "public_url": self.public_url,
        }
        return ((self.proto_id, self.url), new_kwargs)

    def __new__(cls, *args: Any, **kwargs: Any) -> FsspecReadOnlyProtocol:
        """Return the registered instance for the id/url pair or a new one."""
        proto_id = args[0] if args else kwargs["proto_id"]
        url = kwargs["url"] if len(args) < 2 else args[1]
        key = (proto_id, url)
        if key not in _FSSPEC_PROTOCOLS:
            instance = super().__new__(cls)
            _FSSPEC_PROTOCOLS[key] = instance
            return instance

        logger.debug(
            "protocol with id %s and url %s already exists, "
            "returning the existing instance", proto_id, url)
        return _FSSPEC_PROTOCOLS[key]

    def __init__(
        self,
        proto_id: str,
        url: str,
        *,
        filesystem: fsspec.AbstractFileSystem,
        public_url: str | None = None,
        **kwargs: Any,
    ):
        """Split ``url`` into scheme/netloc/path and set up the caches.

        An empty scheme is treated as ``file``; ``public_url`` falls back to
        the normalised repository url. Extra keyword arguments are kept in
        ``self.kwargs`` and reused when the filesystem is rebuilt on
        unpickling.
        """
        super().__init__(proto_id, url)

        parts = urlparse(url)
        self.scheme = parts.scheme or "file"
        self.netloc = parts.netloc
        self.root_path = parts.path
        self.url = f"{self.scheme}://{self.netloc}{self.root_path}"
        self.public_url = self.url if public_url is None else public_url

        self.filesystem = filesystem
        self.kwargs: dict[str, Any] = kwargs
        self._all_resources_lock = Lock()
        self._all_resources: dict[str, GenomicResource] | None = None

    def __getstate__(self) -> dict[str, Any]:
        """Return the instance dict without filesystem, cache and lock."""
        state = dict(self.__dict__)
        # These entries are dropped here and rebuilt in __setstate__.
        for transient in (
                "filesystem", "_all_resources", "_all_resources_lock"):
            del state[transient]
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        """Restore the pickled state and rebuild filesystem, cache and lock."""
        self.__dict__.update(state)
        self.filesystem = _build_filesystem(self.url, **self.kwargs)
        self._all_resources = None
        self._all_resources_lock = Lock()
def get_url(self) -> str:
    """Return the repository url stored on this protocol."""
    return self.url
def get_public_url(self) -> str:
    """Return the public url stored on this protocol."""
    return self.public_url
def invalidate(self) -> None:
    """Drop the cached resources, detaching each one from its protocol."""
    cached = self._all_resources
    if cached is None:
        return
    for resource in cached.values():
        resource.proto = None  # type: ignore
    self._all_resources = None
def close(self) -> None:
    """Close the protocol by invalidating its cached resources."""
    self.invalidate()
def load_contents(self) -> list[dict[str, Any]]:
    """Read and parse the repository contents JSON.

    The file named by ``GR_CONTENTS_FILE_NAME`` is read with gzip
    decompression when it exists; otherwise the same name without its last
    three characters is read as plain text.
    """
    gz_path = os.path.join(self.url, GR_CONTENTS_FILE_NAME)
    compression: str | None
    if self.filesystem.exists(gz_path):
        target, compression = gz_path, "gzip"
    else:
        target, compression = gz_path[:-3], None

    with self.filesystem.open(
            target, "rt", compression=compression) as infile:
        raw = infile.read()
    return cast(list[dict[str, Any]], json.loads(raw))
def md5_contents(self) -> str:
    """Return the md5 hex digest of the raw repository contents file.

    The file named by ``GR_CONTENTS_FILE_NAME`` is hashed when it exists;
    otherwise the same name without its last three characters is hashed.
    """
    gz_path = os.path.join(self.url, GR_CONTENTS_FILE_NAME)
    target = gz_path if self.filesystem.exists(gz_path) else gz_path[:-3]

    with self.filesystem.open(target, "rb") as infile:
        raw = infile.read()
        assert isinstance(raw, bytes)
        digest = hashlib.md5()  # noqa: S324
        digest.update(raw)
        return digest.hexdigest()
def get_all_resources(self) -> Generator[GenomicResource, None, None]:
    """Return generator over all resources in the repository.

    The resources are the values of :meth:`get_all_resources_dict`, yielded
    in that dictionary's order.
    """
    yield from self.get_all_resources_dict().values()
def get_all_resources_dict(self) -> dict[str, GenomicResource]:
    """Return the resources keyed by full id, loading them on first use.

    On the first call the repository contents are loaded and one resource is
    built per entry; the result is cached, ordered by full id, and reused by
    later calls. The whole operation runs under ``_all_resources_lock``.
    """
    with self._all_resources_lock:
        if self._all_resources is not None:
            return self._all_resources

        loaded = []
        for entry in self.load_contents():
            version = tuple(int(part) for part in entry["version"].split("."))
            manifest = Manifest.from_manifest_entries(entry["manifest"])
            resource = self.build_genomic_resource(
                entry["id"], version,
                config=entry["config"],
                manifest=manifest)
            logger.debug(
                "repo %s loaded resource %s",
                self.proto_id, resource.resource_id)
            loaded.append(resource)

        loaded.sort(key=lambda r: r.get_full_id())
        self._all_resources = {res.get_full_id(): res for res in loaded}
        return self._all_resources
def file_exists(
    self,
    resource: GenomicResource,
    filename: str,
) -> bool:
    """Return whether the resource file exists on the filesystem."""
    return cast(
        bool,
        self.filesystem.exists(
            self.get_resource_file_url(resource, filename)))
def load_manifest(self, resource: GenomicResource) -> Manifest:
    """Build a manifest from the content of the resource manifest file."""
    return Manifest.from_file_content(
        self.get_file_content(resource, GR_MANIFEST_FILE_NAME))
def open_raw_file(
    self,
    resource: GenomicResource,
    filename: str,
    mode: str = "rt",
    **kwargs: str | bool | None,
) -> IO:
    """Open a resource file through the underlying filesystem.

    For write modes a read-only protocol raises ``OSError`` and the parent
    directory is created when missing. Of the extra keyword arguments only
    ``compression`` is consulted: any truthy value opens the file with gzip
    compression.
    """
    filepath = self.get_resource_file_url(resource, filename)

    if "w" in mode:
        if self.mode() == Mode.READONLY:
            raise OSError(
                f"Read-Only protocol {self.get_id()} trying to open "
                f"{filepath} for writing")
        # make sure the directory that will hold the file is there
        parent = os.path.dirname(filepath)
        if not self.filesystem.exists(parent):
            self.filesystem.mkdir(
                parent, create_parents=True, exist_ok=True)

    compression = "gzip" if kwargs.get("compression") else None
    handle = self.filesystem.open(
        filepath, mode=mode, compression=compression)
    return cast(IO, handle)
def open_repository_sqlite3_metadata_db(self) -> apsw.Connection:
    """Load the repository SQLite metadata DB into an in-memory connection.

    The gzip-compressed file named by ``GR_SQLITE_META_FILE_NAME`` is read in
    full and deserialized into the ``main`` database of a ``:memory:``
    connection. Raises ``ValueError`` when the file does not exist.
    """
    db_url = os.path.join(self.url, GR_SQLITE_META_FILE_NAME)
    if not self.filesystem.exists(db_url):
        raise ValueError(
            "Repository contents SQLite metadata DB not found!")

    connection = apsw.Connection(":memory:")
    with self.filesystem.open(
            db_url, "rb", compression="gzip") as db_stream:
        raw_db = db_stream.read()
        assert isinstance(raw_db, bytes)
        connection.deserialize("main", raw_db)
    return connection
def _get_file_url(self, resource: GenomicResource, filename: str) -> str:
    """Return the resource file location in the form used to open it.

    ``file`` urls are reduced to their path, ``s3`` urls are signed by the
    filesystem and any other url is returned unchanged.
    """
    url = self.get_resource_file_url(resource, filename)
    if self.scheme == "file":
        return urlparse(url).path
    if self.scheme == "s3":
        return cast(str, self.filesystem.sign(url))
    return url
def open_tabix_file(
    self,
    resource: GenomicResource,
    filename: str,
    index_filename: str | None = None,
) -> pysam.TabixFile:
    """Open a tabix file of the resource with pysam.

    The index defaults to ``<filename>.tbi``. Raises ``OSError`` for schemes
    other than file, s3, http and https.
    """
    if self.scheme not in {"file", "s3", "http", "https"}:
        raise OSError(
            f"tabix files are not supported on schema {self.scheme}")

    data_url = self._get_file_url(resource, filename)
    index_url = self._get_file_url(
        resource,
        f"{filename}.tbi" if index_filename is None else index_filename)

    return pysam.TabixFile(  # pylint: disable=no-member
        data_url,
        index=index_url,
        encoding="utf-8",
        parser=pysam.asTuple())
def open_vcf_file(
    self,
    resource: GenomicResource,
    filename: str,
    index_filename: str | None = None,
) -> pysam.VariantFile:
    """Open a VCF file of the resource with pysam.

    The index defaults to ``<filename>.tbi``. Raises ``OSError`` for schemes
    other than file, s3, http and https.
    """
    if self.scheme not in {"file", "s3", "http", "https"}:
        raise OSError(
            f"vcf files are not supported on schema {self.scheme}")

    data_url = self._get_file_url(resource, filename)
    index_url = self._get_file_url(
        resource,
        f"{filename}.tbi" if index_filename is None else index_filename)

    return pysam.VariantFile(  # pylint: disable=no-member
        data_url, index_filename=index_url)
def open_fasta_file(
    self,
    resource: GenomicResource,
    filename: str,
    index_filename: str | None = None,
    compressed_index_filename: str | None = None,
) -> pysam.FastaFile:
    """Open a bgzipped FASTA file of the resource with pysam.

    The indexes default to ``<filename>.fai`` and ``<filename>.gzi``. Raises
    ``OSError`` for schemes other than file, s3, http and https, and
    ``ValueError`` when the ``.gzi`` index is not present in the resource.
    """
    if self.scheme not in {"file", "s3", "http", "https"}:
        raise OSError(
            f"fasta files are not supported on schema {self.scheme}")

    index_filename = (
        f"{filename}.fai" if index_filename is None else index_filename)
    compressed_index_filename = (
        f"{filename}.gzi" if compressed_index_filename is None
        else compressed_index_filename)

    if not self.file_exists(resource, compressed_index_filename):
        raise ValueError(
            f"bgzip index '{compressed_index_filename}' is required to "
            f"read bgzipped genome '{filename}' in resource "
            f"'{resource.resource_id}'; generate the .fai and .gzi "
            f"indexes with 'samtools faidx {filename}'")

    file_url = self._get_file_url(resource, filename)

    if self.scheme != "file":
        # Remote scheme: pysam.FastaFile needs the index arguments to be
        # local paths (it os.path.exists-checks them), while htslib can
        # range-read the data file remotely. The small .fai/.gzi indexes are
        # copied into a temporary local directory and opened from there;
        # htslib loads both into memory at open, so the temp files can go
        # away right afterwards. The multi-GB data file stays remote.
        with tempfile.TemporaryDirectory(
                prefix="gain-fasta-idx-") as tmpdir:
            return pysam.FastaFile(  # pylint: disable=no-member
                file_url,
                filepath_index=self._copy_resource_file_to_local(
                    resource, index_filename, tmpdir),
                filepath_index_compressed=self._copy_resource_file_to_local(
                    resource, compressed_index_filename, tmpdir))

    return pysam.FastaFile(  # pylint: disable=no-member
        file_url,
        filepath_index=self._get_file_url(resource, index_filename),
        filepath_index_compressed=self._get_file_url(
            resource, compressed_index_filename))
def _copy_resource_file_to_local(
    self,
    resource: GenomicResource,
    filename: str,
    dest_dir: str,
) -> str:
    """Copy a (small) resource file into dest_dir; return the local path.

    The file is read fully into memory and stored under its base name.
    """
    local_path = os.path.join(dest_dir, os.path.basename(filename))
    with self.open_raw_file(
            resource, filename, "rb", uncompress=False) as remote:
        payload = remote.read()
    with open(local_path, "wb") as sink:
        sink.write(payload)
    return local_path
def open_bigwig_file(
    self,
    resource: GenomicResource,
    filename: str,
) -> Any:
    """Open a bigWig file of the resource with pyBigWig.

    Raises ``OSError`` for schemes other than file, s3, http and https.
    """
    supported = {"file", "s3", "http", "https"}
    if self.scheme not in supported:
        raise OSError(
            f"bigwig files are not supported on schema {self.scheme}")
    return pyBigWig.open(  # pylint: disable=I1101
        self._get_file_url(resource, filename))
class FsspecReadWriteProtocol(
        FsspecReadOnlyProtocol, ReadWriteRepositoryProtocol):
    """Read-write genomic resources repository protocol backed by fsspec."""

    def __init__(
        self,
        proto_id: str,
        url: str,
        *,
        filesystem: fsspec.AbstractFileSystem,
        public_url: str | None = None,
        **kwargs: Any,
    ):
        """Initialise the read-only part and create the repository root."""
        super().__init__(
            proto_id, url,
            filesystem=filesystem,
            public_url=public_url,
            **kwargs)
        self.filesystem.makedirs(self.url, exist_ok=True)

    def _get_resource_file_lockfile_path(
        self,
        resource: GenomicResource,
        filename: str,
    ) -> str:
        """Return the local path of the lockfile guarding a resource file.

        The lockfile lives in the resource's ``.grr`` directory. Only the
        ``file`` scheme is supported; other schemes raise
        ``NotImplementedError``.
        """
        if self.scheme != "file":
            raise NotImplementedError
        lockfile_url = os.path.join(
            self.get_resource_url(resource), ".grr", f"{filename}.lockfile")
        # strip the "<scheme>://" prefix to obtain a plain filesystem path
        return lockfile_url.removeprefix(f"{self.scheme}://")
def obtain_resource_file_lock(
    self,
    resource: GenomicResource,
    filename: str,
    timeout: float = -1,
) -> AbstractContextManager:
    """Return a context manager that locks a resource's file.

    On the ``file`` scheme this is a ``FileLock`` on the resource file's
    lockfile; on every other scheme it is a do-nothing placeholder.
    """

    class NoLock:
        """Placeholder lock that never blocks."""

        def __enter__(self) -> None:
            return None

        def __exit__(
                self,
                exc_type: type[BaseException] | None,
                exc_value: BaseException | None,
                exc_tb: TracebackType | None) -> bool:
            # False whenever an exception is in flight, so it propagates.
            return exc_type is None

    if self.scheme == "file":
        return FileLock(
            self._get_resource_file_lockfile_path(resource, filename),
            timeout=timeout)
    return NoLock()
def _scan_path_for_resources(
    self,
    path_array: list[str],
) -> Generator[Any, None, None]:
    """Recursively look for resources below the given repository sub-path.

    Yields ``(resource_id, version, resource_path)`` for every directory that
    contains ``GR_CONF_FILE_NAME``. Entries whose name relative to the
    scanned directory starts with a dot are ignored. A resource directory
    whose path does not parse as an id/version token is logged and skipped.
    """
    url = os.path.join(self.url, *path_array)
    path = os.path.join(self.root_path, *path_array)
    assert isinstance(url, str)

    if not self.filesystem.isdir(url):
        return

    content = []
    for direntry in self.filesystem.ls(url, detail=False):
        # some filesystems prefix listed entries with the netloc; drop it
        if self.netloc and direntry.startswith(self.netloc):
            direntry = direntry[len(self.netloc):]
        name = os.path.relpath(direntry, path)
        if name.startswith("."):
            continue
        content.append(name)

    if GR_CONF_FILE_NAME in content:
        res_path = "/".join(path_array)
        # Validate before unpacking: the parser result is truthiness-checked
        # elsewhere in this module, so a falsy (non-tuple) result must not
        # reach the tuple unpacking below.
        parsed = parse_gr_id_version_token(res_path)
        if not parsed or parsed[0] is None:
            logger.error("bad resource id/version: %s", res_path)
            return
        resource_id, version = parsed
        yield resource_id, version, res_path
    else:
        for name in content:
            yield from self._scan_path_for_resources([*path_array, name])


def _scan_resource_for_files(
    self,
    resource_path: str,
    path_array: list[str],
) -> Generator[Any, None, None]:
    """Recursively yield ``(relative_name, url)`` for files of a resource.

    Entries whose name relative to the scanned directory starts with a dot
    are ignored.
    """
    url = os.path.join(self.url, resource_path, *path_array)
    if not self.filesystem.isdir(url):
        # a leaf; the resource root itself (empty path_array) is never a file
        if path_array:
            yield os.path.join(*path_array), url
        return

    path = os.path.join(self.root_path, resource_path, *path_array)
    content = []
    for direntry in self.filesystem.ls(url, detail=False):
        if self.netloc and direntry.startswith(self.netloc):
            direntry = direntry[len(self.netloc):]
        name = os.path.relpath(direntry, path)
        if name.startswith("."):
            continue
        content.append(name)

    for name in content:
        yield from self._scan_resource_for_files(
            resource_path, [*path_array, name])


def _get_filepath_timestamp(self, filepath: str) -> float:
    """Return the file timestamp rounded to two decimals.

    The filesystem's modification time is used, read as UTC. When the
    filesystem does not implement ``modified`` the ``created`` entry of the
    file info is used instead.
    """
    try:
        modification = self.filesystem.modified(filepath)
        modification = modification.replace(tzinfo=datetime.timezone.utc)
        return cast(float, round(modification.timestamp(), 2))
    except NotImplementedError:
        info = self.filesystem.info(filepath)
        # NOTE(review): assumes the info dict carries a numeric "created"
        # entry on such filesystems - confirm.
        modification = cast(float, info.get("created"))
        return cast(float, round(modification, 2))
def collect_all_resources(self) -> Generator[GenomicResource, None, None]:
    """Scan the repository and yield a resource for each one found.

    The configuration of every resource is loaded from its
    ``GR_CONF_FILE_NAME`` file; the manifest is loaded when the
    ``GR_MANIFEST_FILE_NAME`` file exists and is ``None`` otherwise.
    """
    for res_id, res_ver, res_path in self._scan_path_for_resources([]):
        rooted = os.path.join(self.root_path, res_path)
        assert rooted.startswith("/")
        res_url = f"{self.scheme}://{self.netloc}{rooted}"

        config_url = os.path.join(res_url, GR_CONF_FILE_NAME)
        with self.filesystem.open(config_url, "rt") as infile:
            config = yaml.safe_load(infile)

        manifest: Manifest | None = None
        manifest_url = os.path.join(res_url, GR_MANIFEST_FILE_NAME)
        if self.filesystem.exists(manifest_url):
            with self.filesystem.open(manifest_url, "rt") as infile:
                logger.debug("loading manifest from %s", manifest_url)
                manifest = Manifest.from_file_content(
                    cast(str, infile.read()))

        yield self.build_genomic_resource(
            res_id, res_ver, config, manifest)
def collect_resource_entries(self, resource: GenomicResource) -> Manifest:
    """Scan the resource files and return a manifest listing them.

    Each entry records the file name and size with no md5. Files whose name
    ends with ``html`` (generated info files) are left out.
    """
    resource_path = resource.get_genomic_resource_id_version()
    manifest = Manifest()
    for name, path in self._scan_resource_for_files(resource_path, []):
        if not name.endswith("html"):
            manifest.add(
                ManifestEntry(name, self._get_filepath_size(path), None))
    return manifest
def get_all_resources(self) -> Generator[GenomicResource, None, None]:
    """Return generator over all resources in the repository.

    The resources are the values of :meth:`get_all_resources_dict`, yielded
    in that dictionary's order.
    """
    yield from self.get_all_resources_dict().values()
def get_all_resources_dict(self) -> dict[str, GenomicResource]:
    """Return the resources keyed by full id, collecting them on first use.

    The first call scans the repository via ``collect_all_resources`` and
    caches the result ordered by full id. The whole operation runs under
    ``_all_resources_lock``.
    """
    with self._all_resources_lock:
        if self._all_resources is None:
            collected = list(self.collect_all_resources())
            collected.sort(key=lambda r: r.get_full_id())
            self._all_resources = {
                res.get_full_id(): res for res in collected}
        return self._all_resources
def _get_resource_file_state_path(
    self,
    resource: GenomicResource,
    filename: str,
) -> str:
    """Return the location of the ``.state`` file of a resource file.

    State files live in the ``.grr`` directory of the resource.
    """
    state_name = f"{filename}.state"
    return os.path.join(self.get_resource_url(resource), ".grr", state_name)
def get_resource_file_timestamp(
    self,
    resource: GenomicResource,
    filename: str,
) -> float:
    """Return the timestamp of a resource file."""
    return self._get_filepath_timestamp(
        self.get_resource_file_url(resource, filename))
def _get_filepath_size(self, filepath: str) -> int:
    """Return the ``size`` reported by the filesystem info as an int."""
    return int(self.filesystem.info(filepath)["size"])
def get_resource_file_size(
    self,
    resource: GenomicResource,
    filename: str,
) -> int:
    """Return the size of a resource file."""
    return self._get_filepath_size(
        self.get_resource_file_url(resource, filename))
def save_resource_file_state(
    self,
    resource: GenomicResource,
    state: ResourceFileState,
) -> None:
    """Write a resource file state as YAML into the internal GRR state.

    The directory holding the state file is created when missing.
    """
    path = self._get_resource_file_state_path(resource, state.filename)
    state_dir = os.path.dirname(path)
    if not self.filesystem.exists(state_dir):
        self.filesystem.makedirs(state_dir, exist_ok=True)

    fields = asdict(state)
    with self.filesystem.open(path, "wt", encoding="utf8") as outfile:
        outfile.write(yaml.safe_dump(fields))
def load_resource_file_state(
    self,
    resource: GenomicResource,
    filename: str,
) -> ResourceFileState | None:
    """Load resource file state from internal GRR state.

    Returns ``None`` when the resource file has no state file or the state
    file is empty; otherwise returns a ``ResourceFileState`` built from the
    ``filename``, ``size``, ``timestamp`` and ``md5`` entries of the YAML
    document.
    """
    path = self._get_resource_file_state_path(resource, filename)
    if not self.filesystem.exists(path):
        return None

    # The keyword is "encoding" (the state is written with encoding="utf8");
    # a misspelled keyword would not select the text encoding.
    with self.filesystem.open(path, "rt", encoding="utf8") as infile:
        content = yaml.safe_load(infile.read())
    if not content:
        return None

    return ResourceFileState(
        content["filename"],
        content["size"],
        content["timestamp"],
        content["md5"],
    )
def delete_resource_file(
    self,
    resource: GenomicResource,
    filename: str,
) -> None:
    """Delete a resource file and its internal state, when they exist."""

    def remove_if_present(path: str) -> None:
        if self.filesystem.exists(path):
            self.filesystem.delete(path)

    remove_if_present(self.get_resource_file_url(resource, filename))
    remove_if_present(self._get_resource_file_state_path(resource, filename))
def copy_resource_file(
    self,
    remote_resource: GenomicResource,
    dest_resource: GenomicResource,
    filename: str,
    on_bytes: Callable[[int], None] | None = None,
) -> ResourceFileState | None:
    """Copy a resource file into repository.

    A transient stall or drop mid-download (common when fetching a large
    resource over a slow HTTP GRR link) is retried from scratch with
    exponential backoff rather than aborting the file. See gain#43.

    ``on_bytes``, when given, is called with the number of bytes written
    for each chunk during the download (see gain#77). Because a retried
    attempt re-downloads the whole file from scratch, the bytes credited
    by a failed attempt are rolled back with a single compensating
    negative call before the retry, so a caller-side byte counter never
    double-counts.

    Returns the state of the downloaded file, or ``None`` when the file is
    not in the remote manifest (the local copy is deleted in that case).
    After ``_COPY_MAX_ATTEMPTS`` failed attempts the last retryable error is
    re-raised; errors outside ``_RETRYABLE_COPY_ERRORS`` propagate at once.
    """
    assert dest_resource.resource_id == remote_resource.resource_id
    logger.debug(
        "copying resource file (%s: %s) from %s",
        remote_resource.resource_id, filename,
        remote_resource.proto.proto_id)
    remote_manifest = remote_resource.get_manifest()
    if filename not in remote_manifest:
        # the file left the remote manifest: drop the local copy
        self.delete_resource_file(dest_resource, filename)
        return None

    manifest_entry = remote_manifest[filename]

    dest_filepath = self.get_resource_file_url(dest_resource, filename)
    dest_parent = os.path.dirname(dest_filepath)
    if not self.filesystem.exists(dest_parent):
        self.filesystem.mkdir(
            dest_parent, create_parents=True, exist_ok=True)

    # Bytes credited to on_bytes during the current attempt, so a
    # retryable failure can roll them back before the next attempt.
    attempt_bytes = 0

    def tracking_on_bytes(n: int) -> None:
        # Forward to the caller's callback while tallying this attempt.
        nonlocal attempt_bytes
        attempt_bytes += n
        assert on_bytes is not None
        on_bytes(n)

    wrapped_on_bytes = (
        tracking_on_bytes if on_bytes is not None else None)

    last_error: BaseException | None = None
    for attempt in range(1, _COPY_MAX_ATTEMPTS + 1):
        attempt_bytes = 0
        try:
            return self._download_resource_file(
                remote_resource, dest_resource, filename,
                dest_filepath, manifest_entry.md5,
                on_bytes=wrapped_on_bytes)
        except _RETRYABLE_COPY_ERRORS as error:
            last_error = error
            if on_bytes is not None and attempt_bytes:
                # roll back the partially-credited bytes of this attempt
                on_bytes(-attempt_bytes)
            if attempt >= _COPY_MAX_ATTEMPTS:
                # out of attempts: fall through and re-raise below
                break
            # the delay triples with every attempt
            delay = _COPY_BACKOFF_BASE * (3 ** (attempt - 1))
            logger.warning(
                "transient failure downloading (%s: %s): %s; "
                "retrying in %ss (attempt %s/%s)",
                dest_resource.resource_id, filename, error, delay,
                attempt + 1, _COPY_MAX_ATTEMPTS)
            time.sleep(delay)

    assert last_error is not None
    raise last_error
def _download_resource_file(
    self,
    remote_resource: GenomicResource,
    dest_resource: GenomicResource,
    filename: str,
    dest_filepath: str,
    expected_md5: str | None,
    *,
    on_bytes: Callable[[int], None] | None = None,
) -> ResourceFileState:
    """Download one resource file in a single attempt and check its md5.

    The remote file is streamed in ``CHUNK_SIZE`` pieces into a freshly
    opened (``"wb"``) destination, so a repeated call starts over from an
    empty file. ``on_bytes``, when given, receives the length of every chunk
    right after the chunk is written (see gain#77).

    Raises ``OSError`` when the destination does not exist afterwards and
    ``ChecksumMismatchError`` when the computed md5 differs from
    ``expected_md5``. On success the file state is built, saved and
    returned.
    """
    digest = hashlib.md5()  # noqa
    with remote_resource.open_raw_file(
            filename, "rb", uncompress=False) as source, \
            self.open_raw_file(
                dest_resource, filename, "wb",
                uncompress=False) as sink:
        while chunk := source.read(self.CHUNK_SIZE):
            sink.write(chunk)
            if on_bytes is not None:
                on_bytes(len(chunk))
            digest.update(chunk)
    actual_md5 = digest.hexdigest()

    if not self.filesystem.exists(dest_filepath):
        raise OSError(f"destination file not created {dest_filepath}")

    if actual_md5 != expected_md5:
        raise ChecksumMismatchError(
            f"file copy is broken "
            f"{dest_resource.resource_id} ({filename}); "
            f"md5sum are different: "
            f"{actual_md5}!={expected_md5}")

    state = self.build_resource_file_state(
        dest_resource, filename, md5sum=actual_md5)
    self.save_resource_file_state(dest_resource, state)
    return state
def classify_resource_file(
    self,
    remote_resource: GenomicResource,
    dest_resource: GenomicResource,
    filename: str,
) -> FileCacheVerdict:
    """Decide whether a resource file needs (re)downloading.

    This is the decision half of :meth:`update_resource_file`; it never
    copies or downloads anything, but it does refresh local bookkeeping:

    * a missing ``.state``, or one whose timestamp/size no longer match the
      local file, is rebuilt and saved;
    * a local file that is no longer in the remote manifest is deleted.

    The verdict asks for a download when the local file is missing or its
    md5 differs from the remote manifest; ``size`` is then the manifest byte
    size (0 when the file is not in the manifest or nothing needs
    downloading). See gain#78.
    """
    assert dest_resource.resource_id == remote_resource.resource_id

    remote_manifest = remote_resource.get_manifest()
    up_to_date = FileCacheVerdict(needs_download=False, size=0)

    if not self.file_exists(dest_resource, filename):
        pending = 0
        if filename in remote_manifest:
            pending = remote_manifest[filename].size
        return FileCacheVerdict(needs_download=True, size=pending)

    local_state = self.load_resource_file_state(dest_resource, filename)
    if local_state is None:
        stale = True
    else:
        current_timestamp = self.get_resource_file_timestamp(
            dest_resource, filename)
        current_size = self.get_resource_file_size(dest_resource, filename)
        stale = (
            current_timestamp != local_state.timestamp
            or current_size != local_state.size)
    if stale:
        # no state yet, or the file changed behind the state's back
        local_state = self.build_resource_file_state(
            dest_resource, filename)
        self.save_resource_file_state(dest_resource, local_state)

    if filename not in remote_manifest:
        self.delete_resource_file(dest_resource, filename)
        return up_to_date

    manifest_entry = remote_manifest[filename]
    if local_state.md5 == manifest_entry.md5:
        return up_to_date
    return FileCacheVerdict(needs_download=True, size=manifest_entry.size)
def update_resource_file(
    self,
    remote_resource: GenomicResource,
    dest_resource: GenomicResource,
    filename: str,
) -> ResourceFileState | None:
    """Bring a resource file up to date, downloading it only when needed.

    Returns the state produced by the download, or the currently persisted
    state when no download is needed.
    """
    verdict = self.classify_resource_file(
        remote_resource, dest_resource, filename)
    if not verdict.needs_download:
        # Nothing to fetch: an up-to-date file has a persisted state to
        # return, while a file deleted because it left the remote manifest
        # has none (the load yields None).
        return self.load_resource_file_state(dest_resource, filename)
    return self.copy_resource_file(
        remote_resource, dest_resource, filename)
def build_content_file(self) -> list[dict[str, Any]]:
    """Build and store the repository contents (the '.CONTENTS.json' file).

    One record per resource (full id, id, version, config, manifest entries)
    is collected and ordered by resource id. The records are written twice:
    gzip-compressed to ``GR_CONTENTS_FILE_NAME`` and as plain JSON to the
    same name without its last three characters. The records are returned.
    """
    content = []
    for res in self.get_all_resources():
        content.append({
            "full_id": res.get_full_id(),
            "id": res.resource_id,
            "version": res.get_version_str(),
            "config": res.get_config(),
            "manifest": res.get_manifest().to_manifest_entries(),
        })
    content.sort(key=operator.itemgetter("id"))

    gz_filepath = os.path.join(self.url, GR_CONTENTS_FILE_NAME)

    # The gzip header OS byte (offset 9) is forced to 0xff ("unknown") to
    # keep the file byte-deterministic across Python distributions.
    # Upstream CPython hardcodes 0xff, but Debian's Python patches
    # gzip.compress to emit 0x03 ("Unix"), so the same input yields
    # different bytes between a conda Python and a python:3.x-slim
    # container - enough to flag .CONTENTS.json.gz as modified under
    # `git status --porcelain` in CI even when the JSON payload is
    # identical.
    payload = json.dumps(content, indent=2, sort_keys=True).encode("utf8")
    compressed = gzip.compress(payload, mtime=0)
    head, tail = compressed[:9], compressed[10:]
    compressed = head + b"\xff" + tail

    with self.filesystem.open(gz_filepath, "wb") as outfile:
        outfile.write(compressed)

    with self.filesystem.open(
            gz_filepath[:-3], "wt", encoding="utf8") as outfile:
        json.dump(content, outfile, indent=2, sort_keys=True)

    return content
def get_content_file_path(self) -> str:
    """Return the location of the contents file.

    This is ``GR_CONTENTS_FILE_NAME`` without its last three characters,
    joined onto the repository url.
    """
    return os.path.join(self.url, GR_CONTENTS_FILE_NAME[:-3])
def build_index_info(
    self,
    repository_template: str = "grr_index.jinja",
    about_template: str | None = "grr_about.jinja",
) -> dict:
    """Build info dict for the repository.

    Collects one info record per resource (keyed by full id), renders the
    repository index from ``repository_template`` into
    ``GR_INDEX_FILE_NAME`` and returns the collected records. When an
    ``about.md`` exists at the repository root it is converted to HTML and
    written to ``about.html``, wrapped in ``about_template`` unless that is
    ``None``.

    Raises ``ValueError`` when the markdown conversion of ``about.md``
    fails.
    """
    result = {}
    for res in self.get_all_resources():
        # total size of the files listed in the resource manifest
        res_size = convert_size(
            sum(f for _, f in res.get_manifest().get_files()),
        )
        assert res.config is not None
        result[res.get_full_id()] = {
            "res_full_id": res.get_full_id(),
            "res_id": res.resource_id,
            **res.config,
            "res_version": res.get_version_str(),
            "res_files": len(list(res.get_manifest().get_files())),
            "res_size": res_size,
            "res_summary": res.get_summary(),
        }

    about_md_path = os.path.join(self.url, "about.md")
    has_about = self.filesystem.exists(about_md_path)
    about_html_content = ""
    if has_about:
        with self.filesystem.open(
                about_md_path, "rt", encoding="utf8") as infile:
            about_md_raw = infile.read()
        try:
            about_html_content = markdown(about_md_raw)
        except Exception as e:  # pylint: disable=broad-exception-caught
            logger.exception(
                "Error occurred while converting about.md to HTML for %s",
                self.get_url(),
            )
            raise ValueError from e

        with self.filesystem.open(
            os.path.join(self.url, "about.html"),
            "wt", encoding="utf8",
        ) as outfile:
            if about_template is not None:
                outfile.write(get_template(about_template).render(
                    about_contents=about_html_content))
            else:
                # no template: store the bare converted HTML
                outfile.write(about_html_content)

    # md5 of the SQLite metadata file as stored; stays empty when there is
    # no such file
    sqlite3_hash = ""
    gz_path = os.path.join(self.url, GR_SQLITE_META_FILE_NAME)
    if self.filesystem.exists(gz_path):
        with self.filesystem.open(gz_path, "rb") as gz_file:
            gz_bytes: bytes = cast(bytes, gz_file.read())
        sqlite3_hash = hashlib.md5(gz_bytes).hexdigest()  # noqa: S324

    content_filepath = os.path.join(self.url, GR_INDEX_FILE_NAME)
    with self.filesystem.open(
            content_filepath, "wt", encoding="utf8") as outfile:
        outfile.write(get_template(repository_template).render(
            data=result,
            has_about=has_about,
            sqlite3_hash=sqlite3_hash,
        ))
    return result
def build_local_resource(
    dirname: str,
    config: dict[str, Any],
) -> GenomicResource:
    """Wrap a local filesystem directory as a stand-alone resource.

    The resource gets the id ``"."`` and version ``(0,)`` and is served by a
    protocol with id ``"d"`` rooted at ``dirname``.
    """
    return GenomicResource(
        ".", (0, ), build_fsspec_protocol("d", dirname), config)
def _build_filesystem(
    url: str,
    **kwargs: Any,
) -> fsspec.AbstractFileSystem:
    """Create the fsspec filesystem matching the scheme of ``url``.

    Supported schemes are file (or none), http/https, s3 and memory; any
    other scheme raises ``NotImplementedError``. ``base_url`` (http) and
    ``endpoint_url`` (s3) are taken from the keyword arguments.
    """
    # pylint: disable=import-outside-toplevel
    scheme = urlparse(url).scheme

    if scheme in {"file", ""}:
        from fsspec.implementations.local import LocalFileSystem
        return LocalFileSystem()

    if scheme in {"http", "https"}:
        import aiohttp
        from fsspec.implementations.http import HTTPFileSystem

        # aiohttp's default 300s total read timeout is relaxed here: a large
        # GRR resource (e.g. the ~15GB genome-wide gnomAD file) legitimately
        # downloads for far longer. total=None lifts the overall cap while
        # sock_read/sock_connect still turn a genuinely stalled read or hung
        # connect into a (retryable) error rather than killing the run. See
        # gain#43.
        timeout = aiohttp.ClientTimeout(
            total=None, sock_read=120, sock_connect=60)
        return HTTPFileSystem(client_kwargs={
            "base_url": kwargs.get("base_url"),
            "timeout": timeout,
        })

    if scheme == "s3":
        from s3fs.core import S3FileSystem
        return S3FileSystem(
            anon=False,
            client_kwargs={"endpoint_url": kwargs.get("endpoint_url")})

    if scheme == "memory":
        from fsspec.implementations.memory import MemoryFileSystem
        return MemoryFileSystem()

    raise NotImplementedError(f"unsupported schema {scheme}")


FsspecRepositoryProtocol = FsspecReadOnlyProtocol | FsspecReadWriteProtocol

# Registry of the protocols created so far, keyed by (proto_id, url).
_FSSPEC_PROTOCOLS: dict[tuple[str, str], FsspecRepositoryProtocol] = {}
def build_fsspec_protocol(
    proto_id: str,
    root_url: str,
    **kwargs: str | None,
) -> FsspecRepositoryProtocol:
    """Create the fsspec GRR protocol matching the scheme of ``root_url``.

    http/https repositories are always read-only. file, s3 and memory
    repositories are read-write unless a truthy ``read_only`` keyword is
    passed. ``public_url`` and ``read_only`` are consumed here; the remaining
    keyword arguments go to both the filesystem factory and the protocol.
    Unsupported schemes raise ``NotImplementedError``.
    """
    # pylint: disable=import-outside-toplevel
    public_url = kwargs.pop("public_url", None)
    read_only = kwargs.pop("read_only", False)
    filesystem = _build_filesystem(root_url, **kwargs)

    scheme = urlparse(root_url).scheme
    if scheme in {"http", "https"}:
        writable = False
    elif scheme in {"file", "", "s3", "memory"}:
        writable = not read_only
    else:
        raise NotImplementedError(f"unsupported schema {scheme}")

    proto_cls = FsspecReadWriteProtocol if writable else FsspecReadOnlyProtocol
    return proto_cls(
        proto_id, root_url,
        filesystem=filesystem,
        public_url=public_url,
        **kwargs)