import csv
import logging
from collections.abc import Generator
from io import StringIO
from datasets_api.permissions import (
get_instance_timestamp_etag,
get_permissions_etag,
)
from django.http.response import HttpResponse, StreamingHttpResponse
from django.utils.decorators import method_decorator
from django.views.decorators.http import etag
from query_base.query_base import DatasetAccessRightsView, QueryBaseView
from rest_framework import status
from rest_framework.request import Request
from rest_framework.response import Response
from studies.study_wrapper import StudyWrapper
from utils.streaming_response_util import iterator_to_json
# Module-level logger, named after this module, used by the views below.
logger = logging.getLogger(__name__)
[docs]
class PhenoConfigView(QueryBaseView, DatasetAccessRightsView):
    """Serve the configuration of a phenotype data set."""

    @method_decorator(etag(get_instance_timestamp_etag))
    def get(self, request: Request) -> Response:
        """Return the configuration of the phenotype data named ``db_name``.

        Responds with 400 when the ``db_name`` query parameter is absent
        and with 500 when the GPF instance does not list that id among its
        phenotype data ids.
        """
        params = request.query_params
        if "db_name" not in params:
            return Response(status=status.HTTP_400_BAD_REQUEST)

        dbname = params["db_name"]
        logger.debug("dbname: %s", dbname)

        known_ids = self.gpf_instance.get_phenotype_data_ids()
        if dbname in known_ids:
            config = self.gpf_instance.get_phenotype_data_config(dbname)
            return Response(config)

        # NOTE(review): an unknown id is reported as a server error rather
        # than 404; kept as-is because clients may rely on it.
        return Response(status=status.HTTP_500_INTERNAL_SERVER_ERROR)
[docs]
class PhenoInstrumentsView(QueryBaseView):
    """Phenotype instruments view."""

    @method_decorator(etag(get_instance_timestamp_etag))
    def get(self, request: Request) -> Response:
        """Return the sorted instruments of a dataset and a default one.

        The response body has two keys: ``instruments`` (the sorted
        instruments) and ``default`` (the first of them, or ``None`` when
        the phenotype data has no instruments).

        Responds with 400 when the ``dataset_id`` query parameter is
        absent and with 404 when the dataset is unknown or carries no
        phenotype data.
        """
        if "dataset_id" not in request.query_params:
            return Response(status=status.HTTP_400_BAD_REQUEST)

        dataset_id = request.query_params["dataset_id"]
        dataset = self.gpf_instance.get_wdae_wrapper(dataset_id)
        if not dataset or dataset.phenotype_data is None:
            return Response(status=status.HTTP_404_NOT_FOUND)

        instruments = sorted(dataset.phenotype_data.get_instruments())
        return Response({
            "instruments": instruments,
            # Indexing an empty list would raise IndexError and surface as
            # an unhandled server error, so fall back to None instead.
            "default": instruments[0] if instruments else None,
        })
[docs]
class PhenoMeasuresInfoView(QueryBaseView):
    """Serve summary information about a dataset's phenotype measures."""

    @method_decorator(etag(get_instance_timestamp_etag))
    def get(self, request: Request) -> Response:
        """Return the measures info of the dataset named by ``dataset_id``.

        Responds with 400 when ``dataset_id`` is missing from the query
        string and with 404 when the dataset is unknown or has no
        phenotype data.
        """
        params = request.query_params
        if "dataset_id" not in params:
            return Response(status=status.HTTP_400_BAD_REQUEST)

        dataset = self.gpf_instance.get_wdae_wrapper(params["dataset_id"])
        if not dataset or dataset.phenotype_data is None:
            return Response(status=status.HTTP_404_NOT_FOUND)

        return Response(dataset.phenotype_data.get_measures_info())
[docs]
class PhenoMeasureDescriptionView(QueryBaseView):
    """Phenotype measures description view."""

    @method_decorator(etag(get_permissions_etag))
    def get(self, request: Request) -> Response:
        """Return the description of a single phenotype measure.

        Expects the ``dataset_id`` and ``measure_id`` query parameters.

        Responds with 400 when either parameter is missing and with 404
        (plus an ``error`` message) when the dataset is unknown, has no
        phenotype data, or does not contain the requested measure.
        """
        params = request.query_params
        if "dataset_id" not in params:
            return Response(status=status.HTTP_400_BAD_REQUEST)

        dataset = self.gpf_instance.get_wdae_wrapper(params["dataset_id"])
        if not dataset or dataset.phenotype_data is None:
            return Response(
                {"error": "Dataset not found"},
                status=status.HTTP_404_NOT_FOUND,
            )

        # A missing measure_id used to raise on the subscript below and
        # surface as a server error; it is a client error, so report 400.
        if "measure_id" not in params:
            return Response(status=status.HTTP_400_BAD_REQUEST)

        measure_id = params["measure_id"]
        if not dataset.phenotype_data.has_measure(measure_id):
            return Response(
                {"error": "Measure not found"},
                status=status.HTTP_404_NOT_FOUND,
            )

        res = dataset.phenotype_data.get_measure_description(measure_id)
        return Response(res)
