Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Demonstrate new inventory client interface #652

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions pkg/apply/task/inv_add_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"sigs.k8s.io/cli-utils/pkg/apply/taskrunner"
"sigs.k8s.io/cli-utils/pkg/common"
"sigs.k8s.io/cli-utils/pkg/inventory"
"sigs.k8s.io/cli-utils/pkg/inventory2"
"sigs.k8s.io/cli-utils/pkg/object"
)

Expand All @@ -30,12 +31,13 @@ var (
// before the actual object is applied.
type InvAddTask struct {
TaskName string
InvClient inventory.Client
InvClient inventory2.Client
DynamicClient dynamic.Interface
Mapper meta.RESTMapper
InvInfo inventory.Info
Objects object.UnstructuredSet
DryRun common.DryRunStrategy
StatusPolicy inventory.StatusPolicy
}

func (i *InvAddTask) Name() string {
Expand Down Expand Up @@ -71,11 +73,43 @@ func (i *InvAddTask) Start(taskContext *taskrunner.TaskContext) {
}
klog.V(4).Infof("merging %d local objects into inventory", len(i.Objects))
currentObjs := object.UnstructuredSetToObjMetadataSet(i.Objects)
_, err := i.InvClient.Merge(i.InvInfo, currentObjs, i.DryRun)
err := i.extendInventory(currentObjs)
i.sendTaskResult(taskContext, err)
}()
}

// extendInventory adds the specified objects to the inventory, if not already
// present.
func (i *InvAddTask) extendInventory(objs object.ObjMetadataSet) error {
if len(objs) == 0 {
return nil
}
id := inventory2.ID{
Name: i.InvInfo.Name(),
Namespace: i.InvInfo.Namespace(),
}
inv, err := i.InvClient.Get(context.TODO(), id)
if err != nil {
return fmt.Errorf("getting inventory: %w")
}

oldObjs := inventory.ObjMetadataSetFromObjectReferenceList(inv.Spec.Objects)
newObjs := oldObjs.Union(objs)
inv.Spec.Objects = inventory.ObjectReferenceListFromObjMetadataSet(newObjs)

if err = i.InvClient.Update(context.TODO(), inv, i.updateOptionList()...); err != nil {
return fmt.Errorf("updating inventory: %w")
}
return nil
}

func (i *InvAddTask) updateOptionList() []inventory2.UpdateOption {
return []inventory2.UpdateOption{
inventory2.WithDryRun(i.DryRun),
inventory2.WithStatus(i.StatusPolicy),
}
}

// Cancel is not supported by the InvAddTask.
func (i *InvAddTask) Cancel(_ *taskrunner.TaskContext) {}

Expand Down
65 changes: 51 additions & 14 deletions pkg/apply/task/inv_set_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,29 @@
package task

import (
"context"
"fmt"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/klog/v2"
"sigs.k8s.io/cli-utils/pkg/apis/actuation"
"sigs.k8s.io/cli-utils/pkg/apply/event"
"sigs.k8s.io/cli-utils/pkg/apply/taskrunner"
"sigs.k8s.io/cli-utils/pkg/common"
"sigs.k8s.io/cli-utils/pkg/inventory"
"sigs.k8s.io/cli-utils/pkg/inventory2"
"sigs.k8s.io/cli-utils/pkg/object"
)

