Source code for dae.genomic_resources.fsspec_protocol

"""Provides GRR protocols based on fsspec library."""
from __future__ import annotations

import datetime
import fcntl
import hashlib
import json
import logging
import operator
import os
from collections.abc import Generator
from contextlib import AbstractContextManager
from dataclasses import asdict
from types import TracebackType
from typing import (
    IO,
    Any,
    cast,
)
from urllib.parse import urlparse

import fsspec
import jinja2
import pyBigWig  # type: ignore
import pysam
import yaml

from dae.genomic_resources.repository import (
    GR_CONF_FILE_NAME,
    GR_CONTENTS_FILE_NAME,
    GR_INDEX_FILE_NAME,
    GR_MANIFEST_FILE_NAME,
    GenomicResource,
    Manifest,
    ManifestEntry,
    Mode,
    ReadOnlyRepositoryProtocol,
    ReadWriteRepositoryProtocol,
    ResourceFileState,
    is_gr_id_token,
    parse_gr_id_version_token,
    parse_resource_id_version,
)
from dae.utils.helpers import convert_size

logger = logging.getLogger(__name__)


def _scan_for_resources(
    content_dict: dict, parent_id: list[str],
) -> Generator[tuple[str, tuple[int, ...], dict], None, None]:
    """Walk a nested content dictionary and yield the resources in it.

    A dictionary counts as a resource when its name parses with
    ``parse_gr_id_version_token`` and it holds a ``GR_CONF_FILE_NAME``
    entry that is not itself a dictionary. Each resource is yielded as
    ``(resource id path, version, resource content)``.

    Entries that are not dictionaries, and dictionaries whose name is not
    accepted by ``is_gr_id_token``, are skipped with a warning; every other
    dictionary is scanned recursively.
    """

    def _holds_resource(node: Any, parsed_token: Any) -> bool:
        # same short-circuit order as the inline condition it replaces
        return bool(
            isinstance(node, dict)
            and parsed_token
            and GR_CONF_FILE_NAME in node
            and not isinstance(node[GR_CONF_FILE_NAME], dict))

    # the dictionary passed in may itself be a resource
    own_token = parse_gr_id_version_token("/".join(parent_id))
    if _holds_resource(content_dict, own_token):
        resource_id, version = own_token
        yield "/".join([*parent_id, resource_id]), version, content_dict
        return

    for entry_name, entry in content_dict.items():
        entry_token = parse_gr_id_version_token(entry_name)
        if _holds_resource(entry, entry_token):
            resource_id, version = entry_token
            yield "/".join([*parent_id, resource_id]), version, entry
            continue

        child_id = [*parent_id, entry_name]
        child_path = "/".join(child_id)
        if not isinstance(entry, dict):
            logger.warning("file <%s> is not used.", child_path)
        elif not is_gr_id_token(entry_name):
            logger.warning(
                "directory <%s> has a name <%s> that is not a "
                "valid Genomic Resource Id Token.", child_path, entry_name)
        else:
            # a plain directory: look for resources below it
            yield from _scan_for_resources(entry, child_id)


def _scan_for_resource_files(
    content_dict: dict[str, Any], parent_dirs: list[str],
) -> Generator[tuple[str, str | bytes], None, None]:

    for path, content in content_dict.items():
        if isinstance(content, dict):
            # handle subdirectory
            for fname, fcontent in _scan_for_resource_files(
                    content, [*parent_dirs, path]):
                yield fname, fcontent
        else:
            fname = "/".join([*parent_dirs, path])
            if isinstance(content, (str, bytes)):
                # handle file content
                yield fname, content
            else:
                logger.error(
                    "unexpected content at %s: %s", fname, content)
                raise TypeError(f"unexpected content at {fname}: {content}")


