Source code for dae.duckdb_storage.duckdb_import_storage

import abc
import logging
import os
from contextlib import closing
from typing import Any, cast

import yaml

from dae.configuration.study_config_builder import StudyConfigBuilder
from dae.duckdb_storage.duckdb_genotype_storage import (
    DuckDbParquetStorage,
    DuckDbS3ParquetStorage,
    DuckDbS3Storage,
    DuckDbStorage,
)
from dae.duckdb_storage.duckdb_legacy_genotype_storage import (
    DuckDbLegacyStorage,
)
from dae.duckdb_storage.duckdb_storage_helpers import (
    create_database_connection,
    create_duckdb_tables,
    create_relative_parquet_scans_layout,
    create_s3_filesystem,
)
from dae.import_tools.import_tools import ImportProject, save_study_config
from dae.schema2_storage.schema2_import_storage import (
    Schema2ImportStorage,
)
from dae.schema2_storage.schema2_layout import (
    Schema2DatasetLayout,
    load_schema2_dataset_layout,
)
from dae.task_graph.graph import TaskGraph
from dae.utils import fs_utils

# Module-level logger named after this module; the import storages below
# use it to emit warnings about the target DuckDb database file.
logger = logging.getLogger(__name__)


class AbstractDuckDbImportStorage(Schema2ImportStorage, abc.ABC):
    """Base import logic for data in the DuckDb Schema 2 format.

    Concrete subclasses implement :meth:`_do_import_dataset`. This class
    wires that hook into the import task graph and writes the study
    configuration once the dataset has been imported.
    """

    @classmethod
    @abc.abstractmethod
    def _do_import_dataset(
        cls, project: ImportProject,
    ) -> Schema2DatasetLayout:
        """Import the dataset of ``project`` into its genotype storage.

        Declared as a class method so the abstract signature agrees with
        the ``@classmethod`` overrides defined in this module.

        Returns:
            The layout of the imported study; the task graph hands it to
            :meth:`do_study_config` as ``study_tables``.
        """

    @classmethod
    def do_study_config(
        cls, project: ImportProject,
        study_tables: Schema2DatasetLayout,
    ) -> None:
        """Produce a study config for the given project.

        Args:
            project: the import project the study belongs to.
            study_tables: layout whose ``pedigree``, ``meta``, ``summary``
                and ``family`` entries are written into the storage
                section of the study config.
        """
        genotype_storage = project.get_genotype_storage()

        if project.get_processing_parquet_dataset_dir() is not None:
            # An already processed parquet dataset carries its study
            # config in the dataset meta; reuse it under this study id.
            meta = cls.load_meta(project)
            study_config = yaml.safe_load(meta["study"])
            study_config["id"] = project.study_id
        else:
            variants_types = project.get_variant_loader_types()
            study_config = {
                "id": project.study_id,
                "conf_dir": ".",
                "has_denovo": project.has_denovo_variants(),
                "has_cnv": "cnv" in variants_types,
                "has_transmitted": bool({"dae", "vcf"} & variants_types),
            }

        tables_config = {
            "pedigree": study_tables.pedigree,
            "meta": study_tables.meta,
        }
        # The genotype browser is enabled only when variant tables exist.
        genotype_browser_config: dict[str, Any] = {"enabled": False}

        if study_tables.summary:
            assert study_tables.family is not None
            tables_config["summary"] = study_tables.summary
            tables_config["family"] = study_tables.family
            genotype_browser_config["enabled"] = True

        # dict.update is shallow: both keys are replaced wholesale, also
        # when the study config was loaded from the dataset meta.
        study_config.update({
            "genotype_storage": {
                "id": genotype_storage.storage_id,
                "tables": tables_config,
            },
            "genotype_browser": genotype_browser_config,
        })

        config_builder = StudyConfigBuilder(study_config)
        config = config_builder.build_config()
        save_study_config(
            project.get_gpf_instance().dae_config,
            project.study_id,
            config)

    def generate_import_task_graph(self, project: ImportProject) -> TaskGraph:
        """Build the task graph that imports ``project``.

        Parquet-producing tasks are added only when the project has no
        processing parquet dataset directory. When the project has a
        genotype storage, a dataset import task and a study config task
        that depends on it are appended.
        """
        graph = TaskGraph()
        all_parquet_tasks = []
        if project.get_processing_parquet_dataset_dir() is None:
            all_parquet_tasks = self._build_all_parquet_tasks(project, graph)

        if project.has_genotype_storage():
            tables_task = graph.create_task(
                "Create DuckDb import dataset", self._do_import_dataset,
                [project], all_parquet_tasks)

            # The import task is passed both as an argument and as a
            # dependency of the study config task.
            graph.create_task(
                "Creating a study config", self.do_study_config,
                [project, tables_task], [tables_task])

        return graph
class DuckDbLegacyImportStorage(AbstractDuckDbImportStorage):
    """Import Schema 2 data through a ``DuckDbLegacyStorage``."""

    @classmethod
    def _do_import_dataset(
        cls, project: ImportProject,
    ) -> Schema2DatasetLayout:
        """Delegate the dataset import to the project's legacy storage.

        The Schema 2 layout is loaded from the project's parquet dataset
        directory and handed to the storage's ``import_dataset`` together
        with the work directory, the study id and the partition
        descriptor. The value returned by ``import_dataset`` is returned
        unchanged.
        """
        storage = project.get_genotype_storage()
        assert isinstance(storage, DuckDbLegacyStorage)

        parquet_dir = project.get_parquet_dataset_dir()
        source_layout = load_schema2_dataset_layout(parquet_dir)

        return storage.import_dataset(
            project.work_dir,
            project.study_id,
            source_layout,
            project.get_partition_descriptor(),
        )
