Source code for dae.tools.liftover_tools
import abc
import argparse
import logging
import pathlib
import sys
import tempfile
import textwrap
from contextlib import closing
from typing import cast
import pysam
from dae.annotation.annotatable import (
CNVAllele,
VCFAllele,
)
from dae.annotation.annotation_factory import load_pipeline_from_yaml
from dae.annotation.annotation_pipeline import AnnotationPipeline
from dae.annotation.liftover_annotator import (
basic_liftover_variant,
bcf_liftover_variant,
)
from dae.genomic_resources.genomic_context import (
context_providers_add_argparser_arguments,
context_providers_init,
get_genomic_context,
)
from dae.genomic_resources.liftover_chain import (
LiftoverChain,
build_liftover_chain_from_resource,
)
from dae.genomic_resources.reference_genome import (
ReferenceGenome,
build_reference_genome_from_resource,
)
from dae.genomic_resources.repository_factory import (
GenomicResourceRepo,
)
from dae.pedigrees.loader import FamiliesLoader
from dae.utils.regions import Region
from dae.utils.variant_utils import mat2str
from dae.utils.verbosity_configuration import VerbosityConfiguration
from dae.variants.family_variant import FamilyAllele
from dae.variants.variant import VariantDetails
from dae.variants_loaders.cnv.loader import CNVLoader
from dae.variants_loaders.dae.loader import (
DaeTransmittedLoader,
DenovoLoader,
)
from dae.variants_loaders.raw.loader import VariantsGenotypesLoader
logger = logging.getLogger("liftover_tools")
def _build_variant_loader(
loader_class: type[VariantsGenotypesLoader],
args: argparse.Namespace,
source_genome: ReferenceGenome,
) -> VariantsGenotypesLoader:
families_filenames, families_params = \
FamiliesLoader.parse_cli_arguments(args)
families_filename = families_filenames[0]
families_loader = FamiliesLoader(
families_filename, **families_params,
)
families = families_loader.load()
variants_filenames, variants_params = \
loader_class.parse_cli_arguments(args)
return loader_class(
families,
variants_filenames,
params=variants_params,
genome=source_genome,
)
def _region_output_filename(
output_filename: str,
region: Region | None,
) -> str:
if region is None:
return output_filename
out = pathlib.Path(output_filename)
suffixes = out.suffixes
output_prefix = out
for _ in suffixes:
output_prefix = output_prefix.with_suffix("")
region_str = str(region).replace(":", "_").replace("-", "_")
suffix = "".join(suffixes)
return f"{output_prefix}_{region_str}{suffix}"
[docs]
class LiftoverTool(abc.ABC):
"""Liftover tools base class."""
def __init__(
self, description: str,
default_output: str,
) -> None:
self.description = description
self.default_output = default_output
self.cli_args: argparse.Namespace = argparse.Namespace()
self.grr: GenomicResourceRepo | None = None
self.source_genome: ReferenceGenome | None = None
self.target_genome: ReferenceGenome | None = None
self.chain: LiftoverChain | None = None
[docs]
def build_liftover_pipeline(
self,
grr: GenomicResourceRepo,
) -> AnnotationPipeline:
"""Build liftover annotator based on the selected mode."""
if self.cli_args.mode not in {"bcf_liftover", "basic_liftover"}:
raise ValueError(f"unknown liftover mode: {self.cli_args.mode}")
annotator_type = "liftover_annotator"
if self.cli_args.mode == "basic_liftover":
annotator_type = "basic_liftover_annotator"
pipeline_config = textwrap.dedent(
f"""
- {annotator_type}:
chain: {self.cli_args.chain}
source_genome: {self.cli_args.source_genome}
target_genome: {self.cli_args.target_genome}
attributes:
- source: liftover_annotatable
name: target_annotatable
""",
)
pipeline = load_pipeline_from_yaml(pipeline_config, grr)
pipeline.open()
return pipeline
[docs]
def build_cli_arguments_parser(
self,
) -> argparse.ArgumentParser:
"""Create CLI parser."""
parser = argparse.ArgumentParser(description=self.description)
VerbosityConfiguration.set_arguments(parser)
context_providers_add_argparser_arguments(
parser,
skip_cli_annotation_context=True,
)
parser.add_argument(
"-c", "--chain", help="chain resource id",
default="liftover/hg19ToHg38")
parser.add_argument(
"-t", "--target-genome", help="target genome",
default="hg38/genomes/GRCh38-hg38")
parser.add_argument(
"-s", "--source-genome", help="source genome",
default="hg19/genomes/GATK_ResourceBundle_5777_b37_phiX174")
parser.add_argument(
"-o", "--output", help="output filename",
default=self.default_output)
parser.add_argument(
"--region",
type=str,
dest="region",
metavar="region",
default=None,
help="region to convert [default: None] "
"ex. chr1:1-10000. ",
)
parser.add_argument(
"--mode",
type=str,
dest="mode",
metavar="mode",
default="bcf_liftover",
help="mode to use for liftover: 'bcf_liftover' or 'basic_liftover'",
)
return parser
[docs]
def run(self,
argv: list[str] | None = None,
grr: GenomicResourceRepo | None = None,
) -> None:
"""Liftover tool main function."""
parser = self.build_cli_arguments_parser()
if argv is None:
argv = sys.argv[1:]
assert argv is not None
self.cli_args = parser.parse_args(argv)
VerbosityConfiguration.set(self.cli_args)
context_providers_init(
**vars(self.cli_args), skip_cli_annotation_context=True)
genomic_context = get_genomic_context()
if grr is None:
grr = genomic_context.get_genomic_resources_repository()
if grr is None:
raise ValueError("no valid GRR configured")
self.grr = grr
self.source_genome = build_reference_genome_from_resource(
grr.get_resource(self.cli_args.source_genome))
self.source_genome.open()
self.target_genome = build_reference_genome_from_resource(
grr.get_resource(self.cli_args.target_genome))
self.target_genome.open()
self.chain = build_liftover_chain_from_resource(
grr.get_resource(self.cli_args.chain))
self.chain.open()
region = None
if self.cli_args.region is not None:
region = Region.from_str(self.cli_args.region)
self.liftover_variants(region)
[docs]
@abc.abstractmethod
def liftover_variants(
self,
region: Region | None = None,
) -> None:
"""Liftover variants abstract method."""
[docs]
class CNVLiftoverTool(LiftoverTool):
"""CNV liftover tool class."""
def __init__(self) -> None:
super().__init__("liftover CNV variants", "cnv_liftover.tsv")
[docs]
def build_cli_arguments_parser(
self,
) -> argparse.ArgumentParser:
"""Create CLI parser."""
parser = super().build_cli_arguments_parser()
FamiliesLoader.cli_arguments(parser)
CNVLoader.cli_arguments(parser)
return parser
[docs]
def liftover_variants(
self,
region: Region | None = None,
) -> None:
"""Liftover CNV variants method."""
assert self.source_genome is not None
variants_loader = _build_variant_loader(
CNVLoader, self.cli_args, self.source_genome)
assert isinstance(variants_loader, CNVLoader)
output_filename = _region_output_filename(
self.cli_args.output, region,
)
assert self.grr is not None
pipeline = self.build_liftover_pipeline(self.grr)
if region is not None:
logger.info("resetting regions (region): %s", region)
variants_loader.reset_regions([region])
logger.info("output: %s", output_filename)
with open(output_filename, "wt") as output:
header = [
"location", "cnv_type",
"location_src", "cnv_type_src",
"size_change",
"familyId", "personId",
]
output.write("\t".join(header))
output.write("\n")
for sv, fvs in variants_loader.full_variants_iterator():
assert len(sv.alt_alleles) == 1
aa = sv.alt_alleles[0]
annotatable: CNVAllele = cast(CNVAllele, aa.get_annotatable())
result = pipeline.annotate(annotatable)
liftover_annotatable: CNVAllele = \
cast(CNVAllele, result.get("target_annotatable"))
if liftover_annotatable is None:
logger.error("can't liftover %s", aa)
continue
for fv in fvs:
size_src = len(annotatable)
size = len(liftover_annotatable)
size_diff = (100.0 * abs(size_src - size)) / size_src
if size_diff >= 50:
logger.warning(
"CNV variant size changed more than 50 percent: "
"%s; %s -> %s",
size_diff, annotatable, liftover_annotatable)
for aa in fv.alt_alleles:
fa = cast(FamilyAllele, aa)
line: list[str] = []
person_ids = [
person_id
for person_id in fa.variant_in_members
if person_id is not None
]
assert len(person_ids) >= 1
line = [
f"{liftover_annotatable.chrom}:"
f"{liftover_annotatable.pos}-"
f"{liftover_annotatable.pos_end}",
liftover_annotatable.type.name,
f"{annotatable.chrom}:"
f"{annotatable.pos}-"
f"{annotatable.pos_end}",
annotatable.type.name,
str(size_diff),
fa.family_id,
";".join(person_ids),
]
output.write("\t".join(line))
output.write("\n")
[docs]
def cnv_liftover_main(
argv: list[str] | None = None,
grr: GenomicResourceRepo | None = None,
) -> None:
"""CNV liftover tool main function."""
tool = CNVLiftoverTool()
tool.run(argv=argv, grr=grr)
[docs]
class DaeLiftoverTool(LiftoverTool):
"""DAE liftover tool class."""
_TOOMANY_THRESHOLD = 20
def __init__(self) -> None:
super().__init__(
"liftover DAE transmitted variants",
"transmitted_liftover")
[docs]
def build_cli_arguments_parser(
self,
) -> argparse.ArgumentParser:
"""Create CLI parser."""
parser = super().build_cli_arguments_parser()
FamiliesLoader.cli_arguments(parser)
DaeTransmittedLoader.cli_arguments(parser)
return parser
[docs]
def liftover_variants(
self,
region: Region | None = None,
) -> None:
"""Liftover CNV variants method."""
assert self.source_genome is not None
variants_loader = _build_variant_loader(
DaeTransmittedLoader, self.cli_args, self.source_genome)
assert isinstance(variants_loader, DaeTransmittedLoader)
assert self.grr is not None
pipeline = self.build_liftover_pipeline(self.grr)
assert isinstance(variants_loader, DaeTransmittedLoader)
output_prefix = self.cli_args.output
summary_filename = f"{output_prefix}.txt"
toomany_filename = f"{output_prefix}-TOOMANY.txt"
if region is not None:
logger.info("resetting regions (region): %s", region)
variants_loader.reset_regions([region])
region_str = str(region).replace(":", "_").replace("-", "_")
summary_filename = f"{output_prefix}-{region_str}.txt"
toomany_filename = f"{output_prefix}-TOOMANY-{region_str}.txt"
logger.info("summary output: %s", summary_filename)
logger.info("toomany output: %s", toomany_filename)
with open(summary_filename, "wt") as output_summary, \
open(toomany_filename, "wt") as output_toomany:
summary_header = [
"#chr", "position", "variant",
"familyData",
"all.nParCalled", "all.prcntParCalled",
"all.nAltAlls", "all.altFreq",
]
toomany_header = [
"#chr", "position", "variant",
"familyData",
]
output_summary.write("\t".join(summary_header))
output_summary.write("\n")
output_toomany.write("\t".join(toomany_header))
output_toomany.write("\n")
for sv, fvs in variants_loader.full_variants_iterator():
assert len(sv.alt_alleles) == 1
aa = sv.alt_alleles[0]
annotatable = aa.get_annotatable()
result = pipeline.annotate(annotatable)
liftover_annotatable: VCFAllele = \
cast(VCFAllele, result.get("target_annotatable"))
if liftover_annotatable is None:
logger.error("can't liftover %s", aa)
continue
liftover_cshl_variant = VariantDetails.from_vcf(
liftover_annotatable.chrom, liftover_annotatable.pos,
liftover_annotatable.ref,
liftover_annotatable.alt)
summary_line = [
liftover_cshl_variant.chrom,
str(liftover_cshl_variant.cshl_position),
liftover_cshl_variant.cshl_variant,
]
frequency_data = [
str(aa.attributes.get("af_parents_called_count", "")),
str(aa.attributes.get("af_parents_called_percent", "")),
str(aa.attributes.get("af_allele_count", "")),
str(aa.attributes.get("af_allele_freq", "")),
]
toomany_line = [
liftover_cshl_variant.chrom,
str(liftover_cshl_variant.cshl_position),
liftover_cshl_variant.cshl_variant,
]
families_data = []
for fv in fvs:
fa = cast(FamilyAllele, fv.alt_alleles[0])
fdata = [
fa.family_id,
mat2str(fa.best_state),
fa.family_attributes["read_counts"],
]
families_data.append(":".join(fdata))
if len(families_data) < self._TOOMANY_THRESHOLD:
summary_line.append(";".join(families_data))
summary_line.extend(frequency_data)
output_summary.write("\t".join(summary_line))
output_summary.write("\n")
else:
summary_line.append("TOOMANY")
summary_line.extend(frequency_data)
output_summary.write("\t".join(summary_line))
output_summary.write("\n")
toomany_line.append(";".join(families_data))
output_toomany.write("\t".join(toomany_line))
output_toomany.write("\n")
[docs]
def dae_liftover_main(
argv: list[str] | None = None,
grr: GenomicResourceRepo | None = None,
) -> None:
"""DAE liftover tool main function."""
tool = DaeLiftoverTool()
tool.run(argv=argv, grr=grr)
[docs]
class DenovoLiftoverTool(LiftoverTool):
"""Denovo liftover tool class."""
def __init__(self) -> None:
super().__init__(
"lliftover de Novo variants",
"denovo_liftover.txt")
[docs]
def build_cli_arguments_parser(
self,
) -> argparse.ArgumentParser:
"""Create CLI parser."""
parser = super().build_cli_arguments_parser()
FamiliesLoader.cli_arguments(parser)
DenovoLoader.cli_arguments(parser)
return parser
[docs]
def liftover_variants(
self,
region: Region | None = None,
) -> None:
"""Liftover CNV variants method."""
assert self.source_genome is not None
variants_loader = _build_variant_loader(
DenovoLoader, self.cli_args, self.source_genome)
assert isinstance(variants_loader, DenovoLoader)
output_filename = _region_output_filename(
self.cli_args.output, region,
)
assert self.grr is not None
pipeline = self.build_liftover_pipeline(self.grr)
if region is not None:
logger.info("resetting regions (region): %s", region)
variants_loader.reset_regions([region])
logger.info("output: %s", output_filename)
with open(output_filename, "wt") as output:
header = [
"chrom", "pos", "ref", "alt", # target variant
"chrom_src", "pos_src", "ref_src", "alt_src", # source variant
"familyId", "bestSt",
]
additional_columns = set(variants_loader.denovo_df.columns) - {
"chrom", "position", "reference", "alternative", "family_id",
"genotype", "best_state",
}
header.extend(sorted(additional_columns))
output.write("\t".join(header))
output.write("\n")
for sv, fvs in variants_loader.full_variants_iterator():
assert len(sv.alt_alleles) == 1
aa = sv.alt_alleles[0]
annotatable: VCFAllele = cast(VCFAllele, aa.get_annotatable())
result = pipeline.annotate(annotatable)
liftover_annotatable: VCFAllele = \
cast(VCFAllele, result.get("target_annotatable"))
if liftover_annotatable is None:
logger.error("can't liftover %s", aa)
continue
for fv in fvs:
fa = cast(FamilyAllele, fv.alt_alleles[0])
line = [
liftover_annotatable.chrom,
str(liftover_annotatable.pos),
liftover_annotatable.ref,
liftover_annotatable.alt,
annotatable.chrom, str(annotatable.pos),
annotatable.ref, annotatable.alt,
fa.family_id,
mat2str(fa.best_state, col_sep=" "),
]
line.extend([
str(fa.get_attribute(col) or "")
for col in sorted(additional_columns)])
output.write("\t".join(line))
output.write("\n")
[docs]
def denovo_liftover_main(
argv: list[str] | None = None,
grr: GenomicResourceRepo | None = None,
) -> None:
"""Denovo liftover tool main function."""
tool = DenovoLiftoverTool()
tool.run(argv=argv, grr=grr)
[docs]
class VCFLiftoverTool(LiftoverTool):
"""VCF liftover tool class."""
def __init__(self) -> None:
super().__init__(
"liftover VCF variants",
"vcf_liftover.vcf")
[docs]
def build_cli_arguments_parser(
self,
) -> argparse.ArgumentParser:
"""Create CLI parser."""
parser = super().build_cli_arguments_parser()
parser.add_argument(
"vcffile", help="vcf file to liftover",
)
return parser
[docs]
def liftover_variants(
self,
region: Region | None = None,
) -> None:
"""Liftover CNV variants method."""
assert self.source_genome is not None
output_filename = _region_output_filename(
self.cli_args.output, region,
)
assert self.target_genome is not None
assert self.source_genome is not None
assert self.chain is not None
if self.cli_args.mode == "bcf_liftover":
liftover_variant_func = bcf_liftover_variant
elif self.cli_args.mode == "basic_liftover":
liftover_variant_func = basic_liftover_variant
else:
raise ValueError(f"Invalid mode: {self.cli_args.mode}")
output_header = self._vcf_liftover_header(
self.cli_args.vcffile,
self.source_genome,
self.target_genome)
with closing(pysam.VariantFile(self.cli_args.vcffile)) as infile, \
closing(pysam.VariantFile(
output_filename, "w", header=output_header)) as outfile:
fetch_region = str(region) if region else None
for vcf_variant in infile.fetch(region=fetch_region):
if vcf_variant.alts is None:
logger.warning(
"skipping variant without alts: %s", vcf_variant)
continue
if vcf_variant.ref is None:
logger.warning(
"skipping variant without ref: %s", vcf_variant)
continue
lo_variant = liftover_variant_func(
vcf_variant.chrom, vcf_variant.pos,
vcf_variant.ref, list(vcf_variant.alts),
self.chain,
source_genome=self.source_genome,
target_genome=self.target_genome,
)
if lo_variant is None:
logger.warning(
"skipping variant without liftover: %s",
self.report_vcf_variant(vcf_variant))
continue
vcf_variant.translate(output_header)
chrom, pos, ref, alts = lo_variant
try:
vcf_variant.contig = chrom
vcf_variant.pos = pos
vcf_variant.ref = ref
vcf_variant.alts = tuple(alts)
outfile.write(vcf_variant)
except ValueError:
logger.exception(
"skipping variant with invalid liftover: "
"%s liftover to %s",
self.report_vcf_variant(vcf_variant),
self.report_variant(lo_variant))
raise
[docs]
@staticmethod
def report_vcf_variant(vcf_variant: pysam.VariantRecord) -> str:
"""Report VCF variant."""
return (
f"({vcf_variant.chrom}:{vcf_variant.pos} "
f"{vcf_variant.ref} > "
f"{','.join(vcf_variant.alts) if vcf_variant.alts else '.'})"
)
[docs]
@staticmethod
def report_variant(variant: tuple[str, int, str, list[str]] | None) -> str:
"""Report variant."""
if not variant:
return "(none)"
chrom, pos, ref, alts = variant
s_alts = ",".join(alts)
return f"({chrom}:{pos} {ref} > {s_alts})"
@staticmethod
def _vcf_liftover_header(
vcffile: str,
source_genome: ReferenceGenome,
target_genome: ReferenceGenome,
) -> pysam.VariantHeader:
"""Create liftover VCF header."""
target_contigs = []
target_contigs = target_genome.chromosomes[:]
with tempfile.NamedTemporaryFile(
mode="wt", suffix=".vcf") as temp_vcf:
with closing(pysam.VariantFile(vcffile)) as vcf, \
closing(pysam.VariantFile(
temp_vcf.name, "w", header=vcf.header)) as tmp:
pass
with open(temp_vcf.name, "rt") as tmp:
lines = tmp.readlines()
outheader = []
contig_index = None
for index, line in enumerate(lines):
if line.startswith("##contig"):
if contig_index is None:
contig_index = index
continue
outheader.append(line)
outcontigs = [
f"##contig=<ID={contig}>\n" for contig in target_contigs]
if contig_index is None:
raise ValueError("No contig lines found in header")
out_buf = "".join([
*outheader[:contig_index],
*outcontigs,
*outheader[contig_index:],
f"##source_genome={source_genome.resource_id}\n",
f"##target_genome={target_genome.resource_id}\n",
f"##command=vcf_liftover {' '.join(sys.argv[1:])}\n",
])
pathlib.Path(temp_vcf.name).write_text(out_buf)
with closing(pysam.VariantFile(temp_vcf.name)) as tmp:
return tmp.header
[docs]
def vcf_liftover_main(
argv: list[str] | None = None,
grr: GenomicResourceRepo | None = None,
) -> None:
"""VCF liftover tool main function."""
tool = VCFLiftoverTool()
tool.run(argv=argv, grr=grr)