Source code for dae.variants_loaders.vcf.serializer

from __future__ import annotations

import logging
import pathlib
from collections.abc import Iterable
from types import TracebackType

import pysam

from dae.genomic_resources.reference_genome import ReferenceGenome
from dae.pedigrees.families_data import FamiliesData
from dae.variants.family_variant import FamilyVariant
from dae.variants.variant import SummaryVariant

logger = logging.getLogger(__name__)


[docs] class VcfSerializer: """Stores a sequence of alleles in a VCF file.""" def __init__( self, families: FamiliesData, genome: ReferenceGenome, output_path: pathlib.Path | str | None, header: list[str] | None = None, ): self.families = families self.genome = genome self.output_path = output_path self._vcf_header = self._build_vcf_header(header) self.vcf_file: pysam.VariantFile | None = None
[docs] def serialize( self, full_variants: Iterable[tuple[SummaryVariant, list[FamilyVariant]]], ) -> None: """Serialize a sequence of alleles to a VCF file.""" assert self.vcf_file is not None for sv, fvs in full_variants: record = self._build_vcf_record(sv, fvs) self.vcf_file.write(record)
def _build_vcf_header( self, header: list[str] | None, ) -> pysam.VariantHeader: vcf_header = pysam.VariantHeader() if header is not None: for line in header: vcf_header.add_line(line) for contig in self.genome.chromosomes: vcf_header.contigs.add(contig) for fam in self.families.values(): for p in fam.members_in_order: vcf_header.add_sample(p.sample_id) vcf_header.formats.add("GT", 1, "String", "Genotype") return vcf_header def _build_vcf_file(self) -> pysam.VariantFile: output = "-" if self.output_path is None else str(self.output_path) return pysam.VariantFile( output, "w", header=self._vcf_header, ) def _build_vcf_record( self, sv: SummaryVariant, fvs: list[FamilyVariant], ) -> pysam.VariantRecord: assert self.vcf_file is not None record = self.vcf_file.new_record() record.contig = sv.chrom record.pos = sv.position record.ref = sv.reference alts = [aa.alternative for aa in sv.alt_alleles] record.alts = alts svid = [ f"{aa.chrom}_{aa.position}_{aa.reference}_{aa.alternative}" for aa in sv.alt_alleles] record.id = ";".join(svid) for fv in fvs: fam = self.families[fv.family_id] for mem_index, mem in enumerate(fam.members_in_order): gt = [g for g in fv.genotype[mem_index] if g > -2] record.samples[mem.sample_id]["GT"] = gt return record def __enter__(self) -> VcfSerializer: return self.open() def __exit__( self, exc_type: type[BaseException] | None, exc_value: BaseException | None, exc_tb: TracebackType | None, ) -> None: if exc_type is not None: logger.error( "exception while working with genomic score: %s, %s, %s", exc_type, exc_value, exc_tb) self.close()
[docs] def close(self) -> None: if self.vcf_file is not None: self.vcf_file.close() self.vcf_file = None
[docs] def open(self) -> VcfSerializer: self.vcf_file = self._build_vcf_file() return self