Skip to content

Commit

Permalink
refactor: Rewrite operator status reporter
Browse files Browse the repository at this point in the history
  • Loading branch information
rg0now committed Oct 9, 2024
1 parent 7bd9fee commit 6a05f07
Show file tree
Hide file tree
Showing 12 changed files with 183 additions and 119 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ require (
github.com/onsi/ginkgo/v2 v2.19.0
github.com/onsi/gomega v1.33.1
go.uber.org/zap v1.26.0
golang.org/x/time v0.3.0
k8s.io/api v0.31.1
k8s.io/apimachinery v0.31.1
k8s.io/client-go v0.31.1
k8s.io/metrics v0.31.1
sigs.k8s.io/controller-runtime v0.19.0
sigs.k8s.io/yaml v1.4.0
)
Expand Down Expand Up @@ -59,7 +61,6 @@ require (
golang.org/x/sys v0.21.0 // indirect
golang.org/x/term v0.21.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk=
k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 h1:BZqlfIlq5YbRMFko6/PM7FjZpUb45WallggurYhKGag=
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340/go.mod h1:yD4MZYeKMBwQKVht279WycxKyM84kkAx2DPrTXaeb98=
k8s.io/metrics v0.31.1 h1:h4I4dakgh/zKflWYAOQhwf0EXaqy8LxAIyE/GBvxqRc=
k8s.io/metrics v0.31.1/go.mod h1:JuH1S9tJiH9q1VCY0yzSCawi7kzNLsDzlWDJN4xR+iA=
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 h1:pUdcCO1Lk/tbT5ztQWOBi5HBgbBP1J8+AsQnQCKsi8A=
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
sigs.k8s.io/controller-runtime v0.19.0 h1:nWVM7aq+Il2ABxwiCizrVDSlmDcshi9llbaFbC0ji/Q=
Expand Down
51 changes: 29 additions & 22 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@ type ProcessorFunc func(ctx context.Context, c *Controller, req reconciler.Reque
type Options struct {
// Processor allows to override the default request processor of the controller.
Processor ProcessorFunc
// ErrorHandler can be used to handle errors.
ErrorHandler ErrorHandler
// ErrorChan is a channel on which the controller reports its errors.
ErrorChan chan error
}

var _ runtimeManager.Runnable = &Controller{}

// Controller is a dcontroller reconciler.
type Controller struct {
*errorReporter
name, kind string
config opv1a1.Controller
sources []reconciler.Source
Expand All @@ -45,7 +46,6 @@ type Controller struct {
watcher chan reconciler.Request
pipeline pipeline.Evaluator
processor ProcessorFunc
errHandler *errorReporter
logger, log logr.Logger
}

Expand All @@ -59,29 +59,29 @@ func New(mgr runtimeManager.Manager, config opv1a1.Controller, opts Options) (*C
}

c := &Controller{
mgr: mgr,
sources: []reconciler.Source{},
config: config,
watcher: make(chan reconciler.Request, WatcherBufferSize),
errHandler: NewErrorReporter(opts.ErrorHandler),
logger: logger,
mgr: mgr,
sources: []reconciler.Source{},
config: config,
watcher: make(chan reconciler.Request, WatcherBufferSize),
errorReporter: NewErrorReporter(opts.ErrorChan),
logger: logger,
}

name := config.Name
if name == "" {
return c, c.errHandler.PushCriticalError(errors.New("invalid controller configuration: empty name"))
return c, c.PushCriticalError(errors.New("invalid controller configuration: empty name"))
}
c.name = name
c.log = logger.WithName("controller").WithValues("name", name)

// sanity check
if len(config.Sources) == 0 {
return c, c.errHandler.PushCriticalError(errors.New("invalid controller configuration: no source"))
return c, c.PushCriticalError(errors.New("invalid controller configuration: no source"))
}

emptyTarget := opv1a1.Target{}
if config.Target == emptyTarget {
return c, c.errHandler.PushCriticalError(errors.New("invalid controller configuration: no target"))
return c, c.PushCriticalError(errors.New("invalid controller configuration: no target"))
}

// opts
Expand Down Expand Up @@ -112,7 +112,7 @@ func New(mgr runtimeManager.Manager, config opv1a1.Controller, opts Options) (*C
for _, s := range c.sources {
gvk, err := s.GetGVK()
if err != nil {
return c, c.errHandler.PushCriticalError(fmt.Errorf("failed to obtain GVK for source %s: %w",
return c, c.PushCriticalError(fmt.Errorf("failed to obtain GVK for source %s: %w",
util.Stringify(s), err))
}

Expand All @@ -122,19 +122,19 @@ func New(mgr runtimeManager.Manager, config opv1a1.Controller, opts Options) (*C
Reconciler: controllerReconciler,
})
if err != nil {
return c, c.errHandler.PushCriticalError(fmt.Errorf("failed to create runtime controller "+
return c, c.PushCriticalError(fmt.Errorf("failed to create runtime controller "+
"for resource %s: %w", gvk.String(), err))
}

// Set up the watch
src, err := s.GetSource()
if err != nil {
return c, c.errHandler.PushCriticalError(fmt.Errorf("failed to create runtime source for "+
return c, c.PushCriticalError(fmt.Errorf("failed to create runtime source for "+
"resource %s: %w", gvk.String(), err))
}

if err := ctrl.Watch(src); err != nil {
return c, c.errHandler.PushCriticalError(fmt.Errorf("failed to watch resource %s: %w",
return c, c.PushCriticalError(fmt.Errorf("failed to watch resource %s: %w",
gvk.String(), err))
}

Expand All @@ -147,26 +147,33 @@ func New(mgr runtimeManager.Manager, config opv1a1.Controller, opts Options) (*C
pipeline, err := pipeline.NewPipeline(c.kind, baseviews, c.config.Pipeline,
logger.WithName("pipeline").WithValues("controller", c.name, "kind/view", c.kind))
if err != nil {
return c, c.errHandler.PushCriticalError(fmt.Errorf("failed to create pipleline for controller %s: %w",
return c, c.PushCriticalError(fmt.Errorf("failed to create pipleline for controller %s: %w",
c.name, err))
}
c.pipeline = pipeline

// Add the controller to the manager (this will automatically start it when Start is called
// on the manager, but the reconciler must still be explicitly started)
if err := mgr.Add(c); err != nil {
return c, c.errHandler.PushCriticalError(fmt.Errorf("failed to schedule controller %s: %w",
return c, c.PushCriticalError(fmt.Errorf("failed to schedule controller %s: %w",
c.name, err))
}

return c, nil
}

// GetName returns the name of the controller, i.e., the name taken from the
// controller configuration in New.
func (c *Controller) GetName() string { return c.name }

// GetWatcher returns the channel that multiplexes the requests coming from the base resources.
// This is the buffered request channel allocated in New; the channel itself is returned, not a
// copy, so callers share it with the controller.
func (c *Controller) GetWatcher() chan reconciler.Request { return c.watcher }

// SetPipeline overrides the pipeline of the controller, replacing the evaluator that New
// installed. Useful for adding a custom pipeline to a controller.
func (c *Controller) SetPipeline(pipeline pipeline.Evaluator) { c.pipeline = pipeline }

// ReportErrors returns a short report on the error stack of the controller, as produced by the
// Report method of the embedded error reporter.
func (c *Controller) ReportErrors() []string { return c.errorReporter.Report() }

// Start starts running the controller. The Start function blocks until the context is closed or an
// error occurs, and it will stop running when the context is closed.
func (c *Controller) Start(ctx context.Context) error {
Expand All @@ -180,7 +187,7 @@ func (c *Controller) Start(ctx context.Context) error {

if err := c.processor(ctx, c, req); err != nil {
err = fmt.Errorf("error processing watch event: %w", err)
c.log.Error(c.errHandler.PushError(err), "error", "request", req)
c.log.Error(c.PushError(err), "error", "request", req)
}
case <-ctx.Done():
c.log.V(2).Info("controller terminating")
Expand All @@ -194,7 +201,7 @@ func (c *Controller) GetStatus(gen int64) opv1a1.ControllerStatus {

var condition metav1.Condition
switch {
case c.errHandler.IsEmpty():
case c.errorReporter.IsEmpty():
condition = metav1.Condition{
Type: string(opv1a1.ControllerConditionReady),
Status: metav1.ConditionTrue,
Expand All @@ -203,7 +210,7 @@ func (c *Controller) GetStatus(gen int64) opv1a1.ControllerStatus {
Reason: string(opv1a1.ControllerReasonReady),
Message: "Controller is up and running",
}
case c.errHandler.IsCritical():
case c.errorReporter.IsCritical():
condition = metav1.Condition{
Type: string(opv1a1.ControllerConditionReady),
Status: metav1.ConditionFalse,
Expand All @@ -226,7 +233,7 @@ func (c *Controller) GetStatus(gen int64) opv1a1.ControllerStatus {
conditions := []metav1.Condition{condition}
status.Conditions = conditions

status.LastErrors = c.errHandler.Report()
status.LastErrors = c.errorReporter.Report()

return status
}
Expand Down
20 changes: 7 additions & 13 deletions pkg/controller/status_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,21 @@ const (
)

// RateLimit controls the status updater rate-limiter so that the first 3 errors will trigger an
// update per every 5 seconds.
// update, and after that at most one update is let through every 2 seconds.
func getDefaultRateLimiter() rate.Sometimes {
return rate.Sometimes{First: 3, Interval: 5 * time.Second}
}

// ErrorHandler is a thing that knows how to act on an errors. Typically the response is to update
// some error status.
type ErrorHandler interface {
Trigger(error)
return rate.Sometimes{First: 3, Interval: 2 * time.Second}
}

// errorReporter is the error stack implementation.
type errorReporter struct {
errorStack []error
ratelimiter rate.Sometimes
trigger ErrorHandler
errorChan chan error
critical bool // whether a critical error has been reported
}

func NewErrorReporter(trigger ErrorHandler) *errorReporter {
return &errorReporter{errorStack: []error{}, ratelimiter: getDefaultRateLimiter(), trigger: trigger}
func NewErrorReporter(errorChan chan error) *errorReporter {
return &errorReporter{errorStack: []error{}, ratelimiter: getDefaultRateLimiter(), errorChan: errorChan}
}

func (s *errorReporter) PushError(err error) error {
Expand All @@ -52,8 +46,8 @@ func (s *errorReporter) PushCriticalError(err error) error {
func (s *errorReporter) Push(err error, critical bool) error {
// request a status update (subject to rate limiting) if the error channel is set
defer s.ratelimiter.Do(func() {
if s.trigger != nil {
s.trigger.Trigger(err)
if s.errorChan != nil {
s.errorChan <- err
}
})

Expand Down
38 changes: 28 additions & 10 deletions pkg/controller/status_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,54 +3,65 @@ package controller
import (
"errors"
"fmt"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

type testTrig struct{ counter int }

func (t *testTrig) Trigger(err error) { t.counter++ }

var _ = Describe("StatusReporter", func() {
It("should be able to push errors and return the last", func() {
r := NewErrorReporter(&testTrig{0})
ch := make(chan error, 1)
r := NewErrorReporter(ch)

err := errors.New("1")
Expect(r.PushError(err)).To(Equal(err))
Expect(r.Top()).To(Equal(err))
Expect(r.Size()).To(Equal(1))
Expect(r.IsCritical()).To(BeFalse())
err2, ok := tryReadErrorChannel(ch, interval)
Expect(ok).To(BeTrue())
Expect(err2).To(Equal(err))

err = errors.New("2")
Expect(r.PushError(err)).To(Equal(err))
Expect(r.Top()).To(Equal(err))
Expect(r.Size()).To(Equal(2))
Expect(r.IsCritical()).To(BeFalse())
err2, ok = tryReadErrorChannel(ch, interval)
Expect(ok).To(BeTrue())
Expect(err2).To(Equal(err))

err = errors.New("3")
Expect(r.PushError(err)).To(Equal(err))
Expect(r.Top()).To(Equal(err))
Expect(r.Size()).To(Equal(3))
Expect(r.IsCritical()).To(BeFalse())
err2, ok = tryReadErrorChannel(ch, interval)
Expect(ok).To(BeTrue())
Expect(err2).To(Equal(err))

err = errors.New("4")
Expect(r.PushError(err)).To(Equal(err))
Expect(r.Top()).To(Equal(err))
Expect(r.Size()).To(Equal(4))
Expect(r.trigger.(*testTrig).counter).To(Equal(3))
Expect(r.IsCritical()).To(BeFalse())
_, ok = tryReadErrorChannel(ch, interval)
// the rate limiter lets only the first 3 errors through
Expect(ok).To(BeFalse())

err = errors.New("5")
Expect(r.PushCriticalError(err)).To(Equal(err))
Expect(r.Top()).To(Equal(err))
Expect(r.Size()).To(Equal(5))
Expect(r.trigger.(*testTrig).counter).To(Equal(3))
Expect(r.IsCritical()).To(BeTrue())
_, ok = tryReadErrorChannel(ch, interval)
Expect(ok).To(BeFalse())
})

It("should hold the last 10 errors", func() {
r := NewErrorReporter(&testTrig{0})
ch := make(chan error, ErrorReporterStackSize+11)
r := NewErrorReporter(ch)

errs := [ErrorReporterStackSize + 10]error{}
for i := 0; i < ErrorReporterStackSize+10; i++ {
Expand All @@ -59,7 +70,14 @@ var _ = Describe("StatusReporter", func() {
}
Expect(r.Size()).To(Equal(ErrorReporterStackSize))
Expect(r.Top()).To(Equal(errs[ErrorReporterStackSize+9]))

Expect(r.trigger.(*testTrig).counter).To(Equal(3))
})
})

// tryReadErrorChannel waits up to d for a value to arrive on ch.
//
// It returns the received value and true when a read succeeds before the
// timeout, and (nil, false) when d elapses first. A successful read may itself
// yield a nil error (a nil value was sent, or ch was closed), so callers must
// use the boolean, not the error, to tell a read from a timeout.
func tryReadErrorChannel(ch chan error, d time.Duration) (error, bool) {
	// Use an explicit timer rather than time.After so that the timer is
	// released as soon as the read wins instead of lingering until d expires.
	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case err := <-ch:
		return err, true
	case <-timer.C:
		return nil, false
	}
}
2 changes: 1 addition & 1 deletion pkg/manager/fake_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func NewFakeManager(opts manager.Options, objs ...client.Object) (*FakeManager,
compositeCache: compositeCache,
}, logger)

mgr, err := New(fakeRuntimeManager, nil, opts)
mgr, err := New(nil, Options{Options: opts, Manager: fakeRuntimeManager})
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 6a05f07

Please sign in to comment.