# pylint: disable=W0621,C0114,C0116,W0212,W0613
import json
import re
from datetime import timedelta
from typing import Tuple
from django.test.client import Client
from django.utils import timezone
from oauth2_provider.models import AccessToken
from rest_framework import status
from users_api.models import (
LOCKOUT_THRESHOLD,
AuthenticationLog,
ResetPasswordCode,
WdaeUser,
)
[docs]
def lockout_email(client: Client, email: str) -> None:
    """Submit repeated bad-password logins for ``email``.

    The same invalid credentials are posted to ``/accounts/login``
    ``LOCKOUT_THRESHOLD + 1`` times; the responses are discarded.
    """
    # The payload never changes between attempts, so serialise it once.
    payload = json.dumps({
        "username": email,
        "password": "invalidpasswordinvalidpassword",
    })
    attempts = LOCKOUT_THRESHOLD + 1
    for _ in range(attempts):
        client.post(
            "/accounts/login",
            payload,
            content_type="application/json",
            format="json",
        )
[docs]
def expire_email_lockout(email: str) -> None:
    """Wind back the time of every logged login so the lockout expires.

    Each ``AuthenticationLog`` row matching ``email`` (case-insensitively)
    is moved into the past by ``2 ** (failed_attempt + 1 - LOCKOUT_THRESHOLD)``
    minutes, where ``failed_attempt`` is taken from the most recent entry.
    """
    entries = AuthenticationLog.objects.filter(
        email__iexact=email,
    ).order_by("-time", "-failed_attempt")

    latest = AuthenticationLog.get_last_login_for(email)
    assert latest is not None

    exponent = latest.failed_attempt + 1 - LOCKOUT_THRESHOLD
    shift = timedelta(seconds=(2 ** exponent) * 60)

    for entry in entries:
        entry.time -= shift
        entry.save()
[docs]
def test_successful_auth(
    db: None, user: WdaeUser, client: Client,  # noqa: ARG001
    tokens: tuple[AccessToken, AccessToken],  # noqa: ARG001
) -> None:
    """A login with matching credentials redirects to the datasets URL."""
    credentials = {"username": "user@example.com", "password": "secret"}
    response = client.post(
        "/accounts/login",
        json.dumps(credentials),
        content_type="application/json",
        format="json",
    )

    assert response.status_code == status.HTTP_302_FOUND
    assert response.url == "http://localhost:4200/datasets"  # type: ignore
[docs]
def test_successful_auth_with_next(
    db: None, user: WdaeUser, client: Client,  # noqa: ARG001
    tokens: tuple[AccessToken, AccessToken],  # noqa: ARG001
) -> None:
    """A login carrying a ``next`` value redirects to that value."""
    credentials = {
        "username": "user@example.com",
        "password": "secret",
        "next": "somewhere.com",
    }
    response = client.post(
        "/accounts/login",
        json.dumps(credentials),
        content_type="application/json",
        format="json",
    )

    assert response.status_code == status.HTTP_302_FOUND
    assert response.url == "somewhere.com"  # type: ignore
[docs]
def test_successful_auth_case_insensitive(
    db: None, user: WdaeUser, client: Client,  # noqa: ARG001
    tokens: tuple[AccessToken, AccessToken],  # noqa: ARG001
) -> None:
    """A mixed-case spelling of the username still logs in and redirects."""
    credentials = {"username": "UsER@ExAmPlE.cOm", "password": "secret"}
    response = client.post(
        "/accounts/login",
        json.dumps(credentials),
        content_type="application/json",
        format="json",
    )

    assert response.status_code == status.HTTP_302_FOUND
    assert response.url == "http://localhost:4200/datasets"  # type: ignore
[docs]
def test_failed_auth(
    db: None, user: WdaeUser, client: Client,  # noqa: ARG001
    tokens: tuple[AccessToken, AccessToken],  # noqa: ARG001
) -> None:
    """A login as ``bad@example.com`` is answered with 404.

    NOTE(review): presumably no fixture registers this address - confirm.
    """
    payload = json.dumps({"username": "bad@example.com", "password": "secret"})
    response = client.post(
        "/accounts/login",
        payload,
        content_type="application/json",
        format="json",
    )

    assert response.status_code == status.HTTP_404_NOT_FOUND
[docs]
def test_no_username_auth(
    db: None, user: WdaeUser, client: Client,  # noqa: ARG001
    tokens: tuple[AccessToken, AccessToken],  # noqa: ARG001
) -> None:
    """A login with an empty username is rejected with 400."""
    payload = json.dumps({"username": "", "password": "secret"})
    response = client.post(
        "/accounts/login",
        payload,
        content_type="application/json",
        format="json",
    )

    assert response.status_code == status.HTTP_400_BAD_REQUEST
[docs]
def test_get_user_info_after_auth(user_client: Client) -> None:
    """The user-info endpoint reports the ``user_client`` as logged in."""
    info = user_client.get("/api/v3/users/get_user_info").json()

    assert info["loggedIn"] is True
    assert info["email"] == "user@example.com"