[docs]
class PhenoMeasuresView(QueryBaseView):
    """Phenotype measures view."""

    @method_decorator(etag(get_instance_timestamp_etag))
    def get(self, request: Request) -> Response:
        """Return the measures of a dataset matching an optional filter.

        Query parameters:
            dataset_id: required; the dataset whose measures are searched.
            instrument: optional; restricts the search to one instrument.
            search: optional; search term passed to the measure search.
            page, sort_by, order_by: deprecated; only a warning is logged
                when any of them is present.

        Responds with 400 when ``dataset_id`` is missing or the search
        raises ``ValueError``, and with 404 when the dataset is unknown,
        has no phenotype data, or the instrument is not one of its
        instruments.
        """
        params = request.query_params
        if "dataset_id" not in params:
            return Response(status=status.HTTP_400_BAD_REQUEST)

        dataset = self.gpf_instance.get_wdae_wrapper(params["dataset_id"])
        if not dataset or dataset.phenotype_data is None:
            return Response(status=status.HTTP_404_NOT_FOUND)

        if any(
            params.get(name) is not None
            for name in ("page", "sort_by", "order_by")
        ):
            logger.warning("Received deprecated params %s", params)

        instrument = params.get("instrument")
        search_term = params.get("search")

        pheno_instruments = dataset.phenotype_data.get_instruments()
        if instrument and instrument not in pheno_instruments:
            return Response(status=status.HTTP_404_NOT_FOUND)

        try:
            # Materialize inside the try block so that a ValueError raised
            # while the results are being consumed is handled as well.
            measures_page = list(
                dataset.phenotype_data.search_measures(
                    instrument, search_term,
                ),
            )
        except ValueError:
            logger.exception("Error when searching measures")
            return Response(status=status.HTTP_400_BAD_REQUEST)

        # list() never yields None, so no further validation is needed.
        return Response(measures_page)
[docs]
class CountError(Exception):
    """Signal that a request matches more measures than can be served."""
