Source code for datasets_api.permissions

import hashlib
import logging
import textwrap
from collections.abc import Iterable
from typing import Any, cast

from django.conf import settings
from django.contrib.auth.models import Group, User
from django.db import connection
from django.http import HttpRequest
from django.utils.encoding import force_str
from gpf_instance.gpf_instance import (
    get_instance_timestamp,
    get_permission_timestamp,
    get_wgpf_instance,
)
from rest_framework import permissions
from rest_framework.request import Request
from utils.datasets import find_dataset_id_in_request

from .models import Dataset, DatasetHierarchy

logger = logging.getLogger(__name__)


def get_instance_timestamp_etag(
    _request: Request,
    **_kwargs: dict[str, Any],
) -> str:
    """Return an E-Tag that depends only on the GPF instance timestamp.

    The request and any extra keyword arguments are accepted but not used.
    The tag is the MD5 hex digest of the formatted instance timestamp.
    """
    timestamp_text = f"{get_instance_timestamp()}"
    digest = hashlib.md5(timestamp_text.encode())  # noqa: S324
    return digest.hexdigest()
def get_permissions_etag(
    request: Request,
    **_kwargs: dict[str, Any],
) -> str:
    """Return E-Tag for queries dependent on user access permissions.

    The tag is the MD5 hex digest of three values concatenated in this
    order: the instance timestamp, the permission timestamp and the ID of
    the user attached to the request. Extra keyword arguments are ignored.
    """
    components = (
        f"{get_instance_timestamp()}",
        f"{get_permission_timestamp()}",
        f"{request.user.id}",
    )
    payload = "".join(components).encode()
    return hashlib.md5(payload).hexdigest()  # noqa: S324
