Skip to content

Commit

Permalink
refactor transfer user function
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt-Spence committed Jan 14, 2025
1 parent 5fce7c7 commit bc9bc8f
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 85 deletions.
9 changes: 4 additions & 5 deletions src/registrar/tests/test_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2759,7 +2759,7 @@ def test_transfer_user_transfers_user_portfolio_roles_no_error_when_duplicates(s

messages.error.assert_not_called()

# @less_console_noise_decorator
@less_console_noise_decorator
def test_transfer_user_transfers_domain_request_creator_and_investigator(self):
"""Assert that domain request fields get transferred"""
domain_request = completed_domain_request(user=self.user2, name="wasteland.gov", investigator=self.user2)
Expand All @@ -2776,7 +2776,6 @@ def test_transfer_user_transfers_domain_request_creator_and_investigator(self):
self.assertEquals(domain_request.creator, self.user1)
self.assertEquals(domain_request.investigator, self.user1)


@less_console_noise_decorator
def test_transfer_user_transfers_domain_information_creator(self):
"""Assert that domain fields get transferred"""
Expand Down Expand Up @@ -2867,7 +2866,7 @@ def test_transfer_user_deletes_old_user(self):
with self.assertRaises(User.DoesNotExist):
self.user2.refresh_from_db()

# @less_console_noise_decorator
@less_console_noise_decorator
def test_transfer_user_throws_transfer_and_delete_success_messages(self):
"""Test that success messages for data transfer and user deletion are displayed."""
# Ensure the setup for VerifiedByStaff
Expand All @@ -2892,8 +2891,8 @@ def test_transfer_user_throws_transfer_and_delete_success_messages(self):
mock_success_message.assert_any_call(
ANY,
(
"Data transferred successfully for the following objects: ['Changed requestor "
+ 'from "Furiosa Jabassa " to "Max Rokatanski " on [email protected]\']'
"Data transferred successfully for the following objects: ['Transferred requestor "
+ "from Furiosa Jabassa to Max Rokatanski ']"
),
)

Expand Down
143 changes: 63 additions & 80 deletions src/registrar/views/transfer_user.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
from django.db import transaction
from django.db.models import Manager,ForeignKey, OneToOneField, ManyToManyField, ManyToOneRel
from django.db.models import Manager, ForeignKey, OneToOneField, ManyToManyField, ManyToOneRel

from django.shortcuts import render, get_object_or_404, redirect
from django.views import View
Expand Down Expand Up @@ -59,8 +59,7 @@ def get(self, request, user_id):
return render(request, "admin/transfer_user.html", context)

def post(self, request, user_id):
"""This handles the transfer from selected_user to current_user then deletes selected_user.
"""
"""This handles the transfer from selected_user to current_user then deletes selected_user."""
current_user = get_object_or_404(User, pk=user_id)
selected_user_id = request.POST.get("selected_user")
selected_user = get_object_or_404(User, pk=selected_user_id)
Expand All @@ -73,7 +72,7 @@ def post(self, request, user_id):
self._delete_duplicate_user_domain_roles_and_log(selected_user, current_user, change_logs)

self._delete_duplicate_user_portfolio_permissions_and_log(selected_user, current_user, change_logs)
# Dynamically handle related fields
# Dynamically handle related fields
self.transfer_related_fields_and_log(selected_user, current_user, change_logs)

# Success message if any related objects were updated
Expand All @@ -92,19 +91,20 @@ def post(self, request, user_id):
logger.error(f"An error occurred during the transfer: {e}", exc_info=True)

return redirect("admin:registrar_user_change", object_id=user_id)

def _delete_duplicate_user_portfolio_permissions_and_log(self, selected_user, current_user, change_logs):
"""
Check and remove duplicate UserPortfolioPermission objects from the selected_user based on portfolios associated with the current_user.
"""
try:
# Fetch portfolios associated with the current_user
current_user_portfolios = UserPortfolioPermission.objects.filter(user=current_user).values_list('portfolio_id', flat=True)
current_user_portfolios = UserPortfolioPermission.objects.filter(user=current_user).values_list(
"portfolio_id", flat=True
)

# Identify duplicates in selected_user for these portfolios
duplicates = (
UserPortfolioPermission.objects
.filter(user=selected_user, portfolio_id__in=current_user_portfolios)
duplicates = UserPortfolioPermission.objects.filter(
user=selected_user, portfolio_id__in=current_user_portfolios
)

duplicate_count = duplicates.count()
Expand All @@ -115,9 +115,13 @@ def _delete_duplicate_user_portfolio_permissions_and_log(self, selected_user, cu
logger.debug(f"Duplicate permissions to be removed: {duplicate_permissions}")

duplicates.delete()
logger.info(f"Removed {duplicate_count} duplicate UserPortfolioPermission(s) from user_id {selected_user.id} for portfolios already associated with user_id {current_user.id}")
change_logs.append(f"Removed {duplicate_count} duplicate UserPortfolioPermission(s) from user_id {selected_user.id} for portfolios already associated with user_id {current_user.id}")

logger.info(
f"Removed {duplicate_count} duplicate UserPortfolioPermission(s) from user_id {selected_user.id} for portfolios already associated with user_id {current_user.id}"
)
change_logs.append(
f"Removed {duplicate_count} duplicate UserPortfolioPermission(s) from user_id {selected_user.id} for portfolios already associated with user_id {current_user.id}"
)

except Exception as e:
logger.error(f"Failed to check and remove duplicate UserPortfolioPermissions: {e}", exc_info=True)
raise
Expand All @@ -130,13 +134,10 @@ def _delete_duplicate_user_domain_roles_and_log(self, selected_user, current_use

try:
# Fetch domains associated with the current_user
current_user_domains = UserDomainRole.objects.filter(user=current_user).values_list('domain_id', flat=True)
current_user_domains = UserDomainRole.objects.filter(user=current_user).values_list("domain_id", flat=True)

# Identify duplicates in selected_user for these domains
duplicates = (
UserDomainRole.objects
.filter(user=selected_user, domain_id__in=current_user_domains)
)
duplicates = UserDomainRole.objects.filter(user=selected_user, domain_id__in=current_user_domains)

duplicate_count = duplicates.count()

Expand All @@ -150,7 +151,7 @@ def _delete_duplicate_user_domain_roles_and_log(self, selected_user, current_use
f"Removed {duplicate_count} duplicate UserDomainRole(s) from user_id {selected_user.id} "
f"for domains already associated with user_id {current_user.id}"
)

except Exception as e:
logger.error(f"Failed to check and remove duplicate UserDomainRoles: {e}", exc_info=True)
raise
Expand Down Expand Up @@ -187,107 +188,89 @@ def transfer_related_fields_and_log(self, selected_user, current_user, change_lo

def _handle_foreign_key(self, related_field: ForeignKey, selected_user, current_user, change_logs):
related_name = related_field.get_accessor_name()
# for foreign key relationships, getattr returns a manager
related_manager = getattr(selected_user, related_name, None)

if related_manager:
# get all the related objects
if related_manager.count() > 0:
related_queryset = related_manager.all()
for obj in related_queryset:
# set the foreign key to the current user
setattr(obj, related_field.field.name, current_user)
obj.save()
log_entry = f'Transferred {related_field.field.name} from {selected_user} to {current_user}'
logger.info(log_entry)
change_logs.append(log_entry)

self.log_change(selected_user, current_user, related_field.field.name, change_logs)

def _handle_one_to_one(self, related_field: OneToOneField, selected_user, current_user, change_logs):
related_name = related_field.get_accessor_name()
# for one to one relationships, getattr returns the related object
related_object = getattr(selected_user, related_name, None)

if related_object:
# set the one to one field to the current user
setattr(related_object, related_field.field.name, current_user)
related_object.save()
log_entry = f'Transferred {related_field.field.name} from {selected_user} to {current_user}'
logger.info(log_entry)
change_logs.append(log_entry)
self.log_change(selected_user, current_user, related_field.field.name, change_logs)

def _handle_many_to_many(self, related_field: ManyToManyField, selected_user, current_user, change_logs):
# for many to many relationships, getattr returns a manager
related_manager = getattr(selected_user, related_field.name, None)
if related_manager:
# get all the related objects
if related_manager.count() > 0:
related_queryset = related_manager.all()
# add the related objects to the current user
getattr(current_user, related_field.name).add(*related_queryset)
log_entry = f'Transferred {related_field.name} from {selected_user} to {current_user}'
logger.info(log_entry)
change_logs.append(log_entry)
self.log_change(selected_user, current_user, related_field.name, change_logs)

def _handle_many_to_one_rel(self, related_object: ManyToOneRel, selected_user: User, current_user: User, change_logs: List[str]):
"""
Handles ManyToOneRel relationships, where multiple objects relate to the User via a ForeignKey.
"""
def _handle_many_to_one_rel(
self, related_object: ManyToOneRel, selected_user: User, current_user: User, change_logs: List[str]
):
related_model = related_object.related_model
related_name = related_object.field.name

# for many to one relationships, we need a queryset
related_queryset = related_model.objects.filter(**{related_name: selected_user})
for obj in related_queryset:
setattr(obj, related_name, current_user)
obj.save()
log_entry = f'Transferred {related_name} from {selected_user} to {current_user}'
logger.info(log_entry)
change_logs.append(log_entry)

def _handle_one_to_one_reverse(self, related_object: OneToOneField, selected_user: User, current_user: User, change_logs: List[str]):
"""
Handles reverse OneToOneField relationships.
"""

if related_queryset.count() > 0:
for obj in related_queryset:
setattr(obj, related_name, current_user)
obj.save()
self.log_change(selected_user, current_user, related_name, change_logs)

def _handle_one_to_one_reverse(
self, related_object: OneToOneField, selected_user: User, current_user: User, change_logs: List[str]
):
related_model = related_object.related_model
field_name = related_object.field.name

try:
related_instance = related_model.objects.filter(**{field_name: selected_user}).first()
setattr(related_instance, field_name, current_user)
related_instance.save()
log_entry = f'Transferred {field_name} from {selected_user} to {current_user}'
logger.info(log_entry)
change_logs.append(log_entry)
self.log_change(selected_user, current_user, field_name, change_logs)
except related_model.DoesNotExist:
logger.warning(f"No related instance found for reverse OneToOneField {field_name} for {selected_user}")

def _handle_foreign_key_reverse(self, related_object: ForeignKey, selected_user: User, current_user: User, change_logs: List[str]):
"""
Handles reverse ForeignKey relationships.
"""

def _handle_foreign_key_reverse(
self, related_object: ForeignKey, selected_user: User, current_user: User, change_logs: List[str]
):
related_model = related_object.related_model
field_name = related_object.field.name

related_queryset = related_model.objects.filter(**{field_name: selected_user})
for obj in related_queryset:
setattr(obj, field_name, current_user)
obj.save()
log_entry = f'Transferred {field_name} from {selected_user} to {current_user}'
logger.info(log_entry)
change_logs.append(log_entry)

def _handle_many_to_many_reverse(self, related_object: ManyToManyField, selected_user: User, current_user: User, change_logs: List[str]):
"""
Handles reverse ManyToManyField relationships.
"""

if related_queryset.count() > 0:
for obj in related_queryset:
setattr(obj, field_name, current_user)
obj.save()
self.log_change(selected_user, current_user, field_name, change_logs)

def _handle_many_to_many_reverse(
self, related_object: ManyToManyField, selected_user: User, current_user: User, change_logs: List[str]
):
related_model = related_object.related_model
field_name = related_object.field.name

related_manager = related_model.objects.filter(**{field_name: selected_user})
if related_manager:
related_qs = related_manager.all()
getattr(current_user, field_name).add(*related_qs)
log_entry = f'Transferred {field_name} from {selected_user} to {current_user}'
logger.info(log_entry)
change_logs.append(log_entry)
related_queryset = related_model.objects.filter(**{field_name: selected_user})
if related_queryset.count() > 0:
getattr(current_user, field_name).add(*related_queryset)
self.log_change(selected_user, current_user, field_name, change_logs)

@classmethod
def log_change(cls, selected_user, current_user, field_name, change_logs):
log_entry = f"Transferred {field_name} from {selected_user} to {current_user}"
logger.info(log_entry)
change_logs.append(log_entry)

@classmethod
def get_domains(cls, user):
Expand Down

0 comments on commit bc9bc8f

Please sign in to comment.