# Source code for gain.genomic_resources.testing

"""Provides tools usefult for testing."""
from __future__ import annotations

import contextlib
import gzip
import logging
import os
import pathlib
import shutil
import tempfile
import textwrap
from collections.abc import Generator
from typing import Any, cast

import pyBigWig
import pysam
from s3fs.core import S3FileSystem

from gain.genomic_resources.fsspec_protocol import (
    FsspecReadOnlyProtocol,
    FsspecReadWriteProtocol,
    build_fsspec_protocol,
    build_inmemory_protocol,
)
from gain.genomic_resources.gene_models import GeneModels
from gain.genomic_resources.reference_genome import ReferenceGenome
from gain.genomic_resources.repository import (
    GenomicResource,
    GenomicResourceProtocolRepo,
)

logger = logging.getLogger(__name__)


def convert_to_tab_separated(content: str) -> str:
    """Convert a string into tab separated file content.

    Useful for testing purposes.

    If you need to have a space in the file content use '||'.
    """
    normalized = []
    for raw_line in content.split("\n"):
        stripped = raw_line.strip("\n\r")
        if not stripped:
            # skip completely empty lines
            continue
        if stripped.startswith("##"):
            # meta lines are kept verbatim
            normalized.append(stripped)
        else:
            # collapse any run of whitespace into a single tab
            normalized.append("\t".join(stripped.split()))
    joined = "\n".join(normalized).replace("||", " ")
    return joined.replace("EMPTY", ".")
def setup_directories(
        root_dir: pathlib.Path,
        content: str | bytes | dict[str, Any]) -> None:
    """Set up directory and subdirectory structures using the content.

    ``content`` may be:

    * ``str`` -- written to ``root_dir`` as a UTF-8 text file;
    * ``bytes`` -- written to ``root_dir`` as a binary file;
    * ``dict`` -- each ``(name, value)`` item is set up recursively at
      ``root_dir / name``.

    Raises:
        TypeError: if ``content`` is of any other type.
    """
    # FIX: the annotation previously omitted `bytes`, although the
    # implementation explicitly supports it.
    root_dir = pathlib.Path(root_dir)
    # Ensure the parent exists; for dict content the recursion creates
    # the nested directories as needed.
    root_dir.parent.mkdir(parents=True, exist_ok=True)
    if isinstance(content, str):
        root_dir.write_text(content, encoding="utf8")
    elif isinstance(content, bytes):
        root_dir.write_bytes(content)
    elif isinstance(content, dict):
        for path_name, path_content in content.items():
            setup_directories(root_dir / path_name, path_content)
    else:
        raise TypeError(
            f"unexpected content type: {content} for {root_dir}")
def setup_pedigree(ped_path: pathlib.Path, content: str) -> pathlib.Path:
    """Write a pedigree file at ``ped_path`` from whitespace-separated content."""
    setup_directories(ped_path, convert_to_tab_separated(content))
    return ped_path
def setup_denovo(denovo_path: pathlib.Path, content: str) -> pathlib.Path:
    """Write a de novo variants file at ``denovo_path`` from the content."""
    setup_directories(denovo_path, convert_to_tab_separated(content))
    return denovo_path
def setup_tabix(
        tabix_path: pathlib.Path,
        tabix_content: str,
        **kwargs: bool | str | int) -> tuple[str, str]:
    """Set up a tabix file.

    Writes the normalized content, bgzip-compresses it and builds a
    tabix index.  Any remaining ``kwargs`` (after popping ``force``)
    are forwarded to ``pysam.tabix_index``.

    Returns a ``(tabix_filename, index_filename)`` tuple of strings.
    """
    data = convert_to_tab_separated(tabix_content)
    plain_path = tabix_path
    if tabix_path.suffix == ".gz":
        plain_path = tabix_path.with_suffix("")
    setup_directories(plain_path, data)

    tabix_filename = str(plain_path.parent / f"{plain_path.name}.gz")
    index_filename = f"{tabix_filename}.tbi"
    force = cast(bool, kwargs.pop("force", False))

    # pylint: disable=no-member
    pysam.tabix_compress(str(plain_path), tabix_filename, force=force)
    pysam.tabix_index(tabix_filename, force=force, **kwargs)  # type: ignore
    # the uncompressed intermediate file is no longer needed
    plain_path.unlink()

    return tabix_filename, index_filename