class IsDatasetAllowed(permissions.BasePermission):
    """Permission class that checks a user's access to a dataset."""

    def has_permission(self, request: HttpRequest, view: Any) -> bool:
        """Check access to the dataset referenced by the request, if any.

        Requests in which no dataset ID can be found are allowed; otherwise
        the decision is delegated to ``has_object_permission``.
        """
        requested_id = find_dataset_id_in_request(request)
        if requested_id is not None:
            return self.has_object_permission(request, view, requested_id)
        return True

    def has_object_permission(
        self,
        request: HttpRequest,
        _view: Any,
        obj: str,
    ) -> bool:
        """Check whether the request's user may access dataset ``obj``.

        ``obj`` is a dataset ID; the check is performed against the
        currently loaded GPF instance.
        """
        instance_id = get_wgpf_instance().instance_id
        requesting_user = cast(User, request.user)
        return user_has_permission(instance_id, requesting_user, obj)

    @staticmethod
    def prepare_allowed_datasets_query() -> str:
        """
        Return query for getting all datasets a user has access to.

        This handles cases in the hierarchy where there are partial rights.

        The query is split into several Common Table Expressions. It takes
        two positional parameters, in this order: the user ID and the
        instance ID. It returns rows of (dataset DB ID, WDAE dataset ID)
        pairs.

        user_to_group joins users with their respective groups.
        dataset_to_group joins datasets with their respective groups.

        from_root and to_root are recursive and walk through the dataset
        hierarchy from a given dataset ID in the respective direction.

        dataset_branch combines from_root and to_root to give all datasets
        present in a dataset hierarchy "branch".
        """
        # NOTE: the lone "--" lines are empty SQL comments used as visual
        # separators; they must stay on their own lines.
        return textwrap.dedent("""
            WITH RECURSIVE
            --
            user_to_group AS (
                SELECT
                    u.id AS uid,
                    u.email AS uname,
                    g.id AS gid,
                    g.name AS gname
                FROM users AS u
                LEFT OUTER JOIN users_groups AS ug
                    ON u.id = ug.wdaeuser_id
                LEFT OUTER JOIN auth_group AS g
                    ON ug.group_id = g.id
            ),
            --
            dataset_to_group AS (
                SELECT
                    d.id AS did,
                    d.dataset_id AS dname,
                    g.id AS gid,
                    g.name AS gname
                FROM datasets_api_dataset AS d
                LEFT OUTER JOIN datasets_api_dataset_groups AS dg
                    ON d.id = dg.dataset_id
                LEFT OUTER JOIN auth_group AS g
                    ON dg.group_id = g.id
            ),
            --
            from_root (
                dataset_id,
                descendant_id,
                descendant_wdae_id,
                instance_id,
                DEPTH
            ) AS (
                SELECT
                    t.id AS dataset_id,
                    t.id AS descendant_id,
                    t.dataset_id AS descendant_wdae_id,
                    h.instance_id AS instance_id,
                    0
                FROM datasets_api_dataset AS t
                LEFT OUTER JOIN datasets_api_datasethierarchy AS h
                    ON h.ancestor_id = t.id and h.descendant_id = t.id
                UNION ALL
                SELECT
                    t.dataset_id,
                    h.descendant_id,
                    hd.dataset_id,
                    h.instance_id,
                    t.DEPTH + 1
                FROM from_root AS t
                LEFT OUTER JOIN datasets_api_datasethierarchy AS h
                    ON h.ancestor_id = t.descendant_id
                LEFT OUTER JOIN datasets_api_dataset AS hd
                    ON hd.id = h.descendant_id
                WHERE 1 = 1
                    AND h.ancestor_id <> h.descendant_id
                    AND h.direct = TRUE
                    AND h.id IS NOT NULL
            ),
            to_root (
                dataset_id,
                ancestor_id,
                ancestor_wdae_id,
                instance_id,
                DEPTH
            ) AS (
                SELECT
                    t.id AS dataset_id,
                    t.id AS ancestor_id,
                    t.dataset_id AS ancestor_wdae_id,
                    h.instance_id AS instance_id,
                    0
                FROM datasets_api_dataset AS t
                LEFT OUTER JOIN datasets_api_datasethierarchy AS h
                    ON h.ancestor_id = t.id and h.descendant_id = t.id
                UNION ALL
                SELECT
                    t.dataset_id,
                    h.ancestor_id,
                    hd.dataset_id,
                    h.instance_id,
                    t.DEPTH - 1
                FROM to_root AS t
                LEFT OUTER JOIN datasets_api_datasethierarchy AS h
                    ON 1=1
                    AND h.descendant_id = t.ancestor_id
                LEFT OUTER JOIN datasets_api_dataset AS hd
                    ON hd.id = h.ancestor_id
                WHERE 1 = 1
                    AND h.ancestor_id <> h.descendant_id
                    AND h.direct = TRUE
                    AND h.id IS NOT NULL
            ),
            dataset_branch(
                dataset_id,
                branch_dataset_id,
                branch_dataset_wdae_id,
                instance_id
            ) AS (
                SELECT
                    to_root.dataset_id,
                    to_root.ancestor_id,
                    to_root.ancestor_wdae_id,
                    to_root.instance_id
                FROM to_root
                UNION ALL
                SELECT
                    from_root.dataset_id,
                    from_root.descendant_id,
                    from_root.descendant_wdae_id,
                    from_root.instance_id
                FROM from_root
            )
            SELECT DISTINCT
                db.branch_dataset_id, db.branch_dataset_wdae_id
            FROM user_to_group AS ug
            LEFT OUTER JOIN dataset_to_group AS dg
                ON ug.gid = dg.gid OR dg.gname = 'any_user'
            LEFT OUTER JOIN dataset_branch AS db
                ON db.dataset_id = dg.did
            WHERE 1=1
                AND dg.gid IS NOT NULL
                AND db.branch_dataset_id IS NOT NULL
                AND ug.uid = %s
                AND db.instance_id = %s
            ORDER BY db.branch_dataset_id;
        """)

    @staticmethod
    def permitted_datasets(user: User, instance_id: str) -> Iterable[str]:
        """Return the IDs of the datasets a specific user is allowed to see.

        Anonymous users get every dataset that belongs to the "any_user"
        group together with the children of those datasets. Superusers,
        staff, members of the "admin" group (and everybody, when
        ``settings.DISABLE_PERMISSIONS`` is set) get all genotype data IDs
        of the instance. Any other user gets the instance's genotype data
        IDs restricted to those returned by the allowed-datasets SQL query.
        """
        instance_dataset_ids = set(
            get_wgpf_instance().get_genotype_data_ids(),
        )

        if user.is_anonymous:
            datasets_by_id = {
                record.dataset_id: record
                for record in Dataset.objects.prefetch_related("groups")
            }
            public_ids: set[str] = set()
            for wdae_id, record in datasets_by_id.items():
                is_public = any(
                    group.name == "any_user"
                    for group in record.groups.all()
                )
                if not is_public:
                    continue
                public_ids.add(wdae_id)
                # Children of a public dataset are public as well.
                public_ids.update(
                    child.dataset_id
                    for child in DatasetHierarchy.get_children(
                        instance_id, record,
                    )
                )
            return public_ids

        group_names = get_user_groups(user)
        unrestricted = (
            settings.DISABLE_PERMISSIONS
            or user.is_superuser
            or user.is_staff
            or "admin" in group_names
        )
        if unrestricted:
            return instance_dataset_ids

        sql = IsDatasetAllowed.prepare_allowed_datasets_query()
        with connection.cursor() as cursor:
            cursor.execute(sql, [user.id, instance_id])  # type: ignore
            rows = cursor.fetchall()
        # The second column of every row holds the WDAE dataset ID.
        return instance_dataset_ids & {row[1] for row in rows}
