Skip to content

Commit

Permalink
feat: if updated sts/rs equal current sts/rs, don't create new resource
Browse files Browse the repository at this point in the history
Signed-off-by: Rory Z <[email protected]>
  • Loading branch information
Rory-Z committed Jan 7, 2024
1 parent e72e717 commit c4cad18
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 59 deletions.
55 changes: 26 additions & 29 deletions controllers/apps/v2beta1/add_emqx_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type addCore struct {
func (a *addCore) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ innerReq.RequesterInterface) subResult {
logger := log.FromContext(ctx)
preSts := getNewStatefulSet(instance)
preStsHash := preSts.Labels[appsv2beta1.LabelsPodTemplateHashKey]
updateSts, _, _ := getStateFulSetList(ctx, a.Client, instance)

patchCalculateFunc := func(storage, new *appsv1.StatefulSet) *patch.PatchResult {
Expand All @@ -49,6 +50,12 @@ func (a *addCore) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i
_ = ctrl.SetControllerReference(instance, preSts, a.Scheme)
if err := a.Handler.Create(preSts); err != nil {
if k8sErrors.IsAlreadyExists(emperror.Cause(err)) {
// If already exists, check if preSts is the same as the current statefulSet
if preStsHash == instance.Status.CoreNodesStatus.CurrentRevision {
_ = a.updateEMQXStatus(ctx, instance, "RevertStatefulSet", "Revert to current statefulSet", preStsHash)
return subResult{}
}

if instance.Status.CoreNodesStatus.CollisionCount == nil {
instance.Status.CoreNodesStatus.CollisionCount = pointer.Int32(0)
}
Expand All @@ -58,21 +65,7 @@ func (a *addCore) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i
}
return subResult{err: emperror.Wrap(err, "failed to create statefulSet")}
}
// Update EMQX status
_ = retry.RetryOnConflict(retry.DefaultRetry, func() error {
_ = a.Client.Get(ctx, client.ObjectKeyFromObject(instance), instance)
instance.Status.SetCondition(metav1.Condition{
Type: appsv2beta1.CoreNodesProgressing,
Status: metav1.ConditionTrue,
Reason: "CreateNewStatefulSet",
Message: "Create new statefulSet",
})
instance.Status.RemoveCondition(appsv2beta1.Ready)
instance.Status.RemoveCondition(appsv2beta1.Available)
instance.Status.RemoveCondition(appsv2beta1.CoreNodesReady)
instance.Status.CoreNodesStatus.UpdateRevision = preSts.Labels[appsv2beta1.LabelsPodTemplateHashKey]
return a.Client.Status().Update(ctx, instance)
})
_ = a.updateEMQXStatus(ctx, instance, "CreateNewStatefulSet", "Create new statefulSet", preStsHash)
return subResult{}
}

Expand All @@ -95,24 +88,28 @@ func (a *addCore) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i
}); err != nil {
return subResult{err: emperror.Wrap(err, "failed to update statefulSet")}
}
// Update EMQX status
_ = retry.RetryOnConflict(retry.DefaultRetry, func() error {
_ = a.Client.Get(ctx, client.ObjectKeyFromObject(instance), instance)
instance.Status.SetCondition(metav1.Condition{
Type: appsv2beta1.CoreNodesProgressing,
Status: metav1.ConditionTrue,
Reason: "CreateNewStatefulSet",
Message: "Create new statefulSet",
})
instance.Status.RemoveCondition(appsv2beta1.Ready)
instance.Status.RemoveCondition(appsv2beta1.Available)
instance.Status.RemoveCondition(appsv2beta1.CoreNodesReady)
return a.Client.Status().Update(ctx, instance)
})
_ = a.updateEMQXStatus(ctx, instance, "UpdateStatefulSet", "Update exist statefulSet", preStsHash)
}
return subResult{}
}

func (a *addCore) updateEMQXStatus(ctx context.Context, instance *appsv2beta1.EMQX, reason, message, podTemplateHash string) error {
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
_ = a.Client.Get(ctx, client.ObjectKeyFromObject(instance), instance)
instance.Status.SetCondition(metav1.Condition{
Type: appsv2beta1.CoreNodesProgressing,
Status: metav1.ConditionTrue,
Reason: reason,
Message: message,
})
instance.Status.RemoveCondition(appsv2beta1.Ready)
instance.Status.RemoveCondition(appsv2beta1.Available)
instance.Status.RemoveCondition(appsv2beta1.CoreNodesReady)
instance.Status.CoreNodesStatus.UpdateRevision = podTemplateHash
return a.Client.Status().Update(ctx, instance)
})
}

