Source code for dae.genomic_resources.testing

"""Provides tools useful for testing."""
from __future__ import annotations

import contextlib
import gzip
import logging
import os
import pathlib
import shutil
import tempfile
import textwrap
from collections.abc import Generator
from typing import Any, cast

import pyBigWig  # type: ignore
import pysam
from s3fs.core import S3FileSystem

from dae.genomic_resources.fsspec_protocol import (
    FsspecReadOnlyProtocol,
    FsspecReadWriteProtocol,
    build_fsspec_protocol,
    build_inmemory_protocol,
)
from dae.genomic_resources.gene_models.gene_models import GeneModels
from dae.genomic_resources.reference_genome import ReferenceGenome
from dae.genomic_resources.repository import (
    GenomicResource,
    GenomicResourceProtocolRepo,
)

logger = logging.getLogger(__name__)


def convert_to_tab_separated(content: str) -> str:
    """Turn whitespace-separated text into tab-separated file content.

    Intended for building small test fixtures. Empty lines are dropped,
    lines starting with ``##`` are kept as they are, and the fields of every
    other line are re-joined with a single tab. Afterwards every ``||`` in
    the text is replaced with a space and every ``EMPTY`` with a dot, so use
    ``||`` wherever a value has to contain a space.
    """
    rows = (raw.strip("\n\r") for raw in content.split("\n"))
    converted = [
        row if row.startswith("##") else "\t".join(row.split())
        for row in rows
        if row
    ]
    return "\n".join(converted).replace("||", " ").replace("EMPTY", ".")
def setup_directories(
    root_dir: pathlib.Path,
    content: str | bytes | dict[str, Any],
) -> None:
    """Set up directory and subdirectory structures using the content.

    ``content`` is interpreted by type:

    * ``str`` - written to ``root_dir`` as UTF-8 text;
    * ``bytes`` - written to ``root_dir`` as binary data;
    * ``dict`` - ``root_dir`` becomes a directory and each ``name: value``
      pair is set up recursively as ``root_dir / name``. An empty dict
      produces an empty directory.

    Missing parent directories are created as needed.

    Raises:
        TypeError: if ``content`` (or any nested value) is of another type.
    """
    root_dir = pathlib.Path(root_dir)

    if isinstance(content, dict):
        # Create the directory itself so that an empty dict still yields an
        # (empty) directory instead of silently creating nothing.
        root_dir.mkdir(parents=True, exist_ok=True)
        for path_name, path_content in content.items():
            setup_directories(root_dir / path_name, path_content)
        return

    root_dir.parent.mkdir(parents=True, exist_ok=True)
    if isinstance(content, str):
        root_dir.write_text(content, encoding="utf8")
    elif isinstance(content, bytes):
        root_dir.write_bytes(content)
    else:
        raise TypeError(
            f"unexpected content type: {content} for {root_dir}")
def setup_pedigree(ped_path: pathlib.Path, content: str) -> pathlib.Path:
    """Write tab-separated pedigree content to ``ped_path`` and return it.

    The content is passed through ``convert_to_tab_separated`` before it is
    written.
    """
    setup_directories(ped_path, convert_to_tab_separated(content))
    return ped_path
def setup_denovo(denovo_path: pathlib.Path, content: str) -> pathlib.Path:
    """Write tab-separated de novo content to ``denovo_path`` and return it.

    The content is passed through ``convert_to_tab_separated`` before it is
    written.
    """
    setup_directories(denovo_path, convert_to_tab_separated(content))
    return denovo_path
