"""Provides tools usefult for testing."""
from __future__ import annotations
import contextlib
import gzip
import logging
import os
import pathlib
import shutil
import tempfile
import textwrap
from collections.abc import Generator
from typing import Any, cast
import pyBigWig  # type: ignore
import pysam
from s3fs.core import S3FileSystem
from dae.genomic_resources.fsspec_protocol import (
    FsspecReadOnlyProtocol,
    FsspecReadWriteProtocol,
    build_fsspec_protocol,
    build_inmemory_protocol,
)
from dae.genomic_resources.gene_models.gene_models import GeneModels
from dae.genomic_resources.reference_genome import ReferenceGenome
from dae.genomic_resources.repository import (
    GenomicResource,
    GenomicResourceProtocolRepo,
)
logger = logging.getLogger(__name__)
[docs]
def convert_to_tab_separated(content: str) -> str:
    """Turn whitespace-aligned text into tab separated file content.

    Intended for writing small fixture files in tests.

    Lines that are empty once CR/LF characters are removed are dropped.
    Lines starting with ``##`` are kept verbatim; the whitespace separated
    columns of every other line are re-joined with a single tab.  Finally,
    in the whole resulting text, every ``||`` is replaced with a space (use
    it when a value has to contain a space) and every ``EMPTY`` with ``.``.
    """
    rows = [
        row if row.startswith("##") else "\t".join(row.split())
        for row in (raw.strip("\n\r") for raw in content.split("\n"))
        if row
    ]
    return "\n".join(rows).replace("||", " ").replace("EMPTY", ".")
[docs]
def setup_directories(
        root_dir: pathlib.Path,
        content: str | bytes | dict[str, Any]) -> None:
    """Set up directory and subdirectory structures using the content.

    The parent directory of ``root_dir`` is created when missing.  Then,
    depending on the type of ``content``:

    * ``str`` - ``root_dir`` is written as a UTF-8 text file;
    * ``bytes`` - ``root_dir`` is written as a binary file;
    * ``dict`` - ``root_dir`` becomes a directory and every
      ``name -> content`` item is set up recursively below it.  An empty
      dict produces an empty directory.

    Raises:
        TypeError: if ``content`` (at any nesting level) is not a string,
            bytes or a dict.
    """
    root_dir = pathlib.Path(root_dir)
    root_dir.parent.mkdir(parents=True, exist_ok=True)
    if isinstance(content, str):
        root_dir.write_text(content, encoding="utf8")
    elif isinstance(content, bytes):
        root_dir.write_bytes(content)
    elif isinstance(content, dict):
        # Create the directory explicitly so that an empty dict still
        # results in an (empty) directory on disk.
        root_dir.mkdir(parents=True, exist_ok=True)
        for path_name, path_content in content.items():
            setup_directories(root_dir / path_name, path_content)
    else:
        # Report the offending type; the value itself may be arbitrarily
        # large and says little about what went wrong.
        raise TypeError(
            f"unexpected content type: {type(content).__name__} "
            f"for {root_dir}")
[docs]
def setup_pedigree(ped_path: pathlib.Path, content: str) -> pathlib.Path:
    """Write a pedigree file from whitespace-aligned content.

    The content is converted to tab separated form and stored at
    ``ped_path``, which is returned.
    """
    setup_directories(ped_path, convert_to_tab_separated(content))
    return ped_path
[docs]
def setup_denovo(denovo_path: pathlib.Path, content: str) -> pathlib.Path:
    """Write a de novo variants file from whitespace-aligned content.

    The content is converted to tab separated form and stored at
    ``denovo_path``, which is returned.
    """
    setup_directories(denovo_path, convert_to_tab_separated(content))
    return denovo_path
