Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvement Request for registration/serializers.py #672

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
124 changes: 124 additions & 0 deletions dj_rest_auth/registration/password_validations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
import re
from typing import Union, List
from dataclasses import dataclass
from enum import Enum

class PasswordStrength(Enum):
WEAK = "weak"
MEDIUM = "medium"
STRONG = "strong"

@dataclass
class PasswordValidationResult:
is_valid: bool
errors: List[str]
strength: PasswordStrength

class PasswordValidator:
def __init__(self,
min_length: int = 8,
require_uppercase: bool = True,
require_lowercase: bool = True,
require_digits: bool = True,
require_special_chars: bool = True):

self.min_length = min_length
self.require_uppercase = require_uppercase
self.require_lowercase = require_lowercase
self.require_digits = require_digits
self.require_special_chars = require_special_chars

def validate_password(self, password1, password2) -> PasswordValidationResult:
errors = []

# Basic validation
if not password1 or not password2:
errors.append("Password fields cannot be empty")
return PasswordValidationResult(False, errors, PasswordStrength.WEAK)

# Check if passwords match
if password1 != password2:
errors.append("Passwords do not match")
return PasswordValidationResult(False, errors, PasswordStrength.WEAK)

# Length check
if len(password1) < self.min_length:
errors.append(f"Password must be at least {self.min_length} characters long")

# Uppercase check
if self.require_uppercase and not any(c.isupper() for c in password1):
errors.append("Password must contain at least one uppercase letter")

# Lowercase check
if self.require_lowercase and not any(c.islower() for c in password1):
errors.append("Password must contain at least one lowercase letter")

# Digit check
if self.require_digits and not any(c.isdigit() for c in password1):
errors.append("Password must contain at least one number")

# Special character check
if self.require_special_chars:
special_chars = re.compile(r'[!@#$%^&*(),.?":{}|<>]')
if not special_chars.search(password1):
errors.append("Password must contain at least one special character")


# Sequential characters check
if self._has_sequential_chars(password1):
errors.append("Password contains sequential characters")

# Determine password strength
strength = self._calculate_strength(password1)

return PasswordValidationResult

def _has_sequential_chars(self, password: str) -> bool:
"""Check for sequential characters (e.g., 'abc', '123')"""
sequences = ('abcdefghijklmnopqrstuvwxyz', '0123456789')
lowercase_pass = password.lower()

for seq in sequences:
for i in range(len(seq) - 2):
if seq[i:i+3] in lowercase_pass:
return True
return False

def _calculate_strength(self, password: str) -> PasswordStrength:
"""Calculate password strength based on various factors"""
score = 0