def setup_tabix(
    tabix_path: pathlib.Path,
    tabix_content: str,
    **kwargs: bool | str | int,
) -> tuple[str, str]:
    """Set up a bgzip-compressed, tabix-indexed file.

    The content is converted to tab-separated text, written next to the
    target, compressed with ``pysam.tabix_compress`` and indexed with
    ``pysam.tabix_index``. The uncompressed intermediate file is removed.

    Args:
        tabix_path: target path; a trailing ``.gz`` suffix is optional, the
            compressed file always ends with ``.gz``.
        tabix_content: whitespace-separated content of the file.
        **kwargs: ``force`` is used for both compression and indexing; all
            remaining keyword arguments are forwarded to
            ``pysam.tabix_index``.

    Returns:
        A tuple ``(compressed_filename, index_filename)``.
    """
    content = convert_to_tab_separated(tabix_content)
    out_path = tabix_path
    if tabix_path.suffix == ".gz":
        out_path = tabix_path.with_suffix("")
    setup_directories(out_path, content)

    tabix_filename = str(out_path.parent / f"{out_path.name}.gz")
    force = cast(bool, kwargs.pop("force", False))

    # Report the index that pysam is actually asked to create: an explicit
    # ``index=`` path wins, ``csi=True`` produces a ``.csi`` index and the
    # default is a ``.tbi`` index.
    if kwargs.get("index"):
        index_filename = str(kwargs["index"])
    elif kwargs.get("csi"):
        index_filename = f"{tabix_filename}.csi"
    else:
        index_filename = f"{tabix_filename}.tbi"

    # pylint: disable=no-member
    pysam.tabix_compress(str(out_path), tabix_filename, force=force)
    pysam.tabix_index(tabix_filename, force=force, **kwargs)  # type: ignore

    out_path.unlink()

    return tabix_filename, index_filename
def setup_gzip(gzip_path: pathlib.Path, gzip_content: str) -> pathlib.Path:
    """Set up a gzipped TSV file.

    The content is converted to tab-separated text and written gzipped as
    UTF-8. When ``gzip_path`` does not already end with ``.gz`` the suffix
    is appended to the file name.

    Returns:
        The path of the gzipped file that was written.
    """
    content = convert_to_tab_separated(gzip_content)
    out_path = gzip_path
    if gzip_path.suffix != ".gz":
        # ``with_suffix("gz")`` raises ValueError because a suffix must start
        # with a dot; append ".gz" to the name instead.
        out_path = gzip_path.parent / f"{gzip_path.name}.gz"
    with gzip.open(out_path, "wt", encoding="utf8") as outfile:
        outfile.write(content)
    return out_path
def setup_vcf(
    out_path: pathlib.Path,
    content: str,
    *,
    csi: bool = False,
) -> pathlib.Path:
    """Create a VCF file, plus a header-only companion, from the content.

    The plain VCF is always written. When ``out_path`` ends with ``.gz`` it
    is also bgzip-compressed and tabix-indexed (``csi`` selects a CSI
    index). The header of the resulting file is then saved as
    ``<name>.header.vcf`` in the same directory, and compressed and indexed
    too when ``out_path`` ends with ``.gz``.

    Returns:
        ``out_path``.
    """
    vcf_text = convert_to_tab_separated(content)

    compressed = out_path.suffix == ".gz"
    vcf_path = out_path.with_suffix("") if compressed else out_path
    assert vcf_path.suffix == ".vcf"

    header_path = vcf_path.parent / f"{vcf_path.stem}.header.vcf"

    setup_directories(vcf_path, vcf_text)

    # pylint: disable=no-member
    if compressed:
        gz_name = str(vcf_path.parent / f"{vcf_path.name}.gz")
        pysam.tabix_compress(str(vcf_path), gz_name)
        pysam.tabix_index(gz_name, preset="vcf", csi=csi)

    with pysam.VariantFile(str(out_path)) as variant_file:
        header = variant_file.header
    header_path.write_text(str(header), encoding="utf8")

    if compressed:
        header_gz_name = str(header_path.parent / f"{header_path.name}.gz")
        pysam.tabix_compress(str(header_path), header_gz_name)
        pysam.tabix_index(header_gz_name, preset="vcf")

    return out_path
