Source code for datasets_api.views

import logging
from collections.abc import Iterable
from operator import itemgetter
from typing import Any, cast

from django.conf import settings
from django.contrib.auth import get_user_model
from django.contrib.auth.models import User
from django.utils.decorators import method_decorator
from django.views.decorators.http import etag
from gpf_instance.gpf_instance import (
    calc_and_set_cacheable_hash,
    get_cacheable_hash,
)
from groups_api.serializers import GroupSerializer
from query_base.query_base import QueryBaseView
from rest_framework import status
from rest_framework.request import Request
from rest_framework.response import Response
from studies.study_wrapper import StudyWrapperBase

from dae.studies.study import GenotypeData
from datasets_api.permissions import (
    IsDatasetAllowed,
    get_instance_timestamp_etag,
    get_permissions_etag,
    get_wdae_parents,
)

from .models import Dataset, DatasetHierarchy

logger = logging.getLogger(__name__)


def augment_accessibility(
    dataset: dict[str, Any],
    allowed_datasets: Iterable[str],
) -> dict[str, Any]:
    """Mark a dataset response dict with whether access to it is allowed.

    Stores under ``"access_rights"`` the result of testing whether
    ``dataset["id"]`` is contained in ``allowed_datasets``. The dict is
    modified in place and the same object is returned.
    """
    # pylint: disable=no-member
    is_allowed = dataset["id"] in allowed_datasets
    dataset["access_rights"] = is_allowed
    return dataset
def augment_with_groups(
    dataset: dict[str, Any],
    db_dataset: Dataset | None = None,
) -> dict[str, Any]:
    """Attach the serialized groups of a dataset to its response dict.

    When ``db_dataset`` is not given, the ``Dataset`` row is looked up by
    ``dataset["id"]``. The groups of that row are serialized with
    ``GroupSerializer`` and stored under ``"groups"``. The dict is modified
    in place and returned.
    """
    # pylint: disable=no-member
    record = db_dataset
    if record is None:
        record = Dataset.objects.get(dataset_id=dataset["id"])

    dataset["groups"] = GroupSerializer(record.groups.all(), many=True).data
    return dataset
def augment_with_parents(
    instance_id: str,
    dataset: dict[str, Any],
) -> dict[str, Any]:
    """Attach the IDs of a dataset's direct parents to its response dict.

    The parents come from ``get_wdae_parents`` called with ``direct=True``;
    their ``dataset_id`` values are stored as a list under ``"parents"``.
    The dict is modified in place and returned.
    """
    direct_parents = get_wdae_parents(instance_id, dataset["id"], direct=True)
    dataset["parents"] = [parent.dataset_id for parent in direct_parents]
    return dataset
def get_description_etag(
    request: Request, **_kwargs: dict[str, Any],
) -> str | None:
    """Return the cached hash used as ETag for a dataset's description.

    The dataset ID is read from the URL keyword arguments kept in the
    request's parser context, and the hash is looked up under the key
    ``"<dataset_id>_description"``.
    """
    view_kwargs = request.parser_context["kwargs"]
    dataset_id = view_kwargs["dataset_id"]
    cache_key = f"{dataset_id}_description"
    return get_cacheable_hash(cache_key)
