From fa932b68d88f2cd09d0caa3c8fb56a39c72a12a5 Mon Sep 17 00:00:00 2001 From: John Labbate Date: Thu, 29 Aug 2024 11:07:23 -0400 Subject: [PATCH 1/3] Add admin bypass for accessing user certs. --- training/api/api_v1/certificates.py | 17 +++++-- training/tests/test_api_certificates.py | 59 ++++++++++++++++++++++++- 2 files changed, 71 insertions(+), 5 deletions(-) diff --git a/training/api/api_v1/certificates.py b/training/api/api_v1/certificates.py index bd479991..77bc923e 100644 --- a/training/api/api_v1/certificates.py +++ b/training/api/api_v1/certificates.py @@ -52,11 +52,13 @@ def get_certificate_by_type_and_id( ): pdf_bytes = None filename = '' + is_admin_user = is_admin(user["roles"]) + user_id = user["id"] if (certType == CertificateType.QUIZ.value): db_user_certificate = certificateRepo.get_certificate_by_id(id) - verify_certificate_is_valid(db_user_certificate, user["id"]) + verify_certificate_is_valid(db_user_certificate, user_id, is_admin_user) pdf_bytes = certificateService.generate_pdf( db_user_certificate.quiz_name, @@ -69,7 +71,7 @@ def get_certificate_by_type_and_id( elif (certType == CertificateType.GSPC.value): certificate = certificateRepo.get_gspc_certificate_by_id(id) - verify_certificate_is_valid(certificate, user["id"]) + verify_certificate_is_valid(certificate, user_id, is_admin_user) pdf_bytes = certificateService.generate_gspc_pdf( certificate.user_name, @@ -87,9 +89,16 @@ def get_certificate_by_type_and_id( return Response(pdf_bytes, headers=headers, media_type='application/pdf') -def verify_certificate_is_valid(cert: object, user_id: int): +def verify_certificate_is_valid(cert: object, user_id: int, is_admin_user: bool): if cert is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) - if cert.user_id != user_id: + if cert.user_id != user_id and not is_admin_user: raise HTTPException(status_code=401, detail="Not Authorized") + + +def is_admin(user_roles: List[str]): + if not user_roles: # Handle None or empty list + return False + + return 'Admin' in user_roles diff --git a/training/tests/test_api_certificates.py b/training/tests/test_api_certificates.py index f8a047a5..e42a7fdf 100644 --- a/training/tests/test_api_certificates.py +++ b/training/tests/test_api_certificates.py @@ -3,13 +3,15 @@ from unittest.mock import MagicMock -from fastapi import status +from fastapi import status, HTTPException from fastapi.testclient import TestClient from training.api.deps import certificate_repository from training.config import settings from training.main import app from training.schemas import UserCertificate, GspcCertificate, CertificateListValue from training.services.certificate import Certificate +from training.api.api_v1.certificates import verify_certificate_is_valid, is_admin +from starlette import status client = TestClient(app) @@ -86,6 +88,11 @@ def goodJWT(): return jwt.encode({'id': 1}, settings.JWT_SECRET, algorithm="HS256") +class MockCertificate: + def __init__(self, user_id): + self.user_id = user_id + + class TestCertificateAPI: def test_get_certificates_no_auth(self, fake_cert_repo): response = client.get( @@ -219,3 +226,53 @@ def test_get_specific_gspc_certificate_wrong_user(self, fake_cert_repo, goodJWT, data={"jwtToken": goodJWT} ) assert response.status_code == status.HTTP_401_UNAUTHORIZED + + def test_verify_certificate_is_valid_certificate_none(self): + """Test when the certificate is None, should raise 404 HTTPException.""" + with pytest.raises(HTTPException) as exc_info: + verify_certificate_is_valid(cert=None, user_id=1, is_admin_user=False) + assert exc_info.value.status_code == status.HTTP_404_NOT_FOUND + + def test_verify_certificate_is_valid_user_not_authorized(self): + """Test when user_id does not match and is not an admin, should raise 401 HTTPException.""" + cert = MockCertificate(user_id=2) + with pytest.raises(HTTPException) as exc_info: + verify_certificate_is_valid(cert=cert, user_id=1, is_admin_user=False) + assert exc_info.value.status_code == 401 + assert exc_info.value.detail == "Not Authorized" + + def test_verify_certificate_is_valid_user_authorized(self): + """Test when user_id matches, should not raise any exception.""" + cert = MockCertificate(user_id=1) + try: + verify_certificate_is_valid(cert=cert, user_id=1, is_admin_user=False) + except HTTPException: + pytest.fail("HTTPException raised unexpectedly!") + + def test_verify_certificate_is_valid_admin_user(self): + """Test when the user is an admin, should not raise any exception even if user_id does not match.""" + cert = MockCertificate(user_id=2) + try: + verify_certificate_is_valid(cert=cert, user_id=1, is_admin_user=True) + except HTTPException: + pytest.fail("HTTPException raised unexpectedly!") + + def test_is_admin_with_admin_role(self): + """Test when 'Admin' is in the roles list.""" + user_roles = ["User", "Admin", "Editor"] + assert is_admin(user_roles) is True + + def test_is_admin_without_admin_role(self): + """Test when 'Admin' is not in the roles list.""" + user_roles = ["User", "Editor"] + assert is_admin(user_roles) is False + + def test_is_admin_empty_list(self): + """Test when the roles list is empty.""" + user_roles = [] + assert is_admin(user_roles) is False + + def test_is_admin_none(self): + """Test when the roles list is None.""" + user_roles = None + assert is_admin(user_roles) is False From b0e44da61f540242b4b238abe64e5265aa660024 Mon Sep 17 00:00:00 2001 From: John Labbate Date: Thu, 29 Aug 2024 11:09:50 -0400 Subject: [PATCH 2/3] lint fix. --- training/tests/test_api_certificates.py | 1 - 1 file changed, 1 deletion(-) diff --git a/training/tests/test_api_certificates.py b/training/tests/test_api_certificates.py index e42a7fdf..de7e9fcd 100644 --- a/training/tests/test_api_certificates.py +++ b/training/tests/test_api_certificates.py @@ -11,7 +11,6 @@ from training.schemas import UserCertificate, GspcCertificate, CertificateListValue from training.services.certificate import Certificate from training.api.api_v1.certificates import verify_certificate_is_valid, is_admin -from starlette import status client = TestClient(app) From bdcf8b571bc6e5ad47527cace6d77eb0037d2daa Mon Sep 17 00:00:00 2001 From: John Labbate Date: Thu, 29 Aug 2024 11:20:14 -0400 Subject: [PATCH 3/3] rework for when user object doesnt have role list. --- training/api/api_v1/certificates.py | 12 +++++++----- training/tests/test_api_certificates.py | 25 +++++++++++++++---------- 2 files changed, 22 insertions(+), 15 deletions(-) diff --git a/training/api/api_v1/certificates.py b/training/api/api_v1/certificates.py index 77bc923e..e1335a52 100644 --- a/training/api/api_v1/certificates.py +++ b/training/api/api_v1/certificates.py @@ -1,4 +1,4 @@ -from typing import List, Any +from typing import List, Any, Dict from fastapi import APIRouter, status, HTTPException, Depends, Response from training.schemas import UserCertificate, CertificateType, CertificateListValue from training.repositories import CertificateRepository @@ -52,7 +52,7 @@ def get_certificate_by_type_and_id( ): pdf_bytes = None filename = '' - is_admin_user = is_admin(user["roles"]) + is_admin_user = is_admin(user) user_id = user["id"] if (certType == CertificateType.QUIZ.value): @@ -97,8 +97,10 @@ def verify_certificate_is_valid(cert: object, user_id: int, is_admin_user: bool) raise HTTPException(status_code=401, detail="Not Authorized") -def is_admin(user_roles: List[str]): - if not user_roles: # Handle None or empty list +def is_admin(user: Dict[str, List[str]]) -> bool: + # Ensure that 'roles' is in the user dictionary and is a list + if 'roles' not in user or not isinstance(user['roles'], list): return False - return 'Admin' in user_roles + # Normalize roles to avoid case sensitivity issues + return 'Admin' in user['roles'] diff --git a/training/tests/test_api_certificates.py b/training/tests/test_api_certificates.py index de7e9fcd..d102d3da 100644 --- a/training/tests/test_api_certificates.py +++ b/training/tests/test_api_certificates.py @@ -258,20 +258,25 @@ def test_verify_certificate_is_valid_admin_user(self): def test_is_admin_with_admin_role(self): """Test when 'Admin' is in the roles list.""" - user_roles = ["User", "Admin", "Editor"] - assert is_admin(user_roles) is True + user = {"roles": ["User", "Admin", "Editor"]} + assert is_admin(user) is True def test_is_admin_without_admin_role(self): """Test when 'Admin' is not in the roles list.""" - user_roles = ["User", "Editor"] - assert is_admin(user_roles) is False + user = {"roles": ["User", "Editor"]} + assert is_admin(user) is False - def test_is_admin_empty_list(self): + def test_is_admin_empty_roles(self): """Test when the roles list is empty.""" - user_roles = [] - assert is_admin(user_roles) is False + user = {"roles": []} + assert is_admin(user) is False - def test_is_admin_none(self): + def test_is_admin_roles_is_none(self): """Test when the roles list is None.""" - user_roles = None - assert is_admin(user_roles) is False + user = {"roles": None} + assert is_admin(user) is False + + def test_is_admin_roles_key_missing(self): + """Test when the roles key is missing from the dictionary.""" + user = {} + assert is_admin(user) is False