From db0f262c0833650a608e1773364c5721ea9016b2 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 24 Jan 2025 04:59:41 +0000 Subject: [PATCH] chore(deps): Bump sigs.k8s.io/controller-runtime Bumps the production-dependencies group in /api and / with 1 update: [sigs.k8s.io/controller-runtime](https://github.com/kubernetes-sigs/controller-runtime). Updates `sigs.k8s.io/controller-runtime` from 0.20.0 to 0.20.1 - [Release notes](https://github.com/kubernetes-sigs/controller-runtime/releases) - [Changelog](https://github.com/kubernetes-sigs/controller-runtime/blob/main/RELEASE.md) - [Commits](https://github.com/kubernetes-sigs/controller-runtime/compare/v0.20.0...v0.20.1) --- updated-dependencies: - dependency-name: sigs.k8s.io/controller-runtime dependency-type: direct:production update-type: version-update:semver-patch dependency-group: production-dependencies ... Signed-off-by: Javier Cano Cano Signed-off-by: dependabot[bot] --- api/go.mod | 2 +- api/go.sum | 4 +- api/vendor/modules.txt | 2 +- go.mod | 2 +- go.sum | 4 +- vendor/modules.txt | 2 +- .../controller-runtime/pkg/cache/cache.go | 5 +- .../pkg/controller/controller.go | 1 + .../pkg/controller/priorityqueue/metrics.go | 4 +- .../controller/priorityqueue/priorityqueue.go | 107 ++++++++++++------ .../pkg/internal/controller/controller.go | 13 ++- 11 files changed, 96 insertions(+), 50 deletions(-) diff --git a/api/go.mod b/api/go.mod index 194d6d45a..af96fc3c2 100644 --- a/api/go.mod +++ b/api/go.mod @@ -7,7 +7,7 @@ require ( k8s.io/apimachinery v0.32.1 kubevirt.io/containerized-data-importer-api v1.61.0 kubevirt.io/controller-lifecycle-operator-sdk/api v0.2.4 - sigs.k8s.io/controller-runtime v0.20.0 + sigs.k8s.io/controller-runtime v0.20.1 ) require ( diff --git a/api/go.sum b/api/go.sum index 443d3a87a..8aec0ecf6 100644 --- a/api/go.sum +++ b/api/go.sum @@ -320,8 +320,8 @@ kubevirt.io/containerized-data-importer-api v1.61.0 h1:zqgn4/ftAPRK4ljZDZcgRiW25 kubevirt.io/containerized-data-importer-api v1.61.0/go.mod h1:SDJjLGhbPyayDqAqawcGmVNapBp0KodOQvhKPLVGCQU= kubevirt.io/controller-lifecycle-operator-sdk/api v0.2.4 h1:fZYvD3/Vnitfkx6IJxjLAk8ugnZQ7CXVYcRfkSKmuZY= kubevirt.io/controller-lifecycle-operator-sdk/api v0.2.4/go.mod h1:018lASpFYBsYN6XwmA2TIrPCx6e0gviTd/ZNtSitKgc= -sigs.k8s.io/controller-runtime v0.20.0 h1:jjkMo29xEXH+02Md9qaVXfEIaMESSpy3TBWPrsfQkQs= -sigs.k8s.io/controller-runtime v0.20.0/go.mod h1:BrP3w158MwvB3ZbNpaAcIKkHQ7YGpYnzpoSTZ8E14WU= +sigs.k8s.io/controller-runtime v0.20.1 h1:JbGMAG/X94NeM3xvjenVUaBjy6Ui4Ogd/J5ZtjZnHaE= +sigs.k8s.io/controller-runtime v0.20.1/go.mod h1:BrP3w158MwvB3ZbNpaAcIKkHQ7YGpYnzpoSTZ8E14WU= sigs.k8s.io/json v0.0.0-20211020170558-c049b76a60c6/go.mod h1:p4QtZmO4uMYipTQNzagwnNoseA6OxSUutVw05NhYDRs= sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 h1:/Rv+M11QRah1itp8VhT6HoVx1Ray9eB4DBr+K+/sCJ8= sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3/go.mod h1:18nIHnGi6636UCz6m8i4DhaJ65T6EruyzmoQqI2BVDo= diff --git a/api/vendor/modules.txt b/api/vendor/modules.txt index 15afff6ce..027b03b7c 100644 --- a/api/vendor/modules.txt +++ b/api/vendor/modules.txt @@ -97,7 +97,7 @@ kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1 # kubevirt.io/controller-lifecycle-operator-sdk/api v0.2.4 ## explicit; go 1.17 kubevirt.io/controller-lifecycle-operator-sdk/api -# sigs.k8s.io/controller-runtime v0.20.0 +# sigs.k8s.io/controller-runtime v0.20.1 ## explicit; go 1.23.0 sigs.k8s.io/controller-runtime/pkg/scheme # sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 diff --git a/go.mod b/go.mod index c84493b9f..380fba68f 100644 --- a/go.mod +++ b/go.mod @@ -36,7 +36,7 @@ require ( kubevirt.io/qe-tools v0.1.8 kubevirt.io/ssp-operator/api v0.0.0 kubevirt.io/vm-console-proxy/api v0.7.0 - sigs.k8s.io/controller-runtime v0.20.0 + sigs.k8s.io/controller-runtime v0.20.1 sigs.k8s.io/yaml v1.4.0 ) diff --git a/go.sum b/go.sum index 09b54671d..aa14a0e72 100644 --- a/go.sum +++ b/go.sum @@ -870,8 +870,8 @@ kubevirt.io/vm-console-proxy/api v0.7.0/go.mod h1:7q967W73c6/Iczl3xJkOjQOASAJ2wX rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= -sigs.k8s.io/controller-runtime v0.20.0 h1:jjkMo29xEXH+02Md9qaVXfEIaMESSpy3TBWPrsfQkQs= -sigs.k8s.io/controller-runtime v0.20.0/go.mod h1:BrP3w158MwvB3ZbNpaAcIKkHQ7YGpYnzpoSTZ8E14WU= +sigs.k8s.io/controller-runtime v0.20.1 h1:JbGMAG/X94NeM3xvjenVUaBjy6Ui4Ogd/J5ZtjZnHaE= +sigs.k8s.io/controller-runtime v0.20.1/go.mod h1:BrP3w158MwvB3ZbNpaAcIKkHQ7YGpYnzpoSTZ8E14WU= sigs.k8s.io/json v0.0.0-20211020170558-c049b76a60c6/go.mod h1:p4QtZmO4uMYipTQNzagwnNoseA6OxSUutVw05NhYDRs= sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 h1:gBQPwqORJ8d8/YNZWEjoZs7npUVDpVXUUOFfW6CgAqE= sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8/go.mod h1:mdzfpAEoE6DHQEN0uh9ZbOCuHbLK5wOm7dK4ctXE9Tg= diff --git a/vendor/modules.txt b/vendor/modules.txt index ed593398b..707c6ac0e 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1049,7 +1049,7 @@ kubevirt.io/ssp-operator/api/v1beta3 # kubevirt.io/vm-console-proxy/api v0.7.0 ## explicit; go 1.22.4 kubevirt.io/vm-console-proxy/api/v1 -# sigs.k8s.io/controller-runtime v0.20.0 +# sigs.k8s.io/controller-runtime v0.20.1 ## explicit; go 1.23.0 sigs.k8s.io/controller-runtime sigs.k8s.io/controller-runtime/pkg/builder diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/cache/cache.go b/vendor/sigs.k8s.io/controller-runtime/pkg/cache/cache.go index ecffe0798..8f14bfdbf 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/cache/cache.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/cache/cache.go @@ -469,6 +469,8 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) { } } + opts.ByObject = maps.Clone(opts.ByObject) + opts.DefaultNamespaces = maps.Clone(opts.DefaultNamespaces) for obj, byObject := range opts.ByObject { isNamespaced, err := apiutil.IsObjectNamespaced(obj, opts.Scheme, opts.Mapper) if err != nil { @@ -480,6 +482,8 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) { if isNamespaced && byObject.Namespaces == nil { byObject.Namespaces = maps.Clone(opts.DefaultNamespaces) + } else { + byObject.Namespaces = maps.Clone(byObject.Namespaces) } // Default the namespace-level configs first, because they need to use the undefaulted type-level config @@ -487,7 +491,6 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) { for namespace, config := range byObject.Namespaces { // 1. Default from the undefaulted type-level config config = defaultConfig(config, byObjectToConfig(byObject)) - // 2. Default from the namespace-level config. This was defaulted from the global default config earlier, but // might not have an entry for the current namespace. if defaultNamespaceSettings, hasDefaultNamespace := opts.DefaultNamespaces[namespace]; hasDefaultNamespace { diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/controller/controller.go b/vendor/sigs.k8s.io/controller-runtime/pkg/controller/controller.go index b7d728603..5c5b249ef 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/controller/controller.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/controller/controller.go @@ -202,6 +202,7 @@ func NewTypedUnmanaged[request comparable](name string, mgr manager.Manager, opt options.NewQueue = func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request] { if ptr.Deref(mgr.GetControllerOptions().UsePriorityQueue, false) { return priorityqueue.New(controllerName, func(o *priorityqueue.Opts[request]) { + o.Log = mgr.GetLogger().WithValues("controller", controllerName) o.RateLimiter = rateLimiter }) } diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue/metrics.go b/vendor/sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue/metrics.go index f6a2697a6..36626646f 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue/metrics.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue/metrics.go @@ -85,11 +85,11 @@ func (m *defaultQueueMetrics[T]) get(item T) { return } + m.depth.Dec() + m.mapLock.Lock() defer m.mapLock.Unlock() - m.depth.Dec() - m.processingStartTimes[item] = m.clock.Now() if startTime, exists := m.addTimes[item]; exists { m.latency.Observe(m.sinceInSeconds(startTime)) diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue/priorityqueue.go b/vendor/sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue/priorityqueue.go index 2b3a8904d..996369f47 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue/priorityqueue.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue/priorityqueue.go @@ -5,6 +5,7 @@ import ( "sync/atomic" "time" + "github.com/go-logr/logr" "github.com/google/btree" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/util/workqueue" @@ -36,6 +37,7 @@ type Opts[T comparable] struct { // limiter with an initial delay of five milliseconds and a max delay of 1000 seconds. RateLimiter workqueue.TypedRateLimiter[T] MetricProvider workqueue.MetricsProvider + Log logr.Logger } // Opt allows to configure a PriorityQueue. @@ -57,6 +59,7 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] { } pq := &priorityqueue[T]{ + log: opts.Log, items: map[T]*item[T]{}, queue: btree.NewG(32, less[T]), becameReady: sets.Set[T]{}, @@ -75,6 +78,7 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] { } go pq.spin() + go pq.logState() if _, ok := pq.metrics.(noMetrics[T]); !ok { go pq.updateUnfinishedWorkLoop() } @@ -83,6 +87,7 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] { } type priorityqueue[T comparable] struct { + log logr.Logger // lock has to be acquired for any access any of items, queue, addedCounter // or becameReady lock sync.Mutex @@ -141,14 +146,14 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) { } if _, ok := w.items[key]; !ok { item := &item[T]{ - key: key, - addedCounter: w.addedCounter, - priority: o.Priority, - readyAt: readyAt, + Key: key, + AddedCounter: w.addedCounter, + Priority: o.Priority, + ReadyAt: readyAt, } w.items[key] = item w.queue.ReplaceOrInsert(item) - if item.readyAt == nil { + if item.ReadyAt == nil { w.metrics.add(key) } w.addedCounter++ @@ -158,15 +163,15 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) { // The b-tree de-duplicates based on ordering and any change here // will affect the order - Just delete and re-add. item, _ := w.queue.Delete(w.items[key]) - if o.Priority > item.priority { - item.priority = o.Priority + if o.Priority > item.Priority { + item.Priority = o.Priority } - if item.readyAt != nil && (readyAt == nil || readyAt.Before(*item.readyAt)) { - if readyAt == nil { + if item.ReadyAt != nil && (readyAt == nil || readyAt.Before(*item.ReadyAt)) { + if readyAt == nil && !w.becameReady.Has(key) { w.metrics.add(key) } - item.readyAt = readyAt + item.ReadyAt = readyAt } w.queue.ReplaceOrInsert(item) @@ -210,14 +215,14 @@ func (w *priorityqueue[T]) spin() { // track what we want to delete and do it after we are done ascending. var toDelete []*item[T] w.queue.Ascend(func(item *item[T]) bool { - if item.readyAt != nil { - if readyAt := item.readyAt.Sub(w.now()); readyAt > 0 { + if item.ReadyAt != nil { + if readyAt := item.ReadyAt.Sub(w.now()); readyAt > 0 { nextReady = w.tick(readyAt) return false } - if !w.becameReady.Has(item.key) { - w.metrics.add(item.key) - w.becameReady.Insert(item.key) + if !w.becameReady.Has(item.Key) { + w.metrics.add(item.Key) + w.becameReady.Insert(item.Key) } } @@ -228,16 +233,16 @@ func (w *priorityqueue[T]) spin() { } // Item is locked, we can not hand it out - if w.locked.Has(item.key) { + if w.locked.Has(item.Key) { return true } - w.metrics.get(item.key) - w.locked.Insert(item.key) + w.metrics.get(item.Key) + w.locked.Insert(item.Key) w.waiters.Add(-1) - delete(w.items, item.key) + delete(w.items, item.Key) toDelete = append(toDelete, item) - w.becameReady.Delete(item.key) + w.becameReady.Delete(item.Key) w.get <- *item return true @@ -268,7 +273,7 @@ func (w *priorityqueue[T]) GetWithPriority() (_ T, priority int, shutdown bool) w.notifyItemOrWaiterAdded() item := <-w.get - return item.key, item.priority, w.shutdown.Load() + return item.Key, item.Priority, w.shutdown.Load() } func (w *priorityqueue[T]) Get() (item T, shutdown bool) { @@ -316,7 +321,7 @@ func (w *priorityqueue[T]) Len() int { var result int w.queue.Ascend(func(item *item[T]) bool { - if item.readyAt == nil || item.readyAt.Compare(w.now()) <= 0 { + if item.ReadyAt == nil || item.ReadyAt.Compare(w.now()) <= 0 { result++ return true } @@ -326,36 +331,64 @@ func (w *priorityqueue[T]) Len() int { return result } +func (w *priorityqueue[T]) logState() { + t := time.Tick(10 * time.Second) + for { + select { + case <-w.done: + return + case <-t: + } + + // Log level may change at runtime, so keep the + // loop going even if a given level is currently + // not enabled. + if !w.log.V(5).Enabled() { + continue + } + w.lock.Lock() + items := make([]*item[T], 0, len(w.items)) + w.queue.Ascend(func(item *item[T]) bool { + items = append(items, item) + return true + }) + w.lock.Unlock() + + w.log.V(5).Info("workqueue_items", "items", items) + } +} + func less[T comparable](a, b *item[T]) bool { - if a.readyAt == nil && b.readyAt != nil { + if a.ReadyAt == nil && b.ReadyAt != nil { return true } - if b.readyAt == nil && a.readyAt != nil { + if b.ReadyAt == nil && a.ReadyAt != nil { return false } - if a.readyAt != nil && b.readyAt != nil && !a.readyAt.Equal(*b.readyAt) { - return a.readyAt.Before(*b.readyAt) + if a.ReadyAt != nil && b.ReadyAt != nil && !a.ReadyAt.Equal(*b.ReadyAt) { + return a.ReadyAt.Before(*b.ReadyAt) } - if a.priority != b.priority { - return a.priority > b.priority + if a.Priority != b.Priority { + return a.Priority > b.Priority } - return a.addedCounter < b.addedCounter + return a.AddedCounter < b.AddedCounter } type item[T comparable] struct { - key T - addedCounter uint64 - priority int - readyAt *time.Time + Key T `json:"key"` + AddedCounter uint64 `json:"addedCounter"` + Priority int `json:"priority"` + ReadyAt *time.Time `json:"readyAt,omitempty"` } func (w *priorityqueue[T]) updateUnfinishedWorkLoop() { - t := time.NewTicker(500 * time.Millisecond) // borrowed from workqueue: https://github.com/kubernetes/kubernetes/blob/67a807bf142c7a2a5ecfdb2a5d24b4cdea4cc79c/staging/src/k8s.io/client-go/util/workqueue/queue.go#L182 - defer t.Stop() - for range t.C { - if w.shutdown.Load() { + t := time.Tick(500 * time.Millisecond) // borrowed from workqueue: https://github.com/kubernetes/kubernetes/blob/67a807bf142c7a2a5ecfdb2a5d24b4cdea4cc79c/staging/src/k8s.io/client-go/util/workqueue/queue.go#L182 + for { + select { + case <-w.done: return + case <-t: } w.metrics.updateUnfinishedWork() } diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/internal/controller/controller.go b/vendor/sigs.k8s.io/controller-runtime/pkg/internal/controller/controller.go index fda25e064..cc734dfb6 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/internal/controller/controller.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/internal/controller/controller.go @@ -175,7 +175,16 @@ func (c *Controller[request]) Start(ctx context.Context) error { // caches. errGroup := &errgroup.Group{} for _, watch := range c.startWatches { - log := c.LogConstructor(nil).WithValues("source", fmt.Sprintf("%s", watch)) + log := c.LogConstructor(nil) + _, ok := watch.(interface { + String() string + }) + + if !ok { + log = log.WithValues("source", fmt.Sprintf("%T", watch)) + } else { + log = log.WithValues("source", fmt.Sprintf("%s", watch)) + } didStartSyncingSource := &atomic.Bool{} errGroup.Go(func() error { // Use a timeout for starting and syncing the source to avoid silently @@ -191,7 +200,7 @@ func (c *Controller[request]) Start(ctx context.Context) error { sourceStartErrChan <- err return } - syncingSource, ok := watch.(source.SyncingSource) + syncingSource, ok := watch.(source.TypedSyncingSource[request]) if !ok { return }