diff --git a/docs/settings.rst b/docs/settings.rst index eee208fb..7e0c02fb 100644 --- a/docs/settings.rst +++ b/docs/settings.rst @@ -267,11 +267,6 @@ of ``mozilla-django-oidc``. https://tools.ietf.org/html/rfc7519#section-6 -.. py:attribute:: OIDC_TOKEN_USE_BASIC_AUTH - - :default: False - - Use HTTP Basic Authentication instead of sending the client secret in token request POST body. .. py:attribute:: ALLOW_LOGOUT_GET_METHOD diff --git a/integration_tests/integration_tests.py b/integration_tests/integration_tests.py deleted file mode 100644 index 7af239f7..00000000 --- a/integration_tests/integration_tests.py +++ /dev/null @@ -1,87 +0,0 @@ -import unittest - -from splinter import Browser - - -class IntegrationTest(unittest.TestCase): - def __init__(self, *args, **kwargs): - super(IntegrationTest, self).__init__(*args, **kwargs) - - self.webdriver = "firefox" - self.account = { - "username": "example_username", - "password": "example_p@ssw0rd", - "email": "example@example.com", - } - - def setUp(self): - """Create test account in `testprovider` instance""" - with Browser(self.webdriver, headless=True) as browser: - browser.visit("http://testprovider:8080/account/signup") - browser.find_by_css("#id_username").fill(self.account["username"]) - browser.find_by_css("#id_password").fill(self.account["password"]) - browser.find_by_css("#id_password_confirm").fill(self.account["password"]) - browser.find_by_css("#id_email").fill(self.account["email"]) - browser.find_by_css(".btn-primary").click() - - def tearDown(self): - """Remove test account from `testprovider` instance""" - with Browser(self.webdriver, headless=True) as browser: - self.perform_login(browser) - browser.visit("http://testprovider:8080/account/delete") - browser.find_by_css(".btn-danger").click() - - def perform_login(self, browser): - """Perform login using webdriver""" - browser.visit("http://testrp:8081") - browser.find_by_css("div > a").click() - browser.find_by_css("#id_username").fill(self.account["username"]) - browser.find_by_css("#id_password").fill(self.account["password"]) - browser.find_by_css(".btn-primary").click() - - def perform_logout(self, browser): - """Perform logout using webdriver""" - browser.visit("http://testrp:8081") - browser.find_by_css('input[value="Logout"]').click() - - def test_login(self): - """Test logging in `testrp` using OIDC""" - browser = Browser(self.webdriver, headless=True) - - # Check that user is not logged in - browser.visit("http://testrp:8081") - self.assertTrue(browser.is_text_not_present("Current user:")) - - # Perform login - self.perform_login(browser) - - # Accept scope - browser.find_by_css('input[name="allow"]').click() - - # Check that user is now logged in - self.assertTrue(browser.is_text_present("Current user:")) - - def test_logout(self): - """Test logout functionality of OIDC lib""" - browser = Browser(self.webdriver, headless=True) - - # Check that user is not logged in - browser.visit("http://testrp:8081") - self.assertTrue(browser.is_text_not_present("Current user:")) - - self.perform_login(browser) - - # Accept scope - browser.find_by_css('input[name="allow"]').click() - - # Check that user is now logged in - self.assertTrue(browser.is_text_present("Current user:")) - - self.perform_logout(browser) - - # Check that user is now logged out - self.assertTrue(browser.is_text_not_present("Current user:")) - - -if __name__ == "__main__": - unittest.main() diff --git a/mozilla_django_oidc/auth.py b/mozilla_django_oidc/auth.py index 65370533..579fbe82 100644 --- a/mozilla_django_oidc/auth.py +++ b/mozilla_django_oidc/auth.py @@ -1,9 +1,6 @@ -import base64 -import hashlib import json import logging import requests -from requests.auth import HTTPBasicAuth # logindotgov-oidc import secrets @@ -28,26 +25,6 @@ LOGGER = logging.getLogger(__name__) -def default_username_algo(unique_identifier, claims=None): - """Generate username for the Django user. - - :arg str/unicode unique_identifier: the unique_identifier to use to generate a username - :arg dic claims: the claims from your OIDC provider, currently unused - - :returns: str/unicode - - """ - # bluntly stolen from django-browserid - # store the username as a base64 encoded sha224 of the unique_identifier - # this protects against data leakage because usernames are often - # treated as public identifiers (so we can't use the unique_identifier). - username = base64.urlsafe_b64encode( - hashlib.sha1(force_bytes(unique_identifier)).digest() - ).rstrip(b"=") - - return smart_str(username) - - class OIDCAuthenticationBackend(ModelBackend): """Override Django's authentication.""" @@ -129,20 +106,7 @@ def create_user(self, claims): email = claims.get("email") username = self.get_username(claims) - # Create user with custom values if they're specified - if not ( - (self.OIDC_RP_UNIQUE_IDENTIFIER == "email") - or (self.OIDC_RP_UNIQUE_IDENTIFIER == "username") - ): - # { app_field: idp_field} - # { "uuid": "sub_value"} - extra_params = { - self.OIDC_RP_UNIQUE_IDENTIFIER: self.get_idp_unique_id_value(claims) - } - else: - extra_params = {} - - return self.UserModel.objects.create_user(username, email=email, **extra_params) + return self.UserModel.objects.create_user(username, email=email) def get_username(self, claims): """Generate username based on claims.""" @@ -161,7 +125,7 @@ def get_username(self, claims): # also pass the claims to the custom user name algo return username_algo(self.get_idp_unique_id_value(claims), claims) - return default_username_algo(self.get_idp_unique_id_value(claims), claims) + return self.get_idp_unique_id_value(claims) def update_user(self, user, claims): """Update existing user with new email, if necessary save, and return user""" @@ -293,30 +257,17 @@ def get_token(self, payload): self.OIDC_RP_CLIENT_SECRET, algorithm=self.OIDC_RP_SIGN_ALGO ) - token_payload = { + code = payload.get("code") + payload = { "client_assertion": encoded_jwt, "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer", - "code": payload.get("code"), + "code": code, "grant_type": "authorization_code", } - response = requests.post(self.OIDC_OP_TOKEN_ENDPOINT, data=token_payload) - self.raise_token_response_error(response) - return response.json() - - # Default implementation - auth = None - if self.get_settings("OIDC_TOKEN_USE_BASIC_AUTH", False): - # When Basic auth is defined, create the Auth Header and remove secret from payload. - user = payload.get("client_id") - pw = payload.get("client_secret") - - auth = HTTPBasicAuth(user, pw) - del payload["client_secret"] response = requests.post( self.OIDC_OP_TOKEN_ENDPOINT, data=payload, - auth=auth, verify=self.get_settings("OIDC_VERIFY_SSL", True), timeout=self.get_settings("OIDC_TIMEOUT", None), proxies=self.get_settings("OIDC_PROXY", None), diff --git a/mozilla_django_oidc/contrib/__init__.py b/mozilla_django_oidc/contrib/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/mozilla_django_oidc/contrib/drf.py b/mozilla_django_oidc/contrib/drf.py deleted file mode 100644 index 91f8ccc5..00000000 --- a/mozilla_django_oidc/contrib/drf.py +++ /dev/null @@ -1,143 +0,0 @@ -""" -Classes/functions for integrating with Django REST Framework. - -http://www.django-rest-framework.org/api-guide/authentication/#custom-authentication -""" - -import logging - -from django.contrib.auth import get_backends -from django.core.exceptions import ImproperlyConfigured, SuspiciousOperation -from django.utils.module_loading import import_string -from rest_framework import authentication, exceptions -from requests.exceptions import HTTPError - -from mozilla_django_oidc.auth import OIDCAuthenticationBackend -from mozilla_django_oidc.utils import ( - import_from_settings, - parse_www_authenticate_header, -) - -LOGGER = logging.getLogger(__name__) - - -def get_oidc_backend(): - """ - Get the Django auth backend that uses OIDC. - """ - - # allow the user to force which back backend to use. this is mostly - # convenient if you want to use OIDC with DRF but don't want to configure - # OIDC for the "normal" Django auth. - backend_setting = import_from_settings("OIDC_DRF_AUTH_BACKEND", None) - if backend_setting: - backend = import_string(backend_setting)() - if not isinstance(backend, OIDCAuthenticationBackend): - msg = ( - "Class configured in OIDC_DRF_AUTH_BACKEND " - "does not extend OIDCAuthenticationBackend!" - ) - raise ImproperlyConfigured(msg) - return backend - - # if the backend setting is not set, look through the list of configured - # backends for one that is an OIDCAuthenticationBackend. - backends = [b for b in get_backends() if isinstance(b, OIDCAuthenticationBackend)] - - if not backends: - msg = ( - "No backends extending OIDCAuthenticationBackend found - " - "add one to AUTHENTICATION_BACKENDS or set OIDC_DRF_AUTH_BACKEND!" - ) - raise ImproperlyConfigured(msg) - if len(backends) > 1: - raise ImproperlyConfigured("More than one OIDCAuthenticationBackend found!") - return backends[0] - - -class OIDCAuthentication(authentication.BaseAuthentication): - """ - Provide OpenID authentication for DRF. - """ - - # used by the authenticate_header method. - www_authenticate_realm = "api" - - def __init__(self, backend=None): - self.backend = backend or get_oidc_backend() - - def authenticate(self, request): - """ - Authenticate the request and return a tuple of (user, token) or None - if there was no authentication attempt. - """ - access_token = self.get_access_token(request) - - if not access_token: - return None - - try: - user = self.backend.get_or_create_user(access_token, None, None) - except HTTPError as exc: - resp = exc.response - - # if the oidc provider returns 401, it means the token is invalid. - # in that case, we want to return the upstream error message (which - # we can get from the www-authentication header) in the response. - if resp.status_code == 401 and "www-authenticate" in resp.headers: - data = parse_www_authenticate_header(resp.headers["www-authenticate"]) - raise exceptions.AuthenticationFailed( - data.get( - "error_description", "no error description in www-authenticate" - ) - ) - - # for all other http errors, just re-raise the exception. - raise - except SuspiciousOperation as exc: - LOGGER.info("Login failed: %s", exc) - raise exceptions.AuthenticationFailed("Login failed") - - if not user: - msg = "Login failed: No user found for the given access token." - raise exceptions.AuthenticationFailed(msg) - - return user, access_token - - def get_access_token(self, request): - """ - Get the access token based on a request. - - Returns None if no authentication details were provided. Raises - AuthenticationFailed if the token is incorrect. - """ - header = authentication.get_authorization_header(request) - if not header: - return None - header = header.decode(authentication.HTTP_HEADER_ENCODING) - - auth = header.split() - - if auth[0].lower() != "bearer": - return None - - if len(auth) == 1: - msg = 'Invalid "bearer" header: No credentials provided.' - raise exceptions.AuthenticationFailed(msg) - elif len(auth) > 2: - msg = ( - 'Invalid "bearer" header: Credentials string should not contain spaces.' - ) - raise exceptions.AuthenticationFailed(msg) - - return auth[1] - - def authenticate_header(self, request): - """ - If this method returns None, a generic HTTP 403 forbidden response is - returned by DRF when authentication fails. - - By making the method return a string, a 401 is returned instead. The - return value will be used as the WWW-Authenticate header. - """ - return 'Bearer realm="%s"' % self.www_authenticate_realm diff --git a/mozilla_django_oidc/middleware.py b/mozilla_django_oidc/middleware.py deleted file mode 100644 index 3293ed2b..00000000 --- a/mozilla_django_oidc/middleware.py +++ /dev/null @@ -1,207 +0,0 @@ -import logging -import time -from re import Pattern as re_Pattern -from urllib.parse import quote, urlencode - -from django.contrib.auth import BACKEND_SESSION_KEY -from django.http import HttpResponseRedirect, JsonResponse -from django.urls import reverse -from django.utils.crypto import get_random_string -from django.utils.deprecation import MiddlewareMixin -from django.utils.functional import cached_property -from django.utils.module_loading import import_string - -from mozilla_django_oidc.auth import OIDCAuthenticationBackend -from mozilla_django_oidc.utils import ( - absolutify, - add_state_and_verifier_and_nonce_to_session, - add_state_to_cookie, - generate_code_challenge, - import_from_settings, -) - -LOGGER = logging.getLogger(__name__) - - -class SessionRefresh(MiddlewareMixin): - """Refreshes the session with the OIDC RP after expiry seconds - - For users authenticated with the OIDC RP, verify tokens are still valid and - if not, force the user to re-authenticate silently. - - """ - - def __init__(self, get_response): - super(SessionRefresh, self).__init__(get_response) - self.OIDC_EXEMPT_URLS = self.get_settings("OIDC_EXEMPT_URLS", []) - self.OIDC_OP_AUTHORIZATION_ENDPOINT = self.get_settings( - "OIDC_OP_AUTHORIZATION_ENDPOINT" - ) - self.OIDC_RP_CLIENT_ID = self.get_settings("OIDC_RP_CLIENT_ID") - self.OIDC_STATE_SIZE = self.get_settings("OIDC_STATE_SIZE", 32) - self.OIDC_AUTHENTICATION_CALLBACK_URL = self.get_settings( - "OIDC_AUTHENTICATION_CALLBACK_URL", - "oidc_authentication_callback", - ) - self.OIDC_RP_SCOPES = self.get_settings("OIDC_RP_SCOPES", "openid email") - self.OIDC_USE_NONCE = self.get_settings("OIDC_USE_NONCE", True) - self.OIDC_NONCE_SIZE = self.get_settings("OIDC_NONCE_SIZE", 32) - - @staticmethod - def get_settings(attr, *args): - return import_from_settings(attr, *args) - - @cached_property - def exempt_urls(self): - """Generate and return a set of url paths to exempt from SessionRefresh - - This takes the value of ``settings.OIDC_EXEMPT_URLS`` and appends three - urls that mozilla-django-oidc uses. These values can be view names or - absolute url paths. - - :returns: list of url paths (for example "/oidc/callback/") - - """ - exempt_urls = [] - for url in self.OIDC_EXEMPT_URLS: - if not isinstance(url, re_Pattern): - exempt_urls.append(url) - exempt_urls.extend( - [ - "oidc_authentication_init", - "oidc_authentication_callback", - "oidc_logout", - ] - ) - - return set( - [url if url.startswith("/") else reverse(url) for url in exempt_urls] - ) - - @cached_property - def exempt_url_patterns(self): - """Generate and return a set of url patterns to exempt from SessionRefresh - - This takes the value of ``settings.OIDC_EXEMPT_URLS`` and returns the - values that are compiled regular expression patterns. - - :returns: list of url patterns (for example, - ``re.compile(r"/user/[0-9]+/image")``) - """ - exempt_patterns = set() - for url_pattern in self.OIDC_EXEMPT_URLS: - if isinstance(url_pattern, re_Pattern): - exempt_patterns.add(url_pattern) - return exempt_patterns - - def is_refreshable_url(self, request): - """Takes a request and returns whether it triggers a refresh examination - - :arg HttpRequest request: - - :returns: boolean - - """ - # Do not attempt to refresh the session if the OIDC backend is not used - backend_session = request.session.get(BACKEND_SESSION_KEY) - is_oidc_enabled = True - if backend_session: - auth_backend = import_string(backend_session) - is_oidc_enabled = issubclass(auth_backend, OIDCAuthenticationBackend) - - return ( - request.method == "GET" - and request.user.is_authenticated - and is_oidc_enabled - and request.path not in self.exempt_urls - and not any(pat.match(request.path) for pat in self.exempt_url_patterns) - ) - - def process_request(self, request): - if not self.is_refreshable_url(request): - LOGGER.debug("request is not refreshable") - return - - expiration = request.session.get("oidc_id_token_expiration", 0) - now = time.time() - if expiration > now: - # The id_token is still valid, so we don't have to do anything. - LOGGER.debug("id token is still valid (%s > %s)", expiration, now) - return - - LOGGER.debug("id token has expired") - # The id_token has expired, so we have to re-authenticate silently. - auth_url = self.OIDC_OP_AUTHORIZATION_ENDPOINT - client_id = self.OIDC_RP_CLIENT_ID - state = get_random_string(self.OIDC_STATE_SIZE) - - # Build the parameters as if we were doing a real auth handoff, except - # we also include prompt=none. - params = { - "response_type": "code", - "client_id": client_id, - "redirect_uri": absolutify( - request, reverse(self.OIDC_AUTHENTICATION_CALLBACK_URL) - ), - "state": state, - "scope": self.OIDC_RP_SCOPES, - "prompt": "none", - } - - params.update(self.get_settings("OIDC_AUTH_REQUEST_EXTRA_PARAMS", {})) - - if self.OIDC_USE_NONCE: - nonce = get_random_string(self.OIDC_NONCE_SIZE) - params.update({"nonce": nonce}) - - if self.get_settings("OIDC_USE_PKCE", False): - code_verifier_length = self.get_settings("OIDC_PKCE_CODE_VERIFIER_SIZE", 64) - # Check that code_verifier_length is between the min and max length - # defined in https://datatracker.ietf.org/doc/html/rfc7636#section-4.1 - if not (43 <= code_verifier_length <= 128): - raise ValueError("code_verifier_length must be between 43 and 128") - - # Generate code_verifier and code_challenge pair - code_verifier = get_random_string(code_verifier_length) - code_challenge_method = self.get_settings( - "OIDC_PKCE_CODE_CHALLENGE_METHOD", "S256" - ) - code_challenge = generate_code_challenge( - code_verifier, code_challenge_method - ) - - # Append code_challenge to authentication request parameters - params.update( - { - "code_challenge": code_challenge, - "code_challenge_method": code_challenge_method, - } - ) - else: - code_verifier = None - - add_state_and_verifier_and_nonce_to_session( - request, state, params, code_verifier - ) - - request.session["oidc_login_next"] = request.get_full_path() - - query = urlencode(params, quote_via=quote) - redirect_url = "{url}?{query}".format(url=auth_url, query=query) - if request.headers.get("x-requested-with") == "XMLHttpRequest": - # Almost all XHR request handling in client-side code struggles - # with redirects since redirecting to a page where the user - # is supposed to do something is extremely unlikely to work - # in an XHR request. Make a special response for these kinds - # of requests. - # The use of 403 Forbidden is to match the fact that this - # middleware doesn't really want the user in if they don't - # refresh their session. - response = JsonResponse({"refresh_url": redirect_url}, status=403) - response["refresh_url"] = redirect_url - return response - - response = HttpResponseRedirect(redirect_url) - add_state_to_cookie(response, state) - - return response diff --git a/mozilla_django_oidc/views.py b/mozilla_django_oidc/views.py index 21392b39..cad83eb5 100644 --- a/mozilla_django_oidc/views.py +++ b/mozilla_django_oidc/views.py @@ -1,4 +1,3 @@ -import time import logging from urllib.parse import urlencode @@ -57,15 +56,6 @@ def login_success(self): ): auth.login(self.request, self.user) - # Figure out when this id_token will expire. This is ignored unless you're - # using the SessionRefresh middleware. - expiration_interval = self.get_settings( - "OIDC_RENEW_ID_TOKEN_EXPIRY_SECONDS", 60 * 15 - ) - self.request.session["oidc_id_token_expiration"] = ( - time.time() + expiration_interval - ) - return HttpResponseRedirect(self.success_url) def get(self, request): @@ -85,9 +75,6 @@ def get(self, request): request.session.save() # Make sure the user doesn't get to continue to be logged in - # otherwise the refresh middleware will force the user to - # redirect to authorize again if the session refresh has - # expired. if request.user.is_authenticated: auth.logout(request) assert not request.user.is_authenticated diff --git a/requirements.txt b/requirements.txt index 625e0ad8..c9883ef9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,5 +3,5 @@ josepy==1.14.0 requests==2.31.0 cryptography==42.0.5 PyJWT==2.8.0 -jwcrypto==1.5.4 +jwcrypto==1.5.6 # NOTE! When updating versions here, also update setup.py and tox.ini diff --git a/setup.py b/setup.py index 85f90209..0463f393 100755 --- a/setup.py +++ b/setup.py @@ -38,7 +38,7 @@ "requests == 2.31.0", "cryptography == 42.0.4", "pyjwt == 2.8.0", - "jwcrypto == 1.5.4", + "jwcrypto == 1.5.6", ] setup( diff --git a/tests/test_auth.py b/tests/test_auth.py index 55a4e4d6..363f5ee5 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -15,24 +15,11 @@ from josepy.b64 import b64encode from josepy.jwa import ES256 -from mozilla_django_oidc.auth import OIDCAuthenticationBackend, default_username_algo +from mozilla_django_oidc.auth import OIDCAuthenticationBackend User = get_user_model() -class DefaultUsernameAlgoTestCase(TestCase): - def run_test(self, data, expected): - actual = default_username_algo(data) - self.assertEqual(actual, expected) - self.assertEqual(type(actual), type(expected)) - - def test_empty(self): - self.run_test("", "2jmj7l5rSw0yVb_vlWAYkK_YBwk") - - def test_email(self): - self.run_test("janet@example.com", "VUCUpl08JVpFeAFKBYkAjLhsQ1c") - - class OIDCAuthenticationBackendTestCase(TestCase): """Authentication tests.""" @@ -264,7 +251,6 @@ def test_successful_authentication_existing_user_namespaced( request_mock.post.assert_called_once_with( "https://server.example.com/token", data=post_data, - auth=None, verify=True, timeout=None, proxies=None, @@ -313,7 +299,6 @@ def test_successful_authentication_existing_user(self, token_mock, request_mock) request_mock.post.assert_called_once_with( "https://server.example.com/token", data=post_data, - auth=None, verify=True, timeout=None, proxies=None, @@ -366,7 +351,6 @@ def test_successful_authentication_existing_user_upper_case( request_mock.post.assert_called_once_with( "https://server.example.com/token", data=post_data, - auth=None, verify=True, timeout=None, proxies=None, @@ -420,7 +404,6 @@ def test_failed_authentication_verify_claims( request_mock.post.assert_called_once_with( "https://server.example.com/token", data=post_data, - auth=None, verify=True, timeout=None, proxies=None, @@ -475,78 +458,10 @@ def test_successful_authentication_new_user( request_mock.post.assert_called_once_with( "https://server.example.com/token", data=post_data, - auth=None, - verify=True, - timeout=None, - proxies=None, - ) - request_mock.get.assert_called_once_with( - "https://server.example.com/user", - headers={"Authorization": "Bearer access_granted"}, verify=True, timeout=None, proxies=None, ) - - @override_settings(OIDC_TOKEN_USE_BASIC_AUTH=True) - @override_settings(OIDC_STORE_ACCESS_TOKEN=True) - @override_settings(OIDC_STORE_ID_TOKEN=True) - @patch("mozilla_django_oidc.auth.requests") - @patch("mozilla_django_oidc.auth.OIDCAuthenticationBackend.verify_token") - def test_successful_authentication_basic_auth_token(self, token_mock, request_mock): - """ - Test successful authentication when using HTTP basic authentication - for token endpoint authentication. - """ - auth_request = RequestFactory().get("/foo", {"code": "foo", "state": "bar"}) - auth_request.session = {} - - user = User.objects.create_user( - username="a_username", email="EMAIL@EXAMPLE.COM" - ) - token_mock.return_value = True - get_json_mock = Mock() - get_json_mock.json.return_value = { - "nickname": "a_username", - "email": "email@example.com", - } - request_mock.get.return_value = get_json_mock - post_json_mock = Mock(status_code=200) - post_json_mock.json.return_value = { - "id_token": "id_token", - "access_token": "access_granted", - } - request_mock.post.return_value = post_json_mock - - post_data = { - "client_id": "example_id", - "client_secret": "client_secret", - "grant_type": "authorization_code", - "code": "foo", - "redirect_uri": "http://testserver/callback/", - } - self.assertEqual(self.backend.authenticate(request=auth_request), user) - token_mock.assert_called_once_with("id_token", nonce=None) - - # As the auth parameter is an object, we can't compare them directly - request_mock.post.assert_called_once() - post_params = request_mock.post.call_args - _kwargs = post_params[1] - - self.assertEqual(post_params[0][0], "https://server.example.com/token") - # Test individual params separately - sent_data = _kwargs["data"] - self.assertEqual(sent_data["client_id"], post_data["client_id"]) - self.assertTrue("client_secret" not in _kwargs["data"]) - self.assertEqual(sent_data["grant_type"], post_data["grant_type"]) - self.assertEqual(sent_data["code"], post_data["code"]) - self.assertEqual(sent_data["redirect_uri"], post_data["redirect_uri"]) - - auth = _kwargs["auth"] # requests.auth.HTTPBasicAuth - self.assertEqual(auth.username, "example_id") - self.assertEqual(auth.password, "client_secret") - self.assertEqual(_kwargs["verify"], True) - request_mock.get.assert_called_once_with( "https://server.example.com/user", headers={"Authorization": "Bearer access_granted"}, @@ -554,10 +469,6 @@ def test_successful_authentication_basic_auth_token(self, token_mock, request_mo timeout=None, proxies=None, ) - self.assertEqual(auth_request.session.get("oidc_id_token"), "id_token") - self.assertEqual( - auth_request.session.get("oidc_access_token"), "access_granted" - ) @override_settings(OIDC_OP_CLIENT_AUTH_METHOD="private_key_jwt") @override_settings(OIDC_RP_CLIENT_SECRET=CLIENT_PRIVATE_KEY) @@ -572,7 +483,7 @@ def test_successful_authentication_basic_auth_token(self, token_mock, request_mo @patch("mozilla_django_oidc.auth.OIDCAuthenticationBackend.verify_token") def test_successful_authentication_private_key_jwt(self, token_mock, request_mock): """ - Test successful authentication when using HTTP basic authentication + Test successful authentication when using private_key_jwt for token endpoint authentication. """ self.backend = OIDCAuthenticationBackend() diff --git a/tests/test_contrib_drf.py b/tests/test_contrib_drf.py deleted file mode 100644 index 9cb405b3..00000000 --- a/tests/test_contrib_drf.py +++ /dev/null @@ -1,41 +0,0 @@ -import unittest.mock - -from django.core.exceptions import SuspiciousOperation -from django.test import RequestFactory, TestCase, override_settings -from rest_framework import exceptions - -from mozilla_django_oidc.contrib.drf import OIDCAuthentication - - -class TestDRF(TestCase): - @override_settings(OIDC_OP_TOKEN_ENDPOINT="https://server.example.com/token") - @override_settings(OIDC_OP_USER_ENDPOINT="https://server.example.com/user") - @override_settings(OIDC_RP_CLIENT_ID="example_id") - @override_settings(OIDC_RP_CLIENT_SECRET="client_secret") - def setUp(self): - self.auth = OIDCAuthentication(backend=unittest.mock.Mock()) - self.request = RequestFactory().get("/", HTTP_AUTHORIZATION="Bearer faketoken") - - def test_authenticate_returns_none_if_no_access_token(self): - with unittest.mock.patch.object( - self.auth, "get_access_token", return_value=None - ): - ret = self.auth.authenticate(self.request) - self.assertIsNone(ret) - - def test_authenticate_raises_authenticationfailed_if_backend_returns_no_user(self): - self.auth.backend.get_or_create_user.return_value = None - with self.assertRaises(exceptions.AuthenticationFailed): - self.auth.authenticate(self.request) - - def test_authenticate_raises_authenticationfailed_on_suspiciousoperation(self): - self.auth.backend.get_or_create_user.side_effect = SuspiciousOperation - with self.assertRaises(exceptions.AuthenticationFailed): - self.auth.authenticate(self.request) - - def test_returns_user_and_token_if_backend_returns_user(self): - user = unittest.mock.Mock() - self.auth.backend.get_or_create_user.return_value = user - ret = self.auth.authenticate(self.request) - self.assertEqual(ret[0], user) - self.assertEqual(ret[1], "faketoken") diff --git a/tests/test_middleware.py b/tests/test_middleware.py deleted file mode 100644 index 1dc6f3ee..00000000 --- a/tests/test_middleware.py +++ /dev/null @@ -1,493 +0,0 @@ -import json -import re -import time -from urllib.parse import parse_qs - -from django.contrib.auth import get_user_model -from django.contrib.auth.models import AnonymousUser -from django.contrib.auth.signals import user_logged_out -from django.core.cache import cache -from django.dispatch import receiver -from django.http import HttpResponse -from django.test import Client, RequestFactory, TestCase, override_settings -from django.test.client import ClientHandler -from django.urls import path -from unittest.mock import MagicMock, patch - -from mozilla_django_oidc.middleware import SessionRefresh -from mozilla_django_oidc.urls import urlpatterns as orig_urlpatterns - -User = get_user_model() - - -@override_settings(OIDC_OP_AUTHORIZATION_ENDPOINT="http://example.com/authorize") -@override_settings(OIDC_RP_CLIENT_ID="foo") -@override_settings(OIDC_RENEW_ID_TOKEN_EXPIRY_SECONDS=120) -@patch("mozilla_django_oidc.middleware.get_random_string") -class SessionRefreshTokenMiddlewareTestCase(TestCase): - def setUp(self): - self.factory = RequestFactory() - self.middleware = SessionRefresh(MagicMock) - self.user = User.objects.create_user("example_username") - - def test_anonymous(self, mock_middleware_random): - request = self.factory.get("/foo") - request.session = {} - request.user = AnonymousUser() - response = self.middleware.process_request(request) - self.assertTrue(not response) - - def test_is_oidc_path(self, mock_middleware_random): - request = self.factory.get("/oidc/callback/") - request.user = AnonymousUser() - request.session = {} - response = self.middleware.process_request(request) - self.assertTrue(not response) - - def test_is_POST(self, mock_middleware_random): - request = self.factory.post("/foo") - request.user = AnonymousUser() - request.session = {} - response = self.middleware.process_request(request) - self.assertTrue(not response) - - def test_is_ajax(self, mock_middleware_random): - mock_middleware_random.return_value = "examplestring" - - request = self.factory.get("/foo", HTTP_X_REQUESTED_WITH="XMLHttpRequest") - request.session = {} - request.user = self.user - - response = self.middleware.process_request(request) - self.assertEqual(response.status_code, 403) - # The URL to go to is available both as a header and as a key - # in the JSON response. - self.assertTrue(response["refresh_url"]) - url, qs = response["refresh_url"].split("?") - self.assertEqual(url, "http://example.com/authorize") - expected_query = { - "response_type": ["code"], - "redirect_uri": ["http://testserver/callback/"], - "client_id": ["foo"], - "nonce": ["examplestring"], - "prompt": ["none"], - "scope": ["openid email"], - "state": ["examplestring"], - } - self.assertEqual(expected_query, parse_qs(qs)) - json_payload = json.loads(response.content.decode("utf-8")) - self.assertEqual(json_payload["refresh_url"], response["refresh_url"]) - - @override_settings(OIDC_USE_PKCE=True) - def test_is_ajax_with_pkce(self, mock_middleware_random): - mock_middleware_random.return_value = "examplestring" - - request = self.factory.get("/foo", HTTP_X_REQUESTED_WITH="XMLHttpRequest") - request.session = {} - request.user = self.user - - response = self.middleware.process_request(request) - self.assertEqual(response.status_code, 403) - # The URL to go to is available both as a header and as a key - # in the JSON response. - self.assertTrue(response["refresh_url"]) - url, qs = response["refresh_url"].split("?") - self.assertEqual(url, "http://example.com/authorize") - expected_query = { - "response_type": ["code"], - "redirect_uri": ["http://testserver/callback/"], - "client_id": ["foo"], - "nonce": ["examplestring"], - "prompt": ["none"], - "scope": ["openid email"], - "state": ["examplestring"], - "code_challenge_method": ["S256"], - "code_challenge": ["m8yog7rVNdOd7hYIoUg6yl5mk_IYauWdSIBUjoPJHB0"], - } - self.assertEqual(expected_query, parse_qs(qs)) - json_payload = json.loads(response.content.decode("utf-8")) - self.assertEqual(json_payload["refresh_url"], response["refresh_url"]) - - def test_no_oidc_token_expiration_forces_renewal(self, mock_middleware_random): - mock_middleware_random.return_value = "examplestring" - - request = self.factory.get("/foo") - request.user = self.user - request.session = {} - - response = self.middleware.process_request(request) - - self.assertEqual(response.status_code, 302) - url, qs = response.url.split("?") - self.assertEqual(url, "http://example.com/authorize") - expected_query = { - "response_type": ["code"], - "redirect_uri": ["http://testserver/callback/"], - "client_id": ["foo"], - "nonce": ["examplestring"], - "prompt": ["none"], - "scope": ["openid email"], - "state": ["examplestring"], - } - self.assertEqual(expected_query, parse_qs(qs)) - - @override_settings(OIDC_USE_PKCE=True) - def test_no_oidc_token_expiration_forces_renewal_with_pkce( - self, mock_middleware_random - ): - mock_middleware_random.return_value = "examplestring" - - request = self.factory.get("/foo") - request.user = self.user - request.session = {} - - response = self.middleware.process_request(request) - - self.assertEqual(response.status_code, 302) - url, qs = response.url.split("?") - self.assertEqual(url, "http://example.com/authorize") - expected_query = { - "response_type": ["code"], - "redirect_uri": ["http://testserver/callback/"], - "client_id": ["foo"], - "nonce": ["examplestring"], - "prompt": ["none"], - "scope": ["openid email"], - "state": ["examplestring"], - "code_challenge_method": ["S256"], - "code_challenge": ["m8yog7rVNdOd7hYIoUg6yl5mk_IYauWdSIBUjoPJHB0"], - } - self.assertEqual(expected_query, parse_qs(qs)) - - def test_expired_token_forces_renewal(self, mock_middleware_random): - mock_middleware_random.return_value = "examplestring" - - request = self.factory.get("/foo") - request.user = self.user - request.session = {"oidc_id_token_expiration": time.time() - 10} - - response = self.middleware.process_request(request) - - self.assertEqual(response.status_code, 302) - url, qs = response.url.split("?") - self.assertEqual(url, "http://example.com/authorize") - expected_query = { - "response_type": ["code"], - "redirect_uri": ["http://testserver/callback/"], - "client_id": ["foo"], - "nonce": ["examplestring"], - "prompt": ["none"], - "scope": ["openid email"], - "state": ["examplestring"], - } - self.assertEqual(expected_query, parse_qs(qs)) - - @override_settings(OIDC_USE_PKCE=True) - def test_expired_token_forces_renewal_with_pkce(self, mock_middleware_random): - mock_middleware_random.return_value = "examplestring" - - request = self.factory.get("/foo") - request.user = self.user - request.session = {"oidc_id_token_expiration": time.time() - 10} - - response = self.middleware.process_request(request) - - self.assertEqual(response.status_code, 302) - url, qs = response.url.split("?") - self.assertEqual(url, "http://example.com/authorize") - expected_query = { - "response_type": ["code"], - "redirect_uri": ["http://testserver/callback/"], - "client_id": ["foo"], - "nonce": ["examplestring"], - "prompt": ["none"], - "scope": ["openid email"], - "state": ["examplestring"], - "code_challenge_method": ["S256"], - "code_challenge": ["m8yog7rVNdOd7hYIoUg6yl5mk_IYauWdSIBUjoPJHB0"], - } - self.assertEqual(expected_query, parse_qs(qs)) - - -# This adds a "home page" we can test against. -def fakeview(req): - return HttpResponse("Win!") - - -urlpatterns = list(orig_urlpatterns) + [ - path("mdo_fake_view/", fakeview, name="mdo_fake_view") -] - - -def override_middleware(fun): - classes = [ - "django.contrib.sessions.middleware.SessionMiddleware", - "mozilla_django_oidc.middleware.SessionRefresh", - ] - return override_settings(MIDDLEWARE=classes)(fun) - - -class UserifiedClientHandler(ClientHandler): - """Enhances ClientHandler to "work" with users properly""" - - def __init__(self, *args, **kwargs): - self.user = kwargs.pop("user") - super(UserifiedClientHandler, self).__init__(*args, **kwargs) - - def get_response(self, req): - req.user = self.user - return super(UserifiedClientHandler, self).get_response(req) - - -class ClientWithUser(Client): - """Enhances Client to "work" with users properly""" - - def __init__(self, enforce_csrf_checks=False, **defaults): - # Start off with the AnonymousUser - self.user = AnonymousUser() - # Get this because we need to create a new UserifiedClientHandler later - self.enforce_csrf_checks = enforce_csrf_checks - super(ClientWithUser, self).__init__(**defaults) - # Stomp on the ClientHandler with one that correctly makes request.user - # the AnonymousUser - self.handler = UserifiedClientHandler(enforce_csrf_checks, user=self.user) - - def login(self, **credentials): - from django.contrib.auth import authenticate - - # Try to authenticate and throw an exception if that fails; also, this gets - # the user instance that was authenticated with - user = authenticate(**credentials) - if not user: - # Client lets you fail authentication without providing any helpful - # messages; we throw an exception because silent failure is - # unhelpful - raise Exception("Unable to authenticate with %r" % credentials) - - ret = super(ClientWithUser, self).login(**credentials) - if not ret: - raise Exception("Login failed") - - # Stash the user object it used and rebuild the UserifiedClientHandler - self.user = user - self.handler = UserifiedClientHandler(self.enforce_csrf_checks, user=self.user) - return ret - - -@override_settings(OIDC_RP_CLIENT_ID="foo") -@override_settings(OIDC_RENEW_ID_TOKEN_EXPIRY_SECONDS=120) -@override_settings(OIDC_OP_AUTHORIZATION_ENDPOINT="http://example.com/authorize") -@override_settings(ROOT_URLCONF="tests.test_middleware") -@override_middleware -class MiddlewareTestCase(TestCase): - """These tests test the middleware as part of the request/response cycle""" - - def setUp(self): - self.factory = RequestFactory() - self.user = User.objects.create_user( - username="example_username", password="password" - ) - cache.clear() - - @override_settings(OIDC_EXEMPT_URLS=["mdo_fake_view"]) - def test_get_exempt_urls_setting_view_name(self): - middleware = SessionRefresh(MagicMock()) - self.assertEqual( - sorted(list(middleware.exempt_urls)), - ["/authenticate/", "/callback/", "/logout/", "/mdo_fake_view/"], - ) - - @override_settings(OIDC_EXEMPT_URLS=["/foo/"]) - def test_get_exempt_urls_setting_url_path(self): - middleware = SessionRefresh(MagicMock()) - self.assertEqual( - sorted(list(middleware.exempt_urls)), - ["/authenticate/", "/callback/", "/foo/", "/logout/"], - ) - - def test_is_refreshable_url(self): - request = self.factory.get("/mdo_fake_view/") - request.user = self.user - request.session = dict() - middleware = SessionRefresh(MagicMock()) - assert middleware.is_refreshable_url(request) - - @override_settings(OIDC_EXEMPT_URLS=["mdo_fake_view"]) - def test_is_not_refreshable_url_exempt_view_name(self): - request = self.factory.get("/mdo_fake_view/") - request.user = self.user - request.session = dict() - middleware = SessionRefresh(MagicMock()) - assert not middleware.is_refreshable_url(request) - - @override_settings(OIDC_EXEMPT_URLS=["/mdo_fake_view/"]) - def test_is_not_refreshable_url_exempt_path(self): - request = self.factory.get("/mdo_fake_view/") - request.user = self.user - request.session = dict() - middleware = SessionRefresh(MagicMock()) - assert not middleware.is_refreshable_url(request) - - @override_settings(OIDC_EXEMPT_URLS=[re.compile(r"^/mdo_.*_view/$")]) - def test_is_not_refreshable_url_exempt_pattern(self): - request = self.factory.get("/mdo_fake_view/") - request.user = self.user - request.session = dict() - middleware = SessionRefresh(MagicMock()) - assert not middleware.is_refreshable_url(request) - - def test_anonymous(self): - client = ClientWithUser() - resp = client.get("/mdo_fake_view/") - self.assertEqual(resp.status_code, 200) - - @override_settings(OIDC_OP_AUTHORIZATION_ENDPOINT="http://example.com/authorize") - @override_settings(OIDC_RP_CLIENT_ID="foo") - @override_settings(OIDC_RENEW_ID_TOKEN_EXPIRY_SECONDS=120) - def test_authenticated_user(self): - client = ClientWithUser() - client.login(username=self.user.username, password="password") - - # Set the expiration to some time in the future so this user is valid - session = client.session - session["oidc_id_token_expiration"] = time.time() + 100 - session.save() - - resp = client.get("/mdo_fake_view/") - self.assertEqual(resp.status_code, 200) - - @override_settings(OIDC_OP_AUTHORIZATION_ENDPOINT="http://example.com/authorize") - @override_settings(OIDC_RP_CLIENT_ID="foo") - @override_settings(OIDC_RENEW_ID_TOKEN_EXPIRY_SECONDS=120) - @patch("mozilla_django_oidc.middleware.get_random_string") - def test_expired_token_redirects_to_sso(self, mock_middleware_random): - mock_middleware_random.return_value = "examplestring" - - client = ClientWithUser() - client.login(username=self.user.username, password="password") - - # Set expiration to some time in the past - session = client.session - session["oidc_id_token_expiration"] = time.time() - 100 - session["_auth_user_backend"] = ( - "mozilla_django_oidc.auth.OIDCAuthenticationBackend" - ) - session.save() - - resp = client.get("/mdo_fake_view/") - self.assertEqual(resp.status_code, 302) - - url, qs = resp.url.split("?") - self.assertEqual(url, "http://example.com/authorize") - expected_query = { - "response_type": ["code"], - "redirect_uri": ["http://testserver/callback/"], - "client_id": ["foo"], - "nonce": ["examplestring"], - "prompt": ["none"], - "scope": ["openid email"], - "state": ["examplestring"], - } - self.assertEqual(expected_query, parse_qs(qs)) - - @override_settings(OIDC_OP_AUTHORIZATION_ENDPOINT="http://example.com/authorize") - @override_settings(OIDC_RP_CLIENT_ID="foo") - @override_settings(OIDC_RENEW_ID_TOKEN_EXPIRY_SECONDS=120) - @override_settings(OIDC_USE_PKCE=True) - @patch("mozilla_django_oidc.middleware.get_random_string") - def test_expired_token_redirects_to_sso_with_pkce(self, mock_middleware_random): - mock_middleware_random.return_value = "examplestring" - - client = ClientWithUser() - client.login(username=self.user.username, password="password") - - # Set expiration to some time in the past - session = client.session - session["oidc_id_token_expiration"] = time.time() - 100 - session["_auth_user_backend"] = ( - "mozilla_django_oidc.auth.OIDCAuthenticationBackend" - ) - session.save() - - resp = client.get("/mdo_fake_view/") - self.assertEqual(resp.status_code, 302) - - url, qs = resp.url.split("?") - self.assertEqual(url, "http://example.com/authorize") - expected_query = { - "response_type": ["code"], - "redirect_uri": ["http://testserver/callback/"], - "client_id": ["foo"], - "nonce": ["examplestring"], - "prompt": ["none"], - "scope": ["openid email"], - "state": ["examplestring"], - "code_challenge_method": ["S256"], - "code_challenge": ["m8yog7rVNdOd7hYIoUg6yl5mk_IYauWdSIBUjoPJHB0"], - } - self.assertEqual(expected_query, parse_qs(qs)) - - @override_settings(OIDC_OP_AUTHORIZATION_ENDPOINT="http://example.com/authorize") - @override_settings(OIDC_RP_CLIENT_ID="foo") - @override_settings(OIDC_RENEW_ID_TOKEN_EXPIRY_SECONDS=120) - @patch("mozilla_django_oidc.middleware.get_random_string") - def test_refresh_fails_for_already_signed_in_user(self, mock_random_string): - mock_random_string.return_value = "examplestring" - - # Mutable to log which users get logged out. - logged_out_users = [] - - # Register a signal on 'user_logged_out' so we can - # update 'logged_out_users'. - @receiver(user_logged_out) - def logged_out(sender, user=None, **kwargs): - logged_out_users.append(user) - - client = ClientWithUser() - # First confirm that the home page is a public page. - resp = client.get("/") - # At least security doesn't kick you out. - self.assertEqual(resp.status_code, 404) - # Also check that this page doesn't force you to redirect - # to authenticate. - resp = client.get("/mdo_fake_view/") - self.assertEqual(resp.status_code, 200) - client.login(username=self.user.username, password="password") - - # Set expiration to some time in the past - session = client.session - session["oidc_id_token_expiration"] = time.time() - 100 - session["_auth_user_backend"] = ( - "mozilla_django_oidc.auth.OIDCAuthenticationBackend" - ) - session.save() - - # Confirm that now you're forced to authenticate again. - resp = client.get("/mdo_fake_view/") - self.assertEqual(resp.status_code, 302) - self.assertTrue( - "http://example.com/authorize" in resp.url and "prompt=none" in resp.url - ) - # Now suppose the user goes there and something goes wrong. - # For example, the user might have become "blocked" or the 2FA - # verficiation has expired and needs to be done again. - resp = client.get( - "/callback/", - { - "error": "login_required", - "error_description": "Multifactor authentication required", - }, - ) - self.assertEqual(resp.status_code, 302) - self.assertEqual(resp.url, "/") - - # Since the user in 'client' doesn't change, we have to use other - # queues to assert that the user got logged out properly. - - # The session gets flushed when you get signed out. - # This is the only decent way to know the user lost all - # request.session and - self.assertTrue(not client.session.items()) - - # The signal we registered should have fired for this user. - self.assertEqual(client.user, logged_out_users[0])