def setup_gzip(gzip_path: pathlib.Path, gzip_content: str) -> pathlib.Path:
    """Set up a gzipped TSV file.

    The content is normalized with ``convert_to_tab_separated`` and
    written gzip-compressed.  If ``gzip_path`` does not already end in
    ``.gz``, the suffix is appended (``scores.txt`` ->
    ``scores.txt.gz``), mirroring the naming convention used by
    ``setup_tabix``.

    Returns the path of the gzipped file that was written.
    """
    content = convert_to_tab_separated(gzip_content)
    out_path = gzip_path
    out_path.parent.mkdir(parents=True, exist_ok=True)
    if gzip_path.suffix != ".gz":
        # BUG FIX: `with_suffix("gz")` raised ValueError (a suffix must
        # start with a dot), so this branch always crashed.  Append
        # ".gz" to the full name instead, preserving the original
        # suffix.
        out_path = gzip_path.parent / f"{gzip_path.name}.gz"
    with gzip.open(out_path, "wt") as outfile:
        outfile.write(content)
    return out_path
def setup_vcf(
        out_path: pathlib.Path, content: str, *,
        csi: bool = False) -> pathlib.Path:
    """Set up a VCF file using the content.

    The content is normalized with ``convert_to_tab_separated`` and
    written as a plain ``.vcf`` file.  A ``<name>.header.vcf`` companion
    file containing only the VCF header is written next to it.

    When ``out_path`` ends in ``.gz``, the VCF and the header file are
    additionally compressed with ``pysam.tabix_compress`` and indexed
    (``csi=True`` requests a CSI index for the VCF instead of TBI).
    Note the uncompressed ``.vcf`` files are left in place.

    Returns ``out_path`` unchanged.
    """
    vcf_data = convert_to_tab_separated(content)
    # When gzipped output is requested, first write the plain ".vcf".
    vcf_path = out_path
    if out_path.suffix == ".gz":
        vcf_path = out_path.with_suffix("")
    assert vcf_path.suffix == ".vcf"
    # Companion header-only file: "<stem>.header.vcf".
    header_path = vcf_path.with_suffix("")
    header_path = header_path.parent / f"{header_path.name}.header.vcf"
    setup_directories(vcf_path, vcf_data)
    # pylint: disable=no-member
    if out_path.suffix == ".gz":
        vcf_gz_filename = str(vcf_path.parent / f"{vcf_path.name}.gz")
        pysam.tabix_compress(str(vcf_path), vcf_gz_filename)
        pysam.tabix_index(vcf_gz_filename, preset="vcf", csi=csi)
    # Re-open the final file (plain or gzipped) so the header is
    # extracted exactly as pysam renders it.
    with pysam.VariantFile(str(out_path)) as variant_file:
        header = variant_file.header
    with open(header_path, "wt", encoding="utf8") as outfile:
        outfile.write(str(header))
    if out_path.suffix == ".gz":
        header_gz_filename = str(
            header_path.parent / f"{header_path.name}.gz")
        pysam.tabix_compress(str(header_path), header_gz_filename)
        pysam.tabix_index(header_gz_filename, preset="vcf")
    return out_path