def build_inmemory_protocol(
    proto_id: str,
    root_path: str,
    content: dict[str, Any],
) -> FsspecReadWriteProtocol:
    """Build and return an embedded fsspec protocol for testing.

    A protocol rooted at ``memory://{root_path}`` is created and filled
    from ``content``: every resource found by ``_scan_for_resources`` gets
    its files written, a file state saved for each file, and finally a
    manifest built and saved.

    Raises:
        ValueError: when ``root_path`` is not an absolute path.
    """
    if not os.path.isabs(root_path):
        logger.error(
            "for embedded resources repository we expects an "
            "absolute path: %s", root_path)
        raise ValueError(f"not an absolute root path: {root_path}")

    proto = cast(
        FsspecReadWriteProtocol,
        build_fsspec_protocol(proto_id, f"memory://{root_path}"))

    for res_id, res_version, res_content in _scan_for_resources(content, []):
        resource = GenomicResource(res_id, res_version, proto)

        for filename, data in _scan_for_resource_files(res_content, []):
            # bytes are written through a binary handle, text otherwise
            open_mode = "wb" if isinstance(data, bytes) else "wt"
            with proto.open_raw_file(
                    resource, filename, open_mode) as outfile:
                outfile.write(data)

            file_state = proto.build_resource_file_state(resource, filename)
            proto.save_resource_file_state(resource, file_state)

        # the manifest is built once all files of the resource are stored
        manifest = proto.build_manifest(resource)
        proto.save_manifest(resource, manifest)

    return proto