class DatasetView(QueryBaseView):
    """
    General dataset view.

    Answers with a summary of all available datasets when no dataset_id is
    given, and with the full description of a single dataset otherwise.
    """

    def _collect_datasets_summary(
        self, user: User,
    ) -> list[dict[str, Any]]:
        """Build the summary entries for every dataset that has a wrapper."""
        wrappers = [
            self.gpf_instance.get_wdae_wrapper(genotype_data_id)
            for genotype_data_id in self.gpf_instance.get_genotype_data_ids()
        ]
        # Falsy wrappers (e.g. None) are skipped.
        summaries = [
            StudyWrapperBase.build_genotype_data_all_datasets(wrapper.config)
            for wrapper in wrappers
            if wrapper
        ]

        # Index the database rows by dataset ID, with groups prefetched.
        db_datasets = {}
        for db_dataset in Dataset.objects.prefetch_related("groups"):
            db_datasets[db_dataset.dataset_id] = db_dataset

        parents = DatasetHierarchy.get_direct_datasets_parents(
            self.instance_id, db_datasets.values(),
        )
        allowed_datasets = self.get_permitted_datasets(user)

        result = []
        for summary in summaries:
            entry = augment_accessibility(summary, allowed_datasets)
            entry = augment_with_groups(entry, db_datasets[entry["id"]])
            # The "parents" key is only set for datasets present in the map.
            entry_id = entry["id"]
            if entry_id in parents:
                entry["parents"] = parents[entry_id]
            result.append(entry)
        return result

    @method_decorator(etag(get_permissions_etag))
    def get(
        self, request: Request, dataset_id: str | None = None,
    ) -> Response:
        """Return response to a get request for a dataset or all datasets."""
        user = request.user

        if dataset_id is None:
            summary = self._collect_datasets_summary(user)
            return Response({"data": summary})

        dataset = self.gpf_instance.get_wdae_wrapper(dataset_id)
        if not dataset:
            return Response(
                {"error": f"Dataset {dataset_id} not found"},
                status=status.HTTP_404_NOT_FOUND,
            )

        person_set_collection_configs = {}
        for collection in dataset.person_set_collections.values():
            person_set_collection_configs[collection.id] = \
                collection.domain_json()

        description = StudyWrapperBase.build_genotype_data_description(
            self.gpf_instance,
            dataset.config,
            person_set_collection_configs,
        )

        description = augment_accessibility(
            description, self.get_permitted_datasets(user),
        )
        description = augment_with_groups(description)
        description = augment_with_parents(self.instance_id, description)

        return Response({"data": description})
class StudiesView(QueryBaseView):
    """View class for genotype data studies; dataset groups are skipped."""

    def _collect_datasets_summary(
        self, user: User,
    ) -> list[dict[str, Any]]:
        """Build the summary entries for every non-group study."""
        wrappers = [
            self.gpf_instance.get_wdae_wrapper(genotype_data_id)
            for genotype_data_id in self.gpf_instance.get_genotype_data_ids()
        ]
        # Keep only wrappers that exist and are not groups of studies.
        studies = [
            wrapper for wrapper in wrappers
            if wrapper is not None and not wrapper.is_group
        ]

        summaries = [
            StudyWrapperBase.build_genotype_data_all_datasets(study.config)
            for study in studies
        ]

        allowed_datasets = self.get_permitted_datasets(user)

        result = []
        for summary in summaries:
            entry = augment_accessibility(summary, allowed_datasets)
            entry = augment_with_groups(entry)
            result.append(augment_with_parents(self.instance_id, entry))
        return result

    @method_decorator(etag(get_permissions_etag))
    def get(self, request: Request) -> Response:
        """Return the summaries of all studies."""
        summary = self._collect_datasets_summary(request.user)
        return Response({"data": summary})
class DatasetDetailsView(QueryBaseView):
    """Provide miscellaneous details for a given dataset."""

    @method_decorator(etag(get_instance_timestamp_etag))
    def get(self, _request: Request, dataset_id: str) -> Response:
        """Return denovo flag, genome and chromosome prefix of a dataset.

        Responds with 404 when no configuration exists for ``dataset_id``.
        """
        # pylint: disable=unused-argument
        config = self.gpf_instance.get_genotype_data_config(dataset_id)

        if config is None:
            not_found = {"error": f"Dataset {dataset_id} not found"}
            return Response(not_found, status=status.HTTP_404_NOT_FOUND)

        return Response({
            # A missing "has_denovo" entry is reported as False.
            "hasDenovo": config.get("has_denovo", False),
            "genome": config.genome,
            "chrPrefix": config.chr_prefix,
        })