[docs]
def test_no_password_auth(
    db: None, user: WdaeUser, client: Client,  # noqa: ARG001
    tokens: tuple[AccessToken, AccessToken],  # noqa: ARG001
) -> None:
    """A login without a password field is rejected with an explicit 400."""
    payload = json.dumps({"username": "user@example.com"})
    response = client.post(
        "/accounts/login",
        payload,
        content_type="application/json",
        format="json",
    )

    assert response.status_code == status.HTTP_400_BAD_REQUEST
    assert b"Password not provided" in response.content
[docs]
def test_email_auth_unsuccessful(
    db: None, user: WdaeUser, client: Client,  # noqa: ARG001
    tokens: tuple[AccessToken, AccessToken],  # noqa: ARG001
) -> None:
    """Try to log in with a non-existing email and no password; expect 404."""
    payload = json.dumps({"username": "nonexistntuser@example.com"})
    response = client.post(
        "/accounts/login",
        payload,
        content_type="application/json",
        format="json",
    )

    assert response.status_code == status.HTTP_404_NOT_FOUND
[docs]
def test_failed_auth_attempts(
    db: None, user: WdaeUser, client: Client,  # noqa: ARG001
    tokens: tuple[AccessToken, AccessToken],  # noqa: ARG001
) -> None:
    """Check how many failed logins are tolerated before a lockout.

    Each of the first ``LOCKOUT_THRESHOLD`` wrong-password attempts must be
    answered with 401 and logged with an incremented ``failed_attempt``
    counter.  The attempt after that must be answered with 403 and a
    lockout message, with the counter at ``LOCKOUT_THRESHOLD + 1``.
    """
    url = "/accounts/login"
    data = {
        "username": "user@example.com",
        "password": "wrongpassword",
    }

    # Attempts 1..LOCKOUT_THRESHOLD: rejected, but not yet locked out.
    for attempt in range(1, LOCKOUT_THRESHOLD + 1):
        response = client.post(
            url, json.dumps(data),
            content_type="application/json", format="json",
        )
        last_login = AuthenticationLog.get_last_login_for(data["username"])
        assert last_login is not None
        assert last_login.failed_attempt == attempt
        assert response.status_code == status.HTTP_401_UNAUTHORIZED
        assert b"Invalid login credentials" in response.content

    # One more failure crosses the threshold and triggers the lockout.
    response = client.post(
        url, json.dumps(data),
        content_type="application/json", format="json",
    )
    last_login = AuthenticationLog.get_last_login_for(data["username"])
    assert last_login is not None
    assert last_login.failed_attempt == LOCKOUT_THRESHOLD + 1
    assert response.status_code == status.HTTP_403_FORBIDDEN
    assert b"This account is locked out" in response.content
[docs]
def test_failed_auth_lockouts(
    db: None, user: WdaeUser, client: Client,  # noqa: ARG001
    tokens: tuple[AccessToken, AccessToken],  # noqa: ARG001
) -> None:
    """Check that successive lockouts report a doubling duration.

    After the account is locked out, each of four rounds posts a wrong
    password, parses the reported lockout duration and expects about
    ``2 ** round`` minutes, then winds the log back for the next round.
    """
    email = "user@example.com"
    payload = json.dumps({"username": email, "password": "wrongpassword"})
    duration_pattern = r"locked out for (\d+) hours and (\d+) minutes"

    lockout_email(client, email)

    for round_no in range(1, 5):
        response = client.post(
            "/accounts/login",
            payload,
            content_type="application/json",
            format="json",
        )
        assert b"This account is locked out for" in response.content

        found = re.search(duration_pattern, response.content.decode())
        assert found is not None
        hours, minutes = (int(group) for group in found.groups())
        reported = timedelta(hours=hours, minutes=minutes)
        expected = timedelta(minutes=2 ** round_no)

        # Give a tolerance of 60 seconds to prevent test from becoming flaky
        # This large tolerance has been chosen intentionally
        assert abs(reported - expected) <= timedelta(seconds=60)
        assert response.status_code == status.HTTP_403_FORBIDDEN

        expire_email_lockout(email)
[docs]
def test_lockout_prevents_login(
    db: None, user: WdaeUser, client: Client,  # noqa: ARG001
    tokens: tuple[AccessToken, AccessToken],  # noqa: ARG001
) -> None:
    """Check that a locked-out account rejects even valid logins with 403."""
    email = "user@example.com"
    lockout_email(client, email)

    response = client.post(
        "/accounts/login",
        json.dumps({"username": email, "password": "secret"}),
        content_type="application/json",
        format="json",
    )

    assert response.status_code == status.HTTP_403_FORBIDDEN
