"""Defines variant loader classes for VCF variants."""
from __future__ import annotations
import argparse
import itertools
import logging
from collections import Counter
from collections.abc import Callable, Generator, Iterator
from typing import Any, cast
from urllib.parse import urlparse
import numpy as np
import pysam
from fsspec.core import url_to_fs
from dae.genomic_resources.reference_genome import ReferenceGenome
from dae.pedigrees.families_data import FamiliesData
from dae.pedigrees.family import Family
from dae.utils import fs_utils
from dae.utils.helpers import str2bool
from dae.utils.variant_utils import (
is_all_reference_genotype,
is_all_unknown_genotype,
is_unknown_genotype,
)
from dae.variants.attributes import Inheritance
from dae.variants.family_variant import FamilyAllele, FamilyVariant
from dae.variants.variant import SummaryVariant, SummaryVariantFactory
from dae.variants_loaders.raw.loader import (
CLIArgument,
FamiliesGenotypes,
TransmissionType,
VariantsGenotypesLoader,
)
# Module-level logger used by the loader classes defined in this file.
logger = logging.getLogger(__name__)
class VcfFamiliesGenotypes(FamiliesGenotypes):
    """Family genotypes built from the VCF records found at one position.

    ``vcf_variants`` holds one entry per VCF file of the owning loader; an
    entry is ``None`` when that file contributes no record to the variant
    currently being processed.
    """

    def __init__(
        self, loader: SingleVcfLoader,
        vcf_variants: list[pysam.VariantRecord | None],
    ):
        super().__init__()
        self.loader = loader
        self.vcf_variants = vcf_variants
        # Genotype columns of independent persons collected while iterating;
        # reset at the start of every `family_genotype_iterator` run.
        self.known_independent_genotypes: list[np.ndarray] = []

    def _collect_family_genotype(
        self, family: Family,
        samples_index: dict[str, int],
        fill_value: int,
    ) -> list[tuple[int, ...]]:
        """Return one two-element genotype per member of ``family``.

        Args:
            family: family whose ``members_in_order`` are processed.
            samples_index: maps a sample id to the position of its VCF
                record inside ``self.vcf_variants``.
            fill_value: allele value used for both alleles when the
                person's VCF file has no record for the current variant.

        Returns:
            Genotype tuples in ``members_in_order`` order. A single-allele
            ``GT`` is padded with ``-2`` and ``None`` alleles become ``-1``.
        """
        genotypes: list[tuple[int, ...]] = []
        for person in family.members_in_order:
            vcf_index = samples_index.get(person.sample_id)
            assert vcf_index is not None, (person, self.vcf_variants)

            vcf_variant = self.vcf_variants[vcf_index]
            if vcf_variant is None:
                # No record for this person's file: use the fill-in value.
                genotypes.append((fill_value, fill_value))
                continue

            vcf_sample = vcf_variant.samples.get(person.sample_id)
            assert vcf_sample is not None, (person, self.vcf_variants)

            sample_genotype: tuple[int | None, ...] = vcf_sample["GT"]
            if len(sample_genotype) == 1:
                # Single-allele call: mark the absent second allele with -2.
                sample_genotype = (sample_genotype[0], -2)
            assert len(sample_genotype) == 2, (
                family, person, sample_genotype)

            # A `None` allele in the VCF record is stored as -1.
            genotypes.append(tuple(
                -1 if allele is None else allele
                for allele in sample_genotype
            ))
        return genotypes

    def _collect_known_independent_genotypes(
        self, family: Family, genotype: np.ndarray,
    ) -> None:
        """Store genotype columns of the family's independent persons.

        A column ``genotype[:, index]`` is appended to
        ``self.known_independent_genotypes`` for every member whose
        ``person_id`` is in ``self.loader.independent_persons``.
        """
        independent_persons = self.loader.independent_persons
        for index, person in enumerate(family.members_in_order):
            if person.person_id in independent_persons:
                self.known_independent_genotypes.append(genotype[:, index])

    def family_genotype_iterator(
        self,
    ) -> Generator[
            tuple[Family, np.ndarray, np.ndarray | None], None, None]:
        """Yield ``(family, genotype, None)`` for the loader's families.

        ``genotype`` is an ``int8`` array with two rows and one column per
        family member. Families are skipped when they have no members,
        or when the loader's ``include_unknown_person_genotypes``,
        ``include_unknown_family_genotypes`` or
        ``include_reference_genotypes`` flag is off and the matching
        ``is_*_genotype`` check is true. The third tuple element (best
        state) is always ``None``.
        """
        self.known_independent_genotypes = []
        # pylint: disable=protected-access
        fill_value = self.loader._fill_missing_value  # noqa: SLF001
        samples_index = self.loader.samples_vcf_index

        for family in self.loader.families.values():
            family_genotype = self._collect_family_genotype(
                family, samples_index, fill_value)
            if not family_genotype:
                continue

            # Rows of `family_genotype` are persons; transpose so that
            # rows are alleles and columns are persons.
            genotype = np.array(family_genotype, np.int8).T
            assert len(genotype.shape) == 2, (genotype, family)
            assert genotype.shape[0] == 2

            # NOTE(review): the original indentation was lost; the `else`
            # is attached to the outer test so that only genotypes without
            # unknown alleles feed the allele-frequency statistics --
            # confirm against the upstream source.
            if is_unknown_genotype(genotype):
                if not self.loader.include_unknown_person_genotypes:
                    continue
            else:
                self._collect_known_independent_genotypes(family, genotype)

            if is_all_unknown_genotype(genotype) and \
                    not self.loader.include_unknown_family_genotypes:
                continue
            if is_all_reference_genotype(genotype) and \
                    not self.loader.include_reference_genotypes:
                continue

            yield family, genotype, None
