"""Helpers for the dae.duckdb_storage DuckDb genotype storage backend."""

from __future__ import annotations

import logging
import os
import re
import textwrap
from typing import Any
from urllib.parse import urlparse

import duckdb
import jinja2
from pydantic import (
    ByteSize,
)
from s3fs.core import S3FileSystem

from dae.duckdb_storage.duckdb2_variants import (
    Db2Layout,
)
from dae.duckdb_storage.duckdb_storage_config import (
    S3Path,
)
from dae.parquet.partition_descriptor import PartitionDescriptor
from dae.schema2_storage.schema2_import_storage import Schema2DatasetLayout
from dae.utils import fs_utils

logger = logging.getLogger(__name__)
PARQUET_SCAN = re.compile(r"parquet_scan\('(?P<parquet_path>.+)'\)")


def create_database_connection(
    db_filename: str, *,
    read_only: bool = True,
    memory_limit: str | ByteSize | None = None,
) -> duckdb.DuckDBPyConnection:
    """Create a connection to a DuckDb database file.

    Parent directories of ``db_filename`` are created when missing.

    Args:
        db_filename: path to the DuckDb database file.
        read_only: open the database read-only (the default).
        memory_limit: optional DuckDb memory limit; a :class:`ByteSize`
            is rendered in human-readable decimal form (e.g. ``"1.5GB"``).

    Raises:
        duckdb.ConnectionException: when the connection cannot be opened;
            the error is logged and re-raised.
    """
    dirname = os.path.dirname(db_filename)
    # A bare filename has an empty dirname; os.makedirs("") would raise.
    if dirname:
        os.makedirs(dirname, exist_ok=True)
    logger.debug("working with duckdb: %s", db_filename)
    logger.info(
        "duckdb connection to %s; read_only=%s", db_filename, read_only)
    try:
        connection = duckdb.connect(db_filename, read_only=read_only)
        if memory_limit is not None:
            if isinstance(memory_limit, ByteSize):
                mlimit = memory_limit.human_readable(decimal=True)
            else:
                mlimit = memory_limit
            connection.sql(f"SET memory_limit='{mlimit}'")
    except duckdb.ConnectionException:
        logger.exception(
            "duckdb connection error: %s; read_only=%s",
            db_filename, read_only)
        raise
    return connection
def create_memory_connection(
    *,
    memory_limit: str | ByteSize | None = None,
) -> duckdb.DuckDBPyConnection:
    """Create a connection to a transient in-memory DuckDb database.

    Args:
        memory_limit: optional DuckDb memory limit; a :class:`ByteSize`
            is rendered in human-readable decimal form (e.g. ``"1.5GB"``).

    Raises:
        duckdb.ConnectionException: when the connection cannot be opened;
            the error is logged and re-raised.
    """
    try:
        connection = duckdb.connect(":memory:")
        if memory_limit is not None:
            if isinstance(memory_limit, ByteSize):
                mlimit = memory_limit.human_readable(decimal=True)
            else:
                mlimit = memory_limit
            connection.sql(f"SET memory_limit='{mlimit}'")
    except duckdb.ConnectionException:
        logger.exception(
            "duckdb connection ':memory:' error")
        raise
    return connection
def create_table_layout(study_id: str) -> Schema2DatasetLayout:
    """Return the per-study table names used inside a DuckDb database."""
    names = {
        kind: f"{study_id}_{kind}"
        for kind in ("pedigree", "summary", "family", "meta")
    }
    return Schema2DatasetLayout(
        study_id,
        names["pedigree"],
        names["summary"],
        names["family"],
        names["meta"],
    )
def _create_table(
    connection: duckdb.DuckDBPyConnection,
    parquet_path: str,
    table_name: str,
) -> None:
    """Materialize a single parquet file as a DuckDb table.

    Any existing table with the same name is dropped first.
    """
    with connection.cursor() as cursor:
        assert cursor is not None
        cursor.sql(f"DROP TABLE IF EXISTS {table_name}")
        create_query = f"""
        CREATE TABLE {table_name} AS
        SELECT * FROM parquet_scan('{parquet_path}')
        """  # noqa: S608
        cursor.sql(create_query)