[docs]
def setup_tabix(
        tabix_path: pathlib.Path, tabix_content: str,
        **kwargs: bool | str | int) -> tuple[str, str]:
    """Set up a compressed and tabix-indexed file from the content.

    The content is converted to tab separated form and written to a plain
    file (``tabix_path`` without a trailing ``.gz`` suffix, if it has one).
    That file is compressed with ``pysam.tabix_compress`` into
    ``<plain file>.gz``, indexed with ``pysam.tabix_index`` and then removed.

    The ``force`` keyword argument (default ``False``) is passed to both
    pysam calls; all remaining keyword arguments go to
    ``pysam.tabix_index`` only.

    Returns the compressed file name and ``<compressed file name>.tbi`` as
    strings.
    """
    force = cast(bool, kwargs.pop("force", False))
    plain_path = (
        tabix_path.with_suffix("") if tabix_path.suffix == ".gz"
        else tabix_path
    )
    setup_directories(plain_path, convert_to_tab_separated(tabix_content))

    compressed = str(plain_path.parent / f"{plain_path.name}.gz")
    # pylint: disable=no-member
    pysam.tabix_compress(str(plain_path), compressed, force=force)
    pysam.tabix_index(compressed, force=force, **kwargs)  # type: ignore
    # only the compressed file and its index are kept
    plain_path.unlink()

    return compressed, f"{compressed}.tbi"
[docs]
def setup_gzip(gzip_path: pathlib.Path, gzip_content: str) -> pathlib.Path:
    """Set up a gzipped TSV file.

    The content is converted to tab separated form and written, gzip
    compressed and UTF-8 encoded, to ``gzip_path``.  When ``gzip_path`` does
    not end in ``.gz`` the suffix is appended to the file name (e.g.
    ``data.txt`` becomes ``data.txt.gz``).

    Returns the path of the file that was actually written.
    """
    content = convert_to_tab_separated(gzip_content)
    out_path = gzip_path
    if gzip_path.suffix != ".gz":
        # ``with_suffix("gz")`` raises ValueError (a suffix must start with
        # a dot); append ".gz" to the name instead, the same way
        # setup_tabix derives its compressed file name.
        out_path = gzip_path.with_name(f"{gzip_path.name}.gz")
    with gzip.open(out_path, "wt", encoding="utf8") as outfile:
        outfile.write(content)
    return out_path
[docs]
def setup_vcf(
        out_path: pathlib.Path, content: str, *,
        csi: bool = False) -> pathlib.Path:
    """Set up a VCF file, and a header-only companion, using the content.

    ``out_path`` must end in ``.vcf`` or ``.vcf.gz``.  The content is
    converted to tab separated form and written as a plain ``.vcf`` file.
    For a ``.gz`` target that plain file is additionally compressed and
    indexed with pysam (``csi`` is forwarded to ``pysam.tabix_index``); the
    plain file is not removed.

    The header of the resulting VCF is stored next to it as
    ``<name>.header.vcf`` and, for a ``.gz`` target, that header file is
    compressed and indexed as well.

    Returns ``out_path``.
    """
    vcf_data = convert_to_tab_separated(content)
    is_compressed = out_path.suffix == ".gz"
    vcf_path = out_path.with_suffix("") if is_compressed else out_path
    assert vcf_path.suffix == ".vcf"

    header_path = vcf_path.with_name(f"{vcf_path.stem}.header.vcf")
    setup_directories(vcf_path, vcf_data)

    # pylint: disable=no-member
    if is_compressed:
        vcf_gz_filename = f"{vcf_path}.gz"
        pysam.tabix_compress(str(vcf_path), vcf_gz_filename)
        pysam.tabix_index(vcf_gz_filename, preset="vcf", csi=csi)

    with pysam.VariantFile(str(out_path)) as variant_file:
        header_path.write_text(str(variant_file.header), encoding="utf8")

    if is_compressed:
        header_gz_filename = f"{header_path}.gz"
        pysam.tabix_compress(str(header_path), header_gz_filename)
        pysam.tabix_index(header_gz_filename, preset="vcf")

    return out_path
