[docs]defiterator_to_json(users:Iterator[WdaeUser])->Generator[str,None,int]:"""Wrap an iterator over WdaeUser models to produce json objects."""yield"["curr=next(users,None)post=next(users,None)whilecurrisnotNone:ifcurr.email:serializer=UserSerializerelse:serializer=UserWithoutEmailSerializeryieldval=json.dumps(serializer(curr).data,default=convert)ifpostisNone:yieldyieldvalbreakyieldyieldval+","curr=postpost=next(users,None)yield"]"return0
[docs]classUserViewSet(viewsets.ModelViewSet):# pylint: disable=too-many-ancestors"""API endpoint that allows users to be viewed or edited."""authentication_classes=[SessionAuthenticationWithoutCSRF,GPFOAuth2Authentication]serializer_class=UserSerializerqueryset=get_user_model().objects.order_by("email").all()permission_classes=(permissions.IsAdminUser,)filter_backends=(filters.SearchFilter,)search_fields=("email","name","groups__name")
[docs]@request_logging(logger)@action(detail=False,methods=["get"])defstreaming_search(self,request:Request)->StreamingHttpResponse:"""Search for users and stream the results."""self.check_permissions(request)queryset=get_user_model().objects.all()search_param=request.GET.get("search",None)ifsearch_param:queryset=queryset.filter(Q(name__icontains=search_param)|Q(email__icontains=search_param),)returnStreamingHttpResponse(iterator_to_json(queryset.iterator()),status=status.HTTP_200_OK,content_type="text/event-stream",)
[docs]@request_logging(logger)@action(detail=True,methods=["get","post"])defpassword_reset(self,request:Request,pk:int)->Response:"""Reset the password for a user."""self.check_permissions(request)user_model=get_user_model()try:user=user_model.objects.get(pk=pk)user.reset_password()user.deauthenticate()returnResponse(status=status.HTTP_204_NO_CONTENT)exceptuser_model.DoesNotExist:returnResponse(status=status.HTTP_404_NOT_FOUND)
[docs]classForgotPassword(views.APIView):"""View for forgotten password."""
[docs]@request_logging(logger)defpost(self,request:Request)->HttpResponse:"""Send a reset password email to the user."""form=WdaePasswordForgottenForm(request.data)is_valid=form.is_valid()ifnotis_valid:returnrender(request,"users_api/registration/forgotten-password.html",{"form":form,"message":"Invalid email","message_type":"warn","show_form":True,},status=status.HTTP_400_BAD_REQUEST,)email=form.data["email"]user_model=get_user_model()message=(f"An e-mail has been sent to {email}"" containing the reset link")try:user=user_model.objects.get(email=email)user.reset_password()user.deauthenticate()returnrender(request,"users_api/registration/forgotten-password.html",{"form":form,"message":message,"message_type":"success","show_form":False,},)exceptuser_model.DoesNotExist:returnrender(request,"users_api/registration/forgotten-password.html",{"form":form,"message":message,"message_type":"success","show_form":False,},)
[docs]classBasePasswordView(views.APIView):"""Base class for set/reset password views."""verification_code_model:models.Model|None=Nonetemplate:str|None=Noneform:forms.Form|None=Nonecode_type:str|None=Nonedef_check_request_verification_path(self,request:Request,)->Tuple[ResetPasswordCode|None|SetPasswordCode,str|None]:""" Check, validate and return a verification path from a request. Returns a tuple of the model instance and the error message if any. When the instance is not found, None is returned. """verification_path=request.GET.get("code")ifverification_pathisNone:verification_path=request.session.get(f"{self.code_type}_code")ifverification_pathisNone:returnNone,f"No {self.code_type} code provided"try:assertverification_pathisnotNoneassertself.verification_code_modelisnotNoneverif_code= \
self.verification_code_model.objects.get(# type: ignorepath=verification_path)exceptObjectDoesNotExist:returnNone,f"Invalid {self.code_type} code"is_valid=verif_code.validate()ifnotis_valid:returnverif_code,f"Expired {self.code_type} code"returnverif_code,None
[docs]classRESTLoginView(views.APIView):"""View for REST session bases logging in."""
[docs]@request_logging(logger)defpost(self,request:Request)->Response:"""Supports a REST login endpoint."""username=request.data.get("username")password=request.data.get("password")ifnotusernameornotpassword:returnResponse(status=status.HTTP_400_BAD_REQUEST)user=authenticate(username=username,password=password,)ifuserisNone:AuthenticationLog.log_authentication_attempt(username,failed=True,)ifAuthenticationLog.is_user_locked_out(username):returnResponse(AuthenticationLog.get_locked_out_error(username),status=status.HTTP_403_FORBIDDEN,)returnResponse(status=status.HTTP_401_UNAUTHORIZED)login(request,user)logger.info(log_filter(request,"login success: "+str(username)))AuthenticationLog.log_authentication_attempt(username,failed=False)returnResponse(status=status.HTTP_204_NO_CONTENT)
[docs]classWdaeLoginView(views.APIView):"""View for logging in."""
[docs]@request_logging(logger)defget(self,request:Request)->HttpResponse:"""Render the login form."""next_uri=request.GET.get("next")ifnext_uriisNone:next_uri=get_default_application().redirect_uris.split(" ")[0]form=WdaeLoginForm()returnrender(request,"users_api/registration/login.html",{"form":form,"next":next_uri,},)
[docs]@request_logging(logger)defpost(self,request:Request)->Response|HttpResponse:"""Handle the login form."""data=request.datanext_uri=data.get("next")ifnext_uriisNone:next_uri=get_default_application().redirect_uris.split(" ")[0]response_status=status.HTTP_200_OKform=WdaeLoginForm(request,data=data)ifform.is_valid():returnredirect(next_uri)response_status=form.status_codereturnrender(request,"users_api/registration/login.html",{"form":form,"next":next_uri,"show_errors":True,},status=response_status,)
[docs]@request_logging_function_view(logger)@api_view(["POST"])defchange_password(request:Request)->Response:"""Change the password for a user."""password=request.data["password"]verif_code=request.data["verifPath"]ifnotis_password_valid(password):logger.error(log_filter(request,"Password change failed: Invalid password: '%s'",str(password),))returnResponse({"error_msg":("Invalid password entered. Password is either too"" short (<10 symbols) or too weak.")},status=status.HTTP_400_BAD_REQUEST,)get_user_model().change_password(verif_code,password)returnResponse({},status.HTTP_201_CREATED)
[docs]@request_logging_function_view(logger)@api_view(["POST"])defregister(request:Request)->Response:"""Register a new user."""user_model=get_user_model()try:email=BaseUserManager.normalize_email(request.data["email"])ifnotis_email_valid(email):raiseValueErrorifsettings.OPEN_REGISTRATION:preexisting_user,_=user_model.objects.get_or_create(email=email)else:preexisting_user=user_model.objects.get(email__iexact=email,)preexisting_user.register_preexisting_user(request.data.get("name"))logger.info(log_filter(request,"registration succeeded; email: '%s'",str(email),),)returnResponse({},status=status.HTTP_201_CREATED)exceptIntegrityError:logger.error(log_filter(request,"Registration failed: IntegrityError; email: '%s'",str(email),),)returnResponse({},status=status.HTTP_201_CREATED)exceptuser_model.DoesNotExist:logger.error(log_filter(request,"Registration failed: Email or Researcher Id not found; ""email: '%s'",str(email),),)returnResponse({"error_msg":("Registration is closed."" Please contact an administrator.")},status=status.HTTP_403_FORBIDDEN,)exceptKeyError:logger.error(log_filter(request,"Registration failed: KeyError; %s",str(request.data),),)returnResponse({},status=status.HTTP_201_CREATED)exceptValueError:logger.error(log_filter(request,"Registration failed: Invalid email; email: '%s'",str(email),),)returnResponse({"error_msg":("Invalid email address entered."" Please use a valid email address.")},status=status.HTTP_400_BAD_REQUEST,)
[docs]@request_logging_function_view(logger)@csrf_clear@api_view(["POST"])@authentication_classes((GPFOAuth2Authentication,SessionAuthenticationWithoutCSRF))deflogout(request:Request)->Response:"""Log out the currently logged-in user."""django.contrib.auth.logout(request)returnResponse(status=status.HTTP_204_NO_CONTENT)
[docs]@request_logging_function_view(logger)@ensure_csrf_cookie@api_view(["GET"])@authentication_classes((GPFOAuth2Authentication,))defget_user_info(request:Request)->Response:"""Get user info for currently logged-in user."""user=request.userifuser.is_authenticated:returnResponse({"loggedIn":True,"email":user.email,"isAdministrator":user.is_staff,},status.HTTP_200_OK,)returnResponse({"loggedIn":False},status.HTTP_200_OK)
[docs]@request_logging_function_view(logger)@api_view(["POST"])defcheck_verif_code(request:Request)->Response:"""Check if a verification code is valid."""verif_code=request.data["verifPath"]try:ResetPasswordCode.objects.get(path=verif_code)returnResponse({},status=status.HTTP_200_OK)exceptObjectDoesNotExist:returnResponse({"errors":"Verification path does not exist."},status=status.HTTP_400_BAD_REQUEST,)
[docs]classFederationCredentials(views.APIView):"""API for handling federation credentials/applications."""authentication_classes=(GPFOAuth2Authentication,)
[docs]@request_logging(logger)defget(self,request:Request)->Response:"""List all federation apps for a user."""user=request.userifnotuser.is_authenticated:returnResponse(status=status.HTTP_401_UNAUTHORIZED)apps=get_application_model().objects.filter(user_id=user.id,authorization_grant_type="client-credentials",client_type="confidential",)res=[]forappinapps:res.append({"name":app.name,})returnResponse(res,status=status.HTTP_200_OK)
[docs]@request_logging(logger)defpost(self,request:Request)->Response:"""Create a new federation application and return its credentials."""user=request.userifnotuser.is_authenticated:returnResponse(status=status.HTTP_401_UNAUTHORIZED)application=get_application_model()ifapplication.objects.filter(name=request.data["name"]).exists():returnResponse(status=status.HTTP_400_BAD_REQUEST)new_application=application(name=request.data["name"],user_id=user.id,client_type="confidential",authorization_grant_type="client-credentials")new_application.full_clean()cleartext_secret=new_application.client_secretnew_application.save()credentials=base64.b64encode(f"{new_application.client_id}:{cleartext_secret}".encode(),)returnResponse({"credentials":credentials},status=status.HTTP_200_OK,)
[docs]@request_logging(logger)defdelete(self,request:Request)->Response:"""Delete a given federation app."""user=request.userifnotuser.is_authenticated:returnResponse(status=status.HTTP_401_UNAUTHORIZED)ifnotget_application_model() \
.objects \
.filter(name=request.data["name"]) \
.exists():returnResponse(status=status.HTTP_400_BAD_REQUEST)app=get_application_model().objects.get(name=request.data["name"],)ifuser.id!=app.user_id:returnResponse(status=status.HTTP_401_UNAUTHORIZED)app.delete()returnResponse(status=status.HTTP_200_OK)