Skip to content

Commit

Permalink
Add jitter and resync to polling (#3151)
Browse files Browse the repository at this point in the history
Don't poll all GitRepos at once, within ms.
If gitrepos are lost from the requeueAfter polling, resync should add them
again.

* Add jitter to the pollingInterval of GitRepos
* Gitops controller uses shorter resync interval
* GenerationChangedPredicate prevented Cache Sync to Trigger Reconciler
  • Loading branch information
manno committed Jan 9, 2025
1 parent c5f21a6 commit 1f532b6
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 4 deletions.
4 changes: 4 additions & 0 deletions charts/fleet/templates/deployment_gitjob.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ spec:
- name: NO_PROXY
value: {{ $.Values.noProxy }}
{{- end }}
{{- if $.Values.gitops.syncPeriod }}
- name: GITREPO_SYNC_PERIOD
value: {{ quote $.Values.gitops.syncPeriod }}
{{- end }}
{{- if $.Values.controller.reconciler.workers.gitrepo }}
- name: GITREPO_RECONCILER_WORKERS
value: {{ quote $.Values.controller.reconciler.workers.gitrepo }}
Expand Down
3 changes: 3 additions & 0 deletions charts/fleet/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ priorityClassName: ""

gitops:
enabled: true
# syncPeriod is used to pick up polling for lost gitrepo events.
# It should be larger than the largest gitrepo pollinginterval.
syncPeriod: 2h

metrics:
enabled: true
Expand Down
3 changes: 3 additions & 0 deletions internal/cmd/agent/controller/bundledeployment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ func (r *BundleDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error {
WithEventFilter(
// we do not trigger for status changes
predicate.Or(
// Note: These predicates prevent cache
// syncPeriod from triggering reconcile, since
// cache sync is an Update event.
predicate.GenerationChangedPredicate{},
predicate.AnnotationChangedPredicate{},
predicate.LabelChangedPredicate{},
Expand Down
19 changes: 16 additions & 3 deletions internal/cmd/controller/gitops/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ import (
)

var (
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
zopts *zap.Options
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
zopts *zap.Options
defaultSyncPeriod = 10 * time.Hour
)

func init() {
Expand Down Expand Up @@ -96,6 +97,14 @@ func (g *GitOperator) Run(cmd *cobra.Command, args []string) error {
shardIDSuffix = fmt.Sprintf("-%s", g.ShardID)
}

syncPeriod := defaultSyncPeriod
if d := os.Getenv("GITREPO_SYNC_PERIOD"); d != "" {
syncPeriod, err = time.ParseDuration(d)
if err != nil {
return err
}
}

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
Metrics: g.setupMetrics(),
Expand All @@ -105,6 +114,10 @@ func (g *GitOperator) Run(cmd *cobra.Command, args []string) error {
LeaseDuration: leaderOpts.LeaseDuration,
RenewDeadline: leaderOpts.RenewDeadline,
RetryPeriod: leaderOpts.RetryPeriod,
// resync to pick up lost gitrepos
Cache: cache.Options{
SyncPeriod: &syncPeriod,
},
})

if err != nil {
Expand Down
15 changes: 14 additions & 1 deletion internal/cmd/controller/gitops/reconciler/gitjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"math/rand/v2"
"os"
"reflect"
"sort"
Expand Down Expand Up @@ -102,8 +103,9 @@ func (r *GitJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&v1alpha1.GitRepo{},
builder.WithPredicates(
// do not trigger for GitRepo status changes (except for commit changes)
// do not trigger for GitRepo status changes (except for commit changes and cache sync)
predicate.Or(
TypedResourceVersionUnchangedPredicate[client.Object]{},
predicate.GenerationChangedPredicate{},
predicate.AnnotationChangedPredicate{},
predicate.LabelChangedPredicate{},
Expand Down Expand Up @@ -189,6 +191,7 @@ func (r *GitJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
result := reconcile.Result{}
if repoPolled {
result = reconcile.Result{RequeueAfter: getPollingIntervalDuration(gitrepo)}
result.RequeueAfter = addJitter(result.RequeueAfter)
}

res, err := r.manageGitJob(ctx, logger, gitrepo, oldCommit, repoPolled, result)
Expand All @@ -208,6 +211,16 @@ func (r *GitJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
return result, nil
}

// addJitter to the requeue time to avoid thundering herd
// generate a random number between -10% and +10% of the duration
func addJitter(d time.Duration) time.Duration {
if d <= 0 {
return d
}

return d + time.Duration(rand.Int64N(int64(d)/10)) // nolint:gosec // gosec G404 false positive, not used for crypto
}

// manageGitJob is responsible for creating, updating and deleting the GitJob and setting the GitRepo's status accordingly
func (r *GitJobReconciler) manageGitJob(ctx context.Context, logger logr.Logger, gitrepo *v1alpha1.GitRepo, oldCommit string, repoPolled bool, oldResult reconcile.Result) (reconcile.Result, error) {
name := types.NamespacedName{Namespace: gitrepo.Namespace, Name: gitrepo.Name}
Expand Down
51 changes: 51 additions & 0 deletions internal/cmd/controller/gitops/reconciler/predicate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package reconciler

import (
"reflect"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// TypedResourceVersionUnchangedPredicate implements a update predicate to
// allow syncPeriod to trigger the reconciler
type TypedResourceVersionUnchangedPredicate[T metav1.Object] struct {
predicate.TypedFuncs[T]
}

func isNil(arg any) bool {
if v := reflect.ValueOf(arg); !v.IsValid() || ((v.Kind() == reflect.Ptr ||
v.Kind() == reflect.Interface ||
v.Kind() == reflect.Slice ||
v.Kind() == reflect.Map ||
v.Kind() == reflect.Chan ||
v.Kind() == reflect.Func) && v.IsNil()) {
return true
}
return false
}

func (TypedResourceVersionUnchangedPredicate[T]) Create(e event.CreateEvent) bool {
return false
}

func (TypedResourceVersionUnchangedPredicate[T]) Delete(e event.DeleteEvent) bool {
return false
}

// Update implements default UpdateEvent filter for validating resource version change.
func (TypedResourceVersionUnchangedPredicate[T]) Update(e event.TypedUpdateEvent[T]) bool {
if isNil(e.ObjectOld) {
return false
}
if isNil(e.ObjectNew) {
return false
}

return e.ObjectNew.GetResourceVersion() == e.ObjectOld.GetResourceVersion()
}

func (TypedResourceVersionUnchangedPredicate[T]) Generic(e event.GenericEvent) bool {
return false
}
3 changes: 3 additions & 0 deletions internal/cmd/controller/reconciler/config_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ func (r *ConfigReconciler) SetupWithManager(mgr ctrl.Manager) error {
object.GetName() == config.ManagerConfigName
}),
predicate.Or(
// Note: These predicates prevent cache
// syncPeriod from triggering reconcile, since
// cache sync is an Update event.
predicate.ResourceVersionChangedPredicate{},
predicate.GenerationChangedPredicate{},
predicate.AnnotationChangedPredicate{},
Expand Down
3 changes: 3 additions & 0 deletions internal/cmd/controller/reconciler/imagescan_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ func (r *ImageScanReconciler) SetupWithManager(mgr ctrl.Manager) error {
predicate.And(
sharding.FilterByShardID(r.ShardID),
predicate.Or(
// Note: These predicates prevent cache
// syncPeriod from triggering reconcile, since
// cache sync is an Update event.
predicate.GenerationChangedPredicate{},
predicate.AnnotationChangedPredicate{},
predicate.LabelChangedPredicate{},
Expand Down

0 comments on commit 1f532b6

Please sign in to comment.