class CacheResource(GenomicResource):
    """Represents a resource stored in cache.

    The instance is built from an existing resource: its id, version,
    config and manifest are forwarded to the base class together with the
    caching protocol that will serve it.
    """

    def __init__(self, resource: GenomicResource, protocol: CachingProtocol):
        # Collect the pieces in the same order the base constructor
        # receives them.
        resource_id = resource.resource_id
        version = resource.version
        config = resource.config
        manifest = resource.get_manifest()
        super().__init__(
            resource_id,
            version,
            protocol,
            config=config,
            manifest=manifest,
        )
def refresh_cached_resource_file(
    self,
    resource: GenomicResource,
    filename: str,
) -> tuple[str, str]:
    """Refresh a single resource file in cache if necessary.

    Lock files are ignored. For any other file the resource with the same
    id and exactly the same version is requested from the remote protocol,
    and the local protocol is asked to update the file while the per-file
    lock is held.

    Returns:
        The ``(resource_id, filename)`` pair that was handled.
    """
    assert resource.proto == self

    if not filename.endswith(".lockfile"):
        remote = self.remote_protocol.get_resource(
            resource.resource_id,
            f"={resource.get_version_str()}",
        )
        # Hold the file lock so that the same file is not cached
        # simultaneously by somebody else.
        file_lock = self.local_protocol.obtain_resource_file_lock(
            resource, filename)
        with file_lock:
            self.local_protocol.update_resource_file(
                remote, resource, filename)

    return (resource.resource_id, filename)
def refresh_cached_resource(self, resource: GenomicResource) -> None:
    """Refresh all resource files in cache if necessary.

    Every manifest entry except lock files is passed to the local
    protocol's ``update_resource_file`` while the per-file lock is held.

    The remote resource is the same for every file, so it is looked up
    once up front instead of once per manifest entry. When the manifest
    holds nothing to refresh, the remote protocol is not contacted at all.
    """
    assert resource.proto == self

    filenames = [
        entry.name
        for entry in resource.get_manifest()
        if not entry.name.endswith(".lockfile")
    ]
    if not filenames:
        return

    # Loop-invariant: id and version do not change between files.
    remote_resource = self.remote_protocol.get_resource(
        resource.resource_id,
        f"={resource.get_version_str()}",
    )

    for filename in filenames:
        # Lock the resource file to avoid caching it simultaneously
        with self.local_protocol.obtain_resource_file_lock(
                resource, filename):
            self.local_protocol.update_resource_file(
                remote_resource, resource, filename)
def get_resource_url(self, resource: GenomicResource) -> str:
    """Return the URL of the specified resource.

    The call is delegated to the local protocol.
    """
    local = self.local_protocol
    return local.get_resource_url(resource)
def get_resource_file_url(
    self,
    resource: GenomicResource,
    filename: str,
) -> str:
    """Return the URL of a file in the resource.

    The file is refreshed in cache first; the URL is then obtained from
    the local protocol.
    """
    self.refresh_cached_resource_file(resource, filename)
    local = self.local_protocol
    return local.get_resource_file_url(resource, filename)
def open_raw_file(
    self,
    resource: GenomicResource,
    filename: str,
    mode: str = "rt",
    **kwargs: str | bool | None,
) -> IO:
    """Open a file of the resource through the cache.

    The file is refreshed in cache and then opened via the local protocol
    with the given ``mode`` and extra keyword arguments.

    Raises:
        OSError: if ``mode`` contains ``"w"``.
    """
    write_requested = "w" in mode
    if write_requested:
        message = (
            f"Read-Only caching protocol {self.get_id()} trying to open "
            f"{filename} for writing"
        )
        raise OSError(message)

    self.refresh_cached_resource_file(resource, filename)
    local = self.local_protocol
    return local.open_raw_file(resource, filename, mode, **kwargs)
def _get_or_create_cache_proto(
    self,
    proto: ReadOnlyRepositoryProtocol,
) -> CachingProtocol:
    """Return the caching protocol wrapping ``proto``, creating it once.

    Caching protocols are kept in ``self.cache_protos`` keyed by the id of
    the wrapped protocol. A missing one is built at
    ``<cache_url>/<proto_id>`` using ``self.additional_kwargs``.

    Raises:
        ValueError: if the protocol built for the cache location is not a
            ``FsspecReadWriteProtocol``.
    """
    proto_id = proto.proto_id
    if proto_id in self.cache_protos:
        return self.cache_protos[proto_id]

    cached_proto_url = os.path.join(self.cache_url, proto_id)
    logger.debug(
        "going to create cached protocol with url: %s", cached_proto_url)

    local_proto = build_fsspec_protocol(
        f"{proto_id}.cached",
        cached_proto_url,
        **self.additional_kwargs,
    )
    if not isinstance(local_proto, FsspecReadWriteProtocol):
        raise ValueError(
            f"caching protocol should be RW;"
            f"{cached_proto_url} is not RW")

    self.cache_protos[proto_id] = CachingProtocol(proto, local_proto)
    return self.cache_protos[proto_id]