[docs]
def setup_dae_transmitted(
    root_path: pathlib.Path,
    summary_content: str,
    toomany_content: str,
) -> tuple[pathlib.Path, pathlib.Path]:
    """Set up DAE transmitted variants files using the passed content.

    Both contents are converted to tab separated form and written to
    ``root_path / "dae_transmitted_data"`` as ``tr.txt`` and
    ``tr-TOOMANY.txt``.  Each file is then compressed and tabix-indexed
    with pysam (sequence in column 0, position in column 1, first line
    skipped).

    Returns the paths of the two compressed files, summary first.
    """
    setup_directories(root_path, {
        "dae_transmitted_data": {
            "tr.txt": convert_to_tab_separated(summary_content),
            "tr-TOOMANY.txt": convert_to_tab_separated(toomany_content),
        },
    })

    data_dir = root_path / "dae_transmitted_data"
    summary_gz = data_dir / "tr.txt.gz"
    toomany_gz = data_dir / "tr-TOOMANY.txt.gz"

    # pylint: disable=no-member
    for plain_name, gz_path in (
            ("tr.txt", summary_gz), ("tr-TOOMANY.txt", toomany_gz)):
        pysam.tabix_compress(str(data_dir / plain_name), str(gz_path))

    for gz_path in (summary_gz, toomany_gz):
        pysam.tabix_index(
            str(gz_path),
            seq_col=0, start_col=1, end_col=1, line_skip=1)

    return (summary_gz, toomany_gz)
[docs]
def setup_bigwig(
    out_path: pathlib.Path,
    content: str,
    chrom_lens: dict[str, int],
) -> pathlib.Path:
    """
    Setup a bigwig format variants file using bedGraph-style content.

    Every non-blank line of the content must have four whitespace separated
    columns: chromosome, start, end and value.  The chromosome must be a key
    of ``chrom_lens``, ``start`` must be smaller than ``end`` and consecutive
    intervals on the same chromosome must not overlap.  The parent directory
    of ``out_path`` must already exist.  These requirements are checked with
    assertions.

    Example:
    chr1	0	100	0.0
    chr1	100	120	1.0
    chr1	125	126	200.0

    Returns ``out_path``.
    """
    assert out_path.parent.exists()

    chrom_col: list[str] = []
    start_col: list[int] = []
    end_col: list[int] = []
    val_col: list[float] = []
    prev_end: int = -1
    prev_chrom: str = ""

    # Parse and validate everything BEFORE the output file is opened, so a
    # failed check neither leaks an open pyBigWig handle nor leaves a
    # truncated file behind.
    for line in convert_to_tab_separated(content).split("\n"):
        if not line.strip():
            # whitespace-only lines (e.g. the indentation in front of a
            # closing triple quote) carry no interval
            continue
        tokens = line.strip().split("\t")
        assert len(tokens) == 4
        chrom = tokens[0]
        start = int(tokens[1])
        end = int(tokens[2])
        val = float(tokens[3])
        assert chrom in chrom_lens
        assert start < end
        if chrom == prev_chrom:
            assert start >= prev_end
        prev_chrom = chrom
        prev_end = end
        chrom_col.append(chrom)
        start_col.append(start)
        end_col.append(end)
        val_col.append(val)
    assert chrom_col, "no bedGraph intervals found in content"

    bw_file = pyBigWig.open(str(out_path), "w")  # pylint: disable=I1101
    bw_file.addHeader(list(chrom_lens.items()), maxZooms=0)
    bw_file.addEntries(chrom_col, start_col, ends=end_col, values=val_col)
    bw_file.close()
    return out_path
[docs]
def setup_genome(out_path: pathlib.Path, content: str) -> ReferenceGenome:
    """Set up a reference genome using the content and return it opened.

    The content is passed through ``convert_to_tab_separated``, written to
    ``out_path`` and indexed with ``pysam.faidx``.  A
    ``genomic_resource.yaml`` describing the genome is written next to it.

    Raises:
        ValueError: if ``out_path`` does not have the ``.fa`` suffix.
    """
    if out_path.suffix != ".fa":
        raise ValueError("genome output file is expected to have '.fa' suffix")

    fasta_text = convert_to_tab_separated(content)
    setup_directories(out_path, fasta_text)

    # pylint: disable=no-member
    pysam.faidx(str(out_path))

    resource_config = textwrap.dedent(f"""
        type: genome
        filename: {out_path.name}
    """)
    setup_directories(
        out_path.parent, {"genomic_resource.yaml": resource_config})

    # pylint: disable=import-outside-toplevel
    from dae.genomic_resources.reference_genome import (
        build_reference_genome_from_file,
    )
    genome = build_reference_genome_from_file(str(out_path))
    return genome.open()
