# Source code for dae.annotation.format_handlers
import gzip
import logging
from abc import abstractmethod
from collections.abc import Generator
from itertools import islice, starmap
from pathlib import Path
from typing import Any
from pysam import TabixFile, VariantFile, VariantRecord
from dae.annotation.annotatable import Annotatable, VCFAllele
from dae.annotation.annotation_config import (
AttributeInfo,
RawPipelineConfig,
)
from dae.annotation.annotation_factory import (
build_annotation_pipeline,
)
from dae.annotation.annotation_pipeline import (
AnnotationPipeline,
ReannotationPipeline,
)
from dae.annotation.record_to_annotatable import build_record_to_annotatable
from dae.genomic_resources.reference_genome import (
build_reference_genome_from_resource,
)
from dae.genomic_resources.repository_factory import (
build_genomic_resource_repository,
)
from dae.parquet.schema2.parquet_io import VariantsParquetWriter
from dae.schema2_storage.schema2_layout import Schema2DatasetLayout
from dae.utils.regions import (
Region,
)
from dae.variants.variant import SummaryVariant
from dae.variants_loaders.parquet.loader import ParquetLoader
logger = logging.getLogger("format_handlers")
[docs]
def stringify(value: Any, *, vcf: bool = False) -> str:
"""Format the value to a string for human-readable output."""
if value is None:
return "." if vcf else ""
if isinstance(value, float):
return f"{value:.3g}"
if isinstance(value, bool):
return "yes" if value else ""
return str(value)
[docs]
class AbstractFormat:
"""
Abstract class of input/output handlers for various formats.
This class and its children are responsible for correctly reading from
and writing to formats that can be annotated by our system.
They convert the raw input data to types that can be passed to the
annotation pipeline and then convert it back to its native format, as well
as handling the reading, updating and writing of metadata the format may
possess.
Each child class handles the specific differences of a single format.
"""
def __init__(
self,
pipeline_config: RawPipelineConfig,
pipeline_config_old: str | None,
cli_args: dict,
grr_definition: dict | None,
region: Region | None,
):
self.pipeline = None
self.grr = None
self.grr_definition = grr_definition
self.pipeline_config = pipeline_config
self.pipeline_config_old = pipeline_config_old
self.cli_args = cli_args
self.region = region
if self.cli_args.get("reannotate"):
pipeline_config_old = Path(self.cli_args["reannotate"]).read_text()
[docs]
def open(self) -> None:
"""
Initialize all necessary member variables and process relevant metadata.
"""
self.grr = \
build_genomic_resource_repository(definition=self.grr_definition)
self.pipeline = build_annotation_pipeline(
self.pipeline_config, self.grr,
allow_repeated_attributes=self.cli_args["allow_repeated_attributes"],
work_dir=Path(self.cli_args["work_dir"]),
config_old_raw=self.pipeline_config_old,
full_reannotation=self.cli_args["full_reannotation"],
)
self.pipeline.open()
[docs]
def close(self) -> None:
"""
Close any open files, clean up anything unnecessary.
"""
self.pipeline.close() # type: ignore
@abstractmethod
def _read(self) -> Generator[Any, None, None]:
"""
Read raw data from the input.
"""
@abstractmethod
def _convert(self, variant: Any) -> list[tuple[Annotatable, dict]]:
"""
Convert a single piece of raw data from the input into a usable format.
This method returns a list of tuples - one tuple per allele for the raw
variant data that has been read.
Each tuple contains an Annotatable instance, which will be
annotated by the relevant annotation pipeline, and a dictionary
containing any attributes already present in the raw data, such as a
previous annotation - this is called a "context". This "context" is
primarily used when reannotating data.
"""
@abstractmethod
def _apply(self, variant: Any, annotations: list[dict]) -> None:
"""
Apply produced annotations to the raw variant data.
This method updates the native variant data in-place with a list of
annotations - this list should contain a dictionary of annotation
results for each allele in the variant.
"""
@abstractmethod
def _write(self, variant: Any) -> None:
"""
Write a single piece of raw data to the output.
"""
[docs]
@staticmethod
def get_task_dir(region: Region | None) -> str:
"""Get dir for batch annotation."""
if region is None:
return "batch_work_dir"
chrom = region.chrom
pos_beg = region.start if region.start is not None else "_"
pos_end = region.stop if region.stop is not None else "_"
return f"{chrom}_{pos_beg}_{pos_end}"
[docs]
def process(self):
"""
Iteratively carry out the annotation of the input.
This method will read, annotate, apply and then write each variant
from the input data in an iterative fashion - one by one.
"""
assert self.pipeline is not None
annotations = []
for variant in self._read():
try:
annotations = list(starmap(self.pipeline.annotate,
self._convert(variant)))
self._apply(variant, annotations)
self._write(variant)
except Exception: # pylint: disable=broad-except
logger.exception("Error during iterative annotation")
[docs]
def process_batched(self):
"""
Carry out the annotation of the input in batches.
This method performs each step of the read-annotate-apply-write
loop in batches.
"""
assert self.pipeline is not None
batch_size = self.cli_args["batch_size"]
work_dir = AbstractFormat.get_task_dir(self.region)
errors = []
try:
while batch := tuple(islice(self._read(), batch_size)):
all_contexts = []
all_annotatables = []
allele_counts = [] # per variant
for variant in batch:
annotatables, contexts = zip(*self._convert(variant),
strict=True)
all_contexts.extend(contexts)
all_annotatables.extend(annotatables)
allele_counts.append(len(annotatables))
all_annotations = iter(self.pipeline.batch_annotate(
all_annotatables,
all_contexts,
batch_work_dir=work_dir,
))
for variant, allele_count in zip(batch, allele_counts,
strict=True):
annotations = [next(all_annotations)
for _ in range(allele_count)]
self._apply(variant, annotations)
self._write(variant)
except Exception as ex: # pylint: disable=broad-except
logger.exception("Error during batch annotation")
errors.append(str(ex))
if len(errors) > 0:
logger.error("there were errors during annotation")
for error in errors:
logger.error("\t%s", error)
class ColumnsFormat(AbstractFormat):
"""
Handler for delimiter-separated values text files.
"""
def __init__(
self,
pipeline_config: RawPipelineConfig,
pipeline_config_old: str | None,
cli_args: dict,
grr_definition: dict | None,
region: Region | None,
input_path: str,
output_path: str,
ref_genome_id: str | None,
):
super().__init__(pipeline_config, pipeline_config_old,
cli_args, grr_definition, region)
self.input_path = input_path
self.output_path = output_path
self.ref_genome_id = ref_genome_id
self.input_separator = cli_args["input_separator"]
self.separator = cli_args["output_separator"]
self.ref_genome = None
self.line_iterator = None
self.header_columns = None
self.record_to_annotatable = None
self.annotation_columns = None
self.input_file = None
self.output_file = None
[docs]
def open(self) -> None:
# pylint: disable=consider-using-with
super().open()
assert self.grr is not None
if self.ref_genome_id:
res = self.grr.find_resource(self.ref_genome_id)
if res is not None:
self.ref_genome = \
build_reference_genome_from_resource(res).open()
# Open input file
if self.input_path.endswith(".gz"):
self.input_file = TabixFile(self.input_path)
with gzip.open(self.input_path, "rt") as in_file_raw:
raw_header = in_file_raw.readline()
if self.region is not None:
self.line_iterator = self.input_file.fetch(
self.region.chrom, self.region.start, self.region.stop)
else:
self.line_iterator = self.input_file.fetch()
else:
self.input_file = open(self.input_path, "rt") # noqa: SIM115
self.line_iterator = self.input_file
raw_header = self.input_file.readline()
# Set header columns
self.header_columns = [
c.strip("#")
for c in raw_header.strip("\r\n").split(self.input_separator)
]
self.record_to_annotatable = build_record_to_annotatable(
self.cli_args, set(self.header_columns), ref_genome=self.ref_genome)
self.annotation_columns = [
attr.name for attr in self.pipeline.get_attributes() # type: ignore
if not attr.internal
]
self.output_file = open(self.output_path, "w") # noqa: SIM115
# Write header to output file
if isinstance(self.pipeline, ReannotationPipeline):
old_annotation_columns = {
attr.name
for attr in self.pipeline.pipeline_old.get_attributes()
if not attr.internal
}
new_header = [
col for col in self.header_columns
if col not in old_annotation_columns
]
else:
new_header = list(self.header_columns)
new_header = new_header + self.annotation_columns
self.output_file.write(self.separator.join(new_header) + "\n")
[docs]
def close(self):
super().close()
if self.input_file is not None:
self.input_file.close()
if self.output_file is not None:
self.output_file.close()
def _read(self) -> Generator[dict, None, None]:
assert self.cli_args is not None
assert self.header_columns is not None
assert self.line_iterator is not None
errors = []
for lnum, line in enumerate(self.line_iterator):
try:
columns = line.strip("\n\r").split(self.input_separator)
record = dict(zip(self.header_columns, columns, strict=True))
yield record
except Exception as ex: # pylint: disable=broad-except
logger.exception(
"unexpected input data format at line %s: %s",
lnum, line)
errors.append((lnum, line, str(ex)))
if len(errors) > 0:
logger.error("there were errors during the import")
for lnum, line, error in errors:
logger.error("line %s: %s", lnum, line)
logger.error("\t%s", error)
def _apply(self, variant: dict, annotations: list[dict]):
if isinstance(self.pipeline, ReannotationPipeline):
for col in self.pipeline.attributes_deleted: # type: ignore
del variant[col]
# No support for multi-allelic variants in columns format
annotation = annotations[0]
for col in self.annotation_columns: # type: ignore
variant[col] = annotation[col]
def _convert(self, variant: dict) -> list[tuple[Annotatable, dict]]:
return [(self.record_to_annotatable.build(variant), dict(variant))] # type: ignore
def _write(self, variant: dict) -> None:
result = self.separator.join(
stringify(val) for val in variant.values()
) + "\n"
self.output_file.write(result) # type: ignore
class VCFFormat(AbstractFormat):
"""
Handler for VCF format files.
"""
def __init__(
self,
pipeline_config: RawPipelineConfig,
pipeline_config_old: str | None,
cli_args: dict,
grr_definition: dict | None,
region: Region | None,
input_path: str,
output_path: str,
):
super().__init__(pipeline_config, pipeline_config_old,
cli_args, grr_definition, region)
self.input_path = input_path
self.output_path = output_path
self.input_file = None
self.output_file = None
self.annotation_attributes = None
@staticmethod
def _update_header(
variant_file: VariantFile,
pipeline: AnnotationPipeline | ReannotationPipeline,
args: dict,
) -> None:
"""Update a variant file's header with annotation pipeline scores."""
header = variant_file.header
header.add_meta("pipeline_annotation_tool", "GPF variant annotation.")
header.add_meta("pipeline_annotation_tool", f"{args}")
if isinstance(pipeline, ReannotationPipeline):
header_info_keys = variant_file.header.info.keys()
old_annotation_columns = {
attr.name for attr in pipeline.pipeline_old.get_attributes()
if not attr.internal
}
new_annotation_columns = {
attr.name for attr in pipeline.get_attributes()
if not attr.internal
}
for info_key in header_info_keys:
if info_key in old_annotation_columns \
and info_key not in new_annotation_columns:
variant_file.header.info.remove_header(info_key)
attributes = []
for attr in pipeline.get_attributes():
if attr.internal:
continue
if attr.name not in variant_file.header.info:
attributes.append(attr)
else:
attributes = pipeline.get_attributes()
for attribute in attributes:
description = attribute.description
description = description.replace("\n", " ")
description = description.replace('"', '\\"')
header.info.add(attribute.name, "A", "String", description)
[docs]
def open(self) -> None:
super().open()
assert self.pipeline is not None
self.annotation_attributes = self.pipeline.get_attributes()
self.input_file = VariantFile(self.input_path, "r")
VCFFormat._update_header(self.input_file, self.pipeline, self.cli_args)
self.output_file = VariantFile(self.output_path, "w",
header=self.input_file.header)
[docs]
def close(self):
super().close()
self.input_file.close() # type: ignore
self.output_file.close() # type: ignore
def _read(self) -> Generator[Any, None, None]:
assert self.input_file is not None
if self.region is None:
in_file_iter = self.input_file.fetch()
else:
in_file_iter = self.input_file.fetch(
self.region.chrom, self.region.start, self.region.stop)
for vcf_var in in_file_iter:
if vcf_var.ref is None:
logger.warning(
"vcf variant without reference: %s %s",
vcf_var.chrom, vcf_var.pos,
)
continue
if vcf_var.alts is None:
logger.info(
"vcf variant without alternatives: %s %s",
vcf_var.chrom, vcf_var.pos,
)
continue
yield vcf_var
def _convert(
self, variant: VariantRecord,
) -> list[tuple[Annotatable, dict]]:
return [
(VCFAllele(variant.chrom, variant.pos, variant.ref, alt), # type: ignore
dict(variant.info))
for alt in variant.alts # type: ignore
]
def _apply(self, variant: VariantRecord, annotations: list[dict]):
VCFFormat._update_vcf_variant(
variant, annotations,
self.annotation_attributes, # type: ignore
self.pipeline, # type: ignore
)
def _write(self, variant: VariantRecord) -> None:
self.output_file.write(variant) # type: ignore
@staticmethod
def _update_vcf_variant(
vcf_var: VariantRecord,
allele_annotations: list,
attributes: list[AttributeInfo],
pipeline: AnnotationPipeline,
) -> None:
buffers: list[list] = [[] for _ in attributes]
for annotation in allele_annotations:
if isinstance(pipeline, ReannotationPipeline):
for col in pipeline.attributes_deleted:
del vcf_var.info[col]
for buff, attribute in zip(buffers, attributes, strict=True):
attr = annotation.get(attribute.name)
if isinstance(attr, list):
attr = ";".join(stringify(a, vcf=True) for a in attr)
elif isinstance(attr, dict):
attr = ";".join(
f"{k}:{v}"
for k, v in attr.items()
)
else:
attr = stringify(attr, vcf=True)
attr = stringify(attr, vcf=True) \
.replace(";", "|")\
.replace(",", "|")\
.replace(" ", "_")
buff.append(attr)
# If the all values for a given attribute are
# empty (i.e. - "."), then that attribute has no
# values to be written and will be skipped in the output
has_value = {
attr.name: any(filter(lambda x: x != ".", buffers[idx]))
for idx, attr in enumerate(attributes)
}
for buff, attribute in zip(buffers, attributes, strict=True):
if attribute.internal or not has_value[attribute.name]:
continue
vcf_var.info[attribute.name] = buff
[docs]
class ParquetFormat(AbstractFormat):
"""
Handler for Schema2 Parquet datasets.
"""
def __init__(
self,
pipeline_config: RawPipelineConfig,
pipeline_config_old: str | None,
cli_args: dict,
grr_definition: dict | None,
region: Region | None,
input_layout: Schema2DatasetLayout,
output_dir: str,
bucket_idx: int,
):
super().__init__(pipeline_config, pipeline_config_old,
cli_args, grr_definition, region)
self.input_layout = input_layout
self.output_dir = output_dir
self.bucket_idx = bucket_idx
self.input_loader = None
self.writer = None
self.internal_attributes = None
[docs]
def open(self) -> None:
super().open()
assert self.pipeline is not None
self.input_loader = ParquetLoader(self.input_layout)
self.writer = VariantsParquetWriter(
self.output_dir, self.pipeline,
self.input_loader.partition_descriptor,
bucket_index=self.bucket_idx,
)
if isinstance(self.pipeline, ReannotationPipeline):
self.internal_attributes = [
attribute.name
for annotator in (self.pipeline.annotators_new
| self.pipeline.annotators_rerun)
for attribute in annotator.attributes
if attribute.internal
]
else:
self.internal_attributes = [
attribute.name
for attribute in self.pipeline.get_attributes()
if attribute.internal
]
def _read(self) -> Generator[SummaryVariant, None, None]:
assert self.input_loader is not None
yield from self.input_loader.fetch_summary_variants(region=self.region)
def _convert(
self, variant: SummaryVariant,
) -> list[tuple[Annotatable, dict]]:
return [(allele.get_annotatable(), allele.attributes)
for allele in variant.alt_alleles]
def _apply(self, variant: SummaryVariant, annotations: list[dict]):
for allele, annotation in zip(variant.alt_alleles, annotations,
strict=True):
if isinstance(self.pipeline, ReannotationPipeline):
for attr in self.pipeline.attributes_deleted:
del allele.attributes[attr]
for attr in self.internal_attributes: # type: ignore
del annotation[attr]
allele.update_attributes(annotation)
def _write(self, variant: SummaryVariant) -> None:
assert self.internal_attributes is not None
assert self.writer is not None
self.writer.write_summary_variant(variant)