class DatasetPedigreeView(QueryBaseView):
    """Provide pedigree data for a given dataset."""

    @method_decorator(etag(get_instance_timestamp_etag))
    def get(self, _request: Request, dataset_id: str, column: str) -> Response:
        """Return the distinct values of one pedigree column as strings.

        Responds with 404 when the dataset is unknown or when the pedigree
        data frame has no column named ``column``.
        """
        # pylint: disable=unused-argument
        genotype_data = self.gpf_instance.get_genotype_data(dataset_id)
        if genotype_data is None:
            return Response(
                {"error": f"Dataset {dataset_id} not found"},
                status=status.HTTP_404_NOT_FOUND,
            )

        pedigree = genotype_data.families.ped_df
        if column not in pedigree.columns:
            return Response(
                {"error": f"No such column {column}"},
                status=status.HTTP_404_NOT_FOUND,
            )

        values_domain = [str(value) for value in pedigree[column].unique()]
        payload = {"column_name": column, "values_domain": values_domain}
        return Response(payload)
class DatasetConfigView(DatasetView):
    """Provide a dataset's configuration. Used for remote instances."""

    @method_decorator(etag(get_instance_timestamp_etag))
    def get(
        self, _request: Request, dataset_id: str | None = None,
    ) -> Response:
        """Return the dataset's config as a dict, extended with its parents.

        Responds with 400 when no dataset ID is given and with 404 when the
        dataset is unknown.
        """
        if dataset_id is None:
            return Response(status=status.HTTP_400_BAD_REQUEST)

        genotype_data = self.gpf_instance.get_genotype_data(dataset_id)
        if genotype_data is None:
            not_found = {"error": f"Dataset {dataset_id} not found"}
            return Response(not_found, status=status.HTTP_404_NOT_FOUND)

        config = genotype_data.config.to_dict()
        return Response(augment_with_parents(self.instance_id, config))
class DatasetDescriptionView(QueryBaseView):
    """Provide fetching and editing a dataset's description."""

    @method_decorator(etag(get_description_etag))
    def get(
        self, _request: Request, dataset_id: str | None,
    ) -> Response:
        """Collect a dataset's description.

        Responds with 400 when no dataset ID is given and with 404 when the
        dataset is unknown. Otherwise returns the description and makes sure
        a hash for it is stored under ``"<dataset_id>_description"``.
        """
        # pylint: disable=unused-argument
        if dataset_id is None:
            return Response(
                {"error": "No dataset id provided."},
                status=status.HTTP_400_BAD_REQUEST,
            )

        genotype_data = self.gpf_instance.get_genotype_data(dataset_id)
        if genotype_data is None:
            return Response(
                {"error": f"Dataset {dataset_id} not found"},
                status=status.HTTP_404_NOT_FOUND,
            )

        # The hash is written (here and in ``post``) and read (by the ETag
        # callback of this view) under the "<dataset_id>_description" key,
        # so the presence check has to use that same key.
        description_key = f"{dataset_id}_description"
        if get_cacheable_hash(description_key) is None:
            calc_and_set_cacheable_hash(
                description_key, genotype_data.description,
            )

        return Response(
            {"description": genotype_data.description},
            status=status.HTTP_200_OK,
        )

    def post(self, request: Request, dataset_id: str) -> Response:
        """Overwrite a dataset's description.

        Only staff users may edit; others get 403. An unknown dataset
        yields 404. On success the description hash is recalculated and an
        empty 200 response is returned.
        """
        if not request.user.is_staff:
            return Response(
                {"error": "You have no permission to edit the description."},
                status=status.HTTP_403_FORBIDDEN,
            )

        genotype_data = self.gpf_instance.get_genotype_data(dataset_id)
        if genotype_data is None:
            # Mirror ``get``: report a missing dataset instead of failing
            # with an AttributeError on None.
            return Response(
                {"error": f"Dataset {dataset_id} not found"},
                status=status.HTTP_404_NOT_FOUND,
            )

        genotype_data.description = request.data.get("description")
        calc_and_set_cacheable_hash(
            f"{dataset_id}_description", genotype_data.description,
        )

        return Response(status=status.HTTP_200_OK)
