dae.parquet.schema2 package

Submodules

dae.parquet.schema2.annotation module

class dae.parquet.schema2.annotation.AnnotateSchema2ParquetTool(raw_args: list[str] | None = None)[source]

Bases: AnnotationTool

Annotation tool for the Parquet file format.

add_tasks_to_graph() None[source]

Add tasks to annotation tool task graph.

dry_run() None[source]

Print a summary of the annotation without running it.

get_argument_parser() ArgumentParser[source]

Construct and configure argument parser.

prepare_for_annotation() None[source]

Perform operations required for annotation.

print_meta() None[source]

Print the metadata of a Parquet study.

dae.parquet.schema2.annotation.cli(raw_args: list[str] | None = None) None[source]

Entry method for AnnotateSchema2ParquetTool.
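
A minimal usage sketch (hedged): calling cli() with None makes the tool read its arguments from sys.argv, exactly as the installed console script would; a list of argument strings can be passed instead for programmatic use. The available options are defined by get_argument_parser().

    from dae.parquet.schema2.annotation import cli

    # Read arguments from sys.argv, as the console entry point does.
    cli(None)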

dae.parquet.schema2.annotation_utils module

dae.parquet.schema2.annotation_utils.backup_schema2_study(directory: str) Schema2DatasetLayout[source]

Backup current meta and summary data for a parquet study.

Renames the meta Parquet file and the summary variants directory by attaching a suffix with the current date, then returns a corrected layout that uses the newly renamed paths. This clears the way for the new 'meta' and 'summary' data that will be produced when reannotating a Parquet study in place.
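
A hedged usage sketch; the study path below is hypothetical:

    from dae.parquet.schema2.annotation_utils import backup_schema2_study

    # Back up the current meta/summary data before reannotating in place.
    backup_layout = backup_schema2_study("/data/studies/my_study")
    # backup_layout points to the date-suffixed copies, leaving the
    # original locations free for the newly produced meta and summary.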

dae.parquet.schema2.annotation_utils.merge_non_partitioned(output_dir: str) None[source]
dae.parquet.schema2.annotation_utils.merge_partitioned(summary_dir: str, partition_dir: str, partition_descriptor: PartitionDescriptor) None[source]

Helper method to merge Parquet files in partitioned studies.

dae.parquet.schema2.annotation_utils.process_parquet(input_layout: Schema2DatasetLayout, pipeline_config: list[dict[str, Any]] | RawFullConfig, grr_definition: dict | None, output_dir: str, bucket_idx: int, work_dir: str, batch_size: int, region: Region, allow_repeated_attributes: bool, full_reannotation: bool) None[source]

Process a Parquet dataset for annotation.

dae.parquet.schema2.annotation_utils.produce_regions(target_region: str | None, region_size: int, contig_lens: dict[str, int]) list[str][source]

Produce regions to annotate by.
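
A hedged sketch of splitting contigs into fixed-size chunks for parallel annotation; the contig names and lengths are illustrative, and the exact region-string format is defined by the implementation:

    from dae.parquet.schema2.annotation_utils import produce_regions

    regions = produce_regions(
        target_region=None,          # annotate everything
        region_size=1_000_000,       # 1 Mbp chunks
        contig_lens={"chr1": 2_500_000, "chr2": 1_200_000},
    )
    # Expect strings such as "chr1:1-1000000", "chr1:1000001-2000000", ...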

dae.parquet.schema2.annotation_utils.produce_schema2_annotation_tasks(task_graph: TaskGraph, loader: ParquetLoader, output_dir: str, raw_pipeline: list[dict[str, Any]] | RawFullConfig, grr: GenomicResourceRepo, region_size: int, work_dir: str, batch_size: int, *, target_region: str | None = None, allow_repeated_attributes: bool = False, full_reannotation: bool = False) list[Task][source]

Produce TaskGraph tasks for Parquet file annotation.

dae.parquet.schema2.annotation_utils.produce_schema2_merging_tasks(task_graph: TaskGraph, annotation_tasks: list[Task], loader: ParquetLoader, output_layout: Schema2DatasetLayout) list[Task][source]

Produce TaskGraph tasks for Parquet file merging.

Mirror pedigree and family variants data using symlinks.
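
A hedged wiring sketch that builds both the annotation and merging tasks on a task graph. The TaskGraph import path is an assumption, and raw_pipeline, grr and output_layout are assumed to be prepared elsewhere:

    from dae.parquet.schema2.annotation_utils import (
        produce_schema2_annotation_tasks,
        produce_schema2_merging_tasks,
    )
    from dae.parquet.schema2.loader import ParquetLoader
    from dae.task_graph.graph import TaskGraph  # assumed import path

    loader = ParquetLoader.load_from_dir("/data/studies/my_study")
    task_graph = TaskGraph()

    annotation_tasks = produce_schema2_annotation_tasks(
        task_graph, loader,
        output_dir="/data/out",
        raw_pipeline=raw_pipeline,   # parsed annotation pipeline config
        grr=grr,                     # a GenomicResourceRepo instance
        region_size=1_000_000,
        work_dir="/data/work",
        batch_size=500,
    )
    produce_schema2_merging_tasks(
        task_graph, annotation_tasks, loader, output_layout,
    )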

dae.parquet.schema2.annotation_utils.write_new_meta(loader: ParquetLoader, pipeline: AnnotationPipeline, output_layout: Schema2DatasetLayout) None[source]

Produce and write new metadata to the output Parquet dataset.

dae.parquet.schema2.loader module

class dae.parquet.schema2.loader.MultiReader(dirs: Iterable[str], columns: Iterable[str])[source]

Bases: object

Incrementally fetch variants from multiple files.

This class assumes variants are ordered by their bucket and summary index!

close() None[source]
property current_idx: tuple[int, int]
class dae.parquet.schema2.loader.ParquetLoader(layout: Schema2DatasetLayout)[source]

Bases: object

Variants loader implementation for the Parquet format.

FAMILY_COLUMNS: ClassVar[list[str]] = ['bucket_index', 'summary_index', 'family_id', 'family_variant_data']
SUMMARY_COLUMNS: ClassVar[list[str]] = ['bucket_index', 'summary_index', 'allele_index', 'summary_variant_data', 'chromosome', 'position', 'end_position']
fetch_family_variants(region: Region | None = None) Generator[FamilyVariant, None, None][source]

Iterate over family variants.

fetch_summary_variants(region: Region | None = None) Generator[SummaryVariant, None, None][source]

Iterate over summary variants.

fetch_variants(region: Region | None = None) Generator[tuple[SummaryVariant, list[FamilyVariant]], None, None][source]

Iterate over summary and family variants.

get_family_pq_filepaths(summary_path: str) list[str][source]

Get all family parquet files for a given summary parquet file.

get_summary_pq_filepaths(region: Region | None = None) Generator[list[str], None, None][source]

Produce paths to available Parquet files grouped by region.

Can filter by region if region bins are configured.

static load_from_dir(input_dir: str) ParquetLoader[source]
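
A hedged sketch of loading a study and iterating its variants; the path is illustrative:

    from dae.parquet.schema2.loader import ParquetLoader

    loader = ParquetLoader.load_from_dir("/data/studies/my_study")

    # Iterate summary variants together with their family variants.
    for summary_variant, family_variants in loader.fetch_variants():
        print(summary_variant, len(family_variants))

    # A Region can be passed to any of the fetch_* methods to restrict
    # iteration to a genomic region.
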
exception dae.parquet.schema2.loader.ParquetLoaderException[source]

Bases: Exception

class dae.parquet.schema2.loader.Reader(path: str, columns: Iterable[str])[source]

Bases: object

Helper class to incrementally fetch variants.

This class assumes variants are ordered by their bucket and summary index!

BATCH_SIZE = 1000
close() None[source]
property current_idx: tuple[int, int]

dae.parquet.schema2.merge_parquet module

dae.parquet.schema2.merge_parquet.merge_parquet_directory(parquets_dir: Path | str, output_parquet_filename: str, row_group_size: int = 50000, parquet_version: str | None = None) None[source]

Merge all parquet files from parquets_dir into a single parquet file.
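
A hedged usage sketch; the directory and file names are illustrative:

    from dae.parquet.schema2.merge_parquet import merge_parquet_directory

    merge_parquet_directory(
        "/data/out/summary/region_bin=chr1_0",
        "/data/out/summary/region_bin=chr1_0/merged.parquet",
        row_group_size=50_000,
    )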

dae.parquet.schema2.parquet_io module

class dae.parquet.schema2.parquet_io.ContinuousParquetFileWriter(filepath: str, allele_serializer: AlleleParquetSerializer, row_group_size: int = 10000)[source]

Bases: object

A continuous Parquet writer.

Class that automatically writes buffered allele data to the given Parquet file once enough rows have accumulated, and flushes any leftover data to the file when closed. (A usage sketch follows this class entry.)

BATCH_ROWS = 500
DEFAULT_COMPRESSION = 'SNAPPY'
append_allele(allele: SummaryAllele | FamilyAllele, variant_blob: bytes) None[source]

Append the data for an entire allele to the correct partition file.

build_batch() RecordBatch[source]
build_table() Table[source]
close() None[source]

Close the parquet writer and write any remaining data.

data_reset() None[source]
size() int[source]
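
A hedged usage sketch for ContinuousParquetFileWriter, assuming an empty annotation schema for brevity; in practice the schema comes from the annotation pipeline, and allele/blob pairs are produced upstream by a serializer-aware writer:

    from dae.parquet.schema2.parquet_io import ContinuousParquetFileWriter
    from dae.parquet.schema2.serializers import SummaryAlleleParquetSerializer

    # Empty annotation schema is an assumption made for brevity.
    serializer = SummaryAlleleParquetSerializer(annotation_schema=[])
    writer = ContinuousParquetFileWriter("summary_chr1.parquet", serializer)
    # for allele, blob in ...:
    #     writer.append_allele(allele, blob)
    writer.close()  # flushes any buffered rows to the file
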
dae.parquet.schema2.parquet_io.VariantsParquetWriterDeprecated(out_dir: str, annotation_pipeline: AnnotationPipeline, partition_descriptor: PartitionDescriptor, *, blob_serializer: VariantsDataSerializer | None = None, bucket_index: int = 1, row_group_size: int = 10000, include_reference: bool = False, variants_blob_serializer: str = 'json') None[source]

Provide functions for storing variants into a Parquet dataset.

Deprecated: use dae.parquet.schema2.variants_parquet_writer.VariantsParquetWriter instead.

dae.parquet.schema2.processing_pipeline module

class dae.parquet.schema2.processing_pipeline.AnnotationPipelineVariantsBatchFilter(annotation_pipeline: AnnotationPipeline)[source]

Bases: VariantsBatchFilter, AnnotationPipelineVariantsFilterMixin

Annotation pipeline batched variants filter.

filter_batch(batch: Sequence[FullVariant]) Sequence[FullVariant][source]

Filter variants in batches.

class dae.parquet.schema2.processing_pipeline.AnnotationPipelineVariantsFilter(annotation_pipeline: AnnotationPipeline)[source]

Bases: VariantsFilter, AnnotationPipelineVariantsFilterMixin

Annotation pipeline variants filter.

filter_one(full_variant: FullVariant) FullVariant[source]

Filter a single variant.

class dae.parquet.schema2.processing_pipeline.AnnotationPipelineVariantsFilterMixin(annotation_pipeline: AnnotationPipeline)[source]

Bases: object

Mixin for annotation pipeline filters.

class dae.parquet.schema2.processing_pipeline.DeleteAttributesFromVariantFilter(attributes_to_remove: Sequence[str])[source]

Bases: VariantsFilter

Filter to remove items from AWC contexts. Works in-place.

filter_one(full_variant: FullVariant) FullVariant[source]

Remove specified attributes from the context of an AWC.

class dae.parquet.schema2.processing_pipeline.DeleteAttributesFromVariantsBatchFilter(attributes_to_remove: Sequence[str])[source]

Bases: VariantsBatchFilter

Filter to remove items from AWC contexts. Works in-place.

filter_batch(batch: Sequence[FullVariant]) Sequence[FullVariant][source]

Remove specified attributes from batches of variants.

class dae.parquet.schema2.processing_pipeline.Schema2SummaryVariantsBatchSource(loader: ParquetLoader, batch_size: int = 500)[source]

Bases: VariantsBatchSource

Producer for summary variants from a Parquet dataset.

fetch_batches(region: Region | None = None) Iterable[Sequence[FullVariant]][source]

Fetch full variants from a variant loader in batches.

class dae.parquet.schema2.processing_pipeline.Schema2SummaryVariantsSource(loader: ParquetLoader)[source]

Bases: VariantsSource

Producer for summary variants from a Parquet dataset.

fetch(region: Region | None = None) Iterable[FullVariant][source]

Fetch variants.

class dae.parquet.schema2.processing_pipeline.VariantsBatchConsumer[source]

Bases: AbstractContextManager

A sink that can write variants in batches.

abstract consume_batch(batch: Sequence[FullVariant]) None[source]

Consume a single batch of variants.

consume_batches(batches: Iterable[Sequence[FullVariant]]) None[source]

Consume variants in batches.

class dae.parquet.schema2.processing_pipeline.VariantsBatchFilter[source]

Bases: AbstractContextManager

A filter that can filter variants in batches.

abstract filter_batch(batch: Sequence[FullVariant]) Sequence[FullVariant][source]

Filter variants in a single batch.

filter_batches(batches: Iterable[Sequence[FullVariant]]) Iterable[Sequence[FullVariant]][source]

Filter variants in batches.

class dae.parquet.schema2.processing_pipeline.VariantsBatchPipelineProcessor(source: VariantsBatchSource, filters: Sequence[VariantsBatchFilter], consumer: VariantsBatchConsumer)[source]

Bases: object

A processor that can be used to process variants in a pipeline.

process(regions: Iterable[Region] | None = None) None[source]

Process variants in batches for the given regions.

process_region(region: Region | None = None) None[source]
class dae.parquet.schema2.processing_pipeline.VariantsBatchSource[source]

Bases: AbstractContextManager

A source that can fetch variants in batches.

abstract fetch_batches(region: Region | None = None) Iterable[Sequence[FullVariant]][source]

Fetch variants in batches.

class dae.parquet.schema2.processing_pipeline.VariantsConsumer[source]

Bases: AbstractContextManager

A terminator for variant processing pipelines.

consume(variants: Iterable[FullVariant]) None[source]

Consume variants.

abstract consume_one(full_variant: FullVariant) None[source]

Consume a single variant.

class dae.parquet.schema2.processing_pipeline.VariantsFilter[source]

Bases: AbstractContextManager

A filter that can be used to filter variants.

filter(variants: Iterable[FullVariant]) Iterable[FullVariant][source]

Filter variants.

abstract filter_one(full_variant: FullVariant) FullVariant[source]

Filter a single variant.

class dae.parquet.schema2.processing_pipeline.VariantsLoaderBatchSource(loader: VariantsGenotypesLoader, batch_size: int = 500)[source]

Bases: VariantsBatchSource

A source that can fetch variants in batches from a loader.

fetch_batches(region: Region | None = None) Iterable[Sequence[FullVariant]][source]

Fetch full variants from a variant loader in batches.

class dae.parquet.schema2.processing_pipeline.VariantsLoaderSource(loader: VariantsGenotypesLoader)[source]

Bases: VariantsSource

A source that can fetch variants from a loader.

fetch(region: Region | None = None) Iterable[FullVariant][source]

Fetch full variants from a variant loader.

class dae.parquet.schema2.processing_pipeline.VariantsPipelineProcessor(source: VariantsSource, filters: Sequence[VariantsFilter], consumer: VariantsConsumer)[source]

Bases: AbstractContextManager

A processor that can be used to process variants in a pipeline.

process(regions: Iterable[Region] | None = None) None[source]

Process variants in batches for the given regions.

process_region(region: Region | None = None) None[source]
class dae.parquet.schema2.processing_pipeline.VariantsSource[source]

Bases: AbstractContextManager

abstract fetch(region: Region | None = None) Iterable[FullVariant][source]

Fetch variants.
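
A hedged end-to-end wiring sketch for the batch pipeline: a Parquet summary-variant source, an annotation filter, and a Parquet writer consumer. The objects loader, annotation_pipeline, annotation_schema and partition_descriptor are assumed to have been constructed elsewhere:

    from dae.parquet.schema2.processing_pipeline import (
        AnnotationPipelineVariantsBatchFilter,
        Schema2SummaryVariantsBatchSource,
        VariantsBatchPipelineProcessor,
    )
    from dae.parquet.schema2.variants_parquet_writer import (
        Schema2SummaryVariantConsumer,
    )

    source = Schema2SummaryVariantsBatchSource(loader, batch_size=500)
    filters = [AnnotationPipelineVariantsBatchFilter(annotation_pipeline)]
    consumer = Schema2SummaryVariantConsumer(
        "/data/out", annotation_schema, partition_descriptor,
    )

    processor = VariantsBatchPipelineProcessor(source, filters, consumer)
    processor.process()          # or processor.process(regions=[...])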

dae.parquet.schema2.serializers module

class dae.parquet.schema2.serializers.AlleleParquetSerializer(annotation_schema: list[AttributeInfo], extra_attributes: list[str] | None = None)[source]

Bases: ABC

Base class for serializing alleles to parquet format.

abstract blob_column() str[source]

Return the name of the column that contains the variant blob.

abstract build_allele_record_dict(allele: SummaryAllele | FamilyAllele, variant_blob: bytes) dict[str, Any][source]

Build a record from allele data in the form of a dict.

abstract schema() Schema[source]

Lazily construct and return the schema for the summary alleles.

class dae.parquet.schema2.serializers.FamilyAlleleParquetSerializer(annotation_schema: list[AttributeInfo], extra_attributes: list[str] | None = None)[source]

Bases: AlleleParquetSerializer

Serialize family alleles.

blob_column() str[source]

Return the name of the column that contains the variant blob.

build_allele_record_dict(allele: SummaryAllele | FamilyAllele, variant_blob: bytes) dict[str, Any][source]

Build a record of family allele data in the form of a dict.

classmethod build_schema() Schema[source]

Build the schema for the family alleles.

schema() Schema[source]

Lazily construct and return the schema for the family alleles.

class dae.parquet.schema2.serializers.JsonVariantsDataSerializer(metadata: dict[str, Any])[source]

Bases: VariantsDataSerializer

Serialize family and summary alleles to json.

deserialize_family_record(data: bytes) dict[str, Any][source]

Deserialize a family allele from a byte string.

deserialize_summary_record(data: bytes) list[dict[str, Any]][source]

Deserialize a summary allele from a byte string.

serialize_family(variant: FamilyVariant) bytes[source]

Serialize a family variant part to a byte string.

serialize_summary(variant: SummaryVariant) bytes[source]

Serialize a summary allele to a byte string.

class dae.parquet.schema2.serializers.SummaryAlleleParquetSerializer(annotation_schema: list[AttributeInfo], extra_attributes: list[str] | None = None)[source]

Bases: AlleleParquetSerializer

Serialize summary alleles for parquet storage.

blob_column() str[source]

Return the name of the column that contains the variant blob.

build_allele_record_dict(allele: SummaryAllele, variant_blob: bytes) dict[str, Any][source]

Build a record of summary allele data in the form of a dict.

classmethod build_blob_schema(annotation_schema: list[AttributeInfo]) dict[str, str][source]
classmethod build_schema(annotation_schema: list[AttributeInfo]) Schema[source]

Build the schema for the summary alleles.

schema() Schema[source]

Lazily construct and return the schema for the summary alleles.

class dae.parquet.schema2.serializers.VariantsDataAvroSerializer(summary_blob_schema: dict[str, str])[source]

Bases: VariantsDataSerializer

Serialize family and summary alleles using Avro.

deserialize_family_record(data: bytes) dict[str, Any][source]

Deserialize a family allele from a byte string.

deserialize_summary_record(data: bytes) list[dict[str, Any]][source]

Deserialize a summary allele from a byte string.

serialize_family(variant: FamilyVariant) bytes[source]

Serialize a family variant part to a byte string.

serialize_summary(variant: SummaryVariant) bytes[source]

Serialize a summary allele to a byte string.

class dae.parquet.schema2.serializers.VariantsDataSerializer(metadata: dict[str, Any])[source]

Bases: ABC

Interface for serializing family and summary alleles.

static build_serializer(metadata: dict[str, Any], serializer_type: str = 'json') VariantsDataSerializer[source]

Build a serializer based on the metadata.

abstract deserialize_family_record(data: bytes) dict[str, Any][source]

Deserialize a family allele from a byte string.

abstract deserialize_summary_record(data: bytes) list[dict[str, Any]][source]

Deserialize a summary allele from a byte string.

abstract serialize_family(variant: FamilyVariant) bytes[source]

Serialize a family variant part to a byte string.

abstract serialize_summary(variant: SummaryVariant) bytes[source]

Serialize a summary allele to a byte string.
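
A hedged round-trip sketch using the factory; metadata and summary_variant are assumed to exist already:

    from dae.parquet.schema2.serializers import VariantsDataSerializer

    serializer = VariantsDataSerializer.build_serializer(metadata, "json")
    blob = serializer.serialize_summary(summary_variant)
    records = serializer.deserialize_summary_record(blob)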

dae.parquet.schema2.serializers.build_family_schema() Schema[source]

Build the schema for the family alleles.

dae.parquet.schema2.serializers.build_summary_blob_schema(annotation_schema: list[AttributeInfo]) dict[str, str][source]
dae.parquet.schema2.serializers.build_summary_schema(annotation_schema: list[AttributeInfo]) Schema[source]

Build the schema for the summary alleles.

dae.parquet.schema2.serializers.construct_avro_family_schema() dict[str, Any][source]

Construct an Avro schema for family variants.

dae.parquet.schema2.serializers.construct_avro_summary_schema(schema: dict[str, str]) dict[str, Any][source]

Construct an Avro schema from the summary schema.

dae.parquet.schema2.variants_parquet_writer module

class dae.parquet.schema2.variants_parquet_writer.ContinuousParquetFileWriter(filepath: str, allele_serializer: AlleleParquetSerializer, row_group_size: int = 10000)[source]

Bases: object

A continuous Parquet writer.

Class that automatically writes buffered allele data to the given Parquet file once enough rows have accumulated, and flushes any leftover data to the file when closed.

DEFAULT_COMPRESSION = 'SNAPPY'
append_allele(allele: SummaryAllele | FamilyAllele, variant_blob: bytes) None[source]

Append the data for an entire allele to the correct partition file.

close() None[source]

Close the parquet writer and write any remaining data.

size() int[source]
class dae.parquet.schema2.variants_parquet_writer.Schema2SummaryVariantConsumer(out_dir: Path | str, annotation_schema: list[AttributeInfo], partition_descriptor: PartitionDescriptor, *, blob_serializer: VariantsDataSerializer | None = None, bucket_index: int = 1, row_group_size: int = 10000, include_reference: bool = False, variants_blob_serializer: str = 'json')[source]

Bases: VariantsParquetWriter

Consumer for Parquet summary variants.

consume_one(full_variant: FullVariant) None[source]

Consume a single full variant.

class dae.parquet.schema2.variants_parquet_writer.VariantsParquetWriter(out_dir: Path | str, annotation_schema: list[AttributeInfo], partition_descriptor: PartitionDescriptor, *, blob_serializer: VariantsDataSerializer | None = None, bucket_index: int = 1, row_group_size: int = 10000, include_reference: bool = False, variants_blob_serializer: str = 'json')[source]

Bases: VariantsConsumer, VariantsBatchConsumer, AbstractContextManager

Provide functions for storing variants into a Parquet dataset.

close() None[source]
consume_batch(batch: Sequence[FullVariant]) None[source]

Consume a batch of full variants.

consume_one(full_variant: FullVariant) None[source]

Consume a single full variant.

write_summary_variant(summary_variant: SummaryVariant, sj_base_index: int | None = None) None[source]

Write a single summary variant to the correct parquet file.
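
A hedged usage sketch; annotation_schema, partition_descriptor and full_variants (an iterable of FullVariant) are assumed to be available:

    from dae.parquet.schema2.variants_parquet_writer import VariantsParquetWriter

    writer = VariantsParquetWriter(
        "/data/out", annotation_schema, partition_descriptor,
        bucket_index=1,
    )
    for fv in full_variants:
        writer.consume_one(fv)
    writer.close()   # write any remaining buffered data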

Module contents