Skip to content

Commit

Permalink
Fix SAML tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dezhidki committed Nov 7, 2022
1 parent d797c60 commit e169471
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 110 deletions.
37 changes: 24 additions & 13 deletions timApp/auth/saml/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,23 +58,34 @@ def _do_get_saml_config(try_new_cert: bool, try_new_metadata: bool) -> Saml2Conf
except KeyError:
raise KeyError(f"Could not find CONFIG dict in SAML config file.")

metadata_file_name = "metadata_new.crt" if try_new_metadata else "metadata.crt"

config_dict["key_file"] = str(saml_path / "certs" / "sp.key")
config_dict["cert_file"] = str(saml_path / "certs" / "sp.crt")
# Encryption keypairs seem to be a different option, but e.g., in HAKA the same keys are used for encrypting
# requests and decrypting responses
config_dict["encryption_keypairs"] = [
{
"key_file": str(saml_path / "certs" / "sp.key"),
"cert_file": str(saml_path / "certs" / "sp.crt"),
}
]
metadata_cert_file_name = (
"metadata_new.crt" if try_new_metadata else "metadata.crt"
)
metadata_cert_file = saml_path / "certs" / metadata_cert_file_name

sp_key = saml_path / "certs" / "sp.key"
sp_cert = saml_path / "certs" / "sp.crt"

if sp_key.exists() and sp_cert.exists():
sp_key_str = str(sp_key)
sp_cert_str = str(sp_cert)
config_dict["key_file"] = sp_key_str
config_dict["cert_file"] = sp_cert_str
# Encryption keypairs seem to be a different option, but e.g., in HAKA the same keys are used for encrypting
# requests and decrypting responses
config_dict["encryption_keypairs"] = [
{
"key_file": sp_key_str,
"cert_file": sp_cert_str,
}
]
config_dict["metadata"] = {
"loadex": [
{
"loader": metadata_loader,
"cert": str(saml_path / "certs" / metadata_file_name),
"cert": str(saml_path / "certs" / metadata_cert_file)
if app.config["SAML_VERIFY_METADATA"]
else None,
}
]
}
Expand Down
28 changes: 21 additions & 7 deletions timApp/auth/saml/routes.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from dataclasses import field
from xml.etree.ElementTree import ParseError

import requests
from flask import make_response, session, request, url_for, redirect
Expand All @@ -7,6 +8,7 @@
from saml2.config import Config as Saml2Config
from saml2.mdstore import MetadataStore
from saml2.metadata import create_metadata_string
from saml2.response import AuthnResponse
from saml2.s_utils import SamlException
from werkzeug.sansio.response import Response