class SingleVcfLoader(VariantsGenotypesLoader):
    """Variant loader for one batch of VCF files that are read together.

    Every file in ``vcf_files`` is opened with pysam. Records that share
    chromosome and position across the files are combined into a single
    summary variant, and each pedigree sample is read from the first file
    that lists it.
    """

    # pylint: disable=too-many-instance-attributes
    def __init__(
            self,
            families: FamiliesData,
            vcf_files: list[str],
            genome: ReferenceGenome,
            regions: list[str] | None = None,
            params: dict[str, Any] | None = None,
            **_kwargs: Any):
        """Open the VCF files and match their samples to the pedigree.

        Args:
            families: pedigree data; persons are annotated in place.
            vcf_files: non-empty list of VCF file names.
            genome: reference genome passed to the base loader.
            regions: optional regions passed to the base loader.
            params: loader parameters; the ``vcf_*`` keys read here are
                ``vcf_multi_loader_fill_in_mode``, ``vcf_pedigree_mode``,
                ``vcf_denovo_mode``, ``vcf_omission_mode`` and the three
                ``vcf_include_*`` flags.
        """
        params = params or {}
        super().__init__(
            families=families,
            filenames=vcf_files,
            transmission_type=TransmissionType.transmitted,
            genome=genome,
            regions=regions,
            expect_genotype=True,
            expect_best_state=False,
            params=params)

        assert len(vcf_files)
        self.set_attribute("source_type", "vcf")

        # Allele value used for samples whose VCF file has no record for
        # a variant: 0 (reference) or -1 (unknown).
        fill_in_mode = params.get("vcf_multi_loader_fill_in_mode", "reference")
        if fill_in_mode == "reference":
            self._fill_missing_value: int = 0
        elif fill_in_mode == "unknown":
            self._fill_missing_value = -1
        else:
            logger.warning(
                "unexpected `vcf_multi_loader_fill_in_mode` value: %s; "
                "expected values are `reference` or `unknown`", fill_in_mode)
            self._fill_missing_value = 0

        self.fixed_pedigree = \
            params.get("vcf_pedigree_mode", "fixed") == "fixed"

        self._init_vcf_readers()
        self._match_pedigree_to_samples()
        self._build_samples_vcf_index()
        self.independent_persons = {
            p.person_id for p in self.families.persons_without_parents()
        }
        self._init_chromosome_order()
        self._init_denovo_mode()
        self._init_omission_mode()

        self.include_reference_genotypes = str2bool(
            params.get("vcf_include_reference_genotypes", False))
        self.include_unknown_family_genotypes = str2bool(
            params.get("vcf_include_unknown_family_genotypes", False))
        self.include_unknown_person_genotypes = str2bool(
            params.get("vcf_include_unknown_person_genotypes", False))
        self.multi_loader_fill_in_mode = fill_in_mode

    def _init_denovo_mode(self) -> None:
        """Select the denovo handler from the ``vcf_denovo_mode`` param.

        Unknown values are logged and fall back to ``possible_denovo``.
        """
        denovo_mode = self.params.get("vcf_denovo_mode", "possible_denovo")
        self._denovo_handler: Callable[[FamilyVariant], bool]
        if denovo_mode == "possible_denovo":
            self._denovo_handler = self._possible_denovo_mode_handler
        elif denovo_mode == "denovo":
            self._denovo_handler = self._denovo_mode_handler
        elif denovo_mode == "ignore":
            self._denovo_handler = self._ignore_denovo_mode_handler
        else:
            logger.warning(
                "unexpected denovo mode: %s; "
                "using possible_denovo", denovo_mode)
            self._denovo_handler = self._possible_denovo_mode_handler

    @staticmethod
    def _relabel_inheritance(
        family_variant: FamilyVariant,
        source: Inheritance,
        target: Inheritance,
    ) -> None:
        """Replace ``source`` by ``target`` in every allele's inheritance."""
        for fa in family_variant.alleles:
            assert isinstance(fa, FamilyAllele)
            # pylint: disable=protected-access
            fa._inheritance_in_members = [  # noqa: SLF001
                target if inh == source else inh
                for inh in fa.inheritance_in_members
            ]

    @staticmethod
    def _has_inheritance(
        family_variant: FamilyVariant,
        inheritance: Inheritance,
    ) -> bool:
        """Return True if any allele has ``inheritance`` in some member."""
        for fallele in family_variant.alleles:
            assert isinstance(fallele, FamilyAllele)
            if inheritance in fallele.inheritance_in_members:
                return True
        return False

    @staticmethod
    def _possible_denovo_mode_handler(family_variant: FamilyVariant) -> bool:
        """Relabel ``denovo`` as ``possible_denovo``; never skip."""
        SingleVcfLoader._relabel_inheritance(
            family_variant,
            Inheritance.denovo, Inheritance.possible_denovo)
        return False

    @staticmethod
    def _ignore_denovo_mode_handler(family_variant: FamilyVariant) -> bool:
        """Return True (skip the variant) if any allele is denovo."""
        return SingleVcfLoader._has_inheritance(
            family_variant, Inheritance.denovo)

    @staticmethod
    def _denovo_mode_handler(_family_vairant: FamilyVariant) -> bool:
        """Keep the variant unchanged; never skip."""
        return False

    def _init_omission_mode(self) -> None:
        """Select the omission handler from the ``vcf_omission_mode`` param.

        Unknown values are logged and fall back to ``possible_omission``.
        """
        omission_mode = self.params.get(
            "vcf_omission_mode", "possible_omission",
        )
        self._omission_handler: Callable[[FamilyVariant], bool]
        if omission_mode == "possible_omission":
            self._omission_handler = self._possible_omission_mode_handler
        elif omission_mode == "omission":
            self._omission_handler = self._omission_mode_handler
        elif omission_mode == "ignore":
            self._omission_handler = self._ignore_omission_mode_handler
        else:
            logger.warning(
                "unexpected omission mode: %s; "
                "using possible_omission", omission_mode)
            self._omission_handler = self._possible_omission_mode_handler

    @staticmethod
    def _possible_omission_mode_handler(family_variant: FamilyVariant) -> bool:
        """Relabel ``omission`` as ``possible_omission``; never skip."""
        SingleVcfLoader._relabel_inheritance(
            family_variant,
            Inheritance.omission, Inheritance.possible_omission)
        return False

    @staticmethod
    def _ignore_omission_mode_handler(family_variant: FamilyVariant) -> bool:
        """Return True (skip the variant) if any allele is an omission."""
        return SingleVcfLoader._has_inheritance(
            family_variant, Inheritance.omission)

    @staticmethod
    def _omission_mode_handler(_family_vairant: FamilyVariant) -> bool:
        """Keep the variant unchanged; never skip."""
        return False

    def close(self) -> None:
        """Close all opened VCF files."""
        for vcf in self.vcfs:
            vcf.close()

    def _init_vcf_readers(self) -> None:
        """Open a ``pysam.VariantFile`` for every file in ``filenames``.

        File and index names are passed through ``fs_utils.sign`` before
        being handed to pysam; the index name is ``None`` when
        ``fs_utils.tabix_index_filename`` finds none.
        """
        self.vcfs: list[pysam.VariantFile] = []
        logger.debug("SingleVcfLoader input files: %s", self.filenames)

        for file in self.filenames:
            # pylint: disable=no-member
            index_filename = fs_utils.tabix_index_filename(file)
            if index_filename is not None:
                index_filename = fs_utils.sign(index_filename)
            self.vcfs.append(
                pysam.VariantFile(
                    fs_utils.sign(file),
                    index_filename=index_filename),
            )

    def _build_vcf_iterators(
        self, region: str | None,
    ) -> list[Iterator[pysam.VariantRecord]]:
        """Return one record iterator per VCF file.

        With ``region`` set, each file is fetched for that region after
        the chromosome prefix adjustment is undone; otherwise the whole
        file is fetched.
        """
        if region is None:
            return [vcf.fetch() for vcf in self.vcfs]

        vcf_region = self._unadjust_chrom_prefix(region)
        return [vcf.fetch(region=vcf_region) for vcf in self.vcfs]

    def _init_chromosome_order(self) -> None:
        """Build ``chrom_order`` from the contigs of the first VCF header.

        A warning is logged when the other files list different contigs.
        """
        seqnames = list(self.vcfs[0].header.contigs)
        if not all(
                list(vcf.header.contigs) == seqnames
                for vcf in self.vcfs):
            logger.warning(
                "VCF files %s do not have the same list "
                "of contigs", self.filenames)

        self.chrom_order = {
            seq: idx for idx, seq in enumerate(seqnames)
        }

    @property
    def chromosomes(self) -> list[str]:
        """Return list of all chromosomes from VCF file(s).

        When a tabix index is found for the first file, its contigs are
        used; otherwise, or when reading the index fails, the contigs of
        the first VCF header are used. Names are passed through
        ``_adjust_chrom_prefix``.
        """
        assert len(self.vcfs) > 0

        seqnames = list(self.vcfs[0].header.contigs)
        filename = self.filenames[0]

        # pylint: disable=no-member
        index_filename = fs_utils.tabix_index_filename(filename)
        if index_filename is None:
            res = seqnames
        else:
            try:
                with pysam.Tabixfile(
                    fs_utils.sign(filename),
                    index=fs_utils.sign(index_filename),
                ) as tbx:
                    res = list(tbx.contigs)
            except Exception:  # noqa: BLE001
                # Best effort: fall back to the header contigs, but leave
                # a trace of why the index could not be used.
                logger.debug(
                    "unable to read contigs from tabix index of %s; "
                    "using VCF header contigs", filename, exc_info=True)
                res = seqnames

        return [self._adjust_chrom_prefix(chrom) for chrom in res]

    def _match_pedigree_to_samples(self) -> None:
        """Annotate pedigree persons with their location in the VCF files.

        Persons found in a VCF get a ``sample_index`` attribute of
        ``(vcf file index, position in that file's sample list)``.
        Persons not found are marked ``not_sequenced`` when the pedigree
        is not fixed, or ``missing`` (with ``sample_index`` of
        ``(None, None)``) when it is. Finally ``families.redefine()`` is
        called and ``families_samples_indexes`` is rebuilt.
        """
        # pylint: disable=too-many-branches
        vcf_samples_order = [list(vcf.header.samples) for vcf in self.vcfs]

        vcf_samples: set[str] = set()
        for samples_order in vcf_samples_order:
            intersection = vcf_samples.intersection(samples_order)
            if intersection:
                logger.warning(
                    "vcf samples present in multiple batches: %s",
                    intersection)
            vcf_samples.update(samples_order)
        logger.info("vcf samples (all): %s", len(vcf_samples))
        logger.info("vcf samples (set): %s", len(vcf_samples))

        pedigree_samples = set(self.families.pedigree_samples())
        logger.info("pedigree samples (all): %s", len(pedigree_samples))

        missing_samples = vcf_samples.difference(pedigree_samples)
        if missing_samples:
            logger.info(
                "vcf samples not found in pedigree: %s; %s",
                len(missing_samples), missing_samples)

        vcf_samples = vcf_samples.difference(missing_samples)
        assert vcf_samples.issubset(pedigree_samples)
        logger.info("vcf samples (matched): %s", len(vcf_samples))

        # First (file index, position) at which each sample id appears;
        # `setdefault` keeps the earliest occurrence.
        sample_locations: dict[str, tuple[int, int]] = {}
        for vcf_index, samples_order in enumerate(vcf_samples_order):
            for sample_pos, sample_id in enumerate(samples_order):
                sample_locations.setdefault(
                    sample_id, (vcf_index, sample_pos))

        seen: set[str] = set()
        not_sequenced: set[str] = set()
        counters: Counter = Counter()
        for person in self.families.persons.values():
            if person.generated:
                counters["generated"] += 1
                continue

            if person.sample_id in vcf_samples:
                if person.sample_id in seen:
                    continue
                person.set_attr(
                    "sample_index", sample_locations[person.sample_id])
                seen.add(person.sample_id)
                counters["found"] += 1
            elif not self.fixed_pedigree:
                if not person.not_sequenced:
                    not_sequenced.add(person.person_id)
                    person.set_attr("not_sequenced", value=True)
                    counters["not_sequenced"] += 1
                    logger.info(
                        "person %s marked as "
                        "'not_sequenced';", person.person_id)
            else:
                # NOTE(review): the original indentation was lost here
                # and the counter was incremented twice in a row; the
                # attributes are (re)set for every person reaching this
                # branch and the person is counted once -- confirm.
                if not person.missing:
                    logger.info(
                        "person %s marked as missing", person)
                person.set_attr("sample_index", (None, None))
                person.set_attr("missing", value=True)
                counters["missing"] += 1

        logger.info("people stats: %s", counters)

        self.families.redefine()
        logger.info(
            "persons changed to not_sequenced %s in %s",
            len(not_sequenced), self.filenames)

        self.families_samples_indexes = [
            (family, family.samples_index)
            for family in self.families.values()
        ]

    def _build_samples_vcf_index(self) -> None:
        """Map each real person's sample id to the first VCF listing it.

        The result is stored in ``samples_vcf_index``; samples found in
        no VCF header are left out.
        """
        vcf_samples = [set(vcf.header.samples) for vcf in self.vcfs]

        samples_index: dict[str, int] = {}
        for person in self.families.real_persons.values():
            for index, samples in enumerate(vcf_samples):
                if person.sample_id in samples:
                    samples_index[person.sample_id] = index
                    break
        self.samples_vcf_index = samples_index

    def _compare_vcf_variants_gt(
        self, lhs: pysam.VariantRecord | None,
        rhs: pysam.VariantRecord | None,
    ) -> bool:
        """Compare two VCF variant positions.

        Returns true if left vcf variant position in file is
        larger than right vcf variant position in file. Chromosomes are
        ordered by ``chrom_order``; positions are compared only when both
        records are on the same chromosome. A ``None`` record (exhausted
        iterator) is treated as larger than any record.
        """
        if lhs is None:
            return True
        if rhs is None:
            return False

        l_chrom_idx = self.chrom_order.get(lhs.chrom)
        r_chrom_idx = self.chrom_order.get(rhs.chrom)
        assert l_chrom_idx is not None
        assert r_chrom_idx is not None

        # The chromosome decides first; falling through to the position
        # comparison for an earlier chromosome would misorder records.
        if l_chrom_idx != r_chrom_idx:
            return l_chrom_idx > r_chrom_idx
        return lhs.pos > rhs.pos

    @staticmethod
    def _compare_vcf_variants_eq(
        lhs: pysam.VariantRecord,
        rhs: pysam.VariantRecord | None,
    ) -> bool:
        """Compare two VCF variant positions.

        Returns true if left vcf variant position in file is
        equal to right vcf variant position in file; a ``None`` right
        record never matches.
        """
        assert lhs is not None
        if rhs is None:
            return False
        return lhs.chrom == rhs.chrom and lhs.pos == rhs.pos

    def _find_current_vcf_variant(
        self, vcf_variants: list[pysam.VariantRecord | None],
    ) -> pysam.VariantRecord | None:
        """Return the record with the smallest position, if any.

        ``None`` entries are ignored; ``None`` is returned only when
        every entry is ``None``.
        """
        assert len(vcf_variants)

        min_index = 0
        for index in range(1, len(vcf_variants)):
            if vcf_variants[index] is None:
                continue
            if self._compare_vcf_variants_gt(
                    vcf_variants[min_index], vcf_variants[index]):
                min_index = index
        return vcf_variants[min_index]

    def _calc_allele_frequencies(
        self, summary_variant: SummaryVariant,
        known_independent_genotypes: np.ndarray,
    ) -> None:
        """Attach ``af_*`` frequency attributes to every summary allele.

        Args:
            summary_variant: variant whose alleles are updated in place.
            known_independent_genotypes: genotypes of independent persons;
                when non-empty its second axis is read as the number of
                persons called.
        """
        n_independent_parents = len(self.independent_persons)
        n_parents_called = 0
        if len(known_independent_genotypes) > 0:
            n_parents_called = known_independent_genotypes.shape[1]

        # Identical for every allele, so computed once.
        percent_parents_called = 0.0
        if n_independent_parents > 0:
            percent_parents_called = (
                100.0 * n_parents_called
            ) / n_independent_parents

        # Reference statistics are filled in when allele index 0 is seen
        # and copied into every allele processed from then on.
        ref_n_alleles = 0
        ref_allele_freq = 0.0
        for allele in summary_variant.alleles:
            allele_index = allele["allele_index"]
            n_alleles = np.sum(known_independent_genotypes == allele_index)

            allele_freq = 0.0
            if n_parents_called > 0:
                # Two alleles are counted per called person.
                allele_freq = (100.0 * n_alleles) / (2.0 * n_parents_called)

            if allele_index == 0:
                ref_n_alleles = n_alleles
                ref_allele_freq = allele_freq

            allele.update_attributes({
                "af_parents_called_count": int(n_parents_called),
                "af_parents_called_percent": float(percent_parents_called),
                "af_allele_count": int(n_alleles),
                "af_allele_freq": float(allele_freq),
                "af_ref_allele_count": int(ref_n_alleles),
                "af_ref_allele_freq": float(ref_allele_freq),
            })

    def _full_variants_iterator_impl(
        self, initial_summary_index: int = 0,
    ) -> Generator[tuple[SummaryVariant, list[FamilyVariant]], None, None]:
        """Yield ``(summary variant, family variants)`` for all regions.

        The VCF files are merged by position: on every step the smallest
        pending record defines the summary variant, and the records of
        the other files at the same chromosome and position supply the
        genotypes. Regions whose name contains ``"HLA"`` are skipped.

        Args:
            initial_summary_index: summary index given to the first
                variant; it is increased by one per yielded variant.
        """
        summary_index = initial_summary_index
        for region in self.regions:
            if region is not None and "HLA" in region:
                logger.warning("skipping odd chromosomal region: %s", region)
                continue

            vcf_iterators = self._build_vcf_iterators(region)
            vcf_variants = [next(it, None) for it in vcf_iterators]

            while any(vcf_variant is not None for vcf_variant in vcf_variants):
                current_vcf_variant = self._find_current_vcf_variant(
                    vcf_variants,
                )
                assert current_vcf_variant is not None

                current_summary_variant = \
                    SummaryVariantFactory.summary_variant_from_vcf(
                        current_vcf_variant, summary_index,
                        transmission_type=self.transmission_type)

                # Files positioned on the current variant supply their
                # record and are advanced afterwards; the rest supply
                # `None`, which is later replaced by the fill-in value.
                vcf_iterator_idexes_to_advance = []
                vcf_gt_variants: list[pysam.VariantRecord | None] = []
                for idx, vcf_variant in enumerate(vcf_variants):
                    if self._compare_vcf_variants_eq(
                            current_vcf_variant, vcf_variant):
                        vcf_gt_variants.append(vcf_variant)
                        vcf_iterator_idexes_to_advance.append(idx)
                    else:
                        vcf_gt_variants.append(None)

                if len(current_summary_variant.alt_alleles) > 127:
                    logger.warning(
                        "more than 127 alternative alleles; "
                        "some alleles will be skipped: %s",
                        current_summary_variant)

                family_genotypes = VcfFamiliesGenotypes(
                    self, vcf_gt_variants)
                family_variants = []
                for fam, genotype, best_state in \
                        family_genotypes.family_genotype_iterator():
                    fvariant = FamilyVariant(
                        current_summary_variant, fam, genotype, best_state)
                    # A handler returning True means "drop this variant".
                    if self._denovo_handler(fvariant):
                        continue
                    if self._omission_handler(fvariant):
                        continue
                    family_variants.append(fvariant)

                known_independent_genotypes = \
                    family_genotypes.known_independent_genotypes
                assert known_independent_genotypes is not None
                independent_genotypes = np.array(
                    known_independent_genotypes, np.int8).T
                self._calc_allele_frequencies(
                    current_summary_variant,
                    independent_genotypes)

                yield current_summary_variant, family_variants

                for idx in vcf_iterator_idexes_to_advance:
                    vcf_variants[idx] = next(vcf_iterators[idx], None)
                summary_index += 1