def get_wdae_dataset(
    dataset: str,
) -> Dataset | None:
    """
    Return wdae dataset object.

    The argument is coerced to a string with ``force_str`` and used as the
    dataset ID to look up. Returns the matching WDAE ``Dataset`` or
    ``None`` (after logging a warning) when no such dataset exists.
    """
    dataset_id = force_str(dataset)
    try:
        # A single ``get`` replaces the former ``exists`` + ``get`` pair:
        # one query instead of two, and no window in which the row can
        # disappear between the check and the fetch.
        # pylint: disable=no-member
        return Dataset.objects.get(dataset_id=dataset_id)
    except Dataset.DoesNotExist:
        logger.warning("dataset %s does not exists...", dataset_id)
        return None
def get_wdae_parents(
    instance_id: str,
    dataset_id: str,
    *,
    direct: bool = False,
) -> list[Dataset]:
    """
    Return list of parent wdae dataset objects.

    Looks up the WDAE dataset with the given ID and returns its parents in
    the hierarchy of the given instance. An empty list is returned when the
    dataset is unknown. With ``direct=True`` the ``direct=True`` flag is
    forwarded to ``DatasetHierarchy.get_parents``; otherwise that method's
    own default applies.
    """
    wdae_dataset = get_wdae_dataset(dataset_id)
    if wdae_dataset is None:
        return []

    # Only pass ``direct`` on when it was requested, so the hierarchy's
    # default is left untouched in the other case.
    extra_options = {"direct": True} if direct else {}
    return DatasetHierarchy.get_parents(
        instance_id, wdae_dataset, **extra_options)
def get_wdae_children(instance_id: str, dataset_id: str) -> list[Dataset]:
    """
    Return list of child wdae dataset objects.

    Looks up the WDAE dataset with the given ID and returns the result of
    ``DatasetHierarchy.get_children`` for it within the given instance.
    An empty list is returned when the dataset is unknown.
    """
    wdae_dataset = get_wdae_dataset(dataset_id)
    if wdae_dataset is not None:
        return DatasetHierarchy.get_children(instance_id, wdae_dataset)
    return []
def user_has_permission(instance_id: str, user: User, dataset_id: str) -> bool:
    """Check if a user has permission to browse the given dataset.

    The checks run in this order: permissions globally disabled (allow),
    inactive user (deny), superuser / staff / member of the "admin" group
    (allow), dataset unknown to WDAE (allow), and finally membership of
    the dataset ID in the user's permitted datasets.
    """
    if settings.DISABLE_PERMISSIONS:
        return True
    if not user.is_active:
        return False

    is_privileged = (
        user.is_superuser
        or user.is_staff
        or "admin" in get_user_groups(user)
    )
    if is_privileged:
        return True

    logger.debug("checking user <%s> permissions on %s", user, dataset_id)

    if get_wdae_dataset(dataset_id) is None:
        # Datasets without a WDAE record are not restricted here.
        return True

    permitted = IsDatasetAllowed.permitted_datasets(user, instance_id)
    return dataset_id in permitted
def get_allowed_genotype_studies(
    instance_id: str,
    user: User,
    dataset_id: str,
) -> set[str]:
    """Collect and return genotype study IDs the user has access to.

    If the dataset is itself a study, the result holds its ID when the
    user may access it. Otherwise the result holds the IDs of those
    children of the dataset that are studies and that the user may access.
    An empty set is returned for a dataset unknown to WDAE.

    Access checks are skipped entirely when permissions are disabled or
    the user is a superuser, staff, or a member of the "admin" group.
    """
    dataset = get_wdae_dataset(dataset_id)
    if dataset is None:
        return set()

    # First gather the candidate studies without touching permissions.
    if DatasetHierarchy.is_study(instance_id, dataset):
        candidates = [dataset.dataset_id]
    else:
        candidates = [
            child.dataset_id
            for child in get_wdae_children(instance_id, dataset.dataset_id)
            if DatasetHierarchy.is_study(instance_id, child)
        ]
    if not candidates:
        return set()

    skip_check = (
        settings.DISABLE_PERMISSIONS
        or user.is_superuser
        or user.is_staff
        or "admin" in get_user_groups(user)
    )
    if skip_check:
        return set(candidates)

    # The permitted set does not depend on the candidate, so compute it
    # once instead of re-running the permission query per child study.
    permitted = set(IsDatasetAllowed.permitted_datasets(user, instance_id))
    return permitted.intersection(candidates)