def setup_dae_transmitted(
    root_path: pathlib.Path,
    summary_content: str,
    toomany_content: str,
) -> tuple[pathlib.Path, pathlib.Path]:
    """Set up a DAE transmitted variants file using passed content."""
    data_dir = root_path / "dae_transmitted_data"
    setup_directories(root_path, {
        "dae_transmitted_data": {
            "tr.txt": convert_to_tab_separated(summary_content),
            "tr-TOOMANY.txt": convert_to_tab_separated(toomany_content),
        },
    })

    # pylint: disable=no-member
    # bgzip-compress both files, then tabix-index both.
    for name in ("tr.txt", "tr-TOOMANY.txt"):
        pysam.tabix_compress(
            str(data_dir / name), str(data_dir / f"{name}.gz"))
    for name in ("tr.txt.gz", "tr-TOOMANY.txt.gz"):
        pysam.tabix_index(
            str(data_dir / name),
            seq_col=0, start_col=1, end_col=1, line_skip=1)

    return (data_dir / "tr.txt.gz", data_dir / "tr-TOOMANY.txt.gz")
def setup_bigwig(
    out_path: pathlib.Path,
    content: str,
    chrom_lens: dict[str, int],
) -> pathlib.Path:
    """
    Setup a bigwig format variants file using bedGraph-style content.

    Example:
        chr1   0     100   0.0
        chr1   100   120   1.0
        chr1   125   126   200.0
    """
    assert out_path.parent.exists()
    bw_file = pyBigWig.open(str(out_path), "w")  # pylint: disable=I1101
    bw_file.addHeader(list(chrom_lens.items()), maxZooms=0)

    chroms: list[str] = []
    starts: list[int] = []
    ends: list[int] = []
    values: list[float] = []

    last_chrom = ""
    last_end = -1
    for row in convert_to_tab_separated(content).split("\n"):
        fields = row.strip().split("\t")
        assert len(fields) == 4
        chrom = fields[0]
        start, end = int(fields[1]), int(fields[2])
        value = float(fields[3])
        assert chrom in chrom_lens
        assert start < end
        if chrom == last_chrom:
            # intervals on the same chromosome must not overlap
            assert start >= last_end
        last_chrom, last_end = chrom, end
        chroms.append(chrom)
        starts.append(start)
        ends.append(end)
        values.append(value)

    bw_file.addEntries(chroms, starts, ends=ends, values=values)
    bw_file.close()
    return out_path
def setup_genome(out_path: pathlib.Path, content: str) -> ReferenceGenome:
    """Set up reference genome using the content."""
    if out_path.suffix != ".fa":
        raise ValueError("genome output file is expected to have '.fa' suffix")
    setup_directories(out_path, convert_to_tab_separated(content))

    # pylint: disable=no-member
    pysam.faidx(str(out_path))

    setup_directories(out_path.parent, {
        "genomic_resource.yaml": textwrap.dedent(f"""
            type: genome
            filename: {out_path.name}
            """),
    })

    # pylint: disable=import-outside-toplevel
    from gain.genomic_resources.reference_genome import (
        build_reference_genome_from_file,
    )
    return build_reference_genome_from_file(str(out_path)).open()
def setup_gene_models(
        out_path: pathlib.Path,
        content: str,
        fileformat: str | None = None,
        config: str | None = None) -> GeneModels:
    """Set up gene models in refflat format using the passed content.

    When ``config`` is not given, a minimal ``genomic_resource.yaml``
    is generated from ``fileformat`` and the output file name.
    Returns the loaded gene models.
    """
    setup_directories(out_path, convert_to_tab_separated(content))
    if config is None:
        config = textwrap.dedent(f"""
            type: gene_models
            filename: {out_path.name}
            format: "{fileformat}"
            """)
    setup_directories(out_path.parent, {"genomic_resource.yaml": config})

    # pylint: disable=import-outside-toplevel
    from gain.genomic_resources.gene_models.gene_models_factory import (
        build_gene_models_from_file,
    )
    models = build_gene_models_from_file(str(out_path), file_format=fileformat)
    models.load()
    return models
def setup_empty_gene_models(out_path: pathlib.Path) -> GeneModels:
    """Set up empty gene models."""
    # header-only refflat content -- no transcripts
    content = """
        #geneName name chrom strand txStart txEnd cdsStart cdsEnd exonCount exonStarts exonEnds
    """  # noqa
    return setup_gene_models(out_path, content, fileformat="refflat")
