Source code for sentry.views

import codecs
import io
import os
import time
from urllib.parse import urlparse

import requests
from django.conf import settings
from rest_framework import status
from rest_framework.decorators import api_view, parser_classes
from rest_framework.exceptions import ParseError
from rest_framework.parsers import BaseParser
from rest_framework.request import Request
from rest_framework.response import Response


class PlainTextParser(BaseParser):
    """Parser that hands back a ``text/plain`` request body as a string."""

    media_type = "text/plain"

    def parse(
        self,
        stream: io.IOBase,
        _media_type: str | None = None,
        parser_context: dict | None = None,
    ) -> str:
        """Read all of *stream* and return it decoded as text.

        The charset is looked up under the ``"encoding"`` key of
        *parser_context*; when the context is missing or has no such key,
        ``settings.DEFAULT_CHARSET`` is used.

        Raises:
            ParseError: if reading/decoding fails with a ``ValueError``
                (which includes ``UnicodeDecodeError``).
        """
        context = parser_context if parser_context else {}
        charset = context.get("encoding", settings.DEFAULT_CHARSET)
        try:
            make_reader = codecs.getreader(charset)
            return make_reader(stream).read()
        except ValueError as exc:
            raise ParseError(f"Plain text parse error - {exc!s}") from exc
@api_view(["POST"])
@parser_classes([PlainTextParser])
def sentry(request: Request) -> Response:
    """Tunnel Sentry requests from the frontend.

    The plain-text request body is forwarded to the Sentry host named in
    the ``GPFJS_SENTRY_DSN`` environment variable, after replacing the
    placeholder DSN embedded in the body with the real one.

    The request is only forwarded when a ``sentry_token`` cookie is
    present whose first ``&``-separated field has the form
    ``<name>=<microseconds>``; the value is divided by 10**6 and compared
    against ``time.time()``, so it is treated as an issue time in
    microseconds since the epoch.

    Returns:
        * ``200`` with an empty body when the cookie is missing, the token
          cannot be parsed, no DSN is configured, or after the payload has
          been posted upstream (the upstream response is not inspected).
        * ``401`` with an empty body when the token is older than one hour.
    """
    expiration_time = 60 * 60  # in seconds

    sentry_token = request.COOKIES.get("sentry_token")
    if not sentry_token:
        return Response({}, status.HTTP_200_OK)

    try:
        # First "&"-separated field, value after "=", in microseconds.
        issued_field = sentry_token.split("&")[0].split("=")[1]
        time_issued = float(issued_field) / 1_000_000
    except (IndexError, ValueError):
        # IndexError: the first field has no "=" at all.
        # ValueError: the value is not a number.
        # Either way the token is unusable; drop the event silently, the
        # same way a missing cookie is handled.
        return Response({}, status.HTTP_200_OK)

    curr_time = time.time()
    if time_issued + expiration_time <= curr_time:
        return Response({}, status.HTTP_401_UNAUTHORIZED)

    dsn = os.environ.get("GPFJS_SENTRY_DSN")
    if not dsn:
        return Response({}, status.HTTP_200_OK)

    # Placeholder DSN expected inside the incoming payload; the original
    # note points at gpfjs' main.ts as the place it comes from.
    fake_dsn = "https://0@0.ingest.sentry.io/0"

    # DSN layout used here: <scheme>://<key>@<host>/<project_id>
    project_id = urlparse(dsn).path.strip("/")
    sentry_host = dsn.split("@")[1].split("/")[0]
    upstream_sentry_url = f"https://{sentry_host}/api/{project_id}/envelope/"

    requests.post(
        upstream_sentry_url,
        data=request.data.replace(fake_dsn, dsn).encode(),
        timeout=30,
    )

    return Response({}, status.HTTP_200_OK)