def _create_table_partitioned(
    connection: duckdb.DuckDBPyConnection,
    parquet_path: str,
    table_name: str,
    partition: list[tuple[str, str]],
) -> None:
    """Materialize a hive-partitioned parquet dataset as a DuckDb table.

    One ``*/`` glob segment is added per partition column so that the
    scan descends through every partition directory level.
    """
    with connection.cursor() as cursor:
        partition_glob = "*/" * len(partition)
        dataset_path = f"{parquet_path}/{partition_glob}*.parquet"
        logger.debug("creating table %s from %s", table_name, dataset_path)

        drop_query = f"DROP TABLE IF EXISTS {table_name}"
        logger.debug("query: %s", drop_query)
        cursor.sql(drop_query)

        create_query = f"""
        CREATE TABLE {table_name} AS
        SELECT * FROM parquet_scan('{dataset_path}', hive_partitioning = 1)
        """
        logger.info("query: %s", create_query)
        cursor.sql(create_query)
def create_duckdb_tables(
    connection: duckdb.DuckDBPyConnection,
    study_id: str,
    layout: Schema2DatasetLayout,
    partition_descriptor: PartitionDescriptor,
) -> Schema2DatasetLayout:
    """Create tables in the DuckDb database.

    The meta and pedigree tables are always created; summary and family
    tables are created only when the source layout provides them.
    Returns the layout of the created tables.
    """
    result_layout = create_table_layout(study_id)

    _create_table(connection, layout.meta, result_layout.meta)
    _create_table(connection, layout.pedigree, result_layout.pedigree)

    if layout.summary is None:
        # A study without summary data has no family data either.
        assert layout.family is None
        return Schema2DatasetLayout(
            result_layout.study,
            result_layout.pedigree,
            None,
            None,
            result_layout.meta)

    assert result_layout.summary is not None
    assert result_layout.family is not None
    assert layout.summary is not None
    assert layout.family is not None

    _create_table_partitioned(
        connection, layout.summary,
        result_layout.summary,
        partition_descriptor.dataset_summary_partition())
    _create_table_partitioned(
        connection, layout.family,
        result_layout.family,
        partition_descriptor.dataset_family_partition())
    return result_layout
def join_base_url_and_parquet_scan(
    base_url: str,
    parquet_scan: str | None,
) -> str | None:
    """Rebase the path inside a ``parquet_scan('...')`` expression.

    Returns ``None`` for ``None`` input and the input unchanged when it
    does not look like a parquet_scan expression.
    """
    if parquet_scan is None:
        return None
    match = PARQUET_SCAN.fullmatch(parquet_scan)
    if match is None:
        # Not a parquet_scan expression; leave it untouched.
        return parquet_scan
    relative_path = match.group("parquet_path")
    assert relative_path
    assert base_url is not None
    full_path = fs_utils.join(base_url, relative_path)
    return f"parquet_scan('{full_path}')"
def create_study_parquet_tables_layout(
    study_config: dict[str, Any],
    base_url: str,
) -> Db2Layout:
    """Construct a study tables layout with absolute parquet scans.

    Every relative parquet_scan path from the study configuration is
    joined with ``base_url``.
    """
    config_layout = get_study_config_tables(study_config, db_name=None)
    assert config_layout.db is None

    def rebase(scan: str | None) -> str | None:
        return join_base_url_and_parquet_scan(base_url, scan)

    pedigree = rebase(config_layout.pedigree)
    meta = rebase(config_layout.meta)
    assert pedigree is not None
    assert meta is not None

    return Db2Layout(
        db=None,
        study=config_layout.study,
        pedigree=pedigree,
        summary=rebase(config_layout.summary),
        family=rebase(config_layout.family),
        meta=meta,
    )
