Source code for dae.variants_loaders.raw.loader

"""Base classes and helpers for variant loaders."""
from __future__ import annotations

import argparse
import copy
import logging
from abc import ABC, abstractmethod
from collections.abc import Callable, Generator, Sequence
from enum import Enum
from typing import (
    Any,
)

import numpy as np

from dae.annotation.annotation_pipeline import AttributeInfo
from dae.genomic_resources.reference_genome import ReferenceGenome
from dae.pedigrees.families_data import FamiliesData
from dae.pedigrees.family import Family
from dae.utils.regions import Region
from dae.utils.variant_utils import best2gt, get_locus_ploidy
from dae.variants.attributes import GeneticModel, Sex, TransmissionType
from dae.variants.family_variant import (
    FamilyVariant,
    calculate_simple_best_state,
)
from dae.variants.variant import SummaryVariant

logger = logging.getLogger(__name__)


[docs] class ArgumentType(Enum): ARGUMENT = 1 OPTION = 2
[docs] class CLIArgument: """Defines class for handling CLI arguments in variant loaders. This class handles the logic for CLI argument operations such as parsing arguments, transforming to dict, transforming a parsed argument back to a CLI argument and adding itself to an existing ArgumentParser. Construction closely mirrors the ArgumentParser argument format. """ def __init__( self, argument_name: str, *, has_value: bool = True, default_value: int | str | bool | None = None, destination: str | None = None, help_text: str | None = None, action: str | None = None, value_type: type[str] | None = None, metavar: str | None = None, nargs: str | None = None, raw: bool = False, ) -> None: self.argument_name = argument_name self.has_value = has_value self.default_value = default_value self.destination = destination self.value_type = value_type self.metavar = metavar self.help_text = help_text self.nargs = nargs self.action = action self.raw = raw self.arg_type = ArgumentType.OPTION if destination is None: self.destination = self._default_destination() def __repr__(self) -> str: return f"{self.argument_name} ({self.arg_type})" def _default_destination(self) -> str | None: if self.argument_name.startswith("--"): self.arg_type = ArgumentType.OPTION else: self.arg_type = ArgumentType.ARGUMENT return None return self.argument_name[2:].replace("-", "_")
[docs] def add_to_parser(self, parser: argparse.ArgumentParser) -> None: """Add this argument to argsparser.""" kwargs = { "type": self.value_type, "help": self.help_text, "default": self.default_value, } if self.arg_type == ArgumentType.OPTION: kwargs["dest"] = self.destination if self.action: # For some reason kwargs["type"] = self.value_type gets tuple-ized # should find a different workaround del kwargs["type"] kwargs["action"] = self.action else: kwargs["metavar"] = self.metavar kwargs["nargs"] = self.nargs parser.add_argument(self.argument_name, **kwargs) # type: ignore
[docs] def build_option( self, params: dict[str, str], *, use_defaults: bool = False, ) -> str | None: """Build an option.""" if self.arg_type == ArgumentType.ARGUMENT: return None for key, value in params.items(): if key == self.destination: if self.has_value: if value is not None: if value == self.default_value: continue if self.raw: value = value.encode( "unicode-escape", ).decode().replace("\\\\", "\\") return f'{self.argument_name} "{value}"' if use_defaults and self.default_value is not None: value = str(self.default_value) if self.raw: value = value.encode( "unicode-escape", ).decode().replace("\\\\", "\\") return f'{self.argument_name} "{value}"' else: return f"{self.argument_name}" return None
[docs] def parse_cli_argument( self, argv: argparse.Namespace, *, use_defaults: bool = False, ) -> None: """Parse the command line argument from the `argv` object. Args: argv (argparse.Namespace): The command line arguments. use_defaults (bool, optional): Whether to use default values if the argument is None. Defaults to False. """ if self.destination not in argv: # type: ignore return assert self.destination is not None argument = getattr(argv, self.destination) if argument is None and self.default_value is not None and use_defaults: setattr(argv, self.destination, self.default_value)
FamilyGenotypeIterator = Generator[ tuple[Family, np.ndarray, np.ndarray | None], None, None]
[docs] class FamiliesGenotypes(ABC): """A base class for family genotypes."""
[docs] @abstractmethod def family_genotype_iterator(self) -> FamilyGenotypeIterator: pass
[docs] class CLILoader(ABC): # noqa: B024 """Base class for loader classes that require cli arguments.""" def __init__( self, params: dict[str, Any] | None = None, ) -> None: self.arguments = self._arguments() self.params: dict[str, Any] = params or {} def _add_argument(self, argument: CLIArgument) -> None: self.arguments.append(argument) @classmethod def _arguments(cls) -> list[CLIArgument]: return []
[docs] @classmethod def cli_defaults(cls) -> dict[str, Any]: """Build a dictionary with default values for CLI arguments.""" argument_destinations = [ arg.destination for arg in cls._arguments() if arg.destination is not None ] defaults = [ arg.default_value for arg in cls._arguments() if arg.destination is not None ] return dict(zip(argument_destinations, defaults, strict=True))
[docs] @classmethod def cli_arguments( cls, parser: argparse.ArgumentParser, *, options_only: bool = False, ) -> None: """Add command-line arguments specific for the CLILoader class. Args: parser (argparse.ArgumentParser): The ArgumentParser object to add the arguments to. options_only (bool, optional): If True, only adds options (not arguments) to the parser. Defaults to False. """ for argument in cls._arguments(): if options_only and argument.arg_type == ArgumentType.ARGUMENT: continue argument.add_to_parser(parser)
[docs] @classmethod def build_cli_arguments(cls, params: dict) -> str: """Return a string with cli arguments.""" built_arguments = [] for argument in cls._arguments(): logger.info("adding to CLI arguments: %s", argument) built_arguments.append(argument.build_option(params)) nonnull_arguments = (x for x in built_arguments if x is not None) result = " ".join(nonnull_arguments) logger.info("result CLI arguments: %s", result) return result
[docs] def build_arguments_dict(self) -> dict[str, str | bool]: """Build a dictionary with the argument destinations as keys.""" result = {} for argument in self._arguments(): if argument.arg_type == ArgumentType.ARGUMENT: continue if argument.destination in self.params: result[argument.destination] = \ self.params[argument.destination] logger.debug( "building arguments from %s into dict: %s", self.params, result) return result
[docs] @classmethod def parse_cli_arguments( cls, argv: argparse.Namespace, *, use_defaults: bool = False, ) -> tuple[list[str], dict[str, Any]]: """Parse cli arguments.""" for arg in cls._arguments(): arg.parse_cli_argument(argv, use_defaults=use_defaults) return [], {}
FullVariantsIterator = Generator[ tuple[SummaryVariant, list[FamilyVariant]], None, None] FamilyVariantsIterator = Generator[ FamilyVariant, None, None]
[docs] class VariantsLoader(CLILoader): """Base class for all variant loaders.""" def __init__( self, families: FamiliesData, filenames: str | list[str], genome: ReferenceGenome, transmission_type: TransmissionType = TransmissionType.transmitted, params: dict[str, Any] | None = None, attributes: dict[str, Any] | None = None, ) -> None: params = params or {} super().__init__(params=params) assert isinstance(families, FamiliesData) self.families = families if isinstance(filenames, str): filenames = [filenames] self.filenames = filenames assert isinstance(transmission_type, TransmissionType) self.transmission_type = transmission_type self.genome = genome if attributes is None: self._attributes = {} else: self._attributes = copy.deepcopy(attributes) self.arguments = [] @property def variants_filenames(self) -> list[str]: return self.filenames
[docs] def get_attribute(self, key: str) -> Any: return self._attributes.get(key, None)
[docs] def set_attribute(self, key: str, value: Any) -> None: self._attributes[key] = value
[docs] def reset_regions( self, regions: list[Region] | Sequence[Region | None] | None, ) -> None: pass
@property def annotation_schema(self) -> list[AttributeInfo] | None: return None @classmethod def _arguments(cls) -> list[CLIArgument]: return []
[docs] @abstractmethod def full_variants_iterator(self) -> FullVariantsIterator: pass
[docs] def family_variants_iterator(self) -> FamilyVariantsIterator: for _, fvs in self.full_variants_iterator(): yield from fvs
[docs] @abstractmethod def close(self) -> None: """Close resources used by the loader."""
@property @abstractmethod def chromosomes(self) -> list[str]: """Return list of all chromosomes."""
[docs] class VariantsLoaderDecorator(VariantsLoader): """Base class for wrapping and decoring a variant loader.""" def __init__(self, variants_loader: VariantsLoader) -> None: super().__init__( variants_loader.families, variants_loader.filenames, genome=variants_loader.genome, transmission_type=variants_loader.transmission_type, params=variants_loader.params, attributes=variants_loader._attributes, # noqa: SLF001 ) self.variants_loader = variants_loader
[docs] def get_attribute(self, key: str) -> Any: result = self._attributes.get(key, None) if result is not None: return result return self.variants_loader.get_attribute(key)
def __getattr__(self, attr: str) -> Any: return getattr(self.variants_loader, attr, None) @property def annotation_schema(self) -> list[AttributeInfo] | None: return self.variants_loader.annotation_schema
[docs] @classmethod def build_cli_arguments(cls, params: dict) -> str: raise NotImplementedError
[docs] @classmethod def cli_defaults(cls) -> dict[str, Any]: raise NotImplementedError
[docs] @classmethod def cli_arguments( cls, parser: argparse.ArgumentParser, *, options_only: bool = False, ) -> None: raise NotImplementedError
[docs] def build_arguments_dict(self) -> dict: return self.variants_loader.build_arguments_dict()
[docs] def close(self) -> None: self.variants_loader.close()
@property def chromosomes(self) -> list[str]: return self.variants_loader.chromosomes
[docs] class VariantsGenotypesLoader(VariantsLoader): """Base class for variants loaders. Calculate missing best states and adds a genetic model value to the family variant and its alleles. """ def __init__( self, families: FamiliesData, filenames: str | list[str], genome: ReferenceGenome, transmission_type: TransmissionType = TransmissionType.transmitted, regions: list[Region] | None = None, *, expect_genotype: bool = True, expect_best_state: bool = False, params: dict[str, Any] | None = None) -> None: params = params or {} super().__init__( families=families, filenames=filenames, transmission_type=transmission_type, genome=genome, params=params) self.regions: Sequence[Region | None] = [None] if regions is not None: self.reset_regions(regions) self._adjust_chrom_prefix: Callable[[str], str] = lambda chrom: chrom self._unadjust_chrom_prefix: Callable[[str], str] = lambda chrom: chrom if params.get("add_chrom_prefix", None): self._chrom_prefix = params["add_chrom_prefix"] self._adjust_chrom_prefix = self._add_chrom_prefix self._unadjust_chrom_prefix = self._del_chrom_prefix elif params.get("del_chrom_prefix", None): self._chrom_prefix = params["del_chrom_prefix"] self._adjust_chrom_prefix = self._del_chrom_prefix self._unadjust_chrom_prefix = self._add_chrom_prefix self.expect_genotype = expect_genotype self.expect_best_state = expect_best_state @classmethod def _arguments(cls) -> list[CLIArgument]: arguments = super()._arguments() arguments.append(CLIArgument( "--add-chrom-prefix", value_type=str, help_text="Add specified prefix to each chromosome name in " "variants file", )) arguments.append(CLIArgument( "--del-chrom-prefix", value_type=str, help_text="Remove specified prefix to each chromosome name in " "variants file", )) return arguments @abstractmethod def _full_variants_iterator_impl(self) -> FullVariantsIterator: pass
[docs] def reset_regions( self, regions: list[Region] | Sequence[Region | None] | None, ) -> None: if regions is None: self.regions = [None] return self.regions = list(filter( lambda r: r is None or "HLA" not in r.chrom, regions, ))
@classmethod def _get_diploid_males(cls, family_variant: FamilyVariant) -> list[bool]: res = [] assert family_variant.gt is not None assert family_variant.gt.shape == (2, len(family_variant.family)) for member_idx, member in enumerate( family_variant.family.members_in_order, ): if member.sex in (Sex.F, Sex.U): continue res.append(bool(family_variant.gt[1, member_idx] != -2)) return res @classmethod def _calc_genetic_model( cls, family_variant: FamilyVariant, genome: ReferenceGenome, ) -> GeneticModel: if family_variant.chromosome in ("X", "chrX"): male_ploidy = get_locus_ploidy( family_variant.chromosome, family_variant.position, Sex.M, genome, ) if male_ploidy == 2: if not all(cls._get_diploid_males(family_variant)): return GeneticModel.X_broken return GeneticModel.pseudo_autosomal if any(cls._get_diploid_males(family_variant)): return GeneticModel.X_broken return GeneticModel.X # We currently assume all other chromosomes are autosomal return GeneticModel.autosomal @classmethod def _calc_best_state( cls, family_variant: FamilyVariant, genome: ReferenceGenome, *, force: bool = True, ) -> np.ndarray | None: assert family_variant.gt is not None male_ploidy = get_locus_ploidy( family_variant.chromosome, family_variant.position, Sex.M, genome, ) if family_variant.chromosome in ("X", "chrX") and male_ploidy == 1: best_state = calculate_simple_best_state( family_variant.gt, family_variant.allele_count, ) male_ids = [ person.person_id for person in family_variant.family.members_in_order if person.sex == Sex.M ] male_indices = family_variant.family.members_index(male_ids) for idx in male_indices: # A male with a haploid genotype for X cannot # have two alternative alleles, therefore there # must be one or two reference alleles left over # from the simple best state calculation if best_state[0, idx] in (1, 2): best_state[0, idx] -= 1 elif np.any(best_state[:, idx] == 2): best_state[best_state[:, idx] == 2, idx] -= 1 return best_state if force: return calculate_simple_best_state( family_variant.gt, family_variant.allele_count, ) return None @classmethod def _calc_genotype( cls, family_variant: FamilyVariant, genome: ReferenceGenome) -> tuple[np.ndarray, GeneticModel]: # pylint: disable=protected-access best_state = family_variant._best_state # noqa: SLF001 assert best_state is not None genotype = best2gt(best_state) male_ploidy = get_locus_ploidy( family_variant.chromosome, family_variant.position, Sex.M, genome, ) ploidy = np.sum(best_state, 0) genetic_model = GeneticModel.autosomal if family_variant.chromosome in ("X", "chrX"): genetic_model = GeneticModel.X if male_ploidy == 2: genetic_model = GeneticModel.pseudo_autosomal male_ids = [ person_id for person_id, person in family_variant.family.persons.items() if person.sex == Sex.M ] male_indices = family_variant.family.members_index(male_ids) for idx in male_indices: if ploidy[idx] != male_ploidy: genetic_model = GeneticModel.X_broken break elif np.any(ploidy == 1): genetic_model = GeneticModel.autosomal_broken return genotype, genetic_model def _add_chrom_prefix(self, chrom: str) -> str: # there is an important invariant between this and _del_chrom_prefix # we don't know if this or _del_chrom_prefix will be executed first so # _add_chrom_prefix should undo _del_chrom_prefix # _del_chrom_prefix should undo _add_chrom_prefix assert self._chrom_prefix is not None return f"{self._chrom_prefix}{chrom}" def _del_chrom_prefix(self, chrom: str) -> str: assert self._chrom_prefix is not None assert chrom.startswith(self._chrom_prefix) return chrom[len(self._chrom_prefix):]
[docs] def full_variants_iterator( self, ) -> FullVariantsIterator: full_iterator = self._full_variants_iterator_impl() for summary_variant, family_variants in full_iterator: chrom = self._adjust_chrom_prefix(summary_variant.chromosome) # pylint: disable=protected-access summary_variant._chromosome = chrom # noqa: SLF001 for summary_allele in summary_variant.alleles: summary_allele._chrom = chrom # noqa: SLF001 summary_allele._attributes["chrom"] = chrom # noqa: SLF001 for fv in family_variants: if self.expect_genotype: assert fv._best_state is None # noqa: SLF001 assert fv._genetic_model is None # noqa: SLF001 assert fv.gt is not None fv._genetic_model = self._calc_genetic_model( # noqa: SLF001 fv, self.genome, ) fv._best_state = self._calc_best_state( # noqa: SLF001 fv, self.genome, force=False, ) for fa in fv.family_alleles: fa._best_state = fv.best_state # noqa: SLF001 fa._genetic_model = fv.genetic_model # noqa: SLF001 elif self.expect_best_state and fv.gt is None: assert fv._best_state is not None # noqa: SLF001 assert fv._genetic_model is None # noqa: SLF001 assert fv.gt is None ( fv.gt, fv._genetic_model, # noqa: SLF001 ) = self._calc_genotype(fv, self.genome) for fa in fv.family_alleles: fa.gt = fv.gt fa._genetic_model = fv.genetic_model # noqa: SLF001 yield summary_variant, family_variants