[docs]classRawVariantsQueryRunner(QueryRunner):"""Run a variant iterator as a query."""def__init__(self,variants_iterator:Iterator[Any],deserializer:Callable|None=None,)->None:super().__init__(deserializer=deserializer)self.variants_iterator=variants_iteratorassertself.variants_iteratorisnotNone
[docs]classRawFamilyVariants(abc.ABC):"""Base class that stores a reference to the families data."""def__init__(self,families:FamiliesData)->None:self.families=families
[docs]defsummary_variants_iterator(self,**kwargs:Any)->Iterator[SummaryVariant]:"""Create a generator to iterate over summary variants."""forsv,fvsinself.full_variants_iterator(**kwargs):seen_in_status=sv.allele_count*[0]seen_as_denovo=sv.allele_count*[False]family_variants_count=sv.allele_count*[0]forfvinfvs:foraainfv.alt_alleles:fa=cast(FamilyAllele,aa)seen_in_status[fa.allele_index]=reduce(lambdat,s:t|s.value,filter(None,fa.allele_in_statuses),seen_in_status[fa.allele_index])seen_as_denovo[fa.allele_index]=reduce(lambdat,s:tor(s==Inheritance.denovo),filter(None,fa.inheritance_in_members),seen_as_denovo[fa.allele_index])family_variants_count[fa.allele_index]+=1sv.update_attributes({"seen_in_status":seen_in_status[1:],"seen_as_denovo":seen_as_denovo[1:],"family_variants_count":family_variants_count[1:],"family_alleles_count":family_variants_count[1:],})yieldsv
[docs]@staticmethoddeffilter_regions(v:SummaryVariant,regions:list[Region],)->bool:"""Return True if v is in regions."""pos=v.positionassertposisnotNoneend_pos=v.end_positionifv.end_positionisnotNoneelsev.positionforreginregions:ifreg.chrom!=v.chromosome:continueifreg.startisNoneandreg.stopisNone:returnTrueifreg.startisNone:assertreg.stopisnotNoneifpos<=reg.stop:returnTrueifreg.stopisNone:assertreg.startisnotNoneifend_pos>=reg.start:returnTrueifreg.startisnotNoneandreg.stopisnotNone:assertreg.start<=reg.stopif(reg.start<=pos<=reg.stoporreg.start<=end_pos<=reg.stopor(reg.start>=posandreg.stop<=end_pos)):returnTruereturnFalse
[docs]@staticmethoddeffilter_real_attr(allele:SummaryAllele,real_attr_filter:RealAttrFilterType,*,is_frequency:bool=False,)->bool:# pylint: disable=unused-argument"""Return True if allele's attrs are within bounds. The bounds are specified in real_attr_filter. """result=[]forkey,rangesinreal_attr_filter:ifnotallele.has_attribute(key):returnFalseval=allele.get_attribute(key)rmin,rmax=rangesifrminisNoneandrmaxisNone:ifis_frequencyorvalisnotNone:result.append(True)elifrminisNone:ifis_frequencyandvalisNone:result.append(True)elifvalisnotNone:result.append(val<=rmax)elifrmaxisNone:result.append(valisnotNoneandval>=rmin)else:result.append(valisnotNoneand(rmin<=val<=rmax))returnall(result)
[docs]@staticmethoddeffilter_gene_effects(v:SummaryAllele,effect_types:Sequence[str]|None,genes:Sequence[str]|None,)->bool:"""Return True if variant's effects are in effect types and genes."""asserteffect_typesisnotNoneorgenesisnotNoneifv.effectsisNone:returnFalsegene_effects=v.effects.genesifeffect_typesisNone:assertgenesisnotNoneresult=[geforgeingene_effectsifge.symbolingenes]ifresult:v.matched_gene_effects=resultreturnTrueelifgenesisNone:result=[geforgeingene_effectsifge.effectineffect_types]ifresult:v.matched_gene_effects=resultreturnTrueelse:result=[geforgeingene_effectsifge.effectineffect_typesandge.symbolingenes]ifresult:v.matched_gene_effects=resultreturnTruereturnFalse
[docs]@classmethoddeffilter_allele(# noqa: C901cls,allele:FamilyAllele,*,genes:list[str]|None=None,effect_types:list[str]|None=None,person_ids:Sequence[str]|None=None,inheritance:list[Matcher]|None=None,roles:Matcher|None=None,sexes:Matcher|None=None,affected_statuses:Matcher|None=None,variant_type:Matcher|None=None,real_attr_filter:RealAttrFilterType|None=None,ultra_rare:bool|None=None,frequency_filter:RealAttrFilterType|None=None,return_reference:bool|None=None,return_unknown:bool|None=None,**kwargs:Any,# noqa: ARG003)->bool:# pylint: disable=too-many-arguments,too-many-return-statements# pylint: disable=too-many-branches,unused-argument"""Return True if a family allele meets the required conditions."""assertisinstance(allele,FamilyAllele)ifinheritanceisnotNone:formatcherininheritance:allele_inheritance=0forinhinallele.inheritance_in_members:allele_inheritance|=inh.valueifnotmatcher(allele_inheritance):returnFalseifreal_attr_filterisnotNoneand \
notcls.filter_real_attr(allele,real_attr_filter):returnFalseiffrequency_filterisnotNoneand \
notcls.filter_real_attr(allele,frequency_filter,is_frequency=True):returnFalseifultra_rareandnotcls.filter_real_attr(allele,[("af_allele_count",(None,1))],is_frequency=True):returnFalseifgenesisNoneandeffect_typesisNone:allele.matched_gene_effects=[]elifnotcls.filter_gene_effects(allele,effect_types,genes):returnFalseifvariant_typeisnotNone:returnvariant_type(allele.allele_type.value)if(notreturn_referenceandnotreturn_unknownandallele.is_reference_allele):returnFalseifperson_idsisnotNoneand \
notset(allele.variant_in_members)&set(person_ids):returnFalseifrolesisnotNone:allele_roles=0forroleinallele.allele_in_roles:ifroleisNone:continueallele_roles|=role.valueifnotroles(allele_roles):returnFalseifsexesisnotNone:allele_sexes=0forsexinallele.allele_in_sexes:ifsexisNone:continueallele_sexes|=sex.valueifnotsexes(allele_sexes):returnFalseifaffected_statusesisnotNone:allele_statuses=0forstatusinallele.allele_in_statuses:ifstatusisNone:continueallele_statuses|=status.valueifnotaffected_statuses(allele_statuses):returnFalsereturnTrue
[docs]@classmethoddeffilter_summary_allele(cls,allele:SummaryAllele,*,genes:list[str]|None=None,effect_types:list[str]|None=None,variant_type:Matcher|None=None,real_attr_filter:RealAttrFilterType|None=None,ultra_rare:bool|None=None,frequency_filter:RealAttrFilterType|None=None,**kwargs:Any,# noqa: ARG003)->bool:# pylint: disable=too-many-return-statements,too-many-branches# pylint: disable=unused-argument"""Return True if a summary allele meets the required conditions."""assertisinstance(allele,SummaryAllele)ifreal_attr_filterisnotNoneand \
notcls.filter_real_attr(allele,real_attr_filter):returnFalseiffrequency_filterisnotNoneand \
notcls.filter_real_attr(allele,frequency_filter,is_frequency=True):returnFalseifultra_rareandnotcls.filter_real_attr(allele,[("af_allele_count",(0,1))]):returnFalseif(genesisnotNoneoreffect_typesisnotNone)and \
notcls.filter_gene_effects(allele,effect_types,genes):returnFalseifvariant_typeisnotNone:returnvariant_type(allele.allele_type.value)returnTrue
[docs]@classmethoddeffilter_family_variant(cls,v:FamilyVariant,**kwargs:Any,)->bool:"""Return true if variant meets conditions in kwargs."""ifkwargs.get("regions")isnotNoneand \
notcls.filter_regions(v,kwargs["regions"]):returnFalsefamily_ids=kwargs.get("family_ids")iffamily_idsisnotNoneandv.family_idnotinfamily_ids:returnFalseif"filter"inkwargs:func=kwargs["filter"]ifnotfunc(v):returnFalsereturnTrue
[docs]@classmethoddeffilter_summary_variant(cls,v:SummaryVariant,**kwargs:Any,)->bool:"""Return true if variant meets conditions in kwargs."""ifkwargs.get("regions")isnotNoneand \
notcls.filter_regions(v,kwargs["regions"]):returnFalseif"filter"inkwargs:func=kwargs["filter"]ifnotfunc(v):returnFalsereturnTrue
[docs]@classmethoddefsummary_variant_filter_function(cls,**kwargs:Any,)->Callable[[SummaryVariant],SummaryVariant|None]:"""Return a filter function that checks the conditions in kwargs."""return_reference=kwargs.get("return_reference",False)seen=kwargs.get("seen",set())deffilter_func(sv:SummaryVariant)->SummaryVariant|None:ifsvisNone:returnNoneifsv.svuidinseen:returnNoneseen.add(sv.svuid)ifnotcls.filter_summary_variant(sv,**kwargs):returnNoneoriginal_variant_type=Nonevariant_type_matcher=Noneifkwargs.get("variant_type")isnotNone:original_variant_type=kwargs["variant_type"]variant_type_matcher=transform_attribute_query_to_function(Allele.Type,original_variant_type,Allele.TYPE_DISPLAY_NAME,)kwargs["variant_type"]=variant_type_matcheralleles=sv.allelesalleles_matched=[]foralleleinalleles:ifcls.filter_summary_allele(allele,**kwargs):ifallele.allele_index==0andnotreturn_reference:continuealleles_matched.append(allele.allele_index)kwargs["variant_type"]=original_variant_typeifnotalleles_matched:returnNonesv.set_matched_alleles(alleles_matched)returnsvreturnfilter_func
[docs]defbuild_summary_variants_query_runner(self,**kwargs:Any,)->RawVariantsQueryRunner:"""Return a query runner for the summary variants."""filter_func=RawFamilyVariants\
.summary_variant_filter_function(**kwargs)returnRawVariantsQueryRunner(variants_iterator=self.summary_variants_iterator(),deserializer=filter_func)
[docs]defquery_summary_variants(self,**kwargs:Any,)->Iterator[SummaryVariant]:"""Run a sammary variant query and yields the results."""runner=self.build_summary_variants_query_runner(**kwargs)result=QueryResult(runners=[runner],limit=kwargs.get("limit",-1),)try:logger.debug("starting result")result.start()seen=set()withclosing(result)asresult:forsvinresult:ifsvisNone:continueifsv.svuidinseen:continueseen.add(sv.svuid)yieldsvfinally:pass
[docs]@classmethoddeffamily_variant_filter_function(cls,*,regions:list[Region]|None=None,genes:list[str]|None=None,effect_types:list[str]|None=None,family_ids:Sequence[str]|None=None,person_ids:Sequence[str]|None=None,inheritance:list[str]|str|None=None,roles:str|None=None,sexes:str|None=None,affected_statuses:str|None=None,variant_type:str|None=None,real_attr_filter:RealAttrFilterType|None=None,ultra_rare:bool|None=None,frequency_filter:RealAttrFilterType|None=None,return_reference:bool|None=None,return_unknown:bool|None=None,)->Callable[[FamilyVariant],FamilyVariant|None]:"""Return a function that filters variants."""ifreturn_referenceisNone:return_reference=Falseifreturn_unknownisNone:return_unknown=Falseseen=set()inheritance_matchers=NoneifinheritanceisnotNone:ifnotisinstance(inheritance,list):inheritance=[inheritance]inheritance_matchers=[transform_attribute_query_to_function(Inheritance,query)forqueryininheritance]variant_type_matcher=Noneifvariant_typeisnotNone:variant_type_matcher=transform_attribute_query_to_function(Allele.Type,variant_type,Allele.TYPE_DISPLAY_NAME,)roles_matcher=NoneifrolesisnotNone:roles_matcher=transform_attribute_query_to_function(Role,roles)sexes_matcher=NoneifsexesisnotNone:sexes_matcher=transform_attribute_query_to_function(Sex,sexes,Sex.aliases())statuses_matcher=Noneifaffected_statusesisnotNone:statuses_matcher=transform_attribute_query_to_function(Status,affected_statuses)deffilter_func(v:FamilyVariant)->FamilyVariant|None:try:ifvisNoneorv.fvuidinseen:returnNoneseen.add(v.fvuid)ifv.is_unknown()andnotreturn_unknown:logger.error("unknown variants selected but not requested %s",v)returnNoneifnotcls.filter_family_variant(v,regions=regions,family_ids=family_ids,):logger.info("family variants selected but not requested %s",v)returnNonealleles=v.allelesalleles_matched=[]foralleleinalleles:ifallele.allele_index==0andnotreturn_reference:continuefa=cast(FamilyAllele,allele)ifcls.filter_allele(fa,genes=genes,effect_types=effect_types,person_ids=person_ids,inheritance=inheritance_matchers,roles=roles_matcher,sexes=sexes_matcher,affected_statuses=statuses_matcher,variant_type=variant_type_matcher,real_attr_filter=real_attr_filter,ultra_rare=ultra_rare,frequency_filter=frequency_filter,return_reference=return_reference,return_unknown=return_unknown,):alleles_matched.append(allele.allele_index)ifalleles_matched:v.set_matched_alleles(alleles_matched)returnvlogger.info("no matched alleles for family variant: %s",v)exceptExceptionasex:# pylint: disable=broad-except # noqa: BLE001logger.warning("unexpected error: %s; %s",ex,v,exc_info=True)returnNonereturnNonereturnfilter_func
[docs]defbuild_family_variants_query_runner(self,*,regions:list[Region]|None=None,genes:list[str]|None=None,effect_types:list[str]|None=None,family_ids:Sequence[str]|None=None,person_ids:Sequence[str]|None=None,inheritance:list[str]|None=None,roles_in_parent:str|None=None,roles_in_child:str|None=None,roles:str|None=None,sexes:str|None=None,affected_statuses:str|None=None,variant_type:str|None=None,real_attr_filter:RealAttrFilterType|None=None,ultra_rare:bool|None=None,frequency_filter:RealAttrFilterType|None=None,return_reference:bool|None=None,return_unknown:bool|None=None,tags_query:TagsQuery|None=None,**kwargs:Any,# noqa: ARG002)->RawVariantsQueryRunner:# pylint: disable=too-many-arguments,unused-argument"""Return a query runner for the family variants."""# In memory variants does not inherit QueryVariantsBase,# but is suitable for the tags to family IDs utility.roles=QueryVariantsBase.transform_roles_to_single_role_string(roles_in_parent,roles_in_child,roles,)tag_family_ids=QueryVariantsBase.tags_to_family_ids(self,tags_query,# type: ignore)iftag_family_idsisnotNone:iffamily_idsisnotNone:family_ids=list(set(family_ids).intersection(tag_family_ids))else:family_ids=list(tag_family_ids)filter_func=RawFamilyVariants.family_variant_filter_function(regions=regions,genes=genes,effect_types=effect_types,family_ids=family_ids,person_ids=person_ids,inheritance=inheritance,roles=roles,sexes=sexes,affected_statuses=affected_statuses,variant_type=variant_type,real_attr_filter=real_attr_filter,ultra_rare=ultra_rare,frequency_filter=frequency_filter,return_reference=return_reference,return_unknown=return_unknown)returnRawVariantsQueryRunner(variants_iterator=self.family_variants_iterator(),deserializer=filter_func)
[docs]defquery_variants(self,**kwargs:Any)->Iterator[FamilyVariant]:"""Query family variants and yield the results."""runner=self.build_family_variants_query_runner(**kwargs)result=QueryResult(runners=[runner],limit=kwargs.get("limit",-1),)try:logger.debug("starting result")result.start()seen=set()withclosing(result)asresult:forvinresult:ifvisNone:continueifv.fvuidinseen:continueseen.add(v.fvuid)yieldvfinally:pass
[docs]classRawMemoryVariants(RawFamilyVariants):"""Store variants in memory."""def__init__(self,full_variants:list[tuple[SummaryVariant,list[FamilyVariant]]],families:FamiliesData,)->None:super().__init__(families)self._full_variants=full_variants