Source code for gain.genomic_resources.cached_repository

"""Provides caching genomic resources."""
from __future__ import annotations

import logging
import os
import sys
import threading
from collections.abc import Callable, Generator, Iterable
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from typing import IO, Any, cast

import apsw
import pysam
from tqdm import tqdm

from gain.genomic_resources.fsspec_protocol import (
    FileCacheVerdict,
    FsspecReadWriteProtocol,
)
from gain.genomic_resources.repository import (
    GR_CONF_FILE_NAME,
    GenomicResource,
    GenomicResourceRepo,
    Manifest,
    ReadOnlyRepositoryProtocol,
    is_version_constraint_satisfied,
    parse_resource_id_version,
    version_tuple_to_string,
)

from .fsspec_protocol import build_fsspec_protocol

logger = logging.getLogger(__name__)


class CacheResource(GenomicResource):
    """A genomic resource bound to a caching protocol.

    The instance takes its id, version, configuration and manifest from an
    existing resource and uses the supplied ``CachingProtocol`` as its
    protocol.
    """

    def __init__(self, resource: GenomicResource, protocol: CachingProtocol):
        # Pull the identity and metadata off the source resource first, then
        # hand everything to the base-class constructor in one call.
        resource_id = resource.resource_id
        version = resource.version
        config = resource.config
        manifest = resource.get_manifest()
        super().__init__(
            resource_id,
            version,
            protocol,
            config=config,
            manifest=manifest,
        )