[docs]
def setup_gene_models(
        out_path: pathlib.Path,
        content: str,
        fileformat: str | None = None,
        config: str | None = None) -> GeneModels:
    """Set up gene models using the passed content and return them loaded.

    The content is converted to tab separated form and written to
    ``out_path``; a ``genomic_resource.yaml`` is written next to it.

    ``fileformat`` is passed to ``build_gene_models_from_file`` as the
    ``file_format`` argument and may be ``None``.  ``config`` is the full
    text of ``genomic_resource.yaml``; when it is ``None`` a default
    configuration is generated that names the file and, if ``fileformat``
    is given, its format.
    """
    setup_directories(out_path, convert_to_tab_separated(content))
    if config is None:
        config_lines = [
            "",
            "type: gene_models",
            f"filename: {out_path.name}",
        ]
        if fileformat is not None:
            # Only state the format when one was supplied; formatting None
            # would write the literal string "None" into the YAML.
            config_lines.append(f'format: "{fileformat}"')
        config = "\n".join(config_lines) + "\n"
    setup_directories(out_path.parent, {"genomic_resource.yaml": config})
    # pylint: disable=import-outside-toplevel
    from dae.genomic_resources.gene_models.gene_models_factory import (
        build_gene_models_from_file,
    )
    gene_models = build_gene_models_from_file(
        str(out_path), file_format=fileformat)
    gene_models.load()
    return gene_models
[docs]
def setup_empty_gene_models(out_path: pathlib.Path) -> GeneModels:
    """Set up gene models that contain only a refflat header line."""
    header_only = """
#geneName name chrom strand txStart txEnd cdsStart cdsEnd exonCount exonStarts exonEnds
    """  # noqa
    return setup_gene_models(out_path, header_only, fileformat="refflat")
[docs]
def build_inmemory_test_protocol(
        content: dict[str, Any]) -> FsspecReadWriteProtocol:
    """Build and return an in-memory fsspec protocol for testing.

    A temporary directory is created only to obtain a fresh path; that path
    is used as both of the leading arguments of ``build_inmemory_protocol``.
    The directory itself is removed before this function returns.
    """
    with tempfile.TemporaryDirectory("embedded_test_protocol") as root_path:
        proto = build_inmemory_protocol(root_path, root_path, content)
    return proto
[docs]
def build_inmemory_test_repository(
        content: dict[str, Any]) -> GenomicResourceProtocolRepo:
    """Create an in-memory GRR repository populated with the content."""
    return GenomicResourceProtocolRepo(build_inmemory_test_protocol(content))
[docs]
def build_inmemory_test_resource(
        content: dict[str, Any]) -> GenomicResource:
    """Create a single in-memory test resource from the passed content.

    The content should describe exactly one resource, i.e. it should hold
    the ``genomic_resource.yaml`` and the data files at its top level.

    Example content:

    {
        "genomic_resource.yaml": textwrap.dedent('''
            type: position_score
            table:
                filename: data.txt
            scores:
                - id: aaaa
                  type: float
                  desc: ""
                  name: sc
        '''),
        "data.txt": convert_to_tab_separated('''
            #chrom start end sc
            1      10    12  1.1
            2      13    14  1.2
        ''')
    }
    """
    return build_inmemory_test_protocol(content).get_resource("")
[docs]
def build_filesystem_test_protocol(
    root_path: pathlib.Path, *,
    repair: bool = True,
) -> FsspecReadWriteProtocol:
    """Build and return a filesystem fsspec protocol for testing.

    ``root_path`` is expected to point to a directory structure holding all
    the resources.  Unless ``repair`` is false, the manifest of every
    resource is rebuilt and saved and the repository content file is
    rebuilt before the protocol is returned.
    """
    location = str(root_path)
    proto = cast(
        FsspecReadWriteProtocol,
        build_fsspec_protocol(location, location))
    if not repair:
        return proto

    for resource in proto.get_all_resources():
        manifest = proto.build_manifest(resource)
        proto.save_manifest(resource, manifest)
    proto.build_content_file()
    return proto
