Skip to content

Commit

Permalink
check readiness using kstatus
Browse files Browse the repository at this point in the history
This change replaces all the many functions and ways of calculating
readiness of objects into one unified way that uses kstatus.Compute() to
check if the object is in progress or current. Only the objects that are
current are considered to be ready. This takes advantage of the kstatus
compatibility of Flux's APIs and also makes sure that they remain
kstatus compatible.

The new isObjectReady() function is also aware of static/statusless
objects and knows how to check their readiness using kstatus. This
prepares the CLI for the upcoming static API objects.

All the is*Ready() functions for specific objects have been removed.

This change doesn't affect any of the existing tests results.

Introduce suspend and resume subcommands for alert-provider.

Signed-off-by: Sunny <[email protected]>
  • Loading branch information
darkowlzz committed Nov 28, 2023
1 parent f20fe76 commit db708de
Show file tree
Hide file tree
Showing 25 changed files with 468 additions and 422 deletions.
2 changes: 1 addition & 1 deletion cmd/flux/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (names apiType) upsertAndWait(object upsertWaitable, mutate func() error) e

logger.Waitingf("waiting for %s reconciliation", names.kind)
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
isReady(kubeClient, namespacedName, object)); err != nil {
isObjectReadyConditionFunc(kubeClient, namespacedName, object.asClientObject())); err != nil {
return err
}
logger.Successf("%s reconciliation completed", names.kind)
Expand Down
22 changes: 1 addition & 21 deletions cmd/flux/create_alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -133,7 +132,7 @@ func createAlertCmdRun(cmd *cobra.Command, args []string) error {

logger.Waitingf("waiting for Alert reconciliation")
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
isAlertReady(kubeClient, namespacedName, &alert)); err != nil {
isObjectReadyConditionFunc(kubeClient, namespacedName, &alert)); err != nil {
return err
}
logger.Successf("Alert %s is ready", name)
Expand Down Expand Up @@ -170,22 +169,3 @@ func upsertAlert(ctx context.Context, kubeClient client.Client,
logger.Successf("Alert updated")
return namespacedName, nil
}

func isAlertReady(kubeClient client.Client, namespacedName types.NamespacedName, alert *notificationv1b2.Alert) wait.ConditionWithContextFunc {
return func(ctx context.Context) (bool, error) {
err := kubeClient.Get(ctx, namespacedName, alert)
if err != nil {
return false, err
}

if c := apimeta.FindStatusCondition(alert.Status.Conditions, meta.ReadyCondition); c != nil {
switch c.Status {
case metav1.ConditionTrue:
return true, nil
case metav1.ConditionFalse:
return false, fmt.Errorf(c.Message)
}
}
return false, nil
}
}
22 changes: 1 addition & 21 deletions cmd/flux/create_alertprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -128,7 +127,7 @@ func createAlertProviderCmdRun(cmd *cobra.Command, args []string) error {

logger.Waitingf("waiting for Provider reconciliation")
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
isAlertProviderReady(kubeClient, namespacedName, &provider)); err != nil {
isObjectReadyConditionFunc(kubeClient, namespacedName, &provider)); err != nil {
return err
}

Expand Down Expand Up @@ -167,22 +166,3 @@ func upsertAlertProvider(ctx context.Context, kubeClient client.Client,
logger.Successf("Provider updated")
return namespacedName, nil
}

func isAlertProviderReady(kubeClient client.Client, namespacedName types.NamespacedName, provider *notificationv1.Provider) wait.ConditionWithContextFunc {
return func(ctx context.Context) (bool, error) {
err := kubeClient.Get(ctx, namespacedName, provider)
if err != nil {
return false, err
}

if c := apimeta.FindStatusCondition(provider.Status.Conditions, meta.ReadyCondition); c != nil {
switch c.Status {
case metav1.ConditionTrue:
return true, nil
case metav1.ConditionFalse:
return false, fmt.Errorf(c.Message)
}
}
return false, nil
}
}
19 changes: 1 addition & 18 deletions cmd/flux/create_helmrelease.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/spf13/cobra"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -304,7 +303,7 @@ func createHelmReleaseCmdRun(cmd *cobra.Command, args []string) error {

logger.Waitingf("waiting for HelmRelease reconciliation")
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
isHelmReleaseReady(kubeClient, namespacedName, &helmRelease)); err != nil {
isObjectReadyConditionFunc(kubeClient, namespacedName, &helmRelease)); err != nil {
return err
}
logger.Successf("HelmRelease %s is ready", name)
Expand Down Expand Up @@ -344,22 +343,6 @@ func upsertHelmRelease(ctx context.Context, kubeClient client.Client,
return namespacedName, nil
}

