Skip to content

Commit

Permalink
🌱 Reconcile topology only when necessary (kubernetes-sigs#11605)
Browse files Browse the repository at this point in the history
* Reconcile topology only when necessary

* Address comments

* Allow resync for the cluster object
  • Loading branch information
fabriziopandini authored Dec 20, 2024
1 parent c8c7a95 commit 763285d
Showing 1 changed file with 113 additions and 4 deletions.
117 changes: 113 additions & 4 deletions internal/controllers/topology/cluster/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,28 @@ package cluster
import (
"context"
"fmt"
"reflect"
"time"

"github.com/go-logr/logr"
"github.com/pkg/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/cache/informertest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
Expand Down Expand Up @@ -103,8 +109,11 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt
predicateLog := ctrl.LoggerFrom(ctx).WithValues("controller", "topology/cluster")
c, err := ctrl.NewControllerManagedBy(mgr).
For(&clusterv1.Cluster{}, builder.WithPredicates(
// Only reconcile Cluster with topology.
predicates.ClusterHasTopology(mgr.GetScheme(), predicateLog),
// Only reconcile Cluster with topology and with changes relevant for this controller.
predicates.All(mgr.GetScheme(), predicateLog,
predicates.ClusterHasTopology(mgr.GetScheme(), predicateLog),
clusterChangeIsRelevant(mgr.GetScheme(), predicateLog),
),
)).
Named("topology/cluster").
WatchesRawSource(r.ClusterCache.GetClusterSource("topology/cluster", func(_ context.Context, o client.Object) []ctrl.Request {
Expand All @@ -118,16 +127,17 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt
Watches(
&clusterv1.MachineDeployment{},
handler.EnqueueRequestsFromMapFunc(r.machineDeploymentToCluster),
// Only trigger Cluster reconciliation if the MachineDeployment is topology owned.
// Only trigger Cluster reconciliation if the MachineDeployment is topology owned, the resource is changed, and the change is relevant.
builder.WithPredicates(predicates.All(mgr.GetScheme(), predicateLog,
predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog),
predicates.ResourceIsTopologyOwned(mgr.GetScheme(), predicateLog),
machineDeploymentChangeIsRelevant(mgr.GetScheme(), predicateLog),
)),
).
Watches(
&expv1.MachinePool{},
handler.EnqueueRequestsFromMapFunc(r.machinePoolToCluster),
// Only trigger Cluster reconciliation if the MachinePool is topology owned.
// Only trigger Cluster reconciliation if the MachinePool is topology owned, the resource is changed.
builder.WithPredicates(predicates.All(mgr.GetScheme(), predicateLog,
predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog),
predicates.ResourceIsTopologyOwned(mgr.GetScheme(), predicateLog),
Expand Down Expand Up @@ -155,6 +165,105 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt
return nil
}

func clusterChangeIsRelevant(scheme *runtime.Scheme, logger logr.Logger) predicate.Funcs {
dropNotRelevant := func(cluster *clusterv1.Cluster) *clusterv1.Cluster {
c := cluster.DeepCopy()
// Drop metadata fields which are impacted by not relevant changes.
c.ObjectMeta.ManagedFields = nil
c.ObjectMeta.ResourceVersion = ""

// Drop changes on v1beta2 conditions; when v1beta2 conditions will be moved top level, we will review this
// selectively drop changes not relevant for this controller.
c.Status.V1Beta2 = nil
return c
}

return predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
log := logger.WithValues("predicate", "ClusterChangeIsRelevant", "eventType", "update")
if gvk, err := apiutil.GVKForObject(e.ObjectOld, scheme); err == nil {
log = log.WithValues(gvk.Kind, klog.KObj(e.ObjectOld))
}

if e.ObjectOld.GetResourceVersion() == e.ObjectNew.GetResourceVersion() {
log.V(6).Info("Cluster resync event, allowing further processing")
return true
}

oldObj, ok := e.ObjectOld.(*clusterv1.Cluster)
if !ok {
log.V(4).Info("Expected Cluster", "type", fmt.Sprintf("%T", e.ObjectOld))
return false
}
oldObj = dropNotRelevant(oldObj)

newObj := e.ObjectNew.(*clusterv1.Cluster)
if !ok {
log.V(4).Info("Expected Cluster", "type", fmt.Sprintf("%T", e.ObjectNew))
return false
}
newObj = dropNotRelevant(newObj)

if reflect.DeepEqual(oldObj, newObj) {
log.V(6).Info("Cluster does not have relevant changes, blocking further processing")
return false
}
log.V(6).Info("Cluster has relevant changes, allowing further processing")
return true
},
CreateFunc: func(event.CreateEvent) bool { return true },
DeleteFunc: func(event.DeleteEvent) bool { return true },
GenericFunc: func(event.GenericEvent) bool { return true },
}
}

func machineDeploymentChangeIsRelevant(scheme *runtime.Scheme, logger logr.Logger) predicate.Funcs {
dropNotRelevant := func(machineDeployment *clusterv1.MachineDeployment) *clusterv1.MachineDeployment {
md := machineDeployment.DeepCopy()
// Drop metadata fields which are impacted by not relevant changes.
md.ObjectMeta.ManagedFields = nil
md.ObjectMeta.ResourceVersion = ""

// Drop changes on v1beta2 conditions; when v1beta2 conditions will be moved top level, we will review this
// selectively drop changes not relevant for this controller.
md.Status.V1Beta2 = nil
return md
}

return predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
log := logger.WithValues("predicate", "MachineDeploymentChangeIsRelevant", "eventType", "update")
if gvk, err := apiutil.GVKForObject(e.ObjectOld, scheme); err == nil {
log = log.WithValues(gvk.Kind, klog.KObj(e.ObjectOld))
}

oldObj, ok := e.ObjectOld.(*clusterv1.MachineDeployment)
if !ok {
log.V(4).Info("Expected MachineDeployment", "type", fmt.Sprintf("%T", e.ObjectOld))
return false
}
oldObj = dropNotRelevant(oldObj)

newObj := e.ObjectNew.(*clusterv1.MachineDeployment)
if !ok {
log.V(4).Info("Expected MachineDeployment", "type", fmt.Sprintf("%T", e.ObjectNew))
return false
}
newObj = dropNotRelevant(newObj)

if reflect.DeepEqual(oldObj, newObj) {
log.V(6).Info("MachineDeployment does not have relevant changes, blocking further processing")
return false
}
log.V(6).Info("MachineDeployment has relevant changes, allowing further processing")
return true
},
CreateFunc: func(event.CreateEvent) bool { return true },
DeleteFunc: func(event.DeleteEvent) bool { return true },
GenericFunc: func(event.GenericEvent) bool { return true },
}
}

// SetupForDryRun prepares the Reconciler for a dry run execution.
func (r *Reconciler) SetupForDryRun(recorder record.EventRecorder) {
r.desiredStateGenerator = desiredstate.NewGenerator(r.Client, r.ClusterCache, r.RuntimeClient)
Expand Down

0 comments on commit 763285d

Please sign in to comment.