def pedigree_parquet_schema() -> pa.Schema:
    """Return the schema for a pedigree parquet file."""
    fields: list[pa.Field] = [
        pa.field("family_id", pa.string()),
        pa.field("person_id", pa.string()),
        pa.field("dad_id", pa.string()),
        pa.field("mom_id", pa.string()),
        pa.field("sex", pa.int8()),
        pa.field("status", pa.int8()),
        pa.field("role", pa.int32()),
        pa.field("sample_id", pa.string()),
        pa.field("generated", pa.bool_()),
        pa.field("layout", pa.string()),
        pa.field("not_sequenced", pa.bool_()),
        pa.field("member_index", pa.int32()),
    ]
    return pa.schema(fields)
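
# Usage sketch (illustrative, not part of the original module): building a
# one-row pedigree table that conforms to the schema above. The numeric
# codes used for sex, status and role are placeholders, not authoritative
# GPF enum values.
def _example_pedigree_table() -> pa.Table:
    row = {
        "family_id": "f1", "person_id": "p1",
        "dad_id": "0", "mom_id": "0",
        "sex": 1, "status": 1, "role": 0,
        "sample_id": "p1", "generated": False,
        "layout": None, "not_sequenced": False,
        "member_index": 0,
    }
    return pa.Table.from_pylist([row], schema=pedigree_parquet_schema())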

def collect_pedigree_parquet_schema(ped_df: pd.DataFrame) -> pa.Schema:
    """Build the pedigree parquet schema for the given pedigree dataframe."""
    pps = pedigree_parquet_schema()
    _ped_df, pps = add_missing_parquet_fields(pps, ped_df)
    return pps
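
# Usage sketch (illustrative): add_missing_parquet_fields() is assumed to
# extend the base schema with columns that appear in the dataframe but not
# in the schema. The "phenotype_tag" column is hypothetical.
def _example_collect_schema() -> None:
    ped_df = pd.DataFrame({
        "family_id": ["f1"],
        "person_id": ["p1"],
        "phenotype_tag": ["epilepsy"],  # hypothetical extra column
    })
    schema = collect_pedigree_parquet_schema(ped_df)
    print(schema.names)  # base pedigree fields, plus the extra column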

def fill_family_bins(
    families: FamiliesData,
    partition_descriptor: PartitionDescriptor | None = None,
) -> None:
    """Tag every family member with its family bin."""
    if partition_descriptor is not None \
            and partition_descriptor.has_family_bins():
        for family in families.values():
            family_bin = partition_descriptor.make_family_bin(
                family.family_id)
            for person in family.persons.values():
                person.set_attr("family_bin", family_bin)
        # invalidate the cached pedigree dataframe so it is rebuilt
        # with the new "family_bin" column
        families._ped_df = None  # noqa: SLF001 pylint: disable=W0212
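
# Usage sketch (illustrative): fill_family_bins() mutates the families in
# place. Reading the attribute back with get_attr() is an assumption that
# mirrors the set_attr() call above.
def _example_family_bins(
    families: FamiliesData,
    partition_descriptor: PartitionDescriptor,
) -> None:
    fill_family_bins(families, partition_descriptor)
    for family in families.values():
        for person in family.persons.values():
            print(person.person_id, person.get_attr("family_bin"))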

def save_ped_df_to_parquet(
    ped_df: pd.DataFrame,
    filename: str,
    parquet_version: str | None = None,
) -> None:
    """Save ped_df as a parquet file named filename."""
    ped_df = ped_df.copy()

    ped_df.role = ped_df.role.apply(Role.to_value)
    ped_df.sex = ped_df.sex.apply(Sex.to_value)
    ped_df.status = ped_df.status.apply(Status.to_value)
    if "generated" not in ped_df:
        ped_df["generated"] = False
    if "layout" not in ped_df:
        ped_df["layout"] = None
    if "not_sequenced" not in ped_df:
        ped_df["not_sequenced"] = False

    pps = pedigree_parquet_schema()
    ped_df, pps = add_missing_parquet_fields(pps, ped_df)

    table = pa.Table.from_pandas(ped_df, schema=pps)
    filesystem, filename = url_to_pyarrow_fs(filename)
    pq.write_table(
        table, filename,
        filesystem=filesystem,
        version=parquet_version,  # type: ignore
    )
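
# Usage sketch (illustrative): writing a minimal two-person pedigree. The
# enum members (Sex.F, Sex.M, Status.unaffected, Status.affected, Role.mom,
# Role.prb) are assumed from the module imports; the output path is
# arbitrary.
def _example_save_pedigree() -> None:
    ped_df = pd.DataFrame({
        "family_id": ["f1", "f1"],
        "person_id": ["mom1", "prb1"],
        "dad_id": ["0", "0"],
        "mom_id": ["0", "mom1"],
        "sex": [Sex.F, Sex.M],
        "status": [Status.unaffected, Status.affected],
        "role": [Role.mom, Role.prb],
        "sample_id": ["mom1", "prb1"],
        "member_index": [0, 1],
    })
    save_ped_df_to_parquet(ped_df, "out/pedigree.parquet")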

def merge_variants_parquets(
    partition_descriptor: PartitionDescriptor,
    variants_dir: str,
    partition: list[tuple[str, str]],
    row_group_size: int = 50_000,
    parquet_version: str | None = None,
) -> None:
    """Merge parquet files in variants_dir."""
    output_parquet_file = fs_utils.join(
        variants_dir,
        partition_descriptor.partition_filename(
            "merged", partition, bucket_index=None,
        ),
    )
    parquet_files = sorted(fs_utils.glob(
        fs_utils.join(variants_dir, "*.parquet"),
    ))

    is_output_in_input = any(
        fn.endswith(output_parquet_file) for fn in parquet_files)
    if is_output_in_input:
        # A leftover merged file from a previous run; remove it from the
        # list of input files. We use endswith() instead of == because of
        # path normalization.
        for i, filename in enumerate(parquet_files):
            if filename.endswith(output_parquet_file):
                parquet_files.pop(i)
                break

    if len(parquet_files) > 0:
        logger.info(
            "Merging %d files in %s", len(parquet_files), variants_dir,
        )
        parquet_helpers.merge_parquets(
            parquet_files, output_parquet_file,
            row_group_size=row_group_size,
            parquet_version=parquet_version,
        )
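
# Usage sketch (illustrative): merging the per-bucket files of a single
# partition directory into one "merged" parquet file. The partition
# key/value pairs and directory layout are hypothetical and must match
# what the partition descriptor actually produced.
def _example_merge(partition_descriptor: PartitionDescriptor) -> None:
    partition = [("region_bin", "chr1_0"), ("frequency_bin", "1")]
    variants_dir = "work/family/region_bin=chr1_0/frequency_bin=1"
    merge_variants_parquets(
        partition_descriptor, variants_dir, partition,
        row_group_size=20_000,
    )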

def append_meta_to_parquet(
    meta_filename: str,
    key: list[str],
    value: list[str],
) -> None:
    """Append key-value pairs to a metadata parquet file."""
    dirname = os.path.dirname(meta_filename)
    if dirname and not os.path.exists(dirname):
        os.makedirs(dirname, exist_ok=True)

    append_table = pa.table(
        {
            "key": key,
            "value": value,
        },
        schema=pa.schema({"key": pa.string(), "value": pa.string()}),
    )
    if not os.path.isfile(meta_filename):
        pq.write_table(append_table, meta_filename)
        return
    meta_table = pq.read_table(meta_filename)
    meta_table = pa.concat_tables([meta_table, append_table])
    pq.write_table(meta_table, meta_filename)
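
# Usage sketch (illustrative): appending two metadata records as parallel
# key/value lists. The function reads and rewrites the whole file on each
# call, so it is only suitable for small metadata tables.
def _example_append_meta() -> None:
    append_meta_to_parquet(
        "out/meta/meta.parquet",
        key=["study_id", "reference_genome"],
        value=["study_1", "hg38"],
    )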

def serialize_summary_schema(
    annotation_attributes: list[AttributeInfo],
    partition_descriptor: PartitionDescriptor,
) -> str:
    """Serialize the summary schema."""
    summary_schema = build_summary_schema(annotation_attributes)
    schema = [(f.name, f.type) for f in summary_schema]
    schema.extend(list(partition_descriptor.summary_partition_schema()))
    return "\n".join([f"{n}|{t}" for n, t in schema])
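
# Usage sketch (illustrative): the serialized summary schema is a
# newline-separated list of "name|type" pairs: annotation attributes
# first, then the partition columns. Storing it under a "summary_schema"
# meta key is an assumption, not necessarily what the surrounding
# pipeline uses.
def _example_store_summary_schema(
    annotation_attributes: list[AttributeInfo],
    partition_descriptor: PartitionDescriptor,
) -> None:
    content = serialize_summary_schema(
        annotation_attributes, partition_descriptor)
    append_meta_to_parquet(
        "out/meta/meta.parquet",
        key=["summary_schema"], value=[content],
    )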