[docs]
class PhenoMeasuresDownload(QueryBaseView, DatasetAccessRightsView):
    """Phenotype measure downloads view."""

    # Largest number of measures (CSV columns besides person_id) a single
    # download may contain; requests above it are answered with HTTP 413.
    # NOTE(review): the rationale for this particular value is not
    # visible in this module.
    MAX_MEASURES = 1900

    def csv_value_iterator(
        self,
        dataset: StudyWrapper,
        measure_ids: list[str],
    ) -> Generator[str, None, None]:
        """Yield CSV text for the people values of the given measures.

        The first chunk is the header row (``person_id`` followed by the
        measure ids); every later chunk is one data row. Rows in which all
        requested measures are ``None`` are skipped.
        """
        header = ["person_id", *measure_ids]

        # The context manager releases the buffer even when the consumer
        # abandons the stream before it is exhausted.
        with StringIO() as buffer:
            writer = csv.writer(buffer, delimiter=",")

            writer.writerow(header)
            yield buffer.getvalue()
            # Reset the buffer so each chunk holds only the newest row.
            buffer.seek(0)
            buffer.truncate(0)

            assert dataset.phenotype_data is not None
            values_iterator = \
                dataset.phenotype_data.get_people_measure_values(
                    measure_ids)

            for values_dict in values_iterator:
                person_id = values_dict["person_id"]
                measure_values = [values_dict[col] for col in measure_ids]
                if all(value is None for value in measure_values):
                    continue

                writer.writerow([person_id, *measure_values])
                yield buffer.getvalue()
                buffer.seek(0)
                buffer.truncate(0)

    def _resolve_search(
        self, request: Request,
    ) -> tuple[StudyWrapper, str | None, str | None]:
        """Validate the query parameters shared by download and count.

        Returns the dataset wrapper, the requested instrument and the
        search term (the latter two are ``None`` when not supplied).

        Raises:
            ValueError: the ``dataset_id`` query parameter is missing.
            KeyError: the dataset is unknown, has no phenotype data, or a
                non-empty instrument is not one of its instruments.
        """
        data = {k: str(v) for k, v in request.query_params.items()}

        if "dataset_id" not in data:
            raise ValueError("missing 'dataset_id' query parameter")

        dataset_id = data["dataset_id"]
        dataset = self.gpf_instance.get_wdae_wrapper(dataset_id)
        if not dataset or dataset.phenotype_data is None:
            raise KeyError(dataset_id)

        instrument = data.get("instrument")
        # An empty instrument string is passed through without validation.
        if (
            instrument
            and instrument not in dataset.phenotype_data.instruments
        ):
            raise KeyError(instrument)

        return dataset, instrument, data.get("search_term")

    def get_measure_ids(self, request: Request) -> Generator[str, None, None]:
        """Build the CSV stream for the measures matching the request.

        Despite its name, this returns the generator of CSV chunks
        produced by ``csv_value_iterator`` for the matching measures, not
        the measure ids themselves.

        Raises:
            ValueError: the ``dataset_id`` query parameter is missing.
            KeyError: the dataset or instrument cannot be found.
            CountError: more than ``MAX_MEASURES`` measures match.
        """
        dataset, instrument, search_term = self._resolve_search(request)

        measures = dataset.phenotype_data.search_measures(
            instrument, search_term,
        )
        measure_ids = [
            measure["measure"]["measure_id"] for measure in measures
        ]

        if len(measure_ids) > self.MAX_MEASURES:
            raise CountError

        return self.csv_value_iterator(dataset, measure_ids)

    def count_measure_ids(self, request: Request) -> int:
        """Count the measures matching the request.

        Raises:
            ValueError: the ``dataset_id`` query parameter is missing.
            KeyError: the dataset or instrument cannot be found.
        """
        dataset, instrument, search_term = self._resolve_search(request)
        return dataset.phenotype_data.count_measures(
            instrument, search_term,
        )

    @method_decorator(etag(get_instance_timestamp_etag))
    def get(self, request: Request) -> Response | StreamingHttpResponse:
        """Return a CSV file stream for measures.

        Responds with 400 when ``dataset_id`` is missing, 404 when the
        dataset or instrument cannot be found, and 413 when more than
        ``MAX_MEASURES`` measures match.
        """
        try:
            values_iterator = self.get_measure_ids(request)
        except ValueError:
            logger.exception("Error")
            return Response(status=status.HTTP_400_BAD_REQUEST)
        except KeyError:
            logger.info("Measures not found")
            return Response(status=status.HTTP_404_NOT_FOUND)
        except CountError:
            logger.info("Measure count is too large")
            return Response(status=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE)

        response = StreamingHttpResponse(
            values_iterator, content_type="text/csv")
        response["Content-Disposition"] = \
            "attachment; filename=measures.csv"
        response["Expires"] = "0"
        return response

    @method_decorator(etag(get_instance_timestamp_etag))
    # pylint:disable=method-hidden
    def head(self, request: Request) -> Response:
        """Return a status code validating if measures can be downloaded.

        Responds with 400 when ``dataset_id`` is missing, 404 when the
        dataset or instrument cannot be found, 413 when more than
        ``MAX_MEASURES`` measures match, 204 when none match, and 200
        otherwise.
        """
        try:
            measure_ids_count = self.count_measure_ids(request)
        except ValueError:
            logger.exception("Error")
            return Response(status=status.HTTP_400_BAD_REQUEST)
        except KeyError:
            logger.exception("Measures not found")
            return Response(status=status.HTTP_404_NOT_FOUND)

        if measure_ids_count > self.MAX_MEASURES:
            logger.info("Measure count is too large")
            return Response(status=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE)

        if measure_ids_count == 0:
            logger.info("Measure count zero")
            return Response(status=status.HTTP_204_NO_CONTENT)

        return Response(status=status.HTTP_200_OK)