[docs]
def test_successful_auth_resets_lockouts(
    db: None, user: WdaeUser, client: Client,  # noqa: ARG001
    tokens: tuple[AccessToken, AccessToken],  # noqa: ARG001
) -> None:
    """Check that a successful login clears the lockout state.

    The account is locked out and the lockout is expired.  A correct login
    must then succeed, and a following wrong password must be answered with
    a plain 401 and an invalid-credentials message.
    """
    login_url = "/accounts/login"
    email = "user@example.com"

    lockout_email(client, email)
    expire_email_lockout(email)

    good_login = client.post(
        login_url,
        json.dumps({"username": email, "password": "secret"}),
        content_type="application/json",
        format="json",
    )
    assert good_login.status_code == status.HTTP_302_FOUND

    bad_login = client.post(
        login_url,
        json.dumps({"username": email, "password": "wrongpasswordagain"}),
        content_type="application/json",
        format="json",
    )
    assert bad_login.status_code == status.HTTP_401_UNAUTHORIZED
    assert b"Invalid login credentials" in bad_login.content
[docs]
def test_password_reset_resets_lockouts(
    user: WdaeUser, client: Client,
    tokens: tuple[AccessToken, AccessToken],  # noqa: ARG001
) -> None:
    """Check that resetting the password clears the lockout state.

    Steps: lock the account out and confirm the 403; request a password
    reset and complete it through the session-stored reset code; confirm a
    wrong password now yields 401; finally log in with the new password.
    """
    login_url = "/accounts/login"
    email = "user@example.com"
    new_password = "samplenewpassword"

    lockout_email(client, email)
    locked = client.post(
        login_url,
        json.dumps({"username": email, "password": "wrongpassword"}),
        content_type="application/json",
        format="json",
    )
    assert b"This account is locked out for" in locked.content
    assert locked.status_code == status.HTTP_403_FORBIDDEN

    # Reset and change password
    forgotten = client.post(
        "/api/v3/users/forgotten_password",
        json.dumps({"email": user.email}),
        content_type="application/json",
        format="json",
    )
    assert forgotten.status_code == status.HTTP_200_OK

    code = ResetPasswordCode.get_code(user)
    assert code is not None

    # The reset endpoint is driven by a code placed in the client's session.
    session = client.session
    session.update({"reset_code": code.path})
    session.save()

    reset = client.post(
        "/api/v3/users/reset_password",
        json.dumps({
            "new_password1": new_password,
            "new_password2": new_password,
        }),
        content_type="application/json",
        format="json",
    )
    assert reset.status_code == status.HTTP_302_FOUND

    # See that the lockouts have been reset
    rejected = client.post(
        login_url,
        json.dumps({"username": email, "password": "wrongpasswordagain"}),
        content_type="application/json",
        format="json",
    )
    assert rejected.status_code == status.HTTP_401_UNAUTHORIZED
    assert b"Invalid login credentials" in rejected.content

    # Try properly logging in
    accepted = client.post(
        login_url,
        json.dumps({"username": email, "password": new_password}),
        content_type="application/json",
        format="json",
    )
    assert accepted.status_code == status.HTTP_302_FOUND
[docs]
def test_authentication_logging(
    user: WdaeUser, client: Client,  # noqa: ARG001
    tokens: tuple[AccessToken, AccessToken],  # noqa: ARG001
) -> None:
    """Check that successful and failed logins are both logged.

    After each attempt the latest ``AuthenticationLog`` entry must carry the
    user's email, a timestamp within 5 seconds of now, and a
    ``failed_attempt`` of 0 (success) or 1 (first failure).
    """
    login_url = "/accounts/login"
    email = "user@example.com"
    tolerance = timedelta(seconds=5)  # keeps the time checks from being flaky

    # Successful attempt.
    response = client.post(
        login_url,
        json.dumps({"username": email, "password": "secret"}),
        content_type="application/json",
        format="json",
    )
    entry = AuthenticationLog.get_last_login_for(email)
    assert entry is not None
    now = timezone.now().replace(microsecond=0)
    logged_at = entry.time.replace(microsecond=0)

    assert response.status_code == status.HTTP_302_FOUND
    assert entry.email == "user@example.com"
    assert abs(logged_at - now) <= tolerance
    assert entry.failed_attempt == 0

    # Failed attempt.
    response = client.post(
        login_url,
        json.dumps({"username": email, "password": "wrongpassword"}),
        content_type="application/json",
        format="json",
    )
    entry = AuthenticationLog.get_last_login_for(email)
    assert entry is not None
    now = timezone.now().replace(microsecond=0)
    logged_at = entry.time.replace(microsecond=0)

    assert response.status_code == status.HTTP_401_UNAUTHORIZED
    assert entry.email == "user@example.com"
    assert abs(logged_at - now) <= tolerance
    assert entry.failed_attempt == 1
[docs]
def test_login_page_template_data(
    client: Client,
) -> None:
    """Check the ``next`` and ``about`` values in the login page context."""
    next_value = "authstuffredirect_uri=http://frontend/login"
    response = client.get(
        "/accounts/login/?next=authstuffredirect_uri=http://frontend/login",
        content_type="application/json",
        format="json",
    )

    context = response.context
    assert context["next"] == next_value
    assert context["about"] == "http://frontend/about"