class CachingProtocol(ReadOnlyRepositoryProtocol):
    """Defines caching GRR repository protocol.

    Wraps a read-only *remote* protocol and a read-write *local* protocol.
    Resource files are refreshed into the local protocol (under a per-file
    lock) before they are opened, and are then served from the local
    protocol.
    """

    def __init__(
        self,
        remote_protocol: ReadOnlyRepositoryProtocol,
        local_protocol: FsspecReadWriteProtocol,
        public_url: str | None = None,
    ):
        self.remote_protocol = remote_protocol
        self.local_protocol = local_protocol
        super().__init__(local_protocol.proto_id, local_protocol.get_url())
        # Fall back to the remote's public URL when none is given explicitly.
        self.public_url = public_url or remote_protocol.get_public_url()
        # Lazily built mapping: full resource id -> cache resource.
        self._all_resources: dict[str, CacheResource] | None = None

    def get_url(self) -> str:
        """Return the URL of the remote protocol."""
        return self.remote_protocol.get_url()

    def get_public_url(self) -> str:
        """Return the public URL chosen at construction time."""
        return self.public_url

    def invalidate(self) -> None:
        """Invalidate both wrapped protocols and drop the resource cache."""
        self.remote_protocol.invalidate()
        self.local_protocol.invalidate()
        self._all_resources = None

    def get_all_resources(self) -> Generator[GenomicResource, None, None]:
        """Yield every resource of the remote protocol as a cache resource."""
        yield from self.get_all_resources_dict().values()

    def get_all_resources_dict(self) -> dict[str, GenomicResource]:
        """Return (building on first use) the full-id -> resource mapping."""
        if self._all_resources is None:
            self._all_resources = {
                resource.get_full_id(): self._create_cache_resource(resource)
                for resource in self.remote_protocol.get_all_resources()
            }
            self.local_protocol.invalidate()
        return cast(dict[str, GenomicResource], self._all_resources)

    def _create_cache_resource(
        self, remote_resource: GenomicResource,
    ) -> CacheResource:
        """Wrap a remote resource into a resource bound to this protocol."""
        return CacheResource(remote_resource, self)

    def _get_remote_resource(
        self, resource: GenomicResource,
    ) -> GenomicResource:
        """Resolve the remote resource with the same id and exact version."""
        return self.remote_protocol.get_resource(
            resource.resource_id,
            f"={resource.get_version_str()}")

    def refresh_cached_resource_file(
        self,
        resource: GenomicResource,
        filename: str,
    ) -> tuple[str, str]:
        """Refresh a resource file in cache if necessary.

        Lock files (``*.lockfile``) are ignored. Returns the pair
        ``(resource_id, filename)``.
        """
        assert resource.proto == self

        if filename.endswith(".lockfile"):
            # Ignore lockfiles
            return (resource.resource_id, filename)

        remote_resource = self._get_remote_resource(resource)
        # Lock the resource file to avoid caching it simultaneously
        with self.local_protocol.obtain_resource_file_lock(resource, filename):
            self.local_protocol.update_resource_file(
                remote_resource, resource, filename)
        return (resource.resource_id, filename)

    def refresh_cached_resource(
        self,
        resource: GenomicResource,
    ) -> tuple[str, None]:
        """Refresh all resource files in cache if necessary.

        Every manifest entry except ``*.lockfile`` entries is refreshed
        under its own per-file lock. Returns ``(resource_id, None)``.
        """
        assert resource.proto == self

        filenames = [
            entry.name
            for entry in resource.get_manifest()
            if not entry.name.endswith(".lockfile")
        ]
        if not filenames:
            # Nothing to refresh; do not touch the remote at all.
            return (resource.resource_id, None)

        # The remote resource is the same for every file of the resource, so
        # resolve it once instead of once per manifest entry.
        remote_resource = self._get_remote_resource(resource)
        for filename in filenames:
            # Lock the resource file to avoid caching it simultaneously
            with self.local_protocol.obtain_resource_file_lock(
                    resource, filename):
                self.local_protocol.update_resource_file(
                    remote_resource, resource, filename)

        return (resource.resource_id, None)

    def classify_cached_resource_file(
        self,
        resource: GenomicResource,
        filename: str,
    ) -> FileCacheVerdict:
        """Classify a resource file without taking any lock or downloading.

        The lock-free decision half of :meth:`refresh_cached_resource_file`:
        it resolves the remote resource and delegates to the local protocol's
        :meth:`classify_resource_file`. Lock files are reported as not
        needing a download, with size 0. See gain#78.
        """
        assert resource.proto == self

        if filename.endswith(".lockfile"):
            # Ignore lockfiles
            return FileCacheVerdict(needs_download=False, size=0)

        remote_resource = self._get_remote_resource(resource)
        return self.local_protocol.classify_resource_file(
            remote_resource, resource, filename)

    def download_cached_resource_file(
        self,
        resource: GenomicResource,
        filename: str,
        *,
        on_bytes: Callable[[int], None] | None = None,
    ) -> tuple[str, str]:
        """Download a resource file into cache unconditionally.

        Takes the per-file lock and copies the file regardless of its local
        state -- the decision was already made by
        :meth:`classify_cached_resource_file`. ``on_bytes`` is forwarded to
        the local protocol's ``copy_resource_file``. Returns
        ``(resource_id, filename)``. See gain#78.
        """
        assert resource.proto == self

        remote_resource = self._get_remote_resource(resource)
        # Lock the resource file to avoid caching it simultaneously
        with self.local_protocol.obtain_resource_file_lock(resource, filename):
            self.local_protocol.copy_resource_file(
                remote_resource, resource, filename, on_bytes=on_bytes)
        return (resource.resource_id, filename)

    def get_resource_url(self, resource: GenomicResource) -> str:
        """Return url of the specified resources."""
        return self.local_protocol.get_resource_url(resource)

    def get_resource_file_url(
        self, resource: GenomicResource, filename: str,
    ) -> str:
        """Return url of a file in the resource."""
        self.refresh_cached_resource_file(resource, filename)
        return self.local_protocol.get_resource_file_url(resource, filename)

    def open_raw_file(
        self,
        resource: GenomicResource,
        filename: str,
        mode: str = "rt",
        **kwargs: str | bool | None,
    ) -> IO:
        """Refresh the file in cache and open the local copy for reading.

        Raises:
            OSError: if ``mode`` contains ``"w"``; the protocol is read-only.
        """
        if "w" in mode:
            raise OSError(
                f"Read-Only caching protocol {self.get_id()} trying to open "
                f"{filename} for writing")

        self.refresh_cached_resource_file(resource, filename)
        return self.local_protocol.open_raw_file(
            resource, filename, mode, **kwargs)

    def open_tabix_file(
        self,
        resource: GenomicResource,
        filename: str,
        index_filename: str | None = None,
    ) -> pysam.TabixFile:
        """Refresh the file and its index, then open the local tabix file.

        The index defaults to ``<filename>.tbi``.
        """
        self.refresh_cached_resource_file(resource, filename)
        if index_filename is None:
            index_filename = f"{filename}.tbi"
        self.refresh_cached_resource_file(resource, index_filename)

        return self.local_protocol.open_tabix_file(
            resource, filename, index_filename)

    def open_vcf_file(
        self,
        resource: GenomicResource,
        filename: str,
        index_filename: str | None = None,
    ) -> pysam.VariantFile:
        """Refresh the file and its index, then open the local VCF file.

        The index defaults to ``<filename>.tbi``.
        """
        self.refresh_cached_resource_file(resource, filename)
        if index_filename is None:
            index_filename = f"{filename}.tbi"
        self.refresh_cached_resource_file(resource, index_filename)

        return self.local_protocol.open_vcf_file(
            resource, filename, index_filename)

    def open_fasta_file(
        self,
        resource: GenomicResource,
        filename: str,
        index_filename: str | None = None,
        compressed_index_filename: str | None = None,
    ) -> pysam.FastaFile:
        """Refresh the FASTA file and both indexes, then open the local copy.

        The index defaults to ``<filename>.fai`` and the compressed index to
        ``<filename>.gzi``.
        """
        self.refresh_cached_resource_file(resource, filename)
        if index_filename is None:
            index_filename = f"{filename}.fai"
        self.refresh_cached_resource_file(resource, index_filename)
        if compressed_index_filename is None:
            compressed_index_filename = f"{filename}.gzi"
        self.refresh_cached_resource_file(resource, compressed_index_filename)

        return self.local_protocol.open_fasta_file(
            resource, filename, index_filename, compressed_index_filename)

    def open_bigwig_file(
        self, resource: GenomicResource, filename: str,
    ) -> Any:
        """Refresh the file in cache and open the local bigWig copy."""
        self.refresh_cached_resource_file(resource, filename)
        return self.local_protocol.open_bigwig_file(resource, filename)

    def file_exists(self, resource: GenomicResource, filename: str) -> bool:
        """Refresh the file in cache, then report if it exists locally."""
        self.refresh_cached_resource_file(resource, filename)
        return self.local_protocol.file_exists(resource, filename)

    def load_manifest(self, resource: GenomicResource) -> Manifest:
        """Refresh the resource config file and load the remote manifest."""
        self.refresh_cached_resource_file(resource, GR_CONF_FILE_NAME)
        return self.remote_protocol.load_manifest(resource)

    def open_repository_sqlite3_metadata_db(self) -> apsw.Connection:
        """Open the metadata database through the remote protocol."""
        return self.remote_protocol.open_repository_sqlite3_metadata_db()
