class PartitionDescriptor:
    """Class to represent partition of a genotype dataset.

    A descriptor combines up to four independent binning schemes:

    * region bins -- controlled by ``chromosomes``, ``region_length`` and
      ``integer_region_bins``;
    * frequency bins -- controlled by ``rare_boundary``;
    * coding bins -- controlled by ``coding_effect_types``;
    * family bins -- controlled by ``family_bin_size``.

    NOTE(review): the predicates ``has_region_bins``, ``has_frequency_bins``,
    ``has_coding_bins`` and ``has_family_bins`` and the helpers
    ``region_bins_count`` and ``_add_product`` are used below but are not
    defined in this part of the file -- confirm they are defined elsewhere.
    """

    # pylint: disable=too-many-public-methods

    # Textual spellings treated as boolean false when a value arrives as a
    # string (ConfigParser returns every option value as ``str``).
    _FALSE_STRINGS = frozenset({"", "0", "false", "no", "off"})

    def __init__(
        self, *,
        chromosomes: list[str] | None = None,
        region_length: int = 0,
        integer_region_bins: bool = False,
        family_bin_size: int = 0,
        coding_effect_types: list[str] | None = None,
        rare_boundary: float = 0,
    ):
        """Create a partition descriptor.

        All arguments are keyword-only; the defaults describe a dataset
        without any partitioning values set.
        """
        self.chromosomes: list[str] = \
            [] if chromosomes is None else chromosomes
        self.region_length = region_length
        self.integer_region_bins = integer_region_bins
        self.family_bin_size = family_bin_size
        self.coding_effect_types: set[str] = \
            set(coding_effect_types) if coding_effect_types else set()
        self.rare_boundary = rare_boundary

    @staticmethod
    def parse(path_name: pathlib.Path | str) -> PartitionDescriptor:
        """Parse partition description from a file.

        When the file name has a `.conf` suffix or is without suffix the
        format of the file is assumed to be python config file and it is
        parsed using the Python ConfigParser class.

        When the file name has `.yaml` suffix the file is parsed using the
        YAML parser.

        Raises:
            ValueError: when the file suffix is none of the above.
        """
        path = pathlib.Path(path_name)
        if path.suffix in {"", ".conf"}:
            content_format = "conf"
        elif path.suffix == ".yaml":
            content_format = "yaml"
        else:
            raise ValueError(
                f"unsupported partition description format "
                f"<{path.suffix}>")
        return PartitionDescriptor.parse_string(
            path.read_text(encoding="utf8"), content_format)

    @staticmethod
    def parse_string(
        content: str, content_format: str = "conf",
    ) -> PartitionDescriptor:
        """Parse partition description from a string.

        The supported formats are the Python config format and YAML. Example
        string content should be as follows.

        Example Python config format:
        ```
        [region_bin]
        chromosomes = chr1, chr2
        region_length = 10
        integer_region_bins = False
        [frequency_bin]
        rare_boundary = 5.0
        [coding_bin]
        coding_effect_types = frame-shift,splice-site,nonsense,missense
        [family_bin]
        family_bin_size=10
        ```

        Example YAML format:
        ```
        region_bin:
            chromosomes: chr1, chr2
            region_length: 10
            integer_region_bins: False
        frequency_bin:
            rare_boundary: 5.0
        coding_bin:
            coding_effect_types: frame-shift,splice-site,nonsense,missense
        family_bin:
            family_bin_size: 10
        ```

        Empty (or whitespace-only) content produces a default descriptor.

        Raises:
            ValueError: when `content_format` is neither "conf" nor "yaml".
        """
        content = content.strip()
        if not content:
            return PartitionDescriptor()

        if content_format == "conf":
            # "conf" content is tried as TOML first; anything TOML rejects
            # falls back to the classic INI-style ConfigParser syntax.
            try:
                parsed_data = toml.loads(content)
            except toml.TomlDecodeError:
                parser = configparser.ConfigParser()
                parser.read_string(content)
                # The parser is only used through the mapping operations
                # `in`, `[...]` and `.get()`, so it is passed on as is.
                parsed_data = cast(dict[str, Any], parser)
        elif content_format == "yaml":
            parsed_data = yaml.safe_load(content)
        else:
            raise ValueError(
                f"unsuported partition description format <{content_format}>")

        return PartitionDescriptor.parse_dict(parsed_data)

    @staticmethod
    def _parse_bool(value: Any) -> bool:
        """Interpret a configuration value as a boolean.

        String values are compared case-insensitively against the usual
        "false" spellings, so that the ConfigParser string "False" is not
        mistaken for a truthy non-empty string. Any other string keeps the
        previous behaviour and counts as true. Non-string values use regular
        Python truthiness.
        """
        if isinstance(value, str):
            return value.strip().lower() \
                not in PartitionDescriptor._FALSE_STRINGS
        return bool(value)

    @staticmethod
    def parse_dict(config_dict: dict[str, Any]) -> PartitionDescriptor:
        """Parse configuration dictionary and create a partition descriptor.

        Recognized top level keys are `region_bin`, `family_bin`,
        `frequency_bin` and `coding_bin`; sections that are absent leave the
        corresponding descriptor fields at their defaults.

        Raises:
            ValueError: when `chromosomes` or `coding_effect_types` has an
                unsupported type.
        """
        config: dict[str, Any] = {}

        if "region_bin" in config_dict:
            region_bin = config_dict["region_bin"]
            # A region section without explicit length means one bin per
            # chromosome, hence the very large default length.
            config["region_length"] = int(
                region_bin.get("region_length", sys.maxsize))
            config["integer_region_bins"] = PartitionDescriptor._parse_bool(
                region_bin.get("integer_region_bins", False))

            chromosomes = region_bin["chromosomes"]
            if isinstance(chromosomes, int):
                # e.g. YAML `chromosomes: 1`
                config["chromosomes"] = [str(chromosomes)]
            elif isinstance(chromosomes, str):
                config["chromosomes"] = [
                    c.strip() for c in chromosomes.split(",")]
            elif isinstance(chromosomes, list):
                # List items may be numbers (e.g. YAML `[1, 2, X]`);
                # chromosome names are always compared as strings.
                config["chromosomes"] = [str(c) for c in chromosomes]
            else:
                raise ValueError(
                    f"unexpected chromosomes types: {type(chromosomes)} "
                    f"{chromosomes}")

        if "family_bin" in config_dict:
            config["family_bin_size"] = int(
                config_dict["family_bin"]["family_bin_size"])

        if "frequency_bin" in config_dict:
            config["rare_boundary"] = float(
                config_dict["frequency_bin"]["rare_boundary"])

        if "coding_bin" in config_dict:
            coding_effect_types = \
                config_dict["coding_bin"]["coding_effect_types"]
            if isinstance(coding_effect_types, str):
                result = {s.strip() for s in coding_effect_types.split(",")}
            elif isinstance(coding_effect_types, list):
                result = set(coding_effect_types)
            else:
                raise ValueError(
                    f"unexpected coding effect types: "
                    f"{type(coding_effect_types)} {coding_effect_types}")
            config["coding_effect_types"] = set(expand_effect_types(result))

        return PartitionDescriptor(
            chromosomes=config.get("chromosomes"),
            region_length=config.get("region_length", 0),
            integer_region_bins=config.get("integer_region_bins", False),
            family_bin_size=config.get("family_bin_size", 0),
            rare_boundary=config.get("rare_boundary", 0.0),
            coding_effect_types=config.get("coding_effect_types"),
        )

    def has_summary_partitions(self) -> bool:
        """Check if partition applicable to summary allele are defined."""
        return self.has_region_bins() or self.has_frequency_bins() or \
            self.has_coding_bins()

    def has_family_partitions(self) -> bool:
        """Check if partition applicable to family allele are defined."""
        return self.has_region_bins() or self.has_frequency_bins() or \
            self.has_coding_bins() or self.has_family_bins()

    def has_partitions(self) -> bool:
        """Equivalent to `has_family_partitions` method."""
        return self.has_family_partitions()

    def make_region_bin(self, chrom: str, pos: int) -> str:
        """Produce region bin for given chromosome and position.

        For a chromosome listed in the descriptor the bin is
        `<chrom>_<pos_bin>`, or `<chrom index> * 10_000 + <pos_bin>` when
        integer region bins are requested. Any other chromosome is mapped to
        `other_<pos_bin>`, respectively `10_000_000 + <pos_bin>`.

        Raises:
            ValueError: when the descriptor does not define region bins.
        """
        if not self.has_region_bins():
            raise ValueError(
                f"Partition <{self.serialize()}> does not define region bins.")
        assert self.chromosomes is not None
        assert self.region_length > 0
        assert pos > 0

        pos_bin = calc_bin_index(self.region_length, pos)

        if chrom in self.chromosomes:
            if self.integer_region_bins:
                chrom_index = self.chromosomes.index(chrom)
                return f"{chrom_index * 10_000 + pos_bin}"
            return f"{chrom}_{pos_bin}"

        if self.integer_region_bins:
            return f"{10_000_000 + pos_bin}"
        return f"other_{pos_bin}"

    def make_region_bins_regions(
        self, chromosomes: list[str],
        chromosome_lengths: dict[str, int],
    ) -> dict[str, list[Region]]:
        """Generate region_bin to regions based on a partition descriptor.

        Chromosomes that fit into a single bin are represented by a region
        without coordinates; the others are split into consecutive regions
        clipped to the chromosome length.
        """
        assert self.has_region_bins()

        result = defaultdict(list)
        for chrom in chromosomes:
            region_bins_count = self.region_bins_count(
                chrom, chromosome_lengths)
            if region_bins_count == 1:
                region_bin = self.make_region_bin(chrom, 1)
                result[region_bin].append(Region(chrom))
                continue

            for region_index in range(region_bins_count):
                start = calc_bin_begin(self.region_length, region_index)
                end = min(
                    calc_bin_end(self.region_length, region_index),
                    chromosome_lengths[chrom])
                region_bin = self.make_region_bin(chrom, start)
                result[region_bin].append(Region(chrom, start, end))
        return result

    def region_to_region_bins(
        self, region: Region,
        chrom_lens: dict[str, int],
    ) -> list[str]:
        """Provide a list of bins the given region intersects.

        A missing region start defaults to 1 and a missing stop defaults to
        the chromosome length; the stop is clipped to the chromosome length.
        """
        start = region.start or 1
        stop = region.stop or chrom_lens[region.chrom]
        stop = min(stop or start, chrom_lens[region.chrom])
        assert start > 0
        assert stop > 0

        if start == stop:
            return [self.make_region_bin(region.chrom, start)]

        first_bin = calc_bin_index(self.region_length, start)
        last_bin = calc_bin_index(self.region_length, stop)
        return [
            self.make_region_bin(
                region.chrom, calc_bin_begin(self.region_length, index))
            for index in range(first_bin, last_bin + 1)
        ]

    def make_all_region_bins(
        self, chromosome_lengths: dict[str, int],
    ) -> list[str]:
        """Produce all region bins for all chromosomes.

        Descriptor chromosomes that are missing from `chromosome_lengths`
        are ignored. All genome chromosomes that are not listed in the
        descriptor share the "other" bins, so those bins are produced once,
        from the longest such chromosome.
        """
        bins: list[str] = []

        # Follow the descriptor order (duplicates dropped) instead of
        # iterating a set, so the resulting list is stable between runs.
        partition_chroms = [
            chrom for chrom in dict.fromkeys(self.chromosomes)
            if chrom in chromosome_lengths
        ]
        for chrom in partition_chroms:
            bins.extend(self.region_to_region_bins(
                Region(chrom, 1, chromosome_lengths[chrom]),
                chromosome_lengths,
            ))

        listed = set(partition_chroms)
        other_chroms = [
            chrom for chrom in chromosome_lengths if chrom not in listed]
        if other_chroms:
            max_chrom = max(other_chroms, key=chromosome_lengths.__getitem__)
            bins.extend(self.region_to_region_bins(
                Region(max_chrom, 1, chromosome_lengths[max_chrom]),
                chromosome_lengths,
            ))

        return bins

    def make_family_bin(self, family_id: str) -> int:
        """Produce family bin for given family ID.

        The bin is the SHA-256 digest of the family ID modulo
        `family_bin_size`, which keeps the assignment stable between runs.

        Raises:
            ValueError: when the descriptor does not define family bins.
        """
        if not self.has_family_bins():
            raise ValueError(
                f"Partition <{self.serialize()}> does not define family bins.")
        sha256 = hashlib.sha256()
        sha256.update(family_id.encode())
        digest = int(sha256.hexdigest(), 16)
        return int(digest % self.family_bin_size)

    def make_coding_bin(self, effect_types: Iterable[str]) -> int:
        """Produce coding bin for given list of effect types.

        Returns 1 when at least one of the effect types is among the
        descriptor's coding effect types and 0 otherwise.

        Raises:
            ValueError: when the descriptor does not define coding bins.
        """
        if not self.has_coding_bins():
            raise ValueError(
                f"Partition <{self.serialize()}> does not define coding bins.")
        if self.coding_effect_types.isdisjoint(effect_types):
            return 0
        return 1

    def make_frequency_bin(
        self, allele_count: int, allele_freq: float, *,
        is_denovo: bool = False,
    ) -> str:
        """Produce frequency bin from allele count, frequency and de Novo flag.

        Params are allele count, allele frequency and de Novo flag.

        Returns "0" for de Novo, "1" for ultra rare (allele count <= 1),
        "2" for rare (frequency <= `rare_boundary`) and "3" otherwise.
        """
        if is_denovo:
            return "0"
        if int(allele_count) <= 1:  # Ultra rare
            return "1"
        if allele_freq <= self.rare_boundary:  # Rare
            return "2"
        return "3"

    def dataset_summary_partition(self) -> list[tuple[str, str]]:
        """Build summary parquet dataset partition for table creation.

        When creating an Impala or BigQuery table it is helpful to have
        the list of partitions and types used in the parquet dataset.
        """
        result = []
        if self.has_region_bins():
            result.append(("region_bin", "string"))
        if self.has_frequency_bins():
            result.append(("frequency_bin", "int8"))
        if self.has_coding_bins():
            result.append(("coding_bin", "int8"))
        return result

    def dataset_family_partition(self) -> list[tuple[str, str]]:
        """Build family dataset partition for table creation.

        When creating an Impala or BigQuery table it is helpful to have
        the list of partitions and types used in the parquet dataset.
        """
        result = self.dataset_summary_partition()
        if self.has_family_bins():
            result.append(("family_bin", "int8"))
        return result

    def summary_partition(
        self, allele: SummaryAllele, *,
        seen_as_denovo: bool,
    ) -> list[tuple[str, str]]:
        """Produce summary partition for an allele.

        The partition is returned as a list of tuples consisting of the
        name of the partition and the value.

        Example:
        [
            ("region_bin", "chr1_0"),
            ("frequency_bin", "0"),
            ("coding_bin", "1"),
        ]
        """
        result = []
        if self.has_region_bins():
            result.append((
                "region_bin",
                self.make_region_bin(allele.chrom, allele.position),
            ))

        if self.has_frequency_bins():
            allele_count = allele.get_attribute("af_allele_count", 0)
            allele_freq = allele.get_attribute("af_allele_freq", 0)
            result.append((
                "frequency_bin",
                self.make_frequency_bin(
                    allele_count=allele_count,
                    allele_freq=allele_freq,
                    is_denovo=seen_as_denovo),
            ))

        if self.has_coding_bins():
            # The reference allele is always placed in coding bin 0.
            if allele.is_reference_allele:
                coding_bin = 0
            else:
                coding_bin = self.make_coding_bin(allele.effect_types)
            result.append(("coding_bin", str(coding_bin)))

        return result

    def family_partition(
        self, allele: FamilyAllele, *,
        seen_as_denovo: bool,
    ) -> list[tuple[str, str]]:
        """Produce family partition for an allele.

        The partition is returned as a list of tuples consisting of the
        name of the partition and the value.

        Example:
        [
            ("region_bin", "chr1_0"),
            ("frequency_bin", "0"),
            ("coding_bin", "1"),
            ("family_bin", "1")
        ]
        """
        partition = self.summary_partition(
            allele, seen_as_denovo=seen_as_denovo)
        if self.has_family_bins():
            partition.append((
                "family_bin",
                str(self.make_family_bin(allele.family_id)),
            ))
        return partition

    def schema1_partition(
        self, allele: FamilyAllele,
    ) -> list[tuple[str, str]]:
        """Produce Schema1 family partition for an allele.

        Same as `family_partition`, with the de Novo flag taken from the
        transmission type of the allele itself.

        The partition is returned as a list of tuples consisting of the
        name of the partition and the value.

        Example:
        [
            ("region_bin", "chr1_0"),
            ("frequency_bin", "0"),
            ("coding_bin", "1"),
            ("family_bin", "1")
        ]
        """
        is_denovo = allele.transmission_type == TransmissionType.denovo
        return self.family_partition(allele, seen_as_denovo=is_denovo)

    def get_variant_partitions(
        self, chromosome_lengths: dict[str, int],
    ) -> tuple[list[list[tuple[str, str]]], list[list[tuple[str, str]]]]:
        """Return the output summary and family variant partition names.

        When no family bins are defined the family partitions are the very
        same list object as the summary partitions.
        """
        summary_parts: list[list[tuple[str, str]]] = []
        if self.has_region_bins():
            summary_parts.extend(
                [("region_bin", region_bin)]
                for region_bin in self.make_all_region_bins(
                    chromosome_lengths))

        if self.has_frequency_bins():
            # frequency bins are "0".."3", see `make_frequency_bin`
            summary_parts = self._add_product(
                summary_parts,
                [("frequency_bin", str(i)) for i in range(4)],
            )

        if self.has_coding_bins():
            # coding bins are "0" and "1", see `make_coding_bin`
            summary_parts = self._add_product(
                summary_parts,
                [("coding_bin", str(i)) for i in range(2)],
            )

        if self.has_family_bins():
            family_parts = self._add_product(
                summary_parts,
                [("family_bin", str(i))
                 for i in range(self.family_bin_size)],
            )
        else:
            family_parts = summary_parts

        return summary_parts, family_parts

    @staticmethod
    def partition_directory(
        output_dir: str, partition: list[tuple[str, str]],
    ) -> str:
        """Construct a partition dataset directory.

        Given a partition in the format returned by `summary_partition` or
        `family_partition` methods, this function constructs the directory
        name corresponding to the partition.
        """
        return fs_utils.join(
            output_dir,
            *[f"{bname}={bvalue}" for (bname, bvalue) in partition])

    @staticmethod
    def partition_filename(
        prefix: str,
        partition: list[tuple[str, str]],
        bucket_index: int | None,
    ) -> str:
        """Construct a partition dataset base filename.

        Given a partition in the format returned by `summary_partition` or
        `family_partition` methods, this function constructs the file name
        corresponding to the partition.
        """
        parts = [
            prefix,
            *(f"{bname}_{bvalue}" for (bname, bvalue) in partition),
        ]
        if bucket_index is not None:
            # bucket index is left-padded with zeros to six characters
            parts.append(f"bucket_index_{bucket_index:0>6}")
        return "_".join(parts) + ".parquet"

    @staticmethod
    def path_to_partitions(raw_path: str) -> list[tuple[str, str]]:
        """Convert a path into the partitions it is composed of.

        A trailing `*.parquet` file name is ignored. Every remaining path
        component must have the form `<name>=<value>`; only the first `=`
        separates the name from the value, so values may contain `=`.
        An empty path yields an empty list.

        Raises:
            ValueError: when a path component is not a partition directory.
        """
        parts = list(pathlib.Path(raw_path).parts)
        if parts and parts[-1].endswith(".parquet"):
            parts.pop()

        if not all("=" in part for part in parts):
            raise ValueError("Path contains non-partition directories!")

        result = []
        for part in parts:
            bname, _, bvalue = part.partition("=")
            result.append((bname, bvalue))
        return result

    def to_dict(self) -> dict[str, Any]:
        """Convert the partition descriptor to a dict.

        The values are the descriptor's own attributes (not copies).
        """
        return {
            "chromosomes": self.chromosomes,
            "region_length": self.region_length,
            "integer_region_bins": self.integer_region_bins,
            "rare_boundary": self.rare_boundary,
            "coding_effect_types": self.coding_effect_types,
            "family_bin_size": self.family_bin_size,
        }

    def serialize(self, output_format: str = "conf") -> str:
        """Serialize a partition descriptor into a string.

        Supported output formats are "conf" and "yaml". Sections whose
        values are empty or zero are left out of the output.

        Raises:
            ValueError: when `output_format` is not supported.
        """
        context = self.to_dict()
        # Coding effect types are kept in a set, whose iteration order is
        # not stable between interpreter runs; render them sorted so that
        # equal descriptors always serialize to the same text.
        context["coding_effect_types"] = sorted(self.coding_effect_types)

        if output_format == "conf":
            return jinja2.Template(textwrap.dedent("""
                {%- if chromosomes %}
                [region_bin]
                chromosomes={{ chromosomes|join(',') }}
                region_length={{ region_length }}
                {%- if integer_region_bins %}
                integer_region_bins=true
                {%- endif %}
                {%- endif %}
                {%- if rare_boundary %}
                [frequency_bin]
                rare_boundary={{ rare_boundary }}
                {%- endif %}
                {%- if coding_effect_types %}
                [coding_bin]
                coding_effect_types={{ coding_effect_types|join(',') }}
                {%- endif %}
                {%- if family_bin_size %}
                [family_bin]
                family_bin_size={{ family_bin_size }}
                {%- endif %}
            """)).render(context)

        if output_format == "yaml":
            return jinja2.Template(textwrap.dedent("""
                {%- if chromosomes %}
                region_bin:
                  chromosomes: {{ chromosomes|join(',') }}
                  region_length: {{ region_length }}
                  {%- if integer_region_bins %}
                  integer_region_bins: true
                  {%- endif %}
                {%- endif %}
                {%- if rare_boundary %}
                frequency_bin:
                  rare_boundary: {{ rare_boundary }}
                {%- endif %}
                {%- if coding_effect_types %}
                coding_bin:
                  coding_effect_types: {{ coding_effect_types|join(',') }}
                {%- endif %}
                {%- if family_bin_size %}
                family_bin:
                  family_bin_size: {{ family_bin_size }}
                {%- endif %}
            """)).render(context)

        raise ValueError(
            f"usupported output format for partition descriptor: "
            f"<{output_format}>")