def find_resource(
    self,
    resource_id: str,
    version_constraint: str | None = None,
    repository_id: str | None = None,
) -> GenomicResource | None:
    """Return requested resource or None if not found.

    A resource matches when its id equals ``resource_id``, its protocol id
    equals ``repository_id`` (only checked when one is given) and its
    version satisfies ``version_constraint``. Among several matches the
    one with the greatest version is returned.
    """
    candidates = [
        res
        for res in self.get_all_resources()
        if res.resource_id == resource_id
        and (repository_id is None or res.proto.proto_id == repository_id)
        and is_version_constraint_satisfied(version_constraint, res.version)
    ]
    return max(candidates, key=lambda res: res.version, default=None)
def get_resource_cached_files(self, resource_id: str) -> set[str]:
    """Get a set of filenames of cached files for a given resource.

    The resource is obtained from the child repository. Each manifest
    entry other than the resource configuration file is reported when the
    local side of the caching protocol says the file exists.
    """
    resource = self.child.get_resource(resource_id)
    cache_proto = self._get_or_create_cache_proto(resource.proto)
    return {
        entry.name
        for entry in resource.get_manifest()
        if entry.name != GR_CONF_FILE_NAME
        and cache_proto.local_protocol.file_exists(resource, entry.name)
    }
def cache_resources(
    repository: GenomicResourceRepo,
    resource_ids: Iterable[str] | None,
    workers: int | None = None,
) -> None:
    """Cache resources from a list of remote resource IDs.

    Args:
        repository: repository the resources are taken from.
        resource_ids: ids of the resources to cache; ``None`` means all
            resources of the repository.
        workers: maximum number of worker threads; passed to
            ``ThreadPoolExecutor`` as ``max_workers``.

    Resources whose protocol is not a ``CachingProtocol`` are skipped.
    When an implementation builder exists for the resource type, only
    ``genomic_resource.yaml`` and the files listed by the implementation
    are refreshed; otherwise the whole resource is refreshed.

    An exception raised by any refresh task propagates to the caller
    after the thread pool has been shut down.
    """
    # pylint: disable=import-outside-toplevel
    from dae.genomic_resources import get_resource_implementation_builder

    if resource_ids is None:
        resources: list[GenomicResource] = \
            list(repository.get_all_resources())
    else:
        resources = []
        for resource_id in resource_ids:
            remote_res = repository.get_resource(resource_id)
            assert remote_res is not None, resource_id
            resources.append(remote_res)

    # The context manager guarantees the pool is shut down even when one
    # of the refresh tasks fails.
    with ThreadPoolExecutor(max_workers=workers) as executor:
        # Maps every submitted future to the (resource_id, filename) pair
        # used for progress reporting. The task results are not used for
        # that, because whole-resource refreshes return nothing.
        submitted: dict = {}

        for resource in resources:
            if not isinstance(resource.proto, CachingProtocol):
                continue
            cached_proto = resource.proto

            impl_builder = get_resource_implementation_builder(
                resource.get_type())
            if impl_builder is None:
                logger.info(
                    "unexpected resource type <%s> for resource %s; "
                    "updating resource",
                    resource.get_type(), resource.resource_id)
                future = executor.submit(
                    cached_proto.refresh_cached_resource,
                    resource,
                )
                submitted[future] = (resource.resource_id, "<all files>")
                continue

            future = executor.submit(
                cached_proto.refresh_cached_resource_file,  # type: ignore
                resource,
                "genomic_resource.yaml",
            )
            submitted[future] = (
                resource.resource_id, "genomic_resource.yaml")

            impl = impl_builder(resource)
            for res_file in impl.files:
                logger.info(
                    "request to cache resource file: (%s, %s) from %s",
                    resource.resource_id, res_file,
                    cached_proto.remote_protocol.proto_id)
                future = executor.submit(
                    cached_proto.refresh_cached_resource_file,  # type: ignore
                    resource,
                    res_file,
                )
                submitted[future] = (resource.resource_id, res_file)

        total_files = len(submitted)
        logger.info("caching %s files", total_files)

        # Count from 1 so that the last message reads "N/N".
        for count, future in enumerate(as_completed(submitted), start=1):
            future.result()  # re-raises a failure of the refresh task
            done_id, done_file = submitted[future]
            logger.info(
                "finished %s/%s (%s: %s)",
                count, total_files, done_id, done_file)