def filter_genes(
    self,
    filter_fun: Callable[[list[str]], list[str]],
) -> None:
    """Keep only the genes selected by `filter_fun` and resync the term maps.

    Args:
        filter_fun: Called once with the list of all gene keys currently in
            ``self.g2t``; the genes it returns are the ones that are kept.

    Side effects:
        * ``self.g2t`` is replaced by a plain ``dict`` restricted to the
          kept genes.
        * ``self.t2g`` is rebuilt from scratch out of the filtered
          ``self.g2t``.
        * Entries of ``self.t_desc`` whose term no longer has any gene are
          deleted.
    """
    # Materialise the selection as a set: membership is then O(1) per gene
    # instead of a linear scan of the returned list for every gene.
    keep_gs = set(filter_fun(list(self.g2t.keys())))
    self.g2t = {g: ts for g, ts in self.g2t.items() if g in keep_gs}

    # `dd` is the inner-mapping factory defined elsewhere in this module
    # (presumably an int-valued defaultdict -- confirm at its definition).
    self.t2g = defaultdict(dd)
    for g, ts in self.g2t.items():
        for t, n in ts.items():
            self.t2g[t][g] = n

    # Drop descriptions of terms that lost all of their genes.
    for t in set(self.t_desc) - set(self.t2g):
        del self.t_desc[t]
def save(self, fname: str) -> None:
    """Write the gene-to-term map and the term descriptions to two files.

    If `fname` ends with ``-map.txt`` it is used as the map file name and the
    descriptions go to the same name with ``.txt`` replaced by ``names.txt``.
    Otherwise ``-map.txt`` and ``-mapnames.txt`` are appended to `fname`.

    The map file starts with a ``#geneNS`` header line, followed by one line
    per gene (sorted): the gene, a tab, and its terms (sorted) separated by
    spaces, each term repeated as many times as its stored count. The
    descriptions file holds one ``term<TAB>description`` line per term
    (sorted).
    """
    if not fname.endswith("-map.txt"):
        map_fname = fname + "-map.txt"
        dsc_fname = fname + "-mapnames.txt"
    else:
        map_fname = fname
        dsc_fname = fname[:-4] + "names.txt"

    with open(map_fname, "wt") as outfile:
        outfile.write("#geneNS\t" + str(self.gene_ns) + "\n")
        for gene in sorted(self.g2t):
            repeated_terms = []
            for term, count in sorted(self.g2t[gene].items()):
                # A term with count k is written k times.
                repeated_terms.extend([term] * count)
            outfile.write(gene + "\t" + " ".join(repeated_terms) + "\n")

    desc_lines = [
        term + "\t" + desc for term, desc in sorted(self.t_desc.items())
    ]
    pathlib.Path(dsc_fname).write_text("\n".join(desc_lines) + "\n")
def read_ewa_set_file(set_files: list[IO]) -> GeneTerms:
    """Read gene sets from a collection of already opened set files.

    Each file describes one set: the first non-blank line is the set name,
    the line right after it is the set description, and every following
    non-blank line is one gene symbol.

    Args:
        set_files: Open text files; each one is closed once it has been
            processed (also when processing it fails).

    Returns:
        A ``GeneTerms`` with ``gene_ns`` set to ``"sym"`` and its ``t_desc``,
        ``t2g`` and ``g2t`` maps filled from the files.

    Raises:
        ValueError: If a file ends before any set name is found (an empty
            or blank-only file).
    """
    r = GeneTerms()
    r.gene_ns = "sym"
    for f in set_files:
        with f:
            setname = ""
            while setname == "":
                raw = f.readline()
                if not raw:
                    # readline() returns an empty string at EOF; without
                    # this guard an empty file would loop forever.
                    raise ValueError(
                        "No set name found in "
                        f"{getattr(f, 'name', '<stream>')}",
                    )
                setname = raw.strip()

            r.t_desc[setname] = f.readline().strip()

            for line in f:
                gene_sym = line.strip()
                if not gene_sym:
                    # A blank line is not a gene; do not register "".
                    continue
                r.t2g[setname][gene_sym] += 1
                r.g2t[gene_sym][setname] += 1
    return r
