dae.parquet.schema2 package

Submodules

dae.parquet.schema2.annotate_schema2_parquet module

dae.parquet.schema2.annotate_schema2_parquet.backup_schema2_study(directory: str) Schema2DatasetLayout[source]

Back up the current meta and summary data for a Parquet study.

Renames the meta Parquet file and the summary variants directory by attaching a suffix with the current date, then returns a corrected layout using the newly renamed paths. This clears the way for the new ‘meta’ and ‘summary’ data that will be produced when reannotating a Parquet study in place.

dae.parquet.schema2.annotate_schema2_parquet.cli(raw_args: list[str] | None = None) None[source]

Entry point for the Schema2 Parquet annotation tool.

dae.parquet.schema2.annotate_schema2_parquet.merge_partitions(summary_dir: str, partitions: list[list[tuple[str, str]]], partition_descriptor: PartitionDescriptor) None[source]

Helper method to merge Parquet files in partitioned studies.

dae.parquet.schema2.annotate_schema2_parquet.process_parquet(input_layout: Schema2DatasetLayout, pipeline_config: list[dict[str, Any]] | RawFullConfig, grr_definition: dict | None, output_dir: str, bucket_idx: int, region: Region, args: _ProcessingArgs) None[source]

Process a Parquet dataset for annotation.

dae.parquet.schema2.annotate_schema2_parquet.produce_regions(target_region: str | None, region_size: int, contig_lens: dict[str, int]) list[str][source]

Produce the genomic regions that annotation work will be split into.
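
A minimal sketch of calling this helper, using only the documented signature; the contig names and lengths are hypothetical, and the exact string format of the returned regions is an assumption not stated in this listing:

    from dae.parquet.schema2.annotate_schema2_parquet import produce_regions

    # Split two hypothetical contigs into ~1 Mb regions; a target_region of
    # None asks for regions covering all contigs.
    regions = produce_regions(
        target_region=None,
        region_size=1_000_000,
        contig_lens={"chr1": 2_500_000, "chr2": 900_000},
    )
    print(regions)  # a list of region strings, one per region-sized chunk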

dae.parquet.schema2.annotate_schema2_parquet.produce_schema2_annotation_tasks(task_graph: TaskGraph, loader: ParquetLoader, output_dir: str, raw_pipeline: list[dict[str, Any]] | RawFullConfig, grr: GenomicResourceRepo, target_region: str | None, args: _ProcessingArgs) list[Task][source]

Produce TaskGraph tasks for Parquet file annotation.

dae.parquet.schema2.annotate_schema2_parquet.produce_schema2_merging_tasks(task_graph: TaskGraph, sync_task: Task, reference_genome: ReferenceGenome, loader: ParquetLoader, output_layout: Schema2DatasetLayout) list[Task][source]

Produce TaskGraph tasks for Parquet file merging.

Mirror pedigree and family variants data using symlinks.

dae.parquet.schema2.annotate_schema2_parquet.write_new_meta(loader: ParquetLoader, pipeline: AnnotationPipeline, output_layout: Schema2DatasetLayout) None[source]

Produce and write new metadata to the output Parquet dataset.

dae.parquet.schema2.loader module

class dae.parquet.schema2.loader.MultiReader(dirs: Iterable[str], columns: Iterable[str], batch_size: int = 1000)[source]

Bases: object

Incrementally fetch variants from multiple files.

This class assumes variants are ordered by their bucket and summary index!

close() None[source]
property current_idx: tuple[int, int]
class dae.parquet.schema2.loader.ParquetLoader(layout: Schema2DatasetLayout, batch_size: int = 1000)[source]

Bases: object

Variants loader implementation for the Parquet format.

FAMILY_COLUMNS: ClassVar[list[str]] = ['bucket_index', 'summary_index', 'family_id', 'family_variant_data']
SUMMARY_COLUMNS: ClassVar[list[str]] = ['bucket_index', 'summary_index', 'allele_index', 'summary_variant_data', 'chromosome', 'position', 'end_position']
fetch_family_variants(region: Region | None = None) Generator[FamilyVariant, None, None][source]

Iterate over family variants.

fetch_summary_variants(region: Region | None = None) Generator[SummaryVariant, None, None][source]

Iterate over summary variants.

fetch_variants(region: Region | None = None) Generator[tuple[SummaryVariant, list[FamilyVariant]], None, None][source]

Iterate over summary and family variants.

get_family_pq_filepaths(summary_path: str) list[str][source]

Get all family parquet files for a given summary parquet file.

get_summary_pq_filepaths(region: Region | None = None) Generator[list[str], None, None][source]

Produce paths to available Parquet files grouped by region.

Can filter by region if region bins are configured.

static load_from_dir(input_dir: str) ParquetLoader[source]
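
A minimal usage sketch based only on the methods listed above; the dataset path is hypothetical, and the optional region filtering is only mentioned in a comment because the Region constructor is not part of this listing:

    from dae.parquet.schema2.loader import ParquetLoader

    loader = ParquetLoader.load_from_dir("/path/to/schema2_study")

    # Iterate over summary variants together with their family variants.
    # fetch_summary_variants(region=...) and fetch_family_variants(region=...)
    # accept an optional Region to restrict the scan.
    for summary_variant, family_variants in loader.fetch_variants():
        print(summary_variant, len(family_variants))
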
exception dae.parquet.schema2.loader.ParquetLoaderException[source]

Bases: Exception

class dae.parquet.schema2.loader.Reader(path: str, columns: Iterable[str], batch_size: int = 500)[source]

Bases: object

Helper class to incrementally fetch variants.

This class assumes variants are ordered by their bucket and summary index!

close() None[source]
property current_idx: tuple[int, int]

dae.parquet.schema2.merge_parquet module

dae.parquet.schema2.merge_parquet.merge_parquet_directory(parquets_dir: Path | str, output_parquet_filename: str, *, variants_type: Literal['summary', 'family'] | None = None, row_group_size: int = 50000, delete_in_files: bool = True) None[source]

Merge all parquet files from parquets_dir into a single parquet file.

dae.parquet.schema2.merge_parquet.merge_parquets(in_files: list[str], out_file: str, *, delete_in_files: bool = True, row_group_size: int = 50000) None[source]

Merge multiple parquet files into a single parquet file.
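
A short sketch of the two merge helpers, using only the documented signatures; the file and directory names are hypothetical:

    from dae.parquet.schema2.merge_parquet import (
        merge_parquet_directory,
        merge_parquets,
    )

    # Merge an explicit list of shard files into one file, keeping the inputs.
    merge_parquets(
        ["variants_bucket_0.parquet", "variants_bucket_1.parquet"],
        "variants_merged.parquet",
        delete_in_files=False,
    )

    # Or merge every Parquet file found under a directory.
    merge_parquet_directory(
        "work/summary_parts",
        "summary.parquet",
        variants_type="summary",
        row_group_size=50_000,
    )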

dae.parquet.schema2.merge_parquet.merge_variants_parquets(partition_descriptor: PartitionDescriptor, variants_dir: str, partition: list[tuple[str, str]], row_group_size: int = 50000, variants_type: Literal['summary', 'family'] | None = None) None[source]

Merge parquet files in variants_dir.

Deprecated: dae.parquet.schema2.merge_parquet.merge_variants_parquets is deprecated; use dae.parquet.schema2.merge_parquet.merge_parquet_directory instead.

dae.parquet.schema2.parquet_io module

class dae.parquet.schema2.parquet_io.ContinuousParquetFileWriter(filepath: str, allele_serializer: AlleleParquetSerializer, row_group_size: int = 10000)[source]

Bases: object

A continuous Parquet writer.

Writes to a given Parquet file automatically once enough data has been buffered, and dumps any leftover data into the file when closed.

BATCH_ROWS = 500
DEFAULT_COMPRESSION = 'SNAPPY'
append_allele(allele: SummaryAllele | FamilyAllele, variant_blob: bytes) None[source]

Append the data for an entire allele to the correct partition file.

build_batch() RecordBatch[source]
build_table() Table[source]
close() None[source]

Close the parquet writer and write any remaining data.

data_reset() None[source]
size() int[source]
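
A minimal sketch of the writer's lifecycle; serializer and alleles are placeholders for an AlleleParquetSerializer and an iterable of (allele, blob) pairs built elsewhere (both assumptions):

    from dae.parquet.schema2.parquet_io import ContinuousParquetFileWriter

    # `serializer` and `alleles` are hypothetical objects constructed elsewhere.
    writer = ContinuousParquetFileWriter("summary_bucket_1.parquet", serializer)
    for allele, variant_blob in alleles:
        writer.append_allele(allele, variant_blob)  # buffered, flushed in row groups
    writer.close()  # writes any remaining buffered rows
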
dae.parquet.schema2.parquet_io.VariantsParquetWriterDeprecated(out_dir: str, annotation_pipeline: AnnotationPipeline, partition_descriptor: PartitionDescriptor, *, blob_serializer: VariantsDataSerializer | None = None, bucket_index: int = 1, row_group_size: int = 10000, include_reference: bool = False, variants_blob_serializer: str = 'json') None[source]

Provide functions for storing variants into a Parquet dataset.

Deprecated: use dae.parquet.schema2.variants_parquet_writer.VariantsParquetWriter instead.

dae.parquet.schema2.processing_pipeline module

class dae.parquet.schema2.processing_pipeline.AnnotationPipelineVariantsBatchFilter(annotation_pipeline: AnnotationPipeline)[source]

Bases: Filter, AnnotationPipelineVariantsFilterMixin

Annotation pipeline batched variants filter.

filter(data: Sequence[FullVariant]) Sequence[FullVariant][source]

Filter variants in batches.

class dae.parquet.schema2.processing_pipeline.AnnotationPipelineVariantsFilter(annotation_pipeline: AnnotationPipeline)[source]

Bases: Filter, AnnotationPipelineVariantsFilterMixin

Annotation pipeline variants filter.

filter(data: FullVariant) FullVariant[source]
class dae.parquet.schema2.processing_pipeline.AnnotationPipelineVariantsFilterMixin(annotation_pipeline: AnnotationPipeline)[source]

Bases: object

Mixin for annotation pipeline filters.

class dae.parquet.schema2.processing_pipeline.DeleteAttributesFromVariantFilter(attributes_to_remove: Sequence[str])[source]

Bases: Filter

Filter that removes attributes from variants. Works in place.

filter(data: FullVariant) FullVariant[source]

Remove specified attributes from a variant.
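
A small sketch of this filter; variant is a placeholder for a FullVariant obtained from one of the sources below, and the attribute names are hypothetical:

    from dae.parquet.schema2.processing_pipeline import (
        DeleteAttributesFromVariantFilter,
    )

    strip_extras = DeleteAttributesFromVariantFilter(["tmp_score", "debug_info"])
    variant = strip_extras.filter(variant)  # attributes are removed in place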

class dae.parquet.schema2.processing_pipeline.DeleteAttributesFromVariantsBatchFilter(attributes_to_remove: Sequence[str])[source]

Bases: Filter

Filter that removes attributes from batches of variants. Works in place.

filter(data: Sequence[FullVariant]) Sequence[FullVariant][source]

Remove specified attributes from batches of variants.

class dae.parquet.schema2.processing_pipeline.Schema2SummaryVariantsBatchSource(loader: ParquetLoader, batch_size: int = 500)[source]

Bases: Source

Producer for batches of summary variants from a Parquet dataset.

fetch(region: Region | None = None) Iterable[Sequence[FullVariant]][source]

Fetch full variants from a variant loader in batches.

class dae.parquet.schema2.processing_pipeline.Schema2SummaryVariantsSource(loader: ParquetLoader)[source]

Bases: Source

Producer for summary variants from a Parquet dataset.

fetch(region: Region | None = None) Iterable[FullVariant][source]
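
A sketch of chaining a source and a filter from this module; loader and pipeline are placeholders for a ParquetLoader and an AnnotationPipeline built elsewhere (both assumptions), and only the documented fetch()/filter() calls are used:

    from dae.parquet.schema2.processing_pipeline import (
        AnnotationPipelineVariantsBatchFilter,
        Schema2SummaryVariantsBatchSource,
    )

    source = Schema2SummaryVariantsBatchSource(loader, batch_size=500)
    annotate = AnnotationPipelineVariantsBatchFilter(pipeline)

    for batch in source.fetch():              # one Sequence[FullVariant] per batch
        annotated_batch = annotate.filter(batch)
        # ... hand the annotated batch to a consumer or writer
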
class dae.parquet.schema2.processing_pipeline.VariantsLoaderBatchSource(loader: VariantsGenotypesLoader, batch_size: int = 500)[source]

Bases: Source

A source that can fetch variants in batches from a loader.

fetch(region: Region | None = None) Iterable[Sequence[FullVariant]][source]

Fetch full variants from a variant loader in batches.

class dae.parquet.schema2.processing_pipeline.VariantsLoaderSource(loader: VariantsGenotypesLoader)[source]

Bases: Source

A source that can fetch variants from a loader.

fetch(region: Region | None = None) Iterable[FullVariant][source]

Fetch full variants from a variant loader.

dae.parquet.schema2.serializers module

class dae.parquet.schema2.serializers.AlleleParquetSerializer(annotation_schema: list[AttributeInfo], extra_attributes: list[str] | None = None)[source]

Bases: ABC

Base class for serializing alleles to parquet format.

abstract blob_column() str[source]

Return the name of the column that contains the variant blob.

abstract build_allele_record_dict(allele: SummaryAllele | FamilyAllele, variant_blob: bytes) dict[str, Any][source]

Build a record from allele data in the form of a dict.

abstract schema() Schema[source]

Lazily construct and return the schema for the summary alleles.

class dae.parquet.schema2.serializers.FamilyAlleleParquetSerializer(annotation_schema: list[AttributeInfo], extra_attributes: list[str] | None = None)[source]

Bases: AlleleParquetSerializer

Serialize family alleles.

blob_column() str[source]

Return the name of the column that contains the variant blob.

build_allele_record_dict(allele: SummaryAllele | FamilyAllele, variant_blob: bytes) dict[str, Any][source]

Build a record of family allele data in the form of a dict.

classmethod build_schema() Schema[source]

Build the schema for the family alleles.

schema() Schema[source]

Lazily construct and return the schema for the family alleles.

class dae.parquet.schema2.serializers.JsonVariantsDataSerializer(metadata: dict[str, Any])[source]

Bases: VariantsDataSerializer

Serialize family and summary alleles to json.

deserialize_family_record(data: bytes) dict[str, Any][source]

Deserialize a family allele from a byte string.

deserialize_summary_record(data: bytes) list[dict[str, Any]][source]

Deserialize a summary allele from a byte string.

serialize_family(variant: FamilyVariant) bytes[source]

Serialize a family variant part to a byte string.

serialize_summary(variant: SummaryVariant) bytes[source]

Serialize a summary allele to a byte string.

class dae.parquet.schema2.serializers.SummaryAlleleParquetSerializer(annotation_schema: list[AttributeInfo], extra_attributes: list[str] | None = None)[source]

Bases: AlleleParquetSerializer

Serialize summary alleles for parquet storage.

blob_column() str[source]

Return the name of the column that contains the variant blob.

build_allele_record_dict(allele: SummaryAllele, variant_blob: bytes) dict[str, Any][source]

Build a record of summary allele data in the form of a dict.

classmethod build_blob_schema(annotation_schema: list[AttributeInfo]) dict[str, str][source]
classmethod build_schema(annotation_schema: list[AttributeInfo]) Schema[source]

Build the schema for the summary alleles.

schema() Schema[source]

Lazily construct and return the schema for the summary alleles.

class dae.parquet.schema2.serializers.VariantsDataAvroSerializer(summary_blob_schema: dict[str, str])[source]

Bases: VariantsDataSerializer

Serialize family and summary alleles to Avro.

deserialize_family_record(data: bytes) dict[str, Any][source]

Deserialize a family allele from a byte string.

deserialize_summary_record(data: bytes) list[dict[str, Any]][source]

Deserialize a summary allele from a byte string.

serialize_family(variant: FamilyVariant) bytes[source]

Serialize a family variant part to a byte string.

serialize_summary(variant: SummaryVariant) bytes[source]

Serialize a summary allele to a byte string.

class dae.parquet.schema2.serializers.VariantsDataSerializer(metadata: dict[str, Any])[source]

Bases: ABC

Interface for serializing family and summary alleles.

static build_serializer(metadata: dict[str, Any], serializer_type: str = 'json') VariantsDataSerializer[source]

Build a serializer based on the metadata.

abstract deserialize_family_record(data: bytes) dict[str, Any][source]

Deserialize a family allele from a byte string.

abstract deserialize_summary_record(data: bytes) list[dict[str, Any]][source]

Deserialize a summary allele from a byte string.

abstract serialize_family(variant: FamilyVariant) bytes[source]

Serialize a family variant part to a byte string.

abstract serialize_summary(variant: SummaryVariant) bytes[source]

Serialize a summary allele to a byte string.
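
A sketch of a serialization round trip; meta, summary_variant, and family_variant are placeholders for the dataset metadata dict and variant objects obtained elsewhere (all assumptions):

    from dae.parquet.schema2.serializers import VariantsDataSerializer

    serializer = VariantsDataSerializer.build_serializer(meta, serializer_type="json")

    summary_blob = serializer.serialize_summary(summary_variant)
    family_blob = serializer.serialize_family(family_variant)

    summary_records = serializer.deserialize_summary_record(summary_blob)
    family_record = serializer.deserialize_family_record(family_blob)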

dae.parquet.schema2.serializers.build_family_schema() Schema[source]

Build the schema for the family alleles.

dae.parquet.schema2.serializers.build_summary_blob_schema(annotation_schema: list[AttributeInfo]) dict[str, str][source]
dae.parquet.schema2.serializers.build_summary_schema(annotation_schema: list[AttributeInfo]) Schema[source]

Build the schema for the summary alleles.

dae.parquet.schema2.serializers.construct_avro_family_schema() dict[str, Any][source]

Construct an Avro schema for family variants.

dae.parquet.schema2.serializers.construct_avro_summary_schema(schema: dict[str, str]) dict[str, Any][source]

Construct an Avro schema from the summary schema.

dae.parquet.schema2.variants_parquet_writer module

class dae.parquet.schema2.variants_parquet_writer.ContinuousParquetFileWriter(filepath: str, allele_serializer: AlleleParquetSerializer, row_group_size: int = 10000)[source]

Bases: object

A continuous Parquet writer.

Writes to a given Parquet file automatically once enough data has been buffered, and dumps any leftover data into the file when closed.

DEFAULT_COMPRESSION = 'SNAPPY'
append_allele(allele: SummaryAllele | FamilyAllele, variant_blob: bytes) None[source]

Append the data for an entire allele to the correct partition file.

close() None[source]

Close the parquet writer and write any remaining data.

size() int[source]
class dae.parquet.schema2.variants_parquet_writer.Schema2SummaryVariantBatchConsumer(writer: VariantsParquetWriter)[source]

Bases: Filter

Consumer for Parquet summary variants.

filter(data: Sequence[FullVariant]) None[source]
class dae.parquet.schema2.variants_parquet_writer.Schema2SummaryVariantConsumer(writer: VariantsParquetWriter)[source]

Bases: Filter

Consumer for Parquet summary variants.

filter(data: FullVariant) None[source]
class dae.parquet.schema2.variants_parquet_writer.Schema2VariantBatchConsumer(writer: VariantsParquetWriter)[source]

Bases: Filter

Consumer for batches of Parquet variants.

filter(data: Sequence[FullVariant]) None[source]
class dae.parquet.schema2.variants_parquet_writer.Schema2VariantConsumer(writer: VariantsParquetWriter)[source]

Bases: Filter

Consumer for Parquet variants.

filter(data: FullVariant) None[source]
class dae.parquet.schema2.variants_parquet_writer.VariantsParquetWriter(out_dir: Path | str, annotation_schema: list[AttributeInfo], partition_descriptor: PartitionDescriptor, *, blob_serializer: VariantsDataSerializer | None = None, bucket_index: int = 1, row_group_size: int = 10000, include_reference: bool = False, variants_blob_serializer: str = 'json')[source]

Bases: AbstractContextManager

Provide functions for storing variants into a Parquet dataset.

close() None[source]
write(data: FullVariant) None[source]

Consume a single full variant.

write_summary_variant(summary_variant: SummaryVariant, sj_base_index: int | None = None) None[source]

Write a single summary variant to the correct parquet file.
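
A minimal sketch of writing variants; annotation_schema, partition_descriptor, and full_variants are placeholders for objects constructed elsewhere (a list[AttributeInfo], a PartitionDescriptor, and an iterable of FullVariant), so this is an outline rather than a complete program:

    from dae.parquet.schema2.variants_parquet_writer import VariantsParquetWriter

    writer = VariantsParquetWriter(
        "out/parquet_dataset",
        annotation_schema,
        partition_descriptor,
        bucket_index=1,
    )
    for full_variant in full_variants:
        writer.write(full_variant)  # consume one full variant at a time
    writer.close()
    # The class also derives from AbstractContextManager, so it can be used
    # in a `with` statement.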

Module contents