def build_inmemory_test_protocol(
        content: dict[str, Any]) -> FsspecReadWriteProtocol:
    """Build and return an embedded fsspec protocol for testing."""
    # NOTE(review): the temporary directory is deleted as soon as this
    # function returns; it appears to serve only as a unique root name
    # for the in-memory protocol -- confirm nothing reads the real path.
    with tempfile.TemporaryDirectory("embedded_test_protocol") as tmp_dir:
        return build_inmemory_protocol(tmp_dir, tmp_dir, content)
def build_inmemory_test_repository(
        content: dict[str, Any]) -> GenomicResourceProtocolRepo:
    """Create an embedded GRR repository using passed content."""
    return GenomicResourceProtocolRepo(build_inmemory_test_protocol(content))
def build_inmemory_test_resource(
        content: dict[str, Any]) -> GenomicResource:
    """Create a test resource based on the passed content.

    The passed content should be appropriate for a single resource.

    Example content:

        {
            "genomic_resource.yaml": textwrap.dedent('''
                type: position_score
                table:
                    filename: data.txt
                scores:
                - id: aaaa
                  type: float
                  desc: ""
                  name: sc
                '''),
            "data.txt": convert_to_tab_separated('''
                #chrom start end sc
                1      10    12  1.1
                2      13    14  1.2
                ''')
        }
    """
    proto = build_inmemory_test_protocol(content)
    return proto.get_resource("")
def build_filesystem_test_protocol(
    root_path: pathlib.Path, *,
    repair: bool = True,
) -> FsspecReadWriteProtocol:
    """Build and return a filesystem fsspec protocol for testing.

    The root_path is expected to point to a directory structure with
    all the resources.  With ``repair`` enabled (the default), each
    resource's manifest is rebuilt and the repository content file is
    regenerated.
    """
    proto = cast(
        FsspecReadWriteProtocol,
        build_fsspec_protocol(str(root_path), str(root_path)))
    if not repair:
        return proto
    for resource in proto.get_all_resources():
        proto.save_manifest(resource, proto.build_manifest(resource))
    proto.build_content_file()
    return proto
def build_filesystem_test_repository(
        root_path: pathlib.Path) -> GenomicResourceProtocolRepo:
    """Build and return a filesystem fsspec repository for testing.

    The root_path is expected to point to a directory structure with
    all the resources.
    """
    return GenomicResourceProtocolRepo(
        build_filesystem_test_protocol(root_path))
def build_filesystem_test_resource(
        root_path: pathlib.Path) -> GenomicResource:
    """Build a single test resource rooted at ``root_path``."""
    return build_filesystem_test_protocol(root_path).get_resource("")
@contextlib.contextmanager
def build_http_test_protocol(
    root_path: pathlib.Path, *,
    repair: bool = True,
) -> Generator[FsspecReadOnlyProtocol, None, None]:
    """Populate Apache2 directory and construct HTTP genomic resource protocol.

    The Apache2 is used to serve the GRR. This root_path directory should be
    a valid filesystem genomic resource repository.
    """
    source_proto = build_filesystem_test_protocol(root_path, repair=repair)

    grr_root = pathlib.Path(__file__).parent.parent.parent
    grr_root = grr_root / "tests" / ".test_grr"
    # guard against deleting an unexpected directory in the finally below
    assert grr_root.parts[-2:] == ("tests", ".test_grr"), grr_root

    http_path = grr_root / root_path.name
    http_path.mkdir(parents=True, exist_ok=True)
    copy_proto_genomic_resources(
        build_filesystem_test_protocol(http_path), source_proto)

    host = os.environ.get("HTTP_HOST", "localhost:28080")
    server_address = f"http://{host}/{http_path.name}"
    try:
        yield build_fsspec_protocol(str(root_path), server_address)
    except GeneratorExit:
        print("Generator exit")
    finally:
        shutil.rmtree(http_path)
def s3_test_server_endpoint() -> str:
    """Return the endpoint URL of the testing S3 (minio) server."""
    return "http://{}:9000".format(os.environ.get("MINIO_HOST", "localhost"))