def create_relative_parquet_scans_layout(
    base_url: str,
    study_id: str,
    partition_descriptor: PartitionDescriptor,
) -> Schema2DatasetLayout:
    """Construct DuckDb parquet scans relative to base dir.

    Summary and family globs get one ``*/`` segment per partition column
    so the scans descend through every hive-partition directory level.
    """
    summary_partition = partition_descriptor.dataset_summary_partition()
    family_partition = partition_descriptor.dataset_family_partition()

    pedigree_glob = f"{study_id}/pedigree/pedigree.parquet"
    summary_glob = (
        f"{study_id}/summary/"
        f"{'*/' * len(summary_partition)}*.parquet"
    )
    family_glob = (
        f"{study_id}/family/"
        f"{'*/' * len(family_partition)}*.parquet"
    )
    meta_glob = f"{study_id}/meta/meta.parquet"

    study_dir = fs_utils.join(base_url, study_id)
    return Schema2DatasetLayout(
        study_dir,
        f"parquet_scan('{pedigree_glob}')",
        f"parquet_scan('{summary_glob}')",
        f"parquet_scan('{family_glob}')",
        f"parquet_scan('{meta_glob}')")
def get_study_config_tables(
    study_config: dict[str, Any],
    db_name: str | None,
) -> Db2Layout:
    """Extract the genotype-storage tables layout from a study config.

    The pedigree and meta tables are mandatory in the configuration;
    summary and family tables are optional and default to ``None``.
    """
    storage_tables = study_config["genotype_storage"]["tables"]
    return Db2Layout(
        db=db_name,
        study=study_config["id"],
        pedigree=storage_tables["pedigree"],
        summary=storage_tables.get("summary"),
        family=storage_tables.get("family"),
        meta=storage_tables["meta"],
    )
def create_s3_secret_clause(
    storage_id: str,
    endpoint_url: str | S3Path | None,
) -> str:
    """Create a DuckDb secret clause for S3 storage.

    Credentials are read from the ``AWS_ACCESS_KEY_ID``,
    ``AWS_SECRET_ACCESS_KEY`` and ``AWS_REGION`` environment variables.
    When ``endpoint_url`` is given, its host:port is used as the secret's
    endpoint.

    Fixes the original template, which left a trailing comma after the
    ``region`` option (the last option in the clause) whenever AWS_REGION
    was set, producing invalid DuckDb SQL.
    """
    endpoint = None
    if endpoint_url:
        endpoint = urlparse(str(endpoint_url)).netloc

    options = [
        "type s3",
        f"key_id '{os.getenv('AWS_ACCESS_KEY_ID')}'",
        f"secret '{os.getenv('AWS_SECRET_ACCESS_KEY')}'",
    ]
    if endpoint:
        options.append(f"endpoint '{endpoint}'")
    options.append("url_style 'path'")

    region = os.getenv("AWS_REGION")
    # NOTE(review): the original emitted the literal string 'None' when
    # AWS_REGION is unset; kept for backward compatibility — confirm
    # whether omitting the region option entirely is intended instead.
    options.append(f"region '{region}'" if region else "region 'None'")

    body = ",\n    ".join(options)
    return (
        f"\ndrop secret if exists {storage_id}_secret;\n"
        f"create secret {storage_id}_secret (\n"
        f"    {body}\n"
        ");\n"
    )
def create_s3_attach_db_clause(db_url: str) -> str:
    """Build an ATTACH DATABASE clause for a read-only DuckDb db at a URL."""
    return f"ATTACH DATABASE '{db_url}' (type duckdb, read_only);"
def create_s3_filesystem(endpoint_url: str | S3Path | None) -> S3FileSystem:
    """Build a non-anonymous S3 filesystem, optionally with a custom endpoint.

    The filesystem cache is invalidated so the instance sees fresh state.
    """
    client_kwargs: dict[str, str] = {}
    if endpoint_url:
        client_kwargs = {"endpoint_url": str(endpoint_url)}
    filesystem = S3FileSystem(anon=False, client_kwargs=client_kwargs)
    filesystem.invalidate_cache()
    return filesystem