class FsspecReadOnlyProtocol(ReadOnlyRepositoryProtocol):
    """Provides fsspec genomic resources repository protocol.

    Gives access to the resources of a repository through an fsspec
    filesystem. The list of resources is read from the repository contents
    file and cached until :meth:`invalidate` is called.
    """

    def __init__(
            self, proto_id: str,
            url: str,
            filesystem: fsspec.AbstractFileSystem):
        """Initialize the protocol.

        Args:
            proto_id: identifier passed on to the base protocol.
            url: root URL of the repository; a URL without a scheme is
                treated as a ``file`` URL.
            filesystem: fsspec filesystem used for all file access.
        """
        super().__init__(proto_id, url)

        parsed = urlparse(url)
        self.scheme = parsed.scheme or "file"
        self.netloc = parsed.netloc
        self.root_path = parsed.path
        # root URL rebuilt so that it always carries an explicit scheme
        self.url = f"{self.scheme}://{self.netloc}{self.root_path}"
        self.filesystem = filesystem
        # cache of the repository resources; None means "not loaded yet"
        self._all_resources: list[GenomicResource] | None = None

    def get_url(self) -> str:
        """Return the root URL of the repository."""
        return self.url

    def invalidate(self) -> None:
        """Drop the cached list of resources."""
        self._all_resources = None

    def get_all_resources(self) -> Generator[GenomicResource, None, None]:
        """Return generator over all resources in the repository.

        On first use the repository contents file is read and parsed as
        JSON when its name ends with ``.json`` and as YAML otherwise. The
        resources are cached sorted by
        ``get_genomic_resource_id_version()``.

        The cache is assigned only after the contents file has been loaded
        successfully, so a failed read is retried by the next call instead
        of leaving an empty cache behind.
        """
        if self._all_resources is None:
            content_filename = os.path.join(
                self.url, GR_CONTENTS_FILE_NAME)
            if not self.filesystem.exists(content_filename):
                # fall back to the same name without its last five
                # characters (presumably the ".json" suffix;
                # GR_CONTENTS_FILE_NAME is defined elsewhere)
                content_filename = content_filename[:-5]

            with self.filesystem.open(content_filename, "rt") as infile:
                data = infile.read()
            if content_filename.endswith(".json"):
                contents = json.loads(data)
            else:
                contents = yaml.safe_load(data)

            resources = []
            for entry in contents:
                version = tuple(map(int, entry["version"].split(".")))
                manifest = Manifest.from_manifest_entries(entry["manifest"])
                resource = self.build_genomic_resource(
                    entry["id"], version, config=entry["config"],
                    manifest=manifest)
                logger.debug(
                    "repo %s loaded resource %s",
                    self.proto_id, resource.resource_id)
                resources.append(resource)

            resources.sort(
                key=lambda r: r.get_genomic_resource_id_version())
            self._all_resources = resources

        yield from self._all_resources

    def file_exists(
            self, resource: GenomicResource, filename: str) -> bool:
        """Check if the given file of the resource exists."""
        filepath = self.get_resource_file_url(resource, filename)
        return cast(bool, self.filesystem.exists(filepath))

    def load_manifest(self, resource: GenomicResource) -> Manifest:
        """Load resource manifest."""
        content = self.get_file_content(resource, GR_MANIFEST_FILE_NAME)
        return Manifest.from_file_content(content)

    def open_raw_file(
            self, resource: GenomicResource, filename: str,
            mode: str = "rt",
            **kwargs: str | bool | None) -> IO:
        """Open a file of the resource through the filesystem.

        Args:
            resource: resource the file belongs to.
            filename: name of the file inside the resource.
            mode: open mode; when it contains ``"w"`` the parent
                directory is created if missing.
            **kwargs: only ``compression`` is consulted; any truthy value
                opens the file with gzip compression.

        Raises:
            OSError: when opening for writing and ``self.mode()`` is
                ``Mode.READONLY``.
        """
        filepath = self.get_resource_file_url(resource, filename)
        if "w" in mode:
            if self.mode() == Mode.READONLY:
                raise OSError(
                    f"Read-Only protocol {self.get_id()} trying to open "
                    f"{filepath} for writing")

            # Create the containing directory if it doesn't exist.
            parent = os.path.dirname(filepath)
            if not self.filesystem.exists(parent):
                self.filesystem.mkdir(
                    parent, create_parents=True, exist_ok=True)

        compression = "gzip" if kwargs.get("compression") else None
        return cast(
            IO,
            self.filesystem.open(
                filepath, mode=mode,
                compression=compression))

    def _get_file_url(self, resource: GenomicResource, filename: str) -> str:
        """Return the location of a resource file for external readers.

        ``file`` URLs are reduced to their path, ``s3`` URLs are passed
        through ``self.filesystem.sign`` and any other URL is returned
        unchanged.
        """
        url = self.get_resource_file_url(resource, filename)
        if self.scheme == "file":
            return urlparse(url).path
        if self.scheme == "s3":
            return cast(str, self.filesystem.sign(url))
        return url

    def open_tabix_file(
            self, resource: GenomicResource, filename: str,
            index_filename: str | None = None) -> pysam.TabixFile:
        """Open a tabix file of the resource with pysam.

        ``index_filename`` defaults to ``filename`` with ``.tbi`` appended.

        Raises:
            OSError: when the scheme is not file, s3, http or https.
        """
        if self.scheme not in {"file", "s3", "http", "https"}:
            raise OSError(
                f"tabix files are not supported on schema {self.scheme}")

        file_url = self._get_file_url(resource, filename)
        if index_filename is None:
            index_filename = f"{filename}.tbi"
        index_url = self._get_file_url(resource, index_filename)

        return pysam.TabixFile(  # pylint: disable=no-member
            file_url, index=index_url, encoding="utf-8",
            parser=pysam.asTuple())

    def open_vcf_file(
            self, resource: GenomicResource, filename: str,
            index_filename: str | None = None) -> pysam.VariantFile:
        """Open a VCF file of the resource with pysam.

        ``index_filename`` defaults to ``filename`` with ``.tbi`` appended.

        Raises:
            OSError: when the scheme is not file, s3, http or https.
        """
        if self.scheme not in {"file", "s3", "http", "https"}:
            raise OSError(
                f"vcf files are not supported on schema {self.scheme}")

        file_url = self._get_file_url(resource, filename)
        if index_filename is None:
            index_filename = f"{filename}.tbi"
        index_url = self._get_file_url(resource, index_filename)

        return pysam.VariantFile(  # pylint: disable=no-member
            file_url, index_filename=index_url)

    def open_bigwig_file(
            self, resource: GenomicResource, filename: str) -> Any:
        """Open a bigWig file of the resource with pyBigWig.

        Raises:
            OSError: when the scheme is not file, s3, http or https.
        """
        if self.scheme not in {"file", "s3", "http", "https"}:
            raise OSError(
                f"bigwig files are not supported on schema {self.scheme}")

        file_url = self._get_file_url(resource, filename)
        return pyBigWig.open(file_url)  # pylint: disable=I1101