func isHelmReleaseReady(kubeClient client.Client, namespacedName types.NamespacedName, helmRelease *helmv2.HelmRelease) wait.ConditionWithContextFunc {
return func(ctx context.Context) (bool, error) {
err := kubeClient.Get(ctx, namespacedName, helmRelease)
if err != nil {
return false, err
}

// Confirm the state we are observing is for the current generation
if helmRelease.Generation != helmRelease.Status.ObservedGeneration {
return false, nil
}

return apimeta.IsStatusConditionTrue(helmRelease.Status.Conditions, meta.ReadyCondition), nil
}
}

func validateStrategy(input string) bool {
allowedStrategy := []string{"Revision", "ChartVersion"}

Expand Down
27 changes: 1 addition & 26 deletions cmd/flux/create_kustomization.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -264,7 +263,7 @@ func createKsCmdRun(cmd *cobra.Command, args []string) error {

logger.Waitingf("waiting for Kustomization reconciliation")
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
isKustomizationReady(kubeClient, namespacedName, &kustomization)); err != nil {
isObjectReadyConditionFunc(kubeClient, namespacedName, &kustomization)); err != nil {
return err
}
logger.Successf("Kustomization %s is ready", name)
Expand Down Expand Up @@ -303,27 +302,3 @@ func upsertKustomization(ctx context.Context, kubeClient client.Client,
logger.Successf("Kustomization updated")
return namespacedName, nil
}

func isKustomizationReady(kubeClient client.Client, namespacedName types.NamespacedName, kustomization *kustomizev1.Kustomization) wait.ConditionWithContextFunc {
return func(ctx context.Context) (bool, error) {
err := kubeClient.Get(ctx, namespacedName, kustomization)
if err != nil {
return false, err
}

// Confirm the state we are observing is for the current generation
if kustomization.Generation != kustomization.Status.ObservedGeneration {
return false, nil
}

if c := apimeta.FindStatusCondition(kustomization.Status.Conditions, meta.ReadyCondition); c != nil {
switch c.Status {
case metav1.ConditionTrue:
return true, nil
case metav1.ConditionFalse:
return false, fmt.Errorf(c.Message)
}
}
return false, nil
}
}
22 changes: 1 addition & 21 deletions cmd/flux/create_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -140,7 +139,7 @@ func createReceiverCmdRun(cmd *cobra.Command, args []string) error {

logger.Waitingf("waiting for Receiver reconciliation")
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
isReceiverReady(kubeClient, namespacedName, &receiver)); err != nil {
isObjectReadyConditionFunc(kubeClient, namespacedName, &receiver)); err != nil {
return err
}
logger.Successf("Receiver %s is ready", name)
Expand Down Expand Up @@ -179,22 +178,3 @@ func upsertReceiver(ctx context.Context, kubeClient client.Client,
logger.Successf("Receiver updated")
return namespacedName, nil
}

func isReceiverReady(kubeClient client.Client, namespacedName types.NamespacedName, receiver *notificationv1.Receiver) wait.ConditionWithContextFunc {
return func(ctx context.Context) (bool, error) {
err := kubeClient.Get(ctx, namespacedName, receiver)
if err != nil {
return false, err
}

if c := apimeta.FindStatusCondition(receiver.Status.Conditions, meta.ReadyCondition); c != nil {
switch c.Status {
case metav1.ConditionTrue:
return true, nil
case metav1.ConditionFalse:
return false, fmt.Errorf(c.Message)
}
}
return false, nil
}
}
29 changes: 1 addition & 28 deletions cmd/flux/create_source_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/conditions"

sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"

Expand Down Expand Up @@ -205,7 +204,7 @@ func createSourceBucketCmdRun(cmd *cobra.Command, args []string) error {

logger.Waitingf("waiting for Bucket source reconciliation")
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
isBucketReady(kubeClient, namespacedName, bucket)); err != nil {
isObjectReadyConditionFunc(kubeClient, namespacedName, bucket)); err != nil {
return err
}
logger.Successf("Bucket source reconciliation completed")
Expand Down Expand Up @@ -247,29 +246,3 @@ func upsertBucket(ctx context.Context, kubeClient client.Client,
logger.Successf("Bucket source updated")
return namespacedName, nil
}

func isBucketReady(kubeClient client.Client, namespacedName types.NamespacedName, bucket *sourcev1.Bucket) wait.ConditionWithContextFunc {
return func(ctx context.Context) (bool, error) {
err := kubeClient.Get(ctx, namespacedName, bucket)
if err != nil {
return false, err
}

if c := conditions.Get(bucket, meta.ReadyCondition); c != nil {
// Confirm the Ready condition we are observing is for the
// current generation
if c.ObservedGeneration != bucket.GetGeneration() {
return false, nil
}

// Further check the Status
switch c.Status {
case metav1.ConditionTrue:
return true, nil
case metav1.ConditionFalse:
return false, fmt.Errorf(c.Message)
}
}
return false, nil
}
}
29 changes: 1 addition & 28 deletions cmd/flux/create_source_git.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"sigs.k8s.io/yaml"

"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/conditions"

sourcev1 "github.com/fluxcd/source-controller/api/v1"

Expand Down Expand Up @@ -326,7 +325,7 @@ func createSourceGitCmdRun(cmd *cobra.Command, args []string) error {

logger.Waitingf("waiting for GitRepository source reconciliation")
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
isGitRepositoryReady(kubeClient, namespacedName, &gitRepository)); err != nil {
isObjectReadyConditionFunc(kubeClient, namespacedName, &gitRepository)); err != nil {
return err
}
logger.Successf("GitRepository source reconciliation completed")
Expand Down Expand Up @@ -368,29 +367,3 @@ func upsertGitRepository(ctx context.Context, kubeClient client.Client,
logger.Successf("GitRepository source updated")
return namespacedName, nil
}

func isGitRepositoryReady(kubeClient client.Client, namespacedName types.NamespacedName, gitRepository *sourcev1.GitRepository) wait.ConditionWithContextFunc {
return func(ctx context.Context) (bool, error) {
err := kubeClient.Get(ctx, namespacedName, gitRepository)
if err != nil {
return false, err
}

if c := conditions.Get(gitRepository, meta.ReadyCondition); c != nil {
// Confirm the Ready condition we are observing is for the
// current generation
if c.ObservedGeneration != gitRepository.GetGeneration() {
return false, nil
}

// Further check the Status
switch c.Status {
case metav1.ConditionTrue:
return true, nil
case metav1.ConditionFalse:
return false, fmt.Errorf(c.Message)
}
}
return false, nil
}
}
11 changes: 11 additions & 0 deletions cmd/flux/create_source_git_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,21 @@ func TestCreateSourceGit(t *testing.T) {
Time: time.Now(),
},
}
repo.Status.ObservedGeneration = repo.GetGeneration()
},
}, {
"Failed",
command,
assertError("failed message"),
func(repo *sourcev1.GitRepository) {
stalledCondition := metav1.Condition{
Type: meta.StalledCondition,
Status: metav1.ConditionTrue,
Reason: sourcev1.URLInvalidReason,
Message: "failed message",
ObservedGeneration: repo.GetGeneration(),
}
apimeta.SetStatusCondition(&repo.Status.Conditions, stalledCondition)
newCondition := metav1.Condition{
Type: meta.ReadyCondition,
Status: metav1.ConditionFalse,
Expand All @@ -195,6 +204,7 @@ func TestCreateSourceGit(t *testing.T) {
ObservedGeneration: repo.GetGeneration(),
}
apimeta.SetStatusCondition(&repo.Status.Conditions, newCondition)
repo.Status.ObservedGeneration = repo.GetGeneration()
},
}, {
"NoArtifact",
Expand All @@ -210,6 +220,7 @@ func TestCreateSourceGit(t *testing.T) {
ObservedGeneration: repo.GetGeneration(),
}
apimeta.SetStatusCondition(&repo.Status.Conditions, newCondition)
repo.Status.ObservedGeneration = repo.GetGeneration()
},
},
}
Expand Down
29 changes: 1 addition & 28 deletions cmd/flux/create_source_helm.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"os"

