Skip to content

Commit

Permalink
Enhance replicate delete policy
Browse files Browse the repository at this point in the history
fixes: #5214
  • Loading branch information
gerrod3 committed Feb 18, 2025
1 parent fd6c749 commit 84ac6a2
Show file tree
Hide file tree
Showing 7 changed files with 224 additions and 34 deletions.
4 changes: 4 additions & 0 deletions CHANGES/5214.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Added new field `policy` to UpstreamPulp that decides how Replicate manages local objects within the domain.

Replicate will now copy the upstream's `pulp_labels` onto downstream objects. Also, replicate will now
label the downstream objects it creates with the UpstreamPulp they came from.
18 changes: 18 additions & 0 deletions pulpcore/app/migrations/0128_upstreampulp_policy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 4.2.16 on 2025-02-18 18:51

from django.db import migrations, models


class Migration(migrations.Migration):
    """Add the ``policy`` text field (default ``all``) to the ``upstreampulp`` model."""

    dependencies = [
        ("core", "0127_remove_upstreampulp_pulp_label_select"),
    ]

    operations = [
        migrations.AddField(
            model_name="upstreampulp",
            name="policy",
            field=models.TextField(
                choices=[
                    ("all", "Replicate manages ALL local objects within the domain."),
                    (
                        "labeled",
                        "Replicate will only manage the objects created from a previous "
                        "replication, unlabled local objects will be untouched.",
                    ),
                    (
                        "nodelete",
                        "Replicate will not delete any local object whether they were created by "
                        "replication or not.",
                    ),
                ],
                default="all",
            ),
        ),
    ]
16 changes: 16 additions & 0 deletions pulpcore/app/models/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,21 @@


class UpstreamPulp(BaseModel, AutoAddObjPermsMixin):
# Identifiers for the replication policies; `ALL` is the default of the `policy` field.
ALL = "all"
LABELED = "labeled"
NODELETE = "nodelete"
# (stored value, human-readable description) pairs used as `choices` for the `policy` field.
POLICY_CHOICES = (
(ALL, "Replicate manages ALL local objects within the domain."),
(
LABELED,
"Replicate will only manage the objects created from a previous replication, unlabled local objects will be untouched.",
),
(
NODELETE,
"Replicate will not delete any local object whether they were created by replication or not.",
),
)

name = models.TextField(db_index=True)
pulp_domain = models.ForeignKey("Domain", default=get_domain_pk, on_delete=models.PROTECT)

Expand All @@ -27,6 +42,7 @@ class UpstreamPulp(BaseModel, AutoAddObjPermsMixin):
password = EncryptedTextField(null=True)

q_select = models.TextField(null=True)
policy = models.TextField(choices=POLICY_CHOICES, default=ALL)

last_replication = models.DateTimeField(null=True)

Expand Down
37 changes: 28 additions & 9 deletions pulpcore/app/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from urllib.parse import urljoin

