Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add last_used possibility and filtering on tokens #347

Open
wants to merge 9 commits into
base: develop
Choose a base branch
from
2 changes: 1 addition & 1 deletion knox/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@

@admin.register(models.AuthToken)
class AuthTokenAdmin(admin.ModelAdmin):
list_display = ('digest', 'user', 'created', 'expiry',)
list_display = ('digest', 'user', 'created', 'accessed', 'expiry',)
fields = ()
raw_id_fields = ('user',)
14 changes: 13 additions & 1 deletion knox/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,23 @@ def renew_token(self, auth_token) -> None:
if delta > knox_settings.MIN_REFRESH_INTERVAL:
auth_token.save(update_fields=('expiry',))

def update_last_accessed(self, auth_token):
current_access = auth_token.accessed
# not too-fast update
if current_access is not None:
passed_time = (timezone.now() - current_access).total_seconds()
if passed_time < knox_settings.MIN_REFRESH_INTERVAL:
return False
auth_token.accessed = timezone.now()
auth_token.save(update_fields=('accessed',))
return True

def validate_user(self, auth_token):
self.update_last_accessed(auth_token)
if not auth_token.user.is_active:
raise exceptions.AuthenticationFailed(
_('User inactive or deleted.'))
return (auth_token.user, auth_token)
return auth_token.user, auth_token

def authenticate_header(self, request):
return knox_settings.AUTH_HEADER_PREFIX
Expand Down
18 changes: 18 additions & 0 deletions knox/migrations/0010_authtoken_accessed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 4.2.13 on 2024-05-13 10:09

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('knox', '0009_extend_authtoken_field'),
]

operations = [
migrations.AddField(
model_name='authtoken',
name='accessed',
field=models.DateTimeField(blank=True, null=True),
),
]
19 changes: 19 additions & 0 deletions knox/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,24 @@ def create(
user=user, expiry=expiry, **kwargs)
return instance, token

def migrate(
self,
token
):
instance = self.filter(token_key__startswith=token[:8])
if not instance.exists():
return None, None

existing_instance = instance.first()
digest = crypto.hash_token(token)

instance = super(AuthTokenManager, self).create(
token_key=token[:CONSTANTS.TOKEN_KEY_LENGTH], digest=digest,
user=existing_instance.user, expiry=existing_instance.expiry)
existing_instance.delete()

return instance, token


class AbstractAuthToken(models.Model):

Expand All @@ -44,6 +62,7 @@ class AbstractAuthToken(models.Model):
user = models.ForeignKey(User, null=False, blank=False,
related_name='auth_token_set', on_delete=models.CASCADE)
created = models.DateTimeField(auto_now_add=True)
accessed = models.DateTimeField(null=True, blank=True)
expiry = models.DateTimeField(null=True, blank=True)

class Meta:
Expand Down
11 changes: 10 additions & 1 deletion knox/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,17 @@ class LogoutAllView(APIView):
def get_post_response(self, request):
return Response(None, status=status.HTTP_204_NO_CONTENT)

def get_token_prefix(self):
return knox_settings.TOKEN_PREFIX

def post(self, request, format=None):
request.user.auth_token_set.all().delete()
# Only logout API-created sessions!
query = request.user.auth_token_set.all()
if self.get_token_prefix():
query = query.filter(token_key__startswith=self.get_token_prefix())
query.delete()
# If a flagged to remove all, do it. Use a query_param?
# request.user.auth_token_set.all().delete()
user_logged_out.send(sender=request.user.__class__,
request=request, user=request.user)
return self.get_post_response(request)
39 changes: 39 additions & 0 deletions tests/tests.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import base64
import time
from datetime import datetime, timedelta
from importlib import reload

Expand Down Expand Up @@ -160,6 +161,23 @@ def test_logout_all_deletes_keys(self):
self.client.post(url, {}, format='json')
self.assertEqual(AuthToken.objects.count(), 0)

def test_logout_all_deletes_keys_prefixed(self):
self.assertEqual(AuthToken.objects.count(), 0)
for _ in range(10):
AuthToken.objects.create(self.user, prefix="OTHER_")
instance, token = AuthToken.objects.create(self.user, prefix=token_prefix)
self.assertEqual(AuthToken.objects.count(), 20)

url = reverse('knox_logoutall')
with override_settings(REST_KNOX=token_prefix_knox):
reload(views)
reload(crypto)
self.client.credentials(HTTP_AUTHORIZATION=('Token %s' % token))
self.client.post(url, {}, format='json')
reload(views)
reload(crypto)
self.assertEqual(AuthToken.objects.count(), 10)

def test_logout_all_deletes_only_targets_keys(self):
self.assertEqual(AuthToken.objects.count(), 0)
for _ in range(10):
Expand All @@ -182,6 +200,27 @@ def test_expired_tokens_login_fails(self):
self.assertEqual(response.status_code, 401)
self.assertEqual(response.data, {"detail": "Invalid token."})

def test_accessed_gets_updated(self):
self.assertEqual(AuthToken.objects.count(), 0)
instance, token = AuthToken.objects.create(user=self.user)

self.assertEqual(AuthToken.objects.first().accessed, None)
rf = APIRequestFactory()
request = rf.get('/')
request.META = {'HTTP_AUTHORIZATION': f'Token {token}'}
(self.user, auth_token) = TokenAuthentication().authenticate(request)

self.assertNotEqual(AuthToken.objects.first().accessed, None)
last_time = AuthToken.objects.first().accessed

(self.user, auth_token) = TokenAuthentication().authenticate(request)
self.assertEqual(AuthToken.objects.first().accessed, last_time)

time.sleep(knox_settings.MIN_REFRESH_INTERVAL)
(self.user, auth_token) = TokenAuthentication().authenticate(request)
self.assertNotEqual(AuthToken.objects.first().accessed, last_time)
self.assertNotEqual(AuthToken.objects.first().accessed, None)

def test_expired_tokens_deleted(self):
self.assertEqual(AuthToken.objects.count(), 0)
for _ in range(10):
Expand Down