[docs]
class PhenoMeasuresCount(QueryBaseView, DatasetAccessRightsView):
    """Report how many phenotype measures match a search."""

    def get_count(self, request: Request) -> int:
        """Count the measures selected by the request's query parameters.

        Raises ``ValueError`` when ``dataset_id`` is missing and
        ``KeyError`` when the dataset is unknown, has no phenotype data,
        or a non-empty instrument is not one of its instruments.
        """
        params = {
            key: str(value)
            for key, value in request.query_params.items()
        }
        if "dataset_id" not in params:
            raise ValueError

        dataset = self.gpf_instance.get_wdae_wrapper(params["dataset_id"])
        if not dataset or dataset.phenotype_data is None:
            raise KeyError

        search_term = params.get("search_term")
        instrument = params.get("instrument")
        # Values are strings here, so truthiness rules out None and "".
        if (
            instrument
            and instrument not in dataset.phenotype_data.instruments
        ):
            raise KeyError

        return dataset.phenotype_data.count_measures(
            instrument, search_term,
        )

    @method_decorator(etag(get_instance_timestamp_etag))
    def get(self, request: Request) -> Response:
        """Return the number of matching measures as ``{"count": n}``.

        Maps ``ValueError`` to 400, ``KeyError`` to 404 and ``CountError``
        to 413.
        """
        try:
            count = self.get_count(request)
        except ValueError:
            logger.exception("Error")
            return Response(status=status.HTTP_400_BAD_REQUEST)
        except KeyError:
            logger.exception("Measures not found")
            return Response(status=status.HTTP_404_NOT_FOUND)
        except CountError:
            logger.exception("Measure count is too large")
            return Response(status=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE)
        else:
            return Response({"count": count})
[docs]
class PhenoMeasureValues(QueryBaseView, DatasetAccessRightsView):
    """Stream the people values of selected phenotype measures."""

    def post(self, request: Request) -> Response:
        """Stream measure values for the measures named in the request body.

        Body fields: ``dataset_id`` (required), ``measure_ids`` and
        ``instrument`` (both optional). Without ``measure_ids`` all
        measures of the instrument (or of the whole phenotype data when no
        instrument is given) are used. At most the first 1900 measure ids
        are served.

        Responds with 400 when ``dataset_id`` is missing or a requested
        measure does not belong to the given instrument, and with 404 when
        the dataset or the instrument cannot be found. On success a
        streaming response is returned.
        """
        payload = request.data
        if "dataset_id" not in payload:
            return Response(status=status.HTTP_400_BAD_REQUEST)

        dataset = self.gpf_instance.get_wdae_wrapper(payload["dataset_id"])
        if not dataset or dataset.phenotype_data is None:
            return Response(status=status.HTTP_404_NOT_FOUND)

        measure_ids = payload.get("measure_ids")
        instrument = payload.get("instrument")

        if instrument is not None:
            if instrument not in dataset.phenotype_data.instruments:
                return Response(status=status.HTTP_404_NOT_FOUND)

            instrument_measures = \
                dataset.phenotype_data.get_instrument_measures(instrument)
            if measure_ids is None:
                measure_ids = instrument_measures

            # Any id outside the instrument makes the request invalid.
            if set(measure_ids) - set(instrument_measures):
                return Response(status=status.HTTP_400_BAD_REQUEST)
        elif measure_ids is None:
            measure_ids = list(dataset.phenotype_data.measures.keys())

        if len(measure_ids) > 1900:
            measure_ids = measure_ids[:1900]

        values = dataset.phenotype_data.get_people_measure_values(
            measure_ids,
        )
        return StreamingHttpResponse(
            iterator_to_json(values),
            status=status.HTTP_200_OK,
            content_type="text/event-stream",
        )
[docs]
class PhenoImagesView(QueryBaseView, DatasetAccessRightsView):
    """Remote pheno images view."""

    @method_decorator(etag(get_permissions_etag))
    def get(
        self, _request: Request, pheno_id: str, image_path: str,
    ) -> Response | HttpResponse:
        """Return the raw bytes of a phenotype image with its mimetype.

        Responds with 400 for an empty ``image_path`` and with 404 when
        the phenotype data is unknown or the image lookup raises
        ``ValueError``.
        """
        if image_path == "":
            return Response(status=status.HTTP_400_BAD_REQUEST)

        phenotype_data = self.gpf_instance.get_phenotype_data(pheno_id)
        if phenotype_data is None:
            return Response(status=status.HTTP_404_NOT_FOUND)

        try:
            # Unpacking stays inside the try block: a malformed result
            # raises ValueError too and is reported the same way.
            image, mimetype = phenotype_data.get_image(image_path)
        except ValueError:
            logger.exception(
                "Could not get image %s for %s",
                image_path,
                pheno_id,
            )
            return Response(status=status.HTTP_404_NOT_FOUND)
        else:
            return HttpResponse(image, content_type=mimetype)