def read_gmt_file(input_file: IO) -> GeneTerms:
    """Read gene sets from an open GMT file.

    Every non-blank line is split on whitespace: the first field is the term
    name, the second its description, and the remaining fields are gene
    symbols belonging to that term.

    Args:
        input_file: Open text file; it is closed before returning (also when
            parsing fails).

    Returns:
        A ``GeneTerms`` with ``gene_ns`` set to ``"sym"``.

    Raises:
        IndexError: If a non-blank line has a term name but no description
            field.
    """
    r = GeneTerms()
    r.gene_ns = "sym"
    with input_file:
        for ln in input_file:
            # NOTE(review): fields are split on any whitespace, so a
            # description containing spaces is broken into several fields
            # and its extra words are read as genes. GMT is normally
            # tab-delimited -- confirm whether split("\t") is wanted here.
            fields = ln.strip().split()
            if not fields:
                # Blank line (e.g. trailing newline at the end of the file).
                continue
            t = fields[0]
            r.t_desc[t] = fields[1]
            for gs in fields[2:]:
                r.t2g[t][gs] += 1
                r.g2t[gs][t] += 1
    return r
def read_mapping_file(input_file: IO, names_file: IO | None) -> GeneTerms:
    """Read a gene-to-terms mapping file and, optionally, term descriptions.

    Mapping file format: whitespace-separated fields per line. A line whose
    first field is ``#geneNS`` sets the gene namespace from its second field;
    any other non-blank line is a gene followed by its terms (a term listed
    several times is counted several times).

    Names file format: ``term<TAB>description`` per line. Descriptions of
    terms that do not occur in the mapping are ignored; terms without a
    description get ``""``.

    Args:
        input_file: Open mapping file; closed before returning.
        names_file: Open descriptions file or ``None``; closed before
            returning when given.

    Returns:
        A ``GeneTerms`` whose ``gene_ns`` defaults to ``"id"`` unless the
        mapping file overrides it.

    Raises:
        IndexError: If a ``#geneNS`` line carries no namespace value.
    """
    r = GeneTerms()
    r.gene_ns = "id"

    with input_file:
        for ln in input_file:
            fields = ln.strip().split()
            if not fields:
                # Blank line; nothing to record.
                continue
            if fields[0] == "#geneNS":
                r.gene_ns = fields[1]
                continue
            gene_id, *terms = fields
            for t in terms:
                r.t2g[t][gene_id] += 1
                r.g2t[gene_id][t] += 1

    if names_file is not None:
        try:
            for line in names_file:
                # partition() tolerates lines without a description: a
                # "term<TAB>" line loses its trailing tab to strip(), which
                # would break a strict two-value unpack of split("\t", 1).
                t, _, desc = line.strip().partition("\t")
                if t in r.t2g:
                    r.t_desc[t] = desc
        except OSError:
            # Descriptions are best-effort: keep whatever was read so far.
            pass
        finally:
            names_file.close()

    for t in set(r.t2g) - set(r.t_desc):
        r.t_desc[t] = ""

    return r
def _add_gene_ns_token(
    ns_tokens: dict[str, dict[str, list[GeneInfo]]],
    ns: str,
    token: str,
    gi: GeneInfo,
) -> None:
    """Append `gi` to the list stored under `token` in namespace `ns`.

    Missing namespace and token entries are created on the fly.
    """
    ns_tokens.setdefault(ns, {}).setdefault(token, []).append(gi)


def _parse_ncbi_gene_info(
    gene_info_file: str,
) -> tuple[dict[str, GeneInfo], dict[str, dict[str, list[GeneInfo]]]]:
    """Parse an NCBI gene info file.

    Returns a pair ``(genes, ns_tokens)``: `genes` maps gene ID to its
    ``GeneInfo``; `ns_tokens` indexes the same records by gene ID (``"id"``),
    by symbol (``"sym"``) and by each synonym (``"syns"``).

    Raises:
        ValueError: If a non-comment line does not have exactly 15
            tab-separated columns, or if a gene ID occurs more than once.
    """
    genes = {}
    ns_tokens: dict[str, dict[str, list[GeneInfo]]] = {}

    with open(gene_info_file) as f:
        for line in f:
            if line.startswith("#"):
                # Lines starting with a pound sign are comments.
                continue

            # Tab-separated columns, in order:
            #   0 tax_id, 1 GeneID, 2 Symbol, 3 LocusTag, 4 Synonyms,
            #   5 dbXrefs, 6 chromosome, 7 map_location, 8 description,
            #   9 type_of_gene, 10 Symbol_from_nomenclature_authority,
            #   11 Full_name_from_nomenclature_authority,
            #   12 Nomenclature_status, 13 Other_designations,
            #   14 Modification_date
            cs = line.strip().split("\t")
            if len(cs) != 15:
                raise ValueError(
                    f"Unexpected line in the {gene_info_file}",
                )

            gi = GeneInfo(
                gene_id=cs[1],
                gene_sym=cs[2],
                # "-" is the placeholder used when there are no synonyms.
                synonyms=set(cs[4].split("|")) - {"-"},
                description=cs[8],
            )

            if gi.gene_id in genes:
                raise ValueError(
                    f"The gene {gi.gene_id} is repeated in {gene_info_file}")

            genes[gi.gene_id] = gi
            _add_gene_ns_token(ns_tokens, "id", gi.gene_id, gi)
            _add_gene_ns_token(ns_tokens, "sym", gi.gene_sym, gi)
            for sym in gi.synonyms:
                _add_gene_ns_token(ns_tokens, "syns", sym, gi)

    return genes, ns_tokens