def setup_dae_transmitted(
    root_path: pathlib.Path,
    summary_content: str,
    toomany_content: str,
) -> tuple[pathlib.Path, pathlib.Path]:
    """Create DAE transmitted variants files from the passed content.

    Writes ``tr.txt`` and ``tr-TOOMANY.txt`` under
    ``root_path / "dae_transmitted_data"``, then compresses and
    tabix-indexes both.

    Returns:
        The paths of the compressed summary and TOOMANY files, in that order.
    """
    data_dir = root_path / "dae_transmitted_data"
    setup_directories(root_path, {
        "dae_transmitted_data": {
            "tr.txt": convert_to_tab_separated(summary_content),
            "tr-TOOMANY.txt": convert_to_tab_separated(toomany_content),
        },
    })

    plain_files = [data_dir / "tr.txt", data_dir / "tr-TOOMANY.txt"]
    gz_files = [data_dir / f"{plain.name}.gz" for plain in plain_files]

    # pylint: disable=no-member
    for plain, compressed in zip(plain_files, gz_files):
        pysam.tabix_compress(str(plain), str(compressed))
    for compressed in gz_files:
        pysam.tabix_index(
            str(compressed),
            seq_col=0, start_col=1, end_col=1, line_skip=1)

    summary_gz, toomany_gz = gz_files
    return summary_gz, toomany_gz
def setup_bigwig(
    out_path: pathlib.Path,
    content: str,
    chrom_lens: dict[str, int],
) -> pathlib.Path:
    """Write a bigWig file built from bedGraph-style content.

    Every content line must hold four fields: chromosome, start, end and
    value. Chromosomes must be keys of ``chrom_lens``, each start must be
    smaller than its end, and consecutive intervals on the same chromosome
    must not overlap. The parent directory of ``out_path`` must exist.

    Example content:
        chr1   0     100   0.0
        chr1   100   120   1.0
        chr1   125   126   200.0

    Returns:
        ``out_path``.
    """
    assert out_path.parent.exists()
    bw_file = pyBigWig.open(str(out_path), "w")  # pylint: disable=I1101
    bw_file.addHeader(list(chrom_lens.items()), maxZooms=0)

    chroms: list[str] = []
    starts: list[int] = []
    ends: list[int] = []
    values: list[float] = []

    last_chrom = ""
    last_end = -1
    for row in convert_to_tab_separated(content).split("\n"):
        fields = row.strip().split("\t")
        assert len(fields) == 4
        chrom = fields[0]
        start, end = int(fields[1]), int(fields[2])
        value = float(fields[3])

        assert chrom in chrom_lens
        assert start < end
        # Intervals on the same chromosome must not overlap.
        assert chrom != last_chrom or start >= last_end
        last_chrom, last_end = chrom, end

        chroms.append(chrom)
        starts.append(start)
        ends.append(end)
        values.append(value)

    bw_file.addEntries(chroms, starts, ends=ends, values=values)
    bw_file.close()
    return out_path
def setup_genome(out_path: pathlib.Path, content: str) -> ReferenceGenome:
    """Create a FASTA reference genome from the content and open it.

    Writes the content to ``out_path``, indexes it with ``pysam.faidx`` and
    writes a ``genomic_resource.yaml`` of type ``genome`` next to it.

    Raises:
        ValueError: if ``out_path`` does not have a ``.fa`` suffix.

    Returns:
        The opened reference genome built from ``out_path``.
    """
    if out_path.suffix != ".fa":
        raise ValueError("genome output file is expected to have '.fa' suffix")

    fasta_text = convert_to_tab_separated(content)
    setup_directories(out_path, fasta_text)

    # pylint: disable=no-member
    pysam.faidx(str(out_path))

    resource_config = textwrap.dedent(f"""
        type: genome
        filename: {out_path.name}
    """)
    setup_directories(
        out_path.parent, {"genomic_resource.yaml": resource_config})

    # pylint: disable=import-outside-toplevel
    from dae.genomic_resources.reference_genome import (
        build_reference_genome_from_file,
    )
    genome = build_reference_genome_from_file(str(out_path))
    return genome.open()