class GenomicResourceCachedRepo(GenomicResourceRepo):
    """Defines caching genomic resources repository.

    Wraps a child repository; resources are resolved through the child and
    re-exposed through a per-remote-protocol :class:`CachingProtocol` rooted
    under ``cache_url``.
    """

    def __init__(
        self,
        child: GenomicResourceRepo,
        cache_url: str,
        **kwargs: str | None,
    ):
        repo_id: str = f"{child.repo_id}.caching_repo"
        super().__init__(repo_id)

        logger.debug(
            "creating cached GRR with cache url: %s", cache_url)

        # Lazily built list of cache resources; None means "not built yet".
        self._all_resources: list[GenomicResource] | None = None
        self.child: GenomicResourceRepo = child
        self.cache_url = cache_url
        # One caching protocol per remote protocol id.
        self.cache_protos: dict[str, CachingProtocol] = {}
        # Extra keyword arguments forwarded to ``build_fsspec_protocol``.
        self.additional_kwargs = kwargs

    def invalidate(self) -> None:
        """Invalidate the child, every caching protocol and the resource list."""
        self.child.invalidate()
        for proto in self.cache_protos.values():
            proto.invalidate()
        self._all_resources = None

    def get_all_resources(self) -> Generator[GenomicResource, None, None]:
        """Yield the cache counterpart of every resource of the child repo."""
        if self._all_resources is None:
            # Build into a local list and publish it only once complete: if
            # a lookup raises (or the generator is abandoned) half-way, the
            # next call must rebuild instead of serving a partial list.
            resources: list[GenomicResource] = []
            for remote_resource in self.child.get_all_resources():
                cache_proto = self._get_or_create_cache_proto(
                    remote_resource.proto)
                version_constraint = f"={remote_resource.get_version_str()}"
                resources.append(
                    cache_proto.get_resource(
                        remote_resource.resource_id,
                        version_constraint))
            self._all_resources = resources
        yield from self._all_resources

    def search_resources(
        self,
        search_term: str | None = None,
        resource_type: str | None = None,
    ) -> Generator[GenomicResource, None, None]:
        """Delegate the search to the child repository."""
        yield from self.child.search_resources(search_term, resource_type)

    def _get_or_create_cache_proto(
        self, proto: ReadOnlyRepositoryProtocol,
    ) -> CachingProtocol:
        """Return the caching protocol for ``proto``, creating it on demand.

        Raises:
            ValueError: if the protocol built for the cache location is not
                an ``FsspecReadWriteProtocol``.
        """
        proto_id = proto.proto_id
        if proto_id not in self.cache_protos:
            cached_proto_url = os.path.join(self.cache_url, proto_id)
            logger.debug(
                "going to create cached protocol with url: %s",
                cached_proto_url)
            cache_proto = build_fsspec_protocol(
                f"{proto_id}.cached",
                cached_proto_url,
                **self.additional_kwargs)
            if not isinstance(cache_proto, FsspecReadWriteProtocol):
                raise ValueError(
                    f"caching protocol should be RW; "
                    f"{cached_proto_url} is not RW")
            self.cache_protos[proto_id] = CachingProtocol(proto, cache_proto)
        return self.cache_protos[proto_id]

    @staticmethod
    def _split_version_constraint(
        resource_id: str,
        version_constraint: str | None,
    ) -> tuple[str, str | None]:
        """Extract an exact-version constraint embedded in ``resource_id``.

        When no explicit constraint is given, the id is parsed with
        ``parse_resource_id_version``; a version found there becomes an
        ``=<version>`` constraint. An explicit constraint leaves both values
        untouched.
        """
        if version_constraint is None:
            resource_id, version = parse_resource_id_version(resource_id)
            if version is not None:
                version_constraint = f"={version_tuple_to_string(version)}"
        return resource_id, version_constraint

    def find_resource(
        self,
        resource_id: str,
        version_constraint: str | None = None,
        repository_id: str | None = None,
    ) -> GenomicResource | None:
        """Return requested resource or None if not found.

        Among the resources matching the id, the optional repository id and
        the version constraint, the one with the greatest version wins.
        """
        resource_id, version_constraint = self._split_version_constraint(
            resource_id, version_constraint)

        matching_resources = [
            res
            for res in self.get_all_resources()
            if res.resource_id == resource_id
            and (repository_id is None
                 or res.proto.proto_id == repository_id)
            and is_version_constraint_satisfied(
                version_constraint, res.version)
        ]

        def get_resource_version(res: GenomicResource) -> tuple[int, ...]:
            return res.version

        return max(
            matching_resources,
            key=get_resource_version,
            default=None)

    def get_resource(
        self,
        resource_id: str,
        version_constraint: str | None = None,
        repository_id: str | None = None,
    ) -> GenomicResource:
        """Resolve the resource in the child repo and return its cache twin."""
        resource_id, version_constraint = self._split_version_constraint(
            resource_id, version_constraint)

        remote_resource = self.child.get_resource(
            resource_id, version_constraint, repository_id)
        cache_proto = self._get_or_create_cache_proto(
            remote_resource.proto)
        # Pin the exact version the child repository resolved.
        version_constraint = f"={remote_resource.get_version_str()}"
        return cache_proto.get_resource(resource_id, version_constraint)

    def get_resource_cached_files(self, resource_id: str) -> set[str]:
        """Get a set of filenames of cached files for a given resource.

        The resource configuration file (``GR_CONF_FILE_NAME``) is never
        included in the result.
        """
        resource = self.child.get_resource(resource_id)
        cache_proto = self._get_or_create_cache_proto(resource.proto)
        local_protocol = cache_proto.local_protocol
        return {
            entry.name
            for entry in resource.get_manifest()
            if entry.name != GR_CONF_FILE_NAME
            and local_protocol.file_exists(resource, entry.name)
        }