def get_clean_gene_id(
    ncbi_gene_info: NCBIGeneInfo,
    ns: str,
    term: str,
) -> str | None:
    """Get the gene ID for `term` in name space `ns` from NCBI gene info.

    Returns ``None`` when the name space is unknown, when the term is not
    present in it, or when the term does not map to exactly one gene (the
    latter case is also logged).
    """
    tokens_in_ns = ncbi_gene_info.ns_tokens.get(ns)
    if tokens_in_ns is None:
        return None

    matches = tokens_in_ns.get(term)
    if matches is None:
        return None

    if len(matches) == 1:
        return matches[0].gene_id

    # Ambiguous term: refuse to pick one of the candidates.
    logger.info(
        "multiple tokens for term %s in name space %s", term, ns)
    return None
def rename_gene_terms(
    gene_terms: GeneTerms,
    gene_ns: str,
    ncbi_gene_info: NCBIGeneInfo,
) -> GeneTerms:
    """Return a copy of `gene_terms` with genes expressed in `gene_ns`.

    The input is never modified: the work is done on a deep copy. When the
    copy is already in the requested name space it is returned unchanged.
    Otherwise its genes are renamed through ``rename_genes``: from IDs to
    symbols using ``ncbi_gene_info.genes``, or from symbols to IDs using
    ``get_clean_gene_id``. Genes that cannot be translated are mapped to
    ``None`` by the renaming function.

    Both name spaces must be ``"id"`` or ``"sym"``; this is checked with an
    ``assert``.
    """
    assert {gene_terms.gene_ns, gene_ns} <= {"id", "sym"}, (
        f"The provided namespaces {gene_terms.gene_ns}, "
        f"{gene_ns} must be either 'id' or 'sym'")

    result = copy.deepcopy(gene_terms)
    source_ns = result.gene_ns

    if source_ns == gene_ns:
        return result

    if (source_ns, gene_ns) == ("id", "sym"):
        def id_to_sym(x: str) -> str | None:
            info = ncbi_gene_info.genes.get(x)
            return None if info is None else info.gene_sym

        result.rename_genes("sym", id_to_sym)
        return result

    if (source_ns, gene_ns) == ("sym", "id"):
        def sym_to_id(x: str) -> str | None:
            return get_clean_gene_id(ncbi_gene_info, "sym", x)

        result.rename_genes("id", sym_to_id)
        return result
def load_gene_terms(path: str) -> GeneTerms | None:
    """Load gene terms from `path`, picking the reader from the path's form.

    * ``*-map.txt``: read as a mapping file; the sibling descriptions file
      (same name with ``.txt`` replaced by ``names.txt``) is used when it
      exists.
    * ``*.gmt``: read as a GMT file.
    * anything else: treated as a directory whose ``*.txt`` files are read
      as set files.

    Returns:
        The loaded ``GeneTerms``, or ``None`` when `path` is treated as a
        directory and no ``*.txt`` file is found in it.
    """
    if path.endswith("-map.txt"):
        names_file = path[:-4] + "names.txt"
        if not pathlib.Path(names_file).exists():
            with open(path) as mapfile:
                return read_mapping_file(mapfile, None)
        with open(path) as mapfile, open(names_file) as namesfile:
            return read_mapping_file(mapfile, namesfile)

    if path.endswith(".gmt"):
        with open(path) as gmtfile:
            return read_gmt_file(gmtfile)

    infiles: list[IO] = []
    try:
        for f in glob.glob(os.path.join(path, "*.txt")):
            # pylint: disable=consider-using-with
            infiles.append(cast(IO, open(f, "rt")))  # noqa: SIM115
        return read_ewa_set_file(infiles) if infiles else None
    finally:
        # Make sure no handle leaks if opening a later file or the reader
        # fails part-way; closing an already closed file is a no-op.
        for infile in infiles:
            infile.close()