def setup_gene_models(
    out_path: pathlib.Path,
    content: str,
    fileformat: str | None = None,
    config: str | None = None,
) -> GeneModels:
    """Set up gene models from the passed content and load them.

    The content is written tab-separated to ``out_path`` and a
    ``genomic_resource.yaml`` is written next to it.

    Args:
        out_path: path of the gene models file to create.
        content: whitespace-separated gene models content.
        fileformat: gene models file format; passed to the gene models
            builder as ``file_format`` and, when given, recorded in the
            generated configuration.
        config: full text of ``genomic_resource.yaml``; when ``None`` a
            minimal configuration is generated.

    Returns:
        The loaded gene models.
    """
    setup_directories(out_path, convert_to_tab_separated(content))

    if config is None:
        config_lines = [
            "",
            "type: gene_models",
            f"filename: {out_path.name}",
        ]
        if fileformat is not None:
            # Only record a format that was actually specified; formatting
            # ``None`` would put the literal string "None" into the config.
            config_lines.append(f'format: "{fileformat}"')
        config = "\n".join(config_lines) + "\n"

    setup_directories(out_path.parent, {"genomic_resource.yaml": config})

    # pylint: disable=import-outside-toplevel
    from dae.genomic_resources.gene_models.gene_models_factory import (
        build_gene_models_from_file,
    )
    gene_models = build_gene_models_from_file(
        str(out_path), file_format=fileformat)
    gene_models.load()
    return gene_models
def setup_empty_gene_models(out_path: pathlib.Path) -> GeneModels:
    """Create gene models that contain a header line and no transcripts.

    The file is set up through ``setup_gene_models`` with the ``refflat``
    file format.
    """
    header_only = """
#geneName name chrom strand txStart txEnd cdsStart cdsEnd exonCount exonStarts exonEnds
"""  # noqa
    return setup_gene_models(out_path, header_only, fileformat="refflat")
def build_inmemory_test_protocol(
    content: dict[str, Any],
) -> FsspecReadWriteProtocol:
    """Build a protocol for testing via ``build_inmemory_protocol``.

    The path of a freshly created temporary directory is passed as both the
    protocol id and its root path, together with ``content``.
    """
    with tempfile.TemporaryDirectory("embedded_test_protocol") as root_path:
        proto = build_inmemory_protocol(root_path, root_path, content)
    return proto
def build_inmemory_test_repository(
    content: dict[str, Any],
) -> GenomicResourceProtocolRepo:
    """Wrap a test protocol built from ``content`` in a GRR repository."""
    return GenomicResourceProtocolRepo(build_inmemory_test_protocol(content))
def build_inmemory_test_resource(
    content: dict[str, Any],
) -> GenomicResource:
    """Build a single test resource from the passed content.

    ``content`` describes the files of exactly one resource; the resource
    with the empty id ``""`` is returned. For example::

        {
            "genomic_resource.yaml": textwrap.dedent('''
                type: position_score
                table:
                    filename: data.txt
                scores:
                  - id: aaaa
                    type: float
                    desc: ""
                    name: sc
            '''),
            "data.txt": convert_to_tab_separated('''
                #chrom start end sc
                1      10    12  1.1
                2      13    14  1.2
            '''),
        }
    """
    return build_inmemory_test_protocol(content).get_resource("")
def build_filesystem_test_protocol(
    root_path: pathlib.Path,
    *,
    repair: bool = True,
) -> FsspecReadWriteProtocol:
    """Build a filesystem fsspec protocol for testing.

    ``root_path`` is expected to point to a directory structure holding all
    the resources. With ``repair`` set, the manifest of every resource is
    rebuilt and saved, and the repository content file is rebuilt.
    """
    location = str(root_path)
    proto = cast(
        FsspecReadWriteProtocol,
        build_fsspec_protocol(location, location))
    if not repair:
        return proto

    for resource in proto.get_all_resources():
        manifest = proto.build_manifest(resource)
        proto.save_manifest(resource, manifest)
    proto.build_content_file()
    return proto
