dae.import_tools package



dae.import_tools.cli module

dae.import_tools.cli.main(argv: List[str] | None = None) int[source]

Entry point for import tools when invoked as a CLI tool.

dae.import_tools.cli.run_with_project(project: ImportProject, executor: AbstractTaskGraphExecutor | None = None) bool[source]

Run import with the given project.

dae.import_tools.import_tools module

class dae.import_tools.import_tools.Bucket(type: str, region_bin: str, regions: list[str], index: int)[source]

Bases: object

A region of the input used for processing.

index: int
region_bin: str
regions: list[str]
type: str
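The fields above describe one unit of parallel import work. A minimal self-contained sketch of an equivalent dataclass (an illustration, not the actual `dae` implementation; the example field values are assumptions) shows how such a bucket holds together:

```python
from dataclasses import dataclass

@dataclass
class Bucket:
    """One unit of parallel import work (sketch of dae's Bucket)."""
    type: str            # loader type, e.g. "vcf" or "denovo"
    region_bin: str      # partition region-bin label, e.g. "chr1_0"
    regions: list[str]   # genomic regions covered by this bucket
    index: int           # unique bucket index used for task ordering

bucket = Bucket(type="vcf", region_bin="chr1_0",
                regions=["chr1:1-1000000"], index=0)
```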
class dae.import_tools.import_tools.ImportConfigNormalizer[source]

Bases: object

Class to normalize import configs.

Most of the normalization is done by Cerberus, but it falls short in a few cases. This class picks up the slack. It also reads external files and embeds them in the final configuration dict.

normalize(import_config: dict, base_input_dir: str) tuple[dict[str, Any], str, list[str]][source]

Normalize the import config.

class dae.import_tools.import_tools.ImportProject(import_config: dict[str, Any], base_input_dir: str | None, base_config_dir: str | None = None, gpf_instance: GPFInstance | None = None, config_filenames: list[str] | None = None)[source]

Bases: object

Encapsulate the import configuration.

This class creates the necessary objects needed to import a study (e.g. loaders, family data and so on).

build_annotation_pipeline() AnnotationPipeline[source]
static build_from_config(import_config: dict[str, Any], base_input_dir: str = '', gpf_instance: GPFInstance | None = None) ImportProject[source]

Create a new project from the provided config.

The config is first validated and normalized.

  • import_config – The config to use for the import.

  • base_input_dir – Default input dir. Defaults to the current working directory.

static build_from_file(import_filename: str | PathLike, gpf_instance: GPFInstance | None = None) ImportProject[source]

Create a new project from the provided config filename.

The file is first parsed, validated and normalized. The path to the file is used as the default input path for the project.

  • import_filename – Path to the config file

  • gpf_instance – GPF instance to use.
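Since the directory containing the config file becomes the default input path of the project, resolving that default can be sketched with a small self-contained helper (`default_input_dir` is a hypothetical illustration, not part of the API):

```python
from pathlib import Path

def default_input_dir(import_filename: str) -> str:
    # The directory containing the config file serves as the
    # default base for relative input paths in the project.
    return str(Path(import_filename).resolve().parent)

default_input_dir("/data/study/import_config.yaml")
# -> "/data/study"
```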

build_variants_loader_pipeline(variants_loader: VariantsLoader) VariantsLoader[source]

Create an annotation pipeline around variants_loader.

static del_loader_prefix(params: dict[str, Any], prefix: str) dict[str, Any][source]

Remove prefix from parameter keys.
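The behaviour can be sketched with a self-contained re-implementation (an illustration, not the library code; it assumes keys without the prefix pass through unchanged):

```python
from typing import Any

def del_loader_prefix(params: dict[str, Any], prefix: str) -> dict[str, Any]:
    # Strip `prefix` from every key that carries it; other keys
    # are kept as-is.
    result: dict[str, Any] = {}
    for key, value in params.items():
        if key.startswith(prefix):
            result[key[len(prefix):]] = value
        else:
            result[key] = value
    return result

del_loader_prefix({"vcf_chromosomes": "chr1", "sep": "\t"}, "vcf_")
# -> {"chromosomes": "chr1", "sep": "\t"}
```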

get_annotation_pipeline_config() list[dict][source]

Return the annotation pipeline configuration.

get_genotype_storage() GenotypeStorage[source]

Find, create and return the correct genotype storage.

get_gpf_instance() GPFInstance[source]

Create and return a GPF instance as described in the config.

get_import_storage() ImportStorage[source]

Create an import storage as described in the import config.

get_import_variants_buckets() list[dae.import_tools.import_tools.Bucket][source]

Split variant files into buckets enabling parallel processing.

get_input_filenames(bucket: Bucket) list[str][source]

Get a list of input files for a specific bucket.

get_parquet_dataset_dir() str[source]

Return the parquet dataset directory.

If a processing parquet dataset directory is configured, this method returns it. Otherwise it constructs a parquet dataset directory inside the work directory.

get_partition_descriptor() PartitionDescriptor[source]
get_pedigree() FamiliesData[source]

Load, parse and return the pedigree data.

get_pedigree_filename() str[source]

Return the path to the pedigree file.

get_pedigree_loader() FamiliesLoader[source]
get_pedigree_params() tuple[str, dict[str, Any]][source]

Get params for loading the pedigree.

get_processing_parquet_dataset_dir() str | None[source]

Return the processing parquet dataset directory if it is configured and exists.

get_row_group_size() int[source]
get_variant_loader(bucket: Bucket | None = None, loader_type: str | None = None, reference_genome: ReferenceGenome | None = None) VariantsLoader[source]

Get the appropriate variant loader for the specified bucket.

get_variant_loader_chromosomes(loader_type: str | None = None) list[str][source]

Collect all chromosomes available in input files.

get_variant_loader_types() set[str][source]

Collect all variant import types used in the project.

get_variant_params(loader_type: str) tuple[Union[str, list[str]], dict[str, Any]][source]

Return variant loader filenames and params.

has_denovo_variants() bool[source]

Check if the resulting imported study has denovo variants.

has_genotype_storage() bool[source]

Return whether a genotype storage can be created.

property include_reference: bool

Check if the import should include ref allele in the output data.

property input_dir: str

Return the path relative to which input files are specified.

property study_id: str
property work_dir: str

Where to store generated import files (e.g. parquet files).

class dae.import_tools.import_tools.ImportStorage[source]

Bases: ABC

Defines abstract base class for import storages.

abstract generate_import_task_graph(project: ImportProject) TaskGraph[source]

Generate a task graph for import of the project into this storage.

class dae.import_tools.import_tools.MakefilePartitionHelper(partition_descriptor: PartitionDescriptor, genome: ReferenceGenome)[source]

Bases: object

Helper class for organizing partition targets.

bucket_index(region_bin: str) int[source]

Return bucket index based on variants target.

static build_target_chromosomes(target_chromosomes: list[str]) list[str][source]
generate_chrom_targets(target_chrom: str) list[tuple[str, str]][source]

Generate variant targets based on partition descriptor.
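Targets of this kind can be illustrated with a self-contained sketch that splits a chromosome of known length into fixed-size region bins (the `<chrom>_<n>` bin naming and the `region_length` parameter are assumptions for illustration, not the exact partition-descriptor logic):

```python
def generate_chrom_targets(chrom: str, chrom_length: int,
                           region_length: int) -> list[tuple[str, str]]:
    # Split a chromosome into fixed-length region bins; each target
    # pairs a bin label with the genomic region it covers.
    targets = []
    for i, start in enumerate(range(1, chrom_length + 1, region_length)):
        end = min(start + region_length - 1, chrom_length)
        targets.append((f"{chrom}_{i}", f"{chrom}:{start}-{end}"))
    return targets

generate_chrom_targets("chr1", 250, 100)
# -> [("chr1_0", "chr1:1-100"), ("chr1_1", "chr1:101-200"),
#     ("chr1_2", "chr1:201-250")]
```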

generate_variants_targets(target_chromosomes: list[str], mode: str | None = None) dict[str, list][source]

Produce variants targets.

region_bins_count(chrom: str) int[source]
dae.import_tools.import_tools.construct_import_annotation_pipeline(gpf_instance: GPFInstance, annotation_configfile: str | None = None) AnnotationPipeline[source]

Construct annotation pipeline for importing data.

dae.import_tools.import_tools.construct_import_annotation_pipeline_config(gpf_instance: GPFInstance, annotation_configfile: str | None = None) list[dict][source]

Construct annotation pipeline config for importing data.

dae.import_tools.import_tools.get_import_storage_factory(storage_type: str) Callable[[], ImportStorage][source]

Find and return a factory function for creation of a storage type.

dae.import_tools.import_tools.get_import_storage_types() list[str][source]
dae.import_tools.import_tools.register_import_storage_factory(storage_type: str, factory: Callable[[], ImportStorage]) None[source]
dae.import_tools.import_tools.save_study_config(dae_config: Box, study_id: str, study_config: str, *, force: bool = False) None[source]

Save the study config to a file.

Module contents