"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/conditions"
"github.com/spf13/cobra"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -232,7 +231,7 @@ func createSourceHelmCmdRun(cmd *cobra.Command, args []string) error {

logger.Waitingf("waiting for HelmRepository source reconciliation")
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
isHelmRepositoryReady(kubeClient, namespacedName, helmRepository)); err != nil {
isObjectReadyConditionFunc(kubeClient, namespacedName, helmRepository)); err != nil {
return err
}
logger.Successf("HelmRepository source reconciliation completed")
Expand Down Expand Up @@ -279,29 +278,3 @@ func upsertHelmRepository(ctx context.Context, kubeClient client.Client,
logger.Successf("source updated")
return namespacedName, nil
}

func isHelmRepositoryReady(kubeClient client.Client, namespacedName types.NamespacedName, helmRepository *sourcev1.HelmRepository) wait.ConditionWithContextFunc {
return func(ctx context.Context) (bool, error) {
err := kubeClient.Get(ctx, namespacedName, helmRepository)
if err != nil {
return false, err
}

if c := conditions.Get(helmRepository, meta.ReadyCondition); c != nil {
// Confirm the Ready condition we are observing is for the
// current generation
if c.ObservedGeneration != helmRepository.GetGeneration() {
return false, nil
}

// Further check the Status
switch c.Status {
case metav1.ConditionTrue:
return true, nil
case metav1.ConditionFalse:
return false, fmt.Errorf(c.Message)
}
}
return false, nil
}
}
Loading

0 comments on commit db708de

Please sign in to comment.