// DeleteOrUpdateInvTask encapsulates structures necessary to set the
// inventory references at the end of the apply/prune.
type DeleteOrUpdateInvTask struct {
TaskName string
InvClient inventory.Client
InvClient inventory2.Client
InvInfo inventory.Info
PrevInventory object.ObjMetadataSet
DryRun common.DryRunStrategy
StatusPolicy inventory.StatusPolicy
// if Destroy is set, the inventory will be deleted if all objects were successfully pruned
Destroy bool
}
Expand All @@ -47,12 +53,14 @@ func (i *DeleteOrUpdateInvTask) Identifiers() object.ObjMetadataSet {
// If Destroy is false, the inventory will be updated.
func (i *DeleteOrUpdateInvTask) Start(taskContext *taskrunner.TaskContext) {
go func() {
klog.V(2).Infof("inventory set task starting (name: %q)", i.TaskName)
var err error
if i.Destroy && i.destroySuccessful(taskContext) {
err = i.deleteInventory()
} else {
err = i.updateInventory(taskContext)
}
klog.V(2).Infof("inventory set task completing (name: %q)", i.TaskName)
taskContext.TaskChannel() <- taskrunner.TaskResult{Err: err}
}()
}
Expand Down Expand Up @@ -84,7 +92,16 @@ func (i *DeleteOrUpdateInvTask) StatusUpdate(_ *taskrunner.TaskContext, _ object
// - Deleted resources (successful)
// - Abandoned resources (successful)
func (i *DeleteOrUpdateInvTask) updateInventory(taskContext *taskrunner.TaskContext) error {
klog.V(2).Infof("inventory set task starting (name: %q)", i.TaskName)
id := inventory2.ID{
Name: i.InvInfo.Name(),
Namespace: i.InvInfo.Namespace(),
}
inv, err := i.InvClient.Get(context.TODO(), id)
if err != nil {
return fmt.Errorf("getting inventory: %w")
}
prevObjs := inventory.ObjMetadataSetFromObjectReferenceList(inv.Spec.Objects).Unique()

invObjs := object.ObjMetadataSet{}

// TODO: Just use InventoryManager.Store()
Expand All @@ -100,7 +117,7 @@ func (i *DeleteOrUpdateInvTask) updateInventory(taskContext *taskrunner.TaskCont
// This will remove new resources that failed to apply from the inventory,
// because even tho they were added by InvAddTask, the PrevInventory
// represents the inventory before the pipeline has run.
applyFailures := i.PrevInventory.Intersection(im.FailedApplies())
applyFailures := prevObjs.Intersection(im.FailedApplies())
klog.V(4).Infof("keep in inventory %d failed applies", len(applyFailures))
invObjs = invObjs.Union(applyFailures)

Expand All @@ -109,7 +126,7 @@ func (i *DeleteOrUpdateInvTask) updateInventory(taskContext *taskrunner.TaskCont
// It's likely that all the skipped applies are already in the inventory,
// because the apply filters all currently depend on cluster state,
// but we're doing the intersection anyway just to be sure.
applySkips := i.PrevInventory.Intersection(im.SkippedApplies())
applySkips := prevObjs.Intersection(im.SkippedApplies())
klog.V(4).Infof("keep in inventory %d skipped applies", len(applySkips))
invObjs = invObjs.Union(applySkips)

Expand All @@ -118,7 +135,7 @@ func (i *DeleteOrUpdateInvTask) updateInventory(taskContext *taskrunner.TaskCont
// It's likely that all the delete failures are already in the inventory,
// because the set of resources to prune comes from the inventory,
// but we're doing the intersection anyway just to be sure.
pruneFailures := i.PrevInventory.Intersection(im.FailedDeletes())
pruneFailures := prevObjs.Intersection(im.FailedDeletes())
klog.V(4).Infof("set inventory %d failed prunes", len(pruneFailures))
invObjs = invObjs.Union(pruneFailures)

Expand All @@ -127,19 +144,19 @@ func (i *DeleteOrUpdateInvTask) updateInventory(taskContext *taskrunner.TaskCont
// It's likely that all the skipped deletes are already in the inventory,
// because the set of resources to prune comes from the inventory,
// but we're doing the intersection anyway just to be sure.
pruneSkips := i.PrevInventory.Intersection(im.SkippedDeletes())
pruneSkips := prevObjs.Intersection(im.SkippedDeletes())
klog.V(4).Infof("keep in inventory %d skipped prunes", len(pruneSkips))
invObjs = invObjs.Union(pruneSkips)

// If an object failed to reconcile and was previously stored in the inventory,
// then keep it in the inventory so it can be waited on next time.
reconcileFailures := i.PrevInventory.Intersection(im.FailedReconciles())
reconcileFailures := prevObjs.Intersection(im.FailedReconciles())
klog.V(4).Infof("set inventory %d failed reconciles", len(reconcileFailures))
invObjs = invObjs.Union(reconcileFailures)

// If an object timed out reconciling and was previously stored in the inventory,
// then keep it in the inventory so it can be waited on next time.
reconcileTimeouts := i.PrevInventory.Intersection(im.TimeoutReconciles())
reconcileTimeouts := prevObjs.Intersection(im.TimeoutReconciles())
klog.V(4).Infof("keep in inventory %d timeout reconciles", len(reconcileTimeouts))
invObjs = invObjs.Union(reconcileTimeouts)

Expand All @@ -150,24 +167,44 @@ func (i *DeleteOrUpdateInvTask) updateInventory(taskContext *taskrunner.TaskCont

// If an object is invalid and was previously stored in the inventory,
// then keep it in the inventory so it can be applied/pruned next time.
invalidObjects := i.PrevInventory.Intersection(taskContext.InvalidObjects())
invalidObjects := prevObjs.Intersection(taskContext.InvalidObjects())
klog.V(4).Infof("keep in inventory %d invalid objects", len(invalidObjects))
invObjs = invObjs.Union(invalidObjects)

// Update inventory spec in memory
inv.Spec.Objects = inventory.ObjectReferenceListFromObjMetadataSet(invObjs)

klog.V(4).Infof("get the apply status for %d objects", len(invObjs))
objStatus := taskContext.InventoryManager().Inventory().Status.Objects
inv.Status.Objects = taskContext.InventoryManager().Inventory().Status.Objects

klog.V(4).Infof("set inventory %d total objects", len(invObjs))
err := i.InvClient.Replace(i.InvInfo, invObjs, objStatus, i.DryRun)
if err = i.InvClient.Update(context.TODO(), inv, i.updateOptionList()...); err != nil {
return fmt.Errorf("updating inventory: %w")
}
return nil
}

klog.V(2).Infof("inventory set task completing (name: %q)", i.TaskName)
return err
func (i *DeleteOrUpdateInvTask) updateOptionList() []inventory2.UpdateOption {
return []inventory2.UpdateOption{
inventory2.WithDryRun(i.DryRun),
inventory2.WithStatus(i.StatusPolicy),
}
}

func (i *DeleteOrUpdateInvTask) deleteOptionList() []inventory2.DeleteOption {
return []inventory2.DeleteOption{
inventory2.WithDryRun(i.DryRun),
inventory2.WithStatus(i.StatusPolicy),
}
}

// deleteInventory deletes the inventory object from the cluster.
func (i *DeleteOrUpdateInvTask) deleteInventory() error {
klog.V(2).Infof("delete inventory task starting (name: %q)", i.Name())
err := i.InvClient.DeleteInventoryObj(i.InvInfo, i.DryRun)
inv := &actuation.Inventory{}
inv.SetName(i.InvInfo.Name())
inv.SetName(i.InvInfo.Namespace())
err := i.InvClient.Delete(context.TODO(), inv, i.deleteOptionList()...)
// Not found is not error, since this means it was already deleted.
if apierrors.IsNotFound(err) {
err = nil
Expand Down
16 changes: 16 additions & 0 deletions pkg/inventory/type-conv.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,19 @@ func ObjMetadataFromObjectReference(ref actuation.ObjectReference) object.ObjMet
Namespace: ref.Namespace,
}
}

func ObjectReferenceListFromObjMetadataSet(ids []object.ObjMetadata) []actuation.ObjectReference {
var refs []actuation.ObjectReference
for _, ref := range ids {
refs = append(refs, ObjectReferenceFromObjMetadata(ref))
}
return refs
}

func ObjMetadataSetFromObjectReferenceList(refs []actuation.ObjectReference) object.ObjMetadataSet {
var ids object.ObjMetadataSet
for _, ref := range refs {
ids = append(ids, ObjMetadataFromObjectReference(ref))
}
return ids
}
116 changes: 116 additions & 0 deletions pkg/inventory2/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package inventory2

import (
"context"

"sigs.k8s.io/cli-utils/pkg/apis/actuation"
"sigs.k8s.io/cli-utils/pkg/common"
"sigs.k8s.io/cli-utils/pkg/inventory"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type ID client.ObjectKey

type Client interface {
ReadClient
WriteClient
}

type ReadClient interface {
Get(ctx context.Context, id ID, opts ...GetOption) (*actuation.Inventory, error)
List(ctx context.Context, opts ...ListOption) error
}

type WriteClient interface {
Create(ctx context.Context, inv *actuation.Inventory, opts ...CreateOption) error
Update(ctx context.Context, inv *actuation.Inventory, opts ...UpdateOption) error
Delete(ctx context.Context, inv *actuation.Inventory, opts ...DeleteOption) error
}

type CreateOption interface {
ApplyCreateOptions(opts *CreateOptions)
}

type CreateOptions struct {
DryRunStrategy common.DryRunStrategy
StatusPolicy inventory.StatusPolicy
}

type GetOption interface {
ApplyGetOptions(opts *GetOptions)
}

type GetOptions struct {
ResourceVersion string
LabelSelector string
}

type UpdateOption interface {
ApplyUpdateOptions(opts *UpdateOptions)
}

type UpdateOptions struct {
DryRunStrategy common.DryRunStrategy
StatusPolicy inventory.StatusPolicy
}

type DeleteOption interface {
ApplyDeleteOptions(opts *DeleteOptions)
}

type DeleteOptions struct {
DryRunStrategy common.DryRunStrategy
}

type ListOption interface {
ApplyListOptions(opts *ListOptions)
}

type ListOptions struct {
ResourceVersion string
LabelSelector string
}

func WithDryRun(strategy common.DryRunStrategy) DryRunOption {
return DryRunOption(strategy)
}

type DryRunOption common.DryRunStrategy

func (o DryRunOption) ApplyCreateOptions(opts *CreateOptions) {
opts.DryRunStrategy = common.DryRunStrategy(o)
}

func (o DryRunOption) ApplyUpdateOptions(opts *UpdateOptions) {
opts.DryRunStrategy = common.DryRunStrategy(o)
}

func (o DryRunOption) ApplyDeleteOptions(opts *DeleteOptions) {
opts.DryRunStrategy = common.DryRunStrategy(o)
}

var _ CreateOption = DryRunOption(common.DryRunServer)
var _ UpdateOption = DryRunOption(common.DryRunServer)
var _ DeleteOption = DryRunOption(common.DryRunServer)

func WithStatus(policy inventory.StatusPolicy) StatusOption {
return StatusOption(policy)
}

type StatusOption common.DryRunStrategy

func (o StatusOption) ApplyCreateOptions(opts *CreateOptions) {
opts.DryRunStrategy = common.DryRunStrategy(o)
}

func (o StatusOption) ApplyUpdateOptions(opts *UpdateOptions) {
opts.DryRunStrategy = common.DryRunStrategy(o)
}

func (o StatusOption) ApplyDeleteOptions(opts *DeleteOptions) {
opts.DryRunStrategy = common.DryRunStrategy(o)
}

var _ CreateOption = StatusOption(inventory.StatusPolicyAll)
var _ UpdateOption = StatusOption(inventory.StatusPolicyAll)
var _ DeleteOption = StatusOption(inventory.StatusPolicyAll)