Expand All @@ -26,7 +28,7 @@
from timApp.user.usergroup import UserGroup
from timApp.util.error_handlers import report_error
from timApp.util.flask.cache import cache
from timApp.util.flask.requesthelper import RouteException
from timApp.util.flask.requesthelper import RouteException, is_testing
from timApp.util.flask.responsehelper import json_response
from timApp.util.flask.typedblueprint import TypedBlueprint
from timApp.util.logger import log_warning
Expand Down Expand Up @@ -81,6 +83,19 @@ def sso(
return redirect(redirect_url)


def _get_saml_response(
client: Saml2Client, request_id: str, came_from: str
) -> AuthnResponse:
saml_response = request.form.get("SAMLResponse")
if not saml_response:
raise SamlException("SAML Response is missing")
return client.parse_authn_request_response(
saml_response,
BINDING_HTTP_POST,
outstanding={request_id: came_from},
)


@csrf.exempt
@saml.post("/acs")
def acs() -> Response:
Expand All @@ -97,17 +112,16 @@ def acs() -> Response:
client = _get_saml_client()

try:
resp = client.parse_authn_request_response(
request.form["SAMLResponse"],
BINDING_HTTP_POST,
outstanding={request_id: came_from},
)
except SamlException as e:
resp = _get_saml_response(client, request_id, came_from)
except (SamlException, ParseError, SAMLError) as e:
report_error(f"Error parsing SAML response: {e}", with_http_body=True)
if is_testing():
raise RouteException(str(e))
raise RouteException(
f"Error parsing SAML response. You can log in using your TIM username and password instead. "
f"Please contact {app.config['HELP_EMAIL']} if the problem persists."
)

ava = resp.get_identity()

session.pop("requestID", None)
Expand Down
1 change: 1 addition & 0 deletions timApp/testconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class Schedule(TypedDict):
HOME_ORGANIZATION = "jyu.fi"

SAML_PATH = "/service/timApp/auth/saml/test"
SAML_VERIFY_METADATA = False

SESSION_COOKIE_SECURE = (
False # Test running does not have HTTPS, so secure cookie can't be used.
Expand Down
118 changes: 28 additions & 90 deletions timApp/tests/server/test_signup.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,105 +63,34 @@
""".strip()


class SettingsMock:
def get_sp_data(self):
return {
"attributeConsumingService": {
"requestedAttributes": [
{
"name": "urn:oid:2.5.4.3",
"isRequired": True,
"friendlyName": "cn",
},
{
"name": "urn:oid:2.16.840.1.113730.3.1.241",
"isRequired": True,
"friendlyName": "displayName",
},
{
"name": "urn:oid:1.3.6.1.4.1.5923.1.1.1.6",
"isRequired": True,
"friendlyName": "eduPersonPrincipalName",
},
{
"name": "urn:oid:1.3.6.1.4.1.5923.1.1.1.11",
"isRequired": True,
"friendlyName": "eduPersonAssurance",
},
{
"name": "urn:oid:2.5.4.42",
"isRequired": True,
"friendlyName": "givenName",
},
{
"name": "urn:oid:0.9.2342.19200300.100.1.3",
"isRequired": True,
"friendlyName": "mail",
},
{
"name": "urn:oid:2.16.840.1.113730.3.1.39",
"isRequired": True,
"friendlyName": "preferredLanguage",
},
{
"name": "urn:oid:2.5.4.4",
"isRequired": True,
"friendlyName": "sn",
},
{
"name": "urn:oid:1.3.6.1.4.1.25178.1.2.14",
"isRequired": False,
"friendlyName": "schacPersonalUniqueCode",
},
],
},
}


LOW_ASSURANCE = "https://refeds.org/assurance/IAP/low"
MEDIUM_ASSURANCE = "https://refeds.org/assurance/IAP/medium"
HIGH_ASSURANCE = "https://refeds.org/assurance/IAP/high"
LOCAL_ENTERPRISE_ASSURANCE = "https://refeds.org/assurance/IAP/local-enterprise"


@dataclass
class OneLoginMock:
class SamlSSOResponseMock:
info: UserInfo
assurance_levels: list[str]
mock_missing_uniquecode: bool = False

def process_response(self, request_id):
pass

def get_errors(self):
return []

def is_authenticated(self):
return True

def get_settings(self):
return SettingsMock()

def get_attribute(self, name):
values = {
"urn:oid:0.9.2342.19200300.100.1.3": [self.info.email], # mail
"urn:oid:1.3.6.1.4.1.5923.1.1.1.6": [
self.info.username
], # eduPersonPrincipalName
"urn:oid:1.3.6.1.4.1.5923.1.1.1.11": [
self.assurance_levels
], # eduPersonAssurance
"urn:oid:2.16.840.1.113730.3.1.241": [self.info.full_name], # displayName
"urn:oid:2.16.840.1.113730.3.1.39": ["fi"], # preferredLanguage
"urn:oid:2.5.4.3": [self.info.full_name], # cn
"urn:oid:2.5.4.4": [self.info.last_name], # sn
"urn:oid:2.5.4.42": [self.info.given_name], # givenName
def get_identity(self) -> dict:
res = {
"mail": [self.info.email],
"eduPersonPrincipalName": [self.info.username],
"eduPersonAssurance": self.assurance_levels,
"displayName": [self.info.full_name],
"preferredLanguage": ["fi"],
"cn": [self.info.full_name],
"sn": [self.info.last_name],
"givenName": [self.info.given_name],
}
if not self.mock_missing_uniquecode:
values["urn:oid:1.3.6.1.4.1.25178.1.2.14"] = [
res["schacPersonalUniqueCode"] = [
uq.to_urn() for uq in self.info.unique_codes
]
return values.get(name)
return res


acs_url = "/saml/acs"
Expand Down Expand Up @@ -591,7 +520,7 @@ def test_haka_invalid_settings(self):
acs_url,
{},
expect_status=400,
expect_content="entityID not in session",
expect_content="No entityID in session.",
)
self.get(
"/saml/sso",
Expand All @@ -605,23 +534,23 @@ def test_haka_invalid_settings(self):
acs_url,
data={},
expect_status=400,
expect_content="Error processing SAML response: SAML Response not found, Only supported HTTP_POST Binding",
expect_content="SAML Response is missing",
)
self.post(
acs_url,
data={
"SAMLResponse": base64.encodebytes(b"x").decode(),
},
expect_status=400,
expect_content="Error processing SAML response: Start tag expected, '<' not found, line 1, column 1 (<string>, line 1)",
expect_content="syntax error: line 1, column 0",
)
self.post(
acs_url,
data={
"SAMLResponse": base64.encodebytes(samltestresp.encode()).decode(),
},
expect_status=400,
expect_contains="Error processing SAML response: No private key available to decrypt the assertion, check settings",
expect_contains="Unsolicited response:",
)

def test_haka_login(self):
Expand Down Expand Up @@ -722,6 +651,8 @@ def test_student_id_login_match(self):
UserInfo(
username="[email protected]",
full_name="X Test",
last_name="X",
given_name="Test",
email="[email protected]",
origin=UserOrigin.Haka,
unique_codes=[
Expand All @@ -736,6 +667,8 @@ def test_student_id_login_match(self):
UserInfo(
username="[email protected]",
full_name="X Test",
last_name="X",
given_name="Test",
email="[email protected]",
origin=UserOrigin.Haka,
unique_codes=[
Expand Down Expand Up @@ -765,6 +698,9 @@ def test_haka_login_email_conflict(self):
username="[email protected]",
email="[email protected]",
origin=UserOrigin.Haka,
full_name="Person Söme",
last_name="Person",
given_name="Söme",
)
)

Expand All @@ -773,6 +709,8 @@ def test_missing_uniquecode(self):
UserInfo(
username="[email protected]",
full_name="X Test",
last_name="X",
given_name="Test",
email="[email protected]",
origin=UserOrigin.Haka,
unique_codes=[
Expand All @@ -795,8 +733,8 @@ def do_acs_mock(
},
expect_status=302,
)
with mock.patch("timApp.auth.saml.OneLogin_Saml2_Auth") as m:
m.return_value = OneLoginMock(
with mock.patch("timApp.auth.saml.routes._get_saml_response") as m:
m.return_value = SamlSSOResponseMock(
info=info,
mock_missing_uniquecode=missing_uniquecode,
assurance_levels=assurance_levels or [LOW_ASSURANCE],
Expand Down

0 comments on commit e169471

Please sign in to comment.