import collections
from collections.abc import Generator
from typing import IO, ClassVar
from dae.genomic_resources.repository import GenomicResource
from .line import Line, LineBase
from .table import GenomicPositionTable, get_idx, zero_based_adjust
class InmemoryGenomicPositionTable(GenomicPositionTable):
    """In-memory genomic position table.

    The whole table file is parsed once in :meth:`open`. The parsed lines
    are grouped by the chromosome name found in the file and kept sorted,
    so all subsequent queries are answered from memory.
    """

    FORMAT_DEF: ClassVar[dict] = {
        # parameters are <column separator>, <strip_chars>, <space replacement>
        # A ``None`` separator makes ``str.split`` split on any whitespace.
        # When <space replacement> is true, the literal token "EMPTY" in a
        # column is turned into an empty string while loading.
        "mem": (None, " \t\n\r", True),
        "tsv": ("\t", "\n\r", False),
        "csv": (",", "\n\r", False),
    }

    def __init__(
        self,
        genomic_resource: GenomicResource,
        table_definition: dict,
        file_format: str,
    ):
        """Create a table that is not yet loaded.

        Args:
            genomic_resource: resource the table file is read from.
            table_definition: table configuration; its ``zero_based`` entry
                (default ``False``) controls coordinate adjustment on load.
            file_format: one of the keys of :attr:`FORMAT_DEF`.
        """
        # These attributes are assigned before delegating to the base class
        # initializer, exactly as in the original statement order.
        self.format = file_format
        self.str_stream: IO | None = None
        # Records keyed by the chromosome name as written in the file.
        self.records_by_chr: dict[str, list[Line]] = {}
        self.zero_based = table_definition.get("zero_based", False)
        super().__init__(genomic_resource, table_definition)

    def _make_line(self, data: tuple) -> Line:
        """Wrap a tuple of column values into a ``Line`` using the core keys."""
        return Line(
            data,
            self.chrom_key,
            self.pos_begin_key, self.pos_end_key,
            self.ref_key, self.alt_key,
        )

    def open(self) -> "InmemoryGenomicPositionTable":
        """Read the table file and load all of its records into memory.

        Returns:
            This table instance, so that calls can be chained.

        Raises:
            ValueError: if ``header_mode`` is ``"file"`` and no non-empty
                header row is found, or if a data row has a different
                number of columns than the header / the previous row.
            KeyError: if ``self.format`` is not a key of :attr:`FORMAT_DEF`.
        """
        compression = None
        if self.definition.filename.endswith(".gz"):
            compression = "gzip"
        self.str_stream = self.genomic_resource.open_raw_file(
            self.definition.filename, mode="rt", compression=compression)
        assert self.str_stream is not None

        clmn_sep, strip_chars, space_replacement = \
            InmemoryGenomicPositionTable.FORMAT_DEF[self.format]

        if self.header_mode == "file":
            # The header is the first non-empty row of the stream.
            hcs = None
            for raw_row in self.str_stream:
                stripped = raw_row.strip(strip_chars)
                if not stripped:
                    continue
                hcs = stripped.split(clmn_sep)
                break
            if not hcs:
                raise ValueError("No header found")
            self.header = tuple(hcs)

        # Without a header, the first data row fixes the expected width.
        col_number = len(self.header) if self.header else None
        self._set_core_column_keys()

        records_by_chr = collections.defaultdict(list)
        for raw_row in self.str_stream:
            stripped = raw_row.strip(strip_chars)
            if not stripped:
                continue
            columns = tuple(stripped.split(clmn_sep))
            if col_number and len(columns) != col_number:
                raise ValueError("Inconsistent number of columns")
            col_number = len(columns)

            if space_replacement:
                # "EMPTY" is the placeholder for an empty column value.
                columns = tuple("" if v == "EMPTY" else v for v in columns)
            if self.zero_based:
                columns = zero_based_adjust(
                    columns, self.pos_begin_key,
                    self.pos_end_key, self.header,
                )
            line = self._make_line(columns)
            records_by_chr[line.chrom].append(line)

        self.records_by_chr = {
            c: sorted(pss, key=lambda line: (line.chrom, line.pos_begin,
                                             line.pos_end, line.ref, line.alt))
            for c, pss in records_by_chr.items()
        }
        self._build_chrom_mapping()
        return self

    def get_file_chromosomes(self) -> list[str]:
        """Return the sorted chromosome names as they appear in the file."""
        return sorted(self.records_by_chr.keys())

    def _transform_result(self, line: Line, chrom: str) -> Line:
        """Return a copy of ``line`` whose chromosome column is ``chrom``."""
        assert self.chrom_key is not None
        assert self.header is not None
        new_data = list(line._data)  # pylint: disable=protected-access # noqa: SLF001
        chrom_idx = get_idx(self.chrom_key, self.header)
        new_data[chrom_idx] = chrom
        return self._make_line(tuple(new_data))

    def get_all_records(self) -> Generator[LineBase, None, None]:
        """Yield every record, chromosome by chromosome.

        When a chromosome mapping is active, chromosomes missing from the
        mapping are skipped and each yielded line carries the mapped
        chromosome name instead of the name stored in the file.
        """
        for chrom in self.get_chromosomes():
            if self.chrom_map:
                if chrom not in self.chrom_map:
                    continue
                fchrom = self.chrom_map[chrom]
                for line in self.records_by_chr[fchrom]:
                    yield self._transform_result(line, chrom)
            else:
                yield from self.records_by_chr[chrom]

    def get_records_in_region(
        self,
        chrom: str,
        pos_begin: int | None = None,
        pos_end: int | None = None,
    ) -> Generator[LineBase, None, None]:
        """Yield the records on ``chrom`` that overlap the given region.

        Args:
            chrom: chromosome name; translated through ``chrom_map`` when a
                mapping is active.
            pos_begin: lower bound of the region, or ``None`` for no bound.
            pos_end: upper bound of the region, or ``None`` for no bound.

        Raises:
            ValueError: if the (mapped) chromosome has no records.
            KeyError: presumably, when a mapping is active and ``chrom`` is
                not in it (plain subscript lookup) -- confirm against the
                type of ``chrom_map``.
        """
        fch = self.chrom_map[chrom] if self.chrom_map else chrom
        if fch not in self.records_by_chr:
            raise ValueError(
                f"The chromosome {chrom} is not present in the table")

        for line in self.records_by_chr[fch]:
            # Compare against None explicitly: a bound of 0 is a real
            # bound and must not be mistaken for "no bound".
            if pos_begin is not None and pos_begin > line.pos_end:
                continue
            if pos_end is not None and pos_end < line.pos_begin:
                continue
            if self.chrom_map:
                yield self._transform_result(line, chrom)
            else:
                yield line

    def get_chromosome_length(self, chrom: str, _step: int = 0) -> int:
        """Return one more than the largest end position on ``chrom``.

        Args:
            chrom: chromosome name as reported by ``get_chromosomes()``.
            _step: not used by this implementation.

        Raises:
            ValueError: if ``chrom`` is not among ``get_chromosomes()``.
        """
        chromosomes = self.get_chromosomes()
        if chrom not in chromosomes:
            raise ValueError(
                f"contig {chrom} not present in the table's contigs: "
                f"{chromosomes}")
        fch = self.chrom_map[chrom] if self.chrom_map else chrom
        return max(line.pos_end for line in self.records_by_chr[fch]) + 1

    def close(self) -> None:
        """Close the underlying text stream if one has been opened."""
        if self.str_stream is not None:
            self.str_stream.close()