From 1f532b6f25aaca875dd2d284ff5b17e4f46bf5c3 Mon Sep 17 00:00:00 2001 From: Mario Manno Date: Thu, 9 Jan 2025 16:03:17 +0100 Subject: [PATCH] Add jitter and resync to polling (#3151) Don't poll all GitRepos at once, within ms. If gitrepos are lost from the requeueAfter polling, resync should add them again. * Add jitter to the pollingInterval of GitRepos * Gitops controller uses shorter resync interval * GenerationChangedPredicate prevented Cache Sync to Trigger Reconciler --- charts/fleet/templates/deployment_gitjob.yaml | 4 ++ charts/fleet/values.yaml | 3 ++ .../controller/bundledeployment_controller.go | 3 ++ internal/cmd/controller/gitops/operator.go | 19 +++++-- .../gitops/reconciler/gitjob_controller.go | 15 +++++- .../controller/gitops/reconciler/predicate.go | 51 +++++++++++++++++++ .../reconciler/config_controller.go | 3 ++ .../reconciler/imagescan_controller.go | 3 ++ 8 files changed, 97 insertions(+), 4 deletions(-) create mode 100644 internal/cmd/controller/gitops/reconciler/predicate.go diff --git a/charts/fleet/templates/deployment_gitjob.yaml b/charts/fleet/templates/deployment_gitjob.yaml index 686608d1cb..880de9c867 100644 --- a/charts/fleet/templates/deployment_gitjob.yaml +++ b/charts/fleet/templates/deployment_gitjob.yaml @@ -83,6 +83,10 @@ spec: - name: NO_PROXY value: {{ $.Values.noProxy }} {{- end }} + {{- if $.Values.gitops.syncPeriod }} + - name: GITREPO_SYNC_PERIOD + value: {{ quote $.Values.gitops.syncPeriod }} + {{- end }} {{- if $.Values.controller.reconciler.workers.gitrepo }} - name: GITREPO_RECONCILER_WORKERS value: {{ quote $.Values.controller.reconciler.workers.gitrepo }} diff --git a/charts/fleet/values.yaml b/charts/fleet/values.yaml index 59489eb74a..8307c6deb0 100644 --- a/charts/fleet/values.yaml +++ b/charts/fleet/values.yaml @@ -77,6 +77,9 @@ priorityClassName: "" gitops: enabled: true + # syncPeriod is used to pick up polling for lost gitrepo events. + # It should be larger than the largest gitrepo pollinginterval. + syncPeriod: 2h metrics: enabled: true diff --git a/internal/cmd/agent/controller/bundledeployment_controller.go b/internal/cmd/agent/controller/bundledeployment_controller.go index f659f4349f..a2d1762ad9 100644 --- a/internal/cmd/agent/controller/bundledeployment_controller.go +++ b/internal/cmd/agent/controller/bundledeployment_controller.go @@ -59,6 +59,9 @@ func (r *BundleDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error { WithEventFilter( // we do not trigger for status changes predicate.Or( + // Note: These predicates prevent cache + // syncPeriod from triggering reconcile, since + // cache sync is an Update event. predicate.GenerationChangedPredicate{}, predicate.AnnotationChangedPredicate{}, predicate.LabelChangedPredicate{}, diff --git a/internal/cmd/controller/gitops/operator.go b/internal/cmd/controller/gitops/operator.go index 0475f86a39..60d0cf03bb 100644 --- a/internal/cmd/controller/gitops/operator.go +++ b/internal/cmd/controller/gitops/operator.go @@ -33,9 +33,10 @@ import ( ) var ( - scheme = runtime.NewScheme() - setupLog = ctrl.Log.WithName("setup") - zopts *zap.Options + scheme = runtime.NewScheme() + setupLog = ctrl.Log.WithName("setup") + zopts *zap.Options + defaultSyncPeriod = 10 * time.Hour ) func init() { @@ -96,6 +97,14 @@ func (g *GitOperator) Run(cmd *cobra.Command, args []string) error { shardIDSuffix = fmt.Sprintf("-%s", g.ShardID) } + syncPeriod := defaultSyncPeriod + if d := os.Getenv("GITREPO_SYNC_PERIOD"); d != "" { + syncPeriod, err = time.ParseDuration(d) + if err != nil { + return err + } + } + mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ Scheme: scheme, Metrics: g.setupMetrics(), @@ -105,6 +114,10 @@ func (g *GitOperator) Run(cmd *cobra.Command, args []string) error { LeaseDuration: leaderOpts.LeaseDuration, RenewDeadline: leaderOpts.RenewDeadline, RetryPeriod: leaderOpts.RetryPeriod, + // resync to pick up lost gitrepos + Cache: cache.Options{ + SyncPeriod: &syncPeriod, + }, }) if err != nil { diff --git a/internal/cmd/controller/gitops/reconciler/gitjob_controller.go b/internal/cmd/controller/gitops/reconciler/gitjob_controller.go index 5edfd45c88..b6093b8e81 100644 --- a/internal/cmd/controller/gitops/reconciler/gitjob_controller.go +++ b/internal/cmd/controller/gitops/reconciler/gitjob_controller.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "math/rand/v2" "os" "reflect" "sort" @@ -102,8 +103,9 @@ func (r *GitJobReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&v1alpha1.GitRepo{}, builder.WithPredicates( - // do not trigger for GitRepo status changes (except for commit changes) + // do not trigger for GitRepo status changes (except for commit changes and cache sync) predicate.Or( + TypedResourceVersionUnchangedPredicate[client.Object]{}, predicate.GenerationChangedPredicate{}, predicate.AnnotationChangedPredicate{}, predicate.LabelChangedPredicate{}, @@ -189,6 +191,7 @@ func (r *GitJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr result := reconcile.Result{} if repoPolled { result = reconcile.Result{RequeueAfter: getPollingIntervalDuration(gitrepo)} + result.RequeueAfter = addJitter(result.RequeueAfter) } res, err := r.manageGitJob(ctx, logger, gitrepo, oldCommit, repoPolled, result) @@ -208,6 +211,16 @@ func (r *GitJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr return result, nil } +// addJitter to the requeue time to avoid thundering herd +// generate a random number between -10% and +10% of the duration +func addJitter(d time.Duration) time.Duration { + if d <= 0 { + return d + } + + return d + time.Duration(rand.Int64N(int64(d)/10)) // nolint:gosec // gosec G404 false positive, not used for crypto +} + // manageGitJob is responsible for creating, updating and deleting the GitJob and setting the GitRepo's status accordingly func (r *GitJobReconciler) manageGitJob(ctx context.Context, logger logr.Logger, gitrepo *v1alpha1.GitRepo, oldCommit string, repoPolled bool, oldResult reconcile.Result) (reconcile.Result, error) { name := types.NamespacedName{Namespace: gitrepo.Namespace, Name: gitrepo.Name} diff --git a/internal/cmd/controller/gitops/reconciler/predicate.go b/internal/cmd/controller/gitops/reconciler/predicate.go new file mode 100644 index 0000000000..864648c965 --- /dev/null +++ b/internal/cmd/controller/gitops/reconciler/predicate.go @@ -0,0 +1,51 @@ +package reconciler + +import ( + "reflect" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/predicate" +) + +// TypedResourceVersionUnchangedPredicate implements a update predicate to +// allow syncPeriod to trigger the reconciler +type TypedResourceVersionUnchangedPredicate[T metav1.Object] struct { + predicate.TypedFuncs[T] +} + +func isNil(arg any) bool { + if v := reflect.ValueOf(arg); !v.IsValid() || ((v.Kind() == reflect.Ptr || + v.Kind() == reflect.Interface || + v.Kind() == reflect.Slice || + v.Kind() == reflect.Map || + v.Kind() == reflect.Chan || + v.Kind() == reflect.Func) && v.IsNil()) { + return true + } + return false +} + +func (TypedResourceVersionUnchangedPredicate[T]) Create(e event.CreateEvent) bool { + return false +} + +func (TypedResourceVersionUnchangedPredicate[T]) Delete(e event.DeleteEvent) bool { + return false +} + +// Update implements default UpdateEvent filter for validating resource version change. +func (TypedResourceVersionUnchangedPredicate[T]) Update(e event.TypedUpdateEvent[T]) bool { + if isNil(e.ObjectOld) { + return false + } + if isNil(e.ObjectNew) { + return false + } + + return e.ObjectNew.GetResourceVersion() == e.ObjectOld.GetResourceVersion() +} + +func (TypedResourceVersionUnchangedPredicate[T]) Generic(e event.GenericEvent) bool { + return false +} diff --git a/internal/cmd/controller/reconciler/config_controller.go b/internal/cmd/controller/reconciler/config_controller.go index e1efa47e2e..c7a56ef135 100644 --- a/internal/cmd/controller/reconciler/config_controller.go +++ b/internal/cmd/controller/reconciler/config_controller.go @@ -58,6 +58,9 @@ func (r *ConfigReconciler) SetupWithManager(mgr ctrl.Manager) error { object.GetName() == config.ManagerConfigName }), predicate.Or( + // Note: These predicates prevent cache + // syncPeriod from triggering reconcile, since + // cache sync is an Update event. predicate.ResourceVersionChangedPredicate{}, predicate.GenerationChangedPredicate{}, predicate.AnnotationChangedPredicate{}, diff --git a/internal/cmd/controller/reconciler/imagescan_controller.go b/internal/cmd/controller/reconciler/imagescan_controller.go index e99ec053ad..535d1ca2b5 100644 --- a/internal/cmd/controller/reconciler/imagescan_controller.go +++ b/internal/cmd/controller/reconciler/imagescan_controller.go @@ -40,6 +40,9 @@ func (r *ImageScanReconciler) SetupWithManager(mgr ctrl.Manager) error { predicate.And( sharding.FilterByShardID(r.ShardID), predicate.Or( + // Note: These predicates prevent cache + // syncPeriod from triggering reconcile, since + // cache sync is an Update event. predicate.GenerationChangedPredicate{}, predicate.AnnotationChangedPredicate{}, predicate.LabelChangedPredicate{},