# Length points (up to 5)
score += min(5, len(password) // 2)

# Character variety points
if any(c.isupper() for c in password):
score += 2
if any(c.islower() for c in password):
score += 2
if any(c.isdigit() for c in password):
score += 2
if any(not c.isalnum() for c in password):
score += 3

# Unique character points
score += min(3, len(set(password)) // 3)

match score:
case s if s >= 10:
return PasswordStrength.STRONG
case s if s >= 6:
return PasswordStrength.MEDIUM
case _:
return PasswordStrength.WEAK

# Example usage:
def validate_password(password1: str, password2: str) -> Union[str, List[str]]:
validator = PasswordValidator()
result = validator.validate_password(password1, password2)

if not result.is_valid:
return result.errors

return f"Password is valid (Strength: {result.strength.value})"

164 changes: 72 additions & 92 deletions dj_rest_auth/registration/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,7 @@ class SocialLoginSerializer(serializers.Serializer):
id_token = serializers.CharField(required=False, allow_blank=True)

def _get_request(self):
request = self.context.get('request')
if not isinstance(request, HttpRequest):
request = request._request
return request
return self.context.get('request', getattr(self.context.get('request'), '_request', None))

def get_social_login(self, adapter, app, token, response):
"""
Expand Down Expand Up @@ -75,7 +72,7 @@ def set_callback_url(self, view, adapter_class):
)
except NoReverseMatch:
raise serializers.ValidationError(
_('Define callback_url in view'),
({'callback_url': _('Define callback_url in view or ensure URL name exists.')})
)

def validate(self, attrs):
Expand All @@ -99,99 +96,97 @@ def validate(self, attrs):

access_token = attrs.get('access_token')
code = attrs.get('code')
# Case 1: We received the access_token

if access_token:
tokens_to_parse = {'access_token': access_token}
token = access_token
# For sign in with apple

id_token = attrs.get('id_token')
if id_token:
tokens_to_parse['id_token'] = id_token

# Case 2: We received the authorization code
tokens_to_parse['id_token'] = id_token
elif code:
self.set_callback_url(view=view, adapter_class=adapter_class)
self.client_class = getattr(view, 'client_class', None)

if not self.client_class:
raise serializers.ValidationError(
_('Define client_class in view'),
)

provider = adapter.get_provider()
scope = provider.get_scope_from_request(request)
client = self.client_class(
request,
app.client_id,
app.secret,
adapter.access_token_method,
adapter.access_token_url,
self.callback_url,
scope,
scope_delimiter=adapter.scope_delimiter,
headers=adapter.headers,
basic_auth=adapter.basic_auth,
)
try:
token = client.get_access_token(code)
except OAuth2Error as ex:
raise serializers.ValidationError(
_('Failed to exchange code for access token')
) from ex
access_token = token['access_token']
tokens_to_parse = {'access_token': access_token}

# If available we add additional data to the dictionary
for key in ['refresh_token', 'id_token', adapter.expires_in_key]:
if key in token:
tokens_to_parse[key] = token[key]
self._handle_code_flow(view, adapter_class, adapter, app, code, request, tokens_to_parse)
else:
raise serializers.ValidationError(
_('Incorrect input. access_token or code is required.'),
)

raise serializers.ValidationError(_('Incorrect input. access_token or code is required.'))

social_token = adapter.parse_token(tokens_to_parse)
social_token.app = app

try:
if adapter.provider_id == 'google' and not code:
login = self.get_social_login(adapter, app, social_token, response={'id_token': id_token})
else:
login = self.get_social_login(adapter, app, social_token, token)
ret = complete_social_login(request, login)
login = self._attempt_login(adapter, app, social_token, code, attrs)
except HTTPError:
raise serializers.ValidationError(_('Incorrect value'))


if isinstance(ret, HttpResponseBadRequest):
raise serializers.ValidationError(ret.content)
if isinstance(login, HttpResponseBadRequest):
raise serializers.ValidationError(login.content)

if not login.is_existing:
# We have an account already signed up in a different flow
# with the same email address: raise an exception.
# This needs to be handled in the frontend. We can not just
# link up the accounts due to security constraints
if allauth_account_settings.UNIQUE_EMAIL:
# Do we have an account already with this email address?
account_exists = get_user_model().objects.filter(
email=login.user.email,
).exists()
if account_exists:
raise serializers.ValidationError(
_('User is already registered with this e-mail address.'),
)

login.lookup()
try:
login.save(request, connect=True)
except IntegrityError as ex:
raise serializers.ValidationError(
_('User is already registered with this e-mail address.'),
) from ex
self.post_signup(login, attrs)
self._new_user_registration(login, request, attrs)

attrs['user'] = login.account.user

return attrs

def _handle_code_flow(self, view, adapter_class, adapter, app, code, request, tokens_to_parse):
"""Handles the auth flow when an authorization code is provided."""
self.set_callback_url(view=view, adapter_class=adapter_class)
self.client_class = getattr(view, 'client_class', None)

if not self.client_class:
raise serializers.ValidationError(_('Define client_class in view'))

provider = adapter.get_provider()
scope = provider.get_scope_from_request(request)
client = self.client_class(
request,
app.client_id,
app.secret,
adapter.access_token_method,
adapter.access_token_url,
self.callback_url,
scope,
scope_delimiter=adapter.scope_delimiter,
headers=adapter.headers,
basic_auth=adapter.basic_auth,
)
try:
token = client.get_access_token(code)
except OAuth2Error as ex:
raise serializers.ValidationError(
('Failed to exchange code for access token')
) from ex

access_token = token['access_token']
tokens_to_parse = {'access_token': access_token}
for key in ['refresh_token', 'id_token', adapter.expires_in_key]:
if key in token:
tokens_to_parse[key] = token[key]



def _attempt_login(self, adapter, app, social_token, code, attrs):
"""Attempts to log in the user using the adapter."""
if adapter.provider_id == 'google' and not code:
return self.get_social_login(adapter, app, social_token, response={'id_token': attrs.get('id_token')})
return self.get_social_login(adapter, app, social_token, token=attrs.get('access_token'))

def _new_user_registration(self, login, request, attrs):
"""Handles user registration if the user does not exist."""
if allauth_account_settings.UNIQUE_EMAIL:
account_exists = get_user_model().objects.filter(email=login.user.email).exists()
if account_exists:
raise serializers.ValidationError(_('User is already registered with this e-mail address.'))
login.lookup()
try:
login.save(request, connect=True)
except IntegrityError as ex:
raise serializers.ValidationError(
_('User is already registered with this e-mail address.'),
) from ex
self.post_signup(login, attrs)


def post_signup(self, login, attrs):
"""
Expand Down Expand Up @@ -244,21 +239,13 @@ def validate_email(self, email):
)
return email

def validate_password1(self, password):
return get_adapter().clean_password(password)

def validate(self, data):
if data['password1'] != data['password2']:
raise serializers.ValidationError(_("The two password fields didn't match."))
return data

def custom_signup(self, request, user):
pass

def get_cleaned_data(self):
return {
'username': self.validated_data.get('username', ''),
'password1': self.validated_data.get('password1', ''),
'email': self.validated_data.get('email', ''),
}

Expand All @@ -267,13 +254,6 @@ def save(self, request):
user = adapter.new_user(request)
self.cleaned_data = self.get_cleaned_data()
user = adapter.save_user(request, user, self, commit=False)
if "password1" in self.cleaned_data:
try:
adapter.clean_password(self.cleaned_data['password1'], user=user)
except DjangoValidationError as exc:
raise serializers.ValidationError(
detail=serializers.as_serializer_error(exc)
)
user.save()
self.custom_signup(request, user)
setup_user_email(request, user, [])
Expand Down