def build_filesystem_test_repository(
    root_path: pathlib.Path,
) -> GenomicResourceProtocolRepo:
    """Build a filesystem fsspec repository for testing.

    ``root_path`` is expected to point to a directory structure holding all
    the resources.
    """
    return GenomicResourceProtocolRepo(
        build_filesystem_test_protocol(root_path))
def build_filesystem_test_resource(
    root_path: pathlib.Path,
) -> GenomicResource:
    """Return the resource with empty id from a filesystem test protocol.

    The protocol is built over ``root_path`` with
    ``build_filesystem_test_protocol``.
    """
    return build_filesystem_test_protocol(root_path).get_resource("")
@contextlib.contextmanager
def build_http_test_protocol(
    root_path: pathlib.Path,
    *,
    repair: bool = True,
) -> Generator[FsspecReadOnlyProtocol, None, None]:
    """Populate Apache2 directory and construct HTTP genomic resource protocol.

    The Apache2 is used to serve the GRR. This root_path directory should be
    a valid filesystem genomic resource repository.

    The resources are copied into ``tests/.test_grr/<root_path.name>`` under
    the ``dae`` directory, and a protocol pointing at
    ``http://$HTTP_HOST/<root_path.name>`` is yielded (``HTTP_HOST``
    defaults to ``localhost:28080``). The copied directory is removed when
    the context exits, including when populating it fails.
    """
    source_proto = build_filesystem_test_protocol(root_path, repair=repair)

    http_path = pathlib.Path(__file__).parent.parent.parent
    http_path = http_path / "tests" / ".test_grr"
    assert http_path.parts[-3:] == ("dae", "tests", ".test_grr"), http_path

    http_path = http_path / root_path.name
    http_path.mkdir(parents=True, exist_ok=True)

    # From here on the directory exists, so everything, including the
    # population step, is guarded by the cleanup in ``finally``.
    try:
        dest_proto = build_filesystem_test_protocol(http_path)
        copy_proto_genomic_resources(dest_proto, source_proto)

        host = os.environ.get("HTTP_HOST", "localhost:28080")
        server_address = f"http://{host}/{http_path.name}"

        yield build_fsspec_protocol(str(root_path), server_address)
    except GeneratorExit:
        print("Generator exit")
    finally:
        shutil.rmtree(http_path)
def s3_test_server_endpoint() -> str:
    """Return the URL of the S3 test server.

    The host is read from the ``LOCALSTACK_HOST`` environment variable and
    defaults to ``localhost``; the port is always 4566.
    """
    return "http://{}:4566".format(
        os.environ.get("LOCALSTACK_HOST", "localhost"))
def s3_test_protocol() -> FsspecReadWriteProtocol:
    """Build an S3 fsspec testing protocol on top of an existing S3 server.

    A new test bucket is created and used as both the id and the URL of the
    protocol.
    """
    endpoint_url = s3_test_server_endpoint()
    bucket_url = build_s3_test_bucket(build_s3_test_filesystem())
    proto = build_fsspec_protocol(
        str(bucket_url), bucket_url, endpoint_url=endpoint_url)
    return cast(FsspecReadWriteProtocol, proto)
def build_s3_test_filesystem(
    endpoint_url: str | None = None,
) -> S3FileSystem:
    """Create an S3 fsspec filesystem connected to the S3 server.

    ``AWS_SECRET_ACCESS_KEY`` and ``AWS_ACCESS_KEY_ID`` are set to ``"foo"``
    in the process environment when they are not already defined. When
    ``endpoint_url`` is ``None`` the test server endpoint is used.
    """
    os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "foo")  # noqa: S105
    os.environ.setdefault("AWS_ACCESS_KEY_ID", "foo")

    url = endpoint_url if endpoint_url is not None \
        else s3_test_server_endpoint()

    filesystem = S3FileSystem(
        anon=False, client_kwargs={"endpoint_url": url})
    filesystem.invalidate_cache()
    return filesystem