def _human_bytes(n: int) -> str:
    """Render a byte count with a 1024 divisor, e.g. ``2.8 GB``/``712.0 MB``.

    Used for the header line and milestone byte figures so a captured log is
    readable; tqdm renders its own human units for the live bar. Values past
    the terabyte range are all rendered in ``PB``.
    """
    size = float(n)
    for unit in ("B", "KB", "MB", "GB", "TB"):
        if size < 1024.0:
            return f"{size:.1f} {unit}"
        size /= 1024.0
    return f"{size:.1f} PB"


class _CacheProgress:
    """Report caching progress against two metrics: bytes and files.

    Bytes are the primary metric (the visible bar / milestone driver); files
    are secondary context (a ``files=done/total`` tally). Concrete behaviours
    share this interface so the caching loop stays oblivious to the rendering
    mode:

    - off (``progress=False``): nothing is shown; the loop still logs its
      header, its DEBUG per-file lines, and the final failure summary.
    - a live :class:`tqdm` byte bar when stderr is a terminal.
    - throttled milestone log lines on byte-percentage thresholds (a ``0%``
      baseline, then each 10% crossing, then ``100%``) when stderr is not a
      terminal, so a captured CI log stays readable and greppable.

    When there are bytes to download (``byte_total > 0``) the bar/milestones
    are byte-driven via :meth:`on_bytes`. When only zero-byte files need
    downloading (``byte_total == 0`` but ``file_total > 0``) the reporter
    falls back to a file-unit bar driven by :meth:`update` so there is still
    motion.

    :meth:`on_bytes` may be called from multiple download threads;
    :meth:`update` is called only from the single ``as_completed`` thread.
    Failures advance the counter and are surfaced as a ``failed=N`` tally.
    """

    def __init__(self, byte_total: int, file_total: int) -> None:
        self.byte_total = byte_total
        self.file_total = file_total
        self.bytes_done = 0
        self.done = 0
        self.failed = 0

    def update(self, *, failed: bool) -> None:
        """Count one finished file, also counting it as failed if so."""
        self.done += 1
        if failed:
            self.failed += 1

    def on_bytes(self, n: int) -> None:
        """Credit ``n`` downloaded bytes (signed); a no-op in off mode.

        Subclasses that render a byte-level bar override this to advance it.
        """

    def report_failure(self, message: str) -> None:
        """Log a single failure line at ERROR level."""
        logger.error(message)

    def close(self) -> None:
        """Release rendering resources; nothing to do in off mode."""