class FsspecReadWriteProtocol(
        FsspecReadOnlyProtocol, ReadWriteRepositoryProtocol):
    """Provides fsspec genomic resources repository protocol.

    Read/write variant: besides reading resources it scans the repository
    directory tree, copies and updates resource files, keeps per-file
    state under each resource's ``.grr`` directory and writes the
    repository contents and index files.
    """

    def __init__(
            self, proto_id: str,
            url: str,
            filesystem: fsspec.AbstractFileSystem):
        """Initialize the protocol and create the repository root."""
        super().__init__(proto_id, url, filesystem)
        self.filesystem.makedirs(self.url, exist_ok=True)

    def _get_resource_file_lockfile_path(
        self, resource: GenomicResource, filename: str,
    ) -> str:
        """Return path of the resource file's lockfile.

        Raises:
            NotImplementedError: for any scheme other than ``file``.
        """
        if self.scheme != "file":
            raise NotImplementedError
        resource_url = self.get_resource_url(resource)
        path = os.path.join(resource_url, ".grr", f"{filename}.lockfile")
        # the lockfile is opened with the builtin open(): strip the scheme
        return path.removeprefix(f"{self.scheme}://")

    def obtain_resource_file_lock(
        self, resource: GenomicResource, filename: str,
    ) -> AbstractContextManager:
        """Lock a resource's file.

        For ``file`` repositories an exclusive ``flock`` is taken on the
        resource file's lockfile before this method returns; leaving the
        ``with`` block of the returned context manager closes the lockfile,
        which releases the lock. For every other scheme the returned
        context manager does nothing.
        """

        class Lock:
            """Context manager closing the held lockfile on exit."""

            def __init__(self, lockfile: IO | None = None) -> None:
                self._lockfile = lockfile

            def __enter__(self) -> None:
                # the lock is already held when the object is handed out
                pass

            def __exit__(
                    self,
                    exc_type: type[BaseException] | None,
                    exc_value: BaseException | None,
                    exc_tb: TracebackType | None) -> None:
                # Special methods are looked up on the type, so the release
                # has to live here and not in attributes set on the
                # instance.
                if self._lockfile is not None:
                    self._lockfile.close()
                    self._lockfile = None

        if self.scheme != "file":
            return Lock()

        path = self._get_resource_file_lockfile_path(resource, filename)
        lock_dir = os.path.dirname(path)
        if not self.filesystem.exists(lock_dir):
            self.filesystem.makedirs(lock_dir, exist_ok=True)

        # pylint: disable=consider-using-with
        lockfile = open(path, "wt", encoding="utf8")  # noqa
        try:
            lockfile.write(str(datetime.datetime.now()) + "\n")
            fcntl.flock(lockfile, fcntl.LOCK_EX)
        except BaseException:
            # do not leak the open file when the lock cannot be taken
            lockfile.close()
            raise
        return Lock(lockfile)

    def _list_dir_entries(self, url: str, path: str) -> list[str]:
        """Return the names of the visible entries of a directory.

        The directory at ``url`` is listed through the filesystem, every
        entry is made relative to ``path`` and names starting with ``.``
        are left out.
        """
        names = []
        for direntry in self.filesystem.ls(url, detail=False):
            # listed paths may start with the netloc; strip it so that the
            # entry can be made relative to ``path``
            if self.netloc and direntry.startswith(self.netloc):
                direntry = direntry[len(self.netloc):]
            name = os.path.relpath(direntry, path)
            if name.startswith("."):
                continue
            names.append(name)
        return names

    def _scan_path_for_resources(
        self, path_array: list[str],
    ) -> Generator[Any, None, None]:
        """Yield ``(resource id, version, path)`` for resources below a path.

        A directory holding ``GR_CONF_FILE_NAME`` is a resource and is not
        descended into; any other directory is scanned recursively. A
        resource directory whose path does not parse with
        ``parse_resource_id_version`` is logged as an error and skipped.
        """
        url = os.path.join(self.url, *path_array)
        path = os.path.join(self.root_path, *path_array)
        assert isinstance(url, str)

        if not self.filesystem.isdir(url):
            return

        content = self._list_dir_entries(url, path)

        if GR_CONF_FILE_NAME in content:
            res_path = "/".join(path_array)
            resource_id, version = parse_resource_id_version(res_path)
            if resource_id is None:
                logger.error("bad resource id/version: %s", res_path)
                return
            yield resource_id, version, res_path
        else:
            for name in content:
                yield from self._scan_path_for_resources([*path_array, name])

    def _scan_resource_for_files(
        self, resource_path: str, path_array: list[str],
    ) -> Generator[Any, None, None]:
        """Yield ``(relative name, url)`` for every file of a resource.

        Directories are walked recursively; entries whose name starts with
        ``.`` are ignored.
        """
        url = os.path.join(self.url, resource_path, *path_array)
        if not self.filesystem.isdir(url):
            # a file; an empty path_array is the resource root itself
            if path_array:
                yield os.path.join(*path_array), url
            return

        path = os.path.join(self.root_path, resource_path, *path_array)
        for name in self._list_dir_entries(url, path):
            yield from self._scan_resource_for_files(
                resource_path, [*path_array, name])

    def _get_filepath_timestamp(self, filepath: str) -> float:
        """Return the timestamp of a file rounded to two decimals.

        Uses the filesystem's ``modified`` time; filesystems that do not
        implement it fall back to the ``created`` value of ``info``.
        """
        try:
            modification = self.filesystem.modified(filepath)
        except NotImplementedError:
            info = self.filesystem.info(filepath)
            modification = info.get("created")
            return cast(float, round(modification, 2))

        # label the value as UTC (no conversion) before taking the
        # POSIX timestamp
        modification = modification.replace(tzinfo=datetime.timezone.utc)
        return cast(float, round(modification.timestamp(), 2))

    def collect_all_resources(self) -> Generator[GenomicResource, None, None]:
        """Return generator over all resources managed by this protocol.

        The repository tree is scanned; for each resource found its
        configuration file is loaded and, when present, its manifest.
        """
        for res_id, res_ver, res_path in self._scan_path_for_resources([]):
            res_fullpath = os.path.join(self.root_path, res_path)
            assert res_fullpath.startswith("/")
            res_fullpath = f"{self.scheme}://{self.netloc}{res_fullpath}"

            with self.filesystem.open(
                    os.path.join(
                        res_fullpath, GR_CONF_FILE_NAME), "rt") as infile:
                config = yaml.safe_load(infile)

            manifest: Manifest | None = None
            manifest_filename = os.path.join(
                res_fullpath, GR_MANIFEST_FILE_NAME)
            if self.filesystem.exists(manifest_filename):
                with self.filesystem.open(manifest_filename, "rt") as infile:
                    logger.debug("loading manifest from %s", manifest_filename)
                    manifest = Manifest.from_file_content(infile.read())

            yield self.build_genomic_resource(
                res_id, res_ver, config, manifest)

    def collect_resource_entries(self, resource: GenomicResource) -> Manifest:
        """Scan the resource and return a manifest.

        Every file of the resource is added with its size and no checksum;
        files whose name ends with ``html`` are left out.
        """
        resource_path = resource.get_genomic_resource_id_version()

        result = Manifest()
        for name, path in self._scan_resource_for_files(resource_path, []):
            if name.endswith("html"):
                # Ignore generated info files
                continue
            size = self._get_filepath_size(path)
            result.add(ManifestEntry(name, size, None))
        return result

    def get_all_resources(self) -> Generator[GenomicResource, None, None]:
        """Return generator over all resources in the repository.

        The resources are collected by scanning the repository on first
        use and cached sorted by ``get_genomic_resource_id_version()``.
        """
        if self._all_resources is None:
            self._all_resources = sorted(
                self.collect_all_resources(),
                key=lambda r: r.get_genomic_resource_id_version())
        yield from self._all_resources

    def _get_resource_file_state_path(
            self, resource: GenomicResource, filename: str) -> str:
        """Return filename of the resource file state path."""
        resource_url = self.get_resource_url(resource)
        return os.path.join(resource_url, ".grr", f"{filename}.state")

    def get_resource_file_timestamp(
            self, resource: GenomicResource, filename: str) -> float:
        """Return the timestamp of a resource file."""
        url = self.get_resource_file_url(resource, filename)
        return self._get_filepath_timestamp(url)

    def _get_filepath_size(
            self, filepath: str) -> int:
        """Return the size reported by the filesystem for a file."""
        fileinfo = self.filesystem.info(filepath)
        return int(fileinfo["size"])

    def get_resource_file_size(
            self, resource: GenomicResource, filename: str) -> int:
        """Return the size of a resource file."""
        path = self.get_resource_file_url(resource, filename)
        return self._get_filepath_size(path)

    def save_resource_file_state(
            self, resource: GenomicResource,
            state: ResourceFileState) -> None:
        """Save resource file state into internal GRR state."""
        path = self._get_resource_file_state_path(resource, state.filename)
        state_dir = os.path.dirname(path)
        if not self.filesystem.exists(state_dir):
            self.filesystem.makedirs(state_dir, exist_ok=True)

        content = asdict(state)
        with self.filesystem.open(path, "wt", encoding="utf8") as outfile:
            outfile.write(yaml.safe_dump(content))

    def load_resource_file_state(
            self, resource: GenomicResource,
            filename: str) -> ResourceFileState | None:
        """Load resource file state from internal GRR state.

        If the specified resource file has no internal state returns None.
        """
        path = self._get_resource_file_state_path(resource, filename)
        if not self.filesystem.exists(path):
            return None

        # read with the same encoding the state file is written with
        with self.filesystem.open(path, "rt", encoding="utf8") as infile:
            content = yaml.safe_load(infile.read())
        return ResourceFileState(
            content["filename"],
            content["size"],
            content["timestamp"],
            content["md5"],
        )

    def delete_resource_file(
            self, resource: GenomicResource, filename: str) -> None:
        """Delete a resource file and its internal state."""
        filepath = self.get_resource_file_url(resource, filename)
        if self.filesystem.exists(filepath):
            self.filesystem.delete(filepath)

        statepath = self._get_resource_file_state_path(resource, filename)
        if self.filesystem.exists(statepath):
            self.filesystem.delete(statepath)

    def copy_resource_file(
            self,
            remote_resource: GenomicResource,
            dest_resource: GenomicResource,
            filename: str) -> ResourceFileState | None:
        """Copy a resource file into repository.

        The file is copied in chunks while its md5 sum is computed; the
        sum is checked against the remote manifest entry and the new file
        state is saved.

        Returns:
            The saved state of the copied file, or None when the remote
            manifest has no such file (the local file and its state are
            then deleted).

        Raises:
            OSError: when the destination file was not created or the md5
                sums differ.
        """
        assert dest_resource.resource_id == remote_resource.resource_id
        logger.debug(
            "copying resource file (%s: %s) from %s",
            remote_resource.resource_id, filename,
            remote_resource.proto.proto_id)

        remote_manifest = remote_resource.get_manifest()
        if filename not in remote_manifest:
            self.delete_resource_file(dest_resource, filename)
            return None

        manifest_entry = remote_manifest[filename]

        dest_filepath = self.get_resource_file_url(dest_resource, filename)
        dest_parent = os.path.dirname(dest_filepath)
        if not self.filesystem.exists(dest_parent):
            self.filesystem.mkdir(
                dest_parent, create_parents=True, exist_ok=True)

        with remote_resource.open_raw_file(
                    filename, "rb",
                    uncompress=False) as infile, \
                self.open_raw_file(
                    dest_resource, filename, "wb",
                    uncompress=False) as outfile:

            md5_hash = hashlib.md5()  # noqa
            while chunk := infile.read(self.CHUNK_SIZE):
                outfile.write(chunk)
                md5_hash.update(chunk)
        md5 = md5_hash.hexdigest()

        if not self.filesystem.exists(dest_filepath):
            raise OSError(f"destination file not created {dest_filepath}")

        if md5 != manifest_entry.md5:
            raise OSError(
                f"file copy is broken "
                f"{dest_resource.resource_id} ({filename}); "
                f"md5sum are different: "
                f"{md5}!={manifest_entry.md5}")

        state = self.build_resource_file_state(
            dest_resource, filename, md5sum=md5)
        self.save_resource_file_state(dest_resource, state)
        return state

    def update_resource_file(
            self, remote_resource: GenomicResource,
            dest_resource: GenomicResource,
            filename: str) -> ResourceFileState | None:
        """Update a resource file into repository if needed.

        A missing local file is copied. Otherwise the saved local state is
        rebuilt when it is missing or when the file's timestamp or size no
        longer match it, and the file is copied again only when its md5
        differs from the remote manifest entry.

        Returns:
            The state of the local file, or None when the remote manifest
            has no such file (the local file and its state are then
            deleted).
        """
        assert dest_resource.resource_id == remote_resource.resource_id

        if not self.file_exists(dest_resource, filename):
            return self.copy_resource_file(
                remote_resource, dest_resource, filename)

        local_state = self.load_resource_file_state(dest_resource, filename)
        if local_state is None:
            local_state = self.build_resource_file_state(
                dest_resource, filename)
            self.save_resource_file_state(dest_resource, local_state)
        else:
            timestamp = self.get_resource_file_timestamp(
                dest_resource, filename)
            size = self.get_resource_file_size(dest_resource, filename)
            # the saved state is stale when the file has changed on disk
            if timestamp != local_state.timestamp or \
                    size != local_state.size:
                local_state = self.build_resource_file_state(
                    dest_resource, filename)
                self.save_resource_file_state(dest_resource, local_state)

        remote_manifest = remote_resource.get_manifest()
        if filename not in remote_manifest:
            self.delete_resource_file(dest_resource, filename)
            return None

        manifest_entry = remote_manifest[filename]
        if local_state.md5 != manifest_entry.md5:
            return self.copy_resource_file(
                remote_resource, dest_resource, filename)

        return local_state

    def build_content_file(self) -> list[dict[str, Any]]:
        """Build the content of the repository (i.e '.CONTENTS.json' file).

        One entry per resource (id, version, config and manifest entries),
        sorted by id, is written as JSON to the repository contents file
        and returned.
        """
        content = [
            {
                "id": res.resource_id,
                "version": res.get_version_str(),
                "config": res.get_config(),
                "manifest": res.get_manifest().to_manifest_entries(),
            }
            for res in self.get_all_resources()]
        content = sorted(content, key=operator.itemgetter("id"))

        content_filepath = os.path.join(
            self.url, GR_CONTENTS_FILE_NAME)
        with self.filesystem.open(
                content_filepath, "wt", encoding="utf8") as outfile:
            json.dump(content, outfile)

        return content

    def build_index_info(self, repository_template: jinja2.Template) -> dict:
        """Build info dict for the repository.

        The dictionary maps each resource's full id to its configuration
        extended with version, file count, total size and summary. It is
        rendered with ``repository_template`` into the repository index
        file and returned.
        """
        result = {}
        for res in self.get_all_resources():
            files = list(res.get_manifest().get_files())
            res_size = convert_size(
                sum(size for _, size in files),
            )

            assert res.config is not None
            result[res.get_full_id()] = {
                **res.config,
                "res_version": res.get_version_str(),
                "res_files": len(files),
                "res_size": res_size,
                "res_summary": res.get_summary(),
            }

        content_filepath = os.path.join(self.url, GR_INDEX_FILE_NAME)
        with self.filesystem.open(
                content_filepath, "wt", encoding="utf8") as outfile:
            outfile.write(repository_template.render(data=result))

        return result
