Source code for pheno_tool_api.views

import logging
import math
from collections.abc import Generator
from io import StringIO
from typing import Any, cast

from datasets_api.permissions import (
    get_permissions_etag,
    user_has_permission,
)
from django.http.response import StreamingHttpResponse
from django.utils.decorators import method_decorator
from django.views.decorators.http import etag
from query_base.query_base import DatasetAccessRightsView, QueryBaseView
from rest_framework import status
from rest_framework.request import Request
from rest_framework.response import Response
from utils.expand_gene_set import expand_gene_set
from utils.query_params import parse_query_params

from dae.effect_annotation.effect import EffectTypesMixin
from dae.pheno.common import MeasureType
from dae.pheno.pheno_data import Measure
from dae.pheno_tool.pheno_tool_adapter import PhenoToolAdapter
from dae.pheno_tool.tool import PhenoResult, PhenoTool

logger = logging.getLogger(__name__)


class PhenoToolView(QueryBaseView):
    """View for returning pheno tool results."""

    @staticmethod
    def get_result_by_sex(
        result: dict[str, PhenoResult],
        sex: str,
    ) -> dict[str, Any]:
        """Serialize the pheno tool result stored under ``sex``.

        Args:
            result: mapping from a sex key to its ``PhenoResult``.
            sex: key of the entry in ``result`` to serialize.

        Returns:
            A dict with ``"negative"`` and ``"positive"`` sub-dicts (each
            holding ``count``, ``deviation`` and ``mean``) and a ``"pValue"``
            entry.

        Raises:
            KeyError: if ``sex`` is not a key of ``result``.
        """
        sex_result = result[sex]
        return {
            "negative": {
                "count": sex_result.negative_count,
                "deviation": sex_result.negative_deviation,
                "mean": sex_result.negative_mean,
            },
            "positive": {
                "count": sex_result.positive_count,
                "deviation": sex_result.positive_deviation,
                "mean": sex_result.positive_mean,
            },
            "pValue": sex_result.pvalue,
        }

    @staticmethod
    def _build_report_description(
        measure_id: str,
        normalize_by: list[str | Any],
    ) -> str:
        """Build the label used for the (optionally normalized) measure.

        Returns ``measure_id`` unchanged when ``normalize_by`` is empty,
        otherwise ``"<measure_id> ~ <a> + <b> + ..."``.
        """
        if not normalize_by:
            return measure_id
        normalize_desc = " + ".join(normalize_by)
        return f"{measure_id} ~ {normalize_desc}"

    def post(self, request: Request) -> Response:
        """Return pheno tool results based on POST request.

        Responds with:
            * 400 when ``datasetId`` or ``effectTypes`` is missing from the
              request body;
            * 404 when the dataset or its pheno tool adapter is not found, or
              when the calculation raises ``KeyError``;
            * 200 with the calculated result otherwise.
        """
        data = expand_gene_set(request.data)

        # Reject incomplete requests explicitly instead of letting the
        # lookups below fail with an unhandled KeyError.
        if "datasetId" not in data or "effectTypes" not in data:
            return Response(status=status.HTTP_400_BAD_REQUEST)
        dataset_id = data["datasetId"]

        study_wrapper = self.gpf_instance.get_wdae_wrapper(dataset_id)
        if study_wrapper is None:
            return Response(status=status.HTTP_404_NOT_FOUND)

        adapter = self.gpf_instance.get_pheno_tool_adapter(
            study_wrapper.genotype_data,
        )
        # Bail out before doing any filter work that would be thrown away.
        if not adapter:
            return Response(status=status.HTTP_404_NOT_FOUND)

        data["phenoFilterFamilyIds"] = None
        if data.get("familyFilters") is not None:
            transformer = study_wrapper.query_transformer
            data["phenoFilterFamilyIds"] = list(
                transformer._transform_filters_to_ids(  # noqa: SLF001
                    data["familyFilters"],
                ),
            )

        # Capture the effect groups before the request gets transformed.
        effect_groups = list(data["effectTypes"])
        data = study_wrapper.transform_request(data)

        try:
            result = adapter.calc_variants(data, effect_groups)
        except KeyError:
            # Keep the 404 contract, but leave a trace of what was missing.
            logger.warning(
                "pheno tool calculation failed for dataset %s",
                dataset_id, exc_info=True,
            )
            return Response(status=status.HTTP_404_NOT_FOUND)

        return Response(result)