def get_dataset_info(dataset_id: str) -> dict[str, Any] | None:
    """Return a dictionary describing a Dataset object.

    The dictionary has the keys ``datasetName``, ``datasetId`` and
    ``broken``. ``None`` is returned, and an error is logged, when no WDAE
    dataset with the given ID exists.
    """
    wdae_dataset = get_wdae_dataset(dataset_id)
    if wdae_dataset is not None:
        return {
            "datasetName": wdae_dataset.dataset_name,
            "datasetId": wdae_dataset.dataset_id,
            "broken": wdae_dataset.broken,
        }

    logger.error("Could not find WDAE dataset for %s", dataset_id)
    return None
def get_directly_allowed_genotype_data(user: User) -> list[dict[str, Any]]:
    """Return list of genotype data the user has direct permissions to.

    A dataset is included when the user shares at least one group with it.
    Genotype data known to the GPF instance but missing from the WDAE
    database is reported as a placeholder entry flagged ``broken``.

    The result is sorted by ``datasetName``, falling back to ``datasetId``
    when the name is ``None``.
    """
    gpf_instance = get_wgpf_instance()
    dataset_ids = gpf_instance.get_genotype_data_ids()
    user_groups = get_user_groups(user)

    # Prefetch groups so the per-dataset group check below does not issue
    # one query per dataset.
    datasets = {
        dataset.dataset_id: dataset
        for dataset in Dataset.objects.prefetch_related("groups")  # pylint: disable=no-member
    }

    result: list[dict[str, Any]] = []
    for dataset_id in dataset_ids:
        dataset = datasets.get(dataset_id)
        if dataset is None:
            logger.warning(
                "Dataset %s found in DAE, but not in WDAE!", dataset_id,
            )
            result.append({
                "datasetName": dataset_id,
                "datasetId": dataset_id,
                "broken": True,
            })
            # Nothing more can be checked for a dataset without a WDAE
            # record; without this the lookup below raised KeyError.
            continue

        if not user_groups & get_dataset_groups(dataset):
            continue

        dataset_info = get_dataset_info(dataset_id)
        if dataset_info is not None:
            result.append(dataset_info)

    return sorted(
        result,
        key=lambda ds: ds["datasetName"]
        if ds["datasetName"] is not None else ds["datasetId"],
    )
def add_group_perm_to_user(group_name: str, user: User) -> None:
    """Add the user to the named group, creating the group if needed.

    The user is saved after the group has been added.
    """
    group = Group.objects.get_or_create(name=group_name)[0]
    user.groups.add(group)
    user.save()
def add_group_perm_to_dataset(group_name: str, dataset_id: str) -> None:
    """Attach the named group to the dataset with the given ID.

    Both the dataset record and the group are created when they do not
    exist yet.
    """
    # pylint: disable=no-member
    dataset = Dataset.objects.get_or_create(dataset_id=dataset_id)[0]
    group = Group.objects.get_or_create(name=group_name)[0]
    dataset.groups.add(group)
def get_user_groups(user: User) -> set[str]:
    """Return the names of the groups the user belongs to.

    Anonymous users are treated as members of the single group
    "any_user".
    """
    if not user.is_anonymous:
        return {group.name for group in user.groups.all()}
    return {"any_user"}
def get_dataset_groups(dataset: str | Dataset) -> set[str]:
    """Return the names of the groups attached to a dataset.

    ``dataset`` may be a WDAE ``Dataset`` object or a dataset ID; an ID is
    resolved with ``Dataset.objects.get``, which raises if no such dataset
    exists.
    """
    # pylint: disable=no-member
    if isinstance(dataset, Dataset):
        wdae_dataset = dataset
    else:
        wdae_dataset = Dataset.objects.get(dataset_id=force_str(dataset))
    return {group.name for group in wdae_dataset.groups.all()}
def handle_partial_permissions(
    instance_id: str,
    user: User,
    dataset_id: str,
    request_data: dict,
) -> None:
    """Handle partial permission on a dataset.

    A user may have only partial access to a dataset, depending on which
    of its constituent studies they have the right to access. This
    function stores the set of accessible study IDs in ``request_data``
    under the key ``"allowed_studies"`` (mutating the dict in place) so
    that variants from studies the user cannot access can be filtered out.
    """
    allowed_studies = get_allowed_genotype_studies(
        instance_id, user, dataset_id)
    request_data["allowed_studies"] = allowed_studies