Skip to content

Commit

Permalink
refactor: Rewrite the status updater logic
Browse files Browse the repository at this point in the history
  • Loading branch information
rg0now committed Oct 6, 2024
1 parent efb7534 commit 7d30758
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 46 deletions.
61 changes: 46 additions & 15 deletions pkg/operator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import (
"hsnlab/dcontroller/pkg/manager"
)

// StatusChannelBufferSize is the capacity of the buffered status channel created for each
// operator, i.e., the number of status triggers that can be queued before a send blocks.
const StatusChannelBufferSize = 64

var _ Controller = &controller{}

type Controller interface {
Expand All @@ -32,8 +35,9 @@ type Controller interface {
}

type opEntry struct {
op *operator
cancel context.CancelFunc
op *Operator
cancel context.CancelFunc
statusChan chan StatusTrigger
}

type controller struct {
Expand Down Expand Up @@ -115,14 +119,7 @@ func (c *controller) Reconcile(ctx context.Context, req reconcile.Request) (reco
c.deleteOperator(req.NamespacedName)
}

operator, err := c.upsertOperator(&op)
if err != nil {
return reconcile.Result{}, err
}

// we pass in our own client: the operator's client may not have been started yet
if err := operator.UpdateStatus(c); err != nil {
log.Error(err, "failed to update Operator status")
if _, err := c.upsertOperator(&op); err != nil {
return reconcile.Result{}, err
}

Expand All @@ -136,10 +133,11 @@ func (c *controller) Start(ctx context.Context) error {
c.mu.Lock()
c.ctx = ctx
c.started = true
c.mu.Unlock()

for k := range c.operators {
c.startOp(k)
}
c.mu.Unlock()

return c.mgr.Start(ctx)
}
Expand All @@ -156,12 +154,24 @@ func (c *controller) startOp(key types.NamespacedName) {

c.mu.Lock()
c.operators[key] = e
statusChan := c.operators[key].statusChan
op := c.operators[key].op
c.mu.Unlock()

go func() {
// set the initial operator status
c.updateStatus(ctx, op)

for range statusChan {
c.log.V(4).Info("updating operator status", "name", op.name)
c.updateStatus(ctx, op)
}
}()

go e.op.Start(ctx) //nolint:errcheck
}

func (c *controller) upsertOperator(spec *opv1a1.Operator) (*operator, error) {
func (c *controller) upsertOperator(spec *opv1a1.Operator) (*Operator, error) {
key := client.ObjectKeyFromObject(spec)
c.log.V(4).Info("upserting operator", "name", key.String())

Expand All @@ -175,7 +185,7 @@ func (c *controller) upsertOperator(spec *opv1a1.Operator) (*operator, error) {
return c.addOperator(spec)
}

func (c *controller) addOperator(spec *opv1a1.Operator) (*operator, error) {
func (c *controller) addOperator(spec *opv1a1.Operator) (*Operator, error) {
c.log.V(2).Info("adding operator", "name", client.ObjectKeyFromObject(spec).String())

// disable leader-election, health-check and the metrics server on the embedded manager
Expand All @@ -195,10 +205,14 @@ func (c *controller) addOperator(spec *opv1a1.Operator) (*operator, error) {
}

key := client.ObjectKeyFromObject(spec)
operator := New(mgr, spec.GetName(), &spec.Spec, c.logger)
statusChan := make(chan StatusTrigger, StatusChannelBufferSize)
operator := New(spec.GetName(), mgr, &spec.Spec, Options{
StatusChannel: statusChan,
Logger: c.logger,
})

c.mu.Lock()
c.operators[key] = &opEntry{op: operator}
c.operators[key] = &opEntry{op: operator, statusChan: statusChan}
c.mu.Unlock()

// start the new operator if we are already running
Expand Down Expand Up @@ -230,3 +244,20 @@ func (c *controller) getOperatorEntry(key types.NamespacedName) *opEntry {
c.mu.Unlock()
return op
}

// updateStatus refreshes the status of the Operator resource that belongs to op.
//
// It reads the resource named after the operator through the controller manager's client,
// replaces its status with the one reported by op for the resource's current generation, and
// writes the result back via the status subresource. Nothing is returned: failures are only
// logged on the operator's logger.
func (c *controller) updateStatus(ctx context.Context, op *Operator) {
	// called cl so that the imported controller-runtime "client" package is not shadowed
	cl := c.mgr.GetClient()
	name := op.GetName()

	// NOTE(review): the key carries no namespace, which presumably means the Operator
	// resource is cluster-scoped -- confirm against the CRD.
	var res opv1a1.Operator
	if err := cl.Get(ctx, types.NamespacedName{Name: name}, &res); err != nil {
		op.log.Error(err, "failed to Get operator", "name", name)
		return
	}

	res.Status = op.GetStatus(res.GetGeneration())
	if err := cl.Status().Update(ctx, &res); err != nil {
		op.log.Error(err, "failed to Update operator status", "name", name)
	}
}
91 changes: 60 additions & 31 deletions pkg/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,39 +4,54 @@ package operator
import (
"context"
"fmt"
"os"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
runtimeManager "sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/yaml"

opv1a1 "hsnlab/dcontroller/pkg/api/operator/v1alpha1"
dcontroller "hsnlab/dcontroller/pkg/controller"
)

var _ Operator = &operator{}
var _ runtimeManager.Runnable = &Operator{}

type Operator interface {
runtimeManager.Runnable
GetStatus(int64) opv1a1.OperatorStatus
// StatusTrigger is the empty signal value sent on the status channel to ask for a status
// update.
type StatusTrigger = struct{}

// Options customizes the behavior of an Operator created with New or Load.
type Options struct {
	// StatusChannel is where the operator posts its status update triggers. Whenever the
	// caller receives a trigger, it is expected to call GetStatus() on the operator to
	// obtain the current status and store it in the Operator CRD.
	StatusChannel chan StatusTrigger

	// Logger is the logger the operator writes to. If it has no sink, New falls back to a
	// logger that discards everything.
	Logger logr.Logger
}

type operator struct {
type Operator struct {
name string
mgr runtimeManager.Manager
spec *opv1a1.OperatorSpec
controllers []*dcontroller.Controller // maybe nil
ctx context.Context
statusChan chan StatusTrigger
logger, log logr.Logger
}

// New creates a new operator.
func New(mgr runtimeManager.Manager, name string, spec *opv1a1.OperatorSpec, logger logr.Logger) *operator {
op := &operator{
func New(name string, mgr runtimeManager.Manager, spec *opv1a1.OperatorSpec, opts Options) *Operator {
logger := opts.Logger
if logger.GetSink() == nil {
logger = logr.Discard()
}

op := &Operator{
name: name,
mgr: mgr,
spec: spec,
controllers: []*dcontroller.Controller{},
statusChan: opts.StatusChannel,
logger: logger,
log: logger.WithName("operator").WithValues("name", name),
}
Expand All @@ -52,55 +67,69 @@ func New(mgr runtimeManager.Manager, name string, spec *opv1a1.OperatorSpec, log
op.log.Error(err, "failed to create controller", "controller", config.Name)
}

// the controller returned is always valid: this makes sure we will receive the
// status update triggers to show the controller errors to the user
op.controllers = append(op.controllers, c)
}

return op
}

// Load builds an operator from an operator spec stored in a file. The content of file is
// unmarshalled with the yaml package into an OperatorSpec, which is then handed to New together
// with the given name, manager and options. An error is returned if the file cannot be read or
// its content cannot be parsed.
func Load(name string, mgr runtimeManager.Manager, file string, opts Options) (*Operator, error) {
	raw, readErr := os.ReadFile(file)
	if readErr != nil {
		return nil, fmt.Errorf("failed to read file: %w", readErr)
	}

	spec := &opv1a1.OperatorSpec{}
	if parseErr := yaml.Unmarshal(raw, spec); parseErr != nil {
		return nil, fmt.Errorf("failed to parse operator spec: %w", parseErr)
	}

	return New(name, mgr, spec, opts), nil
}

// Start starts the operator. It blocks until the operator's manager stops.
func (op *operator) Start(ctx context.Context) error {
func (op *Operator) Start(ctx context.Context) error {
op.log.Info("starting")
op.ctx = ctx
return op.mgr.Start(ctx)
}

// GetManager returns the controller runtime manager associated with the operator.
func (op *operator) GetManager() runtimeManager.Manager {
func (op *Operator) GetManager() runtimeManager.Manager {
return op.mgr
}

// Trigger can be used to ask a status update trigger on the operator.
func (op *operator) Trigger() {
if err := op.UpdateStatus(op.mgr.GetClient()); err != nil {
op.log.Error(err, "failed to update status")
}
// GetName reports the name the operator was given when it was created.
func (op *Operator) GetName() string {
	return op.name
}

func (op *operator) UpdateStatus(c client.Client) error {
// Trigger asks for a status update by posting a StatusTrigger on the status channel the
// operator was created with (Options.StatusChannel).
//
// The send happens on a separate goroutine so that the caller is never blocked, even when the
// channel is full. The pending send is abandoned once the operator's context is cancelled; if
// the operator has not been started yet there is no such context and the send simply waits for
// room on the channel. Trigger may be called any number of times. It is a no-op when no status
// channel was configured.
func (op *Operator) Trigger() {
	// nobody can be listening without a channel; sending on (or closing) a nil channel would
	// block forever (or panic)
	if op.statusChan == nil {
		return
	}

	ctx := op.ctx
	if ctx == nil {
		// we are not started yet:
		ctx = context.TODO()
	}

	// make this async so that we won't block the operator
	go func() {
		// The channel is deliberately left open: it is shared by every Trigger call, so
		// closing it here would make the next send (or close) panic and would terminate any
		// receiver ranging over the channel after the very first trigger.
		select {
		case op.statusChan <- StatusTrigger{}:
			op.log.V(4).Info("operator status update triggered")
		case <-ctx.Done():
		}
	}()
}

// GetStatus populates the operator status with the controller statuses.
func (op *operator) GetStatus(gen int64) opv1a1.OperatorStatus {
func (op *Operator) GetStatus(gen int64) opv1a1.OperatorStatus {
cs := []opv1a1.ControllerStatus{}
for _, c := range op.controllers {
if c != nil {
Expand Down

0 comments on commit 7d30758

Please sign in to comment.