From bc9bc8fbaee837b9e23ac822cb55d590eb3ac95b Mon Sep 17 00:00:00 2001 From: matthewswspence Date: Tue, 14 Jan 2025 14:58:30 -0600 Subject: [PATCH] refactor transfer user function --- src/registrar/tests/test_admin.py | 9 +- src/registrar/views/transfer_user.py | 143 ++++++++++++--------------- 2 files changed, 67 insertions(+), 85 deletions(-) diff --git a/src/registrar/tests/test_admin.py b/src/registrar/tests/test_admin.py index e38c47f74..0cc75d6e2 100644 --- a/src/registrar/tests/test_admin.py +++ b/src/registrar/tests/test_admin.py @@ -2759,7 +2759,7 @@ def test_transfer_user_transfers_user_portfolio_roles_no_error_when_duplicates(s messages.error.assert_not_called() - # @less_console_noise_decorator + @less_console_noise_decorator def test_transfer_user_transfers_domain_request_creator_and_investigator(self): """Assert that domain request fields get transferred""" domain_request = completed_domain_request(user=self.user2, name="wasteland.gov", investigator=self.user2) @@ -2776,7 +2776,6 @@ def test_transfer_user_transfers_domain_request_creator_and_investigator(self): self.assertEquals(domain_request.creator, self.user1) self.assertEquals(domain_request.investigator, self.user1) - @less_console_noise_decorator def test_transfer_user_transfers_domain_information_creator(self): """Assert that domain fields get transferred""" @@ -2867,7 +2866,7 @@ def test_transfer_user_deletes_old_user(self): with self.assertRaises(User.DoesNotExist): self.user2.refresh_from_db() - # @less_console_noise_decorator + @less_console_noise_decorator def test_transfer_user_throws_transfer_and_delete_success_messages(self): """Test that success messages for data transfer and user deletion are displayed.""" # Ensure the setup for VerifiedByStaff @@ -2892,8 +2891,8 @@ def test_transfer_user_throws_transfer_and_delete_success_messages(self): mock_success_message.assert_any_call( ANY, ( - "Data transferred successfully for the following objects: ['Changed requestor " - + 'from "Furiosa Jabassa " to "Max Rokatanski " on immortan.joe@citadel.com\']' + "Data transferred successfully for the following objects: ['Transferred requestor " + + "from Furiosa Jabassa to Max Rokatanski ']" ), ) diff --git a/src/registrar/views/transfer_user.py b/src/registrar/views/transfer_user.py index fa4c59a3f..e10a5f3fc 100644 --- a/src/registrar/views/transfer_user.py +++ b/src/registrar/views/transfer_user.py @@ -1,6 +1,6 @@ import logging from django.db import transaction -from django.db.models import Manager,ForeignKey, OneToOneField, ManyToManyField, ManyToOneRel +from django.db.models import Manager, ForeignKey, OneToOneField, ManyToManyField, ManyToOneRel from django.shortcuts import render, get_object_or_404, redirect from django.views import View @@ -59,8 +59,7 @@ def get(self, request, user_id): return render(request, "admin/transfer_user.html", context) def post(self, request, user_id): - """This handles the transfer from selected_user to current_user then deletes selected_user. - """ + """This handles the transfer from selected_user to current_user then deletes selected_user.""" current_user = get_object_or_404(User, pk=user_id) selected_user_id = request.POST.get("selected_user") selected_user = get_object_or_404(User, pk=selected_user_id) @@ -73,7 +72,7 @@ def post(self, request, user_id): self._delete_duplicate_user_domain_roles_and_log(selected_user, current_user, change_logs) self._delete_duplicate_user_portfolio_permissions_and_log(selected_user, current_user, change_logs) - # Dynamically handle related fields + # Dynamically handle related fields self.transfer_related_fields_and_log(selected_user, current_user, change_logs) # Success message if any related objects were updated @@ -92,19 +91,20 @@ def post(self, request, user_id): logger.error(f"An error occurred during the transfer: {e}", exc_info=True) return redirect("admin:registrar_user_change", object_id=user_id) - + def _delete_duplicate_user_portfolio_permissions_and_log(self, selected_user, current_user, change_logs): """ Check and remove duplicate UserPortfolioPermission objects from the selected_user based on portfolios associated with the current_user. """ try: # Fetch portfolios associated with the current_user - current_user_portfolios = UserPortfolioPermission.objects.filter(user=current_user).values_list('portfolio_id', flat=True) + current_user_portfolios = UserPortfolioPermission.objects.filter(user=current_user).values_list( + "portfolio_id", flat=True + ) # Identify duplicates in selected_user for these portfolios - duplicates = ( - UserPortfolioPermission.objects - .filter(user=selected_user, portfolio_id__in=current_user_portfolios) + duplicates = UserPortfolioPermission.objects.filter( + user=selected_user, portfolio_id__in=current_user_portfolios ) duplicate_count = duplicates.count() @@ -115,9 +115,13 @@ def _delete_duplicate_user_portfolio_permissions_and_log(self, selected_user, cu logger.debug(f"Duplicate permissions to be removed: {duplicate_permissions}") duplicates.delete() - logger.info(f"Removed {duplicate_count} duplicate UserPortfolioPermission(s) from user_id {selected_user.id} for portfolios already associated with user_id {current_user.id}") - change_logs.append(f"Removed {duplicate_count} duplicate UserPortfolioPermission(s) from user_id {selected_user.id} for portfolios already associated with user_id {current_user.id}") - + logger.info( + f"Removed {duplicate_count} duplicate UserPortfolioPermission(s) from user_id {selected_user.id} for portfolios already associated with user_id {current_user.id}" + ) + change_logs.append( + f"Removed {duplicate_count} duplicate UserPortfolioPermission(s) from user_id {selected_user.id} for portfolios already associated with user_id {current_user.id}" + ) + except Exception as e: logger.error(f"Failed to check and remove duplicate UserPortfolioPermissions: {e}", exc_info=True) raise @@ -130,13 +134,10 @@ def _delete_duplicate_user_domain_roles_and_log(self, selected_user, current_use try: # Fetch domains associated with the current_user - current_user_domains = UserDomainRole.objects.filter(user=current_user).values_list('domain_id', flat=True) + current_user_domains = UserDomainRole.objects.filter(user=current_user).values_list("domain_id", flat=True) # Identify duplicates in selected_user for these domains - duplicates = ( - UserDomainRole.objects - .filter(user=selected_user, domain_id__in=current_user_domains) - ) + duplicates = UserDomainRole.objects.filter(user=selected_user, domain_id__in=current_user_domains) duplicate_count = duplicates.count() @@ -150,7 +151,7 @@ def _delete_duplicate_user_domain_roles_and_log(self, selected_user, current_use f"Removed {duplicate_count} duplicate UserDomainRole(s) from user_id {selected_user.id} " f"for domains already associated with user_id {current_user.id}" ) - + except Exception as e: logger.error(f"Failed to check and remove duplicate UserDomainRoles: {e}", exc_info=True) raise @@ -187,65 +188,48 @@ def transfer_related_fields_and_log(self, selected_user, current_user, change_lo def _handle_foreign_key(self, related_field: ForeignKey, selected_user, current_user, change_logs): related_name = related_field.get_accessor_name() - # for foreign key relationships, getattr returns a manager related_manager = getattr(selected_user, related_name, None) - if related_manager: - # get all the related objects + if related_manager.count() > 0: related_queryset = related_manager.all() for obj in related_queryset: - # set the foreign key to the current user setattr(obj, related_field.field.name, current_user) obj.save() - log_entry = f'Transferred {related_field.field.name} from {selected_user} to {current_user}' - logger.info(log_entry) - change_logs.append(log_entry) - + self.log_change(selected_user, current_user, related_field.field.name, change_logs) + def _handle_one_to_one(self, related_field: OneToOneField, selected_user, current_user, change_logs): related_name = related_field.get_accessor_name() - # for one to one relationships, getattr returns the related object related_object = getattr(selected_user, related_name, None) if related_object: - # set the one to one field to the current user setattr(related_object, related_field.field.name, current_user) related_object.save() - log_entry = f'Transferred {related_field.field.name} from {selected_user} to {current_user}' - logger.info(log_entry) - change_logs.append(log_entry) + self.log_change(selected_user, current_user, related_field.field.name, change_logs) def _handle_many_to_many(self, related_field: ManyToManyField, selected_user, current_user, change_logs): - # for many to many relationships, getattr returns a manager related_manager = getattr(selected_user, related_field.name, None) - if related_manager: - # get all the related objects + if related_manager.count() > 0: related_queryset = related_manager.all() - # add the related objects to the current user getattr(current_user, related_field.name).add(*related_queryset) - log_entry = f'Transferred {related_field.name} from {selected_user} to {current_user}' - logger.info(log_entry) - change_logs.append(log_entry) + self.log_change(selected_user, current_user, related_field.name, change_logs) - def _handle_many_to_one_rel(self, related_object: ManyToOneRel, selected_user: User, current_user: User, change_logs: List[str]): - """ - Handles ManyToOneRel relationships, where multiple objects relate to the User via a ForeignKey. - """ + def _handle_many_to_one_rel( + self, related_object: ManyToOneRel, selected_user: User, current_user: User, change_logs: List[str] + ): related_model = related_object.related_model related_name = related_object.field.name - # for many to one relationships, we need a queryset related_queryset = related_model.objects.filter(**{related_name: selected_user}) - for obj in related_queryset: - setattr(obj, related_name, current_user) - obj.save() - log_entry = f'Transferred {related_name} from {selected_user} to {current_user}' - logger.info(log_entry) - change_logs.append(log_entry) - - def _handle_one_to_one_reverse(self, related_object: OneToOneField, selected_user: User, current_user: User, change_logs: List[str]): - """ - Handles reverse OneToOneField relationships. - """ + + if related_queryset.count() > 0: + for obj in related_queryset: + setattr(obj, related_name, current_user) + obj.save() + self.log_change(selected_user, current_user, related_name, change_logs) + + def _handle_one_to_one_reverse( + self, related_object: OneToOneField, selected_user: User, current_user: User, change_logs: List[str] + ): related_model = related_object.related_model field_name = related_object.field.name @@ -253,41 +237,40 @@ def _handle_one_to_one_reverse(self, related_object: OneToOneField, selected_use related_instance = related_model.objects.filter(**{field_name: selected_user}).first() setattr(related_instance, field_name, current_user) related_instance.save() - log_entry = f'Transferred {field_name} from {selected_user} to {current_user}' - logger.info(log_entry) - change_logs.append(log_entry) + self.log_change(selected_user, current_user, field_name, change_logs) except related_model.DoesNotExist: logger.warning(f"No related instance found for reverse OneToOneField {field_name} for {selected_user}") - - def _handle_foreign_key_reverse(self, related_object: ForeignKey, selected_user: User, current_user: User, change_logs: List[str]): - """ - Handles reverse ForeignKey relationships. - """ + + def _handle_foreign_key_reverse( + self, related_object: ForeignKey, selected_user: User, current_user: User, change_logs: List[str] + ): related_model = related_object.related_model field_name = related_object.field.name related_queryset = related_model.objects.filter(**{field_name: selected_user}) - for obj in related_queryset: - setattr(obj, field_name, current_user) - obj.save() - log_entry = f'Transferred {field_name} from {selected_user} to {current_user}' - logger.info(log_entry) - change_logs.append(log_entry) - - def _handle_many_to_many_reverse(self, related_object: ManyToManyField, selected_user: User, current_user: User, change_logs: List[str]): - """ - Handles reverse ManyToManyField relationships. - """ + + if related_queryset.count() > 0: + for obj in related_queryset: + setattr(obj, field_name, current_user) + obj.save() + self.log_change(selected_user, current_user, field_name, change_logs) + + def _handle_many_to_many_reverse( + self, related_object: ManyToManyField, selected_user: User, current_user: User, change_logs: List[str] + ): related_model = related_object.related_model field_name = related_object.field.name - related_manager = related_model.objects.filter(**{field_name: selected_user}) - if related_manager: - related_qs = related_manager.all() - getattr(current_user, field_name).add(*related_qs) - log_entry = f'Transferred {field_name} from {selected_user} to {current_user}' - logger.info(log_entry) - change_logs.append(log_entry) + related_queryset = related_model.objects.filter(**{field_name: selected_user}) + if related_queryset.count() > 0: + getattr(current_user, field_name).add(*related_queryset) + self.log_change(selected_user, current_user, field_name, change_logs) + + @classmethod + def log_change(cls, selected_user, current_user, field_name, change_logs): + log_entry = f"Transferred {field_name} from {selected_user} to {current_user}" + logger.info(log_entry) + change_logs.append(log_entry) @classmethod def get_domains(cls, user):