diff --git a/training/api/api_v1/certificates.py b/training/api/api_v1/certificates.py index bd479991..e1335a52 100644 --- a/training/api/api_v1/certificates.py +++ b/training/api/api_v1/certificates.py @@ -1,4 +1,4 @@ -from typing import List, Any +from typing import List, Any, Dict from fastapi import APIRouter, status, HTTPException, Depends, Response from training.schemas import UserCertificate, CertificateType, CertificateListValue from training.repositories import CertificateRepository @@ -52,11 +52,13 @@ def get_certificate_by_type_and_id( ): pdf_bytes = None filename = '' + is_admin_user = is_admin(user) + user_id = user["id"] if (certType == CertificateType.QUIZ.value): db_user_certificate = certificateRepo.get_certificate_by_id(id) - verify_certificate_is_valid(db_user_certificate, user["id"]) + verify_certificate_is_valid(db_user_certificate, user_id, is_admin_user) pdf_bytes = certificateService.generate_pdf( db_user_certificate.quiz_name, @@ -69,7 +71,7 @@ def get_certificate_by_type_and_id( elif (certType == CertificateType.GSPC.value): certificate = certificateRepo.get_gspc_certificate_by_id(id) - verify_certificate_is_valid(certificate, user["id"]) + verify_certificate_is_valid(certificate, user_id, is_admin_user) pdf_bytes = certificateService.generate_gspc_pdf( certificate.user_name, @@ -87,9 +89,18 @@ def get_certificate_by_type_and_id( return Response(pdf_bytes, headers=headers, media_type='application/pdf') -def verify_certificate_is_valid(cert: object, user_id: int): +def verify_certificate_is_valid(cert: object, user_id: int, is_admin_user: bool): if cert is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) - if cert.user_id != user_id: + if cert.user_id != user_id and not is_admin_user: raise HTTPException(status_code=401, detail="Not Authorized") + + +def is_admin(user: Dict[str, List[str]]) -> bool: + # Ensure that 'roles' is in the user dictionary and is a list + if 'roles' not in user or not isinstance(user['roles'], list): + return False + + # Normalize roles to avoid case sensitivity issues + return 'Admin' in user['roles'] diff --git a/training/tests/test_api_certificates.py b/training/tests/test_api_certificates.py index f8a047a5..d102d3da 100644 --- a/training/tests/test_api_certificates.py +++ b/training/tests/test_api_certificates.py @@ -3,13 +3,14 @@ from unittest.mock import MagicMock -from fastapi import status +from fastapi import status, HTTPException from fastapi.testclient import TestClient from training.api.deps import certificate_repository from training.config import settings from training.main import app from training.schemas import UserCertificate, GspcCertificate, CertificateListValue from training.services.certificate import Certificate +from training.api.api_v1.certificates import verify_certificate_is_valid, is_admin client = TestClient(app) @@ -86,6 +87,11 @@ def goodJWT(): return jwt.encode({'id': 1}, settings.JWT_SECRET, algorithm="HS256") +class MockCertificate: + def __init__(self, user_id): + self.user_id = user_id + + class TestCertificateAPI: def test_get_certificates_no_auth(self, fake_cert_repo): response = client.get( @@ -219,3 +225,58 @@ def test_get_specific_gspc_certificate_wrong_user(self, fake_cert_repo, goodJWT, data={"jwtToken": goodJWT} ) assert response.status_code == status.HTTP_401_UNAUTHORIZED + + def test_verify_certificate_is_valid_certificate_none(self): + """Test when the certificate is None, should raise 404 HTTPException.""" + with pytest.raises(HTTPException) as exc_info: + verify_certificate_is_valid(cert=None, user_id=1, is_admin_user=False) + assert exc_info.value.status_code == status.HTTP_404_NOT_FOUND + + def test_verify_certificate_is_valid_user_not_authorized(self): + """Test when user_id does not match and is not an admin, should raise 401 HTTPException.""" + cert = MockCertificate(user_id=2) + with pytest.raises(HTTPException) as exc_info: + verify_certificate_is_valid(cert=cert, user_id=1, is_admin_user=False) + assert exc_info.value.status_code == 401 + assert exc_info.value.detail == "Not Authorized" + + def test_verify_certificate_is_valid_user_authorized(self): + """Test when user_id matches, should not raise any exception.""" + cert = MockCertificate(user_id=1) + try: + verify_certificate_is_valid(cert=cert, user_id=1, is_admin_user=False) + except HTTPException: + pytest.fail("HTTPException raised unexpectedly!") + + def test_verify_certificate_is_valid_admin_user(self): + """Test when the user is an admin, should not raise any exception even if user_id does not match.""" + cert = MockCertificate(user_id=2) + try: + verify_certificate_is_valid(cert=cert, user_id=1, is_admin_user=True) + except HTTPException: + pytest.fail("HTTPException raised unexpectedly!") + + def test_is_admin_with_admin_role(self): + """Test when 'Admin' is in the roles list.""" + user = {"roles": ["User", "Admin", "Editor"]} + assert is_admin(user) is True + + def test_is_admin_without_admin_role(self): + """Test when 'Admin' is not in the roles list.""" + user = {"roles": ["User", "Editor"]} + assert is_admin(user) is False + + def test_is_admin_empty_roles(self): + """Test when the roles list is empty.""" + user = {"roles": []} + assert is_admin(user) is False + + def test_is_admin_roles_is_none(self): + """Test when the roles list is None.""" + user = {"roles": None} + assert is_admin(user) is False + + def test_is_admin_roles_key_missing(self): + """Test when the roles key is missing from the dictionary.""" + user = {} + assert is_admin(user) is False