class DuckDbParquetImportStorage(AbstractDuckDbImportStorage):
    """Import Schema 2 data into a ``DuckDbParquetStorage``."""

    @classmethod
    def _do_import_dataset(
        cls, project: ImportProject,
    ) -> Schema2DatasetLayout:
        """Copy the study's parquet data under the storage base directory.

        A destination layout is created relative to the storage's
        ``base_dir`` for the project's study id and partition descriptor.
        ``fs_utils.copy`` is then called with the destination study path
        first and the source study path second.

        Returns:
            The destination layout.
        """
        storage = project.get_genotype_storage()
        assert isinstance(storage, DuckDbParquetStorage)

        parquet_dir = project.get_parquet_dataset_dir()
        source_layout = load_schema2_dataset_layout(parquet_dir)

        base_dir = str(storage.config.base_dir)
        target_layout = create_relative_parquet_scans_layout(
            base_dir,
            project.study_id,
            project.get_partition_descriptor(),
        )

        fs_utils.copy(target_layout.study, source_layout.study)
        return target_layout
class DuckDbS3ParquetImportStorage(AbstractDuckDbImportStorage):
    """Import Schema 2 data into a ``DuckDbS3ParquetStorage``."""

    @classmethod
    def _do_import_dataset(
        cls, project: ImportProject,
    ) -> Schema2DatasetLayout:
        """Upload the study's parquet data to the storage bucket.

        A destination layout is created relative to the storage's
        ``bucket_url`` for the project's study id and partition
        descriptor. The local study path is then uploaded recursively to
        the destination study path through an S3 filesystem created for
        the storage's ``endpoint_url``.

        Returns:
            The destination layout.
        """
        storage = project.get_genotype_storage()
        assert isinstance(storage, DuckDbS3ParquetStorage)

        parquet_dir = project.get_parquet_dataset_dir()
        local_layout = load_schema2_dataset_layout(parquet_dir)

        bucket_url = str(storage.config.bucket_url)
        remote_layout = create_relative_parquet_scans_layout(
            bucket_url,
            project.study_id,
            project.get_partition_descriptor(),
        )

        filesystem = create_s3_filesystem(storage.config.endpoint_url)
        filesystem.put(local_layout.study, remote_layout.study, recursive=True)
        return remote_layout
class DuckDbImportStorage(AbstractDuckDbImportStorage):
    """Import Schema 2 data into a ``DuckDbStorage`` database file."""

    @classmethod
    def _do_import_dataset(
        cls, project: ImportProject,
    ) -> Schema2DatasetLayout:
        """Build the study tables in a work database and install it.

        The tables are created in a DuckDb database file located in the
        project's work directory. That file is then copied over the
        storage's database file.

        Returns:
            The layout returned by ``create_duckdb_tables`` for the work
            database.
        """
        genotype_storage = project.get_genotype_storage()
        assert isinstance(genotype_storage, DuckDbStorage)

        layout = load_schema2_dataset_layout(
            project.get_parquet_dataset_dir(),
        )
        work_db_filename = os.path.join(
            project.work_dir, genotype_storage.config.db)

        # closing() guarantees the connection is released before the
        # database file is copied.
        with closing(create_database_connection(
                work_db_filename, read_only=False)) as connection:
            work_tables = create_duckdb_tables(
                connection,
                project.study_id,
                layout,
                project.get_partition_descriptor(),
            )

        db_filename = genotype_storage.get_db_filename()
        # Warn only when a database is actually about to be overwritten.
        if os.path.exists(db_filename):
            logger.warning(
                "replacing existing DuckDb database: %s", db_filename)

        # this could replace already existing database so we need
        # to shut down the genotype storage
        if genotype_storage.connection_factory is not None:
            genotype_storage.shutdown()
        assert genotype_storage.connection_factory is None

        # fs_utils.copy is called with the destination first.
        fs_utils.copy(db_filename, work_db_filename)
        return work_tables
class DuckDbS3ImportStorage(AbstractDuckDbImportStorage):
    """Import Schema 2 data into a ``DuckDbS3Storage`` database file."""

    @classmethod
    def _do_import_dataset(
        cls, project: ImportProject,
    ) -> Schema2DatasetLayout:
        """Build the study tables in a work database and upload it.

        The tables are created in a DuckDb database file located in the
        project's work directory. That file is then uploaded to the
        storage's database location through an S3 filesystem created for
        the storage's ``endpoint_url``.

        Returns:
            The layout returned by ``create_duckdb_tables`` for the work
            database.
        """
        genotype_storage = project.get_genotype_storage()
        assert isinstance(genotype_storage, DuckDbS3Storage)

        layout = load_schema2_dataset_layout(
            project.get_parquet_dataset_dir(),
        )
        work_db_filename = os.path.join(
            project.work_dir, genotype_storage.config.db)

        # closing() guarantees the connection is released before the
        # database file is uploaded.
        with closing(create_database_connection(
                work_db_filename, read_only=False)) as connection:
            work_tables = create_duckdb_tables(
                connection,
                project.study_id,
                layout,
                project.get_partition_descriptor(),
            )

        db_filename = genotype_storage.get_db_filename()

        # this could replace already existing database so we need
        # to shut down the genotype storage
        if genotype_storage.connection_factory is not None:
            genotype_storage.shutdown()
        assert genotype_storage.connection_factory is None

        s3_fs = create_s3_filesystem(genotype_storage.config.endpoint_url)
        # Warn only when a database is actually about to be overwritten.
        if s3_fs.exists(db_filename):
            logger.warning(
                "replacing existing DuckDb database: %s", db_filename)

        s3_fs.put(work_db_filename, db_filename, recursive=True)
        return work_tables