class VcfLoader(VariantsGenotypesLoader):
    """Defines variant loader for VCF variants.

    The passed file names are grouped into batches (one batch per
    ``vcf_chromosomes`` substitution, or a single batch otherwise) and
    every non-empty batch is delegated to its own ``SingleVcfLoader``.
    """

    def __init__(
        self,
        families: FamiliesData,
        vcf_files: list[str],
        genome: ReferenceGenome,
        regions: list[str] | None = None,
        params: dict[str, Any] | None = None,
        **kwargs: Any,  # noqa: ARG002
    ):
        """Create one ``SingleVcfLoader`` per non-empty batch of files.

        Args:
            families: pedigree data; each batch loader gets its own copy.
            vcf_files: VCF file names or templates containing ``[vc]``.
            genome: reference genome passed on to the batch loaders.
            regions: optional regions passed on to the batch loaders.
            params: loader parameters; ``vcf_chromosomes`` and
                ``vcf_pedigree_mode`` are read here.
            **kwargs: accepted and ignored.
        """
        # pylint: disable=unused-argument
        params = params or {}
        all_filenames, filenames = self._collect_filenames(params, vcf_files)

        super().__init__(
            families=families,
            filenames=all_filenames,
            transmission_type=TransmissionType.transmitted,
            genome=genome,
            expect_genotype=True,
            expect_best_state=False,
            params=params)

        self.set_attribute("source_type", "vcf")
        logger.debug("loader passed VCF files %s", vcf_files)
        logger.debug("collected VCF files: %s, %s", all_filenames, filenames)

        self.vcf_files = vcf_files
        self.vcf_loaders: list[SingleVcfLoader] = []
        if vcf_files:
            for vcf_files_batch in filenames:
                if not vcf_files_batch:
                    continue
                # Each batch annotates persons in place, so it works on
                # its own copy of the pedigree.
                vcf_families = families.copy()
                self.vcf_loaders.append(SingleVcfLoader(
                    vcf_families, vcf_files_batch,
                    genome, regions=regions, params=params))

        pedigree_mode = params.get("vcf_pedigree_mode", "fixed")
        if pedigree_mode == "intersection":
            self.families = self._families_intersection()
        elif pedigree_mode == "union":
            self.families = self._families_union()

        logger.info(
            "real persons/sample: %s", len(self.families.real_persons))
        for vcf_loader in self.vcf_loaders:
            vcf_families = vcf_loader.families
            logger.info(
                "real persons/sample: %s in %s",
                len(vcf_families.real_persons), vcf_loader.filenames)

    def _families_intersection(self) -> FamiliesData:
        """Merge batch pedigrees keeping only persons sequenced everywhere.

        A person marked ``not_sequenced`` in any batch is marked so in the
        first batch's families, which are then shared by all batch
        loaders and returned. Without batch loaders the loader's own
        families are returned unchanged.
        """
        logger.warning("families intersection run...")
        if not self.vcf_loaders:
            return self.families

        families = self.vcf_loaders[0].families
        for vcf_loader in self.vcf_loaders:
            other_families = vcf_loader.families
            assert len(families.persons) == len(other_families.persons)
            for other_person in other_families.persons.values():
                if not other_person.not_sequenced:
                    continue
                person = families.persons[other_person.fpid]
                logger.warning(
                    "families intersection: person %s "
                    "is marked as 'not_sequenced'", person.person_id)
                person.set_attr("not_sequenced", value=True)

        families.redefine()
        for vcf_loader in self.vcf_loaders:
            vcf_loader.families = families
        return families

    def _families_union(self) -> FamiliesData:
        """Merge batch pedigrees keeping persons sequenced in any batch.

        A person marked ``not_sequenced`` in the first batch has the flag
        cleared when some batch has it sequenced. The first batch's
        families are then shared by all batch loaders and returned.
        Without batch loaders the loader's own families are returned
        unchanged.
        """
        logger.warning("families union run...")
        if not self.vcf_loaders:
            return self.families

        families = self.vcf_loaders[0].families
        for fpid, person in families.persons.items():
            if not person.not_sequenced:
                continue
            for vcf_loader in self.vcf_loaders:
                other_person = vcf_loader.families.persons[fpid]
                if not other_person.not_sequenced:
                    logger.warning(
                        "families union: person %s "
                        "'not_sequenced' flag changed to 'sequenced'",
                        person.person_id)
                    person.set_attr("not_sequenced", value=False)
                    break

        families.redefine()
        for vcf_loader in self.vcf_loaders:
            vcf_loader.families = families
        return families

    def close(self) -> None:
        """Close all batch loaders."""
        for vcf_loader in self.vcf_loaders:
            vcf_loader.close()

    @classmethod
    def _arguments(cls) -> list[CLIArgument]:
        """Return the base loader CLI arguments plus the VCF-specific ones."""
        arguments = super()._arguments()
        arguments.extend([
            CLIArgument(
                "vcf_files",
                value_type=str,
                nargs="+",
                metavar="<VCF filenames>",
                help_text="VCF files to import",
            ),
            CLIArgument(
                "--vcf-include-reference-genotypes",
                default_value=False,
                help_text="include reference only variants "
                "[default: %(default)s]",
                action="store_true",
            ),
            CLIArgument(
                "--vcf-include-unknown-family-genotypes",
                default_value=False,
                help_text="include family variants with fully unknown "
                "genotype [default: %(default)s]",
                action="store_true",
            ),
            CLIArgument(
                "--vcf-include-unknown-person-genotypes",
                default_value=False,
                help_text="include family variants with "
                "partially unknown genotype [default: %(default)s]",
                action="store_true",
            ),
            CLIArgument(
                "--vcf-multi-loader-fill-in-mode",
                default_value="reference",
                help_text="used for multi VCF files loader "
                "to fill missing genotypes; "
                "supported values are `reference` or `unknown` "
                "[default: %(default)s]",
            ),
            CLIArgument(
                "--vcf-denovo-mode",
                default_value="ignore",
                help_text="used for handling family variants "
                "with denovo inheritance; "
                "supported values are: `denovo`, `possible_denovo`, "
                "`ignore`; "
                "[default: %(default)s]",
            ),
            CLIArgument(
                "--vcf-omission-mode",
                default_value="ignore",
                help_text="used for handling family variants with omission "
                "inheritance; "
                "supported values are: `omission`, `possible_omission`, "
                "`ignore`; "
                "[default: %(default)s]",
            ),
            CLIArgument(
                "--vcf-pedigree-mode",
                default_value="fixed",
                help_text="used for handling mismatches between samples "
                "in VCF and sample in pedigree file; "
                "supported values are: 'intersection', 'union', 'fixed'; "
                "'fixed' mode means that pedigree should be accept 'as is' "
                "without any modifications; samples found in pedigree but "
                "not in the VCF should be patched with unknown genotype; "
                "[default: 'fixed']",
            ),
            CLIArgument(
                "--vcf-chromosomes",
                value_type=str,
                help_text="specifies a list of filename template "
                "substitutions; then specified variant filename(s) are "
                "treated as templates and each occurrence of `[vc]` is "
                "replaced consecutively by elements of VCF wildcards list; "
                "by default the list is empty and no substitution "
                "takes place. "
                "[default: None]",
            ),
        ])
        return arguments

    @staticmethod
    def _glob(globname: str) -> list[str]:
        """Expand ``globname`` on its filesystem and return the matches."""
        filesystem, _ = url_to_fs(globname)
        filenames = filesystem.glob(globname)
        # fs.glob strips the protocol at the beginning. We need to add it back
        # otherwise there is no way to know the correct fs down the pipeline
        scheme = urlparse(globname).scheme
        if scheme:
            filenames = [f"{scheme}://{fn}" for fn in filenames]
        return cast(list[str], filenames)

    @staticmethod
    def _collect_filenames(
        params: dict[str, Any], vcf_files: list[str],
    ) -> tuple[list[str], list[list[str]]]:
        """Resolve the passed VCF names into batches of existing files.

        When ``params["vcf_chromosomes"]`` is set (``;``-separated) and
        every name contains ``[vc]``, one batch is produced per
        substitution; otherwise all names form a single batch. Names
        whose glob matches nothing are dropped.

        Returns:
            ``(all file names flattened, file names per batch)``.

        Raises:
            ValueError: if only some of the names contain ``[vc]``.
        """
        if params.get("vcf_chromosomes"):
            vcf_chromosomes = [
                wc.strip() for wc in params["vcf_chromosomes"].split(";")
            ]
            has_pattern = ["[vc]" in vcf_file for vcf_file in vcf_files]
            if all(has_pattern):
                glob_filenames = [
                    [vcf_file.replace("[vc]", vc) for vcf_file in vcf_files]
                    for vc in vcf_chromosomes
                ]
            elif not any(has_pattern):
                logger.warning(
                    "VCF files %s does not contain '[vc]' pattern, "
                    "but '--vcf-chromosomes' argument is passed; skipping...",
                    vcf_files)
                glob_filenames = [vcf_files]
            else:
                logger.error(
                    "some VCF files contain '[vc]' pattern, some not: "
                    "%s; can't continue...", vcf_files)
                raise ValueError(
                    f"some VCF files does not have '[vc]': {vcf_files}")
        else:
            glob_filenames = [vcf_files]

        logger.debug("collecting VCF filenames glob: %s", glob_filenames)

        result: list[list[str]] = []
        for batches_globnames in glob_filenames:
            batches_result = []
            for globname in batches_globnames:
                filenames = VcfLoader._glob(globname)
                if not filenames:
                    continue
                # Each resolved name is expected to match one file only.
                assert len(filenames) == 1, (globname, filenames)
                batches_result.append(filenames[0])
            result.append(batches_result)

        all_filenames = list(itertools.chain.from_iterable(result))
        return all_filenames, result

    @property
    def variants_filenames(self) -> list[str]:
        """Return the VCF file names exactly as passed to the loader."""
        return self.vcf_files

    @property
    def chromosomes(self) -> list[str]:
        """Return list of all chromosomes from VCF files.

        Chromosomes of all batch loaders are merged without duplicates,
        keeping the order of first appearance.
        """
        assert len(self.vcf_loaders) > 0
        return list(dict.fromkeys(
            chrom
            for loader in self.vcf_loaders
            for chrom in loader.chromosomes
        ))

    def reset_regions(self, regions: str | list[str] | None) -> None:
        """Forward the new regions to every batch loader."""
        for single_loader in self.vcf_loaders:
            single_loader.reset_regions(regions)

    def _full_variants_iterator_impl(
        self,
    ) -> Generator[tuple[SummaryVariant, list[FamilyVariant]], None, None]:
        """Yield the variants of all batch loaders, one loader at a time.

        The running summary index is handed to each batch loader as its
        starting index, so indexes continue across batches.
        """
        summary_index = 0
        for vcf_loader in self.vcf_loaders:
            # pylint: disable=protected-access
            iterator = vcf_loader._full_variants_iterator_impl(  # noqa: SLF001
                summary_index)
            # A `for` loop consumes the iterator's StopIteration itself,
            # so no explicit handling is needed here.
            for summary_variant, family_variants in iterator:
                yield summary_variant, family_variants
                summary_index += 1

    @classmethod
    def parse_cli_arguments(
        cls, argv: argparse.Namespace, *,
        use_defaults: bool = False,
    ) -> tuple[list[str], dict[str, Any]]:
        """Convert parsed CLI arguments into file names and loader params.

        Returns:
            ``(argv.vcf_files, params)`` where ``params`` holds the
            ``vcf_*`` options and the chromosome prefix options.
        """
        super().parse_cli_arguments(argv, use_defaults=use_defaults)
        filenames = argv.vcf_files

        assert argv.vcf_multi_loader_fill_in_mode in {
            "reference", "unknown",
        }
        assert argv.vcf_denovo_mode in {
            "denovo", "possible_denovo", "ignore",
        }, argv.vcf_denovo_mode
        assert argv.vcf_omission_mode in {
            "omission", "possible_omission", "ignore",
        }, argv.vcf_omission_mode
        assert argv.vcf_pedigree_mode in {
            "intersection", "union", "fixed",
        }, argv.vcf_pedigree_mode

        params = {
            "vcf_include_reference_genotypes": str2bool(
                argv.vcf_include_reference_genotypes,
            ),
            "vcf_include_unknown_family_genotypes": str2bool(
                argv.vcf_include_unknown_family_genotypes,
            ),
            "vcf_include_unknown_person_genotypes": str2bool(
                argv.vcf_include_unknown_person_genotypes,
            ),
            "vcf_multi_loader_fill_in_mode":
                argv.vcf_multi_loader_fill_in_mode,
            "vcf_denovo_mode": argv.vcf_denovo_mode,
            "vcf_omission_mode": argv.vcf_omission_mode,
            "vcf_pedigree_mode": argv.vcf_pedigree_mode,
            "vcf_chromosomes": argv.vcf_chromosomes,
            "add_chrom_prefix": argv.add_chrom_prefix,
            "del_chrom_prefix": argv.del_chrom_prefix,
        }
        return filenames, params