from pulp_glue.common.context import PulpContext
from pulpcore.app.models import UpstreamPulp
from pulpcore.tasking.tasks import dispatch
from pulpcore.app.tasks.base import (
general_update,
Expand Down Expand Up @@ -92,6 +93,11 @@ def upstream_distributions(self, q=None):
break
offset += list_size

def labels(self, upstream_object):
    """
    Build the ``pulp_labels`` for a downstream object derived from ``upstream_object``.

    The upstream object's own labels are copied and an ``UpstreamPulp`` label holding
    this server's primary key (as a string) is added, overriding any upstream label
    of the same name.

    Args:
        upstream_object: Either a dict describing an upstream object (labels under the
            ``"pulp_labels"`` key) or an object exposing a ``pulp_labels`` attribute.

    Returns:
        dict: A new label dict; the labels of ``upstream_object`` are never mutated.
    """
    # Upstream distributions are plain dicts, whose keys are not reachable via getattr().
    if isinstance(upstream_object, dict):
        upstream_labels = upstream_object.get("pulp_labels")
    else:
        upstream_labels = getattr(upstream_object, "pulp_labels", None)
    # Copy so that tagging the result does not modify the source object's labels;
    # `or {}` covers both a missing and a null label set.
    downstream_labels = dict(upstream_labels or {})
    downstream_labels["UpstreamPulp"] = str(self.server.pk)
    return downstream_labels

def url(self, upstream_distribution):
    """Return the value stored under ``"base_url"`` in the upstream distribution."""
    base_url = upstream_distribution["base_url"]
    return base_url

Expand All @@ -106,8 +112,7 @@ def create_or_update_remote(self, upstream_distribution):
url = self.url(upstream_distribution)
if url.startswith("/"):
url = urljoin(self.server.base_url, url)

remote_fields_dict = {"url": url}
remote_fields_dict = {"url": url, "pulp_labels": self.labels(upstream_distribution)}
remote_fields_dict.update(self.tls_settings)
remote_fields_dict.update(self.remote_extra_fields(upstream_distribution))

Expand All @@ -116,6 +121,8 @@ def create_or_update_remote(self, upstream_distribution):
remote = self.remote_model_cls.objects.get(
name=upstream_distribution["name"], pulp_domain=self.domain
)
if self.server.POLICY==UpstreamPulp.LABELED and remote.pulp_labels.get("UpstreamPulp") != str(self.server.pk):
return None
needs_update = self.needs_update(remote_fields_dict, remote)
if needs_update:
dispatch(
Expand All @@ -136,11 +143,14 @@ def repository_extra_fields(self, remote):
return {}

def create_or_update_repository(self, remote):
repo_fields_dict = self.repository_extra_fields(remote)
repo_fields_dict["pulp_labels"] = self.labels(remote)
try:
repository = self.repository_model_cls.objects.get(
name=remote.name, pulp_domain=self.domain
)
repo_fields_dict = self.repository_extra_fields(remote)
if self.server.POLICY==UpstreamPulp.LABELED and repository.pulp_labels.get("UpstreamPulp") != str(self.server.pk):
return None
needs_update = self.needs_update(repo_fields_dict, repository)
if needs_update:
dispatch(
Expand All @@ -151,9 +161,7 @@ def create_or_update_repository(self, remote):
kwargs={"data": repo_fields_dict, "partial": True},
)
except self.repository_model_cls.DoesNotExist:
repository = self.repository_model_cls(
name=remote.name, **self.repository_extra_fields(remote)
)
repository = self.repository_model_cls(name=remote.name, **repo_fields_dict)
repository.save()
return repository

Expand All @@ -169,10 +177,13 @@ def distribution_extra_fields(self, repository, upstream_distribution):

def create_or_update_distribution(self, repository, upstream_distribution):
distribution_data = self.distribution_extra_fields(repository, upstream_distribution)
distribution_data["pulp_labels"] = self.labels(upstream_distribution)
try:
distro = self.distribution_model_cls.objects.get(
name=upstream_distribution["name"], pulp_domain=self.domain
)
if self.server.POLICY==UpstreamPulp.LABELED and distro.pulp_labels.get("UpstreamPulp") != str(self.server.pk):
return None
needs_update = self.needs_update(distribution_data, distro)
if needs_update:
# Update the distribution
Expand Down Expand Up @@ -223,12 +234,18 @@ def sync(self, repository, remote):
)

def remove_missing(self, names):
if self.server.policy == UpstreamPulp.NODELETE:
return
if self.server.policy == UpstreamPulp.LABELED:
label_filter = {"pulp_labels__contains": {"UpstreamPulp": str(self.server.pk)}}
else:
label_filter = {}
# Remove all distributions with names not present in the list of names
# Perform this in an extra task, because we hold a big lock here.
distribution_ids = [
(distribution.pk, self.app_label, self.distribution_serializer_name)
for distribution in self.distribution_model_cls.objects.filter(
pulp_domain=self.domain
pulp_domain=self.domain, **label_filter
).exclude(name__in=names)
]
if distribution_ids:
Expand All @@ -242,15 +259,17 @@ def remove_missing(self, names):
# Remove all the repositories and remotes of the missing distributions
repositories = list(
self.repository_model_cls.objects.filter(
pulp_domain=self.domain, user_hidden=False
pulp_domain=self.domain, user_hidden=False, **label_filter
).exclude(name__in=names)
)
repository_ids = [
(repo.pk, self.app_label, self.repository_serializer_name) for repo in repositories
]

remotes = list(
self.remote_model_cls.objects.filter(pulp_domain=self.domain).exclude(name__in=names)
self.remote_model_cls.objects.filter(pulp_domain=self.domain, **label_filter).exclude(
name__in=names
)
)
remote_ids = [
(remote.pk, self.app_label, self.remote_serializer_name) for remote in remotes
Expand Down
8 changes: 8 additions & 0 deletions pulpcore/app/serializers/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,13 @@ class UpstreamPulpSerializer(ModelSerializer, HiddenFieldsMixin):
read_only=True,
)

policy = serializers.ChoiceField(
choices=UpstreamPulp.POLICY_CHOICES,
default="all",
help_text=_("Policy for how replicate will manage the local objects within the domain."),
required=False,
)

def validate_q_select(self, value):
"""Ensure we have a valid q_select expression."""
from pulpcore.app.viewsets import DistributionFilter
Expand All @@ -116,4 +123,5 @@ class Meta:
"hidden_fields",
"q_select",
"last_replication",
"policy",
)
4 changes: 4 additions & 0 deletions pulpcore/app/tasks/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ def replicate_distributions(server_pk):
continue
# Check if there is already a repository
repository = replicator.create_or_update_repository(remote=remote)
if not repository:
# No update occurred because server.policy==LABELED and there was
# an already existing local repository with the same name
continue

# Dispatch a sync task if needed
if replicator.requires_syncing(distro):
Expand Down
Loading

0 comments on commit 84ac6a2

Please sign in to comment.