def build_s3_test_bucket(s3filesystem: S3FileSystem | None = None) -> str:
    """Create an S3 test bucket and return its URL.

    The bucket URL is derived from the path of a freshly created temporary
    directory. When no filesystem is passed, one is built with
    ``build_s3_test_filesystem``.
    """
    with tempfile.TemporaryDirectory("s3_test_bucket") as tmp_path:
        filesystem = (
            s3filesystem
            if s3filesystem is not None
            else build_s3_test_filesystem()
        )
        bucket_url = f"s3:/{tmp_path}"
        filesystem.mkdir(bucket_url, acl="public-read")
    return bucket_url
@contextlib.contextmanager
def build_s3_test_protocol(
    root_path: pathlib.Path,
) -> Generator[FsspecReadWriteProtocol, None, None]:
    """Construct an S3 fsspec genomic resource protocol for testing.

    A new test bucket is created and populated with the resources of the
    filesystem GRR that ``root_path`` points to.
    """
    endpoint_url = s3_test_server_endpoint()
    bucket_url = build_s3_test_bucket(
        build_s3_test_filesystem(endpoint_url))

    s3_proto = cast(
        FsspecReadWriteProtocol,
        build_fsspec_protocol(
            str(bucket_url), bucket_url, endpoint_url=endpoint_url))
    source_proto = build_filesystem_test_protocol(root_path)
    copy_proto_genomic_resources(s3_proto, source_proto)

    yield s3_proto
def copy_proto_genomic_resources(
    dest_proto: FsspecReadWriteProtocol,
    src_proto: FsspecReadOnlyProtocol,
) -> None:
    """Copy every resource of ``src_proto`` into ``dest_proto``.

    After copying, the content file of the destination is rebuilt and the
    cache of its filesystem is invalidated.
    """
    for resource in src_proto.get_all_resources():
        dest_proto.copy_resource(resource)

    dest_proto.build_content_file()
    dest_proto.filesystem.invalidate_cache()
@contextlib.contextmanager
def proto_builder(
    scheme: str,
    content: dict,
) -> Generator[
        FsspecReadOnlyProtocol | FsspecReadWriteProtocol, None, None]:
    """Build a test genomic resource protocol with the specified content.

    The content is first written to a temporary directory; the yielded
    protocol is then built on top of it according to ``scheme``, which must
    be one of ``"file"``, ``"s3"`` or ``"http"``.

    Raises:
        ValueError: if ``scheme`` is not one of the supported schemes.
    """
    with tempfile.TemporaryDirectory("s3_test_bucket") as tmp_path:
        root_path = pathlib.Path(tmp_path)
        setup_directories(root_path, content)

        # The s3 and http protocols are context managers themselves; the
        # exit stack closes whichever one was entered.
        with contextlib.ExitStack() as stack:
            proto: FsspecReadOnlyProtocol | FsspecReadWriteProtocol
            if scheme == "file":
                proto = build_filesystem_test_protocol(root_path)
            elif scheme == "s3":
                proto = stack.enter_context(
                    build_s3_test_protocol(root_path))
            elif scheme == "http":
                proto = stack.enter_context(
                    build_http_test_protocol(root_path))
            else:
                raise ValueError(f"unexpected protocol scheme: <{scheme}>")

            try:
                yield proto
            except GeneratorExit:
                print("Generator exit")
@contextlib.contextmanager
def resource_builder(
    scheme: str,
    content: dict,
) -> Generator[GenomicResource, None, None]:
    """Yield the resource with empty id from a protocol built for ``scheme``.

    The protocol is created with ``proto_builder`` from the passed content
    and stays open while the resource is in use.
    """
    with proto_builder(scheme, content) as proto:
        yield proto.get_resource("")