def build_local_resource(
    dirname: str,
    config: dict[str, Any],
) -> GenomicResource:
    """Build a resource from a local filesystem directory.

    The directory is wrapped in an fsspec protocol with id ``"d"`` and the
    resource is created with id ``"."``, version ``(0,)`` and the given
    configuration.
    """
    return GenomicResource(
        ".", (0,), build_fsspec_protocol("d", dirname), config)
# Either kind of fsspec-based repository protocol; used as the return type
# of ``build_fsspec_protocol``.
FsspecRepositoryProtocol = FsspecReadOnlyProtocol | FsspecReadWriteProtocol
def build_fsspec_protocol(
    proto_id: str,
    root_url: str,
    **kwargs: str | None,
) -> FsspecRepositoryProtocol:
    """Create fsspec GRR protocol based on the root url.

    The scheme of ``root_url`` selects the filesystem and protocol:

    * ``file`` or no scheme: local filesystem; read-only when the
      ``read_only`` keyword is truthy, read/write otherwise;
    * ``http``/``https``: HTTP filesystem, read-only; the ``base_url``
      keyword is handed to the HTTP client;
    * ``s3``: read/write; the ``filesystem`` keyword is used when given,
      otherwise an S3 filesystem is created with the ``endpoint_url``
      keyword;
    * ``memory``: in-memory filesystem, read/write.

    Raises:
        NotImplementedError: for any other scheme.
    """
    scheme = urlparse(root_url).scheme

    # pylint: disable=import-outside-toplevel
    if scheme in {"file", ""}:
        from fsspec.implementations.local import LocalFileSystem

        local_fs = LocalFileSystem()
        if kwargs.get("read_only", False):
            return FsspecReadOnlyProtocol(proto_id, root_url, local_fs)
        return FsspecReadWriteProtocol(proto_id, root_url, local_fs)

    if scheme in {"http", "https"}:
        from fsspec.implementations.http import HTTPFileSystem

        client_options = {"base_url": kwargs.get("base_url")}
        http_fs = HTTPFileSystem(client_kwargs=client_options)
        return FsspecReadOnlyProtocol(proto_id, root_url, http_fs)

    if scheme == "s3":
        s3_fs = kwargs.get("filesystem")
        if s3_fs is None:
            from s3fs.core import S3FileSystem

            client_options = {"endpoint_url": kwargs.get("endpoint_url")}
            s3_fs = S3FileSystem(anon=False, client_kwargs=client_options)
        return FsspecReadWriteProtocol(proto_id, root_url, s3_fs)

    if scheme == "memory":
        from fsspec.implementations.memory import MemoryFileSystem

        return FsspecReadWriteProtocol(
            proto_id, root_url, MemoryFileSystem())

    raise NotImplementedError(f"unsupported schema {scheme}")