@dataclass(frozen=True)
class Bucket:
    """A region of the input used for processing."""

    type: str
    region_bin: str
    regions: list[str]
    index: int

    def __str__(self) -> str:
        regions = ";".join(r or "all" for r in self.regions)
        if not regions:
            regions = "all"
        return f"Bucket({self.type},{self.region_bin},{regions},{self.index})"
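
# A minimal sketch (hypothetical values) of how a bucket describes one slice
# of the input for parallel processing; the region_bin and index values below
# are assumptions, not produced by this module:
def _example_bucket() -> None:
    bucket = Bucket("vcf", "chr1_0", ["chr1:1-1000000"], 100_000)
    print(str(bucket))  # Bucket(vcf,chr1_0,chr1:1-1000000,100000)
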
class ImportProject:
    """Encapsulate the import configuration.

    This class creates the necessary objects needed to import a study
    (e.g. loaders, family data and so on).
    """

    # pylint: disable=too-many-public-methods

    def __init__(
        self, import_config: dict[str, Any],
        base_input_dir: str | None,
        base_config_dir: str | None = None,
        gpf_instance: GPFInstance | None = None,
        config_filenames: list[str] | None = None,
    ) -> None:
        """Create a new project from the provided config.

        It is best not to call this ctor directly but to use one of the
        provided build_* methods.

        :param import_config: The parsed, validated and normalized config.
        :param gpf_instance: Allow overwriting the gpf instance described in
            the configuration and injecting our own instance instead.
            Ideal for testing.
        """
        self.import_config: dict[str, Any] = import_config
        if "denovo" in import_config.get("input", {}):
            len_files = len(import_config["input"]["denovo"]["files"])
            assert len_files == 1, "Support for multiple denovo files is NYI"
        self._base_input_dir = base_input_dir
        self._base_config_dir = base_config_dir or base_input_dir
        self._gpf_instance = gpf_instance
        self.config_filenames = config_filenames or []
        self.stats: StatsCollection = StatsCollection()
        self._input_filenames_cache: dict[str, list[str]] = {}
    @staticmethod
    def build_from_config(
        import_config: dict[str, Any],
        base_input_dir: str = "",
        gpf_instance: GPFInstance | None = None,
    ) -> ImportProject:
        """Create a new project from the provided config.

        The config is first validated and normalized.

        :param import_config: The config to use for the import.
        :param base_input_dir: Default input dir. Use cwd by default.
        """
        import_config = GPFConfigParser.validate_config(
            import_config, import_config_schema)
        normalizer = ImportConfigNormalizer()
        base_config_dir = base_input_dir
        import_config, base_input_dir, external_files = \
            normalizer.normalize(import_config, base_input_dir)
        return ImportProject(
            import_config, base_input_dir, base_config_dir,
            gpf_instance=gpf_instance, config_filenames=external_files,
        )
    @staticmethod
    def build_from_file(
        import_filename: str | os.PathLike,
        gpf_instance: GPFInstance | None = None,
    ) -> ImportProject:
        """Create a new project from the provided config filename.

        The file is first parsed, validated and normalized. The path to the
        file is used as the default input path for the project.

        :param import_filename: Path to the config file.
        :param gpf_instance: GPF instance to use.
        """
        base_input_dir = fs_utils.containing_path(import_filename)
        import_config = GPFConfigParser.parse_and_interpolate_file(
            import_filename, conf_dir=base_input_dir)
        project = ImportProject.build_from_config(
            import_config, base_input_dir, gpf_instance=gpf_instance)
        # the path to the import filename should be the first config file
        project.config_filenames.insert(0, str(import_filename))
        return project
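    # Hedged usage sketch (the config path below is an assumption):
    #
    #     project = ImportProject.build_from_file("study/import_config.yaml")
    #     buckets = project.get_import_variants_buckets()
    #
    # build_from_file() is the usual entry point; build_from_config() serves
    # configurations that have already been parsed elsewhere.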
    def get_pedigree_params(self) -> tuple[str, dict[str, Any]]:
        """Get params for loading the pedigree."""
        families_filename = self.get_pedigree_filename()

        families_params = {}
        families_params.update(FamiliesLoader.cli_defaults())
        config_params = self.import_config["input"]["pedigree"]
        config_params = self._add_loader_prefix(config_params, "ped_")
        families_params.update(config_params)

        return families_filename, families_params
    def get_pedigree_filename(self) -> str:
        """Return the path to the pedigree file."""
        families_filename = self.import_config["input"]["pedigree"]["file"]
        families_filename = fs_utils.join(self.input_dir, families_filename)
        return cast(str, families_filename)
    def get_pedigree(self) -> FamiliesData:
        """Load, parse and return the pedigree data."""
        families_loader = self.get_pedigree_loader()
        return families_loader.load()
    def get_variant_loader_types(self) -> set[str]:
        """Collect all variant import types used in the project."""
        result = set()
        for loader_type in ["denovo", "vcf", "cnv", "dae"]:
            config = self.import_config["input"].get(loader_type)
            if config is not None:
                result.add(loader_type)
        return result
    def has_denovo_variants(self) -> bool:
        """Check if the resulting imported study has denovo variants."""
        if "denovo" in self.get_variant_loader_types():
            return True
        if "vcf" in self.get_variant_loader_types():
            _, variants_params = \
                self.get_variant_params("vcf")
            if variants_params.get("vcf_denovo_mode") == "denovo":
                return True
        return False
    def has_variants(self) -> bool:
        """Check if the resulting imported study has any variants."""
        parquet_dataset_dir = self.get_processing_parquet_dataset_dir()
        if parquet_dataset_dir is not None:
            return fs_utils.exists(
                fs_utils.join(parquet_dataset_dir, "summary"))
        return any(self.get_variant_loader_types())
    def get_variant_loader_chromosomes(
        self, loader_type: str | None = None,
    ) -> list[str]:
        """Collect all chromosomes available in input files."""
        loader_types = self.get_variant_loader_types()
        if loader_type is not None:
            if loader_type not in loader_types:
                return []
            loader_types = {loader_type}
        chromosomes = set()
        for ltype in loader_types:
            loader = self.get_variant_loader(loader_type=ltype)
            chromosomes.update(loader.chromosomes)
        return [
            chrom
            for chrom in self.get_gpf_instance().reference_genome.chromosomes
            if chrom in chromosomes
        ]
    def get_variant_loader_chrom_lens(
        self, loader_type: str | None = None,
    ) -> dict[str, int]:
        """Collect all chromosomes and their lengths available in input files."""
        all_chrom_lens = dict(
            self.get_gpf_instance()
            .reference_genome.get_all_chrom_lengths())
        return {
            chrom: all_chrom_lens[chrom]
            for chrom in self.get_variant_loader_chromosomes(loader_type)
        }
    def get_import_variants_buckets(self) -> list[Bucket]:
        """Split variant files into buckets enabling parallel processing."""
        buckets: list[Bucket] = []
        for loader_type in ["denovo", "vcf", "cnv", "dae"]:
            config = self.import_config["input"].get(loader_type, None)
            if config is not None:
                buckets.extend(self._loader_region_bins(loader_type))
        return buckets
    def get_variant_loader(
        self, bucket: Bucket | None = None,
        loader_type: str | None = None,
        reference_genome: ReferenceGenome | None = None,
    ) -> VariantsLoader:
        """Get the appropriate variant loader for the specified bucket."""
        if bucket is None and loader_type is None:
            raise ValueError("loader_type or bucket is required")
        if bucket is not None:
            loader_type = bucket.type
        assert loader_type is not None
        loader = self._get_variant_loader(loader_type, reference_genome)
        if bucket is not None and bucket.region_bin != "all":
            loader.reset_regions(
                [Region.from_str(reg) for reg in bucket.regions])
        return loader
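    # Illustrative sketch of how buckets select what a loader reads; the
    # surrounding loop is an assumption about the caller, not this module:
    #
    #     for bucket in project.get_import_variants_buckets():
    #         loader = project.get_variant_loader(bucket)
    #         ...  # process only the regions covered by this bucket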
    def get_input_filenames(self, bucket: Bucket) -> list[str]:
        """Get a list of input files for a specific bucket."""
        # creating a loader is expensive so cache the results
        if bucket.type not in self._input_filenames_cache:
            loader = self.get_variant_loader(bucket)
            self._input_filenames_cache[bucket.type] = loader.filenames
        return self._input_filenames_cache[bucket.type]
    def get_variant_params(
        self, loader_type: str,
    ) -> tuple[str | list[str], dict[str, Any]]:
        """Return variant loader filenames and params."""
        assert loader_type in self.import_config["input"], \
            f"No input config for loader {loader_type}"

        loader_config = self.import_config["input"][loader_type]

        if loader_type == "vcf" and "chromosomes" in loader_config:
            # vcf loader expects chromosomes to be in a string separated by ;
            loader_config = deepcopy(loader_config)
            loader_config["chromosomes"] = ";".join(
                loader_config["chromosomes"])

        variants_params = self._add_loader_prefix(
            loader_config, loader_type + "_")

        variants_filenames = loader_config["files"]
        variants_filenames = [
            fs_utils.join(self.input_dir, f) for f in variants_filenames
        ]
        if loader_type in {"denovo", "dae"}:
            assert len(variants_filenames) == 1, \
                f"Support for multiple {loader_type} files is NYI"
            variants_filenames = variants_filenames[0]

        return variants_filenames, variants_params
    def _get_variant_loader(
        self, loader_type: str,
        reference_genome: ReferenceGenome | None = None,
    ) -> VariantsLoader:
        assert loader_type in self.import_config["input"], \
            f"No input config for loader {loader_type}"
        if reference_genome is None:
            reference_genome = self.get_gpf_instance().reference_genome

        variants_filenames, variants_params = \
            self.get_variant_params(loader_type)

        loader_cls = {
            "denovo": DenovoLoader,
            "vcf": VcfLoader,
            "cnv": CNVLoader,
            "dae": DaeTransmittedLoader,
        }[loader_type]

        loader: VariantsLoader = loader_cls(
            self.get_pedigree(),
            variants_filenames,
            params=variants_params,
            genome=reference_genome,
        )
        self._check_chrom_prefix(loader, variants_params)
        return loader
    def get_partition_descriptor(self) -> PartitionDescriptor:
        """Return the partition descriptor as described in the config."""
        if "partition_description" not in self.import_config:
            return PartitionDescriptor()

        config_dict: dict = self.import_config["partition_description"]
        partition_descriptor = PartitionDescriptor.parse_dict(config_dict)
        if partition_descriptor.has_region_bins():
            reference_genome = self.get_gpf_instance().reference_genome
            partition_descriptor.chromosomes = [
                chrom
                for chrom in partition_descriptor.chromosomes
                if chrom in reference_genome.chromosomes
            ]
        return partition_descriptor
    def get_gpf_instance(self) -> GPFInstance:
        """Create and return a gpf instance as described in the config."""
        if self._gpf_instance is not None:
            return self._gpf_instance

        instance_config = self.import_config.get("gpf_instance", {})
        instance_dir = instance_config.get("path")
        if instance_dir is None:
            config_filename = None
        else:
            config_filename = fs_utils.join(instance_dir, "gpf_instance.yaml")
        self._gpf_instance = GPFInstance.build(config_filename)
        return self._gpf_instance
    def get_import_storage(self) -> ImportStorage:
        """Create an import storage as described in the import config."""
        storage_type = self._storage_type()
        return self._get_import_storage(storage_type)
    @staticmethod
    @cache
    def _get_import_storage(storage_type: str) -> ImportStorage:
        factory = get_import_storage_factory(storage_type)
        return factory()

    @cached_property
    def work_dir(self) -> str:
        """Where to store generated import files (e.g. parquet files)."""
        work_dir = cast(
            str,
            self.import_config.get("processing_config", {})
            .get("work_dir", ""),
        )
        if work_dir == "":
            work_dir = os.path.join(
                self._base_config_dir or "",
                self.study_id,
            )
        return work_dir

    @property
    def include_reference(self) -> bool:
        """Check if the import should include ref allele in the output data."""
        return cast(
            bool,
            self.import_config.get("processing_config", {})
            .get("include_reference", False))

    @property
    def input_dir(self) -> str:
        """Return the path relative to which input files are specified."""
        assert self._base_input_dir is not None
        return fs_utils.join(
            self._base_input_dir,
            self.import_config["input"].get("input_dir", ""),
        )

    @property
    def study_id(self) -> str:
        return cast(str, self.import_config["id"])
    def get_processing_annotation_batch_size(self) -> int:
        """Return the configured annotation batch size for processing."""
        processing_config = self.import_config.get("processing_config", {})
        return processing_config.get("annotation_batch_size", 0)
    def get_processing_parquet_dataset_dir(self) -> str | None:
        """Return the processing parquet dataset dir if it is configured and exists."""
        processing_config = self.import_config.get("processing_config", {})
        parquet_dataset_dir = processing_config.get("parquet_dataset_dir")
        if parquet_dataset_dir is None:
            return None
        if not fs_utils.exists(parquet_dataset_dir):
            return None
        return cast(str, parquet_dataset_dir)
    def get_parquet_dataset_dir(self) -> str:
        """Return the parquet dataset directory.

        If a processing parquet dataset dir is configured, this method will
        return it. Otherwise it will construct the parquet dataset directory
        inside the work dir.
        """
        parquet_dataset_dir = self.get_processing_parquet_dataset_dir()
        if parquet_dataset_dir is not None:
            return parquet_dataset_dir
        return fs_utils.join(self.work_dir, self.study_id)
    def has_genotype_storage(self) -> bool:
        """Return whether a genotype storage can be created."""
        if not self._has_destination():
            return True  # Use default genotype storage
        if "storage_type" not in self.import_config["destination"]:
            return True  # External genotype storage
        # Embedded configuration
        # storage_type is the only property in destination
        # this is a special case and we assume there is no genotype storage
        return len(self.import_config["destination"]) > 1
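    # Hypothetical *destination* sections illustrating the cases handled here
    # and in get_genotype_storage() below (keys other than storage_id and
    # storage_type are assumptions):
    #
    #     destination:
    #       storage_id: my_registered_storage   # storage from the GPF
    #                                           # instance registry
    #
    #     destination:
    #       storage_type: duckdb                # embedded storage config,
    #       db: storage.db                      # registered ad hoc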
    def get_genotype_storage(self) -> GenotypeStorage:
        """Find, create and return the correct genotype storage."""
        explicit_config = (
            self._has_destination()
            and "storage_id" not in self.import_config["destination"]
        )

        if not explicit_config:
            gpf_instance = self.get_gpf_instance()
            genotype_storages: GenotypeStorageRegistry = \
                gpf_instance.genotype_storages
            storage_id = self.import_config.get("destination", {}) \
                .get("storage_id", None)
            if storage_id is not None:
                return genotype_storages.get_genotype_storage(storage_id)
            return genotype_storages.get_default_genotype_storage()
        # explicit storage config
        registry = GenotypeStorageRegistry()
        return registry.register_storage_config(
            self.import_config["destination"])
    def _has_destination(self) -> bool:
        """Return whether there is a *destination* section in the import config."""
        return "destination" in self.import_config
    def _storage_type(self) -> str:
        if not self._has_destination():
            # get default storage schema from GPF instance
            gpf_instance = self.get_gpf_instance()
            storage: GenotypeStorage = gpf_instance \
                .genotype_storages.get_default_genotype_storage()
            return storage.storage_type

        destination = self.import_config["destination"]
        if "storage_id" in destination:
            storage_id = destination["storage_id"]
            gpf_instance = self.get_gpf_instance()
            storage = gpf_instance \
                .genotype_storages \
                .get_genotype_storage(storage_id)
            return storage.storage_type

        return cast(str, destination["storage_type"])

    @staticmethod
    def _get_default_bucket_index(loader_type: str) -> int:
        return {
            "denovo": 0,
            "vcf": 100_000,
            "dae": 200_000,
            "cnv": 300_000,
        }[loader_type]

    @staticmethod
    def _add_loader_prefix(
        params: dict[str, Any], prefix: str,
    ) -> dict[str, Any]:
        res = {}
        exclude = {"add_chrom_prefix", "del_chrom_prefix", "files"}
        for k, val in params.items():
            if k not in exclude:
                res[prefix + k] = val
            else:
                res[k] = val
        return res
    @staticmethod
    def del_loader_prefix(
        params: dict[str, Any], prefix: str,
    ) -> dict[str, Any]:
        """Remove prefix from parameter keys."""
        res = {}
        for k, val in params.items():
            if val is None:
                continue
            key = k
            if k.startswith(prefix):
                key = k[len(prefix):]
            res[key] = val
        return res
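    # Small hedged example of the prefix round-trip (values are made up):
    #
    #     params = {"denovo_location": "loc", "files": ["d.tsv"]}
    #     ImportProject.del_loader_prefix(params, "denovo_")
    #     # -> {"location": "loc", "files": ["d.tsv"]}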
    def _loader_region_bins(
        self, loader_type: str,
    ) -> Generator[Bucket, None, None]:
        # pylint: disable=too-many-locals
        reference_genome = self.get_gpf_instance().reference_genome

        loader = self._get_variant_loader(loader_type, reference_genome)
        loader_chromosomes = loader.chromosomes
        target_chromosomes = self._get_loader_target_chromosomes(loader_type)
        if target_chromosomes is None:
            target_chromosomes = loader_chromosomes

        # cannot use self.get_partition_description() here as the
        # processing region length might be different than the region length
        # specified in the partition description section of the import config
        processing_region_length = \
            self._get_processing_region_length(loader_type)
        processing_descriptor = PartitionDescriptor(
            chromosomes=target_chromosomes,
            region_length=processing_region_length,  # type: ignore
        )

        processing_config = self._get_loader_processing_config(loader_type)
        mode = None
        if isinstance(processing_config, str):
            mode = processing_config
        elif len(processing_config) == 0:
            mode = "single_bucket"  # default mode when missing config

        if mode == "single_bucket":
            processing_regions: dict[str, list[str]] = {"all": []}
        elif mode == "chromosome":
            processing_regions = {
                chrom: [chrom] for chrom in loader_chromosomes
            }
        else:
            assert mode is None
            processing_regions = {
                chrom: [str(r) for r in regions]
                for chrom, regions in
                processing_descriptor.make_region_bins_regions(
                    chromosomes=loader_chromosomes,
                    chromosome_lengths=reference_genome
                    .get_all_chrom_lengths(),
                ).items()
            }

        default_bucket_index = self._get_default_bucket_index(loader_type)
        for index, (region_bin, regions) in enumerate(
                processing_regions.items()):
            assert index <= 100_000, f"Too many buckets {loader_type}"
            bucket_index = default_bucket_index + index
            yield Bucket(
                loader_type,
                region_bin,
                regions,
                bucket_index,
            )

    def _get_processing_region_length(self, loader_type: str) -> int | None:
        processing_config = self._get_loader_processing_config(loader_type)
        if isinstance(processing_config, str):
            return None
        return cast(int, processing_config.get("region_length", sys.maxsize))

    def _get_loader_target_chromosomes(
        self, loader_type: str,
    ) -> list[str] | None:
        processing_config = self._get_loader_processing_config(loader_type)
        if isinstance(processing_config, str):
            return None
        processing_chromosomes = processing_config.get("chromosomes", None)
        if processing_chromosomes is None:
            return None

        reference_genome = self.get_gpf_instance().reference_genome
        if "all" in processing_chromosomes:
            return list(reference_genome.chromosomes)
        return [
            chrom for chrom in processing_chromosomes
            if chrom in reference_genome.chromosomes
        ]

    def _get_loader_processing_config(
        self, loader_type: str,
    ) -> dict[str, Any]:
        return cast(
            dict[str, Any],
            self.import_config.get("processing_config", {})
            .get(loader_type, {}))

    @staticmethod
    def _check_chrom_prefix(
        loader: VariantsLoader,
        variants_params: dict[str, Any],
    ) -> None:
        prefix = variants_params.get("add_chrom_prefix")
        if prefix:
            all_already_have_prefix = True
            for chrom in loader.chromosomes:
                # the loader should have already added the prefix
                assert chrom.startswith(prefix)
                if not chrom[len(prefix):].startswith(prefix):
                    all_already_have_prefix = False
                    break
            if all_already_have_prefix and len(loader.chromosomes):
                raise ValueError(
                    f"All chromosomes already have the prefix {prefix}. "
                    "Consider removing add_chrom_prefix.",
                )

        prefix = variants_params.get("del_chrom_prefix")
        if prefix:
            try:
                # the chromosomes getter will assert for us if the prefix
                # can be removed or not. If there is no prefix to begin with
                # we will get an assertion error
                loader.chromosomes  # noqa: B018
            except AssertionError as exp:
                raise ValueError(
                    f"Chromosomes already missing the prefix {prefix}. "
                    "Consider removing del_chrom_prefix.",
                ) from exp
    def get_annotation_pipeline_config(
        self,
    ) -> RawPipelineConfig:
        """Return the annotation pipeline configuration."""
        gpf_instance = self.get_gpf_instance()
        if "annotation" not in self.import_config:
            # build default annotation pipeline as described in the gpf
            return construct_import_annotation_pipeline_config(gpf_instance)

        annotation_config = self.import_config["annotation"]
        if "file" in annotation_config:
            # pipeline in external file
            assert self._base_config_dir is not None
            annotation_config_file = fs_utils.join(
                self._base_config_dir,
                annotation_config["file"],
            )
            return construct_import_annotation_pipeline_config(
                gpf_instance,
                annotation_configfile=annotation_config_file,
            )
        return cast(list[dict], annotation_config)
    def __str__(self) -> str:
        return f"Project({self.study_id})"

    def __getstate__(self) -> dict[str, Any]:
        """Return state of object used for pickling.

        The state is the default state but doesn't include the _gpf_instance
        as this property is transient.
        """
        gpf_instance = self.get_gpf_instance()
        state = self.__dict__.copy()
        del state["_gpf_instance"]
        state["_gpf_dae_config"] = gpf_instance.dae_config
        state["_gpf_dae_config_path"] = gpf_instance.dae_config_path
        state["_gpf_dae_dir"] = gpf_instance.dae_dir
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        """Set state of object after unpickling."""
        self.__dict__.update(state)
        self._gpf_instance = GPFInstance(
            state["_gpf_dae_config"],
            state["_gpf_dae_dir"],
            state["_gpf_dae_config_path"],
        )
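
# Hypothetical end-to-end sketch of how an ImportProject is typically driven.
# It is not part of this module; the config path below is an assumption and
# the generated task graph is only built here, since executing it depends on
# the task-graph runner in use.
def _example_import_project_usage() -> None:
    project = ImportProject.build_from_file("my_study/import_config.yaml")

    # buckets describe independent slices of the input suitable for
    # parallel processing
    for bucket in project.get_import_variants_buckets():
        loader = project.get_variant_loader(bucket)
        print(bucket, loader.chromosomes)

    # the import storage knows how to turn the project into a task graph
    storage = project.get_import_storage()
    _ = storage.generate_import_task_graph(project)
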
class ImportConfigNormalizer:
    """Class to normalize import configs.

    Most of the normalization is done by Cerberus but it falls short in a
    few cases. This class picks up the slack.

    It also reads external files and embeds them in the final configuration
    dict.
    """
    def normalize(
        self, import_config: dict, base_input_dir: str,
    ) -> tuple[dict[str, Any], str, list[str]]:
        """Normalize the import config."""
        config = deepcopy(import_config)

        base_input_dir, external_files = self._load_external_files(
            config, base_input_dir,
        )

        self._map_for_key(config, "region_length", self._int_shorthand)
        self._map_for_key(config, "chromosomes", self._normalize_chrom_list)
        if "parquet_row_group_size" in config.get("processing_config", {}):
            group_size_config = \
                config["processing_config"]["parquet_row_group_size"]
            if group_size_config is None:
                del config["processing_config"]["parquet_row_group_size"]
            else:
                config["processing_config"]["parquet_row_group_size"] = \
                    self._int_shorthand(group_size_config)
        return config, base_input_dir, external_files
    @classmethod
    def _load_external_files(
        cls, config: dict, base_input_dir: str,
    ) -> tuple[str, list[str]]:
        external_files: list[str] = []

        base_input_dir = cls._load_external_file(
            config, "input", base_input_dir, embedded_input_schema,
            external_files,
        )

        if "file" in config.get("annotation", {}):
            # don't load the config just add it to the list of external files
            external_files.append(config["annotation"]["file"])

        return base_input_dir, external_files

    @staticmethod
    def _load_external_file(
        config: dict, section_key: str, base_input_dir: str,
        schema: dict, external_files: list[str],
    ) -> str:
        if section_key not in config:
            return base_input_dir

        sub_config = config[section_key]
        while "file" in sub_config:
            external_fn = fs_utils.join(base_input_dir, sub_config["file"])
            external_files.append(external_fn)
            sub_config = GPFConfigParser.parse_and_interpolate_file(
                external_fn,
            )
            sub_config = GPFConfigParser.validate_config(
                sub_config, schema,
            )
            base_input_dir = fs_utils.containing_path(external_fn)
        config[section_key] = sub_config
        return base_input_dir

    @classmethod
    def _map_for_key(
        cls, config: dict[str, Any], key: str,
        func: Callable[[Any], Any],
    ) -> None:
        for k, val in config.items():
            if k == key:
                config[k] = func(val)
            elif isinstance(val, dict):
                cls._map_for_key(val, key, func)

    @staticmethod
    def _int_shorthand(obj: str | int) -> int:
        if isinstance(obj, int):
            return obj
        assert isinstance(obj, str)
        val = obj.strip()
        unit_suffixes = {
            "K": 1_000,
            "M": 1_000_000,
            "G": 1_000_000_000,
        }
        if val[-1].upper() not in unit_suffixes:
            return int(val)
        return int(val[:-1]) * unit_suffixes[val[-1].upper()]

    @classmethod
    def _normalize_chrom_list(cls, obj: str | list[str]) -> list[str]:
        if isinstance(obj, list):
            return cls._expand_chromosomes(obj)
        assert isinstance(obj, str)

        chrom_list = list(
            map(str.strip, obj.split(",")),
        )
        return cls._expand_chromosomes(chrom_list)

    @staticmethod
    def _expand_chromosomes(chromosomes: list[str]) -> list[str]:
        if chromosomes is None:
            return None
        res: list[str] = []
        for chrom in chromosomes:
            if chrom in {"autosomes", "autosomesXY"}:
                for i in range(1, 23):
                    res.extend((f"{i}", f"chr{i}"))
                if chrom == "autosomesXY":
                    for j in ["X", "Y"]:
                        res.extend((f"{j}", f"chr{j}"))
            else:
                res.append(chrom)
        return res
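
# A small hedged illustration of the normalization helpers above; the values
# are made up, and the helpers are private, so this is only a sketch:
def _example_normalization() -> None:
    # "300M" style shorthands become plain integers
    assert ImportConfigNormalizer._int_shorthand("300M") == 300_000_000
    # comma separated chromosome lists are split into a list; aliases such as
    # "autosomes" expand to chromosomes 1..22 with and without a chr prefix
    assert ImportConfigNormalizer._normalize_chrom_list(
        "chrX, chrY") == ["chrX", "chrY"]
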
class ImportStorage(ABC):
    """Defines abstract base class for import storages."""
    @abstractmethod
    def generate_import_task_graph(self, project: ImportProject) -> TaskGraph:
        """Generate a task graph for import of the project into this storage."""
def get_import_storage_factory(
    storage_type: str,
) -> Callable[[], ImportStorage]:
    """Find and return a factory function for creation of a storage type."""
    _load_import_storage_factory_plugins()
    if storage_type not in _REGISTERED_IMPORT_STORAGE_FACTORIES:
        raise ValueError(f"unsupported import storage type: {storage_type}")
    return _REGISTERED_IMPORT_STORAGE_FACTORIES[storage_type]
def save_study_config(
    dae_config: Box, study_id: str,
    study_config: str, *,
    force: bool = False,
) -> None:
    """Save the study config to a file."""
    dirname = os.path.join(dae_config.studies.dir, study_id)
    filename = os.path.join(dirname, f"{study_id}.yaml")

    if os.path.exists(filename):
        logger.info(
            "configuration file already exists: %s", filename)
        bak_name = os.path.basename(filename) + "." + str(time.time_ns())
        bak_path = os.path.join(os.path.dirname(filename), bak_name)

        if not force:
            logger.info(
                "skipping overwriting the old config file... "
                "storing new config in %s", bak_path)
            pathlib.Path(bak_path).write_text(study_config)
            return

        logger.info(
            "Backing up configuration for %s in %s", study_id, bak_path)
        os.rename(filename, bak_path)

    os.makedirs(dirname, exist_ok=True)
    pathlib.Path(filename).write_text(study_config)
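
# Hedged sketch of the overwrite semantics above: without force the existing
# config stays in place and the new config lands next to it under a
# timestamped name; with force the old config is moved aside instead.
# (The gpf_instance argument and the YAML string below are assumptions.)
def _example_save_study_config(gpf_instance: GPFInstance) -> None:
    new_config_yaml = "id: my_study\n"
    # keep the deployed config, stash the new one under a .<timestamp> name
    save_study_config(
        gpf_instance.dae_config, "my_study", new_config_yaml, force=False)
    # replace the deployed config, moving the old one to the backup name
    save_study_config(
        gpf_instance.dae_config, "my_study", new_config_yaml, force=True)
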
def construct_import_annotation_pipeline_config(
    gpf_instance: GPFInstance,
    annotation_configfile: str | None = None,
) -> RawPipelineConfig:
    """Construct annotation pipeline config for importing data."""
    if annotation_configfile is not None:
        assert os.path.exists(annotation_configfile), annotation_configfile
        with open(annotation_configfile, "rt", encoding="utf8") as infile:
            return cast(list[dict], yaml.safe_load(infile.read()))
    return gpf_instance.get_annotation_pipeline_config()
def construct_import_annotation_pipeline(
    gpf_instance: GPFInstance,
    annotation_configfile: str | None = None,
    work_dir: pathlib.Path | None = None,
) -> AnnotationPipeline:
    """Construct annotation pipeline for importing data."""
    pipeline_config = construct_import_annotation_pipeline_config(
        gpf_instance, annotation_configfile)
    grr = gpf_instance.grr
    return build_annotation_pipeline(pipeline_config, grr, work_dir=work_dir)
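
# Hedged usage sketch for the two helpers above; the pipeline file name is an
# assumption, and omitting annotation_configfile falls back to the GPF
# instance's default annotation configuration.
def _example_annotation_pipeline(gpf_instance: GPFInstance) -> None:
    pipeline = construct_import_annotation_pipeline(
        gpf_instance, annotation_configfile="annotation.yaml")
    _ = pipeline  # used to annotate variants while importing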