[docs]
def build_filesystem_test_repository(
        root_path: pathlib.Path) -> GenomicResourceProtocolRepo:
    """Build and return a filesystem fsspec repository for testing.

    ``root_path`` is expected to point to a directory structure holding all
    the resources.
    """
    return GenomicResourceProtocolRepo(
        build_filesystem_test_protocol(root_path))
[docs]
def build_filesystem_test_resource(
        root_path: pathlib.Path) -> GenomicResource:
    """Return the resource with the empty ID from a filesystem protocol.

    The protocol is built over ``root_path`` with
    ``build_filesystem_test_protocol``.
    """
    return build_filesystem_test_protocol(root_path).get_resource("")
[docs]
@contextlib.contextmanager
def build_http_test_protocol(
    root_path: pathlib.Path, *,
    repair: bool = True,
) -> Generator[FsspecReadOnlyProtocol, None, None]:
    """Populate Apache2 directory and construct HTTP genomic resource protocol.

    The Apache2 is used to serve the GRR.

    ``root_path`` should be a valid filesystem genomic resource repository.
    Its resources are copied into ``tests/.test_grr/<root_path.name>``
    (located relative to this file) and the yielded protocol accesses them
    through ``http://<HTTP_HOST>/<root_path.name>``.  ``HTTP_HOST`` is read
    from the environment and defaults to ``localhost:28080``.

    The served directory is removed when the context exits, including when
    populating it fails.
    """
    source_proto = build_filesystem_test_protocol(root_path, repair=repair)
    http_path = pathlib.Path(__file__).parent.parent.parent
    http_path = http_path / "tests" / ".test_grr"
    # guard against removing an unexpected directory in the cleanup below
    assert http_path.parts[-3:] == ("dae", "tests", ".test_grr"), http_path
    http_path = http_path / root_path.name
    http_path.mkdir(parents=True, exist_ok=True)
    try:
        # Populate inside the ``try`` so that a failure while copying the
        # resources does not leave the served directory behind.
        dest_proto = build_filesystem_test_protocol(http_path)
        copy_proto_genomic_resources(
            dest_proto, source_proto)
        host = os.environ.get("HTTP_HOST", "localhost:28080")
        server_address = f"http://{host}/{http_path.name}"
        yield build_fsspec_protocol(str(root_path), server_address)
    except GeneratorExit:
        print("Generator exit")
    finally:
        shutil.rmtree(http_path)
[docs]
def s3_test_server_endpoint() -> str:
    """Return the URL of the S3 test server.

    The host is taken from the ``LOCALSTACK_HOST`` environment variable
    (default ``localhost``); the port is always 4566.
    """
    localstack_host = os.getenv("LOCALSTACK_HOST", "localhost")
    return f"http://{localstack_host}:4566"
[docs]
def s3_test_protocol() -> FsspecReadWriteProtocol:
    """Build an S3 fsspec testing protocol on top of an existing S3 server.

    A new test bucket is created on the server returned by
    ``s3_test_server_endpoint`` and the protocol is built over that bucket.
    """
    endpoint_url = s3_test_server_endpoint()
    bucket_url = build_s3_test_bucket(build_s3_test_filesystem())
    proto = build_fsspec_protocol(
        bucket_url, bucket_url, endpoint_url=endpoint_url)
    return cast(FsspecReadWriteProtocol, proto)
[docs]
def build_s3_test_filesystem(
        endpoint_url: str | None = None) -> S3FileSystem:
    """Create an S3 fsspec filesystem connected to the S3 server.

    ``AWS_SECRET_ACCESS_KEY`` and ``AWS_ACCESS_KEY_ID`` are set to ``foo``
    in the process environment when they are not already defined.  When
    ``endpoint_url`` is ``None`` the URL from ``s3_test_server_endpoint``
    is used.  The filesystem cache is invalidated before returning.
    """
    os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "foo")  # noqa: S105
    os.environ.setdefault("AWS_ACCESS_KEY_ID", "foo")

    resolved_endpoint = (
        endpoint_url if endpoint_url is not None
        else s3_test_server_endpoint()
    )
    assert resolved_endpoint is not None

    filesystem = S3FileSystem(
        anon=False, client_kwargs={"endpoint_url": resolved_endpoint})
    filesystem.invalidate_cache()
    return filesystem
