Source code for dae.genomic_resources.gene_models.parsing
from __future__ import annotations
import copy
import logging
from collections import defaultdict
from collections.abc import Callable
from typing import IO, cast
import pandas as pd
from .gene_models import (
Exon,
GeneModels,
TranscriptModel,
)
logger = logging.getLogger(__name__)
GeneModelsParser = Callable[
[GeneModels, IO, dict[str, str] | None, int | None],
bool,
]
[docs]
def parse_default_gene_models_format(
gene_models: GeneModels,
infile: IO,
gene_mapping: dict[str, str] | None = None,
nrows: int | None = None,
) -> bool:
"""Parse default gene models file format."""
# pylint: disable=too-many-locals
infile.seek(0)
df = pd.read_csv(
infile,
sep="\t",
nrows=nrows,
dtype={
"chr": str,
"trID": str,
"trOrigId": str,
"gene": str,
"strand": str,
"atts": str,
},
)
expected_columns = [
"chr",
"trID",
"gene",
"strand",
"tsBeg",
"txEnd",
"cdsStart",
"cdsEnd",
"exonStarts",
"exonEnds",
"exonFrames",
"atts",
]
assert set(expected_columns) <= set(df.columns)
if not set(expected_columns) <= set(df.columns):
return False
if "trOrigId" not in df.columns:
tr_names = pd.Series(data=df["trID"].values)
df["trOrigId"] = tr_names
if gene_mapping:
gene_models.alternative_names = copy.deepcopy(gene_mapping)
records = df.to_dict(orient="records")
for line in records:
line = cast(dict, line)
exon_starts = list(map(int, line["exonStarts"].split(",")))
exon_ends = list(map(int, line["exonEnds"].split(",")))
exon_frames = list(map(int, line["exonFrames"].split(",")))
assert len(exon_starts) == len(exon_ends) == len(exon_frames)
exons = []
for start, end, frame in zip(exon_starts, exon_ends, exon_frames,
strict=True):
exons.append(Exon(start=start, stop=end, frame=frame))
attributes: dict = {}
atts = line.get("atts")
if atts and isinstance(atts, str):
astep = [a.split(":") for a in atts.split(";")]
attributes = {
a[0]: a[1] for a in astep
}
gene = line["gene"]
gene = gene_models.alternative_names.get(gene, gene)
transcript_model = TranscriptModel(
gene=gene,
tr_id=line["trID"],
tr_name=line["trOrigId"],
chrom=line["chr"],
strand=line["strand"],
tx=(line["tsBeg"], line["txEnd"]),
cds=(line["cdsStart"], line["cdsEnd"]),
exons=exons,
attributes=attributes,
)
gene_models.transcript_models[transcript_model.tr_id] = transcript_model
gene_models.update_indexes()
if nrows is not None:
return True
return True
[docs]
def parse_ref_flat_gene_models_format(
gene_models: GeneModels,
infile: IO,
gene_mapping: dict[str, str] | None = None,
nrows: int | None = None,
) -> bool:
"""Parse refFlat gene models file format."""
# pylint: disable=too-many-locals
expected_columns = [
"#geneName",
"name",
"chrom",
"strand",
"txStart",
"txEnd",
"cdsStart",
"cdsEnd",
"exonCount",
"exonStarts",
"exonEnds",
]
infile.seek(0)
df = parse_raw(infile, expected_columns, nrows=nrows)
if df is None:
return False
records = df.to_dict(orient="records")
transcript_ids_counter: dict[str, int] = defaultdict(int)
if gene_mapping:
gene_models.alternative_names = copy.deepcopy(gene_mapping)
for rec in records:
gene = rec["#geneName"]
gene = gene_models.alternative_names.get(gene, gene)
tr_name = rec["name"]
chrom = rec["chrom"]
strand = rec["strand"]
tx = ( # pylint: disable=invalid-name
int(rec["txStart"]) + 1, int(rec["txEnd"]))
cds = (int(rec["cdsStart"]) + 1, int(rec["cdsEnd"]))
exon_starts = list(map(
int, str(rec["exonStarts"]).strip(",").split(",")))
exon_ends = list(map(
int, str(rec["exonEnds"]).strip(",").split(",")))
assert len(exon_starts) == len(exon_ends)
exons = [
Exon(start + 1, end)
for start, end in zip(exon_starts, exon_ends, strict=True)
]
transcript_ids_counter[tr_name] += 1
tr_id = f"{tr_name}_{transcript_ids_counter[tr_name]}"
transcript_model = TranscriptModel(
gene=gene,
tr_id=tr_id,
tr_name=tr_name,
chrom=chrom,
strand=strand,
tx=tx,
cds=cds,
exons=exons,
)
transcript_model.update_frames()
gene_models.add_transcript_model(transcript_model)
return True
[docs]
def parse_ref_seq_gene_models_format(
gene_models: GeneModels,
infile: IO,
gene_mapping: dict[str, str] | None = None,
nrows: int | None = None,
) -> bool:
"""Parse refSeq gene models file format."""
# pylint: disable=too-many-locals
expected_columns = [
"#bin",
"name",
"chrom",
"strand",
"txStart",
"txEnd",
"cdsStart",
"cdsEnd",
"exonCount",
"exonStarts",
"exonEnds",
"score",
"name2",
"cdsStartStat",
"cdsEndStat",
"exonFrames",
]
infile.seek(0)
df = parse_raw(infile, expected_columns, nrows=nrows)
if df is None:
return False
records = df.to_dict(orient="records")
transcript_ids_counter: dict[str, int] = defaultdict(int)
if gene_mapping:
gene_models.alternative_names = copy.deepcopy(gene_mapping)
for rec in records:
gene = rec["name2"]
gene = gene_models.alternative_names.get(gene, gene)
tr_name = rec["name"]
chrom = rec["chrom"]
strand = rec["strand"]
tx = ( # pylint: disable=invalid-name
int(rec["txStart"]) + 1, int(rec["txEnd"]))
cds = (int(rec["cdsStart"]) + 1, int(rec["cdsEnd"]))
exon_starts = list(map(
int, rec["exonStarts"].strip(",").split(",")))
exon_ends = list(map(
int, rec["exonEnds"].strip(",").split(",")))
assert len(exon_starts) == len(exon_ends)
exons = [
Exon(start + 1, end)
for start, end in zip(exon_starts, exon_ends, strict=True)
]
transcript_ids_counter[tr_name] += 1
tr_id = f"{tr_name}_{transcript_ids_counter[tr_name]}"
attributes = {
k: rec[k]
for k in [
"#bin",
"score",
"exonCount",
"cdsStartStat",
"cdsEndStat",
"exonFrames",
]
}
transcript_model = TranscriptModel(
gene=gene,
tr_id=tr_id,
tr_name=tr_name,
chrom=chrom,
strand=strand,
tx=tx,
cds=cds,
exons=exons,
attributes=attributes,
)
transcript_model.update_frames()
gene_models.add_transcript_model(transcript_model)
return True
[docs]
def probe_header(
infile: IO, expected_columns: list[str],
comment: str | None = None,
) -> bool:
"""Probe gene models file header based on expected columns."""
infile.seek(0)
df = pd.read_csv(
infile, sep="\t", nrows=1, header=None, comment=comment)
return list(df.iloc[0, :]) == expected_columns
[docs]
def probe_columns(
infile: IO, expected_columns: list[str],
comment: str | None = None,
) -> bool:
"""Probe gene models file based on expected columns."""
infile.seek(0)
df = pd.read_csv(
infile, sep="\t", nrows=1, header=None, comment=comment)
return cast(list[int], list(df.columns)) == \
list(range(len(expected_columns)))
[docs]
def parse_raw(
infile: IO, expected_columns: list[str],
nrows: int | None = None, comment: str | None = None,
) -> pd.DataFrame | None:
"""Parse raw gene models data based on expected columns."""
if probe_header(infile, expected_columns, comment=comment):
infile.seek(0)
df = pd.read_csv(infile, sep="\t", nrows=nrows, comment=comment)
assert list(df.columns) == expected_columns
return df
if probe_columns(infile, expected_columns, comment=comment):
infile.seek(0)
df = pd.read_csv(
infile,
sep="\t",
nrows=nrows,
header=None,
names=expected_columns,
comment=comment,
)
assert list(df.columns) == expected_columns
return df
return None
[docs]
def parse_ccds_gene_models_format(
gene_models: GeneModels,
infile: IO,
gene_mapping: dict[str, str] | None = None,
nrows: int | None = None,
) -> bool:
"""Parse CCDS gene models file format."""
# pylint: disable=too-many-locals
expected_columns = [
# CCDS is identical with RefSeq
"#bin",
"name",
"chrom",
"strand",
"txStart",
"txEnd",
"cdsStart",
"cdsEnd",
"exonCount",
"exonStarts",
"exonEnds",
"score",
"name2",
"cdsStartStat",
"cdsEndStat",
"exonFrames",
]
infile.seek(0)
df = parse_raw(infile, expected_columns, nrows=nrows)
if df is None:
return False
records = df.to_dict(orient="records")
transcript_ids_counter: dict[str, int] = defaultdict(int)
if gene_mapping:
gene_models.alternative_names = copy.deepcopy(gene_mapping)
for rec in records:
gene = rec["name"]
gene = gene_models.alternative_names.get(gene, gene)
tr_name = rec["name"]
chrom = rec["chrom"]
strand = rec["strand"]
tx = ( # pylint: disable=invalid-name
int(rec["txStart"]) + 1, int(rec["txEnd"]))
cds = (int(rec["cdsStart"]) + 1, int(rec["cdsEnd"]))
exon_starts = list(map(
int, rec["exonStarts"].strip(",").split(",")))
exon_ends = list(map(
int, rec["exonEnds"].strip(",").split(",")))
assert len(exon_starts) == len(exon_ends)
exons = [
Exon(start + 1, end)
for start, end in zip(exon_starts, exon_ends, strict=True)
]
transcript_ids_counter[tr_name] += 1
tr_id = f"{tr_name}_{transcript_ids_counter[tr_name]}"
attributes = {
k: rec[k]
for k in [
"#bin",
"score",
"exonCount",
"cdsStartStat",
"cdsEndStat",
"exonFrames",
]
}
transcript_model = TranscriptModel(
gene=gene,
tr_id=tr_id,
tr_name=tr_name,
chrom=chrom,
strand=strand,
tx=tx,
cds=cds,
exons=exons,
attributes=attributes,
)
transcript_model.update_frames()
gene_models.add_transcript_model(transcript_model)
return True
[docs]
def parse_known_gene_models_format(
gene_models: GeneModels,
infile: IO,
gene_mapping: dict[str, str] | None = None,
nrows: int | None = None,
) -> bool:
"""Parse known gene models file format."""
# pylint: disable=too-many-locals
expected_columns = [
"name",
"chrom",
"strand",
"txStart",
"txEnd",
"cdsStart",
"cdsEnd",
"exonCount",
"exonStarts",
"exonEnds",
"proteinID",
"alignID",
]
infile.seek(0)
df = parse_raw(infile, expected_columns, nrows=nrows)
if df is None:
return False
records = df.to_dict(orient="records")
transcript_ids_counter: dict[str, int] = defaultdict(int)
if gene_mapping:
gene_models.alternative_names = copy.deepcopy(gene_mapping)
for rec in records:
gene = rec["name"]
gene = gene_models.alternative_names.get(gene, gene)
tr_name = rec["name"]
chrom = rec["chrom"]
strand = rec["strand"]
tx = ( # pylint: disable=invalid-name
int(rec["txStart"]) + 1, int(rec["txEnd"]))
cds = (int(rec["cdsStart"]) + 1, int(rec["cdsEnd"]))
exon_starts = list(map(
int, rec["exonStarts"].strip(",").split(",")))
exon_ends = list(map(
int, rec["exonEnds"].strip(",").split(",")))
assert len(exon_starts) == len(exon_ends)
exons = [
Exon(start + 1, end)
for start, end in zip(exon_starts, exon_ends, strict=True)
]
transcript_ids_counter[tr_name] += 1
tr_id = f"{tr_name}_{transcript_ids_counter[tr_name]}"
attributes = {k: rec[k] for k in ["proteinID", "alignID"]}
transcript_model = TranscriptModel(
gene=gene,
tr_id=tr_id,
tr_name=tr_name,
chrom=chrom,
strand=strand,
tx=tx,
cds=cds,
exons=exons,
attributes=attributes,
)
transcript_model.update_frames()
gene_models.add_transcript_model(transcript_model)
return True
[docs]
def parse_ucscgenepred_models_format(
gene_models: GeneModels,
infile: IO,
gene_mapping: dict[str, str] | None = None,
nrows: int | None = None,
) -> bool:
"""Parse UCSC gene prediction models file fomrat.
table genePred
"A gene prediction."
(
string name; "Name of gene"
string chrom; "Chromosome name"
char[1] strand; "+ or - for strand"
uint txStart; "Transcription start position"
uint txEnd; "Transcription end position"
uint cdsStart; "Coding region start"
uint cdsEnd; "Coding region end"
uint exonCount; "Number of exons"
uint[exonCount] exonStarts; "Exon start positions"
uint[exonCount] exonEnds; "Exon end positions"
)
table genePredExt
"A gene prediction with some additional info."
(
string name; "Name of gene (usually transcript_id from
GTF)"
string chrom; "Chromosome name"
char[1] strand; "+ or - for strand"
uint txStart; "Transcription start position"
uint txEnd; "Transcription end position"
uint cdsStart; "Coding region start"
uint cdsEnd; "Coding region end"
uint exonCount; "Number of exons"
uint[exonCount] exonStarts; "Exon start positions"
uint[exonCount] exonEnds; "Exon end positions"
int score; "Score"
string name2; "Alternate name (e.g. gene_id from GTF)"
string cdsStartStat; "Status of CDS start annotation (none,
unknown, incomplete, or complete)"
string cdsEndStat; "Status of CDS end annotation
(none, unknown,
incomplete, or complete)"
lstring exonFrames; "Exon frame offsets {0,1,2}"
)
"""
# pylint: disable=too-many-locals
expected_columns = [
"name",
"chrom",
"strand",
"txStart",
"txEnd",
"cdsStart",
"cdsEnd",
"exonCount",
"exonStarts",
"exonEnds",
"score",
"name2",
"cdsStartStat",
"cdsEndStat",
"exonFrames",
]
infile.seek(0)
df = parse_raw(infile, expected_columns[:10], nrows=nrows)
if df is None:
infile.seek(0)
df = parse_raw(infile, expected_columns, nrows=nrows)
if df is None:
return False
records = df.to_dict(orient="records")
transcript_ids_counter: dict[str, int] = defaultdict(int)
if gene_mapping:
gene_models.alternative_names = copy.deepcopy(gene_mapping)
for rec in records:
gene = rec.get("name2")
if not gene:
gene = rec["name"]
gene = gene_models.alternative_names.get(gene, gene)
tr_name = rec["name"]
chrom = rec["chrom"]
strand = rec["strand"]
tx = ( # pylint: disable=invalid-name
int(rec["txStart"]) + 1, int(rec["txEnd"]))
cds = (int(rec["cdsStart"]) + 1, int(rec["cdsEnd"]))
exon_starts = list(map(
int, rec["exonStarts"].strip(",").split(",")))
exon_ends = list(map(
int, rec["exonEnds"].strip(",").split(",")))
assert len(exon_starts) == len(exon_ends)
exons = [
Exon(start + 1, end)
for start, end in zip(exon_starts, exon_ends, strict=True)
]
transcript_ids_counter[tr_name] += 1
tr_id = f"{tr_name}_{transcript_ids_counter[tr_name]}"
attributes = {}
for attr in expected_columns[10:]:
if attr in rec:
attributes[attr] = rec.get(attr)
transcript_model = TranscriptModel(
gene=gene,
tr_id=tr_id,
tr_name=tr_name,
chrom=chrom,
strand=strand,
tx=tx,
cds=cds,
exons=exons,
attributes=attributes,
)
transcript_model.update_frames()
gene_models.add_transcript_model(transcript_model)
return True
def _parse_gtf_attributes(data: str) -> dict[str, str]:
attributes = list(
filter(lambda x: x, [a.strip() for a in data.split(";")]),
)
result = {}
for attr in attributes:
key, value = attr.split(" ", maxsplit=1)
result[key.strip()] = value.strip('"').strip()
return result
[docs]
def parse_gtf_gene_models_format(
gene_models: GeneModels,
infile: IO,
gene_mapping: dict[str, str] | None = None,
nrows: int | None = None,
) -> bool:
"""Parse GTF gene models file format."""
assert not gene_models.transcript_models
# pylint: disable=too-many-locals,too-many-branches,too-many-statements
expected_columns = [
"seqname",
"source",
"feature",
"start",
"end",
"score",
"strand",
"phase",
"attributes",
# "comments",
]
infile.seek(0)
df = parse_raw(
infile, expected_columns, nrows=nrows, comment="#")
if df is None:
expected_columns.append("comment")
infile.seek(0)
df = parse_raw(
infile, expected_columns, nrows=nrows, comment="#")
if df is None:
return False
if gene_mapping:
gene_models.alternative_names = copy.deepcopy(gene_mapping)
records = df.to_dict(orient="records")
for rec in records:
feature = rec["feature"]
if feature == "gene":
continue
attributes = _parse_gtf_attributes(rec["attributes"])
tr_id = attributes["transcript_id"]
if feature in {"transcript", "Selenocysteine"}:
if feature == "Selenocysteine" and \
tr_id in gene_models.transcript_models:
continue
if tr_id in gene_models.transcript_models:
raise ValueError(
f"{tr_id} of {feature} already in transcript models",
)
gene = attributes["gene_name"]
gene = gene_models.alternative_names.get(gene, gene)
transcript_model = TranscriptModel(
gene=gene,
tr_id=tr_id,
tr_name=tr_id,
chrom=rec["seqname"],
strand=rec["strand"],
tx=(rec["start"], rec["end"]),
cds=(rec["end"], rec["start"]),
attributes=attributes,
)
gene_models.add_transcript_model(transcript_model)
continue
if feature == "exon":
if tr_id not in gene_models.transcript_models:
raise ValueError(
f"exon or CDS transcript {tr_id} not found "
f"in transcript models",
)
transcript_model = gene_models.transcript_models[tr_id]
if feature == "exon":
exon = Exon(
rec["start"], rec["end"], frame=-1,
)
transcript_model.exons.append(exon)
continue
if feature in {"UTR", "5UTR", "3UTR", "CDS"}:
continue
if feature in {"start_codon", "stop_codon"}:
transcript_model = gene_models.transcript_models[tr_id]
cds = transcript_model.cds
transcript_model.cds = \
(min(cds[0], rec["start"]), max(cds[1], rec["end"]))
continue
raise ValueError(
f"unknown feature {feature} found in gtf gene models")
for transcript_model in gene_models.transcript_models.values():
transcript_model.exons = sorted(
transcript_model.exons, key=lambda x: x.start)
transcript_model.update_frames()
return True
[docs]
def load_gene_mapping(infile: IO) -> dict[str, str]:
"""Load alternative names for genes.
Assume that its first line has two column names
"""
df = pd.read_csv(infile, sep="\t")
assert len(df.columns) == 2
df = df.rename(columns={df.columns[0]: "tr_id", df.columns[1]: "gene"})
records = df.to_dict(orient="records")
alt_names = {}
for rec in records:
rec = cast(dict, rec)
alt_names[rec["tr_id"]] = rec["gene"]
return alt_names
SUPPORTED_GENE_MODELS_FILE_FORMATS: set[str] = {
"default",
"refflat",
"refseq",
"ccds",
"knowngene",
"gtf",
"ucscgenepred",
}
[docs]
def get_parser(
fileformat: str,
) -> GeneModelsParser | None:
"""Get gene models parser based on file format."""
# pylint: disable=too-many-return-statements
if fileformat == "default":
return parse_default_gene_models_format
if fileformat == "refflat":
return parse_ref_flat_gene_models_format
if fileformat == "refseq":
return parse_ref_seq_gene_models_format
if fileformat == "ccds":
return parse_ccds_gene_models_format
if fileformat == "knowngene":
return parse_known_gene_models_format
if fileformat == "gtf":
return parse_gtf_gene_models_format
if fileformat == "ucscgenepred":
return parse_ucscgenepred_models_format
return None
[docs]
def infer_gene_model_parser(
gene_models: GeneModels,
infile: IO,
file_format: str | None = None,
) -> str | None:
"""Infer gene models file format."""
if file_format is not None:
parser = get_parser(file_format)
if parser is not None:
return file_format
logger.info("going to infer gene models file format...")
inferred_formats = []
for inferred_format in SUPPORTED_GENE_MODELS_FILE_FORMATS:
gene_models.reset()
parser = get_parser(inferred_format)
if parser is None:
continue
try:
logger.debug("trying file format: %s...", inferred_format)
infile.seek(0)
res = parser(gene_models, infile, None, 50)
if res:
inferred_formats.append(inferred_format)
logger.debug(
"gene models format %s matches input", inferred_format)
except Exception as ex: # noqa: BLE001 pylint: disable=broad-except
logger.debug(
"file format %s does not match; %s",
inferred_format, ex, exc_info=True)
gene_models.reset()
logger.info("inferred file formats: %s", inferred_formats)
if len(inferred_formats) == 1:
return inferred_formats[0]
logger.error(
"can't find gene model parser; "
"inferred file formats are %s", inferred_formats)
return None
[docs]
def load_gene_models(gene_models: GeneModels) -> GeneModels:
"""Load gene models."""
resource = gene_models.resource
filename = resource.get_config()["filename"]
fileformat = resource.get_config().get("format", None)
gene_mapping_filename = resource.get_config().get(
"gene_mapping", None)
gene_mapping = None
if gene_mapping_filename is not None:
compression = False
if gene_mapping_filename.endswith(".gz"):
compression = True
with resource.open_raw_file(
gene_mapping_filename, "rt",
compression=compression) as gene_mapping_file:
logger.debug(
"loading gene mapping from %s", gene_mapping_filename)
gene_mapping = load_gene_mapping(gene_mapping_file)
logger.debug("loading gene models %s (%s)", filename, fileformat)
compression = False
if filename.endswith(".gz"):
compression = True
with resource.open_raw_file(
filename, mode="rt", compression=compression) as infile:
if fileformat is None:
fileformat = infer_gene_model_parser(gene_models, infile)
logger.info("infering gene models file format: %s", fileformat)
if fileformat is None:
logger.error(
"can't infer gene models file format for "
"%s...", resource.resource_id)
raise ValueError("can't infer gene models file format")
parser = get_parser(fileformat)
if parser is None:
logger.error(
"Unsupported file format %s for "
"gene model file %s.", fileformat,
resource.resource_id)
raise ValueError
infile.seek(0)
if not parser(gene_models, infile, gene_mapping, None):
raise ValueError(
f"Failed to parse gene models file {filename} "
f"with format {fileformat}")
return gene_models