class _MilestoneProgress(_CacheProgress):
    """Log a progress line on the ``0% / every 10% / 100%`` schedule.

    In byte mode (``byte_total > 0``) milestones fire on byte-percentage
    crossings driven by :meth:`on_bytes`; :meth:`update` only bumps the file
    figures shown on the next byte-milestone line. In the zero-byte fallback
    (``byte_total == 0``, ``file_total > 0``) milestones are file-driven via
    :meth:`update`, preserving the pre-gain#79 behaviour exactly.

    A genuine ``0%`` baseline line is emitted at construction (gain#67),
    seeding ``_last_bucket = 0`` so the bucket-dedup suppresses a duplicate
    ``0%`` line on the first crossing. When the driving total is 0 there is
    nothing to cache, so the baseline is skipped rather than emitting a
    misleading ``0/0 (100%)`` line.

    ``on_bytes`` is called from multiple download threads, so the byte
    accumulator, the bucket cursor and the logging are guarded by a lock.
    """

    def __init__(self, byte_total: int, file_total: int) -> None:
        super().__init__(byte_total, file_total)
        self._byte_mode = byte_total > 0
        self._lock = threading.Lock()
        driving_total = byte_total if self._byte_mode else file_total
        if driving_total:
            self._last_bucket = 0
            self._log_progress()
        else:
            self._last_bucket = -1

    def _pct(self) -> int:
        """Return the integer completion percentage of the driving metric."""
        if self._byte_mode:
            if not self.byte_total:
                return 100
            return min(100, self.bytes_done * 100 // self.byte_total)
        return self.done * 100 // self.file_total if self.file_total else 100

    def _log_progress(self) -> None:
        """Emit one INFO progress line for the current counters."""
        failed_suffix = f", failed={self.failed}" if self.failed else ""
        if self._byte_mode:
            logger.info(
                "caching progress: %s/%s (%s%%), %s/%s files%s",
                _human_bytes(self.bytes_done),
                _human_bytes(self.byte_total),
                self._pct(), self.done, self.file_total, failed_suffix)
        else:
            logger.info(
                "caching progress: %s/%s files (%s%%)%s",
                self.done, self.file_total, self._pct(), failed_suffix)

    def on_bytes(self, n: int) -> None:
        if not self._byte_mode:
            return
        with self._lock:
            self.bytes_done += n
            pct = self._pct()
            bucket = pct // 10
            # ``_last_bucket`` is a high-water mark: a rollback (negative
            # delta from a retryable failure, slice 1) lowers ``bytes_done``
            # but does NOT lower the cursor, so re-crossing an
            # already-reported bucket logs no duplicate line. A line is
            # emitted only on a forward crossing into a new bucket. Bucket 10
            # occurs only at 100%, so this still logs the final line exactly
            # once; once full, further positive deltas (e.g. a concurrent
            # file's chunks after a terminal-failure top-up overshot the
            # total) are deduped rather than re-logging 100%.
            if bucket <= self._last_bucket:
                return
            self._last_bucket = bucket
            self._log_progress()

    def update(self, *, failed: bool) -> None:
        if self._byte_mode:
            # ``on_bytes`` reads the file counters and ``bytes_done`` from
            # download threads while holding the lock, so bump the counters
            # and evaluate the percentage under the same lock.
            with self._lock:
                super().update(failed=failed)
                # File figures normally ride the next byte-milestone line.
                # But a terminal failure tops up the bytes (reaching 100%)
                # and is marked failed straight after -- there is no later
                # byte line to carry the tally, so re-log the 100% line once
                # the bar is full so the failed=N tally is visible. See
                # gain#79 / gain#43.
                if failed and self._pct() == 100:
                    self._log_progress()
            return
        super().update(failed=failed)
        bucket = self._pct() // 10
        if bucket == self._last_bucket and self.done != self.file_total:
            return
        self._last_bucket = bucket
        self._log_progress()


class _TqdmProgress(_CacheProgress):
    """Drive a live tqdm bar, writing failures above it via tqdm.write.

    In byte mode the bar counts bytes (human units + throughput + ETA),
    advanced by :meth:`on_bytes`; :meth:`update` only refreshes the
    ``files=done/total`` postfix. In the zero-byte fallback the bar counts
    files, advanced by :meth:`update`.

    ``on_bytes`` runs on multiple download threads, so bar mutations are
    guarded by a lock.
    """

    def __init__(self, byte_total: int, file_total: int) -> None:
        super().__init__(byte_total, file_total)
        self._byte_mode = byte_total > 0
        self._lock = threading.Lock()
        if self._byte_mode:
            self._bar = tqdm(
                total=byte_total, desc="caching", unit="B",
                unit_scale=True, unit_divisor=1024, leave=True)
        else:
            self._bar = tqdm(
                total=file_total, desc="caching", unit="file", leave=True)

    def _postfix(self) -> None:
        """Refresh the ``files=done/total[, failed=N]`` postfix text."""
        files = f"{self.done}/{self.file_total}"
        if self.failed:
            self._bar.set_postfix_str(
                f"files={files}, failed={self.failed}", refresh=False)
        else:
            self._bar.set_postfix_str(f"files={files}", refresh=False)

    def on_bytes(self, n: int) -> None:
        if not self._byte_mode:
            return
        with self._lock:
            self._bar.update(n)

    def update(self, *, failed: bool) -> None:
        super().update(failed=failed)
        with self._lock:
            self._postfix()
            if not self._byte_mode:
                self._bar.update(1)

    def report_failure(self, message: str) -> None:
        self._bar.write(message)

    def close(self) -> None:
        self._bar.close()


def _make_cache_progress(
    byte_total: int,
    file_total: int,
    *,
    progress: bool,
) -> _CacheProgress:
    """Pick the reporter: off, live tqdm bar (tty) or milestone log lines."""
    if not progress:
        return _CacheProgress(byte_total, file_total)
    if sys.stderr.isatty():
        return _TqdmProgress(byte_total, file_total)
    return _MilestoneProgress(byte_total, file_total)


def _resolve_resources(
    repository: GenomicResourceRepo,
    resource_ids: Iterable[str] | None,
) -> list[GenomicResource]:
    """Resolve the remote resources to cache, either all or a given list."""
    if resource_ids is None:
        return list(repository.get_all_resources())
    resources: list[GenomicResource] = []
    for resource_id in resource_ids:
        remote_res = repository.get_resource(resource_id)
        assert remote_res is not None, resource_id
        resources.append(remote_res)
    return resources


def _enumerate_resource_files(
    resource: GenomicResource,
) -> list[str]:
    """Return the file set to consider for caching a single resource.

    Mirrors the pre-refactor selection exactly: a resource of a known
    implementation type contributes ``genomic_resource.yaml`` plus the
    implementation's ``files``; a resource of an unknown type contributes
    every manifest entry except ``.lockfile`` files (the coarse
    ``refresh_cached_resource`` set). See gain#78.
    """
    # pylint: disable=import-outside-toplevel
    from gain.genomic_resources import get_resource_implementation_builder

    impl_builder = get_resource_implementation_builder(resource.get_type())
    if impl_builder is None:
        logger.info(
            "unexpected resource type <%s> for resource %s; "
            "updating resource",
            resource.get_type(), resource.resource_id)
        return [
            entry.name for entry in resource.get_manifest()
            if not entry.name.endswith(".lockfile")
        ]
    impl = impl_builder(resource)
    return ["genomic_resource.yaml", *impl.files]


def _build_cache_worklist(
    cached_proto: CachingProtocol,
    resource: GenomicResource,
    filenames: Iterable[str],
    workers: int | None = None,
) -> tuple[list[tuple[GenomicResource, str, int]], int, int, list[str]]:
    """Classify ``filenames`` of ``resource`` (lock-free) into a work-list.

    Returns ``(worklist, total_bytes, already_cached, failures)`` where
    ``worklist`` is the list of ``(resource, filename, size)`` entries that
    need downloading, ``total_bytes`` is the summed manifest size of those
    entries, ``already_cached`` counts the files that need no download, and
    ``failures`` collects per-file classify errors (a classify failure must
    not abort the whole run).

    A filename listed more than once is classified (and counted) only once,
    in first-seen order, so the same file is never queued for two downloads
    and its size is not counted twice.

    Classification is lock-free, so it is fanned out across a thread pool.
    See gain#43, gain#78.
    """
    # dict.fromkeys keeps the first occurrence of each name, in order.
    unique_filenames = list(dict.fromkeys(filenames))
    worklist: list[tuple[GenomicResource, str, int]] = []
    total_bytes = 0
    already_cached = 0
    failures: list[str] = []
    with ThreadPoolExecutor(max_workers=workers) as classify_executor:
        future_to_name = {
            classify_executor.submit(
                cached_proto.classify_cached_resource_file,
                resource, filename): filename
            for filename in unique_filenames
        }
        # Collect in submission order so the work-list order is stable.
        for future, filename in future_to_name.items():
            try:
                verdict = future.result()
            except Exception as error:  # noqa: BLE001 - report, don't abort
                # A classify failure (e.g. a corrupt .state, or a resource
                # gone from the remote) must not discard the whole run; it is
                # collected and surfaced in the end-of-run summary like a
                # download failure. See gain#43.
                failures.append(
                    f"{resource.resource_id}: {filename} ({error})")
                # One concise line per failure; a stack trace per failed file
                # would swamp a large run (see the gain#43 rationale in the
                # download loop), so logger.error not logger.exception.
                logger.error(  # noqa: TRY400
                    "failed to classify (%s: %s): %s",
                    resource.resource_id, filename, error)
                continue
            if verdict.needs_download:
                worklist.append((resource, filename, verdict.size))
                total_bytes += verdict.size
            else:
                already_cached += 1
    return worklist, total_bytes, already_cached, failures


def _classify_resources(
    resources: list[GenomicResource],
    workers: int | None,
) -> tuple[list[tuple[GenomicResource, str, int]], int, int, list[str]]:
    """Phase A: classify every resource's files (lock-free) into a work-list.

    Returns ``(worklist, total_bytes, already_cached, failures)``. A classify
    failure for one file is collected (not raised) so the run continues and
    surfaces it in the end-of-run summary, preserving the gain#43 contract
    that one file failing must not discard the whole run. Resources whose
    protocol is not a ``CachingProtocol`` are skipped. See gain#78.
    """
    worklist: list[tuple[GenomicResource, str, int]] = []
    total_bytes = 0
    already_cached = 0
    failures: list[str] = []
    for resource in resources:
        if not isinstance(resource.proto, CachingProtocol):
            continue
        filenames = _enumerate_resource_files(resource)
        res_worklist, res_bytes, res_cached, res_failures = \
            _build_cache_worklist(
                resource.proto, resource, filenames, workers)
        worklist.extend(res_worklist)
        total_bytes += res_bytes
        already_cached += res_cached
        failures.extend(res_failures)
    return worklist, total_bytes, already_cached, failures
def cache_resources(
    repository: GenomicResourceRepo,
    resource_ids: Iterable[str] | None,
    workers: int | None = None,
    *,
    progress: bool = True,
) -> None:
    """Cache resources from a list of remote resource IDs.

    Args:
        repository: repository the resources are resolved from.
        resource_ids: ids of the resources to cache; ``None`` caches every
            resource of the repository.
        workers: ``max_workers`` of the classify and download thread pools.
        progress: when false, no bar or milestone lines are shown.

    Raises:
        RuntimeError: after the run, when at least one file failed to be
            classified or downloaded; the message lists every failure.
    """
    resources = _resolve_resources(repository, resource_ids)

    # Phase A: classify (lock-free) the same file set as before into an
    # authoritative work-list of files that actually need downloading. A
    # classify failure is collected, not raised, so the run still proceeds
    # and surfaces it in the end-of-run summary (gain#43).
    worklist, total_bytes, already_cached, classify_failures = \
        _classify_resources(resources, workers)

    logger.info(
        "caching %s file(s), %s to download; %s already cached",
        len(worklist), _human_bytes(total_bytes), already_cached)

    # Nothing to download: the header already reported it; skip the executor,
    # the reporter and the bar entirely (gain#67 nothing-to-do behaviour).
    if not worklist:
        if classify_failures:
            summary = "\n".join(f"  - {f}" for f in classify_failures)
            raise RuntimeError(
                f"failed to cache {len(classify_failures)}/"
                f"{len(classify_failures)} resource file(s):\n{summary}")
        return

    # Phase B: download (with per-file locks) only the work-list entries.
    total_files = len(worklist)
    failures: list[str] = list(classify_failures)
    # Each future maps to its label and manifest size; the size feeds the
    # terminal-failure byte top-up below.
    futures: dict[Future, tuple[str, int]] = {}

    executor = ThreadPoolExecutor(max_workers=workers)
    try:
        # Everything after the pool exists runs under try/finally, so a
        # failure while building the reporter or submitting work cannot leak
        # the pool's threads.
        reporter = _make_cache_progress(
            byte_total=total_bytes, file_total=total_files,
            progress=progress)
        try:
            for resource, filename, size in worklist:
                cached_proto = cast(CachingProtocol, resource.proto)
                logger.debug(
                    "request to cache resource file: (%s, %s) from %s",
                    resource.resource_id, filename,
                    cached_proto.remote_protocol.proto_id)
                futures[executor.submit(
                    cached_proto.download_cached_resource_file,
                    resource,
                    filename,
                    on_bytes=reporter.on_bytes,
                )] = (f"{resource.resource_id}: {filename}", size)

            # Count from 1 so the log lines read "1/N" .. "N/N".
            for count, future in enumerate(as_completed(futures), start=1):
                label, size = futures[future]
                try:
                    resource_id, filename = future.result()
                except Exception as error:  # noqa: BLE001 - report, don't abort
                    # A single file failing (e.g. a download that stalled
                    # past its retries) must not discard the progress of
                    # every other file in the run. Collect the failure and
                    # keep caching; we raise a summary at the end so the run
                    # still fails loudly. See gain#43.
                    failures.append(f"{label} ({error})")
                    # One concise line per failure; the full summary is
                    # raised at the end. A stack trace per failed file would
                    # swamp a large run.
                    reporter.report_failure(
                        f"failed {count}/{total_files} ({label}): {error}")
                    # Slice 1 rolls a retryable terminal failure's bytes back
                    # to net ~0, so credit the file's full size to land the
                    # byte bar at 100%, then mark the failure so the tally
                    # shows. See gain#79 / gain#43. Must precede
                    # update(failed=True).
                    reporter.on_bytes(size)
                    reporter.update(failed=True)
                    continue
                logger.debug(
                    "finished %s/%s (%s: %s)", count, total_files,
                    resource_id, filename)
                reporter.update(failed=False)
        finally:
            # Cleanup must run on every exit path -- normal completion, an
            # unexpected exception, or a KeyboardInterrupt escaping the loop
            # -- so a live tqdm bar is always finalized rather than left
            # dangling. See gain#68.
            reporter.close()
    finally:
        executor.shutdown()

    if failures:
        summary = "\n".join(f"  - {failure}" for failure in failures)
        # Files acted on that could fail: downloads attempted (total_files)
        # plus files that failed classification before reaching the
        # work-list. (Already-cached files cannot fail, so are excluded.)
        attempted = total_files + len(classify_failures)
        raise RuntimeError(
            f"failed to cache {len(failures)}/{attempted} resource "
            f"file(s):\n{summary}")