func getNewStatefulSet(instance *appsv2beta1.EMQX) *appsv1.StatefulSet {
svcPorts, _ := appsv2beta1.GetDashboardServicePort(instance.Spec.Config.Data)

Expand Down
56 changes: 26 additions & 30 deletions controllers/apps/v2beta1/add_emqx_repl.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func (a *addRepl) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i

logger := log.FromContext(ctx)
preRs := getNewReplicaSet(instance)
preRsHash := preRs.Labels[appsv2beta1.LabelsPodTemplateHashKey]
updateRs, _, _ := getReplicaSetList(ctx, a.Client, instance)

patchCalculateFunc := func(storage, new *appsv1.ReplicaSet) *patch.PatchResult {
Expand All @@ -56,6 +57,12 @@ func (a *addRepl) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i
_ = ctrl.SetControllerReference(instance, preRs, a.Scheme)
if err := a.Handler.Create(preRs); err != nil {
if k8sErrors.IsAlreadyExists(emperror.Cause(err)) {
// If already exists, check if preRs is the same as the current replicaSet
if preRsHash == instance.Status.ReplicantNodesStatus.CurrentRevision {
_ = a.updateEMQXStatus(ctx, instance, "RevertReplicaSet", "Revert to current replicaSet", preRsHash)
return subResult{}
}

if instance.Status.ReplicantNodesStatus.CollisionCount == nil {
instance.Status.ReplicantNodesStatus.CollisionCount = pointer.Int32(0)
}
Expand All @@ -65,22 +72,7 @@ func (a *addRepl) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i
}
return subResult{err: emperror.Wrap(err, "failed to create replicaSet")}
}

// Update EMQX status
_ = retry.RetryOnConflict(retry.DefaultRetry, func() error {
_ = a.Client.Get(ctx, client.ObjectKeyFromObject(instance), instance)
instance.Status.SetCondition(metav1.Condition{
Type: appsv2beta1.ReplicantNodesProgressing,
Status: metav1.ConditionTrue,
Reason: "CreateNewReplicaSet",
Message: "Create new replicaSet",
})
instance.Status.RemoveCondition(appsv2beta1.Ready)
instance.Status.RemoveCondition(appsv2beta1.Available)
instance.Status.RemoveCondition(appsv2beta1.ReplicantNodesReady)
instance.Status.ReplicantNodesStatus.UpdateRevision = preRs.Labels[appsv2beta1.LabelsPodTemplateHashKey]
return a.Client.Status().Update(ctx, instance)
})
_ = a.updateEMQXStatus(ctx, instance, "CreateReplicaSet", "Create new replicaSet", preRsHash)
return subResult{}
}

Expand All @@ -103,24 +95,28 @@ func (a *addRepl) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i
}); err != nil {
return subResult{err: emperror.Wrap(err, "failed to update replicaSet")}
}
// Update EMQX status
_ = retry.RetryOnConflict(retry.DefaultRetry, func() error {
_ = a.Client.Get(ctx, client.ObjectKeyFromObject(instance), instance)
instance.Status.SetCondition(metav1.Condition{
Type: appsv2beta1.ReplicantNodesProgressing,
Status: metav1.ConditionTrue,
Reason: "CreateNewReplicaSet",
Message: "Create new replicaSet",
})
instance.Status.RemoveCondition(appsv2beta1.Ready)
instance.Status.RemoveCondition(appsv2beta1.Available)
instance.Status.RemoveCondition(appsv2beta1.ReplicantNodesReady)
return a.Client.Status().Update(ctx, instance)
})
_ = a.updateEMQXStatus(ctx, instance, "UpdateReplicaSet", "Update exist replicaSet", preRsHash)
}
return subResult{}
}

func (a *addRepl) updateEMQXStatus(ctx context.Context, instance *appsv2beta1.EMQX, reason, message, podTemplateHash string) error {
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
_ = a.Client.Get(ctx, client.ObjectKeyFromObject(instance), instance)
instance.Status.SetCondition(metav1.Condition{
Type: appsv2beta1.ReplicantNodesProgressing,
Status: metav1.ConditionTrue,
Reason: reason,
Message: message,
})
instance.Status.RemoveCondition(appsv2beta1.Ready)
instance.Status.RemoveCondition(appsv2beta1.Available)
instance.Status.RemoveCondition(appsv2beta1.ReplicantNodesReady)
instance.Status.ReplicantNodesStatus.UpdateRevision = podTemplateHash
return a.Client.Status().Update(ctx, instance)
})
}

func getNewReplicaSet(instance *appsv2beta1.EMQX) *appsv1.ReplicaSet {
svcPorts, _ := appsv2beta1.GetDashboardServicePort(instance.Spec.Config.Data)

Expand Down

0 comments on commit c4cad18

Please sign in to comment.