class BaseDatasetPermissionsView(QueryBaseView):
    """Base dataset permission view."""

    def _get_dataset_info(self, dataset: Dataset) -> dict[str, Any] | None:
        """Collect the groups and users that are linked to a dataset.

        Returns a dict with the dataset ID and name, its ``broken`` flag,
        the sorted group names and the users (name and email, sorted by
        email) belonging to any of those groups. Returns ``None`` and logs
        an error when the dataset is missing from the GPF instance.
        """
        groups = dataset.groups.all()
        group_names = sorted(group.name for group in groups)

        user_model = get_user_model()
        # A user in several groups is reported once, keyed by email.
        users_by_email: dict[str, dict[str, Any]] = {}
        for group in groups:
            members = user_model.objects.filter(
                groups__name=group.name,
            ).all()
            for member in members:
                if member.email in users_by_email:
                    continue
                users_by_email[member.email] = {
                    "name": member.name,
                    "email": member.email,
                }

        users_list = sorted(users_by_email.values(), key=itemgetter("email"))

        dataset_gd = self.gpf_instance.get_genotype_data(dataset.dataset_id)
        if dataset_gd is None:
            logger.error(
                "Dataset %s missing in GPF instance!", dataset.dataset_id,
            )
            return None

        dataset_name = dataset_gd.name
        if dataset_name is None:
            dataset_name = ""

        return {
            "dataset_id": dataset_gd.study_id,
            "dataset_name": dataset_name,
            "broken": dataset.broken,
            "users": users_list,
            "groups": group_names,
        }
class DatasetPermissionsView(BaseDatasetPermissionsView):
    """Dataset permissions view."""

    page_size = settings.REST_FRAMEWORK["PAGE_SIZE"]

    @method_decorator(etag(get_permissions_etag))
    def get(self, request: Request) -> Response:
        """Return one page of dataset permissions details.

        Query parameters:
            search: optional case-insensitive substring of the dataset name.
            page: 1-based page number, defaults to 1.

        Responds with 403 for non-staff users, 400 for a page number that is
        not a positive integer, 204 when the page has no entries and
        otherwise with the list of dataset permission details.
        """
        if not request.user.is_staff:
            # NOTE(review): the message mentions the description although
            # this view serves permissions; kept unchanged for clients.
            return Response(
                {"error": "You have no permission to edit the description."},
                status=status.HTTP_403_FORBIDDEN,
            )

        # A non-numeric page used to raise out of ``int`` and a page below 1
        # produced a negative slice bound; both are client errors.
        try:
            page = int(request.GET.get("page", 1))
        except (TypeError, ValueError):
            return Response(
                {"error": "Invalid page number."},
                status=status.HTTP_400_BAD_REQUEST,
            )
        if page < 1:
            return Response(
                {"error": "Invalid page number."},
                status=status.HTTP_400_BAD_REQUEST,
            )

        query = Dataset.objects
        dataset_search = request.GET.get("search")
        if dataset_search:
            query = query.filter(  # type: ignore
                dataset_name__icontains=dataset_search)

        page_start = (page - 1) * self.page_size
        page_end = page * self.page_size
        datasets = query.all().order_by("dataset_id")[page_start:page_end]

        # Datasets for which no info can be collected are left out.
        dataset_details = [
            info
            for info in map(self._get_dataset_info, datasets)
            if info is not None
        ]

        if not dataset_details:
            return Response(status=status.HTTP_204_NO_CONTENT)

        return Response(dataset_details)
