dae.parquet.schema2 package
Submodules
dae.parquet.schema2.annotation module
- class dae.parquet.schema2.annotation.AnnotateSchema2ParquetTool(raw_args: list[str] | None = None)[source]
Bases: AnnotationTool
Annotation tool for the Parquet file format.
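A minimal usage sketch, assuming the tool is driven by CLI-style arguments; the exact flags come from the AnnotationTool base class and are not documented in this section, so both the arguments and the run() entry point shown here are assumptions:

    from dae.parquet.schema2.annotation import AnnotateSchema2ParquetTool

    # Hypothetical arguments: an input Parquet study and an annotation
    # pipeline config. The actual flags are defined by AnnotationTool.
    tool = AnnotateSchema2ParquetTool([
        "/data/studies/my_study",
        "annotation.yaml",
    ])
    tool.run()  # assumed entry point inherited from AnnotationTool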
dae.parquet.schema2.annotation_utils module
- dae.parquet.schema2.annotation_utils.backup_schema2_study(directory: str) Schema2DatasetLayout [source]
Back up the current meta and summary data for a Parquet study.
Renames the meta Parquet file and the summary variants directory by attaching a suffix with the current date, then returns a corrected layout using the newly renamed paths. This clears the way for the new ‘meta’ and ‘summary’ that will be produced when reannotating a Parquet study in place.
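A minimal sketch of backing up a study before in-place reannotation; the study directory is hypothetical:

    from dae.parquet.schema2.annotation_utils import backup_schema2_study

    # Hypothetical directory containing a schema2 Parquet study.
    backup_layout = backup_schema2_study("/data/studies/my_study")
    # The meta file and summary directory now carry a dated suffix, and
    # backup_layout points at the renamed paths.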
- dae.parquet.schema2.annotation_utils.merge_partitioned(summary_dir: str, partition_dir: str, partition_descriptor: PartitionDescriptor) None [source]
Helper function to merge Parquet files in partitioned studies.
- dae.parquet.schema2.annotation_utils.process_parquet(input_layout: Schema2DatasetLayout, pipeline_config: list[dict[str, Any]] | RawFullConfig, grr_definition: dict | None, output_dir: str, bucket_idx: int, work_dir: str, batch_size: int, region: Region, allow_repeated_attributes: bool, full_reannotation: bool) None [source]
Process a Parquet dataset for annotation.
- dae.parquet.schema2.annotation_utils.produce_regions(target_region: str | None, region_size: int, contig_lens: dict[str, int]) list[str] [source]
Produce the regions to annotate by, splitting each contig according to region_size.
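The output format is not documented above; the sketch below assumes regions are rendered as "chrom:start-end" strings, with each contig split into windows of at most region_size bases:

    from dae.parquet.schema2.annotation_utils import produce_regions

    regions = produce_regions(
        target_region=None,      # no restriction: cover all contigs
        region_size=1_000_000,   # 1 Mb windows
        contig_lens={"chr1": 2_500_000, "chr2": 1_200_000},
    )
    # Plausible result, under the assumed format:
    # ["chr1:1-1000000", "chr1:1000001-2000000", "chr1:2000001-2500000",
    #  "chr2:1-1000000", "chr2:1000001-1200000"]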
- dae.parquet.schema2.annotation_utils.produce_schema2_annotation_tasks(task_graph: TaskGraph, loader: ParquetLoader, output_dir: str, raw_pipeline: list[dict[str, Any]] | RawFullConfig, grr: GenomicResourceRepo, region_size: int, work_dir: str, batch_size: int, *, target_region: str | None = None, allow_repeated_attributes: bool = False, full_reannotation: bool = False) list[Task] [source]
Produce TaskGraph tasks for Parquet file annotation.
- dae.parquet.schema2.annotation_utils.produce_schema2_merging_tasks(task_graph: TaskGraph, annotation_tasks: list[Task], loader: ParquetLoader, output_layout: Schema2DatasetLayout) list[Task] [source]
Produce TaskGraph tasks for Parquet file merging.
- dae.parquet.schema2.annotation_utils.symlink_pedigree_and_family_variants(src_layout: Schema2DatasetLayout, dest_layout: Schema2DatasetLayout) None [source]
Mirror pedigree and family variants data using symlinks.
- dae.parquet.schema2.annotation_utils.write_new_meta(loader: ParquetLoader, pipeline: AnnotationPipeline, output_layout: Schema2DatasetLayout) None [source]
Produce and write new metadata to the output Parquet dataset.
dae.parquet.schema2.loader module
- class dae.parquet.schema2.loader.MultiReader(dirs: Iterable[str], columns: Iterable[str])[source]
Bases: object
Incrementally fetch variants from multiple files.
This class assumes variants are ordered by their bucket and summary index!
- property current_idx: tuple[int, int]
- class dae.parquet.schema2.loader.ParquetLoader(layout: Schema2DatasetLayout)[source]
Bases: object
Variants loader implementation for the Parquet format.
- FAMILY_COLUMNS: ClassVar[list[str]] = ['bucket_index', 'summary_index', 'family_id', 'family_variant_data']
- SUMMARY_COLUMNS: ClassVar[list[str]] = ['bucket_index', 'summary_index', 'allele_index', 'summary_variant_data', 'chromosome', 'position', 'end_position']
- fetch_family_variants(region: Region | None = None) Generator[FamilyVariant, None, None] [source]
Iterate over family variants.
- fetch_summary_variants(region: Region | None = None) Generator[SummaryVariant, None, None] [source]
Iterate over summary variants.
- fetch_variants(region: Region | None = None) Generator[tuple[SummaryVariant, list[FamilyVariant]], None, None] [source]
Iterate over summary and family variants.
- get_family_pq_filepaths(summary_path: str) list[str] [source]
Get all family Parquet files for a given summary Parquet file.
- get_summary_pq_filepaths(region: Region | None = None) Generator[list[str], None, None] [source]
Produce paths to available Parquet files grouped by region.
Can filter by region if region bins are configured.
- static load_from_dir(input_dir: str) ParquetLoader [source]
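A short sketch of loading a dataset and iterating its variants, using only the methods documented above; the dataset directory is hypothetical:

    from dae.parquet.schema2.loader import ParquetLoader

    loader = ParquetLoader.load_from_dir("/data/studies/my_study")

    # Iterate all summary variants; a Region could be passed to restrict
    # iteration to part of the genome.
    for sv in loader.fetch_summary_variants():
        print(sv)

    # Iterate summary variants together with their family variants.
    for sv, fvs in loader.fetch_variants():
        print(sv, len(fvs))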
dae.parquet.schema2.merge_parquet module
dae.parquet.schema2.parquet_io module
- class dae.parquet.schema2.parquet_io.ContinuousParquetFileWriter(filepath: str, allele_serializer: AlleleParquetSerializer, row_group_size: int = 10000)[source]
Bases: object
A continuous Parquet writer.
Automatically writes rows to the given Parquet file once enough data has accumulated, and flushes any leftover data into the file when closed.
- BATCH_ROWS = 500
- DEFAULT_COMPRESSION = 'SNAPPY'
- append_allele(allele: SummaryAllele | FamilyAllele, variant_blob: bytes) None [source]
Append the data for an entire allele to the correct partition file.
- dae.parquet.schema2.parquet_io.VariantsParquetWriterDeprecated(out_dir: str, annotation_pipeline: AnnotationPipeline, partition_descriptor: PartitionDescriptor, *, blob_serializer: VariantsDataSerializer | None = None, bucket_index: int = 1, row_group_size: int = 10000, include_reference: bool = False, variants_blob_serializer: str = 'json') None [source]
Provide functions for storing variants into a Parquet dataset.
Deprecated: use dae.parquet.schema2.variants_parquet_writer.VariantsParquetWriter instead.
dae.parquet.schema2.processing_pipeline module
- class dae.parquet.schema2.processing_pipeline.AnnotationPipelineVariantsBatchFilter(annotation_pipeline: AnnotationPipeline)[source]
Bases: VariantsBatchFilter, AnnotationPipelineVariantsFilterMixin
Annotation pipeline batched variants filter.
- filter_batch(batch: Sequence[FullVariant]) Sequence[FullVariant] [source]
Filter variants in batches.
- class dae.parquet.schema2.processing_pipeline.AnnotationPipelineVariantsFilter(annotation_pipeline: AnnotationPipeline)[source]
Bases: VariantsFilter, AnnotationPipelineVariantsFilterMixin
Annotation pipeline variants filter.
- filter_one(full_variant: FullVariant) FullVariant [source]
Filter a single variant.
- class dae.parquet.schema2.processing_pipeline.AnnotationPipelineVariantsFilterMixin(annotation_pipeline: AnnotationPipeline)[source]
Bases: object
Mixin for annotation pipeline filters.
- class dae.parquet.schema2.processing_pipeline.DeleteAttributesFromVariantFilter(attributes_to_remove: Sequence[str])[source]
Bases: VariantsFilter
Filter to remove items from AWC contexts. Works in-place.
- filter_one(full_variant: FullVariant) FullVariant [source]
Remove specified attributes from the context of an AWC.
- class dae.parquet.schema2.processing_pipeline.DeleteAttributesFromVariantsBatchFilter(attributes_to_remove: Sequence[str])[source]
Bases: VariantsBatchFilter
Filter to remove items from AWC contexts. Works in-place.
- filter_batch(batch: Sequence[FullVariant]) Sequence[FullVariant] [source]
Remove specified attributes from batches of variants.
- class dae.parquet.schema2.processing_pipeline.Schema2SummaryVariantsBatchSource(loader: ParquetLoader, batch_size: int = 500)[source]
Bases: VariantsBatchSource
Producer for summary variants from a Parquet dataset.
- fetch_batches(region: Region | None = None) Iterable[Sequence[FullVariant]] [source]
Fetch full variants from a variant loader in batches.
- class dae.parquet.schema2.processing_pipeline.Schema2SummaryVariantsSource(loader: ParquetLoader)[source]
Bases: VariantsSource
Producer for summary variants from a Parquet dataset.
- fetch(region: Region | None = None) Iterable[FullVariant] [source]
Fetch variants.
- class dae.parquet.schema2.processing_pipeline.VariantsBatchConsumer[source]
Bases: AbstractContextManager
A sink that can write variants in batches.
- abstract consume_batch(batch: Sequence[FullVariant]) None [source]
Consume a single batch of variants.
- consume_batches(batches: Iterable[Sequence[FullVariant]]) None [source]
Consume variants in batches.
- class dae.parquet.schema2.processing_pipeline.VariantsBatchFilter[source]
Bases: AbstractContextManager
A filter that can filter variants in batches.
- abstract filter_batch(batch: Sequence[FullVariant]) Sequence[FullVariant] [source]
Filter variants in a single batch.
- filter_batches(batches: Iterable[Sequence[FullVariant]]) Iterable[Sequence[FullVariant]] [source]
Filter variants in batches.
- class dae.parquet.schema2.processing_pipeline.VariantsBatchPipelineProcessor(source: VariantsBatchSource, filters: Sequence[VariantsBatchFilter], consumer: VariantsBatchConsumer)[source]
Bases: object
A processor that can be used to process variants in a pipeline.
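The processor's run entry point is not documented above, so this sketch wires the documented batch APIs together by hand: batches flow from a batch source through a batch filter, and in a complete pipeline a VariantsBatchConsumer's consume_batches() would terminate the chain. The dataset path and attribute name are hypothetical:

    from dae.parquet.schema2.loader import ParquetLoader
    from dae.parquet.schema2.processing_pipeline import (
        DeleteAttributesFromVariantsBatchFilter,
        Schema2SummaryVariantsBatchSource,
    )

    loader = ParquetLoader.load_from_dir("/data/studies/my_study")

    # Sources and filters are context managers.
    with Schema2SummaryVariantsBatchSource(loader, batch_size=500) as source, \
            DeleteAttributesFromVariantsBatchFilter(["old_score"]) as filt:
        for batch in filt.filter_batches(source.fetch_batches()):
            ...  # hand each batch to a VariantsBatchConsumer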
- class dae.parquet.schema2.processing_pipeline.VariantsBatchSource[source]
Bases: AbstractContextManager
A source that can fetch variants in batches.
- abstract fetch_batches(region: Region | None = None) Iterable[Sequence[FullVariant]] [source]
Fetch variants in batches.
- class dae.parquet.schema2.processing_pipeline.VariantsConsumer[source]
Bases: AbstractContextManager
A terminator for variant processing pipelines.
- consume(variants: Iterable[FullVariant]) None [source]
Consume variants.
- abstract consume_one(full_variant: FullVariant) None [source]
Consume a single variant.
- class dae.parquet.schema2.processing_pipeline.VariantsFilter[source]
Bases: AbstractContextManager
A filter that can be used to filter variants.
- filter(variants: Iterable[FullVariant]) Iterable[FullVariant] [source]
Filter variants.
- abstract filter_one(full_variant: FullVariant) FullVariant [source]
Filter a single variant.
- class dae.parquet.schema2.processing_pipeline.VariantsLoaderBatchSource(loader: VariantsGenotypesLoader, batch_size: int = 500)[source]
Bases: VariantsBatchSource
A source that can fetch variants in batches from a loader.
- fetch_batches(region: Region | None = None) Iterable[Sequence[FullVariant]] [source]
Fetch full variants from a variant loader in batches.
- class dae.parquet.schema2.processing_pipeline.VariantsLoaderSource(loader: VariantsGenotypesLoader)[source]
Bases: VariantsSource
A source that can fetch variants from a loader.
- fetch(region: Region | None = None) Iterable[FullVariant] [source]
Fetch full variants from a variant loader.
- class dae.parquet.schema2.processing_pipeline.VariantsPipelineProcessor(source: VariantsSource, filters: Sequence[VariantsFilter], consumer: VariantsConsumer)[source]
Bases: AbstractContextManager
A processor that can be used to process variants in a pipeline.
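The same wiring in the one-variant-at-a-time flavor, again using only the methods documented above; the path and attribute name are hypothetical:

    from dae.parquet.schema2.loader import ParquetLoader
    from dae.parquet.schema2.processing_pipeline import (
        DeleteAttributesFromVariantFilter,
        Schema2SummaryVariantsSource,
    )

    loader = ParquetLoader.load_from_dir("/data/studies/my_study")

    with Schema2SummaryVariantsSource(loader) as source, \
            DeleteAttributesFromVariantFilter(["old_score"]) as filt:
        for full_variant in filt.filter(source.fetch()):
            ...  # hand each variant to a VariantsConsumer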
dae.parquet.schema2.serializers module
- class dae.parquet.schema2.serializers.AlleleParquetSerializer(annotation_schema: list[AttributeInfo], extra_attributes: list[str] | None = None)[source]
Bases: ABC
Base class for serializing alleles to parquet format.
- abstract build_allele_record_dict(allele: SummaryAllele | FamilyAllele, variant_blob: bytes) dict[str, Any] [source]
Build a record from allele data in the form of a dict.
- class dae.parquet.schema2.serializers.FamilyAlleleParquetSerializer(annotation_schema: list[AttributeInfo], extra_attributes: list[str] | None = None)[source]
Bases: AlleleParquetSerializer
Serialize family alleles.
- build_allele_record_dict(allele: SummaryAllele | FamilyAllele, variant_blob: bytes) dict[str, Any] [source]
Build a record of family allele data in the form of a dict.
- class dae.parquet.schema2.serializers.JsonVariantsDataSerializer(metadata: dict[str, Any])[source]
Bases: VariantsDataSerializer
Serialize family and summary alleles to json.
- deserialize_family_record(data: bytes) dict[str, Any] [source]
Deserialize a family allele from a byte string.
- deserialize_summary_record(data: bytes) list[dict[str, Any]] [source]
Deserialize a summary allele from a byte string.
- serialize_family(variant: FamilyVariant) bytes [source]
Serialize a family variant part to a byte string.
- serialize_summary(variant: SummaryVariant) bytes [source]
Serialize a summary allele to a byte string.
- class dae.parquet.schema2.serializers.SummaryAlleleParquetSerializer(annotation_schema: list[AttributeInfo], extra_attributes: list[str] | None = None)[source]
Bases: AlleleParquetSerializer
Serialize summary alleles for parquet storage.
- build_allele_record_dict(allele: SummaryAllele, variant_blob: bytes) dict[str, Any] [source]
Build a record of summary allele data in the form of a dict.
- classmethod build_blob_schema(annotation_schema: list[AttributeInfo]) dict[str, str] [source]
- classmethod build_schema(annotation_schema: list[AttributeInfo]) Schema [source]
Build the schema for the summary alleles.
- class dae.parquet.schema2.serializers.VariantsDataAvroSerializer(summary_blob_schema: dict[str, str])[source]
Bases: VariantsDataSerializer
Avro-based serializer for family and summary alleles.
- deserialize_family_record(data: bytes) dict[str, Any] [source]
Deserialize a family allele from a byte string.
- deserialize_summary_record(data: bytes) list[dict[str, Any]] [source]
Deserialize a summary allele from a byte string.
- serialize_family(variant: FamilyVariant) bytes [source]
Serialize a family variant part to a byte string.
- serialize_summary(variant: SummaryVariant) bytes [source]
Serialize a summary allele to a byte string.
- class dae.parquet.schema2.serializers.VariantsDataSerializer(metadata: dict[str, Any])[source]
Bases: ABC
Interface for serializing family and summary alleles.
- static build_serializer(metadata: dict[str, Any], serializer_type: str = 'json') VariantsDataSerializer [source]
Build a serializer based on the metadata; see the usage sketch after this class.
- abstract deserialize_family_record(data: bytes) dict[str, Any] [source]
Deserialize a family allele from a byte string.
- abstract deserialize_summary_record(data: bytes) list[dict[str, Any]] [source]
Deserialize a summary allele from a byte string.
- abstract serialize_family(variant: FamilyVariant) bytes [source]
Serialize a family variant part to a byte string.
- abstract serialize_summary(variant: SummaryVariant) bytes [source]
Serialize a summary allele to a byte string.
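A sketch of round-tripping a summary variant through the JSON serializer via build_serializer; the empty metadata dict is a placeholder (in practice it would come from the dataset's meta Parquet file) and the dataset path is hypothetical:

    from dae.parquet.schema2.loader import ParquetLoader
    from dae.parquet.schema2.serializers import VariantsDataSerializer

    loader = ParquetLoader.load_from_dir("/data/studies/my_study")
    serializer = VariantsDataSerializer.build_serializer(
        {}, serializer_type="json")  # metadata dict is a placeholder

    sv = next(loader.fetch_summary_variants())
    blob = serializer.serialize_summary(sv)                 # bytes
    records = serializer.deserialize_summary_record(blob)   # list of dicts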
- dae.parquet.schema2.serializers.build_family_schema() Schema [source]
Build the schema for the family alleles.
- dae.parquet.schema2.serializers.build_summary_blob_schema(annotation_schema: list[AttributeInfo]) dict[str, str] [source]
- dae.parquet.schema2.serializers.build_summary_schema(annotation_schema: list[AttributeInfo]) Schema [source]
Build the schema for the summary alleles.
dae.parquet.schema2.variants_parquet_writer module
- class dae.parquet.schema2.variants_parquet_writer.ContinuousParquetFileWriter(filepath: str, allele_serializer: AlleleParquetSerializer, row_group_size: int = 10000)[source]
Bases: object
A continuous Parquet writer.
Automatically writes rows to the given Parquet file once enough data has accumulated, and flushes any leftover data into the file when closed.
- DEFAULT_COMPRESSION = 'SNAPPY'
- append_allele(allele: SummaryAllele | FamilyAllele, variant_blob: bytes) None [source]
Append the data for an entire allele to the correct partition file.
- class dae.parquet.schema2.variants_parquet_writer.Schema2SummaryVariantConsumer(out_dir: Path | str, annotation_schema: list[AttributeInfo], partition_descriptor: PartitionDescriptor, *, blob_serializer: VariantsDataSerializer | None = None, bucket_index: int = 1, row_group_size: int = 10000, include_reference: bool = False, variants_blob_serializer: str = 'json')[source]
Bases: VariantsParquetWriter
Consumer for Parquet summary variants.
- consume_one(full_variant: FullVariant) None [source]
Consume a single full variant.
- class dae.parquet.schema2.variants_parquet_writer.VariantsParquetWriter(out_dir: Path | str, annotation_schema: list[AttributeInfo], partition_descriptor: PartitionDescriptor, *, blob_serializer: VariantsDataSerializer | None = None, bucket_index: int = 1, row_group_size: int = 10000, include_reference: bool = False, variants_blob_serializer: str = 'json')[source]
Bases: VariantsConsumer, VariantsBatchConsumer, AbstractContextManager
Provide functions for storing variants into a Parquet dataset.
- consume_batch(batch: Sequence[FullVariant]) None [source]
Consume a batch of full variants.
- consume_one(full_variant: FullVariant) None [source]
Consume a single full variant.
- write_summary_variant(summary_variant: SummaryVariant, sj_base_index: int | None = None) None [source]
Write a single summary variant to the correct parquet file.
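A sketch of using the writer as a variants consumer; the output directory, annotation schema, partition descriptor, and the variants iterable are all hypothetical inputs that would normally come from the study's configuration and a variants source:

    from dae.parquet.schema2.variants_parquet_writer import VariantsParquetWriter

    writer = VariantsParquetWriter(
        out_dir="/data/out/my_study",
        annotation_schema=annotation_schema,        # list of AttributeInfo
        partition_descriptor=partition_descriptor,  # from study config
        bucket_index=1,
        variants_blob_serializer="json",
    )

    # The writer is a context manager and a variants consumer.
    with writer:
        for full_variant in variants:  # produced elsewhere, e.g. by a source
            writer.consume_one(full_variant)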