def s3_test_protocol() -> FsspecReadWriteProtocol:
    """Build an S3 fsspec testing protocol on top of existing S3 server."""
    endpoint = s3_test_server_endpoint()
    bucket_url = build_s3_test_bucket(build_s3_test_filesystem())
    return cast(
        FsspecReadWriteProtocol,
        build_fsspec_protocol(
            str(bucket_url), bucket_url, endpoint_url=endpoint))
def build_s3_test_filesystem(
        endpoint_url: str | None = None) -> S3FileSystem:
    """Create an S3 fsspec filesystem connected to the S3 server."""
    # Provide dummy minio credentials unless the environment already
    # carries real ones.
    os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "minioadmin")  # noqa: S105
    os.environ.setdefault("AWS_ACCESS_KEY_ID", "minioadmin")

    if endpoint_url is None:
        endpoint_url = s3_test_server_endpoint()

    filesystem = S3FileSystem(
        anon=False, client_kwargs={"endpoint_url": endpoint_url})
    filesystem.invalidate_cache()
    return filesystem
def build_s3_test_bucket(s3filesystem: S3FileSystem | None = None) -> str:
    """Create an S3 test bucket and return its URL."""
    # The temporary directory is used only to obtain a unique path
    # suffix for the bucket name.
    with tempfile.TemporaryDirectory("s3_test_bucket") as unique_dir:
        if s3filesystem is None:
            s3filesystem = build_s3_test_filesystem()
        bucket_url = f"s3://test-bucket{unique_dir}"
        s3filesystem.mkdir(bucket_url, acl="public-read")
    return bucket_url
@contextlib.contextmanager
def build_s3_test_protocol(
    root_path: pathlib.Path,
) -> Generator[FsspecReadWriteProtocol, None, None]:
    """Construct fsspec genomic resource protocol.

    The S3 bucket is populated with resources from the filesystem GRR
    pointed to by the root_path.
    """
    endpoint = s3_test_server_endpoint()
    bucket_url = build_s3_test_bucket(build_s3_test_filesystem(endpoint))
    proto = cast(
        FsspecReadWriteProtocol,
        build_fsspec_protocol(
            str(bucket_url), bucket_url, endpoint_url=endpoint))
    copy_proto_genomic_resources(
        proto, build_filesystem_test_protocol(root_path))
    yield proto
def copy_proto_genomic_resources(
        dest_proto: FsspecReadWriteProtocol,
        src_proto: FsspecReadOnlyProtocol) -> None:
    """Copy every resource from ``src_proto`` into ``dest_proto``."""
    for resource in src_proto.get_all_resources():
        dest_proto.copy_resource(resource)
    dest_proto.build_content_file()
    dest_proto.filesystem.invalidate_cache()
@contextlib.contextmanager
def proto_builder(
    scheme: str, content: dict,
) -> Generator[
        FsspecReadOnlyProtocol | FsspecReadWriteProtocol,
        None, None]:
    """Build a test genomic resource protocol with specified content."""
    with tempfile.TemporaryDirectory("s3_test_bucket") as tmp_path:
        root_path = pathlib.Path(tmp_path)
        setup_directories(root_path, content)

        if scheme == "file":
            try:
                yield build_filesystem_test_protocol(root_path)
            except GeneratorExit:
                print("Generator exit")
            return

        if scheme == "s3":
            builder = build_s3_test_protocol
        elif scheme == "http":
            builder = build_http_test_protocol
        else:
            raise ValueError(f"unexpected protocol scheme: <{scheme}>")

        with builder(root_path) as proto:
            try:
                yield proto
            except GeneratorExit:
                print("Generator exit")
@contextlib.contextmanager
def resource_builder(
        scheme: str,
        content: dict) -> Generator[GenomicResource, None, None]:
    """Build a single test genomic resource with the specified content."""
    with proto_builder(scheme, content) as proto:
        yield proto.get_resource("")