class DatasetPermissionsSingleView(BaseDatasetPermissionsView):
    """Single dataset permission view."""

    page_size = settings.REST_FRAMEWORK["PAGE_SIZE"]

    def get(self, _request: Request, dataset_id: str) -> Response:
        """Return the permission details of one dataset.

        Responds with 404 when there is no ``Dataset`` row for
        ``dataset_id`` (empty response) or when no details can be collected
        for it (empty JSON object).
        """
        # pylint: disable=unused-argument
        try:
            dataset = Dataset.objects.get(dataset_id=dataset_id)
        except Dataset.DoesNotExist:
            return Response(status=status.HTTP_404_NOT_FOUND)

        details = self._get_dataset_info(dataset)
        if details is not None:
            return Response(details)

        return Response({}, status=status.HTTP_404_NOT_FOUND)
class DatasetHierarchyView(QueryBaseView):
    """Provide the hierarchy of one dataset configured in the instance."""

    def produce_tree(
        self, dataset: GenotypeData, selected: list[str],
        permitted_datasets: set[str],
    ) -> dict[str, Any] | None:
        """Recursively collect a dataset's id, children and access rights.

        Returns ``None`` for a dataset that belongs to a group named
        "hidden" unless the dataset is among ``permitted_datasets``. For
        group datasets, ``children`` holds the trees of those child studies
        whose IDs are in ``selected``; for other datasets it is ``None``.
        """
        has_rights = dataset.study_id in permitted_datasets
        dataset_obj = Dataset.objects.get(dataset_id=dataset.study_id)

        # Group membership only matters when the user has no rights.
        if not has_rights and any(
            group.name == "hidden" for group in dataset_obj.groups.all()
        ):
            return None

        children = None
        if dataset.is_group:
            children = []
            for child in dataset.studies:
                if child.study_id not in selected:
                    continue
                subtree = self.produce_tree(
                    child, selected, permitted_datasets,
                )
                if subtree is not None:
                    children.append(subtree)

        return {
            "dataset": dataset.study_id,
            "name": dataset.name,
            "children": children,
            "access_rights": has_rights,
        }

    @method_decorator(etag(get_permissions_etag))
    def get(self, request: Request, dataset_id: str | None = None) -> Response:
        """Return the hierarchy of one dataset in the instance.

        With a ``dataset_id`` the response holds a single tree (404 when the
        dataset is unknown); without one it holds the list of trees of all
        datasets that have no parents.
        """
        user = request.user
        genotype_data_ids = self.gpf_instance.get_genotype_data_ids()
        permitted_datasets = set(
            IsDatasetAllowed.permitted_datasets(user, self.instance_id),
        )

        if dataset_id:
            genotype_data = self.gpf_instance.get_genotype_data(dataset_id)
            if genotype_data is None:
                # An unknown ID used to reach produce_tree as None and fail
                # there; answer like the other dataset views instead.
                return Response(
                    {"error": f"Dataset {dataset_id} not found"},
                    status=status.HTTP_404_NOT_FOUND,
                )
            tree = self.produce_tree(
                genotype_data, genotype_data_ids, permitted_datasets,
            )
            return Response({"data": tree}, status=status.HTTP_200_OK)

        wrappers = [
            self.gpf_instance.get_wdae_wrapper(genotype_data_id)
            for genotype_data_id in genotype_data_ids
        ]

        trees = []
        for wrapper in wrappers:
            # Only existing datasets without parents start a tree.
            if not wrapper or wrapper.parents:
                continue
            tree = self.produce_tree(
                wrapper, genotype_data_ids, permitted_datasets,
            )
            if tree is not None:
                trees.append(tree)

        return Response({"data": trees}, status=status.HTTP_200_OK)
class VisibleDatasetsView(QueryBaseView):
    """Provide a list of which datasets to show in the frontend."""

    @method_decorator(etag(get_instance_timestamp_etag))
    def get(self, _request: Request) -> Response:
        """Return the list of visible datasets.

        Falls back to the sorted IDs of all genotype data when the instance
        reports no visible datasets.
        """
        # pylint: disable=unused-argument
        visible = self.gpf_instance.get_visible_datasets()
        if visible:
            return Response(visible)

        all_ids = sorted(self.gpf_instance.get_genotype_data_ids())
        return Response(all_ids)