from collections import deque
from collections.abc import Generator
from typing import Any, Protocol
import pysam
Key = str | int
[docs]
class LineBase(Protocol):
"""Protocol for genomic position table lines."""
chrom: str
fchrom: str
pos_begin: int
pos_end: int
ref: str | None
alt: str | None
[docs]
def get(self, key: Key) -> Any:
...
[docs]
def row(self) -> tuple:
...
[docs]
class Line:
"""Represents a line read from a genomic position table.
Provides attribute access to a number of important columns - chromosome,
start position, end position, reference allele and alternative allele.
"""
__slots__ = ( # noqa: RUF023
"_data",
"chrom",
"fchrom",
"pos_begin",
"pos_end",
"ref",
"alt",
)
def __init__(
self,
raw_line: tuple,
chrom_key: int = 0,
pos_begin_key: int = 1,
pos_end_key: int = 2,
ref_key: int | None = None,
alt_key: int | None = None,
):
self._data: tuple[str, ...] = raw_line
self.chrom: str = self._data[chrom_key]
self.fchrom: str = self._data[chrom_key]
self.pos_begin: int = int(self._data[pos_begin_key])
self.pos_end: int = int(self._data[pos_end_key])
self.ref: str | None = \
self._data[ref_key] if ref_key is not None else None
self.alt: str | None = \
self._data[alt_key] if alt_key is not None else None
[docs]
def get(self, key: Key) -> str:
return self._data[key] # type: ignore
[docs]
def row(self) -> tuple:
return tuple(self._data)
[docs]
class VCFLine:
"""Line adapter for lines derived from a VCF file.
Implements functionality for handling multi-allelic variants
and INFO fields.
"""
def __init__(
self, raw_line: pysam.VariantRecord, allele_index: int | None):
self.chrom: str = raw_line.contig
self.fchrom: str = raw_line.contig
self.pos_begin: int = raw_line.pos
self.pos_end: int = raw_line.pos
assert raw_line.ref is not None
self.ref: str | None = raw_line.ref
self.alt: str | None = None
# Used to handle multiallelic variants in VCF files.
# The allele index is None if the variant for this line
# is missing its ALT, i.e. its value is '.'
self.allele_index: int | None = allele_index
if self.allele_index is not None:
assert raw_line.alts is not None
self.alt = raw_line.alts[self.allele_index]
self.info: pysam.VariantRecordInfo = raw_line.info
self.info_meta: pysam.VariantHeaderMetadata = raw_line.header.info
[docs]
def get(self, key: Key) -> Any:
"""Get a value from the INFO field of the VCF line."""
assert isinstance(key, str)
value, meta = self.info.get(key), self.info_meta.get(key)
if isinstance(value, tuple):
if meta.number == "A" and self.allele_index is not None:
value = value[self.allele_index]
elif meta.number == "R":
return value[
self.allele_index + 1
if self.allele_index is not None
else 0 # Get reference allele value if ALT is '.'
]
elif meta.number == "." and meta.type == "String":
return "|".join(value)
return value
[docs]
def row(self) -> tuple:
return ()
[docs]
class BigWigLine:
"""Represents a line read from a bigWig file."""
def __init__(self, raw_line: tuple):
self._data: tuple[str, int, int, float] = raw_line
self.chrom: str = self._data[0]
self.fchrom: str = self._data[0]
self.pos_begin: int = self._data[1]
self.pos_end: int = self._data[2]
self.ref: str | None = None
self.alt: str | None = None
[docs]
def get(self, key: Key) -> str | int | int | float:
return self._data[key] # type: ignore
[docs]
def row(self) -> tuple:
return tuple(self._data)
[docs]
class LineBuffer:
"""Represent a line buffer for Tabix genome position table."""
def __init__(self) -> None:
self.deque: deque[LineBase] = deque()
def __len__(self) -> int:
return len(self.deque)
[docs]
def clear(self) -> None:
self.deque.clear()
[docs]
def append(self, line: LineBase) -> None:
if len(self.deque) > 0 and self.peek_first().chrom != line.chrom:
self.clear()
self.deque.append(line)
[docs]
def peek_first(self) -> LineBase:
return self.deque[0]
[docs]
def pop_first(self) -> LineBase:
return self.deque.popleft()
[docs]
def peek_last(self) -> LineBase:
return self.deque[-1]
[docs]
def region(self) -> tuple[str | None, int | None, int | None]:
"""Return region stored in the buffer."""
if len(self.deque) == 0:
return None, None, None
first = self.peek_first()
last = self.peek_last()
if first.chrom != last.chrom or first.pos_end > last.pos_end:
self.clear()
return None, None, None
return first.chrom, first.pos_begin, last.pos_end
[docs]
def prune(self, chrom: str, pos: int) -> None:
"""Prune the buffer if needed."""
if len(self.deque) == 0:
return
first = self.peek_first()
if chrom != first.chrom:
self.clear()
return
while len(self.deque) > 0:
first = self.peek_first()
if pos <= first.pos_end:
break
self.deque.popleft()
[docs]
def contains(self, chrom: str, pos: int) -> bool:
bchrom, bbeg, bend = self.region()
if bchrom is None or bbeg is None or bend is None:
return False
return chrom == bchrom and bend >= pos >= bbeg
[docs]
def find_index(self, chrom: str, pos: int) -> int:
"""Find index in line buffer that contains the passed position."""
if len(self.deque) == 0 or not self.contains(chrom, pos):
return -1
if len(self.deque) == 1:
return 0
first_index = 0
last_index = len(self.deque) - 1
while True:
mid_index = (last_index - first_index) // 2 + first_index
if last_index <= first_index:
break
mid = self.deque[mid_index]
if mid.pos_end >= pos >= mid.pos_begin:
break
if pos < mid.pos_begin:
last_index = mid_index - 1
else:
first_index = mid_index + 1
while mid_index > 0:
prev = self.deque[mid_index - 1]
if pos > prev.pos_begin:
break
mid_index -= 1
for index in range(mid_index, len(self.deque)):
line = self.deque[index]
if line.pos_end >= pos >= line.pos_begin:
mid_index = index
break
if line.pos_begin >= pos:
mid_index = index
break
return mid_index
[docs]
def fetch(
self, chrom: str, pos_begin: int, pos_end: int,
) -> Generator[LineBase, None, None]:
"""Return a generator of rows matching the region."""
beg_index = self.find_index(chrom, pos_begin)
if beg_index == -1:
return
for index in range(beg_index, len(self.deque)):
row = self.deque[index]
if row.pos_end < pos_begin:
continue
if pos_end is not None and row.pos_begin > pos_end:
break
yield row