class PhenoToolDownload(PhenoToolView, DatasetAccessRightsView):
    """Pheno tool download view."""

    def generate_columns(
        self,
        adapter: PhenoToolAdapter,
        effect_groups: list[Any],
        data: dict,
    ) -> Generator[str, None, None]:
        """Pheno tool download generator function.

        Yields an empty string first and then the lines of a CSV report
        built from the pheno tool data frame, with one variant-count column
        per entry of ``effect_groups``.

        Args:
            adapter: pheno tool adapter providing ``pheno_tool`` and
                ``helper``.
            effect_groups: effect groups; each becomes a column of the report.
            data: transformed request; ``measureId`` is required, while
                ``phenoFilterFamilyIds``, ``family_ids`` and ``normalizeBy``
                are read when present.
        """
        # Return a response instantly and make download more responsive
        yield ""

        measure_id = data["measureId"]
        family_ids = data.get("phenoFilterFamilyIds")
        person_ids = adapter.helper.genotype_data_persons(
            data.get("family_ids", []),
        )
        normalize_by = data.get("normalizeBy")

        tool = adapter.pheno_tool
        result_df = tool.create_df(
            measure_id,
            person_ids=cast(list[str], person_ids),
            family_ids=family_ids,
            normalize_by=normalize_by,
        )

        variants = adapter.helper.genotype_data_variants(data, effect_groups)
        for effect in effect_groups:
            result_df = PhenoTool.join_pheno_df_with_variants(
                result_df, variants[effect],
            )
            # Name each joined count column after its effect group.
            result_df = result_df.rename(columns={"variant_count": effect})

        if normalize_by:
            normalize_by = tool.init_normalize_measures(
                measure_id, normalize_by,
            )
            column_name = self._build_report_description(
                measure_id, normalize_by,
            )
            result_df = result_df.rename(columns={"normalized": column_name})
            result_df[column_name] = result_df[column_name].round(decimals=5)

        result_df[measure_id] = result_df[measure_id].round(decimals=5)

        # The "normalized" and "role" columns are left out of the export.
        columns = [
            col
            for col in result_df.columns.tolist()
            if col not in {"normalized", "role"}
        ]

        csv_buffer = StringIO()
        result_df.to_csv(csv_buffer, index=False, columns=columns)
        csv_buffer.seek(0)

        yield from csv_buffer.readlines()

    def post(self, request: Request) -> Response:
        """Pheno tool download.

        Responds with:
            * 400 when ``datasetId`` or ``effectTypes`` is missing;
            * 403 when the user has no permission for the dataset;
            * 404 when the dataset or its pheno tool adapter is not found;
            * otherwise a streaming ``text/csv`` attachment named
              ``pheno_report.csv`` produced by ``generate_columns``.
        """
        data = expand_gene_set(parse_query_params(request.data))

        # Reject incomplete requests explicitly instead of letting the
        # lookups below fail with an unhandled KeyError.
        if "datasetId" not in data or "effectTypes" not in data:
            return Response(status=status.HTTP_400_BAD_REQUEST)

        if not user_has_permission(
            self.instance_id, request.user, data["datasetId"],
        ):
            return Response(status=status.HTTP_403_FORBIDDEN)

        study_wrapper = self.gpf_instance.get_wdae_wrapper(data["datasetId"])
        if study_wrapper is None:
            return Response(status=status.HTTP_404_NOT_FOUND)

        data["effectTypes"] = EffectTypesMixin.build_effect_types_list(
            data["effectTypes"],
        )
        # Capture the effect groups before the request gets transformed.
        effect_groups = list(data["effectTypes"])

        data["phenoFilterFamilyIds"] = None
        if data.get("familyFilters") is not None:
            transformer = study_wrapper.query_transformer
            data["phenoFilterFamilyIds"] = list(
                transformer._transform_filters_to_ids(  # noqa: SLF001
                    data["familyFilters"],
                ),
            )

        data = study_wrapper.transform_request(data)

        adapter = self.gpf_instance.get_pheno_tool_adapter(
            study_wrapper.genotype_data,
        )
        # Use the module logger rather than printing to stdout.
        logger.debug("adapter finished")

        if not adapter:
            return Response(status=status.HTTP_404_NOT_FOUND)

        response = StreamingHttpResponse(
            self.generate_columns(
                cast(PhenoToolAdapter, adapter), effect_groups, data,
            ),
            content_type="text/csv",
        )
        response["Content-Disposition"] = (
            "attachment; filename=pheno_report.csv"
        )
        response["Expires"] = "0"
        return response
class PhenoToolPeopleValues(QueryBaseView, DatasetAccessRightsView):
    """View for returning person phenotype data."""

    def post(self, request: Request) -> Response:
        """Return measure values for people as a list of records.

        The request body must contain ``datasetId`` and ``measureIds``;
        ``personIds``, ``familyIds`` and ``roles`` are optional.

        Responds with:
            * 400 when ``datasetId`` or ``measureIds`` is missing;
            * 404 when the dataset is unknown or has no phenotype data;
            * 200 with one dict per data frame row otherwise, with the
              ``status``, ``role`` and ``sex`` values converted to strings.
        """
        data = request.data

        # Reject a missing dataset id instead of failing with a KeyError.
        dataset_id = data.get("datasetId")
        if not dataset_id:
            return Response(status=status.HTTP_400_BAD_REQUEST)

        dataset = self.gpf_instance.get_wdae_wrapper(dataset_id)
        if not dataset or dataset.phenotype_data is None:
            return Response(status=status.HTTP_404_NOT_FOUND)

        if "measureIds" not in data:
            return Response(status=status.HTTP_400_BAD_REQUEST)

        res_df = dataset.phenotype_data.get_people_measure_values_df(
            data["measureIds"],
            data.get("personIds"),
            data.get("familyIds"),
            data.get("roles"),
        )

        records: list[dict[str, Any]] = cast(
            list[dict[str, Any]], res_df.to_dict("records"))

        # Convert these three fields to their string form for the response.
        for record in records:
            for key in ("status", "role", "sex"):
                record[key] = str(record[key])

        return Response(records)
