dae.duckdb_storage package

Submodules

dae.duckdb_storage.duckdb2_variants module

class dae.duckdb_storage.duckdb2_variants.DuckDb2Runner(connection_factory: DuckDBPyConnection, query: list[str], deserializer: Any | None = None, limit: int | None = None)[source]

Bases: QueryRunner

Run a DuckDb query in a separate thread.

run() None[source]

Execute the query and enqueue the resulting rows.
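The threading pattern described above can be sketched roughly as follows. This is an illustration only, not the DuckDb2Runner implementation: the class name SketchRunner, the end-of-stream sentinel, the collect() helper, and the use of the standard-library sqlite3 module as a stand-in for a DuckDb connection are all assumptions made to keep the example self-contained.

```python
import queue
import sqlite3
import threading
from typing import Any, Callable, Optional

_DONE = object()  # sentinel marking the end of the result stream


class SketchRunner(threading.Thread):
    """Hypothetical stand-in for a query runner; not the dae implementation."""

    def __init__(
        self,
        connection_factory: Callable[[], sqlite3.Connection],
        queries: list[str],
        deserializer: Optional[Callable[[tuple], Any]] = None,
        limit: Optional[int] = None,
    ) -> None:
        super().__init__(daemon=True)
        self.connection_factory = connection_factory
        self.queries = queries
        self.deserializer = deserializer or (lambda row: row)
        self.limit = limit
        self.results: queue.Queue = queue.Queue()

    def run(self) -> None:
        """Execute each query in turn and enqueue the resulting rows."""
        emitted = 0
        # The connection is opened inside the worker thread that uses it.
        connection = self.connection_factory()
        try:
            for query in self.queries:
                for row in connection.execute(query):
                    if self.limit is not None and emitted >= self.limit:
                        return
                    self.results.put(self.deserializer(row))
                    emitted += 1
        finally:
            connection.close()
            self.results.put(_DONE)


def collect(runner: SketchRunner) -> list:
    """Start the runner and drain its queue until the sentinel arrives."""
    runner.start()
    rows = []
    while (item := runner.results.get()) is not _DONE:
        rows.append(item)
    runner.join()
    return rows
```

The consumer never touches the connection; it only reads deserialized rows from the queue, which is what lets the query run in a separate thread.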

class dae.duckdb_storage.duckdb2_variants.DuckDb2Variants(connection_factory: DuckDbConnectionFactory, db2_layout: Db2Layout, gene_models: GeneModels, reference_genome: ReferenceGenome)[source]

Bases: QueryVariantsBase

Query backend for the DuckDb genotype storage.

build_family_variants_query_runner(*, regions: list[Region] | None = None, genes: list[str] | None = None, effect_types: list[str] | None = None, family_ids: list[str] | None = None, person_ids: list[str] | None = None, inheritance: list[str] | None = None, roles: str | None = None, sexes: str | None = None, affected_statuses: str | None = None, variant_type: str | None = None, real_attr_filter: list[tuple[str, tuple[float | None, float | None]]] | None = None, categorical_attr_filter: list[tuple[str, list[str] | list[int] | None]] | None = None, ultra_rare: bool | None = None, frequency_filter: list[tuple[str, tuple[float | None, float | None]]] | None = None, return_reference: bool | None = None, return_unknown: bool | None = None, limit: int | None = None, study_filters: list[str] | None = None, tags_query: TagsQuery | None = None, **kwargs: Any) QueryRunner | None[source]

Create a query runner for searching family variants.

build_summary_variants_query_runner(*, regions: list[Region] | None = None, genes: list[str] | None = None, effect_types: list[str] | None = None, variant_type: str | None = None, real_attr_filter: list[tuple[str, tuple[float | None, float | None]]] | None = None, categorical_attr_filter: list[tuple[str, list[str] | list[int] | None]] | None = None, ultra_rare: bool | None = None, frequency_filter: list[tuple[str, tuple[float | None, float | None]]] | None = None, return_reference: bool | None = None, return_unknown: bool | None = None, limit: int | None = None, **kwargs: Any) QueryRunner | None[source]

Create query runner for searching summary variants.

fetch_annotation() str[source]

query_summary_variants(*, regions: list[Region] | None = None, genes: list[str] | None = None, effect_types: list[str] | None = None, variant_type: str | None = None, real_attr_filter: list[tuple[str, tuple[float | None, float | None]]] | None = None, categorical_attr_filter: list[tuple[str, list[str] | list[int] | None]] | None = None, ultra_rare: bool | None = None, frequency_filter: list[tuple[str, tuple[float | None, float | None]]] | None = None, return_reference: bool | None = None, return_unknown: bool | None = None, limit: int | None = None, **kwargs: Any) Generator[SummaryVariant, None, None][source]

Execute the summary variants query and yield summary variants.

query_variants(*, regions: list[Region] | None = None, genes: list[str] | None = None, effect_types: list[str] | None = None, family_ids: list[str] | None = None, person_ids: list[str] | None = None, inheritance: list[str] | None = None, roles: str | None = None, sexes: str | None = None, variant_type: str | None = None, real_attr_filter: list[tuple[str, tuple[float | None, float | None]]] | None = None, categorical_attr_filter: list[tuple[str, list[str] | list[int] | None]] | None = None, ultra_rare: bool | None = None, frequency_filter: list[tuple[str, tuple[float | None, float | None]]] | None = None, return_reference: bool | None = None, return_unknown: bool | None = None, limit: int | None = None, tags_query: TagsQuery | None = None, **kwargs: Any) Generator[FamilyVariant, None, None][source]

Execute the family variants query and yield family variants.
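The real_attr_filter and frequency_filter parameters share one shape: a list of (attribute name, (lower bound, upper bound)) pairs in which either bound may be None. The sketch below shows one way such a value could be rendered as an SQL condition. It is not how dae builds its queries; the helper name, the inclusive bounds, and the attribute names used in the usage note are assumptions for illustration.

```python
from typing import Optional

RangeFilter = list[tuple[str, tuple[Optional[float], Optional[float]]]]


def range_filter_to_sql(filters: RangeFilter) -> str:
    """Render (name, (low, high)) pairs as an AND-ed SQL condition.

    Hypothetical helper: a None bound is treated as "unbounded" and
    both bounds are assumed to be inclusive.
    """
    clauses = []
    for name, (low, high) in filters:
        if low is not None:
            clauses.append(f"{name} >= {low}")
        if high is not None:
            clauses.append(f"{name} <= {high}")
    return " AND ".join(clauses) if clauses else "TRUE"
```

For example, range_filter_to_sql([("score", (None, 0.5))]) gives the condition "score <= 0.5", and an empty list gives "TRUE".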

dae.duckdb_storage.duckdb_connection_factory module

class dae.duckdb_storage.duckdb_connection_factory.DuckDbConnectionFactory[source]

Bases: ABC

Abstract factory for DuckDb connection.

abstract connect() DuckDBPyConnection[source]

Create a new DuckDb connection.
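A minimal sketch of the abstract factory pattern used here, assuming nothing about the real class beyond its connect() method. The class names are hypothetical, and the standard-library sqlite3 module stands in for DuckDb so that the example runs without extra dependencies.

```python
import sqlite3
from abc import ABC, abstractmethod


class ConnectionFactory(ABC):
    """Hypothetical analogue of an abstract connection factory."""

    @abstractmethod
    def connect(self) -> sqlite3.Connection:
        """Create a new connection."""


class InMemoryFactory(ConnectionFactory):
    """Concrete factory; every call returns a fresh connection."""

    def connect(self) -> sqlite3.Connection:
        return sqlite3.connect(":memory:")
```

Code that needs a connection depends only on the abstract type, so a storage object can be handed to a query backend as its connection factory.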

dae.duckdb_storage.duckdb_genotype_storage module

class dae.duckdb_storage.duckdb_genotype_storage.AbstractDuckDbStorage(dd_config: DuckDbConf | DuckDbS3Conf | DuckDbParquetConf | DuckDbS3ParquetConf)[source]

Bases: GenotypeStorage, DuckDbConnectionFactory

Defines abstract DuckDb genotype storage.

build_backend(study_config: dict, genome: ReferenceGenome, gene_models: GeneModels) DuckDb2Variants[source]

Construct a query backend for this genotype storage.

abstract build_study_layout(study_config: dict[str, Any]) Db2Layout[source]

Construct study layout from study and storage configuration.

connect() DuckDBPyConnection[source]

Create a new DuckDb connection.

is_read_only() bool[source]

shutdown() AbstractDuckDbStorage[source]

Free all resources used by the genotype storage.

class dae.duckdb_storage.duckdb_genotype_storage.DuckDbParquetStorage(dd_config: DuckDbParquetConf)[source]

Bases: AbstractDuckDbStorage

Defines duckdb_parquet genotype storage.

build_study_layout(study_config: dict[str, Any]) Db2Layout[source]

Construct study layout from study and storage configuration.

classmethod get_storage_types() set[str][source]

Return the genotype storage type.

start() DuckDbParquetStorage[source]

Allocate all resources needed for the genotype storage to work.

class dae.duckdb_storage.duckdb_genotype_storage.DuckDbS3ParquetStorage(dd_config: DuckDbS3ParquetConf)[source]

Bases: AbstractDuckDbStorage

Defines duckdb_s3_parquet genotype storage.

build_study_layout(study_config: dict[str, Any]) Db2Layout[source]

Construct study layout from study and storage configuration.

classmethod get_storage_types() set[str][source]

Return the genotype storage type.

start() DuckDbS3ParquetStorage[source]

Allocate all resources needed for the genotype storage to work.

class dae.duckdb_storage.duckdb_genotype_storage.DuckDbS3Storage(dd_config: DuckDbS3Conf)[source]

Bases: AbstractDuckDbStorage

Defines duckdb_s3 genotype storage.

build_study_layout(study_config: dict[str, Any]) Db2Layout[source]

Construct study layout from study and storage configuration.

get_db_filename() str[source]

Construct database full filename.

classmethod get_storage_types() set[str][source]

Return the genotype storage type.

start() DuckDbS3Storage[source]

Allocate all resources needed for the genotype storage to work.

class dae.duckdb_storage.duckdb_genotype_storage.DuckDbStorage(dd_config: DuckDbConf)[source]

Bases: AbstractDuckDbStorage

Defines duckdb genotype storage.

build_study_layout(study_config: dict[str, Any]) Db2Layout[source]

Construct study layout from study and storage configuration.

get_db_filename() str[source]

Construct database full filename.

classmethod get_storage_types() set[str][source]

Return the genotype storage type.

start() DuckDbStorage[source]

Allocate all resources needed for the genotype storage to work.
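Because start() and shutdown() both return the storage object, the lifecycle lends itself to a small context manager. The sketch below uses a hypothetical SketchStorage stand-in rather than a real dae storage class, and the running() helper is likewise an assumption, not part of the package.

```python
from contextlib import contextmanager
from typing import Iterator


class SketchStorage:
    """Hypothetical storage with the start()/shutdown() shape shown above."""

    def __init__(self) -> None:
        self.started = False

    def start(self) -> "SketchStorage":
        self.started = True   # allocate resources here
        return self           # returning self allows call chaining

    def shutdown(self) -> "SketchStorage":
        self.started = False  # release resources here
        return self


@contextmanager
def running(storage: SketchStorage) -> Iterator[SketchStorage]:
    """Start the storage and guarantee shutdown, even on error."""
    try:
        yield storage.start()
    finally:
        storage.shutdown()
```

Inside a "with running(storage) as active:" block the storage is started, and it is shut down again when the block exits.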

dae.duckdb_storage.duckdb_genotype_storage.duckdb_parquet_storage_factory(storage_config: dict[str, Any]) DuckDbParquetStorage[source]

Create duckdb_parquet genotype storage.

dae.duckdb_storage.duckdb_genotype_storage.duckdb_s3_parquet_storage_factory(storage_config: dict[str, Any]) DuckDbS3ParquetStorage[source]

Create duckdb_s3_parquet genotype storage.

dae.duckdb_storage.duckdb_genotype_storage.duckdb_s3_storage_factory(storage_config: dict[str, Any]) DuckDbS3Storage[source]

Create duckdb_s3 genotype storage.

dae.duckdb_storage.duckdb_genotype_storage.duckdb_storage_factory(storage_config: dict[str, Any]) DuckDbStorage[source]

Create duckdb genotype storage.

dae.duckdb_storage.duckdb_import_storage module

class dae.duckdb_storage.duckdb_import_storage.AbstractDuckDbImportStorage[source]

Bases: Schema2ImportStorage, ABC

Import logic for data in the DuckDb Schema 2 format.

classmethod do_study_config(project: ImportProject, study_tables: Schema2DatasetLayout) None[source]

Produce a study config for the given project.

generate_import_task_graph(project: ImportProject) TaskGraph[source]

Generate task graph for import of the project into this storage.

class dae.duckdb_storage.duckdb_import_storage.DuckDbImportStorage[source]

Bases: AbstractDuckDbImportStorage

Import logic for data in the DuckDb Schema 2 format.

class dae.duckdb_storage.duckdb_import_storage.DuckDbLegacyImportStorage[source]

Bases: AbstractDuckDbImportStorage

Import logic for data in the DuckDb Schema 2 format.

class dae.duckdb_storage.duckdb_import_storage.DuckDbParquetImportStorage[source]

Bases: AbstractDuckDbImportStorage

Import logic for data in the DuckDb Schema 2 format.

class dae.duckdb_storage.duckdb_import_storage.DuckDbS3ImportStorage[source]

Bases: AbstractDuckDbImportStorage

Import logic for data in the DuckDb Schema 2 format.

class dae.duckdb_storage.duckdb_import_storage.DuckDbS3ParquetImportStorage[source]

Bases: AbstractDuckDbImportStorage

Import logic for data in the DuckDb Schema 2 format.

dae.duckdb_storage.duckdb_legacy_genotype_storage module

class dae.duckdb_storage.duckdb_legacy_genotype_storage.DuckDbLegacyStorage(storage_config: dict[str, Any])[source]

Bases: GenotypeStorage, DuckDbConnectionFactory

Defines duckdb_legacy genotype storage.

VALIDATION_SCHEMA: ClassVar[dict[str, Any]] = {'base_dir': {'type': 'string'}, 'db': {'type': 'string'}, 'endpoint_url': {'type': 'string'}, 'id': {'required': True, 'type': 'string'}, 'memory_limit': {'default': '16GB', 'type': 'string'}, 'read_only': {'default': True, 'type': 'boolean'}, 'storage_type': {'allowed': ['duckdb_legacy'], 'type': 'string'}, 'work_dir': {'type': 'string'}}

build_backend(study_config: dict, genome: ReferenceGenome, gene_models: GeneModels) DuckDbVariants | DuckDb2Variants[source]

Construct a query backend for this genotype storage.

close() None[source]

connect() DuckDBPyConnection[source]

Create a new DuckDb connection.

create_database_connection() DuckDBPyConnection[source]

Create a read-write connection to the DuckDb database.

static create_parquet_scans_layout_from_layout(layout: Schema2DatasetLayout, partition_descriptor: PartitionDescriptor) Schema2DatasetLayout[source]

Construct DuckDb parquet scans for all studies tables.

create_parquet_scans_layout_relative(study_id: str, partition_descriptor: PartitionDescriptor) Schema2DatasetLayout[source]

Construct DuckDb parquet scans relative to base dir.

create_table(connection: DuckDBPyConnection, parquet_path: str, table_name: str) None[source]

Create a table from a parquet file.

static create_table_layout(study_id: str) Schema2DatasetLayout[source]

create_table_partitioned(connection: DuckDBPyConnection, parquet_path: str, table_name: str, partition: list[tuple[str, str]]) None[source]

Create a table from a partitioned parquet dataset.
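Loading parquet data into a DuckDB table usually comes down to a CREATE TABLE ... AS SELECT statement over read_parquet(). The sketch below only builds such statements as strings. It is not the code used by create_table or create_table_partitioned: the helper names, the one-directory-level-per-partition-column layout, and the example column names are assumptions.

```python
def create_table_sql(parquet_path: str, table_name: str) -> str:
    """Build a statement that loads one parquet file into a table."""
    return (
        f"CREATE OR REPLACE TABLE {table_name} AS "
        f"SELECT * FROM read_parquet('{parquet_path}')"
    )


def create_partitioned_table_sql(
    parquet_dir: str, table_name: str, partition: list[tuple[str, str]],
) -> str:
    """Build a statement that loads a hive-partitioned parquet dataset.

    Assumption: one directory level per partition column, for example
    <dir>/region_bin=chr1_0/frequency_bin=1/part.parquet
    """
    levels = ["*"] * len(partition)
    glob = "/".join([parquet_dir, *levels, "*.parquet"])
    return (
        f"CREATE OR REPLACE TABLE {table_name} AS "
        f"SELECT * FROM read_parquet('{glob}', hive_partitioning = true)"
    )
```

With hive_partitioning enabled, DuckDB exposes the key=value directory names as columns, so the partition columns appear in the resulting table.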

get_base_dir() str | None[source]

get_db() str | None[source]

get_db_filename() str[source]

Construct database full filename.

get_memory_limit() str[source]

classmethod get_storage_types() set[str][source]

Return the genotype storage type.

get_work_dir() str | None[source]

import_dataset(work_dir: str, study_id: str, layout: Schema2DatasetLayout, partition_descriptor: PartitionDescriptor) Schema2DatasetLayout[source]

Import study parquet dataset into duckdb genotype storage.

shutdown() DuckDbLegacyStorage[source]

Free all resources used by the genotype storage.

start() DuckDbLegacyStorage[source]

Allocate all resources needed for the genotype storage to work.

classmethod validate_and_normalize_config(config: dict) dict[source]

Normalize and validate the genotype storage configuration.

When validation passes, returns the normalized and validated genotype storage configuration dict.

When validation fails, raises ValueError.

All genotype storage configurations are required to have:

  • “storage_type” - which storage type this configuration is used for;

  • “id” - the ID of the genotype storage instance that will be created.
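The contract described above (fill in defaults, return a normalized dict, raise ValueError on failure) can be sketched with a small hand-written validator. This is not the validation code dae uses. The SCHEMA below copies only four entries from VALIDATION_SCHEMA, and rejecting unknown keys is an assumption of the sketch.

```python
from typing import Any

# Subset of the VALIDATION_SCHEMA shown above.
SCHEMA: dict[str, dict[str, Any]] = {
    "id": {"type": "string", "required": True},
    "storage_type": {"type": "string", "allowed": ["duckdb_legacy"]},
    "memory_limit": {"type": "string", "default": "16GB"},
    "read_only": {"type": "boolean", "default": True},
}
TYPES = {"string": str, "boolean": bool}


def validate_and_normalize(config: dict[str, Any]) -> dict[str, Any]:
    """Fill in defaults and check required keys, types and allowed values."""
    unknown = set(config) - set(SCHEMA)
    if unknown:
        # Assumption: keys outside the schema are treated as errors.
        raise ValueError(f"unknown keys: {sorted(unknown)}")
    result = dict(config)
    for key, rule in SCHEMA.items():
        if key not in result:
            if rule.get("required"):
                raise ValueError(f"missing required key: {key}")
            if "default" in rule:
                result[key] = rule["default"]
            continue
        value = result[key]
        if not isinstance(value, TYPES[rule["type"]]):
            raise ValueError(f"{key} must be of type {rule['type']}")
        if "allowed" in rule and value not in rule["allowed"]:
            raise ValueError(f"{key} must be one of {rule['allowed']}")
    return result
```

A configuration that gives only "id" and "storage_type" comes back with memory_limit set to '16GB' and read_only set to True.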

dae.duckdb_storage.duckdb_legacy_genotype_storage.duckdb_connect(db_name: str | None = None, *, read_only: bool = True) DuckDBPyConnection[source]

dae.duckdb_storage.duckdb_storage_config module

class dae.duckdb_storage.duckdb_storage_config.DuckDbBaseConf(*, id: str, memory_limit: ByteSize | None = None)[source]

Bases: BaseModel

Base class for DuckDb based storage configuration.

id: str

memory_limit: ByteSize | None

model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'id': FieldInfo(annotation=str, required=True), 'memory_limit': FieldInfo(annotation=Union[ByteSize, NoneType], required=False, default=None)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

class dae.duckdb_storage.duckdb_storage_config.DuckDbConf(*, id: str, memory_limit: ByteSize | None = None, storage_type: Literal['duckdb'] | Literal['duckdb_legacy'], db: Path, read_only: bool = True, base_dir: Path)[source]

Bases: DuckDbBaseConf

duckdb storage configuration class.

base_dir: BaseDirPath

db: pathlib.Path

model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'base_dir': FieldInfo(annotation=Path, required=True, metadata=[AfterValidator(func=<function _validate_abs_path>)]), 'db': FieldInfo(annotation=Path, required=True), 'id': FieldInfo(annotation=str, required=True), 'memory_limit': FieldInfo(annotation=Union[ByteSize, NoneType], required=False, default=None), 'read_only': FieldInfo(annotation=bool, required=False, default=True), 'storage_type': FieldInfo(annotation=Union[Literal['duckdb'], Literal['duckdb_legacy']], required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

read_only: bool

storage_type: Literal['duckdb'] | Literal['duckdb_legacy']

class dae.duckdb_storage.duckdb_storage_config.DuckDbParquetConf(*, id: str, memory_limit: ByteSize | None = None, storage_type: Literal['duckdb_parquet'], base_dir: Path)[source]

Bases: DuckDbBaseConf

duckdb_parquet storage configuration class.

base_dir: BaseDirPath

model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'base_dir': FieldInfo(annotation=Path, required=True, metadata=[AfterValidator(func=<function _validate_abs_path>)]), 'id': FieldInfo(annotation=str, required=True), 'memory_limit': FieldInfo(annotation=Union[ByteSize, NoneType], required=False, default=None), 'storage_type': FieldInfo(annotation=Literal['duckdb_parquet'], required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

storage_type: Literal['duckdb_parquet']

class dae.duckdb_storage.duckdb_storage_config.DuckDbS3Conf(*, id: str, memory_limit: ByteSize | None = None, storage_type: Literal['duckdb_s3'], db: str, bucket_url: Url, endpoint_url: Url | None = None)[source]

Bases: DuckDbBaseConf

duckdb_s3 storage configuration class.

bucket_url: S3Path

db: str

endpoint_url: HttpUrl | None

model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'bucket_url': FieldInfo(annotation=Url, required=True, metadata=[UrlConstraints(max_length=None, allowed_schemes=['s3'], host_required=None, default_host=None, default_port=None, default_path=None)]), 'db': FieldInfo(annotation=str, required=True), 'endpoint_url': FieldInfo(annotation=Union[Annotated[Url, UrlConstraints(max_length=2083, allowed_schemes=['http', 'https'], host_required=None, default_host=None, default_port=None, default_path=None)], NoneType], required=False, default=None), 'id': FieldInfo(annotation=str, required=True), 'memory_limit': FieldInfo(annotation=Union[ByteSize, NoneType], required=False, default=None), 'storage_type': FieldInfo(annotation=Literal['duckdb_s3'], required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

storage_type: Literal['duckdb_s3']

class dae.duckdb_storage.duckdb_storage_config.DuckDbS3ParquetConf(*, id: str, memory_limit: ByteSize | None = None, storage_type: Literal['duckdb_s3_parquet'], bucket_url: Url, endpoint_url: Url | None = None)[source]

Bases: DuckDbBaseConf

duckdb_s3_parquet storage configuration class.

bucket_url: S3Path

endpoint_url: HttpUrl | None

model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'bucket_url': FieldInfo(annotation=Url, required=True, metadata=[UrlConstraints(max_length=None, allowed_schemes=['s3'], host_required=None, default_host=None, default_port=None, default_path=None)]), 'endpoint_url': FieldInfo(annotation=Union[Annotated[Url, UrlConstraints(max_length=2083, allowed_schemes=['http', 'https'], host_required=None, default_host=None, default_port=None, default_path=None)], NoneType], required=False, default=None), 'id': FieldInfo(annotation=str, required=True), 'memory_limit': FieldInfo(annotation=Union[ByteSize, NoneType], required=False, default=None), 'storage_type': FieldInfo(annotation=Literal['duckdb_s3_parquet'], required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

storage_type: Literal['duckdb_s3_parquet']

dae.duckdb_storage.duckdb_storage_config.parse_duckdb_config(config: dict[str, Any]) DuckDbConf | DuckDbParquetConf | DuckDbS3Conf | DuckDbS3ParquetConf[source]

Parse duckdb storage configuration.
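The return type suggests that parsing selects one of several configuration classes based on the "storage_type" value. A rough, dependency-free sketch of that dispatch follows. It uses plain dataclasses where the real classes are Pydantic models, covers only two of the four classes, and every name prefixed with Sketch, as well as parse_config itself, is hypothetical.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class SketchDuckDbConf:
    id: str
    storage_type: str
    db: str
    base_dir: str
    read_only: bool = True
    memory_limit: Optional[str] = None


@dataclass
class SketchDuckDbParquetConf:
    id: str
    storage_type: str
    base_dir: str
    memory_limit: Optional[str] = None


# Maps the "storage_type" discriminator to a configuration class.
CONF_CLASSES = {
    "duckdb": SketchDuckDbConf,
    "duckdb_legacy": SketchDuckDbConf,
    "duckdb_parquet": SketchDuckDbParquetConf,
}


def parse_config(config: dict[str, Any]):
    """Pick the configuration class from storage_type and build it."""
    storage_type = config.get("storage_type")
    if storage_type not in CONF_CLASSES:
        raise ValueError(f"unsupported storage type: {storage_type!r}")
    try:
        # Unknown or missing keys raise TypeError, loosely mirroring
        # the 'extra': 'forbid' model configuration.
        return CONF_CLASSES[storage_type](**config)
    except TypeError as exc:
        raise ValueError(str(exc)) from exc
```

Keeping the mapping in one dictionary means a new storage type needs one new class and one new entry.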

dae.duckdb_storage.duckdb_storage_helpers module

dae.duckdb_storage.duckdb_storage_helpers.create_database_connection(db_filename: str, *, read_only: bool = True, memory_limit: str | ByteSize | None = None) DuckDBPyConnection[source]

Create a connection to the DuckDb database. The connection is read-only unless read_only=False is passed.
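For orientation, a helper with this signature could be built on duckdb.connect(), which accepts a read_only flag and a config dictionary. The sketch below is an assumption about one reasonable approach and not the dae implementation. Both function names are hypothetical, and the duckdb package is imported lazily so the pure-Python part runs without it.

```python
from typing import Any, Optional


def connection_config(memory_limit: Optional[str] = None) -> dict[str, Any]:
    """Build the config dict passed to duckdb.connect()."""
    config: dict[str, Any] = {}
    if memory_limit is not None:
        config["memory_limit"] = memory_limit
    return config


def open_connection(
    db_filename: str, *, read_only: bool = True,
    memory_limit: Optional[str] = None,
):
    """Open a DuckDB connection; requires the third-party duckdb package."""
    import duckdb  # imported lazily so the helper above works without it

    return duckdb.connect(
        db_filename, read_only=read_only,
        config=connection_config(memory_limit),
    )
```

Defaulting read_only to True means callers must explicitly ask for write access.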

dae.duckdb_storage.duckdb_storage_helpers.create_duckdb_tables(connection: DuckDBPyConnection, study_id: str, layout: Schema2DatasetLayout, partition_descriptor: PartitionDescriptor) Schema2DatasetLayout[source]

Create tables in the DuckDb database.

dae.duckdb_storage.duckdb_storage_helpers.create_memory_connection(*, memory_limit: str | ByteSize | None = None) DuckDBPyConnection[source]

Create a read-write connection to an in-memory DuckDb database.

dae.duckdb_storage.duckdb_storage_helpers.create_relative_parquet_scans_layout(base_url: str, study_id: str, partition_descriptor: PartitionDescriptor) Schema2DatasetLayout[source]

Construct DuckDb parquet scans relative to base dir.

dae.duckdb_storage.duckdb_storage_helpers.create_s3_attach_db_clause(db_url: str) str[source]

dae.duckdb_storage.duckdb_storage_helpers.create_s3_filesystem(endpoint_url: str | Url | None) S3FileSystem[source]

dae.duckdb_storage.duckdb_storage_helpers.create_s3_secret_clause(storage_id: str, endpoint_url: str | Url | None) str[source]

Create a DuckDb secret clause for S3 storage.

dae.duckdb_storage.duckdb_storage_helpers.create_study_parquet_tables_layout(study_config: dict[str, Any], base_url: str) Db2Layout[source]

Construct study tables layout.

dae.duckdb_storage.duckdb_storage_helpers.create_table_layout(study_id: str) Schema2DatasetLayout[source]

dae.duckdb_storage.duckdb_storage_helpers.get_study_config_tables(study_config: dict[str, Any], db_name: str | None) Db2Layout[source]

Return the study tables configuration.

dae.duckdb_storage.duckdb_storage_helpers.join_base_url_and_parquet_scan(base_url: str, parquet_scan: str | None) str | None[source]

Join the base URL and the parquet scan.

dae.duckdb_storage.duckdb_variants module

class dae.duckdb_storage.duckdb_variants.DuckDbQueryDialect(ns: str | None = None)[source]

Bases: Dialect

Abstracts away details related to DuckDb.

static add_unnest_in_join() bool[source]

static array_item_suffix() str[source]

build_array_join(col: str, alias: str) str[source]

build_table_name(table: str, db: str | None) str[source]

static escape_char() str[source]

static escape_quote_char() str[source]

static float_type() str[source]

static int_type() str[source]

static use_bit_and_function() bool[source]

class dae.duckdb_storage.duckdb_variants.DuckDbRunner(connection_factory: DuckDBPyConnection, query: str, deserializer: Any | None = None)[source]

Bases: QueryRunner

Run a DuckDb query in a separate thread.

run() None[source]

Execute the query and enqueue the resulting rows.

class dae.duckdb_storage.duckdb_variants.DuckDbVariants(connection_factory: DuckDbConnectionFactory, db: str | None, family_variant_table: str | None, summary_allele_table: str | None, pedigree_table: str, meta_table: str, gene_models: GeneModels | None = None)[source]

Bases: SqlSchema2Variants

Query backend for the DuckDb genotype storage.

RUNNER_CLASS

alias of DuckDbRunner

Module contents