class GenomicScoreImplementation(
    GenomicResourceImplementation,
    InfoImplementationMixin,
):  # pylint: disable=too-many-public-methods
    """Genomic scores base class.

    Wraps a genomic score resource and provides the building blocks used to
    compute its statistics: per-region min/max calculation, per-region
    histogram calculation, merging of the partial results and saving of the
    resulting histograms, plus the hashes describing the resource state.
    """

    # Reference genomes already built, keyed by reference genome resource id.
    # The cache lives on the class, so it is shared by all instances.
    _REF_GENOME_CACHE: ClassVar[dict[str, Any]] = {}

    def __init__(self, resource: GenomicResource):
        super().__init__(resource)
        self.score: GenomicScore = build_score_from_resource(resource)

    def get_config_histograms(self) -> dict[str, Any]:
        """Collect all configurations of histograms for the genomic score.

        Returns a mapping from score id to the ``hist_conf`` of the
        corresponding score definition; the value is stored as-is (callers
        in this class filter out ``None`` values).
        """
        return {
            score_id: score_def.hist_conf
            for score_id, score_def in self.score.score_definitions.items()
        }

    @property
    def files(self) -> set[str]:
        """Return the names of the files backing the score table.

        The set always contains the table definition filename; for a
        ``TabixGenomicPositionTable`` it also contains the accompanying
        ``<filename>.tbi`` file.
        """
        table = self.score.table
        filename = table.definition.filename
        files = {filename}
        if isinstance(table, TabixGenomicPositionTable):
            files.add(f"{filename}.tbi")
        return files

    @staticmethod
    def _unpack_score_defs(
        resource: GenomicResource,
    ) -> tuple[list[str], dict[str, HistogramConfig]]:
        """Extract scores needing min/max and histogram configs for a score.

        Returns a pair ``(min_max_score_ids, hist_confs)``:

        * ``min_max_score_ids`` lists the ids of scores with a number
          histogram configuration that has no view range yet;
        * ``hist_confs`` maps every score id to its histogram configuration,
          falling back to the default configuration for the score value
          type when the score definition does not provide one.
        """
        impl = build_score_implementation_from_resource(resource)
        all_min_max_scores = []
        all_hist_confs: dict[str, HistogramConfig] = {}
        with impl.score.open():
            for score_id, score_def in impl.score.score_definitions.items():
                hist_conf = score_def.hist_conf
                if hist_conf is None:
                    hist_conf = build_default_histogram_conf(
                        score_def.value_type)

                # Null and categorical configurations are recorded as-is;
                # only number histograms may need a min/max calculation.
                if not isinstance(
                    hist_conf,
                    (NullHistogramConfig, CategoricalHistogramConfig),
                ):
                    assert isinstance(hist_conf, NumberHistogramConfig)
                    if not hist_conf.has_view_range():
                        all_min_max_scores.append(score_id)
                all_hist_confs[score_id] = hist_conf
        return all_min_max_scores, all_hist_confs

    @staticmethod
    def _get_reference_genome_cached(
        grr: GenomicResourceRepo | None,
        genome_id: str | None,
    ) -> ReferenceGenome | None:
        """Return the reference genome for ``genome_id``, using the cache.

        Returns ``None`` when either argument is ``None`` or when building
        the reference genome raises ``FileNotFoundError``. Successfully
        built genomes are stored in the class-level cache; failures are not
        cached.
        """
        if genome_id is None or grr is None:
            return None
        if genome_id in GenomicScoreImplementation._REF_GENOME_CACHE:
            return cast(
                ReferenceGenome,
                GenomicScoreImplementation._REF_GENOME_CACHE[genome_id],
            )
        try:
            ref_genome = build_reference_genome_from_resource(
                grr.get_resource(genome_id),
            )
            logger.info(
                "Using reference genome label <%s> ",
                genome_id,
            )
        except FileNotFoundError:
            logger.warning(
                "Couldn't find reference genome %s",
                genome_id,
            )
            return None
        GenomicScoreImplementation._REF_GENOME_CACHE[genome_id] = ref_genome
        return ref_genome

    def _get_chrom_regions(
        self,
        region_size: int,
        grr: GenomicResourceRepo | None = None,
    ) -> list[Region]:
        """Split the chromosomes of the score into regions.

        With ``region_size == 0`` a single ``Region(None, None, None)`` is
        returned. Otherwise the length of every chromosome of the score is
        looked up - in the reference genome named by the resource's
        ``reference_genome`` label when it is available and contains the
        chromosome, else in the score table itself - and the chromosome is
        split using ``split_into_regions``. Chromosomes whose length cannot
        be determined are skipped with a warning.
        """
        if region_size == 0:
            # Forcefully setting the chromosome to None is a bit hacky,
            # but is more elegant than properly supporting it in Region.
            return [Region(None, None, None)]  # type: ignore

        regions = []
        ref_genome_id = cast(
            str,
            self.resource.get_labels().get("reference_genome"),
        )
        ref_genome = self._get_reference_genome_cached(grr, ref_genome_id)
        table = self.score.table

        for chrom in self.score.get_all_chromosomes():
            # Reset on every iteration: otherwise a chromosome whose length
            # cannot be determined would silently reuse the length of the
            # previously processed chromosome.
            chrom_length: int | None = None

            if ref_genome is not None and chrom in ref_genome.chromosomes:
                chrom_length = ref_genome.get_chrom_length(chrom)
            elif isinstance(table, InmemoryGenomicPositionTable):
                # ``default=None`` routes a chromosome without records to
                # the warning below instead of raising ValueError.
                chrom_length = max(
                    (
                        line.pos_end
                        for line in table.get_records_in_region(chrom)
                    ),
                    default=None,
                )
            elif isinstance(table, BigWigTable):
                chrom_length = table.get_chromosome_length(chrom)
            else:
                assert isinstance(table, TabixGenomicPositionTable)
                assert table.pysam_file is not None
                fchrom = table.unmap_chromosome(chrom)
                if fchrom is not None:
                    chrom_length = get_chromosome_length_tabix(
                        table.pysam_file, fchrom)

            if chrom_length is None:
                logger.warning(
                    "unable to find chromosome length for %s", chrom)
                continue

            regions.extend(
                split_into_regions(
                    chrom,
                    chrom_length,
                    region_size,
                ),
            )
        return regions

    @property
    def resource_id(self) -> str:
        """Return the resource id of the wrapped score."""
        return self.score.resource_id

    def _add_min_max_tasks(
        self,
        graph: TaskGraph,
        score_ids: Iterable[str],
        region_size: int,
        grr: GenomicResourceRepo | None = None,
    ) -> tuple[list[Task], Task]:
        """Add and return the calculation and merging tasks for min/max.

        One calculation task is created per region returned by
        ``_get_chrom_regions``; a single merge task receives the results of
        all calculation tasks.

        The tasks are returned as a pair containing the list of calculation
        tasks and the merge task.
        """
        min_max_tasks = []
        regions = self._get_chrom_regions(region_size, grr)
        for region in regions:
            chrom = region.chrom
            start = region.start
            end = region.stop
            min_max_tasks.append(graph.create_task(
                f"{self.resource.get_full_id()}_calculate_min_max_"
                f"{chrom}_{start}_{end}",
                GenomicScoreImplementation._do_min_max,
                [self.resource, score_ids, chrom, start, end],
                [],
            ))
        merge_task = graph.create_task(
            f"{self.resource.get_full_id()}_merge_min_max",
            GenomicScoreImplementation._merge_min_max,
            [score_ids, *min_max_tasks],
            [],
        )
        return min_max_tasks, merge_task

    def _min_max_add_value(
        self,
        statistic: MinMaxValue,
        value: float,
    ) -> None:
        """Add ``value`` to the min/max ``statistic``."""
        statistic.add_value(value)

    @staticmethod
    def _do_min_max(
        resource: GenomicResource,
        score_ids: list[str],
        chrom: str | None,
        start: int | None,
        end: int | None,
    ) -> dict[str, MinMaxValue]:
        """Calculate min/max statistics of the given scores in a region.

        Returns a mapping from score id to the ``MinMaxValue`` accumulated
        over all records fetched for ``chrom:start-end``.
        """
        impl = build_score_implementation_from_resource(resource)
        result = {scr_id: MinMaxValue(scr_id) for scr_id in score_ids}
        with impl.score.open() as score:
            for _left, _right, rec in score._fetch_region_values(  # noqa
                    chrom, start, end, score_ids):
                # Values in ``rec`` are indexed by position in ``score_ids``.
                for score_index, score_id in enumerate(score_ids):
                    impl._min_max_add_value(  # noqa: SLF001
                        result[score_id],
                        rec[score_index],  # type: ignore
                    )
        return result

    @staticmethod
    def _merge_min_max(
        score_ids: list[str],
        *calculate_tasks: dict[str, MinMaxValue],
    ) -> dict[str, Any]:
        """Merge the per-region min/max results into one result per score.

        For every score id the first region result is taken as the starting
        value and every following region result is merged into it. A score
        id maps to ``None`` when no region results are supplied.
        """
        res: dict[str, MinMaxValue | None] = dict.fromkeys(score_ids)
        for score_id in score_ids:
            for min_max_region in calculate_tasks:
                merged = res[score_id]
                if merged is None:
                    res[score_id] = min_max_region[score_id]
                else:
                    merged.merge(min_max_region[score_id])
        return res

    @staticmethod
    def _update_hist_confs(
        all_hist_confs: dict[str, HistogramConfig],
        minmax_task: dict[str, MinMaxValue] | None,
    ) -> dict[str, HistogramConfig]:
        """Fill in the view ranges of histogram configs from min/max values.

        ``all_hist_confs`` is updated in place and returned. For every score
        in ``minmax_task`` the number histogram configuration gets the
        calculated ``(min, max)`` as view range; when either value is NaN
        the configuration is replaced with a ``NullHistogramConfig``.
        When ``minmax_task`` is ``None`` the configs are returned unchanged.
        """
        if minmax_task is None:
            return all_hist_confs

        for score_id, min_max in minmax_task.items():
            hist_conf = all_hist_confs[score_id]
            assert isinstance(hist_conf, NumberHistogramConfig)
            assert not hist_conf.has_view_range()
            if np.isnan(min_max.min) or np.isnan(min_max.max):
                logger.warning(
                    "min/max value for %s not found; "
                    "nullify the histogram",
                    score_id)
                all_hist_confs[score_id] = NullHistogramConfig(
                    f"min/max for {score_id} not found")
            else:
                hist_conf.view_range = (min_max.min, min_max.max)
        logger.info("histogram configs updated: %s", all_hist_confs)
        return all_hist_confs

    def _add_histogram_tasks(
        self,
        graph: TaskGraph,
        all_hist_confs: dict[str, HistogramConfig],
        minmax_task: Task | None,
        region_size: int,
        grr: GenomicResourceRepo | None = None,
    ) -> tuple[list[Task], Task, Task]:
        """Add the histogram calculation, merge and save tasks.

        A task updating the histogram configurations with the result of
        ``minmax_task`` is created first; the per-region calculation tasks
        and the merge task consume its result. The save task consumes the
        result of the merge task.

        Returns a triple containing the list of calculation tasks, the
        merge task and the save task.
        """
        regions = self._get_chrom_regions(region_size, grr)
        update_hist_confs = graph.create_task(
            f"{self.resource.get_full_id()}_update_hist_confs",
            GenomicScoreImplementation._update_hist_confs,
            [all_hist_confs, minmax_task],
            [],
        )
        histogram_tasks = []
        for region in regions:
            chrom = region.chrom
            start = region.start
            end = region.stop
            histogram_tasks.append(graph.create_task(
                f"{self.resource.get_full_id()}_calculate_histogram_"
                f"{chrom}_{start}_{end}",
                GenomicScoreImplementation._do_histogram,
                [self.resource, update_hist_confs, chrom, start, end],
                [],
            ))
        merge_task = graph.create_task(
            f"{self.resource.get_full_id()}_merge_histograms",
            GenomicScoreImplementation._merge_histograms,
            [self.resource, update_hist_confs, *histogram_tasks],
            [],
        )
        save_task = graph.create_task(
            f"{self.resource.get_full_id()}_save_histograms",
            GenomicScoreImplementation._save_histograms,
            [self.resource, merge_task],
            [],
        )
        return histogram_tasks, merge_task, save_task

    def _histogram_add_value(
        self,
        histogram: Histogram,
        value: Any,
        count: int,
    ) -> None:
        """Add ``value`` with weight ``count`` to ``histogram``."""
        histogram.add_value(
            value,
            count,
        )

    @staticmethod
    def _do_histogram(
        resource: GenomicResource,
        all_hist_confs: dict[str, HistogramConfig],
        chrom: str | None,
        start: int | None,
        end: int | None,
    ) -> dict[str, Histogram]:
        """Calculate the histograms of the scores in a region.

        Scores with a ``NullHistogramConfig`` are skipped. Every fetched
        record contributes its value with a count equal to the record
        length (``right - left + 1``). When adding a value raises
        ``TypeError`` or ``HistogramError`` the histogram of that score is
        replaced with a ``NullHistogram`` carrying the error message.
        """
        impl = build_score_implementation_from_resource(resource)
        result: dict[str, Histogram] = {}
        logger.info("updated hist confs: %s", all_hist_confs)
        for score_id, hist_conf in all_hist_confs.items():
            if isinstance(hist_conf, NullHistogramConfig):
                continue
            result[score_id] = build_empty_histogram(hist_conf)

        score_ids = list(result.keys())
        with impl.score.open() as score:
            for left, right, rec in score._fetch_region_values(  # noqa
                    chrom, start, end, score_ids):
                for scr_index, scr_id in enumerate(score_ids):
                    try:
                        impl._histogram_add_value(  # noqa: SLF001
                            result[scr_id],
                            rec[scr_index],  # type: ignore
                            right - left + 1,
                        )
                    except TypeError as err:
                        logger.exception(
                            "Failed adding value %s to histogram of %s; "
                            "%s:%s-%s",
                            rec[scr_index] if rec else None,
                            resource.resource_id,
                            chrom, start, end)
                        result[scr_id] = NullHistogram(
                            NullHistogramConfig(str(err)),
                        )
                    except HistogramError as err:
                        logger.warning(
                            "Histogram for %s nullified",
                            scr_id,
                        )
                        result[scr_id] = NullHistogram(
                            NullHistogramConfig(str(err)),
                        )
        return result

    @staticmethod
    def _merge_histograms(
        resource: GenomicResource,
        all_hist_confs: dict[str, HistogramConfig],
        *calculated_histograms: dict[str, Any],
    ) -> dict[str, Histogram]:
        """Merge the per-region histograms into one histogram per score.

        The result starts with an empty histogram for every configured
        score. Region results missing a score are skipped with a warning.
        Once the histogram of a score becomes a ``NullHistogram`` - because
        a region produced a null histogram or merging raised
        ``HistogramError`` - it stays null.
        """
        result: dict[str, Histogram] = {}
        for score_id, hist_conf in all_hist_confs.items():
            result[score_id] = build_empty_histogram(hist_conf)

        for score_id, hist_conf in all_hist_confs.items():
            if isinstance(hist_conf, NullHistogramConfig):
                continue
            try:
                for histogram_region in calculated_histograms:
                    if score_id not in histogram_region:
                        logger.warning(
                            "region has no histogram for score %s in %s",
                            score_id, resource.resource_id)
                        continue
                    hist = histogram_region[score_id]
                    if isinstance(result[score_id], NullHistogram):
                        continue
                    if isinstance(hist, NullHistogram):
                        result[score_id] = NullHistogram(
                            NullHistogramConfig(
                                f"Empty histogram for {score_id} "
                                f"in a region: "
                                f"{hist.reason}"))
                    else:
                        result[score_id].merge(hist)
            except HistogramError as err:
                logger.exception(
                    "Histogram for %s nullified",
                    score_id,
                )
                result[score_id] = NullHistogram(
                    NullHistogramConfig(str(err)))
        return result

    @staticmethod
    def _save_histograms(
        resource: GenomicResource,
        merged_histograms: dict[str, Histogram],
    ) -> dict[str, Histogram]:
        """Write the merged histograms into the resource.

        Every histogram is serialized into its histogram file; histograms
        other than ``NullHistogram`` are additionally plotted into the
        histogram image file. Returns ``merged_histograms`` unchanged.
        """
        impl = build_score_implementation_from_resource(resource)
        proto = resource.proto
        for score_id, score_histogram in merged_histograms.items():
            with proto.open_raw_file(
                resource,
                impl.score.get_histogram_filename(score_id),
                mode="wt",
            ) as outfile:
                outfile.write(score_histogram.serialize())
            if not isinstance(score_histogram, NullHistogram):
                score_def = impl.score.score_definitions[score_id]
                plot_histogram(
                    resource,
                    impl.score.get_histogram_image_filename(score_id),
                    score_histogram,
                    score_id,
                    score_def.small_values_desc,
                    score_def.large_values_desc,
                )
        return merged_histograms

    def calc_info_hash(self) -> bytes:
        """Compute and return the info hash."""
        return b"infohash"

    def calc_statistics_hash(self) -> bytes:
        """Compute the statistics hash.

        This hash is used to decide whether the resource statistics should
        be recomputed. It is the encoded JSON dump of the histogram
        configurations, the table definition, the manifest md5 sums of the
        table files and the main attributes of every score definition.
        """
        manifest = self.resource.get_manifest()
        return json.dumps({
            "config": {
                "histograms": [
                    hist_conf.to_dict()
                    for hist_conf in self.get_config_histograms().values()
                    if hist_conf is not None
                ],
                "table": {
                    "config": self.score.table.definition,
                    "files_md5": {
                        file_name: manifest[file_name].md5
                        for file_name in sorted(self.files)
                    },
                },
            },
            "score_config": [
                {
                    "id": score_def.score_id,
                    "type": score_def.value_type,
                    "name": score_def.col_name,
                    "index": score_def.col_index,
                    # Sorted so that the hash does not depend on the order
                    # of the NA values.
                    "na_values": str(sorted(score_def.na_values)),
                }
                for score_def in self.score.score_definitions.values()
            ],
        }, indent=2).encode()
class CnvCollectionImplementation(GenomicScoreImplementation):
    """Implementation used for resources of type ``cnv_collection``.

    Adds nothing of its own; all behaviour is inherited from
    ``GenomicScoreImplementation``.
    """

    # pylint: disable=useless-parent-delegation
def build_score_implementation_from_resource(
    resource: GenomicResource,
) -> GenomicScoreImplementation | CnvCollectionImplementation:
    """Create the score implementation matching the resource type.

    A resource whose type is ``cnv_collection`` gets a
    ``CnvCollectionImplementation``; a resource of any other type gets a
    ``GenomicScoreImplementation``.
    """
    impl_class = (
        CnvCollectionImplementation
        if resource.get_type() == "cnv_collection"
        else GenomicScoreImplementation
    )
    return impl_class(resource)