Source code for dae.variants_loaders.vcf.loader

"""Defines variant loader classed for VCF variants."""
from __future__ import annotations

import argparse
import itertools
import logging
from collections import Counter
from collections.abc import Callable, Generator, Iterator, Sequence
from typing import Any, cast
from urllib.parse import urlparse

import numpy as np
import pysam
from fsspec.core import url_to_fs

from dae.genomic_resources.reference_genome import ReferenceGenome
from dae.pedigrees.families_data import FamiliesData
from dae.pedigrees.family import Family
from dae.utils import fs_utils
from dae.utils.helpers import str2bool
from dae.utils.regions import Region
from dae.variants.attributes import Inheritance
from dae.variants.family_variant import FamilyAllele, FamilyVariant
from dae.variants.variant import SummaryVariant, SummaryVariantFactory
from dae.variants_loaders.raw.loader import (
    CLIArgument,
    FamiliesGenotypes,
    TransmissionType,
    VariantsGenotypesLoader,
)

logger = logging.getLogger(__name__)


[docs] def vcffile_chromosomes(filename: str) -> list[str]: """Return list of all chromosomes from VCF file.""" seqnames = [] with pysam.VariantFile(filename) as vcf: seqnames = [str(c) for c in vcf.header.contigs] tabix_index_filename = fs_utils.tabix_index_filename(filename) if tabix_index_filename is None: return seqnames try: # pylint: disable=no-member assert tabix_index_filename is not None tabix_index_filename = fs_utils.sign(tabix_index_filename) with pysam.Tabixfile( fs_utils.sign(filename), index=tabix_index_filename, ) as tbx: return list(tbx.contigs) except Exception: # noqa: BLE001 return seqnames
[docs] class VcfFamiliesGenotypes(FamiliesGenotypes): """Class for family genotypes build vrom VCF variant.""" def __init__( self, loader: SingleVcfLoader, all_genotypes: dict[str, tuple[int, ...]], ): super().__init__() self.loader = loader self.all_genotypes = all_genotypes self.known_independent_genotypes: list[tuple[int, int]] = [] def _collect_family_genotype( self, family: Family, fill_value: int, ) -> tuple[ list[tuple[int, int]], list[tuple[int, int]], tuple[bool, bool, bool], ]: genotypes: list[tuple[int, int]] = [] independent_genotypes: list[tuple[int, int]] = [] all_reference = True all_unknown = True has_unknown = False for person in family.members_in_order: sg: tuple[int, int] = cast( tuple[int, int], self.all_genotypes.get(person.sample_id) or (fill_value, fill_value)) if len(sg) == 1: sg = (sg[0], -2) assert len(sg) == 2, ( family, person, sg) sg = ( sg[0] if sg[0] is not None else -1, sg[1] if sg[1] is not None else -1, ) if sg != (0, 0): all_reference = False if sg != (-1, -1): all_unknown = False if sg[0] == -1 or sg[1] == -1: has_unknown = True genotypes.append(sg) if person.person_id in self.loader.independent_persons: independent_genotypes.append(sg) return ( genotypes, independent_genotypes, (all_reference, all_unknown, has_unknown))
[docs] def family_genotype_iterator( self, ) -> Generator[ tuple[Family, np.ndarray, np.ndarray | None], None, None]: self.known_independent_genotypes = [] # pylint: disable=protected-access fill_value = self.loader._fill_missing_value # noqa: SLF001 for family in self.loader.families.values(): family_genotype, independent_genotypes, gt_type = \ self._collect_family_genotype(family, fill_value) if len(family_genotype) == 0: continue genotype = np.array(family_genotype, np.int64) genotype = genotype.T assert len(genotype.shape) == 2, (genotype, family) assert genotype.shape[0] == 2 all_reference, all_unknown, has_unknown = gt_type if has_unknown: if not self.loader.include_unknown_person_genotypes: continue else: self.known_independent_genotypes.extend(independent_genotypes) if all_unknown and \ not self.loader.include_unknown_family_genotypes: continue if all_reference and \ not self.loader.include_reference_genotypes: continue yield family, genotype, None
[docs] class SingleVcfLoader(VariantsGenotypesLoader): """Defines a variant loader from single VCF file.""" # pylint: disable=too-many-instance-attributes def __init__( self, families: FamiliesData, vcf_files: list[str], genome: ReferenceGenome, regions: list[Region] | None = None, params: dict[str, Any] | None = None, **_kwargs: Any): params = params or {} super().__init__( families=families, filenames=vcf_files, transmission_type=TransmissionType.transmitted, genome=genome, regions=regions, expect_genotype=True, expect_best_state=False, params=params) assert len(vcf_files) self.set_attribute("source_type", "vcf") fill_in_mode = params.get("vcf_multi_loader_fill_in_mode", "unknown") if fill_in_mode == "reference": self._fill_missing_value: int = 0 elif fill_in_mode == "unknown": self._fill_missing_value = -1 else: logger.warning( "unexpected `vcf_multi_loader_fill_in_mode` value%s; " "expected values are `reference` or `unknown`", fill_in_mode) self._fill_missing_value = -1 self.fixed_pedigree = params.get("vcf_pedigree_mode", "fixed") == \ "fixed" self._init_vcf_readers() self._match_pedigree_to_samples() self.independent_persons = { p.person_id for p in self.families.persons_without_parents() } self._init_chromosome_order() self._init_denovo_mode() self._init_omission_mode() self.include_reference_genotypes = str2bool( params.get("vcf_include_reference_genotypes", False)) self.include_unknown_family_genotypes = str2bool( params.get("vcf_include_unknown_family_genotypes", False)) self.include_unknown_person_genotypes = str2bool( params.get("vcf_include_unknown_person_genotypes", False)) def _init_denovo_mode(self) -> None: denovo_mode = self.params.get("vcf_denovo_mode", "possible_denovo") self._denovo_handler: Callable[[FamilyVariant], bool] if denovo_mode == "possible_denovo": self._denovo_handler = self._possible_denovo_mode_handler elif denovo_mode == "denovo": self._denovo_handler = self._denovo_mode_handler elif denovo_mode == "ignore": self._denovo_handler = self._ignore_denovo_mode_handler else: logger.warning( "unexpected denovo mode: %s; " "using possible_denovo", denovo_mode) self._denovo_handler = self._possible_denovo_mode_handler @staticmethod def _possible_denovo_mode_handler(family_variant: FamilyVariant) -> bool: for fa in family_variant.alleles: assert isinstance(fa, FamilyAllele) inheritance_in_members = fa.inheritance_in_members inheritance_in_members = [ inh if inh != Inheritance.denovo else Inheritance.possible_denovo for inh in inheritance_in_members ] # pylint: disable=protected-access fa._inheritance_in_members = inheritance_in_members # noqa: SLF001 return False @staticmethod def _ignore_denovo_mode_handler(family_variant: FamilyVariant) -> bool: for fallele in family_variant.alleles: assert isinstance(fallele, FamilyAllele) if Inheritance.denovo in fallele.inheritance_in_members: return True return False @staticmethod def _denovo_mode_handler(_family_vairant: FamilyVariant) -> bool: return False def _init_omission_mode(self) -> None: omission_mode = self.params.get( "vcf_omission_mode", "possible_omission", ) self._omission_handler: Callable[[FamilyVariant], bool] if omission_mode == "possible_omission": self._omission_handler = self._possible_omission_mode_handler elif omission_mode == "omission": self._omission_handler = self._omission_mode_handler elif omission_mode == "ignore": self._omission_handler = self._ignore_omission_mode_handler else: logger.warning( "unexpected omission mode: %s; " "using possible_omission", omission_mode) self._omission_handler = self._possible_omission_mode_handler @staticmethod def _possible_omission_mode_handler(family_variant: FamilyVariant) -> bool: for fa in family_variant.alleles: assert isinstance(fa, FamilyAllele) inheritance_in_members = fa.inheritance_in_members inheritance_in_members = [ inh if inh != Inheritance.omission else Inheritance.possible_omission for inh in inheritance_in_members ] # pylint: disable=protected-access fa._inheritance_in_members = inheritance_in_members # noqa: SLF001 return False @staticmethod def _ignore_omission_mode_handler(family_variant: FamilyVariant) -> bool: for fallele in family_variant.alleles: assert isinstance(fallele, FamilyAllele) if Inheritance.omission in fallele.inheritance_in_members: return True return False @staticmethod def _omission_mode_handler(_family_vairant: FamilyVariant) -> bool: return False
[docs] def close(self) -> None: for vcf in self.vcfs: vcf.close()
def _init_vcf_readers(self) -> None: self.vcfs: list[pysam.VariantFile] = [] logger.debug("SingleVcfLoader input files: %s", self.filenames) for file in self.filenames: # pylint: disable=no-member index_filename = fs_utils.tabix_index_filename(file) if index_filename is not None: index_filename = fs_utils.sign(index_filename, ) self.vcfs.append( pysam.VariantFile( fs_utils.sign(file), index_filename=index_filename), ) def _build_vcf_iterators( self, region: Region | None, ) -> list[Iterator[pysam.VariantRecord]]: if region is None: return [ vcf.fetch() for vcf in self.vcfs ] return [ vcf.fetch(region=self._unadjust_chrom_prefix(str(region))) for vcf in self.vcfs] def _init_chromosome_order(self) -> None: seqnames = list(self.vcfs[0].header.contigs) if not all( list(vcf.header.contigs) == seqnames for vcf in self.vcfs): logger.warning( "VCF files %s do not have the same list " "of contigs", self.filenames) chrom_order = { seq: ids for ids, seq in enumerate(seqnames) } self.chrom_order = chrom_order @property def chromosomes(self) -> list[str]: """Return list of all chromosomes from VCF file(s).""" assert len(self.vcfs) > 0 seqnames = [str(c) for c in self.vcfs[0].header.contigs] filename = self.filenames[0] tabix_index_filename = fs_utils.tabix_index_filename(filename) if tabix_index_filename is None: res = seqnames try: # pylint: disable=no-member index_filename = fs_utils.tabix_index_filename(filename) if index_filename is not None: index_filename = fs_utils.sign(index_filename) with pysam.Tabixfile( fs_utils.sign(filename), index=index_filename, ) as tbx: res = [str(c) for c in tbx.contigs] except Exception: # noqa: BLE001 res = seqnames return [self._adjust_chrom_prefix(chrom) for chrom in res] def _match_pedigree_to_samples(self) -> None: # pylint: disable=too-many-branches vcf_samples: set[str] = set() for vcf in self.vcfs: intersection = set(vcf_samples) & set(vcf.header.samples) if intersection: logger.warning( "vcf samples present in multiple batches: %s", intersection) vcf_samples.update(list(vcf.header.samples)) logger.info("vcf samples (all): %s", len(vcf_samples)) vcf_samples = set(vcf_samples) logger.info("vcf samples (set): %s", len(vcf_samples)) pedigree_samples = set(self.families.pedigree_samples()) logger.info("pedigree samples (all): %s", len(pedigree_samples)) missing_samples = vcf_samples.difference(pedigree_samples) if missing_samples: logger.info( "vcf samples not found in pedigree: %s; %s", len(missing_samples), missing_samples) vcf_samples = vcf_samples.difference(missing_samples) assert vcf_samples.issubset(pedigree_samples) logger.info("vcf samples (matched): %s", len(vcf_samples)) not_sequenced = set() counters: Counter = Counter() for person in self.families.persons.values(): if person.generated: counters["generated"] += 1 continue if person.sample_id in vcf_samples: continue if self.fixed_pedigree and not person.not_sequenced: counters["missing"] += 1 else: if not person.generated and not person.not_sequenced: not_sequenced.add(person.person_id) person.set_attr("not_sequenced", value=True) counters["not_sequenced"] += 1 logger.info( "person %s marked as " "'not_sequenced';", person.person_id) logger.info("people stats: %s", counters) if not_sequenced: logger.info( "persons changed to not_sequenced %s in %s", len(not_sequenced), self.filenames) self.families.redefine() def _compare_vcf_variants_gt( self, lhs: pysam.VariantRecord | None, rhs: pysam.VariantRecord | None, ) -> bool: """Compare two VCF variant positions. Returns true if left vcf variant position in file is larger than right vcf variant position in file. """ if lhs is None: return True if rhs is None: return False l_chrom_idx = self.chrom_order.get(lhs.chrom) r_chrom_idx = self.chrom_order.get(rhs.chrom) assert l_chrom_idx is not None assert r_chrom_idx is not None if l_chrom_idx > r_chrom_idx: return True return lhs.pos > rhs.pos @staticmethod def _compare_vcf_variants_eq( lhs: pysam.VariantRecord, rhs: pysam.VariantRecord | None, ) -> bool: """Compare two VCF variant positions. Returns true if left vcf variant position in file is equal to right vcf variant position in file """ assert lhs is not None if rhs is None: return False return lhs.chrom == rhs.chrom and lhs.pos == rhs.pos def _find_current_vcf_variant( self, vcf_variants: list[pysam.VariantRecord | None], ) -> pysam.VariantRecord | None: assert len(vcf_variants) min_index = 0 for index in range(1, len(vcf_variants)): if vcf_variants[index] is None: continue if self._compare_vcf_variants_gt( vcf_variants[min_index], vcf_variants[index]): min_index = index return vcf_variants[min_index] def _calc_allele_frequencies( self, summary_variant: SummaryVariant, known_independent_genotypes: np.ndarray, ) -> None: n_independent_parents = len(self.independent_persons) n_parents_called = 0 if len(known_independent_genotypes) > 0: n_parents_called = known_independent_genotypes.shape[1] ref_n_alleles = 0 ref_allele_freq = 0.0 for allele in summary_variant.alleles: allele_index = allele["allele_index"] n_alleles = np.sum(known_independent_genotypes == allele_index) allele_freq = 0.0 percent_parents_called = 0.0 if n_independent_parents > 0: percent_parents_called = ( 100.0 * n_parents_called ) / n_independent_parents if n_parents_called > 0: allele_freq = (100.0 * n_alleles) / (2.0 * n_parents_called) if allele_index == 0: ref_n_alleles = n_alleles ref_allele_freq = allele_freq freq = { "af_parents_called_count": int(n_parents_called), "af_parents_called_percent": float(percent_parents_called), "af_allele_count": int(n_alleles), "af_allele_freq": float(allele_freq), "af_ref_allele_count": int(ref_n_alleles), "af_ref_allele_freq": float(ref_allele_freq), } allele.update_attributes(freq) def _full_variants_iterator_impl( self, initial_summary_index: int = 0, ) -> Generator[tuple[SummaryVariant, list[FamilyVariant]], None, None]: summary_index = initial_summary_index for region in self.regions: if region is not None and "HLA" in region.chrom: logger.warning("skipping odd chromosomal region: %s", region) continue vcf_iterators = self._build_vcf_iterators(region) vcf_variants = [next(it, None) for it in vcf_iterators] while True: if all(vcf_variant is None for vcf_variant in vcf_variants): break current_vcf_variant = self._find_current_vcf_variant( vcf_variants, ) assert current_vcf_variant is not None current_summary_variant = \ SummaryVariantFactory.summary_variant_from_vcf( current_vcf_variant, summary_index, transmission_type=self.transmission_type) all_genotypes, vcf_iterator_idexes_to_advance = \ self._collect_all_genotypes( current_vcf_variant, vcf_variants) if len(current_summary_variant.alt_alleles) > 127: logger.warning( "more than 127 alternative alleles; " "some alleles will be skipped: %s", current_summary_variant) family_genotypes = VcfFamiliesGenotypes( self, all_genotypes) family_variants = [] for fam, genotype, best_state in family_genotypes \ .family_genotype_iterator(): fvariant = FamilyVariant( current_summary_variant, fam, genotype, best_state) if self._denovo_handler(fvariant): continue if self._omission_handler(fvariant): continue family_variants.append(fvariant) known_independent_genotypes = \ family_genotypes.known_independent_genotypes assert known_independent_genotypes is not None independent_genotypes = np.array( known_independent_genotypes, np.int64).T self._calc_allele_frequencies( current_summary_variant, independent_genotypes) yield current_summary_variant, family_variants for idx in vcf_iterator_idexes_to_advance: vcf_variants[idx] = next(vcf_iterators[idx], None) summary_index += 1 def _collect_all_genotypes( self, current_vcf_variant: pysam.VariantRecord, vcf_variants: list[pysam.VariantRecord | None], ) -> tuple[dict[str, tuple[int, ...]], list[int]]: all_genotypes: dict[str, tuple[int, ...]] = {} vcf_iterator_idexes_to_advance: list[int] = [] for idx, vcf_variant in enumerate(vcf_variants): if self._compare_vcf_variants_eq( current_vcf_variant, vcf_variant, ): vcf_iterator_idexes_to_advance.append(idx) assert vcf_variant is not None for sample_id in vcf_variant.header.samples: if sample_id in all_genotypes: logger.debug( "sample %s already in genotypes; skipping", sample_id) continue gt = vcf_variant.samples.get(sample_id)["GT"] all_genotypes[sample_id] = gt return all_genotypes, vcf_iterator_idexes_to_advance
[docs] class VcfLoader(VariantsGenotypesLoader): """Defines variant loader for VCF variants.""" def __init__( self, families: FamiliesData, vcf_files: list[str], genome: ReferenceGenome, regions: list[Region] | None = None, params: dict[str, Any] | None = None, **kwargs: Any, # noqa: ARG002 ): # pylint: disable=unused-argument params = params or {} all_filenames, filenames = self._collect_filenames(params, vcf_files) super().__init__( families=families, filenames=all_filenames, transmission_type=TransmissionType.transmitted, genome=genome, regions=regions, expect_genotype=True, expect_best_state=False, params=params) self.set_attribute("source_type", "vcf") logger.debug("loader passed VCF files %s", vcf_files) logger.debug("collected VCF files: %s, %s", all_filenames, filenames) assert vcf_files self.vcf_files = vcf_files self.files_batches = filenames self.fixed_pedigree = params.get("vcf_pedigree_mode", "fixed") == \ "fixed" self.vcf_loaders: list[SingleVcfLoader] | None = None def _filter_files_batches( self, files_batches: list[list[str]], regions: Sequence[Region | None], ) -> list[list[str]]: if regions == [None]: return files_batches result: list[list[str]] = [] region_chromosomes = {r.chrom for r in regions if r is not None} for batch in files_batches: for filename in batch: chromosomes = { self._adjust_chrom_prefix(c) for c in vcffile_chromosomes(filename) } if set(chromosomes).intersection(region_chromosomes): result.append(batch) break return result
[docs] def init_vcf_loaders( self, ) -> None: """Initialize VCF loaders.""" self.vcf_loaders = [] files_batches = self._filter_files_batches( self.files_batches, self.regions, ) if len(files_batches) == 0: logger.warning( "no VCF files found for regions %s", self.regions) return if len(files_batches) == 1 and len(files_batches[0]) == 1: vcf_loader = SingleVcfLoader( self.families, files_batches[0], self.genome, params=self.params) vcf_loader.reset_regions(self.regions) self.vcf_loaders.append(vcf_loader) return for vcf_files_batch in files_batches: if vcf_files_batch: if self.fixed_pedigree: vcf_families = self.families else: vcf_families = self.families.copy() vcf_loader = SingleVcfLoader( vcf_families, vcf_files_batch, self.genome, params=self.params) vcf_loader.reset_regions(self.regions) self.vcf_loaders.append(vcf_loader) pedigree_mode = self.params.get("vcf_pedigree_mode", "fixed") if pedigree_mode == "intersection": self.families = self._families_intersection() elif pedigree_mode == "union": self.families = self._families_union() logger.info( "real persons/sample: %s", len(self.families.real_persons)) for vcf_loader in self.vcf_loaders: vcf_families = vcf_loader.families logger.info( "real persons/sample: %s in %s", len(vcf_families.real_persons), vcf_loader.filenames)
def _families_intersection(self) -> FamiliesData: assert self.vcf_loaders is not None if len(self.vcf_loaders) == 1: return self.vcf_loaders[0].families logger.warning("families intersection run...") families = self.vcf_loaders[0].families for vcf_loader in self.vcf_loaders: other_families = vcf_loader.families assert len(families.persons) == len(other_families.persons) for other_person in other_families.persons.values(): if other_person.not_sequenced: person = families.persons[other_person.fpid] logger.warning( "families intersection: person %s " "is marked as 'not_sequenced'", person.person_id) person.set_attr("not_sequenced", value=True) families.redefine() for vcf_loader in self.vcf_loaders: vcf_loader.families = families return families def _families_union(self) -> FamiliesData: assert self.vcf_loaders is not None if len(self.vcf_loaders) == 1: return self.vcf_loaders[0].families logger.warning("families union run...") families = self.vcf_loaders[0].families for fpid, person in families.persons.items(): if not person.not_sequenced: continue for vcf_loader in self.vcf_loaders: other_person = vcf_loader.families.persons[fpid] if not other_person.not_sequenced: logger.warning( "families union: person %s " "'not_sequenced' flag changed to 'sequenced'", person.person_id) person.set_attr("not_sequenced", value=False) break families.redefine() for vcf_loader in self.vcf_loaders: vcf_loader.families = families return families
[docs] def close(self) -> None: if self.vcf_loaders is None: return for vcf_loader in self.vcf_loaders: vcf_loader.close()
@classmethod def _arguments(cls) -> list[CLIArgument]: arguments = super()._arguments() arguments.append(CLIArgument( "vcf_files", value_type=str, nargs="+", metavar="<VCF filenames>", help_text="VCF files to import", )) arguments.append(CLIArgument( "--vcf-include-reference-genotypes", default_value=False, help_text="include reference only variants " "[default_value: %(default)s]", action="store_true", )) arguments.append(CLIArgument( "--vcf-include-unknown-family-genotypes", default_value=False, help_text="include family variants with fully unknown genotype " "[default: %(default)s]", action="store_true", )) arguments.append(CLIArgument( "--vcf-include-unknown-person-genotypes", default_value=False, help_text="include family variants with " "partially unknown genotype [default: %(default)s]", action="store_true", )) arguments.append(CLIArgument( "--vcf-multi-loader-fill-in-mode", default_value="unknown", help_text="used for multi VCF files loader " "to fill missing genotypes; " "supported values are `reference` or `unknown`" "[default: %(default)s]", )) arguments.append(CLIArgument( "--vcf-denovo-mode", default_value="ignore", help_text="used for handling family variants " "with denovo inheritance; " "supported values are: `denovo`, `possible_denovo`, `ignore`; " "[default: %(default)s]", )) arguments.append(CLIArgument( "--vcf-omission-mode", default_value="ignore", help_text="used for handling family variants with omission " "inheritance; " "supported values are: `omission`, `possible_omission`, `ignore`; " "[default: %(default)s]", )) arguments.append(CLIArgument( "--vcf-pedigree-mode", default_value="fixed", help_text="used for handling missmathes between samples in VCF" "and sample in pedigree file;" "supported values are: 'intersection', 'union', 'fixed';" "'fixed' mode means that pedigree should be accept 'as is' " "without any modifications; samples found in pedigree but not " "in the VCF should be patched with unknown genotype;" "[default: 'fixed']", )) arguments.append(CLIArgument( "--vcf-chromosomes", value_type=str, help_text="specifies a list of filename template " "substitutions; then specified variant filename(s) are treated " "as templates and each occurent of `[vc]` is replaced " "consecutively by elements of VCF wildcards list; " "by default the list is empty and no substitution " "takes place. " "[default: None]", )) return arguments @staticmethod def _glob(globname: str) -> list[str]: filesystem, _ = url_to_fs(globname) filenames = filesystem.glob(globname) # fs.glob strips the protocol at the beginning. We need to add it back # otherwise there is no way to know the correct fs down the pipeline scheme = urlparse(globname).scheme if scheme: filenames = [f"{scheme}://{fn}" for fn in filenames] return cast(list[str], filenames) @staticmethod def _collect_filenames( params: dict[str, Any], vcf_files: list[str], ) -> tuple[list[str], list[list[str]]]: if params.get("vcf_chromosomes"): vcf_chromosomes = [ wc.strip() for wc in params["vcf_chromosomes"].split(";") ] if all("[vc]" in vcf_file for vcf_file in vcf_files): glob_filenames = [ [vcf_file.replace("[vc]", vc) for vcf_file in vcf_files] for vc in vcf_chromosomes ] elif all("[vc]" not in vcf_file for vcf_file in vcf_files): logger.warning( "VCF files %s does not contain '[vc]' pattern, " "but '--vcf-chromosomes' argument is passed; skipping...", vcf_files) glob_filenames = [vcf_files] else: logger.error( "some VCF files contain '[vc]' pattern, some not: " "%s; can't continue...", vcf_files) raise ValueError( f"some VCF files does not have '[vc]': {vcf_files}") else: glob_filenames = [vcf_files] logger.debug("collecting VCF filenames glob: %s", glob_filenames) result: list[list[str]] = [] for batches_globnames in glob_filenames: batches_result = [] for globname in batches_globnames: filenames = VcfLoader._glob(globname) if len(filenames) == 0: continue assert len(filenames) == 1, (globname, filenames) batches_result.append(filenames[0]) result.append(batches_result) all_filenames = list(itertools.chain.from_iterable(result)) return all_filenames, result @property def variants_filenames(self) -> list[str]: return self.vcf_files @property def chromosomes(self) -> list[str]: """Return list of all chromosomes from VCF files.""" result: set[str] = set() for filname in self.filenames: result.update(vcffile_chromosomes(filname)) return [self._adjust_chrom_prefix(chrom) for chrom in result]
[docs] def reset_regions( self, regions: list[Region] | Sequence[Region | None] | None, ) -> None: super().reset_regions(regions) self.vcf_loaders = None
def _full_variants_iterator_impl( self, ) -> Generator[tuple[SummaryVariant, list[FamilyVariant]], None, None]: if self.vcf_loaders is None: self.init_vcf_loaders() assert self.vcf_loaders is not None summary_index = 0 for vcf_loader in self.vcf_loaders: # pylint: disable=protected-access iterator = vcf_loader._full_variants_iterator_impl( # noqa: SLF001 summary_index) try: for summary_variant, family_variants in iterator: yield summary_variant, family_variants summary_index += 1 except StopIteration: pass
[docs] @classmethod def parse_cli_arguments( cls, argv: argparse.Namespace, *, use_defaults: bool = False, ) -> tuple[list[str], dict[str, Any]]: super().parse_cli_arguments(argv, use_defaults=use_defaults) filenames = argv.vcf_files assert argv.vcf_multi_loader_fill_in_mode in { "reference", "unknown", } assert argv.vcf_denovo_mode in { "denovo", "possible_denovo", "ignore", }, argv.vcf_denovo_mode assert argv.vcf_omission_mode in { "omission", "possible_omission", "ignore", }, argv.vcf_omission_mode assert argv.vcf_pedigree_mode in { "intersection", "union", "fixed", }, argv.vcf_pedigree_mode params = { "vcf_include_reference_genotypes": str2bool( argv.vcf_include_reference_genotypes, ), "vcf_include_unknown_family_genotypes": str2bool( argv.vcf_include_unknown_family_genotypes, ), "vcf_include_unknown_person_genotypes": str2bool( argv.vcf_include_unknown_person_genotypes, ), "vcf_multi_loader_fill_in_mode": argv.vcf_multi_loader_fill_in_mode, "vcf_denovo_mode": argv.vcf_denovo_mode, "vcf_omission_mode": argv.vcf_omission_mode, "vcf_pedigree_mode": argv.vcf_pedigree_mode, "vcf_chromosomes": argv.vcf_chromosomes, "add_chrom_prefix": argv.add_chrom_prefix, "del_chrom_prefix": argv.del_chrom_prefix, } return filenames, params