From 84ac6a2fe989c3782d8c7d36e7ab207a3b3d746d Mon Sep 17 00:00:00 2001
From: Gerrod Ubben
Date: Thu, 30 Jan 2025 14:54:40 -0500
Subject: [PATCH] Enhance replicate delete policy

fixes: #5214
---
 CHANGES/5214.feature                          |   4 +
 .../migrations/0128_upstreampulp_policy.py    |  18 ++
 pulpcore/app/models/replica.py                |  16 ++
 pulpcore/app/replica.py                       |  37 +++-
 pulpcore/app/serializers/replica.py           |   8 +
 pulpcore/app/tasks/replica.py                 |   4 +
 .../tests/functional/api/test_replication.py  | 171 +++++++++++++++---
 7 files changed, 224 insertions(+), 34 deletions(-)
 create mode 100644 CHANGES/5214.feature
 create mode 100644 pulpcore/app/migrations/0128_upstreampulp_policy.py

diff --git a/CHANGES/5214.feature b/CHANGES/5214.feature
new file mode 100644
index 00000000000..7c14e7404f8
--- /dev/null
+++ b/CHANGES/5214.feature
@@ -0,0 +1,4 @@
+Added a new field `policy` to UpstreamPulp that controls how replication manages local objects within the domain.
+
+Replication now copies the upstream's `pulp_labels` onto downstream objects. It also labels the
+downstream objects it creates with the UpstreamPulp they came from.
diff --git a/pulpcore/app/migrations/0128_upstreampulp_policy.py b/pulpcore/app/migrations/0128_upstreampulp_policy.py
new file mode 100644
index 00000000000..e610bae2af0
--- /dev/null
+++ b/pulpcore/app/migrations/0128_upstreampulp_policy.py
@@ -0,0 +1,18 @@
+# Generated by Django 4.2.16 on 2025-02-18 18:51
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('core', '0127_remove_upstreampulp_pulp_label_select'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='upstreampulp',
+            name='policy',
+            field=models.TextField(choices=[('all', 'Replicate manages ALL local objects within the domain.'), ('labeled', 'Replicate will only manage the objects created from a previous replication; unlabeled local objects will be untouched.'), ('nodelete', 'Replicate will not delete any local objects, whether they were created by replication or not.')], default='all'),
+        ),
+    ]
diff --git a/pulpcore/app/models/replica.py b/pulpcore/app/models/replica.py
index 8af3f07db66..9180f5004f3 100644
--- a/pulpcore/app/models/replica.py
+++ b/pulpcore/app/models/replica.py
@@ -11,6 +11,21 @@ class UpstreamPulp(BaseModel, AutoAddObjPermsMixin):
+    ALL = "all"
+    LABELED = "labeled"
+    NODELETE = "nodelete"
+    POLICY_CHOICES = (
+        (ALL, "Replicate manages ALL local objects within the domain."),
+        (
+            LABELED,
+            "Replicate will only manage the objects created from a previous replication; unlabeled local objects will be untouched.",
+        ),
+        (
+            NODELETE,
+            "Replicate will not delete any local objects, whether they were created by replication or not.",
+        ),
+    )
+
     name = models.TextField(db_index=True)
 
     pulp_domain = models.ForeignKey("Domain", default=get_domain_pk, on_delete=models.PROTECT)
@@ -27,6 +42,7 @@ class UpstreamPulp(BaseModel, AutoAddObjPermsMixin):
     password = EncryptedTextField(null=True)
 
     q_select = models.TextField(null=True)
+    policy = models.TextField(choices=POLICY_CHOICES, default=ALL)
 
     last_replication = models.DateTimeField(null=True)
diff --git a/pulpcore/app/replica.py b/pulpcore/app/replica.py
index 0f547f9e25c..7f236abd6cc 100644
--- a/pulpcore/app/replica.py
+++ b/pulpcore/app/replica.py
@@ -5,6 +5,7 @@
 from urllib.parse import urljoin
 
 from pulp_glue.common.context import PulpContext
+from pulpcore.app.models import UpstreamPulp
 from pulpcore.tasking.tasks import dispatch
 from pulpcore.app.tasks.base import (
     general_update,
@@ -92,6 +93,11 @@ def upstream_distributions(self, q=None):
                 break
             offset += list_size
 
+    def labels(self, upstream_object):
+        upstream_labels = dict(upstream_object["pulp_labels"] if isinstance(upstream_object, dict) else upstream_object.pulp_labels)
+        upstream_labels["UpstreamPulp"] = str(self.server.pk)
+        return upstream_labels
+
     def url(self, upstream_distribution):
         return upstream_distribution["base_url"]
 
@@ -106,8 +112,7 @@ def create_or_update_remote(self, upstream_distribution):
         url = self.url(upstream_distribution)
         if url.startswith("/"):
             url = urljoin(self.server.base_url, url)
-
-        remote_fields_dict = {"url": url}
+        remote_fields_dict = {"url": url, "pulp_labels": self.labels(upstream_distribution)}
         remote_fields_dict.update(self.tls_settings)
         remote_fields_dict.update(self.remote_extra_fields(upstream_distribution))
 
@@ -116,6 +121,8 @@ def create_or_update_remote(self, upstream_distribution):
             remote = self.remote_model_cls.objects.get(
                 name=upstream_distribution["name"], pulp_domain=self.domain
             )
+            if self.server.policy == UpstreamPulp.LABELED and remote.pulp_labels.get("UpstreamPulp") != str(self.server.pk):
+                return None
             needs_update = self.needs_update(remote_fields_dict, remote)
             if needs_update:
                 dispatch(
@@ -136,11 +143,14 @@ def repository_extra_fields(self, remote):
         return {}
 
     def create_or_update_repository(self, remote):
+        repo_fields_dict = self.repository_extra_fields(remote)
+        repo_fields_dict["pulp_labels"] = self.labels(remote)
         try:
             repository = self.repository_model_cls.objects.get(
                 name=remote.name, pulp_domain=self.domain
             )
-            repo_fields_dict = self.repository_extra_fields(remote)
+            if self.server.policy == UpstreamPulp.LABELED and repository.pulp_labels.get("UpstreamPulp") != str(self.server.pk):
+                return None
             needs_update = self.needs_update(repo_fields_dict, repository)
             if needs_update:
                 dispatch(
@@ -151,9 +161,7 @@ def create_or_update_repository(self, remote):
                     kwargs={"data": repo_fields_dict, "partial": True},
                 )
         except self.repository_model_cls.DoesNotExist:
-            repository = self.repository_model_cls(
-                name=remote.name, **self.repository_extra_fields(remote)
-            )
+            repository = self.repository_model_cls(name=remote.name, **repo_fields_dict)
             repository.save()
         return repository
 
@@ -169,10 +177,13 @@ def distribution_extra_fields(self, repository, upstream_distribution):
 
     def create_or_update_distribution(self, repository, upstream_distribution):
         distribution_data = self.distribution_extra_fields(repository, upstream_distribution)
+        distribution_data["pulp_labels"] = self.labels(upstream_distribution)
         try:
             distro = self.distribution_model_cls.objects.get(
                 name=upstream_distribution["name"], pulp_domain=self.domain
             )
+            if self.server.policy == UpstreamPulp.LABELED and distro.pulp_labels.get("UpstreamPulp") != str(self.server.pk):
+                return None
             needs_update = self.needs_update(distribution_data, distro)
             if needs_update:
                 # Update the distribution
@@ -223,12 +234,18 @@ def sync(self, repository, remote):
         )
 
     def remove_missing(self, names):
+        if self.server.policy == UpstreamPulp.NODELETE:
+            return
+        if self.server.policy == UpstreamPulp.LABELED:
+            label_filter = {"pulp_labels__contains": {"UpstreamPulp": str(self.server.pk)}}
+        else:
+            label_filter = {}
         # Remove all distributions with names not present in the list of names
         # Perform this in an extra task, because we hold a big lock here.
         distribution_ids = [
             (distribution.pk, self.app_label, self.distribution_serializer_name)
             for distribution in self.distribution_model_cls.objects.filter(
-                pulp_domain=self.domain
+                pulp_domain=self.domain, **label_filter
             ).exclude(name__in=names)
         ]
         if distribution_ids:
@@ -242,7 +259,7 @@ def remove_missing(self, names):
         # Remove all the repositories and remotes of the missing distributions
         repositories = list(
             self.repository_model_cls.objects.filter(
-                pulp_domain=self.domain, user_hidden=False
+                pulp_domain=self.domain, user_hidden=False, **label_filter
             ).exclude(name__in=names)
         )
         repository_ids = [
@@ -250,7 +267,9 @@ def remove_missing(self, names):
             (repo.pk, self.app_label, self.repository_serializer_name) for repo in repositories
         ]
 
         remotes = list(
-            self.remote_model_cls.objects.filter(pulp_domain=self.domain).exclude(name__in=names)
+            self.remote_model_cls.objects.filter(pulp_domain=self.domain, **label_filter).exclude(
+                name__in=names
+            )
         )
         remote_ids = [
             (remote.pk, self.app_label, self.remote_serializer_name) for remote in remotes
diff --git a/pulpcore/app/serializers/replica.py b/pulpcore/app/serializers/replica.py
index 614392d238b..fbdfe05bc20 100644
--- a/pulpcore/app/serializers/replica.py
+++ b/pulpcore/app/serializers/replica.py
@@ -91,6 +91,13 @@ class UpstreamPulpSerializer(ModelSerializer, HiddenFieldsMixin):
         read_only=True,
     )
 
+    policy = serializers.ChoiceField(
+        choices=UpstreamPulp.POLICY_CHOICES,
+        default="all",
+        help_text=_("Policy for how the replicate task will manage the local objects within the domain."),
+        required=False,
+    )
+
     def validate_q_select(self, value):
         """Ensure we have a valid q_select expression."""
         from pulpcore.app.viewsets import DistributionFilter
@@ -116,4 +123,5 @@ class Meta:
             "hidden_fields",
             "q_select",
             "last_replication",
+            "policy",
         )
diff --git a/pulpcore/app/tasks/replica.py b/pulpcore/app/tasks/replica.py
index e3acc2ee84d..f80f3addcc5 100644
--- a/pulpcore/app/tasks/replica.py
+++ b/pulpcore/app/tasks/replica.py
@@ -85,6 +85,10 @@ def replicate_distributions(server_pk):
             continue
         # Check if there is already a repository
         repository = replicator.create_or_update_repository(remote=remote)
+        if not repository:
+            # No update occurred: server.policy is LABELED and an existing local repository
+            # with the same name is not managed by this UpstreamPulp.
+            continue
 
         # Dispatch a sync task if needed
         if replicator.requires_syncing(distro):
diff --git a/pulpcore/tests/functional/api/test_replication.py b/pulpcore/tests/functional/api/test_replication.py
index 8e98c3bfea6..0c344045b8c 100644
--- a/pulpcore/tests/functional/api/test_replication.py
+++ b/pulpcore/tests/functional/api/test_replication.py
@@ -56,6 +56,7 @@ def test_replication_idempotence(
     file_publication_factory,
     file_repository_factory,
     tmp_path,
+    add_domain_objects_to_cleanup,
 ):
     # This test assures that an Upstream Pulp can be created in a non-default domain and that this
     # Upstream Pulp configuration can be used to execute the replicate task.
@@ -78,7 +79,9 @@ def test_replication_idempotence(
     publication = file_publication_factory(
         pulp_domain=source_domain.name, repository=repository.pulp_href
     )
-    file_distribution_factory(pulp_domain=source_domain.name, publication=publication.pulp_href)
+    distro = file_distribution_factory(
+        pulp_domain=source_domain.name, publication=publication.pulp_href
+    )
 
     # Create a domain as replica
     replica_domain = domain_factory()
@@ -98,15 +101,7 @@ def test_replication_idempotence(
     # Run the replicate task and assert that all tasks successfully complete.
     response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream_pulp.pulp_href)
     monitor_task_group(response.task_group)
-
-    for api_client in (
-        file_bindings.DistributionsFileApi,
-        file_bindings.RemotesFileApi,
-        file_bindings.RepositoriesFileApi,
-    ):
-        result = api_client.list(pulp_domain=replica_domain.name)
-        for item in result.results:
-            add_to_cleanup(api_client, item)
+    objects = add_domain_objects_to_cleanup(replica_domain)
 
     for api_client in (
         file_bindings.DistributionsFileApi,
@@ -116,6 +111,10 @@ def test_replication_idempotence(
     ):
         result = api_client.list(pulp_domain=replica_domain.name)
         assert result.count == 1
+    # Test that each new object (besides Content) has a source UpstreamPulp label
+    for obj in objects:
+        assert "UpstreamPulp" in obj.pulp_labels
+        assert upstream_pulp.prn.split(":")[-1] == obj.pulp_labels["UpstreamPulp"]
 
     # Now replicate backwards
@@ -127,21 +126,33 @@ def test_replication_idempotence(
         "username": bindings_cfg.username,
         "password": bindings_cfg.password,
     }
-    upstream_pulp = gen_object_with_cleanup(
+    upstream_pulp2 = gen_object_with_cleanup(
         pulpcore_bindings.UpstreamPulpsApi, upstream_pulp_body, pulp_domain=source_domain.name
     )
     # Run the replicate task and assert that all tasks successfully complete.
-    response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream_pulp.pulp_href)
+    response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream_pulp2.pulp_href)
     monitor_task_group(response.task_group)
+    objects = add_domain_objects_to_cleanup(source_domain)
+    # Replicating backwards creates a new repository (deleting the old one) and a new remote,
+    # but reuses the same distribution
+    result = file_bindings.RepositoriesFileApi.list(pulp_domain=source_domain.name)
+    assert result.count == 1
+    new_repository = result.results[0]
+    assert new_repository.pulp_href != repository.pulp_href
+    assert new_repository.name == distro.name
 
-    for api_client in (
-        file_bindings.DistributionsFileApi,
-        file_bindings.RemotesFileApi,
-        file_bindings.RepositoriesFileApi,
-    ):
-        result = api_client.list(pulp_domain=replica_domain.name)
-        for item in result.results:
-            add_to_cleanup(api_client, item)
+    result = file_bindings.DistributionsFileApi.list(pulp_domain=source_domain.name)
+    assert result.count == 1
+    assert result.results[0].pulp_href == distro.pulp_href
+    assert result.results[0].repository == new_repository.pulp_href
+    assert result.results[0].publication is None
+
+    result = file_bindings.RemotesFileApi.list(pulp_domain=source_domain.name)
+    assert result.count == 1
+    # Test that each replicated object (besides Content) now has a new UpstreamPulp label
+    for obj in objects:
+        assert "UpstreamPulp" in obj.pulp_labels
+        assert upstream_pulp2.prn.split(":")[-1] == obj.pulp_labels["UpstreamPulp"]
 
 
 @pytest.mark.parallel
@@ -319,7 +330,9 @@ def test_replication_optimization(
 
 
 @pytest.fixture
-def check_replication(pulpcore_bindings, file_bindings, monitor_task_group):
+def check_replication(
+    pulpcore_bindings, file_bindings, monitor_task_group, add_domain_objects_to_cleanup
+):
     def _check_replication(
         upstream_pulp,
         upstream_distribution,
@@ -330,6 +343,7 @@ def _check_replication(
         response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream_pulp.pulp_href)
         # check if the replication succeeded
         task_group = monitor_task_group(response.task_group)
+        add_domain_objects_to_cleanup(local_domain)
         for task in task_group.tasks:
             assert task.state == "completed"
 
@@ -403,7 +417,7 @@ def _try_action(user, client, action, outcome, *args, **kwargs):
         response = action_api(*args, **kwargs)
         if isinstance(response, tuple):
             # old bindings
-            data, status, _ = response
+            data, status_code, _ = response
         else:
             # new bindings
             data = response.data
@@ -483,17 +497,18 @@ def populate_upstream(
     write_3_iso_file_fixture_data_factory,
     monitor_task,
 ):
-    def _populate_upstream(number):
+    def _populate_upstream(number, prefix=""):
         upstream_domain = domain_factory()
         tasks = []
         for i in range(number):
             repo = file_repository_factory(pulp_domain=upstream_domain.name, autopublish=True)
-            fix = write_3_iso_file_fixture_data_factory(str(i))
+            name = f"{prefix}{i}"
+            fix = write_3_iso_file_fixture_data_factory(name)
             remote = file_remote_factory(pulp_domain=upstream_domain.name, manifest_path=fix)
             body = {"remote": remote.pulp_href}
             tasks.append(file_bindings.RepositoriesFileApi.sync(repo.pulp_href, body).task)
             file_distribution_factory(
-                name=str(i),
+                name=name,
                 pulp_domain=upstream_domain.name,
                 repository=repo.pulp_href,
                 pulp_labels={"upstream": str(i), "even" if i % 2 == 0 else "odd": ""},
@@ -514,6 +529,7 @@ def test_replicate_with_basic_q_select(
     monitor_task_group,
     pulp_settings,
     gen_object_with_cleanup,
+    add_domain_objects_to_cleanup,
 ):
     """Test basic label select replication."""
     source_domain = populate_upstream(10)
@@ -532,6 +548,7 @@ def test_replicate_with_basic_q_select(
     # Run the replicate task and assert that all 10 repos got synced
     response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream.pulp_href)
     monitor_task_group(response.task_group)
+    add_domain_objects_to_cleanup(dest_domain)
 
     result = pulpcore_bindings.DistributionsApi.list(pulp_domain=dest_domain.name)
     assert result.count == 10
@@ -572,6 +589,7 @@ def test_replicate_with_complex_q_select(
     monitor_task_group,
     pulp_settings,
     gen_object_with_cleanup,
+    add_domain_objects_to_cleanup,
 ):
     """Test complex q_select replication."""
     source_domain = populate_upstream(10)
@@ -591,6 +609,7 @@ def test_replicate_with_complex_q_select(
     # Run the replicate task and assert that two repos got synced
     response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream.pulp_href)
     monitor_task_group(response.task_group)
+    add_domain_objects_to_cleanup(dest_domain)
     result = pulpcore_bindings.DistributionsApi.list(pulp_domain=dest_domain.name)
     assert result.count == 2
     assert {d.name for d in result.results} == {"1", "2"}
@@ -616,3 +635,105 @@ def test_replicate_with_complex_q_select(
         pulpcore_bindings.UpstreamPulpsApi.partial_update(upstream.pulp_href, body)
     assert e.value.status == 400
     assert e.value.body == '{"q_select":["Syntax error in expression."]}'
+
+
+@pytest.fixture
+def add_domain_objects_to_cleanup(add_to_cleanup, file_bindings):
+    def _add_objects_to_cleanup(domain):
+        objects = []
+        for api_client in (
+            file_bindings.DistributionsFileApi,
+            file_bindings.RemotesFileApi,
+            file_bindings.RepositoriesFileApi,
+        ):
+            result = api_client.list(pulp_domain=domain.name)
+            for item in result.results:
+                objects.append(item)
+                add_to_cleanup(api_client, item.pulp_href)
+        return objects
+
+    return _add_objects_to_cleanup
+
+
+@pytest.mark.parallel
+@pytest.mark.parametrize(
+    "policy,results",
+    [
+        ("nodelete", [{"b0", "b1", "a0", "a1", "a2"}, {"b0", "b1", "a0", "a1", "a2"}]),
+        ("labeled", [{"b0", "b1", "a0", "a1", "a2"}, {"b0", "b1", "a0"}]),
+        ("all", [{"a0", "a1", "a2"}, {"a0"}]),
+    ],
+)
+def test_replicate_policy(
+    policy,
+    results,
+    populate_upstream,
+    bindings_cfg,
+    add_domain_objects_to_cleanup,
+    pulpcore_bindings,
+    file_bindings,
+    monitor_task,
+    monitor_task_group,
+    pulp_settings,
+    gen_object_with_cleanup,
+):
+    """Test the replicate `policy` options."""
+    a_domain = populate_upstream(3, prefix="a")
+    b_domain = populate_upstream(2, prefix="b")
+    upstream_body = {
+        "name": str(uuid.uuid4()),
+        "base_url": bindings_cfg.host,
+        "api_root": pulp_settings.API_ROOT,
+        "domain": a_domain.name,
+        "username": bindings_cfg.username,
+        "password": bindings_cfg.password,
+        "policy": policy,
+    }
+    upstream = gen_object_with_cleanup(
+        pulpcore_bindings.UpstreamPulpsApi, upstream_body, pulp_domain=b_domain.name
+    )
+
+    response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream.pulp_href)
+    monitor_task_group(response.task_group)
+    add_domain_objects_to_cleanup(b_domain)
+
+    result = pulpcore_bindings.DistributionsApi.list(pulp_domain=b_domain.name)
+    assert result.count == len(results[0])
+    assert {r.name for r in result.results} == results[0]
+
+    # delete a1, a2
+    result = pulpcore_bindings.DistributionsApi.list(pulp_domain=a_domain.name)
+    monitor_task(file_bindings.DistributionsFileApi.delete(result.results[0].pulp_href).task)
+    monitor_task(file_bindings.DistributionsFileApi.delete(result.results[1].pulp_href).task)
+    result = pulpcore_bindings.DistributionsApi.list(pulp_domain=a_domain.name)
+    assert result.count == 1
+    assert result.results[0].name == "a0"
+
+    # Perform a second replication and check that the correct distros were deleted
+    response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream.pulp_href)
+    monitor_task_group(response.task_group)
+    result = pulpcore_bindings.DistributionsApi.list(pulp_domain=b_domain.name)
+    assert result.count == len(results[1])
+    assert {r.name for r in result.results} == results[1]
+
+    # Delete a0 from upstream and remove the UpstreamPulp label from downstream a0
+    result = pulpcore_bindings.DistributionsApi.list(pulp_domain=a_domain.name)
+    monitor_task(file_bindings.DistributionsFileApi.delete(result.results[0].pulp_href).task)
+    result = pulpcore_bindings.DistributionsApi.list(pulp_domain=a_domain.name)
+    assert result.count == 0
+    result = pulpcore_bindings.DistributionsApi.list(name="a0", pulp_domain=b_domain.name)
+    file_bindings.DistributionsFileApi.unset_label(
+        result.results[0].pulp_href, {"key": "UpstreamPulp"}
+    )
+
+    # Replicate again and check that the objects were managed according to the policy
+    response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream.pulp_href)
+    monitor_task_group(response.task_group)
+    result = pulpcore_bindings.DistributionsApi.list(pulp_domain=b_domain.name)
+    if policy in ("nodelete", "labeled"):
+        for distro in result.results:
+            assert distro.name in results[1]
+            if distro.name == "a0":
+                assert "UpstreamPulp" not in distro.pulp_labels
+    else:
+        assert result.count == 0
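
Usage sketch (appendix, not part of the patch): the functional tests above drive the new `policy` field through the generated Python bindings. A minimal, hypothetical example of configuring an UpstreamPulp with the "labeled" policy and triggering replication might look like the following; the server URL, credentials, and domain names are placeholders, and the `create()` call signature is assumed to match the bindings the tests use via `gen_object_with_cleanup`.

    # All values below are placeholders; point them at a real Pulp instance.
    upstream_body = {
        "name": "my-upstream",
        "base_url": "https://upstream.example.com",
        "api_root": "/pulp/",
        "domain": "source-domain",  # upstream domain to replicate from
        "username": "admin",
        "password": "password",
        # New field added by this patch:
        #   "all"      - replicate manages every local object in the domain (default)
        #   "labeled"  - only objects labeled by a previous replication are updated or deleted
        #   "nodelete" - local objects are never deleted, labeled or not
        "policy": "labeled",
    }
    upstream = pulpcore_bindings.UpstreamPulpsApi.create(upstream_body, pulp_domain="replica-domain")
    # replicate() dispatches a task group; the tests wait on it with monitor_task_group.
    response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream.pulp_href)

Under "labeled", local objects that do not carry this server's "UpstreamPulp" label are neither updated nor removed by remove_missing; under "nodelete", remove_missing returns immediately and nothing is ever deleted.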