"""Provides tools usefult for testing."""
from __future__ import annotations
import contextlib
import gzip
import logging
import os
import pathlib
import shutil
import tempfile
import textwrap
from collections.abc import Generator
from typing import Any, cast
import pyBigWig # type: ignore
import pysam
from s3fs.core import S3FileSystem
from dae.genomic_resources.fsspec_protocol import (
FsspecReadOnlyProtocol,
FsspecReadWriteProtocol,
build_fsspec_protocol,
build_inmemory_protocol,
)
from dae.genomic_resources.gene_models import GeneModels
from dae.genomic_resources.reference_genome import ReferenceGenome
from dae.genomic_resources.repository import (
GenomicResource,
GenomicResourceProtocolRepo,
)
logger = logging.getLogger(__name__)
def convert_to_tab_separated(content: str) -> str:
"""Convert a string into tab separated file content.
Useful for testing purposes.
If you need to have a space in the file content use '||'.
"""
result = []
for line in content.split("\n"):
line = line.strip("\n\r")
if not line:
continue
if line.startswith("##"):
result.append(line)
else:
result.append("\t".join(line.split()))
text = "\n".join(result)
text = text.replace("||", " ")
return text.replace("EMPTY", ".")
def setup_directories(
root_dir: pathlib.Path,
        content: str | bytes | dict[str, Any]) -> None:
"""Set up directory and subdirectory structures using the content."""
root_dir = pathlib.Path(root_dir)
root_dir.parent.mkdir(parents=True, exist_ok=True)
if isinstance(content, str):
root_dir.write_text(content, encoding="utf8")
elif isinstance(content, bytes):
root_dir.write_bytes(content)
elif isinstance(content, dict):
for path_name, path_content in content.items():
setup_directories(root_dir / path_name, path_content)
else:
raise TypeError(
f"unexpected content type: {content} for {root_dir}")
def setup_pedigree(ped_path: pathlib.Path, content: str) -> pathlib.Path:
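    """Set up a pedigree file using the passed content."""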
ped_data = convert_to_tab_separated(content)
setup_directories(ped_path, ped_data)
return ped_path
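# A usage sketch (the column set shown is the one commonly used in GPF
# pedigree files and is illustrative):
#
#     setup_pedigree(
#         tmp_path / "study" / "pedigree.ped",
#         """
#         familyId personId dadId momId sex status role
#         f1       d1       0     0     1   1      dad
#         f1       m1       0     0     2   1      mom
#         f1       p1       d1    m1    2   2      prb
#         """)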
def setup_denovo(denovo_path: pathlib.Path, content: str) -> pathlib.Path:
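    """Set up a de novo variants file using the passed content."""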
denovo_data = convert_to_tab_separated(content)
setup_directories(denovo_path, denovo_data)
return denovo_path
def setup_tabix(
tabix_path: pathlib.Path, tabix_content: str,
**kwargs: bool | str | int) -> tuple[str, str]:
"""Set up a tabix file."""
content = convert_to_tab_separated(tabix_content)
out_path = tabix_path
if tabix_path.suffix == ".gz":
out_path = tabix_path.with_suffix("")
setup_directories(out_path, content)
tabix_filename = str(out_path.parent / f"{out_path.name}.gz")
index_filename = f"{tabix_filename}.tbi"
force = cast(bool, kwargs.pop("force", False))
# pylint: disable=no-member
pysam.tabix_compress(str(out_path), tabix_filename, force=force)
pysam.tabix_index(tabix_filename, force=force, **kwargs) # type: ignore
out_path.unlink()
return tabix_filename, index_filename
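# A usage sketch (the column layout is illustrative; the keyword arguments
# are forwarded to pysam.tabix_index):
#
#     tabix_filename, index_filename = setup_tabix(
#         tmp_path / "scores.txt.gz",
#         """
#         #chrom pos_begin pos_end score
#         1      10        20      0.1
#         1      21        30      0.2
#         """,
#         seq_col=0, start_col=1, end_col=2, line_skip=1)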
def setup_gzip(gzip_path: pathlib.Path, gzip_content: str) -> pathlib.Path:
"""Set up a gzipped TSV file."""
content = convert_to_tab_separated(gzip_content)
out_path = gzip_path
if gzip_path.suffix != ".gz":
out_path = gzip_path.with_suffix("gz")
with gzip.open(out_path, "wt") as outfile:
outfile.write(content)
return out_path
def setup_vcf(
out_path: pathlib.Path, content: str, *,
csi: bool = False) -> pathlib.Path:
"""Set up a VCF file using the content."""
vcf_data = convert_to_tab_separated(content)
vcf_path = out_path
if out_path.suffix == ".gz":
vcf_path = out_path.with_suffix("")
assert vcf_path.suffix == ".vcf"
header_path = vcf_path.with_suffix("")
header_path = header_path.parent / f"{header_path.name}.header.vcf"
setup_directories(vcf_path, vcf_data)
# pylint: disable=no-member
if out_path.suffix == ".gz":
vcf_gz_filename = str(vcf_path.parent / f"{vcf_path.name}.gz")
pysam.tabix_compress(str(vcf_path), vcf_gz_filename)
pysam.tabix_index(vcf_gz_filename, preset="vcf", csi=csi)
with pysam.VariantFile(str(out_path)) as variant_file:
header = variant_file.header
with open(header_path, "wt", encoding="utf8") as outfile:
outfile.write(str(header))
if out_path.suffix == ".gz":
header_gz_filename = str(header_path.parent / f"{header_path.name}.gz")
pysam.tabix_compress(str(header_path), header_gz_filename)
pysam.tabix_index(header_gz_filename, preset="vcf")
return out_path
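# A usage sketch (contig and sample names are illustrative); passing a
# ".vcf.gz" path also produces the tabix index:
#
#     setup_vcf(
#         tmp_path / "study.vcf.gz",
#         """
#         ##fileformat=VCFv4.2
#         ##FORMAT=<ID=GT,Number=1,Type=String,Description="Genotype">
#         ##contig=<ID=chr1>
#         #CHROM POS ID REF ALT QUAL FILTER INFO FORMAT p1
#         chr1   10  .  A   G   .    .      .    GT     0/1
#         """)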
def setup_dae_transmitted(
root_path: pathlib.Path,
summary_content: str,
toomany_content: str,
) -> tuple[pathlib.Path, pathlib.Path]:
"""Set up a DAE transmitted variants file using passed content."""
summary = convert_to_tab_separated(summary_content)
toomany = convert_to_tab_separated(toomany_content)
setup_directories(root_path, {
"dae_transmitted_data": {
"tr.txt": summary,
"tr-TOOMANY.txt": toomany,
},
})
# pylint: disable=no-member
pysam.tabix_compress(
str(root_path / "dae_transmitted_data" / "tr.txt"),
str(root_path / "dae_transmitted_data" / "tr.txt.gz"))
pysam.tabix_compress(
str(root_path / "dae_transmitted_data" / "tr-TOOMANY.txt"),
str(root_path / "dae_transmitted_data" / "tr-TOOMANY.txt.gz"))
pysam.tabix_index(
str(root_path / "dae_transmitted_data" / "tr.txt.gz"),
seq_col=0, start_col=1, end_col=1, line_skip=1)
pysam.tabix_index(
str(root_path / "dae_transmitted_data" / "tr-TOOMANY.txt.gz"),
seq_col=0, start_col=1, end_col=1, line_skip=1)
return (root_path / "dae_transmitted_data" / "tr.txt.gz",
root_path / "dae_transmitted_data" / "tr-TOOMANY.txt.gz")
def setup_bigwig(
out_path: pathlib.Path,
content: str,
chrom_lens: dict[str, int],
) -> pathlib.Path:
"""
Setup a bigwig format variants file using bedGraph-style content.
Example:
chr1 0 100 0.0
chr1 100 120 1.0
chr1 125 126 200.0
"""
assert out_path.parent.exists()
bw_file = pyBigWig.open(str(out_path), "w") # pylint: disable=I1101
bw_file.addHeader(list(chrom_lens.items()), maxZooms=0)
chrom_col: list[str] = []
start_col: list[int] = []
end_col: list[int] = []
val_col: list[float] = []
prev_end: int = -1
prev_chrom: str = ""
for line in convert_to_tab_separated(content).split("\n"):
tokens = line.strip().split("\t")
assert len(tokens) == 4
chrom = tokens[0]
start = int(tokens[1])
end = int(tokens[2])
val = float(tokens[3])
assert chrom in chrom_lens
assert start < end
if chrom == prev_chrom:
assert start >= prev_end
prev_chrom = chrom
prev_end = end
chrom_col.append(chrom)
start_col.append(start)
end_col.append(end)
val_col.append(val)
bw_file.addEntries(chrom_col, start_col, ends=end_col, values=val_col)
bw_file.close()
return out_path
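# A usage sketch (chromosome name and length are illustrative):
#
#     setup_bigwig(
#         tmp_path / "scores.bw",
#         """
#         chr1  0    100  0.0
#         chr1  100  120  1.0
#         """,
#         {"chr1": 1000})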
def setup_genome(out_path: pathlib.Path, content: str) -> ReferenceGenome:
"""Set up reference genome using the content."""
if out_path.suffix != ".fa":
raise ValueError("genome output file is expected to have '.fa' suffix")
setup_directories(out_path, convert_to_tab_separated(content))
# pylint: disable=no-member
pysam.faidx(str(out_path)) # type: ignore
setup_directories(out_path.parent, {
"genomic_resource.yaml": textwrap.dedent(f"""
type: genome
filename: {out_path.name}
"""),
})
# pylint: disable=import-outside-toplevel
from dae.genomic_resources.reference_genome import (
build_reference_genome_from_file,
)
return build_reference_genome_from_file(str(out_path)).open()
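# A usage sketch (chromosome names and sequences are illustrative); the
# output file must have a ".fa" suffix:
#
#     genome = setup_genome(
#         tmp_path / "genome" / "chrAll.fa",
#         """
#         >chr1
#         ACGTACGTACGTACGTACGT
#         >chr2
#         NNNNACGTACGTACGTACGT
#         """)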
def setup_gene_models(
out_path: pathlib.Path,
content: str,
fileformat: str | None = None,
config: str | None = None) -> GeneModels:
"""Set up gene models in refflat format using the passed content."""
setup_directories(out_path, convert_to_tab_separated(content))
if config is None:
config = textwrap.dedent(f"""
type: gene_models
filename: { out_path.name }
format: "{ fileformat }"
""")
setup_directories(out_path.parent, {"genomic_resource.yaml": config})
# pylint: disable=import-outside-toplevel
from dae.genomic_resources.gene_models import build_gene_models_from_file
gene_models = build_gene_models_from_file(
str(out_path), file_format=fileformat)
gene_models.load()
return gene_models
def setup_empty_gene_models(out_path: pathlib.Path) -> GeneModels:
"""Set up empty gene models."""
content = """
#geneName name chrom strand txStart txEnd cdsStart cdsEnd exonCount exonStarts exonEnds
""" # noqa
return setup_gene_models(out_path, content, fileformat="refflat")
def build_inmemory_test_protocol(
content: dict[str, Any]) -> FsspecReadWriteProtocol:
"""Build and return an embedded fsspec protocol for testing."""
with tempfile.TemporaryDirectory("embedded_test_protocol") as root_path:
return build_inmemory_protocol(root_path, root_path, content)
def build_inmemory_test_repository(
content: dict[str, Any]) -> GenomicResourceProtocolRepo:
"""Create an embedded GRR repository using passed content."""
proto = build_inmemory_test_protocol(content)
return GenomicResourceProtocolRepo(proto)
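# A usage sketch (the resource id and file contents are illustrative); the
# content dictionary is laid out as <resource id>/<resource files>:
#
#     repo = build_inmemory_test_repository({
#         "one": {
#             "genomic_resource.yaml": "",
#             "data.txt": "alabala",
#         },
#     })
#     res = repo.get_resource("one")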
def build_inmemory_test_resource(
content: dict[str, Any]) -> GenomicResource:
"""Create a test resource based on content passed.
The passed content should appropriate for a single resource.
Example content:
{
"genomic_resource.yaml": textwrap.dedent('''
type: position_score
table:
filename: data.txt
scores:
- id: aaaa
type: float
desc: ""
name: sc
'''),
"data.txt": convert_to_tab_separated('''
#chrom start end sc
1 10 12 1.1
2 13 14 1.2
''')
}
"""
proto = build_inmemory_test_protocol(content)
return proto.get_resource("")
def build_filesystem_test_protocol(
root_path: pathlib.Path, *,
repair: bool = True,
) -> FsspecReadWriteProtocol:
"""Build and return an filesystem fsspec protocol for testing.
The root_path is expected to point to a directory structure with all the
resources.
"""
proto = cast(
FsspecReadWriteProtocol,
build_fsspec_protocol(str(root_path), str(root_path)))
if repair:
for res in proto.get_all_resources():
proto.save_manifest(res, proto.build_manifest(res))
proto.build_content_file()
return proto
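# A usage sketch (resource layout is illustrative); root_path should contain
# the resources on the local filesystem, e.g. created with setup_directories:
#
#     setup_directories(root_path, {
#         "one": {
#             "genomic_resource.yaml": "",
#             "data.txt": "alabala",
#         },
#     })
#     proto = build_filesystem_test_protocol(root_path)
#     res = proto.get_resource("one")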
def build_filesystem_test_repository(
root_path: pathlib.Path) -> GenomicResourceProtocolRepo:
"""Build and return an filesystem fsspec repository for testing.
The root_path is expected to point to a directory structure with all the
resources.
"""
proto = build_filesystem_test_protocol(root_path)
return GenomicResourceProtocolRepo(proto)
def build_filesystem_test_resource(
root_path: pathlib.Path) -> GenomicResource:
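    """Build a test genomic resource from the passed root_path directory."""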
proto = build_filesystem_test_protocol(root_path)
return proto.get_resource("")
@contextlib.contextmanager
def build_http_test_protocol(
root_path: pathlib.Path, *,
repair: bool = True,
) -> Generator[FsspecReadOnlyProtocol, None, None]:
"""Populate Apache2 directory and construct HTTP genomic resource protocol.
The Apache2 is used to serve the GRR.
This root_path directory should be a valid filesystem genomic resource
repository.
"""
source_proto = build_filesystem_test_protocol(root_path, repair=repair)
http_path = pathlib.Path(__file__).parent
http_path = http_path / "tests" / ".test_grr" / root_path.name
http_path.mkdir(parents=True, exist_ok=True)
dest_proto = build_filesystem_test_protocol(http_path)
copy_proto_genomic_resources(
dest_proto, source_proto)
host = os.environ.get("HTTP_HOST", "localhost:8080")
server_address = f"http://{host}/{http_path.name}"
try:
yield build_fsspec_protocol(str(root_path), server_address)
except GeneratorExit:
print("Generator exit")
finally:
shutil.rmtree(http_path)
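# A usage sketch; it assumes an HTTP server (Apache2) that serves the
# populated directory at HTTP_HOST (default "localhost:8080"):
#
#     with build_http_test_protocol(root_path) as proto:
#         res = proto.get_resource("one")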
def s3_test_server_endpoint() -> str:
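    """Return the endpoint URL of the S3 test server (LocalStack)."""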
host = os.environ.get("LOCALSTACK_HOST", "localhost")
return f"http://{host}:4566"
def s3_test_protocol() -> FsspecReadWriteProtocol:
"""Build an S3 fsspec testing protocol on top of existing S3 server."""
endpoint_url = s3_test_server_endpoint()
s3filesystem = build_s3_test_filesystem()
bucket_url = build_s3_test_bucket(s3filesystem)
return cast(
FsspecReadWriteProtocol,
build_fsspec_protocol(
str(bucket_url), bucket_url, endpoint_url=endpoint_url))
def build_s3_test_filesystem(
endpoint_url: str | None = None) -> S3FileSystem:
"""Create an S3 fsspec filesystem connected to the S3 server."""
if "AWS_SECRET_ACCESS_KEY" not in os.environ:
os.environ["AWS_SECRET_ACCESS_KEY"] = "foo" # noqa: S105
if "AWS_ACCESS_KEY_ID" not in os.environ:
os.environ["AWS_ACCESS_KEY_ID"] = "foo"
if endpoint_url is None:
endpoint_url = s3_test_server_endpoint()
assert endpoint_url is not None
s3filesystem = S3FileSystem(
anon=False, client_kwargs={"endpoint_url": endpoint_url})
s3filesystem.invalidate_cache()
return s3filesystem
def build_s3_test_bucket(s3filesystem: S3FileSystem | None = None) -> str:
"""Create an s3 test buckent."""
with tempfile.TemporaryDirectory("s3_test_bucket") as tmp_path:
if s3filesystem is None:
s3filesystem = build_s3_test_filesystem()
bucket_url = f"s3:/{tmp_path}"
s3filesystem.mkdir(bucket_url, acl="public-read")
return bucket_url
@contextlib.contextmanager
def build_s3_test_protocol(
root_path: pathlib.Path,
) -> Generator[FsspecReadWriteProtocol, None, None]:
"""Construct fsspec genomic resource protocol.
The S3 bucket is populated with resource from filesystem GRR pointed
by the root_path.
"""
endpoint_url = s3_test_server_endpoint()
s3filesystem = build_s3_test_filesystem(endpoint_url)
bucket_url = build_s3_test_bucket(s3filesystem)
proto = cast(
FsspecReadWriteProtocol,
build_fsspec_protocol(
str(bucket_url), bucket_url, endpoint_url=endpoint_url))
copy_proto_genomic_resources(
proto,
build_filesystem_test_protocol(root_path))
yield proto
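# A usage sketch; it assumes a running S3 test server (LocalStack) reachable
# at LOCALSTACK_HOST (default "localhost"), port 4566:
#
#     with build_s3_test_protocol(root_path) as proto:
#         res = proto.get_resource("one")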
def copy_proto_genomic_resources(
dest_proto: FsspecReadWriteProtocol,
src_proto: FsspecReadOnlyProtocol) -> None:
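    """Copy all resources from src_proto into dest_proto."""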
for res in src_proto.get_all_resources():
dest_proto.copy_resource(res)
dest_proto.build_content_file()
dest_proto.filesystem.invalidate_cache()
@contextlib.contextmanager
def proto_builder(
scheme: str, content: dict,
) -> Generator[
FsspecReadOnlyProtocol | FsspecReadWriteProtocol,
None, None]:
"""Build a test genomic resource protocol with specified content."""
with tempfile.TemporaryDirectory("s3_test_bucket") as tmp_path:
root_path = pathlib.Path(tmp_path)
setup_directories(root_path, content)
if scheme == "file":
try:
yield build_filesystem_test_protocol(root_path)
except GeneratorExit:
print("Generator exit")
return
if scheme == "s3":
with build_s3_test_protocol(root_path) as proto:
try:
yield proto
except GeneratorExit:
print("Generator exit")
return
if scheme == "http":
with build_http_test_protocol(root_path) as proto:
try:
yield proto
except GeneratorExit:
print("Generator exit")
return
raise ValueError(f"unexpected protocol scheme: <{scheme}>")
@contextlib.contextmanager
def resource_builder(
scheme: str, content: dict) -> Generator[GenomicResource, None, None]:
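    """Build a test genomic resource with the specified scheme and content."""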
with proto_builder(scheme, content) as proto:
res = proto.get_resource("")
yield res