class PhenoToolMeasure(QueryBaseView, DatasetAccessRightsView):
    """View for returning a single phenotype measure of a dataset."""

    @method_decorator(etag(get_permissions_etag))
    def get(self, request: Request) -> Response:
        """Return the JSON form of the measure named by ``measureId``.

        Responds with 400 when ``datasetId`` or ``measureId`` is missing,
        and with 404 when the dataset is unknown, has no phenotype data, or
        does not have the requested measure.
        """
        query = request.GET

        dataset_id = query.get("datasetId")
        if not dataset_id:
            return Response(status=status.HTTP_400_BAD_REQUEST)

        dataset = self.gpf_instance.get_wdae_wrapper(dataset_id)
        if not dataset or dataset.phenotype_data is None:
            return Response(status=status.HTTP_404_NOT_FOUND)

        measure_id = query.get("measureId")
        if not measure_id:
            return Response(status=status.HTTP_400_BAD_REQUEST)

        pheno_data = dataset.phenotype_data
        if not pheno_data.has_measure(measure_id):
            return Response(status=status.HTTP_404_NOT_FOUND)

        measure = pheno_data.get_measure(measure_id)
        return Response(measure.to_json())
class PhenoToolMeasures(QueryBaseView, DatasetAccessRightsView):
    """View for listing phenotype measures of a dataset."""

    @method_decorator(etag(get_permissions_etag))
    def get(self, request: Request) -> Response:
        """Return the JSON form of the dataset's measures.

        The optional ``instrument`` and ``measureType`` query parameters are
        passed on to the phenotype data's ``get_measures``.

        Responds with 400 when ``datasetId`` is missing, and with 404 when
        the dataset is unknown, has no phenotype data, or the given
        instrument is not among its instruments.
        """
        query = request.GET

        dataset_id = query.get("datasetId")
        if not dataset_id:
            return Response(status=status.HTTP_400_BAD_REQUEST)

        dataset = self.gpf_instance.get_wdae_wrapper(dataset_id)
        if not dataset or dataset.phenotype_data is None:
            return Response(status=status.HTTP_404_NOT_FOUND)

        pheno_data = dataset.phenotype_data

        instrument = query.get("instrument")
        if instrument and instrument not in pheno_data.instruments:
            return Response(status=status.HTTP_404_NOT_FOUND)

        # The measure type is only parsed when the parameter was supplied.
        raw_measure_type = query.get("measureType")
        measure_type = (
            MeasureType.from_str(raw_measure_type)
            if raw_measure_type is not None
            else None
        )

        measures = pheno_data.get_measures(instrument, measure_type)
        return Response(
            [measure.to_json() for measure in measures.values()],
        )
class PhenoToolInstruments(QueryBaseView, DatasetAccessRightsView):
    """View for listing a dataset's instruments together with their measures."""

    @staticmethod
    def _nan_to_none(value: float | None) -> float | None:
        """Return ``None`` for a missing or NaN value, else the value itself.

        ``math.isnan`` raises ``TypeError`` on ``None``, so ``None`` is
        checked first.
        """
        if value is None or math.isnan(value):
            return None
        return value

    def measure_to_json(self, measure: Measure) -> dict:
        """Serialize ``measure`` into a JSON-ready dict.

        ``minValue`` and ``maxValue`` are reported as ``None`` when the
        corresponding attribute is ``None`` or NaN.
        """
        return {
            "measureId": measure.measure_id,
            "instrumentName": measure.instrument_name,
            "measureName": measure.measure_name,
            "measureType": str(measure.measure_type),
            "description": measure.description,
            "defaultFilter": measure.default_filter,
            "valuesDomain": measure.values_domain,
            "minValue": self._nan_to_none(measure.min_value),
            "maxValue": self._nan_to_none(measure.max_value),
        }

    @method_decorator(etag(get_permissions_etag))
    def get(self, request: Request) -> Response:
        """Return the dataset's instruments keyed by instrument name.

        Each entry holds the instrument ``name`` and the list of its
        serialized ``measures``.

        Responds with 400 when ``datasetId`` is missing and with 404 when
        the dataset is unknown or has no phenotype data.
        """
        params = request.GET

        dataset_id = params.get("datasetId")
        if not dataset_id:
            return Response(status=status.HTTP_400_BAD_REQUEST)

        dataset = self.gpf_instance.get_wdae_wrapper(dataset_id)
        if not dataset or dataset.phenotype_data is None:
            return Response(status=status.HTTP_404_NOT_FOUND)

        instruments = dataset.phenotype_data.instruments
        result = {
            instrument.instrument_name: {
                "name": instrument.instrument_name,
                "measures": [
                    self.measure_to_json(measure)
                    for measure in instrument.measures.values()
                ],
            }
            for instrument in instruments.values()
        }
        return Response(result)