[docs]
def build_s3_test_bucket(s3filesystem: S3FileSystem | None = None) -> str:
    """Create an S3 test bucket and return its URL.

    A temporary directory is created only to obtain a fresh path; the
    bucket URL is ``s3:/`` followed by that path.  The local directory is
    removed again before returning.  When no filesystem is passed, one is
    built with ``build_s3_test_filesystem``.
    """
    filesystem = s3filesystem
    with tempfile.TemporaryDirectory("s3_test_bucket") as tmp_path:
        if filesystem is None:
            filesystem = build_s3_test_filesystem()
        bucket_url = f"s3:/{tmp_path}"
        filesystem.mkdir(bucket_url, acl="public-read")
    return bucket_url
[docs]
@contextlib.contextmanager
def build_s3_test_protocol(
    root_path: pathlib.Path,
) -> Generator[FsspecReadWriteProtocol, None, None]:
    """Construct an S3 fsspec genomic resource protocol.

    A new test bucket is created and populated with the resources of the
    filesystem GRR that ``root_path`` points to.  No cleanup is performed
    when the context exits.
    """
    endpoint_url = s3_test_server_endpoint()
    bucket_url = build_s3_test_bucket(build_s3_test_filesystem(endpoint_url))

    s3_proto = cast(
        FsspecReadWriteProtocol,
        build_fsspec_protocol(
            str(bucket_url), bucket_url, endpoint_url=endpoint_url))

    source_proto = build_filesystem_test_protocol(root_path)
    copy_proto_genomic_resources(s3_proto, source_proto)

    yield s3_proto
[docs]
def copy_proto_genomic_resources(
        dest_proto: FsspecReadWriteProtocol,
        src_proto: FsspecReadOnlyProtocol) -> None:
    """Copy every resource of ``src_proto`` into ``dest_proto``.

    After copying, the content file of the destination is rebuilt and the
    cache of its filesystem is invalidated.
    """
    for resource in src_proto.get_all_resources():
        dest_proto.copy_resource(resource)

    dest_proto.build_content_file()
    dest_proto.filesystem.invalidate_cache()
[docs]
@contextlib.contextmanager
def proto_builder(
    scheme: str, content: dict,
) -> Generator[
        FsspecReadOnlyProtocol | FsspecReadWriteProtocol,
        None, None]:
    """Build a test genomic resource protocol with specified content.

    The content is first written to a temporary directory.  Depending on
    ``scheme`` the yielded protocol is built directly over that directory
    (``"file"``) or over an S3 (``"s3"``) or HTTP (``"http"``) copy of it.

    Raises:
        ValueError: if ``scheme`` is not one of ``file``, ``s3`` or ``http``
            (raised after the temporary directory has been cleaned up).
    """
    with tempfile.TemporaryDirectory("s3_test_bucket") as tmp_path:
        root_path = pathlib.Path(tmp_path)
        setup_directories(root_path, content)

        if scheme == "file":
            try:
                yield build_filesystem_test_protocol(root_path)
            except GeneratorExit:
                print("Generator exit")
            return

        # The remote schemes differ only in the context manager that
        # publishes the directory.
        if scheme == "s3":
            remote_factory = build_s3_test_protocol
        elif scheme == "http":
            remote_factory = build_http_test_protocol
        else:
            remote_factory = None

        if remote_factory is not None:
            with remote_factory(root_path) as remote_proto:
                try:
                    yield remote_proto
                except GeneratorExit:
                    print("Generator exit")
            return

    raise ValueError(f"unexpected protocol scheme: <{scheme}>")
[docs]
@contextlib.contextmanager
def resource_builder(
        scheme: str, content: dict) -> Generator[GenomicResource, None, None]:
    """Yield the resource with the empty ID from a test protocol.

    The protocol is built with ``proto_builder`` for the given scheme and
    content and stays alive for the duration of the context.
    """
    with proto_builder(scheme, content) as proto:
        yield proto.get_resource("")