import logging
import sys
import time
from collections.abc import Generator
from contextlib import closing
from typing import Any, cast
import duckdb
import pandas as pd
import yaml
from dae.duckdb_storage.duckdb_connection_factory import DuckDbConnectionFactory
from dae.genomic_resources.gene_models import GeneModels
from dae.genomic_resources.reference_genome import ReferenceGenome
from dae.inmemory_storage.raw_variants import RawFamilyVariants
from dae.parquet.partition_descriptor import PartitionDescriptor
from dae.pedigrees.families_data import FamiliesData
from dae.pedigrees.loader import FamiliesLoader
from dae.query_variants.base_query_variants import QueryVariantsBase
from dae.query_variants.query_runners import QueryResult, QueryRunner
from dae.query_variants.sql.schema2.sql_query_builder import (
Db2Layout,
SqlQueryBuilder,
)
from dae.utils.regions import Region
from dae.variants.attributes import Role, Sex, Status
from dae.variants.family_variant import FamilyVariant
from dae.variants.variant import SummaryVariant
RealAttrFilterType = list[tuple[str, tuple[float | None, float | None]]]
logger = logging.getLogger(__name__)
[docs]
class DuckDb2Runner(QueryRunner):
"""Run a DuckDb query in a separate thread."""
def __init__(
self,
connection_factory: duckdb.DuckDBPyConnection,
query: list[str],
deserializer: Any | None = None,
limit: int | None = None,
):
super().__init__(deserializer=deserializer)
self.connection = connection_factory
self.query = query
self.limit = sys.maxsize if limit is None else limit
self._counter = 0
[docs]
def run(self) -> None:
"""Execute the query and enqueue the resulting rows."""
started = time.time()
logger.debug(
"duckdb2 runner (%s) started; running %s queries",
self.study_id, len(self.query))
try:
if self.is_closed():
logger.info(
"runner (%s) closed before execution",
self.study_id)
self._finalize(started)
return
for single_query in self.query:
with self.connection.cursor() as cursor:
logger.debug("running SQL query: %s", single_query)
cursor.execute(single_query)
while record := cursor.fetchone():
if record is None:
logger.debug("query %s done")
break
val = self.deserializer(record)
if val is None:
continue
self.put_value_in_result_queue(val)
self._counter += 1
if self.is_closed():
logger.debug(
"query runner (%s) closed while iterating",
self.study_id)
break
if self.is_closed() or self._counter >= self.limit:
logger.debug(
"runner (%s) reached limit: %s",
self.study_id, self.limit)
break
except Exception as ex: # pylint: disable=broad-except
logger.exception(
"exception in runner (%s)",
self.study_id)
self.put_value_in_result_queue(ex)
finally:
logger.debug(
"runner (%s) closing connection", self.study_id)
self._finalize(started)
def _finalize(self, started: float) -> None:
with self._status_lock:
self._done = True
elapsed = time.time() - started
logger.debug("runner (%s) done in %0.3f sec", self.study_id, elapsed)
[docs]
class DuckDb2Variants(QueryVariantsBase):
"""Backend for DuckDb storage backend."""
def __init__(
self,
connection_factory: DuckDbConnectionFactory,
db2_layout: Db2Layout,
gene_models: GeneModels,
reference_genome: ReferenceGenome,
) -> None:
self.connection_factory = connection_factory
assert self.connection_factory is not None
self.layout = db2_layout
logger.debug("working with duckdb2 layout: %s", self.layout)
self.gene_models = gene_models
self.reference_genome = reference_genome
self.start_time = time.time()
pedigree_schema = self._fetch_pedigree_schema()
summary_schema = self._fetch_summary_schema()
family_schema = self._fetch_family_schema()
schema = SqlQueryBuilder.build_schema(
pedigree_schema=pedigree_schema,
summary_schema=summary_schema,
family_schema=family_schema,
)
partition_description = self._fetch_partition_descriptor()
families = self._fetch_families()
super().__init__(families)
self.query_builder = SqlQueryBuilder(
self.layout,
schema=schema,
partition_descriptor=partition_description,
families=self.families,
gene_models=self.gene_models,
reference_genome=self.reference_genome,
)
def _fetch_meta_property(self, key: str) -> str:
meta = self.layout.meta
if self.layout.db is not None:
meta = f"{self.layout.db}.{meta}"
query = f"""SELECT value FROM {meta}
WHERE key = '{key}'
LIMIT 1
""" # noqa: S608
with self.connection_factory.connect().cursor() as cursor:
content = ""
result = cursor.execute(query).fetchall()
for row in result:
content = row[0]
return content
def _fetch_partition_descriptor(self) -> PartitionDescriptor:
content = self._fetch_meta_property("partition_description")
return PartitionDescriptor.parse_string(content)
def _fetch_summary_schema(self) -> dict[str, str]:
schema_content = self._fetch_meta_property("summary_schema")
return dict(
line.split("|") for line in schema_content.split("\n"))
def _fetch_family_schema(self) -> dict[str, str]:
schema_content = self._fetch_meta_property("family_schema")
return dict(
line.split("|") for line in schema_content.split("\n"))
def _fetch_pedigree_schema(self) -> dict[str, str]:
schema_content = self._fetch_meta_property("pedigree_schema")
if schema_content:
return dict(
line.split("|") for line in schema_content.split("\n"))
return {
"family_id": "VARCHAR",
"person_id": "VARCHAR",
"dad_id": "VARCHAR",
"mom_id": "VARCHAR",
"sex": "TINYINT",
"status": "TINYINT",
"role": "INTEGER",
"sample_id": "VARCHAR",
"generated": "BOOLEAN",
"layout": "VARCHAR",
"not_sequenced": "BOOLEAN",
}
def _fetch_variants_data_schema(self) -> dict[str, Any] | None:
content = self._fetch_meta_property("variants_data_schema")
if not content:
return None
return cast(dict[str, Any], yaml.safe_load(content))
def _fetch_pedigree(self) -> pd.DataFrame:
pedigree = self.layout.pedigree
if self.layout.db is not None:
pedigree = f"{self.layout.db}.{pedigree}"
query = f"SELECT * FROM {pedigree}" # noqa: S608
with self.connection_factory.connect().cursor() as cursor:
ped_df = cursor.execute(query).df()
columns = {
"personId": "person_id",
"familyId": "family_id",
"momId": "mom_id",
"dadId": "dad_id",
"sampleId": "sample_id",
"sex": "sex",
"status": "status",
"role": "role",
"generated": "generated",
"layout": "layout",
"phenotype": "phenotype",
}
ped_df = ped_df.rename(columns=columns)
ped_df.role = ped_df.role.apply(Role.from_value) # type: ignore
ped_df.sex = ped_df.sex.apply(Sex.from_value) # type: ignore
ped_df.status = ped_df.status.apply(
Status.from_value) # type: ignore
ped_df.loc[ped_df.layout.isna(), "layout"] = None
return ped_df
def _fetch_families(self) -> FamiliesData:
ped_df = self._fetch_pedigree()
return FamiliesLoader.build_families_data_from_pedigree(ped_df)
[docs]
def fetch_annotation(self) -> str:
return self._fetch_meta_property("annotation_pipeline")
def _deserialize_summary_variant(
self, record: list[bytes],
) -> SummaryVariant:
return self.deserialize_summary_variant(
record[3],
)
def _deserialize_family_variant(
self, record: list[bytes],
) -> FamilyVariant:
return self.deserialize_family_variant(
record[4], record[5],
)
[docs]
def build_summary_variants_query_runner(
self, *,
regions: list[Region] | None = None,
genes: list[str] | None = None,
effect_types: list[str] | None = None,
variant_type: str | None = None,
real_attr_filter: RealAttrFilterType | None = None,
ultra_rare: bool | None = None,
frequency_filter: RealAttrFilterType | None = None,
return_reference: bool | None = None,
return_unknown: bool | None = None,
limit: int | None = None,
**kwargs: Any,
) -> QueryRunner:
"""Create query runner for searching summary variants."""
if limit is None or limit < 0:
query_limit = None
limit = -1
else:
query_limit = 10 * limit
query = self.query_builder.build_summary_variants_query(
regions=regions,
genes=genes,
effect_types=effect_types,
variant_type=variant_type,
real_attr_filter=real_attr_filter,
ultra_rare=ultra_rare,
frequency_filter=frequency_filter,
return_reference=return_reference,
return_unknown=return_unknown,
limit=query_limit,
)
logger.info("SUMMARY VARIANTS QUERY:\n%s", query)
runner = DuckDb2Runner(
connection_factory=self.connection_factory.connect(),
query=query,
deserializer=self._deserialize_summary_variant)
skip_inmemory_filterng = kwargs.get("skip_inmemory_filterng", False)
if not skip_inmemory_filterng:
filter_func = RawFamilyVariants.summary_variant_filter_function(
regions=regions,
genes=genes,
effect_types=effect_types,
variant_type=variant_type,
real_attr_filter=real_attr_filter,
ultra_rare=ultra_rare,
frequency_filter=frequency_filter,
return_reference=return_reference,
return_unknown=return_unknown,
limit=limit,
seen=set())
runner.adapt(filter_func)
return runner
[docs]
def query_summary_variants(
self, *,
regions: list[Region] | None = None,
genes: list[str] | None = None,
effect_types: list[str] | None = None,
variant_type: str | None = None,
real_attr_filter: RealAttrFilterType | None = None,
ultra_rare: bool | None = None,
frequency_filter: RealAttrFilterType | None = None,
return_reference: bool | None = None,
return_unknown: bool | None = None,
limit: int | None = None,
**kwargs: Any,
) -> Generator[SummaryVariant, None, None]:
"""Execute the summary variants query and yields summary variants."""
if limit is None or limit < 0:
query_limit = None
limit = -1
else:
query_limit = 10 * limit
runner = self.build_summary_variants_query_runner(
regions=regions,
genes=genes,
effect_types=effect_types,
variant_type=variant_type,
real_attr_filter=real_attr_filter,
ultra_rare=ultra_rare,
frequency_filter=frequency_filter,
return_reference=return_reference,
return_unknown=return_unknown,
limit=query_limit,
**kwargs,
)
result = QueryResult(runners=[runner], limit=limit)
result.start()
with closing(result) as result:
for v in result:
if v is None:
continue
yield v
limit -= 1
if limit == 0:
break
[docs]
def build_family_variants_query_runner(
self, *,
regions: list[Region] | None = None,
genes: list[str] | None = None,
effect_types: list[str] | None = None,
family_ids: list[str] | None = None,
person_ids: list[str] | None = None,
inheritance: list[str] | None = None,
roles: str | None = None,
sexes: str | None = None,
variant_type: str | None = None,
real_attr_filter: RealAttrFilterType | None = None,
ultra_rare: bool | None = None,
frequency_filter: RealAttrFilterType | None = None,
return_reference: bool | None = None,
return_unknown: bool | None = None,
limit: int | None = None,
study_filters: list[str] | None = None, # noqa: ARG002
pedigree_fields: tuple | None = None,
**kwargs: Any,
) -> QueryRunner:
# pylint: disable=too-many-arguments
"""Create a query runner for searching family variants."""
if limit is None or limit < 0:
query_limit = None
limit = -1
else:
query_limit = 10 * limit
query = self.query_builder.build_family_variants_query(
regions=regions,
genes=genes,
effect_types=effect_types,
family_ids=family_ids,
person_ids=person_ids,
inheritance=inheritance,
roles=roles,
sexes=sexes,
variant_type=variant_type,
real_attr_filter=real_attr_filter,
ultra_rare=ultra_rare,
frequency_filter=frequency_filter,
return_reference=return_reference,
return_unknown=return_unknown,
limit=query_limit,
pedigree_fields=pedigree_fields,
)
logger.info("FAMILY VARIANTS QUERY:\n%s", query)
deserialize_row = self._deserialize_family_variant
# pylint: disable=protected-access
runner = DuckDb2Runner(
connection_factory=self.connection_factory.connect(),
query=query,
deserializer=deserialize_row)
skip_inmemory_filterng = kwargs.get("skip_inmemory_filterng", False)
if not skip_inmemory_filterng:
filter_func = RawFamilyVariants.family_variant_filter_function(
regions=regions,
genes=genes,
effect_types=effect_types,
family_ids=family_ids,
person_ids=person_ids,
inheritance=inheritance,
roles=roles,
sexes=sexes,
variant_type=variant_type,
real_attr_filter=real_attr_filter,
ultra_rare=ultra_rare,
frequency_filter=frequency_filter,
return_reference=return_reference,
return_unknown=return_unknown,
limit=limit,
seen=set())
runner.adapt(filter_func)
return runner
[docs]
def query_variants(
self, *,
regions: list[Region] | None = None,
genes: list[str] | None = None,
effect_types: list[str] | None = None,
family_ids: list[str] | None = None,
person_ids: list[str] | None = None,
inheritance: list[str] | None = None,
roles: str | None = None,
sexes: str | None = None,
variant_type: str | None = None,
real_attr_filter: RealAttrFilterType | None = None,
ultra_rare: bool | None = None,
frequency_filter: RealAttrFilterType | None = None,
return_reference: bool | None = None,
return_unknown: bool | None = None,
limit: int | None = None,
pedigree_fields: tuple | None = None,
**kwargs: Any,
) -> Generator[FamilyVariant, None, None]:
# pylint: disable=too-many-arguments
"""Execute the family variants query and yields family variants."""
if limit is None or limit < 0:
query_limit = None
limit = -1
else:
query_limit = 10 * limit
runner = self.build_family_variants_query_runner(
regions=regions,
genes=genes,
effect_types=effect_types,
family_ids=family_ids,
person_ids=person_ids,
inheritance=inheritance,
roles=roles,
sexes=sexes,
variant_type=variant_type,
real_attr_filter=real_attr_filter,
ultra_rare=ultra_rare,
frequency_filter=frequency_filter,
return_reference=return_reference,
return_unknown=return_unknown,
limit=query_limit,
pedigree_fields=pedigree_fields,
**kwargs,
)
result = QueryResult(runners=[runner], limit=limit)
result.start()
with closing(result) as result:
for v in result:
if v is None:
continue
yield v
limit -= 1
if limit == 0:
break