Skip to content

Commit

Permalink
fix(promotion): patch Application w/ Unstructured (#2428)
Browse files Browse the repository at this point in the history
Signed-off-by: Hidde Beydals <[email protected]>
  • Loading branch information
hiddeco authored Aug 13, 2024
1 parent 81d085e commit 9f24ba3
Show file tree
Hide file tree
Showing 3 changed files with 239 additions and 23 deletions.
79 changes: 62 additions & 17 deletions internal/controller/promotion/argocd.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ import (
"github.com/gobwas/glob"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"sigs.k8s.io/controller-runtime/pkg/client"

kargoapi "github.com/akuity/kargo/api/v1alpha1"
libargocd "github.com/akuity/kargo/internal/argocd"
argocd "github.com/akuity/kargo/internal/controller/argocd/api/v1alpha1"
"github.com/akuity/kargo/internal/controller/freight"
"github.com/akuity/kargo/internal/git"
"github.com/akuity/kargo/internal/kubeclient"
"github.com/akuity/kargo/internal/logging"
)

Expand Down Expand Up @@ -70,9 +72,8 @@ type argoCDMechanism struct {
) (argocd.ApplicationSource, error)
argoCDAppPatchFn func(
context.Context,
client.Object,
client.Patch,
...client.PatchOption,
kubeclient.ObjectWithKind,
kubeclient.UnstructuredPatchFn,
) error
logAppEventFn func(ctx context.Context, app *argocd.Application, user, reason, message string)
}
Expand All @@ -90,7 +91,7 @@ func newArgoCDMechanism(kargoClient, argocdClient client.Client) Mechanism {
a.getAuthorizedApplicationFn = a.getAuthorizedApplication
a.applyArgoCDSourceUpdateFn = a.applyArgoCDSourceUpdate
if argocdClient != nil {
a.argoCDAppPatchFn = argocdClient.Patch
a.argoCDAppPatchFn = a.argoCDAppPatch
a.logAppEventFn = a.logAppEvent
}
return a
Expand Down Expand Up @@ -360,9 +361,6 @@ func (a *argoCDMechanism) syncApplication(
desiredSource *argocd.ApplicationSource,
desiredSources argocd.ApplicationSources,
) error {
// Create a patch for the Application.
patch := client.MergeFrom(app.DeepCopy())

// Initiate a "hard" refresh.
if app.ObjectMeta.Annotations == nil {
app.ObjectMeta.Annotations = make(map[string]string, 1)
Expand Down Expand Up @@ -404,18 +402,23 @@ func (a *argoCDMechanism) syncApplication(
app.Operation.Sync.Revisions = append(app.Operation.Sync.Revisions, source.TargetRevision)
}

// Patch the Application with the changes from above.
if err := a.argoCDAppPatchFn(
ctx,
app,
patch,
); err != nil {
// Patch the Argo CD Application.
if err := a.argoCDAppPatchFn(ctx, app, func(src, dst unstructured.Unstructured) error {
// If the resource has been modified since we fetched it, an update
// can result in unexpected merge results. Detect this, and return an
// error if it occurs.
if src.GetGeneration() != dst.GetGeneration() {
return fmt.Errorf("unable to update sources to desired revisions: resource has been modified")
}

dst.SetAnnotations(src.GetAnnotations())
dst.Object["spec"] = recursiveMerge(src.Object["spec"], dst.Object["spec"])
dst.Object["operation"] = src.Object["operation"]
return nil
}); err != nil {
return fmt.Errorf("error patching Argo CD Application %q: %w", app.Name, err)
}
logging.LoggerFromContext(ctx).Debug(
"patched Argo CD Application",
"app", app.Name,
)
logging.LoggerFromContext(ctx).Debug("patched Argo CD Application", "app", app.Name)

// NB: This attempts to mimic the behavior of the Argo CD API server,
// which logs an event when a sync is initiated. However, we do not
Expand All @@ -436,6 +439,14 @@ func (a *argoCDMechanism) syncApplication(
return nil
}

func (a *argoCDMechanism) argoCDAppPatch(
ctx context.Context,
app kubeclient.ObjectWithKind,
modify kubeclient.UnstructuredPatchFn,
) error {
return kubeclient.PatchUnstructured(ctx, a.argocdClient, app, modify)
}

func (a *argoCDMechanism) logAppEvent(ctx context.Context, app *argocd.Application, user, reason, message string) {
logger := logging.LoggerFromContext(ctx).WithValues("app", app.Name)

Expand Down Expand Up @@ -814,3 +825,37 @@ func operationPhaseToPromotionPhase(phases ...argocd.OperationPhase) kargoapi.Pr
return ""
}
}

func recursiveMerge(src, dst any) any {
switch src := src.(type) {
case map[string]any:
dst, ok := dst.(map[string]any)
if !ok {
return src
}
for srcK, srcV := range src {
if dstV, ok := dst[srcK]; ok {
dst[srcK] = recursiveMerge(srcV, dstV)
} else {
dst[srcK] = srcV
}
}
case []any:
dst, ok := dst.([]any)
if !ok {
return src
}
result := make([]any, len(src))
for i, srcV := range src {
if i < len(dst) {
result[i] = recursiveMerge(srcV, dst[i])
} else {
result[i] = srcV
}
}
return result
default:
return src
}
return dst
}
116 changes: 110 additions & 6 deletions internal/controller/promotion/argocd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"

"github.com/go-logr/logr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -18,6 +19,7 @@ import (
kargoapi "github.com/akuity/kargo/api/v1alpha1"
libargocd "github.com/akuity/kargo/internal/argocd"
argocd "github.com/akuity/kargo/internal/controller/argocd/api/v1alpha1"
"github.com/akuity/kargo/internal/kubeclient"
"github.com/akuity/kargo/internal/logging"
)

Expand Down Expand Up @@ -1315,9 +1317,8 @@ func TestArgoCDSyncApplication(t *testing.T) {
promoMech: &argoCDMechanism{
argoCDAppPatchFn: func(
context.Context,
client.Object,
client.Patch,
...client.PatchOption,
kubeclient.ObjectWithKind,
kubeclient.UnstructuredPatchFn,
) error {
return errors.New("something went wrong")
},
Expand All @@ -1341,9 +1342,8 @@ func TestArgoCDSyncApplication(t *testing.T) {
promoMech: &argoCDMechanism{
argoCDAppPatchFn: func(
context.Context,
client.Object,
client.Patch,
...client.PatchOption,
kubeclient.ObjectWithKind,
kubeclient.UnstructuredPatchFn,
) error {
return nil
},
Expand Down Expand Up @@ -2129,3 +2129,107 @@ func TestBuildHelmParamChangesForArgoCDAppSource(t *testing.T) {
result,
)
}

func TestRecursiveMerge(t *testing.T) {
testCases := []struct {
name string
src any
dst any
expected any
}{
{
name: "merge maps",
src: map[string]any{
"key1": "value1",
"key2": map[string]any{
"subkey1": "subvalue1",
"subkey2": true,
},
},
dst: map[string]any{
"key1": "old_value1",
"key2": map[string]any{
"subkey2": false,
"subkey3": "subvalue3",
},
},
expected: map[string]any{
"key1": "value1",
"key2": map[string]any{
"subkey1": "subvalue1",
"subkey2": true,
"subkey3": "subvalue3",
},
},
},
{
name: "merge arrays",
src: []any{
"value1",
map[string]any{
"key1": "subvalue1",
},
true,
},
dst: []any{
"old_value1",
map[string]any{
"key1": "old_subvalue1",
"key2": "subvalue2",
},
false,
},
expected: []any{
"value1",
map[string]any{
"key1": "subvalue1",
"key2": "subvalue2",
},
true,
},
},
{
name: "merge incompatible types (map to array)",
src: map[string]any{"key1": "value1"},
dst: []any{"old_value1"},
expected: map[string]any{"key1": "value1"},
},
{
name: "merge incompatible types (array to map)",
src: []any{"value1"},
dst: map[string]any{"key1": "old_value1"},
expected: []any{"value1"},
},
{
name: "overwrite types (string to int)",
src: "value1",
dst: 42,
expected: "value1",
},
{
name: "overwrite types (int to string)",
src: true,
dst: "old_value1",
expected: true,
},
{
name: "overwrite value with nil",
src: nil,
dst: map[string]any{"key1": "old_value1"},
expected: nil,
},
{
name: "overwrite nil with value",
src: map[string]any{"key1": "value1"},
dst: nil,
expected: map[string]any{"key1": "value1"},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
result := recursiveMerge(tc.src, tc.dst)
assert.Equal(t, tc.expected, result)
})
}
}
67 changes: 67 additions & 0 deletions internal/kubeclient/patch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ package kubeclient
import (
"context"
"encoding/json"
"fmt"

jsonpatch "github.com/evanphx/json-patch/v5"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand Down Expand Up @@ -56,3 +60,66 @@ func PatchStatus[T HasStatus[S], S any](
}
return kubeClient.Status().Patch(ctx, resource, client.RawPatch(types.MergePatchType, patch))
}

type ObjectWithKind interface {
client.Object
schema.ObjectKind
}

// UnstructuredPatchFn is a function which modifies the destination
// unstructured object based on the source unstructured object.
type UnstructuredPatchFn func(src, dest unstructured.Unstructured) error

// PatchUnstructured patches a Kubernetes object using unstructured objects.
// It fetches the object from the API server, applies modifications via the
// provided UnstructuredPatchFn, and patches the object back to the server.
//
// The UnstructuredPatchFn is called with src (a copy of the original object
// converted to unstructured format) and dest (the object fetched from the
// API server).
//
// It returns an error if it fails to fetch the object, apply modifications,
// patch the object, or convert the result back to its typed form.
func PatchUnstructured(ctx context.Context, c client.Client, obj ObjectWithKind, modify UnstructuredPatchFn) error {
destObj := unstructured.Unstructured{}
destObj.SetGroupVersionKind(obj.GroupVersionKind())
if err := c.Get(ctx, client.ObjectKeyFromObject(obj), &destObj); err != nil {
return fmt.Errorf(
"unable to get unstructured object for %s %q in namespace %q: %w",
destObj.GroupVersionKind().Kind, obj.GetName(), obj.GetNamespace(), err,
)
}

// Create a patch for the unstructured object.
//
// As we expect the object to be modified by the callback, while it may
// also simultaneously be modified by other clients (e.g. someone updating
// the object via `kubectl`), we use an optimistic lock to ensure that we
// only apply the patch if the object has not been modified since we
// fetched it.
patch := client.MergeFromWithOptions(destObj.DeepCopy(), client.MergeFromWithOptimisticLock{})

// Convert the typed object to an unstructured object.
srcObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
if err != nil {
return fmt.Errorf("could not convert typed source object to unstructured object: %w", err)
}
srcApp := unstructured.Unstructured{Object: srcObj}

// Apply modifications to the unstructured object.
if err = modify(srcApp, destObj); err != nil {
return fmt.Errorf("failed to apply modifications to unstructured object: %w", err)
}

// Issue the patch to the unstructured object.
if err = c.Patch(ctx, &destObj, patch); err != nil {
return fmt.Errorf("failed to patch the object: %w", err)
}

// Convert the unstructured object back to the typed object.
if err = runtime.DefaultUnstructuredConverter.FromUnstructured(destObj.Object, obj); err != nil {
return fmt.Errorf("error converting unstructured object to typed object: %w", err)
}

return nil
}

0 comments on commit 9f24ba3

Please sign in to comment.